Commit 09fe6560 authored by AlexStocks's avatar AlexStocks

add udp client & server

parent 5eb2164e
......@@ -33,6 +33,14 @@ const (
maxTimes = 10
)
const (
CONNECTED_UDP_CLIENT = 1
UNCONNECTED_UDP_CLIENT = 2
TCP_CLIENT = 3
WS_CLIENT = 4
WSS_CLIENT = 5
)
/////////////////////////////////////////
// getty tcp client
/////////////////////////////////////////
......@@ -40,6 +48,7 @@ const (
type Client struct {
// net
sync.Mutex
typ int
number int
interval time.Duration
addr string
......@@ -51,22 +60,77 @@ type Client struct {
wg sync.WaitGroup
// for wss client
cert string // 服务端的证书文件(包含了公钥以及服务端其他一些验证信息:服务端域名、服务端ip、起始有效日期、有效时长、hash算法、秘钥长度等)
// 服务端的证书文件(包含了公钥以及服务端其他一些验证信息:服务端域名、
// 服务端ip、起始有效日期、有效时长、hash算法、秘钥长度等)
cert string
}
// NewClient function builds a tcp & ws client.
// NewTcpClient function builds a tcp client.
// @connNum is connection number.
// @connInterval is reconnect sleep interval when getty fails to connect the server.
// @serverAddr is server address.
func NewClient(connNum int, connInterval time.Duration, serverAddr string) *Client {
if connNum < 0 {
func NewTCPClient(connNum int, connInterval time.Duration, serverAddr string) *Client {
if connNum <= 0 {
connNum = 1
}
if connInterval < defaultInterval {
connInterval = defaultInterval
}
return &Client{
typ: TCP_CLIENT,
number: connNum,
interval: connInterval,
addr: serverAddr,
ssMap: make(map[Session]gxsync.Empty, connNum),
done: make(chan gxsync.Empty),
}
}
// NewUdpClient function builds a udp client
// @connNum is connection number. If this value is non-zero, getty will build
// some connected udp clients.
//
// @connInterval is reconnect sleep interval when getty fails to connect the server.
// @serverAddr is server address.
func NewUDPClient(connNum int, connInterval time.Duration, serverAddr string) *Client {
var typ int = CONNECTED_UDP_CLIENT
if connNum <= 0 {
connNum = 1
typ = UNCONNECTED_UDP_CLIENT
}
if connInterval < defaultInterval {
connInterval = defaultInterval
}
return &Client{
typ: typ,
number: connNum,
interval: connInterval,
addr: serverAddr,
ssMap: make(map[Session]gxsync.Empty, connNum),
done: make(chan gxsync.Empty),
}
}
// NewWsClient function builds a ws client.
// @connNum is connection number.
// @connInterval is reconnect sleep interval when getty fails to connect the server.
// @serverAddr is server address. its prefix should be "ws://".
func NewWSClient(connNum int, connInterval time.Duration, serverAddr string) *Client {
if connNum <= 0 {
connNum = 1
}
if connInterval < defaultInterval {
connInterval = defaultInterval
}
if !strings.HasPrefix(serverAddr, "ws://") {
return nil
}
return &Client{
typ: WS_CLIENT,
number: connNum,
interval: connInterval,
addr: serverAddr,
......@@ -75,7 +139,7 @@ func NewClient(connNum int, connInterval time.Duration, serverAddr string) *Clie
}
}
// NewClient function builds a wss client.
// NewWSSClient function builds a wss client.
// @connNum is connection number.
// @connInterval is reconnect sleep interval when getty fails to connect the server.
// @serverAddr is server address.
......@@ -86,17 +150,21 @@ func NewWSSClient(
connNum int,
connInterval time.Duration,
serverAddr string,
cert string,
) *Client {
cert string) *Client {
if connNum < 0 {
if connNum <= 0 {
connNum = 1
}
if connInterval < defaultInterval {
connInterval = defaultInterval
}
if !strings.HasPrefix(serverAddr, "wss://") {
return nil
}
return &Client{
typ: WSS_CLIENT,
number: connNum,
interval: connInterval,
addr: serverAddr,
......@@ -126,7 +194,38 @@ func (c *Client) dialTCP() Session {
log.Info("net.DialTimeout(addr:%s, timeout:%v) = error{%v}", c.addr, err)
time.Sleep(c.interval)
continue
}
}
func (c *Client) dialUDP() Session {
var (
err error
conn *net.UDPConn
localAddr *net.UDPAddr
peerAddr *net.UDPAddr
)
localAddr = &net.UDPAddr{IP: net.IPv4zero, Port: 0}
peerAddr, _ = net.ResolveUDPAddr("udp", c.addr)
for {
if c.IsClosed() {
return nil
}
if UNCONNECTED_UDP_CLIENT == c.typ {
conn, err = net.ListenUDP("udp", localAddr)
} else {
conn, err = net.DialUDP("udp", localAddr, peerAddr)
if err == nil && conn.LocalAddr().String() == conn.RemoteAddr().String() {
err = errSelfConnect
}
peerAddr = nil // for connected session
}
if err == nil {
return NewUDPSession(conn, peerAddr)
}
log.Info("net.DialTimeout(addr:%s, timeout:%v) = error{%v}", c.addr, err)
time.Sleep(c.interval)
}
}
......@@ -159,7 +258,6 @@ func (c *Client) dialWS() Session {
log.Info("websocket.dialer.Dial(addr:%s) = error{%v}", c.addr, err)
time.Sleep(c.interval)
continue
}
}
......@@ -237,20 +335,22 @@ func (c *Client) dialWSS() Session {
log.Info("websocket.dialer.Dial(addr:%s) = error{%v}", c.addr, err)
time.Sleep(c.interval)
continue
}
}
func (c *Client) dial() Session {
if strings.HasPrefix(c.addr, "wss") {
return c.dialWSS()
}
if strings.HasPrefix(c.addr, "ws") {
switch c.typ {
case TCP_CLIENT:
return c.dialTCP()
case UNCONNECTED_UDP_CLIENT, CONNECTED_UDP_CLIENT:
return c.dialUDP()
case WS_CLIENT:
return c.dialWS()
case WSS_CLIENT:
return c.dialWSS()
}
return c.dialTCP()
return nil
}
func (c *Client) sessionNum() int {
......@@ -301,6 +401,7 @@ func (c *Client) RunEventLoop(newSession NewSessionCallback) {
c.Unlock()
c.wg.Add(1)
// a for-loop goroutine to make sure the connection is valid
go func() {
var num, max, times int
defer c.wg.Done()
......@@ -327,6 +428,9 @@ func (c *Client) RunEventLoop(newSession NewSessionCallback) {
}
times = 0
c.connect()
if c.typ == UNCONNECTED_UDP_CLIENT || c.typ == CONNECTED_UDP_CLIENT {
break
}
// time.Sleep(c.interval) // build c.number connections asap
}
}()
......
......@@ -10,7 +10,7 @@
package getty
// NewSessionCallback will be invoked when server accepts a new client connection or client connects to server successfully.
// if there are too many client connections or u do not want to connect a server again, u can return non-nil error. And
// If there are too many client connections or u do not want to connect a server again, u can return non-nil error. And
// then getty will close the new session.
type NewSessionCallback func(Session) error
......@@ -33,7 +33,7 @@ type ReadWriter interface {
Writer
}
// EventListener is used to process pkg that recved from remote session
// EventListener is used to process pkg that received from remote session
type EventListener interface {
// invoked when session opened
// If the return error is not nil, @Session will be closed.
......@@ -49,7 +49,7 @@ type EventListener interface {
OnCron(Session)
// invoked when receive packge. Pls attention that do not handle long time logic processing in this func.
// Y'd better set the package's maximum length. If the message's length is greater than it, u should
// You'd better set the package's maximum length. If the message's length is greater than it, u should
// should return err in Reader{Read} and getty will close this connection soon.
OnMessage(Session, interface{})
}
......@@ -24,6 +24,8 @@ import (
log "github.com/AlexStocks/log4go"
"github.com/golang/snappy"
"github.com/gorilla/websocket"
"golang.org/x/net/ipv4"
"golang.org/x/net/ipv6"
)
var (
......@@ -53,7 +55,7 @@ const (
type Connection interface {
ID() uint32
SetCompressType(t CompressType)
SetCompressType(CompressType)
LocalAddr() string
RemoteAddr() string
incReadPkgCount()
......@@ -68,7 +70,7 @@ type Connection interface {
writeDeadline() time.Duration
// SetWriteDeadlile sets deadline for the future read calls.
SetWriteDeadline(time.Duration)
Write(p []byte) error
Write(interface{}) (int, error)
// don't distinguish between tcp connection and websocket connection. Because
// gorilla/websocket/conn.go:(Conn)Close also invoke net.Conn.Close
close(int)
......@@ -87,8 +89,8 @@ type gettyConn struct {
compress CompressType
padding1 uint8
padding2 uint16
readCount uint32 // read() count
writeCount uint32 // write() count
readCount uint32 // read count
writeCount uint32 // write count
readPkgCount uint32 // send pkg count
writePkgCount uint32 // recv pkg count
active int64 // last active, in milliseconds
......@@ -128,8 +130,8 @@ func (c *gettyConn) GetActive() time.Time {
return launchTime.Add(time.Duration(atomic.LoadInt64(&(c.active))))
}
func (c *gettyConn) Write([]byte) error {
return nil
func (c *gettyConn) Write(interface{}) (int, error) {
return 0, nil
}
func (c *gettyConn) close(int) {}
......@@ -273,11 +275,17 @@ func (t *gettyTCPConn) read(p []byte) (int, error) {
}
// tcp connection write
func (t *gettyTCPConn) Write(p []byte) error {
func (t *gettyTCPConn) Write(pkg interface{}) (int, error) {
var (
err error
currentTime time.Time
ok bool
p []byte
)
if p, ok = pkg.([]byte); !ok {
return 0, fmt.Errorf("illegal @pkg{%#v} type", pkg)
}
if t.wDeadline > 0 {
// Optimization: update write deadline only if more than 25%
// of the last write deadline exceeded.
......@@ -285,15 +293,14 @@ func (t *gettyTCPConn) Write(p []byte) error {
currentTime = wheel.Now()
if currentTime.Sub(t.wLastDeadline) > (t.wDeadline >> 2) {
if err = t.conn.SetWriteDeadline(currentTime.Add(t.wDeadline)); err != nil {
return err
return 0, err
}
t.wLastDeadline = currentTime
}
}
atomic.AddUint32(&t.writeCount, (uint32)(len(p)))
_, err = t.writer.Write(p)
return err
return t.writer.Write(p)
}
// close tcp connection
......@@ -434,11 +441,17 @@ func (w *gettyWSConn) read() ([]byte, error) {
}
// websocket connection write
func (w *gettyWSConn) Write(p []byte) error {
func (w *gettyWSConn) Write(pkg interface{}) (int, error) {
var (
err error
currentTime time.Time
ok bool
p []byte
)
if p, ok = pkg.([]byte); !ok {
return 0, fmt.Errorf("illegal @pkg{%#v} type", pkg)
}
if w.wDeadline > 0 {
// Optimization: update write deadline only if more than 25%
// of the last write deadline exceeded.
......@@ -446,7 +459,7 @@ func (w *gettyWSConn) Write(p []byte) error {
currentTime = wheel.Now()
if currentTime.Sub(w.wLastDeadline) > (w.wDeadline >> 2) {
if err = w.conn.SetWriteDeadline(currentTime.Add(w.wDeadline)); err != nil {
return err
return 0, err
}
w.wLastDeadline = currentTime
}
......@@ -455,7 +468,7 @@ func (w *gettyWSConn) Write(p []byte) error {
// atomic.AddUint32(&w.writeCount, 1)
atomic.AddUint32(&w.writeCount, (uint32)(len(p)))
// w.conn.SetWriteDeadline(time.Now().Add(w.wDeadline))
return w.conn.WriteMessage(websocket.BinaryMessage, p)
return len(p), w.conn.WriteMessage(websocket.BinaryMessage, p)
}
func (w *gettyWSConn) writePing() error {
......@@ -473,3 +486,150 @@ func (w *gettyWSConn) close(waitSec int) {
}
w.conn.Close()
}
/////////////////////////////////////////
// getty udp connection
/////////////////////////////////////////
type UDPContext struct {
Pkg []byte
PeerAddr *net.UDPAddr
}
type gettyUDPConn struct {
gettyConn
peerAddr *net.UDPAddr // for client
compressType CompressType
conn *net.UDPConn // for server
}
func setUDPSocketOptions(conn *net.UDPConn) error {
// Try setting the flags for both families and ignore the errors unless they
// both error.
err6 := ipv6.NewPacketConn(conn).SetControlMessage(ipv6.FlagDst|ipv6.FlagInterface, true)
err4 := ipv4.NewPacketConn(conn).SetControlMessage(ipv4.FlagDst|ipv4.FlagInterface, true)
if err6 != nil && err4 != nil {
return err4
}
return nil
}
// create gettyUDPConn
func newGettyUDPConn(conn *net.UDPConn, peerUDPAddr *net.UDPAddr) *gettyUDPConn {
if conn == nil {
panic("newGettyWSConn(conn):@conn is nil")
}
var localAddr, peerAddr string
if conn.LocalAddr() != nil {
localAddr = conn.LocalAddr().String()
}
if conn.RemoteAddr() != nil {
// connected udp
peerAddr = conn.RemoteAddr().String()
} else if peerUDPAddr != nil {
// unconnected udp
peerAddr = peerUDPAddr.String()
}
return &gettyUDPConn{
conn: conn,
peerAddr: peerUDPAddr,
gettyConn: gettyConn{
id: atomic.AddUint32(&connID, 1),
local: localAddr,
peer: peerAddr,
compress: CompressNone,
},
}
}
func (u *gettyUDPConn) SetCompressType(c CompressType) {
switch c {
case CompressNone, CompressZip, CompressBestSpeed, CompressBestCompression, CompressHuffman, CompressSnappy:
u.compressType = c
default:
panic(fmt.Sprintf("illegal comparess type %d", c))
}
}
// udp connection read
func (u *gettyUDPConn) read(p []byte) (int, *net.UDPAddr, error) {
var (
err error
currentTime time.Time
length int
addr *net.UDPAddr
)
if u.rDeadline > 0 {
// Optimization: update read deadline only if more than 25%
// of the last read deadline exceeded.
// See https://github.com/golang/go/issues/15133 for details.
currentTime = wheel.Now()
if currentTime.Sub(u.rLastDeadline) > (u.rDeadline >> 2) {
if err = u.conn.SetReadDeadline(currentTime.Add(u.rDeadline)); err != nil {
return 0, nil, err
}
u.rLastDeadline = currentTime
}
}
if u.peerAddr == nil {
length, addr, err = u.conn.ReadFromUDP(p)
} else {
length, err = u.conn.Read(p)
}
if err == nil {
atomic.AddUint32(&u.readCount, uint32(length))
}
return length, addr, err
}
// write udp packet, @ctx should be of type UDPContext
func (u *gettyUDPConn) Write(udpCtx interface{}) (int, error) {
var (
err error
currentTime time.Time
length int
ok bool
ctx UDPContext
peerAddr *net.UDPAddr
)
if ctx, ok = udpCtx.(UDPContext); !ok {
return 0, fmt.Errorf("illegal @udpCtx{%#v} type", udpCtx)
}
if u.wDeadline > 0 {
// Optimization: update write deadline only if more than 25%
// of the last write deadline exceeded.
// See https://github.com/golang/go/issues/15133 for details.
currentTime = wheel.Now()
if currentTime.Sub(u.wLastDeadline) > (u.wDeadline >> 2) {
if err = u.conn.SetWriteDeadline(currentTime.Add(u.wDeadline)); err != nil {
return 0, err
}
u.wLastDeadline = currentTime
}
}
atomic.AddUint32(&u.writeCount, (uint32)(len(ctx.Pkg)))
peerAddr = ctx.PeerAddr
if u.peerAddr != nil {
peerAddr = u.peerAddr
}
length, _, err = u.conn.WriteMsgUDP(ctx.Pkg, nil, peerAddr)
return length, err
}
// close udp connection
func (u *gettyUDPConn) close() {
if u.conn != nil {
u.conn.Close()
u.conn = nil
}
}
......@@ -13,7 +13,6 @@ import (
"context"
"crypto/tls"
"crypto/x509"
"errors"
"fmt"
"io/ioutil"
"net"
......@@ -23,11 +22,11 @@ import (
)
import (
"github.com/AlexStocks/goext/net"
"github.com/AlexStocks/goext/sync"
"github.com/AlexStocks/goext/time"
log "github.com/AlexStocks/log4go"
"github.com/gorilla/websocket"
"github.com/pkg/errors"
)
var (
......@@ -35,20 +34,81 @@ var (
serverFastFailTimeout = gxtime.TimeSecondDuration(1)
)
const (
UDP_SERVER = 1
TCP_SERVER = 2
WS_SERVER = 3
WSS_SERVER = 4
)
type Server struct {
// net
addr string
udpConn *net.UDPConn // for udp server
listener net.Listener
lock sync.Mutex // for server
lock sync.Mutex // for server
typ int
server *http.Server // for ws or wss server
// websocket
path string
cert string
privateKey string
caCert string
sync.Once
done chan gxsync.Empty
wg sync.WaitGroup
}
func NewServer() *Server {
return &Server{done: make(chan gxsync.Empty)}
// NewTCServer builds a tcp server.
func NewTCPServer() *Server {
return &Server{
typ: TCP_SERVER,
done: make(chan gxsync.Empty),
}
}
// NewUDPServer builds a unconnected udp server.
// @path: websocket request url path
func NewUDPPServer(addr string) *Server {
return &Server{
typ: UDP_SERVER,
done: make(chan gxsync.Empty),
addr: addr,
}
}
// NewWSSServer builds a websocket server.
// @path: websocket request url path
func NewWSServer(path string) *Server {
return &Server{
typ: WS_SERVER,
done: make(chan gxsync.Empty),
path: path,
}
}
// NewWSSServer builds a secure websocket server.
// @path: websocket request url path
// @cert: server certificate file
// @privateKey: server private key(contains its public key)
// @caCert: root certificate file. to verify the legitimacy of client. it can be nil.
func NewWSSServer(
path string,
cert string,
privateKey string,
caCert string,
) *Server {
return &Server{
typ: WSS_SERVER,
done: make(chan gxsync.Empty),
path: path,
cert: cert,
privateKey: privateKey,
caCert: caCert,
}
}
func (s *Server) stop() {
......@@ -88,30 +148,78 @@ func (s *Server) IsClosed() bool {
}
}
// (Server)Bind's functionality is equal to (Server)Listen.
func (s *Server) Bind(network string, host string, port int) error {
if port <= 0 {
return errors.New("port<=0 illegal")
}
return s.Listen(network, gxnet.HostAddress(host, port))
}
// net.ipv4.tcp_max_syn_backlog
// net.ipv4.tcp_timestamps
// net.ipv4.tcp_tw_recycle
func (s *Server) Listen(network string, addr string) error {
listener, err := net.Listen(network, addr)
func (s *Server) listenTCP() error {
var (
err error
listener net.Listener
)
listener, err = net.Listen("tcp", s.addr)
if err != nil {
return err
return errors.Wrapf(err, "net.Listen(tcp, addr:%s))", s.addr)
}
s.addr = addr
s.listener = listener
return nil
}
func (s *Server) RunEventloop(newSession NewSessionCallback) {
func (s *Server) listenUDP() error {
var (
err error
localAddr *net.UDPAddr
udpConn *net.UDPConn
)
localAddr, err = net.ResolveUDPAddr("udp", s.addr)
if err != nil {
return errors.Wrapf(err, "net.ResolveUDPAddr(udp, addr:%s)", s.addr)
}
udpConn, err = net.ListenUDP("udp", localAddr)
if err != nil {
return errors.Wrapf(err, "net.ListenUDP((udp, localAddr:%#v)", localAddr)
}
s.udpConn = udpConn
return nil
}
// Listen announces on the local network address.
func (s *Server) listen() error {
switch s.typ {
case TCP_SERVER, WS_SERVER, WSS_SERVER:
return s.listenTCP()
case UDP_SERVER:
return s.listenUDP()
}
return nil
}
func (s *Server) accept(newSession NewSessionCallback) (Session, error) {
conn, err := s.listener.Accept()
if err != nil {
return nil, err
}
if conn.RemoteAddr().String() == conn.LocalAddr().String() {
log.Warn("conn.localAddr{%s} == conn.RemoteAddr", conn.LocalAddr().String(), conn.RemoteAddr().String())
return nil, errSelfConnect
}
ss := NewTCPSession(conn)
err = newSession(ss)
if err != nil {
conn.Close()
return nil, err
}
return ss, nil
}
func (s *Server) runTcpEventloop(newSession NewSessionCallback) {
s.wg.Add(1)
go func() {
defer s.wg.Done()
......@@ -151,6 +259,17 @@ func (s *Server) RunEventloop(newSession NewSessionCallback) {
}()
}
func (s *Server) runUDPEventloop(newSession NewSessionCallback) {
var (
ss Session
)
ss = NewUDPSession(s.udpConn, nil)
if err := newSession(ss); err != nil {
panic(err.Error())
}
}
type wsHandler struct {
http.ServeMux
server *Server
......@@ -208,10 +327,10 @@ func (s *wsHandler) serveWSRequest(w http.ResponseWriter, r *http.Request) {
ss.(*session).run()
}
// RunWSEventLoop serve websocket client request
// runWSEventLoop serve websocket client request
// @newSession: new websocket connection callback
// @path: websocket request url path
func (s *Server) RunWSEventLoop(newSession NewSessionCallback, path string) {
func (s *Server) runWSEventLoop(newSession NewSessionCallback) {
s.wg.Add(1)
go func() {
defer s.wg.Done()
......@@ -221,7 +340,7 @@ func (s *Server) RunWSEventLoop(newSession NewSessionCallback, path string) {
server *http.Server
)
handler = newWSHandler(s, newSession)
handler.HandleFunc(path, handler.serveWSRequest)
handler.HandleFunc(s.path, handler.serveWSRequest)
server = &http.Server{
Addr: s.addr,
Handler: handler,
......@@ -246,14 +365,7 @@ func (s *Server) RunWSEventLoop(newSession NewSessionCallback, path string) {
// @cert: server certificate file
// @privateKey: server private key(contains its public key)
// @caCert: root certificate file. to verify the legitimacy of client. it can be nil.
func (s *Server) RunWSSEventLoop(
newSession NewSessionCallback,
path string,
cert string,
privateKey string,
caCert string,
) {
func (s *Server) runWSSEventLoop(newSession NewSessionCallback) {
s.wg.Add(1)
go func() {
defer s.wg.Done()
......@@ -267,8 +379,8 @@ func (s *Server) RunWSSEventLoop(
server *http.Server
)
if certificate, err = tls.LoadX509KeyPair(cert, privateKey); err != nil {
panic(fmt.Sprintf("tls.LoadX509KeyPair(cert{%s}, privateKey{%s}) = err{%#v}", cert, privateKey, err))
if certificate, err = tls.LoadX509KeyPair(s.cert, s.privateKey); err != nil {
panic(fmt.Sprintf("tls.LoadX509KeyPair(cert{%s}, privateKey{%s}) = err{%#v}", s.cert, s.privateKey, err))
return
}
config = &tls.Config{
......@@ -278,10 +390,10 @@ func (s *Server) RunWSSEventLoop(
Certificates: []tls.Certificate{certificate},
}
if caCert != "" {
certPem, err = ioutil.ReadFile(caCert)
if s.caCert != "" {
certPem, err = ioutil.ReadFile(s.caCert)
if err != nil {
panic(fmt.Errorf("ioutil.ReadFile(certFile{%s}) = err{%#v}", caCert, err))
panic(fmt.Errorf("ioutil.ReadFile(certFile{%s}) = err{%#v}", s.caCert, err))
}
certPool = x509.NewCertPool()
if ok := certPool.AppendCertsFromPEM(certPem); !ok {
......@@ -293,7 +405,7 @@ func (s *Server) RunWSSEventLoop(
}
handler = newWSHandler(s, newSession)
handler.HandleFunc(path, handler.serveWSRequest)
handler.HandleFunc(s.path, handler.serveWSRequest)
server = &http.Server{
Addr: s.addr,
Handler: handler,
......@@ -312,28 +424,27 @@ func (s *Server) RunWSSEventLoop(
}()
}
func (s *Server) Listener() net.Listener {
return s.listener
}
func (s *Server) accept(newSession NewSessionCallback) (Session, error) {
conn, err := s.listener.Accept()
if err != nil {
return nil, err
}
if conn.RemoteAddr().String() == conn.LocalAddr().String() {
log.Warn("conn.localAddr{%s} == conn.RemoteAddr", conn.LocalAddr().String(), conn.RemoteAddr().String())
return nil, errSelfConnect
// RunEventloop serves client request.
// @newSession: new connection callback
func (s *Server) RunEventloop(newSession NewSessionCallback) {
if err := s.listen(); err != nil {
panic(fmt.Errorf("Server.listen() = error:%#v", err))
}
ss := NewTCPSession(conn)
err = newSession(ss)
if err != nil {
conn.Close()
return nil, err
switch s.typ {
case TCP_SERVER:
s.runTcpEventloop(newSession)
case UDP_SERVER:
s.runUDPEventloop(newSession)
case WS_SERVER:
s.runWSEventLoop(newSession)
case WSS_SERVER:
s.runWSSEventLoop(newSession)
}
}
return ss, nil
func (s *Server) Listener() net.Listener {
return s.listener
}
func (s *Server) Close() {
......
......@@ -21,6 +21,7 @@ import (
)
import (
"github.com/AlexStocks/goext/context"
"github.com/AlexStocks/goext/sync"
"github.com/AlexStocks/goext/time"
log "github.com/AlexStocks/log4go"
......@@ -34,6 +35,7 @@ const (
pendingDuration = 3e9
defaultSessionName = "session"
defaultTCPSessionName = "tcp-session"
defaultUDPSessionName = "udp-session"
defaultWSSessionName = "ws-session"
defaultWSSSessionName = "wss-session"
outputFormat = "session %s, Read Count: %d, Write Count: %d, Read Pkg Count: %d, Write Pkg Count: %d"
......@@ -71,9 +73,9 @@ type Session interface {
SetWQLen(int)
SetWaitTime(time.Duration)
GetAttribute(string) interface{}
SetAttribute(string, interface{})
RemoveAttribute(string)
GetAttribute(interface{}) interface{}
SetAttribute(interface{}, interface{})
RemoveAttribute(interface{})
WritePkg(interface{}) error
WriteBytes([]byte) error
......@@ -101,7 +103,7 @@ type session struct {
wQ chan interface{}
// attribute
attrs map[string]interface{}
attrs *gxcontext.ValuesContext
// goroutines sync
grNum int32
lock sync.RWMutex
......@@ -113,7 +115,7 @@ func NewSession() Session {
done: make(chan gxsync.Empty),
period: period,
wait: pendingDuration,
attrs: make(map[string]interface{}),
attrs: gxcontext.NewValuesContext(nil),
}
session.SetWriteDeadline(netIOTimeout)
......@@ -129,7 +131,23 @@ func NewTCPSession(conn net.Conn) Session {
done: make(chan gxsync.Empty),
period: period,
wait: pendingDuration,
attrs: make(map[string]interface{}),
attrs: gxcontext.NewValuesContext(nil),
}
session.SetWriteDeadline(netIOTimeout)
session.SetReadDeadline(netIOTimeout)
return session
}
func NewUDPSession(conn *net.UDPConn, peerAddr *net.UDPAddr) Session {
session := &session{
name: defaultUDPSessionName,
Connection: newGettyUDPConn(conn, peerAddr),
done: make(chan gxsync.Empty),
period: period,
wait: pendingDuration,
attrs: gxcontext.NewValuesContext(nil),
}
session.SetWriteDeadline(netIOTimeout)
......@@ -145,7 +163,7 @@ func NewWSSession(conn *websocket.Conn) Session {
done: make(chan gxsync.Empty),
period: period,
wait: pendingDuration,
attrs: make(map[string]interface{}),
attrs: gxcontext.NewValuesContext(nil),
}
session.SetWriteDeadline(netIOTimeout)
......@@ -161,7 +179,7 @@ func (s *session) Reset() {
// s.errFlag = false
s.period = period
s.wait = pendingDuration
s.attrs = make(map[string]interface{})
s.attrs = gxcontext.NewValuesContext(nil)
s.grNum = 0
s.SetWriteDeadline(netIOTimeout)
......@@ -295,25 +313,25 @@ func (s *session) SetWaitTime(waitTime time.Duration) {
}
// set attribute of key @session:key
func (s *session) GetAttribute(key string) interface{} {
func (s *session) GetAttribute(key interface{}) interface{} {
var ret interface{}
s.lock.RLock()
ret = s.attrs[key]
ret = s.attrs.Get(key)
s.lock.RUnlock()
return ret
}
// get attribute of key @session:key
func (s *session) SetAttribute(key string, value interface{}) {
func (s *session) SetAttribute(key interface{}, value interface{}) {
s.lock.Lock()
s.attrs[key] = value
s.attrs.Set(key, value)
s.lock.Unlock()
}
// delete attribute of key @session:key
func (s *session) RemoveAttribute(key string) {
func (s *session) RemoveAttribute(key interface{}) {
s.lock.Lock()
delete(s.attrs, key)
s.attrs.Delete(key)
s.lock.Unlock()
}
......@@ -362,7 +380,8 @@ func (s *session) WriteBytes(pkg []byte) error {
}
// s.conn.SetWriteDeadline(time.Now().Add(s.wDeadline))
return s.Connection.Write(pkg)
_, err := s.Connection.Write(pkg)
return err
}
// Write multiple packages at once
......@@ -495,7 +514,7 @@ LOOP:
case outPkg = <-s.wQ:
if flag {
if err = s.writer.Write(s, outPkg); err != nil {
log.Error("%s, [session.handleLoop] = error{%+v}", s.sessionToken(), err)
log.Error("%s, [session.handleLoop] = error{%#v}", s.sessionToken(), err)
s.stop()
flag = false
// break LOOP
......@@ -557,6 +576,8 @@ func (s *session) handlePackage() {
err = s.handleTCPPackage()
} else if _, ok := s.Connection.(*gettyWSConn); ok {
err = s.handleWSPackage()
} else if _, ok := s.Connection.(*gettyUDPConn); ok {
err = s.handleUDPPackage()
}
}
......@@ -593,7 +614,7 @@ func (s *session) handleTCPPackage() error {
if nerr, ok = err.(net.Error); ok && nerr.Timeout() {
break
}
log.Error("%s, [session.conn.read] = error{%v}", s.sessionToken(), err)
log.Error("%s, [session.conn.read] = error{%#v}", s.sessionToken(), err)
// for (Codec)OnErr
// s.errFlag = true
exit = true
......@@ -638,6 +659,53 @@ func (s *session) handleTCPPackage() error {
return err
}
// get package from udp packet
func (s *session) handleUDPPackage() error {
var (
ok bool
err error
nerr net.Error
conn *gettyUDPConn
bufLen int
buf []byte
addr *net.UDPAddr
)
buf = make([]byte, s.maxMsgLen)
conn = s.Connection.(*gettyUDPConn)
bufLen = int(s.maxMsgLen + maxReadBufLen)
if int(s.maxMsgLen<<1) < bufLen {
bufLen = int(s.maxMsgLen << 1)
}
buf = make([]byte, bufLen)
for {
if s.IsClosed() {
break
}
bufLen, addr, err = conn.read(buf)
if nerr, ok = err.(net.Error); ok && nerr.Timeout() {
continue
}
if err != nil {
log.Warn("%s, [session.handleUDPPackage] = len{%d}, error{%+v}", s.sessionToken(), bufLen, err)
continue
}
if err == nil && s.maxMsgLen > 0 && bufLen > int(s.maxMsgLen) {
err = ErrMsgTooLong
}
if err != nil {
log.Warn("%s, [session.handleUDPPackage] = len{%d}, error{%+v}", s.sessionToken(), bufLen, err)
continue
}
s.UpdateActive()
s.rQ <- UDPContext{Pkg: buf[:bufLen], PeerAddr: addr}
}
return err
}
// get package from websocket stream
func (s *session) handleWSPackage() error {
var (
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment