Unverified Commit 78e88386 authored by 望哥's avatar 望哥 Committed by GitHub

Merge pull request #7 from divebomb/master

Add: endpoint id
parents 7886bc27 a14ac8a9
......@@ -18,6 +18,7 @@ import (
"net"
"strings"
"sync"
"sync/atomic"
"time"
)
......@@ -27,9 +28,10 @@ import (
)
const (
connInterval = 5e8 // 500ms
connectTimeout = 3e9
maxTimes = 10
reconnectInterval = 3e8 // 300ms
connectInterval = 5e8 // 500ms
connectTimeout = 3e9
maxTimes = 10
)
var (
......@@ -41,9 +43,16 @@ var (
// getty tcp client
/////////////////////////////////////////
var (
clientID = EndPointID(0)
)
type client struct {
ClientOptions
// endpoint ID
endPointID EndPointID
// net
sync.Mutex
endPointType EndPointType
......@@ -64,6 +73,7 @@ func (c *client) init(opts ...ClientOption) {
func newClient(t EndPointType, opts ...ClientOption) *client {
c := &client{
endPointID: atomic.AddInt32(&clientID, 1),
endPointType: t,
done: make(chan struct{}),
}
......@@ -114,6 +124,10 @@ func NewWSSClient(opts ...ClientOption) Client {
return c
}
func (c client) ID() EndPointID {
return c.endPointID
}
func (c client) EndPointType() EndPointType {
return c.endPointType
}
......@@ -138,7 +152,7 @@ func (c *client) dialTCP() Session {
}
log.Infof("net.DialTimeout(addr:%s, timeout:%v) = error:%+v", c.addr, connectTimeout, err)
time.Sleep(connInterval)
time.Sleep(connectInterval)
}
}
......@@ -166,7 +180,7 @@ func (c *client) dialUDP() Session {
}
if err != nil {
log.Warnf("net.DialTimeout(addr:%s, timeout:%v) = error:%+v", c.addr, err)
time.Sleep(connInterval)
time.Sleep(connectInterval)
continue
}
......@@ -175,7 +189,7 @@ func (c *client) dialUDP() Session {
if length, err = conn.Write(connectPingPackage[:]); err != nil {
conn.Close()
log.Warnf("conn.Write(%s) = {length:%d, err:%+v}", string(connectPingPackage), length, err)
time.Sleep(connInterval)
time.Sleep(connectInterval)
continue
}
conn.SetReadDeadline(time.Now().Add(1e9))
......@@ -186,7 +200,7 @@ func (c *client) dialUDP() Session {
if err != nil {
log.Infof("conn{%#v}.Read() = {length:%d, err:%+v}", conn, length, err)
conn.Close()
time.Sleep(connInterval)
time.Sleep(connectInterval)
continue
}
//if err == nil {
......@@ -224,7 +238,7 @@ func (c *client) dialWS() Session {
}
log.Infof("websocket.dialer.Dial(addr:%s) = error:%+v", c.addr, err)
time.Sleep(connInterval)
time.Sleep(connectInterval)
}
}
......@@ -302,7 +316,7 @@ func (c *client) dialWSS() Session {
}
log.Infof("websocket.dialer.Dial(addr:%s) = error:%+v", c.addr, err)
time.Sleep(connInterval)
time.Sleep(connectInterval)
}
}
......@@ -383,10 +397,14 @@ func (c *client) RunEventLoop(newSession NewSessionCallback) {
// a for-loop connect to make sure the connection pool is valid
func (c *client) reConnect() {
var num, max, times int
var num, max, times, interval int
// c.Lock()
max = c.number
interval = c.reconnectInterval
if interval == 0 {
interval = reconnectInterval
}
// c.Unlock()
for {
if c.IsClosed() {
......@@ -403,7 +421,7 @@ func (c *client) reConnect() {
if maxTimes < times {
times = maxTimes
}
time.Sleep(time.Duration(int64(times) * int64(c.reconnectInterval)))
time.Sleep(time.Duration(int64(times) * int64(interval)))
}
}
......
......@@ -14,6 +14,7 @@ import (
"strconv"
)
type EndPointID = int32
type EndPointType int32
const (
......
......@@ -173,6 +173,8 @@ type Session interface {
/////////////////////////////////////////
type EndPoint interface {
// get EndPoint ID
ID() EndPointID
// get endpoint type
EndPointType() EndPointType
// run event loop and serves client request.
......
......@@ -92,14 +92,18 @@ func WithServerAddress(addr string) ClientOption {
// @reconnectInterval is server address.
func WithReconnectInterval(reconnectInterval int) ClientOption {
return func(o *ClientOptions) {
o.reconnectInterval = reconnectInterval
if 0 < reconnectInterval {
o.reconnectInterval = reconnectInterval
}
}
}
// @num is connection number.
func WithConnectionNumber(num int) ClientOption {
return func(o *ClientOptions) {
o.number = num
if 0 < num {
o.number = num
}
}
}
......
......@@ -18,6 +18,7 @@ import (
"net"
"net/http"
"sync"
"sync/atomic"
"time"
)
......@@ -29,11 +30,15 @@ import (
var (
errSelfConnect = perrors.New("connect self!")
serverFastFailTimeout = time.Second * 1
serverID = EndPointID(0)
)
type server struct {
ServerOptions
// endpoint ID
endPointID EndPointID
// net
pktListener net.PacketConn
streamListener net.Listener
......@@ -54,6 +59,7 @@ func (s *server) init(opts ...ServerOption) {
func newServer(t EndPointType, opts ...ServerOption) *server {
s := &server{
endPointID: atomic.AddInt32(&serverID, 1),
endPointType: t,
done: make(chan struct{}),
}
......@@ -94,6 +100,10 @@ func NewWSSServer(opts ...ServerOption) Server {
return s
}
func (s server) ID() int32 {
return s.endPointID
}
func (s server) EndPointType() EndPointType {
return s.endPointType
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment