Commit dbd91cc5 authored by Xin.Zh's avatar Xin.Zh Committed by AlexStocks

Merge branch 'master' into improve/delte-handleLoop

parents 8b6ec175 30e62643
...@@ -135,27 +135,31 @@ func TestTCPClient(t *testing.T) { ...@@ -135,27 +135,31 @@ func TestTCPClient(t *testing.T) {
ss.SetCompressType(CompressNone) ss.SetCompressType(CompressNone)
conn := ss.(*session).Connection.(*gettyTCPConn) conn := ss.(*session).Connection.(*gettyTCPConn)
assert.True(t, conn.compress == CompressNone) assert.True(t, conn.compress == CompressNone)
beforeWriteBytes := conn.writeBytes.Load() beforeWriteBytes := atomic.LoadUint32(&conn.writeBytes)
beforeWritePkgNum := conn.writePkgNum.Load() beforeWritePkgNum := atomic.LoadUint32(&conn.writePkgNum)
_, err = conn.send([]byte("hello")) l, err := conn.send([]byte("hello"))
assert.Equal(t, beforeWritePkgNum+1, conn.writePkgNum.Load())
assert.Nil(t, err) assert.Nil(t, err)
assert.Equal(t, beforeWriteBytes+5, conn.writeBytes.Load()) assert.True(t, l == 5)
err = ss.WriteBytes([]byte("hello")) assert.Equal(t, beforeWritePkgNum+1, atomic.LoadUint32(&conn.writePkgNum))
assert.Equal(t, beforeWriteBytes+10, conn.writeBytes.Load()) assert.Equal(t, beforeWriteBytes+5, atomic.LoadUint32(&conn.writeBytes))
assert.Equal(t, beforeWritePkgNum+2, conn.writePkgNum.Load()) l, err = ss.WriteBytes([]byte("hello"))
assert.Nil(t, err) assert.Nil(t, err)
assert.True(t, l == 5)
assert.Equal(t, beforeWriteBytes+10, atomic.LoadUint32(&conn.writeBytes))
assert.Equal(t, beforeWritePkgNum+2, atomic.LoadUint32(&conn.writePkgNum))
var pkgs [][]byte var pkgs [][]byte
pkgs = append(pkgs, []byte("hello"), []byte("hello")) pkgs = append(pkgs, []byte("hello"), []byte("hello"))
_, err = conn.send(pkgs) l, err = conn.send(pkgs)
assert.Equal(t, beforeWritePkgNum+4, conn.writePkgNum.Load())
assert.Equal(t, beforeWriteBytes+20, conn.writeBytes.Load())
assert.Nil(t, err) assert.Nil(t, err)
assert.True(t, l == 10)
assert.Equal(t, beforeWritePkgNum+4, atomic.LoadUint32(&conn.writePkgNum))
assert.Equal(t, beforeWriteBytes+20, atomic.LoadUint32(&conn.writeBytes))
ss.SetCompressType(CompressSnappy) ss.SetCompressType(CompressSnappy)
err = ss.WriteBytesArray(pkgs...) l, err = ss.WriteBytesArray(pkgs...)
assert.Nil(t, err) assert.Nil(t, err)
assert.Equal(t, beforeWritePkgNum+6, conn.writePkgNum.Load()) assert.True(t, l == 10)
assert.Equal(t, beforeWriteBytes+30, conn.writeBytes.Load()) assert.Equal(t, beforeWritePkgNum+6, atomic.LoadUint32(&conn.writePkgNum))
assert.Equal(t, beforeWriteBytes+30, atomic.LoadUint32(&conn.writeBytes))
assert.True(t, conn.compress == CompressSnappy) assert.True(t, conn.compress == CompressSnappy)
clt.Close() clt.Close()
...@@ -166,6 +170,8 @@ func TestUDPClient(t *testing.T) { ...@@ -166,6 +170,8 @@ func TestUDPClient(t *testing.T) {
var ( var (
err error err error
conn *net.UDPConn conn *net.UDPConn
sendLen int
totalLen int
) )
func() { func() {
ip := net.ParseIP("127.0.0.1") ip := net.ParseIP("127.0.0.1")
...@@ -199,15 +205,22 @@ func TestUDPClient(t *testing.T) { ...@@ -199,15 +205,22 @@ func TestUDPClient(t *testing.T) {
assert.Equal(t, 1, msgHandler.SessionNumber()) assert.Equal(t, 1, msgHandler.SessionNumber())
ss := msgHandler.array[0] ss := msgHandler.array[0]
err = ss.WritePkg(nil, 0) totalLen, sendLen, err = ss.WritePkg(nil, 0)
assert.NotNil(t, err) assert.NotNil(t, err)
err = ss.WritePkg([]byte("hello"), 0) assert.True(t, sendLen == 0)
assert.True(t, totalLen == 0)
totalLen, sendLen, err = ss.WritePkg([]byte("hello"), 0)
assert.NotNil(t, perrors.Cause(err)) assert.NotNil(t, perrors.Cause(err))
err = ss.WriteBytes([]byte("hello")) assert.True(t, sendLen == 0)
assert.True(t, totalLen == 0)
l, err := ss.WriteBytes([]byte("hello"))
assert.Zero(t, l)
assert.NotNil(t, err) assert.NotNil(t, err)
err = ss.WriteBytesArray([]byte("hello")) l, err = ss.WriteBytesArray([]byte("hello"))
assert.Zero(t, l)
assert.NotNil(t, err) assert.NotNil(t, err)
err = ss.WriteBytesArray([]byte("hello"), []byte("world")) l, err = ss.WriteBytesArray([]byte("hello"), []byte("world"))
assert.Zero(t, l)
assert.NotNil(t, err) assert.NotNil(t, err)
ss.SetCompressType(CompressNone) ss.SetCompressType(CompressNone)
host, port, _ := net.SplitHostPort(addr.String()) host, port, _ := net.SplitHostPort(addr.String())
...@@ -230,10 +243,12 @@ func TestUDPClient(t *testing.T) { ...@@ -230,10 +243,12 @@ func TestUDPClient(t *testing.T) {
assert.Equal(t, beforeWriteBytes+5, udpConn.writeBytes.Load()) assert.Equal(t, beforeWriteBytes+5, udpConn.writeBytes.Load())
assert.Nil(t, err) assert.Nil(t, err)
beforeWritePkgNum := udpConn.writePkgNum.Load() beforeWritePkgNum := atomic.LoadUint32(&udpConn.writePkgNum)
err = ss.WritePkg(udpCtx, 0) totalLen, sendLen, err = ss.WritePkg(udpCtx, 0)
assert.Equal(t, beforeWritePkgNum+1, udpConn.writePkgNum.Load()) assert.Equal(t, beforeWritePkgNum+1, atomic.LoadUint32(&udpConn.writePkgNum))
assert.Nil(t, err) assert.Nil(t, err)
assert.True(t, sendLen == 0)
assert.True(t, totalLen == 0)
clt.Close() clt.Close()
assert.True(t, clt.IsClosed()) assert.True(t, clt.IsClosed())
...@@ -283,19 +298,22 @@ func TestNewWSClient(t *testing.T) { ...@@ -283,19 +298,22 @@ func TestNewWSClient(t *testing.T) {
assert.True(t, conn.compress == CompressNone) assert.True(t, conn.compress == CompressNone)
err := conn.handlePing("hello") err := conn.handlePing("hello")
assert.Nil(t, err) assert.Nil(t, err)
_, err = conn.send("hello") l, err := conn.send("hello")
assert.NotNil(t, err) assert.NotNil(t, err)
beforeWriteBytes := conn.writeBytes.Load() assert.True(t, l == 0)
beforeWriteBytes := atomic.LoadUint32(&conn.writeBytes)
_, err = conn.send([]byte("hello")) _, err = conn.send([]byte("hello"))
assert.Nil(t, err) assert.Nil(t, err)
assert.Equal(t, beforeWriteBytes+5, conn.writeBytes.Load()) assert.Equal(t, beforeWriteBytes+5, atomic.LoadUint32(&conn.writeBytes))
beforeWritePkgNum := conn.writePkgNum.Load() beforeWritePkgNum := atomic.LoadUint32(&conn.writePkgNum)
err = ss.WriteBytes([]byte("hello")) l, err = ss.WriteBytes([]byte("hello"))
assert.Nil(t, err) assert.Nil(t, err)
assert.Equal(t, beforeWritePkgNum+1, conn.writePkgNum.Load()) assert.True(t, l == 5)
err = ss.WriteBytesArray([]byte("hello"), []byte("hello")) assert.Equal(t, beforeWritePkgNum+1, atomic.LoadUint32(&conn.writePkgNum))
l, err = ss.WriteBytesArray([]byte("hello"), []byte("hello"))
assert.Nil(t, err) assert.Nil(t, err)
assert.Equal(t, beforeWritePkgNum+3, conn.writePkgNum.Load()) assert.True(t, l == 10)
assert.Equal(t, beforeWritePkgNum+3, atomic.LoadUint32(&conn.writePkgNum))
err = conn.writePing() err = conn.writePing()
assert.Nil(t, err) assert.Nil(t, err)
......
...@@ -268,6 +268,7 @@ func (t *gettyTCPConn) send(pkg interface{}) (int, error) { ...@@ -268,6 +268,7 @@ func (t *gettyTCPConn) send(pkg interface{}) (int, error) {
ok bool ok bool
p []byte p []byte
length int length int
lg int64
) )
if t.compress == CompressNone && t.wTimeout > 0 { if t.compress == CompressNone && t.wTimeout > 0 {
...@@ -285,19 +286,21 @@ func (t *gettyTCPConn) send(pkg interface{}) (int, error) { ...@@ -285,19 +286,21 @@ func (t *gettyTCPConn) send(pkg interface{}) (int, error) {
if buffers, ok := pkg.([][]byte); ok { if buffers, ok := pkg.([][]byte); ok {
netBuf := net.Buffers(buffers) netBuf := net.Buffers(buffers)
if length, err := netBuf.WriteTo(t.conn); err == nil { lg, err = netBuf.WriteTo(t.conn)
t.writeBytes.Add((uint32)(length)) if err == nil {
t.writePkgNum.Add((uint32)(len(buffers))) atomic.AddUint32(&t.writeBytes, (uint32)(lg))
atomic.AddUint32(&t.writePkgNum, (uint32)(len(buffers)))
} }
log.Debugf("localAddr: %s, remoteAddr:%s, now:%s, length:%d, err:%s", log.Debugf("localAddr: %s, remoteAddr:%s, now:%s, length:%d, err:%s",
t.conn.LocalAddr(), t.conn.RemoteAddr(), currentTime, length, err) t.conn.LocalAddr(), t.conn.RemoteAddr(), currentTime, length, err)
return int(length), perrors.WithStack(err) return int(lg), perrors.WithStack(err)
} }
if p, ok = pkg.([]byte); ok { if p, ok = pkg.([]byte); ok {
if length, err = t.writer.Write(p); err == nil { length, err = t.writer.Write(p)
t.writeBytes.Add((uint32)(len(p))) if err == nil {
t.writePkgNum.Add(1) atomic.AddUint32(&t.writeBytes, (uint32)(len(p)))
atomic.AddUint32(&t.writePkgNum, 1)
} }
log.Debugf("localAddr: %s, remoteAddr:%s, now:%s, length:%d, err:%s", log.Debugf("localAddr: %s, remoteAddr:%s, now:%s, length:%d, err:%s",
t.conn.LocalAddr(), t.conn.RemoteAddr(), currentTime, length, err) t.conn.LocalAddr(), t.conn.RemoteAddr(), currentTime, length, err)
......
...@@ -31,7 +31,7 @@ func ClientRequest() { ...@@ -31,7 +31,7 @@ func ClientRequest() {
go func() { go func() {
echoTimes := 10 echoTimes := 10
for i := 0; i < echoTimes; i++ { for i := 0; i < echoTimes; i++ {
err := ss.WritePkg("hello", WritePkgTimeout) _, _, err := ss.WritePkg("hello", WritePkgTimeout)
if err != nil { if err != nil {
log.Infof("session.WritePkg(session{%s}, error{%v}", ss.Stat(), err) log.Infof("session.WritePkg(session{%s}, error{%v}", ss.Stat(), err)
ss.Close() ss.Close()
......
...@@ -171,9 +171,12 @@ type Session interface { ...@@ -171,9 +171,12 @@ type Session interface {
// the Writer will invoke this function. Pls attention that if timeout is less than 0, WritePkg will send @pkg asap. // the Writer will invoke this function. Pls attention that if timeout is less than 0, WritePkg will send @pkg asap.
// for udp session, the first parameter should be UDPContext. // for udp session, the first parameter should be UDPContext.
WritePkg(pkg interface{}, timeout time.Duration) error // totalBytesLength: @pkg stream bytes length after encoding @pkg.
WriteBytes([]byte) error // sendBytesLength: stream bytes length that sent out successfully.
WriteBytesArray(...[]byte) error // err: maybe it has illegal data, encoding error, or write out system error.
WritePkg(pkg interface{}, timeout time.Duration) (totalBytesLength int, sendBytesLength int, err error)
WriteBytes([]byte) (int, error)
WriteBytesArray(...[]byte) (int, error)
Close() Close()
} }
......
...@@ -76,6 +76,7 @@ func testTCPServer(t *testing.T, address string) { ...@@ -76,6 +76,7 @@ func testTCPServer(t *testing.T, address string) {
server.Close() server.Close()
assert.True(t, server.IsClosed()) assert.True(t, server.IsClosed())
} }
func testTCPTlsServer(t *testing.T, address string) { func testTCPTlsServer(t *testing.T, address string) {
var ( var (
server *server server *server
......
...@@ -338,12 +338,12 @@ func (s *session) sessionToken() string { ...@@ -338,12 +338,12 @@ func (s *session) sessionToken() string {
s.name, s.EndPoint().EndPointType(), s.ID(), s.LocalAddr(), s.RemoteAddr()) s.name, s.EndPoint().EndPointType(), s.ID(), s.LocalAddr(), s.RemoteAddr())
} }
func (s *session) WritePkg(pkg interface{}, timeout time.Duration) error { func (s *session) WritePkg(pkg interface{}, timeout time.Duration) (int, int, error) {
if pkg == nil { if pkg == nil {
return fmt.Errorf("@pkg is nil") return 0, 0, fmt.Errorf("@pkg is nil")
} }
if s.IsClosed() { if s.IsClosed() {
return ErrSessionClosed return 0, 0, ErrSessionClosed
} }
defer func() { defer func() {
...@@ -358,7 +358,7 @@ func (s *session) WritePkg(pkg interface{}, timeout time.Duration) error { ...@@ -358,7 +358,7 @@ func (s *session) WritePkg(pkg interface{}, timeout time.Duration) error {
pkgBytes, err := s.writer.Write(s, pkg) pkgBytes, err := s.writer.Write(s, pkg)
if err != nil { if err != nil {
log.Warnf("%s, [session.WritePkg] session.writer.Write(@pkg:%#v) = error:%+v", s.Stat(), pkg, err) log.Warnf("%s, [session.WritePkg] session.writer.Write(@pkg:%#v) = error:%+v", s.Stat(), pkg, err)
return perrors.WithStack(err) return len(pkgBytes), 0, perrors.WithStack(err)
} }
var udpCtxPtr *UDPContext var udpCtxPtr *UDPContext
if udpCtx, ok := pkg.(UDPContext); ok { if udpCtx, ok := pkg.(UDPContext); ok {
...@@ -375,32 +375,32 @@ func (s *session) WritePkg(pkg interface{}, timeout time.Duration) error { ...@@ -375,32 +375,32 @@ func (s *session) WritePkg(pkg interface{}, timeout time.Duration) error {
if 0 < timeout { if 0 < timeout {
s.Connection.SetWriteTimeout(timeout) s.Connection.SetWriteTimeout(timeout)
} }
_, err = s.Connection.send(pkg) var succssCount int
succssCount, err = s.Connection.send(pkg)
if err != nil { if err != nil {
log.Warnf("%s, [session.WritePkg] @s.Connection.Write(pkg:%#v) = err:%+v", s.Stat(), pkg, err) log.Warnf("%s, [session.WritePkg] @s.Connection.Write(pkg:%#v) = err:%+v", s.Stat(), pkg, err)
return perrors.WithStack(err) return len(pkgBytes), succssCount, perrors.WithStack(err)
} }
return len(pkgBytes), succssCount, nil
return nil
} }
// for codecs // for codecs
func (s *session) WriteBytes(pkg []byte) error { func (s *session) WriteBytes(pkg []byte) (int, error) {
if s.IsClosed() { if s.IsClosed() {
return ErrSessionClosed return 0, ErrSessionClosed
} }
// s.conn.SetWriteTimeout(time.Now().Add(s.wTimeout)) lg, err := s.Connection.send(pkg)
if _, err := s.Connection.send(pkg); err != nil { if err != nil {
return perrors.Wrapf(err, "s.Connection.Write(pkg len:%d)", len(pkg)) return 0, perrors.Wrapf(err, "s.Connection.Write(pkg len:%d)", len(pkg))
} }
return nil return lg, nil
} }
// Write multiple packages at once. so we invoke write sys.call just one time. // Write multiple packages at once. so we invoke write sys.call just one time.
func (s *session) WriteBytesArray(pkgs ...[]byte) error { func (s *session) WriteBytesArray(pkgs ...[]byte) (int, error) {
if s.IsClosed() { if s.IsClosed() {
return ErrSessionClosed return 0, ErrSessionClosed
} }
if len(pkgs) == 1 { if len(pkgs) == 1 {
return s.WriteBytes(pkgs[0]) return s.WriteBytes(pkgs[0])
...@@ -408,15 +408,17 @@ func (s *session) WriteBytesArray(pkgs ...[]byte) error { ...@@ -408,15 +408,17 @@ func (s *session) WriteBytesArray(pkgs ...[]byte) error {
// reduce syscall and memcopy for multiple packages // reduce syscall and memcopy for multiple packages
if _, ok := s.Connection.(*gettyTCPConn); ok { if _, ok := s.Connection.(*gettyTCPConn); ok {
if _, err := s.Connection.send(pkgs); err != nil { lg, err := s.Connection.send(pkgs)
return perrors.Wrapf(err, "s.Connection.Write(pkgs num:%d)", len(pkgs)) if err != nil {
return 0, perrors.Wrapf(err, "s.Connection.Write(pkgs num:%d)", len(pkgs))
} }
return nil return lg, nil
} }
// get len // get len
var ( var (
l int l int
wlg int
err error err error
length int length int
arrp *[]byte arrp *[]byte
...@@ -428,7 +430,6 @@ func (s *session) WriteBytesArray(pkgs ...[]byte) error { ...@@ -428,7 +430,6 @@ func (s *session) WriteBytesArray(pkgs ...[]byte) error {
} }
// merge the pkgs // merge the pkgs
//arr = make([]byte, length)
arrp = gxbytes.AcquireBytes(length) arrp = gxbytes.AcquireBytes(length)
defer gxbytes.ReleaseBytes(arrp) defer gxbytes.ReleaseBytes(arrp)
arr = *arrp arr = *arrp
...@@ -439,8 +440,9 @@ func (s *session) WriteBytesArray(pkgs ...[]byte) error { ...@@ -439,8 +440,9 @@ func (s *session) WriteBytesArray(pkgs ...[]byte) error {
l += len(pkgs[i]) l += len(pkgs[i])
} }
if err = s.WriteBytes(arr); err != nil { wlg, err = s.WriteBytes(arr)
return perrors.WithStack(err) if err != nil {
return 0, perrors.WithStack(err)
} }
num := len(pkgs) - 1 num := len(pkgs) - 1
...@@ -448,12 +450,12 @@ func (s *session) WriteBytesArray(pkgs ...[]byte) error { ...@@ -448,12 +450,12 @@ func (s *session) WriteBytesArray(pkgs ...[]byte) error {
s.incWritePkgNum() s.incWritePkgNum()
} }
return nil return wlg, nil
} }
func sessionTimerLoop(_ gxtime.TimerID, _ time.Time, arg interface{}) error { func sessionTimerLoop(_ gxtime.TimerID, _ time.Time, arg interface{}) error {
ss, _ := arg.(*session) ss, _ := arg.(*session)
if ss != nil && ss.IsClosed() { if ss == nil || ss.IsClosed() {
return ErrSessionClosed return ErrSessionClosed
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment