Commit ac39a3ba authored by AlexStocks's avatar AlexStocks

Imp: add task pool options

parent 1991056b
...@@ -80,11 +80,7 @@ func newClient(t EndPointType, opts ...ClientOption) *client { ...@@ -80,11 +80,7 @@ func newClient(t EndPointType, opts ...ClientOption) *client {
c.ssMap = make(map[Session]struct{}, c.number) c.ssMap = make(map[Session]struct{}, c.number)
if c.tQPoolSize > 0 { if c.tQPoolSize > 0 {
qLen := c.tQLen c.tQPool = newTaskPool(c.taskPoolOptions)
if qLen == 0 {
qLen = defaultTaskQLen
}
c.tQPool = newTaskPool(c.tQPoolSize, qLen)
} }
return c return c
......
...@@ -14,18 +14,13 @@ package getty ...@@ -14,18 +14,13 @@ package getty
// Server Options // Server Options
///////////////////////////////////////// /////////////////////////////////////////
const (
defaultTaskQLen = 128
)
type ServerOption func(*ServerOptions) type ServerOption func(*ServerOptions)
type ServerOptions struct { type ServerOptions struct {
addr string addr string
// task pool // task pool
tQLen int32 taskPoolOptions
tQPoolSize int32
// websocket // websocket
path string path string
...@@ -44,14 +39,21 @@ func WithLocalAddress(addr string) ServerOption { ...@@ -44,14 +39,21 @@ func WithLocalAddress(addr string) ServerOption {
// @size is the task queue pool size // @size is the task queue pool size
func WithServerTaskPoolSize(size int32) ServerOption { func WithServerTaskPoolSize(size int32) ServerOption {
return func(o *ServerOptions) { return func(o *ServerOptions) {
o.tQPoolSize = size o.taskPoolOptions.tQPoolSize = size
} }
} }
// @length is the task queue length // @length is the task queue length
func WithServerTaskQueueLength(length int32) ServerOption { func WithServerTaskQueueLength(length int32) ServerOption {
return func(o *ServerOptions) { return func(o *ServerOptions) {
o.tQLen = length o.taskPoolOptions.tQLen = length
}
}
// @number is the task queue number
func WithServerTaskQueueNumber(number int32) ServerOption {
return func(o *ServerOptions) {
o.taskPoolOptions.tQNumber = number
} }
} }
...@@ -95,8 +97,7 @@ type ClientOptions struct { ...@@ -95,8 +97,7 @@ type ClientOptions struct {
reconnectInterval int // reConnect Interval reconnectInterval int // reConnect Interval
// task pool // task pool
tQLen int32 taskPoolOptions
tQPoolSize int32
// the cert file of wss server which may contain server domain, server ip, the starting effective date, effective // the cert file of wss server which may contain server domain, server ip, the starting effective date, effective
// duration, the hash alg, the len of the private key. // duration, the hash alg, the len of the private key.
...@@ -128,14 +129,21 @@ func WithConnectionNumber(num int) ClientOption { ...@@ -128,14 +129,21 @@ func WithConnectionNumber(num int) ClientOption {
// @size is the task queue pool size // @size is the task queue pool size
func WithClientTaskPoolSize(size int32) ClientOption { func WithClientTaskPoolSize(size int32) ClientOption {
return func(o *ClientOptions) { return func(o *ClientOptions) {
o.tQPoolSize = size o.taskPoolOptions.tQPoolSize = size
} }
} }
// @length is the task queue length // @length is the task queue length
func WithClientTaskQueueLength(length int32) ClientOption { func WithClientTaskQueueLength(length int32) ClientOption {
return func(o *ClientOptions) { return func(o *ClientOptions) {
o.tQLen = length o.taskPoolOptions.tQLen = length
}
}
// @number is the task queue number
func WithClientTaskQueueNumber(number int32) ClientOption {
return func(o *ClientOptions) {
o.taskPoolOptions.tQNumber = number
} }
} }
......
...@@ -68,11 +68,7 @@ func newServer(t EndPointType, opts ...ServerOption) *server { ...@@ -68,11 +68,7 @@ func newServer(t EndPointType, opts ...ServerOption) *server {
} }
if s.tQPoolSize > 0 { if s.tQPoolSize > 0 {
qLen := s.tQLen s.tQPool = newTaskPool(s.taskPoolOptions)
if qLen == 0 {
qLen = defaultTaskQLen
}
s.tQPool = newTaskPool(s.tQPoolSize, qLen)
} }
return s return s
......
...@@ -8,6 +8,7 @@ import ( ...@@ -8,6 +8,7 @@ import (
const ( const (
defaultTaskQNumber = 10 defaultTaskQNumber = 10
defaultTaskQLen = 128
) )
// task t // task t
...@@ -16,11 +17,35 @@ type task struct { ...@@ -16,11 +17,35 @@ type task struct {
pkg interface{} pkg interface{}
} }
type taskPoolOptions struct {
tQLen int32 // task queue length
tQNumber int32 // task queue number
tQPoolSize int32 // task pool size
}
func (o *taskPoolOptions) Validate() {
if o.tQPoolSize == 0 {
panic(fmt.Sprintf("[getty][task_pool] illegal pool size %d", o.tQPoolSize))
}
if o.tQLen == 0 {
o.tQLen = defaultTaskQLen
}
if o.tQNumber < 1 {
o.tQNumber = defaultTaskQNumber
}
if o.tQNumber > o.tQPoolSize {
o.tQNumber = o.tQPoolSize
}
}
// task pool: manage task ts // task pool: manage task ts
type taskPool struct { type taskPool struct {
idx uint32 taskPoolOptions
qLen int32 // task queue length
size int32 // task queue pool size idx uint32 // round robin index
qArray []chan task qArray []chan task
wg sync.WaitGroup wg sync.WaitGroup
...@@ -29,16 +54,17 @@ type taskPool struct { ...@@ -29,16 +54,17 @@ type taskPool struct {
} }
// build a task pool // build a task pool
func newTaskPool(poolSize int32, taskQLen int32) *taskPool { func newTaskPool(opts taskPoolOptions) *taskPool {
opts.Validate()
p := &taskPool{ p := &taskPool{
size: poolSize, taskPoolOptions: opts,
qLen: taskQLen, qArray: make([]chan task, opts.tQNumber),
qArray: make([]chan task, defaultTaskQNumber), done: make(chan struct{}),
done: make(chan struct{}),
} }
for i := 0; i < defaultTaskQNumber; i++ { for i := int32(0); i < p.tQNumber; i++ {
p.qArray[i] = make(chan task, taskQLen) p.qArray[i] = make(chan task, p.tQLen)
} }
return p return p
...@@ -46,18 +72,10 @@ func newTaskPool(poolSize int32, taskQLen int32) *taskPool { ...@@ -46,18 +72,10 @@ func newTaskPool(poolSize int32, taskQLen int32) *taskPool {
// start task pool // start task pool
func (p *taskPool) start() { func (p *taskPool) start() {
if p.size == 0 { for i := int32(0); i < p.tQPoolSize; i++ {
panic(fmt.Sprintf("[getty][task_pool] illegal pool size %d", p.size))
}
if p.qLen == 0 {
panic(fmt.Sprintf("[getty][task_pool] illegal t queue length %d", p.qLen))
}
for i := int32(0); i < p.size; i++ {
p.wg.Add(1) p.wg.Add(1)
workerID := i workerID := i
q := p.qArray[workerID%defaultTaskQNumber] q := p.qArray[workerID%p.tQNumber]
go p.run(int(workerID), q) go p.run(int(workerID), q)
} }
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment