Commit c4d06e2a authored by AlexStocks's avatar AlexStocks

Add: task worker pool

parent 8b0e100a
...@@ -14,6 +14,10 @@ ...@@ -14,6 +14,10 @@
## develop history ## ## develop history ##
--- ---
- 2019/06/08
> Improvement
* add task worker pool
- 2019/06/07 - 2019/06/07
> Improvement > Improvement
* use time.After instead of wheel.After * use time.After instead of wheel.After
......
...@@ -13,7 +13,9 @@ import ( ...@@ -13,7 +13,9 @@ import (
"compress/flate" "compress/flate"
"net" "net"
"time" "time"
)
import (
perrors "github.com/pkg/errors" perrors "github.com/pkg/errors"
) )
...@@ -151,6 +153,8 @@ type Session interface { ...@@ -151,6 +153,8 @@ type Session interface {
SetCronPeriod(int) SetCronPeriod(int)
SetRQLen(int) SetRQLen(int)
SetWQLen(int) SetWQLen(int)
SetTaskPoolSize(int)
SetTaskQueueLength(int)
SetWaitTime(time.Duration) SetWaitTime(time.Duration)
GetAttribute(interface{}) interface{} GetAttribute(interface{}) interface{}
......
...@@ -29,6 +29,7 @@ const ( ...@@ -29,6 +29,7 @@ const (
netIOTimeout = 1e9 // 1s netIOTimeout = 1e9 // 1s
period = 60 * 1e9 // 1 minute period = 60 * 1e9 // 1 minute
pendingDuration = 3e9 pendingDuration = 3e9
defaultTaskQLen = 128
defaultSessionName = "session" defaultSessionName = "session"
defaultTCPSessionName = "tcp-session" defaultTCPSessionName = "tcp-session"
defaultUDPSessionName = "udp-session" defaultUDPSessionName = "udp-session"
...@@ -43,26 +44,40 @@ const ( ...@@ -43,26 +44,40 @@ const (
// getty base session // getty base session
type session struct { type session struct {
name string name string
endPoint EndPoint endPoint EndPoint
maxMsgLen int32
// net read Write // net read Write
Connection Connection
// pkgHandler ReadWriter
reader Reader // @reader should be nil when @conn is a gettyWSConn object.
writer Writer
listener EventListener listener EventListener
once sync.Once
done chan struct{}
// errFlag bool
// codec
reader Reader // @reader should be nil when @conn is a gettyWSConn object.
writer Writer
// read & write
rQ chan interface{}
wQ chan interface{}
// handle logic
maxMsgLen int32
// task queue
tQLen int32
tQPoolSize int32
tQPool *taskPool
// heartbeat
period time.Duration period time.Duration
wait time.Duration
rQ chan interface{} // done
wQ chan interface{} wait time.Duration
once sync.Once
done chan struct{}
// attribute // attribute
attrs *ValuesContext attrs *ValuesContext
// goroutines sync // goroutines sync
grNum int32 grNum int32
lock sync.RWMutex lock sync.RWMutex
...@@ -70,14 +85,19 @@ type session struct { ...@@ -70,14 +85,19 @@ type session struct {
func newSession(endPoint EndPoint, conn Connection) *session { func newSession(endPoint EndPoint, conn Connection) *session {
ss := &session{ ss := &session{
name: defaultSessionName, name: defaultSessionName,
endPoint: endPoint, endPoint: endPoint,
maxMsgLen: maxReadBufLen,
Connection: conn, Connection: conn,
done: make(chan struct{}),
period: period, maxMsgLen: maxReadBufLen,
wait: pendingDuration, tQLen: defaultTaskQLen,
attrs: NewValuesContext(nil),
period: period,
done: make(chan struct{}),
wait: pendingDuration,
attrs: NewValuesContext(nil),
} }
ss.Connection.setSession(ss) ss.Connection.setSession(ss)
...@@ -115,7 +135,6 @@ func (s *session) Reset() { ...@@ -115,7 +135,6 @@ func (s *session) Reset() {
s.name = defaultSessionName s.name = defaultSessionName
s.once = sync.Once{} s.once = sync.Once{}
s.done = make(chan struct{}) s.done = make(chan struct{})
// s.errFlag = false
s.period = period s.period = period
s.wait = pendingDuration s.wait = pendingDuration
s.attrs = NewValuesContext(nil) s.attrs = NewValuesContext(nil)
...@@ -190,30 +209,50 @@ func (s *session) IsClosed() bool { ...@@ -190,30 +209,50 @@ func (s *session) IsClosed() bool {
} }
// set maximum pacakge length of every pacakge in (EventListener)OnMessage(@pkgs) // set maximum pacakge length of every pacakge in (EventListener)OnMessage(@pkgs)
func (s *session) SetMaxMsgLen(length int) { s.maxMsgLen = int32(length) } func (s *session) SetMaxMsgLen(length int) {
s.lock.Lock()
defer s.lock.Unlock()
s.maxMsgLen = int32(length)
}
// set session name // set session name
func (s *session) SetName(name string) { s.name = name } func (s *session) SetName(name string) {
s.lock.Lock()
defer s.lock.Unlock()
s.name = name
}
// set EventListener // set EventListener
func (s *session) SetEventListener(listener EventListener) { func (s *session) SetEventListener(listener EventListener) {
s.lock.Lock()
defer s.lock.Unlock()
s.listener = listener s.listener = listener
} }
// set package handler // set package handler
func (s *session) SetPkgHandler(handler ReadWriter) { func (s *session) SetPkgHandler(handler ReadWriter) {
s.lock.Lock()
defer s.lock.Unlock()
s.reader = handler s.reader = handler
s.writer = handler s.writer = handler
// s.pkgHandler = handler
} }
// set Reader // set Reader
func (s *session) SetReader(reader Reader) { func (s *session) SetReader(reader Reader) {
s.lock.Lock()
defer s.lock.Unlock()
s.reader = reader s.reader = reader
} }
// set Writer // set Writer
func (s *session) SetWriter(writer Writer) { func (s *session) SetWriter(writer Writer) {
s.lock.Lock()
defer s.lock.Unlock()
s.writer = writer s.writer = writer
} }
...@@ -224,8 +263,8 @@ func (s *session) SetCronPeriod(period int) { ...@@ -224,8 +263,8 @@ func (s *session) SetCronPeriod(period int) {
} }
s.lock.Lock() s.lock.Lock()
defer s.lock.Unlock()
s.period = time.Duration(period) * time.Millisecond s.period = time.Duration(period) * time.Millisecond
s.lock.Unlock()
} }
// set @session's read queue size // set @session's read queue size
...@@ -235,9 +274,9 @@ func (s *session) SetRQLen(readQLen int) { ...@@ -235,9 +274,9 @@ func (s *session) SetRQLen(readQLen int) {
} }
s.lock.Lock() s.lock.Lock()
defer s.lock.Unlock()
s.rQ = make(chan interface{}, readQLen) s.rQ = make(chan interface{}, readQLen)
s.lock.Unlock() log.Debug("%s, [session.SetRQLen] rQ{len:%d, cap:%d}", s.Stat(), len(s.rQ), cap(s.rQ))
log.Debugf("%s, [session.SetRQLen] rQ{len:%d, cap:%d}", s.Stat(), len(s.rQ), cap(s.rQ))
} }
// set @session's Write queue size // set @session's Write queue size
...@@ -247,9 +286,9 @@ func (s *session) SetWQLen(writeQLen int) { ...@@ -247,9 +286,9 @@ func (s *session) SetWQLen(writeQLen int) {
} }
s.lock.Lock() s.lock.Lock()
defer s.lock.Unlock()
s.wQ = make(chan interface{}, writeQLen) s.wQ = make(chan interface{}, writeQLen)
s.lock.Unlock() log.Debug("%s, [session.SetWQLen] wQ{len:%d, cap:%d}", s.Stat(), len(s.wQ), cap(s.wQ))
log.Debugf("%s, [session.SetWQLen] wQ{len:%d, cap:%d}", s.Stat(), len(s.wQ), cap(s.wQ))
} }
// set maximum wait time when session got error or got exit signal // set maximum wait time when session got error or got exit signal
...@@ -259,8 +298,26 @@ func (s *session) SetWaitTime(waitTime time.Duration) { ...@@ -259,8 +298,26 @@ func (s *session) SetWaitTime(waitTime time.Duration) {
} }
s.lock.Lock() s.lock.Lock()
defer s.lock.Unlock()
s.wait = waitTime s.wait = waitTime
s.lock.Unlock() }
// set task pool size
func (s *session) SetTaskPoolSize(poolSize int) {
if poolSize < 1 {
panic("@poolSize < 1")
}
atomic.StoreInt32(&s.tQPoolSize, int32(poolSize))
}
// set task queue length
func (s *session) SetTaskQueueLength(taskQueueLen int) {
if taskQueueLen < 1 {
panic("@taskQueueLen < 1")
}
atomic.StoreInt32(&s.tQLen, int32(taskQueueLen))
} }
// set attribute of key @session:key // set attribute of key @session:key
...@@ -400,12 +457,21 @@ func (s *session) WriteBytesArray(pkgs ...[]byte) error { ...@@ -400,12 +457,21 @@ func (s *session) WriteBytesArray(pkgs ...[]byte) error {
// func (s *session) RunEventLoop() { // func (s *session) RunEventLoop() {
func (s *session) run() { func (s *session) run() {
if s.rQ == nil || s.wQ == nil { if s.wQ == nil {
errStr := fmt.Sprintf("session{name:%s, rQ:%#v, wQ:%#v}", errStr := fmt.Sprintf("session{name:%s, wQ:%#v}",
s.name, s.rQ, s.wQ) s.name, s.wQ)
log.Error(errStr) log.Error(errStr)
panic(errStr) panic(errStr)
} }
tQPoolSize := atomic.LoadInt32(&s.tQPoolSize)
if s.rQ == nil && tQPoolSize == 0 {
errStr := fmt.Sprintf("session{name:%s, rQ:%#v, tQPool:%#v}",
s.name, s.rQ, s.tQPool)
log.Error(errStr)
panic(errStr)
}
if s.Connection == nil || s.listener == nil || s.writer == nil { if s.Connection == nil || s.listener == nil || s.writer == nil {
errStr := fmt.Sprintf("session{name:%s, conn:%#v, listener:%#v, writer:%#v}", errStr := fmt.Sprintf("session{name:%s, conn:%#v, listener:%#v, writer:%#v}",
s.name, s.Connection, s.listener, s.writer) s.name, s.Connection, s.listener, s.writer)
...@@ -420,6 +486,12 @@ func (s *session) run() { ...@@ -420,6 +486,12 @@ func (s *session) run() {
return return
} }
// start task pool
if tQPoolSize != 0 {
s.tQPool = newTaskPool(s.tQPoolSize, atomic.LoadInt32(&s.tQLen))
}
// start read/write gr
atomic.AddInt32(&(s.grNum), 2) atomic.AddInt32(&(s.grNum), 2)
go s.handleLoop() go s.handleLoop()
go s.handlePackage() go s.handlePackage()
...@@ -448,10 +520,8 @@ func (s *session) handleLoop() { ...@@ -448,10 +520,8 @@ func (s *session) handleLoop() {
} }
grNum = atomic.AddInt32(&(s.grNum), -1) grNum = atomic.AddInt32(&(s.grNum), -1)
// if !s.errFlag {
s.listener.OnClose(s) s.listener.OnClose(s)
// } log.Info("%s, [session.handleLoop] goroutine exit now, left gr num %d", s.Stat(), grNum)
log.Infof("%s, [session.handleLoop] goroutine exit now, left gr num %d", s.Stat(), grNum)
s.gc() s.gc()
}() }()
...@@ -516,6 +586,14 @@ LOOP: ...@@ -516,6 +586,14 @@ LOOP:
} }
} }
func (s *session) addTask(pkg interface{}) {
if s.tQPool != nil {
s.tQPool.AddTask(task{session: s, pkg: pkg})
} else {
s.rQ <- pkg
}
}
func (s *session) handlePackage() { func (s *session) handlePackage() {
var ( var (
err error err error
...@@ -619,8 +697,6 @@ func (s *session) handleTCPPackage() error { ...@@ -619,8 +697,6 @@ func (s *session) handleTCPPackage() error {
if err != nil { if err != nil {
log.Warnf("%s, [session.handleTCPPackage] = len{%d}, error:%+v", log.Warnf("%s, [session.handleTCPPackage] = len{%d}, error:%+v",
s.sessionToken(), pkgLen, err) s.sessionToken(), pkgLen, err)
// for (Codec)OnErr
// s.errFlag = true
exit = true exit = true
break break
} }
...@@ -630,7 +706,7 @@ func (s *session) handleTCPPackage() error { ...@@ -630,7 +706,7 @@ func (s *session) handleTCPPackage() error {
} }
// handle case 3 // handle case 3
s.UpdateActive() s.UpdateActive()
s.rQ <- pkg s.addTask(pkg)
pktBuf.Next(pkgLen) pktBuf.Next(pkgLen)
// continue to handle case 4 // continue to handle case 4
} }
...@@ -705,7 +781,7 @@ func (s *session) handleUDPPackage() error { ...@@ -705,7 +781,7 @@ func (s *session) handleUDPPackage() error {
} }
s.UpdateActive() s.UpdateActive()
s.rQ <- UDPContext{Pkg: pkg, PeerAddr: addr} s.addTask(UDPContext{Pkg: pkg, PeerAddr: addr})
} }
return perrors.WithStack(err) return perrors.WithStack(err)
...@@ -749,9 +825,10 @@ func (s *session) handleWSPackage() error { ...@@ -749,9 +825,10 @@ func (s *session) handleWSPackage() error {
s.sessionToken(), length, err) s.sessionToken(), length, err)
continue continue
} }
s.rQ <- unmarshalPkg
s.addTask(unmarshalPkg)
} else { } else {
s.rQ <- pkg s.addTask(pkg)
} }
} }
...@@ -786,8 +863,12 @@ func (s *session) gc() { ...@@ -786,8 +863,12 @@ func (s *session) gc() {
s.attrs = nil s.attrs = nil
close(s.wQ) close(s.wQ)
s.wQ = nil s.wQ = nil
close(s.rQ) if s.tQPool != nil {
s.rQ = nil s.tQPool.close()
} else {
close(s.rQ)
s.rQ = nil
}
s.Connection.close((int)((int64)(s.wait))) s.Connection.close((int)((int64)(s.wait)))
} }
s.lock.Unlock() s.lock.Unlock()
...@@ -797,5 +878,6 @@ func (s *session) gc() { ...@@ -797,5 +878,6 @@ func (s *session) gc() {
// or (session)handleLoop automatically. It's thread safe. // or (session)handleLoop automatically. It's thread safe.
func (s *session) Close() { func (s *session) Close() {
s.stop() s.stop()
log.Infof("%s closed now. its current gr num is %d", s.sessionToken(), atomic.LoadInt32(&(s.grNum))) log.Info("%s closed now. its current gr num is %d",
s.sessionToken(), atomic.LoadInt32(&(s.grNum)))
} }
package getty
import (
"fmt"
"sync"
"sync/atomic"
)
import (
log "github.com/AlexStocks/log4go"
)
//func init() {
// rand.Seed(time.Now().UnixNano())
//}
//
//var (
// // The golang rand generators are *not* intrinsically thread-safe.
// randIDLock sync.Mutex
// randIDGen = rand.New(rand.NewSource(time.Now().UnixNano()))
//)
//
//func randID() uint64 {
// randIDLock.Lock()
// defer randIDLock.Unlock()
//
// return uint64(randIDGen.Int63())
//}
// task t
type task struct {
session *session
pkg interface{}
}
// task pool: manage task ts
type taskPool struct {
idx int32
qLen int32 // task t queue length
size int32 // pool size
qArray []chan task
wg sync.WaitGroup
once sync.Once
done chan struct{}
}
// build a task pool
func newTaskPool(poolSize int32, taskQLen int32) *taskPool {
return &taskPool{
size: poolSize,
qLen: taskQLen,
qArray: make([]chan task, poolSize),
done: make(chan struct{}),
}
}
// start task pool
func (p *taskPool) start() {
if p.size == 0 {
panic(fmt.Sprintf("[getty][task_pool] illegal pool size %d", p.size))
}
if p.qLen == 0 {
panic(fmt.Sprintf("[getty][task_pool] illegal t queue length %d", p.qLen))
}
for i := int32(0); i < p.size; i++ {
p.qArray[i] = make(chan task, p.qLen)
p.wg.Add(1)
taskID := i
p.run(int(taskID))
}
}
// task
func (p *taskPool) run(id int) {
defer p.wg.Done()
var (
ok bool
t task
)
for {
select {
case <-p.done:
log.Warn("[getty][task_pool] task %d exit now while its task length is %d",
id, len(p.qArray[id]))
return
case t, ok = <-p.qArray[id]:
if ok {
t.session.listener.OnMessage(t.session, t.pkg)
}
}
}
}
// add task
func (p *taskPool) AddTask(t task) {
//id := randID() % uint64(p.size)
id := atomic.AddInt32(&p.idx, 1) % p.size
select {
case <-p.done:
return
case p.qArray[id] <- t:
}
}
// stop all tasks
func (p *taskPool) stop() {
select {
case <-p.done:
return
default:
p.once.Do(func() {
close(p.done)
})
}
}
// check whether the session has been closed.
func (p *taskPool) isClosed() bool {
select {
case <-p.done:
return true
default:
return false
}
}
func (p *taskPool) close() {
p.stop()
p.wg.Wait()
for i := range p.qArray {
close(p.qArray[i])
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment