Commit 79c4682d authored by alexstocks's avatar alexstocks

let socket read/write timeout asas when got error

parent 34ba12b2
......@@ -34,8 +34,12 @@ var (
type iConn interface {
incReadPkgCount()
incWritePkgCount()
UpdateActive()
GetActive() time.Time
updateActive()
getActive() time.Time
readDeadline() time.Duration
setReadDeadline(time.Duration)
writeDeadline() time.Duration
setWriteDeadline(time.Duration)
write(p []byte) error
// don't distinguish between tcp connection and websocket connection. Because
// gorilla/websocket/conn.go:(Conn)Close also invoke net.Conn.Close
......@@ -52,12 +56,14 @@ var (
type gettyConn struct {
ID uint32
padding uint32 // last active, in milliseconds
readCount uint32 // read() count
writeCount uint32 // write() count
readPkgCount uint32 // send pkg count
writePkgCount uint32 // recv pkg count
active int64 // active
padding uint32 // last active, in milliseconds
readCount uint32 // read() count
writeCount uint32 // write() count
readPkgCount uint32 // send pkg count
writePkgCount uint32 // recv pkg count
active int64 // active
rDeadline time.Duration // network current limiting
wDeadline time.Duration
local string // local address
peer string // peer address
}
......@@ -70,11 +76,11 @@ func (this *gettyConn) incWritePkgCount() {
atomic.AddUint32(&this.writePkgCount, 1)
}
func (this *gettyConn) UpdateActive() {
func (this *gettyConn) updateActive() {
atomic.StoreInt64(&(this.active), int64(time.Since(launchTime)))
}
func (this *gettyConn) GetActive() time.Time {
func (this *gettyConn) getActive() time.Time {
return launchTime.Add(time.Duration(atomic.LoadInt64(&(this.active))))
}
......@@ -84,6 +90,33 @@ func (this *gettyConn) write([]byte) error {
func (this *gettyConn) close(int) {}
func (this gettyConn) readDeadline() time.Duration {
return this.rDeadline
}
func (this *gettyConn) setReadDeadline(rDeadline time.Duration) {
if rDeadline < 1 {
panic("@rDeadline < 1")
}
this.rDeadline = rDeadline
if this.wDeadline == 0 {
this.wDeadline = rDeadline
}
}
func (this gettyConn) writeDeadline() time.Duration {
return this.wDeadline
}
func (this *gettyConn) setWriteDeadline(wDeadline time.Duration) {
if wDeadline < 1 {
panic("@wDeadline < 1")
}
this.wDeadline = wDeadline
}
/////////////////////////////////////////
// getty tcp connection
/////////////////////////////////////////
......@@ -124,8 +157,9 @@ func (this *gettyTCPConn) read(p []byte) (int, error) {
// }
// atomic.AddUint32(&this.readCount, 1)
atomic.AddUint32(&this.readCount, (uint32)(len(p)))
return this.conn.Read(p)
l, e := this.conn.Read(p)
atomic.AddUint32(&this.readCount, uint32(l))
return l, e
}
// tcp connection write
......@@ -198,19 +232,20 @@ func (this *gettyWSConn) handlePing(message string) error {
err = nil
}
if err == nil {
this.UpdateActive()
this.updateActive()
}
return err
}
func (this *gettyWSConn) handlePong(string) error {
this.UpdateActive()
this.updateActive()
return nil
}
// websocket connection read
func (this *gettyWSConn) read() ([]byte, error) {
// this.conn.SetReadDeadline(time.Now().Add(this.rDeadline))
_, b, e := this.conn.ReadMessage()
if e == nil {
// atomic.AddUint32(&this.readCount, (uint32)(l))
......@@ -224,6 +259,7 @@ func (this *gettyWSConn) read() ([]byte, error) {
func (this *gettyWSConn) write(p []byte) error {
// atomic.AddUint32(&this.writeCount, 1)
atomic.AddUint32(&this.writeCount, (uint32)(len(p)))
// this.conn.SetWriteDeadline(time.Now().Add(this.wDeadline))
return this.conn.WriteMessage(websocket.BinaryMessage, p)
}
......@@ -233,6 +269,7 @@ func (this *gettyWSConn) writePing() error {
// close websocket connection
func (this *gettyWSConn) close(waitSec int) {
this.conn.WriteMessage(websocket.CloseMessage, []byte("bye-bye!!!"))
this.conn.UnderlyingConn().(*net.TCPConn).SetLinger(waitSec)
this.conn.Close()
}
......@@ -15,6 +15,12 @@
> 1 add conn.go:(gettyWSConn)handlePing
>
> 2 add conn.go:(gettyWSConn)handlePong
>
> 3 set read/write timeout in session.go:(Session)stop to let read/write timeout asap
>
> 4 fix bug: websocket block on session.go:(Session)handleWSPkg when got error. set read/write timeout asap to solve this problem.
>
> 5 version: 0.4.04
- 2016/10/13
> 1 add conn.go:(gettyWSConn)writePing which is invoked automatically in session.go:(Session)handleLoop
......
......@@ -203,7 +203,7 @@ func (this *Server) RunWSEventLoop(newSession NewSessionCallback, path string) {
}).Serve(this.listener)
if err != nil {
log.Error("http.Server.Serve(addr{%s}) = err{%#v}", this.addr, err)
panic(err)
// panic(err)
}
}()
}
......
......@@ -28,7 +28,7 @@ import (
const (
maxReadBufLen = 4 * 1024
netIOTimeout = 100e6 // 100ms
netIOTimeout = 1e9 // 1s
period = 60 * 1e9 // 1 minute
pendingDuration = 3e9
defaultSessionName = "Session"
......@@ -65,12 +65,10 @@ type Session struct {
done chan empty
// errFlag bool
period time.Duration
rDeadline time.Duration // network current limiting
wDeadline time.Duration
wait time.Duration
rQ chan interface{}
wQ chan interface{}
period time.Duration
wait time.Duration
rQ chan interface{}
wQ chan interface{}
// attribute
attrs map[string]interface{}
......@@ -79,42 +77,54 @@ type Session struct {
lock sync.RWMutex
}
// if nerr, ok = err.(net.Error); ok && nerr.Timeout() {
// break
// }
func NewSession() *Session {
return &Session{
name: defaultSessionName,
done: make(chan empty),
period: period,
rDeadline: netIOTimeout,
wDeadline: netIOTimeout,
wait: pendingDuration,
attrs: make(map[string]interface{}),
session := &Session{
name: defaultSessionName,
done: make(chan empty),
period: period,
wait: pendingDuration,
attrs: make(map[string]interface{}),
}
session.setWriteDeadline(netIOTimeout)
session.setReadDeadline(netIOTimeout)
return session
}
func NewTCPSession(conn net.Conn) *Session {
return &Session{
name: defaultSessionName,
iConn: newGettyTCPConn(conn),
done: make(chan empty),
period: period,
rDeadline: netIOTimeout,
wDeadline: netIOTimeout,
wait: pendingDuration,
attrs: make(map[string]interface{}),
session := &Session{
name: defaultSessionName,
iConn: newGettyTCPConn(conn),
done: make(chan empty),
period: period,
wait: pendingDuration,
attrs: make(map[string]interface{}),
}
session.setWriteDeadline(netIOTimeout)
session.setReadDeadline(netIOTimeout)
return session
}
func NewWSSession(conn *websocket.Conn) *Session {
return &Session{
name: defaultSessionName,
iConn: newGettyWSConn(conn),
done: make(chan empty),
period: period,
rDeadline: netIOTimeout,
wDeadline: netIOTimeout,
wait: pendingDuration,
attrs: make(map[string]interface{}),
session := &Session{
name: defaultSessionName,
iConn: newGettyWSConn(conn),
done: make(chan empty),
period: period,
wait: pendingDuration,
attrs: make(map[string]interface{}),
}
session.setWriteDeadline(netIOTimeout)
session.setReadDeadline(netIOTimeout)
return session
}
func (this *Session) Reset() {
......@@ -123,11 +133,12 @@ func (this *Session) Reset() {
this.done = make(chan empty)
// this.errFlag = false
this.period = period
this.rDeadline = netIOTimeout
this.wDeadline = netIOTimeout
this.wait = pendingDuration
this.attrs = make(map[string]interface{})
this.grNum = 0
this.setWriteDeadline(netIOTimeout)
this.setReadDeadline(netIOTimeout)
}
// func (this *Session) SetConn(conn net.Conn) { this.gettyConn = newGettyConn(conn) }
......@@ -241,26 +252,15 @@ func (this *Session) SetWQLen(writeQLen int) {
// SetReadDeadline sets deadline for the future read calls.
func (this *Session) SetReadDeadline(rDeadline time.Duration) {
if rDeadline < 1 {
panic("@rDeadline < 1")
}
this.lock.Lock()
this.rDeadline = rDeadline
if this.wDeadline == 0 {
this.wDeadline = rDeadline
}
this.setReadDeadline(rDeadline)
this.lock.Unlock()
}
// SetWriteDeadlile sets deadline for the future read calls.
func (this *Session) SetWriteDeadline(wDeadline time.Duration) {
if wDeadline < 1 {
panic("@wDeadline < 1")
}
this.lock.Lock()
this.wDeadline = wDeadline
this.setWriteDeadline(wDeadline)
this.lock.Unlock()
}
......@@ -295,6 +295,14 @@ func (this *Session) RemoveAttribute(key string) {
this.lock.Unlock()
}
func (this *Session) UpdateActive() {
this.updateActive()
}
func (this *Session) GetActive() time.Time {
return this.getActive()
}
func (this *Session) sessionToken() string {
var conn *gettyConn
if conn = this.gettyConn(); conn == nil {
......@@ -319,7 +327,7 @@ func (this *Session) WritePkg(pkg interface{}) error {
}
}()
var d = this.wDeadline
var d = this.writeDeadline()
if d > netIOTimeout {
d = netIOTimeout
}
......@@ -388,7 +396,7 @@ func (this *Session) RunEventLoop() {
}
// call session opened
this.UpdateActive()
this.updateActive()
if err := this.listener.OnOpen(this); err != nil {
this.Close()
return
......@@ -533,8 +541,9 @@ func (this *Session) handlePackage() {
// get package from tcp stream(packet)
func (this *Session) handleTCPPackage() error {
var (
err error
// nerr net.Error
ok bool
err error
nerr net.Error
conn *gettyTCPConn
exit bool
bufLen int
......@@ -554,13 +563,14 @@ func (this *Session) handleTCPPackage() error {
}
bufLen = 0
for { // for clause for the network timeout condition check
for {
// for clause for the network timeout condition check
// this.conn.SetReadDeadline(time.Now().Add(this.rDeadline))
bufLen, err = conn.read(buf)
if err != nil {
// if nerr, ok = err.(net.Error); ok && nerr.Timeout() {
// break
// }
if nerr, ok = err.(net.Error); ok && nerr.Timeout() {
break
}
log.Error("%s, [session.conn.read] = error{%v}", this.sessionToken(), err)
// for (Codec)OnErr
// this.errFlag = true
......@@ -594,7 +604,7 @@ func (this *Session) handleTCPPackage() error {
if pkg == nil {
break
}
this.UpdateActive()
this.updateActive()
this.rQ <- pkg
pktBuf.Next(pkgLen)
}
......@@ -609,7 +619,9 @@ func (this *Session) handleTCPPackage() error {
// get package from websocket stream
func (this *Session) handleWSPackage() error {
var (
ok bool
err error
nerr net.Error
length int
conn *gettyWSConn
pkg []byte
......@@ -619,16 +631,18 @@ func (this *Session) handleWSPackage() error {
conn = this.iConn.(*gettyWSConn)
for {
if this.IsClosed() {
break // 退出前不再读取任何packet,buf中剩余的stream bytes也不可能凑够一个package, 所以直接退出
break
}
pkg, err = conn.read()
if nerr, ok = err.(net.Error); ok && nerr.Timeout() {
continue
}
if err != nil {
log.Warn("%s, [session.handleWSPackage] = error{%+v}", this.sessionToken(), err)
// this.errFlag = true
return err
}
this.UpdateActive()
this.updateActive()
if this.reader != nil {
unmarshalPkg, length, err = this.reader.Read(this, pkg)
if err == nil && this.maxMsgLen > 0 && length > this.maxMsgLen {
......@@ -636,9 +650,9 @@ func (this *Session) handleWSPackage() error {
}
if err != nil {
log.Warn("%s, [session.handleWSPackage] = len{%d}, error{%+v}", this.sessionToken(), length, err)
} else {
this.rQ <- unmarshalPkg
continue
}
this.rQ <- unmarshalPkg
} else {
this.rQ <- pkg
}
......@@ -653,7 +667,14 @@ func (this *Session) stop() {
return
default:
this.once.Do(func() { close(this.done) })
this.once.Do(func() {
// let read/write timeout asap
if conn := this.Conn(); conn != nil {
conn.SetReadDeadline(time.Now().Add(this.readDeadline()))
conn.SetWriteDeadline(time.Now().Add(this.writeDeadline()))
}
close(this.done)
})
}
}
......
......@@ -10,9 +10,9 @@
package getty
const (
Version = "0.4.03"
DATE = "2016/10/13"
Version = "0.4.04"
DATE = "2016/10/14"
GETTY_MAJOR = 0
GETTY_MINOR = 4
GETTY_BUILD = 3
GETTY_BUILD = 4
)
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment