Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Contribute to GitLab
Sign in / Register
Toggle navigation
G
getty
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
wei.xuan
getty
Commits
4cf247e1
Commit
4cf247e1
authored
Feb 03, 2017
by
AlexStocks
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
add compression type and delete this
parent
8abb0aa7
Hide whitespace changes
Inline
Side-by-side
Showing
5 changed files
with
399 additions
and
383 deletions
+399
-383
change_log.md
change_log.md
+5
-1
client.go
client.go
+62
-62
conn.go
conn.go
+101
-89
server.go
server.go
+46
-46
session.go
session.go
+185
-185
No files found.
change_log.md
View file @
4cf247e1
...
...
@@ -14,7 +14,11 @@
-
2017/02/03
> 1 Session struct -> session struct and add Session interface
>
> 2 version: 0.7.0
> 2 change receiver name from this to a alphabet letter
>
> 3 add compression type
>
> 4 version: 0.7.0
-
2016/11/19
> 1 add conn.go:(*gettyWSConn) setCompressType to add zip compress feature for ws connection
...
...
client.go
View file @
4cf247e1
...
...
@@ -97,17 +97,17 @@ func NewWSSClient(connNum int, connInterval time.Duration, serverAddr string, ce
}
}
func
(
this
*
Client
)
dialTCP
()
Session
{
func
(
c
*
Client
)
dialTCP
()
Session
{
var
(
err
error
conn
net
.
Conn
)
for
{
if
this
.
IsClosed
()
{
if
c
.
IsClosed
()
{
return
nil
}
conn
,
err
=
net
.
DialTimeout
(
"tcp"
,
this
.
addr
,
connectTimeout
)
conn
,
err
=
net
.
DialTimeout
(
"tcp"
,
c
.
addr
,
connectTimeout
)
if
err
==
nil
&&
conn
.
LocalAddr
()
.
String
()
==
conn
.
RemoteAddr
()
.
String
()
{
err
=
errSelfConnect
}
...
...
@@ -115,13 +115,13 @@ func (this *Client) dialTCP() Session {
return
NewTCPSession
(
conn
)
}
log
.
Info
(
"net.DialTimeout(addr:%s, timeout:%v) = error{%v}"
,
this
.
addr
,
err
)
time
.
Sleep
(
this
.
interval
)
log
.
Info
(
"net.DialTimeout(addr:%s, timeout:%v) = error{%v}"
,
c
.
addr
,
err
)
time
.
Sleep
(
c
.
interval
)
continue
}
}
func
(
this
*
Client
)
dialWS
()
Session
{
func
(
c
*
Client
)
dialWS
()
Session
{
var
(
err
error
dialer
websocket
.
Dialer
...
...
@@ -131,10 +131,10 @@ func (this *Client) dialWS() Session {
dialer
.
EnableCompression
=
true
for
{
if
this
.
IsClosed
()
{
if
c
.
IsClosed
()
{
return
nil
}
conn
,
_
,
err
=
dialer
.
Dial
(
this
.
addr
,
nil
)
conn
,
_
,
err
=
dialer
.
Dial
(
c
.
addr
,
nil
)
if
err
==
nil
&&
conn
.
LocalAddr
()
.
String
()
==
conn
.
RemoteAddr
()
.
String
()
{
err
=
errSelfConnect
}
...
...
@@ -147,13 +147,13 @@ func (this *Client) dialWS() Session {
return
session
}
log
.
Info
(
"websocket.dialer.Dial(addr:%s) = error{%v}"
,
this
.
addr
,
err
)
time
.
Sleep
(
this
.
interval
)
log
.
Info
(
"websocket.dialer.Dial(addr:%s) = error{%v}"
,
c
.
addr
,
err
)
time
.
Sleep
(
c
.
interval
)
continue
}
}
func
(
this
*
Client
)
dialWSS
()
Session
{
func
(
c
*
Client
)
dialWSS
()
Session
{
var
(
err
error
certPem
[]
byte
...
...
@@ -164,9 +164,9 @@ func (this *Client) dialWSS() Session {
)
dialer
.
EnableCompression
=
true
certPem
,
err
=
ioutil
.
ReadFile
(
this
.
certFile
)
certPem
,
err
=
ioutil
.
ReadFile
(
c
.
certFile
)
if
err
!=
nil
{
panic
(
fmt
.
Errorf
(
"ioutil.ReadFile(certFile{%s}) = err{%#v}"
,
this
.
certFile
,
err
))
panic
(
fmt
.
Errorf
(
"ioutil.ReadFile(certFile{%s}) = err{%#v}"
,
c
.
certFile
,
err
))
}
certPool
=
x509
.
NewCertPool
()
if
ok
:=
certPool
.
AppendCertsFromPEM
(
certPem
);
!
ok
{
...
...
@@ -176,10 +176,10 @@ func (this *Client) dialWSS() Session {
// dialer.EnableCompression = true
dialer
.
TLSClientConfig
=
&
tls
.
Config
{
RootCAs
:
certPool
}
for
{
if
this
.
IsClosed
()
{
if
c
.
IsClosed
()
{
return
nil
}
conn
,
_
,
err
=
dialer
.
Dial
(
this
.
addr
,
nil
)
conn
,
_
,
err
=
dialer
.
Dial
(
c
.
addr
,
nil
)
if
err
==
nil
&&
conn
.
LocalAddr
()
.
String
()
==
conn
.
RemoteAddr
()
.
String
()
{
err
=
errSelfConnect
}
...
...
@@ -192,56 +192,56 @@ func (this *Client) dialWSS() Session {
return
session
}
log
.
Info
(
"websocket.dialer.Dial(addr:%s) = error{%v}"
,
this
.
addr
,
err
)
time
.
Sleep
(
this
.
interval
)
log
.
Info
(
"websocket.dialer.Dial(addr:%s) = error{%v}"
,
c
.
addr
,
err
)
time
.
Sleep
(
c
.
interval
)
continue
}
}
func
(
this
*
Client
)
dial
()
Session
{
if
strings
.
HasPrefix
(
this
.
addr
,
"ws"
)
{
return
this
.
dialWS
()
}
else
if
strings
.
HasPrefix
(
this
.
addr
,
"wss"
)
{
return
this
.
dialWSS
()
func
(
c
*
Client
)
dial
()
Session
{
if
strings
.
HasPrefix
(
c
.
addr
,
"ws"
)
{
return
c
.
dialWS
()
}
else
if
strings
.
HasPrefix
(
c
.
addr
,
"wss"
)
{
return
c
.
dialWSS
()
}
return
this
.
dialTCP
()
return
c
.
dialTCP
()
}
func
(
this
*
Client
)
sessionNum
()
int
{
func
(
c
*
Client
)
sessionNum
()
int
{
var
num
int
this
.
Lock
()
for
s
:=
range
this
.
sessionMap
{
c
.
Lock
()
for
s
:=
range
c
.
sessionMap
{
if
s
.
IsClosed
()
{
delete
(
this
.
sessionMap
,
s
)
delete
(
c
.
sessionMap
,
s
)
}
}
num
=
len
(
this
.
sessionMap
)
this
.
Unlock
()
num
=
len
(
c
.
sessionMap
)
c
.
Unlock
()
return
num
}
func
(
this
*
Client
)
connect
()
{
func
(
c
*
Client
)
connect
()
{
var
(
err
error
session
Session
)
for
{
session
=
this
.
dial
()
session
=
c
.
dial
()
if
session
==
nil
{
// client has been closed
break
}
err
=
this
.
newSession
(
session
)
err
=
c
.
newSession
(
session
)
if
err
==
nil
{
// session.RunEventLoop()
session
.
(
*
session
)
.
run
()
this
.
Lock
()
this
.
sessionMap
[
session
]
=
gxsync
.
Empty
{}
this
.
Unlock
()
c
.
Lock
()
c
.
sessionMap
[
session
]
=
gxsync
.
Empty
{}
c
.
Unlock
()
break
}
// don't distinguish between tcp connection and websocket connection. Because
...
...
@@ -250,27 +250,27 @@ func (this *Client) connect() {
}
}
func
(
this
*
Client
)
RunEventLoop
(
newSession
NewSessionCallback
)
{
this
.
Lock
()
this
.
newSession
=
newSession
this
.
Unlock
()
func
(
c
*
Client
)
RunEventLoop
(
newSession
NewSessionCallback
)
{
c
.
Lock
()
c
.
newSession
=
newSession
c
.
Unlock
()
this
.
wg
.
Add
(
1
)
c
.
wg
.
Add
(
1
)
go
func
()
{
var
num
,
max
,
times
int
defer
this
.
wg
.
Done
()
defer
c
.
wg
.
Done
()
this
.
Lock
()
max
=
this
.
number
this
.
Unlock
()
c
.
Lock
()
max
=
c
.
number
c
.
Unlock
()
// log.Info("maximum client connection number:%d", max)
for
{
if
this
.
IsClosed
()
{
log
.
Warn
(
"client{peer:%s} goroutine exit now."
,
this
.
addr
)
if
c
.
IsClosed
()
{
log
.
Warn
(
"client{peer:%s} goroutine exit now."
,
c
.
addr
)
break
}
num
=
this
.
sessionNum
()
num
=
c
.
sessionNum
()
// log.Info("current client connction number:%d", num)
if
max
<=
num
{
times
++
...
...
@@ -281,39 +281,39 @@ func (this *Client) RunEventLoop(newSession NewSessionCallback) {
continue
}
times
=
0
this
.
connect
()
// time.Sleep(
this.interval) // build this
.number connections asap
c
.
connect
()
// time.Sleep(
c.interval) // build c
.number connections asap
}
}()
}
func
(
this
*
Client
)
stop
()
{
func
(
c
*
Client
)
stop
()
{
select
{
case
<-
this
.
done
:
case
<-
c
.
done
:
return
default
:
this
.
Once
.
Do
(
func
()
{
close
(
this
.
done
)
this
.
Lock
()
for
s
:=
range
this
.
sessionMap
{
c
.
Once
.
Do
(
func
()
{
close
(
c
.
done
)
c
.
Lock
()
for
s
:=
range
c
.
sessionMap
{
s
.
Close
()
}
this
.
sessionMap
=
nil
this
.
Unlock
()
c
.
sessionMap
=
nil
c
.
Unlock
()
})
}
}
func
(
this
*
Client
)
IsClosed
()
bool
{
func
(
c
*
Client
)
IsClosed
()
bool
{
select
{
case
<-
this
.
done
:
case
<-
c
.
done
:
return
true
default
:
return
false
}
}
func
(
this
*
Client
)
Close
()
{
this
.
stop
()
this
.
wg
.
Wait
()
func
(
c
*
Client
)
Close
()
{
c
.
stop
()
c
.
wg
.
Wait
()
}
conn.go
View file @
4cf247e1
...
...
@@ -36,12 +36,15 @@ var (
// compress
/////////////////////////////////////////
type
CompressType
byte
type
CompressType
int
const
(
CompressNone
CompressType
=
0x00
CompressZip
=
0x01
CompressSnappy
=
0x02
CompressNone
CompressType
=
flate
.
NoCompression
// 0
CompressZip
=
flate
.
DefaultCompression
// -1
CompressBestSpeed
=
flate
.
BestSpeed
// 1
CompressBestCompression
=
flate
.
BestCompression
// 9
CompressHuffman
=
flate
.
HuffmanOnly
// -2
CompressSnappy
=
10
)
/////////////////////////////////////////
...
...
@@ -95,65 +98,65 @@ type gettyConn struct {
peer
string
// peer address
}
func
(
this
*
gettyConn
)
ID
()
uint32
{
return
this
.
id
func
(
c
*
gettyConn
)
ID
()
uint32
{
return
c
.
id
}
func
(
this
*
gettyConn
)
LocalAddr
()
string
{
return
this
.
local
func
(
c
*
gettyConn
)
LocalAddr
()
string
{
return
c
.
local
}
func
(
this
*
gettyConn
)
RemoteAddr
()
string
{
return
this
.
peer
func
(
c
*
gettyConn
)
RemoteAddr
()
string
{
return
c
.
peer
}
func
(
this
*
gettyConn
)
incReadPkgCount
()
{
atomic
.
AddUint32
(
&
this
.
readPkgCount
,
1
)
func
(
c
*
gettyConn
)
incReadPkgCount
()
{
atomic
.
AddUint32
(
&
c
.
readPkgCount
,
1
)
}
func
(
this
*
gettyConn
)
incWritePkgCount
()
{
atomic
.
AddUint32
(
&
this
.
writePkgCount
,
1
)
func
(
c
*
gettyConn
)
incWritePkgCount
()
{
atomic
.
AddUint32
(
&
c
.
writePkgCount
,
1
)
}
func
(
this
*
gettyConn
)
UpdateActive
()
{
atomic
.
StoreInt64
(
&
(
this
.
active
),
int64
(
time
.
Since
(
launchTime
)))
func
(
c
*
gettyConn
)
UpdateActive
()
{
atomic
.
StoreInt64
(
&
(
c
.
active
),
int64
(
time
.
Since
(
launchTime
)))
}
func
(
this
*
gettyConn
)
GetActive
()
time
.
Time
{
return
launchTime
.
Add
(
time
.
Duration
(
atomic
.
LoadInt64
(
&
(
this
.
active
))))
func
(
c
*
gettyConn
)
GetActive
()
time
.
Time
{
return
launchTime
.
Add
(
time
.
Duration
(
atomic
.
LoadInt64
(
&
(
c
.
active
))))
}
func
(
this
*
gettyConn
)
Write
([]
byte
)
error
{
func
(
c
*
gettyConn
)
Write
([]
byte
)
error
{
return
nil
}
func
(
this
*
gettyConn
)
close
(
int
)
{}
func
(
c
*
gettyConn
)
close
(
int
)
{}
func
(
this
gettyConn
)
readDeadline
()
time
.
Duration
{
return
this
.
rDeadline
func
(
c
gettyConn
)
readDeadline
()
time
.
Duration
{
return
c
.
rDeadline
}
func
(
this
*
gettyConn
)
SetReadDeadline
(
rDeadline
time
.
Duration
)
{
func
(
c
*
gettyConn
)
SetReadDeadline
(
rDeadline
time
.
Duration
)
{
if
rDeadline
<
1
{
panic
(
"@rDeadline < 1"
)
}
this
.
rDeadline
=
rDeadline
if
this
.
wDeadline
==
0
{
this
.
wDeadline
=
rDeadline
c
.
rDeadline
=
rDeadline
if
c
.
wDeadline
==
0
{
c
.
wDeadline
=
rDeadline
}
}
func
(
this
gettyConn
)
writeDeadline
()
time
.
Duration
{
return
this
.
wDeadline
func
(
c
gettyConn
)
writeDeadline
()
time
.
Duration
{
return
c
.
wDeadline
}
func
(
this
*
gettyConn
)
SetWriteDeadline
(
wDeadline
time
.
Duration
)
{
func
(
c
*
gettyConn
)
SetWriteDeadline
(
wDeadline
time
.
Duration
)
{
if
wDeadline
<
1
{
panic
(
"@wDeadline < 1"
)
}
this
.
wDeadline
=
wDeadline
c
.
wDeadline
=
wDeadline
}
/////////////////////////////////////////
...
...
@@ -199,17 +202,17 @@ type writeFlusher struct {
flusher
*
flate
.
Writer
}
func
(
t
his
*
writeFlusher
)
Write
(
p
[]
byte
)
(
int
,
error
)
{
func
(
t
*
writeFlusher
)
Write
(
p
[]
byte
)
(
int
,
error
)
{
var
(
n
int
err
error
)
n
,
err
=
t
his
.
flusher
.
Write
(
p
)
n
,
err
=
t
.
flusher
.
Write
(
p
)
if
err
!=
nil
{
return
n
,
err
}
if
err
:=
t
his
.
flusher
.
Flush
();
err
!=
nil
{
if
err
:=
t
.
flusher
.
Flush
();
err
!=
nil
{
return
0
,
err
}
...
...
@@ -217,60 +220,69 @@ func (this *writeFlusher) Write(p []byte) (int, error) {
}
// set compress type(tcp: zip/snappy, websocket:zip)
func
(
t
his
*
gettyTCPConn
)
SetCompressType
(
t
CompressType
)
{
switch
{
case
t
==
CompressZip
:
t
his
.
reader
=
flate
.
NewReader
(
this
.
conn
)
func
(
t
*
gettyTCPConn
)
SetCompressType
(
c
CompressType
)
{
switch
c
{
case
CompressNone
,
CompressZip
,
CompressBestSpeed
,
CompressBestCompression
,
CompressHuffman
:
t
.
reader
=
flate
.
NewReader
(
t
.
conn
)
w
,
err
:=
flate
.
NewWriter
(
t
his
.
conn
,
flate
.
DefaultCompression
)
w
,
err
:=
flate
.
NewWriter
(
t
.
conn
,
int
(
c
)
)
if
err
!=
nil
{
panic
(
fmt
.
Sprintf
(
"flate.NewReader(flate.DefaultCompress) = err(%s)"
,
err
))
}
t
his
.
writer
=
&
writeFlusher
{
flusher
:
w
}
t
.
writer
=
&
writeFlusher
{
flusher
:
w
}
case
t
==
CompressSnappy
:
this
.
reader
=
snappy
.
NewReader
(
this
.
conn
)
this
.
writer
=
snappy
.
NewWriter
(
this
.
conn
)
case
CompressSnappy
:
t
.
reader
=
snappy
.
NewReader
(
t
.
conn
)
// t.writer = snappy.NewWriter(t.conn)
t
.
writer
=
snappy
.
NewBufferedWriter
(
t
.
conn
)
default
:
panic
(
fmt
.
Sprintf
(
"illegal comparess type %d"
,
c
))
}
}
// tcp connection read
func
(
t
his
*
gettyTCPConn
)
read
(
p
[]
byte
)
(
int
,
error
)
{
// if t
his
.conn == nil {
func
(
t
*
gettyTCPConn
)
read
(
p
[]
byte
)
(
int
,
error
)
{
// if t.conn == nil {
// return 0, ErrInvalidConnection
// }
// atomic.AddUint32(&t
his
.readCount, 1)
// l, e := t
his
.conn.Read(p)
l
,
e
:=
t
his
.
reader
.
Read
(
p
)
atomic
.
AddUint32
(
&
t
his
.
readCount
,
uint32
(
l
))
// atomic.AddUint32(&t.readCount, 1)
// l, e := t.conn.Read(p)
l
,
e
:=
t
.
reader
.
Read
(
p
)
atomic
.
AddUint32
(
&
t
.
readCount
,
uint32
(
l
))
return
l
,
e
}
// tcp connection write
func
(
t
his
*
gettyTCPConn
)
Write
(
p
[]
byte
)
error
{
// if t
his
.conn == nil {
func
(
t
*
gettyTCPConn
)
Write
(
p
[]
byte
)
error
{
// if t.conn == nil {
// return 0, ErrInvalidConnection
// }
// atomic.AddUint32(&t
his
.writeCount, 1)
atomic
.
AddUint32
(
&
t
his
.
writeCount
,
(
uint32
)(
len
(
p
)))
// _, err := t
his
.conn.Write(p)
_
,
err
:=
t
his
.
writer
.
Write
(
p
)
// atomic.AddUint32(&t.writeCount, 1)
atomic
.
AddUint32
(
&
t
.
writeCount
,
(
uint32
)(
len
(
p
)))
// _, err := t.conn.Write(p)
_
,
err
:=
t
.
writer
.
Write
(
p
)
return
err
}
// close tcp connection
func
(
t
his
*
gettyTCPConn
)
close
(
waitSec
int
)
{
// if tcpConn, ok := t
his
.conn.(*net.TCPConn); ok {
func
(
t
*
gettyTCPConn
)
close
(
waitSec
int
)
{
// if tcpConn, ok := t.conn.(*net.TCPConn); ok {
// tcpConn.SetLinger(0)
// }
if
this
.
conn
!=
nil
{
this
.
conn
.
(
*
net
.
TCPConn
)
.
SetLinger
(
waitSec
)
this
.
conn
.
Close
()
this
.
conn
=
nil
if
t
.
conn
!=
nil
{
if
writer
,
ok
:=
t
.
writer
.
(
*
snappy
.
Writer
);
ok
{
if
err
:=
writer
.
Close
();
err
!=
nil
{
log
.
Error
(
"snappy.Writer.Close() = error{%v}"
,
err
)
}
}
t
.
conn
.
(
*
net
.
TCPConn
)
.
SetLinger
(
waitSec
)
t
.
conn
.
Close
()
t
.
conn
=
nil
}
}
...
...
@@ -312,44 +324,44 @@ func newGettyWSConn(conn *websocket.Conn) *gettyWSConn {
return
gettyWSConn
}
// set compress type
(tcp: zip/snappy, websocket:zip)
func
(
this
*
gettyWSConn
)
SetCompressType
(
t
CompressType
)
{
switch
{
case
t
==
CompressZip
:
this
.
conn
.
EnableWriteCompression
(
true
)
case
t
==
CompressSnappy
:
this
.
conn
.
EnableWriteCompression
(
true
)
// set compress type
func
(
w
*
gettyWSConn
)
SetCompressType
(
c
CompressType
)
{
switch
c
{
case
CompressNone
,
CompressZip
,
CompressBestSpeed
,
CompressBestCompression
,
CompressHuffman
:
w
.
conn
.
EnableWriteCompression
(
true
)
w
.
conn
.
SetCompressionLevel
(
int
(
c
))
default
:
this
.
conn
.
EnableWriteCompression
(
false
)
panic
(
fmt
.
Sprintf
(
"illegal comparess type %d"
,
c
)
)
}
}
func
(
this
*
gettyWSConn
)
handlePing
(
message
string
)
error
{
err
:=
this
.
conn
.
WriteMessage
(
websocket
.
PongMessage
,
[]
byte
(
message
))
func
(
w
*
gettyWSConn
)
handlePing
(
message
string
)
error
{
err
:=
w
.
conn
.
WriteMessage
(
websocket
.
PongMessage
,
[]
byte
(
message
))
if
err
==
websocket
.
ErrCloseSent
{
err
=
nil
}
else
if
e
,
ok
:=
err
.
(
net
.
Error
);
ok
&&
e
.
Temporary
()
{
err
=
nil
}
if
err
==
nil
{
this
.
UpdateActive
()
w
.
UpdateActive
()
}
return
err
}
func
(
this
*
gettyWSConn
)
handlePong
(
string
)
error
{
this
.
UpdateActive
()
func
(
w
*
gettyWSConn
)
handlePong
(
string
)
error
{
w
.
UpdateActive
()
return
nil
}
// websocket connection read
func
(
this
*
gettyWSConn
)
read
()
([]
byte
,
error
)
{
//
this.conn.SetReadDeadline(time.Now().Add(this
.rDeadline))
_
,
b
,
e
:=
this
.
conn
.
ReadMessage
()
// the first return value is message type.
func
(
w
*
gettyWSConn
)
read
()
([]
byte
,
error
)
{
//
w.conn.SetReadDeadline(time.Now().Add(w
.rDeadline))
_
,
b
,
e
:=
w
.
conn
.
ReadMessage
()
// the first return value is message type.
if
e
==
nil
{
// atomic.AddUint32(&
this
.readCount, (uint32)(l))
atomic
.
AddUint32
(
&
this
.
readPkgCount
,
1
)
// atomic.AddUint32(&
w
.readCount, (uint32)(l))
atomic
.
AddUint32
(
&
w
.
readPkgCount
,
1
)
}
else
{
if
websocket
.
IsUnexpectedCloseError
(
e
,
websocket
.
CloseGoingAway
)
{
log
.
Warn
(
"websocket unexpected close error: %v"
,
e
)
...
...
@@ -360,20 +372,20 @@ func (this *gettyWSConn) read() ([]byte, error) {
}
// websocket connection write
func
(
this
*
gettyWSConn
)
Write
(
p
[]
byte
)
error
{
// atomic.AddUint32(&
this
.writeCount, 1)
atomic
.
AddUint32
(
&
this
.
writeCount
,
(
uint32
)(
len
(
p
)))
//
this.conn.SetWriteDeadline(time.Now().Add(this
.wDeadline))
return
this
.
conn
.
WriteMessage
(
websocket
.
BinaryMessage
,
p
)
func
(
w
*
gettyWSConn
)
Write
(
p
[]
byte
)
error
{
// atomic.AddUint32(&
w
.writeCount, 1)
atomic
.
AddUint32
(
&
w
.
writeCount
,
(
uint32
)(
len
(
p
)))
//
w.conn.SetWriteDeadline(time.Now().Add(w
.wDeadline))
return
w
.
conn
.
WriteMessage
(
websocket
.
BinaryMessage
,
p
)
}
func
(
this
*
gettyWSConn
)
writePing
()
error
{
return
this
.
conn
.
WriteMessage
(
websocket
.
PingMessage
,
[]
byte
{})
func
(
w
*
gettyWSConn
)
writePing
()
error
{
return
w
.
conn
.
WriteMessage
(
websocket
.
PingMessage
,
[]
byte
{})
}
// close websocket connection
func
(
this
*
gettyWSConn
)
close
(
waitSec
int
)
{
this
.
conn
.
WriteMessage
(
websocket
.
CloseMessage
,
[]
byte
(
"bye-bye!!!"
))
this
.
conn
.
UnderlyingConn
()
.
(
*
net
.
TCPConn
)
.
SetLinger
(
waitSec
)
this
.
conn
.
Close
()
func
(
w
*
gettyWSConn
)
close
(
waitSec
int
)
{
w
.
conn
.
WriteMessage
(
websocket
.
CloseMessage
,
[]
byte
(
"bye-bye!!!"
))
w
.
conn
.
UnderlyingConn
()
.
(
*
net
.
TCPConn
)
.
SetLinger
(
waitSec
)
w
.
conn
.
Close
()
}
server.go
View file @
4cf247e1
...
...
@@ -43,23 +43,23 @@ func NewServer() *Server {
return
&
Server
{
done
:
make
(
chan
gxsync
.
Empty
)}
}
func
(
thi
s
*
Server
)
stop
()
{
func
(
s
*
Server
)
stop
()
{
select
{
case
<-
thi
s
.
done
:
case
<-
s
.
done
:
return
default
:
thi
s
.
Once
.
Do
(
func
()
{
close
(
thi
s
.
done
)
s
.
Once
.
Do
(
func
()
{
close
(
s
.
done
)
// 把listener.Close放在这里,既能防止多次关闭调用,
// 又能及时让Server因accept返回错误而从RunEventloop退出
thi
s
.
listener
.
Close
()
s
.
listener
.
Close
()
})
}
}
func
(
thi
s
*
Server
)
IsClosed
()
bool
{
func
(
s
*
Server
)
IsClosed
()
bool
{
select
{
case
<-
thi
s
.
done
:
case
<-
s
.
done
:
return
true
default
:
return
false
...
...
@@ -67,46 +67,46 @@ func (this *Server) IsClosed() bool {
}
// (Server)Bind's functionality is equal to (Server)Listen.
func
(
thi
s
*
Server
)
Bind
(
network
string
,
host
string
,
port
int
)
error
{
func
(
s
*
Server
)
Bind
(
network
string
,
host
string
,
port
int
)
error
{
if
port
<=
0
{
return
errors
.
New
(
"port<=0 illegal"
)
}
return
thi
s
.
Listen
(
network
,
gxnet
.
HostAddress
(
host
,
port
))
return
s
.
Listen
(
network
,
gxnet
.
HostAddress
(
host
,
port
))
}
// net.ipv4.tcp_max_syn_backlog
// net.ipv4.tcp_timestamps
// net.ipv4.tcp_tw_recycle
func
(
thi
s
*
Server
)
Listen
(
network
string
,
addr
string
)
error
{
func
(
s
*
Server
)
Listen
(
network
string
,
addr
string
)
error
{
listener
,
err
:=
net
.
Listen
(
network
,
addr
)
if
err
!=
nil
{
return
err
}
thi
s
.
addr
=
addr
thi
s
.
listener
=
listener
s
.
addr
=
addr
s
.
listener
=
listener
return
nil
}
func
(
thi
s
*
Server
)
RunEventloop
(
newSession
NewSessionCallback
)
{
thi
s
.
wg
.
Add
(
1
)
func
(
s
*
Server
)
RunEventloop
(
newSession
NewSessionCallback
)
{
s
.
wg
.
Add
(
1
)
go
func
()
{
defer
thi
s
.
wg
.
Done
()
defer
s
.
wg
.
Done
()
var
(
err
error
client
Session
delay
time
.
Duration
)
for
{
if
thi
s
.
IsClosed
()
{
log
.
Warn
(
"Server{%s} stop acceptting client connect request."
,
thi
s
.
addr
)
if
s
.
IsClosed
()
{
log
.
Warn
(
"Server{%s} stop acceptting client connect request."
,
s
.
addr
)
return
}
if
delay
!=
0
{
time
.
Sleep
(
delay
)
}
client
,
err
=
thi
s
.
accept
(
newSession
)
client
,
err
=
s
.
accept
(
newSession
)
if
err
!=
nil
{
if
netErr
,
ok
:=
err
.
(
net
.
Error
);
ok
&&
netErr
.
Temporary
()
{
if
delay
==
0
{
...
...
@@ -119,7 +119,7 @@ func (this *Server) RunEventloop(newSession NewSessionCallback) {
}
continue
}
log
.
Warn
(
"Server{%s}.Accept() = err {%#v}"
,
thi
s
.
addr
,
err
)
log
.
Warn
(
"Server{%s}.Accept() = err {%#v}"
,
s
.
addr
,
err
)
continue
}
delay
=
0
...
...
@@ -149,20 +149,20 @@ func newWSHandler(server *Server, newSession NewSessionCallback) *wsHandler {
}
}
func
(
thi
s
*
wsHandler
)
serveWSRequest
(
w
http
.
ResponseWriter
,
r
*
http
.
Request
)
{
func
(
s
*
wsHandler
)
serveWSRequest
(
w
http
.
ResponseWriter
,
r
*
http
.
Request
)
{
if
r
.
Method
!=
"GET"
{
// w.WriteHeader(http.StatusMethodNotAllowed)
http
.
Error
(
w
,
"Method not allowed"
,
405
)
return
}
if
thi
s
.
server
.
IsClosed
()
{
if
s
.
server
.
IsClosed
()
{
http
.
Error
(
w
,
"HTTP server is closed(code:500-11)."
,
500
)
log
.
Warn
(
"Server{%s} stop acceptting client connect request."
,
thi
s
.
server
.
addr
)
log
.
Warn
(
"Server{%s} stop acceptting client connect request."
,
s
.
server
.
addr
)
return
}
conn
,
err
:=
thi
s
.
upgrader
.
Upgrade
(
w
,
r
,
nil
)
conn
,
err
:=
s
.
upgrader
.
Upgrade
(
w
,
r
,
nil
)
if
err
!=
nil
{
log
.
Warn
(
"upgrader.Upgrader(http.Request{%#v}) = error{%#v}"
,
r
,
err
)
return
...
...
@@ -173,10 +173,10 @@ func (this *wsHandler) serveWSRequest(w http.ResponseWriter, r *http.Request) {
}
// conn.SetReadLimit(int64(handler.maxMsgLen))
session
:=
NewWSSession
(
conn
)
err
=
thi
s
.
newSession
(
session
)
err
=
s
.
newSession
(
session
)
if
err
!=
nil
{
conn
.
Close
()
log
.
Warn
(
"Server{%s}.newSession(session{%#v}) = err {%#v}"
,
thi
s
.
server
.
addr
,
session
,
err
)
log
.
Warn
(
"Server{%s}.newSession(session{%#v}) = err {%#v}"
,
s
.
server
.
addr
,
session
,
err
)
return
}
if
session
.
(
*
session
)
.
maxMsgLen
>
0
{
...
...
@@ -189,24 +189,24 @@ func (this *wsHandler) serveWSRequest(w http.ResponseWriter, r *http.Request) {
// RunWSEventLoop serve websocket client request
// @newSession: new websocket connection callback
// @path: websocket request url path
func
(
thi
s
*
Server
)
RunWSEventLoop
(
newSession
NewSessionCallback
,
path
string
)
{
thi
s
.
wg
.
Add
(
1
)
func
(
s
*
Server
)
RunWSEventLoop
(
newSession
NewSessionCallback
,
path
string
)
{
s
.
wg
.
Add
(
1
)
go
func
()
{
defer
thi
s
.
wg
.
Done
()
defer
s
.
wg
.
Done
()
var
(
err
error
handler
*
wsHandler
)
handler
=
newWSHandler
(
thi
s
,
newSession
)
handler
=
newWSHandler
(
s
,
newSession
)
handler
.
HandleFunc
(
path
,
handler
.
serveWSRequest
)
err
=
(
&
http
.
Server
{
Addr
:
thi
s
.
addr
,
Addr
:
s
.
addr
,
Handler
:
handler
,
// ReadTimeout: server.HTTPTimeout,
// WriteTimeout: server.HTTPTimeout,
})
.
Serve
(
thi
s
.
listener
)
})
.
Serve
(
s
.
listener
)
if
err
!=
nil
{
log
.
Error
(
"http.Server.Serve(addr{%s}) = err{%#v}"
,
thi
s
.
addr
,
err
)
log
.
Error
(
"http.Server.Serve(addr{%s}) = err{%#v}"
,
s
.
addr
,
err
)
// panic(err)
}
}()
...
...
@@ -215,10 +215,10 @@ func (this *Server) RunWSEventLoop(newSession NewSessionCallback, path string) {
// RunWSEventLoopWithTLS serve websocket client request
// @newSession: new websocket connection callback
// @path: websocket request url path
func
(
thi
s
*
Server
)
RunWSEventLoopWithTLS
(
newSession
NewSessionCallback
,
path
string
,
cert
string
,
priv
string
)
{
thi
s
.
wg
.
Add
(
1
)
func
(
s
*
Server
)
RunWSEventLoopWithTLS
(
newSession
NewSessionCallback
,
path
string
,
cert
string
,
priv
string
)
{
s
.
wg
.
Add
(
1
)
go
func
()
{
defer
thi
s
.
wg
.
Done
()
defer
s
.
wg
.
Done
()
var
(
err
error
config
*
tls
.
Config
...
...
@@ -233,29 +233,29 @@ func (this *Server) RunWSEventLoopWithTLS(newSession NewSessionCallback, path st
return
}
handler
=
newWSHandler
(
thi
s
,
newSession
)
handler
=
newWSHandler
(
s
,
newSession
)
handler
.
HandleFunc
(
path
,
handler
.
serveWSRequest
)
server
=
&
http
.
Server
{
Addr
:
thi
s
.
addr
,
Addr
:
s
.
addr
,
Handler
:
handler
,
// ReadTimeout: server.HTTPTimeout,
// WriteTimeout: server.HTTPTimeout,
}
server
.
SetKeepAlivesEnabled
(
true
)
err
=
server
.
Serve
(
tls
.
NewListener
(
thi
s
.
listener
,
config
))
err
=
server
.
Serve
(
tls
.
NewListener
(
s
.
listener
,
config
))
if
err
!=
nil
{
log
.
Error
(
"http.Server.Serve(addr{%s}) = err{%#v}"
,
thi
s
.
addr
,
err
)
log
.
Error
(
"http.Server.Serve(addr{%s}) = err{%#v}"
,
s
.
addr
,
err
)
panic
(
err
)
}
}()
}
func
(
thi
s
*
Server
)
Listener
()
net
.
Listener
{
return
thi
s
.
listener
func
(
s
*
Server
)
Listener
()
net
.
Listener
{
return
s
.
listener
}
func
(
thi
s
*
Server
)
accept
(
newSession
NewSessionCallback
)
(
Session
,
error
)
{
conn
,
err
:=
thi
s
.
listener
.
Accept
()
func
(
s
*
Server
)
accept
(
newSession
NewSessionCallback
)
(
Session
,
error
)
{
conn
,
err
:=
s
.
listener
.
Accept
()
if
err
!=
nil
{
return
nil
,
err
}
...
...
@@ -274,7 +274,7 @@ func (this *Server) accept(newSession NewSessionCallback) (Session, error) {
return
session
,
nil
}
func
(
thi
s
*
Server
)
Close
()
{
thi
s
.
stop
()
thi
s
.
wg
.
Wait
()
func
(
s
*
Server
)
Close
()
{
s
.
stop
()
s
.
wg
.
Wait
()
}
session.go
View file @
4cf247e1
...
...
@@ -151,39 +151,39 @@ func NewWSSession(conn *websocket.Conn) Session {
return
session
}
func
(
thi
s
*
session
)
Reset
()
{
thi
s
.
name
=
defaultSessionName
thi
s
.
once
=
sync
.
Once
{}
thi
s
.
done
=
make
(
chan
gxsync
.
Empty
)
//
thi
s.errFlag = false
thi
s
.
period
=
period
thi
s
.
wait
=
pendingDuration
thi
s
.
attrs
=
make
(
map
[
string
]
interface
{})
thi
s
.
grNum
=
0
thi
s
.
SetWriteDeadline
(
netIOTimeout
)
thi
s
.
SetReadDeadline
(
netIOTimeout
)
}
// func (
this *session) SetConn(conn net.Conn) { thi
s.gettyConn = newGettyConn(conn) }
func
(
thi
s
*
session
)
Conn
()
net
.
Conn
{
if
tc
,
ok
:=
thi
s
.
Connection
.
(
*
gettyTCPConn
);
ok
{
func
(
s
*
session
)
Reset
()
{
s
.
name
=
defaultSessionName
s
.
once
=
sync
.
Once
{}
s
.
done
=
make
(
chan
gxsync
.
Empty
)
// s.errFlag = false
s
.
period
=
period
s
.
wait
=
pendingDuration
s
.
attrs
=
make
(
map
[
string
]
interface
{})
s
.
grNum
=
0
s
.
SetWriteDeadline
(
netIOTimeout
)
s
.
SetReadDeadline
(
netIOTimeout
)
}
// func (
s *session) SetConn(conn net.Conn) {
s.gettyConn = newGettyConn(conn) }
func
(
s
*
session
)
Conn
()
net
.
Conn
{
if
tc
,
ok
:=
s
.
Connection
.
(
*
gettyTCPConn
);
ok
{
return
tc
.
conn
}
if
wc
,
ok
:=
thi
s
.
Connection
.
(
*
gettyWSConn
);
ok
{
if
wc
,
ok
:=
s
.
Connection
.
(
*
gettyWSConn
);
ok
{
return
wc
.
conn
.
UnderlyingConn
()
}
return
nil
}
func
(
thi
s
*
session
)
gettyConn
()
*
gettyConn
{
if
tc
,
ok
:=
thi
s
.
Connection
.
(
*
gettyTCPConn
);
ok
{
func
(
s
*
session
)
gettyConn
()
*
gettyConn
{
if
tc
,
ok
:=
s
.
Connection
.
(
*
gettyTCPConn
);
ok
{
return
&
(
tc
.
gettyConn
)
}
if
wc
,
ok
:=
thi
s
.
Connection
.
(
*
gettyWSConn
);
ok
{
if
wc
,
ok
:=
s
.
Connection
.
(
*
gettyWSConn
);
ok
{
return
&
(
wc
.
gettyConn
)
}
...
...
@@ -191,14 +191,14 @@ func (this *session) gettyConn() *gettyConn {
}
// return the connect statistic data
func
(
thi
s
*
session
)
Stat
()
string
{
func
(
s
*
session
)
Stat
()
string
{
var
conn
*
gettyConn
if
conn
=
thi
s
.
gettyConn
();
conn
==
nil
{
if
conn
=
s
.
gettyConn
();
conn
==
nil
{
return
""
}
return
fmt
.
Sprintf
(
outputFormat
,
thi
s
.
sessionToken
(),
s
.
sessionToken
(),
atomic
.
LoadUint32
(
&
(
conn
.
readCount
)),
atomic
.
LoadUint32
(
&
(
conn
.
writeCount
)),
atomic
.
LoadUint32
(
&
(
conn
.
readPkgCount
)),
...
...
@@ -207,9 +207,9 @@ func (this *session) Stat() string {
}
// check whether the session has been closed.
func
(
thi
s
*
session
)
IsClosed
()
bool
{
func
(
s
*
session
)
IsClosed
()
bool
{
select
{
case
<-
thi
s
.
done
:
case
<-
s
.
done
:
return
true
default
:
...
...
@@ -218,109 +218,109 @@ func (this *session) IsClosed() bool {
}
// set maximum pacakge length of every pacakge in (EventListener)OnMessage(@pkgs)
func
(
this
*
session
)
SetMaxMsgLen
(
length
int
)
{
thi
s
.
maxMsgLen
=
int32
(
length
)
}
func
(
s
*
session
)
SetMaxMsgLen
(
length
int
)
{
s
.
maxMsgLen
=
int32
(
length
)
}
// set session name
func
(
this
*
session
)
SetName
(
name
string
)
{
thi
s
.
name
=
name
}
func
(
s
*
session
)
SetName
(
name
string
)
{
s
.
name
=
name
}
// set EventListener
func
(
thi
s
*
session
)
SetEventListener
(
listener
EventListener
)
{
thi
s
.
listener
=
listener
func
(
s
*
session
)
SetEventListener
(
listener
EventListener
)
{
s
.
listener
=
listener
}
// set package handler
func
(
thi
s
*
session
)
SetPkgHandler
(
handler
ReadWriter
)
{
thi
s
.
reader
=
handler
thi
s
.
writer
=
handler
//
thi
s.pkgHandler = handler
func
(
s
*
session
)
SetPkgHandler
(
handler
ReadWriter
)
{
s
.
reader
=
handler
s
.
writer
=
handler
// s.pkgHandler = handler
}
// set Reader
func
(
thi
s
*
session
)
SetReader
(
reader
Reader
)
{
thi
s
.
reader
=
reader
func
(
s
*
session
)
SetReader
(
reader
Reader
)
{
s
.
reader
=
reader
}
// set Writer
func
(
thi
s
*
session
)
SetWriter
(
writer
Writer
)
{
thi
s
.
writer
=
writer
func
(
s
*
session
)
SetWriter
(
writer
Writer
)
{
s
.
writer
=
writer
}
// period is in millisecond. Websocket session will send ping frame automatically every peroid.
func
(
thi
s
*
session
)
SetCronPeriod
(
period
int
)
{
func
(
s
*
session
)
SetCronPeriod
(
period
int
)
{
if
period
<
1
{
panic
(
"@period < 1"
)
}
thi
s
.
lock
.
Lock
()
thi
s
.
period
=
time
.
Duration
(
period
)
*
time
.
Millisecond
thi
s
.
lock
.
Unlock
()
s
.
lock
.
Lock
()
s
.
period
=
time
.
Duration
(
period
)
*
time
.
Millisecond
s
.
lock
.
Unlock
()
}
// set @session's read queue size
func
(
thi
s
*
session
)
SetRQLen
(
readQLen
int
)
{
func
(
s
*
session
)
SetRQLen
(
readQLen
int
)
{
if
readQLen
<
1
{
panic
(
"@readQLen < 1"
)
}
thi
s
.
lock
.
Lock
()
thi
s
.
rQ
=
make
(
chan
interface
{},
readQLen
)
log
.
Info
(
"%s, [session.SetRQLen] rQ{len:%d, cap:%d}"
,
this
.
Stat
(),
len
(
this
.
rQ
),
cap
(
thi
s
.
rQ
))
thi
s
.
lock
.
Unlock
()
s
.
lock
.
Lock
()
s
.
rQ
=
make
(
chan
interface
{},
readQLen
)
log
.
Info
(
"%s, [session.SetRQLen] rQ{len:%d, cap:%d}"
,
s
.
Stat
(),
len
(
s
.
rQ
),
cap
(
s
.
rQ
))
s
.
lock
.
Unlock
()
}
// set @session's Write queue size
func
(
thi
s
*
session
)
SetWQLen
(
writeQLen
int
)
{
func
(
s
*
session
)
SetWQLen
(
writeQLen
int
)
{
if
writeQLen
<
1
{
panic
(
"@writeQLen < 1"
)
}
thi
s
.
lock
.
Lock
()
thi
s
.
wQ
=
make
(
chan
interface
{},
writeQLen
)
log
.
Info
(
"%s, [session.SetWQLen] wQ{len:%d, cap:%d}"
,
this
.
Stat
(),
len
(
this
.
wQ
),
cap
(
thi
s
.
wQ
))
thi
s
.
lock
.
Unlock
()
s
.
lock
.
Lock
()
s
.
wQ
=
make
(
chan
interface
{},
writeQLen
)
log
.
Info
(
"%s, [session.SetWQLen] wQ{len:%d, cap:%d}"
,
s
.
Stat
(),
len
(
s
.
wQ
),
cap
(
s
.
wQ
))
s
.
lock
.
Unlock
()
}
// set maximum wait time when session got error or got exit signal
func
(
thi
s
*
session
)
SetWaitTime
(
waitTime
time
.
Duration
)
{
func
(
s
*
session
)
SetWaitTime
(
waitTime
time
.
Duration
)
{
if
waitTime
<
1
{
panic
(
"@wait < 1"
)
}
thi
s
.
lock
.
Lock
()
thi
s
.
wait
=
waitTime
thi
s
.
lock
.
Unlock
()
s
.
lock
.
Lock
()
s
.
wait
=
waitTime
s
.
lock
.
Unlock
()
}
// set attribute of key @session:key
func
(
thi
s
*
session
)
GetAttribute
(
key
string
)
interface
{}
{
func
(
s
*
session
)
GetAttribute
(
key
string
)
interface
{}
{
var
ret
interface
{}
thi
s
.
lock
.
RLock
()
ret
=
thi
s
.
attrs
[
key
]
thi
s
.
lock
.
RUnlock
()
s
.
lock
.
RLock
()
ret
=
s
.
attrs
[
key
]
s
.
lock
.
RUnlock
()
return
ret
}
// get attribute of key @session:key
func
(
thi
s
*
session
)
SetAttribute
(
key
string
,
value
interface
{})
{
thi
s
.
lock
.
Lock
()
thi
s
.
attrs
[
key
]
=
value
thi
s
.
lock
.
Unlock
()
func
(
s
*
session
)
SetAttribute
(
key
string
,
value
interface
{})
{
s
.
lock
.
Lock
()
s
.
attrs
[
key
]
=
value
s
.
lock
.
Unlock
()
}
// delete attribute of key @session:key
func
(
thi
s
*
session
)
RemoveAttribute
(
key
string
)
{
thi
s
.
lock
.
Lock
()
delete
(
thi
s
.
attrs
,
key
)
thi
s
.
lock
.
Unlock
()
func
(
s
*
session
)
RemoveAttribute
(
key
string
)
{
s
.
lock
.
Lock
()
delete
(
s
.
attrs
,
key
)
s
.
lock
.
Unlock
()
}
func
(
thi
s
*
session
)
sessionToken
()
string
{
return
fmt
.
Sprintf
(
"{%s:%d:%s<->%s}"
,
this
.
name
,
this
.
ID
(),
this
.
LocalAddr
(),
thi
s
.
RemoteAddr
())
func
(
s
*
session
)
sessionToken
()
string
{
return
fmt
.
Sprintf
(
"{%s:%d:%s<->%s}"
,
s
.
name
,
s
.
ID
(),
s
.
LocalAddr
(),
s
.
RemoteAddr
())
}
// Queued Write, for handler
func
(
thi
s
*
session
)
WritePkg
(
pkg
interface
{})
error
{
if
thi
s
.
IsClosed
()
{
func
(
s
*
session
)
WritePkg
(
pkg
interface
{})
error
{
if
s
.
IsClosed
()
{
return
ErrSessionClosed
}
...
...
@@ -329,23 +329,23 @@ func (this *session) WritePkg(pkg interface{}) error {
const
size
=
64
<<
10
rBuf
:=
make
([]
byte
,
size
)
rBuf
=
rBuf
[
:
runtime
.
Stack
(
rBuf
,
false
)]
log
.
Error
(
"[session.WritePkg] panic session %s: err=%#v
\n
%s"
,
thi
s
.
sessionToken
(),
r
,
rBuf
)
log
.
Error
(
"[session.WritePkg] panic session %s: err=%#v
\n
%s"
,
s
.
sessionToken
(),
r
,
rBuf
)
}
}()
var
d
=
thi
s
.
writeDeadline
()
var
d
=
s
.
writeDeadline
()
if
d
>
netIOTimeout
{
d
=
netIOTimeout
}
select
{
case
thi
s
.
wQ
<-
pkg
:
case
s
.
wQ
<-
pkg
:
break
// for possible gen a new pkg
// default:
// case <-time.After(
thi
s.wDeadline):
// case <-time.After(s.wDeadline):
// case <-time.After(netIOTimeout):
case
<-
wheel
.
After
(
d
)
:
log
.
Warn
(
"%s, [session.WritePkg] wQ{len:%d, cap:%d}"
,
this
.
Stat
(),
len
(
this
.
wQ
),
cap
(
thi
s
.
wQ
))
log
.
Warn
(
"%s, [session.WritePkg] wQ{len:%d, cap:%d}"
,
s
.
Stat
(),
len
(
s
.
wQ
),
cap
(
s
.
wQ
))
return
ErrSessionBlocked
}
...
...
@@ -353,24 +353,24 @@ func (this *session) WritePkg(pkg interface{}) error {
}
// for codecs
func
(
thi
s
*
session
)
WriteBytes
(
pkg
[]
byte
)
error
{
if
thi
s
.
IsClosed
()
{
func
(
s
*
session
)
WriteBytes
(
pkg
[]
byte
)
error
{
if
s
.
IsClosed
()
{
return
ErrSessionClosed
}
//
this.conn.SetWriteDeadline(time.Now().Add(thi
s.wDeadline))
return
thi
s
.
Connection
.
Write
(
pkg
)
//
s.conn.SetWriteDeadline(time.Now().Add(
s.wDeadline))
return
s
.
Connection
.
Write
(
pkg
)
}
// Write multiple packages at once
func
(
thi
s
*
session
)
WriteBytesArray
(
pkgs
...
[]
byte
)
error
{
if
thi
s
.
IsClosed
()
{
func
(
s
*
session
)
WriteBytesArray
(
pkgs
...
[]
byte
)
error
{
if
s
.
IsClosed
()
{
return
ErrSessionClosed
}
//
this.conn.SetWriteDeadline(time.Now().Add(thi
s.wDeadline))
//
s.conn.SetWriteDeadline(time.Now().Add(
s.wDeadline))
if
len
(
pkgs
)
==
1
{
return
thi
s
.
Connection
.
Write
(
pkgs
[
0
])
return
s
.
Connection
.
Write
(
pkgs
[
0
])
}
// get len
...
...
@@ -392,38 +392,38 @@ func (this *session) WriteBytesArray(pkgs ...[]byte) error {
l
+=
len
(
pkgs
[
i
])
}
// return
thi
s.Connection.Write(arr)
return
thi
s
.
WriteBytes
(
arr
)
// return s.Connection.Write(arr)
return
s
.
WriteBytes
(
arr
)
}
// func (
thi
s *session) RunEventLoop() {
func
(
thi
s
*
session
)
run
()
{
if
this
.
rQ
==
nil
||
thi
s
.
wQ
==
nil
{
// func (s *session) RunEventLoop() {
func
(
s
*
session
)
run
()
{
if
s
.
rQ
==
nil
||
s
.
wQ
==
nil
{
errStr
:=
fmt
.
Sprintf
(
"session{name:%s, rQ:%#v, wQ:%#v}"
,
this
.
name
,
this
.
rQ
,
thi
s
.
wQ
)
s
.
name
,
s
.
rQ
,
s
.
wQ
)
log
.
Error
(
errStr
)
panic
(
errStr
)
}
if
this
.
Connection
==
nil
||
this
.
listener
==
nil
||
thi
s
.
writer
==
nil
{
if
s
.
Connection
==
nil
||
s
.
listener
==
nil
||
s
.
writer
==
nil
{
errStr
:=
fmt
.
Sprintf
(
"session{name:%s, conn:%#v, listener:%#v, writer:%#v}"
,
this
.
name
,
this
.
Connection
,
this
.
listener
,
thi
s
.
writer
)
s
.
name
,
s
.
Connection
,
s
.
listener
,
s
.
writer
)
log
.
Error
(
errStr
)
panic
(
errStr
)
}
// call session opened
thi
s
.
UpdateActive
()
if
err
:=
this
.
listener
.
OnOpen
(
thi
s
);
err
!=
nil
{
thi
s
.
Close
()
s
.
UpdateActive
()
if
err
:=
s
.
listener
.
OnOpen
(
s
);
err
!=
nil
{
s
.
Close
()
return
}
atomic
.
AddInt32
(
&
(
thi
s
.
grNum
),
2
)
go
thi
s
.
handleLoop
()
go
thi
s
.
handlePackage
()
atomic
.
AddInt32
(
&
(
s
.
grNum
),
2
)
go
s
.
handleLoop
()
go
s
.
handlePackage
()
}
func
(
thi
s
*
session
)
handleLoop
()
{
func
(
s
*
session
)
handleLoop
()
{
var
(
err
error
flag
bool
...
...
@@ -444,65 +444,65 @@ func (this *session) handleLoop() {
const
size
=
64
<<
10
rBuf
:=
make
([]
byte
,
size
)
rBuf
=
rBuf
[
:
runtime
.
Stack
(
rBuf
,
false
)]
log
.
Error
(
"[session.handleLoop] panic session %s: err=%#v
\n
%s"
,
thi
s
.
sessionToken
(),
r
,
rBuf
)
log
.
Error
(
"[session.handleLoop] panic session %s: err=%#v
\n
%s"
,
s
.
sessionToken
(),
r
,
rBuf
)
}
grNum
=
atomic
.
AddInt32
(
&
(
thi
s
.
grNum
),
-
1
)
// if !
thi
s.errFlag {
this
.
listener
.
OnClose
(
thi
s
)
grNum
=
atomic
.
AddInt32
(
&
(
s
.
grNum
),
-
1
)
// if !s.errFlag {
s
.
listener
.
OnClose
(
s
)
// }
log
.
Info
(
"%s, [session.handleLoop] goroutine exit now, left gr num %d"
,
thi
s
.
Stat
(),
grNum
)
thi
s
.
gc
()
log
.
Info
(
"%s, [session.handleLoop] goroutine exit now, left gr num %d"
,
s
.
Stat
(),
grNum
)
s
.
gc
()
}()
wsConn
,
wsFlag
=
thi
s
.
Connection
.
(
*
gettyWSConn
)
wsConn
,
wsFlag
=
s
.
Connection
.
(
*
gettyWSConn
)
flag
=
true
// do not do any read/Write/cron operation while got Write error
// ticker = time.NewTicker(
thi
s.period) // use wheel instead, 2016/09/26
// ticker = time.NewTicker(s.period) // use wheel instead, 2016/09/26
LOOP
:
for
{
// A select blocks until one of its cases can run, then it executes that case.
// It choose one at random if multiple are ready. Otherwise it choose default branch if none is ready.
select
{
case
<-
thi
s
.
done
:
case
<-
s
.
done
:
// 这个分支确保(session)handleLoop gr在(session)handlePackage gr之后退出
// once.Do(func() { ticker.Stop() }) // use wheel instead, 2016/09/26
if
atomic
.
LoadInt32
(
&
(
thi
s
.
grNum
))
==
1
{
// make sure @(session)handlePackage goroutine has been closed.
if
len
(
this
.
rQ
)
==
0
&&
len
(
thi
s
.
wQ
)
==
0
{
log
.
Info
(
"%s, [session.handleLoop] got done signal. Both rQ and wQ are nil."
,
thi
s
.
Stat
())
if
atomic
.
LoadInt32
(
&
(
s
.
grNum
))
==
1
{
// make sure @(session)handlePackage goroutine has been closed.
if
len
(
s
.
rQ
)
==
0
&&
len
(
s
.
wQ
)
==
0
{
log
.
Info
(
"%s, [session.handleLoop] got done signal. Both rQ and wQ are nil."
,
s
.
Stat
())
break
LOOP
}
counter
.
Start
()
// if time.Since(start).Nanoseconds() >=
thi
s.wait.Nanoseconds() {
if
counter
.
Count
()
>
thi
s
.
wait
.
Nanoseconds
()
{
log
.
Info
(
"%s, [session.handleLoop] got done signal "
,
thi
s
.
Stat
())
// if time.Since(start).Nanoseconds() >= s.wait.Nanoseconds() {
if
counter
.
Count
()
>
s
.
wait
.
Nanoseconds
()
{
log
.
Info
(
"%s, [session.handleLoop] got done signal "
,
s
.
Stat
())
break
LOOP
}
}
case
inPkg
=
<-
thi
s
.
rQ
:
case
inPkg
=
<-
s
.
rQ
:
// 这个条件分支通过(session)rQ排空确保(session)handlePackage gr不会阻塞在(session)rQ上
if
flag
{
this
.
listener
.
OnMessage
(
thi
s
,
inPkg
)
thi
s
.
incReadPkgCount
()
s
.
listener
.
OnMessage
(
s
,
inPkg
)
s
.
incReadPkgCount
()
}
else
{
log
.
Info
(
"[session.handleLoop] drop readin package{%#v}"
,
inPkg
)
}
case
outPkg
=
<-
thi
s
.
wQ
:
case
outPkg
=
<-
s
.
wQ
:
if
flag
{
if
err
=
this
.
writer
.
Write
(
thi
s
,
outPkg
);
err
!=
nil
{
log
.
Error
(
"%s, [session.handleLoop] = error{%+v}"
,
thi
s
.
sessionToken
(),
err
)
thi
s
.
stop
()
if
err
=
s
.
writer
.
Write
(
s
,
outPkg
);
err
!=
nil
{
log
.
Error
(
"%s, [session.handleLoop] = error{%+v}"
,
s
.
sessionToken
(),
err
)
s
.
stop
()
flag
=
false
// break LOOP
}
thi
s
.
incWritePkgCount
()
s
.
incWritePkgCount
()
}
else
{
log
.
Info
(
"[session.handleLoop] drop writeout package{%#v}"
,
outPkg
)
}
// case <-ticker.C: // use wheel instead, 2016/09/26
case
<-
wheel
.
After
(
thi
s
.
period
)
:
case
<-
wheel
.
After
(
s
.
period
)
:
if
flag
{
if
wsFlag
{
err
=
wsConn
.
writePing
()
...
...
@@ -511,14 +511,14 @@ LOOP:
log
.
Warn
(
"wsConn.writePing() = error{%#v}"
,
err
)
}
}
this
.
listener
.
OnCron
(
thi
s
)
s
.
listener
.
OnCron
(
s
)
}
}
}
// once.Do(func() { ticker.Stop() }) // use wheel instead, 2016/09/26
}
func
(
thi
s
*
session
)
handlePackage
()
{
func
(
s
*
session
)
handlePackage
()
{
var
(
err
error
)
...
...
@@ -530,34 +530,34 @@ func (this *session) handlePackage() {
const
size
=
64
<<
10
rBuf
:=
make
([]
byte
,
size
)
rBuf
=
rBuf
[
:
runtime
.
Stack
(
rBuf
,
false
)]
log
.
Error
(
"[session.handlePackage] panic session %s: err=%#v
\n
%s"
,
thi
s
.
sessionToken
(),
r
,
rBuf
)
log
.
Error
(
"[session.handlePackage] panic session %s: err=%#v
\n
%s"
,
s
.
sessionToken
(),
r
,
rBuf
)
}
grNum
=
atomic
.
AddInt32
(
&
(
thi
s
.
grNum
),
-
1
)
log
.
Info
(
"%s, [session.handlePackage] gr will exit now, left gr num %d"
,
thi
s
.
sessionToken
(),
grNum
)
thi
s
.
stop
()
// if
thi
s.errFlag {
grNum
=
atomic
.
AddInt32
(
&
(
s
.
grNum
),
-
1
)
log
.
Info
(
"%s, [session.handlePackage] gr will exit now, left gr num %d"
,
s
.
sessionToken
(),
grNum
)
s
.
stop
()
// if s.errFlag {
if
err
!=
nil
{
log
.
Error
(
"%s, [session.handlePackage] error{%#v}"
,
thi
s
.
sessionToken
(),
err
)
this
.
listener
.
OnError
(
thi
s
,
err
)
log
.
Error
(
"%s, [session.handlePackage] error{%#v}"
,
s
.
sessionToken
(),
err
)
s
.
listener
.
OnError
(
s
,
err
)
}
}()
if
_
,
ok
:=
thi
s
.
Connection
.
(
*
gettyTCPConn
);
ok
{
if
thi
s
.
reader
==
nil
{
errStr
:=
fmt
.
Sprintf
(
"session{name:%s, conn:%#v, reader:%#v}"
,
this
.
name
,
this
.
Connection
,
thi
s
.
reader
)
if
_
,
ok
:=
s
.
Connection
.
(
*
gettyTCPConn
);
ok
{
if
s
.
reader
==
nil
{
errStr
:=
fmt
.
Sprintf
(
"session{name:%s, conn:%#v, reader:%#v}"
,
s
.
name
,
s
.
Connection
,
s
.
reader
)
log
.
Error
(
errStr
)
panic
(
errStr
)
}
err
=
thi
s
.
handleTCPPackage
()
}
else
if
_
,
ok
:=
thi
s
.
Connection
.
(
*
gettyWSConn
);
ok
{
err
=
thi
s
.
handleWSPackage
()
err
=
s
.
handleTCPPackage
()
}
else
if
_
,
ok
:=
s
.
Connection
.
(
*
gettyWSConn
);
ok
{
err
=
s
.
handleWSPackage
()
}
}
// get package from tcp stream(packet)
func
(
thi
s
*
session
)
handleTCPPackage
()
error
{
func
(
s
*
session
)
handleTCPPackage
()
error
{
var
(
ok
bool
err
error
...
...
@@ -573,9 +573,9 @@ func (this *session) handleTCPPackage() error {
buf
=
make
([]
byte
,
maxReadBufLen
)
pktBuf
=
new
(
bytes
.
Buffer
)
conn
=
thi
s
.
Connection
.
(
*
gettyTCPConn
)
conn
=
s
.
Connection
.
(
*
gettyTCPConn
)
for
{
if
thi
s
.
IsClosed
()
{
if
s
.
IsClosed
()
{
err
=
nil
break
// 退出前不再读取任何packet,buf中剩余的stream bytes也不可能凑够一个package, 所以直接退出
}
...
...
@@ -583,15 +583,15 @@ func (this *session) handleTCPPackage() error {
bufLen
=
0
for
{
// for clause for the network timeout condition check
//
this.conn.SetReadDeadline(time.Now().Add(thi
s.rDeadline))
//
s.conn.SetReadDeadline(time.Now().Add(
s.rDeadline))
bufLen
,
err
=
conn
.
read
(
buf
)
if
err
!=
nil
{
if
nerr
,
ok
=
err
.
(
net
.
Error
);
ok
&&
nerr
.
Timeout
()
{
break
}
log
.
Error
(
"%s, [session.conn.read] = error{%v}"
,
thi
s
.
sessionToken
(),
err
)
log
.
Error
(
"%s, [session.conn.read] = error{%v}"
,
s
.
sessionToken
(),
err
)
// for (Codec)OnErr
//
thi
s.errFlag = true
// s.errFlag = true
exit
=
true
}
break
...
...
@@ -607,23 +607,23 @@ func (this *session) handleTCPPackage() error {
if
pktBuf
.
Len
()
<=
0
{
break
}
// pkg, err =
this.pkgHandler.Read(thi
s, pktBuf)
pkg
,
pkgLen
,
err
=
this
.
reader
.
Read
(
thi
s
,
pktBuf
.
Bytes
())
if
err
==
nil
&&
this
.
maxMsgLen
>
0
&&
pkgLen
>
int
(
thi
s
.
maxMsgLen
)
{
// pkg, err =
s.pkgHandler.Read(
s, pktBuf)
pkg
,
pkgLen
,
err
=
s
.
reader
.
Read
(
s
,
pktBuf
.
Bytes
())
if
err
==
nil
&&
s
.
maxMsgLen
>
0
&&
pkgLen
>
int
(
s
.
maxMsgLen
)
{
err
=
ErrMsgTooLong
}
if
err
!=
nil
{
log
.
Warn
(
"%s, [session.handleTCPPackage] = len{%d}, error{%+v}"
,
thi
s
.
sessionToken
(),
pkgLen
,
err
)
log
.
Warn
(
"%s, [session.handleTCPPackage] = len{%d}, error{%+v}"
,
s
.
sessionToken
(),
pkgLen
,
err
)
// for (Codec)OnErr
//
thi
s.errFlag = true
// s.errFlag = true
exit
=
true
break
}
if
pkg
==
nil
{
break
}
thi
s
.
UpdateActive
()
thi
s
.
rQ
<-
pkg
s
.
UpdateActive
()
s
.
rQ
<-
pkg
pktBuf
.
Next
(
pkgLen
)
}
if
exit
{
...
...
@@ -635,7 +635,7 @@ func (this *session) handleTCPPackage() error {
}
// get package from websocket stream
func
(
thi
s
*
session
)
handleWSPackage
()
error
{
func
(
s
*
session
)
handleWSPackage
()
error
{
var
(
ok
bool
err
error
...
...
@@ -646,9 +646,9 @@ func (this *session) handleWSPackage() error {
unmarshalPkg
interface
{}
)
conn
=
thi
s
.
Connection
.
(
*
gettyWSConn
)
conn
=
s
.
Connection
.
(
*
gettyWSConn
)
for
{
if
thi
s
.
IsClosed
()
{
if
s
.
IsClosed
()
{
break
}
pkg
,
err
=
conn
.
read
()
...
...
@@ -656,62 +656,62 @@ func (this *session) handleWSPackage() error {
continue
}
if
err
!=
nil
{
log
.
Warn
(
"%s, [session.handleWSPackage] = error{%+v}"
,
thi
s
.
sessionToken
(),
err
)
//
thi
s.errFlag = true
log
.
Warn
(
"%s, [session.handleWSPackage] = error{%+v}"
,
s
.
sessionToken
(),
err
)
// s.errFlag = true
return
err
}
thi
s
.
UpdateActive
()
if
thi
s
.
reader
!=
nil
{
unmarshalPkg
,
length
,
err
=
this
.
reader
.
Read
(
thi
s
,
pkg
)
if
err
==
nil
&&
this
.
maxMsgLen
>
0
&&
length
>
int
(
thi
s
.
maxMsgLen
)
{
s
.
UpdateActive
()
if
s
.
reader
!=
nil
{
unmarshalPkg
,
length
,
err
=
s
.
reader
.
Read
(
s
,
pkg
)
if
err
==
nil
&&
s
.
maxMsgLen
>
0
&&
length
>
int
(
s
.
maxMsgLen
)
{
err
=
ErrMsgTooLong
}
if
err
!=
nil
{
log
.
Warn
(
"%s, [session.handleWSPackage] = len{%d}, error{%+v}"
,
thi
s
.
sessionToken
(),
length
,
err
)
log
.
Warn
(
"%s, [session.handleWSPackage] = len{%d}, error{%+v}"
,
s
.
sessionToken
(),
length
,
err
)
continue
}
thi
s
.
rQ
<-
unmarshalPkg
s
.
rQ
<-
unmarshalPkg
}
else
{
thi
s
.
rQ
<-
pkg
s
.
rQ
<-
pkg
}
}
return
nil
}
func
(
thi
s
*
session
)
stop
()
{
func
(
s
*
session
)
stop
()
{
select
{
case
<-
this
.
done
:
// thi
s.done is a blocked channel. if it has not been closed, the default branch will be invoked.
case
<-
s
.
done
:
//
s.done is a blocked channel. if it has not been closed, the default branch will be invoked.
return
default
:
thi
s
.
once
.
Do
(
func
()
{
s
.
once
.
Do
(
func
()
{
// let read/Write timeout asap
if
conn
:=
thi
s
.
Conn
();
conn
!=
nil
{
conn
.
SetReadDeadline
(
time
.
Now
()
.
Add
(
thi
s
.
readDeadline
()))
conn
.
SetWriteDeadline
(
time
.
Now
()
.
Add
(
thi
s
.
writeDeadline
()))
if
conn
:=
s
.
Conn
();
conn
!=
nil
{
conn
.
SetReadDeadline
(
time
.
Now
()
.
Add
(
s
.
readDeadline
()))
conn
.
SetWriteDeadline
(
time
.
Now
()
.
Add
(
s
.
writeDeadline
()))
}
close
(
thi
s
.
done
)
close
(
s
.
done
)
})
}
}
func
(
thi
s
*
session
)
gc
()
{
thi
s
.
lock
.
Lock
()
if
thi
s
.
attrs
!=
nil
{
thi
s
.
attrs
=
nil
close
(
thi
s
.
wQ
)
thi
s
.
wQ
=
nil
close
(
thi
s
.
rQ
)
thi
s
.
rQ
=
nil
this
.
Connection
.
close
((
int
)((
int64
)(
thi
s
.
wait
)))
func
(
s
*
session
)
gc
()
{
s
.
lock
.
Lock
()
if
s
.
attrs
!=
nil
{
s
.
attrs
=
nil
close
(
s
.
wQ
)
s
.
wQ
=
nil
close
(
s
.
rQ
)
s
.
rQ
=
nil
s
.
Connection
.
close
((
int
)((
int64
)(
s
.
wait
)))
}
thi
s
.
lock
.
Unlock
()
s
.
lock
.
Unlock
()
}
//
thi
s function will be invoked by NewSessionCallback(if return error is not nil) or (session)handleLoop automatically.
// s function will be invoked by NewSessionCallback(if return error is not nil) or (session)handleLoop automatically.
// It is goroutine-safe to be invoked many times.
func
(
thi
s
*
session
)
Close
()
{
thi
s
.
stop
()
log
.
Info
(
"%s closed now, its current gr num %d"
,
this
.
sessionToken
(),
atomic
.
LoadInt32
(
&
(
thi
s
.
grNum
)))
func
(
s
*
session
)
Close
()
{
s
.
stop
()
log
.
Info
(
"%s closed now, its current gr num %d"
,
s
.
sessionToken
(),
atomic
.
LoadInt32
(
&
(
s
.
grNum
)))
}
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment