Commit a14ac8a9 authored by AlexStocks's avatar AlexStocks

Add: endpoint id

parent 58156632
...@@ -18,6 +18,7 @@ import ( ...@@ -18,6 +18,7 @@ import (
"net" "net"
"strings" "strings"
"sync" "sync"
"sync/atomic"
"time" "time"
) )
...@@ -27,9 +28,10 @@ import ( ...@@ -27,9 +28,10 @@ import (
) )
const ( const (
connInterval = 5e8 // 500ms reconnectInterval = 3e8 // 300ms
connectTimeout = 3e9 connectInterval = 5e8 // 500ms
maxTimes = 10 connectTimeout = 3e9
maxTimes = 10
) )
var ( var (
...@@ -41,9 +43,16 @@ var ( ...@@ -41,9 +43,16 @@ var (
// getty tcp client // getty tcp client
///////////////////////////////////////// /////////////////////////////////////////
var (
clientID = EndPointID(0)
)
type client struct { type client struct {
ClientOptions ClientOptions
// endpoint ID
endPointID EndPointID
// net // net
sync.Mutex sync.Mutex
endPointType EndPointType endPointType EndPointType
...@@ -64,6 +73,7 @@ func (c *client) init(opts ...ClientOption) { ...@@ -64,6 +73,7 @@ func (c *client) init(opts ...ClientOption) {
func newClient(t EndPointType, opts ...ClientOption) *client { func newClient(t EndPointType, opts ...ClientOption) *client {
c := &client{ c := &client{
endPointID: atomic.AddInt32(&clientID, 1),
endPointType: t, endPointType: t,
done: make(chan struct{}), done: make(chan struct{}),
} }
...@@ -114,6 +124,10 @@ func NewWSSClient(opts ...ClientOption) Client { ...@@ -114,6 +124,10 @@ func NewWSSClient(opts ...ClientOption) Client {
return c return c
} }
func (c client) ID() EndPointID {
return c.endPointID
}
func (c client) EndPointType() EndPointType { func (c client) EndPointType() EndPointType {
return c.endPointType return c.endPointType
} }
...@@ -138,7 +152,7 @@ func (c *client) dialTCP() Session { ...@@ -138,7 +152,7 @@ func (c *client) dialTCP() Session {
} }
log.Infof("net.DialTimeout(addr:%s, timeout:%v) = error:%+v", c.addr, connectTimeout, err) log.Infof("net.DialTimeout(addr:%s, timeout:%v) = error:%+v", c.addr, connectTimeout, err)
time.Sleep(connInterval) time.Sleep(connectInterval)
} }
} }
...@@ -166,7 +180,7 @@ func (c *client) dialUDP() Session { ...@@ -166,7 +180,7 @@ func (c *client) dialUDP() Session {
} }
if err != nil { if err != nil {
log.Warnf("net.DialTimeout(addr:%s, timeout:%v) = error:%+v", c.addr, err) log.Warnf("net.DialTimeout(addr:%s, timeout:%v) = error:%+v", c.addr, err)
time.Sleep(connInterval) time.Sleep(connectInterval)
continue continue
} }
...@@ -175,7 +189,7 @@ func (c *client) dialUDP() Session { ...@@ -175,7 +189,7 @@ func (c *client) dialUDP() Session {
if length, err = conn.Write(connectPingPackage[:]); err != nil { if length, err = conn.Write(connectPingPackage[:]); err != nil {
conn.Close() conn.Close()
log.Warnf("conn.Write(%s) = {length:%d, err:%+v}", string(connectPingPackage), length, err) log.Warnf("conn.Write(%s) = {length:%d, err:%+v}", string(connectPingPackage), length, err)
time.Sleep(connInterval) time.Sleep(connectInterval)
continue continue
} }
conn.SetReadDeadline(time.Now().Add(1e9)) conn.SetReadDeadline(time.Now().Add(1e9))
...@@ -186,7 +200,7 @@ func (c *client) dialUDP() Session { ...@@ -186,7 +200,7 @@ func (c *client) dialUDP() Session {
if err != nil { if err != nil {
log.Infof("conn{%#v}.Read() = {length:%d, err:%+v}", conn, length, err) log.Infof("conn{%#v}.Read() = {length:%d, err:%+v}", conn, length, err)
conn.Close() conn.Close()
time.Sleep(connInterval) time.Sleep(connectInterval)
continue continue
} }
//if err == nil { //if err == nil {
...@@ -224,7 +238,7 @@ func (c *client) dialWS() Session { ...@@ -224,7 +238,7 @@ func (c *client) dialWS() Session {
} }
log.Infof("websocket.dialer.Dial(addr:%s) = error:%+v", c.addr, err) log.Infof("websocket.dialer.Dial(addr:%s) = error:%+v", c.addr, err)
time.Sleep(connInterval) time.Sleep(connectInterval)
} }
} }
...@@ -302,7 +316,7 @@ func (c *client) dialWSS() Session { ...@@ -302,7 +316,7 @@ func (c *client) dialWSS() Session {
} }
log.Infof("websocket.dialer.Dial(addr:%s) = error:%+v", c.addr, err) log.Infof("websocket.dialer.Dial(addr:%s) = error:%+v", c.addr, err)
time.Sleep(connInterval) time.Sleep(connectInterval)
} }
} }
...@@ -383,10 +397,14 @@ func (c *client) RunEventLoop(newSession NewSessionCallback) { ...@@ -383,10 +397,14 @@ func (c *client) RunEventLoop(newSession NewSessionCallback) {
// a for-loop connect to make sure the connection pool is valid // a for-loop connect to make sure the connection pool is valid
func (c *client) reConnect() { func (c *client) reConnect() {
var num, max, times int var num, max, times, interval int
// c.Lock() // c.Lock()
max = c.number max = c.number
interval = c.reconnectInterval
if interval == 0 {
interval = reconnectInterval
}
// c.Unlock() // c.Unlock()
for { for {
if c.IsClosed() { if c.IsClosed() {
...@@ -403,7 +421,7 @@ func (c *client) reConnect() { ...@@ -403,7 +421,7 @@ func (c *client) reConnect() {
if maxTimes < times { if maxTimes < times {
times = maxTimes times = maxTimes
} }
time.Sleep(time.Duration(int64(times) * int64(c.reconnectInterval))) time.Sleep(time.Duration(int64(times) * int64(interval)))
} }
} }
......
...@@ -14,6 +14,7 @@ import ( ...@@ -14,6 +14,7 @@ import (
"strconv" "strconv"
) )
type EndPointID = int32
type EndPointType int32 type EndPointType int32
const ( const (
......
...@@ -173,6 +173,8 @@ type Session interface { ...@@ -173,6 +173,8 @@ type Session interface {
///////////////////////////////////////// /////////////////////////////////////////
type EndPoint interface { type EndPoint interface {
// get EndPoint ID
ID() EndPointID
// get endpoint type // get endpoint type
EndPointType() EndPointType EndPointType() EndPointType
// run event loop and serves client request. // run event loop and serves client request.
......
...@@ -92,14 +92,18 @@ func WithServerAddress(addr string) ClientOption { ...@@ -92,14 +92,18 @@ func WithServerAddress(addr string) ClientOption {
// @reconnectInterval is server address. // @reconnectInterval is server address.
func WithReconnectInterval(reconnectInterval int) ClientOption { func WithReconnectInterval(reconnectInterval int) ClientOption {
return func(o *ClientOptions) { return func(o *ClientOptions) {
o.reconnectInterval = reconnectInterval if 0 < reconnectInterval {
o.reconnectInterval = reconnectInterval
}
} }
} }
// @num is connection number. // @num is connection number.
func WithConnectionNumber(num int) ClientOption { func WithConnectionNumber(num int) ClientOption {
return func(o *ClientOptions) { return func(o *ClientOptions) {
o.number = num if 0 < num {
o.number = num
}
} }
} }
......
...@@ -18,6 +18,7 @@ import ( ...@@ -18,6 +18,7 @@ import (
"net" "net"
"net/http" "net/http"
"sync" "sync"
"sync/atomic"
"time" "time"
) )
...@@ -29,11 +30,15 @@ import ( ...@@ -29,11 +30,15 @@ import (
var ( var (
errSelfConnect = perrors.New("connect self!") errSelfConnect = perrors.New("connect self!")
serverFastFailTimeout = time.Second * 1 serverFastFailTimeout = time.Second * 1
serverID = EndPointID(0)
) )
type server struct { type server struct {
ServerOptions ServerOptions
// endpoint ID
endPointID EndPointID
// net // net
pktListener net.PacketConn pktListener net.PacketConn
streamListener net.Listener streamListener net.Listener
...@@ -54,6 +59,7 @@ func (s *server) init(opts ...ServerOption) { ...@@ -54,6 +59,7 @@ func (s *server) init(opts ...ServerOption) {
func newServer(t EndPointType, opts ...ServerOption) *server { func newServer(t EndPointType, opts ...ServerOption) *server {
s := &server{ s := &server{
endPointID: atomic.AddInt32(&serverID, 1),
endPointType: t, endPointType: t,
done: make(chan struct{}), done: make(chan struct{}),
} }
...@@ -94,6 +100,10 @@ func NewWSSServer(opts ...ServerOption) Server { ...@@ -94,6 +100,10 @@ func NewWSSServer(opts ...ServerOption) Server {
return s return s
} }
func (s server) ID() int32 {
return s.endPointID
}
func (s server) EndPointType() EndPointType { func (s server) EndPointType() EndPointType {
return s.endPointType return s.endPointType
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment