Commit b731b1f5 authored by watermelon

opt: Simplify the task pool

parent f041d150
@@ -19,12 +19,9 @@ package gxsync
 
 import (
     "fmt"
-    "log"
-    "math/rand"
    "os"
    "runtime"
    "runtime/debug"
-    "sync"
-    "sync/atomic"
    "time"
 )
@@ -32,238 +29,71 @@ import (
     gxruntime "github.com/dubbogo/gost/runtime"
 )
 
-const (
-    defaultTaskQNumber = 10
-    defaultTaskQLen    = 128
-)
-
-/////////////////////////////////////////
-// Task Pool Options
-/////////////////////////////////////////
-
-type TaskPoolOptions struct {
-    tQLen      int // task queue length, i.e. the buffer size of each queue
-    tQNumber   int // number of task queues
-    tQPoolSize int // task pool size, i.e. the number of workers
-}
-
-func (o *TaskPoolOptions) validate() {
-    if o.tQPoolSize < 1 {
-        panic(fmt.Sprintf("illegal pool size %d", o.tQPoolSize))
-    }
-
-    if o.tQLen < 1 {
-        o.tQLen = defaultTaskQLen
-    }
-
-    if o.tQNumber < 1 {
-        o.tQNumber = defaultTaskQNumber
-    }
-
-    if o.tQNumber > o.tQPoolSize {
-        o.tQNumber = o.tQPoolSize
-    }
-}
-
-type TaskPoolOption func(*TaskPoolOptions)
-
-// @size is the task pool size
-func WithTaskPoolTaskPoolSize(size int) TaskPoolOption {
-    return func(o *TaskPoolOptions) {
-        o.tQPoolSize = size
-    }
-}
-
-// @length is the task queue length
-func WithTaskPoolTaskQueueLength(length int) TaskPoolOption {
-    return func(o *TaskPoolOptions) {
-        o.tQLen = length
-    }
-}
-
-// @number is the task queue number
-func WithTaskPoolTaskQueueNumber(number int) TaskPoolOption {
-    return func(o *TaskPoolOptions) {
-        o.tQNumber = number
-    }
-}
+// task is the unit of work executed by the pool
+type task func()
 
 /////////////////////////////////////////
 // Task Pool
 /////////////////////////////////////////
 
-// task t
-type task func()
-
-// task pool: manages tasks
-type TaskPool struct {
-    TaskPoolOptions
-
-    idx    uint32 // round robin index
-    qArray []chan task
-    wg     sync.WaitGroup
-
-    once sync.Once
-    done chan struct{}
-}
-
-// build a task pool
-func NewTaskPool(opts ...TaskPoolOption) *TaskPool {
-    var tOpts TaskPoolOptions
-    for _, opt := range opts {
-        opt(&tOpts)
-    }
-    tOpts.validate()
-
-    p := &TaskPool{
-        TaskPoolOptions: tOpts,
-        qArray:          make([]chan task, tOpts.tQNumber),
-        done:            make(chan struct{}),
-    }
-    for i := 0; i < p.tQNumber; i++ {
-        p.qArray[i] = make(chan task, p.tQLen)
-    }
-    p.start()
-
-    return p
-}
-
-// start task pool
-func (p *TaskPool) start() {
-    for i := 0; i < p.tQPoolSize; i++ {
-        p.wg.Add(1)
-        workerID := i
-        q := p.qArray[workerID%p.tQNumber]
-        p.safeRun(workerID, q)
-    }
-}
-
-func (p *TaskPool) safeRun(workerID int, q chan task) {
-    gxruntime.GoSafely(nil, false,
-        func() {
-            err := p.run(int(workerID), q)
-            if err != nil {
-                // log the error to stderr
-                log.Printf("gost/TaskPool.run error: %s", err.Error())
-            }
-        },
-        nil,
-    )
-}
-
-// worker
-func (p *TaskPool) run(id int, q chan task) error {
-    defer p.wg.Done()
-
-    var (
-        ok bool
-        t  task
-    )
-    for {
-        select {
-        case <-p.done:
-            if 0 < len(q) {
-                return fmt.Errorf("task worker %d exits now with %d task(s) still buffered",
-                    id, len(q))
-            }
-            return nil
-        case t, ok = <-q:
-            if ok {
-                func() {
-                    defer func() {
-                        if r := recover(); r != nil {
-                            fmt.Fprintf(os.Stderr, "%s goroutine panic: %v\n%s\n",
-                                time.Now(), r, string(debug.Stack()))
-                        }
-                    }()
-                    t()
-                }()
-            }
-        }
-    }
-}
-
-// AddTask waits for an idle worker and adds the task to its queue;
-// it returns false when the pool has been stopped
-func (p *TaskPool) AddTask(t task) (ok bool) {
-    idx := atomic.AddUint32(&p.idx, 1)
-    id := idx % uint32(p.tQNumber)
-
-    select {
-    case <-p.done:
-        return false
-    default:
-        p.qArray[id] <- t
-        return true
-    }
-}
-
-// AddTaskAlways adds the task to a queue, or runs it immediately
-func (p *TaskPool) AddTaskAlways(t task) {
-    id := atomic.AddUint32(&p.idx, 1) % uint32(p.tQNumber)
-
-    select {
-    case p.qArray[id] <- t:
-        return
-    default:
-        p.goSafely(t)
-    }
-}
-
-// AddTaskBalance adds the task to an idle queue,
-// or runs it immediately when no queue is idle
-func (p *TaskPool) AddTaskBalance(t task) {
-    length := len(p.qArray)
-
-    // try len/2 times to find an idle queue
-    for i := 0; i < length/2; i++ {
-        select {
-        case p.qArray[rand.Intn(length)] <- t:
-            return
-        default:
-            continue
-        }
-    }
-
-    p.goSafely(t)
-}
-
-func (p *TaskPool) goSafely(fn func()) {
-    gxruntime.GoSafely(nil, false, fn, nil)
-}
-
-// stop all workers
-func (p *TaskPool) stop() {
-    select {
-    case <-p.done:
-        return
-    default:
-        p.once.Do(func() {
-            close(p.done)
-        })
-    }
-}
-
-// IsClosed checks whether the pool has been closed.
-func (p *TaskPool) IsClosed() bool {
-    select {
-    case <-p.done:
-        return true
-    default:
-        return false
-    }
-}
-
-func (p *TaskPool) Close() {
-    p.stop()
-    p.wg.Wait()
-    for i := range p.qArray {
-        close(p.qArray[i])
-    }
-}
+type TaskPool interface {
+    AddTask(t task) bool
+    AddTaskAlways(t task) bool
+}
+
+type taskPool struct {
+    work chan task     // unbuffered handoff channel to idle workers
+    sem  chan struct{} // counting semaphore bounding the number of workers
+}
+
+func NewTaskPool(size int) TaskPool {
+    if size < 1 {
+        size = runtime.NumCPU() * 100
+    }
+    return &taskPool{
+        work: make(chan task),
+        sem:  make(chan struct{}, size),
+    }
+}
+
+// AddTask hands the task to an idle worker, or starts a new worker while
+// the cap has room; otherwise it blocks until a worker becomes free.
+func (p *taskPool) AddTask(t task) (ok bool) {
+    select {
+    case p.work <- t:
+    case p.sem <- struct{}{}:
+        go p.worker(t)
+    }
+    return true
+}
+
+// AddTaskAlways never blocks: when the pool is saturated it falls back
+// to running the task in a goroutine outside the pool.
+func (p *taskPool) AddTaskAlways(t task) (ok bool) {
+    select {
+    case p.work <- t:
+        return true
+    default:
+    }
+
+    select {
+    case p.work <- t:
+    case p.sem <- struct{}{}:
+        go p.worker(t)
+    default:
+        p.goSafely(t)
+    }
+    return true
+}
+
+// worker runs its first task, then parks on the work channel to be
+// reused; its semaphore slot is released only if it exits via panic.
+func (p *taskPool) worker(t task) {
+    defer func() {
+        if r := recover(); r != nil {
+            fmt.Fprintf(os.Stderr, "%s goroutine panic: %v\n%s\n",
+                time.Now(), r, string(debug.Stack()))
+        }
+        <-p.sem
+    }()
+    for {
+        t()
+        t = <-p.work
+    }
+}
+
+func (p *taskPool) goSafely(fn func()) {
+    gxruntime.GoSafely(nil, false, fn, nil)
+}
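The simplified pool boils down to a two-channel idiom: an unbuffered work channel hands tasks directly to parked workers, while a buffered sem channel acts as a counting semaphore that caps how many workers exist. A minimal standalone sketch of that idiom, independent of gost (the names workPool and submit are hypothetical):

package main

import (
    "fmt"
    "sync"
)

// workPool mirrors the two channels of taskPool above.
type workPool struct {
    work chan func()   // unbuffered: a send succeeds only if a worker is parked here
    sem  chan struct{} // capacity = maximum number of resident workers
}

func newWorkPool(size int) *workPool {
    return &workPool{work: make(chan func()), sem: make(chan struct{}, size)}
}

func (p *workPool) submit(t func()) {
    select {
    case p.work <- t: // reuse an idle worker
    case p.sem <- struct{}{}: // still under the cap: start another worker
        go p.worker(t)
    }
    // With neither case ready, submit blocks until a worker frees up.
}

func (p *workPool) worker(t func()) {
    defer func() { <-p.sem }() // give the slot back if this worker ever exits
    for {
        t()
        t = <-p.work // park here between tasks instead of exiting
    }
}

func main() {
    p := newWorkPool(2) // at most two concurrent workers
    var wg sync.WaitGroup
    for i := 0; i < 5; i++ {
        i := i
        wg.Add(1)
        p.submit(func() {
            defer wg.Done()
            fmt.Println("task", i)
        })
    }
    wg.Wait() // all five tasks ran on at most two goroutines
}

Because a worker loops back onto the work channel after each task, steady-state submissions reuse existing goroutines instead of spawning new ones. Note that, unlike the queue-per-worker design it replaces, neither this sketch nor the commit has a shutdown path: workers stay parked until the process exits, which is why the test below no longer calls tp.Close().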
@@ -37,11 +37,7 @@ func TestTaskPool(t *testing.T) {
     numCPU := runtime.NumCPU()
     taskCnt := int64(numCPU * numCPU * 100)
 
-    tp := NewTaskPool(
-        WithTaskPoolTaskPoolSize(1),
-        WithTaskPoolTaskQueueNumber(1),
-        WithTaskPoolTaskQueueLength(1),
-    )
+    tp := NewTaskPool(10)
 
     task, cnt := newCountTask()
@@ -59,7 +55,6 @@ func TestTaskPool(t *testing.T) {
         }()
     }
     wg.Wait()
-    tp.Close()
 
     if taskCnt != *cnt {
         t.Error("want ", taskCnt, " got ", *cnt)
@@ -67,11 +62,7 @@ func TestTaskPool(t *testing.T) {
 }
 
 func BenchmarkTaskPool_CountTask(b *testing.B) {
-    tp := NewTaskPool(
-        WithTaskPoolTaskPoolSize(runtime.NumCPU()),
-        WithTaskPoolTaskQueueNumber(runtime.NumCPU()),
-        //WithTaskPoolTaskQueueLength(runtime.NumCPU()),
-    )
+    tp := NewTaskPool(runtime.NumCPU())
 
     b.Run(`AddTask`, func(b *testing.B) {
         task, _ := newCountTask()
@@ -90,16 +81,6 @@ func BenchmarkTaskPool_CountTask(b *testing.B) {
             }
         })
     })
-
-    b.Run(`AddTaskBalance`, func(b *testing.B) {
-        task, _ := newCountTask()
-
-        b.RunParallel(func(pb *testing.PB) {
-            for pb.Next() {
-                tp.AddTaskBalance(task)
-            }
-        })
-    })
 }
 
 func fib(n int) int {
@@ -111,11 +92,7 @@ func fib(n int) int {
 
 // cpu-intensive task
 func BenchmarkTaskPool_CPUTask(b *testing.B) {
-    tp := NewTaskPool(
-        WithTaskPoolTaskPoolSize(runtime.NumCPU()),
-        WithTaskPoolTaskQueueNumber(runtime.NumCPU()),
-        //WithTaskPoolTaskQueueLength(runtime.NumCPU()),
-    )
+    tp := NewTaskPool(runtime.NumCPU())
 
     newCPUTask := func() (func(), *int64) {
         var cnt int64
@@ -150,25 +127,11 @@ func BenchmarkTaskPool_CPUTask(b *testing.B) {
             }
         })
     })
-
-    b.Run(`AddTaskBalance`, func(b *testing.B) {
-        task, _ := newCPUTask()
-        b.RunParallel(func(pb *testing.PB) {
-            for pb.Next() {
-                tp.AddTaskBalance(task)
-            }
-        })
-    })
 }
 
 // IO-intensive task
 func BenchmarkTaskPool_IOTask(b *testing.B) {
-    tp := NewTaskPool(
-        WithTaskPoolTaskPoolSize(runtime.NumCPU()),
-        WithTaskPoolTaskQueueNumber(runtime.NumCPU()),
-        //WithTaskPoolTaskQueueLength(runtime.NumCPU()),
-    )
+    tp := NewTaskPool(runtime.NumCPU())
 
     newIOTask := func() (func(), *int64) {
         var cnt int64
@@ -194,23 +157,10 @@ func BenchmarkTaskPool_IOTask(b *testing.B) {
             }
         })
     })
-
-    b.Run(`AddTaskBalance`, func(b *testing.B) {
-        task, _ := newIOTask()
-        b.RunParallel(func(pb *testing.PB) {
-            for pb.Next() {
-                tp.AddTaskBalance(task)
-            }
-        })
-    })
 }
 
 func BenchmarkTaskPool_RandomTask(b *testing.B) {
-    tp := NewTaskPool(
-        WithTaskPoolTaskPoolSize(runtime.NumCPU()),
-        WithTaskPoolTaskQueueNumber(runtime.NumCPU()),
-        //WithTaskPoolTaskQueueLength(runtime.NumCPU()),
-    )
+    tp := NewTaskPool(runtime.NumCPU())
 
     newRandomTask := func() (func(), *int64) {
         c := rand.Intn(4)
@@ -240,33 +190,20 @@ func BenchmarkTaskPool_RandomTask(b *testing.B) {
             }
         })
     })
-
-    b.Run(`AddTaskBalance`, func(b *testing.B) {
-        b.RunParallel(func(pb *testing.PB) {
-            for pb.Next() {
-                task, _ := newRandomTask()
-                tp.AddTaskBalance(task)
-            }
-        })
-    })
 }
 
 /*
-goos: darwin
-goarch: amd64
 pkg: github.com/dubbogo/gost/sync
-BenchmarkTaskPool_CountTask/AddTask-8            2872177       380 ns/op       0 B/op   0 allocs/op
-BenchmarkTaskPool_CountTask/AddTaskAlways-8      2769730       455 ns/op       1 B/op   0 allocs/op
-BenchmarkTaskPool_CountTask/AddTaskBalance-8     4630167       248 ns/op       0 B/op   0 allocs/op
-BenchmarkTaskPool_CPUTask/fib-8                    73975     16524 ns/op       0 B/op   0 allocs/op
-BenchmarkTaskPool_CPUTask/AddTask-8                72525     18160 ns/op       0 B/op   0 allocs/op
-BenchmarkTaskPool_CPUTask/AddTaskAlways-8         606813     16464 ns/op      40 B/op   0 allocs/op
-BenchmarkTaskPool_CPUTask/AddTaskBalance-8        137926     17646 ns/op       0 B/op   0 allocs/op
-BenchmarkTaskPool_IOTask/AddTask-8                 10000    108520 ns/op       0 B/op   0 allocs/op
-BenchmarkTaskPool_IOTask/AddTaskAlways-8         1000000      1236 ns/op      95 B/op   1 allocs/op
-BenchmarkTaskPool_IOTask/AddTaskBalance-8        1518144       673 ns/op      63 B/op   0 allocs/op
-BenchmarkTaskPool_RandomTask/AddTask-8            497055      2517 ns/op       6 B/op   0 allocs/op
-BenchmarkTaskPool_RandomTask/AddTaskAlways-8     2511391       415 ns/op      21 B/op   0 allocs/op
-BenchmarkTaskPool_RandomTask/AddTaskBalance-8    1381711       868 ns/op      17 B/op   0 allocs/op
+BenchmarkTaskPool_CountTask/AddTask-8            1724671       655 ns/op       0 B/op   0 allocs/op
+BenchmarkTaskPool_CountTask/AddTaskAlways-8      3102237       339 ns/op       0 B/op   0 allocs/op
+BenchmarkTaskPool_CPUTask/fib-8                    75745     16507 ns/op       0 B/op   0 allocs/op
+BenchmarkTaskPool_CPUTask/AddTask-8                65875     18167 ns/op       0 B/op   0 allocs/op
+BenchmarkTaskPool_CPUTask/AddTaskAlways-8         116119     18804 ns/op       1 B/op   0 allocs/op
+BenchmarkTaskPool_IOTask/AddTask-8                 10000    103712 ns/op       0 B/op   0 allocs/op
+BenchmarkTaskPool_IOTask/AddTaskAlways-8         2034420       618 ns/op      87 B/op   1 allocs/op
+BenchmarkTaskPool_RandomTask/AddTask-8            462364      2575 ns/op       6 B/op   0 allocs/op
+BenchmarkTaskPool_RandomTask/AddTaskAlways-8     3025962       415 ns/op      21 B/op   0 allocs/op
 PASS
 */
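For completeness, a caller's-eye sketch of the simplified API (this program is not part of the commit; it assumes the import alias gxsync for github.com/dubbogo/gost/sync, and relies on plain func() literals being assignable to the package's unexported task type, whose underlying type is func()):

package main

import (
    "fmt"
    "sync"
    "sync/atomic"

    gxsync "github.com/dubbogo/gost/sync"
)

func main() {
    // A fixed cap of 4 workers; any size < 1 falls back to
    // runtime.NumCPU() * 100 inside NewTaskPool.
    tp := gxsync.NewTaskPool(4)

    var (
        wg  sync.WaitGroup
        cnt int64
    )
    for i := 0; i < 100; i++ {
        wg.Add(1)
        // AddTask may block while all workers are busy; AddTaskAlways
        // would run the overflow in extra goroutines instead, which is
        // why it wins the IO-bound benchmarks above.
        tp.AddTask(func() {
            defer wg.Done()
            atomic.AddInt64(&cnt, 1)
        })
    }
    wg.Wait()

    fmt.Println("executed:", atomic.LoadInt64(&cnt)) // executed: 100
}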