Commit a021f778 authored by alexstocks's avatar alexstocks

add time wheel

parent f83552e6
...@@ -13,6 +13,10 @@ ...@@ -13,6 +13,10 @@
- 2016/09/26 - 2016/09/26
> 1 move utils.go's function to github.com/AlexStocks/goext and delete it > 1 move utils.go's function to github.com/AlexStocks/goext and delete it
>
> 2 use goext/time Wheel
>
> 3 version: 0.3.11
- 2016/09/20 - 2016/09/20
> 1 just invoke OnError when session got error > 1 just invoke OnError when session got error
......
...@@ -44,6 +44,7 @@ var ( ...@@ -44,6 +44,7 @@ var (
var ( var (
connID uint32 connID uint32
wheel *gxtime.Wheel = gxtime.NewWheel(gxtime.TimeSecondDuration(1), 3600) // wheel longest span is 1 hour
) )
type gettyConn struct { type gettyConn struct {
...@@ -317,7 +318,8 @@ func (this *Session) WritePkg(pkg interface{}) error { ...@@ -317,7 +318,8 @@ func (this *Session) WritePkg(pkg interface{}) error {
// default: // default:
// case <-time.After(this.wDeadline): // case <-time.After(this.wDeadline):
case <-time.After(netIOTimeout): // case <-time.After(netIOTimeout):
case <-wheel.After(this.wDeadline):
log.Warn("%s, [session.WritePkg] wQ{len:%d, cap:%d}", this.Stat(), len(this.wQ), cap(this.wQ)) log.Warn("%s, [session.WritePkg] wQ{len:%d, cap:%d}", this.Stat(), len(this.wQ), cap(this.wQ))
return ErrSessionBlocked return ErrSessionBlocked
} }
...@@ -363,10 +365,10 @@ func (this *Session) handleLoop() { ...@@ -363,10 +365,10 @@ func (this *Session) handleLoop() {
flag bool flag bool
// start time.Time // start time.Time
counter gxtime.CountWatch counter gxtime.CountWatch
ticker *time.Ticker // ticker *time.Ticker // use wheel instead, 2016/09/26
inPkg interface{} inPkg interface{}
outPkg interface{} outPkg interface{}
once sync.Once // once sync.Once // use wheel instead, 2016/09/26
) )
defer func() { defer func() {
...@@ -388,13 +390,13 @@ func (this *Session) handleLoop() { ...@@ -388,13 +390,13 @@ func (this *Session) handleLoop() {
}() }()
flag = true // do not do any read/write/cron operation while got write error flag = true // do not do any read/write/cron operation while got write error
ticker = time.NewTicker(this.period) // ticker = time.NewTicker(this.period) // use wheel instead, 2016/09/26
LOOP: LOOP:
for { for {
select { select {
case <-this.done: case <-this.done:
// 这个分支确保(Session)handleLoop gr在(Session)handlePackage gr之后退出 // 这个分支确保(Session)handleLoop gr在(Session)handlePackage gr之后退出
once.Do(func() { ticker.Stop() }) // once.Do(func() { ticker.Stop() }) // use wheel instead, 2016/09/26
if atomic.LoadInt32(&(this.grNum)) == 1 { // make sure @(Session)handlePackage goroutine has been closed. if atomic.LoadInt32(&(this.grNum)) == 1 { // make sure @(Session)handlePackage goroutine has been closed.
if len(this.rQ) == 0 && len(this.wQ) == 0 { if len(this.rQ) == 0 && len(this.wQ) == 0 {
log.Info("%s, [session.handleLoop] got done signal. Both rQ and wQ are nil.", this.Stat()) log.Info("%s, [session.handleLoop] got done signal. Both rQ and wQ are nil.", this.Stat())
...@@ -430,14 +432,14 @@ LOOP: ...@@ -430,14 +432,14 @@ LOOP:
log.Info("[session.handleLoop] drop writeout package{%#v}", outPkg) log.Info("[session.handleLoop] drop writeout package{%#v}", outPkg)
} }
case <-ticker.C: // case <-ticker.C: // use wheel instead, 2016/09/26
case <-wheel.After(this.period):
if flag { if flag {
this.listener.OnCron(this) this.listener.OnCron(this)
} }
} }
} }
once.Do(func() { ticker.Stop() }) // once.Do(func() { ticker.Stop() }) // use wheel instead, 2016/09/26
// ticker.Stop()
} }
// get package from tcp stream(packet) // get package from tcp stream(packet)
......
...@@ -10,8 +10,8 @@ ...@@ -10,8 +10,8 @@
package getty package getty
const ( const (
Version = "0.3.10" Version = "0.3.11"
GETTY_MAJOR = 0 GETTY_MAJOR = 0
GETTY_MINOR = 3 GETTY_MINOR = 3
GETTY_BUILD = 10 GETTY_BUILD = 11
) )
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment