Commit 4fbff185 authored by AlexStocks's avatar AlexStocks

Add: tcp read case 2;

Fix: add buf slice length
parent a14ac8a9
...@@ -29,14 +29,16 @@ type Reader interface { ...@@ -29,14 +29,16 @@ type Reader interface {
// Parse tcp/udp/websocket pkg from buffer and if possible return a complete pkg. // Parse tcp/udp/websocket pkg from buffer and if possible return a complete pkg.
// When receiving a tcp network streaming segment, there are 4 cases as following: // When receiving a tcp network streaming segment, there are 4 cases as following:
// case 1: a error found in the streaming segment; // case 1: a error found in the streaming segment;
// case 2: can not unmarshal a pkg from the streaming segment; // case 2: can not unmarshal a pkg header from the streaming segment;
// case 3: just unmarshal a pkg from the streaming segment; // case 3: unmarshal a pkg header but can not unmarshal a pkg from the streaming segment;
// case 4: unmarshal more than one pkg from the streaming segment; // case 4: just unmarshal a pkg from the streaming segment;
// case 5: unmarshal more than one pkg from the streaming segment;
// //
// The return value is (nil, 0, error) as case 1. // The return value is (nil, 0, error) as case 1.
// The return value is (nil, 0, nil) as case 2. // The return value is (nil, 0, nil) as case 2.
// The return value is (pkg, pkgLen, nil) as case 3. // The return value is (nil, pkgLen, nil) as case 3.
// The handleTcpPackage may invoke func Read many times as case 4. // The return value is (pkg, pkgLen, nil) as case 4.
// The handleTcpPackage may invoke func Read many times as case 5.
Read(Session, []byte) (interface{}, int, error) Read(Session, []byte) (interface{}, int, error)
} }
......
...@@ -641,7 +641,7 @@ func (s *session) handleTCPPackage() error { ...@@ -641,7 +641,7 @@ func (s *session) handleTCPPackage() error {
for { for {
// for clause for the network timeout condition check // for clause for the network timeout condition check
// s.conn.SetReadTimeout(time.Now().Add(s.rTimeout)) // s.conn.SetReadTimeout(time.Now().Add(s.rTimeout))
bufLen, err = conn.read(buf) bufLen, err = conn.read(buf[:maxReadBufLen])
if err != nil { if err != nil {
if netError, ok = perrors.Cause(err).(net.Error); ok && netError.Timeout() { if netError, ok = perrors.Cause(err).(net.Error); ok && netError.Timeout() {
break break
...@@ -664,6 +664,7 @@ func (s *session) handleTCPPackage() error { ...@@ -664,6 +664,7 @@ func (s *session) handleTCPPackage() error {
} }
// pkg, err = s.pkgHandler.Read(s, pktBuf) // pkg, err = s.pkgHandler.Read(s, pktBuf)
pkg, pkgLen, err = s.reader.Read(s, pktBuf.Bytes()) pkg, pkgLen, err = s.reader.Read(s, pktBuf.Bytes())
// for case 3/case 4
if err == nil && s.maxMsgLen > 0 && pkgLen > int(s.maxMsgLen) { if err == nil && s.maxMsgLen > 0 && pkgLen > int(s.maxMsgLen) {
err = perrors.Errorf("pkgLen %d > session max message len %d", pkgLen, s.maxMsgLen) err = perrors.Errorf("pkgLen %d > session max message len %d", pkgLen, s.maxMsgLen)
} }
...@@ -674,15 +675,15 @@ func (s *session) handleTCPPackage() error { ...@@ -674,15 +675,15 @@ func (s *session) handleTCPPackage() error {
exit = true exit = true
break break
} }
// handle case 2 // handle case 2/case 3
if pkg == nil { if pkg == nil {
break break
} }
// handle case 3 // handle case 4
s.UpdateActive() s.UpdateActive()
s.addTask(pkg) s.addTask(pkg)
pktBuf.Next(pkgLen) pktBuf.Next(pkgLen)
// continue to handle case 4 // continue to handle case 5
} }
if exit { if exit {
break break
...@@ -700,6 +701,7 @@ func (s *session) handleUDPPackage() error { ...@@ -700,6 +701,7 @@ func (s *session) handleUDPPackage() error {
netError net.Error netError net.Error
conn *gettyUDPConn conn *gettyUDPConn
bufLen int bufLen int
maxBufLen int
buf []byte buf []byte
addr *net.UDPAddr addr *net.UDPAddr
pkgLen int pkgLen int
...@@ -707,17 +709,17 @@ func (s *session) handleUDPPackage() error { ...@@ -707,17 +709,17 @@ func (s *session) handleUDPPackage() error {
) )
conn = s.Connection.(*gettyUDPConn) conn = s.Connection.(*gettyUDPConn)
bufLen = int(s.maxMsgLen + maxReadBufLen) maxBufLen = int(s.maxMsgLen + maxReadBufLen)
if int(s.maxMsgLen<<1) < bufLen { if int(s.maxMsgLen<<1) < bufLen {
bufLen = int(s.maxMsgLen << 1) maxBufLen = int(s.maxMsgLen << 1)
} }
buf = make([]byte, bufLen) buf = make([]byte, maxBufLen)
for { for {
if s.IsClosed() { if s.IsClosed() {
break break
} }
bufLen, addr, err = conn.read(buf) bufLen, addr, err = conn.read(buf[:maxBufLen])
log.Debugf("conn.read() = bufLen:%d, addr:%#v, err:%+v", bufLen, addr, err) log.Debugf("conn.read() = bufLen:%d, addr:%#v, err:%+v", bufLen, addr, err)
if netError, ok = perrors.Cause(err).(net.Error); ok && netError.Timeout() { if netError, ok = perrors.Cause(err).(net.Error); ok && netError.Timeout() {
continue continue
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment