Commit 4b32c61e authored by AlexStocks's avatar AlexStocks

Imp: use just 1 task queue

parent f8a04498
...@@ -3,7 +3,6 @@ package getty ...@@ -3,7 +3,6 @@ package getty
import ( import (
"fmt" "fmt"
"sync" "sync"
"sync/atomic"
) )
import ( import (
...@@ -35,10 +34,9 @@ type task struct { ...@@ -35,10 +34,9 @@ type task struct {
// task pool: manage task ts // task pool: manage task ts
type taskPool struct { type taskPool struct {
idx uint32
qLen int32 // task queue length qLen int32 // task queue length
size int32 // task queue pool size size int32 // task queue pool size
qArray []chan task Q chan task
wg sync.WaitGroup wg sync.WaitGroup
once sync.Once once sync.Once
...@@ -50,7 +48,7 @@ func newTaskPool(poolSize int32, taskQLen int32) *taskPool { ...@@ -50,7 +48,7 @@ func newTaskPool(poolSize int32, taskQLen int32) *taskPool {
return &taskPool{ return &taskPool{
size: poolSize, size: poolSize,
qLen: taskQLen, qLen: taskQLen,
qArray: make([]chan task, poolSize), Q: make(chan task, taskQLen),
done: make(chan struct{}), done: make(chan struct{}),
} }
} }
...@@ -66,7 +64,6 @@ func (p *taskPool) start() { ...@@ -66,7 +64,6 @@ func (p *taskPool) start() {
} }
for i := int32(0); i < p.size; i++ { for i := int32(0); i < p.size; i++ {
p.qArray[i] = make(chan task, p.qLen)
p.wg.Add(1) p.wg.Add(1)
taskID := i taskID := i
go p.run(int(taskID)) go p.run(int(taskID))
...@@ -85,14 +82,14 @@ func (p *taskPool) run(id int) { ...@@ -85,14 +82,14 @@ func (p *taskPool) run(id int) {
for { for {
select { select {
case <-p.done: case <-p.done:
if 0 < len(p.qArray[id]) { if 0 < len(p.Q) {
log.Warn("[getty][task_pool] task %d exit now while its task length is %d greater than 0", log.Warn("[getty][task_pool] task %d exit now while its task length is %d greater than 0",
id, len(p.qArray[id])) id, len(p.Q))
} }
log.Info("[getty][task_pool] task %d exit now", id) log.Info("[getty][task_pool] task %d exit now", id)
return return
case t, ok = <-p.qArray[id]: case t, ok = <-p.Q:
if ok { if ok {
t.session.listener.OnMessage(t.session, t.pkg) t.session.listener.OnMessage(t.session, t.pkg)
} }
...@@ -102,13 +99,10 @@ func (p *taskPool) run(id int) { ...@@ -102,13 +99,10 @@ func (p *taskPool) run(id int) {
// add task // add task
func (p *taskPool) AddTask(t task) { func (p *taskPool) AddTask(t task) {
//id := randID() % uint64(p.size)
id := atomic.AddUint32(&p.idx, 1) % uint32(p.size)
select { select {
case <-p.done: case <-p.done:
return return
case p.qArray[id] <- t: case p.Q <- t:
} }
} }
...@@ -138,7 +132,5 @@ func (p *taskPool) isClosed() bool { ...@@ -138,7 +132,5 @@ func (p *taskPool) isClosed() bool {
func (p *taskPool) close() { func (p *taskPool) close() {
p.stop() p.stop()
p.wg.Wait() p.wg.Wait()
for i := range p.qArray { close(p.Q)
close(p.qArray[i])
}
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment