Unverified Commit 3eb8b38b authored by Xin.Zh's avatar Xin.Zh Committed by GitHub

Merge pull request #57 from apache/feature/write-size

Ftr: return write length
parents ef4aaa4a 4a15143f
......@@ -139,23 +139,27 @@ func TestTCPClient(t *testing.T) {
assert.True(t, conn.compress == CompressNone)
beforeWriteBytes := atomic.LoadUint32(&conn.writeBytes)
beforeWritePkgNum := atomic.LoadUint32(&conn.writePkgNum)
_, err = conn.send([]byte("hello"))
assert.Equal(t, beforeWritePkgNum+1, atomic.LoadUint32(&conn.writePkgNum))
l, err := conn.send([]byte("hello"))
assert.Nil(t, err)
assert.True(t, l == 5)
assert.Equal(t, beforeWritePkgNum+1, atomic.LoadUint32(&conn.writePkgNum))
assert.Equal(t, beforeWriteBytes+5, atomic.LoadUint32(&conn.writeBytes))
err = ss.WriteBytes([]byte("hello"))
l, err = ss.WriteBytes([]byte("hello"))
assert.Nil(t, err)
assert.True(t, l == 5)
assert.Equal(t, beforeWriteBytes+10, atomic.LoadUint32(&conn.writeBytes))
assert.Equal(t, beforeWritePkgNum+2, atomic.LoadUint32(&conn.writePkgNum))
assert.Nil(t, err)
var pkgs [][]byte
pkgs = append(pkgs, []byte("hello"), []byte("hello"))
_, err = conn.send(pkgs)
l, err = conn.send(pkgs)
assert.Nil(t, err)
assert.True(t, l == 10)
assert.Equal(t, beforeWritePkgNum+4, atomic.LoadUint32(&conn.writePkgNum))
assert.Equal(t, beforeWriteBytes+20, atomic.LoadUint32(&conn.writeBytes))
assert.Nil(t, err)
ss.SetCompressType(CompressSnappy)
err = ss.WriteBytesArray(pkgs...)
l, err = ss.WriteBytesArray(pkgs...)
assert.Nil(t, err)
assert.True(t, l == 10)
assert.Equal(t, beforeWritePkgNum+6, atomic.LoadUint32(&conn.writePkgNum))
assert.Equal(t, beforeWriteBytes+30, atomic.LoadUint32(&conn.writeBytes))
assert.True(t, conn.compress == CompressSnappy)
......@@ -205,11 +209,14 @@ func TestUDPClient(t *testing.T) {
assert.NotNil(t, err)
err = ss.WritePkg([]byte("hello"), 0)
assert.NotNil(t, perrors.Cause(err))
err = ss.WriteBytes([]byte("hello"))
l, err := ss.WriteBytes([]byte("hello"))
assert.Zero(t, l)
assert.NotNil(t, err)
err = ss.WriteBytesArray([]byte("hello"))
l, err = ss.WriteBytesArray([]byte("hello"))
assert.Zero(t, l)
assert.NotNil(t, err)
err = ss.WriteBytesArray([]byte("hello"), []byte("world"))
l, err = ss.WriteBytesArray([]byte("hello"), []byte("world"))
assert.Zero(t, l)
assert.NotNil(t, err)
ss.SetCompressType(CompressNone)
host, port, _ := net.SplitHostPort(addr.String())
......@@ -285,18 +292,21 @@ func TestNewWSClient(t *testing.T) {
assert.True(t, conn.compress == CompressNone)
err := conn.handlePing("hello")
assert.Nil(t, err)
_, err = conn.send("hello")
l, err := conn.send("hello")
assert.NotNil(t, err)
assert.True(t, l == 0)
beforeWriteBytes := atomic.LoadUint32(&conn.writeBytes)
_, err = conn.send([]byte("hello"))
assert.Nil(t, err)
assert.Equal(t, beforeWriteBytes+5, atomic.LoadUint32(&conn.writeBytes))
beforeWritePkgNum := atomic.LoadUint32(&conn.writePkgNum)
err = ss.WriteBytes([]byte("hello"))
l, err = ss.WriteBytes([]byte("hello"))
assert.Nil(t, err)
assert.True(t, l == 5)
assert.Equal(t, beforeWritePkgNum+1, atomic.LoadUint32(&conn.writePkgNum))
err = ss.WriteBytesArray([]byte("hello"), []byte("hello"))
l, err = ss.WriteBytesArray([]byte("hello"), []byte("hello"))
assert.Nil(t, err)
assert.True(t, l == 10)
assert.Equal(t, beforeWritePkgNum+3, atomic.LoadUint32(&conn.writePkgNum))
err = conn.writePing()
assert.Nil(t, err)
......
......@@ -271,6 +271,7 @@ func (t *gettyTCPConn) send(pkg interface{}) (int, error) {
ok bool
p []byte
length int
lg int64
)
if t.compress == CompressNone && t.wTimeout > 0 {
......@@ -288,17 +289,19 @@ func (t *gettyTCPConn) send(pkg interface{}) (int, error) {
if buffers, ok := pkg.([][]byte); ok {
netBuf := net.Buffers(buffers)
if length, err := netBuf.WriteTo(t.conn); err == nil {
atomic.AddUint32(&t.writeBytes, (uint32)(length))
lg, err = netBuf.WriteTo(t.conn)
if err == nil {
atomic.AddUint32(&t.writeBytes, (uint32)(lg))
atomic.AddUint32(&t.writePkgNum, (uint32)(len(buffers)))
}
log.Debugf("localAddr: %s, remoteAddr:%s, now:%s, length:%d, err:%s",
t.conn.LocalAddr(), t.conn.RemoteAddr(), currentTime, length, err)
return int(length), perrors.WithStack(err)
return int(lg), perrors.WithStack(err)
}
if p, ok = pkg.([]byte); ok {
if length, err = t.writer.Write(p); err == nil {
length, err = t.writer.Write(p)
if err == nil {
atomic.AddUint32(&t.writeBytes, (uint32)(len(p)))
atomic.AddUint32(&t.writePkgNum, 1)
}
......
......@@ -172,8 +172,8 @@ type Session interface {
// the Writer will invoke this function. Pls attention that if timeout is less than 0, WritePkg will send @pkg asap.
// for udp session, the first parameter should be UDPContext.
WritePkg(pkg interface{}, timeout time.Duration) error
WriteBytes([]byte) error
WriteBytesArray(...[]byte) error
WriteBytes([]byte) (int, error)
WriteBytesArray(...[]byte) (int, error)
Close()
}
......
......@@ -76,6 +76,7 @@ func testTCPServer(t *testing.T, address string) {
server.Close()
assert.True(t, server.IsClosed())
}
func testTCPTlsServer(t *testing.T, address string) {
var (
server *server
......
......@@ -394,22 +394,22 @@ func (s *session) WritePkg(pkg interface{}, timeout time.Duration) error {
}
// for codecs
func (s *session) WriteBytes(pkg []byte) error {
func (s *session) WriteBytes(pkg []byte) (int, error) {
if s.IsClosed() {
return ErrSessionClosed
return 0, ErrSessionClosed
}
// s.conn.SetWriteTimeout(time.Now().Add(s.wTimeout))
if _, err := s.Connection.send(pkg); err != nil {
return perrors.Wrapf(err, "s.Connection.Write(pkg len:%d)", len(pkg))
lg, err := s.Connection.send(pkg)
if err != nil {
return 0, perrors.Wrapf(err, "s.Connection.Write(pkg len:%d)", len(pkg))
}
return nil
return lg, nil
}
// Write multiple packages at once. so we invoke write sys.call just one time.
func (s *session) WriteBytesArray(pkgs ...[]byte) error {
func (s *session) WriteBytesArray(pkgs ...[]byte) (int, error) {
if s.IsClosed() {
return ErrSessionClosed
return 0, ErrSessionClosed
}
if len(pkgs) == 1 {
return s.WriteBytes(pkgs[0])
......@@ -417,15 +417,17 @@ func (s *session) WriteBytesArray(pkgs ...[]byte) error {
// reduce syscall and memcopy for multiple packages
if _, ok := s.Connection.(*gettyTCPConn); ok {
if _, err := s.Connection.send(pkgs); err != nil {
return perrors.Wrapf(err, "s.Connection.Write(pkgs num:%d)", len(pkgs))
lg, err := s.Connection.send(pkgs)
if err != nil {
return 0, perrors.Wrapf(err, "s.Connection.Write(pkgs num:%d)", len(pkgs))
}
return nil
return lg, nil
}
// get len
var (
l int
wlg int
err error
length int
arrp *[]byte
......@@ -437,7 +439,6 @@ func (s *session) WriteBytesArray(pkgs ...[]byte) error {
}
// merge the pkgs
//arr = make([]byte, length)
arrp = gxbytes.AcquireBytes(length)
defer gxbytes.ReleaseBytes(arrp)
arr = *arrp
......@@ -448,8 +449,9 @@ func (s *session) WriteBytesArray(pkgs ...[]byte) error {
l += len(pkgs[i])
}
if err = s.WriteBytes(arr); err != nil {
return perrors.WithStack(err)
wlg, err = s.WriteBytes(arr)
if err != nil {
return 0, perrors.WithStack(err)
}
num := len(pkgs) - 1
......@@ -457,7 +459,7 @@ func (s *session) WriteBytesArray(pkgs ...[]byte) error {
s.incWritePkgNum()
}
return nil
return wlg, nil
}
// func (s *session) RunEventLoop() {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment