Commit 4d35e108 authored by watermelon

opt: add tests for task_pool

parent cc350d55
@@ -228,6 +228,11 @@ func (p *TaskPool) Close() {
type taskPoolSimple struct {
work chan task
sem chan struct{}
wg sync.WaitGroup
once sync.Once
done chan struct{}
}
// NewTaskPoolSimple builds a simple task pool
@@ -238,13 +243,23 @@ func NewTaskPoolSimple(size int) GenericTaskPool {
return &taskPoolSimple{
work: make(chan task),
sem: make(chan struct{}, size),
done: make(chan struct{}),
}
}
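For orientation, a minimal usage sketch of the simple pool (assuming the GenericTaskPool API shown in this diff, where a task is a plain func()):

    pool := NewTaskPoolSimple(4) // at most 4 concurrent worker goroutines
    for i := 0; i < 10; i++ {
        pool.AddTask(func() { /* do work */ })
    }
    pool.Close() // reject new tasks and wait for running ones to finish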
func (p *taskPoolSimple) AddTask(t task) bool {
select {
case <-p.done:
return false
default:
}
select {
case <-p.done:
return false
case p.work <- t:
case p.sem <- struct{}{}:
p.wg.Add(1)
go p.worker(t)
}
return true
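Note the two selects in AddTask: the first is a non-blocking fast path that rejects work once done is closed; the second blocks until the task is handed to an idle worker over p.work, a semaphore slot frees so a new worker can be spawned for it, or the pool is closed mid-wait. Checking p.done in both places keeps AddTask from blocking forever on a closed pool.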
@@ -259,6 +274,7 @@ func (p *taskPoolSimple) AddTaskAlways(t task) {
select {
case p.work <- t:
case p.sem <- struct{}{}:
p.wg.Add(1)
go p.worker(t)
default:
goSafely(t)
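Unlike AddTask, AddTaskAlways never blocks the caller: when no worker is idle and the semaphore is full, the default branch hands the task to goSafely, which runs it outside the pool on a fresh goroutine with panic recovery.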
@@ -271,16 +287,42 @@ func (p *taskPoolSimple) worker(t task) {
fmt.Fprintf(os.Stderr, "%s goroutine panic: %v\n%s\n",
time.Now(), r, string(debug.Stack()))
}
p.wg.Done()
<-p.sem
}()
t()
for t := range p.work {
t()
}
}
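Each worker executes the task it was spawned with, then loops on p.work for more; when stop() closes the channel the range loop exits, and the deferred block recovers any panic, marks the WaitGroup done, and releases the semaphore slot.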
// stop signals shutdown: it closes done so no new tasks are accepted
// and closes work so idle workers drain and exit
func (p *taskPoolSimple) stop() {
select {
case <-p.done:
return
default:
p.once.Do(func() {
close(p.done)
close(p.work)
})
}
}
func (p *taskPoolSimple) Close() {
p.stop()
// wait until all in-flight tasks are done
p.wg.Wait()
}
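Close is safe to call repeatedly: stop() returns early once done is closed, sync.Once guards the channel closes against races, and wg.Wait blocks until every worker's deferred wg.Done has run.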
// IsClosed checks whether the pool has been closed.
func (p *taskPoolSimple) IsClosed() bool {
select {
case <-p.done:
return true
default:
return false
}
}
func (p *taskPoolSimple) AddTaskBalance(t task) { p.AddTaskAlways(t) }
@@ -33,6 +33,31 @@ func newCountTask() (func(), *int64) {
}, &cnt
}
func newIOTask() (func(), *int64) {
var cnt int64
return func() {
time.Sleep(700 * time.Microsecond)
}, &cnt
}
func newCPUTask() (func(), *int64) {
var cnt int64
return func() {
atomic.AddInt64(&cnt, int64(fib(22)))
}, &cnt
}
func newRandomTask() (func(), *int64) {
c := rand.Intn(4)
tasks := []func(){
func() { _ = fib(rand.Intn(20)) },
func() { t, _ := newCountTask(); t() },
func() { runtime.Gosched() },
func() { time.Sleep(time.Duration(rand.Int63n(100)) * time.Microsecond) },
}
return tasks[c], nil
}
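Each new*Task helper returns a task closure plus a counter pointer for verification, but only newCountTask and newCPUTask actually update theirs: newIOTask just sleeps and leaves its counter untouched, and newRandomTask returns nil, so those two serve purely as load generators.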
func TestTaskPoolSimple(t *testing.T) {
numCPU := runtime.NumCPU()
taskCnt := int64(numCPU * numCPU * 100)
@@ -56,7 +81,7 @@ func TestTaskPoolSimple(t *testing.T) {
}
wg.Wait()
if taskCnt != atomic.LoadInt64(cnt) {
t.Error("want ", taskCnt, " got ", *cnt)
}
}
@@ -94,13 +119,6 @@ func fib(n int) int {
func BenchmarkTaskPoolSimple_CPUTask(b *testing.B) {
tp := NewTaskPoolSimple(runtime.NumCPU())
b.Run(`fib`, func(b *testing.B) {
t, _ := newCPUTask()
b.RunParallel(func(pb *testing.PB) {
@@ -133,13 +151,6 @@ func BenchmarkTaskPoolSimple_CPUTask(b *testing.B) {
func BenchmarkTaskPoolSimple_IOTask(b *testing.B) {
tp := NewTaskPoolSimple(runtime.NumCPU())
b.Run(`AddTask`, func(b *testing.B) {
task, _ := newIOTask()
b.RunParallel(func(pb *testing.PB) {
@@ -162,17 +173,6 @@ func BenchmarkTaskPoolSimple_IOTask(b *testing.B) {
func BenchmarkTaskPoolSimple_RandomTask(b *testing.B) {
tp := NewTaskPoolSimple(runtime.NumCPU())
b.Run(`AddTask`, func(b *testing.B) {
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
@@ -220,7 +220,7 @@ func TestTaskPool(t *testing.T) {
wg.Wait()
tp.Close()
if taskCnt != atomic.LoadInt64(cnt) {
t.Error("want ", taskCnt, " got ", *cnt)
}
}
@@ -269,13 +269,6 @@ func BenchmarkTaskPool_CPUTask(b *testing.B) {
//WithTaskPoolTaskQueueLength(runtime.NumCPU()),
)
b.Run(`fib`, func(b *testing.B) {
t, _ := newCPUTask()
b.RunParallel(func(pb *testing.PB) {
@@ -322,13 +315,6 @@ func BenchmarkTaskPool_IOTask(b *testing.B) {
//WithTaskPoolTaskQueueLength(runtime.NumCPU()),
)
b.Run(`AddTask`, func(b *testing.B) {
task, _ := newIOTask()
b.RunParallel(func(pb *testing.PB) {
@@ -364,17 +350,6 @@ func BenchmarkTaskPool_RandomTask(b *testing.B) {
//WithTaskPoolTaskQueueLength(runtime.NumCPU()),
)
b.Run(`AddTask`, func(b *testing.B) {
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
@@ -403,10 +378,114 @@ func BenchmarkTaskPool_RandomTask(b *testing.B) {
})
}
// PrintMemUsage logs current and cumulative memory statistics with the given prefix
func PrintMemUsage(t *testing.T, prefix string) {
var m runtime.MemStats
runtime.ReadMemStats(&m)
t.Logf("%s Alloc = %v MiB", prefix, bToMb(m.Alloc))
t.Logf("\tTotalAlloc = %v MiB", bToMb(m.TotalAlloc))
t.Logf("\tSys = %v MiB", bToMb(m.Sys))
t.Logf("\tNumGC = %v\n", m.NumGC)
}
// elapsed returns a func that, when deferred, logs how long the step took
func elapsed(t *testing.T, what string) func() {
start := time.Now()
return func() {
t.Logf("\n\t %s took %v\n", what, time.Since(start))
}
}
// bToMb converts bytes to mebibytes
func bToMb(b uint64) uint64 {
return b / 1024 / 1024
}
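These three helpers frame each comparison test below; a minimal sketch of the pattern (TestExample is a hypothetical name, the calls are the ones defined above):

    func TestExample(t *testing.T) {
        PrintMemUsage(t, "Before")        // memory snapshot before the workload
        defer elapsed(t, "TestExample")() // logs wall time when the test returns
        // ... submit n tasks ...
        PrintMemUsage(t, "After")         // memory snapshot after the workload
    }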
// n is the number of tasks each comparison test submits
var n = 100000
func TestWithoutPool(t *testing.T) {
PrintMemUsage(t, "Before")
numG := runtime.NumGoroutine()
defer elapsed(t, "TestWithoutPool")()
var wg sync.WaitGroup
task, _ := newIOTask()
for i := 0; i < n; i++ {
wg.Add(1)
go func() {
task()
wg.Done()
}()
}
t.Logf("TestWithoutPool took %v goroutines\n", runtime.NumGoroutine()-numG)
wg.Wait()
PrintMemUsage(t, "After")
}
func TestWithSimplePoolUseAlways(t *testing.T) {
PrintMemUsage(t, "Before")
numG := runtime.NumGoroutine()
defer elapsed(t, "TestWithSimplePool")()
tp := NewTaskPoolSimple(1000)
task, _ := newIOTask()
for i := 0; i < n; i++ {
tp.AddTaskAlways(task)
}
t.Logf("TestWithSimplePool took %v goroutines\n", runtime.NumGoroutine()-numG)
tp.Close()
PrintMemUsage(t, "After")
}
func TestWithSimplePool(t *testing.T) {
PrintMemUsage(t, "Before")
numG := runtime.NumGoroutine()
defer elapsed(t, "TestWithSimplePool")()
tp := NewTaskPoolSimple(1000)
task, _ := newIOTask()
for i := 0; i < n; i++ {
tp.AddTask(task)
}
t.Logf("TestWithSimplePool took %v goroutines\n", runtime.NumGoroutine()-numG)
tp.Close()
PrintMemUsage(t, "After")
}
func TestWithPool(t *testing.T) {
PrintMemUsage(t, "Before")
numG := runtime.NumGoroutine()
defer elapsed(t, "TestWithPool")()
tp := NewTaskPool(
WithTaskPoolTaskPoolSize(1000),
WithTaskPoolTaskQueueNumber(2),
//WithTaskPoolTaskQueueLength(runtime.NumCPU()),
)
task, _ := newIOTask()
for i := 0; i < n; i++ {
tp.AddTask(task)
}
t.Logf("TestWithPool took %v goroutines\n", runtime.NumGoroutine()-numG)
tp.Close()
PrintMemUsage(t, "After")
}
func TestWithPoolUseAlways(t *testing.T) {
PrintMemUsage(t, "Before")
numG := runtime.NumGoroutine()
defer elapsed(t, "TestWithPoolUseAlways")()
tp := NewTaskPool(
WithTaskPoolTaskPoolSize(1000),
WithTaskPoolTaskQueueNumber(10),
//WithTaskPoolTaskQueueLength(runtime.NumCPU()),
)
task, _ := newIOTask()
for i := 0; i < n; i++ {
tp.AddTaskAlways(task)
}
t.Logf("TestWithPoolUseAlways took %v goroutines\n", runtime.NumGoroutine()-numG)
tp.Close()
PrintMemUsage(t, "After")
}
/*
goos: darwin
goarch: amd64
pkg: github.com/dubbogo/gost/sync
columns: iterations, time per op (ns/op), memory per op (B/op), allocations per op (allocs/op)
BenchmarkTaskPoolSimple_CountTask/AddTask-8 1693192 700 ns/op 0 B/op 0 allocs/op
BenchmarkTaskPoolSimple_CountTask/AddTaskAlways-8 3262932 315 ns/op 0 B/op 0 allocs/op
BenchmarkTaskPoolSimple_CPUTask/fib-8 83479 14760 ns/op 0 B/op 0 allocs/op
......
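For reference, tables like this come from Go's benchmark runner; a typical invocation for this package would be:

    go test -bench=. -benchmem github.com/dubbogo/gost/sync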