Commit 43624e3f authored by AlexStocks's avatar AlexStocks

update session::Conn & session::gettyConn

parent 129c6dcd
......@@ -14,9 +14,14 @@
## develop history ##
---
- 2018/03/15
> improvement
* add gettyUDPConn to session::Conn and session::gettyConn
- 2018/03/14
> bug fix
* disable SetReadDeadline when enable compression.
Refers to the NextReader/NextWriter of gorilla/websocket, you should make a new compression reader/writer when
read/write a package again.
......
......@@ -16,7 +16,7 @@ type NewSessionCallback func(Session) error
// Reader is used to unmarshal a complete pkg from buffer
type Reader interface {
// Parse tcp pkg from buffer and if possible return a complete pkg
// Parse tcp/udp/websocket pkg from buffer and if possible return a complete pkg
// If length of buf is not long enough, u should return {nil,0, nil}
// The second return value is the length of the pkg.
Read(Session, []byte) (interface{}, int, error)
......
......@@ -139,6 +139,10 @@ func (c gettyConn) readTimeout() time.Duration {
return c.rTimeout
}
// Pls do not set read deadline for websocket connection. AlexStocks 20180310
// gorilla/websocket/conn.go:NextReader will always fail when got a timeout error.
//
// Pls do not set read deadline when using compression. AlexStocks 20180314.
func (c *gettyConn) SetReadTimeout(rTimeout time.Duration) {
if rTimeout < 1 {
panic("@rTimeout < 1")
......@@ -154,6 +158,10 @@ func (c gettyConn) writeTimeout() time.Duration {
return c.wTimeout
}
// Pls do not set write deadline for websocket connection. AlexStocks 20180310
// gorilla/websocket/conn.go:NextWriter will always fail when got a timeout error.
//
// Pls do not set write deadline when using compression. AlexStocks 20180314.
func (c *gettyConn) SetWriteTimeout(wTimeout time.Duration) {
if wTimeout < 1 {
panic("@wTimeout < 1")
......@@ -275,7 +283,7 @@ func (t *gettyTCPConn) read(p []byte) (int, error) {
}
length, err = t.reader.Read(p)
log.Debug("now:%s, length:%d, err:%s\n", currentTime, length, err)
log.Debug("now:%s, length:%d, err:%#v", currentTime, length, err)
atomic.AddUint32(&t.readCount, uint32(length))
return length, err
}
......@@ -306,7 +314,9 @@ func (t *gettyTCPConn) Write(pkg interface{}) (int, error) {
}
atomic.AddUint32(&t.writeCount, (uint32)(len(p)))
return t.writer.Write(p)
length, err := t.writer.Write(p)
log.Debug("now:%s, length:%d, err:%#v", currentTime, length, err)
return length, err
}
// close tcp connection
......@@ -328,6 +338,156 @@ func (t *gettyTCPConn) close(waitSec int) {
}
/////////////////////////////////////////
// getty udp connection
/////////////////////////////////////////
type UDPContext struct {
Pkg []byte
PeerAddr *net.UDPAddr
}
type gettyUDPConn struct {
gettyConn
peerAddr *net.UDPAddr // for client
compressType CompressType
conn *net.UDPConn // for server
}
func setUDPSocketOptions(conn *net.UDPConn) error {
// Try setting the flags for both families and ignore the errors unless they
// both error.
err6 := ipv6.NewPacketConn(conn).SetControlMessage(ipv6.FlagDst|ipv6.FlagInterface, true)
err4 := ipv4.NewPacketConn(conn).SetControlMessage(ipv4.FlagDst|ipv4.FlagInterface, true)
if err6 != nil && err4 != nil {
return err4
}
return nil
}
// create gettyUDPConn
func newGettyUDPConn(conn *net.UDPConn, peerUDPAddr *net.UDPAddr) *gettyUDPConn {
if conn == nil {
panic("newGettyUDPConn(conn):@conn is nil")
}
var localAddr, peerAddr string
if conn.LocalAddr() != nil {
localAddr = conn.LocalAddr().String()
}
if conn.RemoteAddr() != nil {
// connected udp
peerAddr = conn.RemoteAddr().String()
} else if peerUDPAddr != nil {
// unconnected udp
peerAddr = peerUDPAddr.String()
}
return &gettyUDPConn{
conn: conn,
peerAddr: peerUDPAddr,
gettyConn: gettyConn{
id: atomic.AddUint32(&connID, 1),
rTimeout: netIOTimeout,
wTimeout: netIOTimeout,
local: localAddr,
peer: peerAddr,
compress: CompressNone,
},
}
}
func (u *gettyUDPConn) SetCompressType(c CompressType) {
switch c {
case CompressNone, CompressZip, CompressBestSpeed, CompressBestCompression, CompressHuffman, CompressSnappy:
u.compressType = c
default:
panic(fmt.Sprintf("illegal comparess type %d", c))
}
}
// udp connection read
func (u *gettyUDPConn) read(p []byte) (int, *net.UDPAddr, error) {
var (
err error
currentTime time.Time
length int
addr *net.UDPAddr
)
if u.rTimeout > 0 {
// Optimization: update read deadline only if more than 25%
// of the last read deadline exceeded.
// See https://github.com/golang/go/issues/15133 for details.
currentTime = wheel.Now()
if currentTime.Sub(u.rLastDeadline) > (u.rTimeout >> 2) {
if err = u.conn.SetReadDeadline(currentTime.Add(u.rTimeout)); err != nil {
return 0, nil, err
}
u.rLastDeadline = currentTime
}
}
if u.peerAddr == nil {
length, addr, err = u.conn.ReadFromUDP(p)
} else {
length, err = u.conn.Read(p)
addr = u.peerAddr
}
if err == nil {
atomic.AddUint32(&u.readCount, uint32(length))
}
return length, addr, err
}
// write udp packet, @ctx should be of type UDPContext
func (u *gettyUDPConn) Write(udpCtx interface{}) (int, error) {
var (
err error
currentTime time.Time
length int
ok bool
ctx UDPContext
peerAddr *net.UDPAddr
)
if ctx, ok = udpCtx.(UDPContext); !ok {
return 0, fmt.Errorf("illegal @udpCtx{%#v} type", udpCtx)
}
if u.wTimeout > 0 {
// Optimization: update write deadline only if more than 25%
// of the last write deadline exceeded.
// See https://github.com/golang/go/issues/15133 for details.
currentTime = wheel.Now()
if currentTime.Sub(u.wLastDeadline) > (u.wTimeout >> 2) {
if err = u.conn.SetWriteDeadline(currentTime.Add(u.wTimeout)); err != nil {
return 0, err
}
u.wLastDeadline = currentTime
}
}
atomic.AddUint32(&u.writeCount, (uint32)(len(ctx.Pkg)))
peerAddr = ctx.PeerAddr
if u.peerAddr != nil {
peerAddr = u.peerAddr
}
length, _, err = u.conn.WriteMsgUDP(ctx.Pkg, nil, peerAddr)
return length, err
}
// close udp connection
func (u *gettyUDPConn) close(_ int) {
if u.conn != nil {
u.conn.Close()
u.conn = nil
}
}
/////////////////////////////////////////
// getty websocket connection
/////////////////////////////////////////
......@@ -477,153 +637,3 @@ func (w *gettyWSConn) close(waitSec int) {
}
w.conn.Close()
}
/////////////////////////////////////////
// getty udp connection
/////////////////////////////////////////
type UDPContext struct {
Pkg []byte
PeerAddr *net.UDPAddr
}
type gettyUDPConn struct {
gettyConn
peerAddr *net.UDPAddr // for client
compressType CompressType
conn *net.UDPConn // for server
}
func setUDPSocketOptions(conn *net.UDPConn) error {
// Try setting the flags for both families and ignore the errors unless they
// both error.
err6 := ipv6.NewPacketConn(conn).SetControlMessage(ipv6.FlagDst|ipv6.FlagInterface, true)
err4 := ipv4.NewPacketConn(conn).SetControlMessage(ipv4.FlagDst|ipv4.FlagInterface, true)
if err6 != nil && err4 != nil {
return err4
}
return nil
}
// create gettyUDPConn
func newGettyUDPConn(conn *net.UDPConn, peerUDPAddr *net.UDPAddr) *gettyUDPConn {
if conn == nil {
panic("newGettyUDPConn(conn):@conn is nil")
}
var localAddr, peerAddr string
if conn.LocalAddr() != nil {
localAddr = conn.LocalAddr().String()
}
if conn.RemoteAddr() != nil {
// connected udp
peerAddr = conn.RemoteAddr().String()
} else if peerUDPAddr != nil {
// unconnected udp
peerAddr = peerUDPAddr.String()
}
return &gettyUDPConn{
conn: conn,
peerAddr: peerUDPAddr,
gettyConn: gettyConn{
id: atomic.AddUint32(&connID, 1),
rTimeout: netIOTimeout,
wTimeout: netIOTimeout,
local: localAddr,
peer: peerAddr,
compress: CompressNone,
},
}
}
func (u *gettyUDPConn) SetCompressType(c CompressType) {
switch c {
case CompressNone, CompressZip, CompressBestSpeed, CompressBestCompression, CompressHuffman, CompressSnappy:
u.compressType = c
default:
panic(fmt.Sprintf("illegal comparess type %d", c))
}
}
// udp connection read
func (u *gettyUDPConn) read(p []byte) (int, *net.UDPAddr, error) {
var (
err error
currentTime time.Time
length int
addr *net.UDPAddr
)
if u.rTimeout > 0 {
// Optimization: update read deadline only if more than 25%
// of the last read deadline exceeded.
// See https://github.com/golang/go/issues/15133 for details.
currentTime = wheel.Now()
if currentTime.Sub(u.rLastDeadline) > (u.rTimeout >> 2) {
if err = u.conn.SetReadDeadline(currentTime.Add(u.rTimeout)); err != nil {
return 0, nil, err
}
u.rLastDeadline = currentTime
}
}
if u.peerAddr == nil {
length, addr, err = u.conn.ReadFromUDP(p)
} else {
length, err = u.conn.Read(p)
addr = u.peerAddr
}
if err == nil {
atomic.AddUint32(&u.readCount, uint32(length))
}
return length, addr, err
}
// write udp packet, @ctx should be of type UDPContext
func (u *gettyUDPConn) Write(udpCtx interface{}) (int, error) {
var (
err error
currentTime time.Time
length int
ok bool
ctx UDPContext
peerAddr *net.UDPAddr
)
if ctx, ok = udpCtx.(UDPContext); !ok {
return 0, fmt.Errorf("illegal @udpCtx{%#v} type", udpCtx)
}
if u.wTimeout > 0 {
// Optimization: update write deadline only if more than 25%
// of the last write deadline exceeded.
// See https://github.com/golang/go/issues/15133 for details.
currentTime = wheel.Now()
if currentTime.Sub(u.wLastDeadline) > (u.wTimeout >> 2) {
if err = u.conn.SetWriteDeadline(currentTime.Add(u.wTimeout)); err != nil {
return 0, err
}
u.wLastDeadline = currentTime
}
}
atomic.AddUint32(&u.writeCount, (uint32)(len(ctx.Pkg)))
peerAddr = ctx.PeerAddr
if u.peerAddr != nil {
peerAddr = u.peerAddr
}
length, _, err = u.conn.WriteMsgUDP(ctx.Pkg, nil, peerAddr)
return length, err
}
// close udp connection
func (u *gettyUDPConn) close(_ int) {
if u.conn != nil {
u.conn.Close()
u.conn = nil
}
}
......@@ -192,6 +192,10 @@ func (s *session) Conn() net.Conn {
return tc.conn
}
if uc, ok := s.Connection.(*gettyUDPConn); ok {
return uc.conn
}
if wc, ok := s.Connection.(*gettyWSConn); ok {
return wc.conn.UnderlyingConn()
}
......@@ -204,6 +208,10 @@ func (s *session) gettyConn() *gettyConn {
return &(tc.gettyConn)
}
if uc, ok := s.Connection.(*gettyUDPConn); ok {
return &(uc.gettyConn)
}
if wc, ok := s.Connection.(*gettyWSConn); ok {
return &(wc.gettyConn)
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment