Commit 50759d83 authored by AlexStocks's avatar AlexStocks

bug fix: maxMsgLen of udp session 0 -> 4k

parent 41c6d4ac
...@@ -14,6 +14,10 @@ ...@@ -14,6 +14,10 @@
## develop history ## ## develop history ##
--- ---
- 2018/03/16
> bug fix
* set maxMsgLen of UDPSession from zero to 4k
- 2018/03/15 - 2018/03/15
> improvement > improvement
* add gettyUDPConn to session::Conn and session::gettyConn * add gettyUDPConn to session::Conn and session::gettyConn
......
...@@ -24,6 +24,7 @@ type Reader interface { ...@@ -24,6 +24,7 @@ type Reader interface {
// Writer is used to marshal pkg and write to session // Writer is used to marshal pkg and write to session
type Writer interface { type Writer interface {
// if @Session is udpGettySession, the second parameter is UDPContext.
Write(Session, interface{}) error Write(Session, interface{}) error
} }
......
...@@ -20,6 +20,7 @@ import ( ...@@ -20,6 +20,7 @@ import (
) )
import ( import (
"github.com/AlexStocks/goext/log"
log "github.com/AlexStocks/log4go" log "github.com/AlexStocks/log4go"
"github.com/golang/snappy" "github.com/golang/snappy"
"github.com/gorilla/websocket" "github.com/gorilla/websocket"
...@@ -435,6 +436,7 @@ func (u *gettyUDPConn) read(p []byte) (int, *net.UDPAddr, error) { ...@@ -435,6 +436,7 @@ func (u *gettyUDPConn) read(p []byte) (int, *net.UDPAddr, error) {
length, err = u.conn.Read(p) length, err = u.conn.Read(p)
addr = u.peerAddr addr = u.peerAddr
} }
log.Debug("now:%s, length:%d, err:%#v", currentTime, length, err)
if err == nil { if err == nil {
atomic.AddUint32(&u.readCount, uint32(length)) atomic.AddUint32(&u.readCount, uint32(length))
} }
...@@ -480,6 +482,8 @@ func (u *gettyUDPConn) Write(udpCtx interface{}) (int, error) { ...@@ -480,6 +482,8 @@ func (u *gettyUDPConn) Write(udpCtx interface{}) (int, error) {
peerAddr = u.peerAddr peerAddr = u.peerAddr
} }
length, _, err = u.conn.WriteMsgUDP(buf, nil, peerAddr) length, _, err = u.conn.WriteMsgUDP(buf, nil, peerAddr)
log.Debug("now:%s, length:%d, err:%#v", currentTime, length, err)
return length, err return length, err
} }
......
...@@ -191,9 +191,9 @@ func (s *Server) listenUDP() error { ...@@ -191,9 +191,9 @@ func (s *Server) listenUDP() error {
if err != nil { if err != nil {
return errors.Wrapf(err, "net.ListenUDP((udp, localAddr:%#v)", localAddr) return errors.Wrapf(err, "net.ListenUDP((udp, localAddr:%#v)", localAddr)
} }
if err = setUDPSocketOptions(pktListener); err != nil { //if err = setUDPSocketOptions(pktListener); err != nil {
return errors.Wrapf(err, "setUDPSocketOptions(pktListener:%#v)", pktListener) // return errors.Wrapf(err, "setUDPSocketOptions(pktListener:%#v)", pktListener)
} //}
s.pktListener = pktListener s.pktListener = pktListener
...@@ -382,7 +382,6 @@ func (s *Server) runWSEventLoop(newSession NewSessionCallback) { ...@@ -382,7 +382,6 @@ func (s *Server) runWSEventLoop(newSession NewSessionCallback) {
func (s *Server) runWSSEventLoop(newSession NewSessionCallback) { func (s *Server) runWSSEventLoop(newSession NewSessionCallback) {
s.wg.Add(1) s.wg.Add(1)
go func() { go func() {
defer s.wg.Done()
var ( var (
err error err error
certPem []byte certPem []byte
...@@ -392,6 +391,7 @@ func (s *Server) runWSSEventLoop(newSession NewSessionCallback) { ...@@ -392,6 +391,7 @@ func (s *Server) runWSSEventLoop(newSession NewSessionCallback) {
handler *wsHandler handler *wsHandler
server *http.Server server *http.Server
) )
defer s.wg.Done()
if certificate, err = tls.LoadX509KeyPair(s.cert, s.privateKey); err != nil { if certificate, err = tls.LoadX509KeyPair(s.cert, s.privateKey); err != nil {
panic(fmt.Sprintf("tls.LoadX509KeyPair(cert{%s}, privateKey{%s}) = err{%#v}", s.cert, s.privateKey, err)) panic(fmt.Sprintf("tls.LoadX509KeyPair(cert{%s}, privateKey{%s}) = err{%#v}", s.cert, s.privateKey, err))
......
...@@ -22,6 +22,7 @@ import ( ...@@ -22,6 +22,7 @@ import (
import ( import (
"github.com/AlexStocks/goext/context" "github.com/AlexStocks/goext/context"
"github.com/AlexStocks/goext/log"
"github.com/AlexStocks/goext/sync" "github.com/AlexStocks/goext/sync"
"github.com/AlexStocks/goext/time" "github.com/AlexStocks/goext/time"
log "github.com/AlexStocks/log4go" log "github.com/AlexStocks/log4go"
...@@ -145,6 +146,7 @@ func NewTCPSession(conn net.Conn) Session { ...@@ -145,6 +146,7 @@ func NewTCPSession(conn net.Conn) Session {
func NewUDPSession(conn *net.UDPConn, peerAddr *net.UDPAddr) Session { func NewUDPSession(conn *net.UDPConn, peerAddr *net.UDPAddr) Session {
session := &session{ session := &session{
name: defaultUDPSessionName, name: defaultUDPSessionName,
maxMsgLen: maxReadBufLen,
Connection: newGettyUDPConn(conn, peerAddr), Connection: newGettyUDPConn(conn, peerAddr),
done: make(chan gxsync.Empty), done: make(chan gxsync.Empty),
period: period, period: period,
...@@ -512,6 +514,7 @@ LOOP: ...@@ -512,6 +514,7 @@ LOOP:
case inPkg = <-s.rQ: case inPkg = <-s.rQ:
// 这个条件分支通过(session)rQ排空确保(session)handlePackage gr不会阻塞在(session)rQ上 // 这个条件分支通过(session)rQ排空确保(session)handlePackage gr不会阻塞在(session)rQ上
if flag { if flag {
gxlog.CInfo("%#v <-s.rQ", inPkg)
s.listener.OnMessage(s, inPkg) s.listener.OnMessage(s, inPkg)
s.incReadPkgCount() s.incReadPkgCount()
} else { } else {
...@@ -681,7 +684,6 @@ func (s *session) handleUDPPackage() error { ...@@ -681,7 +684,6 @@ func (s *session) handleUDPPackage() error {
pkg interface{} pkg interface{}
) )
buf = make([]byte, s.maxMsgLen)
conn = s.Connection.(*gettyUDPConn) conn = s.Connection.(*gettyUDPConn)
bufLen = int(s.maxMsgLen + maxReadBufLen) bufLen = int(s.maxMsgLen + maxReadBufLen)
if int(s.maxMsgLen<<1) < bufLen { if int(s.maxMsgLen<<1) < bufLen {
...@@ -694,20 +696,26 @@ func (s *session) handleUDPPackage() error { ...@@ -694,20 +696,26 @@ func (s *session) handleUDPPackage() error {
} }
bufLen, addr, err = conn.read(buf) bufLen, addr, err = conn.read(buf)
gxlog.CInfo("conn.read() = bufLen:%d, addr:%#v, err:%#v", bufLen, pkgLen, err)
if nerr, ok = err.(net.Error); ok && nerr.Timeout() { if nerr, ok = err.(net.Error); ok && nerr.Timeout() {
continue continue
} }
if bufLen == 0 {
continue
}
if err != nil { if err != nil {
log.Warn("%s, [session.handleUDPPackage] = len{%d}, error{%+v}", s.sessionToken(), bufLen, err) log.Warn("%s, [session.handleUDPPackage] = len{%d}, error{%+v}", s.sessionToken(), bufLen, err)
continue continue
} }
pkg, pkgLen, err = s.reader.Read(s, buf[:bufLen]) pkg, pkgLen, err = s.reader.Read(s, buf[:bufLen])
gxlog.CInfo("s.reader.Read() = pkg:%#v, pkgLen:%d, err:%#v", pkg, pkgLen, err)
time.Sleep(10e9)
if err == nil && s.maxMsgLen > 0 && bufLen > int(s.maxMsgLen) { if err == nil && s.maxMsgLen > 0 && bufLen > int(s.maxMsgLen) {
err = ErrMsgTooLong err = ErrMsgTooLong
} }
if err != nil { if err != nil {
log.Warn("%s, [session.handleUDPPackage] = len{%d}, error{%+v}", s.sessionToken(), bufLen, err) log.Warn("%s, [session.handleUDPPackage] = len{%d}, error{%+v}", s.sessionToken(), pkgLen, err)
continue continue
} }
s.UpdateActive() s.UpdateActive()
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment