Unverified Commit 076a8069 authored by 望哥's avatar 望哥 Committed by GitHub

Merge pull request #37 from divebomb/master

Add: listen on random local port
parents 45d3d7d6 f3924fe7
---
name: Bug Report
about: Report a bug
labels: kind/bug
---
<!-- Please use this template while reporting a bug and provide as much info as possible. Not doing so may result in your bug not being addressed in a timely manner. Thanks!
-->
**What happened**:
**What you expected to happen**:
**How to reproduce it (as minimally and precisely as possible)**:
**Anything else we need to know?**:
---
name: Enhancement Request
about: Suggest an enhancement
labels: kind/feature
---
<!-- Please only use this template for submitting enhancement requests -->
**What would you like to be added**:
**Why is this needed**:
\ No newline at end of file
<!-- Thanks for sending a pull request!
-->
**What this PR does**:
**Which issue(s) this PR fixes**:
<!--
*Automatically closes linked issue when PR is merged.
Usage: `Fixes #<issue number>`, or `Fixes (paste link of issue)`.
_If PR is about `failing-tests or flakes`, please post the related issues/tests in a comment and do not use `Fixes`_*
-->
Fixes #
**Special notes for your reviewer**:
**Does this PR introduce a user-facing change?**:
<!--
If no, just write "NONE" in the release-note block below.
If yes, a release note is required:
Enter your extended release note in the block below. If the PR requires additional action from users switching to the new release, include the string "action required".
-->
```release-note
```
\ No newline at end of file
language: go language: go
os:
- linux
go: go:
- "1.13" - "1.13"
env: env:
- GO111MODULE=on - GO111MODULE=on
install: true
script: script:
- go fmt ./... && [[ -z `git status -s` ]] - go fmt ./... && [[ -z `git status -s` ]]
- go mod vendor && go test $(go list ./... | grep -v vendor | grep -v demo) -coverprofile=coverage.txt -covermode=atomic - go mod vendor && go test $(go list ./... | grep -v vendor | grep -v demo) -coverprofile=coverage.txt -covermode=atomic
......
...@@ -153,7 +153,7 @@ func (c *client) dialTCP() Session { ...@@ -153,7 +153,7 @@ func (c *client) dialTCP() Session {
return newTCPSession(conn, c) return newTCPSession(conn, c)
} }
log.Infof("net.DialTimeout(addr:%s, timeout:%v) = error:%+v", c.addr, connectTimeout, err) log.Infof("net.DialTimeout(addr:%s, timeout:%v) = error:%+v", c.addr, connectTimeout, perrors.WithStack(err))
<-wheel.After(connectInterval) <-wheel.After(connectInterval)
} }
} }
...@@ -185,7 +185,7 @@ func (c *client) dialUDP() Session { ...@@ -185,7 +185,7 @@ func (c *client) dialUDP() Session {
err = errSelfConnect err = errSelfConnect
} }
if err != nil { if err != nil {
log.Warnf("net.DialTimeout(addr:%s, timeout:%v) = error:%+v", c.addr, err) log.Warnf("net.DialTimeout(addr:%s, timeout:%v) = error:%+v", c.addr, perrors.WithStack(err))
<-wheel.After(connectInterval) <-wheel.After(connectInterval)
continue continue
} }
...@@ -194,7 +194,7 @@ func (c *client) dialUDP() Session { ...@@ -194,7 +194,7 @@ func (c *client) dialUDP() Session {
conn.SetWriteDeadline(time.Now().Add(1e9)) conn.SetWriteDeadline(time.Now().Add(1e9))
if length, err = conn.Write(connectPingPackage[:]); err != nil { if length, err = conn.Write(connectPingPackage[:]); err != nil {
conn.Close() conn.Close()
log.Warnf("conn.Write(%s) = {length:%d, err:%+v}", string(connectPingPackage), length, err) log.Warnf("conn.Write(%s) = {length:%d, err:%+v}", string(connectPingPackage), length, perrors.WithStack(err))
<-wheel.After(connectInterval) <-wheel.After(connectInterval)
continue continue
} }
...@@ -204,7 +204,7 @@ func (c *client) dialUDP() Session { ...@@ -204,7 +204,7 @@ func (c *client) dialUDP() Session {
err = nil err = nil
} }
if err != nil { if err != nil {
log.Infof("conn{%#v}.Read() = {length:%d, err:%+v}", conn, length, err) log.Infof("conn{%#v}.Read() = {length:%d, err:%+v}", conn, length, perrors.WithStack(err))
conn.Close() conn.Close()
<-wheel.After(connectInterval) <-wheel.After(connectInterval)
continue continue
...@@ -229,7 +229,7 @@ func (c *client) dialWS() Session { ...@@ -229,7 +229,7 @@ func (c *client) dialWS() Session {
return nil return nil
} }
conn, _, err = dialer.Dial(c.addr, nil) conn, _, err = dialer.Dial(c.addr, nil)
log.Infof("websocket.dialer.Dial(addr:%s) = error:%+v", c.addr, err) log.Infof("websocket.dialer.Dial(addr:%s) = error:%+v", c.addr, perrors.WithStack(err))
if err == nil && gxnet.IsSameAddr(conn.RemoteAddr(), conn.LocalAddr()) { if err == nil && gxnet.IsSameAddr(conn.RemoteAddr(), conn.LocalAddr()) {
conn.Close() conn.Close()
err = errSelfConnect err = errSelfConnect
...@@ -243,7 +243,7 @@ func (c *client) dialWS() Session { ...@@ -243,7 +243,7 @@ func (c *client) dialWS() Session {
return ss return ss
} }
log.Infof("websocket.dialer.Dial(addr:%s) = error:%+v", c.addr, err) log.Infof("websocket.dialer.Dial(addr:%s) = error:%+v", c.addr, perrors.WithStack(err))
<-wheel.After(connectInterval) <-wheel.After(connectInterval)
} }
} }
...@@ -269,7 +269,7 @@ func (c *client) dialWSS() Session { ...@@ -269,7 +269,7 @@ func (c *client) dialWSS() Session {
if c.cert != "" { if c.cert != "" {
certPEMBlock, err := ioutil.ReadFile(c.cert) certPEMBlock, err := ioutil.ReadFile(c.cert)
if err != nil { if err != nil {
panic(fmt.Sprintf("ioutil.ReadFile(cert:%s) = error:%+v", c.cert, err)) panic(fmt.Sprintf("ioutil.ReadFile(cert:%s) = error:%+v", c.cert, perrors.WithStack(err)))
} }
var cert tls.Certificate var cert tls.Certificate
...@@ -291,7 +291,7 @@ func (c *client) dialWSS() Session { ...@@ -291,7 +291,7 @@ func (c *client) dialWSS() Session {
for _, c := range config.Certificates { for _, c := range config.Certificates {
roots, err = x509.ParseCertificates(c.Certificate[len(c.Certificate)-1]) roots, err = x509.ParseCertificates(c.Certificate[len(c.Certificate)-1])
if err != nil { if err != nil {
panic(fmt.Sprintf("error parsing server's root cert: %+v\n", err)) panic(fmt.Sprintf("error parsing server's root cert: %+v\n", perrors.WithStack(err)))
} }
for _, root = range roots { for _, root = range roots {
certPool.AddCert(root) certPool.AddCert(root)
...@@ -321,7 +321,7 @@ func (c *client) dialWSS() Session { ...@@ -321,7 +321,7 @@ func (c *client) dialWSS() Session {
return ss return ss
} }
log.Infof("websocket.dialer.Dial(addr:%s) = error:%+v", c.addr, err) log.Infof("websocket.dialer.Dial(addr:%s) = error:%+v", c.addr, perrors.WithStack(err))
<-wheel.After(connectInterval) <-wheel.After(connectInterval)
} }
} }
...@@ -387,7 +387,7 @@ func (c *client) connect() { ...@@ -387,7 +387,7 @@ func (c *client) connect() {
} }
} }
// there are two methods to keep connection pool. the first approch is like // there are two methods to keep connection pool. the first approach is like
// redigo's lazy connection pool(https://github.com/gomodule/redigo/blob/master/redis/pool.go:), // redigo's lazy connection pool(https://github.com/gomodule/redigo/blob/master/redis/pool.go:),
// in which you should apply testOnBorrow to check alive of the connection. // in which you should apply testOnBorrow to check alive of the connection.
// the second way is as follows. @RunEventLoop detects the aliveness of the connection // the second way is as follows. @RunEventLoop detects the aliveness of the connection
...@@ -405,13 +405,11 @@ func (c *client) RunEventLoop(newSession NewSessionCallback) { ...@@ -405,13 +405,11 @@ func (c *client) RunEventLoop(newSession NewSessionCallback) {
func (c *client) reConnect() { func (c *client) reConnect() {
var num, max, times, interval int var num, max, times, interval int
// c.Lock()
max = c.number max = c.number
interval = c.reconnectInterval interval = c.reconnectInterval
if interval == 0 { if interval == 0 {
interval = reconnectInterval interval = reconnectInterval
} }
// c.Unlock()
for { for {
if c.IsClosed() { if c.IsClosed() {
log.Warnf("client{peer:%s} goroutine exit now.", c.addr) log.Warnf("client{peer:%s} goroutine exit now.", c.addr)
......
...@@ -276,6 +276,8 @@ func TestNewWSClient(t *testing.T) { ...@@ -276,6 +276,8 @@ func TestNewWSClient(t *testing.T) {
beforeWritePkgNum := atomic.LoadUint32(&conn.writePkgNum) beforeWritePkgNum := atomic.LoadUint32(&conn.writePkgNum)
err = ss.WriteBytes([]byte("hello")) err = ss.WriteBytes([]byte("hello"))
assert.Equal(t, beforeWritePkgNum+1, atomic.LoadUint32(&conn.writePkgNum)) assert.Equal(t, beforeWritePkgNum+1, atomic.LoadUint32(&conn.writePkgNum))
err = ss.WriteBytesArray([]byte("hello"), []byte("hello"))
assert.Equal(t, beforeWritePkgNum+3, atomic.LoadUint32(&conn.writePkgNum))
err = conn.writePing() err = conn.writePing()
assert.Nil(t, err) assert.Nil(t, err)
......
module github.com/dubbogo/getty module github.com/dubbogo/getty
require ( require (
github.com/dubbogo/gost v1.5.2 github.com/dubbogo/gost v1.9.0
github.com/golang/snappy v0.0.1 github.com/golang/snappy v0.0.1
github.com/gorilla/websocket v1.4.0 github.com/gorilla/websocket v1.4.0
github.com/juju/errors v0.0.0-20190930114154-d42613fe1ab9 github.com/juju/errors v0.0.0-20190930114154-d42613fe1ab9
......
...@@ -2,6 +2,8 @@ github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8 ...@@ -2,6 +2,8 @@ github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dubbogo/gost v1.5.2 h1:ri/03971hdpnn3QeCU+4UZgnRNGDXLDGDucR/iozZm8= github.com/dubbogo/gost v1.5.2 h1:ri/03971hdpnn3QeCU+4UZgnRNGDXLDGDucR/iozZm8=
github.com/dubbogo/gost v1.5.2/go.mod h1:pPTjVyoJan3aPxBPNUX0ADkXjPibLo+/Ib0/fADXSG8= github.com/dubbogo/gost v1.5.2/go.mod h1:pPTjVyoJan3aPxBPNUX0ADkXjPibLo+/Ib0/fADXSG8=
github.com/dubbogo/gost v1.9.0 h1:UT+dWwvLyJiDotxJERO75jB3Yxgsdy10KztR5ycxRAk=
github.com/dubbogo/gost v1.9.0/go.mod h1:pPTjVyoJan3aPxBPNUX0ADkXjPibLo+/Ib0/fADXSG8=
github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4= github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4=
github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/gorilla/websocket v1.4.0 h1:WDFjx/TMzVgy9VdMMQi2K2Emtwi2QcUQsztZ/zLaH/Q= github.com/gorilla/websocket v1.4.0 h1:WDFjx/TMzVgy9VdMMQi2K2Emtwi2QcUQsztZ/zLaH/Q=
......
...@@ -17,6 +17,7 @@ import ( ...@@ -17,6 +17,7 @@ import (
"io/ioutil" "io/ioutil"
"net" "net"
"net/http" "net/http"
"strings"
"sync" "sync"
"sync/atomic" "sync/atomic"
"time" "time"
...@@ -67,10 +68,6 @@ func newServer(t EndPointType, opts ...ServerOption) *server { ...@@ -67,10 +68,6 @@ func newServer(t EndPointType, opts ...ServerOption) *server {
s.init(opts...) s.init(opts...)
if s.addr == "" {
panic(fmt.Sprintf("@addr:%s", s.addr))
}
return s return s
} }
...@@ -163,9 +160,16 @@ func (s *server) listenTCP() error { ...@@ -163,9 +160,16 @@ func (s *server) listenTCP() error {
streamListener net.Listener streamListener net.Listener
) )
if len(s.addr) == 0 || !strings.Contains(s.addr, ":") {
streamListener, err = gxnet.ListenOnTCPRandomPort(s.addr)
if err != nil {
return perrors.Wrapf(err, "gxnet.ListenOnTCPRandomPort(addr:%s)", s.addr)
}
} else {
streamListener, err = net.Listen("tcp", s.addr) streamListener, err = net.Listen("tcp", s.addr)
if err != nil { if err != nil {
return perrors.Wrapf(err, "net.Listen(tcp, addr:%s))", s.addr) return perrors.Wrapf(err, "net.Listen(tcp, addr:%s)", s.addr)
}
} }
s.streamListener = streamListener s.streamListener = streamListener
...@@ -180,6 +184,12 @@ func (s *server) listenUDP() error { ...@@ -180,6 +184,12 @@ func (s *server) listenUDP() error {
pktListener *net.UDPConn pktListener *net.UDPConn
) )
if len(s.addr) == 0 || !strings.Contains(s.addr, ":") {
pktListener, err = gxnet.ListenOnUDPRandomPort(s.addr)
if err != nil {
return perrors.Wrapf(err, "gxnet.ListenOnUDPRandomPort(addr:%s)", s.addr)
}
} else {
localAddr, err = net.ResolveUDPAddr("udp", s.addr) localAddr, err = net.ResolveUDPAddr("udp", s.addr)
if err != nil { if err != nil {
return perrors.Wrapf(err, "net.ResolveUDPAddr(udp, addr:%s)", s.addr) return perrors.Wrapf(err, "net.ResolveUDPAddr(udp, addr:%s)", s.addr)
...@@ -188,6 +198,7 @@ func (s *server) listenUDP() error { ...@@ -188,6 +198,7 @@ func (s *server) listenUDP() error {
if err != nil { if err != nil {
return perrors.Wrapf(err, "net.ListenUDP((udp, localAddr:%#v)", localAddr) return perrors.Wrapf(err, "net.ListenUDP((udp, localAddr:%#v)", localAddr)
} }
}
s.pktListener = pktListener s.pktListener = pktListener
...@@ -213,7 +224,7 @@ func (s *server) accept(newSession NewSessionCallback) (Session, error) { ...@@ -213,7 +224,7 @@ func (s *server) accept(newSession NewSessionCallback) (Session, error) {
} }
if gxnet.IsSameAddr(conn.RemoteAddr(), conn.LocalAddr()) { if gxnet.IsSameAddr(conn.RemoteAddr(), conn.LocalAddr()) {
log.Warnf("conn.localAddr{%s} == conn.RemoteAddr", conn.LocalAddr().String(), conn.RemoteAddr().String()) log.Warnf("conn.localAddr{%s} == conn.RemoteAddr", conn.LocalAddr().String(), conn.RemoteAddr().String())
return nil, errSelfConnect return nil, perrors.WithStack(errSelfConnect)
} }
ss := newTCPSession(conn, s) ss := newTCPSession(conn, s)
...@@ -237,7 +248,7 @@ func (s *server) runTcpEventLoop(newSession NewSessionCallback) { ...@@ -237,7 +248,7 @@ func (s *server) runTcpEventLoop(newSession NewSessionCallback) {
) )
for { for {
if s.IsClosed() { if s.IsClosed() {
log.Warnf("server{%s} stop acceptting client connect request.", s.addr) log.Warnf("server{%s} stop accepting client connect request.", s.addr)
return return
} }
if delay != 0 { if delay != 0 {
...@@ -256,7 +267,7 @@ func (s *server) runTcpEventLoop(newSession NewSessionCallback) { ...@@ -256,7 +267,7 @@ func (s *server) runTcpEventLoop(newSession NewSessionCallback) {
} }
continue continue
} }
log.Warnf("server{%s}.Accept() = err {%+v}", s.addr, err) log.Warnf("server{%s}.Accept() = err {%+v}", s.addr, perrors.WithStack(err))
continue continue
} }
delay = 0 delay = 0
...@@ -357,8 +368,7 @@ func (s *server) runWSEventLoop(newSession NewSessionCallback) { ...@@ -357,8 +368,7 @@ func (s *server) runWSEventLoop(newSession NewSessionCallback) {
s.lock.Unlock() s.lock.Unlock()
err = server.Serve(s.streamListener) err = server.Serve(s.streamListener)
if err != nil { if err != nil {
log.Errorf("http.server.Serve(addr{%s}) = err{%+v}", s.addr, err) log.Errorf("http.server.Serve(addr{%s}) = err:%+v", s.addr, perrors.WithStack(err))
// panic(err)
} }
}() }()
} }
...@@ -380,8 +390,8 @@ func (s *server) runWSSEventLoop(newSession NewSessionCallback) { ...@@ -380,8 +390,8 @@ func (s *server) runWSSEventLoop(newSession NewSessionCallback) {
defer s.wg.Done() defer s.wg.Done()
if certificate, err = tls.LoadX509KeyPair(s.cert, s.privateKey); err != nil { if certificate, err = tls.LoadX509KeyPair(s.cert, s.privateKey); err != nil {
panic(fmt.Sprintf("tls.LoadX509KeyPair(cert{%s}, privateKey{%s}) = err{%+v}", panic(fmt.Sprintf("tls.LoadX509KeyPair(cert{%s}, privateKey{%s}) = err:%+v",
s.cert, s.privateKey, err)) s.cert, s.privateKey, perrors.WithStack(err)))
return return
} }
config = &tls.Config{ config = &tls.Config{
...@@ -394,7 +404,7 @@ func (s *server) runWSSEventLoop(newSession NewSessionCallback) { ...@@ -394,7 +404,7 @@ func (s *server) runWSSEventLoop(newSession NewSessionCallback) {
if s.caCert != "" { if s.caCert != "" {
certPem, err = ioutil.ReadFile(s.caCert) certPem, err = ioutil.ReadFile(s.caCert)
if err != nil { if err != nil {
panic(fmt.Errorf("ioutil.ReadFile(certFile{%s}) = err{%+v}", s.caCert, err)) panic(fmt.Errorf("ioutil.ReadFile(certFile{%s}) = err:%+v", s.caCert, perrors.WithStack(err)))
} }
certPool = x509.NewCertPool() certPool = x509.NewCertPool()
if ok := certPool.AppendCertsFromPEM(certPem); !ok { if ok := certPool.AppendCertsFromPEM(certPem); !ok {
...@@ -419,7 +429,7 @@ func (s *server) runWSSEventLoop(newSession NewSessionCallback) { ...@@ -419,7 +429,7 @@ func (s *server) runWSSEventLoop(newSession NewSessionCallback) {
s.lock.Unlock() s.lock.Unlock()
err = server.Serve(tls.NewListener(s.streamListener, config)) err = server.Serve(tls.NewListener(s.streamListener, config))
if err != nil { if err != nil {
log.Errorf("http.server.Serve(addr{%s}) = err{%+v}", s.addr, err) log.Errorf("http.server.Serve(addr{%s}) = err:%+v", s.addr, perrors.WithStack(err))
panic(err) panic(err)
} }
}() }()
...@@ -429,7 +439,7 @@ func (s *server) runWSSEventLoop(newSession NewSessionCallback) { ...@@ -429,7 +439,7 @@ func (s *server) runWSSEventLoop(newSession NewSessionCallback) {
// @newSession: new connection callback // @newSession: new connection callback
func (s *server) RunEventLoop(newSession NewSessionCallback) { func (s *server) RunEventLoop(newSession NewSessionCallback) {
if err := s.listen(); err != nil { if err := s.listen(); err != nil {
panic(fmt.Errorf("server.listen() = error:%+v", err)) panic(fmt.Errorf("server.listen() = error:%+v", perrors.WithStack(err)))
} }
switch s.endPointType { switch s.endPointType {
......
...@@ -9,16 +9,16 @@ import ( ...@@ -9,16 +9,16 @@ import (
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
) )
func TestTCPServer(t *testing.T) { func testTCPServer(t *testing.T, address string) {
var ( var (
server *server server *server
serverMsgHandler MessageHandler serverMsgHandler MessageHandler
) )
addr := "127.0.0.1:0"
func() { func() {
server = newServer( server = newServer(
TCP_SERVER, TCP_SERVER,
WithLocalAddress(addr), WithLocalAddress(address),
) )
newServerSession := func(session Session) error { newServerSession := func(session Session) error {
return newSessionCallback(session, &serverMsgHandler) return newSessionCallback(session, &serverMsgHandler)
...@@ -26,11 +26,12 @@ func TestTCPServer(t *testing.T) { ...@@ -26,11 +26,12 @@ func TestTCPServer(t *testing.T) {
server.RunEventLoop(newServerSession) server.RunEventLoop(newServerSession)
assert.True(t, server.ID() > 0) assert.True(t, server.ID() > 0)
assert.True(t, server.EndPointType() == TCP_SERVER) assert.True(t, server.EndPointType() == TCP_SERVER)
assert.NotNil(t, server.streamListener)
}() }()
time.Sleep(500e6) time.Sleep(500e6)
addr = server.streamListener.Addr().String() addr := server.streamListener.Addr().String()
t.Logf("server addr: %v", addr) t.Logf("@address:%s, tcp server addr: %v", address, addr)
clt := newClient(TCP_CLIENT, clt := newClient(TCP_CLIENT,
WithServerAddress(addr), WithServerAddress(addr),
WithReconnectInterval(5e8), WithReconnectInterval(5e8),
...@@ -58,16 +59,15 @@ func TestTCPServer(t *testing.T) { ...@@ -58,16 +59,15 @@ func TestTCPServer(t *testing.T) {
assert.True(t, server.IsClosed()) assert.True(t, server.IsClosed())
} }
func TestUDPServer(t *testing.T) { func testUDPServer(t *testing.T, address string) {
var ( var (
server *server server *server
serverMsgHandler MessageHandler serverMsgHandler MessageHandler
) )
addr := "127.0.0.1:0"
func() { func() {
server = newServer( server = newServer(
UDP_ENDPOINT, UDP_ENDPOINT,
WithLocalAddress(addr), WithLocalAddress(address),
) )
newServerSession := func(session Session) error { newServerSession := func(session Session) error {
return newSessionCallback(session, &serverMsgHandler) return newSessionCallback(session, &serverMsgHandler)
...@@ -75,34 +75,25 @@ func TestUDPServer(t *testing.T) { ...@@ -75,34 +75,25 @@ func TestUDPServer(t *testing.T) {
server.RunEventLoop(newServerSession) server.RunEventLoop(newServerSession)
assert.True(t, server.ID() > 0) assert.True(t, server.ID() > 0)
assert.True(t, server.EndPointType() == UDP_ENDPOINT) assert.True(t, server.EndPointType() == UDP_ENDPOINT)
assert.NotNil(t, server.pktListener)
}() }()
time.Sleep(500e6) time.Sleep(500e6)
//addr = server.streamListener.Addr().String() addr := server.pktListener.LocalAddr().String()
//t.Logf("server addr: %v", addr) t.Logf("@address:%s, udp server addr: %v", address, addr)
//clt := newClient(TCP_CLIENT, }
// WithServerAddress(addr),
// WithReconnectInterval(5e8), func TestServer(t *testing.T) {
// WithConnectionNumber(1), var addr string
//)
//assert.NotNil(t, clt) testTCPServer(t, addr)
//assert.True(t, clt.ID() > 0) testUDPServer(t, addr)
//assert.Equal(t, clt.endPointType, TCP_CLIENT)
// addr = "127.0.0.1:0"
//var ( testTCPServer(t, addr)
// msgHandler MessageHandler testUDPServer(t, addr)
//)
//cb := func(session Session) error { addr = "127.0.0.1"
// return newSessionCallback(session, &msgHandler) testTCPServer(t, addr)
//} testUDPServer(t, addr)
//
//clt.RunEventLoop(cb)
//time.Sleep(1e9)
//
//assert.Equal(t, 1, msgHandler.SessionNumber())
//clt.Close()
//assert.True(t, clt.IsClosed())
//
//server.Close()
//assert.True(t, server.IsClosed())
} }
...@@ -12,6 +12,7 @@ package getty ...@@ -12,6 +12,7 @@ package getty
import ( import (
"bytes" "bytes"
"fmt" "fmt"
jerrors "github.com/juju/errors"
"io" "io"
"net" "net"
"runtime" "runtime"
...@@ -387,7 +388,7 @@ func (s *session) WritePkg(pkg interface{}, timeout time.Duration) error { ...@@ -387,7 +388,7 @@ func (s *session) WritePkg(pkg interface{}, timeout time.Duration) error {
if timeout <= 0 { if timeout <= 0 {
pkgBytes, err := s.writer.Write(s, pkg) pkgBytes, err := s.writer.Write(s, pkg)
if err != nil { if err != nil {
log.Warnf("%s, [session.WritePkg] session.writer.Write(@pkg:%#v) = error:%v", s.Stat(), pkg, err) log.Warnf("%s, [session.WritePkg] session.writer.Write(@pkg:%#v) = error:%+v", s.Stat(), pkg, err)
return perrors.WithStack(err) return perrors.WithStack(err)
} }
var udpCtxPtr *UDPContext var udpCtxPtr *UDPContext
...@@ -404,7 +405,7 @@ func (s *session) WritePkg(pkg interface{}, timeout time.Duration) error { ...@@ -404,7 +405,7 @@ func (s *session) WritePkg(pkg interface{}, timeout time.Duration) error {
} }
_, err = s.Connection.send(pkg) _, err = s.Connection.send(pkg)
if err != nil { if err != nil {
log.Warnf("%s, [session.WritePkg] @s.Connection.Write(pkg:%#v) = err:%v", s.Stat(), pkg, err) log.Warnf("%s, [session.WritePkg] @s.Connection.Write(pkg:%#v) = err:%+v", s.Stat(), pkg, err)
return perrors.WithStack(err) return perrors.WithStack(err)
} }
return nil return nil
...@@ -445,10 +446,47 @@ func (s *session) WriteBytesArray(pkgs ...[]byte) error { ...@@ -445,10 +446,47 @@ func (s *session) WriteBytesArray(pkgs ...[]byte) error {
return s.WriteBytes(pkgs[0]) return s.WriteBytes(pkgs[0])
} }
// TODO Currently, only TCP is supported. // reduce syscall and memcopy for multiple packages
if _, ok := s.Connection.(*gettyTCPConn); ok {
if _, err := s.Connection.send(pkgs); err != nil { if _, err := s.Connection.send(pkgs); err != nil {
return perrors.Wrapf(err, "s.Connection.Write(pkgs num:%d)", len(pkgs)) return perrors.Wrapf(err, "s.Connection.Write(pkgs num:%d)", len(pkgs))
} }
}
// get len
var (
l int
err error
length int
arrp *[]byte
arr []byte
)
length = 0
for i := 0; i < len(pkgs); i++ {
length += len(pkgs[i])
}
// merge the pkgs
//arr = make([]byte, length)
arrp = gxbytes.AcquireBytes(length)
defer gxbytes.ReleaseBytes(arrp)
arr = *arrp
l = 0
for i := 0; i < len(pkgs); i++ {
copy(arr[l:], pkgs[i])
l += len(pkgs[i])
}
if err = s.WriteBytes(arr); err != nil {
return jerrors.Trace(err)
}
num := len(pkgs) - 1
for i := 0; i < num; i++ {
s.incWritePkgNum()
}
return nil return nil
} }
...@@ -554,7 +592,7 @@ LOOP: ...@@ -554,7 +592,7 @@ LOOP:
for idx := 0; idx < maxIovecNum; idx++ { for idx := 0; idx < maxIovecNum; idx++ {
pkgBytes, err = s.writer.Write(s, outPkg) pkgBytes, err = s.writer.Write(s, outPkg)
if err != nil { if err != nil {
log.Errorf("%s, [session.handleLoop] = error:%+v", s.sessionToken(), err) log.Errorf("%s, [session.handleLoop] = error:%+v", s.sessionToken(), jerrors.ErrorStack(err))
s.stop() s.stop()
// break LOOP // break LOOP
flag = false flag = false
...@@ -582,7 +620,7 @@ LOOP: ...@@ -582,7 +620,7 @@ LOOP:
err = s.WriteBytesArray(iovec[:]...) err = s.WriteBytesArray(iovec[:]...)
if err != nil { if err != nil {
log.Errorf("%s, [session.handleLoop]s.WriteBytesArray(iovec len:%d) = error:%+v", log.Errorf("%s, [session.handleLoop]s.WriteBytesArray(iovec len:%d) = error:%+v",
s.sessionToken(), len(iovec), err) s.sessionToken(), len(iovec), jerrors.ErrorStack(err))
s.stop() s.stop()
// break LOOP // break LOOP
flag = false flag = false
...@@ -593,7 +631,7 @@ LOOP: ...@@ -593,7 +631,7 @@ LOOP:
if wsFlag { if wsFlag {
err := wsConn.writePing() err := wsConn.writePing()
if err != nil { if err != nil {
log.Warnf("wsConn.writePing() = error{%s}", err) log.Warnf("wsConn.writePing() = error:%+v", perrors.WithStack(err))
} }
} }
s.listener.OnCron(s) s.listener.OnCron(s)
...@@ -632,7 +670,7 @@ func (s *session) handlePackage() { ...@@ -632,7 +670,7 @@ func (s *session) handlePackage() {
log.Infof("%s, [session.handlePackage] gr will exit now, left gr num %d", s.sessionToken(), grNum) log.Infof("%s, [session.handlePackage] gr will exit now, left gr num %d", s.sessionToken(), grNum)
s.stop() s.stop()
if err != nil { if err != nil {
log.Errorf("%s, [session.handlePackage] error:%+v", s.sessionToken(), err) log.Errorf("%s, [session.handlePackage] error:%+v", s.sessionToken(), perrors.WithStack(err))
if s != nil || s.listener != nil { if s != nil || s.listener != nil {
s.listener.OnError(s, err) s.listener.OnError(s, err)
} }
...@@ -703,12 +741,12 @@ func (s *session) handleTCPPackage() error { ...@@ -703,12 +741,12 @@ func (s *session) handleTCPPackage() error {
break break
} }
if perrors.Cause(err) == io.EOF { if perrors.Cause(err) == io.EOF {
log.Infof("%s, [session.conn.read] = error:%+v", s.sessionToken(), err) log.Infof("%s, [session.conn.read] = error:%+v", s.sessionToken(), perrors.WithStack(err))
err = nil err = nil
exit = true exit = true
break break
} }
log.Errorf("%s, [session.conn.read] = error:%+v", s.sessionToken(), err) log.Errorf("%s, [session.conn.read] = error:%+v", s.sessionToken(), perrors.WithStack(err))
exit = true exit = true
} }
break break
...@@ -784,7 +822,7 @@ func (s *session) handleUDPPackage() error { ...@@ -784,7 +822,7 @@ func (s *session) handleUDPPackage() error {
} }
bufLen, addr, err = conn.recv(buf) bufLen, addr, err = conn.recv(buf)
log.Debugf("conn.read() = bufLen:%d, addr:%#v, err:%+v", bufLen, addr, err) log.Debugf("conn.read() = bufLen:%d, addr:%#v, err:%+v", bufLen, addr, perrors.WithStack(err))
if netError, ok = perrors.Cause(err).(net.Error); ok && netError.Timeout() { if netError, ok = perrors.Cause(err).(net.Error); ok && netError.Timeout() {
continue continue
} }
...@@ -796,7 +834,7 @@ func (s *session) handleUDPPackage() error { ...@@ -796,7 +834,7 @@ func (s *session) handleUDPPackage() error {
} }
if bufLen == 0 { if bufLen == 0 {
log.Errorf("conn.read() = bufLen:%d, addr:%s, err:%+v", bufLen, addr, err) log.Errorf("conn.read() = bufLen:%d, addr:%s, err:%+v", bufLen, addr, perrors.WithStack(err))
continue continue
} }
...@@ -806,17 +844,17 @@ func (s *session) handleUDPPackage() error { ...@@ -806,17 +844,17 @@ func (s *session) handleUDPPackage() error {
} }
pkg, pkgLen, err = s.reader.Read(s, buf[:bufLen]) pkg, pkgLen, err = s.reader.Read(s, buf[:bufLen])
log.Debugf("s.reader.Read() = pkg:%#v, pkgLen:%d, err:%+v", pkg, pkgLen, err) log.Debugf("s.reader.Read() = pkg:%#v, pkgLen:%d, err:%+v", pkg, pkgLen, perrors.WithStack(err))
if err == nil && s.maxMsgLen > 0 && bufLen > int(s.maxMsgLen) { if err == nil && s.maxMsgLen > 0 && bufLen > int(s.maxMsgLen) {
err = perrors.Errorf("Message Too Long, bufLen %d, session max message len %d", bufLen, s.maxMsgLen) err = perrors.Errorf("Message Too Long, bufLen %d, session max message len %d", bufLen, s.maxMsgLen)
} }
if err != nil { if err != nil {
log.Warnf("%s, [session.handleUDPPackage] = len{%d}, error:%+v", log.Warnf("%s, [session.handleUDPPackage] = len:%d, error:%+v",
s.sessionToken(), pkgLen, err) s.sessionToken(), pkgLen, perrors.WithStack(err))
continue continue
} }
if pkgLen == 0 { if pkgLen == 0 {
log.Errorf("s.reader.Read() = pkg:%#v, pkgLen:%d, err:%+v", pkg, pkgLen, err) log.Errorf("s.reader.Read() = pkg:%#v, pkgLen:%d, err:%+v", pkg, pkgLen, perrors.WithStack(err))
continue continue
} }
...@@ -849,7 +887,7 @@ func (s *session) handleWSPackage() error { ...@@ -849,7 +887,7 @@ func (s *session) handleWSPackage() error {
continue continue
} }
if err != nil { if err != nil {
log.Warnf("%s, [session.handleWSPackage] = error{%+s}", log.Warnf("%s, [session.handleWSPackage] = error:%+v",
s.sessionToken(), perrors.WithStack(err)) s.sessionToken(), perrors.WithStack(err))
return perrors.WithStack(err) return perrors.WithStack(err)
} }
...@@ -860,8 +898,8 @@ func (s *session) handleWSPackage() error { ...@@ -860,8 +898,8 @@ func (s *session) handleWSPackage() error {
err = perrors.Errorf("Message Too Long, length %d, session max message len %d", length, s.maxMsgLen) err = perrors.Errorf("Message Too Long, length %d, session max message len %d", length, s.maxMsgLen)
} }
if err != nil { if err != nil {
log.Warnf("%s, [session.handleWSPackage] = len{%d}, error:%+v", log.Warnf("%s, [session.handleWSPackage] = len:%d, error:%+v",
s.sessionToken(), length, err) s.sessionToken(), length, perrors.WithStack(err))
continue continue
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment