Unverified Commit 50264ef3 authored by 望哥's avatar 望哥 Committed by GitHub

Merge pull request #3 from u0x01/master

Dep: pkg/errors & go.uber.org/zap
parents d200327c 070dca7f
......@@ -22,10 +22,8 @@ import (
)
import (
"github.com/AlexStocks/goext/net"
log "github.com/dubbogo/log4go"
"github.com/gorilla/websocket"
jerrors "github.com/juju/errors"
perrors "github.com/pkg/errors"
)
const (
......@@ -131,7 +129,7 @@ func (c *client) dialTCP() Session {
return nil
}
conn, err = net.DialTimeout("tcp", c.addr, connectTimeout)
if err == nil && gxnet.IsSameAddr(conn.RemoteAddr(), conn.LocalAddr()) {
if err == nil && IsSameAddr(conn.RemoteAddr(), conn.LocalAddr()) {
conn.Close()
err = errSelfConnect
}
......@@ -139,7 +137,7 @@ func (c *client) dialTCP() Session {
return newTCPSession(conn, c)
}
log.Info("net.DialTimeout(addr:%s, timeout:%v) = error{%s}", c.addr, jerrors.ErrorStack(err))
log.Infof("net.DialTimeout(addr:%s, timeout:%v) = error:%+v", c.addr, connectTimeout, err)
// time.Sleep(connInterval)
<-wheel.After(connInterval)
}
......@@ -163,12 +161,12 @@ func (c *client) dialUDP() Session {
return nil
}
conn, err = net.DialUDP("udp", localAddr, peerAddr)
if err == nil && gxnet.IsSameAddr(conn.RemoteAddr(), conn.LocalAddr()) {
if err == nil && IsSameAddr(conn.RemoteAddr(), conn.LocalAddr()) {
conn.Close()
err = errSelfConnect
}
if err != nil {
log.Warn("net.DialTimeout(addr:%s, timeout:%v) = error{%s}", c.addr, jerrors.ErrorStack(err))
log.Warnf("net.DialTimeout(addr:%s, timeout:%v) = error:%+v", c.addr, err)
// time.Sleep(connInterval)
<-wheel.After(connInterval)
continue
......@@ -178,18 +176,18 @@ func (c *client) dialUDP() Session {
conn.SetWriteDeadline(time.Now().Add(1e9))
if length, err = conn.Write(connectPingPackage[:]); err != nil {
conn.Close()
log.Warn("conn.Write(%s) = {length:%d, err:%s}", string(connectPingPackage), length, jerrors.ErrorStack(err))
log.Warnf("conn.Write(%s) = {length:%d, err:%+v}", string(connectPingPackage), length, err)
// time.Sleep(connInterval)
<-wheel.After(connInterval)
continue
}
conn.SetReadDeadline(time.Now().Add(1e9))
length, err = conn.Read(buf)
if netErr, ok := jerrors.Cause(err).(net.Error); ok && netErr.Timeout() {
if netErr, ok := perrors.Cause(err).(net.Error); ok && netErr.Timeout() {
err = nil
}
if err != nil {
log.Info("conn{%#v}.Read() = {length:%d, err:%s}", conn, length, jerrors.ErrorStack(err))
log.Infof("conn{%#v}.Read() = {length:%d, err:%+v}", conn, length, err)
conn.Close()
// time.Sleep(connInterval)
<-wheel.After(connInterval)
......@@ -215,8 +213,8 @@ func (c *client) dialWS() Session {
return nil
}
conn, _, err = dialer.Dial(c.addr, nil)
log.Info("websocket.dialer.Dial(addr:%s) = error:%s", c.addr, jerrors.ErrorStack(err))
if err == nil && gxnet.IsSameAddr(conn.RemoteAddr(), conn.LocalAddr()) {
log.Infof("websocket.dialer.Dial(addr:%s) = error:%+v", c.addr, err)
if err == nil && IsSameAddr(conn.RemoteAddr(), conn.LocalAddr()) {
conn.Close()
err = errSelfConnect
}
......@@ -229,7 +227,7 @@ func (c *client) dialWS() Session {
return ss
}
log.Info("websocket.dialer.Dial(addr:%s) = error:%s", c.addr, jerrors.ErrorStack(err))
log.Infof("websocket.dialer.Dial(addr:%s) = error:%+v", c.addr, err)
// time.Sleep(connInterval)
<-wheel.After(connInterval)
}
......@@ -256,7 +254,7 @@ func (c *client) dialWSS() Session {
if c.cert != "" {
certPEMBlock, err := ioutil.ReadFile(c.cert)
if err != nil {
panic(fmt.Sprintf("ioutil.ReadFile(cert:%s) = error{%s}", c.cert, jerrors.ErrorStack(err)))
panic(fmt.Sprintf("ioutil.ReadFile(cert:%s) = error:%+v", c.cert, err))
}
var cert tls.Certificate
......@@ -278,7 +276,7 @@ func (c *client) dialWSS() Session {
for _, c := range config.Certificates {
roots, err = x509.ParseCertificates(c.Certificate[len(c.Certificate)-1])
if err != nil {
panic(fmt.Sprintf("error parsing server's root cert: %s\n", jerrors.ErrorStack(err)))
panic(fmt.Sprintf("error parsing server's root cert: %+v\n", err))
}
for _, root = range roots {
certPool.AddCert(root)
......@@ -294,7 +292,7 @@ func (c *client) dialWSS() Session {
return nil
}
conn, _, err = dialer.Dial(c.addr, nil)
if err == nil && gxnet.IsSameAddr(conn.RemoteAddr(), conn.LocalAddr()) {
if err == nil && IsSameAddr(conn.RemoteAddr(), conn.LocalAddr()) {
conn.Close()
err = errSelfConnect
}
......@@ -308,7 +306,7 @@ func (c *client) dialWSS() Session {
return ss
}
log.Info("websocket.dialer.Dial(addr:%s) = error{%s}", c.addr, jerrors.ErrorStack(err))
log.Infof("websocket.dialer.Dial(addr:%s) = error:%+v", c.addr, err)
// time.Sleep(connInterval)
<-wheel.After(connInterval)
}
......@@ -399,7 +397,7 @@ func (c *client) reConnect() {
// c.Unlock()
for {
if c.IsClosed() {
log.Warn("client{peer:%s} goroutine exit now.", c.addr)
log.Warnf("client{peer:%s} goroutine exit now.", c.addr)
break
}
......
......@@ -21,10 +21,9 @@ import (
)
import (
log "github.com/dubbogo/log4go"
"github.com/golang/snappy"
"github.com/gorilla/websocket"
jerrors "github.com/juju/errors"
perrors "github.com/pkg/errors"
"golang.org/x/net/ipv4"
"golang.org/x/net/ipv6"
)
......@@ -32,7 +31,7 @@ import (
var (
launchTime = time.Now()
// ErrInvalidConnection = errors.New("connection has been closed.")
// ErrInvalidConnection = perrors.New("connection has been closed.")
)
/////////////////////////////////////////
......@@ -193,10 +192,10 @@ func (t *writeFlusher) Write(p []byte) (int, error) {
defer t.lock.Unlock()
n, err = t.flusher.Write(p)
if err != nil {
return n, jerrors.Trace(err)
return n, perrors.WithStack(err)
}
if err := t.flusher.Flush(); err != nil {
return 0, jerrors.Trace(err)
return 0, perrors.WithStack(err)
}
return n, nil
......@@ -243,16 +242,16 @@ func (t *gettyTCPConn) read(p []byte) (int, error) {
currentTime = time.Now()
if currentTime.Sub(t.rLastDeadline) > (t.rTimeout >> 2) {
if err = t.conn.SetReadDeadline(currentTime.Add(t.rTimeout)); err != nil {
return 0, jerrors.Trace(err)
return 0, perrors.WithStack(err)
}
t.rLastDeadline = currentTime
}
}
length, err = t.reader.Read(p)
log.Debug("now:%s, length:%d, err:%s", currentTime, length, err)
log.Debugf("now:%s, length:%d, err:%v", currentTime, length, err)
atomic.AddUint32(&t.readBytes, uint32(length))
return length, jerrors.Trace(err)
return length, perrors.WithStack(err)
//return length, err
}
......@@ -267,7 +266,7 @@ func (t *gettyTCPConn) Write(pkg interface{}) (int, error) {
)
if p, ok = pkg.([]byte); !ok {
return 0, jerrors.Errorf("illegal @pkg{%#v} type", pkg)
return 0, perrors.Errorf("illegal @pkg{%#v} type", pkg)
}
if t.compress == CompressNone && t.wTimeout > 0 {
// Optimization: update write deadline only if more than 25%
......@@ -276,7 +275,7 @@ func (t *gettyTCPConn) Write(pkg interface{}) (int, error) {
currentTime = time.Now()
if currentTime.Sub(t.wLastDeadline) > (t.wTimeout >> 2) {
if err = t.conn.SetWriteDeadline(currentTime.Add(t.wTimeout)); err != nil {
return 0, jerrors.Trace(err)
return 0, perrors.WithStack(err)
}
t.wLastDeadline = currentTime
}
......@@ -285,8 +284,8 @@ func (t *gettyTCPConn) Write(pkg interface{}) (int, error) {
if length, err = t.writer.Write(p); err == nil {
atomic.AddUint32(&t.writeBytes, (uint32)(len(p)))
}
log.Debug("now:%s, length:%d, err:%s", currentTime, length, err)
return length, jerrors.Trace(err)
log.Debugf("now:%s, length:%d, err:%v", currentTime, length, err)
return length, perrors.WithStack(err)
//return length, err
}
......@@ -299,7 +298,7 @@ func (t *gettyTCPConn) close(waitSec int) {
if t.conn != nil {
if writer, ok := t.writer.(*snappy.Writer); ok {
if err := writer.Close(); err != nil {
log.Error("snappy.Writer.Close() = error{%s}", jerrors.ErrorStack(err))
log.Errorf("snappy.Writer.Close() = error:%+v", err)
}
}
t.conn.(*net.TCPConn).SetLinger(waitSec)
......@@ -333,7 +332,7 @@ func setUDPSocketOptions(conn *net.UDPConn) error {
err6 := ipv6.NewPacketConn(conn).SetControlMessage(ipv6.FlagDst|ipv6.FlagInterface, true)
err4 := ipv4.NewPacketConn(conn).SetControlMessage(ipv4.FlagDst|ipv4.FlagInterface, true)
if err6 != nil && err4 != nil {
return jerrors.Trace(err4)
return perrors.WithStack(err4)
}
return nil
}
......@@ -393,20 +392,20 @@ func (u *gettyUDPConn) read(p []byte) (int, *net.UDPAddr, error) {
currentTime = time.Now()
if currentTime.Sub(u.rLastDeadline) > (u.rTimeout >> 2) {
if err = u.conn.SetReadDeadline(currentTime.Add(u.rTimeout)); err != nil {
return 0, nil, jerrors.Trace(err)
return 0, nil, perrors.WithStack(err)
}
u.rLastDeadline = currentTime
}
}
length, addr, err = u.conn.ReadFromUDP(p) // connected udp also can get return @addr
log.Debug("ReadFromUDP() = {length:%d, peerAddr:%s, error:%s}", length, addr, err)
log.Debugf("ReadFromUDP() = {length:%d, peerAddr:%s, error:%v}", length, addr, err)
if err == nil {
atomic.AddUint32(&u.readBytes, uint32(length))
}
//return length, addr, err
return length, addr, jerrors.Trace(err)
return length, addr, perrors.WithStack(err)
}
// write udp packet, @ctx should be of type UDPContext
......@@ -422,10 +421,10 @@ func (u *gettyUDPConn) Write(udpCtx interface{}) (int, error) {
)
if ctx, ok = udpCtx.(UDPContext); !ok {
return 0, jerrors.Errorf("illegal @udpCtx{%s} type, @udpCtx type:%T", udpCtx, udpCtx)
return 0, perrors.Errorf("illegal @udpCtx{%s} type, @udpCtx type:%T", udpCtx, udpCtx)
}
if buf, ok = ctx.Pkg.([]byte); !ok {
return 0, jerrors.Errorf("illegal @udpCtx.Pkg{%#v} type", udpCtx)
return 0, perrors.Errorf("illegal @udpCtx.Pkg{%#v} type", udpCtx)
}
if u.ss.EndPoint().EndPointType() == UDP_ENDPOINT {
peerAddr = ctx.PeerAddr
......@@ -441,7 +440,7 @@ func (u *gettyUDPConn) Write(udpCtx interface{}) (int, error) {
currentTime = time.Now()
if currentTime.Sub(u.wLastDeadline) > (u.wTimeout >> 2) {
if err = u.conn.SetWriteDeadline(currentTime.Add(u.wTimeout)); err != nil {
return 0, jerrors.Trace(err)
return 0, perrors.WithStack(err)
}
u.wLastDeadline = currentTime
}
......@@ -450,9 +449,9 @@ func (u *gettyUDPConn) Write(udpCtx interface{}) (int, error) {
if length, _, err = u.conn.WriteMsgUDP(buf, nil, peerAddr); err == nil {
atomic.AddUint32(&u.writeBytes, (uint32)(len(buf)))
}
log.Debug("WriteMsgUDP(peerAddr:%s) = {length:%d, error:%s}", peerAddr, length, err)
log.Debugf("WriteMsgUDP(peerAddr:%s) = {length:%d, error:%v}", peerAddr, length, err)
return length, jerrors.Trace(err)
return length, perrors.WithStack(err)
//return length, err
}
......@@ -529,7 +528,7 @@ func (w *gettyWSConn) handlePing(message string) error {
w.UpdateActive()
}
return jerrors.Trace(err)
return perrors.WithStack(err)
}
func (w *gettyWSConn) handlePong(string) error {
......@@ -546,11 +545,11 @@ func (w *gettyWSConn) read() ([]byte, error) {
w.incReadPkgNum()
} else {
if websocket.IsUnexpectedCloseError(e, websocket.CloseGoingAway) {
log.Warn("websocket unexpected close error: %v", e)
log.Warnf("websocket unexpected close error: %v", e)
}
}
return b, jerrors.Trace(e)
return b, perrors.WithStack(e)
//return b, e
}
......@@ -567,7 +566,7 @@ func (w *gettyWSConn) updateWriteDeadline() error {
currentTime = time.Now()
if currentTime.Sub(w.wLastDeadline) > (w.wTimeout >> 2) {
if err = w.conn.SetWriteDeadline(currentTime.Add(w.wTimeout)); err != nil {
return jerrors.Trace(err)
return perrors.WithStack(err)
}
w.wLastDeadline = currentTime
}
......@@ -585,25 +584,25 @@ func (w *gettyWSConn) Write(pkg interface{}) (int, error) {
)
if p, ok = pkg.([]byte); !ok {
return 0, jerrors.Errorf("illegal @pkg{%#v} type", pkg)
return 0, perrors.Errorf("illegal @pkg{%#v} type", pkg)
}
w.updateWriteDeadline()
if err = w.conn.WriteMessage(websocket.BinaryMessage, p); err == nil {
atomic.AddUint32(&w.writeBytes, (uint32)(len(p)))
}
return len(p), jerrors.Trace(err)
return len(p), perrors.WithStack(err)
//return len(p), err
}
func (w *gettyWSConn) writePing() error {
w.updateWriteDeadline()
return jerrors.Trace(w.conn.WriteMessage(websocket.PingMessage, []byte{}))
return perrors.WithStack(w.conn.WriteMessage(websocket.PingMessage, []byte{}))
}
func (w *gettyWSConn) writePong(message []byte) error {
w.updateWriteDeadline()
return jerrors.Trace(w.conn.WriteMessage(websocket.PongMessage, message))
return perrors.WithStack(w.conn.WriteMessage(websocket.PongMessage, message))
}
// close websocket connection
......
......@@ -11,9 +11,10 @@ package getty
import (
"compress/flate"
"errors"
"net"
"time"
perrors "github.com/pkg/errors"
)
// NewSessionCallback will be invoked when server accepts a new client connection or client connects to server successfully.
......@@ -119,9 +120,9 @@ type Connection interface {
/////////////////////////////////////////
var (
ErrSessionClosed = errors.New("session Already Closed")
ErrSessionBlocked = errors.New("session Full Blocked")
ErrNullPeerAddr = errors.New("peer address is nil")
ErrSessionClosed = perrors.New("session Already Closed")
ErrSessionBlocked = perrors.New("session Full Blocked")
ErrNullPeerAddr = perrors.New("peer address is nil")
)
type Session interface {
......
module github.com/dubbogo/getty
require (
github.com/AlexStocks/getty v1.0.4
github.com/AlexStocks/goext v0.3.2
github.com/AlexStocks/log4go v1.0.2
github.com/dubbogo/log4go v0.0.0-20190406152735-41c57e1073e9
github.com/fatih/camelcase v1.0.0 // indirect
github.com/fatih/structs v1.1.0 // indirect
github.com/gogo/protobuf v1.2.1
github.com/golang/snappy v0.0.1
github.com/gorilla/websocket v1.4.0
github.com/juju/errors v0.0.0-20190207033735-e65537c515d7
github.com/k0kubun/pp v3.0.1+incompatible // indirect
github.com/koding/multiconfig v0.0.0-20171124222453-69c27309b2d7
go.etcd.io/etcd v3.3.13+incompatible // indirect
github.com/pkg/errors v0.8.1
github.com/stretchr/testify v1.3.0 // indirect
go.uber.org/atomic v1.4.0 // indirect
go.uber.org/multierr v1.1.0 // indirect
go.uber.org/zap v1.10.0
golang.org/x/net v0.0.0-20190320064053-1272bf9dcd53
gopkg.in/yaml.v2 v2.2.2
)
This diff is collapsed.
package getty
import (
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
)
// Logger for user who want to customize logger of getty
type Logger interface {
Info(args ...interface{})
Warn(args ...interface{})
Error(args ...interface{})
Debug(args ...interface{})
Infof(fmt string, args ...interface{})
Warnf(fmt string, args ...interface{})
Errorf(fmt string, args ...interface{})
Debugf(fmt string, args ...interface{})
}
type LoggerLevel int8
const (
// DebugLevel logs are typically voluminous, and are usually disabled in
// production.
LoggerLevelDebug = LoggerLevel(zapcore.DebugLevel)
// InfoLevel is the default logging priority.
LoggerLevelInfo = LoggerLevel(zapcore.InfoLevel)
// WarnLevel logs are more important than Infof, but don't need individual
// human review.
LoggerLevelWarn = LoggerLevel(zapcore.WarnLevel)
// ErrorLevel logs are high-priority. If an application is running smoothly,
// it shouldn't generate any error-level logs.
LoggerLevelError = LoggerLevel(zapcore.ErrorLevel)
// DPanicLevel logs are particularly important errors. In development the
// logger panics after writing the message.
LoggerLevelDPanic = LoggerLevel(zapcore.DPanicLevel)
// PanicLevel logs a message, then panics.
LoggerLevelPanic = LoggerLevel(zapcore.PanicLevel)
// FatalLevel logs a message, then calls os.Exit(1).
LoggerLevelFatal = LoggerLevel(zapcore.FatalLevel)
)
var (
log Logger
zapLogger *zap.Logger
zapLoggerConfig = zap.NewDevelopmentConfig()
zapLoggerEncoderConfig = zapcore.EncoderConfig{
TimeKey: "time",
LevelKey: "level",
NameKey: "logger",
CallerKey: "caller",
MessageKey: "message",
StacktraceKey: "stacktrace",
EncodeLevel: zapcore.CapitalColorLevelEncoder,
EncodeTime: zapcore.ISO8601TimeEncoder,
EncodeDuration: zapcore.SecondsDurationEncoder,
EncodeCaller: zapcore.ShortCallerEncoder,
}
)
func init() {
zapLoggerConfig.EncoderConfig = zapLoggerEncoderConfig
zapLogger, _ = zapLoggerConfig.Build()
log = zapLogger.Sugar()
// todo: flushes buffer when redirect log to file.
// var exitSignal = make(chan os.Signal)
// signal.Notify(exitSignal, syscall.SIGTERM, syscall.SIGINT)
// go func() {
// <-exitSignal
// // Sync calls the underlying Core's Sync method, flushing any buffered log
// // entries. Applications should take care to call Sync before exiting.
// err := zapLogger.Sync() // flushes buffer, if any
// if err != nil {
// fmt.Printf("zapLogger sync err: %+v", perrors.WithStack(err))
// }
// os.Exit(0)
// }()
}
// SetLogger: customize yourself logger.
func SetLogger(logger Logger) {
log = logger
}
// SetLoggerLevel
func SetLoggerLevel(level LoggerLevel) error {
var err error
zapLoggerConfig.Level = zap.NewAtomicLevelAt(zapcore.Level(level))
zapLogger, err = zapLoggerConfig.Build()
if err != nil {
return err
}
log = zapLogger.Sugar()
return nil
}
// SetLoggerCallerDisable: disable caller info in production env for performance improve.
// It is highly recommended that you execute this method in a production environment.
func SetLoggerCallerDisable() error {
var err error
zapLoggerConfig.Development = false
zapLoggerConfig.DisableCaller = true
zapLogger, err = zapLoggerConfig.Build()
if err != nil {
return err
}
log = zapLogger.Sugar()
return nil
}
......@@ -68,9 +68,9 @@ func WithWebsocketServerRootCert(cert string) ServerOption {
type ClientOption func(*ClientOptions)
type ClientOptions struct {
addr string
number int
reconnectInterval int// reConnect Interval
addr string
number int
reconnectInterval int // reConnect Interval
// the cert file of wss server which may contain server domain, server ip, the starting effective date, effective
// duration, the hash alg, the len of the private key.
......
......@@ -22,16 +22,13 @@ import (
)
import (
"github.com/AlexStocks/goext/net"
"github.com/AlexStocks/goext/time"
log "github.com/dubbogo/log4go"
"github.com/gorilla/websocket"
jerrors "github.com/juju/errors"
perrors "github.com/pkg/errors"
)
var (
errSelfConnect = jerrors.New("connect self!")
serverFastFailTimeout = gxtime.TimeSecondDuration(1)
errSelfConnect = perrors.New("connect self!")
serverFastFailTimeout = time.Second * 1
)
type server struct {
......@@ -119,7 +116,7 @@ func (s *server) stop() {
if err = s.server.Shutdown(ctx); err != nil {
// if the log output is "shutdown ctx: context deadline exceeded", it means that
// there are still some active connections.
log.Error("server shutdown ctx:%s error:%s", ctx, err)
log.Errorf("server shutdown ctx:%s error:%v", ctx, err)
}
}
s.server = nil
......@@ -157,7 +154,7 @@ func (s *server) listenTCP() error {
streamListener, err = net.Listen("tcp", s.addr)
if err != nil {
return jerrors.Annotatef(err, "net.Listen(tcp, addr:%s))", s.addr)
return perrors.Wrapf(err, "net.Listen(tcp, addr:%s))", s.addr)
}
s.streamListener = streamListener
......@@ -174,14 +171,14 @@ func (s *server) listenUDP() error {
localAddr, err = net.ResolveUDPAddr("udp", s.addr)
if err != nil {
return jerrors.Annotatef(err, "net.ResolveUDPAddr(udp, addr:%s)", s.addr)
return perrors.Wrapf(err, "net.ResolveUDPAddr(udp, addr:%s)", s.addr)
}
pktListener, err = net.ListenUDP("udp", localAddr)
if err != nil {
return jerrors.Annotatef(err, "net.ListenUDP((udp, localAddr:%#v)", localAddr)
return perrors.Wrapf(err, "net.ListenUDP((udp, localAddr:%#v)", localAddr)
}
// if err = setUDPSocketOptions(pktListener); err != nil {
// return errors.Wrapf(err, "setUDPSocketOptions(pktListener:%#v)", pktListener)
// return perrors.Wrapf(err, "setUDPSocketOptions(pktListener:%#v)", pktListener)
// }
s.pktListener = pktListener
......@@ -193,9 +190,9 @@ func (s *server) listenUDP() error {
func (s *server) listen() error {
switch s.endPointType {
case TCP_SERVER, WS_SERVER, WSS_SERVER:
return jerrors.Trace(s.listenTCP())
return perrors.WithStack(s.listenTCP())
case UDP_ENDPOINT:
return jerrors.Trace(s.listenUDP())
return perrors.WithStack(s.listenUDP())
}
return nil
......@@ -204,10 +201,10 @@ func (s *server) listen() error {
func (s *server) accept(newSession NewSessionCallback) (Session, error) {
conn, err := s.streamListener.Accept()
if err != nil {
return nil, jerrors.Trace(err)
return nil, perrors.WithStack(err)
}
if gxnet.IsSameAddr(conn.RemoteAddr(), conn.LocalAddr()) {
log.Warn("conn.localAddr{%s} == conn.RemoteAddr", conn.LocalAddr().String(), conn.RemoteAddr().String())
if IsSameAddr(conn.RemoteAddr(), conn.LocalAddr()) {
log.Warnf("conn.localAddr{%s} == conn.RemoteAddr", conn.LocalAddr().String(), conn.RemoteAddr().String())
return nil, errSelfConnect
}
......@@ -215,7 +212,7 @@ func (s *server) accept(newSession NewSessionCallback) (Session, error) {
err = newSession(ss)
if err != nil {
conn.Close()
return nil, jerrors.Trace(err)
return nil, perrors.WithStack(err)
}
return ss, nil
......@@ -232,7 +229,7 @@ func (s *server) runTcpEventLoop(newSession NewSessionCallback) {
)
for {
if s.IsClosed() {
log.Warn("server{%s} stop acceptting client connect request.", s.addr)
log.Warnf("server{%s} stop acceptting client connect request.", s.addr)
return
}
if delay != 0 {
......@@ -252,7 +249,7 @@ func (s *server) runTcpEventLoop(newSession NewSessionCallback) {
}
continue
}
log.Warn("server{%s}.Accept() = err {%#v}", s.addr, jerrors.ErrorStack(err))
log.Warnf("server{%s}.Accept() = err {%+v}", s.addr, err)
continue
}
delay = 0
......@@ -303,17 +300,17 @@ func (s *wsHandler) serveWSRequest(w http.ResponseWriter, r *http.Request) {
if s.server.IsClosed() {
http.Error(w, "HTTP server is closed(code:500-11).", 500)
log.Warn("server{%s} stop acceptting client connect request.", s.server.addr)
log.Warnf("server{%s} stop acceptting client connect request.", s.server.addr)
return
}
conn, err := s.upgrader.Upgrade(w, r, nil)
if err != nil {
log.Warn("upgrader.Upgrader(http.Request{%#v}) = error{%s}", r, err)
log.Warnf("upgrader.Upgrader(http.Request{%#v}) = error:%+v", r, err)
return
}
if conn.RemoteAddr().String() == conn.LocalAddr().String() {
log.Warn("conn.localAddr{%s} == conn.RemoteAddr", conn.LocalAddr().String(), conn.RemoteAddr().String())
log.Warnf("conn.localAddr{%s} == conn.RemoteAddr", conn.LocalAddr().String(), conn.RemoteAddr().String())
return
}
// conn.SetReadLimit(int64(handler.maxMsgLen))
......@@ -321,7 +318,7 @@ func (s *wsHandler) serveWSRequest(w http.ResponseWriter, r *http.Request) {
err = s.newSession(ss)
if err != nil {
conn.Close()
log.Warn("server{%s}.newSession(ss{%#v}) = err {%s}", s.server.addr, ss, err)
log.Warnf("server{%s}.newSession(ss{%#v}) = err {%s}", s.server.addr, ss, err)
return
}
if ss.(*session).maxMsgLen > 0 {
......@@ -355,7 +352,7 @@ func (s *server) runWSEventLoop(newSession NewSessionCallback) {
s.lock.Unlock()
err = server.Serve(s.streamListener)
if err != nil {
log.Error("http.server.Serve(addr{%s}) = err{%s}", s.addr, jerrors.ErrorStack(err))
log.Errorf("http.server.Serve(addr{%s}) = err{%+v}", s.addr, err)
// panic(err)
}
}()
......@@ -378,8 +375,8 @@ func (s *server) runWSSEventLoop(newSession NewSessionCallback) {
defer s.wg.Done()
if certificate, err = tls.LoadX509KeyPair(s.cert, s.privateKey); err != nil {
panic(fmt.Sprintf("tls.LoadX509KeyPair(cert{%s}, privateKey{%s}) = err{%s}",
s.cert, s.privateKey, jerrors.ErrorStack(err)))
panic(fmt.Sprintf("tls.LoadX509KeyPair(cert{%s}, privateKey{%s}) = err{%+v}",
s.cert, s.privateKey, err))
return
}
config = &tls.Config{
......@@ -392,7 +389,7 @@ func (s *server) runWSSEventLoop(newSession NewSessionCallback) {
if s.caCert != "" {
certPem, err = ioutil.ReadFile(s.caCert)
if err != nil {
panic(fmt.Errorf("ioutil.ReadFile(certFile{%s}) = err{%s}", s.caCert, jerrors.ErrorStack(err)))
panic(fmt.Errorf("ioutil.ReadFile(certFile{%s}) = err{%+v}", s.caCert, err))
}
certPool = x509.NewCertPool()
if ok := certPool.AppendCertsFromPEM(certPem); !ok {
......@@ -417,7 +414,7 @@ func (s *server) runWSSEventLoop(newSession NewSessionCallback) {
s.lock.Unlock()
err = server.Serve(tls.NewListener(s.streamListener, config))
if err != nil {
log.Error("http.server.Serve(addr{%s}) = err{%s}", s.addr, jerrors.ErrorStack(err))
log.Errorf("http.server.Serve(addr{%s}) = err{%+v}", s.addr, err)
panic(err)
}
}()
......@@ -427,7 +424,7 @@ func (s *server) runWSSEventLoop(newSession NewSessionCallback) {
// @newSession: new connection callback
func (s *server) RunEventLoop(newSession NewSessionCallback) {
if err := s.listen(); err != nil {
panic(fmt.Errorf("server.listen() = error:%s", jerrors.ErrorStack(err)))
panic(fmt.Errorf("server.listen() = error:%+v", err))
}
switch s.endPointType {
......
This diff is collapsed.
package getty
import (
"net"
"strings"
"context"
"sync"
"time"
)
// refers from https://github.com/facebookgo/grace/blob/master/gracenet/net.go#L180:6
func IsSameAddr(a1, a2 net.Addr) bool {
if a1.Network() != a2.Network() {
return false
}
a1s := a1.String()
a2s := a2.String()
if a1s == a2s {
return true
}
// This allows for ipv6 vs ipv4 local addresses to compare as equal. This
// scenario is common when listening on localhost.
const ipv6prefix = "[::]"
a1s = strings.TrimPrefix(a1s, ipv6prefix)
a2s = strings.TrimPrefix(a2s, ipv6prefix)
const ipv4prefix = "0.0.0.0"
a1s = strings.TrimPrefix(a1s, ipv4prefix)
a2s = strings.TrimPrefix(a2s, ipv4prefix)
return a1s == a2s
}
var (
defaultCtxKey int = 1
)
type Values struct {
m map[interface{}]interface{}
}
func (v Values) Get(key interface{}) (interface{}, bool) {
i, b := v.m[key]
return i, b
}
func (c Values) Set(key interface{}, value interface{}) {
c.m[key] = value
}
func (c Values) Delete(key interface{}) {
delete(c.m, key)
}
type ValuesContext struct {
context.Context
}
func NewValuesContext(ctx context.Context) *ValuesContext {
if ctx == nil {
ctx = context.Background()
}
return &ValuesContext{
Context: context.WithValue(
ctx,
defaultCtxKey,
Values{m: make(map[interface{}]interface{})},
),
}
}
func (c *ValuesContext) Get(key interface{}) (interface{}, bool) {
return c.Context.Value(defaultCtxKey).(Values).Get(key)
}
func (c *ValuesContext) Delete(key interface{}) {
c.Context.Value(defaultCtxKey).(Values).Delete(key)
}
func (c *ValuesContext) Set(key interface{}, value interface{}) {
c.Context.Value(defaultCtxKey).(Values).Set(key, value)
}
type Wheel struct {
sync.RWMutex
span time.Duration
period time.Duration
ticker *time.Ticker
index int
ring []chan struct{}
once sync.Once
now time.Time
}
func NewWheel(span time.Duration, buckets int) *Wheel {
var (
w *Wheel
)
if span == 0 {
panic("@span == 0")
}
if buckets == 0 {
panic("@bucket == 0")
}
w = &Wheel{
span: span,
period: span * (time.Duration(buckets)),
ticker: time.NewTicker(span),
index: 0,
ring: make([](chan struct{}), buckets),
now: time.Now(),
}
go func() {
var notify chan struct{}
// var cw CountWatch
// cw.Start()
for t := range w.ticker.C {
w.Lock()
w.now = t
// fmt.Println("index:", w.index, ", value:", w.bitmap.Get(w.index))
notify = w.ring[w.index]
w.ring[w.index] = nil
w.index = (w.index + 1) % len(w.ring)
w.Unlock()
if notify != nil {
close(notify)
}
}
// fmt.Println("timer costs:", cw.Count()/1e9, "s")
}()
return w
}
func (w *Wheel) Stop() {
w.once.Do(func() { w.ticker.Stop() })
}
func (w *Wheel) After(timeout time.Duration) <-chan struct{} {
if timeout >= w.period {
panic("@timeout over ring's life period")
}
var pos = int(timeout / w.span)
if 0 < pos {
pos--
}
w.Lock()
pos = (w.index + pos) % len(w.ring)
if w.ring[pos] == nil {
w.ring[pos] = make(chan struct{})
}
// fmt.Println("pos:", pos)
c := w.ring[pos]
w.Unlock()
return c
}
func (w *Wheel) Now() time.Time {
w.RLock()
now := w.now
w.RUnlock()
return now
}
type CountWatch struct {
start time.Time
}
func (w *CountWatch) Start() {
var t time.Time
if t.Equal(w.start) {
w.start = time.Now()
}
}
func (w *CountWatch) Reset() {
w.start = time.Now()
}
func (w *CountWatch) Count() int64 {
return time.Since(w.start).Nanoseconds()
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment