Commit a9e318f8 authored by AlexStocks's avatar AlexStocks

Imp: move task pool from session to client/server

parent c4d06e2a
......@@ -51,6 +51,9 @@ type client struct {
newSession NewSessionCallback
ssMap map[Session]struct{}
// task queue pool
tQPool *taskPool
sync.Once
done chan struct{}
wg sync.WaitGroup
......@@ -76,6 +79,14 @@ func newClient(t EndPointType, opts ...ClientOption) *client {
c.ssMap = make(map[Session]struct{}, c.number)
if c.tQPoolSize > 0 {
qLen := c.tQLen
if qLen == 0 {
qLen = defaultTaskQLen
}
c.tQPool = newTaskPool(c.tQPoolSize, qLen)
}
return c
}
......@@ -350,7 +361,7 @@ func (c *client) connect() {
}
err = c.newSession(ss)
if err == nil {
// ss.RunEventLoop()
ss.(*session).SetTaskPool(c.tQPool)
ss.(*session).run()
c.Lock()
if c.ssMap == nil {
......@@ -421,6 +432,11 @@ func (c *client) stop() {
s.Close()
}
c.ssMap = nil
if c.tQPool != nil {
c.tQPool.close()
c.tQPool = nil
}
c.Unlock()
})
}
......
......@@ -153,8 +153,6 @@ type Session interface {
SetCronPeriod(int)
SetRQLen(int)
SetWQLen(int)
SetTaskPoolSize(int)
SetTaskQueueLength(int)
SetWaitTime(time.Duration)
GetAttribute(interface{}) interface{}
......
......@@ -14,11 +14,19 @@ package getty
// Server Options
/////////////////////////////////////////
const (
defaultTaskQLen = 128
)
type ServerOption func(*ServerOptions)
type ServerOptions struct {
addr string
// task pool
tQLen int32
tQPoolSize int32
// websocket
path string
cert string
......@@ -33,6 +41,20 @@ func WithLocalAddress(addr string) ServerOption {
}
}
// @size is the task queue pool size
func WithServerTaskPoolSize(size int32) ServerOption {
return func(o *ServerOptions) {
o.tQPoolSize = size
}
}
// @length is the task queue length
func WithServerTaskQueueLength(length int32) ServerOption {
return func(o *ServerOptions) {
o.tQLen = length
}
}
// @path: websocket request url path
func WithWebsocketServerPath(path string) ServerOption {
return func(o *ServerOptions) {
......@@ -72,6 +94,10 @@ type ClientOptions struct {
number int
reconnectInterval int // reConnect Interval
// task pool
tQLen int32
tQPoolSize int32
// the cert file of wss server which may contain server domain, server ip, the starting effective date, effective
// duration, the hash alg, the len of the private key.
// wss client will use it.
......@@ -99,6 +125,20 @@ func WithConnectionNumber(num int) ClientOption {
}
}
// @size is the task queue pool size
func WithClientTaskPoolSize(size int32) ClientOption {
return func(o *ClientOptions) {
o.tQPoolSize = size
}
}
// @length is the task queue length
func WithClientTaskQueueLength(length int32) ClientOption {
return func(o *ClientOptions) {
o.tQLen = length
}
}
// @cert is client certificate file. it can be empty.
func WithRootCertificateFile(cert string) ClientOption {
return func(o *ClientOptions) {
......
......@@ -41,6 +41,9 @@ type server struct {
endPointType EndPointType
server *http.Server // for ws or wss server
// task queue pool
tQPool *taskPool
sync.Once
done chan struct{}
wg sync.WaitGroup
......@@ -64,6 +67,14 @@ func newServer(t EndPointType, opts ...ServerOption) *server {
panic(fmt.Sprintf("@addr:%s", s.addr))
}
if s.tQPoolSize > 0 {
qLen := s.tQLen
if qLen == 0 {
qLen = defaultTaskQLen
}
s.tQPool = newTaskPool(s.tQPoolSize, qLen)
}
return s
}
......@@ -130,6 +141,10 @@ func (s *server) stop() {
s.pktListener.Close()
s.pktListener = nil
}
if s.tQPool != nil {
s.tQPool.close()
s.tQPool = nil
}
})
}
}
......@@ -252,7 +267,7 @@ func (s *server) runTcpEventLoop(newSession NewSessionCallback) {
continue
}
delay = 0
// client.RunEventLoop()
client.(*session).SetTaskPool(s.tQPool)
client.(*session).run()
}
}()
......@@ -267,6 +282,7 @@ func (s *server) runUDPEventLoop(newSession NewSessionCallback) {
if err := newSession(ss); err != nil {
panic(err.Error())
}
ss.(*session).SetTaskPool(s.tQPool)
ss.(*session).run()
}
......@@ -323,7 +339,7 @@ func (s *wsHandler) serveWSRequest(w http.ResponseWriter, r *http.Request) {
if ss.(*session).maxMsgLen > 0 {
conn.SetReadLimit(int64(ss.(*session).maxMsgLen))
}
// ss.RunEventLoop()
ss.(*session).SetTaskPool(s.server.tQPool)
ss.(*session).run()
}
......
......@@ -29,7 +29,7 @@ const (
netIOTimeout = 1e9 // 1s
period = 60 * 1e9 // 1 minute
pendingDuration = 3e9
defaultTaskQLen = 128
defaultQLen = 1024
defaultSessionName = "session"
defaultTCPSessionName = "tcp-session"
defaultUDPSessionName = "udp-session"
......@@ -63,9 +63,7 @@ type session struct {
// handle logic
maxMsgLen int32
// task queue
tQLen int32
tQPoolSize int32
tQPool *taskPool
tQPool *taskPool
// heartbeat
period time.Duration
......@@ -223,6 +221,7 @@ func (s *session) SetName(name string) {
s.name = name
}
// set EventListener
func (s *session) SetEventListener(listener EventListener) {
s.lock.Lock()
......@@ -302,22 +301,12 @@ func (s *session) SetWaitTime(waitTime time.Duration) {
s.wait = waitTime
}
// set task pool size
func (s *session) SetTaskPoolSize(poolSize int) {
if poolSize < 1 {
panic("@poolSize < 1")
}
atomic.StoreInt32(&s.tQPoolSize, int32(poolSize))
}
// set task queue length
func (s *session) SetTaskQueueLength(taskQueueLen int) {
if taskQueueLen < 1 {
panic("@taskQueueLen < 1")
}
// set task pool
func (s *session) SetTaskPool(p *taskPool) {
s.lock.Lock()
defer s.lock.Unlock()
atomic.StoreInt32(&s.tQLen, int32(taskQueueLen))
s.tQPool = p
}
// set attribute of key @session:key
......@@ -457,26 +446,19 @@ func (s *session) WriteBytesArray(pkgs ...[]byte) error {
// func (s *session) RunEventLoop() {
func (s *session) run() {
if s.wQ == nil {
errStr := fmt.Sprintf("session{name:%s, wQ:%#v}",
s.name, s.wQ)
if s.Connection == nil || s.listener == nil || s.writer == nil {
errStr := fmt.Sprintf("session{name:%s, conn:%#v, listener:%#v, writer:%#v}",
s.name, s.Connection, s.listener, s.writer)
log.Error(errStr)
panic(errStr)
}
tQPoolSize := atomic.LoadInt32(&s.tQPoolSize)
if s.rQ == nil && tQPoolSize == 0 {
errStr := fmt.Sprintf("session{name:%s, rQ:%#v, tQPool:%#v}",
s.name, s.rQ, s.tQPool)
log.Error(errStr)
panic(errStr)
if s.wQ == nil {
s.wQ = make(chan interface{}, defaultQLen)
}
if s.Connection == nil || s.listener == nil || s.writer == nil {
errStr := fmt.Sprintf("session{name:%s, conn:%#v, listener:%#v, writer:%#v}",
s.name, s.Connection, s.listener, s.writer)
log.Error(errStr)
panic(errStr)
if s.rQ == nil && s.tQPool == nil {
s.rQ = make(chan interface{}, defaultQLen)
}
// call session opened
......@@ -486,11 +468,6 @@ func (s *session) run() {
return
}
// start task pool
if tQPoolSize != 0 {
s.tQPool = newTaskPool(s.tQPoolSize, atomic.LoadInt32(&s.tQLen))
}
// start read/write gr
atomic.AddInt32(&(s.grNum), 2)
go s.handleLoop()
......@@ -861,11 +838,11 @@ func (s *session) gc() {
s.lock.Lock()
if s.attrs != nil {
s.attrs = nil
close(s.wQ)
s.wQ = nil
if s.tQPool != nil {
s.tQPool.close()
} else {
if s.wQ != nil {
close(s.wQ)
s.wQ = nil
}
if s.rQ != nil {
close(s.rQ)
s.rQ = nil
}
......
......@@ -35,9 +35,9 @@ type task struct {
// task pool: manage task ts
type taskPool struct {
idx int32
qLen int32 // task t queue length
size int32 // pool size
idx uint32
qLen int32 // task queue length
size int32 // task queue pool size
qArray []chan task
wg sync.WaitGroup
......@@ -100,7 +100,7 @@ func (p *taskPool) run(id int) {
// add task
func (p *taskPool) AddTask(t task) {
//id := randID() % uint64(p.size)
id := atomic.AddInt32(&p.idx, 1) % p.size
id := atomic.AddUint32(&p.idx, 1) % uint32(p.size)
select {
case <-p.done:
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment