Commit ba5c0707 authored by fangyincheng's avatar fangyincheng

Mod:use taskpool of dubbogogost/sync

parent cd6ab9c8
......@@ -16,6 +16,7 @@ import (
)
import (
gssync "github.com/dubbogo/gost/sync"
perrors "github.com/pkg/errors"
)
......@@ -156,7 +157,7 @@ type Session interface {
SetRQLen(int)
SetWQLen(int)
SetWaitTime(time.Duration)
SetTaskPool(*TaskPool)
SetTaskPool(*gssync.TaskPool)
GetAttribute(interface{}) interface{}
SetAttribute(interface{}, interface{})
......
module github.com/dubbogo/getty
require (
github.com/dubbogo/gost v1.0.1-0.20190706005735-65c3ecbba418
github.com/dubbogo/gost v1.1.1
github.com/golang/snappy v0.0.1
github.com/gorilla/websocket v1.4.0
github.com/pkg/errors v0.8.1
......
......@@ -2,6 +2,8 @@ github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dubbogo/gost v1.0.1-0.20190706005735-65c3ecbba418 h1:7OsAjhWpX0m6o1b/fcO47IF7FgpVv/qMSgHNk2orvIM=
github.com/dubbogo/gost v1.0.1-0.20190706005735-65c3ecbba418/go.mod h1:R7wZm1DrmrKGr50mBZVcg6C9ekG8aL5hP+sgWcIDwQg=
github.com/dubbogo/gost v1.1.1 h1:JCM7vx5edPIjDA5ovJTuzEEXuw2t7xLyrlgi2mi5jHI=
github.com/dubbogo/gost v1.1.1/go.mod h1:R7wZm1DrmrKGr50mBZVcg6C9ekG8aL5hP+sgWcIDwQg=
github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4=
github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/gorilla/websocket v1.4.0 h1:WDFjx/TMzVgy9VdMMQi2K2Emtwi2QcUQsztZ/zLaH/Q=
......
......@@ -10,10 +10,6 @@
package getty
import (
"fmt"
)
/////////////////////////////////////////
// Server Options
/////////////////////////////////////////
......@@ -113,54 +109,3 @@ func WithRootCertificateFile(cert string) ClientOption {
o.cert = cert
}
}
/////////////////////////////////////////
// Task Pool Options
/////////////////////////////////////////
type TaskPoolOptions struct {
tQLen int // task queue length
tQNumber int // task queue number
tQPoolSize int // task pool size
}
func (o *TaskPoolOptions) validate() {
if o.tQPoolSize < 1 {
panic(fmt.Sprintf("[getty][task_pool] illegal pool size %d", o.tQPoolSize))
}
if o.tQLen < 1 {
o.tQLen = defaultTaskQLen
}
if o.tQNumber < 1 {
o.tQNumber = defaultTaskQNumber
}
if o.tQNumber > o.tQPoolSize {
o.tQNumber = o.tQPoolSize
}
}
type TaskPoolOption func(*TaskPoolOptions)
// @size is the task queue pool size
func WithTaskPoolTaskPoolSize(size int) TaskPoolOption {
return func(o *TaskPoolOptions) {
o.tQPoolSize = size
}
}
// @length is the task queue length
func WithTaskPoolTaskQueueLength(length int) TaskPoolOption {
return func(o *TaskPoolOptions) {
o.tQLen = length
}
}
// @number is the task queue number
func WithTaskPoolTaskQueueNumber(number int) TaskPoolOption {
return func(o *TaskPoolOptions) {
o.tQNumber = number
}
}
......@@ -20,6 +20,7 @@ import (
)
import (
gssync "github.com/dubbogo/gost/sync"
gstime "github.com/dubbogo/gost/time"
"github.com/gorilla/websocket"
perrors "github.com/pkg/errors"
......@@ -72,7 +73,7 @@ type session struct {
// handle logic
maxMsgLen int32
// task queue
tPool *TaskPool
tPool *gssync.TaskPool
// heartbeat
period time.Duration
......@@ -310,7 +311,7 @@ func (s *session) SetWaitTime(waitTime time.Duration) {
}
// set task pool
func (s *session) SetTaskPool(p *TaskPool) {
func (s *session) SetTaskPool(p *gssync.TaskPool) {
s.lock.Lock()
defer s.lock.Unlock()
......@@ -573,7 +574,9 @@ LOOP:
func (s *session) addTask(pkg interface{}) {
if s.tPool != nil {
s.tPool.AddTask(task{session: s, pkg: pkg})
s.tPool.AddTask(func() {
s.listener.OnMessage(s, pkg)
})
} else {
s.rQ <- pkg
}
......
package getty
import (
"sync"
"sync/atomic"
)
const (
defaultTaskQNumber = 10
defaultTaskQLen = 128
)
// task t
type task struct {
session *session
pkg interface{}
}
// task pool: manage task ts
type TaskPool struct {
TaskPoolOptions
idx uint32 // round robin index
qArray []chan task
wg sync.WaitGroup
once sync.Once
done chan struct{}
}
// build a task pool
func NewTaskPool(opts ...TaskPoolOption) *TaskPool {
var tOpts TaskPoolOptions
for _, opt := range opts {
opt(&tOpts)
}
tOpts.validate()
p := &TaskPool{
TaskPoolOptions: tOpts,
qArray: make([]chan task, tOpts.tQNumber),
done: make(chan struct{}),
}
for i := 0; i < p.tQNumber; i++ {
p.qArray[i] = make(chan task, p.tQLen)
}
p.start()
return p
}
// start task pool
func (p *TaskPool) start() {
for i := 0; i < p.tQPoolSize; i++ {
p.wg.Add(1)
workerID := i
q := p.qArray[workerID%p.tQNumber]
go p.run(int(workerID), q)
}
}
// worker
func (p *TaskPool) run(id int, q chan task) {
defer p.wg.Done()
var (
ok bool
t task
)
for {
select {
case <-p.done:
if 0 < len(q) {
log.Warn("[getty][task_pool] task worker %d exit now while its task buffer length %d is greater than 0",
id, len(q))
} else {
log.Info("[getty][task_pool] task worker %d exit now", id)
}
return
case t, ok = <-q:
if ok {
t.session.listener.OnMessage(t.session, t.pkg)
}
}
}
}
// add task
func (p *TaskPool) AddTask(t task) {
id := atomic.AddUint32(&p.idx, 1) % uint32(p.tQNumber)
select {
case <-p.done:
return
case p.qArray[id] <- t:
}
}
// stop all tasks
func (p *TaskPool) stop() {
select {
case <-p.done:
return
default:
p.once.Do(func() {
close(p.done)
})
}
}
// check whether the session has been closed.
func (p *TaskPool) IsClosed() bool {
select {
case <-p.done:
return true
default:
return false
}
}
func (p *TaskPool) Close() {
p.stop()
p.wg.Wait()
for i := range p.qArray {
close(p.qArray[i])
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment