Commit 1998bbf8 authored by AlexStocks's avatar AlexStocks

count -> num

parent f67153eb
...@@ -15,10 +15,18 @@ ...@@ -15,10 +15,18 @@
--- ---
- 2018/03/18 - 2018/03/18
> bug fix
* ignore connectPingPackage
- 2018/03/18
> improvement > improvement
* use gxnet.IsSameAddr * use gxnet.IsSameAddr
* send out pkg asap in WritePkg when the second parameter @timeout is not greater then 0. * send out pkg asap in WritePkg when the second parameter @timeout is not greater then 0.
* delete Chinese commenting * delete Chinese commenting
* gettyConn:readCount -> gettyConn:readBytes
* gettyConn:writeCount -> gettyConn:writeBytes
* gettyConn:readPkgCount -> gettyConn:readPkgNum
* gettyConn:writePkgCount -> gettyConn:writePkgNum
- 2018/03/18 - 2018/03/18
> improvement > improvement
...@@ -295,9 +303,9 @@ ...@@ -295,9 +303,9 @@
- 2016/08/29 - 2016/08/29
> 1 rename reconnect to errFlag in function session.go:(Session)handlePackage > 1 rename reconnect to errFlag in function session.go:(Session)handlePackage
> >
> 2 session.go:(gettyConn)readCount is reconsidered as read in tcp stream bytes > 2 session.go:(gettyConn)readBytes is reconsidered as read in tcp stream bytes
> >
> 3 session.go:(gettyConn)writeCount is reconsidered as write out tcp stream bytes > 3 session.go:(gettyConn)writeBytes is reconsidered as write out tcp stream bytes
> >
> 4 reconstruct session output token string session.go:(Session)sessionToken > 4 reconstruct session output token string session.go:(Session)sessionToken
> >
......
...@@ -32,7 +32,10 @@ const ( ...@@ -32,7 +32,10 @@ const (
connInterval = 3e9 // 3s connInterval = 3e9 // 3s
connectTimeout = 5e9 connectTimeout = 5e9
maxTimes = 10 maxTimes = 10
pingPacket = "ping" )
var (
connectPingPackage = []byte("connect-ping")
) )
///////////////////////////////////////// /////////////////////////////////////////
...@@ -171,11 +174,10 @@ func (c *client) dialUDP() Session { ...@@ -171,11 +174,10 @@ func (c *client) dialUDP() Session {
} }
// check connection alive by write/read action // check connection alive by write/read action
copy(buf, []byte(pingPacket))
conn.SetWriteDeadline(wheel.Now().Add(1e9)) conn.SetWriteDeadline(wheel.Now().Add(1e9))
if length, err = conn.Write(buf[:len(pingPacket)]); err != nil { if length, err = conn.Write(connectPingPackage[:]); err != nil {
conn.Close() conn.Close()
log.Warn("conn.Write(%s) = {length:%d, err:%s}", pingPacket, length, err) log.Warn("conn.Write(%s) = {length:%d, err:%s}", string(connectPingPackage), length, err)
// time.Sleep(connInterval) // time.Sleep(connInterval)
<-wheel.After(connInterval) <-wheel.After(connInterval)
continue continue
......
...@@ -20,6 +20,7 @@ import ( ...@@ -20,6 +20,7 @@ import (
) )
import ( import (
"github.com/AlexStocks/goext/log"
log "github.com/AlexStocks/log4go" log "github.com/AlexStocks/log4go"
"github.com/golang/snappy" "github.com/golang/snappy"
"github.com/gorilla/websocket" "github.com/gorilla/websocket"
...@@ -46,10 +47,10 @@ type gettyConn struct { ...@@ -46,10 +47,10 @@ type gettyConn struct {
compress CompressType compress CompressType
padding1 uint8 padding1 uint8
padding2 uint16 padding2 uint16
readCount uint32 // read count readBytes uint32 // read bytes
writeCount uint32 // write count writeBytes uint32 // write bytes
readPkgCount uint32 // send pkg count readPkgNum uint32 // send pkg number
writePkgCount uint32 // recv pkg count writePkgNum uint32 // recv pkg number
active int64 // last active, in milliseconds active int64 // last active, in milliseconds
rTimeout time.Duration // network current limiting rTimeout time.Duration // network current limiting
wTimeout time.Duration wTimeout time.Duration
...@@ -72,12 +73,12 @@ func (c *gettyConn) RemoteAddr() string { ...@@ -72,12 +73,12 @@ func (c *gettyConn) RemoteAddr() string {
return c.peer return c.peer
} }
func (c *gettyConn) incReadPkgCount() { func (c *gettyConn) incReadPkgNum() {
atomic.AddUint32(&c.readPkgCount, 1) atomic.AddUint32(&c.readPkgNum, 1)
} }
func (c *gettyConn) incWritePkgCount() { func (c *gettyConn) incWritePkgNum() {
atomic.AddUint32(&c.writePkgCount, 1) atomic.AddUint32(&c.writePkgNum, 1)
} }
func (c *gettyConn) UpdateActive() { func (c *gettyConn) UpdateActive() {
...@@ -247,7 +248,7 @@ func (t *gettyTCPConn) read(p []byte) (int, error) { ...@@ -247,7 +248,7 @@ func (t *gettyTCPConn) read(p []byte) (int, error) {
length, err = t.reader.Read(p) length, err = t.reader.Read(p)
log.Debug("now:%s, length:%d, err:%s", currentTime, length, err) log.Debug("now:%s, length:%d, err:%s", currentTime, length, err)
atomic.AddUint32(&t.readCount, uint32(length)) atomic.AddUint32(&t.readBytes, uint32(length))
return length, err return length, err
} }
...@@ -258,6 +259,7 @@ func (t *gettyTCPConn) Write(pkg interface{}) (int, error) { ...@@ -258,6 +259,7 @@ func (t *gettyTCPConn) Write(pkg interface{}) (int, error) {
currentTime time.Time currentTime time.Time
ok bool ok bool
p []byte p []byte
length int
) )
if p, ok = pkg.([]byte); !ok { if p, ok = pkg.([]byte); !ok {
...@@ -276,8 +278,9 @@ func (t *gettyTCPConn) Write(pkg interface{}) (int, error) { ...@@ -276,8 +278,9 @@ func (t *gettyTCPConn) Write(pkg interface{}) (int, error) {
} }
} }
atomic.AddUint32(&t.writeCount, (uint32)(len(p))) if length, err = t.writer.Write(p); err == nil {
length, err := t.writer.Write(p) atomic.AddUint32(&t.writeBytes, (uint32)(len(p)))
}
log.Debug("now:%s, length:%d, err:%s", currentTime, length, err) log.Debug("now:%s, length:%d, err:%s", currentTime, length, err)
return length, err return length, err
} }
...@@ -309,6 +312,10 @@ type UDPContext struct { ...@@ -309,6 +312,10 @@ type UDPContext struct {
PeerAddr *net.UDPAddr PeerAddr *net.UDPAddr
} }
func (c UDPContext) String() string {
return fmt.Sprintf("{pkg:%#v, peer addr:%s}", c.Pkg, c.PeerAddr)
}
type gettyUDPConn struct { type gettyUDPConn struct {
gettyConn gettyConn
compressType CompressType compressType CompressType
...@@ -390,7 +397,7 @@ func (u *gettyUDPConn) read(p []byte) (int, *net.UDPAddr, error) { ...@@ -390,7 +397,7 @@ func (u *gettyUDPConn) read(p []byte) (int, *net.UDPAddr, error) {
length, addr, err = u.conn.ReadFromUDP(p) // connected udp also can get return @addr length, addr, err = u.conn.ReadFromUDP(p) // connected udp also can get return @addr
log.Debug("ReadFromUDP() = {length:%d, peerAddr:%s, error:%s}", length, addr, err) log.Debug("ReadFromUDP() = {length:%d, peerAddr:%s, error:%s}", length, addr, err)
if err == nil { if err == nil {
atomic.AddUint32(&u.readCount, uint32(length)) atomic.AddUint32(&u.readBytes, uint32(length))
} }
return length, addr, err return length, addr, err
...@@ -409,7 +416,7 @@ func (u *gettyUDPConn) Write(udpCtx interface{}) (int, error) { ...@@ -409,7 +416,7 @@ func (u *gettyUDPConn) Write(udpCtx interface{}) (int, error) {
) )
if ctx, ok = udpCtx.(UDPContext); !ok { if ctx, ok = udpCtx.(UDPContext); !ok {
return 0, fmt.Errorf("illegal @udpCtx{%#v} type", udpCtx) return 0, fmt.Errorf("illegal @udpCtx{%s} type, @udpCtx type:%T", udpCtx, udpCtx)
} }
if buf, ok = ctx.Pkg.([]byte); !ok { if buf, ok = ctx.Pkg.([]byte); !ok {
return 0, fmt.Errorf("illegal @udpCtx.Pkg{%#v} type", udpCtx) return 0, fmt.Errorf("illegal @udpCtx.Pkg{%#v} type", udpCtx)
...@@ -434,9 +441,10 @@ func (u *gettyUDPConn) Write(udpCtx interface{}) (int, error) { ...@@ -434,9 +441,10 @@ func (u *gettyUDPConn) Write(udpCtx interface{}) (int, error) {
} }
} }
atomic.AddUint32(&u.writeCount, (uint32)(len(buf))) if length, _, err = u.conn.WriteMsgUDP(buf, nil, peerAddr); err == nil {
atomic.AddUint32(&u.writeBytes, (uint32)(len(buf)))
length, _, err = u.conn.WriteMsgUDP(buf, nil, peerAddr) gxlog.CError("write count:%d, write:%d", len(buf), u.writeBytes)
}
log.Debug("WriteMsgUDP(peerAddr:%s) = {length:%d, error:%s}", peerAddr, length, err) log.Debug("WriteMsgUDP(peerAddr:%s) = {length:%d, error:%s}", peerAddr, length, err)
return length, err return length, err
...@@ -529,7 +537,7 @@ func (w *gettyWSConn) read() ([]byte, error) { ...@@ -529,7 +537,7 @@ func (w *gettyWSConn) read() ([]byte, error) {
// gorilla/websocket/conn.go:NextReader will always fail when got a timeout error. // gorilla/websocket/conn.go:NextReader will always fail when got a timeout error.
_, b, e := w.conn.ReadMessage() // the first return value is message type. _, b, e := w.conn.ReadMessage() // the first return value is message type.
if e == nil { if e == nil {
atomic.AddUint32(&w.readPkgCount, 1) w.incReadPkgNum()
} else { } else {
if websocket.IsUnexpectedCloseError(e, websocket.CloseGoingAway) { if websocket.IsUnexpectedCloseError(e, websocket.CloseGoingAway) {
log.Warn("websocket unexpected close error: %v", e) log.Warn("websocket unexpected close error: %v", e)
...@@ -564,6 +572,7 @@ func (w *gettyWSConn) updateWriteDeadline() error { ...@@ -564,6 +572,7 @@ func (w *gettyWSConn) updateWriteDeadline() error {
// websocket connection write // websocket connection write
func (w *gettyWSConn) Write(pkg interface{}) (int, error) { func (w *gettyWSConn) Write(pkg interface{}) (int, error) {
var ( var (
err error
ok bool ok bool
p []byte p []byte
) )
...@@ -572,10 +581,11 @@ func (w *gettyWSConn) Write(pkg interface{}) (int, error) { ...@@ -572,10 +581,11 @@ func (w *gettyWSConn) Write(pkg interface{}) (int, error) {
return 0, fmt.Errorf("illegal @pkg{%#v} type", pkg) return 0, fmt.Errorf("illegal @pkg{%#v} type", pkg)
} }
// atomic.AddUint32(&w.writeCount, 1)
atomic.AddUint32(&w.writeCount, (uint32)(len(p)))
w.updateWriteDeadline() w.updateWriteDeadline()
return len(p), w.conn.WriteMessage(websocket.BinaryMessage, p) if err = w.conn.WriteMessage(websocket.BinaryMessage, p); err == nil {
atomic.AddUint32(&w.writeBytes, (uint32)(len(p)))
}
return len(p), err
} }
func (w *gettyWSConn) writePing() error { func (w *gettyWSConn) writePing() error {
......
...@@ -88,8 +88,8 @@ type Connection interface { ...@@ -88,8 +88,8 @@ type Connection interface {
SetCompressType(CompressType) SetCompressType(CompressType)
LocalAddr() string LocalAddr() string
RemoteAddr() string RemoteAddr() string
incReadPkgCount() incReadPkgNum()
incWritePkgCount() incWritePkgNum()
// update session's active time // update session's active time
UpdateActive() UpdateActive()
// get session's active time // get session's active time
...@@ -143,9 +143,9 @@ type Session interface { ...@@ -143,9 +143,9 @@ type Session interface {
SetAttribute(interface{}, interface{}) SetAttribute(interface{}, interface{})
RemoveAttribute(interface{}) RemoveAttribute(interface{})
// the Writer will invoke this function. // the Writer will invoke this function. Pls attention that if timeout is less than 0, WritePkg will send @pkg asap.
// for udp session, the first parameter should be UDPContext. Otherwise its type is []byte. // for udp session, the first parameter should be UDPContext.
WritePkg(interface{}, time.Duration) error WritePkg(pkg interface{}, timeout time.Duration) error
WriteBytes([]byte) error WriteBytes([]byte) error
WriteBytesArray(...[]byte) error WriteBytesArray(...[]byte) error
Close() Close()
......
...@@ -21,10 +21,12 @@ import ( ...@@ -21,10 +21,12 @@ import (
import ( import (
"github.com/AlexStocks/goext/context" "github.com/AlexStocks/goext/context"
"github.com/AlexStocks/goext/log"
"github.com/AlexStocks/goext/sync" "github.com/AlexStocks/goext/sync"
"github.com/AlexStocks/goext/time" "github.com/AlexStocks/goext/time"
log "github.com/AlexStocks/log4go" log "github.com/AlexStocks/log4go"
"github.com/gorilla/websocket" "github.com/gorilla/websocket"
"github.com/pkg/errors"
) )
const ( const (
...@@ -37,7 +39,7 @@ const ( ...@@ -37,7 +39,7 @@ const (
defaultUDPSessionName = "udp-session" defaultUDPSessionName = "udp-session"
defaultWSSessionName = "ws-session" defaultWSSessionName = "ws-session"
defaultWSSSessionName = "wss-session" defaultWSSSessionName = "wss-session"
outputFormat = "session %s, Read Count: %d, Write Count: %d, Read Pkg Count: %d, Write Pkg Count: %d" outputFormat = "session %s, Read Bytes: %d, Write Bytes: %d, Read Pkgs: %d, Write Pkgs: %d"
) )
///////////////////////////////////////// /////////////////////////////////////////
...@@ -178,10 +180,10 @@ func (s *session) Stat() string { ...@@ -178,10 +180,10 @@ func (s *session) Stat() string {
return fmt.Sprintf( return fmt.Sprintf(
outputFormat, outputFormat,
s.sessionToken(), s.sessionToken(),
atomic.LoadUint32(&(conn.readCount)), atomic.LoadUint32(&(conn.readBytes)),
atomic.LoadUint32(&(conn.writeCount)), atomic.LoadUint32(&(conn.writeBytes)),
atomic.LoadUint32(&(conn.readPkgCount)), atomic.LoadUint32(&(conn.readPkgNum)),
atomic.LoadUint32(&(conn.writePkgCount)), atomic.LoadUint32(&(conn.writePkgNum)),
) )
} }
...@@ -297,8 +299,6 @@ func (s *session) sessionToken() string { ...@@ -297,8 +299,6 @@ func (s *session) sessionToken() string {
return fmt.Sprintf("{%s:%s:%d:%s<->%s}", s.name, s.EndPoint().EndPointType(), s.ID(), s.LocalAddr(), s.RemoteAddr()) return fmt.Sprintf("{%s:%s:%d:%s<->%s}", s.name, s.EndPoint().EndPointType(), s.ID(), s.LocalAddr(), s.RemoteAddr())
} }
// Queued Write, for handler. Pls attention that if timeout is less than 0, WritePkg will send @pkg asap.
// For udp session, the @pkg should be UDPContext.
func (s *session) WritePkg(pkg interface{}, timeout time.Duration) error { func (s *session) WritePkg(pkg interface{}, timeout time.Duration) error {
if s.IsClosed() { if s.IsClosed() {
return ErrSessionClosed return ErrSessionClosed
...@@ -313,10 +313,15 @@ func (s *session) WritePkg(pkg interface{}, timeout time.Duration) error { ...@@ -313,10 +313,15 @@ func (s *session) WritePkg(pkg interface{}, timeout time.Duration) error {
} }
}() }()
var err error
if timeout <= 0 { if timeout <= 0 {
_, err := s.Connection.Write(pkg) if err = s.writer.Write(s, pkg); err == nil {
s.incWritePkgNum()
gxlog.CError("after incWritePkgNum, ss:%s", s.Stat())
}
return err return err
} }
gxlog.CError("fk")
select { select {
case s.wQ <- pkg: case s.wQ <- pkg:
break // for possible gen a new pkg break // for possible gen a new pkg
...@@ -336,8 +341,14 @@ func (s *session) WriteBytes(pkg []byte) error { ...@@ -336,8 +341,14 @@ func (s *session) WriteBytes(pkg []byte) error {
} }
// s.conn.SetWriteTimeout(time.Now().Add(s.wTimeout)) // s.conn.SetWriteTimeout(time.Now().Add(s.wTimeout))
_, err := s.Connection.Write(pkg) if _, err := s.Connection.Write(pkg); err != nil {
return err return errors.Wrapf(err, "s.Connection.Write(pkg len:%d)", len(pkg))
}
s.incWritePkgNum()
gxlog.CError("after write, ss:%s", s.Stat())
return nil
} }
// Write multiple packages at once // Write multiple packages at once
...@@ -355,6 +366,7 @@ func (s *session) WriteBytesArray(pkgs ...[]byte) error { ...@@ -355,6 +366,7 @@ func (s *session) WriteBytesArray(pkgs ...[]byte) error {
// get len // get len
var ( var (
l int l int
err error
length uint32 length uint32
arr []byte arr []byte
) )
...@@ -372,7 +384,17 @@ func (s *session) WriteBytesArray(pkgs ...[]byte) error { ...@@ -372,7 +384,17 @@ func (s *session) WriteBytesArray(pkgs ...[]byte) error {
} }
// return s.Connection.Write(arr) // return s.Connection.Write(arr)
return s.WriteBytes(arr) if err = s.WriteBytes(arr); err != nil {
return err
}
num := len(pkgs) - 1
for i := 0; i < num; i++ {
s.incWritePkgNum()
gxlog.CError("after write, ss:%s", s.Stat())
}
return nil
} }
// func (s *session) RunEventLoop() { // func (s *session) RunEventLoop() {
...@@ -463,7 +485,7 @@ LOOP: ...@@ -463,7 +485,7 @@ LOOP:
if flag { if flag {
log.Debug("%#v <-s.rQ", inPkg) log.Debug("%#v <-s.rQ", inPkg)
s.listener.OnMessage(s, inPkg) s.listener.OnMessage(s, inPkg)
s.incReadPkgCount() s.incReadPkgNum()
} else { } else {
log.Info("[session.handleLoop] drop readin package{%#v}", inPkg) log.Info("[session.handleLoop] drop readin package{%#v}", inPkg)
} }
...@@ -476,7 +498,8 @@ LOOP: ...@@ -476,7 +498,8 @@ LOOP:
flag = false flag = false
// break LOOP // break LOOP
} }
s.incWritePkgCount() s.incWritePkgNum()
gxlog.CError("outPkg:%#v, after incWritePkgNum, ss:%s", outPkg, s.Stat())
} else { } else {
log.Info("[session.handleLoop] drop writeout package{%#v}", outPkg) log.Info("[session.handleLoop] drop writeout package{%#v}", outPkg)
} }
...@@ -650,9 +673,20 @@ func (s *session) handleUDPPackage() error { ...@@ -650,9 +673,20 @@ func (s *session) handleUDPPackage() error {
} }
if err != nil { if err != nil {
log.Error("%s, [session.handleUDPPackage] = len{%d}, error{%s}", s.sessionToken(), bufLen, err) log.Error("%s, [session.handleUDPPackage] = len{%d}, error{%s}", s.sessionToken(), bufLen, err)
err = errors.Wrapf(err, "conn.read()")
break break
} }
if bufLen == 0 {
log.Error("conn.read() = bufLen:%d, addr:%s, err:%s", bufLen, addr, err)
continue
}
if bufLen == len(connectPingPackage) && bytes.Equal(connectPingPackage, buf[:bufLen]) {
log.Info("got %s connectPingPackage", addr)
continue
}
pkg, pkgLen, err = s.reader.Read(s, buf[:bufLen]) pkg, pkgLen, err = s.reader.Read(s, buf[:bufLen])
log.Debug("s.reader.Read() = pkg:%#v, pkgLen:%d, err:%s", pkg, pkgLen, err) log.Debug("s.reader.Read() = pkg:%#v, pkgLen:%d, err:%s", pkg, pkgLen, err)
if err == nil && s.maxMsgLen > 0 && bufLen > int(s.maxMsgLen) { if err == nil && s.maxMsgLen > 0 && bufLen > int(s.maxMsgLen) {
...@@ -662,6 +696,11 @@ func (s *session) handleUDPPackage() error { ...@@ -662,6 +696,11 @@ func (s *session) handleUDPPackage() error {
log.Warn("%s, [session.handleUDPPackage] = len{%d}, error{%s}", s.sessionToken(), pkgLen, err) log.Warn("%s, [session.handleUDPPackage] = len{%d}, error{%s}", s.sessionToken(), pkgLen, err)
continue continue
} }
if pkgLen == 0 {
log.Error("s.reader.Read() = pkg:%#v, pkgLen:%d, err:%s", pkg, pkgLen, err)
continue
}
s.UpdateActive() s.UpdateActive()
s.rQ <- UDPContext{Pkg: pkg, PeerAddr: addr} s.rQ <- UDPContext{Pkg: pkg, PeerAddr: addr}
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment