Unverified Commit a68aa365 authored by Xin.Zh's avatar Xin.Zh Committed by GitHub

Merge pull request #54 from apache/feature/delet_wq

Rem: session.wQ
parents ff084291 020cac6d
...@@ -24,7 +24,7 @@ jobs: ...@@ -24,7 +24,7 @@ jobs:
# DING_SIGN: SECbcc50d56d7315e57da8469d05da306d6cd825348a781861a42084e9579f1aebb # DING_SIGN: SECbcc50d56d7315e57da8469d05da306d6cd825348a781861a42084e9579f1aebb
DING_TOKEN: ${{ secrets.DING_TOKEN }} DING_TOKEN: ${{ secrets.DING_TOKEN }}
DING_SIGN: ${{ secrets.DING_SIGN }} DING_SIGN: ${{ secrets.DING_SIGN }}
steps: steps:
- name: Set up Go ${{ matrix.go_version }} - name: Set up Go ${{ matrix.go_version }}
...@@ -59,6 +59,12 @@ jobs: ...@@ -59,6 +59,12 @@ jobs:
go fmt ./... && [[ -z `git status -s` ]] go fmt ./... && [[ -z `git status -s` ]]
/tmp/tools/license/license-header-checker -v -a -r -i vendor /tmp/tools/license/license.txt . go && [[ -z `git status -s` ]] /tmp/tools/license/license-header-checker -v -a -r -i vendor /tmp/tools/license/license.txt . go && [[ -z `git status -s` ]]
- name: Install go ci lint
run: curl -sSfL https://raw.githubusercontent.com/golangci/golangci-lint/master/install.sh | sh -s -- -b $(go env GOPATH)/bin v1.27.0
- name: Run Linter
run: golangci-lint run --timeout=10m -v --disable-all --enable=govet --enable=staticcheck --enable=ineffassign --enable=misspell
- name: Test - name: Test
run: go mod vendor && go test $(go list ./... | grep -v vendor | grep -v demo) -coverprofile=coverage.txt -covermode=atomic run: go mod vendor && go test $(go list ./... | grep -v vendor | grep -v demo) -coverprofile=coverage.txt -covermode=atomic
...@@ -71,7 +77,7 @@ jobs: ...@@ -71,7 +77,7 @@ jobs:
uses: zcong1993/actions-ding@v3.0.1 uses: zcong1993/actions-ding@v3.0.1
# Whether job is successful or not, always () is always true. # Whether job is successful or not, always () is always true.
if: | if: |
always() && always() &&
github.event_name == 'push' && github.event_name == 'push' &&
github.repository == 'apache/dubbo-getty' github.repository == 'apache/dubbo-getty'
with: with:
...@@ -87,11 +93,11 @@ jobs: ...@@ -87,11 +93,11 @@ jobs:
"text": "## Github Actions \n - name: CI \n - repository: ${{ github.repository }} \n - trigger: ${{ github.actor }} \n - event: ${{ github.event_name }} \n - ref: ${{ github.ref }} \n - status: [${{ job.status }}](https://github.com/${{ github.repository }}/actions/runs/${{ github.run_id }}) \n - environment: ${{ runner.os }} \n - SHA: [${{ github.sha }}](${{ github.event.compare }})" "text": "## Github Actions \n - name: CI \n - repository: ${{ github.repository }} \n - trigger: ${{ github.actor }} \n - event: ${{ github.event_name }} \n - ref: ${{ github.ref }} \n - status: [${{ job.status }}](https://github.com/${{ github.repository }}/actions/runs/${{ github.run_id }}) \n - environment: ${{ runner.os }} \n - SHA: [${{ github.sha }}](${{ github.event.compare }})"
} }
} }
- name: DingTalk Message Notify only PR - name: DingTalk Message Notify only PR
uses: zcong1993/actions-ding@v3.0.1 uses: zcong1993/actions-ding@v3.0.1
if: | if: |
always() && always() &&
github.event_name == 'pull_request' && github.event_name == 'pull_request' &&
github.repository == 'apache/dubbo-getty' github.repository == 'apache/dubbo-getty'
with: with:
......
...@@ -126,7 +126,7 @@ func NewWSSClient(opts ...ClientOption) Client { ...@@ -126,7 +126,7 @@ func NewWSSClient(opts ...ClientOption) Client {
c := newClient(WSS_CLIENT, opts...) c := newClient(WSS_CLIENT, opts...)
if c.cert == "" { if c.cert == "" {
panic(fmt.Sprintf("@certs:%s", c.cert)) panic(fmt.Sprintf("@cert:%s", c.cert))
} }
if !strings.HasPrefix(c.addr, "wss://") { if !strings.HasPrefix(c.addr, "wss://") {
panic(fmt.Sprintf("the prefix @serverAddr:%s is not wss://", c.addr)) panic(fmt.Sprintf("the prefix @serverAddr:%s is not wss://", c.addr))
...@@ -135,11 +135,11 @@ func NewWSSClient(opts ...ClientOption) Client { ...@@ -135,11 +135,11 @@ func NewWSSClient(opts ...ClientOption) Client {
return c return c
} }
func (c client) ID() EndPointID { func (c *client) ID() EndPointID {
return c.endPointID return c.endPointID
} }
func (c client) EndPointType() EndPointType { func (c *client) EndPointType() EndPointType {
return c.endPointType return c.endPointType
} }
...@@ -154,7 +154,7 @@ func (c *client) dialTCP() Session { ...@@ -154,7 +154,7 @@ func (c *client) dialTCP() Session {
return nil return nil
} }
if c.sslEnabled { if c.sslEnabled {
if sslConfig, err := c.tlsConfigBuilder.BuildTlsConfig(); err == nil && sslConfig != nil { if sslConfig, buildTlsConfErr := c.tlsConfigBuilder.BuildTlsConfig(); buildTlsConfErr == nil && sslConfig != nil {
d := &net.Dialer{Timeout: connectTimeout} d := &net.Dialer{Timeout: connectTimeout}
conn, err = tls.DialWithDialer(d, "tcp", c.addr, sslConfig) conn, err = tls.DialWithDialer(d, "tcp", c.addr, sslConfig)
} }
...@@ -285,7 +285,7 @@ func (c *client) dialWSS() Session { ...@@ -285,7 +285,7 @@ func (c *client) dialWSS() Session {
if c.cert != "" { if c.cert != "" {
certPEMBlock, err := ioutil.ReadFile(c.cert) certPEMBlock, err := ioutil.ReadFile(c.cert)
if err != nil { if err != nil {
panic(fmt.Sprintf("ioutil.ReadFile(certs:%s) = error:%+v", c.cert, perrors.WithStack(err))) panic(fmt.Sprintf("ioutil.ReadFile(cert:%s) = error:%+v", c.cert, perrors.WithStack(err)))
} }
var cert tls.Certificate var cert tls.Certificate
...@@ -307,7 +307,7 @@ func (c *client) dialWSS() Session { ...@@ -307,7 +307,7 @@ func (c *client) dialWSS() Session {
for _, c := range config.Certificates { for _, c := range config.Certificates {
roots, err = x509.ParseCertificates(c.Certificate[len(c.Certificate)-1]) roots, err = x509.ParseCertificates(c.Certificate[len(c.Certificate)-1])
if err != nil { if err != nil {
panic(fmt.Sprintf("error parsing server's root certs: %+v\n", perrors.WithStack(err))) panic(fmt.Sprintf("error parsing server's root cert: %+v\n", perrors.WithStack(err)))
} }
for _, root = range roots { for _, root = range roots {
certPool.AddCert(root) certPool.AddCert(root)
......
...@@ -84,15 +84,13 @@ func (p *Package) Unmarshal(buf *bytes.Buffer) (int, error) { return 0, nil } ...@@ -84,15 +84,13 @@ func (p *Package) Unmarshal(buf *bytes.Buffer) (int, error) { return 0, nil }
func newSessionCallback(session Session, handler *MessageHandler) error { func newSessionCallback(session Session, handler *MessageHandler) error {
var pkgHandler PackageHandler var pkgHandler PackageHandler
session.SetName("hello-client-session") session.SetName("hello-client-session")
session.SetMaxMsgLen(1024) session.SetMaxMsgLen(128 * 1024) // max message package length 128k
session.SetPkgHandler(&pkgHandler) session.SetPkgHandler(&pkgHandler)
session.SetEventListener(handler) session.SetEventListener(handler)
session.SetWQLen(32)
session.SetReadTimeout(3e9) session.SetReadTimeout(3e9)
session.SetWriteTimeout(3e9) session.SetWriteTimeout(3e9)
session.SetCronPeriod((int)(30e9 / 1e6)) session.SetCronPeriod((int)(30e9 / 1e6))
session.SetWaitTime(3e9) session.SetWaitTime(3e9)
session.SetTaskPool(nil)
return nil return nil
} }
...@@ -295,8 +293,10 @@ func TestNewWSClient(t *testing.T) { ...@@ -295,8 +293,10 @@ func TestNewWSClient(t *testing.T) {
assert.Equal(t, beforeWriteBytes+5, atomic.LoadUint32(&conn.writeBytes)) assert.Equal(t, beforeWriteBytes+5, atomic.LoadUint32(&conn.writeBytes))
beforeWritePkgNum := atomic.LoadUint32(&conn.writePkgNum) beforeWritePkgNum := atomic.LoadUint32(&conn.writePkgNum)
err = ss.WriteBytes([]byte("hello")) err = ss.WriteBytes([]byte("hello"))
assert.Nil(t, err)
assert.Equal(t, beforeWritePkgNum+1, atomic.LoadUint32(&conn.writePkgNum)) assert.Equal(t, beforeWritePkgNum+1, atomic.LoadUint32(&conn.writePkgNum))
err = ss.WriteBytesArray([]byte("hello"), []byte("hello")) err = ss.WriteBytesArray([]byte("hello"), []byte("hello"))
assert.Nil(t, err)
assert.Equal(t, beforeWritePkgNum+3, atomic.LoadUint32(&conn.writePkgNum)) assert.Equal(t, beforeWritePkgNum+3, atomic.LoadUint32(&conn.writePkgNum))
err = conn.writePing() err = conn.writePing()
assert.Nil(t, err) assert.Nil(t, err)
......
...@@ -16,8 +16,6 @@ run server: ...@@ -16,8 +16,6 @@ run server:
Or run server in task pool mode: Or run server in task pool mode:
```bash ```bash
go run tcp/server/server.go -taskPool=true \ go run tcp/server/server.go -taskPool=true \
-task_queue_length=128 \
-task_queue_number=16 \
-task_pool_size=2000 \ -task_pool_size=2000 \
-pprof_port=60000 -pprof_port=60000
``` ```
...@@ -31,8 +29,6 @@ go run tcp/client/client.go ...@@ -31,8 +29,6 @@ go run tcp/client/client.go
Or run client in task pool mode: Or run client in task pool mode:
```bash ```bash
go run tcp/client/client.go -taskPool=true \ go run tcp/client/client.go -taskPool=true \
-task_queue_length=100 \
-task_queue_number=4 \
-task_pool_size=50 \ -task_pool_size=50 \
-pprof_port=60001 -pprof_port=60001
``` ```
......
...@@ -27,13 +27,14 @@ var ( ...@@ -27,13 +27,14 @@ var (
func ClientRequest() { func ClientRequest() {
for _, session := range Sessions { for _, session := range Sessions {
ss := session
go func() { go func() {
echoTimes := 10 echoTimes := 10
for i := 0; i < echoTimes; i++ { for i := 0; i < echoTimes; i++ {
err := session.WritePkg("hello", WritePkgTimeout) err := ss.WritePkg("hello", WritePkgTimeout)
if err != nil { if err != nil {
log.Infof("session.WritePkg(session{%s}, error{%v}", session.Stat(), err) log.Infof("session.WritePkg(session{%s}, error{%v}", ss.Stat(), err)
session.Close() ss.Close()
} }
} }
log.Infof("after loop %d times", echoTimes) log.Infof("after loop %d times", echoTimes)
......
...@@ -36,11 +36,9 @@ var ( ...@@ -36,11 +36,9 @@ var (
ip = flag.String("ip", "127.0.0.1", "server IP") ip = flag.String("ip", "127.0.0.1", "server IP")
connections = flag.Int("conn", 1, "number of tcp connections") connections = flag.Int("conn", 1, "number of tcp connections")
taskPoolMode = flag.Bool("taskPool", false, "task pool mode") taskPoolMode = flag.Bool("taskPool", false, "task pool mode")
taskPoolQueueLength = flag.Int("task_queue_length", 100, "task queue length") taskPoolSize = flag.Int("task_pool_size", 2000, "task poll size")
taskPoolQueueNumber = flag.Int("task_queue_number", 4, "task queue number") pprofPort = flag.Int("pprof_port", 65431, "pprof http port")
taskPoolSize = flag.Int("task_pool_size", 2000, "task poll size")
pprofPort = flag.Int("pprof_port", 65431, "pprof http port")
) )
var ( var (
...@@ -55,16 +53,13 @@ func main() { ...@@ -55,16 +53,13 @@ func main() {
util.Profiling(*pprofPort) util.Profiling(*pprofPort)
if *taskPoolMode { if *taskPoolMode {
taskPool = gxsync.NewTaskPool( taskPool = gxsync.NewTaskPoolSimple(*taskPoolSize)
gxsync.WithTaskPoolTaskQueueLength(*taskPoolQueueLength),
gxsync.WithTaskPoolTaskQueueNumber(*taskPoolQueueNumber),
gxsync.WithTaskPoolTaskPoolSize(*taskPoolSize),
)
} }
client := getty.NewTCPClient( client := getty.NewTCPClient(
getty.WithServerAddress(*ip+":8090"), getty.WithServerAddress(*ip+":8090"),
getty.WithConnectionNumber(*connections), getty.WithConnectionNumber(*connections),
getty.WithClientTaskPool(taskPool),
) )
client.RunEventLoop(NewHelloClientSession) client.RunEventLoop(NewHelloClientSession)
......
...@@ -62,8 +62,7 @@ func InitialSession(session getty.Session) (err error) { ...@@ -62,8 +62,7 @@ func InitialSession(session getty.Session) (err error) {
} }
session.SetName("hello") session.SetName("hello")
session.SetMaxMsgLen(128) session.SetMaxMsgLen(128 * 1024) // max message package length is 128k
session.SetWQLen(512)
session.SetReadTimeout(time.Second) session.SetReadTimeout(time.Second)
session.SetWriteTimeout(5 * time.Second) session.SetWriteTimeout(5 * time.Second)
session.SetCronPeriod(int(hello.CronPeriod / 1e6)) session.SetCronPeriod(int(hello.CronPeriod / 1e6))
......
...@@ -32,11 +32,9 @@ import ( ...@@ -32,11 +32,9 @@ import (
) )
var ( var (
taskPoolMode = flag.Bool("taskPool", false, "task pool mode") taskPoolMode = flag.Bool("taskPool", false, "task pool mode")
taskPoolQueueLength = flag.Int("task_queue_length", 100, "task queue length") taskPoolSize = flag.Int("task_pool_size", 2000, "task poll size")
taskPoolQueueNumber = flag.Int("task_queue_number", 4, "task queue number") pprofPort = flag.Int("pprof_port", 65432, "pprof http port")
taskPoolSize = flag.Int("task_pool_size", 2000, "task poll size")
pprofPort = flag.Int("pprof_port", 65432, "pprof http port")
) )
var ( var (
...@@ -53,11 +51,8 @@ func main() { ...@@ -53,11 +51,8 @@ func main() {
options := []getty.ServerOption{getty.WithLocalAddress(":8090")} options := []getty.ServerOption{getty.WithLocalAddress(":8090")}
if *taskPoolMode { if *taskPoolMode {
taskPool = gxsync.NewTaskPool( taskPool = gxsync.NewTaskPoolSimple(*taskPoolSize)
gxsync.WithTaskPoolTaskQueueLength(*taskPoolQueueLength), options = append(options, getty.WithServerTaskPool(taskPool))
gxsync.WithTaskPoolTaskQueueNumber(*taskPoolQueueNumber),
gxsync.WithTaskPoolTaskPoolSize(*taskPoolSize),
)
} }
server := getty.NewTCPServer(options...) server := getty.NewTCPServer(options...)
......
...@@ -37,15 +37,13 @@ var ( ...@@ -37,15 +37,13 @@ var (
ip = flag.String("ip", "127.0.0.1", "server IP") ip = flag.String("ip", "127.0.0.1", "server IP")
connections = flag.Int("conn", 1, "number of tcp connections") connections = flag.Int("conn", 1, "number of tcp connections")
taskPoolMode = flag.Bool("taskPool", false, "task pool mode") taskPoolMode = flag.Bool("taskPool", false, "task pool mode")
taskPoolQueueLength = flag.Int("task_queue_length", 100, "task queue length") taskPoolSize = flag.Int("task_pool_size", 2000, "task poll size")
taskPoolQueueNumber = flag.Int("task_queue_number", 4, "task queue number") pprofPort = flag.Int("pprof_port", 65431, "pprof http port")
taskPoolSize = flag.Int("task_pool_size", 2000, "task poll size")
pprofPort = flag.Int("pprof_port", 65431, "pprof http port")
) )
var ( var (
taskPool *gxsync.TaskPool taskPool gxsync.GenericTaskPool
) )
func main() { func main() {
...@@ -56,11 +54,7 @@ func main() { ...@@ -56,11 +54,7 @@ func main() {
util.Profiling(*pprofPort) util.Profiling(*pprofPort)
if *taskPoolMode { if *taskPoolMode {
taskPool = gxsync.NewTaskPool( taskPool = gxsync.NewTaskPoolSimple(*taskPoolSize)
gxsync.WithTaskPoolTaskQueueLength(*taskPoolQueueLength),
gxsync.WithTaskPoolTaskQueueNumber(*taskPoolQueueNumber),
gxsync.WithTaskPoolTaskPoolSize(*taskPoolSize),
)
} }
keyPath, _ := filepath.Abs("./demo/hello/tls/certs/ca.key") keyPath, _ := filepath.Abs("./demo/hello/tls/certs/ca.key")
caPemPath, _ := filepath.Abs("./demo/hello/tls/certs/ca.pem") caPemPath, _ := filepath.Abs("./demo/hello/tls/certs/ca.pem")
...@@ -74,6 +68,7 @@ func main() { ...@@ -74,6 +68,7 @@ func main() {
getty.WithClientSslEnabled(true), getty.WithClientSslEnabled(true),
getty.WithClientTlsConfigBuilder(config), getty.WithClientTlsConfigBuilder(config),
getty.WithConnectionNumber(*connections), getty.WithConnectionNumber(*connections),
getty.WithClientTaskPool(taskPool),
) )
client.RunEventLoop(NewHelloClientSession) client.RunEventLoop(NewHelloClientSession)
......
...@@ -42,8 +42,7 @@ func InitialSession(session getty.Session) (err error) { ...@@ -42,8 +42,7 @@ func InitialSession(session getty.Session) (err error) {
_, ok := session.Conn().(*tls.Conn) _, ok := session.Conn().(*tls.Conn)
if ok { if ok {
session.SetName("hello") session.SetName("hello")
session.SetMaxMsgLen(128) session.SetMaxMsgLen(128 * 1024) // max message package length is 128k
session.SetWQLen(512)
session.SetReadTimeout(time.Second) session.SetReadTimeout(time.Second)
session.SetWriteTimeout(5 * time.Second) session.SetWriteTimeout(5 * time.Second)
session.SetCronPeriod(int(hello.CronPeriod / 1e6)) session.SetCronPeriod(int(hello.CronPeriod / 1e6))
......
...@@ -33,16 +33,14 @@ import ( ...@@ -33,16 +33,14 @@ import (
) )
var ( var (
taskPoolMode = flag.Bool("taskPool", false, "task pool mode") taskPoolMode = flag.Bool("taskPool", false, "task pool mode")
taskPoolQueueLength = flag.Int("task_queue_length", 100, "task queue length") taskPoolSize = flag.Int("task_pool_size", 2000, "task poll size")
taskPoolQueueNumber = flag.Int("task_queue_number", 4, "task queue number") pprofPort = flag.Int("pprof_port", 65432, "pprof http port")
taskPoolSize = flag.Int("task_pool_size", 2000, "task poll size") Sessions []getty.Session
pprofPort = flag.Int("pprof_port", 65432, "pprof http port")
Sessions []getty.Session
) )
var ( var (
taskPool *gxsync.TaskPool taskPool gxsync.GenericTaskPool
) )
func main() { func main() {
...@@ -61,17 +59,13 @@ func main() { ...@@ -61,17 +59,13 @@ func main() {
ServerTrustCertCollectionPath: caPemPath, ServerTrustCertCollectionPath: caPemPath,
} }
if *taskPoolMode {
taskPool = gxsync.NewTaskPoolSimple(*taskPoolSize)
}
options := []getty.ServerOption{getty.WithLocalAddress(":8090"), options := []getty.ServerOption{getty.WithLocalAddress(":8090"),
getty.WithServerSslEnabled(true), getty.WithServerSslEnabled(true),
getty.WithServerTlsConfigBuilder(c), getty.WithServerTlsConfigBuilder(c),
} getty.WithServerTaskPool(taskPool),
if *taskPoolMode {
taskPool = gxsync.NewTaskPool(
gxsync.WithTaskPoolTaskQueueLength(*taskPoolQueueLength),
gxsync.WithTaskPoolTaskQueueNumber(*taskPoolQueueNumber),
gxsync.WithTaskPoolTaskPoolSize(*taskPoolSize),
)
} }
server := getty.NewTCPServer(options...) server := getty.NewTCPServer(options...)
...@@ -86,7 +80,6 @@ func NewHelloServerSession(session getty.Session) (err error) { ...@@ -86,7 +80,6 @@ func NewHelloServerSession(session getty.Session) (err error) {
if err != nil { if err != nil {
return return
} }
session.SetTaskPool(taskPool)
return return
} }
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package util
func SetLimit() {
}
...@@ -29,7 +29,7 @@ type Closer interface { ...@@ -29,7 +29,7 @@ type Closer interface {
func WaitCloseSignals(closer Closer) { func WaitCloseSignals(closer Closer) {
signals := make(chan os.Signal, 1) signals := make(chan os.Signal, 1)
signal.Notify(signals, os.Interrupt, os.Kill, syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT) signal.Notify(signals, os.Interrupt, syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT)
<-signals <-signals
closer.Close() closer.Close()
} }
...@@ -163,10 +163,7 @@ type Session interface { ...@@ -163,10 +163,7 @@ type Session interface {
SetWriter(Writer) SetWriter(Writer)
SetCronPeriod(int) SetCronPeriod(int)
SetWQLen(int)
SetWaitTime(time.Duration) SetWaitTime(time.Duration)
// Deprecated: don't use SetTaskPool, move to endpoints layer.
SetTaskPool(*gxsync.TaskPool)
GetAttribute(interface{}) interface{} GetAttribute(interface{}) interface{}
SetAttribute(interface{}, interface{}) SetAttribute(interface{}, interface{})
......
...@@ -3,7 +3,7 @@ module github.com/apache/dubbo-getty ...@@ -3,7 +3,7 @@ module github.com/apache/dubbo-getty
go 1.14 go 1.14
require ( require (
github.com/dubbogo/gost v1.9.8 github.com/dubbogo/gost v1.9.9
github.com/golang/snappy v0.0.1 github.com/golang/snappy v0.0.1
github.com/gorilla/websocket v1.4.2 github.com/gorilla/websocket v1.4.2
github.com/pkg/errors v0.9.1 github.com/pkg/errors v0.9.1
......
...@@ -2,12 +2,12 @@ github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ ...@@ -2,12 +2,12 @@ github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/StackExchange/wmi v0.0.0-20190523213315-cbe66965904d h1:G0m3OIz70MZUWq3EgK3CesDbo8upS2Vm9/P3FtgI+Jk= github.com/StackExchange/wmi v0.0.0-20190523213315-cbe66965904d h1:G0m3OIz70MZUWq3EgK3CesDbo8upS2Vm9/P3FtgI+Jk=
github.com/StackExchange/wmi v0.0.0-20190523213315-cbe66965904d/go.mod h1:3eOhrUMpNV+6aFIbp5/iudMxNCF27Vw2OZgy4xEx0Fg= github.com/StackExchange/wmi v0.0.0-20190523213315-cbe66965904d/go.mod h1:3eOhrUMpNV+6aFIbp5/iudMxNCF27Vw2OZgy4xEx0Fg=
github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dubbogo/gost v1.9.8 h1:ciAvb0M0rYh3j7+RZsf4QLyDjWVIW/fZbSBsDgHJA7M= github.com/dubbogo/gost v1.9.9 h1:fOZU5fSpZI3ZBZKTLPmTXtAfPoAdp7H8TcnCSUiIRkc=
github.com/dubbogo/gost v1.9.8/go.mod h1:XfXynl+iquKdvsklRfl/JlqE+i0sOhHWepH9vPdhGOY= github.com/dubbogo/gost v1.9.9/go.mod h1:+mQGS51XQEUWZP2JeGZTxJwipjRKtJO7Tr+FOg+72rI=
github.com/dubbogo/jsonparser v1.0.1/go.mod h1:tYAtpctvSP/tWw4MeelsowSPgXQRVHHWbqL6ynps8jU=
github.com/go-ole/go-ole v1.2.4 h1:nNBDSCOigTSiarFpYE9J/KtEA1IOW4CNeqT9TQDqCxI= github.com/go-ole/go-ole v1.2.4 h1:nNBDSCOigTSiarFpYE9J/KtEA1IOW4CNeqT9TQDqCxI=
github.com/go-ole/go-ole v1.2.4/go.mod h1:XCwSNxSkXRo4vlyPy93sltvi/qJq0jqQhjqQNIwKuxM= github.com/go-ole/go-ole v1.2.4/go.mod h1:XCwSNxSkXRo4vlyPy93sltvi/qJq0jqQhjqQNIwKuxM=
github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4= github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4=
...@@ -25,7 +25,6 @@ github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= ...@@ -25,7 +25,6 @@ github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/mattn/go-colorable v0.1.7/go.mod h1:u6P/XSegPjTcexA+o6vUJrdnUu04hMope9wVRipJSqc= github.com/mattn/go-colorable v0.1.7/go.mod h1:u6P/XSegPjTcexA+o6vUJrdnUu04hMope9wVRipJSqc=
github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU= github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU=
github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
...@@ -36,7 +35,6 @@ github.com/shirou/gopsutil v3.20.11-0.20201116082039-2fb5da2f2449+incompatible h ...@@ -36,7 +35,6 @@ github.com/shirou/gopsutil v3.20.11-0.20201116082039-2fb5da2f2449+incompatible h
github.com/shirou/gopsutil v3.20.11-0.20201116082039-2fb5da2f2449+incompatible/go.mod h1:5b4v6he4MtMOwMlS0TUMTu2PcXUg8+E1lC7eC3UO/RA= github.com/shirou/gopsutil v3.20.11-0.20201116082039-2fb5da2f2449+incompatible/go.mod h1:5b4v6he4MtMOwMlS0TUMTu2PcXUg8+E1lC7eC3UO/RA=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJyk=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd0= github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd0=
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
...@@ -69,12 +67,10 @@ golang.org/x/tools v0.0.0-20191029041327-9cc4af7d6b2c/go.mod h1:b+2E5dAYhXwXZwtn ...@@ -69,12 +67,10 @@ golang.org/x/tools v0.0.0-20191029041327-9cc4af7d6b2c/go.mod h1:b+2E5dAYhXwXZwtn
golang.org/x/tools v0.0.0-20191029190741-b9c20aec41a5 h1:hKsoRgsbwY1NafxrwTs+k64bikrLBkAgPir1TNCj3Zs= golang.org/x/tools v0.0.0-20191029190741-b9c20aec41a5 h1:hKsoRgsbwY1NafxrwTs+k64bikrLBkAgPir1TNCj3Zs=
golang.org/x/tools v0.0.0-20191029190741-b9c20aec41a5/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20191029190741-b9c20aec41a5/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI= gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI=
gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
......
...@@ -35,7 +35,8 @@ type ServerOptions struct { ...@@ -35,7 +35,8 @@ type ServerOptions struct {
cert string cert string
privateKey string privateKey string
caCert string caCert string
tPool gxsync.GenericTaskPool // task queue
tPool gxsync.GenericTaskPool
} }
// @addr server listen address. // @addr server listen address.
...@@ -52,7 +53,7 @@ func WithWebsocketServerPath(path string) ServerOption { ...@@ -52,7 +53,7 @@ func WithWebsocketServerPath(path string) ServerOption {
} }
} }
// @certs: server certificate file // @cert: server certificate file
func WithWebsocketServerCert(cert string) ServerOption { func WithWebsocketServerCert(cert string) ServerOption {
return func(o *ServerOptions) { return func(o *ServerOptions) {
o.cert = cert o.cert = cert
...@@ -66,7 +67,7 @@ func WithWebsocketServerPrivateKey(key string) ServerOption { ...@@ -66,7 +67,7 @@ func WithWebsocketServerPrivateKey(key string) ServerOption {
} }
} }
// @certs is the root certificate file to verify the legitimacy of server // @cert is the root certificate file to verify the legitimacy of server
func WithWebsocketServerRootCert(cert string) ServerOption { func WithWebsocketServerRootCert(cert string) ServerOption {
return func(o *ServerOptions) { return func(o *ServerOptions) {
o.caCert = cert o.caCert = cert
...@@ -109,10 +110,11 @@ type ClientOptions struct { ...@@ -109,10 +110,11 @@ type ClientOptions struct {
sslEnabled bool sslEnabled bool
tlsConfigBuilder TlsConfigBuilder tlsConfigBuilder TlsConfigBuilder
// the certs file of wss server which may contain server domain, server ip, the starting effective date, effective // the cert file of wss server which may contain server domain, server ip, the starting effective date, effective
// duration, the hash alg, the len of the private key. // duration, the hash alg, the len of the private key.
// wss client will use it. // wss client will use it.
cert string cert string
// task queue
tPool gxsync.GenericTaskPool tPool gxsync.GenericTaskPool
} }
......
...@@ -99,27 +99,22 @@ func NewWSSServer(opts ...ServerOption) Server { ...@@ -99,27 +99,22 @@ func NewWSSServer(opts ...ServerOption) Server {
s := newServer(WSS_SERVER, opts...) s := newServer(WSS_SERVER, opts...)
if s.addr == "" || s.cert == "" || s.privateKey == "" { if s.addr == "" || s.cert == "" || s.privateKey == "" {
panic(fmt.Sprintf("@addr:%s, @certs:%s, @privateKey:%s, @caCert:%s", panic(fmt.Sprintf("@addr:%s, @cert:%s, @privateKey:%s, @caCert:%s",
s.addr, s.cert, s.privateKey, s.caCert)) s.addr, s.cert, s.privateKey, s.caCert))
} }
return s return s
} }
func (s server) ID() int32 { func (s *server) ID() int32 {
return s.endPointID return s.endPointID
} }
func (s server) EndPointType() EndPointType { func (s *server) EndPointType() EndPointType {
return s.endPointType return s.endPointType
} }
func (s *server) stop() { func (s *server) stop() {
var (
err error
ctx context.Context
)
select { select {
case <-s.done: case <-s.done:
return return
...@@ -128,12 +123,13 @@ func (s *server) stop() { ...@@ -128,12 +123,13 @@ func (s *server) stop() {
close(s.done) close(s.done)
s.lock.Lock() s.lock.Lock()
if s.server != nil { if s.server != nil {
ctx, _ = context.WithTimeout(context.Background(), serverFastFailTimeout) ctx, cancel := context.WithTimeout(context.Background(), serverFastFailTimeout)
if err = s.server.Shutdown(ctx); err != nil { if err := s.server.Shutdown(ctx); err != nil {
// if the log output is "shutdown ctx: context deadline exceeded", it means that // if the log output is "shutdown ctx: context deadline exceeded", it means that
// there are still some active connections. // there are still some active connections.
log.Errorf("server shutdown ctx:%s error:%v", ctx, err) log.Errorf("server shutdown ctx:%s error:%v", ctx, err)
} }
cancel()
} }
s.server = nil s.server = nil
s.lock.Unlock() s.lock.Unlock()
...@@ -179,7 +175,7 @@ func (s *server) listenTCP() error { ...@@ -179,7 +175,7 @@ func (s *server) listenTCP() error {
} }
} else { } else {
if s.sslEnabled { if s.sslEnabled {
if sslConfig, err := s.tlsConfigBuilder.BuildTlsConfig(); err == nil && sslConfig != nil { if sslConfig, buildTlsConfErr := s.tlsConfigBuilder.BuildTlsConfig(); buildTlsConfErr == nil && sslConfig != nil {
streamListener, err = tls.Listen("tcp", s.addr, sslConfig) streamListener, err = tls.Listen("tcp", s.addr, sslConfig)
} }
} else { } else {
...@@ -420,7 +416,6 @@ func (s *server) runWSSEventLoop(newSession NewSessionCallback) { ...@@ -420,7 +416,6 @@ func (s *server) runWSSEventLoop(newSession NewSessionCallback) {
if certificate, err = tls.LoadX509KeyPair(s.cert, s.privateKey); err != nil { if certificate, err = tls.LoadX509KeyPair(s.cert, s.privateKey); err != nil {
panic(fmt.Sprintf("tls.LoadX509KeyPair(certs{%s}, privateKey{%s}) = err:%+v", panic(fmt.Sprintf("tls.LoadX509KeyPair(certs{%s}, privateKey{%s}) = err:%+v",
s.cert, s.privateKey, perrors.WithStack(err))) s.cert, s.privateKey, perrors.WithStack(err)))
return
} }
config = &tls.Config{ config = &tls.Config{
InsecureSkipVerify: true, // do not verify peer certs InsecureSkipVerify: true, // do not verify peer certs
......
...@@ -19,6 +19,7 @@ package getty ...@@ -19,6 +19,7 @@ package getty
import ( import (
"bytes" "bytes"
"context"
"fmt" "fmt"
"io" "io"
"net" "net"
...@@ -31,7 +32,6 @@ import ( ...@@ -31,7 +32,6 @@ import (
import ( import (
gxbytes "github.com/dubbogo/gost/bytes" gxbytes "github.com/dubbogo/gost/bytes"
gxcontext "github.com/dubbogo/gost/context" gxcontext "github.com/dubbogo/gost/context"
gxsync "github.com/dubbogo/gost/sync"
gxtime "github.com/dubbogo/gost/time" gxtime "github.com/dubbogo/gost/time"
"github.com/gorilla/websocket" "github.com/gorilla/websocket"
...@@ -43,9 +43,7 @@ const ( ...@@ -43,9 +43,7 @@ const (
netIOTimeout = 1e9 // 1s netIOTimeout = 1e9 // 1s
period = 60 * 1e9 // 1 minute period = 60 * 1e9 // 1 minute
pendingDuration = 3e9 pendingDuration = 3e9
defaultQLen = 1024 // MaxWheelTimeSpan 900s, 15 minute
maxIovecNum = 10
//MaxWheelTimeSpan 900s, 15 minute
MaxWheelTimeSpan = 900e9 MaxWheelTimeSpan = 900e9
defaultSessionName = "session" defaultSessionName = "session"
...@@ -88,13 +86,8 @@ type session struct { ...@@ -88,13 +86,8 @@ type session struct {
reader Reader // @reader should be nil when @conn is a gettyWSConn object. reader Reader // @reader should be nil when @conn is a gettyWSConn object.
writer Writer writer Writer
// write
wQ chan interface{}
// handle logic // handle logic
maxMsgLen int32 maxMsgLen int32
// Deprecated: don't use tPool, move to endpoints layer.
tPool *gxsync.TaskPool
// heartbeat // heartbeat
period time.Duration period time.Duration
...@@ -128,7 +121,7 @@ func newSession(endPoint EndPoint, conn Connection) *session { ...@@ -128,7 +121,7 @@ func newSession(endPoint EndPoint, conn Connection) *session {
once: &sync.Once{}, once: &sync.Once{},
done: make(chan struct{}), done: make(chan struct{}),
wait: pendingDuration, wait: pendingDuration,
attrs: gxcontext.NewValuesContext(nil), attrs: gxcontext.NewValuesContext(context.Background()),
rDone: make(chan struct{}), rDone: make(chan struct{}),
} }
...@@ -170,7 +163,7 @@ func (s *session) Reset() { ...@@ -170,7 +163,7 @@ func (s *session) Reset() {
done: make(chan struct{}), done: make(chan struct{}),
period: period, period: period,
wait: pendingDuration, wait: pendingDuration,
attrs: gxcontext.NewValuesContext(nil), attrs: gxcontext.NewValuesContext(context.Background()),
rDone: make(chan struct{}), rDone: make(chan struct{}),
} }
} }
...@@ -299,18 +292,6 @@ func (s *session) SetCronPeriod(period int) { ...@@ -299,18 +292,6 @@ func (s *session) SetCronPeriod(period int) {
s.period = time.Duration(period) * time.Millisecond s.period = time.Duration(period) * time.Millisecond
} }
// set @session's Write queue size
func (s *session) SetWQLen(writeQLen int) {
if writeQLen < 1 {
panic("@writeQLen < 1")
}
s.lock.Lock()
defer s.lock.Unlock()
s.wQ = make(chan interface{}, writeQLen)
log.Debugf("%s, [session.SetWQLen] wQ{len:%d, cap:%d}", s.Stat(), len(s.wQ), cap(s.wQ))
}
// set maximum wait time when session got error or got exit signal // set maximum wait time when session got error or got exit signal
func (s *session) SetWaitTime(waitTime time.Duration) { func (s *session) SetWaitTime(waitTime time.Duration) {
if waitTime < 1 { if waitTime < 1 {
...@@ -322,14 +303,6 @@ func (s *session) SetWaitTime(waitTime time.Duration) { ...@@ -322,14 +303,6 @@ func (s *session) SetWaitTime(waitTime time.Duration) {
s.wait = waitTime s.wait = waitTime
} }
// Deprecated: set task pool
func (s *session) SetTaskPool(p *gxsync.TaskPool) {
s.lock.Lock()
defer s.lock.Unlock()
s.tPool = p
}
// set attribute of key @session:key // set attribute of key @session:key
func (s *session) GetAttribute(key interface{}) interface{} { func (s *session) GetAttribute(key interface{}) interface{} {
s.lock.RLock() s.lock.RLock()
...@@ -391,38 +364,30 @@ func (s *session) WritePkg(pkg interface{}, timeout time.Duration) error { ...@@ -391,38 +364,30 @@ func (s *session) WritePkg(pkg interface{}, timeout time.Duration) error {
} }
}() }()
if timeout <= 0 { pkgBytes, err := s.writer.Write(s, pkg)
pkgBytes, err := s.writer.Write(s, pkg) if err != nil {
if err != nil { log.Warnf("%s, [session.WritePkg] session.writer.Write(@pkg:%#v) = error:%+v", s.Stat(), pkg, err)
log.Warnf("%s, [session.WritePkg] session.writer.Write(@pkg:%#v) = error:%+v", s.Stat(), pkg, err) return perrors.WithStack(err)
return perrors.WithStack(err)
}
var udpCtxPtr *UDPContext
if udpCtx, ok := pkg.(UDPContext); ok {
udpCtxPtr = &udpCtx
} else if udpCtxP, ok := pkg.(*UDPContext); ok {
udpCtxPtr = udpCtxP
}
if udpCtxPtr != nil {
udpCtxPtr.Pkg = pkgBytes
pkg = *udpCtxPtr
} else {
pkg = pkgBytes
}
_, err = s.Connection.send(pkg)
if err != nil {
log.Warnf("%s, [session.WritePkg] @s.Connection.Write(pkg:%#v) = err:%+v", s.Stat(), pkg, err)
return perrors.WithStack(err)
}
return nil
} }
select { var udpCtxPtr *UDPContext
case s.wQ <- pkg: if udpCtx, ok := pkg.(UDPContext); ok {
break // for possible gen a new pkg udpCtxPtr = &udpCtx
} else if udpCtxP, ok := pkg.(*UDPContext); ok {
case <-wheel.After(timeout): udpCtxPtr = udpCtxP
log.Warnf("%s, [session.WritePkg] wQ{len:%d, cap:%d}", s.Stat(), len(s.wQ), cap(s.wQ)) }
return ErrSessionBlocked if udpCtxPtr != nil {
udpCtxPtr.Pkg = pkgBytes
pkg = *udpCtxPtr
} else {
pkg = pkgBytes
}
if 0 < timeout {
s.Connection.SetWriteTimeout(timeout)
}
_, err = s.Connection.send(pkg)
if err != nil {
log.Warnf("%s, [session.WritePkg] @s.Connection.Write(pkg:%#v) = err:%+v", s.Stat(), pkg, err)
return perrors.WithStack(err)
} }
return nil return nil
...@@ -446,9 +411,7 @@ func (s *session) WriteBytesArray(pkgs ...[]byte) error { ...@@ -446,9 +411,7 @@ func (s *session) WriteBytesArray(pkgs ...[]byte) error {
if s.IsClosed() { if s.IsClosed() {
return ErrSessionClosed return ErrSessionClosed
} }
// s.conn.SetWriteTimeout(time.Now().Add(s.wTimeout))
if len(pkgs) == 1 { if len(pkgs) == 1 {
// return s.Connection.Write(pkgs[0])
return s.WriteBytes(pkgs[0]) return s.WriteBytes(pkgs[0])
} }
...@@ -506,10 +469,6 @@ func (s *session) run() { ...@@ -506,10 +469,6 @@ func (s *session) run() {
panic(errStr) panic(errStr)
} }
if s.wQ == nil {
s.wQ = make(chan interface{}, defaultQLen)
}
// call session opened // call session opened
s.UpdateActive() s.UpdateActive()
if err := s.listener.OnOpen(s); err != nil { if err := s.listener.OnOpen(s); err != nil {
...@@ -526,17 +485,9 @@ func (s *session) run() { ...@@ -526,17 +485,9 @@ func (s *session) run() {
func (s *session) handleLoop() { func (s *session) handleLoop() {
var ( var (
err error wsFlag bool
ok bool wsConn *gettyWSConn
flag bool counter gxtime.CountWatch
wsFlag bool
udpFlag bool
loopFlag bool
wsConn *gettyWSConn
counter gxtime.CountWatch
outPkg interface{}
pkgBytes []byte
iovec [][]byte
) )
defer func() { defer func() {
...@@ -553,96 +504,27 @@ func (s *session) handleLoop() { ...@@ -553,96 +504,27 @@ func (s *session) handleLoop() {
s.gc() s.gc()
}() }()
flag = true // do not do any read/Write/cron operation while got Write error
wsConn, wsFlag = s.Connection.(*gettyWSConn) wsConn, wsFlag = s.Connection.(*gettyWSConn)
_, udpFlag = s.Connection.(*gettyUDPConn)
iovec = make([][]byte, 0, maxIovecNum)
LOOP: LOOP:
for { for {
// A select blocks until one of its cases is ready to run.
// It choose one at random if multiple are ready. Otherwise it choose default branch if none is ready.
select { select {
case <-s.done: case <-s.done:
// this case branch assure the (session)handleLoop gr will exit before (session)handlePackage gr. // this case branch assure the (session)handleLoop gr will exit after (session)handlePackage gr.
<-s.rDone <-s.rDone
if len(s.wQ) == 0 {
log.Infof("%s, [session.handleLoop] got done signal. wQ is nil.", s.Stat())
break LOOP
}
counter.Start() counter.Start()
if counter.Count() > s.wait.Nanoseconds() { if counter.Count() > s.wait.Nanoseconds() {
log.Infof("%s, [session.handleLoop] got done signal ", s.Stat()) log.Infof("%s, [session.handleLoop] got done signal ", s.Stat())
break LOOP break LOOP
} }
case outPkg, ok = <-s.wQ:
if !ok {
continue
}
if !flag {
log.Warnf("[session.handleLoop] drop write out package %#v", outPkg)
continue
}
if udpFlag || wsFlag {
err = s.WritePkg(outPkg, 0)
if err != nil {
log.Errorf("%s, [session.handleLoop] = error:%+v", s.sessionToken(), perrors.WithStack(err))
s.stop()
// break LOOP
flag = false
}
continue
}
iovec = iovec[:0]
for idx := 0; idx < maxIovecNum; idx++ {
pkgBytes, err = s.writer.Write(s, outPkg)
if err != nil {
log.Errorf("%s, [session.handleLoop] = error:%+v", s.sessionToken(), perrors.WithStack(err))
s.stop()
// break LOOP
flag = false
break
}
iovec = append(iovec, pkgBytes)
if idx < maxIovecNum-1 {
loopFlag = true
select {
case outPkg, ok = <-s.wQ:
if !ok {
loopFlag = false
}
default:
loopFlag = false
break
}
if !loopFlag {
break // break for-idx loop
}
}
}
err = s.WriteBytesArray(iovec[:]...)
if err != nil {
log.Errorf("%s, [session.handleLoop]s.WriteBytesArray(iovec len:%d) = error:%+v",
s.sessionToken(), len(iovec), perrors.WithStack(err))
s.stop()
// break LOOP
flag = false
}
case <-wheel.After(s.period): case <-wheel.After(s.period):
if flag { if wsFlag {
if wsFlag { err := wsConn.writePing()
err := wsConn.writePing() if err != nil {
if err != nil { log.Warnf("wsConn.writePing() = error:%+v", perrors.WithStack(err))
log.Warnf("wsConn.writePing() = error:%+v", perrors.WithStack(err))
}
} }
s.listener.OnCron(s)
} }
s.listener.OnCron(s)
} }
} }
} }
...@@ -943,25 +825,20 @@ func (s *session) stop() { ...@@ -943,25 +825,20 @@ func (s *session) stop() {
func (s *session) gc() { func (s *session) gc() {
var ( var (
wQ chan interface{}
conn Connection conn Connection
) )
s.lock.Lock() s.lock.Lock()
if s.attrs != nil { if s.attrs != nil {
s.attrs = nil s.attrs = nil
if s.wQ != nil {
wQ = s.wQ
s.wQ = nil
}
conn = s.Connection conn = s.Connection
s.Connection = nil
} }
s.lock.Unlock() s.lock.Unlock()
go func() { go func() {
if wQ != nil { if conn != nil {
conn.close((int)((int64)(s.wait))) conn.close(int(s.wait))
close(wQ)
} }
}() }()
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment