Commit 89139c56 authored by alexstocks's avatar alexstocks

delete wg of session. use grNum instread

parent d96fdbea
...@@ -13,9 +13,13 @@ ...@@ -13,9 +13,13 @@
## develop history ## ## develop history ##
--- ---
- 2016/08/24
> delete session.go:Session:wg(atomic.WaitGroup). Add session.go:Session:grNum instead to prevent from (Session)Close() block on session.go:Session:wg.Wait()
> version: 0.2.05
- 2016/08/23 - 2016/08/23
> do not consider empty package as a error in (Session)handlePackage > do not consider empty package as a error in (Session)handlePackage
> version: 0.2.04 > version: 0.2.04
- 2016/08/22 - 2016/08/22
> rename (Session)OnIdle to (Session)OnCron > rename (Session)OnIdle to (Session)OnCron
......
...@@ -29,7 +29,7 @@ const ( ...@@ -29,7 +29,7 @@ const (
cronPeriod = 60 * 1e9 // 1 minute cronPeriod = 60 * 1e9 // 1 minute
pendingDuration = 3e9 pendingDuration = 3e9
defaultSessionName = "Session" defaultSessionName = "Session"
outputFormat = "%s:%d:%s, Read Count: %d, Write Count: %d, Read Pkg Count: %d, Write Pkg Count: %d" outputFormat = "session %s, Read Count: %d, Write Count: %d, Read Pkg Count: %d, Write Pkg Count: %d"
) )
var ( var (
...@@ -97,7 +97,7 @@ type Session struct { ...@@ -97,7 +97,7 @@ type Session struct {
// attribute // attribute
attrs map[string]interface{} attrs map[string]interface{}
// goroutines sync // goroutines sync
wg sync.WaitGroup grNum int32
lock sync.RWMutex lock sync.RWMutex
} }
...@@ -120,9 +120,7 @@ func (this *Session) Conn() net.Conn { return this.conn } ...@@ -120,9 +120,7 @@ func (this *Session) Conn() net.Conn { return this.conn }
func (this *Session) Stat() string { func (this *Session) Stat() string {
return fmt.Sprintf( return fmt.Sprintf(
outputFormat, outputFormat,
this.name, this.sessionToken(),
this.Id,
this.conn.RemoteAddr().String(),
atomic.LoadUint32(&(this.readCount)), atomic.LoadUint32(&(this.readCount)),
atomic.LoadUint32(&(this.writeCount)), atomic.LoadUint32(&(this.writeCount)),
atomic.LoadUint32(&(this.readPkgCount)), atomic.LoadUint32(&(this.readPkgCount)),
...@@ -247,7 +245,17 @@ func (this *Session) stop() { ...@@ -247,7 +245,17 @@ func (this *Session) stop() {
func (this *Session) Close() { func (this *Session) Close() {
this.stop() this.stop()
this.wg.Wait() log.Info("%s closed now, its current gr num %d",
this.sessionToken(), atomic.LoadInt32(&(this.grNum)))
}
func (this *Session) sessionToken() string {
return fmt.Sprintf(
"%s:%d:%s:%s",
this.name, this.Id,
this.conn.LocalAddr().String(),
this.conn.RemoteAddr().String(),
)
} }
// Queued write, for handler // Queued write, for handler
...@@ -258,8 +266,7 @@ func (this *Session) WritePkg(pkg interface{}) error { ...@@ -258,8 +266,7 @@ func (this *Session) WritePkg(pkg interface{}) error {
defer func() { defer func() {
if err := recover(); err != nil { if err := recover(); err != nil {
log.Error("%s:%d:%s [session.WritePkg] err=%+v\n", log.Error("%s [session.WritePkg] err=%+v\n", this.sessionToken(), err)
this.name, this.Id, this.conn.RemoteAddr().String(), err)
} }
}() }()
...@@ -298,10 +305,8 @@ func (this *Session) RunEventloop() { ...@@ -298,10 +305,8 @@ func (this *Session) RunEventloop() {
panic(errStr) panic(errStr)
} }
this.wg.Add(1) atomic.AddInt32(&(this.grNum), 2)
go this.handleLoop() go this.handleLoop()
this.wg.Add(1)
go this.handlePackage() go this.handlePackage()
} }
...@@ -315,20 +320,19 @@ func (this *Session) handleLoop() { ...@@ -315,20 +320,19 @@ func (this *Session) handleLoop() {
) )
defer func() { defer func() {
var grNum int32
if err := recover(); err != nil { if err := recover(); err != nil {
log.Error("%s:%d:%s [session.handleLoop] err=%+v\n", log.Error("%s, [session.handleLoop] err=%+v\n", this.sessionToken(), err)
this.name, this.Id, this.conn.RemoteAddr().String(), err)
} }
close(this.rspQ) close(this.rspQ)
close(this.reqQ) close(this.reqQ)
this.wg.Done() grNum = atomic.AddInt32(&(this.grNum), -1)
if this.listener != nil { if this.listener != nil {
this.listener.OnClose(this) this.listener.OnClose(this)
} }
// real close connection, dispose会调用(Session)Close,(Session)Close中的wg.Wait多次调用并不会引起任何问题 // real close connection, dispose会调用(conn)Close,
this.dispose() this.dispose()
log.Info("%s:%d:%s [session.handleLoop] goroutine end, statistic{%s}", log.Info("statistic{%s}, [session.handleLoop] goroutine exit now, left gr num %d", this.Stat(), grNum)
this.name, this.Id, this.conn.RemoteAddr().String(), this.Stat())
}() }()
// call session opened // call session opened
...@@ -341,8 +345,7 @@ LOOP: ...@@ -341,8 +345,7 @@ LOOP:
for { for {
select { select {
case <-this.done: case <-this.done:
log.Info("%s:%d:%s [session.handleLoop] got done signal ", log.Info("%s, [session.handleLoop] got done signal ", this.Stat())
this.name, this.Id, this.conn.RemoteAddr().String())
break LOOP break LOOP
case reqPkg = <-this.reqQ: case reqPkg = <-this.reqQ:
if this.listener != nil { if this.listener != nil {
...@@ -351,8 +354,7 @@ LOOP: ...@@ -351,8 +354,7 @@ LOOP:
} }
case rspPkg = <-this.rspQ: case rspPkg = <-this.rspQ:
if err = this.pkgHandler.Write(this, rspPkg); err != nil { if err = this.pkgHandler.Write(this, rspPkg); err != nil {
log.Error("%s:%d:%s [session.handleLoop] = error{%+v}", log.Error("%s, [session.handleLoop] = error{%+v}", this.sessionToken(), err)
this.name, this.Id, this.conn.RemoteAddr().String(), err)
break LOOP break LOOP
} }
this.incWritePkgCount() this.incWritePkgCount()
...@@ -372,7 +374,7 @@ LOOP: ...@@ -372,7 +374,7 @@ LOOP:
start = time.Now() start = time.Now()
LAST: LAST:
for { for {
if time.Since(start).Nanoseconds() > this.closeWait.Nanoseconds() { if time.Since(start).Nanoseconds() >= this.closeWait.Nanoseconds() {
break break
} }
...@@ -388,7 +390,7 @@ LAST: ...@@ -388,7 +390,7 @@ LAST:
this.listener.OnMessage(this, reqPkg) this.listener.OnMessage(this, reqPkg)
} }
default: default:
log.Info("%s:%d:%s [session.handleLoop] default", this.name, this.Id, this.conn.RemoteAddr().String()) log.Info("%s, [session.handleLoop] default", this.sessionToken())
break LAST break LAST
} }
} }
...@@ -397,26 +399,30 @@ LAST: ...@@ -397,26 +399,30 @@ LAST:
// get package from tcp stream(packet) // get package from tcp stream(packet)
func (this *Session) handlePackage() { func (this *Session) handlePackage() {
var ( var (
err error err error
nerr net.Error nerr net.Error
ok bool ok bool
exit bool exit bool
len int reconnect bool
buf []byte len int
pktBuf *bytes.Buffer buf []byte
pkg interface{} pktBuf *bytes.Buffer
pkg interface{}
) )
defer func() { defer func() {
var grNum int32
if err := recover(); err != nil { if err := recover(); err != nil {
log.Error("%s:%d:%s [session.handlePackage] = err{%+v}", log.Error("%s, [session.handlePackage] = err{%+v}", this.sessionToken(), err)
this.name, this.Id, this.conn.RemoteAddr().String(), err)
} }
this.wg.Done()
this.stop() this.stop()
close(this.readerDone) close(this.readerDone)
log.Info("%s:%d:%s [session.handlePackage] goroutine exit......", grNum = atomic.AddInt32(&(this.grNum), -1)
this.name, this.Id, this.conn.RemoteAddr().String()) log.Info("%s, [session.handlePackage] gr will exit now, left gr num %d", this.sessionToken(), grNum)
if reconnect && this.listener != nil {
log.Info("%s, [session.handlePackage] reconnect", this.sessionToken())
this.listener.OnError(this, nerr)
}
}() }()
buf = make([]byte, maxReadBufLen) buf = make([]byte, maxReadBufLen)
...@@ -436,14 +442,11 @@ func (this *Session) handlePackage() { ...@@ -436,14 +442,11 @@ func (this *Session) handlePackage() {
if nerr, ok = err.(net.Error); ok && nerr.Timeout() { if nerr, ok = err.(net.Error); ok && nerr.Timeout() {
break break
} }
log.Error("%s:%d:%s [session.conn.read] = error{%s}", log.Error("%s, [session.conn.read] = error{%v}", this.sessionToken(), err)
this.name, this.Id, this.conn.RemoteAddr().String(), err.Error())
// 遇到网络错误的时候,handlePackage能够及时退出,但是handleLoop的第一个for-select因为要处理(Codec)OnMessage // 遇到网络错误的时候,handlePackage能够及时退出,但是handleLoop的第一个for-select因为要处理(Codec)OnMessage
// 导致程序不能及时退出,此处添加(Codec)OnError调用以及时通知getty调用者 // 导致程序不能及时退出,此处添加(Codec)OnError调用以及时通知getty调用者
// AS, 2016/08/21 // AS, 2016/08/21
if this.listener != nil { reconnect = true
this.listener.OnError(this, nerr)
}
exit = true exit = true
} }
break break
...@@ -457,8 +460,7 @@ func (this *Session) handlePackage() { ...@@ -457,8 +460,7 @@ func (this *Session) handlePackage() {
} }
pkg, err = this.pkgHandler.Read(this, pktBuf) pkg, err = this.pkgHandler.Read(this, pktBuf)
if err != nil { if err != nil {
log.Info("%s:%d:%s [session.pkgHandler.Read] = error{%+v}", log.Info("%s, [session.pkgHandler.Read] = error{%+v}", this.sessionToken(), err)
this.name, this.Id, this.conn.RemoteAddr().String(), err)
exit = true exit = true
break break
} }
......
...@@ -10,5 +10,5 @@ ...@@ -10,5 +10,5 @@
package getty package getty
var ( var (
Version = "0.2.04" Version = "0.2.05"
) )
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment