Commit 09a7c142 authored by AlexStocks's avatar AlexStocks

use juju/errors instead errors everywhere

parent b473f6f2
......@@ -26,6 +26,7 @@ import (
"github.com/AlexStocks/goext/sync"
log "github.com/AlexStocks/log4go"
"github.com/gorilla/websocket"
jerrors "github.com/juju/errors"
)
const (
......@@ -138,7 +139,7 @@ func (c *client) dialTCP() Session {
return newTCPSession(conn, c)
}
log.Info("net.DialTimeout(addr:%s, timeout:%v) = error{%s}", c.addr, err)
log.Info("net.DialTimeout(addr:%s, timeout:%v) = error{%s}", c.addr, jerrors.ErrorStack(err))
// time.Sleep(connInterval)
<-wheel.After(connInterval)
}
......@@ -167,7 +168,7 @@ func (c *client) dialUDP() Session {
err = errSelfConnect
}
if err != nil {
log.Warn("net.DialTimeout(addr:%s, timeout:%v) = error{%s}", c.addr, err)
log.Warn("net.DialTimeout(addr:%s, timeout:%v) = error{%s}", c.addr, jerrors.ErrorStack(err))
// time.Sleep(connInterval)
<-wheel.After(connInterval)
continue
......@@ -177,7 +178,7 @@ func (c *client) dialUDP() Session {
conn.SetWriteDeadline(wheel.Now().Add(1e9))
if length, err = conn.Write(connectPingPackage[:]); err != nil {
conn.Close()
log.Warn("conn.Write(%s) = {length:%d, err:%s}", string(connectPingPackage), length, err)
log.Warn("conn.Write(%s) = {length:%d, err:%s}", string(connectPingPackage), length, jerrors.ErrorStack(err))
// time.Sleep(connInterval)
<-wheel.After(connInterval)
continue
......@@ -188,7 +189,7 @@ func (c *client) dialUDP() Session {
err = nil
}
if err != nil {
log.Info("conn{%#v}.Read() = {length:%d, err:%s}", conn, length, err)
log.Info("conn{%#v}.Read() = {length:%d, err:%s}", conn, length, jerrors.ErrorStack(err))
conn.Close()
// time.Sleep(connInterval)
<-wheel.After(connInterval)
......@@ -214,7 +215,7 @@ func (c *client) dialWS() Session {
return nil
}
conn, _, err = dialer.Dial(c.addr, nil)
log.Info("websocket.dialer.Dial(addr:%s) = error:%s", c.addr, err)
log.Info("websocket.dialer.Dial(addr:%s) = error:%s", c.addr, jerrors.ErrorStack(err))
if err == nil && gxnet.IsSameAddr(conn.RemoteAddr(), conn.LocalAddr()) {
conn.Close()
err = errSelfConnect
......@@ -228,7 +229,7 @@ func (c *client) dialWS() Session {
return ss
}
log.Info("websocket.dialer.Dial(addr:%s) = error:%s", c.addr, err)
log.Info("websocket.dialer.Dial(addr:%s) = error:%s", c.addr, jerrors.ErrorStack(err))
// time.Sleep(connInterval)
<-wheel.After(connInterval)
}
......@@ -255,7 +256,7 @@ func (c *client) dialWSS() Session {
if c.cert != "" {
certPEMBlock, err := ioutil.ReadFile(c.cert)
if err != nil {
panic(fmt.Sprintf("ioutil.ReadFile(cert:%s) = error{%s}", c.cert, err))
panic(fmt.Sprintf("ioutil.ReadFile(cert:%s) = error{%s}", c.cert, jerrors.ErrorStack(err)))
}
var cert tls.Certificate
......@@ -277,7 +278,7 @@ func (c *client) dialWSS() Session {
for _, c := range config.Certificates {
roots, err = x509.ParseCertificates(c.Certificate[len(c.Certificate)-1])
if err != nil {
panic(fmt.Sprintf("error parsing server's root cert: %s\n", err))
panic(fmt.Sprintf("error parsing server's root cert: %s\n", jerrors.ErrorStack(err)))
}
for _, root = range roots {
certPool.AddCert(root)
......@@ -307,7 +308,7 @@ func (c *client) dialWSS() Session {
return ss
}
log.Info("websocket.dialer.Dial(addr:%s) = error{%s}", c.addr, err)
log.Info("websocket.dialer.Dial(addr:%s) = error{%s}", c.addr, jerrors.ErrorStack(err))
// time.Sleep(connInterval)
<-wheel.After(connInterval)
}
......
......@@ -20,10 +20,10 @@ import (
)
import (
"github.com/AlexStocks/goext/log"
log "github.com/AlexStocks/log4go"
"github.com/golang/snappy"
"github.com/gorilla/websocket"
jerrors "github.com/juju/errors"
"golang.org/x/net/ipv4"
"golang.org/x/net/ipv6"
)
......@@ -190,10 +190,10 @@ func (t *writeFlusher) Write(p []byte) (int, error) {
n, err = t.flusher.Write(p)
if err != nil {
return n, err
return n, jerrors.Trace(err)
}
if err := t.flusher.Flush(); err != nil {
return 0, err
return 0, jerrors.Trace(err)
}
return n, nil
......@@ -240,7 +240,7 @@ func (t *gettyTCPConn) read(p []byte) (int, error) {
currentTime = wheel.Now()
if currentTime.Sub(t.rLastDeadline) > (t.rTimeout >> 2) {
if err = t.conn.SetReadDeadline(currentTime.Add(t.rTimeout)); err != nil {
return 0, err
return 0, jerrors.Trace(err)
}
t.rLastDeadline = currentTime
}
......@@ -249,7 +249,7 @@ func (t *gettyTCPConn) read(p []byte) (int, error) {
length, err = t.reader.Read(p)
log.Debug("now:%s, length:%d, err:%s", currentTime, length, err)
atomic.AddUint32(&t.readBytes, uint32(length))
return length, err
return length, jerrors.Trace(err)
}
// tcp connection write
......@@ -263,7 +263,7 @@ func (t *gettyTCPConn) Write(pkg interface{}) (int, error) {
)
if p, ok = pkg.([]byte); !ok {
return 0, fmt.Errorf("illegal @pkg{%#v} type", pkg)
return 0, jerrors.Errorf("illegal @pkg{%#v} type", pkg)
}
if t.compress == CompressNone && t.wTimeout > 0 {
// Optimization: update write deadline only if more than 25%
......@@ -272,7 +272,7 @@ func (t *gettyTCPConn) Write(pkg interface{}) (int, error) {
currentTime = wheel.Now()
if currentTime.Sub(t.wLastDeadline) > (t.wTimeout >> 2) {
if err = t.conn.SetWriteDeadline(currentTime.Add(t.wTimeout)); err != nil {
return 0, err
return 0, jerrors.Trace(err)
}
t.wLastDeadline = currentTime
}
......@@ -282,7 +282,7 @@ func (t *gettyTCPConn) Write(pkg interface{}) (int, error) {
atomic.AddUint32(&t.writeBytes, (uint32)(len(p)))
}
log.Debug("now:%s, length:%d, err:%s", currentTime, length, err)
return length, err
return length, jerrors.Trace(err)
}
// close tcp connection
......@@ -294,7 +294,7 @@ func (t *gettyTCPConn) close(waitSec int) {
if t.conn != nil {
if writer, ok := t.writer.(*snappy.Writer); ok {
if err := writer.Close(); err != nil {
log.Error("snappy.Writer.Close() = error{%s}", err)
log.Error("snappy.Writer.Close() = error{%s}", jerrors.ErrorStack(err))
}
}
t.conn.(*net.TCPConn).SetLinger(waitSec)
......@@ -328,7 +328,7 @@ func setUDPSocketOptions(conn *net.UDPConn) error {
err6 := ipv6.NewPacketConn(conn).SetControlMessage(ipv6.FlagDst|ipv6.FlagInterface, true)
err4 := ipv4.NewPacketConn(conn).SetControlMessage(ipv4.FlagDst|ipv4.FlagInterface, true)
if err6 != nil && err4 != nil {
return err4
return jerrors.Trace(err4)
}
return nil
}
......@@ -388,7 +388,7 @@ func (u *gettyUDPConn) read(p []byte) (int, *net.UDPAddr, error) {
currentTime = wheel.Now()
if currentTime.Sub(u.rLastDeadline) > (u.rTimeout >> 2) {
if err = u.conn.SetReadDeadline(currentTime.Add(u.rTimeout)); err != nil {
return 0, nil, err
return 0, nil, jerrors.Trace(err)
}
u.rLastDeadline = currentTime
}
......@@ -400,7 +400,7 @@ func (u *gettyUDPConn) read(p []byte) (int, *net.UDPAddr, error) {
atomic.AddUint32(&u.readBytes, uint32(length))
}
return length, addr, err
return length, addr, jerrors.Trace(err)
}
// write udp packet, @ctx should be of type UDPContext
......@@ -416,10 +416,10 @@ func (u *gettyUDPConn) Write(udpCtx interface{}) (int, error) {
)
if ctx, ok = udpCtx.(UDPContext); !ok {
return 0, fmt.Errorf("illegal @udpCtx{%s} type, @udpCtx type:%T", udpCtx, udpCtx)
return 0, jerrors.Errorf("illegal @udpCtx{%s} type, @udpCtx type:%T", udpCtx, udpCtx)
}
if buf, ok = ctx.Pkg.([]byte); !ok {
return 0, fmt.Errorf("illegal @udpCtx.Pkg{%#v} type", udpCtx)
return 0, jerrors.Errorf("illegal @udpCtx.Pkg{%#v} type", udpCtx)
}
if u.ss.EndPoint().EndPointType() == UDP_ENDPOINT {
peerAddr = ctx.PeerAddr
......@@ -435,7 +435,7 @@ func (u *gettyUDPConn) Write(udpCtx interface{}) (int, error) {
currentTime = wheel.Now()
if currentTime.Sub(u.wLastDeadline) > (u.wTimeout >> 2) {
if err = u.conn.SetWriteDeadline(currentTime.Add(u.wTimeout)); err != nil {
return 0, err
return 0, jerrors.Trace(err)
}
u.wLastDeadline = currentTime
}
......@@ -443,11 +443,10 @@ func (u *gettyUDPConn) Write(udpCtx interface{}) (int, error) {
if length, _, err = u.conn.WriteMsgUDP(buf, nil, peerAddr); err == nil {
atomic.AddUint32(&u.writeBytes, (uint32)(len(buf)))
gxlog.CError("write count:%d, write:%d", len(buf), u.writeBytes)
}
log.Debug("WriteMsgUDP(peerAddr:%s) = {length:%d, error:%s}", peerAddr, length, err)
return length, err
return length, jerrors.Trace(err)
}
// close udp connection
......@@ -523,7 +522,7 @@ func (w *gettyWSConn) handlePing(message string) error {
w.UpdateActive()
}
return err
return jerrors.Trace(err)
}
func (w *gettyWSConn) handlePong(string) error {
......@@ -544,7 +543,7 @@ func (w *gettyWSConn) read() ([]byte, error) {
}
}
return b, e
return b, jerrors.Trace(e)
}
func (w *gettyWSConn) updateWriteDeadline() error {
......@@ -560,7 +559,7 @@ func (w *gettyWSConn) updateWriteDeadline() error {
currentTime = wheel.Now()
if currentTime.Sub(w.wLastDeadline) > (w.wTimeout >> 2) {
if err = w.conn.SetWriteDeadline(currentTime.Add(w.wTimeout)); err != nil {
return err
return jerrors.Trace(err)
}
w.wLastDeadline = currentTime
}
......@@ -578,24 +577,24 @@ func (w *gettyWSConn) Write(pkg interface{}) (int, error) {
)
if p, ok = pkg.([]byte); !ok {
return 0, fmt.Errorf("illegal @pkg{%#v} type", pkg)
return 0, jerrors.Errorf("illegal @pkg{%#v} type", pkg)
}
w.updateWriteDeadline()
if err = w.conn.WriteMessage(websocket.BinaryMessage, p); err == nil {
atomic.AddUint32(&w.writeBytes, (uint32)(len(p)))
}
return len(p), err
return len(p), jerrors.Trace(err)
}
func (w *gettyWSConn) writePing() error {
w.updateWriteDeadline()
return w.conn.WriteMessage(websocket.PingMessage, []byte{})
return jerrors.Trace(w.conn.WriteMessage(websocket.PingMessage, []byte{}))
}
func (w *gettyWSConn) writePong(message []byte) error {
w.updateWriteDeadline()
return w.conn.WriteMessage(websocket.PongMessage, message)
return jerrors.Trace(w.conn.WriteMessage(websocket.PongMessage, message))
}
// close websocket connection
......
......@@ -27,11 +27,11 @@ import (
"github.com/AlexStocks/goext/time"
log "github.com/AlexStocks/log4go"
"github.com/gorilla/websocket"
"github.com/juju/errors"
jerrors "github.com/juju/errors"
)
var (
errSelfConnect = errors.New("connect self!")
errSelfConnect = jerrors.New("connect self!")
serverFastFailTimeout = gxtime.TimeSecondDuration(1)
)
......@@ -158,7 +158,7 @@ func (s *server) listenTCP() error {
streamListener, err = net.Listen("tcp", s.addr)
if err != nil {
return errors.Annotatef(err, "net.Listen(tcp, addr:%s))", s.addr)
return jerrors.Annotatef(err, "net.Listen(tcp, addr:%s))", s.addr)
}
s.streamListener = streamListener
......@@ -175,11 +175,11 @@ func (s *server) listenUDP() error {
localAddr, err = net.ResolveUDPAddr("udp", s.addr)
if err != nil {
return errors.Annotatef(err, "net.ResolveUDPAddr(udp, addr:%s)", s.addr)
return jerrors.Annotatef(err, "net.ResolveUDPAddr(udp, addr:%s)", s.addr)
}
pktListener, err = net.ListenUDP("udp", localAddr)
if err != nil {
return errors.Annotatef(err, "net.ListenUDP((udp, localAddr:%#v)", localAddr)
return jerrors.Annotatef(err, "net.ListenUDP((udp, localAddr:%#v)", localAddr)
}
// if err = setUDPSocketOptions(pktListener); err != nil {
// return errors.Wrapf(err, "setUDPSocketOptions(pktListener:%#v)", pktListener)
......@@ -194,9 +194,9 @@ func (s *server) listenUDP() error {
func (s *server) listen() error {
switch s.endPointType {
case TCP_SERVER, WS_SERVER, WSS_SERVER:
return s.listenTCP()
return jerrors.Trace(s.listenTCP())
case UDP_ENDPOINT:
return s.listenUDP()
return jerrors.Trace(s.listenUDP())
}
return nil
......@@ -205,7 +205,7 @@ func (s *server) listen() error {
func (s *server) accept(newSession NewSessionCallback) (Session, error) {
conn, err := s.streamListener.Accept()
if err != nil {
return nil, err
return nil, jerrors.Trace(err)
}
if gxnet.IsSameAddr(conn.RemoteAddr(), conn.LocalAddr()) {
log.Warn("conn.localAddr{%s} == conn.RemoteAddr", conn.LocalAddr().String(), conn.RemoteAddr().String())
......@@ -216,7 +216,7 @@ func (s *server) accept(newSession NewSessionCallback) (Session, error) {
err = newSession(ss)
if err != nil {
conn.Close()
return nil, err
return nil, jerrors.Trace(err)
}
return ss, nil
......@@ -253,7 +253,7 @@ func (s *server) runTcpEventLoop(newSession NewSessionCallback) {
}
continue
}
log.Warn("server{%s}.Accept() = err {%#v}", s.addr, err)
log.Warn("server{%s}.Accept() = err {%#v}", s.addr, jerrors.ErrorStack(err))
continue
}
delay = 0
......@@ -356,7 +356,7 @@ func (s *server) runWSEventLoop(newSession NewSessionCallback) {
s.lock.Unlock()
err = server.Serve(s.streamListener)
if err != nil {
log.Error("http.server.Serve(addr{%s}) = err{%s}", s.addr, err)
log.Error("http.server.Serve(addr{%s}) = err{%s}", s.addr, jerrors.ErrorStack(err))
// panic(err)
}
}()
......@@ -379,7 +379,8 @@ func (s *server) runWSSEventLoop(newSession NewSessionCallback) {
defer s.wg.Done()
if certificate, err = tls.LoadX509KeyPair(s.cert, s.privateKey); err != nil {
panic(fmt.Sprintf("tls.LoadX509KeyPair(cert{%s}, privateKey{%s}) = err{%s}", s.cert, s.privateKey, err))
panic(fmt.Sprintf("tls.LoadX509KeyPair(cert{%s}, privateKey{%s}) = err{%s}",
s.cert, s.privateKey, jerrors.ErrorStack(err)))
return
}
config = &tls.Config{
......@@ -392,7 +393,7 @@ func (s *server) runWSSEventLoop(newSession NewSessionCallback) {
if s.caCert != "" {
certPem, err = ioutil.ReadFile(s.caCert)
if err != nil {
panic(fmt.Errorf("ioutil.ReadFile(certFile{%s}) = err{%s}", s.caCert, err))
panic(fmt.Errorf("ioutil.ReadFile(certFile{%s}) = err{%s}", s.caCert, jerrors.ErrorStack(err)))
}
certPool = x509.NewCertPool()
if ok := certPool.AppendCertsFromPEM(certPem); !ok {
......@@ -417,7 +418,7 @@ func (s *server) runWSSEventLoop(newSession NewSessionCallback) {
s.lock.Unlock()
err = server.Serve(tls.NewListener(s.streamListener, config))
if err != nil {
log.Error("http.server.Serve(addr{%s}) = err{%s}", s.addr, err)
log.Error("http.server.Serve(addr{%s}) = err{%s}", s.addr, jerrors.ErrorStack(err))
panic(err)
}
}()
......@@ -427,7 +428,7 @@ func (s *server) runWSSEventLoop(newSession NewSessionCallback) {
// @newSession: new connection callback
func (s *server) RunEventLoop(newSession NewSessionCallback) {
if err := s.listen(); err != nil {
panic(fmt.Errorf("server.listen() = error:%s", err))
panic(fmt.Errorf("server.listen() = error:%s", jerrors.ErrorStack(err)))
}
switch s.endPointType {
......
......@@ -26,7 +26,7 @@ import (
"github.com/AlexStocks/goext/time"
log "github.com/AlexStocks/log4go"
"github.com/gorilla/websocket"
"github.com/juju/errors"
jerrors "github.com/juju/errors"
)
const (
......@@ -300,7 +300,8 @@ func (s *session) RemoveAttribute(key interface{}) {
}
func (s *session) sessionToken() string {
return fmt.Sprintf("{%s:%s:%d:%s<->%s}", s.name, s.EndPoint().EndPointType(), s.ID(), s.LocalAddr(), s.RemoteAddr())
return fmt.Sprintf("{%s:%s:%d:%s<->%s}",
s.name, s.EndPoint().EndPointType(), s.ID(), s.LocalAddr(), s.RemoteAddr())
}
func (s *session) WritePkg(pkg interface{}, timeout time.Duration) error {
......@@ -346,7 +347,7 @@ func (s *session) WriteBytes(pkg []byte) error {
// s.conn.SetWriteTimeout(time.Now().Add(s.wTimeout))
if _, err := s.Connection.Write(pkg); err != nil {
return errors.Annotatef(err, "s.Connection.Write(pkg len:%d)", len(pkg))
return jerrors.Annotatef(err, "s.Connection.Write(pkg len:%d)", len(pkg))
}
s.incWritePkgNum()
......@@ -361,7 +362,6 @@ func (s *session) WriteBytesArray(pkgs ...[]byte) error {
return ErrSessionClosed
}
// s.conn.SetWriteTimeout(time.Now().Add(s.wTimeout))
if len(pkgs) == 1 {
// return s.Connection.Write(pkgs[0])
return s.WriteBytes(pkgs[0])
......@@ -389,7 +389,7 @@ func (s *session) WriteBytesArray(pkgs ...[]byte) error {
// return s.Connection.Write(arr)
if err = s.WriteBytes(arr); err != nil {
return err
return jerrors.Trace(err)
}
num := len(pkgs) - 1
......@@ -600,7 +600,7 @@ func (s *session) handleTCPPackage() error {
if netError, ok = err.(net.Error); ok && netError.Timeout() {
break
}
log.Error("%s, [session.conn.read] = error{%s}", s.sessionToken(), err)
log.Error("%s, [session.conn.read] = error{%s}", s.sessionToken(), jerrors.ErrorStack(err))
// for (Codec)OnErr
// s.errFlag = true
exit = true
......@@ -624,7 +624,8 @@ func (s *session) handleTCPPackage() error {
err = ErrMsgTooLong
}
if err != nil {
log.Warn("%s, [session.handleTCPPackage] = len{%d}, error{%s}", s.sessionToken(), pkgLen, err)
log.Warn("%s, [session.handleTCPPackage] = len{%d}, error{%s}",
s.sessionToken(), pkgLen, jerrors.ErrorStack(err))
// for (Codec)OnErr
// s.errFlag = true
exit = true
......@@ -671,18 +672,19 @@ func (s *session) handleUDPPackage() error {
}
bufLen, addr, err = conn.read(buf)
log.Debug("conn.read() = bufLen:%d, addr:%#v, err:%s", bufLen, addr, err)
log.Debug("conn.read() = bufLen:%d, addr:%#v, err:%s", bufLen, addr, jerrors.ErrorStack(err))
if netError, ok = err.(net.Error); ok && netError.Timeout() {
continue
}
if err != nil {
log.Error("%s, [session.handleUDPPackage] = len{%d}, error{%s}", s.sessionToken(), bufLen, err)
err = errors.Annotatef(err, "conn.read()")
log.Error("%s, [session.handleUDPPackage] = len{%d}, error{%s}",
s.sessionToken(), bufLen, jerrors.ErrorStack(err))
err = jerrors.Annotatef(err, "conn.read()")
break
}
if bufLen == 0 {
log.Error("conn.read() = bufLen:%d, addr:%s, err:%s", bufLen, addr, err)
log.Error("conn.read() = bufLen:%d, addr:%s, err:%s", bufLen, addr, jerrors.ErrorStack(err))
continue
}
......@@ -692,16 +694,17 @@ func (s *session) handleUDPPackage() error {
}
pkg, pkgLen, err = s.reader.Read(s, buf[:bufLen])
log.Debug("s.reader.Read() = pkg:%#v, pkgLen:%d, err:%s", pkg, pkgLen, err)
log.Debug("s.reader.Read() = pkg:%#v, pkgLen:%d, err:%s", pkg, pkgLen, jerrors.ErrorStack(err))
if err == nil && s.maxMsgLen > 0 && bufLen > int(s.maxMsgLen) {
err = ErrMsgTooLong
}
if err != nil {
log.Warn("%s, [session.handleUDPPackage] = len{%d}, error{%s}", s.sessionToken(), pkgLen, err)
log.Warn("%s, [session.handleUDPPackage] = len{%d}, error{%s}",
s.sessionToken(), pkgLen, jerrors.ErrorStack(err))
continue
}
if pkgLen == 0 {
log.Error("s.reader.Read() = pkg:%#v, pkgLen:%d, err:%s", pkg, pkgLen, err)
log.Error("s.reader.Read() = pkg:%#v, pkgLen:%d, err:%s", pkg, pkgLen, jerrors.ErrorStack(err))
continue
}
......@@ -734,7 +737,8 @@ func (s *session) handleWSPackage() error {
continue
}
if err != nil {
log.Warn("%s, [session.handleWSPackage] = error{%s}", s.sessionToken(), err)
log.Warn("%s, [session.handleWSPackage] = error{%s}",
s.sessionToken(), jerrors.ErrorStack(err))
// s.errFlag = true
return err
}
......@@ -745,7 +749,8 @@ func (s *session) handleWSPackage() error {
err = ErrMsgTooLong
}
if err != nil {
log.Warn("%s, [session.handleWSPackage] = len{%d}, error{%s}", s.sessionToken(), length, err)
log.Warn("%s, [session.handleWSPackage] = len{%d}, error{%s}",
s.sessionToken(), length, jerrors.ErrorStack(err))
continue
}
s.rQ <- unmarshalPkg
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment