Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Contribute to GitLab
Sign in
Toggle navigation
G
getty
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
wei.xuan
getty
Commits
2b6c6583
Unverified
Commit
2b6c6583
authored
Sep 10, 2019
by
望哥
Committed by
GitHub
Sep 10, 2019
Browse files
Options
Browse Files
Download
Plain Diff
Merge pull request #26 from divebomb/master
Add: writev
parents
34d2a349
e545f6fa
Hide whitespace changes
Inline
Side-by-side
Showing
8 changed files
with
170 additions
and
70 deletions
+170
-70
connection.go
connection.go
+8
-8
pkghandler.go
demo/hello/pkghandler.go
+37
-7
client.go
demo/hello/tcp/client/client.go
+20
-9
config.go
demo/hello/tcp/config.go
+3
-15
server.go
demo/hello/tcp/server/server.go
+1
-1
pprof.go
demo/util/pprof.go
+2
-2
getty.go
getty.go
+3
-3
session.go
session.go
+96
-25
No files found.
conn.go
→
conn
ection
.go
View file @
2b6c6583
...
...
@@ -4,7 +4,7 @@
# LICENCE : Apache License 2.0
# EMAIL : alexstocks@foxmail.com
# MOD : 2016-08-17 11:21
# FILE : conn.go
# FILE : conn
ection
.go
******************************************************/
package
getty
...
...
@@ -89,7 +89,7 @@ func (c *gettyConn) GetActive() time.Time {
return
launchTime
.
Add
(
time
.
Duration
(
atomic
.
LoadInt64
(
&
(
c
.
active
))))
}
func
(
c
*
gettyConn
)
Write
(
interface
{})
(
int
,
error
)
{
func
(
c
*
gettyConn
)
send
(
interface
{})
(
int
,
error
)
{
return
0
,
nil
}
...
...
@@ -228,7 +228,7 @@ func (t *gettyTCPConn) SetCompressType(c CompressType) {
}
// tcp connection read
func
(
t
*
gettyTCPConn
)
re
ad
(
p
[]
byte
)
(
int
,
error
)
{
func
(
t
*
gettyTCPConn
)
re
cv
(
p
[]
byte
)
(
int
,
error
)
{
var
(
err
error
currentTime
time
.
Time
...
...
@@ -258,7 +258,7 @@ func (t *gettyTCPConn) read(p []byte) (int, error) {
}
// tcp connection write
func
(
t
*
gettyTCPConn
)
Write
(
pkg
interface
{})
(
int
,
error
)
{
func
(
t
*
gettyTCPConn
)
send
(
pkg
interface
{})
(
int
,
error
)
{
var
(
err
error
currentTime
time
.
Time
...
...
@@ -379,7 +379,7 @@ func (u *gettyUDPConn) SetCompressType(c CompressType) {
}
// udp connection read
func
(
u
*
gettyUDPConn
)
re
ad
(
p
[]
byte
)
(
int
,
*
net
.
UDPAddr
,
error
)
{
func
(
u
*
gettyUDPConn
)
re
cv
(
p
[]
byte
)
(
int
,
*
net
.
UDPAddr
,
error
)
{
var
(
err
error
currentTime
time
.
Time
...
...
@@ -411,7 +411,7 @@ func (u *gettyUDPConn) read(p []byte) (int, *net.UDPAddr, error) {
}
// write udp packet, @ctx should be of type UDPContext
func
(
u
*
gettyUDPConn
)
Write
(
udpCtx
interface
{})
(
int
,
error
)
{
func
(
u
*
gettyUDPConn
)
send
(
udpCtx
interface
{})
(
int
,
error
)
{
var
(
err
error
currentTime
time
.
Time
...
...
@@ -539,7 +539,7 @@ func (w *gettyWSConn) handlePong(string) error {
}
// websocket connection read
func
(
w
*
gettyWSConn
)
re
ad
()
([]
byte
,
error
)
{
func
(
w
*
gettyWSConn
)
re
cv
()
([]
byte
,
error
)
{
// Pls do not set read deadline when using ReadMessage. AlexStocks 20180310
// gorilla/websocket/conn.go:NextReader will always fail when got a timeout error.
_
,
b
,
e
:=
w
.
conn
.
ReadMessage
()
// the first return value is message type.
...
...
@@ -578,7 +578,7 @@ func (w *gettyWSConn) updateWriteDeadline() error {
}
// websocket connection write
func
(
w
*
gettyWSConn
)
Write
(
pkg
interface
{})
(
int
,
error
)
{
func
(
w
*
gettyWSConn
)
send
(
pkg
interface
{})
(
int
,
error
)
{
var
(
err
error
ok
bool
...
...
demo/hello/pkghandler.go
View file @
2b6c6583
...
...
@@ -8,6 +8,7 @@
package
hello
import
(
"encoding/binary"
"errors"
)
...
...
@@ -18,15 +19,44 @@ import (
type
PackageHandler
struct
{}
func
(
h
*
PackageHandler
)
Read
(
ss
getty
.
Session
,
data
[]
byte
)
(
interface
{},
int
,
error
)
{
s
:=
string
(
data
)
return
s
,
len
(
s
),
nil
dataLen
:=
len
(
data
)
if
dataLen
<
4
{
return
nil
,
0
,
nil
}
start
:=
0
pos
:=
start
+
4
pkgLen
:=
int
(
binary
.
LittleEndian
.
Uint32
(
data
[
start
:
pos
]))
if
dataLen
<
pos
+
pkgLen
{
return
nil
,
pos
+
pkgLen
,
nil
}
start
=
pos
pos
=
start
+
pkgLen
s
:=
string
(
data
[
start
:
pos
])
return
s
,
pos
,
nil
}
func
(
h
*
PackageHandler
)
Write
(
ss
getty
.
Session
,
p
kg
interface
{})
error
{
s
,
ok
:=
pkg
.
(
string
)
func
(
h
*
PackageHandler
)
Write
(
ss
getty
.
Session
,
p
interface
{})
([]
byte
,
error
)
{
pkg
,
ok
:=
p
.
(
string
)
if
!
ok
{
log
.
Infof
(
"illegal pkg:%+v"
,
p
kg
)
return
errors
.
New
(
"invalid package"
)
log
.
Infof
(
"illegal pkg:%+v"
,
p
)
return
nil
,
errors
.
New
(
"invalid package"
)
}
return
ss
.
WriteBytes
([]
byte
(
s
))
pkgLen
:=
int32
(
len
(
pkg
))
pkgStreams
:=
make
([]
byte
,
0
,
4
+
len
(
pkg
))
// pkg len
start
:=
0
pos
:=
start
+
4
binary
.
LittleEndian
.
PutUint32
(
pkgStreams
[
start
:
pos
],
uint32
(
pkgLen
))
start
=
pos
// pkg
pos
=
start
+
int
(
pkgLen
)
copy
(
pkgStreams
[
start
:
pos
],
pkg
[
:
])
return
pkgStreams
[
:
pos
],
nil
}
demo/hello/tcp/client/client.go
View file @
2b6c6583
...
...
@@ -34,7 +34,7 @@ var (
)
var
(
taskPool
*
gxsync
.
TaskPool
taskPool
*
gxsync
.
TaskPool
)
func
main
()
{
...
...
@@ -44,23 +44,34 @@ func main() {
util
.
Profiling
(
*
pprofPort
)
if
*
taskPoolMode
{
taskPool
=
gxsync
.
NewTaskPool
(
gxsync
.
WithTaskPoolTaskQueueLength
(
*
taskPoolQueueLength
),
gxsync
.
WithTaskPoolTaskQueueNumber
(
*
taskPoolQueueNumber
),
gxsync
.
WithTaskPoolTaskPoolSize
(
*
taskPoolSize
),
)
}
if
*
taskPoolMode
{
taskPool
=
gxsync
.
NewTaskPool
(
gxsync
.
WithTaskPoolTaskQueueLength
(
*
taskPoolQueueLength
),
gxsync
.
WithTaskPoolTaskQueueNumber
(
*
taskPoolQueueNumber
),
gxsync
.
WithTaskPoolTaskPoolSize
(
*
taskPoolSize
),
)
}
client
:=
getty
.
NewTCPClient
(
getty
.
WithServerAddress
(
*
ip
+
":8090"
),
getty
.
WithConnectionNumber
(
*
connections
),
)
client
.
RunEventLoop
(
tcp
.
NewHelloClientSession
)
client
.
RunEventLoop
(
NewHelloClientSession
)
go
hello
.
ClientRequest
()
util
.
WaitCloseSignals
(
client
)
taskPool
.
Close
()
}
func
NewHelloClientSession
(
session
getty
.
Session
)
(
err
error
)
{
tcp
.
EventListener
.
SessionOnOpen
=
func
(
session
getty
.
Session
)
{
hello
.
Sessions
=
append
(
hello
.
Sessions
,
session
)
}
err
=
tcp
.
InitialSession
(
session
)
if
err
!=
nil
{
return
}
return
}
demo/hello/tcp/config.go
View file @
2b6c6583
...
...
@@ -23,21 +23,9 @@ import (
var
(
pkgHandler
=
&
hello
.
PackageHandler
{}
e
ventListener
=
&
hello
.
MessageHandler
{}
E
ventListener
=
&
hello
.
MessageHandler
{}
)
func
NewHelloClientSession
(
session
getty
.
Session
,
taskPool
*
gxsync
.
TaskPool
)
(
err
error
)
{
eventListener
.
SessionOnOpen
=
func
(
session
getty
.
Session
)
{
hello
.
Sessions
=
append
(
hello
.
Sessions
,
session
)
}
err
=
InitialSession
(
session
)
if
err
!=
nil
{
return
}
session
.
SetTaskPool
(
taskPool
)
return
}
func
InitialSession
(
session
getty
.
Session
)
(
err
error
)
{
session
.
SetCompressType
(
getty
.
CompressZip
)
...
...
@@ -64,7 +52,7 @@ func InitialSession(session getty.Session) (err error) {
session
.
SetName
(
"hello"
)
session
.
SetMaxMsgLen
(
128
)
session
.
SetRQLen
(
1024
)
//
session.SetRQLen(1024)
session
.
SetWQLen
(
512
)
session
.
SetReadTimeout
(
time
.
Second
)
session
.
SetWriteTimeout
(
5
*
time
.
Second
)
...
...
@@ -72,6 +60,6 @@ func InitialSession(session getty.Session) (err error) {
session
.
SetWaitTime
(
time
.
Second
)
session
.
SetPkgHandler
(
pkgHandler
)
session
.
SetEventListener
(
e
ventListener
)
session
.
SetEventListener
(
E
ventListener
)
return
nil
}
demo/hello/tcp/server/server.go
View file @
2b6c6583
...
...
@@ -38,7 +38,7 @@ func main() {
util
.
SetLimit
()
util
.
Profiling
(
*
pprofPort
)
util
.
Profiling
(
*
pprofPort
)
options
:=
[]
getty
.
ServerOption
{
getty
.
WithLocalAddress
(
":8090"
)}
...
...
demo/util/pprof.go
View file @
2b6c6583
...
...
@@ -8,13 +8,13 @@
package
util
import
(
"fmt"
"fmt"
"net/http"
_
"net/http/pprof"
)
func
Profiling
(
port
int
)
{
go
func
()
{
http
.
ListenAndServe
(
fmt
.
Sprintf
(
":%d"
,
port
),
nil
)
http
.
ListenAndServe
(
fmt
.
Sprintf
(
":%d"
,
port
),
nil
)
}()
}
getty.go
View file @
2b6c6583
...
...
@@ -46,10 +46,10 @@ type Reader interface {
// Writer is used to marshal pkg and write to session
type
Writer
interface
{
// if @Session is udpGettySession, the second parameter is UDPContext.
Write
(
Session
,
interface
{})
error
Write
(
Session
,
interface
{})
([]
byte
,
error
)
}
//
tcp
package handler interface
// package handler interface
type
ReadWriter
interface
{
Reader
Writer
...
...
@@ -120,7 +120,7 @@ type Connection interface {
writeTimeout
()
time
.
Duration
// SetWriteTimeout sets deadline for the future read calls.
SetWriteTimeout
(
time
.
Duration
)
Write
(
interface
{})
(
int
,
error
)
send
(
interface
{})
(
int
,
error
)
// don't distinguish between tcp connection and websocket connection. Because
// gorilla/websocket/conn.go:(Conn)Close also invoke net.Conn.Close
close
(
int
)
...
...
session.go
View file @
2b6c6583
...
...
@@ -33,6 +33,7 @@ const (
period
=
60
*
1e9
// 1 minute
pendingDuration
=
3e9
defaultQLen
=
1024
maxIovecNum
=
10
defaultSessionName
=
"session"
defaultTCPSessionName
=
"tcp-session"
defaultUDPSessionName
=
"udp-session"
...
...
@@ -108,7 +109,6 @@ func newSession(endPoint EndPoint, conn Connection) *session {
wait
:
pendingDuration
,
attrs
:
NewValuesContext
(
nil
),
rDone
:
make
(
chan
struct
{}),
grNum
:
0
,
}
ss
.
Connection
.
setSession
(
ss
)
...
...
@@ -355,6 +355,9 @@ func (s *session) sessionToken() string {
}
func
(
s
*
session
)
WritePkg
(
pkg
interface
{},
timeout
time
.
Duration
)
error
{
if
pkg
==
nil
{
return
fmt
.
Errorf
(
"@pkg is nil"
)
}
if
s
.
IsClosed
()
{
return
ErrSessionClosed
}
...
...
@@ -368,12 +371,31 @@ func (s *session) WritePkg(pkg interface{}, timeout time.Duration) error {
}
}()
var
err
error
if
timeout
<=
0
{
if
err
=
s
.
writer
.
Write
(
s
,
pkg
);
err
!=
nil
{
pkgBytes
,
err
:=
s
.
writer
.
Write
(
s
,
pkg
)
if
err
!=
nil
{
log
.
Warnf
(
"%s, [session.WritePkg] session.writer.Write(@pkg:%#v) = error:%v"
,
s
.
Stat
(),
pkg
,
err
)
return
perrors
.
WithStack
(
err
)
}
var
udpCtxPtr
*
UDPContext
if
udpCtx
,
ok
:=
pkg
.
(
UDPContext
);
ok
{
udpCtxPtr
=
&
udpCtx
}
else
if
udpCtxP
,
ok
:=
pkg
.
(
*
UDPContext
);
ok
{
udpCtxPtr
=
udpCtxP
}
if
udpCtxPtr
!=
nil
{
udpCtxPtr
.
Pkg
=
pkgBytes
pkg
=
*
udpCtxPtr
}
else
{
pkg
=
pkgBytes
}
_
,
err
=
s
.
Connection
.
send
(
pkg
)
if
err
!=
nil
{
log
.
Warn
(
"%s, [session.WritePkg] @s.Connection.Write(pkg:%#v) = err:%v"
,
s
.
Stat
(),
pkg
,
err
)
return
perrors
.
WithStack
(
err
)
}
s
.
incWritePkgNum
()
return
nil
}
select
{
case
s
.
wQ
<-
pkg
:
...
...
@@ -394,7 +416,7 @@ func (s *session) WriteBytes(pkg []byte) error {
}
// s.conn.SetWriteTimeout(time.Now().Add(s.wTimeout))
if
_
,
err
:=
s
.
Connection
.
Write
(
pkg
);
err
!=
nil
{
if
_
,
err
:=
s
.
Connection
.
send
(
pkg
);
err
!=
nil
{
return
perrors
.
Wrapf
(
err
,
"s.Connection.Write(pkg len:%d)"
,
len
(
pkg
))
}
...
...
@@ -403,7 +425,7 @@ func (s *session) WriteBytes(pkg []byte) error {
return
nil
}
// Write multiple packages at once
// Write multiple packages at once
. so we invoke write sys.call just one time.
func
(
s
*
session
)
WriteBytesArray
(
pkgs
...
[]
byte
)
error
{
if
s
.
IsClosed
()
{
return
ErrSessionClosed
...
...
@@ -438,7 +460,6 @@ func (s *session) WriteBytesArray(pkgs ...[]byte) error {
l
+=
len
(
pkgs
[
i
])
}
// return s.Connection.Write(arr)
if
err
=
s
.
WriteBytes
(
arr
);
err
!=
nil
{
return
perrors
.
WithStack
(
err
)
}
...
...
@@ -446,7 +467,6 @@ func (s *session) WriteBytesArray(pkgs ...[]byte) error {
num
:=
len
(
pkgs
)
-
1
for
i
:=
0
;
i
<
num
;
i
++
{
s
.
incWritePkgNum
()
// gxlog.CError("after write, ss:%s", s.Stat())
}
return
nil
...
...
@@ -468,7 +488,7 @@ func (s *session) run() {
// call session opened
s
.
UpdateActive
()
if
err
:=
s
.
listener
.
OnOpen
(
s
);
err
!=
nil
{
log
.
Errorf
(
"[OnOpen]
error: %#v"
,
err
)
log
.
Errorf
(
"[OnOpen]
session %s, error: %#v"
,
s
.
Stat
()
,
err
)
s
.
Close
()
return
}
...
...
@@ -481,13 +501,17 @@ func (s *session) run() {
func
(
s
*
session
)
handleLoop
()
{
var
(
err
error
flag
bool
wsFlag
bool
wsConn
*
gettyWSConn
// start time.Time
counter
gxtime
.
CountWatch
outPkg
interface
{}
err
error
ok
bool
flag
bool
wsFlag
bool
udpFlag
bool
loopFlag
bool
wsConn
*
gettyWSConn
counter
gxtime
.
CountWatch
outPkg
interface
{}
pkgBytes
[]
byte
iovec
[][]
byte
)
defer
func
()
{
...
...
@@ -506,6 +530,8 @@ func (s *session) handleLoop() {
flag
=
true
// do not do any read/Write/cron operation while got Write error
wsConn
,
wsFlag
=
s
.
Connection
.
(
*
gettyWSConn
)
_
,
udpFlag
=
s
.
Connection
.
(
*
gettyUDPConn
)
iovec
=
make
([][]
byte
,
0
,
maxIovecNum
)
LOOP
:
for
{
// A select blocks until one of its cases is ready to run.
...
...
@@ -519,21 +545,67 @@ LOOP:
break
LOOP
}
counter
.
Start
()
// if time.Since(start).Nanoseconds() >= s.wait.Nanoseconds() {
if
counter
.
Count
()
>
s
.
wait
.
Nanoseconds
()
{
log
.
Infof
(
"%s, [session.handleLoop] got done signal "
,
s
.
Stat
())
break
LOOP
}
case
outPkg
=
<-
s
.
wQ
:
if
flag
{
if
err
=
s
.
writer
.
Write
(
s
,
outPkg
);
err
!=
nil
{
case
outPkg
,
ok
=
<-
s
.
wQ
:
if
!
ok
{
continue
}
if
!
flag
{
log
.
Warn
(
"[session.handleLoop] drop write out package %#v"
,
outPkg
)
continue
}
if
udpFlag
||
wsFlag
{
err
=
s
.
WritePkg
(
outPkg
,
0
)
if
err
!=
nil
{
log
.
Errorf
(
"%s, [session.handleLoop] = error:%+v"
,
s
.
sessionToken
(),
err
)
s
.
stop
()
// break LOOP
flag
=
false
}
continue
}
iovec
=
iovec
[
:
0
]
for
idx
:=
0
;
idx
<
maxIovecNum
;
idx
++
{
pkgBytes
,
err
=
s
.
writer
.
Write
(
s
,
outPkg
)
if
err
!=
nil
{
log
.
Errorf
(
"%s, [session.handleLoop] = error:%+v"
,
s
.
sessionToken
(),
err
)
s
.
stop
()
// break LOOP
flag
=
false
break
}
iovec
=
append
(
iovec
,
pkgBytes
)
if
idx
<
maxIovecNum
-
1
{
loopFlag
=
true
select
{
case
outPkg
,
ok
=
<-
s
.
wQ
:
if
!
ok
{
loopFlag
=
false
}
default
:
loopFlag
=
false
break
}
if
!
loopFlag
{
break
// break for-idx loop
}
}
}
else
{
log
.
Infof
(
"[session.handleLoop] drop writeout package{%#v}"
,
outPkg
)
}
err
=
s
.
WriteBytesArray
(
iovec
[
:
]
...
)
if
err
!=
nil
{
log
.
Errorf
(
"%s, [session.handleLoop]s.WriteBytesArray(iovec len:%d) = error:%+v"
,
s
.
sessionToken
(),
len
(
iovec
),
err
)
s
.
stop
()
// break LOOP
flag
=
false
}
case
<-
wheel
.
After
(
s
.
period
)
:
...
...
@@ -643,7 +715,7 @@ func (s *session) handleTCPPackage() error {
for
{
// for clause for the network timeout condition check
// s.conn.SetReadTimeout(time.Now().Add(s.rTimeout))
bufLen
,
err
=
conn
.
re
ad
(
buf
)
bufLen
,
err
=
conn
.
re
cv
(
buf
)
if
err
!=
nil
{
if
netError
,
ok
=
perrors
.
Cause
(
err
)
.
(
net
.
Error
);
ok
&&
netError
.
Timeout
()
{
break
...
...
@@ -664,7 +736,6 @@ func (s *session) handleTCPPackage() error {
if
pktBuf
.
Len
()
<=
0
{
break
}
// pkg, err = s.pkgHandler.Read(s, pktBuf)
pkg
,
pkgLen
,
err
=
s
.
reader
.
Read
(
s
,
pktBuf
.
Bytes
())
// for case 3/case 4
if
err
==
nil
&&
s
.
maxMsgLen
>
0
&&
pkgLen
>
int
(
s
.
maxMsgLen
)
{
...
...
@@ -724,7 +795,7 @@ func (s *session) handleUDPPackage() error {
break
}
bufLen
,
addr
,
err
=
conn
.
re
ad
(
buf
)
bufLen
,
addr
,
err
=
conn
.
re
cv
(
buf
)
log
.
Debugf
(
"conn.read() = bufLen:%d, addr:%#v, err:%+v"
,
bufLen
,
addr
,
err
)
if
netError
,
ok
=
perrors
.
Cause
(
err
)
.
(
net
.
Error
);
ok
&&
netError
.
Timeout
()
{
continue
...
...
@@ -785,7 +856,7 @@ func (s *session) handleWSPackage() error {
if
s
.
IsClosed
()
{
break
}
pkg
,
err
=
conn
.
re
ad
()
pkg
,
err
=
conn
.
re
cv
()
if
netError
,
ok
=
perrors
.
Cause
(
err
)
.
(
net
.
Error
);
ok
&&
netError
.
Timeout
()
{
continue
}
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment