Commit 78dcd589 authored by AlexStocks's avatar AlexStocks

add Client & Server interface

parent 34ccf56d
......@@ -17,6 +17,7 @@
- 2018/03/17
> improvement
* add end point type
* add Client & Server interface
- 2018/03/16
> bug fix
......
......@@ -37,7 +37,7 @@ const (
// getty tcp client
/////////////////////////////////////////
type Client struct {
type client struct {
// net
sync.Mutex
endPointType EndPointType
......@@ -61,7 +61,7 @@ type Client struct {
// @connNum is connection number.
// @connInterval is reconnect sleep interval when getty fails to connect the server.
// @serverAddr is server address.
func NewTCPClient(connNum int, connInterval time.Duration, serverAddr string) *Client {
func NewTCPClient(connNum int, connInterval time.Duration, serverAddr string) Client {
if connNum <= 0 || serverAddr == "" {
panic(fmt.Sprintf("@connNum:%d, @serverAddr:%s", connNum, serverAddr))
}
......@@ -69,7 +69,7 @@ func NewTCPClient(connNum int, connInterval time.Duration, serverAddr string) *C
connInterval = defaultInterval
}
return &Client{
return &client{
endPointType: TCP_CLIENT,
number: connNum,
interval: connInterval,
......@@ -83,7 +83,7 @@ func NewTCPClient(connNum int, connInterval time.Duration, serverAddr string) *C
// @connNum is connection number.
// @connInterval is reconnect sleep interval when getty fails to connect the server.
// @serverAddr is server address. if this value is none-nil-string, getty will build some connected udp clients.
func NewUDPClient(connNum int, connInterval time.Duration, serverAddr string) *Client {
func NewUDPClient(connNum int, connInterval time.Duration, serverAddr string) Client {
var endPointType = UNCONNECTED_UDP_CLIENT
if len(serverAddr) != 0 {
if connNum <= 0 {
......@@ -98,7 +98,7 @@ func NewUDPClient(connNum int, connInterval time.Duration, serverAddr string) *C
connInterval = defaultInterval
}
return &Client{
return &client{
endPointType: endPointType,
number: connNum,
interval: connInterval,
......@@ -112,7 +112,7 @@ func NewUDPClient(connNum int, connInterval time.Duration, serverAddr string) *C
// @connNum is connection number.
// @connInterval is reconnect sleep interval when getty fails to connect the server.
// @serverAddr is server address. its prefix should be "ws://".
func NewWSClient(connNum int, connInterval time.Duration, serverAddr string) *Client {
func NewWSClient(connNum int, connInterval time.Duration, serverAddr string) Client {
if connNum <= 0 || serverAddr == "" {
panic(fmt.Sprintf("@connNum:%d, @serverAddr:%s", connNum, serverAddr))
}
......@@ -124,7 +124,7 @@ func NewWSClient(connNum int, connInterval time.Duration, serverAddr string) *Cl
return nil
}
return &Client{
return &client{
endPointType: WS_CLIENT,
number: connNum,
interval: connInterval,
......@@ -141,7 +141,7 @@ func NewWSClient(connNum int, connInterval time.Duration, serverAddr string) *Cl
// @cert is client certificate file. it can be emtpy.
// @privateKey is client private key(contains its public key). it can be empty.
// @caCert is the root certificate file to verify the legitimacy of server
func NewWSSClient(connNum int, connInterval time.Duration, serverAddr string, cert string) *Client {
func NewWSSClient(connNum int, connInterval time.Duration, serverAddr string, cert string) Client {
if connNum <= 0 || serverAddr == "" || cert == "" {
panic(fmt.Sprintf("@connNum:%d, @serverAddr:%s, @cert:%s", connNum, serverAddr, cert))
}
......@@ -157,7 +157,7 @@ func NewWSSClient(connNum int, connInterval time.Duration, serverAddr string, ce
return nil
}
return &Client{
return &client{
endPointType: WSS_CLIENT,
number: connNum,
interval: connInterval,
......@@ -168,11 +168,11 @@ func NewWSSClient(connNum int, connInterval time.Duration, serverAddr string, ce
}
}
func (c Client) Type() EndPointType {
func (c client) Type() EndPointType {
return c.endPointType
}
func (c *Client) dialTCP() Session {
func (c *client) dialTCP() Session {
var (
err error
conn net.Conn
......@@ -195,7 +195,7 @@ func (c *Client) dialTCP() Session {
}
}
func (c *Client) dialUDP() Session {
func (c *client) dialUDP() Session {
var (
err error
conn *net.UDPConn
......@@ -227,7 +227,7 @@ func (c *Client) dialUDP() Session {
}
}
func (c *Client) dialWS() Session {
func (c *client) dialWS() Session {
var (
err error
dialer websocket.Dialer
......@@ -259,7 +259,7 @@ func (c *Client) dialWS() Session {
}
}
func (c *Client) dialWSS() Session {
func (c *client) dialWSS() Session {
var (
err error
root *x509.Certificate
......@@ -336,7 +336,7 @@ func (c *Client) dialWSS() Session {
}
}
func (c *Client) dial() Session {
func (c *client) dial() Session {
switch c.endPointType {
case TCP_CLIENT:
return c.dialTCP()
......@@ -351,7 +351,7 @@ func (c *Client) dial() Session {
return nil
}
func (c *Client) sessionNum() int {
func (c *client) sessionNum() int {
var num int
c.Lock()
......@@ -366,7 +366,7 @@ func (c *Client) sessionNum() int {
return num
}
func (c *Client) connect() {
func (c *client) connect() {
var (
err error
ss Session
......@@ -393,7 +393,7 @@ func (c *Client) connect() {
}
}
func (c *Client) RunEventLoop(newSession NewSessionCallback) {
func (c *client) RunEventLoop(newSession NewSessionCallback) {
c.Lock()
c.newSession = newSession
c.Unlock()
......@@ -434,7 +434,7 @@ func (c *Client) RunEventLoop(newSession NewSessionCallback) {
}()
}
func (c *Client) stop() {
func (c *client) stop() {
select {
case <-c.done:
return
......@@ -451,7 +451,7 @@ func (c *Client) stop() {
}
}
func (c *Client) IsClosed() bool {
func (c *client) IsClosed() bool {
select {
case <-c.done:
return true
......@@ -460,7 +460,7 @@ func (c *Client) IsClosed() bool {
}
}
func (c *Client) Close() {
func (c *client) Close() {
c.stop()
c.wg.Wait()
}
......@@ -9,6 +9,10 @@
package getty
import (
"net"
)
// NewSessionCallback will be invoked when server accepts a new client connection or client connects to server successfully.
// If there are too many client connections or u do not want to connect a server again, u can return non-nil error. And
// then getty will close the new session.
......@@ -56,3 +60,19 @@ type EventListener interface {
// If this is a udp event listener, the second parameter type is UDPContext.
OnMessage(Session, interface{})
}
type EndPoint interface {
Type() EndPointType
RunEventLoop(newSession NewSessionCallback)
IsClosed() bool
Close()
}
type Client interface {
EndPoint
}
type Server interface {
EndPoint
Listener() net.Listener
}
......@@ -34,7 +34,7 @@ var (
serverFastFailTimeout = gxtime.TimeSecondDuration(1)
)
type Server struct {
type server struct {
// net
addr string
pktListener net.PacketConn
......@@ -56,12 +56,12 @@ type Server struct {
// NewTCServer builds a tcp server.
// @addr server listen address.
func NewTCPServer(addr string) *Server {
func NewTCPServer(addr string) Server {
if addr == "" {
panic(fmt.Sprintf("@addr:%s", addr))
}
return &Server{
return &server{
endPointType: TCP_SERVER,
done: make(chan gxsync.Empty),
addr: addr,
......@@ -70,12 +70,12 @@ func NewTCPServer(addr string) *Server {
// NewUDPServer builds a unconnected udp server.
// @addr server listen address.
func NewUDPPServer(addr string) *Server {
func NewUDPPServer(addr string) Server {
if addr == "" {
panic(fmt.Sprintf("@addr:%s", addr))
}
return &Server{
return &server{
endPointType: UDP_SERVER,
done: make(chan gxsync.Empty),
addr: addr,
......@@ -85,12 +85,12 @@ func NewUDPPServer(addr string) *Server {
// NewWSServer builds a websocket server.
// @addr server listen address.
// @path: websocket request url path
func NewWSServer(addr string, path string) *Server {
func NewWSServer(addr string, path string) Server {
if addr == "" {
panic(fmt.Sprintf("@addr:%s", addr))
}
return &Server{
return &server{
endPointType: WS_SERVER,
done: make(chan gxsync.Empty),
addr: addr,
......@@ -104,12 +104,12 @@ func NewWSServer(addr string, path string) *Server {
// @cert: server certificate file
// @privateKey: server private key(contains its public key)
// @caCert: root certificate file. to verify the legitimacy of client. it can be nil.
func NewWSSServer(addr, path, cert, privateKey, caCert string) *Server {
func NewWSSServer(addr, path, cert, privateKey, caCert string) Server {
if addr == "" || cert == "" || privateKey == "" || caCert == "" {
panic(fmt.Sprintf("@addr:%s, @cert:%s, @privateKey:%s, @caCert:%s", addr, cert, privateKey, caCert))
}
return &Server{
return &server{
endPointType: WSS_SERVER,
done: make(chan gxsync.Empty),
addr: addr,
......@@ -120,11 +120,11 @@ func NewWSSServer(addr, path, cert, privateKey, caCert string) *Server {
}
}
func (s Server) Type() EndPointType {
func (s server) Type() EndPointType {
return s.endPointType
}
func (s *Server) stop() {
func (s *server) stop() {
var (
err error
ctx context.Context
......@@ -149,7 +149,7 @@ func (s *Server) stop() {
s.lock.Unlock()
if s.streamListener != nil {
// 把streamListener.Close放在这里,既能防止多次关闭调用,
// 又能及时让Server因accept返回错误而从RunEventloop退出
// 又能及时让Server因accept返回错误而从RunEventLoop退出
s.streamListener.Close()
s.streamListener = nil
}
......@@ -161,7 +161,7 @@ func (s *Server) stop() {
}
}
func (s *Server) IsClosed() bool {
func (s *server) IsClosed() bool {
select {
case <-s.done:
return true
......@@ -173,7 +173,7 @@ func (s *Server) IsClosed() bool {
// net.ipv4.tcp_max_syn_backlog
// net.ipv4.tcp_timestamps
// net.ipv4.tcp_tw_recycle
func (s *Server) listenTCP() error {
func (s *server) listenTCP() error {
var (
err error
streamListener net.Listener
......@@ -189,7 +189,7 @@ func (s *Server) listenTCP() error {
return nil
}
func (s *Server) listenUDP() error {
func (s *server) listenUDP() error {
var (
err error
localAddr *net.UDPAddr
......@@ -214,7 +214,7 @@ func (s *Server) listenUDP() error {
}
// Listen announces on the local network address.
func (s *Server) listen() error {
func (s *server) listen() error {
switch s.endPointType {
case TCP_SERVER, WS_SERVER, WSS_SERVER:
return s.listenTCP()
......@@ -225,7 +225,7 @@ func (s *Server) listen() error {
return nil
}
func (s *Server) accept(newSession NewSessionCallback) (Session, error) {
func (s *server) accept(newSession NewSessionCallback) (Session, error) {
conn, err := s.streamListener.Accept()
if err != nil {
return nil, err
......@@ -245,7 +245,7 @@ func (s *Server) accept(newSession NewSessionCallback) (Session, error) {
return ss, nil
}
func (s *Server) runTcpEventloop(newSession NewSessionCallback) {
func (s *server) runTcpEventLoop(newSession NewSessionCallback) {
s.wg.Add(1)
go func() {
defer s.wg.Done()
......@@ -256,7 +256,7 @@ func (s *Server) runTcpEventloop(newSession NewSessionCallback) {
)
for {
if s.IsClosed() {
log.Warn("Server{%s} stop acceptting client connect request.", s.addr)
log.Warn("server{%s} stop acceptting client connect request.", s.addr)
return
}
if delay != 0 {
......@@ -275,7 +275,7 @@ func (s *Server) runTcpEventloop(newSession NewSessionCallback) {
}
continue
}
log.Warn("Server{%s}.Accept() = err {%#v}", s.addr, err)
log.Warn("server{%s}.Accept() = err {%#v}", s.addr, err)
continue
}
delay = 0
......@@ -285,7 +285,7 @@ func (s *Server) runTcpEventloop(newSession NewSessionCallback) {
}()
}
func (s *Server) runUDPEventloop(newSession NewSessionCallback) {
func (s *server) runUDPEventLoop(newSession NewSessionCallback) {
var (
ss Session
)
......@@ -299,12 +299,12 @@ func (s *Server) runUDPEventloop(newSession NewSessionCallback) {
type wsHandler struct {
http.ServeMux
server *Server
server *server
newSession NewSessionCallback
upgrader websocket.Upgrader
}
func newWSHandler(server *Server, newSession NewSessionCallback) *wsHandler {
func newWSHandler(server *server, newSession NewSessionCallback) *wsHandler {
return &wsHandler{
server: server,
newSession: newSession,
......@@ -326,7 +326,7 @@ func (s *wsHandler) serveWSRequest(w http.ResponseWriter, r *http.Request) {
if s.server.IsClosed() {
http.Error(w, "HTTP server is closed(code:500-11).", 500)
log.Warn("Server{%s} stop acceptting client connect request.", s.server.addr)
log.Warn("server{%s} stop acceptting client connect request.", s.server.addr)
return
}
......@@ -344,7 +344,7 @@ func (s *wsHandler) serveWSRequest(w http.ResponseWriter, r *http.Request) {
err = s.newSession(ss)
if err != nil {
conn.Close()
log.Warn("Server{%s}.newSession(ss{%#v}) = err {%#v}", s.server.addr, ss, err)
log.Warn("server{%s}.newSession(ss{%#v}) = err {%#v}", s.server.addr, ss, err)
return
}
if ss.(*session).maxMsgLen > 0 {
......@@ -357,7 +357,7 @@ func (s *wsHandler) serveWSRequest(w http.ResponseWriter, r *http.Request) {
// runWSEventLoop serve websocket client request
// @newSession: new websocket connection callback
// @path: websocket request url path
func (s *Server) runWSEventLoop(newSession NewSessionCallback) {
func (s *server) runWSEventLoop(newSession NewSessionCallback) {
s.wg.Add(1)
go func() {
defer s.wg.Done()
......@@ -379,7 +379,7 @@ func (s *Server) runWSEventLoop(newSession NewSessionCallback) {
s.lock.Unlock()
err = server.Serve(s.streamListener)
if err != nil {
log.Error("http.Server.Serve(addr{%s}) = err{%#v}", s.addr, err)
log.Error("http.server.Serve(addr{%s}) = err{%#v}", s.addr, err)
// panic(err)
}
}()
......@@ -392,7 +392,7 @@ func (s *Server) runWSEventLoop(newSession NewSessionCallback) {
// @cert: server certificate file
// @privateKey: server private key(contains its public key)
// @caCert: root certificate file. to verify the legitimacy of client. it can be nil.
func (s *Server) runWSSEventLoop(newSession NewSessionCallback) {
func (s *server) runWSSEventLoop(newSession NewSessionCallback) {
s.wg.Add(1)
go func() {
var (
......@@ -445,24 +445,24 @@ func (s *Server) runWSSEventLoop(newSession NewSessionCallback) {
s.lock.Unlock()
err = server.Serve(tls.NewListener(s.streamListener, config))
if err != nil {
log.Error("http.Server.Serve(addr{%s}) = err{%#v}", s.addr, err)
log.Error("http.server.Serve(addr{%s}) = err{%#v}", s.addr, err)
panic(err)
}
}()
}
// RunEventloop serves client request.
// RunEventLoop serves client request.
// @newSession: new connection callback
func (s *Server) RunEventloop(newSession NewSessionCallback) {
func (s *server) RunEventLoop(newSession NewSessionCallback) {
if err := s.listen(); err != nil {
panic(fmt.Errorf("Server.listen() = error:%#v", err))
panic(fmt.Errorf("server.listen() = error:%#v", err))
}
switch s.endPointType {
case TCP_SERVER:
s.runTcpEventloop(newSession)
s.runTcpEventLoop(newSession)
case UDP_SERVER:
s.runUDPEventloop(newSession)
s.runUDPEventLoop(newSession)
case WS_SERVER:
s.runWSEventLoop(newSession)
case WSS_SERVER:
......@@ -472,11 +472,11 @@ func (s *Server) RunEventloop(newSession NewSessionCallback) {
}
}
func (s *Server) Listener() net.Listener {
func (s *server) Listener() net.Listener {
return s.streamListener
}
func (s *Server) Close() {
func (s *server) Close() {
s.stop()
s.wg.Wait()
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment