Commit 8471e55d authored by AlexStocks's avatar AlexStocks

Mod: use goext goroutine pool

parent 9785c0d0
package getty
import (
"fmt"
"time"
)
var ErrScheduleTimeout = fmt.Errorf("schedule error: timed out")
type Pool struct {
sem chan struct{}
work chan func()
}
func NewPool(size, queue, spawn int) *Pool {
if spawn <= 0 && queue > 0 {
panic("dead queue configuration detected")
}
if spawn > size {
panic("spawn > workers")
}
p := &Pool{
sem: make(chan struct{}, size),
work: make(chan func(), queue),
}
for i := 0; i < spawn; i++ {
p.sem <- struct{}{}
go p.worker(func() {})
}
return p
}
func (p *Pool) ScheduleTimeout(timeout time.Duration, task func()) error {
return p.schedule(task, time.After(timeout))
}
func (p *Pool) schedule(task func(), timeout <-chan time.Time) error {
select {
case <-timeout:
return ErrScheduleTimeout
case p.work <- task:
return nil
case p.sem <- struct{}{}:
go p.worker(task)
return nil
}
}
func (p *Pool) worker(task func()) {
defer func() { <-p.sem }()
task()
for task := range p.work {
task()
}
}
......@@ -20,19 +20,24 @@ import (
)
import (
"github.com/AlexStocks/goext/context"
"github.com/AlexStocks/goext/sync"
"github.com/AlexStocks/goext/time"
log "github.com/AlexStocks/log4go"
"github.com/gorilla/websocket"
jerrors "github.com/juju/errors"
)
import (
"github.com/AlexStocks/goext/context"
"github.com/AlexStocks/goext/runtime"
"github.com/AlexStocks/goext/sync"
"github.com/AlexStocks/goext/time"
)
const (
maxReadBufLen = 4 * 1024
netIOTimeout = 1e9 // 1s
period = 60 * 1e9 // 1 minute
pendingDuration = 3e9
grPoolIdle = 10e9
defaultSessionName = "session"
defaultTCPSessionName = "tcp-session"
defaultUDPSessionName = "udp-session"
......@@ -79,7 +84,8 @@ type session struct {
grNum int32
lock sync.RWMutex
pool *Pool
// goroutine pool
pool *gxruntime.Pool
}
func newSession(endPoint EndPoint, conn Connection) *session {
......@@ -92,6 +98,7 @@ func newSession(endPoint EndPoint, conn Connection) *session {
period: period,
wait: pendingDuration,
attrs: gxcontext.NewValuesContext(nil),
pool: gxruntime.NewGoroutinePool(grPoolIdle),
}
ss.Connection.setSession(ss)
......@@ -251,7 +258,6 @@ func (s *session) SetRQLen(readQLen int) {
s.lock.Lock()
s.rQ = make(chan interface{}, readQLen)
log.Info("%s, [session.SetRQLen] rQ{len:%d, cap:%d}", s.Stat(), len(s.rQ), cap(s.rQ))
s.pool = NewPool(readQLen/2, 2, 1)
s.lock.Unlock()
}
......@@ -440,10 +446,8 @@ func (s *session) handleLoop() {
wsConn *gettyWSConn
// start time.Time
counter gxtime.CountWatch
// ticker *time.Ticker // use wheel instead, 2016/09/26
inPkg interface{}
outPkg interface{}
// once sync.Once // use wheel instead, 2016/09/26
inPkg interface{}
outPkg interface{}
)
defer func() {
......@@ -466,7 +470,6 @@ func (s *session) handleLoop() {
flag = true // do not do any read/Write/cron operation while got Write error
wsConn, wsFlag = s.Connection.(*gettyWSConn)
// ticker = time.NewTicker(s.period) // use wheel instead, 2016/09/26
LOOP:
for {
// A select blocks until one of its cases can run, then it executes that case.
......@@ -474,7 +477,6 @@ LOOP:
select {
case <-s.done:
// this case branch assure the (session)handleLoop gr will exit before (session)handlePackage gr.
// once.Do(func() { ticker.Stop() }) // use wheel instead, 2016/09/26
if atomic.LoadInt32(&(s.grNum)) == 1 { // make sure @(session)handlePackage goroutine has been closed.
if len(s.rQ) == 0 && len(s.wQ) == 0 {
log.Info("%s, [session.handleLoop] got done signal. Both rQ and wQ are nil.", s.Stat())
......@@ -493,7 +495,7 @@ LOOP:
if flag {
log.Debug("%#v <-s.rQ", inPkg)
pkg := inPkg
s.pool.ScheduleTimeout(s.wait, func() {
s.pool.Go(func() {
s.listener.OnMessage(s, pkg)
})
s.incReadPkgNum()
......@@ -510,25 +512,24 @@ LOOP:
// break LOOP
}
s.incWritePkgNum()
// gxlog.CError("outPkg:%#v, after incWritePkgNum, ss:%s", outPkg, s.Stat())
} else {
log.Info("[session.handleLoop] drop writeout package{%#v}", outPkg)
}
// case <-ticker.C: // use wheel instead, 2016/09/26
case <-wheel.After(s.period):
if flag {
if wsFlag {
err = wsConn.writePing()
if err != nil {
log.Warn("wsConn.writePing() = error{%s}", err)
s.pool.Go(func() {
if wsFlag {
err := wsConn.writePing()
if err != nil {
log.Warn("wsConn.writePing() = error{%s}", err)
}
}
}
s.listener.OnCron(s)
s.listener.OnCron(s)
})
}
}
}
// once.Do(func() { ticker.Stop() }) // use wheel instead, 2016/09/26
}
func (s *session) handlePackage() {
......@@ -783,6 +784,7 @@ func (s *session) stop() {
conn.SetWriteDeadline(now.Add(s.writeTimeout()))
}
close(s.done)
s.pool.Close()
})
}
}
......@@ -800,8 +802,8 @@ func (s *session) gc() {
s.lock.Unlock()
}
// s function will be invoked by NewSessionCallback(if return error is not nil) or (session)handleLoop automatically.
// It is goroutine-safe to be invoked many times.
// Close will be invoked by NewSessionCallback(if return error is not nil) or (session)handleLoop automatically.
// It's thread safe.
func (s *session) Close() {
s.stop()
log.Info("%s closed now, its current gr num %d", s.sessionToken(), atomic.LoadInt32(&(s.grNum)))
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment