Commit ada69206 authored by dongjianhui03's avatar dongjianhui03

rft(*): reorganize code with proper file name

parent c8ebbd87
...@@ -31,6 +31,7 @@ import ( ...@@ -31,6 +31,7 @@ import (
import ( import (
"github.com/dubbogo/gost/sync" "github.com/dubbogo/gost/sync"
"github.com/montanaflynn/stats" "github.com/montanaflynn/stats"
) )
......
...@@ -35,7 +35,9 @@ import ( ...@@ -35,7 +35,9 @@ import (
"github.com/dubbogo/gost/net" "github.com/dubbogo/gost/net"
gxsync "github.com/dubbogo/gost/sync" gxsync "github.com/dubbogo/gost/sync"
gxtime "github.com/dubbogo/gost/time" gxtime "github.com/dubbogo/gost/time"
"github.com/gorilla/websocket" "github.com/gorilla/websocket"
perrors "github.com/pkg/errors" perrors "github.com/pkg/errors"
) )
...@@ -49,13 +51,13 @@ const ( ...@@ -49,13 +51,13 @@ const (
var ( var (
sessionClientKey = "session-client-owner" sessionClientKey = "session-client-owner"
connectPingPackage = []byte("connect-ping") connectPingPackage = []byte("connect-ping")
)
///////////////////////////////////////// clientID = EndPointID(0)
// getty tcp client )
/////////////////////////////////////////
var clientID = EndPointID(0) type Client interface {
EndPoint
}
type client struct { type client struct {
ClientOptions ClientOptions
......
...@@ -29,19 +29,49 @@ import ( ...@@ -29,19 +29,49 @@ import (
import ( import (
"github.com/golang/snappy" "github.com/golang/snappy"
"github.com/gorilla/websocket" "github.com/gorilla/websocket"
perrors "github.com/pkg/errors" perrors "github.com/pkg/errors"
uatomic "go.uber.org/atomic" uatomic "go.uber.org/atomic"
) )
var launchTime = time.Now() var (
launchTime = time.Now()
connID uatomic.Uint32
)
// Connection wrap some connection params and operations
type Connection interface {
ID() uint32
SetCompressType(CompressType)
LocalAddr() string
RemoteAddr() string
incReadPkgNum()
incWritePkgNum()
// UpdateActive update session's active time
UpdateActive()
// GetActive get session's active time
GetActive() time.Time
readTimeout() time.Duration
// SetReadTimeout sets deadline for the future read calls.
SetReadTimeout(time.Duration)
writeTimeout() time.Duration
// SetWriteTimeout sets deadline for the future read calls.
SetWriteTimeout(time.Duration)
send(interface{}) (int, error)
// don't distinguish between tcp connection and websocket connection. Because
// gorilla/websocket/conn.go:(Conn)Close also invoke net.Conn.Close
close(int)
// set related session
setSession(Session)
}
// /////////////////////////////////////// // ///////////////////////////////////////
// getty connection // getty connection
// /////////////////////////////////////// // ///////////////////////////////////////
var connID uatomic.Uint32
type gettyConn struct { type gettyConn struct {
id uint32 id uint32
compress CompressType compress CompressType
...@@ -103,7 +133,7 @@ func (c *gettyConn) setSession(ss Session) { ...@@ -103,7 +133,7 @@ func (c *gettyConn) setSession(ss Session) {
c.ss = ss c.ss = ss
} }
// Pls do not set read deadline for websocket connection. AlexStocks 20180310 // SetReadTimeout Pls do not set read deadline for websocket connection. AlexStocks 20180310
// gorilla/websocket/conn.go:NextReader will always fail when got a timeout error. // gorilla/websocket/conn.go:NextReader will always fail when got a timeout error.
// //
// Pls do not set read deadline when using compression. AlexStocks 20180314. // Pls do not set read deadline when using compression. AlexStocks 20180314.
...@@ -154,7 +184,7 @@ func newGettyTCPConn(conn net.Conn) *gettyTCPConn { ...@@ -154,7 +184,7 @@ func newGettyTCPConn(conn net.Conn) *gettyTCPConn {
panic("newGettyTCPConn(conn):@conn is nil") panic("newGettyTCPConn(conn):@conn is nil")
} }
var localAddr, peerAddr string var localAddr, peerAddr string
// check conn.LocalAddr or conn.RemoetAddr is nil to defeat panic on 2016/09/27 // check conn.LocalAddr or conn.RemoteAddr is nil to defeat panic on 2016/09/27
if conn.LocalAddr() != nil { if conn.LocalAddr() != nil {
localAddr = conn.LocalAddr().String() localAddr = conn.LocalAddr().String()
} }
......
...@@ -18,6 +18,7 @@ ...@@ -18,6 +18,7 @@
package getty package getty
import ( import (
"compress/flate"
"strconv" "strconv"
) )
...@@ -67,3 +68,14 @@ func (x EndPointType) String() string { ...@@ -67,3 +68,14 @@ func (x EndPointType) String() string {
return strconv.Itoa(int(x)) return strconv.Itoa(int(x))
} }
type CompressType int
const (
CompressNone CompressType = flate.NoCompression // 0
CompressZip = flate.DefaultCompression // -1
CompressBestSpeed = flate.BestSpeed // 1
CompressBestCompression = flate.BestCompression // 9
CompressHuffman = flate.HuffmanOnly // -2
CompressSnappy = 10
)
...@@ -18,16 +18,17 @@ ...@@ -18,16 +18,17 @@
package getty package getty
import ( import (
"compress/flate"
"net"
"time"
)
import (
gxsync "github.com/dubbogo/gost/sync" gxsync "github.com/dubbogo/gost/sync"
perrors "github.com/pkg/errors" perrors "github.com/pkg/errors"
) )
var (
ErrSessionClosed = perrors.New("session Already Closed")
ErrSessionBlocked = perrors.New("session Full Blocked")
ErrNullPeerAddr = perrors.New("peer address is nil")
)
// NewSessionCallback will be invoked when server accepts a new client connection or client connects to server successfully. // NewSessionCallback will be invoked when server accepts a new client connection or client connects to server successfully.
// If there are too many client connections or u do not want to connect a server again, u can return non-nil error. And // If there are too many client connections or u do not want to connect a server again, u can return non-nil error. And
// then getty will close the new session. // then getty will close the new session.
...@@ -92,91 +93,7 @@ type EventListener interface { ...@@ -92,91 +93,7 @@ type EventListener interface {
OnMessage(Session, interface{}) OnMessage(Session, interface{})
} }
type CompressType int // EndPoint represents the identity of the client/server
const (
CompressNone CompressType = flate.NoCompression // 0
CompressZip = flate.DefaultCompression // -1
CompressBestSpeed = flate.BestSpeed // 1
CompressBestCompression = flate.BestCompression // 9
CompressHuffman = flate.HuffmanOnly // -2
CompressSnappy = 10
)
// Connection wrap some connection params and operations
type Connection interface {
ID() uint32
SetCompressType(CompressType)
LocalAddr() string
RemoteAddr() string
incReadPkgNum()
incWritePkgNum()
// UpdateActive update session's active time
UpdateActive()
// GetActive get session's active time
GetActive() time.Time
readTimeout() time.Duration
// SetReadTimeout sets deadline for the future read calls.
SetReadTimeout(time.Duration)
writeTimeout() time.Duration
// SetWriteTimeout sets deadline for the future read calls.
SetWriteTimeout(time.Duration)
send(interface{}) (int, error)
// don't distinguish between tcp connection and websocket connection. Because
// gorilla/websocket/conn.go:(Conn)Close also invoke net.Conn.Close
close(int)
// set related session
setSession(Session)
}
/////////////////////////////////////////
// Session
/////////////////////////////////////////
var (
ErrSessionClosed = perrors.New("session Already Closed")
ErrSessionBlocked = perrors.New("session Full Blocked")
ErrNullPeerAddr = perrors.New("peer address is nil")
)
type Session interface {
Connection
Reset()
Conn() net.Conn
Stat() string
IsClosed() bool
// EndPoint get endpoint type
EndPoint() EndPoint
SetMaxMsgLen(int)
SetName(string)
SetEventListener(EventListener)
SetPkgHandler(ReadWriter)
SetReader(Reader)
SetWriter(Writer)
SetCronPeriod(int)
SetWaitTime(time.Duration)
GetAttribute(interface{}) interface{}
SetAttribute(interface{}, interface{})
RemoveAttribute(interface{})
// WritePkg the Writer will invoke this function. Pls attention that if timeout is less than 0, WritePkg will send @pkg asap.
// for udp session, the first parameter should be UDPContext.
// totalBytesLength: @pkg stream bytes length after encoding @pkg.
// sendBytesLength: stream bytes length that sent out successfully.
// err: maybe it has illegal data, encoding error, or write out system error.
WritePkg(pkg interface{}, timeout time.Duration) (totalBytesLength int, sendBytesLength int, err error)
WriteBytes([]byte) (int, error)
WriteBytesArray(...[]byte) (int, error)
Close()
}
/////////////////////////////////////////
// EndPoint
/////////////////////////////////////////
type EndPoint interface { type EndPoint interface {
// ID get EndPoint ID // ID get EndPoint ID
ID() EndPointID ID() EndPointID
...@@ -191,26 +108,3 @@ type EndPoint interface { ...@@ -191,26 +108,3 @@ type EndPoint interface {
// GetTaskPool get task pool implemented by dubbogo/gost // GetTaskPool get task pool implemented by dubbogo/gost
GetTaskPool() gxsync.GenericTaskPool GetTaskPool() gxsync.GenericTaskPool
} }
type Client interface {
EndPoint
}
// Server interface
type Server interface {
EndPoint
}
// StreamServer is like tcp/websocket/wss server
type StreamServer interface {
Server
// Listener get the network listener
Listener() net.Listener
}
// PacketServer is like udp listen endpoint
type PacketServer interface {
Server
// PacketConn get the network listener
PacketConn() net.PacketConn
}
...@@ -34,8 +34,11 @@ import ( ...@@ -34,8 +34,11 @@ import (
gxnet "github.com/dubbogo/gost/net" gxnet "github.com/dubbogo/gost/net"
gxsync "github.com/dubbogo/gost/sync" gxsync "github.com/dubbogo/gost/sync"
gxtime "github.com/dubbogo/gost/time" gxtime "github.com/dubbogo/gost/time"
"github.com/gorilla/websocket" "github.com/gorilla/websocket"
perrors "github.com/pkg/errors" perrors "github.com/pkg/errors"
uatomic "go.uber.org/atomic" uatomic "go.uber.org/atomic"
) )
...@@ -46,6 +49,25 @@ var ( ...@@ -46,6 +49,25 @@ var (
serverID uatomic.Int32 serverID uatomic.Int32
) )
// Server interface
type Server interface {
EndPoint
}
// StreamServer is like tcp/websocket/wss server
type StreamServer interface {
Server
// Listener get the network listener
Listener() net.Listener
}
// PacketServer is like udp listen endpoint
type PacketServer interface {
Server
// PacketConn get the network listener
PacketConn() net.PacketConn
}
type server struct { type server struct {
ServerOptions ServerOptions
......
...@@ -32,8 +32,11 @@ import ( ...@@ -32,8 +32,11 @@ import (
gxbytes "github.com/dubbogo/gost/bytes" gxbytes "github.com/dubbogo/gost/bytes"
gxcontext "github.com/dubbogo/gost/context" gxcontext "github.com/dubbogo/gost/context"
gxtime "github.com/dubbogo/gost/time" gxtime "github.com/dubbogo/gost/time"
"github.com/gorilla/websocket" "github.com/gorilla/websocket"
perrors "github.com/pkg/errors" perrors "github.com/pkg/errors"
uatomic "go.uber.org/atomic" uatomic "go.uber.org/atomic"
) )
...@@ -60,6 +63,41 @@ func init() { ...@@ -60,6 +63,41 @@ func init() {
defaultTimerWheel = gxtime.GetDefaultTimerWheel() defaultTimerWheel = gxtime.GetDefaultTimerWheel()
} }
// Session wrap connection between the server and the client
type Session interface {
Connection
Reset()
Conn() net.Conn
Stat() string
IsClosed() bool
// EndPoint get endpoint type
EndPoint() EndPoint
SetMaxMsgLen(int)
SetName(string)
SetEventListener(EventListener)
SetPkgHandler(ReadWriter)
SetReader(Reader)
SetWriter(Writer)
SetCronPeriod(int)
SetWaitTime(time.Duration)
GetAttribute(interface{}) interface{}
SetAttribute(interface{}, interface{})
RemoveAttribute(interface{})
// WritePkg the Writer will invoke this function. Pls attention that if timeout is less than 0, WritePkg will send @pkg asap.
// for udp session, the first parameter should be UDPContext.
// totalBytesLength: @pkg stream bytes length after encoding @pkg.
// sendBytesLength: stream bytes length that sent out successfully.
// err: maybe it has illegal data, encoding error, or write out system error.
WritePkg(pkg interface{}, timeout time.Duration) (totalBytesLength int, sendBytesLength int, err error)
WriteBytes([]byte) (int, error)
WriteBytesArray(...[]byte) (int, error)
Close()
}
// getty base session // getty base session
type session struct { type session struct {
name string name string
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment