Commit 1991056b authored by AlexStocks's avatar AlexStocks

Imp: task pool worker_number/chan_number is M:N

parent 79f3f666
......@@ -3,6 +3,11 @@ package getty
import (
"fmt"
"sync"
"sync/atomic"
)
const (
defaultTaskQNumber = 10
)
// task t
......@@ -13,10 +18,11 @@ type task struct {
// task pool: manage task ts
type taskPool struct {
qLen int32 // task queue length
size int32 // task queue pool size
Q chan task
wg sync.WaitGroup
idx uint32
qLen int32 // task queue length
size int32 // task queue pool size
qArray []chan task
wg sync.WaitGroup
once sync.Once
done chan struct{}
......@@ -24,12 +30,18 @@ type taskPool struct {
// build a task pool
func newTaskPool(poolSize int32, taskQLen int32) *taskPool {
return &taskPool{
size: poolSize,
qLen: taskQLen,
Q: make(chan task, taskQLen),
done: make(chan struct{}),
p := &taskPool{
size: poolSize,
qLen: taskQLen,
qArray: make([]chan task, defaultTaskQNumber),
done: make(chan struct{}),
}
for i := 0; i < defaultTaskQNumber; i++ {
p.qArray[i] = make(chan task, taskQLen)
}
return p
}
// start task pool
......@@ -44,13 +56,14 @@ func (p *taskPool) start() {
for i := int32(0); i < p.size; i++ {
p.wg.Add(1)
taskID := i
go p.run(int(taskID))
workerID := i
q := p.qArray[workerID%defaultTaskQNumber]
go p.run(int(workerID), q)
}
}
// worker
func (p *taskPool) run(id int) {
func (p *taskPool) run(id int, q chan task) {
defer p.wg.Done()
var (
......@@ -61,15 +74,15 @@ func (p *taskPool) run(id int) {
for {
select {
case <-p.done:
if 0 < len(p.Q) {
if 0 < len(q) {
log.Warn("[getty][task_pool] task worker %d exit now while its task buffer length %d is greater than 0",
id, len(p.Q))
id, len(q))
} else {
log.Info("[getty][task_pool] task worker %d exit now", id)
}
return
case t, ok = <-p.Q:
case t, ok = <-q:
if ok {
t.session.listener.OnMessage(t.session, t.pkg)
}
......@@ -79,10 +92,12 @@ func (p *taskPool) run(id int) {
// add task
func (p *taskPool) AddTask(t task) {
id := atomic.AddUint32(&p.idx, 1) % defaultTaskQNumber
select {
case <-p.done:
return
case p.Q <- t:
case p.qArray[id] <- t:
}
}
......@@ -112,5 +127,7 @@ func (p *taskPool) isClosed() bool {
func (p *taskPool) close() {
p.stop()
p.wg.Wait()
close(p.Q)
for i := range p.qArray {
close(p.qArray[i])
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment