Commit 1b4ba19b authored by alexstocks's avatar alexstocks

rewrite (Session)handlePackage && add CountWatcher

parent 5bc8c47b
...@@ -4,13 +4,20 @@ ...@@ -4,13 +4,20 @@
## introdction ## ## introdction ##
--- ---
> DESC : a asynchronous network I/O library in golang > DESC : a asynchronous network I/O library in golang. In getty there are two goroutines in one connection(session), one handle network read buffer tcp stream, the other handle logic process and write response into network write buffer. If your logic process may take a long time, you should start a new logic process goroutine by yourself in (Codec):OnMessage.
> >
> LICENCE : Apache License 2.0 > LICENCE : Apache License 2.0
## develop history ## ## develop history ##
--- ---
- 2016/09/08
> 1 rewrite session.go:(Session)handlePackage() error handle logic
>
> 2 add utils.go:CountWatch
>
> 3 version: 0.3.06
- 2016/09/07 - 2016/09/07
> 1 session.go:(Session)Close() -> session.go:(Session)gc() to be invoked by session.go:(Session)handleLoop > 1 session.go:(Session)Close() -> session.go:(Session)gc() to be invoked by session.go:(Session)handleLoop
> >
......
...@@ -296,8 +296,11 @@ func (this *Session) WritePkg(pkg interface{}) error { ...@@ -296,8 +296,11 @@ func (this *Session) WritePkg(pkg interface{}) error {
} }
defer func() { defer func() {
if err := recover(); err != nil { if r := recover(); r != nil {
log.Error("%s [session.WritePkg] err=%+v\n", this.sessionToken(), err) const size = 64 << 10
rBuf := make([]byte, size)
rBuf = rBuf[:runtime.Stack(rBuf, false)]
log.Error("[session.handleLoop] panic session %s: err=%#v\n%s", this.sessionToken(), r, rBuf)
} }
}() }()
...@@ -346,7 +349,8 @@ func (this *Session) RunEventLoop() { ...@@ -346,7 +349,8 @@ func (this *Session) RunEventLoop() {
func (this *Session) handleLoop() { func (this *Session) handleLoop() {
var ( var (
err error err error
start time.Time // start time.Time
counter CountWatch
ticker *time.Ticker ticker *time.Ticker
inPkg interface{} inPkg interface{}
outPkg interface{} outPkg interface{}
...@@ -398,10 +402,12 @@ LOOP: ...@@ -398,10 +402,12 @@ LOOP:
<-this.readerDone <-this.readerDone
// process pending pkg // process pending pkg
start = time.Now() // start = time.Now()
counter.Start()
LAST: LAST:
for { for {
if time.Since(start).Nanoseconds() >= this.wait.Nanoseconds() { // if time.Since(start).Nanoseconds() >= this.wait.Nanoseconds() {
if counter.Count() > this.wait.Nanoseconds() {
break break
} }
...@@ -431,7 +437,8 @@ func (this *Session) handlePackage() { ...@@ -431,7 +437,8 @@ func (this *Session) handlePackage() {
ok bool ok bool
exit bool exit bool
errFlag bool errFlag bool
len int bufLen int
pkgLen int
buf []byte buf []byte
pktBuf *bytes.Buffer pktBuf *bytes.Buffer
pkg interface{} pkg interface{}
...@@ -461,39 +468,40 @@ func (this *Session) handlePackage() { ...@@ -461,39 +468,40 @@ func (this *Session) handlePackage() {
pktBuf = new(bytes.Buffer) pktBuf = new(bytes.Buffer)
for { for {
if this.IsClosed() { if this.IsClosed() {
exit = true break // 退出前不再读取任何packet,buf中剩余的stream bytes也不可能凑够一个package, 所以直接退出
} }
for { bufLen = 0
if exit { for { // for clause for the network timeout condition check
break // 退出前不再读取任何packet,跳到下个for-loop处理完pktBuf中的stream
}
this.conn.SetReadDeadline(time.Now().Add(this.rDeadline)) this.conn.SetReadDeadline(time.Now().Add(this.rDeadline))
len, err = this.read(buf) bufLen, err = this.read(buf)
if err != nil { if err != nil {
if nerr, ok = err.(net.Error); ok && nerr.Timeout() { if nerr, ok = err.(net.Error); ok && nerr.Timeout() {
break break
} }
log.Error("%s, [session.conn.read] = error{%v}", this.sessionToken(), err) log.Error("%s, [session.conn.read] = error{%v}", this.sessionToken(), err)
// 遇到网络错误的时候,handlePackage能够及时退出,但是handleLoop的第一个for-select因为要处理(Codec)OnMessage // for (Codec)OnErr
// 导致程序不能及时退出,此处添加(Codec)OnError调用以及时通知getty调用者
// AS, 2016/08/21
errFlag = true errFlag = true
exit = true exit = true
} }
break break
} }
if 0 < len { if exit {
pktBuf.Write(buf[:len]) break
}
if 0 == bufLen {
continue // just continue if connection has read no more stream bytes.
} }
pktBuf.Write(buf[:bufLen])
for { for {
if pktBuf.Len() <= 0 { if pktBuf.Len() <= 0 {
break break
} }
// pkg, err = this.pkgHandler.Read(this, pktBuf) // pkg, err = this.pkgHandler.Read(this, pktBuf)
pkg, len, err = this.pkgHandler.Read(this, pktBuf.Bytes()) pkg, pkgLen, err = this.pkgHandler.Read(this, pktBuf.Bytes())
if err != nil { if err != nil {
log.Info("%s, [session.pkgHandler.Read] = error{%+v}", this.sessionToken(), err) log.Info("%s, [session.pkgHandler.Read] = error{%+v}", this.sessionToken(), err)
// for (Codec)OnErr
errFlag = true errFlag = true
exit = true exit = true
break break
...@@ -502,7 +510,7 @@ func (this *Session) handlePackage() { ...@@ -502,7 +510,7 @@ func (this *Session) handlePackage() {
break break
} }
this.rQ <- pkg this.rQ <- pkg
pktBuf.Next(len) pktBuf.Next(pkgLen)
} }
if exit { if exit {
break break
......
...@@ -12,9 +12,26 @@ package getty ...@@ -12,9 +12,26 @@ package getty
import ( import (
"net" "net"
"strconv" "strconv"
"time"
) )
// HostAddress composes a ip:port style address. Its opposite function is net.SplitHostPort. // HostAddress composes a ip:port style address. Its opposite function is net.SplitHostPort.
func HostAddress(host string, port int) string { func HostAddress(host string, port int) string {
return net.JoinHostPort(host, strconv.Itoa(port)) return net.JoinHostPort(host, strconv.Itoa(port))
} }
type CountWatch struct {
start time.Time
}
func (w *CountWatch) Start() {
w.start = time.Now()
}
func (w *CountWatch) Reset() {
w.start = time.Now()
}
func (w *CountWatch) Count() int64 {
return time.Since(w.start).Nanoseconds()
}
...@@ -10,5 +10,5 @@ ...@@ -10,5 +10,5 @@
package getty package getty
var ( var (
Version = "0.3.05" Version = "0.3.06"
) )
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment