Commit db322363 authored by watermelo's avatar watermelo

Impl: reduce syscall and memcopy for multiple package

parent 93993521
...@@ -7,6 +7,7 @@ import ( ...@@ -7,6 +7,7 @@ import (
"os" "os"
"strconv" "strconv"
"sync" "sync"
"sync/atomic"
"testing" "testing"
"time" "time"
) )
...@@ -122,7 +123,20 @@ func TestTCPClient(t *testing.T) { ...@@ -122,7 +123,20 @@ func TestTCPClient(t *testing.T) {
ss.SetCompressType(CompressNone) ss.SetCompressType(CompressNone)
conn := ss.(*session).Connection.(*gettyTCPConn) conn := ss.(*session).Connection.(*gettyTCPConn)
assert.True(t, conn.compress == CompressNone) assert.True(t, conn.compress == CompressNone)
beforeWriteBytes := atomic.LoadUint32(&conn.writeBytes)
beforeWritePkgNum := atomic.LoadUint32(&conn.writePkgNum)
_, err = conn.send([]byte("hello"))
assert.Nil(t, err)
assert.Equal(t, beforeWriteBytes+5, atomic.LoadUint32(&conn.writeBytes))
err = ss.WriteBytes([]byte("hello")) err = ss.WriteBytes([]byte("hello"))
assert.Equal(t, beforeWriteBytes+10, atomic.LoadUint32(&conn.writeBytes))
assert.Equal(t, beforeWritePkgNum+1, atomic.LoadUint32(&conn.writePkgNum))
assert.Nil(t, err)
var pkgs [][]byte
pkgs = append(pkgs, []byte("hello"), []byte("hello"))
_, err = conn.send(pkgs)
assert.Equal(t, beforeWritePkgNum+3, atomic.LoadUint32(&conn.writePkgNum))
assert.Equal(t, beforeWriteBytes+20, atomic.LoadUint32(&conn.writeBytes))
assert.Nil(t, err) assert.Nil(t, err)
ss.SetCompressType(CompressSnappy) ss.SetCompressType(CompressSnappy)
assert.True(t, conn.compress == CompressSnappy) assert.True(t, conn.compress == CompressSnappy)
...@@ -194,7 +208,14 @@ func TestUDPClient(t *testing.T) { ...@@ -194,7 +208,14 @@ func TestUDPClient(t *testing.T) {
_, err = udpConn.send(udpCtx) _, err = udpConn.send(udpCtx)
assert.NotNil(t, err) assert.NotNil(t, err)
udpCtx.Pkg = []byte("hello") udpCtx.Pkg = []byte("hello")
beforeWriteBytes := atomic.LoadUint32(&udpConn.writeBytes)
_, err = udpConn.send(udpCtx) _, err = udpConn.send(udpCtx)
assert.Equal(t, beforeWriteBytes+5, atomic.LoadUint32(&udpConn.writeBytes))
assert.Nil(t, err)
beforeWritePkgNum := atomic.LoadUint32(&udpConn.writePkgNum)
err = ss.WritePkg(udpCtx, 0)
assert.Equal(t, beforeWritePkgNum+1, atomic.LoadUint32(&udpConn.writePkgNum))
assert.Nil(t, err) assert.Nil(t, err)
clt.Close() clt.Close()
...@@ -247,8 +268,15 @@ func TestNewWSClient(t *testing.T) { ...@@ -247,8 +268,15 @@ func TestNewWSClient(t *testing.T) {
assert.Nil(t, err) assert.Nil(t, err)
_, err = conn.send("hello") _, err = conn.send("hello")
assert.NotNil(t, err) assert.NotNil(t, err)
beforeWriteBytes := atomic.LoadUint32(&conn.writeBytes)
_, err = conn.send([]byte("hello")) _, err = conn.send([]byte("hello"))
assert.Nil(t, err) assert.Nil(t, err)
assert.Equal(t, beforeWriteBytes+5, atomic.LoadUint32(&conn.writeBytes))
beforeWritePkgNum := atomic.LoadUint32(&conn.writePkgNum)
err = ss.WriteBytes([]byte("hello"))
assert.Equal(t, beforeWritePkgNum+1, atomic.LoadUint32(&conn.writePkgNum))
err = ss.WriteBytesArray([]byte("hello"), []byte("hello"))
assert.Equal(t, beforeWritePkgNum+3, atomic.LoadUint32(&conn.writePkgNum))
err = conn.writePing() err = conn.writePing()
assert.Nil(t, err) assert.Nil(t, err)
......
...@@ -265,9 +265,6 @@ func (t *gettyTCPConn) send(pkg interface{}) (int, error) { ...@@ -265,9 +265,6 @@ func (t *gettyTCPConn) send(pkg interface{}) (int, error) {
length int length int
) )
if p, ok = pkg.([]byte); !ok {
return 0, perrors.Errorf("illegal @pkg{%#v} type", pkg)
}
if t.compress == CompressNone && t.wTimeout > 0 { if t.compress == CompressNone && t.wTimeout > 0 {
// Optimization: update write deadline only if more than 25% // Optimization: update write deadline only if more than 25%
// of the last write deadline exceeded. // of the last write deadline exceeded.
...@@ -281,11 +278,27 @@ func (t *gettyTCPConn) send(pkg interface{}) (int, error) { ...@@ -281,11 +278,27 @@ func (t *gettyTCPConn) send(pkg interface{}) (int, error) {
} }
} }
if length, err = t.writer.Write(p); err == nil { if buffers, ok := pkg.([][]byte); ok {
atomic.AddUint32(&t.writeBytes, (uint32)(len(p))) netBuf := net.Buffers(buffers)
if length, err := netBuf.WriteTo(t.conn); err == nil {
atomic.AddUint32(&t.writeBytes, (uint32)(length))
atomic.AddUint32(&t.writePkgNum, (uint32)(len(buffers)))
}
log.Debug("localAddr: %s, remoteAddr:%s, now:%s, length:%d, err:%s",
t.conn.LocalAddr(), t.conn.RemoteAddr(), currentTime, length, err)
return int(length), perrors.WithStack(err)
} }
log.Debugf("now:%s, length:%d, err:%v", currentTime, length, err)
return length, perrors.WithStack(err) if p, ok = pkg.([]byte); ok {
if length, err = t.writer.Write(p); err == nil {
atomic.AddUint32(&t.writeBytes, (uint32)(len(p)))
}
log.Debug("localAddr: %s, remoteAddr:%s, now:%s, length:%d, err:%s",
t.conn.LocalAddr(), t.conn.RemoteAddr(), currentTime, length, err)
return length, perrors.WithStack(err)
}
return 0, perrors.Errorf("illegal @pkg{%#v} type", pkg)
//return length, err //return length, err
} }
...@@ -531,7 +544,7 @@ func (w *gettyWSConn) recv() ([]byte, error) { ...@@ -531,7 +544,7 @@ func (w *gettyWSConn) recv() ([]byte, error) {
// gorilla/websocket/conn.go:NextReader will always fail when got a timeout error. // gorilla/websocket/conn.go:NextReader will always fail when got a timeout error.
_, b, e := w.conn.ReadMessage() // the first return value is message type. _, b, e := w.conn.ReadMessage() // the first return value is message type.
if e == nil { if e == nil {
w.incReadPkgNum() atomic.AddUint32(&w.readBytes, (uint32)(len(b)))
} else { } else {
if websocket.IsUnexpectedCloseError(e, websocket.CloseGoingAway) { if websocket.IsUnexpectedCloseError(e, websocket.CloseGoingAway) {
log.Warnf("websocket unexpected close error: %v", e) log.Warnf("websocket unexpected close error: %v", e)
......
...@@ -449,6 +449,13 @@ func (s *session) WriteBytesArray(pkgs ...[]byte) error { ...@@ -449,6 +449,13 @@ func (s *session) WriteBytesArray(pkgs ...[]byte) error {
return s.WriteBytes(pkgs[0]) return s.WriteBytes(pkgs[0])
} }
// reduce syscall and memcopy for multiple packages
if _, ok := s.Connection.(*gettyTCPConn); ok {
if _, err := s.Connection.send(pkgs); err != nil {
return perrors.Wrapf(err, "s.Connection.Write(pkgs num:%d)", len(pkgs))
}
}
// get len // get len
var ( var (
l int l int
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment