Commit d80971ca authored by AlexStocks's avatar AlexStocks

Rem: wQ

parent ff084291
......@@ -84,15 +84,13 @@ func (p *Package) Unmarshal(buf *bytes.Buffer) (int, error) { return 0, nil }
func newSessionCallback(session Session, handler *MessageHandler) error {
var pkgHandler PackageHandler
session.SetName("hello-client-session")
session.SetMaxMsgLen(1024)
session.SetMaxMsgLen(128 * 1024) // max message package length 128k
session.SetPkgHandler(&pkgHandler)
session.SetEventListener(handler)
session.SetWQLen(32)
session.SetReadTimeout(3e9)
session.SetWriteTimeout(3e9)
session.SetCronPeriod((int)(30e9 / 1e6))
session.SetWaitTime(3e9)
session.SetTaskPool(nil)
return nil
}
......
......@@ -16,8 +16,6 @@ run server:
Or run server in task pool mode:
```bash
go run tcp/server/server.go -taskPool=true \
-task_queue_length=128 \
-task_queue_number=16 \
-task_pool_size=2000 \
-pprof_port=60000
```
......@@ -31,8 +29,6 @@ go run tcp/client/client.go
Or run client in task pool mode:
```bash
go run tcp/client/client.go -taskPool=true \
-task_queue_length=100 \
-task_queue_number=4 \
-task_pool_size=50 \
-pprof_port=60001
```
......
......@@ -36,11 +36,9 @@ var (
ip = flag.String("ip", "127.0.0.1", "server IP")
connections = flag.Int("conn", 1, "number of tcp connections")
taskPoolMode = flag.Bool("taskPool", false, "task pool mode")
taskPoolQueueLength = flag.Int("task_queue_length", 100, "task queue length")
taskPoolQueueNumber = flag.Int("task_queue_number", 4, "task queue number")
taskPoolSize = flag.Int("task_pool_size", 2000, "task poll size")
pprofPort = flag.Int("pprof_port", 65431, "pprof http port")
taskPoolMode = flag.Bool("taskPool", false, "task pool mode")
taskPoolSize = flag.Int("task_pool_size", 2000, "task poll size")
pprofPort = flag.Int("pprof_port", 65431, "pprof http port")
)
var (
......@@ -55,16 +53,13 @@ func main() {
util.Profiling(*pprofPort)
if *taskPoolMode {
taskPool = gxsync.NewTaskPool(
gxsync.WithTaskPoolTaskQueueLength(*taskPoolQueueLength),
gxsync.WithTaskPoolTaskQueueNumber(*taskPoolQueueNumber),
gxsync.WithTaskPoolTaskPoolSize(*taskPoolSize),
)
taskPool = gxsync.NewTaskPoolSimple(*taskPoolSize)
}
client := getty.NewTCPClient(
getty.WithServerAddress(*ip+":8090"),
getty.WithConnectionNumber(*connections),
getty.WithClientTaskPool(taskPool),
)
client.RunEventLoop(NewHelloClientSession)
......
......@@ -62,8 +62,7 @@ func InitialSession(session getty.Session) (err error) {
}
session.SetName("hello")
session.SetMaxMsgLen(128)
session.SetWQLen(512)
session.SetMaxMsgLen(128 * 1024) // max message package length is 128k
session.SetReadTimeout(time.Second)
session.SetWriteTimeout(5 * time.Second)
session.SetCronPeriod(int(hello.CronPeriod / 1e6))
......
......@@ -32,11 +32,9 @@ import (
)
var (
taskPoolMode = flag.Bool("taskPool", false, "task pool mode")
taskPoolQueueLength = flag.Int("task_queue_length", 100, "task queue length")
taskPoolQueueNumber = flag.Int("task_queue_number", 4, "task queue number")
taskPoolSize = flag.Int("task_pool_size", 2000, "task poll size")
pprofPort = flag.Int("pprof_port", 65432, "pprof http port")
taskPoolMode = flag.Bool("taskPool", false, "task pool mode")
taskPoolSize = flag.Int("task_pool_size", 2000, "task poll size")
pprofPort = flag.Int("pprof_port", 65432, "pprof http port")
)
var (
......@@ -53,11 +51,8 @@ func main() {
options := []getty.ServerOption{getty.WithLocalAddress(":8090")}
if *taskPoolMode {
taskPool = gxsync.NewTaskPool(
gxsync.WithTaskPoolTaskQueueLength(*taskPoolQueueLength),
gxsync.WithTaskPoolTaskQueueNumber(*taskPoolQueueNumber),
gxsync.WithTaskPoolTaskPoolSize(*taskPoolSize),
)
taskPool = gxsync.NewTaskPoolSimple(*taskPoolSize)
options = append(options, getty.WithServerTaskPool(taskPool))
}
server := getty.NewTCPServer(options...)
......
......@@ -37,15 +37,13 @@ var (
ip = flag.String("ip", "127.0.0.1", "server IP")
connections = flag.Int("conn", 1, "number of tcp connections")
taskPoolMode = flag.Bool("taskPool", false, "task pool mode")
taskPoolQueueLength = flag.Int("task_queue_length", 100, "task queue length")
taskPoolQueueNumber = flag.Int("task_queue_number", 4, "task queue number")
taskPoolSize = flag.Int("task_pool_size", 2000, "task poll size")
pprofPort = flag.Int("pprof_port", 65431, "pprof http port")
taskPoolMode = flag.Bool("taskPool", false, "task pool mode")
taskPoolSize = flag.Int("task_pool_size", 2000, "task poll size")
pprofPort = flag.Int("pprof_port", 65431, "pprof http port")
)
var (
taskPool *gxsync.TaskPool
taskPool gxsync.GenericTaskPool
)
func main() {
......@@ -56,11 +54,7 @@ func main() {
util.Profiling(*pprofPort)
if *taskPoolMode {
taskPool = gxsync.NewTaskPool(
gxsync.WithTaskPoolTaskQueueLength(*taskPoolQueueLength),
gxsync.WithTaskPoolTaskQueueNumber(*taskPoolQueueNumber),
gxsync.WithTaskPoolTaskPoolSize(*taskPoolSize),
)
taskPool = gxsync.NewTaskPoolSimple(*taskPoolSize)
}
keyPath, _ := filepath.Abs("./demo/hello/tls/certs/ca.key")
caPemPath, _ := filepath.Abs("./demo/hello/tls/certs/ca.pem")
......@@ -74,6 +68,7 @@ func main() {
getty.WithClientSslEnabled(true),
getty.WithClientTlsConfigBuilder(config),
getty.WithConnectionNumber(*connections),
getty.WithClientTaskPool(taskPool),
)
client.RunEventLoop(NewHelloClientSession)
......
......@@ -42,8 +42,7 @@ func InitialSession(session getty.Session) (err error) {
_, ok := session.Conn().(*tls.Conn)
if ok {
session.SetName("hello")
session.SetMaxMsgLen(128)
session.SetWQLen(512)
session.SetMaxMsgLen(128 * 1024) // max message package length is 128k
session.SetReadTimeout(time.Second)
session.SetWriteTimeout(5 * time.Second)
session.SetCronPeriod(int(hello.CronPeriod / 1e6))
......
......@@ -33,16 +33,14 @@ import (
)
var (
taskPoolMode = flag.Bool("taskPool", false, "task pool mode")
taskPoolQueueLength = flag.Int("task_queue_length", 100, "task queue length")
taskPoolQueueNumber = flag.Int("task_queue_number", 4, "task queue number")
taskPoolSize = flag.Int("task_pool_size", 2000, "task poll size")
pprofPort = flag.Int("pprof_port", 65432, "pprof http port")
Sessions []getty.Session
taskPoolMode = flag.Bool("taskPool", false, "task pool mode")
taskPoolSize = flag.Int("task_pool_size", 2000, "task poll size")
pprofPort = flag.Int("pprof_port", 65432, "pprof http port")
Sessions []getty.Session
)
var (
taskPool *gxsync.TaskPool
taskPool gxsync.GenericTaskPool
)
func main() {
......@@ -61,17 +59,13 @@ func main() {
ServerTrustCertCollectionPath: caPemPath,
}
if *taskPoolMode {
taskPool = gxsync.NewTaskPoolSimple(*taskPoolSize)
}
options := []getty.ServerOption{getty.WithLocalAddress(":8090"),
getty.WithServerSslEnabled(true),
getty.WithServerTlsConfigBuilder(c),
}
if *taskPoolMode {
taskPool = gxsync.NewTaskPool(
gxsync.WithTaskPoolTaskQueueLength(*taskPoolQueueLength),
gxsync.WithTaskPoolTaskQueueNumber(*taskPoolQueueNumber),
gxsync.WithTaskPoolTaskPoolSize(*taskPoolSize),
)
getty.WithServerTaskPool(taskPool),
}
server := getty.NewTCPServer(options...)
......@@ -86,7 +80,6 @@ func NewHelloServerSession(session getty.Session) (err error) {
if err != nil {
return
}
session.SetTaskPool(taskPool)
return
}
......@@ -163,10 +163,7 @@ type Session interface {
SetWriter(Writer)
SetCronPeriod(int)
SetWQLen(int)
SetWaitTime(time.Duration)
// Deprecated: don't use SetTaskPool, move to endpoints layer.
SetTaskPool(*gxsync.TaskPool)
GetAttribute(interface{}) interface{}
SetAttribute(interface{}, interface{})
......
......@@ -3,7 +3,7 @@ module github.com/apache/dubbo-getty
go 1.14
require (
github.com/dubbogo/gost v1.9.8
github.com/dubbogo/gost v1.9.9
github.com/golang/snappy v0.0.1
github.com/gorilla/websocket v1.4.2
github.com/pkg/errors v0.9.1
......
......@@ -2,12 +2,12 @@ github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/StackExchange/wmi v0.0.0-20190523213315-cbe66965904d h1:G0m3OIz70MZUWq3EgK3CesDbo8upS2Vm9/P3FtgI+Jk=
github.com/StackExchange/wmi v0.0.0-20190523213315-cbe66965904d/go.mod h1:3eOhrUMpNV+6aFIbp5/iudMxNCF27Vw2OZgy4xEx0Fg=
github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dubbogo/gost v1.9.8 h1:ciAvb0M0rYh3j7+RZsf4QLyDjWVIW/fZbSBsDgHJA7M=
github.com/dubbogo/gost v1.9.8/go.mod h1:XfXynl+iquKdvsklRfl/JlqE+i0sOhHWepH9vPdhGOY=
github.com/dubbogo/gost v1.9.9 h1:fOZU5fSpZI3ZBZKTLPmTXtAfPoAdp7H8TcnCSUiIRkc=
github.com/dubbogo/gost v1.9.9/go.mod h1:+mQGS51XQEUWZP2JeGZTxJwipjRKtJO7Tr+FOg+72rI=
github.com/dubbogo/jsonparser v1.0.1/go.mod h1:tYAtpctvSP/tWw4MeelsowSPgXQRVHHWbqL6ynps8jU=
github.com/go-ole/go-ole v1.2.4 h1:nNBDSCOigTSiarFpYE9J/KtEA1IOW4CNeqT9TQDqCxI=
github.com/go-ole/go-ole v1.2.4/go.mod h1:XCwSNxSkXRo4vlyPy93sltvi/qJq0jqQhjqQNIwKuxM=
github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4=
......@@ -25,7 +25,6 @@ github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/mattn/go-colorable v0.1.7/go.mod h1:u6P/XSegPjTcexA+o6vUJrdnUu04hMope9wVRipJSqc=
github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU=
github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
......@@ -36,7 +35,6 @@ github.com/shirou/gopsutil v3.20.11-0.20201116082039-2fb5da2f2449+incompatible h
github.com/shirou/gopsutil v3.20.11-0.20201116082039-2fb5da2f2449+incompatible/go.mod h1:5b4v6he4MtMOwMlS0TUMTu2PcXUg8+E1lC7eC3UO/RA=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJyk=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd0=
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
......@@ -69,12 +67,10 @@ golang.org/x/tools v0.0.0-20191029041327-9cc4af7d6b2c/go.mod h1:b+2E5dAYhXwXZwtn
golang.org/x/tools v0.0.0-20191029190741-b9c20aec41a5 h1:hKsoRgsbwY1NafxrwTs+k64bikrLBkAgPir1TNCj3Zs=
golang.org/x/tools v0.0.0-20191029190741-b9c20aec41a5/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI=
gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
......
......@@ -35,7 +35,8 @@ type ServerOptions struct {
cert string
privateKey string
caCert string
tPool gxsync.GenericTaskPool
// task queue
tPool gxsync.GenericTaskPool
}
// @addr server listen address.
......@@ -52,7 +53,7 @@ func WithWebsocketServerPath(path string) ServerOption {
}
}
// @certs: server certificate file
// @cert: server certificate file
func WithWebsocketServerCert(cert string) ServerOption {
return func(o *ServerOptions) {
o.cert = cert
......@@ -66,7 +67,7 @@ func WithWebsocketServerPrivateKey(key string) ServerOption {
}
}
// @certs is the root certificate file to verify the legitimacy of server
// @cert is the root certificate file to verify the legitimacy of server
func WithWebsocketServerRootCert(cert string) ServerOption {
return func(o *ServerOptions) {
o.caCert = cert
......@@ -109,10 +110,11 @@ type ClientOptions struct {
sslEnabled bool
tlsConfigBuilder TlsConfigBuilder
// the certs file of wss server which may contain server domain, server ip, the starting effective date, effective
// the cert file of wss server which may contain server domain, server ip, the starting effective date, effective
// duration, the hash alg, the len of the private key.
// wss client will use it.
cert string
cert string
// task queue
tPool gxsync.GenericTaskPool
}
......
......@@ -99,7 +99,7 @@ func NewWSSServer(opts ...ServerOption) Server {
s := newServer(WSS_SERVER, opts...)
if s.addr == "" || s.cert == "" || s.privateKey == "" {
panic(fmt.Sprintf("@addr:%s, @certs:%s, @privateKey:%s, @caCert:%s",
panic(fmt.Sprintf("@addr:%s, @cert:%s, @privateKey:%s, @caCert:%s",
s.addr, s.cert, s.privateKey, s.caCert))
}
......
......@@ -43,9 +43,7 @@ const (
netIOTimeout = 1e9 // 1s
period = 60 * 1e9 // 1 minute
pendingDuration = 3e9
defaultQLen = 1024
maxIovecNum = 10
//MaxWheelTimeSpan 900s, 15 minute
// MaxWheelTimeSpan 900s, 15 minute
MaxWheelTimeSpan = 900e9
defaultSessionName = "session"
......@@ -88,13 +86,8 @@ type session struct {
reader Reader // @reader should be nil when @conn is a gettyWSConn object.
writer Writer
// write
wQ chan interface{}
// handle logic
maxMsgLen int32
// Deprecated: don't use tPool, move to endpoints layer.
tPool *gxsync.TaskPool
// heartbeat
period time.Duration
......@@ -299,18 +292,6 @@ func (s *session) SetCronPeriod(period int) {
s.period = time.Duration(period) * time.Millisecond
}
// set @session's Write queue size
func (s *session) SetWQLen(writeQLen int) {
if writeQLen < 1 {
panic("@writeQLen < 1")
}
s.lock.Lock()
defer s.lock.Unlock()
s.wQ = make(chan interface{}, writeQLen)
log.Debugf("%s, [session.SetWQLen] wQ{len:%d, cap:%d}", s.Stat(), len(s.wQ), cap(s.wQ))
}
// set maximum wait time when session got error or got exit signal
func (s *session) SetWaitTime(waitTime time.Duration) {
if waitTime < 1 {
......@@ -391,38 +372,30 @@ func (s *session) WritePkg(pkg interface{}, timeout time.Duration) error {
}
}()
if timeout <= 0 {
pkgBytes, err := s.writer.Write(s, pkg)
if err != nil {
log.Warnf("%s, [session.WritePkg] session.writer.Write(@pkg:%#v) = error:%+v", s.Stat(), pkg, err)
return perrors.WithStack(err)
}
var udpCtxPtr *UDPContext
if udpCtx, ok := pkg.(UDPContext); ok {
udpCtxPtr = &udpCtx
} else if udpCtxP, ok := pkg.(*UDPContext); ok {
udpCtxPtr = udpCtxP
}
if udpCtxPtr != nil {
udpCtxPtr.Pkg = pkgBytes
pkg = *udpCtxPtr
} else {
pkg = pkgBytes
}
_, err = s.Connection.send(pkg)
if err != nil {
log.Warnf("%s, [session.WritePkg] @s.Connection.Write(pkg:%#v) = err:%+v", s.Stat(), pkg, err)
return perrors.WithStack(err)
}
return nil
pkgBytes, err := s.writer.Write(s, pkg)
if err != nil {
log.Warnf("%s, [session.WritePkg] session.writer.Write(@pkg:%#v) = error:%+v", s.Stat(), pkg, err)
return perrors.WithStack(err)
}
select {
case s.wQ <- pkg:
break // for possible gen a new pkg
case <-wheel.After(timeout):
log.Warnf("%s, [session.WritePkg] wQ{len:%d, cap:%d}", s.Stat(), len(s.wQ), cap(s.wQ))
return ErrSessionBlocked
var udpCtxPtr *UDPContext
if udpCtx, ok := pkg.(UDPContext); ok {
udpCtxPtr = &udpCtx
} else if udpCtxP, ok := pkg.(*UDPContext); ok {
udpCtxPtr = udpCtxP
}
if udpCtxPtr != nil {
udpCtxPtr.Pkg = pkgBytes
pkg = *udpCtxPtr
} else {
pkg = pkgBytes
}
if 0 < timeout {
s.Connection.SetWriteTimeout(timeout)
}
_, err = s.Connection.send(pkg)
if err != nil {
log.Warnf("%s, [session.WritePkg] @s.Connection.Write(pkg:%#v) = err:%+v", s.Stat(), pkg, err)
return perrors.WithStack(err)
}
return nil
......@@ -446,9 +419,7 @@ func (s *session) WriteBytesArray(pkgs ...[]byte) error {
if s.IsClosed() {
return ErrSessionClosed
}
// s.conn.SetWriteTimeout(time.Now().Add(s.wTimeout))
if len(pkgs) == 1 {
// return s.Connection.Write(pkgs[0])
return s.WriteBytes(pkgs[0])
}
......@@ -506,10 +477,6 @@ func (s *session) run() {
panic(errStr)
}
if s.wQ == nil {
s.wQ = make(chan interface{}, defaultQLen)
}
// call session opened
s.UpdateActive()
if err := s.listener.OnOpen(s); err != nil {
......@@ -526,17 +493,9 @@ func (s *session) run() {
func (s *session) handleLoop() {
var (
err error
ok bool
flag bool
wsFlag bool
udpFlag bool
loopFlag bool
wsConn *gettyWSConn
counter gxtime.CountWatch
outPkg interface{}
pkgBytes []byte
iovec [][]byte
wsFlag bool
wsConn *gettyWSConn
counter gxtime.CountWatch
)
defer func() {
......@@ -553,96 +512,27 @@ func (s *session) handleLoop() {
s.gc()
}()
flag = true // do not do any read/Write/cron operation while got Write error
wsConn, wsFlag = s.Connection.(*gettyWSConn)
_, udpFlag = s.Connection.(*gettyUDPConn)
iovec = make([][]byte, 0, maxIovecNum)
LOOP:
for {
// A select blocks until one of its cases is ready to run.
// It choose one at random if multiple are ready. Otherwise it choose default branch if none is ready.
select {
case <-s.done:
// this case branch assure the (session)handleLoop gr will exit before (session)handlePackage gr.
// this case branch assure the (session)handleLoop gr will exit after (session)handlePackage gr.
<-s.rDone
if len(s.wQ) == 0 {
log.Infof("%s, [session.handleLoop] got done signal. wQ is nil.", s.Stat())
break LOOP
}
counter.Start()
if counter.Count() > s.wait.Nanoseconds() {
log.Infof("%s, [session.handleLoop] got done signal ", s.Stat())
break LOOP
}
case outPkg, ok = <-s.wQ:
if !ok {
continue
}
if !flag {
log.Warnf("[session.handleLoop] drop write out package %#v", outPkg)
continue
}
if udpFlag || wsFlag {
err = s.WritePkg(outPkg, 0)
if err != nil {
log.Errorf("%s, [session.handleLoop] = error:%+v", s.sessionToken(), perrors.WithStack(err))
s.stop()
// break LOOP
flag = false
}
continue
}
iovec = iovec[:0]
for idx := 0; idx < maxIovecNum; idx++ {
pkgBytes, err = s.writer.Write(s, outPkg)
if err != nil {
log.Errorf("%s, [session.handleLoop] = error:%+v", s.sessionToken(), perrors.WithStack(err))
s.stop()
// break LOOP
flag = false
break
}
iovec = append(iovec, pkgBytes)
if idx < maxIovecNum-1 {
loopFlag = true
select {
case outPkg, ok = <-s.wQ:
if !ok {
loopFlag = false
}
default:
loopFlag = false
break
}
if !loopFlag {
break // break for-idx loop
}
}
}
err = s.WriteBytesArray(iovec[:]...)
if err != nil {
log.Errorf("%s, [session.handleLoop]s.WriteBytesArray(iovec len:%d) = error:%+v",
s.sessionToken(), len(iovec), perrors.WithStack(err))
s.stop()
// break LOOP
flag = false
}
case <-wheel.After(s.period):
if flag {
if wsFlag {
err := wsConn.writePing()
if err != nil {
log.Warnf("wsConn.writePing() = error:%+v", perrors.WithStack(err))
}
if wsFlag {
err := wsConn.writePing()
if err != nil {
log.Warnf("wsConn.writePing() = error:%+v", perrors.WithStack(err))
}
s.listener.OnCron(s)
}
s.listener.OnCron(s)
}
}
}
......@@ -943,25 +833,20 @@ func (s *session) stop() {
func (s *session) gc() {
var (
wQ chan interface{}
conn Connection
)
s.lock.Lock()
if s.attrs != nil {
s.attrs = nil
if s.wQ != nil {
wQ = s.wQ
s.wQ = nil
}
conn = s.Connection
s.Connection = nil
}
s.lock.Unlock()
go func() {
if wQ != nil {
conn.close((int)((int64)(s.wait)))
close(wQ)
if conn != nil {
conn.close(int(s.wait))
}
}()
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment