Commit 4a0f0138 authored by alexstocks's avatar alexstocks

delete session.go:(Session)handleLoop Last clause

parent e631ec31
......@@ -13,6 +13,8 @@
- 2016/09/09
> 1 delete session.go:(Session)readerDone
>
> 2 delete session.go:(Session)handlePackage Last clause
>
> 3 version: 0.3.07
- 2016/09/08
......
......@@ -348,12 +348,14 @@ func (this *Session) RunEventLoop() {
func (this *Session) handleLoop() {
var (
err error
err error
flag bool
// start time.Time
counter CountWatch
ticker *time.Ticker
inPkg interface{}
outPkg interface{}
once sync.Once
)
defer func() {
......@@ -372,62 +374,49 @@ func (this *Session) handleLoop() {
this.gc()
}()
flag = true // do not do any read/write/cron operation while got write error
ticker = time.NewTicker(this.peroid)
LOOP:
for {
select {
case <-this.done:
// 这个分支确保(Session)handleLoop gr在(Session)handlePackage gr之后退出
once.Do(func() { ticker.Stop() })
log.Info("%s, [session.handleLoop] got done signal ", this.Stat())
break LOOP
if atomic.LoadInt32(&(this.grNum)) == 1 { // make sure @(Session)handlePackage goroutine has been closed.
counter.Start()
// if time.Since(start).Nanoseconds() >= this.wait.Nanoseconds() {
if counter.Count() > this.wait.Nanoseconds() {
break LOOP
}
}
case inPkg = <-this.rQ:
this.listener.OnMessage(this, inPkg)
this.incReadPkgCount()
// 这个条件分支通过(Session)rQ排空确保(Session)handlePackage gr不会阻塞在(Session)rQ上
if flag {
this.listener.OnMessage(this, inPkg)
this.incReadPkgCount()
}
case outPkg = <-this.wQ:
if err = this.pkgHandler.Write(this, outPkg); err != nil {
log.Error("%s, [session.handleLoop] = error{%+v}", this.sessionToken(), err)
this.stop()
break LOOP
if flag {
if err = this.pkgHandler.Write(this, outPkg); err != nil {
log.Error("%s, [session.handleLoop] = error{%+v}", this.sessionToken(), err)
this.stop()
flag = false
// break LOOP
}
this.incWritePkgCount()
}
this.incWritePkgCount()
case <-ticker.C:
this.listener.OnCron(this)
}
}
ticker.Stop()
// wait for reader goroutine closed
// <-this.readerDone
// process pending pkg
// start = time.Now()
counter.Start()
LAST:
for {
if atomic.LoadInt32(&(this.grNum)) == 1 {
counter.Start()
// if time.Since(start).Nanoseconds() >= this.wait.Nanoseconds() {
if counter.Count() > this.wait.Nanoseconds() {
break
}
}
select {
case outPkg = <-this.wQ:
if err = this.pkgHandler.Write(this, outPkg); err != nil {
break LAST
if flag {
this.listener.OnCron(this)
}
this.incWritePkgCount()
case inPkg = <-this.rQ:
this.listener.OnMessage(this, inPkg)
this.incReadPkgCount()
default:
log.Info("%s, [session.handleLoop] default", this.sessionToken())
break LAST
}
}
once.Do(func() { ticker.Stop() })
// ticker.Stop()
}
// get package from tcp stream(packet)
......
......@@ -10,5 +10,5 @@
package getty
var (
Version = "0.3.06"
Version = "0.3.07"
)
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment