Commit 805e1718 authored by AlexStocks's avatar AlexStocks Committed by watermelo

Imp: recover WriteBytesArray

parent d5d1b9ec
......@@ -153,7 +153,7 @@ func (c *client) dialTCP() Session {
return newTCPSession(conn, c)
}
log.Infof("net.DialTimeout(addr:%s, timeout:%v) = error:%+v", c.addr, connectTimeout, err)
log.Infof("net.DialTimeout(addr:%s, timeout:%v) = error:%+v", c.addr, connectTimeout, perrors.WithStack(err))
<-wheel.After(connectInterval)
}
}
......@@ -185,7 +185,7 @@ func (c *client) dialUDP() Session {
err = errSelfConnect
}
if err != nil {
log.Warnf("net.DialTimeout(addr:%s, timeout:%v) = error:%+v", c.addr, err)
log.Warnf("net.DialTimeout(addr:%s, timeout:%v) = error:%+v", c.addr, perrors.WithStack(err))
<-wheel.After(connectInterval)
continue
}
......@@ -194,7 +194,7 @@ func (c *client) dialUDP() Session {
conn.SetWriteDeadline(time.Now().Add(1e9))
if length, err = conn.Write(connectPingPackage[:]); err != nil {
conn.Close()
log.Warnf("conn.Write(%s) = {length:%d, err:%+v}", string(connectPingPackage), length, err)
log.Warnf("conn.Write(%s) = {length:%d, err:%+v}", string(connectPingPackage), length, perrors.WithStack(err))
<-wheel.After(connectInterval)
continue
}
......@@ -204,7 +204,7 @@ func (c *client) dialUDP() Session {
err = nil
}
if err != nil {
log.Infof("conn{%#v}.Read() = {length:%d, err:%+v}", conn, length, err)
log.Infof("conn{%#v}.Read() = {length:%d, err:%+v}", conn, length, perrors.WithStack(err))
conn.Close()
<-wheel.After(connectInterval)
continue
......@@ -229,7 +229,7 @@ func (c *client) dialWS() Session {
return nil
}
conn, _, err = dialer.Dial(c.addr, nil)
log.Infof("websocket.dialer.Dial(addr:%s) = error:%+v", c.addr, err)
log.Infof("websocket.dialer.Dial(addr:%s) = error:%+v", c.addr, perrors.WithStack(err))
if err == nil && gxnet.IsSameAddr(conn.RemoteAddr(), conn.LocalAddr()) {
conn.Close()
err = errSelfConnect
......@@ -243,7 +243,7 @@ func (c *client) dialWS() Session {
return ss
}
log.Infof("websocket.dialer.Dial(addr:%s) = error:%+v", c.addr, err)
log.Infof("websocket.dialer.Dial(addr:%s) = error:%+v", c.addr, perrors.WithStack(err))
<-wheel.After(connectInterval)
}
}
......@@ -269,7 +269,7 @@ func (c *client) dialWSS() Session {
if c.cert != "" {
certPEMBlock, err := ioutil.ReadFile(c.cert)
if err != nil {
panic(fmt.Sprintf("ioutil.ReadFile(cert:%s) = error:%+v", c.cert, err))
panic(fmt.Sprintf("ioutil.ReadFile(cert:%s) = error:%+v", c.cert, perrors.WithStack(err)))
}
var cert tls.Certificate
......@@ -291,7 +291,7 @@ func (c *client) dialWSS() Session {
for _, c := range config.Certificates {
roots, err = x509.ParseCertificates(c.Certificate[len(c.Certificate)-1])
if err != nil {
panic(fmt.Sprintf("error parsing server's root cert: %+v\n", err))
panic(fmt.Sprintf("error parsing server's root cert: %+v\n", perrors.WithStack(err)))
}
for _, root = range roots {
certPool.AddCert(root)
......@@ -321,7 +321,7 @@ func (c *client) dialWSS() Session {
return ss
}
log.Infof("websocket.dialer.Dial(addr:%s) = error:%+v", c.addr, err)
log.Infof("websocket.dialer.Dial(addr:%s) = error:%+v", c.addr, perrors.WithStack(err))
<-wheel.After(connectInterval)
}
}
......@@ -387,7 +387,7 @@ func (c *client) connect() {
}
}
// there are two methods to keep connection pool. the first approch is like
// there are two methods to keep connection pool. the first approach is like
// redigo's lazy connection pool(https://github.com/gomodule/redigo/blob/master/redis/pool.go:),
// in which you should apply testOnBorrow to check alive of the connection.
// the second way is as follows. @RunEventLoop detects the aliveness of the connection
......@@ -405,13 +405,11 @@ func (c *client) RunEventLoop(newSession NewSessionCallback) {
func (c *client) reConnect() {
var num, max, times, interval int
// c.Lock()
max = c.number
interval = c.reconnectInterval
if interval == 0 {
interval = reconnectInterval
}
// c.Unlock()
for {
if c.IsClosed() {
log.Warnf("client{peer:%s} goroutine exit now.", c.addr)
......
......@@ -276,6 +276,8 @@ func TestNewWSClient(t *testing.T) {
beforeWritePkgNum := atomic.LoadUint32(&conn.writePkgNum)
err = ss.WriteBytes([]byte("hello"))
assert.Equal(t, beforeWritePkgNum+1, atomic.LoadUint32(&conn.writePkgNum))
err = ss.WriteBytesArray([]byte("hello"), []byte("hello"))
assert.Equal(t, beforeWritePkgNum+3, atomic.LoadUint32(&conn.writePkgNum))
err = conn.writePing()
assert.Nil(t, err)
......
......@@ -224,7 +224,7 @@ func (s *server) accept(newSession NewSessionCallback) (Session, error) {
}
if gxnet.IsSameAddr(conn.RemoteAddr(), conn.LocalAddr()) {
log.Warnf("conn.localAddr{%s} == conn.RemoteAddr", conn.LocalAddr().String(), conn.RemoteAddr().String())
return nil, errSelfConnect
return nil, perrors.WithStack(errSelfConnect)
}
ss := newTCPSession(conn, s)
......@@ -248,7 +248,7 @@ func (s *server) runTcpEventLoop(newSession NewSessionCallback) {
)
for {
if s.IsClosed() {
log.Warnf("server{%s} stop acceptting client connect request.", s.addr)
log.Warnf("server{%s} stop accepting client connect request.", s.addr)
return
}
if delay != 0 {
......@@ -368,7 +368,7 @@ func (s *server) runWSEventLoop(newSession NewSessionCallback) {
s.lock.Unlock()
err = server.Serve(s.streamListener)
if err != nil {
log.Errorf("http.server.Serve(addr{%s}) = err{%+v}", s.addr, perrors.WithStack(err))
log.Errorf("http.server.Serve(addr{%s}) = err:%+v", s.addr, perrors.WithStack(err))
}
}()
}
......@@ -390,7 +390,7 @@ func (s *server) runWSSEventLoop(newSession NewSessionCallback) {
defer s.wg.Done()
if certificate, err = tls.LoadX509KeyPair(s.cert, s.privateKey); err != nil {
panic(fmt.Sprintf("tls.LoadX509KeyPair(cert{%s}, privateKey{%s}) = err{%+v}",
panic(fmt.Sprintf("tls.LoadX509KeyPair(cert{%s}, privateKey{%s}) = err:%+v",
s.cert, s.privateKey, perrors.WithStack(err)))
return
}
......@@ -404,7 +404,7 @@ func (s *server) runWSSEventLoop(newSession NewSessionCallback) {
if s.caCert != "" {
certPem, err = ioutil.ReadFile(s.caCert)
if err != nil {
panic(fmt.Errorf("ioutil.ReadFile(certFile{%s}) = err{%+v}", s.caCert, perrors.WithStack(err)))
panic(fmt.Errorf("ioutil.ReadFile(certFile{%s}) = err:%+v", s.caCert, perrors.WithStack(err)))
}
certPool = x509.NewCertPool()
if ok := certPool.AppendCertsFromPEM(certPem); !ok {
......@@ -429,7 +429,7 @@ func (s *server) runWSSEventLoop(newSession NewSessionCallback) {
s.lock.Unlock()
err = server.Serve(tls.NewListener(s.streamListener, config))
if err != nil {
log.Errorf("http.server.Serve(addr{%s}) = err{%+v}", s.addr, perrors.WithStack(err))
log.Errorf("http.server.Serve(addr{%s}) = err:%+v", s.addr, perrors.WithStack(err))
panic(err)
}
}()
......
......@@ -12,6 +12,7 @@ package getty
import (
"bytes"
"fmt"
jerrors "github.com/juju/errors"
"io"
"net"
"runtime"
......@@ -387,7 +388,7 @@ func (s *session) WritePkg(pkg interface{}, timeout time.Duration) error {
if timeout <= 0 {
pkgBytes, err := s.writer.Write(s, pkg)
if err != nil {
log.Warnf("%s, [session.WritePkg] session.writer.Write(@pkg:%#v) = error:%v", s.Stat(), pkg, err)
log.Warnf("%s, [session.WritePkg] session.writer.Write(@pkg:%#v) = error:%+v", s.Stat(), pkg, err)
return perrors.WithStack(err)
}
var udpCtxPtr *UDPContext
......@@ -404,7 +405,7 @@ func (s *session) WritePkg(pkg interface{}, timeout time.Duration) error {
}
_, err = s.Connection.send(pkg)
if err != nil {
log.Warnf("%s, [session.WritePkg] @s.Connection.Write(pkg:%#v) = err:%v", s.Stat(), pkg, err)
log.Warnf("%s, [session.WritePkg] @s.Connection.Write(pkg:%#v) = err:%+v", s.Stat(), pkg, err)
return perrors.WithStack(err)
}
return nil
......@@ -445,10 +446,47 @@ func (s *session) WriteBytesArray(pkgs ...[]byte) error {
return s.WriteBytes(pkgs[0])
}
// TODO Currently, only TCP is supported.
if _, err := s.Connection.send(pkgs); err != nil {
return perrors.Wrapf(err, "s.Connection.Write(pkgs num:%d)", len(pkgs))
// reduce syscall and memcopy for multiple packages
if _, ok := s.Connection.(*gettyTCPConn); ok {
if _, err := s.Connection.send(pkgs); err != nil {
return perrors.Wrapf(err, "s.Connection.Write(pkgs num:%d)", len(pkgs))
}
}
// get len
var (
l int
err error
length int
arrp *[]byte
arr []byte
)
length = 0
for i := 0; i < len(pkgs); i++ {
length += len(pkgs[i])
}
// merge the pkgs
//arr = make([]byte, length)
arrp = gxbytes.AcquireBytes(length)
defer gxbytes.ReleaseBytes(arrp)
arr = *arrp
l = 0
for i := 0; i < len(pkgs); i++ {
copy(arr[l:], pkgs[i])
l += len(pkgs[i])
}
if err = s.WriteBytes(arr); err != nil {
return jerrors.Trace(err)
}
num := len(pkgs) - 1
for i := 0; i < num; i++ {
s.incWritePkgNum()
}
return nil
}
......@@ -554,7 +592,7 @@ LOOP:
for idx := 0; idx < maxIovecNum; idx++ {
pkgBytes, err = s.writer.Write(s, outPkg)
if err != nil {
log.Errorf("%s, [session.handleLoop] = error:%+v", s.sessionToken(), err)
log.Errorf("%s, [session.handleLoop] = error:%+v", s.sessionToken(), jerrors.ErrorStack(err))
s.stop()
// break LOOP
flag = false
......@@ -582,7 +620,7 @@ LOOP:
err = s.WriteBytesArray(iovec[:]...)
if err != nil {
log.Errorf("%s, [session.handleLoop]s.WriteBytesArray(iovec len:%d) = error:%+v",
s.sessionToken(), len(iovec), err)
s.sessionToken(), len(iovec), jerrors.ErrorStack(err))
s.stop()
// break LOOP
flag = false
......@@ -593,7 +631,7 @@ LOOP:
if wsFlag {
err := wsConn.writePing()
if err != nil {
log.Warnf("wsConn.writePing() = error{%s}", err)
log.Warnf("wsConn.writePing() = error:%+v", perrors.WithStack(err))
}
}
s.listener.OnCron(s)
......@@ -632,7 +670,7 @@ func (s *session) handlePackage() {
log.Infof("%s, [session.handlePackage] gr will exit now, left gr num %d", s.sessionToken(), grNum)
s.stop()
if err != nil {
log.Errorf("%s, [session.handlePackage] error:%+v", s.sessionToken(), err)
log.Errorf("%s, [session.handlePackage] error:%+v", s.sessionToken(), perrors.WithStack(err))
if s != nil || s.listener != nil {
s.listener.OnError(s, err)
}
......@@ -811,7 +849,7 @@ func (s *session) handleUDPPackage() error {
err = perrors.Errorf("Message Too Long, bufLen %d, session max message len %d", bufLen, s.maxMsgLen)
}
if err != nil {
log.Warnf("%s, [session.handleUDPPackage] = len{%d}, error:%+v",
log.Warnf("%s, [session.handleUDPPackage] = len:%d, error:%+v",
s.sessionToken(), pkgLen, perrors.WithStack(err))
continue
}
......@@ -849,7 +887,7 @@ func (s *session) handleWSPackage() error {
continue
}
if err != nil {
log.Warnf("%s, [session.handleWSPackage] = error{%+s}",
log.Warnf("%s, [session.handleWSPackage] = error:%+v",
s.sessionToken(), perrors.WithStack(err))
return perrors.WithStack(err)
}
......@@ -860,7 +898,7 @@ func (s *session) handleWSPackage() error {
err = perrors.Errorf("Message Too Long, length %d, session max message len %d", length, s.maxMsgLen)
}
if err != nil {
log.Warnf("%s, [session.handleWSPackage] = len{%d}, error:%+v",
log.Warnf("%s, [session.handleWSPackage] = len:%d, error:%+v",
s.sessionToken(), length, perrors.WithStack(err))
continue
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment