Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Contribute to GitLab
Sign in
Toggle navigation
G
getty
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
wei.xuan
getty
Commits
c8ebbd87
Commit
c8ebbd87
authored
Aug 07, 2021
by
dongjianhui03
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
format comments & function name
parent
1f33b205
Hide whitespace changes
Inline
Side-by-side
Showing
7 changed files
with
60 additions
and
76 deletions
+60
-76
client.go
client.go
+3
-6
connection.go
connection.go
+3
-3
getty.go
getty.go
+20
-26
logger.go
logger.go
+3
-3
options.go
options.go
+15
-15
server.go
server.go
+3
-3
session.go
session.go
+13
-20
No files found.
client.go
View file @
c8ebbd87
...
...
@@ -99,17 +99,17 @@ func newClient(t EndPointType, opts ...ClientOption) *client {
return
c
}
// NewT
cpClient function
builds a tcp client.
// NewT
CPClient
builds a tcp client.
func
NewTCPClient
(
opts
...
ClientOption
)
Client
{
return
newClient
(
TCP_CLIENT
,
opts
...
)
}
// NewU
dpClient function
builds a connected udp client
// NewU
DPClient
builds a connected udp client
func
NewUDPClient
(
opts
...
ClientOption
)
Client
{
return
newClient
(
UDP_CLIENT
,
opts
...
)
}
// NewW
sClient function
builds a ws client.
// NewW
SClient
builds a ws client.
func
NewWSClient
(
opts
...
ClientOption
)
Client
{
c
:=
newClient
(
WS_CLIENT
,
opts
...
)
...
...
@@ -184,7 +184,6 @@ func (c *client) dialUDP() Session {
buf
[]
byte
)
// buf = make([]byte, 128)
bufp
=
gxbytes
.
GetBytes
(
128
)
defer
gxbytes
.
PutBytes
(
bufp
)
buf
=
*
bufp
...
...
@@ -224,9 +223,7 @@ func (c *client) dialUDP() Session {
<-
gxtime
.
After
(
connectInterval
)
continue
}
// if err == nil {
return
newUDPSession
(
conn
,
c
)
//}
}
}
...
...
connection.go
View file @
c8ebbd87
...
...
@@ -122,7 +122,7 @@ func (c gettyConn) writeTimeout() time.Duration {
return
c
.
wTimeout
}
// Pls do not set write deadline for websocket connection. AlexStocks 20180310
//
SetWriteTimeout
Pls do not set write deadline for websocket connection. AlexStocks 20180310
// gorilla/websocket/conn.go:NextWriter will always fail when got a timeout error.
//
// Pls do not set write deadline when using compression. AlexStocks 20180314.
...
...
@@ -201,7 +201,7 @@ func (t *writeFlusher) Write(p []byte) (int, error) {
return
n
,
nil
}
// set compress type(tcp: zip/snappy, websocket:zip)
//
SetCompressType
set compress type(tcp: zip/snappy, websocket:zip)
func
(
t
*
gettyTCPConn
)
SetCompressType
(
c
CompressType
)
{
switch
c
{
case
CompressNone
,
CompressZip
,
CompressBestSpeed
,
CompressBestCompression
,
CompressHuffman
:
...
...
@@ -505,7 +505,7 @@ func newGettyWSConn(conn *websocket.Conn) *gettyWSConn {
return
gettyWSConn
}
// set compress type
//
SetCompressType
set compress type
func
(
w
*
gettyWSConn
)
SetCompressType
(
c
CompressType
)
{
switch
c
{
case
CompressNone
,
CompressZip
,
CompressBestSpeed
,
CompressBestCompression
,
CompressHuffman
:
...
...
getty.go
View file @
c8ebbd87
...
...
@@ -35,7 +35,7 @@ type NewSessionCallback func(Session) error
// Reader is used to unmarshal a complete pkg from buffer
type
Reader
interface
{
// Parse tcp/udp/websocket pkg from buffer and if possible return a complete pkg.
//
Read
Parse tcp/udp/websocket pkg from buffer and if possible return a complete pkg.
// When receiving a tcp network streaming segment, there are 4 cases as following:
// case 1: a error found in the streaming segment;
// case 2: can not unmarshal a pkg header from the streaming segment;
...
...
@@ -53,7 +53,7 @@ type Reader interface {
// Writer is used to marshal pkg and write to session
type
Writer
interface
{
// if @Session is udpGettySession, the second parameter is UDPContext.
//
Write
if @Session is udpGettySession, the second parameter is UDPContext.
Write
(
Session
,
interface
{})
([]
byte
,
error
)
}
...
...
@@ -65,20 +65,20 @@ type ReadWriter interface {
// EventListener is used to process pkg that received from remote session
type
EventListener
interface
{
// invoked when session opened
//
OnOpen
invoked when session opened
// If the return error is not nil, @Session will be closed.
OnOpen
(
Session
)
error
// invoked when session closed.
//
OnClose
invoked when session closed.
OnClose
(
Session
)
// invoked when got error.
//
OnError
invoked when got error.
OnError
(
Session
,
error
)
// invoked periodically, its period can be set by (Session)SetCronPeriod
//
OnCron
invoked periodically, its period can be set by (Session)SetCronPeriod
OnCron
(
Session
)
// invoked when getty received a package. Pls attention that do not handle long time
//
OnMessage
invoked when getty received a package. Pls attention that do not handle long time
// logic processing in this func. You'd better set the package's maximum length.
// If the message's length is greater than it, u should should return err in
// Reader{Read} and getty will close this connection soon.
...
...
@@ -92,10 +92,6 @@ type EventListener interface {
OnMessage
(
Session
,
interface
{})
}
/////////////////////////////////////////
// compress
/////////////////////////////////////////
type
CompressType
int
const
(
...
...
@@ -107,10 +103,7 @@ const (
CompressSnappy
=
10
)
/////////////////////////////////////////
// connection
/////////////////////////////////////////
// Connection wrap some connection params and operations
type
Connection
interface
{
ID
()
uint32
SetCompressType
(
CompressType
)
...
...
@@ -118,9 +111,9 @@ type Connection interface {
RemoteAddr
()
string
incReadPkgNum
()
incWritePkgNum
()
// update session's active time
//
UpdateActive
update session's active time
UpdateActive
()
// get session's active time
//
GetActive
get session's active time
GetActive
()
time
.
Time
readTimeout
()
time
.
Duration
// SetReadTimeout sets deadline for the future read calls.
...
...
@@ -152,7 +145,7 @@ type Session interface {
Conn
()
net
.
Conn
Stat
()
string
IsClosed
()
bool
// get endpoint type
//
EndPoint
get endpoint type
EndPoint
()
EndPoint
SetMaxMsgLen
(
int
)
...
...
@@ -169,7 +162,7 @@ type Session interface {
SetAttribute
(
interface
{},
interface
{})
RemoveAttribute
(
interface
{})
// the Writer will invoke this function. Pls attention that if timeout is less than 0, WritePkg will send @pkg asap.
//
WritePkg
the Writer will invoke this function. Pls attention that if timeout is less than 0, WritePkg will send @pkg asap.
// for udp session, the first parameter should be UDPContext.
// totalBytesLength: @pkg stream bytes length after encoding @pkg.
// sendBytesLength: stream bytes length that sent out successfully.
...
...
@@ -185,16 +178,17 @@ type Session interface {
/////////////////////////////////////////
type
EndPoint
interface
{
// get EndPoint ID
//
ID
get EndPoint ID
ID
()
EndPointID
// get endpoint type
//
EndPointType
get endpoint type
EndPointType
()
EndPointType
// run event loop and serves client request.
//
RunEventLoop
run event loop and serves client request.
RunEventLoop
(
newSession
NewSessionCallback
)
// check the endpoint has been closed
//
IsClosed
check the endpoint has been closed
IsClosed
()
bool
// close the endpoint and free its resource
//
Close
close the endpoint and free its resource
Close
()
// GetTaskPool get task pool implemented by dubbogo/gost
GetTaskPool
()
gxsync
.
GenericTaskPool
}
...
...
@@ -210,13 +204,13 @@ type Server interface {
// StreamServer is like tcp/websocket/wss server
type
StreamServer
interface
{
Server
// get the network listener
//
Listener
get the network listener
Listener
()
net
.
Listener
}
// PacketServer is like udp listen endpoint
type
PacketServer
interface
{
Server
// get the network listener
//
PacketConn
get the network listener
PacketConn
()
net
.
PacketConn
}
logger.go
View file @
c8ebbd87
...
...
@@ -97,7 +97,7 @@ func init() {
// }()
}
// SetLogger
:
customize yourself logger.
// SetLogger customize yourself logger.
func
SetLogger
(
logger
Logger
)
{
log
=
logger
}
...
...
@@ -107,7 +107,7 @@ func GetLogger() Logger {
return
log
}
// SetLoggerLevel
// SetLoggerLevel
set logger level
func
SetLoggerLevel
(
level
LoggerLevel
)
error
{
var
err
error
zapLoggerConfig
.
Level
=
zap
.
NewAtomicLevelAt
(
zapcore
.
Level
(
level
))
...
...
@@ -119,7 +119,7 @@ func SetLoggerLevel(level LoggerLevel) error {
return
nil
}
// SetLoggerCallerDisable
:
disable caller info in production env for performance improve.
// SetLoggerCallerDisable disable caller info in production env for performance improve.
// It is highly recommended that you execute this method in a production environment.
func
SetLoggerCallerDisable
()
error
{
var
err
error
...
...
options.go
View file @
c8ebbd87
...
...
@@ -37,56 +37,56 @@ type ServerOptions struct {
tPool
gxsync
.
GenericTaskPool
}
// @addr server listen address.
//
WithLocalAddress
@addr server listen address.
func
WithLocalAddress
(
addr
string
)
ServerOption
{
return
func
(
o
*
ServerOptions
)
{
o
.
addr
=
addr
}
}
// @path: websocket request url path
//
WithWebsocketServerPath
@path: websocket request url path
func
WithWebsocketServerPath
(
path
string
)
ServerOption
{
return
func
(
o
*
ServerOptions
)
{
o
.
path
=
path
}
}
// @cert: server certificate file
//
WithWebsocketServerCert
@cert: server certificate file
func
WithWebsocketServerCert
(
cert
string
)
ServerOption
{
return
func
(
o
*
ServerOptions
)
{
o
.
cert
=
cert
}
}
// @key: server private key(contains its public key)
//
WithWebsocketServerPrivateKey
@key: server private key(contains its public key)
func
WithWebsocketServerPrivateKey
(
key
string
)
ServerOption
{
return
func
(
o
*
ServerOptions
)
{
o
.
privateKey
=
key
}
}
// @cert is the root certificate file to verify the legitimacy of server
//
WithWebsocketServerRootCert
@cert is the root certificate file to verify the legitimacy of server
func
WithWebsocketServerRootCert
(
cert
string
)
ServerOption
{
return
func
(
o
*
ServerOptions
)
{
o
.
caCert
=
cert
}
}
// @pool server task pool.
//
WithServerTaskPool
@pool server task pool.
func
WithServerTaskPool
(
pool
gxsync
.
GenericTaskPool
)
ServerOption
{
return
func
(
o
*
ServerOptions
)
{
o
.
tPool
=
pool
}
}
//
@With
SslEnabled enable use tls
//
WithServer
SslEnabled enable use tls
func
WithServerSslEnabled
(
sslEnabled
bool
)
ServerOption
{
return
func
(
o
*
ServerOptions
)
{
o
.
sslEnabled
=
sslEnabled
}
}
//
@WithServerKeyCertChainPath
sslConfig is tls config
//
WithServerTlsConfigBuilder
sslConfig is tls config
func
WithServerTlsConfigBuilder
(
tlsConfigBuilder
TlsConfigBuilder
)
ServerOption
{
return
func
(
o
*
ServerOptions
)
{
o
.
tlsConfigBuilder
=
tlsConfigBuilder
...
...
@@ -116,14 +116,14 @@ type ClientOptions struct {
tPool
gxsync
.
GenericTaskPool
}
// @addr is server address.
//
WithServerAddress
@addr is server address.
func
WithServerAddress
(
addr
string
)
ClientOption
{
return
func
(
o
*
ClientOptions
)
{
o
.
addr
=
addr
}
}
// @reconnectInterval is server address.
//
WithReconnectInterval
@reconnectInterval is server address.
func
WithReconnectInterval
(
reconnectInterval
int
)
ClientOption
{
return
func
(
o
*
ClientOptions
)
{
if
0
<
reconnectInterval
{
...
...
@@ -132,14 +132,14 @@ func WithReconnectInterval(reconnectInterval int) ClientOption {
}
}
// @pool client task pool.
//
WithClientTaskPool
@pool client task pool.
func
WithClientTaskPool
(
pool
gxsync
.
GenericTaskPool
)
ClientOption
{
return
func
(
o
*
ClientOptions
)
{
o
.
tPool
=
pool
}
}
// @num is connection number.
//
WithConnectionNumber
@num is connection number.
func
WithConnectionNumber
(
num
int
)
ClientOption
{
return
func
(
o
*
ClientOptions
)
{
if
0
<
num
{
...
...
@@ -148,21 +148,21 @@ func WithConnectionNumber(num int) ClientOption {
}
}
// @certs is client certificate file. it can be empty.
//
WithRootCertificateFile
@certs is client certificate file. it can be empty.
func
WithRootCertificateFile
(
cert
string
)
ClientOption
{
return
func
(
o
*
ClientOptions
)
{
o
.
cert
=
cert
}
}
//
@With
SslEnabled enable use tls
//
WithClient
SslEnabled enable use tls
func
WithClientSslEnabled
(
sslEnabled
bool
)
ClientOption
{
return
func
(
o
*
ClientOptions
)
{
o
.
sslEnabled
=
sslEnabled
}
}
//
@WithClientKeyCertChainPath
sslConfig is tls config
//
WithClientTlsConfigBuilder
sslConfig is tls config
func
WithClientTlsConfigBuilder
(
tlsConfigBuilder
TlsConfigBuilder
)
ClientOption
{
return
func
(
o
*
ClientOptions
)
{
o
.
tlsConfigBuilder
=
tlsConfigBuilder
...
...
server.go
View file @
c8ebbd87
...
...
@@ -87,7 +87,7 @@ func NewTCPServer(opts ...ServerOption) Server {
}
// NewUDPEndPoint builds a unconnected udp server.
func
NewUDP
P
EndPoint
(
opts
...
ServerOption
)
Server
{
func
NewUDPEndPoint
(
opts
...
ServerOption
)
Server
{
return
newServer
(
UDP_ENDPOINT
,
opts
...
)
}
...
...
@@ -255,7 +255,7 @@ func (s *server) accept(newSession NewSessionCallback) (Session, error) {
return
ss
,
nil
}
func
(
s
*
server
)
runT
cp
EventLoop
(
newSession
NewSessionCallback
)
{
func
(
s
*
server
)
runT
CP
EventLoop
(
newSession
NewSessionCallback
)
{
s
.
wg
.
Add
(
1
)
go
func
()
{
defer
s
.
wg
.
Done
()
...
...
@@ -469,7 +469,7 @@ func (s *server) RunEventLoop(newSession NewSessionCallback) {
switch
s
.
endPointType
{
case
TCP_SERVER
:
s
.
runT
cp
EventLoop
(
newSession
)
s
.
runT
CP
EventLoop
(
newSession
)
case
UDP_ENDPOINT
:
s
.
runUDPEventLoop
(
newSession
)
case
WS_SERVER
:
...
...
session.go
View file @
c8ebbd87
...
...
@@ -53,10 +53,6 @@ const (
outputFormat
=
"session %s, Read Bytes: %d, Write Bytes: %d, Read Pkgs: %d, Write Pkgs: %d"
)
/////////////////////////////////////////
// session
/////////////////////////////////////////
var
defaultTimerWheel
*
gxtime
.
TimerWheel
func
init
()
{
...
...
@@ -156,7 +152,6 @@ func (s *session) Reset() {
}
}
// func (s *session) SetConn(conn net.Conn) { s.gettyConn = newGettyConn(conn) }
func
(
s
*
session
)
Conn
()
net
.
Conn
{
if
tc
,
ok
:=
s
.
Connection
.
(
*
gettyTCPConn
);
ok
{
return
tc
.
conn
...
...
@@ -193,7 +188,7 @@ func (s *session) gettyConn() *gettyConn {
return
nil
}
//
return
the connect statistic data
//
Stat get
the connect statistic data
func
(
s
*
session
)
Stat
()
string
{
var
conn
*
gettyConn
if
conn
=
s
.
gettyConn
();
conn
==
nil
{
...
...
@@ -209,7 +204,7 @@ func (s *session) Stat() string {
)
}
// check whether the session has been closed.
//
IsClosed
check whether the session has been closed.
func
(
s
*
session
)
IsClosed
()
bool
{
select
{
case
<-
s
.
done
:
...
...
@@ -220,7 +215,7 @@ func (s *session) IsClosed() bool {
}
}
// set maximum package length of every package in (EventListener)OnMessage(@pkgs)
//
SetMaxMsgLen
set maximum package length of every package in (EventListener)OnMessage(@pkgs)
func
(
s
*
session
)
SetMaxMsgLen
(
length
int
)
{
s
.
lock
.
Lock
()
defer
s
.
lock
.
Unlock
()
...
...
@@ -228,7 +223,7 @@ func (s *session) SetMaxMsgLen(length int) {
s
.
maxMsgLen
=
int32
(
length
)
}
// set session name
//
SetName
set session name
func
(
s
*
session
)
SetName
(
name
string
)
{
s
.
lock
.
Lock
()
defer
s
.
lock
.
Unlock
()
...
...
@@ -236,7 +231,7 @@ func (s *session) SetName(name string) {
s
.
name
=
name
}
//
set EventL
istener
//
SetEventListener set event l
istener
func
(
s
*
session
)
SetEventListener
(
listener
EventListener
)
{
s
.
lock
.
Lock
()
defer
s
.
lock
.
Unlock
()
...
...
@@ -244,7 +239,7 @@ func (s *session) SetEventListener(listener EventListener) {
s
.
listener
=
listener
}
// set package handler
//
SetPkgHandler
set package handler
func
(
s
*
session
)
SetPkgHandler
(
handler
ReadWriter
)
{
s
.
lock
.
Lock
()
defer
s
.
lock
.
Unlock
()
...
...
@@ -253,7 +248,6 @@ func (s *session) SetPkgHandler(handler ReadWriter) {
s
.
writer
=
handler
}
// set Reader
func
(
s
*
session
)
SetReader
(
reader
Reader
)
{
s
.
lock
.
Lock
()
defer
s
.
lock
.
Unlock
()
...
...
@@ -261,7 +255,6 @@ func (s *session) SetReader(reader Reader) {
s
.
reader
=
reader
}
// set Writer
func
(
s
*
session
)
SetWriter
(
writer
Writer
)
{
s
.
lock
.
Lock
()
defer
s
.
lock
.
Unlock
()
...
...
@@ -269,7 +262,7 @@ func (s *session) SetWriter(writer Writer) {
s
.
writer
=
writer
}
// period is in millisecond. Websocket session will send ping frame automatically every peroid.
//
SetCronPeriod
period is in millisecond. Websocket session will send ping frame automatically every peroid.
func
(
s
*
session
)
SetCronPeriod
(
period
int
)
{
if
period
<
1
{
panic
(
"@period < 1"
)
...
...
@@ -280,7 +273,7 @@ func (s *session) SetCronPeriod(period int) {
s
.
period
=
time
.
Duration
(
period
)
*
time
.
Millisecond
}
// set maximum wait time when session got error or got exit signal
//
SetWaitTime
set maximum wait time when session got error or got exit signal
func
(
s
*
session
)
SetWaitTime
(
waitTime
time
.
Duration
)
{
if
waitTime
<
1
{
panic
(
"@wait < 1"
)
...
...
@@ -291,7 +284,7 @@ func (s *session) SetWaitTime(waitTime time.Duration) {
s
.
wait
=
waitTime
}
//
s
et attribute of key @session:key
//
GetAttribute g
et attribute of key @session:key
func
(
s
*
session
)
GetAttribute
(
key
interface
{})
interface
{}
{
s
.
lock
.
RLock
()
if
s
.
attrs
==
nil
{
...
...
@@ -308,7 +301,7 @@ func (s *session) GetAttribute(key interface{}) interface{} {
return
ret
}
//
g
et attribute of key @session:key
//
SetAttribute s
et attribute of key @session:key
func
(
s
*
session
)
SetAttribute
(
key
interface
{},
value
interface
{})
{
s
.
lock
.
Lock
()
if
s
.
attrs
!=
nil
{
...
...
@@ -317,7 +310,7 @@ func (s *session) SetAttribute(key interface{}, value interface{}) {
s
.
lock
.
Unlock
()
}
//
delet
e attribute of key @session:key
//
RemoveAttribute remov
e attribute of key @session:key
func
(
s
*
session
)
RemoveAttribute
(
key
interface
{})
{
s
.
lock
.
Lock
()
if
s
.
attrs
!=
nil
{
...
...
@@ -381,7 +374,7 @@ func (s *session) WritePkg(pkg interface{}, timeout time.Duration) (int, int, er
return
len
(
pkgBytes
),
succssCount
,
nil
}
// for codecs
//
WriteBytes
for codecs
func
(
s
*
session
)
WriteBytes
(
pkg
[]
byte
)
(
int
,
error
)
{
if
s
.
IsClosed
()
{
return
0
,
ErrSessionClosed
...
...
@@ -394,7 +387,7 @@ func (s *session) WriteBytes(pkg []byte) (int, error) {
return
lg
,
nil
}
// Write multiple packages at once. so we invoke write sys.call just one time.
// Write
BytesArray Write
multiple packages at once. so we invoke write sys.call just one time.
func
(
s
*
session
)
WriteBytesArray
(
pkgs
...
[]
byte
)
(
int
,
error
)
{
if
s
.
IsClosed
()
{
return
0
,
ErrSessionClosed
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment