Commit ab472d9d authored by AlexStocks's avatar AlexStocks Committed by watermelo

Add: listen on random local port

parent 0683f35e
...@@ -6,6 +6,8 @@ github.com/dubbogo/gost v1.1.1 h1:JCM7vx5edPIjDA5ovJTuzEEXuw2t7xLyrlgi2mi5jHI= ...@@ -6,6 +6,8 @@ github.com/dubbogo/gost v1.1.1 h1:JCM7vx5edPIjDA5ovJTuzEEXuw2t7xLyrlgi2mi5jHI=
github.com/dubbogo/gost v1.1.1/go.mod h1:R7wZm1DrmrKGr50mBZVcg6C9ekG8aL5hP+sgWcIDwQg= github.com/dubbogo/gost v1.1.1/go.mod h1:R7wZm1DrmrKGr50mBZVcg6C9ekG8aL5hP+sgWcIDwQg=
github.com/dubbogo/gost v1.5.2 h1:ri/03971hdpnn3QeCU+4UZgnRNGDXLDGDucR/iozZm8= github.com/dubbogo/gost v1.5.2 h1:ri/03971hdpnn3QeCU+4UZgnRNGDXLDGDucR/iozZm8=
github.com/dubbogo/gost v1.5.2/go.mod h1:pPTjVyoJan3aPxBPNUX0ADkXjPibLo+/Ib0/fADXSG8= github.com/dubbogo/gost v1.5.2/go.mod h1:pPTjVyoJan3aPxBPNUX0ADkXjPibLo+/Ib0/fADXSG8=
github.com/dubbogo/gost v1.9.0 h1:UT+dWwvLyJiDotxJERO75jB3Yxgsdy10KztR5ycxRAk=
github.com/dubbogo/gost v1.9.0/go.mod h1:pPTjVyoJan3aPxBPNUX0ADkXjPibLo+/Ib0/fADXSG8=
github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4= github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4=
github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/gorilla/websocket v1.4.0/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ= github.com/gorilla/websocket v1.4.0/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ=
......
...@@ -17,6 +17,7 @@ import ( ...@@ -17,6 +17,7 @@ import (
"io/ioutil" "io/ioutil"
"net" "net"
"net/http" "net/http"
"strings"
"sync" "sync"
"sync/atomic" "sync/atomic"
"time" "time"
...@@ -67,9 +68,9 @@ func newServer(t EndPointType, opts ...ServerOption) *server { ...@@ -67,9 +68,9 @@ func newServer(t EndPointType, opts ...ServerOption) *server {
s.init(opts...) s.init(opts...)
if s.addr == "" { //if len(s.addr) == 0 {
panic(fmt.Sprintf("@addr:%s", s.addr)) // panic(fmt.Sprintf("@addr:%s", s.addr))
} //}
return s return s
} }
...@@ -163,9 +164,16 @@ func (s *server) listenTCP() error { ...@@ -163,9 +164,16 @@ func (s *server) listenTCP() error {
streamListener net.Listener streamListener net.Listener
) )
if len(s.addr) == 0 || !strings.Contains(s.addr, ":") {
streamListener, err = gxnet.ListenOnTCPRandomPort(s.addr)
if err != nil {
return perrors.Wrapf(err, "gxnet.ListenOnTCPRandomPort(addr:%s)", s.addr)
}
} else {
streamListener, err = net.Listen("tcp", s.addr) streamListener, err = net.Listen("tcp", s.addr)
if err != nil { if err != nil {
return perrors.Wrapf(err, "net.Listen(tcp, addr:%s))", s.addr) return perrors.Wrapf(err, "net.Listen(tcp, addr:%s)", s.addr)
}
} }
s.streamListener = streamListener s.streamListener = streamListener
...@@ -180,6 +188,12 @@ func (s *server) listenUDP() error { ...@@ -180,6 +188,12 @@ func (s *server) listenUDP() error {
pktListener *net.UDPConn pktListener *net.UDPConn
) )
if len(s.addr) == 0 || !strings.Contains(s.addr, ":") {
pktListener, err = gxnet.ListenOnUDPRandomPort(s.addr)
if err != nil {
return perrors.Wrapf(err, "gxnet.ListenOnUDPRandomPort(addr:%s)", s.addr)
}
} else {
localAddr, err = net.ResolveUDPAddr("udp", s.addr) localAddr, err = net.ResolveUDPAddr("udp", s.addr)
if err != nil { if err != nil {
return perrors.Wrapf(err, "net.ResolveUDPAddr(udp, addr:%s)", s.addr) return perrors.Wrapf(err, "net.ResolveUDPAddr(udp, addr:%s)", s.addr)
...@@ -188,6 +202,7 @@ func (s *server) listenUDP() error { ...@@ -188,6 +202,7 @@ func (s *server) listenUDP() error {
if err != nil { if err != nil {
return perrors.Wrapf(err, "net.ListenUDP((udp, localAddr:%#v)", localAddr) return perrors.Wrapf(err, "net.ListenUDP((udp, localAddr:%#v)", localAddr)
} }
}
s.pktListener = pktListener s.pktListener = pktListener
...@@ -256,7 +271,7 @@ func (s *server) runTcpEventLoop(newSession NewSessionCallback) { ...@@ -256,7 +271,7 @@ func (s *server) runTcpEventLoop(newSession NewSessionCallback) {
} }
continue continue
} }
log.Warnf("server{%s}.Accept() = err {%+v}", s.addr, err) log.Warnf("server{%s}.Accept() = err {%+v}", s.addr, perrors.WithStack(err))
continue continue
} }
delay = 0 delay = 0
...@@ -357,7 +372,7 @@ func (s *server) runWSEventLoop(newSession NewSessionCallback) { ...@@ -357,7 +372,7 @@ func (s *server) runWSEventLoop(newSession NewSessionCallback) {
s.lock.Unlock() s.lock.Unlock()
err = server.Serve(s.streamListener) err = server.Serve(s.streamListener)
if err != nil { if err != nil {
log.Errorf("http.server.Serve(addr{%s}) = err{%+v}", s.addr, err) log.Errorf("http.server.Serve(addr{%s}) = err{%+v}", s.addr, perrors.WithStack(err))
// panic(err) // panic(err)
} }
}() }()
...@@ -381,7 +396,7 @@ func (s *server) runWSSEventLoop(newSession NewSessionCallback) { ...@@ -381,7 +396,7 @@ func (s *server) runWSSEventLoop(newSession NewSessionCallback) {
if certificate, err = tls.LoadX509KeyPair(s.cert, s.privateKey); err != nil { if certificate, err = tls.LoadX509KeyPair(s.cert, s.privateKey); err != nil {
panic(fmt.Sprintf("tls.LoadX509KeyPair(cert{%s}, privateKey{%s}) = err{%+v}", panic(fmt.Sprintf("tls.LoadX509KeyPair(cert{%s}, privateKey{%s}) = err{%+v}",
s.cert, s.privateKey, err)) s.cert, s.privateKey, perrors.WithStack(err)))
return return
} }
config = &tls.Config{ config = &tls.Config{
...@@ -394,7 +409,7 @@ func (s *server) runWSSEventLoop(newSession NewSessionCallback) { ...@@ -394,7 +409,7 @@ func (s *server) runWSSEventLoop(newSession NewSessionCallback) {
if s.caCert != "" { if s.caCert != "" {
certPem, err = ioutil.ReadFile(s.caCert) certPem, err = ioutil.ReadFile(s.caCert)
if err != nil { if err != nil {
panic(fmt.Errorf("ioutil.ReadFile(certFile{%s}) = err{%+v}", s.caCert, err)) panic(fmt.Errorf("ioutil.ReadFile(certFile{%s}) = err{%+v}", s.caCert, perrors.WithStack(err)))
} }
certPool = x509.NewCertPool() certPool = x509.NewCertPool()
if ok := certPool.AppendCertsFromPEM(certPem); !ok { if ok := certPool.AppendCertsFromPEM(certPem); !ok {
...@@ -419,7 +434,7 @@ func (s *server) runWSSEventLoop(newSession NewSessionCallback) { ...@@ -419,7 +434,7 @@ func (s *server) runWSSEventLoop(newSession NewSessionCallback) {
s.lock.Unlock() s.lock.Unlock()
err = server.Serve(tls.NewListener(s.streamListener, config)) err = server.Serve(tls.NewListener(s.streamListener, config))
if err != nil { if err != nil {
log.Errorf("http.server.Serve(addr{%s}) = err{%+v}", s.addr, err) log.Errorf("http.server.Serve(addr{%s}) = err{%+v}", s.addr, perrors.WithStack(err))
panic(err) panic(err)
} }
}() }()
...@@ -429,7 +444,7 @@ func (s *server) runWSSEventLoop(newSession NewSessionCallback) { ...@@ -429,7 +444,7 @@ func (s *server) runWSSEventLoop(newSession NewSessionCallback) {
// @newSession: new connection callback // @newSession: new connection callback
func (s *server) RunEventLoop(newSession NewSessionCallback) { func (s *server) RunEventLoop(newSession NewSessionCallback) {
if err := s.listen(); err != nil { if err := s.listen(); err != nil {
panic(fmt.Errorf("server.listen() = error:%+v", err)) panic(fmt.Errorf("server.listen() = error:%+v", perrors.WithStack(err)))
} }
switch s.endPointType { switch s.endPointType {
......
...@@ -9,16 +9,16 @@ import ( ...@@ -9,16 +9,16 @@ import (
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
) )
func TestTCPServer(t *testing.T) { func testTCPServer(t *testing.T, address string) {
var ( var (
server *server server *server
serverMsgHandler MessageHandler serverMsgHandler MessageHandler
) )
addr := "127.0.0.1:0"
func() { func() {
server = newServer( server = newServer(
TCP_SERVER, TCP_SERVER,
WithLocalAddress(addr), WithLocalAddress(address),
) )
newServerSession := func(session Session) error { newServerSession := func(session Session) error {
return newSessionCallback(session, &serverMsgHandler) return newSessionCallback(session, &serverMsgHandler)
...@@ -26,11 +26,12 @@ func TestTCPServer(t *testing.T) { ...@@ -26,11 +26,12 @@ func TestTCPServer(t *testing.T) {
server.RunEventLoop(newServerSession) server.RunEventLoop(newServerSession)
assert.True(t, server.ID() > 0) assert.True(t, server.ID() > 0)
assert.True(t, server.EndPointType() == TCP_SERVER) assert.True(t, server.EndPointType() == TCP_SERVER)
assert.NotNil(t, server.streamListener)
}() }()
time.Sleep(500e6) time.Sleep(500e6)
addr = server.streamListener.Addr().String() addr := server.streamListener.Addr().String()
t.Logf("server addr: %v", addr) t.Logf("@address:%s, tcp server addr: %v", address, addr)
clt := newClient(TCP_CLIENT, clt := newClient(TCP_CLIENT,
WithServerAddress(addr), WithServerAddress(addr),
WithReconnectInterval(5e8), WithReconnectInterval(5e8),
...@@ -58,16 +59,15 @@ func TestTCPServer(t *testing.T) { ...@@ -58,16 +59,15 @@ func TestTCPServer(t *testing.T) {
assert.True(t, server.IsClosed()) assert.True(t, server.IsClosed())
} }
func TestUDPServer(t *testing.T) { func testUDPServer(t *testing.T, address string) {
var ( var (
server *server server *server
serverMsgHandler MessageHandler serverMsgHandler MessageHandler
) )
addr := "127.0.0.1:0"
func() { func() {
server = newServer( server = newServer(
UDP_ENDPOINT, UDP_ENDPOINT,
WithLocalAddress(addr), WithLocalAddress(address),
) )
newServerSession := func(session Session) error { newServerSession := func(session Session) error {
return newSessionCallback(session, &serverMsgHandler) return newSessionCallback(session, &serverMsgHandler)
...@@ -75,34 +75,25 @@ func TestUDPServer(t *testing.T) { ...@@ -75,34 +75,25 @@ func TestUDPServer(t *testing.T) {
server.RunEventLoop(newServerSession) server.RunEventLoop(newServerSession)
assert.True(t, server.ID() > 0) assert.True(t, server.ID() > 0)
assert.True(t, server.EndPointType() == UDP_ENDPOINT) assert.True(t, server.EndPointType() == UDP_ENDPOINT)
assert.NotNil(t, server.pktListener)
}() }()
time.Sleep(500e6) time.Sleep(500e6)
//addr = server.streamListener.Addr().String() addr := server.pktListener.LocalAddr().String()
//t.Logf("server addr: %v", addr) t.Logf("@address:%s, udp server addr: %v", address, addr)
//clt := newClient(TCP_CLIENT, }
// WithServerAddress(addr),
// WithReconnectInterval(5e8), func TestServer(t *testing.T) {
// WithConnectionNumber(1), var addr string
//)
//assert.NotNil(t, clt) testTCPServer(t, addr)
//assert.True(t, clt.ID() > 0) testUDPServer(t, addr)
//assert.Equal(t, clt.endPointType, TCP_CLIENT)
// addr = "127.0.0.1:0"
//var ( testTCPServer(t, addr)
// msgHandler MessageHandler testUDPServer(t, addr)
//)
//cb := func(session Session) error { addr = "127.0.0.1"
// return newSessionCallback(session, &msgHandler) testTCPServer(t, addr)
//} testUDPServer(t, addr)
//
//clt.RunEventLoop(cb)
//time.Sleep(1e9)
//
//assert.Equal(t, 1, msgHandler.SessionNumber())
//clt.Close()
//assert.True(t, clt.IsClosed())
//
//server.Close()
//assert.True(t, server.IsClosed())
} }
...@@ -703,12 +703,12 @@ func (s *session) handleTCPPackage() error { ...@@ -703,12 +703,12 @@ func (s *session) handleTCPPackage() error {
break break
} }
if perrors.Cause(err) == io.EOF { if perrors.Cause(err) == io.EOF {
log.Infof("%s, [session.conn.read] = error:%+v", s.sessionToken(), err) log.Infof("%s, [session.conn.read] = error:%+v", s.sessionToken(), perrors.WithStack(err))
err = nil err = nil
exit = true exit = true
break break
} }
log.Errorf("%s, [session.conn.read] = error:%+v", s.sessionToken(), err) log.Errorf("%s, [session.conn.read] = error:%+v", s.sessionToken(), perrors.WithStack(err))
exit = true exit = true
} }
break break
...@@ -784,7 +784,7 @@ func (s *session) handleUDPPackage() error { ...@@ -784,7 +784,7 @@ func (s *session) handleUDPPackage() error {
} }
bufLen, addr, err = conn.recv(buf) bufLen, addr, err = conn.recv(buf)
log.Debugf("conn.read() = bufLen:%d, addr:%#v, err:%+v", bufLen, addr, err) log.Debugf("conn.read() = bufLen:%d, addr:%#v, err:%+v", bufLen, addr, perrors.WithStack(err))
if netError, ok = perrors.Cause(err).(net.Error); ok && netError.Timeout() { if netError, ok = perrors.Cause(err).(net.Error); ok && netError.Timeout() {
continue continue
} }
...@@ -796,7 +796,7 @@ func (s *session) handleUDPPackage() error { ...@@ -796,7 +796,7 @@ func (s *session) handleUDPPackage() error {
} }
if bufLen == 0 { if bufLen == 0 {
log.Errorf("conn.read() = bufLen:%d, addr:%s, err:%+v", bufLen, addr, err) log.Errorf("conn.read() = bufLen:%d, addr:%s, err:%+v", bufLen, addr, perrors.WithStack(err))
continue continue
} }
...@@ -806,17 +806,17 @@ func (s *session) handleUDPPackage() error { ...@@ -806,17 +806,17 @@ func (s *session) handleUDPPackage() error {
} }
pkg, pkgLen, err = s.reader.Read(s, buf[:bufLen]) pkg, pkgLen, err = s.reader.Read(s, buf[:bufLen])
log.Debugf("s.reader.Read() = pkg:%#v, pkgLen:%d, err:%+v", pkg, pkgLen, err) log.Debugf("s.reader.Read() = pkg:%#v, pkgLen:%d, err:%+v", pkg, pkgLen, perrors.WithStack(err))
if err == nil && s.maxMsgLen > 0 && bufLen > int(s.maxMsgLen) { if err == nil && s.maxMsgLen > 0 && bufLen > int(s.maxMsgLen) {
err = perrors.Errorf("Message Too Long, bufLen %d, session max message len %d", bufLen, s.maxMsgLen) err = perrors.Errorf("Message Too Long, bufLen %d, session max message len %d", bufLen, s.maxMsgLen)
} }
if err != nil { if err != nil {
log.Warnf("%s, [session.handleUDPPackage] = len{%d}, error:%+v", log.Warnf("%s, [session.handleUDPPackage] = len{%d}, error:%+v",
s.sessionToken(), pkgLen, err) s.sessionToken(), pkgLen, perrors.WithStack(err))
continue continue
} }
if pkgLen == 0 { if pkgLen == 0 {
log.Errorf("s.reader.Read() = pkg:%#v, pkgLen:%d, err:%+v", pkg, pkgLen, err) log.Errorf("s.reader.Read() = pkg:%#v, pkgLen:%d, err:%+v", pkg, pkgLen, perrors.WithStack(err))
continue continue
} }
...@@ -861,7 +861,7 @@ func (s *session) handleWSPackage() error { ...@@ -861,7 +861,7 @@ func (s *session) handleWSPackage() error {
} }
if err != nil { if err != nil {
log.Warnf("%s, [session.handleWSPackage] = len{%d}, error:%+v", log.Warnf("%s, [session.handleWSPackage] = len{%d}, error:%+v",
s.sessionToken(), length, err) s.sessionToken(), length, perrors.WithStack(err))
continue continue
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment