Unverified Commit 522be9af authored by Xin.Zh's avatar Xin.Zh Committed by GitHub

Merge pull request #35 from watermelo/opt_taskpool

Add: simple task pool without queue
parents ca4cfcb0 eeb1680e
......@@ -19,3 +19,5 @@ classes
# vim
*.swp
vendor/
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package gxsync
import (
"fmt"
)
const (
defaultTaskQNumber = 10
defaultTaskQLen = 128
)
/////////////////////////////////////////
// Task Pool Options
/////////////////////////////////////////
// TaskPoolOptions is optional settings for task pool
type TaskPoolOptions struct {
tQLen int // task queue length. buffer size per queue
tQNumber int // task queue number. number of queue
tQPoolSize int // task pool size. number of workers
}
func (o *TaskPoolOptions) validate() {
if o.tQPoolSize < 1 {
panic(fmt.Sprintf("illegal pool size %d", o.tQPoolSize))
}
if o.tQLen < 1 {
o.tQLen = defaultTaskQLen
}
if o.tQNumber < 1 {
o.tQNumber = defaultTaskQNumber
}
if o.tQNumber > o.tQPoolSize {
o.tQNumber = o.tQPoolSize
}
}
type TaskPoolOption func(*TaskPoolOptions)
// WithTaskPoolTaskPoolSize set @size of the task queue pool size
func WithTaskPoolTaskPoolSize(size int) TaskPoolOption {
return func(o *TaskPoolOptions) {
o.tQPoolSize = size
}
}
// WithTaskPoolTaskQueueLength set @length of the task queue length
func WithTaskPoolTaskQueueLength(length int) TaskPoolOption {
return func(o *TaskPoolOptions) {
o.tQLen = length
}
}
// WithTaskPoolTaskQueueNumber set @number of the task queue number
func WithTaskPoolTaskQueueNumber(number int) TaskPoolOption {
return func(o *TaskPoolOptions) {
o.tQNumber = number
}
}
......@@ -22,6 +22,7 @@ import (
"log"
"math/rand"
"os"
"runtime"
"runtime/debug"
"sync"
"sync/atomic"
......@@ -32,69 +33,29 @@ import (
gxruntime "github.com/dubbogo/gost/runtime"
)
const (
defaultTaskQNumber = 10
defaultTaskQLen = 128
)
/////////////////////////////////////////
// Task Pool Options
/////////////////////////////////////////
type TaskPoolOptions struct {
tQLen int // task queue length. buffer size per queue
tQNumber int // task queue number. number of queue
tQPoolSize int // task pool size. number of workers
}
func (o *TaskPoolOptions) validate() {
if o.tQPoolSize < 1 {
panic(fmt.Sprintf("illegal pool size %d", o.tQPoolSize))
}
if o.tQLen < 1 {
o.tQLen = defaultTaskQLen
}
if o.tQNumber < 1 {
o.tQNumber = defaultTaskQNumber
}
if o.tQNumber > o.tQPoolSize {
o.tQNumber = o.tQPoolSize
}
}
type TaskPoolOption func(*TaskPoolOptions)
// @size is the task queue pool size
func WithTaskPoolTaskPoolSize(size int) TaskPoolOption {
return func(o *TaskPoolOptions) {
o.tQPoolSize = size
}
}
type task func()
// @length is the task queue length
func WithTaskPoolTaskQueueLength(length int) TaskPoolOption {
return func(o *TaskPoolOptions) {
o.tQLen = length
}
// GenericTaskPool represents an generic task pool.
type GenericTaskPool interface {
// AddTask wait idle worker add task
AddTask(t task) bool
// AddTaskAlways add task to queues or do it immediately
AddTaskAlways(t task)
// AddTaskBalance add task to idle queue
AddTaskBalance(t task)
// Close use to close the task pool
Close()
// IsClosed use to check pool status.
IsClosed() bool
}
// @number is the task queue number
func WithTaskPoolTaskQueueNumber(number int) TaskPoolOption {
return func(o *TaskPoolOptions) {
o.tQNumber = number
}
func goSafely(fn func()) {
gxruntime.GoSafely(nil, false, fn, nil)
}
/////////////////////////////////////////
// Task Pool
/////////////////////////////////////////
// task t
type task func()
// task pool: manage task ts
type TaskPool struct {
TaskPoolOptions
......@@ -107,8 +68,8 @@ type TaskPool struct {
done chan struct{}
}
// build a task pool
func NewTaskPool(opts ...TaskPoolOption) *TaskPool {
// NewTaskPool build a task pool
func NewTaskPool(opts ...TaskPoolOption) GenericTaskPool {
var tOpts TaskPoolOptions
for _, opt := range opts {
opt(&tOpts)
......@@ -188,7 +149,6 @@ func (p *TaskPool) run(id int, q chan task) error {
}
}
// AddTask wait idle worker add task
// return false when the pool is stop
func (p *TaskPool) AddTask(t task) (ok bool) {
idx := atomic.AddUint32(&p.idx, 1)
......@@ -203,7 +163,6 @@ func (p *TaskPool) AddTask(t task) (ok bool) {
}
}
// AddTaskAlways add task to queues or do it immediately
func (p *TaskPool) AddTaskAlways(t task) {
id := atomic.AddUint32(&p.idx, 1) % uint32(p.tQNumber)
......@@ -211,11 +170,10 @@ func (p *TaskPool) AddTaskAlways(t task) {
case p.qArray[id] <- t:
return
default:
p.goSafely(t)
goSafely(t)
}
}
// AddTaskBalance add task to idle queue
// do it immediately when no idle queue
func (p *TaskPool) AddTaskBalance(t task) {
length := len(p.qArray)
......@@ -230,11 +188,7 @@ func (p *TaskPool) AddTaskBalance(t task) {
}
}
p.goSafely(t)
}
func (p *TaskPool) goSafely(fn func()) {
gxruntime.GoSafely(nil, false, fn, nil)
goSafely(t)
}
// stop all tasks
......@@ -267,3 +221,114 @@ func (p *TaskPool) Close() {
close(p.qArray[i])
}
}
/////////////////////////////////////////
// Task Pool Simple
/////////////////////////////////////////
type taskPoolSimple struct {
work chan task
sem chan struct{}
wg sync.WaitGroup
once sync.Once
done chan struct{}
}
// NewTaskPoolSimple build a simple task pool
func NewTaskPoolSimple(size int) GenericTaskPool {
if size < 1 {
size = runtime.NumCPU() * 100
}
return &taskPoolSimple{
work: make(chan task),
sem: make(chan struct{}, size),
done: make(chan struct{}),
}
}
func (p *taskPoolSimple) AddTask(t task) bool {
select {
case <-p.done:
return false
default:
}
select {
case <-p.done:
return false
case p.work <- t:
case p.sem <- struct{}{}:
p.wg.Add(1)
go p.worker(t)
}
return true
}
func (p *taskPoolSimple) AddTaskAlways(t task) {
select {
case <-p.done:
return
default:
}
select {
case p.work <- t:
return
default:
}
select {
case p.work <- t:
case p.sem <- struct{}{}:
p.wg.Add(1)
go p.worker(t)
default:
goSafely(t)
}
}
func (p *taskPoolSimple) worker(t task) {
defer func() {
if r := recover(); r != nil {
fmt.Fprintf(os.Stderr, "%s goroutine panic: %v\n%s\n",
time.Now(), r, string(debug.Stack()))
}
p.wg.Done()
<-p.sem
}()
t()
for t := range p.work {
t()
}
}
// stop all tasks
func (p *taskPoolSimple) stop() {
select {
case <-p.done:
return
default:
p.once.Do(func() {
close(p.done)
close(p.work)
})
}
}
func (p *taskPoolSimple) Close() {
p.stop()
// wait until all tasks done
p.wg.Wait()
}
// check whether the session has been closed.
func (p *taskPoolSimple) IsClosed() bool {
select {
case <-p.done:
return true
default:
return false
}
}
func (p *taskPoolSimple) AddTaskBalance(t task) { p.AddTaskAlways(t) }
......@@ -33,6 +33,165 @@ func newCountTask() (func(), *int64) {
}, &cnt
}
func newIOTask() (func(), *int64) {
var cnt int64
return func() {
time.Sleep(700 * time.Microsecond)
}, &cnt
}
func newCPUTask() (func(), *int64) {
var cnt int64
return func() {
atomic.AddInt64(&cnt, int64(fib(22)))
}, &cnt
}
func newRandomTask() (func(), *int64) {
c := rand.Intn(4)
tasks := []func(){
func() { _ = fib(rand.Intn(20)) },
func() { t, _ := newCountTask(); t() },
func() { runtime.Gosched() },
func() { time.Sleep(time.Duration(rand.Int63n(100)) * time.Microsecond) },
}
return tasks[c], nil
}
func TestTaskPoolSimple(t *testing.T) {
numCPU := runtime.NumCPU()
taskCnt := int64(numCPU * numCPU * 100)
tp := NewTaskPoolSimple(1)
task, cnt := newCountTask()
var wg sync.WaitGroup
for i := 0; i < numCPU*numCPU; i++ {
wg.Add(1)
go func() {
for j := 0; j < 100; j++ {
ok := tp.AddTask(task)
if !ok {
t.Log(j)
}
}
wg.Done()
}()
}
wg.Wait()
if taskCnt != atomic.LoadInt64(cnt) {
t.Error("want ", taskCnt, " got ", *cnt)
}
}
func BenchmarkTaskPoolSimple_CountTask(b *testing.B) {
tp := NewTaskPoolSimple(runtime.NumCPU())
b.Run(`AddTask`, func(b *testing.B) {
task, _ := newCountTask()
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
tp.AddTask(task)
}
})
})
b.Run(`AddTaskAlways`, func(b *testing.B) {
task, _ := newCountTask()
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
tp.AddTaskAlways(task)
}
})
})
}
func fib(n int) int {
if n < 3 {
return 1
}
return fib(n-1) + fib(n-2)
}
// cpu-intensive task
func BenchmarkTaskPoolSimple_CPUTask(b *testing.B) {
tp := NewTaskPoolSimple(runtime.NumCPU())
b.Run(`fib`, func(b *testing.B) {
t, _ := newCPUTask()
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
t()
}
})
})
b.Run(`AddTask`, func(b *testing.B) {
task, _ := newCPUTask()
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
tp.AddTask(task)
}
})
})
b.Run(`AddTaskAlways`, func(b *testing.B) {
task, _ := newCPUTask()
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
tp.AddTaskAlways(task)
}
})
})
}
// IO-intensive task
func BenchmarkTaskPoolSimple_IOTask(b *testing.B) {
tp := NewTaskPoolSimple(runtime.NumCPU())
b.Run(`AddTask`, func(b *testing.B) {
task, _ := newIOTask()
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
tp.AddTask(task)
}
})
})
b.Run(`AddTaskAlways`, func(b *testing.B) {
task, _ := newIOTask()
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
tp.AddTaskAlways(task)
}
})
})
}
func BenchmarkTaskPoolSimple_RandomTask(b *testing.B) {
tp := NewTaskPoolSimple(runtime.NumCPU())
b.Run(`AddTask`, func(b *testing.B) {
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
task, _ := newRandomTask()
tp.AddTask(task)
}
})
})
b.Run(`AddTaskAlways`, func(b *testing.B) {
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
task, _ := newRandomTask()
tp.AddTaskAlways(task)
}
})
})
}
func TestTaskPool(t *testing.T) {
numCPU := runtime.NumCPU()
taskCnt := int64(numCPU * numCPU * 100)
......@@ -61,7 +220,7 @@ func TestTaskPool(t *testing.T) {
wg.Wait()
tp.Close()
if taskCnt != *cnt {
if taskCnt != atomic.LoadInt64(cnt) {
t.Error("want ", taskCnt, " got ", *cnt)
}
}
......@@ -102,13 +261,6 @@ func BenchmarkTaskPool_CountTask(b *testing.B) {
}
func fib(n int) int {
if n < 3 {
return 1
}
return fib(n-1) + fib(n-2)
}
// cpu-intensive task
func BenchmarkTaskPool_CPUTask(b *testing.B) {
tp := NewTaskPool(
......@@ -117,13 +269,6 @@ func BenchmarkTaskPool_CPUTask(b *testing.B) {
//WithTaskPoolTaskQueueLength(runtime.NumCPU()),
)
newCPUTask := func() (func(), *int64) {
var cnt int64
return func() {
atomic.AddInt64(&cnt, int64(fib(22)))
}, &cnt
}
b.Run(`fib`, func(b *testing.B) {
t, _ := newCPUTask()
b.RunParallel(func(pb *testing.PB) {
......@@ -170,13 +315,6 @@ func BenchmarkTaskPool_IOTask(b *testing.B) {
//WithTaskPoolTaskQueueLength(runtime.NumCPU()),
)
newIOTask := func() (func(), *int64) {
var cnt int64
return func() {
time.Sleep(700 * time.Microsecond)
}, &cnt
}
b.Run(`AddTask`, func(b *testing.B) {
task, _ := newIOTask()
b.RunParallel(func(pb *testing.PB) {
......@@ -212,17 +350,6 @@ func BenchmarkTaskPool_RandomTask(b *testing.B) {
//WithTaskPoolTaskQueueLength(runtime.NumCPU()),
)
newRandomTask := func() (func(), *int64) {
c := rand.Intn(4)
tasks := []func(){
func() { _ = fib(rand.Intn(20)) },
func() { t, _ := newCountTask(); t() },
func() { runtime.Gosched() },
func() { time.Sleep(time.Duration(rand.Int63n(100)) * time.Microsecond) },
}
return tasks[c], nil
}
b.Run(`AddTask`, func(b *testing.B) {
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
......@@ -251,22 +378,138 @@ func BenchmarkTaskPool_RandomTask(b *testing.B) {
})
}
/*
func PrintMemUsage(t *testing.T, prefix string) {
var m runtime.MemStats
runtime.ReadMemStats(&m)
t.Logf("%s Alloc = %v MiB", prefix, bToMb(m.Alloc))
t.Logf("\tTotalAlloc = %v MiB", bToMb(m.TotalAlloc))
t.Logf("\tSys = %v MiB", bToMb(m.Sys))
t.Logf("\tNumGC = %v\n", m.NumGC)
}
func elapsed(t *testing.T, what string) func() {
start := time.Now()
return func() {
t.Logf("\n\t %s took %v\n", what, time.Since(start))
}
}
func bToMb(b uint64) uint64 {
return b / 1024 / 1024
}
var n = 100000
func TestWithoutPool(t *testing.T) {
PrintMemUsage(t, "Before")
numG := runtime.NumGoroutine()
defer elapsed(t, "TestWithoutPool")()
var wg sync.WaitGroup
task, _ := newIOTask()
for i := 0; i < n; i++ {
wg.Add(1)
go func() {
task()
wg.Done()
}()
}
t.Logf("TestWithoutPool took %v goroutines\n", runtime.NumGoroutine()-numG)
wg.Wait()
PrintMemUsage(t, "After")
}
pkg: github.com/dubbogo/gost/sync
BenchmarkTaskPool_CountTask/AddTask-8 2872177 380 ns/op 0 B/op 0 allocs/op
BenchmarkTaskPool_CountTask/AddTaskAlways-8 2769730 455 ns/op 1 B/op 0 allocs/op
BenchmarkTaskPool_CountTask/AddTaskBalance-8 4630167 248 ns/op 0 B/op 0 allocs/op
BenchmarkTaskPool_CPUTask/fib-8 73975 16524 ns/op 0 B/op 0 allocs/op
BenchmarkTaskPool_CPUTask/AddTask-8 72525 18160 ns/op 0 B/op 0 allocs/op
BenchmarkTaskPool_CPUTask/AddTaskAlways-8 606813 16464 ns/op 40 B/op 0 allocs/op
BenchmarkTaskPool_CPUTask/AddTaskBalance-8 137926 17646 ns/op 0 B/op 0 allocs/op
BenchmarkTaskPool_IOTask/AddTask-8 10000 108520 ns/op 0 B/op 0 allocs/op
BenchmarkTaskPool_IOTask/AddTaskAlways-8 1000000 1236 ns/op 95 B/op 1 allocs/op
BenchmarkTaskPool_IOTask/AddTaskBalance-8 1518144 673 ns/op 63 B/op 0 allocs/op
BenchmarkTaskPool_RandomTask/AddTask-8 497055 2517 ns/op 6 B/op 0 allocs/op
BenchmarkTaskPool_RandomTask/AddTaskAlways-8 2511391 415 ns/op 21 B/op 0 allocs/op
BenchmarkTaskPool_RandomTask/AddTaskBalance-8 1381711 868 ns/op 17 B/op 0 allocs/op
func TestWithSimpledPoolUseAlways(t *testing.T) {
PrintMemUsage(t, "Before")
numG := runtime.NumGoroutine()
defer elapsed(t, "TestWithSimplePool")()
tp := NewTaskPoolSimple(1000)
task, _ := newIOTask()
for i := 0; i < n; i++ {
tp.AddTaskAlways(task)
}
t.Logf("TestWithSimplePool took %v goroutines\n", runtime.NumGoroutine()-numG)
tp.Close()
PrintMemUsage(t, "After")
}
func TestWithSimplePool(t *testing.T) {
PrintMemUsage(t, "Before")
numG := runtime.NumGoroutine()
defer elapsed(t, "TestWithSimplePool")()
tp := NewTaskPoolSimple(1000)
task, _ := newIOTask()
for i := 0; i < n; i++ {
tp.AddTask(task)
}
t.Logf("TestWithSimplePool took %v goroutines\n", runtime.NumGoroutine()-numG)
tp.Close()
PrintMemUsage(t, "After")
}
func TestWithPool(t *testing.T) {
PrintMemUsage(t, "Before")
numG := runtime.NumGoroutine()
defer elapsed(t, "TestWithPool")()
tp := NewTaskPool(
WithTaskPoolTaskPoolSize(1000),
WithTaskPoolTaskQueueNumber(2),
//WithTaskPoolTaskQueueLength(runtime.NumCPU()),
)
task, _ := newIOTask()
for i := 0; i < n; i++ {
tp.AddTask(task)
}
t.Logf("TestWithPool took %v goroutines\n", runtime.NumGoroutine()-numG)
tp.Close()
PrintMemUsage(t, "After")
}
func TestWithPoolUseAlways(t *testing.T) {
PrintMemUsage(t, "Before")
numG := runtime.NumGoroutine()
defer elapsed(t, "TestWithPoolUseAlways")()
tp := NewTaskPool(
WithTaskPoolTaskPoolSize(1000),
WithTaskPoolTaskQueueNumber(10),
//WithTaskPoolTaskQueueLength(runtime.NumCPU()),
)
task, _ := newIOTask()
for i := 0; i < n; i++ {
tp.AddTaskAlways(task)
}
t.Logf("TestWithPoolUseAlways took %v goroutines\n", runtime.NumGoroutine()-numG)
tp.Close()
PrintMemUsage(t, "After")
}
/*
goos: darwin
goarch: amd64
pkg: github.com/dubbogo/gost/sync 执行次数 单次执行时间 单次执行内存消耗 单次执行内存分配次数
BenchmarkTaskPoolSimple_CountTask/AddTask-8 1693192 700 ns/op 0 B/op 0 allocs/op
BenchmarkTaskPoolSimple_CountTask/AddTaskAlways-8 3262932 315 ns/op 0 B/op 0 allocs/op
BenchmarkTaskPoolSimple_CPUTask/fib-8 83479 14760 ns/op 0 B/op 0 allocs/op
BenchmarkTaskPoolSimple_CPUTask/AddTask-8 85956 14571 ns/op 0 B/op 0 allocs/op
BenchmarkTaskPoolSimple_CPUTask/AddTaskAlways-8 1000000 17712 ns/op 19 B/op 0 allocs/op
BenchmarkTaskPoolSimple_IOTask/AddTask-8 10000 107361 ns/op 0 B/op 0 allocs/op
BenchmarkTaskPoolSimple_IOTask/AddTaskAlways-8 2772476 477 ns/op 79 B/op 1 allocs/op
BenchmarkTaskPoolSimple_RandomTask/AddTask-8 499417 2451 ns/op 6 B/op 0 allocs/op
BenchmarkTaskPoolSimple_RandomTask/AddTaskAlways-8 3307748 354 ns/op 21 B/op 0 allocs/op
BenchmarkTaskPool_CountTask/AddTask-8 5367189 229 ns/op 0 B/op 0 allocs/op
BenchmarkTaskPool_CountTask/AddTaskAlways-8 5438667 218 ns/op 0 B/op 0 allocs/op
BenchmarkTaskPool_CountTask/AddTaskBalance-8 4765616 247 ns/op 0 B/op 0 allocs/op
BenchmarkTaskPool_CPUTask/fib-8 74749 17153 ns/op 0 B/op 0 allocs/op
BenchmarkTaskPool_CPUTask/AddTask-8 71020 18131 ns/op 0 B/op 0 allocs/op
BenchmarkTaskPool_CPUTask/AddTaskAlways-8 563931 17725 ns/op 0 B/op 0 allocs/op
BenchmarkTaskPool_CPUTask/AddTaskBalance-8 204085 17720 ns/op 0 B/op 0 allocs/op
BenchmarkTaskPool_IOTask/AddTask-8 12427 106108 ns/op 0 B/op 0 allocs/op
BenchmarkTaskPool_IOTask/AddTaskAlways-8 2607068 504 ns/op 81 B/op 1 allocs/op
BenchmarkTaskPool_IOTask/AddTaskBalance-8 2065213 580 ns/op 63 B/op 0 allocs/op
BenchmarkTaskPool_RandomTask/AddTask-8 590595 2274 ns/op 6 B/op 0 allocs/op
BenchmarkTaskPool_RandomTask/AddTaskAlways-8 3565921 333 ns/op 21 B/op 0 allocs/op
BenchmarkTaskPool_RandomTask/AddTaskBalance-8 1487217 839 ns/op 17 B/op 0 allocs/op
PASS
*/
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment