Commit 35056ac3 authored by AlexStocks's avatar AlexStocks

add EndPointType

parent e00f376f
...@@ -14,6 +14,10 @@ ...@@ -14,6 +14,10 @@
## develop history ## ## develop history ##
--- ---
- 2018/03/17
> improvement
* add end point type
- 2018/03/16 - 2018/03/16
> bug fix > bug fix
* set maxMsgLen of UDPSession from zero to 4k * set maxMsgLen of UDPSession from zero to 4k
......
...@@ -33,14 +33,6 @@ const ( ...@@ -33,14 +33,6 @@ const (
maxTimes = 10 maxTimes = 10
) )
const (
CONNECTED_UDP_CLIENT = 1
UNCONNECTED_UDP_CLIENT = 2
TCP_CLIENT = 3
WS_CLIENT = 4
WSS_CLIENT = 5
)
///////////////////////////////////////// /////////////////////////////////////////
// getty tcp client // getty tcp client
///////////////////////////////////////// /////////////////////////////////////////
...@@ -48,12 +40,12 @@ const ( ...@@ -48,12 +40,12 @@ const (
type Client struct { type Client struct {
// net // net
sync.Mutex sync.Mutex
typ int endPointType EndPointType
number int number int
interval time.Duration interval time.Duration
addr string addr string
newSession NewSessionCallback newSession NewSessionCallback
ssMap map[Session]gxsync.Empty ssMap map[Session]gxsync.Empty
sync.Once sync.Once
done chan gxsync.Empty done chan gxsync.Empty
...@@ -70,46 +62,49 @@ type Client struct { ...@@ -70,46 +62,49 @@ type Client struct {
// @connInterval is reconnect sleep interval when getty fails to connect the server. // @connInterval is reconnect sleep interval when getty fails to connect the server.
// @serverAddr is server address. // @serverAddr is server address.
func NewTCPClient(connNum int, connInterval time.Duration, serverAddr string) *Client { func NewTCPClient(connNum int, connInterval time.Duration, serverAddr string) *Client {
if connNum <= 0 { if connNum <= 0 || serverAddr == "" {
connNum = 1 panic(fmt.Sprintf("@connNum:%d, @serverAddr:%s", connNum, serverAddr))
} }
if connInterval < defaultInterval { if connInterval < defaultInterval {
connInterval = defaultInterval connInterval = defaultInterval
} }
return &Client{ return &Client{
typ: TCP_CLIENT, endPointType: TCP_CLIENT,
number: connNum, number: connNum,
interval: connInterval, interval: connInterval,
addr: serverAddr, addr: serverAddr,
ssMap: make(map[Session]gxsync.Empty, connNum), ssMap: make(map[Session]gxsync.Empty, connNum),
done: make(chan gxsync.Empty), done: make(chan gxsync.Empty),
} }
} }
// NewUdpClient function builds a udp client // NewUdpClient function builds a udp client
// @connNum is connection number. If this value is non-zero, getty will build // @connNum is connection number.
// some connected udp clients.
//
// @connInterval is reconnect sleep interval when getty fails to connect the server. // @connInterval is reconnect sleep interval when getty fails to connect the server.
// @serverAddr is server address. // @serverAddr is server address. if this value is none-nil-string, getty will build some connected udp clients.
func NewUDPClient(connNum int, connInterval time.Duration, serverAddr string) *Client { func NewUDPClient(connNum int, connInterval time.Duration, serverAddr string) *Client {
var typ int = CONNECTED_UDP_CLIENT var endPointType = UNCONNECTED_UDP_CLIENT
if connNum <= 0 { if len(serverAddr) != 0 {
connNum = 1 if connNum <= 0 {
typ = UNCONNECTED_UDP_CLIENT panic(fmt.Sprintf("getty will build a preconected connection by @serverAddr:%s while @connNum is %d",
serverAddr, connNum))
}
endPointType = CONNECTED_UDP_CLIENT
} }
if connInterval < defaultInterval { if connInterval < defaultInterval {
connInterval = defaultInterval connInterval = defaultInterval
} }
return &Client{ return &Client{
typ: typ, endPointType: endPointType,
number: connNum, number: connNum,
interval: connInterval, interval: connInterval,
addr: serverAddr, addr: serverAddr,
ssMap: make(map[Session]gxsync.Empty, connNum), ssMap: make(map[Session]gxsync.Empty, connNum),
done: make(chan gxsync.Empty), done: make(chan gxsync.Empty),
} }
} }
...@@ -130,12 +125,12 @@ func NewWSClient(connNum int, connInterval time.Duration, serverAddr string) *Cl ...@@ -130,12 +125,12 @@ func NewWSClient(connNum int, connInterval time.Duration, serverAddr string) *Cl
} }
return &Client{ return &Client{
typ: WS_CLIENT, endPointType: WS_CLIENT,
number: connNum, number: connNum,
interval: connInterval, interval: connInterval,
addr: serverAddr, addr: serverAddr,
ssMap: make(map[Session]gxsync.Empty, connNum), ssMap: make(map[Session]gxsync.Empty, connNum),
done: make(chan gxsync.Empty), done: make(chan gxsync.Empty),
} }
} }
...@@ -160,16 +155,20 @@ func NewWSSClient(connNum int, connInterval time.Duration, serverAddr string, ce ...@@ -160,16 +155,20 @@ func NewWSSClient(connNum int, connInterval time.Duration, serverAddr string, ce
} }
return &Client{ return &Client{
typ: WSS_CLIENT, endPointType: WSS_CLIENT,
number: connNum, number: connNum,
interval: connInterval, interval: connInterval,
addr: serverAddr, addr: serverAddr,
ssMap: make(map[Session]gxsync.Empty, connNum), ssMap: make(map[Session]gxsync.Empty, connNum),
done: make(chan gxsync.Empty), done: make(chan gxsync.Empty),
cert: cert, cert: cert,
} }
} }
func (c Client) Type() EndPointType {
return c.endPointType
}
func (c *Client) dialTCP() Session { func (c *Client) dialTCP() Session {
var ( var (
err error err error
...@@ -207,7 +206,7 @@ func (c *Client) dialUDP() Session { ...@@ -207,7 +206,7 @@ func (c *Client) dialUDP() Session {
if c.IsClosed() { if c.IsClosed() {
return nil return nil
} }
if UNCONNECTED_UDP_CLIENT == c.typ { if UNCONNECTED_UDP_CLIENT == c.endPointType {
conn, err = net.ListenUDP("udp", localAddr) conn, err = net.ListenUDP("udp", localAddr)
} else { } else {
conn, err = net.DialUDP("udp", localAddr, peerAddr) conn, err = net.DialUDP("udp", localAddr, peerAddr)
...@@ -335,7 +334,7 @@ func (c *Client) dialWSS() Session { ...@@ -335,7 +334,7 @@ func (c *Client) dialWSS() Session {
} }
func (c *Client) dial() Session { func (c *Client) dial() Session {
switch c.typ { switch c.endPointType {
case TCP_CLIENT: case TCP_CLIENT:
return c.dialTCP() return c.dialTCP()
case UNCONNECTED_UDP_CLIENT, CONNECTED_UDP_CLIENT: case UNCONNECTED_UDP_CLIENT, CONNECTED_UDP_CLIENT:
...@@ -424,7 +423,7 @@ func (c *Client) RunEventLoop(newSession NewSessionCallback) { ...@@ -424,7 +423,7 @@ func (c *Client) RunEventLoop(newSession NewSessionCallback) {
} }
times = 0 times = 0
c.connect() c.connect()
if c.typ == UNCONNECTED_UDP_CLIENT || c.typ == CONNECTED_UDP_CLIENT { if c.endPointType == UNCONNECTED_UDP_CLIENT || c.endPointType == CONNECTED_UDP_CLIENT {
break break
} }
// time.Sleep(c.interval) // build c.number connections asap // time.Sleep(c.interval) // build c.number connections asap
......
...@@ -20,6 +20,7 @@ import ( ...@@ -20,6 +20,7 @@ import (
) )
import ( import (
"github.com/AlexStocks/goext/net"
log "github.com/AlexStocks/log4go" log "github.com/AlexStocks/log4go"
"github.com/golang/snappy" "github.com/golang/snappy"
"github.com/gorilla/websocket" "github.com/gorilla/websocket"
...@@ -477,11 +478,17 @@ func (u *gettyUDPConn) Write(udpCtx interface{}) (int, error) { ...@@ -477,11 +478,17 @@ func (u *gettyUDPConn) Write(udpCtx interface{}) (int, error) {
atomic.AddUint32(&u.writeCount, (uint32)(len(buf))) atomic.AddUint32(&u.writeCount, (uint32)(len(buf)))
peerAddr = ctx.PeerAddr peerAddr = ctx.PeerAddr
if u.peerAddr != nil { if u.peerAddr != nil && peerAddr != nil {
peerAddr = u.peerAddr if !gxnet.IsUDPAddrEqual(peerAddr, u.peerAddr) {
return 0, fmt.Errorf("use of peerAddr %s different from preconnected udp connection address %s",
peerAddr, u.peerAddr)
}
peerAddr = nil
} }
peerAddr = nil
length, _, err = u.conn.WriteMsgUDP(buf, nil, peerAddr) length, _, err = u.conn.WriteMsgUDP(buf, nil, peerAddr)
log.Debug("now:%s, length:%d, err:%#v", currentTime, length, err) log.Debug("now:%s, length:%d, err:%s, peerAddr:%s, session.peer:%s", currentTime, length, err, peerAddr, u.peerAddr)
return length, err return length, err
} }
......
/******************************************************
# DESC : const properties
# AUTHOR : Alex Stocks
# VERSION : 1.0
# LICENCE : Apache License 2.0
# EMAIL : alexstocks@foxmail.com
# MOD : 2018-03-17 16:54
# FILE : const.go
******************************************************/
package getty
import (
"strconv"
)
type EndPointType int32
const (
CONNECTED_UDP_CLIENT EndPointType = 0
UNCONNECTED_UDP_CLIENT EndPointType = 1
TCP_CLIENT EndPointType = 2
WS_CLIENT EndPointType = 3
WSS_CLIENT EndPointType = 4
UDP_SERVER EndPointType = 6
TCP_SERVER EndPointType = 7
WS_SERVER EndPointType = 8
WSS_SERVER EndPointType = 9
)
var EndPointType_name = map[int32]string{
0: "CONNECTED_UDP_CLIENT",
1: "UNCONNECTED_UDP_CLIENT",
2: "TCP_CLIENT",
3: "WS_CLIENT",
4: "WSS_CLIENT",
6: "UDP_SERVER",
7: "TCP_SERVER",
8: "WS_SERVER",
9: "WSS_SERVER",
}
var EndPointType_value = map[string]int32{
"CONNECTED_UDP_CLIENT": 0,
"UNCONNECTED_UDP_CLIENT": 1,
"TCP_CLIENT": 2,
"WS_CLIENT": 3,
"WSS_CLIENT": 4,
"UDP_SERVER": 6,
"TCP_SERVER": 7,
"WS_SERVER": 8,
"WSS_SERVER": 9,
}
func (x EndPointType) String() string {
s, ok := EndPointType_name[int32(x)]
if ok {
return s
}
return strconv.Itoa(int(x))
}
...@@ -34,20 +34,13 @@ var ( ...@@ -34,20 +34,13 @@ var (
serverFastFailTimeout = gxtime.TimeSecondDuration(1) serverFastFailTimeout = gxtime.TimeSecondDuration(1)
) )
const (
UDP_SERVER = 1
TCP_SERVER = 2
WS_SERVER = 3
WSS_SERVER = 4
)
type Server struct { type Server struct {
// net // net
addr string addr string
pktListener net.PacketConn pktListener net.PacketConn
streamListener net.Listener streamListener net.Listener
lock sync.Mutex // for server lock sync.Mutex // for server
typ int endPointType EndPointType
server *http.Server // for ws or wss server server *http.Server // for ws or wss server
// websocket // websocket
...@@ -65,9 +58,9 @@ type Server struct { ...@@ -65,9 +58,9 @@ type Server struct {
// @addr server listen address. // @addr server listen address.
func NewTCPServer(addr string) *Server { func NewTCPServer(addr string) *Server {
return &Server{ return &Server{
typ: TCP_SERVER, endPointType: TCP_SERVER,
done: make(chan gxsync.Empty), done: make(chan gxsync.Empty),
addr: addr, addr: addr,
} }
} }
...@@ -75,9 +68,9 @@ func NewTCPServer(addr string) *Server { ...@@ -75,9 +68,9 @@ func NewTCPServer(addr string) *Server {
// @addr server listen address. // @addr server listen address.
func NewUDPPServer(addr string) *Server { func NewUDPPServer(addr string) *Server {
return &Server{ return &Server{
typ: UDP_SERVER, endPointType: UDP_SERVER,
done: make(chan gxsync.Empty), done: make(chan gxsync.Empty),
addr: addr, addr: addr,
} }
} }
...@@ -86,10 +79,10 @@ func NewUDPPServer(addr string) *Server { ...@@ -86,10 +79,10 @@ func NewUDPPServer(addr string) *Server {
// @path: websocket request url path // @path: websocket request url path
func NewWSServer(addr string, path string) *Server { func NewWSServer(addr string, path string) *Server {
return &Server{ return &Server{
typ: WS_SERVER, endPointType: WS_SERVER,
done: make(chan gxsync.Empty), done: make(chan gxsync.Empty),
addr: addr, addr: addr,
path: path, path: path,
} }
} }
...@@ -101,16 +94,20 @@ func NewWSServer(addr string, path string) *Server { ...@@ -101,16 +94,20 @@ func NewWSServer(addr string, path string) *Server {
// @caCert: root certificate file. to verify the legitimacy of client. it can be nil. // @caCert: root certificate file. to verify the legitimacy of client. it can be nil.
func NewWSSServer(addr, path, cert, privateKey, caCert string) *Server { func NewWSSServer(addr, path, cert, privateKey, caCert string) *Server {
return &Server{ return &Server{
typ: WSS_SERVER, endPointType: WSS_SERVER,
done: make(chan gxsync.Empty), done: make(chan gxsync.Empty),
addr: addr, addr: addr,
path: path, path: path,
cert: cert, cert: cert,
privateKey: privateKey, privateKey: privateKey,
caCert: caCert, caCert: caCert,
} }
} }
func (s Server) Type() EndPointType {
return s.endPointType
}
func (s *Server) stop() { func (s *Server) stop() {
var ( var (
err error err error
...@@ -202,7 +199,7 @@ func (s *Server) listenUDP() error { ...@@ -202,7 +199,7 @@ func (s *Server) listenUDP() error {
// Listen announces on the local network address. // Listen announces on the local network address.
func (s *Server) listen() error { func (s *Server) listen() error {
switch s.typ { switch s.endPointType {
case TCP_SERVER, WS_SERVER, WSS_SERVER: case TCP_SERVER, WS_SERVER, WSS_SERVER:
return s.listenTCP() return s.listenTCP()
case UDP_SERVER: case UDP_SERVER:
...@@ -445,7 +442,7 @@ func (s *Server) RunEventloop(newSession NewSessionCallback) { ...@@ -445,7 +442,7 @@ func (s *Server) RunEventloop(newSession NewSessionCallback) {
panic(fmt.Errorf("Server.listen() = error:%#v", err)) panic(fmt.Errorf("Server.listen() = error:%#v", err))
} }
switch s.typ { switch s.endPointType {
case TCP_SERVER: case TCP_SERVER:
s.runTcpEventloop(newSession) s.runTcpEventloop(newSession)
case UDP_SERVER: case UDP_SERVER:
...@@ -454,6 +451,8 @@ func (s *Server) RunEventloop(newSession NewSessionCallback) { ...@@ -454,6 +451,8 @@ func (s *Server) RunEventloop(newSession NewSessionCallback) {
s.runWSEventLoop(newSession) s.runWSEventLoop(newSession)
case WSS_SERVER: case WSS_SERVER:
s.runWSSEventLoop(newSession) s.runWSSEventLoop(newSession)
default:
panic(fmt.Sprintf("illegal server type %s", s.endPointType.String()))
} }
} }
......
...@@ -524,7 +524,7 @@ LOOP: ...@@ -524,7 +524,7 @@ LOOP:
case outPkg = <-s.wQ: case outPkg = <-s.wQ:
if flag { if flag {
if err = s.writer.Write(s, outPkg); err != nil { if err = s.writer.Write(s, outPkg); err != nil {
log.Error("%s, [session.handleLoop] = error{%#v}", s.sessionToken(), err) log.Error("%s, [session.handleLoop] = error{%+v}", s.sessionToken(), err)
s.stop() s.stop()
flag = false flag = false
// break LOOP // break LOOP
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment