Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Contribute to GitLab
Sign in
Toggle navigation
G
getty
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
wei.xuan
getty
Commits
8d9cf087
Commit
8d9cf087
authored
May 23, 2019
by
u0x01
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
Dep: replace dubbogo/log4go to go.uber.org/zap
parent
ac16c937
Hide whitespace changes
Inline
Side-by-side
Showing
7 changed files
with
182 additions
and
66 deletions
+182
-66
client.go
client.go
+8
-9
conn.go
conn.go
+6
-7
go.mod
go.mod
+4
-1
go.sum
go.sum
+13
-8
logger.go
logger.go
+112
-0
server.go
server.go
+11
-12
session.go
session.go
+28
-29
No files found.
client.go
View file @
8d9cf087
...
...
@@ -22,7 +22,6 @@ import (
)
import
(
log
"github.com/dubbogo/log4go"
"github.com/gorilla/websocket"
perrors
"github.com/pkg/errors"
)
...
...
@@ -138,7 +137,7 @@ func (c *client) dialTCP() Session {
return
newTCPSession
(
conn
,
c
)
}
log
.
Info
(
"net.DialTimeout(addr:%s, timeout:%v) = error:%+v"
,
c
.
addr
,
err
)
log
.
Info
f
(
"net.DialTimeout(addr:%s, timeout:%v) = error:%+v"
,
c
.
addr
,
connectTimeout
,
err
)
// time.Sleep(connInterval)
<-
wheel
.
After
(
connInterval
)
}
...
...
@@ -167,7 +166,7 @@ func (c *client) dialUDP() Session {
err
=
errSelfConnect
}
if
err
!=
nil
{
log
.
Warn
(
"net.DialTimeout(addr:%s, timeout:%v) = error:%+v"
,
c
.
addr
,
err
)
log
.
Warn
f
(
"net.DialTimeout(addr:%s, timeout:%v) = error:%+v"
,
c
.
addr
,
err
)
// time.Sleep(connInterval)
<-
wheel
.
After
(
connInterval
)
continue
...
...
@@ -177,7 +176,7 @@ func (c *client) dialUDP() Session {
conn
.
SetWriteDeadline
(
time
.
Now
()
.
Add
(
1e9
))
if
length
,
err
=
conn
.
Write
(
connectPingPackage
[
:
]);
err
!=
nil
{
conn
.
Close
()
log
.
Warn
(
"conn.Write(%s) = {length:%d, err:%+v}"
,
string
(
connectPingPackage
),
length
,
err
)
log
.
Warn
f
(
"conn.Write(%s) = {length:%d, err:%+v}"
,
string
(
connectPingPackage
),
length
,
err
)
// time.Sleep(connInterval)
<-
wheel
.
After
(
connInterval
)
continue
...
...
@@ -188,7 +187,7 @@ func (c *client) dialUDP() Session {
err
=
nil
}
if
err
!=
nil
{
log
.
Info
(
"conn{%#v}.Read() = {length:%d, err:%+v}"
,
conn
,
length
,
err
)
log
.
Info
f
(
"conn{%#v}.Read() = {length:%d, err:%+v}"
,
conn
,
length
,
err
)
conn
.
Close
()
// time.Sleep(connInterval)
<-
wheel
.
After
(
connInterval
)
...
...
@@ -214,7 +213,7 @@ func (c *client) dialWS() Session {
return
nil
}
conn
,
_
,
err
=
dialer
.
Dial
(
c
.
addr
,
nil
)
log
.
Info
(
"websocket.dialer.Dial(addr:%s) = error:%+v"
,
c
.
addr
,
err
)
log
.
Info
f
(
"websocket.dialer.Dial(addr:%s) = error:%+v"
,
c
.
addr
,
err
)
if
err
==
nil
&&
IsSameAddr
(
conn
.
RemoteAddr
(),
conn
.
LocalAddr
())
{
conn
.
Close
()
err
=
errSelfConnect
...
...
@@ -228,7 +227,7 @@ func (c *client) dialWS() Session {
return
ss
}
log
.
Info
(
"websocket.dialer.Dial(addr:%s) = error:%+v"
,
c
.
addr
,
err
)
log
.
Info
f
(
"websocket.dialer.Dial(addr:%s) = error:%+v"
,
c
.
addr
,
err
)
// time.Sleep(connInterval)
<-
wheel
.
After
(
connInterval
)
}
...
...
@@ -307,7 +306,7 @@ func (c *client) dialWSS() Session {
return
ss
}
log
.
Info
(
"websocket.dialer.Dial(addr:%s) = error:%+v"
,
c
.
addr
,
err
)
log
.
Info
f
(
"websocket.dialer.Dial(addr:%s) = error:%+v"
,
c
.
addr
,
err
)
// time.Sleep(connInterval)
<-
wheel
.
After
(
connInterval
)
}
...
...
@@ -398,7 +397,7 @@ func (c *client) reConnect() {
// c.Unlock()
for
{
if
c
.
IsClosed
()
{
log
.
Warn
(
"client{peer:%s} goroutine exit now."
,
c
.
addr
)
log
.
Warn
f
(
"client{peer:%s} goroutine exit now."
,
c
.
addr
)
break
}
...
...
conn.go
View file @
8d9cf087
...
...
@@ -21,7 +21,6 @@ import (
)
import
(
log
"github.com/dubbogo/log4go"
"github.com/golang/snappy"
"github.com/gorilla/websocket"
perrors
"github.com/pkg/errors"
...
...
@@ -250,7 +249,7 @@ func (t *gettyTCPConn) read(p []byte) (int, error) {
}
length
,
err
=
t
.
reader
.
Read
(
p
)
log
.
Debug
(
"now:%s, length:%d, err:%v"
,
currentTime
,
length
,
err
)
log
.
Debug
f
(
"now:%s, length:%d, err:%v"
,
currentTime
,
length
,
err
)
atomic
.
AddUint32
(
&
t
.
readBytes
,
uint32
(
length
))
return
length
,
perrors
.
WithStack
(
err
)
//return length, err
...
...
@@ -285,7 +284,7 @@ func (t *gettyTCPConn) Write(pkg interface{}) (int, error) {
if
length
,
err
=
t
.
writer
.
Write
(
p
);
err
==
nil
{
atomic
.
AddUint32
(
&
t
.
writeBytes
,
(
uint32
)(
len
(
p
)))
}
log
.
Debug
(
"now:%s, length:%d, err:%v"
,
currentTime
,
length
,
err
)
log
.
Debug
f
(
"now:%s, length:%d, err:%v"
,
currentTime
,
length
,
err
)
return
length
,
perrors
.
WithStack
(
err
)
//return length, err
}
...
...
@@ -299,7 +298,7 @@ func (t *gettyTCPConn) close(waitSec int) {
if
t
.
conn
!=
nil
{
if
writer
,
ok
:=
t
.
writer
.
(
*
snappy
.
Writer
);
ok
{
if
err
:=
writer
.
Close
();
err
!=
nil
{
log
.
Error
(
"snappy.Writer.Close() = error:%+v"
,
err
)
log
.
Error
f
(
"snappy.Writer.Close() = error:%+v"
,
err
)
}
}
t
.
conn
.
(
*
net
.
TCPConn
)
.
SetLinger
(
waitSec
)
...
...
@@ -400,7 +399,7 @@ func (u *gettyUDPConn) read(p []byte) (int, *net.UDPAddr, error) {
}
length
,
addr
,
err
=
u
.
conn
.
ReadFromUDP
(
p
)
// connected udp also can get return @addr
log
.
Debug
(
"ReadFromUDP() = {length:%d, peerAddr:%s, error:%s}"
,
length
,
addr
,
err
)
log
.
Debug
f
(
"ReadFromUDP() = {length:%d, peerAddr:%s, error:%s}"
,
length
,
addr
,
err
)
if
err
==
nil
{
atomic
.
AddUint32
(
&
u
.
readBytes
,
uint32
(
length
))
}
...
...
@@ -450,7 +449,7 @@ func (u *gettyUDPConn) Write(udpCtx interface{}) (int, error) {
if
length
,
_
,
err
=
u
.
conn
.
WriteMsgUDP
(
buf
,
nil
,
peerAddr
);
err
==
nil
{
atomic
.
AddUint32
(
&
u
.
writeBytes
,
(
uint32
)(
len
(
buf
)))
}
log
.
Debug
(
"WriteMsgUDP(peerAddr:%s) = {length:%d, error:%s}"
,
peerAddr
,
length
,
err
)
log
.
Debug
f
(
"WriteMsgUDP(peerAddr:%s) = {length:%d, error:%s}"
,
peerAddr
,
length
,
err
)
return
length
,
perrors
.
WithStack
(
err
)
//return length, err
...
...
@@ -546,7 +545,7 @@ func (w *gettyWSConn) read() ([]byte, error) {
w
.
incReadPkgNum
()
}
else
{
if
websocket
.
IsUnexpectedCloseError
(
e
,
websocket
.
CloseGoingAway
)
{
log
.
Warn
(
"websocket unexpected close error: %v"
,
e
)
log
.
Warn
f
(
"websocket unexpected close error: %v"
,
e
)
}
}
...
...
go.mod
View file @
8d9cf087
module github.com/dubbogo/getty
require (
github.com/dubbogo/log4go v0.0.0-20190406152735-41c57e1073e9
github.com/golang/snappy v0.0.1
github.com/gorilla/websocket v1.4.0
github.com/pkg/errors v0.8.1
github.com/stretchr/testify v1.3.0 // indirect
go.uber.org/atomic v1.4.0 // indirect
go.uber.org/multierr v1.1.0 // indirect
go.uber.org/zap v1.10.0
golang.org/x/net v0.0.0-20190320064053-1272bf9dcd53
)
go.sum
View file @
8d9cf087
github.com/d
ubbogo/log4go v0.0.0-20190406152735-41c57e1073e9 h1:RCRkCLJPUZNyAHLEEJvbFrNkyzmmzFnrRbk+eGvUwNQ
=
github.com/d
ubbogo/log4go v0.0.0-20190406152735-41c57e1073e9/go.mod h1:iyyiSbUgJZcUgpt4hQs7YHZUop6982EGjQxIBeEmevQ
=
github.com/d
avecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8
=
github.com/d
avecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38
=
github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4=
github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/gorilla/websocket v1.4.0 h1:WDFjx/TMzVgy9VdMMQi2K2Emtwi2QcUQsztZ/zLaH/Q=
github.com/gorilla/websocket v1.4.0/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ=
github.com/mailru/easyjson v0.0.0-20190403194419-1ea4449da983 h1:wL11wNW7dhKIcRCHSm4sHKPWz0tt4mwBsVodG7+Xyqg=
github.com/mailru/easyjson v0.0.0-20190403194419-1ea4449da983/go.mod h1:C1wdFJiN94OJF2b5HbByQZoLdCWB1Yqtg26g4irojpc=
github.com/mattn/go-isatty v0.0.7 h1:UvyT9uN+3r7yLEYSlJsbQGdsaB/a0DlgWP3pql6iwOc=
github.com/mattn/go-isatty v0.0.7/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hdxcsrc5s=
github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
go.uber.org/atomic v1.4.0 h1:cxzIVoETapQEqDhQu3QfnvXAV4AlzcvUCxkVUFw3+EU=
go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
go.uber.org/multierr v1.1.0 h1:HoEmRHQPVSqub6w2z2d2EOVs2fjyFRGyofhKuyDq0QI=
go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0=
go.uber.org/zap v1.10.0 h1:ORx85nbTijNz8ljznvCMR1ZBIPKFn3jQrag10X2AsuM=
go.uber.org/zap v1.10.0/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2 h1:VklqNMn3ovrHsnt90PveolxSbWFaJdECFbxSq0Mqo2M=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/net v0.0.0-20190320064053-1272bf9dcd53 h1:kcXqo9vE6fsZY5X5Rd7R1l7fTgnWaDCVmln65REefiE=
golang.org/x/net v0.0.0-20190320064053-1272bf9dcd53/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190222072716-a9d3bda3a223 h1:DH4skfRX4EBpamg7iV4ZlCpblAHI6s6TDM39bFZumv8=
golang.org/x/sys v0.0.0-20190222072716-a9d3bda3a223/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
logger.go
0 → 100644
View file @
8d9cf087
package
getty
import
(
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
)
// Logger for user who want to customize logger of getty
type
Logger
interface
{
Info
(
args
...
interface
{})
Warn
(
args
...
interface
{})
Error
(
args
...
interface
{})
Debug
(
args
...
interface
{})
Infof
(
fmt
string
,
args
...
interface
{})
Warnf
(
fmt
string
,
args
...
interface
{})
Errorf
(
fmt
string
,
args
...
interface
{})
Debugf
(
fmt
string
,
args
...
interface
{})
}
type
LoggerLevel
int8
const
(
// DebugLevel logs are typically voluminous, and are usually disabled in
// production.
LoggerLevelDebug
=
LoggerLevel
(
zapcore
.
DebugLevel
)
// InfoLevel is the default logging priority.
LoggerLevelInfo
=
LoggerLevel
(
zapcore
.
InfoLevel
)
// WarnLevel logs are more important than Infof, but don't need individual
// human review.
LoggerLevelWarn
=
LoggerLevel
(
zapcore
.
WarnLevel
)
// ErrorLevel logs are high-priority. If an application is running smoothly,
// it shouldn't generate any error-level logs.
LoggerLevelError
=
LoggerLevel
(
zapcore
.
ErrorLevel
)
// DPanicLevel logs are particularly important errors. In development the
// logger panics after writing the message.
LoggerLevelDPanic
=
LoggerLevel
(
zapcore
.
DPanicLevel
)
// PanicLevel logs a message, then panics.
LoggerLevelPanic
=
LoggerLevel
(
zapcore
.
PanicLevel
)
// FatalLevel logs a message, then calls os.Exit(1).
LoggerLevelFatal
=
LoggerLevel
(
zapcore
.
FatalLevel
)
)
var
(
log
Logger
zapLogger
*
zap
.
Logger
zapLoggerConfig
=
zap
.
NewDevelopmentConfig
()
zapLoggerEncoderConfig
=
zapcore
.
EncoderConfig
{
TimeKey
:
"time"
,
LevelKey
:
"level"
,
NameKey
:
"logger"
,
CallerKey
:
"caller"
,
MessageKey
:
"message"
,
StacktraceKey
:
"stacktrace"
,
EncodeLevel
:
zapcore
.
CapitalColorLevelEncoder
,
EncodeTime
:
zapcore
.
ISO8601TimeEncoder
,
EncodeDuration
:
zapcore
.
SecondsDurationEncoder
,
EncodeCaller
:
zapcore
.
ShortCallerEncoder
,
}
)
func
init
()
{
zapLoggerConfig
.
EncoderConfig
=
zapLoggerEncoderConfig
zapLogger
,
_
=
zapLoggerConfig
.
Build
()
log
=
zapLogger
.
Sugar
()
// todo: flushes buffer when redirect log to file.
// var exitSignal = make(chan os.Signal)
// signal.Notify(exitSignal, syscall.SIGTERM, syscall.SIGINT)
// go func() {
// <-exitSignal
// // Sync calls the underlying Core's Sync method, flushing any buffered log
// // entries. Applications should take care to call Sync before exiting.
// err := zapLogger.Sync() // flushes buffer, if any
// if err != nil {
// fmt.Printf("zapLogger sync err: %+v", perrors.WithStack(err))
// }
// os.Exit(0)
// }()
}
// SetLogger: customize yourself logger.
func
SetLogger
(
logger
Logger
)
{
log
=
logger
}
// SetLoggerLevel
func
SetLoggerLevel
(
level
LoggerLevel
)
error
{
var
err
error
zapLoggerConfig
.
Level
=
zap
.
NewAtomicLevelAt
(
zapcore
.
Level
(
level
))
zapLogger
,
err
=
zapLoggerConfig
.
Build
()
if
err
!=
nil
{
return
err
}
log
=
zapLogger
.
Sugar
()
return
nil
}
// SetLoggerCallerDisable: disable caller info in production env for performance improve.
// It is highly recommended that you execute this method in a production environment.
func
SetLoggerCallerDisable
()
error
{
var
err
error
zapLoggerConfig
.
Development
=
false
zapLoggerConfig
.
DisableCaller
=
true
zapLogger
,
err
=
zapLoggerConfig
.
Build
()
if
err
!=
nil
{
return
err
}
log
=
zapLogger
.
Sugar
()
return
nil
}
server.go
View file @
8d9cf087
...
...
@@ -22,14 +22,13 @@ import (
)
import
(
log
"github.com/dubbogo/log4go"
"github.com/gorilla/websocket"
perrors
"github.com/pkg/errors"
)
var
(
errSelfConnect
=
perrors
.
New
(
"connect self!"
)
serverFastFailTimeout
=
time
.
Second
*
1
serverFastFailTimeout
=
time
.
Second
*
1
)
type
server
struct
{
...
...
@@ -117,7 +116,7 @@ func (s *server) stop() {
if
err
=
s
.
server
.
Shutdown
(
ctx
);
err
!=
nil
{
// if the log output is "shutdown ctx: context deadline exceeded", it means that
// there are still some active connections.
log
.
Error
(
"server shutdown ctx:%s error:%s"
,
ctx
,
err
)
log
.
Error
f
(
"server shutdown ctx:%s error:%s"
,
ctx
,
err
)
}
}
s
.
server
=
nil
...
...
@@ -205,7 +204,7 @@ func (s *server) accept(newSession NewSessionCallback) (Session, error) {
return
nil
,
perrors
.
WithStack
(
err
)
}
if
IsSameAddr
(
conn
.
RemoteAddr
(),
conn
.
LocalAddr
())
{
log
.
Warn
(
"conn.localAddr{%s} == conn.RemoteAddr"
,
conn
.
LocalAddr
()
.
String
(),
conn
.
RemoteAddr
()
.
String
())
log
.
Warn
f
(
"conn.localAddr{%s} == conn.RemoteAddr"
,
conn
.
LocalAddr
()
.
String
(),
conn
.
RemoteAddr
()
.
String
())
return
nil
,
errSelfConnect
}
...
...
@@ -230,7 +229,7 @@ func (s *server) runTcpEventLoop(newSession NewSessionCallback) {
)
for
{
if
s
.
IsClosed
()
{
log
.
Warn
(
"server{%s} stop acceptting client connect request."
,
s
.
addr
)
log
.
Warn
f
(
"server{%s} stop acceptting client connect request."
,
s
.
addr
)
return
}
if
delay
!=
0
{
...
...
@@ -250,7 +249,7 @@ func (s *server) runTcpEventLoop(newSession NewSessionCallback) {
}
continue
}
log
.
Warn
(
"server{%s}.Accept() = err {%+v}"
,
s
.
addr
,
err
)
log
.
Warn
f
(
"server{%s}.Accept() = err {%+v}"
,
s
.
addr
,
err
)
continue
}
delay
=
0
...
...
@@ -301,17 +300,17 @@ func (s *wsHandler) serveWSRequest(w http.ResponseWriter, r *http.Request) {
if
s
.
server
.
IsClosed
()
{
http
.
Error
(
w
,
"HTTP server is closed(code:500-11)."
,
500
)
log
.
Warn
(
"server{%s} stop acceptting client connect request."
,
s
.
server
.
addr
)
log
.
Warn
f
(
"server{%s} stop acceptting client connect request."
,
s
.
server
.
addr
)
return
}
conn
,
err
:=
s
.
upgrader
.
Upgrade
(
w
,
r
,
nil
)
if
err
!=
nil
{
log
.
Warn
(
"upgrader.Upgrader(http.Request{%#v}) = error:%+v"
,
r
,
err
)
log
.
Warn
f
(
"upgrader.Upgrader(http.Request{%#v}) = error:%+v"
,
r
,
err
)
return
}
if
conn
.
RemoteAddr
()
.
String
()
==
conn
.
LocalAddr
()
.
String
()
{
log
.
Warn
(
"conn.localAddr{%s} == conn.RemoteAddr"
,
conn
.
LocalAddr
()
.
String
(),
conn
.
RemoteAddr
()
.
String
())
log
.
Warn
f
(
"conn.localAddr{%s} == conn.RemoteAddr"
,
conn
.
LocalAddr
()
.
String
(),
conn
.
RemoteAddr
()
.
String
())
return
}
// conn.SetReadLimit(int64(handler.maxMsgLen))
...
...
@@ -319,7 +318,7 @@ func (s *wsHandler) serveWSRequest(w http.ResponseWriter, r *http.Request) {
err
=
s
.
newSession
(
ss
)
if
err
!=
nil
{
conn
.
Close
()
log
.
Warn
(
"server{%s}.newSession(ss{%#v}) = err {%s}"
,
s
.
server
.
addr
,
ss
,
err
)
log
.
Warn
f
(
"server{%s}.newSession(ss{%#v}) = err {%s}"
,
s
.
server
.
addr
,
ss
,
err
)
return
}
if
ss
.
(
*
session
)
.
maxMsgLen
>
0
{
...
...
@@ -353,7 +352,7 @@ func (s *server) runWSEventLoop(newSession NewSessionCallback) {
s
.
lock
.
Unlock
()
err
=
server
.
Serve
(
s
.
streamListener
)
if
err
!=
nil
{
log
.
Error
(
"http.server.Serve(addr{%s}) = err{%+v}"
,
s
.
addr
,
err
)
log
.
Error
f
(
"http.server.Serve(addr{%s}) = err{%+v}"
,
s
.
addr
,
err
)
// panic(err)
}
}()
...
...
@@ -415,7 +414,7 @@ func (s *server) runWSSEventLoop(newSession NewSessionCallback) {
s
.
lock
.
Unlock
()
err
=
server
.
Serve
(
tls
.
NewListener
(
s
.
streamListener
,
config
))
if
err
!=
nil
{
log
.
Error
(
"http.server.Serve(addr{%s}) = err{%+v}"
,
s
.
addr
,
err
)
log
.
Error
f
(
"http.server.Serve(addr{%s}) = err{%+v}"
,
s
.
addr
,
err
)
panic
(
err
)
}
}()
...
...
session.go
View file @
8d9cf087
...
...
@@ -20,7 +20,6 @@ import (
)
import
(
log
"github.com/dubbogo/log4go"
"github.com/gorilla/websocket"
perrors
"github.com/pkg/errors"
)
...
...
@@ -246,7 +245,7 @@ func (s *session) SetRQLen(readQLen int) {
s
.
lock
.
Lock
()
s
.
rQ
=
make
(
chan
interface
{},
readQLen
)
s
.
lock
.
Unlock
()
log
.
Debug
(
"%s, [session.SetRQLen] rQ{len:%d, cap:%d}"
,
s
.
Stat
(),
len
(
s
.
rQ
),
cap
(
s
.
rQ
))
log
.
Debug
f
(
"%s, [session.SetRQLen] rQ{len:%d, cap:%d}"
,
s
.
Stat
(),
len
(
s
.
rQ
),
cap
(
s
.
rQ
))
}
// set @session's Write queue size
...
...
@@ -258,7 +257,7 @@ func (s *session) SetWQLen(writeQLen int) {
s
.
lock
.
Lock
()
s
.
wQ
=
make
(
chan
interface
{},
writeQLen
)
s
.
lock
.
Unlock
()
log
.
Debug
(
"%s, [session.SetWQLen] wQ{len:%d, cap:%d}"
,
s
.
Stat
(),
len
(
s
.
wQ
),
cap
(
s
.
wQ
))
log
.
Debug
f
(
"%s, [session.SetWQLen] wQ{len:%d, cap:%d}"
,
s
.
Stat
(),
len
(
s
.
wQ
),
cap
(
s
.
wQ
))
}
// set maximum wait time when session got error or got exit signal
...
...
@@ -322,7 +321,7 @@ func (s *session) WritePkg(pkg interface{}, timeout time.Duration) error {
const
size
=
64
<<
10
rBuf
:=
make
([]
byte
,
size
)
rBuf
=
rBuf
[
:
runtime
.
Stack
(
rBuf
,
false
)]
log
.
Error
(
"[session.WritePkg] panic session %s: err=%s
\n
%s"
,
s
.
sessionToken
(),
r
,
rBuf
)
log
.
Error
f
(
"[session.WritePkg] panic session %s: err=%s
\n
%s"
,
s
.
sessionToken
(),
r
,
rBuf
)
}
}()
...
...
@@ -339,7 +338,7 @@ func (s *session) WritePkg(pkg interface{}, timeout time.Duration) error {
break
// for possible gen a new pkg
case
<-
wheel
.
After
(
timeout
)
:
log
.
Warn
(
"%s, [session.WritePkg] wQ{len:%d, cap:%d}"
,
s
.
Stat
(),
len
(
s
.
wQ
),
cap
(
s
.
wQ
))
log
.
Warn
f
(
"%s, [session.WritePkg] wQ{len:%d, cap:%d}"
,
s
.
Stat
(),
len
(
s
.
wQ
),
cap
(
s
.
wQ
))
return
ErrSessionBlocked
}
...
...
@@ -453,14 +452,14 @@ func (s *session) handleLoop() {
const
size
=
64
<<
10
rBuf
:=
make
([]
byte
,
size
)
rBuf
=
rBuf
[
:
runtime
.
Stack
(
rBuf
,
false
)]
log
.
Error
(
"[session.handleLoop] panic session %s: err=%s
\n
%s"
,
s
.
sessionToken
(),
r
,
rBuf
)
log
.
Error
f
(
"[session.handleLoop] panic session %s: err=%s
\n
%s"
,
s
.
sessionToken
(),
r
,
rBuf
)
}
grNum
=
atomic
.
AddInt32
(
&
(
s
.
grNum
),
-
1
)
// if !s.errFlag {
s
.
listener
.
OnClose
(
s
)
// }
log
.
Info
(
"%s, [session.handleLoop] goroutine exit now, left gr num %d"
,
s
.
Stat
(),
grNum
)
log
.
Info
f
(
"%s, [session.handleLoop] goroutine exit now, left gr num %d"
,
s
.
Stat
(),
grNum
)
s
.
gc
()
}()
...
...
@@ -475,13 +474,13 @@ LOOP:
// this case branch assure the (session)handleLoop gr will exit before (session)handlePackage gr.
if
atomic
.
LoadInt32
(
&
(
s
.
grNum
))
==
1
{
// make sure @(session)handlePackage goroutine has been closed.
if
len
(
s
.
rQ
)
==
0
&&
len
(
s
.
wQ
)
==
0
{
log
.
Info
(
"%s, [session.handleLoop] got done signal. Both rQ and wQ are nil."
,
s
.
Stat
())
log
.
Info
f
(
"%s, [session.handleLoop] got done signal. Both rQ and wQ are nil."
,
s
.
Stat
())
break
LOOP
}
counter
.
Start
()
// if time.Since(start).Nanoseconds() >= s.wait.Nanoseconds() {
if
counter
.
Count
()
>
s
.
wait
.
Nanoseconds
()
{
log
.
Info
(
"%s, [session.handleLoop] got done signal "
,
s
.
Stat
())
log
.
Info
f
(
"%s, [session.handleLoop] got done signal "
,
s
.
Stat
())
break
LOOP
}
}
...
...
@@ -489,26 +488,26 @@ LOOP:
case
inPkg
=
<-
s
.
rQ
:
// read the s.rQ and assure (session)handlePackage gr will not block by (session)rQ.
if
flag
{
log
.
Debug
(
"%#v <-s.rQ"
,
inPkg
)
log
.
Debug
f
(
"%#v <-s.rQ"
,
inPkg
)
pkg
:=
inPkg
// go s.listener.OnMessage(s, pkg)
s
.
listener
.
OnMessage
(
s
,
pkg
)
s
.
incReadPkgNum
()
}
else
{
log
.
Info
(
"[session.handleLoop] drop readin package{%#v}"
,
inPkg
)
log
.
Info
f
(
"[session.handleLoop] drop readin package{%#v}"
,
inPkg
)
}
case
outPkg
=
<-
s
.
wQ
:
if
flag
{
if
err
=
s
.
writer
.
Write
(
s
,
outPkg
);
err
!=
nil
{
log
.
Error
(
"%s, [session.handleLoop] = error:%+v"
,
s
.
sessionToken
(),
err
)
log
.
Error
f
(
"%s, [session.handleLoop] = error:%+v"
,
s
.
sessionToken
(),
err
)
s
.
stop
()
flag
=
false
// break LOOP
}
s
.
incWritePkgNum
()
}
else
{
log
.
Info
(
"[session.handleLoop] drop writeout package{%#v}"
,
outPkg
)
log
.
Info
f
(
"[session.handleLoop] drop writeout package{%#v}"
,
outPkg
)
}
case
<-
wheel
.
After
(
s
.
period
)
:
...
...
@@ -516,7 +515,7 @@ LOOP:
if
wsFlag
{
err
:=
wsConn
.
writePing
()
if
err
!=
nil
{
log
.
Warn
(
"wsConn.writePing() = error{%s}"
,
err
)
log
.
Warn
f
(
"wsConn.writePing() = error{%s}"
,
err
)
}
}
s
.
listener
.
OnCron
(
s
)
...
...
@@ -537,14 +536,14 @@ func (s *session) handlePackage() {
const
size
=
64
<<
10
rBuf
:=
make
([]
byte
,
size
)
rBuf
=
rBuf
[
:
runtime
.
Stack
(
rBuf
,
false
)]
log
.
Error
(
"[session.handlePackage] panic session %s: err=%s
\n
%s"
,
s
.
sessionToken
(),
r
,
rBuf
)
log
.
Error
f
(
"[session.handlePackage] panic session %s: err=%s
\n
%s"
,
s
.
sessionToken
(),
r
,
rBuf
)
}
grNum
=
atomic
.
AddInt32
(
&
(
s
.
grNum
),
-
1
)
log
.
Info
(
"%s, [session.handlePackage] gr will exit now, left gr num %d"
,
s
.
sessionToken
(),
grNum
)
log
.
Info
f
(
"%s, [session.handlePackage] gr will exit now, left gr num %d"
,
s
.
sessionToken
(),
grNum
)
s
.
stop
()
if
err
!=
nil
{
log
.
Error
(
"%s, [session.handlePackage] error:%+v"
,
s
.
sessionToken
(),
err
)
log
.
Error
f
(
"%s, [session.handlePackage] error:%+v"
,
s
.
sessionToken
(),
err
)
s
.
listener
.
OnError
(
s
,
err
)
}
}()
...
...
@@ -601,7 +600,7 @@ func (s *session) handleTCPPackage() error {
if
netError
,
ok
=
perrors
.
Cause
(
err
)
.
(
net
.
Error
);
ok
&&
netError
.
Timeout
()
{
break
}
log
.
Error
(
"%s, [session.conn.read] = error:%+v"
,
s
.
sessionToken
(),
err
)
log
.
Error
f
(
"%s, [session.conn.read] = error:%+v"
,
s
.
sessionToken
(),
err
)
// for (Codec)OnErr
// s.errFlag = true
exit
=
true
...
...
@@ -625,7 +624,7 @@ func (s *session) handleTCPPackage() error {
err
=
perrors
.
Errorf
(
"pkgLen %d > session max message len %d"
,
pkgLen
,
s
.
maxMsgLen
)
}
if
err
!=
nil
{
log
.
Warn
(
"%s, [session.handleTCPPackage] = len{%d}, error:%+v"
,
log
.
Warn
f
(
"%s, [session.handleTCPPackage] = len{%d}, error:%+v"
,
s
.
sessionToken
(),
pkgLen
,
err
)
// for (Codec)OnErr
// s.errFlag = true
...
...
@@ -673,39 +672,39 @@ func (s *session) handleUDPPackage() error {
}
bufLen
,
addr
,
err
=
conn
.
read
(
buf
)
log
.
Debug
(
"conn.read() = bufLen:%d, addr:%#v, err:%+v"
,
bufLen
,
addr
,
err
)
log
.
Debug
f
(
"conn.read() = bufLen:%d, addr:%#v, err:%+v"
,
bufLen
,
addr
,
err
)
if
netError
,
ok
=
perrors
.
Cause
(
err
)
.
(
net
.
Error
);
ok
&&
netError
.
Timeout
()
{
continue
}
if
err
!=
nil
{
log
.
Error
(
"%s, [session.handleUDPPackage] = len{%d}, error{%+s}"
,
log
.
Error
f
(
"%s, [session.handleUDPPackage] = len{%d}, error{%+s}"
,
s
.
sessionToken
(),
bufLen
,
err
)
err
=
perrors
.
Wrapf
(
err
,
"conn.read()"
)
break
}
if
bufLen
==
0
{
log
.
Error
(
"conn.read() = bufLen:%d, addr:%s, err:%+v"
,
bufLen
,
addr
,
err
)
log
.
Error
f
(
"conn.read() = bufLen:%d, addr:%s, err:%+v"
,
bufLen
,
addr
,
err
)
continue
}
if
bufLen
==
len
(
connectPingPackage
)
&&
bytes
.
Equal
(
connectPingPackage
,
buf
[
:
bufLen
])
{
log
.
Info
(
"got %s connectPingPackage"
,
addr
)
log
.
Info
f
(
"got %s connectPingPackage"
,
addr
)
continue
}
pkg
,
pkgLen
,
err
=
s
.
reader
.
Read
(
s
,
buf
[
:
bufLen
])
log
.
Debug
(
"s.reader.Read() = pkg:%#v, pkgLen:%d, err:%+v"
,
pkg
,
pkgLen
,
err
)
log
.
Debug
f
(
"s.reader.Read() = pkg:%#v, pkgLen:%d, err:%+v"
,
pkg
,
pkgLen
,
err
)
if
err
==
nil
&&
s
.
maxMsgLen
>
0
&&
bufLen
>
int
(
s
.
maxMsgLen
)
{
err
=
perrors
.
Errorf
(
"Message Too Long, bufLen %d, session max message len %d"
,
bufLen
,
s
.
maxMsgLen
)
}
if
err
!=
nil
{
log
.
Warn
(
"%s, [session.handleUDPPackage] = len{%d}, error:%+v"
,
log
.
Warn
f
(
"%s, [session.handleUDPPackage] = len{%d}, error:%+v"
,
s
.
sessionToken
(),
pkgLen
,
err
)
continue
}
if
pkgLen
==
0
{
log
.
Error
(
"s.reader.Read() = pkg:%#v, pkgLen:%d, err:%+v"
,
pkg
,
pkgLen
,
err
)
log
.
Error
f
(
"s.reader.Read() = pkg:%#v, pkgLen:%d, err:%+v"
,
pkg
,
pkgLen
,
err
)
continue
}
...
...
@@ -738,7 +737,7 @@ func (s *session) handleWSPackage() error {
continue
}
if
err
!=
nil
{
log
.
Warn
(
"%s, [session.handleWSPackage] = error{%+s}"
,
log
.
Warn
f
(
"%s, [session.handleWSPackage] = error{%+s}"
,
s
.
sessionToken
(),
err
)
// s.errFlag = true
return
perrors
.
WithStack
(
err
)
...
...
@@ -750,7 +749,7 @@ func (s *session) handleWSPackage() error {
err
=
perrors
.
Errorf
(
"Message Too Long, length %d, session max message len %d"
,
length
,
s
.
maxMsgLen
)
}
if
err
!=
nil
{
log
.
Warn
(
"%s, [session.handleWSPackage] = len{%d}, error:%+v"
,
log
.
Warn
f
(
"%s, [session.handleWSPackage] = len{%d}, error:%+v"
,
s
.
sessionToken
(),
length
,
err
)
continue
}
...
...
@@ -802,5 +801,5 @@ func (s *session) gc() {
// or (session)handleLoop automatically. It's thread safe.
func
(
s
*
session
)
Close
()
{
s
.
stop
()
log
.
Info
(
"%s closed now. its current gr num is %d"
,
s
.
sessionToken
(),
atomic
.
LoadInt32
(
&
(
s
.
grNum
)))
log
.
Info
f
(
"%s closed now. its current gr num is %d"
,
s
.
sessionToken
(),
atomic
.
LoadInt32
(
&
(
s
.
grNum
)))
}
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment