Commit 5eb2164e authored by AlexStocks's avatar AlexStocks

add read/write deadline

parent 03f1fd31
......@@ -11,6 +11,12 @@
## develop history ##
---
- 2017/05/02
> feature
* 1 set read/write deadline for every read/write action refers to fasthttp
> version: 0.7.04
- 2017/04/27
> bug fix
* 1 client connect wss server just using the cert file.
......
......@@ -94,6 +94,8 @@ type gettyConn struct {
active int64 // last active, in milliseconds
rDeadline time.Duration // network current limiting
wDeadline time.Duration
rLastDeadline time.Time // lastest network read time
wLastDeadline time.Time // lastest network write time
local string // local address
peer string // peer address
}
......@@ -157,6 +159,9 @@ func (c *gettyConn) SetWriteDeadline(wDeadline time.Duration) {
}
c.wDeadline = wDeadline
if c.rDeadline == 0 {
c.rDeadline = wDeadline
}
}
/////////////////////////////////////////
......@@ -243,28 +248,51 @@ func (t *gettyTCPConn) SetCompressType(c CompressType) {
// tcp connection read
func (t *gettyTCPConn) read(p []byte) (int, error) {
// if t.conn == nil {
// return 0, ErrInvalidConnection
// }
var (
err error
currentTime time.Time
length int
)
// atomic.AddUint32(&t.readCount, 1)
// l, e := t.conn.Read(p)
l, e := t.reader.Read(p)
atomic.AddUint32(&t.readCount, uint32(l))
return l, e
if t.rDeadline > 0 {
// Optimization: update read deadline only if more than 25%
// of the last read deadline exceeded.
// See https://github.com/golang/go/issues/15133 for details.
currentTime = wheel.Now()
if currentTime.Sub(t.rLastDeadline) > (t.rDeadline >> 2) {
if err = t.conn.SetReadDeadline(currentTime.Add(t.rDeadline)); err != nil {
return 0, err
}
t.rLastDeadline = currentTime
}
}
length, err = t.reader.Read(p)
atomic.AddUint32(&t.readCount, uint32(length))
return length, err
}
// tcp connection write
func (t *gettyTCPConn) Write(p []byte) error {
// if t.conn == nil {
// return 0, ErrInvalidConnection
// }
var (
err error
currentTime time.Time
)
if t.wDeadline > 0 {
// Optimization: update write deadline only if more than 25%
// of the last write deadline exceeded.
// See https://github.com/golang/go/issues/15133 for details.
currentTime = wheel.Now()
if currentTime.Sub(t.wLastDeadline) > (t.wDeadline >> 2) {
if err = t.conn.SetWriteDeadline(currentTime.Add(t.wDeadline)); err != nil {
return err
}
t.wLastDeadline = currentTime
}
}
// atomic.AddUint32(&t.writeCount, 1)
atomic.AddUint32(&t.writeCount, (uint32)(len(p)))
// _, err := t.conn.Write(p)
_, err := t.writer.Write(p)
_, err = t.writer.Write(p)
return err
}
......@@ -337,7 +365,24 @@ func (w *gettyWSConn) SetCompressType(c CompressType) {
}
func (w *gettyWSConn) handlePing(message string) error {
err := w.conn.WriteMessage(websocket.PongMessage, []byte(message))
var (
err error
currentTime time.Time
)
if w.wDeadline > 0 {
// Optimization: update write deadline only if more than 25%
// of the last write deadline exceeded.
// See https://github.com/golang/go/issues/15133 for details.
currentTime = wheel.Now()
if currentTime.Sub(w.wLastDeadline) > (w.wDeadline >> 2) {
if err = w.conn.SetWriteDeadline(currentTime.Add(w.wDeadline)); err != nil {
return err
}
w.wLastDeadline = currentTime
}
}
err = w.conn.WriteMessage(websocket.PongMessage, []byte(message))
if err == websocket.ErrCloseSent {
err = nil
} else if e, ok := err.(net.Error); ok && e.Temporary() {
......@@ -357,6 +402,23 @@ func (w *gettyWSConn) handlePong(string) error {
// websocket connection read
func (w *gettyWSConn) read() ([]byte, error) {
var (
err error
currentTime time.Time
)
if w.rDeadline > 0 {
// Optimization: update read deadline only if more than 25%
// of the last read deadline exceeded.
// See https://github.com/golang/go/issues/15133 for details.
currentTime = wheel.Now()
if currentTime.Sub(w.rLastDeadline) > (w.rDeadline >> 2) {
if err = w.conn.SetReadDeadline(currentTime.Add(w.rDeadline)); err != nil {
return nil, err
}
w.rLastDeadline = currentTime
}
}
// w.conn.SetReadDeadline(time.Now().Add(w.rDeadline))
_, b, e := w.conn.ReadMessage() // the first return value is message type.
if e == nil {
......@@ -373,6 +435,23 @@ func (w *gettyWSConn) read() ([]byte, error) {
// websocket connection write
func (w *gettyWSConn) Write(p []byte) error {
var (
err error
currentTime time.Time
)
if w.wDeadline > 0 {
// Optimization: update write deadline only if more than 25%
// of the last write deadline exceeded.
// See https://github.com/golang/go/issues/15133 for details.
currentTime = wheel.Now()
if currentTime.Sub(w.wLastDeadline) > (w.wDeadline >> 2) {
if err = w.conn.SetWriteDeadline(currentTime.Add(w.wDeadline)); err != nil {
return err
}
w.wLastDeadline = currentTime
}
}
// atomic.AddUint32(&w.writeCount, 1)
atomic.AddUint32(&w.writeCount, (uint32)(len(p)))
// w.conn.SetWriteDeadline(time.Now().Add(w.wDeadline))
......
......@@ -373,7 +373,8 @@ func (s *session) WriteBytesArray(pkgs ...[]byte) error {
// s.conn.SetWriteDeadline(time.Now().Add(s.wDeadline))
if len(pkgs) == 1 {
return s.Connection.Write(pkgs[0])
// return s.Connection.Write(pkgs[0])
return s.WriteBytes(pkgs[0])
}
// get len
......@@ -458,8 +459,8 @@ func (s *session) handleLoop() {
s.gc()
}()
wsConn, wsFlag = s.Connection.(*gettyWSConn)
flag = true // do not do any read/Write/cron operation while got Write error
wsConn, wsFlag = s.Connection.(*gettyWSConn)
// ticker = time.NewTicker(s.period) // use wheel instead, 2016/09/26
LOOP:
for {
......@@ -690,9 +691,10 @@ func (s *session) stop() {
default:
s.once.Do(func() {
// let read/Write timeout asap
now := wheel.Now()
if conn := s.Conn(); conn != nil {
conn.SetReadDeadline(time.Now().Add(s.readDeadline()))
conn.SetWriteDeadline(time.Now().Add(s.writeDeadline()))
conn.SetReadDeadline(now.Add(s.readDeadline()))
conn.SetWriteDeadline(now.Add(s.writeDeadline()))
}
close(s.done)
})
......
......@@ -10,9 +10,9 @@
package getty
const (
Version = "0.7.03"
DATE = "2017/04/27"
Version = "0.7.04"
DATE = "2017/05/02"
GETTY_MAJOR = 0
GETTY_MINOR = 7
GETTY_BUILD = 3
GETTY_BUILD = 4
)
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment