Unverified Commit 36312663 authored by huiren's avatar huiren Committed by GitHub

sync/task_pool: addTaskAlways (#25)

* add auto-comment
Signed-off-by: 's avatarhuiren <zhrlnt@gmail.com>

* add auto-comment
Signed-off-by: 's avatarhuiren <zhrlnt@gmail.com>

* worker interface
Signed-off-by: 's avatarhuiren <zhrlnt@gmail.com>

* comments
Signed-off-by: 's avatarhuiren <zhrlnt@gmail.com>

* worker impl
Signed-off-by: 's avatarhuiren <zhrlnt@gmail.com>

* revert rename
Signed-off-by: 's avatarhuiren <zhrlnt@gmail.com>

* task poll test
Signed-off-by: 's avatarhuiren <zhrlnt@gmail.com>

* testcase
Signed-off-by: 's avatarhuiren <zhrlnt@gmail.com>

* test case & comment
Signed-off-by: 's avatarhuiren <zhrlnt@gmail.com>

* rm worker
Signed-off-by: 's avatarhuiren <zhrlnt@gmail.com>

* bench result
Signed-off-by: 's avatarhuiren <zhrlnt@gmail.com>

* fix addtaskbalance
Signed-off-by: 's avatarhuiren <zhrlnt@gmail.com>

* use gosafety in pool_task

* fix comment idle

* addtask return success status

* update select code

* AddTaskAlways don't listen done signal

* split import

* remove catch handler
Co-authored-by: 's avatarJoe Zou <yixian.zou@gmail.com>
parent 20509bdb
......@@ -26,7 +26,7 @@ import (
)
// GoSafely wraps a `go func()` with recover()
func GoSafely(wg *sync.WaitGroup, ignoreRecover bool, handler func(), finalFunc func(r interface{})) {
func GoSafely(wg *sync.WaitGroup, ignoreRecover bool, handler func(), catchFunc func(r interface{})) {
if wg != nil {
wg.Add(1)
}
......@@ -37,7 +37,7 @@ func GoSafely(wg *sync.WaitGroup, ignoreRecover bool, handler func(), finalFunc
fmt.Fprintf(os.Stderr, "%s goroutine panic: %v\n%s\n",
time.Now(), r, string(debug.Stack()))
}
if finalFunc != nil {
if catchFunc != nil {
if wg != nil {
wg.Add(1)
}
......@@ -54,7 +54,7 @@ func GoSafely(wg *sync.WaitGroup, ignoreRecover bool, handler func(), finalFunc
wg.Done()
}
}()
finalFunc(r)
catchFunc(r)
}()
}
}
......
......@@ -19,10 +19,16 @@ package gxsync
import (
"fmt"
"log"
"math/rand"
"sync"
"sync/atomic"
)
import (
gxruntime "github.com/dubbogo/gost/runtime"
)
const (
defaultTaskQNumber = 10
defaultTaskQLen = 128
......@@ -33,9 +39,9 @@ const (
/////////////////////////////////////////
type TaskPoolOptions struct {
tQLen int // task queue length
tQNumber int // task queue number
tQPoolSize int // task pool size
tQLen int // task queue length. buffer size per queue
tQNumber int // task queue number. number of queue
tQPoolSize int // task pool size. number of workers
}
func (o *TaskPoolOptions) validate() {
......@@ -127,10 +133,23 @@ func (p *TaskPool) start() {
p.wg.Add(1)
workerID := i
q := p.qArray[workerID%p.tQNumber]
go p.run(int(workerID), q)
p.safeRun(workerID, q)
}
}
func (p *TaskPool) safeRun(workerID int, q chan task) {
gxruntime.GoSafely(nil, false,
func() {
err := p.run(int(workerID), q)
if err != nil {
// log error to stderr
log.Printf("gost/TaskPool.run error: %s", err.Error())
}
},
nil,
)
}
// worker
func (p *TaskPool) run(id int, q chan task) error {
defer p.wg.Done()
......@@ -158,15 +177,53 @@ func (p *TaskPool) run(id int, q chan task) error {
}
}
// add task
func (p *TaskPool) AddTask(t task) {
id := atomic.AddUint32(&p.idx, 1) % uint32(p.tQNumber)
// AddTask wait idle worker add task
// return false when the pool is stop
func (p *TaskPool) AddTask(t task) (ok bool) {
idx := atomic.AddUint32(&p.idx, 1)
id := idx % uint32(p.tQNumber)
select {
case <-p.done:
return
return false
default:
p.qArray[id] <- t
return true
}
}
// AddTaskAlways add task to queues or do it immediately
func (p *TaskPool) AddTaskAlways(t task) {
id := atomic.AddUint32(&p.idx, 1) % uint32(p.tQNumber)
select {
case p.qArray[id] <- t:
return
default:
p.goSafely(t)
}
}
// AddTaskBalance add task to idle queue
// do it immediately when no idle queue
func (p *TaskPool) AddTaskBalance(t task) {
length := len(p.qArray)
// try len/2 times to lookup idle queue
for i := 0; i < length/2; i++ {
select {
case p.qArray[rand.Intn(length)] <- t:
return
default:
continue
}
}
p.goSafely(t)
}
func (p *TaskPool) goSafely(fn func()) {
gxruntime.GoSafely(nil, false, fn, nil)
}
// stop all tasks
......
package gxsync
import (
"math/rand"
"runtime"
"sync"
"sync/atomic"
"testing"
"time"
)
func newCountTask() (func(), *int64) {
var cnt int64
return func() {
atomic.AddInt64(&cnt, 1)
}, &cnt
}
func TestTaskPool(t *testing.T) {
numCPU := runtime.NumCPU()
taskCnt := int64(numCPU * numCPU * 100)
tp := NewTaskPool(
WithTaskPoolTaskPoolSize(1),
WithTaskPoolTaskQueueNumber(1),
WithTaskPoolTaskQueueLength(1),
)
task, cnt := newCountTask()
var wg sync.WaitGroup
for i := 0; i < numCPU*numCPU; i++ {
wg.Add(1)
go func() {
for j := 0; j < 100; j++ {
ok := tp.AddTask(task)
if !ok {
t.Log(j)
}
}
wg.Done()
}()
}
wg.Wait()
tp.Close()
if taskCnt != *cnt {
t.Error("want ", taskCnt, " got ", *cnt)
}
}
func BenchmarkTaskPool_CountTask(b *testing.B) {
tp := NewTaskPool(
WithTaskPoolTaskPoolSize(runtime.NumCPU()),
WithTaskPoolTaskQueueNumber(runtime.NumCPU()),
//WithTaskPoolTaskQueueLength(runtime.NumCPU()),
)
b.Run(`AddTask`, func(b *testing.B) {
task, _ := newCountTask()
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
tp.AddTask(task)
}
})
})
b.Run(`AddTaskAlways`, func(b *testing.B) {
task, _ := newCountTask()
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
tp.AddTaskAlways(task)
}
})
})
b.Run(`AddTaskBalance`, func(b *testing.B) {
task, _ := newCountTask()
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
tp.AddTaskBalance(task)
}
})
})
}
func fib(n int) int {
if n < 3 {
return 1
}
return fib(n-1) + fib(n-2)
}
// cpu-intensive task
func BenchmarkTaskPool_CPUTask(b *testing.B) {
tp := NewTaskPool(
WithTaskPoolTaskPoolSize(runtime.NumCPU()),
WithTaskPoolTaskQueueNumber(runtime.NumCPU()),
//WithTaskPoolTaskQueueLength(runtime.NumCPU()),
)
newCPUTask := func() (func(), *int64) {
var cnt int64
return func() {
atomic.AddInt64(&cnt, int64(fib(22)))
}, &cnt
}
b.Run(`fib`, func(b *testing.B) {
t, _ := newCPUTask()
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
t()
}
})
})
b.Run(`AddTask`, func(b *testing.B) {
task, _ := newCPUTask()
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
tp.AddTask(task)
}
})
})
b.Run(`AddTaskAlways`, func(b *testing.B) {
task, _ := newCPUTask()
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
tp.AddTaskAlways(task)
}
})
})
b.Run(`AddTaskBalance`, func(b *testing.B) {
task, _ := newCPUTask()
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
tp.AddTaskBalance(task)
}
})
})
}
// IO-intensive task
func BenchmarkTaskPool_IOTask(b *testing.B) {
tp := NewTaskPool(
WithTaskPoolTaskPoolSize(runtime.NumCPU()),
WithTaskPoolTaskQueueNumber(runtime.NumCPU()),
//WithTaskPoolTaskQueueLength(runtime.NumCPU()),
)
newIOTask := func() (func(), *int64) {
var cnt int64
return func() {
time.Sleep(700 * time.Microsecond)
}, &cnt
}
b.Run(`AddTask`, func(b *testing.B) {
task, _ := newIOTask()
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
tp.AddTask(task)
}
})
})
b.Run(`AddTaskAlways`, func(b *testing.B) {
task, _ := newIOTask()
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
tp.AddTaskAlways(task)
}
})
})
b.Run(`AddTaskBalance`, func(b *testing.B) {
task, _ := newIOTask()
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
tp.AddTaskBalance(task)
}
})
})
}
func BenchmarkTaskPool_RandomTask(b *testing.B) {
tp := NewTaskPool(
WithTaskPoolTaskPoolSize(runtime.NumCPU()),
WithTaskPoolTaskQueueNumber(runtime.NumCPU()),
//WithTaskPoolTaskQueueLength(runtime.NumCPU()),
)
newRandomTask := func() (func(), *int64) {
c := rand.Intn(4)
tasks := []func(){
func() { _ = fib(rand.Intn(20)) },
func() { t, _ := newCountTask(); t() },
func() { runtime.Gosched() },
func() { time.Sleep(time.Duration(rand.Int63n(100)) * time.Microsecond) },
}
return tasks[c], nil
}
b.Run(`AddTask`, func(b *testing.B) {
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
task, _ := newRandomTask()
tp.AddTask(task)
}
})
})
b.Run(`AddTaskAlways`, func(b *testing.B) {
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
task, _ := newRandomTask()
tp.AddTaskAlways(task)
}
})
})
b.Run(`AddTaskBalance`, func(b *testing.B) {
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
task, _ := newRandomTask()
tp.AddTaskBalance(task)
}
})
})
}
/*
pkg: github.com/dubbogo/gost/sync
BenchmarkTaskPool_CountTask/AddTask-8 2872177 380 ns/op 0 B/op 0 allocs/op
BenchmarkTaskPool_CountTask/AddTaskAlways-8 2769730 455 ns/op 1 B/op 0 allocs/op
BenchmarkTaskPool_CountTask/AddTaskBalance-8 4630167 248 ns/op 0 B/op 0 allocs/op
BenchmarkTaskPool_CPUTask/fib-8 73975 16524 ns/op 0 B/op 0 allocs/op
BenchmarkTaskPool_CPUTask/AddTask-8 72525 18160 ns/op 0 B/op 0 allocs/op
BenchmarkTaskPool_CPUTask/AddTaskAlways-8 606813 16464 ns/op 40 B/op 0 allocs/op
BenchmarkTaskPool_CPUTask/AddTaskBalance-8 137926 17646 ns/op 0 B/op 0 allocs/op
BenchmarkTaskPool_IOTask/AddTask-8 10000 108520 ns/op 0 B/op 0 allocs/op
BenchmarkTaskPool_IOTask/AddTaskAlways-8 1000000 1236 ns/op 95 B/op 1 allocs/op
BenchmarkTaskPool_IOTask/AddTaskBalance-8 1518144 673 ns/op 63 B/op 0 allocs/op
BenchmarkTaskPool_RandomTask/AddTask-8 497055 2517 ns/op 6 B/op 0 allocs/op
BenchmarkTaskPool_RandomTask/AddTaskAlways-8 2511391 415 ns/op 21 B/op 0 allocs/op
BenchmarkTaskPool_RandomTask/AddTaskBalance-8 1381711 868 ns/op 17 B/op 0 allocs/op
PASS
*/
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment