Commit 3f195ce4 authored by AlexStocks's avatar AlexStocks

Mod: add remark for tcp reading

parent bdf5e640
...@@ -235,6 +235,7 @@ func (t *gettyTCPConn) read(p []byte) (int, error) { ...@@ -235,6 +235,7 @@ func (t *gettyTCPConn) read(p []byte) (int, error) {
length int length int
) )
// set read timeout deadline
if t.compress == CompressNone && t.rTimeout > 0 { if t.compress == CompressNone && t.rTimeout > 0 {
// Optimization: update read deadline only if more than 25% // Optimization: update read deadline only if more than 25%
// of the last read deadline exceeded. // of the last read deadline exceeded.
...@@ -242,6 +243,7 @@ func (t *gettyTCPConn) read(p []byte) (int, error) { ...@@ -242,6 +243,7 @@ func (t *gettyTCPConn) read(p []byte) (int, error) {
currentTime = time.Now() currentTime = time.Now()
if currentTime.Sub(t.rLastDeadline) > (t.rTimeout >> 2) { if currentTime.Sub(t.rLastDeadline) > (t.rTimeout >> 2) {
if err = t.conn.SetReadDeadline(currentTime.Add(t.rTimeout)); err != nil { if err = t.conn.SetReadDeadline(currentTime.Add(t.rTimeout)); err != nil {
// just a timeout error
return 0, perrors.WithStack(err) return 0, perrors.WithStack(err)
} }
t.rLastDeadline = currentTime t.rLastDeadline = currentTime
......
...@@ -24,9 +24,17 @@ type NewSessionCallback func(Session) error ...@@ -24,9 +24,17 @@ type NewSessionCallback func(Session) error
// Reader is used to unmarshal a complete pkg from buffer // Reader is used to unmarshal a complete pkg from buffer
type Reader interface { type Reader interface {
// Parse tcp/udp/websocket pkg from buffer and if possible return a complete pkg // Parse tcp/udp/websocket pkg from buffer and if possible return a complete pkg.
// If length of buf is not long enough, u should return {nil,0, nil} // When receiving a tcp network streaming segment, there are 4 cases as following:
// The second return value is the length of the pkg. // case 1: a error found in the streaming segment;
// case 2: can not unmarshal a pkg from the streaming segment;
// case 3: just unmarshal a pkg from the streaming segment;
// case 4: unmarshal more than one pkg from the streaming segment;
//
// The return value is (nil, 0, error) as case 1.
// The return value is (nil, 0, nil) as case 2.
// The return value is (pkg, pkgLen, nil) as case 3.
// The handleTcpPackage may invoke func Read many times as case 4.
Read(Session, []byte) (interface{}, int, error) Read(Session, []byte) (interface{}, int, error)
} }
......
...@@ -623,6 +623,7 @@ func (s *session) handleTCPPackage() error { ...@@ -623,6 +623,7 @@ func (s *session) handleTCPPackage() error {
if err == nil && s.maxMsgLen > 0 && pkgLen > int(s.maxMsgLen) { if err == nil && s.maxMsgLen > 0 && pkgLen > int(s.maxMsgLen) {
err = perrors.Errorf("pkgLen %d > session max message len %d", pkgLen, s.maxMsgLen) err = perrors.Errorf("pkgLen %d > session max message len %d", pkgLen, s.maxMsgLen)
} }
// handle case 1
if err != nil { if err != nil {
log.Warnf("%s, [session.handleTCPPackage] = len{%d}, error:%+v", log.Warnf("%s, [session.handleTCPPackage] = len{%d}, error:%+v",
s.sessionToken(), pkgLen, err) s.sessionToken(), pkgLen, err)
...@@ -631,12 +632,15 @@ func (s *session) handleTCPPackage() error { ...@@ -631,12 +632,15 @@ func (s *session) handleTCPPackage() error {
exit = true exit = true
break break
} }
// handle case 2
if pkg == nil { if pkg == nil {
break break
} }
// handle case 3
s.UpdateActive() s.UpdateActive()
s.rQ <- pkg s.rQ <- pkg
pktBuf.Next(pkgLen) pktBuf.Next(pkgLen)
// continue to handle case 4
} }
if exit { if exit {
break break
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment