Commit d1e123b3 authored by AlexStocks's avatar AlexStocks

delete peerAddr of UDPSession

parent 94f42a29
...@@ -19,6 +19,7 @@ ...@@ -19,6 +19,7 @@
* add end point type * add end point type
* add ClientOptions & Client * add ClientOptions & Client
* add ServerOptions & Server * add ServerOptions & Server
* delete peerAddr of UDPSession
* version 0.8.2 * version 0.8.2
- 2018/03/16 - 2018/03/16
......
...@@ -12,6 +12,7 @@ package getty ...@@ -12,6 +12,7 @@ package getty
import ( import (
"crypto/tls" "crypto/tls"
"crypto/x509" "crypto/x509"
"encoding/pem"
"fmt" "fmt"
"io/ioutil" "io/ioutil"
"net" "net"
...@@ -21,7 +22,6 @@ import ( ...@@ -21,7 +22,6 @@ import (
) )
import ( import (
"encoding/pem"
"github.com/AlexStocks/goext/sync" "github.com/AlexStocks/goext/sync"
log "github.com/AlexStocks/log4go" log "github.com/AlexStocks/log4go"
"github.com/gorilla/websocket" "github.com/gorilla/websocket"
...@@ -58,16 +58,15 @@ func (c *client) init(opts ...ClientOption) { ...@@ -58,16 +58,15 @@ func (c *client) init(opts ...ClientOption) {
} }
} }
// NewTcpClient function builds a tcp client. func newClient(t EndPointType, opts ...ClientOption) *client {
func NewTCPClient(opts ...ClientOption) Client {
c := &client{ c := &client{
endPointType: TCP_CLIENT, endPointType: t,
done: make(chan gxsync.Empty), done: make(chan gxsync.Empty),
} }
c.init(opts...) c.init(opts...)
if c.number <= 0 || c.addr == "" { if t != UNCONNECTED_UDP_CLIENT && c.number <= 0 || c.addr == "" {
panic(fmt.Sprintf("@connNum:%d, @serverAddr:%s", c.number, c.addr)) panic(fmt.Sprintf("@connNum:%d, @serverAddr:%s", c.number, c.addr))
} }
...@@ -76,15 +75,15 @@ func NewTCPClient(opts ...ClientOption) Client { ...@@ -76,15 +75,15 @@ func NewTCPClient(opts ...ClientOption) Client {
return c return c
} }
// NewTcpClient function builds a tcp client.
func NewTCPClient(opts ...ClientOption) Client {
return newClient(TCP_CLIENT, opts...)
}
// NewUdpClient function builds a udp client // NewUdpClient function builds a udp client
func NewUDPClient(opts ...ClientOption) Client { func NewUDPClient(opts ...ClientOption) Client {
c := &client{ c := newClient(UNCONNECTED_UDP_CLIENT, opts...)
done: make(chan gxsync.Empty),
}
c.init(opts...)
c.endPointType = UNCONNECTED_UDP_CLIENT
if len(c.addr) != 0 { if len(c.addr) != 0 {
if c.number <= 0 { if c.number <= 0 {
panic(fmt.Sprintf("getty will build a preconected connection by @serverAddr:%s while @connNum is %d", panic(fmt.Sprintf("getty will build a preconected connection by @serverAddr:%s while @connNum is %d",
...@@ -93,50 +92,32 @@ func NewUDPClient(opts ...ClientOption) Client { ...@@ -93,50 +92,32 @@ func NewUDPClient(opts ...ClientOption) Client {
c.endPointType = CONNECTED_UDP_CLIENT c.endPointType = CONNECTED_UDP_CLIENT
} }
c.ssMap = make(map[Session]gxsync.Empty, c.number)
return c return c
} }
// NewWsClient function builds a ws client. // NewWsClient function builds a ws client.
func NewWSClient(opts ...ClientOption) Client { func NewWSClient(opts ...ClientOption) Client {
c := &client{ c := newClient(WS_CLIENT, opts...)
endPointType: WS_CLIENT,
done: make(chan gxsync.Empty),
}
c.init(opts...)
if c.number <= 0 || c.addr == "" {
panic(fmt.Sprintf("@connNum:%d, @serverAddr:%s", c.number, c.addr))
}
if !strings.HasPrefix(c.addr, "ws://") { if !strings.HasPrefix(c.addr, "ws://") {
panic(fmt.Sprintf("the prefix @serverAddr:%s is not ws://", c.addr)) panic(fmt.Sprintf("the prefix @serverAddr:%s is not ws://", c.addr))
} }
c.ssMap = make(map[Session]gxsync.Empty, c.number)
return c return c
} }
// NewWSSClient function builds a wss client. // NewWSSClient function builds a wss client.
func NewWSSClient(opts ...ClientOption) Client { func NewWSSClient(opts ...ClientOption) Client {
c := &client{ c := newClient(WSS_CLIENT, opts...)
endPointType: WSS_CLIENT,
done: make(chan gxsync.Empty),
}
c.init(opts...) if c.cert == "" {
panic(fmt.Sprintf("@cert:%s", c.cert))
if c.number <= 0 || c.addr == "" || c.cert == "" {
panic(fmt.Sprintf("@connNum:%d, @serverAddr:%s, @cert:%s", c.number, c.addr, c.cert))
} }
if !strings.HasPrefix(c.addr, "wss://") { if !strings.HasPrefix(c.addr, "wss://") {
panic(fmt.Sprintf("the prefix @serverAddr:%s is not wss://", c.addr)) panic(fmt.Sprintf("the prefix @serverAddr:%s is not wss://", c.addr))
} }
c.ssMap = make(map[Session]gxsync.Empty, c.number)
return c return c
} }
...@@ -159,7 +140,7 @@ func (c *client) dialTCP() Session { ...@@ -159,7 +140,7 @@ func (c *client) dialTCP() Session {
err = errSelfConnect err = errSelfConnect
} }
if err == nil { if err == nil {
return NewTCPSession(conn) return newTCPSession(conn, c.endPointType)
} }
log.Info("net.DialTimeout(addr:%s, timeout:%v) = error{%v}", c.addr, err) log.Info("net.DialTimeout(addr:%s, timeout:%v) = error{%v}", c.addr, err)
...@@ -188,10 +169,9 @@ func (c *client) dialUDP() Session { ...@@ -188,10 +169,9 @@ func (c *client) dialUDP() Session {
if err == nil && conn.LocalAddr().String() == conn.RemoteAddr().String() { if err == nil && conn.LocalAddr().String() == conn.RemoteAddr().String() {
err = errSelfConnect err = errSelfConnect
} }
peerAddr = nil // for connected session
} }
if err == nil { if err == nil {
return NewUDPSession(conn, peerAddr) return newUDPSession(conn, c.endPointType)
} }
log.Info("net.DialTimeout(addr:%s, timeout:%v) = error{%v}", c.addr, err) log.Info("net.DialTimeout(addr:%s, timeout:%v) = error{%v}", c.addr, err)
...@@ -218,7 +198,7 @@ func (c *client) dialWS() Session { ...@@ -218,7 +198,7 @@ func (c *client) dialWS() Session {
err = errSelfConnect err = errSelfConnect
} }
if err == nil { if err == nil {
ss = NewWSSession(conn) ss = newWSSession(conn, c.endPointType)
if ss.(*session).maxMsgLen > 0 { if ss.(*session).maxMsgLen > 0 {
conn.SetReadLimit(int64(ss.(*session).maxMsgLen)) conn.SetReadLimit(int64(ss.(*session).maxMsgLen))
} }
...@@ -294,7 +274,7 @@ func (c *client) dialWSS() Session { ...@@ -294,7 +274,7 @@ func (c *client) dialWSS() Session {
err = errSelfConnect err = errSelfConnect
} }
if err == nil { if err == nil {
ss = NewWSSession(conn) ss = newWSSession(conn, c.endPointType)
if ss.(*session).maxMsgLen > 0 { if ss.(*session).maxMsgLen > 0 {
conn.SetReadLimit(int64(ss.(*session).maxMsgLen)) conn.SetReadLimit(int64(ss.(*session).maxMsgLen))
} }
......
...@@ -20,7 +20,6 @@ import ( ...@@ -20,7 +20,6 @@ import (
) )
import ( import (
"github.com/AlexStocks/goext/net"
log "github.com/AlexStocks/log4go" log "github.com/AlexStocks/log4go"
"github.com/golang/snappy" "github.com/golang/snappy"
"github.com/gorilla/websocket" "github.com/gorilla/websocket"
...@@ -58,6 +57,7 @@ type gettyConn struct { ...@@ -58,6 +57,7 @@ type gettyConn struct {
wLastDeadline time.Time // lastest network write time wLastDeadline time.Time // lastest network write time
local string // local address local string // local address
peer string // peer address peer string // peer address
ss Session
} }
func (c *gettyConn) ID() uint32 { func (c *gettyConn) ID() uint32 {
...@@ -98,6 +98,10 @@ func (c gettyConn) readTimeout() time.Duration { ...@@ -98,6 +98,10 @@ func (c gettyConn) readTimeout() time.Duration {
return c.rTimeout return c.rTimeout
} }
func (c *gettyConn) setSession(ss Session) {
c.ss = ss
}
// Pls do not set read deadline for websocket connection. AlexStocks 20180310 // Pls do not set read deadline for websocket connection. AlexStocks 20180310
// gorilla/websocket/conn.go:NextReader will always fail when got a timeout error. // gorilla/websocket/conn.go:NextReader will always fail when got a timeout error.
// //
...@@ -307,7 +311,6 @@ type UDPContext struct { ...@@ -307,7 +311,6 @@ type UDPContext struct {
type gettyUDPConn struct { type gettyUDPConn struct {
gettyConn gettyConn
peerAddr *net.UDPAddr // for client
compressType CompressType compressType CompressType
conn *net.UDPConn // for server conn *net.UDPConn // for server
} }
...@@ -324,7 +327,7 @@ func setUDPSocketOptions(conn *net.UDPConn) error { ...@@ -324,7 +327,7 @@ func setUDPSocketOptions(conn *net.UDPConn) error {
} }
// create gettyUDPConn // create gettyUDPConn
func newGettyUDPConn(conn *net.UDPConn, peerUDPAddr *net.UDPAddr) *gettyUDPConn { func newGettyUDPConn(conn *net.UDPConn) *gettyUDPConn {
if conn == nil { if conn == nil {
panic("newGettyUDPConn(conn):@conn is nil") panic("newGettyUDPConn(conn):@conn is nil")
} }
...@@ -337,14 +340,10 @@ func newGettyUDPConn(conn *net.UDPConn, peerUDPAddr *net.UDPAddr) *gettyUDPConn ...@@ -337,14 +340,10 @@ func newGettyUDPConn(conn *net.UDPConn, peerUDPAddr *net.UDPAddr) *gettyUDPConn
if conn.RemoteAddr() != nil { if conn.RemoteAddr() != nil {
// connected udp // connected udp
peerAddr = conn.RemoteAddr().String() peerAddr = conn.RemoteAddr().String()
} else if peerUDPAddr != nil {
// unconnected udp
peerAddr = peerUDPAddr.String()
} }
return &gettyUDPConn{ return &gettyUDPConn{
conn: conn, conn: conn,
peerAddr: peerUDPAddr,
gettyConn: gettyConn{ gettyConn: gettyConn{
id: atomic.AddUint32(&connID, 1), id: atomic.AddUint32(&connID, 1),
rTimeout: netIOTimeout, rTimeout: netIOTimeout,
...@@ -388,11 +387,10 @@ func (u *gettyUDPConn) read(p []byte) (int, *net.UDPAddr, error) { ...@@ -388,11 +387,10 @@ func (u *gettyUDPConn) read(p []byte) (int, *net.UDPAddr, error) {
} }
} }
if u.peerAddr == nil { if u.ss.Type() == CONNECTED_UDP_CLIENT {
length, addr, err = u.conn.ReadFromUDP(p)
} else {
length, err = u.conn.Read(p) length, err = u.conn.Read(p)
addr = u.peerAddr } else {
length, addr, err = u.conn.ReadFromUDP(p)
} }
log.Debug("now:%s, length:%d, err:%#v", currentTime, length, err) log.Debug("now:%s, length:%d, err:%#v", currentTime, length, err)
if err == nil { if err == nil {
...@@ -420,6 +418,12 @@ func (u *gettyUDPConn) Write(udpCtx interface{}) (int, error) { ...@@ -420,6 +418,12 @@ func (u *gettyUDPConn) Write(udpCtx interface{}) (int, error) {
if buf, ok = ctx.Pkg.([]byte); !ok { if buf, ok = ctx.Pkg.([]byte); !ok {
return 0, fmt.Errorf("illegal @udpCtx.Pkg{%#v} type", udpCtx) return 0, fmt.Errorf("illegal @udpCtx.Pkg{%#v} type", udpCtx)
} }
if u.ss.Type() == UDP_SERVER || u.ss.Type() == UNCONNECTED_UDP_CLIENT {
peerAddr = ctx.PeerAddr
if peerAddr == nil {
return 0, ErrNullPeerAddr
}
}
if u.wTimeout > 0 { if u.wTimeout > 0 {
// Optimization: update write deadline only if more than 25% // Optimization: update write deadline only if more than 25%
...@@ -435,18 +439,9 @@ func (u *gettyUDPConn) Write(udpCtx interface{}) (int, error) { ...@@ -435,18 +439,9 @@ func (u *gettyUDPConn) Write(udpCtx interface{}) (int, error) {
} }
atomic.AddUint32(&u.writeCount, (uint32)(len(buf))) atomic.AddUint32(&u.writeCount, (uint32)(len(buf)))
peerAddr = ctx.PeerAddr
if u.peerAddr != nil && peerAddr != nil {
if !gxnet.IsUDPAddrEqual(peerAddr, u.peerAddr) {
return 0, fmt.Errorf("use of peerAddr %s different from preconnected udp connection address %s",
peerAddr, u.peerAddr)
}
peerAddr = nil
}
peerAddr = nil
length, _, err = u.conn.WriteMsgUDP(buf, nil, peerAddr) length, _, err = u.conn.WriteMsgUDP(buf, nil, peerAddr)
log.Debug("now:%s, length:%d, err:%s, peerAddr:%s, session.peer:%s", currentTime, length, err, peerAddr, u.peerAddr) log.Debug("now:%s, length:%d, err:%s, peerAddr:%s", currentTime, length, err, peerAddr)
return length, err return length, err
} }
......
...@@ -104,6 +104,8 @@ type Connection interface { ...@@ -104,6 +104,8 @@ type Connection interface {
// don't distinguish between tcp connection and websocket connection. Because // don't distinguish between tcp connection and websocket connection. Because
// gorilla/websocket/conn.go:(Conn)Close also invoke net.Conn.Close // gorilla/websocket/conn.go:(Conn)Close also invoke net.Conn.Close
close(int) close(int)
// set related session
setSession(Session)
} }
///////////////////////////////////////// /////////////////////////////////////////
...@@ -114,6 +116,7 @@ var ( ...@@ -114,6 +116,7 @@ var (
ErrSessionClosed = errors.New("session Already Closed") ErrSessionClosed = errors.New("session Already Closed")
ErrSessionBlocked = errors.New("session Full Blocked") ErrSessionBlocked = errors.New("session Full Blocked")
ErrMsgTooLong = errors.New("Message Too Long") ErrMsgTooLong = errors.New("Message Too Long")
ErrNullPeerAddr = errors.New("peer address is nil")
) )
type Session interface { type Session interface {
...@@ -122,6 +125,8 @@ type Session interface { ...@@ -122,6 +125,8 @@ type Session interface {
Conn() net.Conn Conn() net.Conn
Stat() string Stat() string
IsClosed() bool IsClosed() bool
// get endpoint type
Type() EndPointType
SetMaxMsgLen(int) SetMaxMsgLen(int)
SetName(string) SetName(string)
......
...@@ -181,9 +181,9 @@ func (s *server) listenUDP() error { ...@@ -181,9 +181,9 @@ func (s *server) listenUDP() error {
if err != nil { if err != nil {
return errors.Wrapf(err, "net.ListenUDP((udp, localAddr:%#v)", localAddr) return errors.Wrapf(err, "net.ListenUDP((udp, localAddr:%#v)", localAddr)
} }
//if err = setUDPSocketOptions(pktListener); err != nil { // if err = setUDPSocketOptions(pktListener); err != nil {
// return errors.Wrapf(err, "setUDPSocketOptions(pktListener:%#v)", pktListener) // return errors.Wrapf(err, "setUDPSocketOptions(pktListener:%#v)", pktListener)
//} // }
s.pktListener = pktListener s.pktListener = pktListener
...@@ -212,7 +212,7 @@ func (s *server) accept(newSession NewSessionCallback) (Session, error) { ...@@ -212,7 +212,7 @@ func (s *server) accept(newSession NewSessionCallback) (Session, error) {
return nil, errSelfConnect return nil, errSelfConnect
} }
ss := NewTCPSession(conn) ss := newTCPSession(conn, s.endPointType)
err = newSession(ss) err = newSession(ss)
if err != nil { if err != nil {
conn.Close() conn.Close()
...@@ -267,7 +267,7 @@ func (s *server) runUDPEventLoop(newSession NewSessionCallback) { ...@@ -267,7 +267,7 @@ func (s *server) runUDPEventLoop(newSession NewSessionCallback) {
ss Session ss Session
) )
ss = NewUDPSession(s.pktListener.(*net.UDPConn), nil) ss = newUDPSession(s.pktListener.(*net.UDPConn), s.endPointType)
if err := newSession(ss); err != nil { if err := newSession(ss); err != nil {
panic(err.Error()) panic(err.Error())
} }
...@@ -317,7 +317,7 @@ func (s *wsHandler) serveWSRequest(w http.ResponseWriter, r *http.Request) { ...@@ -317,7 +317,7 @@ func (s *wsHandler) serveWSRequest(w http.ResponseWriter, r *http.Request) {
return return
} }
// conn.SetReadLimit(int64(handler.maxMsgLen)) // conn.SetReadLimit(int64(handler.maxMsgLen))
ss := NewWSSession(conn) ss := newWSSession(conn, s.server.endPointType)
err = s.newSession(ss) err = s.newSession(ss)
if err != nil { if err != nil {
conn.Close() conn.Close()
......
...@@ -52,6 +52,7 @@ var ( ...@@ -52,6 +52,7 @@ var (
// getty base session // getty base session
type session struct { type session struct {
name string name string
endPointType EndPointType
maxMsgLen int32 maxMsgLen int32
// net read Write // net read Write
Connection Connection
...@@ -75,66 +76,45 @@ type session struct { ...@@ -75,66 +76,45 @@ type session struct {
lock sync.RWMutex lock sync.RWMutex
} }
func NewSession() Session { func newSession(endPointType EndPointType, conn Connection) *session {
session := &session{ session := &session{
name: defaultSessionName, name: defaultSessionName,
endPointType: endPointType,
maxMsgLen: maxReadBufLen,
Connection: conn,
done: make(chan gxsync.Empty), done: make(chan gxsync.Empty),
period: period, period: period,
wait: pendingDuration, wait: pendingDuration,
attrs: gxcontext.NewValuesContext(nil), attrs: gxcontext.NewValuesContext(nil),
} }
session.Connection.setSession(session)
session.SetWriteTimeout(netIOTimeout) session.SetWriteTimeout(netIOTimeout)
session.SetReadTimeout(netIOTimeout) session.SetReadTimeout(netIOTimeout)
return session return session
} }
func NewTCPSession(conn net.Conn) Session { func newTCPSession(conn net.Conn, endPointType EndPointType) Session {
session := &session{ c := newGettyTCPConn(conn)
name: defaultTCPSessionName, session := newSession(endPointType, c)
Connection: newGettyTCPConn(conn), session.name = defaultTCPSessionName
done: make(chan gxsync.Empty),
period: period,
wait: pendingDuration,
attrs: gxcontext.NewValuesContext(nil),
}
session.SetWriteTimeout(netIOTimeout)
session.SetReadTimeout(netIOTimeout)
return session return session
} }
func NewUDPSession(conn *net.UDPConn, peerAddr *net.UDPAddr) Session { func newUDPSession(conn *net.UDPConn, endPointType EndPointType) Session {
session := &session{ c := newGettyUDPConn(conn)
name: defaultUDPSessionName, session := newSession(endPointType, c)
maxMsgLen: maxReadBufLen, session.name = defaultUDPSessionName
Connection: newGettyUDPConn(conn, peerAddr),
done: make(chan gxsync.Empty),
period: period,
wait: pendingDuration,
attrs: gxcontext.NewValuesContext(nil),
}
session.SetWriteTimeout(netIOTimeout)
session.SetReadTimeout(netIOTimeout)
return session return session
} }
func NewWSSession(conn *websocket.Conn) Session { func newWSSession(conn *websocket.Conn, endPointType EndPointType) Session {
session := &session{ c := newGettyWSConn(conn)
name: defaultWSSessionName, session := newSession(endPointType, c)
Connection: newGettyWSConn(conn), session.name = defaultWSSessionName
done: make(chan gxsync.Empty),
period: period,
wait: pendingDuration,
attrs: gxcontext.NewValuesContext(nil),
}
session.SetWriteTimeout(netIOTimeout)
session.SetReadTimeout(netIOTimeout)
return session return session
} }
...@@ -170,6 +150,10 @@ func (s *session) Conn() net.Conn { ...@@ -170,6 +150,10 @@ func (s *session) Conn() net.Conn {
return nil return nil
} }
func (s *session) Type() EndPointType {
return s.endPointType
}
func (s *session) gettyConn() *gettyConn { func (s *session) gettyConn() *gettyConn {
if tc, ok := s.Connection.(*gettyTCPConn); ok { if tc, ok := s.Connection.(*gettyTCPConn); ok {
return &(tc.gettyConn) return &(tc.gettyConn)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment