Commit 5969f80f authored by watermelo's avatar watermelo

Fix: some log format & update WriteBytesArray method

parent db322363
...@@ -275,8 +275,6 @@ func TestNewWSClient(t *testing.T) { ...@@ -275,8 +275,6 @@ func TestNewWSClient(t *testing.T) {
beforeWritePkgNum := atomic.LoadUint32(&conn.writePkgNum) beforeWritePkgNum := atomic.LoadUint32(&conn.writePkgNum)
err = ss.WriteBytes([]byte("hello")) err = ss.WriteBytes([]byte("hello"))
assert.Equal(t, beforeWritePkgNum+1, atomic.LoadUint32(&conn.writePkgNum)) assert.Equal(t, beforeWritePkgNum+1, atomic.LoadUint32(&conn.writePkgNum))
err = ss.WriteBytesArray([]byte("hello"), []byte("hello"))
assert.Equal(t, beforeWritePkgNum+3, atomic.LoadUint32(&conn.writePkgNum))
err = conn.writePing() err = conn.writePing()
assert.Nil(t, err) assert.Nil(t, err)
......
...@@ -284,7 +284,7 @@ func (t *gettyTCPConn) send(pkg interface{}) (int, error) { ...@@ -284,7 +284,7 @@ func (t *gettyTCPConn) send(pkg interface{}) (int, error) {
atomic.AddUint32(&t.writeBytes, (uint32)(length)) atomic.AddUint32(&t.writeBytes, (uint32)(length))
atomic.AddUint32(&t.writePkgNum, (uint32)(len(buffers))) atomic.AddUint32(&t.writePkgNum, (uint32)(len(buffers)))
} }
log.Debug("localAddr: %s, remoteAddr:%s, now:%s, length:%d, err:%s", log.Debugf("localAddr: %s, remoteAddr:%s, now:%s, length:%d, err:%s",
t.conn.LocalAddr(), t.conn.RemoteAddr(), currentTime, length, err) t.conn.LocalAddr(), t.conn.RemoteAddr(), currentTime, length, err)
return int(length), perrors.WithStack(err) return int(length), perrors.WithStack(err)
} }
...@@ -293,13 +293,12 @@ func (t *gettyTCPConn) send(pkg interface{}) (int, error) { ...@@ -293,13 +293,12 @@ func (t *gettyTCPConn) send(pkg interface{}) (int, error) {
if length, err = t.writer.Write(p); err == nil { if length, err = t.writer.Write(p); err == nil {
atomic.AddUint32(&t.writeBytes, (uint32)(len(p))) atomic.AddUint32(&t.writeBytes, (uint32)(len(p)))
} }
log.Debug("localAddr: %s, remoteAddr:%s, now:%s, length:%d, err:%s", log.Debugf("localAddr: %s, remoteAddr:%s, now:%s, length:%d, err:%s",
t.conn.LocalAddr(), t.conn.RemoteAddr(), currentTime, length, err) t.conn.LocalAddr(), t.conn.RemoteAddr(), currentTime, length, err)
return length, perrors.WithStack(err) return length, perrors.WithStack(err)
} }
return 0, perrors.Errorf("illegal @pkg{%#v} type", pkg) return 0, perrors.Errorf("illegal @pkg{%#v} type", pkg)
//return length, err
} }
// close tcp connection // close tcp connection
......
...@@ -301,7 +301,7 @@ func (s *session) SetWQLen(writeQLen int) { ...@@ -301,7 +301,7 @@ func (s *session) SetWQLen(writeQLen int) {
s.lock.Lock() s.lock.Lock()
defer s.lock.Unlock() defer s.lock.Unlock()
s.wQ = make(chan interface{}, writeQLen) s.wQ = make(chan interface{}, writeQLen)
log.Debug("%s, [session.SetWQLen] wQ{len:%d, cap:%d}", s.Stat(), len(s.wQ), cap(s.wQ)) log.Debugf("%s, [session.SetWQLen] wQ{len:%d, cap:%d}", s.Stat(), len(s.wQ), cap(s.wQ))
} }
// set maximum wait time when session got error or got exit signal // set maximum wait time when session got error or got exit signal
...@@ -404,7 +404,7 @@ func (s *session) WritePkg(pkg interface{}, timeout time.Duration) error { ...@@ -404,7 +404,7 @@ func (s *session) WritePkg(pkg interface{}, timeout time.Duration) error {
} }
_, err = s.Connection.send(pkg) _, err = s.Connection.send(pkg)
if err != nil { if err != nil {
log.Warn("%s, [session.WritePkg] @s.Connection.Write(pkg:%#v) = err:%v", s.Stat(), pkg, err) log.Warnf("%s, [session.WritePkg] @s.Connection.Write(pkg:%#v) = err:%v", s.Stat(), pkg, err)
return perrors.WithStack(err) return perrors.WithStack(err)
} }
s.incWritePkgNum() s.incWritePkgNum()
...@@ -449,46 +449,10 @@ func (s *session) WriteBytesArray(pkgs ...[]byte) error { ...@@ -449,46 +449,10 @@ func (s *session) WriteBytesArray(pkgs ...[]byte) error {
return s.WriteBytes(pkgs[0]) return s.WriteBytes(pkgs[0])
} }
// reduce syscall and memcopy for multiple packages // TODO Currently, only TCP is supported.
if _, ok := s.Connection.(*gettyTCPConn); ok {
if _, err := s.Connection.send(pkgs); err != nil { if _, err := s.Connection.send(pkgs); err != nil {
return perrors.Wrapf(err, "s.Connection.Write(pkgs num:%d)", len(pkgs)) return perrors.Wrapf(err, "s.Connection.Write(pkgs num:%d)", len(pkgs))
} }
}
// get len
var (
l int
err error
length int
arrp *[]byte
arr []byte
)
length = 0
for i := 0; i < len(pkgs); i++ {
length += len(pkgs[i])
}
// merge the pkgs
// arr = make([]byte, length)
arrp = gxbytes.GetBytes(length)
defer gxbytes.PutBytes(arrp)
arr = *arrp
l = 0
for i := 0; i < len(pkgs); i++ {
copy(arr[l:], pkgs[i])
l += len(pkgs[i])
}
if err = s.WriteBytes(arr); err != nil {
return perrors.WithStack(err)
}
num := len(pkgs) - 1
for i := 0; i < num; i++ {
s.incWritePkgNum()
}
return nil return nil
} }
...@@ -574,7 +538,7 @@ LOOP: ...@@ -574,7 +538,7 @@ LOOP:
continue continue
} }
if !flag { if !flag {
log.Warn("[session.handleLoop] drop write out package %#v", outPkg) log.Warnf("[session.handleLoop] drop write out package %#v", outPkg)
continue continue
} }
...@@ -965,6 +929,6 @@ func (s *session) gc() { ...@@ -965,6 +929,6 @@ func (s *session) gc() {
// or (session)handleLoop automatically. It's thread safe. // or (session)handleLoop automatically. It's thread safe.
func (s *session) Close() { func (s *session) Close() {
s.stop() s.stop()
log.Info("%s closed now. its current gr num is %d", log.Infof("%s closed now. its current gr num is %d",
s.sessionToken(), atomic.LoadInt32(&(s.grNum))) s.sessionToken(), atomic.LoadInt32(&(s.grNum)))
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment