Unverified Commit 4a059312 authored by Xin.Zh's avatar Xin.Zh Committed by GitHub

Merge pull request #61 from georgehao/improve/delte-handleLoop

Improve/delte handle loop
parents 37ba9651 d03e90c9
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package main
import (
"encoding/binary"
"errors"
"flag"
"fmt"
"log"
"net"
"sync"
"sync/atomic"
"time"
)
import (
"github.com/apache/dubbo-getty"
"github.com/dubbogo/gost/sync"
"github.com/montanaflynn/stats"
)
var (
concurrency = flag.Int("c", 1, "concurrency")
total = flag.Int("n", 1, "total requests for all clients")
ip = flag.String("ip", "127.0.0.1:8090", "server IP")
connections = flag.Int("conn", 1, "number of tcp connections")
taskPoolMode = flag.Bool("taskPool", false, "task pool mode")
taskPoolSize = flag.Int("task_pool_size", 2000, "task poll size")
pprofPort = flag.Int("pprof_port", 65431, "pprof http port")
)
var taskPool gxsync.GenericTaskPool
const CronPeriod = 20e9
const WritePkgTimeout = 1e8
func main() {
flag.Parse()
n := *concurrency
m := *total / n
log.Printf("Servers: %+v\n\n", *ip)
log.Printf("concurrency: %d\nrequests per client: %d\n\n", n, m)
var wg sync.WaitGroup
wg.Add(n * m)
d := make([][]int64, n, n)
var trans uint64
var transOK uint64
totalT := time.Now().UnixNano()
for i := 0; i < n; i++ {
dt := make([]int64, 0, m)
d = append(d, dt)
go func(ii int) {
client := getty.NewTCPClient(
getty.WithServerAddress(*ip),
getty.WithConnectionNumber(*connections),
getty.WithClientTaskPool(taskPool),
)
var tmpSession getty.Session
var NewHelloClientSession = func(session getty.Session) (err error) {
var pkgHandler = &PackageHandler{}
var EventListener = &MessageHandler{}
EventListener.SessionOnOpen = func(session getty.Session) {
tmpSession = session
}
tcpConn, ok := session.Conn().(*net.TCPConn)
if !ok {
panic(fmt.Sprintf("newSession: %s, session.conn{%#v} is not tcp connection", session.Stat(), session.Conn()))
}
if err = tcpConn.SetNoDelay(true); err != nil {
return err
}
if err = tcpConn.SetKeepAlive(true); err != nil {
return err
}
if err = tcpConn.SetKeepAlivePeriod(10 * time.Second); err != nil {
return err
}
if err = tcpConn.SetReadBuffer(262144); err != nil {
return err
}
if err = tcpConn.SetWriteBuffer(524288); err != nil {
return err
}
session.SetName("hello")
session.SetMaxMsgLen(128 * 1024) // max message package length is 128k
session.SetReadTimeout(time.Second)
session.SetWriteTimeout(5 * time.Second)
session.SetCronPeriod(int(CronPeriod / 1e6))
session.SetWaitTime(time.Second)
session.SetPkgHandler(pkgHandler)
session.SetEventListener(EventListener)
return nil
}
client.RunEventLoop(NewHelloClientSession)
for j := 0; j < m; j++ {
atomic.AddUint64(&trans, 1)
t := time.Now().UnixNano()
msg := buildSendMsg()
_, _, err := tmpSession.WritePkg(msg, WritePkgTimeout)
if err != nil {
log.Printf("Err:session.WritePkg(session{%s}, error{%v}", tmpSession.Stat(), err)
}
atomic.AddUint64(&transOK, 1)
t = time.Now().UnixNano() - t
d[ii] = append(d[ii], t)
wg.Done()
}
client.Close()
}(i)
}
wg.Wait()
totalT = time.Now().UnixNano() - totalT
totalT = totalT / 1000000
log.Printf("took %d ms for %d requests", totalT, n*m)
totalD := make([]int64, 0, n*m)
for _, k := range d {
totalD = append(totalD, k...)
}
totalD2 := make([]float64, 0, n*m)
for _, k := range totalD {
totalD2 = append(totalD2, float64(k))
}
mean, _ := stats.Mean(totalD2)
median, _ := stats.Median(totalD2)
max, _ := stats.Max(totalD2)
min, _ := stats.Min(totalD2)
p99, _ := stats.Percentile(totalD2, 99.9)
log.Printf("sent requests : %d\n", n*m)
log.Printf("received requests : %d\n", atomic.LoadUint64(&trans))
log.Printf("received requests_OK : %d\n", atomic.LoadUint64(&transOK))
log.Printf("throughput (TPS) : %d\n", int64(n*m)*1000/totalT)
log.Printf("mean: %.f ns, median: %.f ns, max: %.f ns, min: %.f ns, p99: %.f ns\n", mean, median, max, min, p99)
log.Printf("mean: %d ms, median: %d ms, max: %d ms, min: %d ms, p99: %d ms\n", int64(mean/1000000), int64(median/1000000), int64(max/1000000), int64(min/1000000), int64(p99/1000000))
}
type MessageHandler struct {
SessionOnOpen func(session getty.Session)
}
func (h *MessageHandler) OnOpen(session getty.Session) error {
log.Printf("OnOpen session{%s} open", session.Stat())
if h.SessionOnOpen != nil {
h.SessionOnOpen(session)
}
return nil
}
func (h *MessageHandler) OnError(session getty.Session, err error) {
log.Printf("OnError session{%s} got error{%v}, will be closed.", session.Stat(), err)
}
func (h *MessageHandler) OnClose(session getty.Session) {
log.Printf("hhf OnClose session{%s} is closing......", session.Stat())
}
func (h *MessageHandler) OnMessage(session getty.Session, pkg interface{}) {
log.Printf("OnMessage....")
s, ok := pkg.(string)
if !ok {
log.Printf("illegal package{%#v}", pkg)
return
}
log.Printf("OnMessage: %s", s)
}
func (h *MessageHandler) OnCron(session getty.Session) {
log.Printf("OnCron....")
}
type PackageHandler struct{}
func (h *PackageHandler) Read(ss getty.Session, data []byte) (interface{}, int, error) {
dataLen := len(data)
if dataLen < 4 {
return nil, 0, nil
}
start := 0
pos := start + 4
pkgLen := int(binary.LittleEndian.Uint32(data[start:pos]))
if dataLen < pos+pkgLen {
return nil, pos + pkgLen, nil
}
start = pos
pos = start + pkgLen
s := string(data[start:pos])
return s, pos, nil
}
func (h *PackageHandler) Write(ss getty.Session, p interface{}) ([]byte, error) {
pkg, ok := p.(string)
if !ok {
log.Printf("illegal pkg:%+v", p)
return nil, errors.New("invalid package")
}
pkgLen := int32(len(pkg))
pkgStreams := make([]byte, 0, 4+len(pkg))
// pkg len
start := 0
pos := start + 4
binary.LittleEndian.PutUint32(pkgStreams[start:pos], uint32(pkgLen))
start = pos
// pkg
pos = start + int(pkgLen)
copy(pkgStreams[start:pos], pkg[:])
return pkgStreams[:pos], nil
}
func buildSendMsg() string {
return "如果扫描程序匹配了一行文本并且没有遇到错误,则 sc.Scan() 方法返回 true 。因此,只有当扫描仪的缓冲区中有一行文本时,才会调用 for 循环的主体。这意味着我们修改后的 CountLines 正确处理没有换行符的情况,并且还处理文件为空的情况。"
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package main
import (
"encoding/binary"
"errors"
"flag"
"fmt"
"log"
"net"
"time"
)
import (
"github.com/apache/dubbo-getty"
"github.com/dubbogo/gost/sync"
)
var (
concurrency = flag.Int("c", 1, "concurrency")
total = flag.Int("n", 1, "total requests for all clients")
ip = flag.String("ip", "127.0.0.1:8090", "server IP")
connections = flag.Int("conn", 1, "number of tcp connections")
taskPoolMode = flag.Bool("taskPool", false, "task pool mode")
taskPoolSize = flag.Int("task_pool_size", 2000, "task poll size")
pprofPort = flag.Int("pprof_port", 65431, "pprof http port")
)
var taskPool gxsync.GenericTaskPool
const CronPeriod = 20e9
const WritePkgTimeout = 1e8
func main() {
flag.Parse()
n := *concurrency
log.Printf("Servers: %+v\n\n", *ip)
for i := 0; i < n; i++ {
go func(ii int) {
client := getty.NewTCPClient(
getty.WithServerAddress(*ip),
getty.WithConnectionNumber(*connections),
getty.WithClientTaskPool(taskPool),
)
var tmpSession getty.Session
var NewHelloClientSession = func(session getty.Session) (err error) {
var pkgHandler = &PackageHandler{}
var EventListener = &MessageHandler{}
EventListener.SessionOnOpen = func(session getty.Session) {
tmpSession = session
}
tcpConn, ok := session.Conn().(*net.TCPConn)
if !ok {
panic(fmt.Sprintf("newSession: %s, session.conn{%#v} is not tcp connection", session.Stat(), session.Conn()))
}
if err = tcpConn.SetNoDelay(true); err != nil {
return err
}
if err = tcpConn.SetKeepAlive(true); err != nil {
return err
}
if err = tcpConn.SetKeepAlivePeriod(10 * time.Second); err != nil {
return err
}
if err = tcpConn.SetReadBuffer(262144); err != nil {
return err
}
if err = tcpConn.SetWriteBuffer(524288); err != nil {
return err
}
session.SetName("hello")
session.SetMaxMsgLen(128 * 1024) // max message package length is 128k
session.SetReadTimeout(time.Second)
session.SetWriteTimeout(5 * time.Second)
session.SetCronPeriod(int(CronPeriod / 1e6))
session.SetWaitTime(time.Second)
session.SetPkgHandler(pkgHandler)
session.SetEventListener(EventListener)
return nil
}
client.RunEventLoop(NewHelloClientSession)
for {
msg := buildSendMsg()
_, _, err := tmpSession.WritePkg(msg, WritePkgTimeout)
if err != nil {
log.Printf("Err:session.WritePkg(session{%s}, error{%v}", tmpSession.Stat(), err)
tmpSession.Close()
}
}
client.Close()
}(i)
}
c := make(chan int)
<-c
}
type MessageHandler struct {
SessionOnOpen func(session getty.Session)
}
func (h *MessageHandler) OnOpen(session getty.Session) error {
log.Printf("OnOpen session{%s} open", session.Stat())
if h.SessionOnOpen != nil {
h.SessionOnOpen(session)
}
return nil
}
func (h *MessageHandler) OnError(session getty.Session, err error) {
log.Printf("OnError session{%s} got error{%v}, will be closed.", session.Stat(), err)
}
func (h *MessageHandler) OnClose(session getty.Session) {
log.Printf("hhf OnClose session{%s} is closing......", session.Stat())
}
func (h *MessageHandler) OnMessage(session getty.Session, pkg interface{}) {
log.Printf("OnMessage....")
s, ok := pkg.(string)
if !ok {
log.Printf("illegal package{%#v}", pkg)
return
}
log.Printf("OnMessage: %s", s)
}
func (h *MessageHandler) OnCron(session getty.Session) {
log.Printf("OnCron....")
}
type PackageHandler struct{}
func (h *PackageHandler) Read(ss getty.Session, data []byte) (interface{}, int, error) {
dataLen := len(data)
if dataLen < 4 {
return nil, 0, nil
}
start := 0
pos := start + 4
pkgLen := int(binary.LittleEndian.Uint32(data[start:pos]))
if dataLen < pos+pkgLen {
return nil, pos + pkgLen, nil
}
start = pos
pos = start + pkgLen
s := string(data[start:pos])
return s, pos, nil
}
func (h *PackageHandler) Write(ss getty.Session, p interface{}) ([]byte, error) {
pkg, ok := p.(string)
if !ok {
log.Printf("illegal pkg:%+v", p)
return nil, errors.New("invalid package")
}
pkgLen := int32(len(pkg))
pkgStreams := make([]byte, 0, 4+len(pkg))
// pkg len
start := 0
pos := start + 4
binary.LittleEndian.PutUint32(pkgStreams[start:pos], uint32(pkgLen))
start = pos
// pkg
pos = start + int(pkgLen)
copy(pkgStreams[start:pos], pkg[:])
return pkgStreams[:pos], nil
}
func buildSendMsg() string {
return "如果扫描程序匹配了一行文本并且没有遇到错误,则 sc.Scan() 方法返回 true 。因此,只有当扫描仪的缓冲区中有一行文本时,才会调用 for 循环的主体。这意味着我们修改后的 CountLines 正确处理没有换行符的情况,并且还处理文件为空的情况。"
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package main
import (
"encoding/binary"
"errors"
"flag"
"fmt"
"log"
"net"
"net/http"
_ "net/http/pprof"
"os"
"os/signal"
"syscall"
"time"
)
import (
"github.com/apache/dubbo-getty"
gxsync "github.com/dubbogo/gost/sync"
)
var (
taskPoolMode = flag.Bool("taskPool", false, "task pool mode")
taskPoolSize = flag.Int("task_pool_size", 2000, "task poll size")
pprofPort = flag.Int("pprof_port", 65432, "pprof http port")
)
var (
taskPool gxsync.GenericTaskPool
)
const CronPeriod = 20e9
func main() {
flag.Parse()
go func() {
http.ListenAndServe(fmt.Sprintf(":%d", *pprofPort), nil)
}()
options := []getty.ServerOption{getty.WithLocalAddress(":8090")}
if *taskPoolMode {
taskPool = gxsync.NewTaskPoolSimple(*taskPoolSize)
options = append(options, getty.WithServerTaskPool(taskPool))
}
server := getty.NewTCPServer(options...)
go server.RunEventLoop(NewHelloServerSession)
log.Printf("getty server start, listening at: 8090")
signals := make(chan os.Signal, 1)
signal.Notify(signals, os.Interrupt, syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT)
<-signals
server.Close()
}
func NewHelloServerSession(session getty.Session) (err error) {
var pkgHandler = &PackageHandler{}
var EventListener = &MessageHandler{}
tcpConn, ok := session.Conn().(*net.TCPConn)
if !ok {
panic(fmt.Sprintf("newSession: %s, session.conn{%#v} is not tcp connection", session.Stat(), session.Conn()))
}
if err = tcpConn.SetNoDelay(true); err != nil {
return err
}
if err = tcpConn.SetKeepAlive(true); err != nil {
return err
}
if err = tcpConn.SetKeepAlivePeriod(10 * time.Second); err != nil {
return err
}
if err = tcpConn.SetReadBuffer(262144); err != nil {
return err
}
if err = tcpConn.SetWriteBuffer(524288); err != nil {
return err
}
session.SetName("hello")
session.SetMaxMsgLen(128 * 1024) // max message package length is 128k
session.SetReadTimeout(time.Second)
session.SetWriteTimeout(5 * time.Second)
session.SetCronPeriod(int(CronPeriod / 1e6))
session.SetWaitTime(time.Second)
session.SetPkgHandler(pkgHandler)
session.SetEventListener(EventListener)
return nil
}
type MessageHandler struct {
SessionOnOpen func(session getty.Session)
}
func (h *MessageHandler) OnOpen(session getty.Session) error {
log.Printf("OnOpen session{%s} open", session.Stat())
if h.SessionOnOpen != nil {
h.SessionOnOpen(session)
}
return nil
}
func (h *MessageHandler) OnError(session getty.Session, err error) {
log.Printf("OnError session{%s} got error{%v}, will be closed.", session.Stat(), err)
}
func (h *MessageHandler) OnClose(session getty.Session) {
log.Printf("OnClose session{%s} is closing......", session.Stat())
}
func (h *MessageHandler) OnMessage(session getty.Session, pkg interface{}) {
log.Printf("OnMessage....")
//s, ok := pkg.(string)
//if !ok {
// log.Printf("illegal package{%#v}", pkg)
// return
//}
//log.Printf("OnMessage: %s", s)
}
func (h *MessageHandler) OnCron(session getty.Session) {
log.Printf("OnCron....")
}
type PackageHandler struct{}
func (h *PackageHandler) Read(ss getty.Session, data []byte) (interface{}, int, error) {
dataLen := len(data)
if dataLen < 4 {
return nil, 0, nil
}
start := 0
pos := start + 4
pkgLen := int(binary.LittleEndian.Uint32(data[start:pos]))
if dataLen < pos+pkgLen {
return nil, pos + pkgLen, nil
}
start = pos
pos = start + pkgLen
s := string(data[start:pos])
return s, pos, nil
}
func (h *PackageHandler) Write(ss getty.Session, p interface{}) ([]byte, error) {
pkg, ok := p.(string)
if !ok {
log.Printf("illegal pkg:%+v", p)
return nil, errors.New("invalid package")
}
pkgLen := int32(len(pkg))
pkgStreams := make([]byte, 0, 4+len(pkg))
// pkg len
start := 0
pos := start + 4
binary.LittleEndian.PutUint32(pkgStreams[start:pos], uint32(pkgLen))
start = pos
// pkg
pos = start + int(pkgLen)
copy(pkgStreams[start:pos], pkg[:])
return pkgStreams[:pos], nil
}
...@@ -301,7 +301,7 @@ func (t *gettyTCPConn) send(pkg interface{}) (int, error) { ...@@ -301,7 +301,7 @@ func (t *gettyTCPConn) send(pkg interface{}) (int, error) {
t.writeBytes.Add((uint32)(len(p))) t.writeBytes.Add((uint32)(len(p)))
t.writePkgNum.Add(1) t.writePkgNum.Add(1)
} }
log.Debugf("localAddr: %s, remoteAddr:%s, now:%s, length:%d, err:%s", log.Debugf("localAddr: %s, remoteAddr:%s, now:%s, length:%d, err:%v",
t.conn.LocalAddr(), t.conn.RemoteAddr(), currentTime, length, err) t.conn.LocalAddr(), t.conn.RemoteAddr(), currentTime, length, err)
return length, perrors.WithStack(err) return length, perrors.WithStack(err)
} }
......
...@@ -6,8 +6,9 @@ require ( ...@@ -6,8 +6,9 @@ require (
github.com/dubbogo/gost v1.11.0 github.com/dubbogo/gost v1.11.0
github.com/golang/snappy v0.0.1 github.com/golang/snappy v0.0.1
github.com/gorilla/websocket v1.4.2 github.com/gorilla/websocket v1.4.2
github.com/montanaflynn/stats v0.6.6
github.com/pkg/errors v0.9.1 github.com/pkg/errors v0.9.1
github.com/stretchr/testify v1.6.1 github.com/stretchr/testify v1.7.0
go.uber.org/atomic v1.7.0 go.uber.org/atomic v1.7.0
go.uber.org/zap v1.15.0 go.uber.org/zap v1.16.0
) )
...@@ -29,6 +29,8 @@ github.com/mattn/go-colorable v0.1.7 h1:bQGKb3vps/j0E9GfJQ03JyhRuxsvdAanXlT9BTw3 ...@@ -29,6 +29,8 @@ github.com/mattn/go-colorable v0.1.7 h1:bQGKb3vps/j0E9GfJQ03JyhRuxsvdAanXlT9BTw3
github.com/mattn/go-colorable v0.1.7/go.mod h1:u6P/XSegPjTcexA+o6vUJrdnUu04hMope9wVRipJSqc= github.com/mattn/go-colorable v0.1.7/go.mod h1:u6P/XSegPjTcexA+o6vUJrdnUu04hMope9wVRipJSqc=
github.com/mattn/go-isatty v0.0.12 h1:wuysRhFDzyxgEmMf5xjvJ2M9dZoWAXNNr5LSBS7uHXY= github.com/mattn/go-isatty v0.0.12 h1:wuysRhFDzyxgEmMf5xjvJ2M9dZoWAXNNr5LSBS7uHXY=
github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU= github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU=
github.com/montanaflynn/stats v0.6.6 h1:Duep6KMIDpY4Yo11iFsvyqJDyfzLF9+sndUKT+v64GQ=
github.com/montanaflynn/stats v0.6.6/go.mod h1:etXPPgVO6n31NxCd9KQUMvCM+ve0ruNzt6R8Bnaayow=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
...@@ -40,8 +42,9 @@ github.com/shirou/gopsutil v3.20.11-0.20201116082039-2fb5da2f2449+incompatible/g ...@@ -40,8 +42,9 @@ github.com/shirou/gopsutil v3.20.11-0.20201116082039-2fb5da2f2449+incompatible/g
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd0=
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
go.uber.org/atomic v1.6.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ= go.uber.org/atomic v1.6.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ=
go.uber.org/atomic v1.7.0 h1:ADUqmZGgLDDfbSL9ZmPxKTybcoEYHgpYfELNoN+7hsw= go.uber.org/atomic v1.7.0 h1:ADUqmZGgLDDfbSL9ZmPxKTybcoEYHgpYfELNoN+7hsw=
go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
...@@ -49,8 +52,8 @@ go.uber.org/multierr v1.5.0 h1:KCa4XfM8CWFCpxXRGok+Q0SS/0XBhMDbHHGABQLvD2A= ...@@ -49,8 +52,8 @@ go.uber.org/multierr v1.5.0 h1:KCa4XfM8CWFCpxXRGok+Q0SS/0XBhMDbHHGABQLvD2A=
go.uber.org/multierr v1.5.0/go.mod h1:FeouvMocqHpRaaGuG9EjoKcStLC43Zu/fmqdUMPcKYU= go.uber.org/multierr v1.5.0/go.mod h1:FeouvMocqHpRaaGuG9EjoKcStLC43Zu/fmqdUMPcKYU=
go.uber.org/tools v0.0.0-20190618225709-2cfd321de3ee h1:0mgffUl7nfd+FpvXMVz4IDEaUSmT1ysygQC7qYo7sG4= go.uber.org/tools v0.0.0-20190618225709-2cfd321de3ee h1:0mgffUl7nfd+FpvXMVz4IDEaUSmT1ysygQC7qYo7sG4=
go.uber.org/tools v0.0.0-20190618225709-2cfd321de3ee/go.mod h1:vJERXedbb3MVM5f9Ejo0C68/HhF8uaILCdgjnY+goOA= go.uber.org/tools v0.0.0-20190618225709-2cfd321de3ee/go.mod h1:vJERXedbb3MVM5f9Ejo0C68/HhF8uaILCdgjnY+goOA=
go.uber.org/zap v1.15.0 h1:ZZCA22JRF2gQE5FoNmhmrf7jeJJ2uhqDUNRYKm8dvmM= go.uber.org/zap v1.16.0 h1:uFRZXykJGK9lLY4HtgSw44DnIcAM+kRBP7x5m+NpAOM=
go.uber.org/zap v1.15.0/go.mod h1:Mb2vm2krFEG5DV0W9qcHBYFtp/Wku1cvYaqPsS/WYfc= go.uber.org/zap v1.16.0/go.mod h1:MA8QOfq0BHJwdXa996Y4dYkAqRKB8/1K1QMMZVaNZjQ=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/lint v0.0.0-20190930215403-16217165b5de h1:5hukYrvBGR8/eNkX5mdUezrA6JiaEZDtJb9Ei+1LlBs= golang.org/x/lint v0.0.0-20190930215403-16217165b5de h1:5hukYrvBGR8/eNkX5mdUezrA6JiaEZDtJb9Ei+1LlBs=
...@@ -76,6 +79,7 @@ gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8 ...@@ -76,6 +79,7 @@ gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI= gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI=
gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
......
...@@ -266,7 +266,7 @@ func (s *server) runTcpEventLoop(newSession NewSessionCallback) { ...@@ -266,7 +266,7 @@ func (s *server) runTcpEventLoop(newSession NewSessionCallback) {
) )
for { for {
if s.IsClosed() { if s.IsClosed() {
log.Warnf("server{%s} stop accepting client connect request.", s.addr) log.Infof("server{%s} stop accepting client connect request.", s.addr)
return return
} }
if delay != 0 { if delay != 0 {
......
...@@ -476,7 +476,6 @@ func sessionTimerLoop(_ gxtime.TimerID, _ time.Time, arg interface{}) error { ...@@ -476,7 +476,6 @@ func sessionTimerLoop(_ gxtime.TimerID, _ time.Time, arg interface{}) error {
taskPool.AddTaskAlways(f) taskPool.AddTaskAlways(f)
return nil return nil
} }
f() f()
return nil return nil
} }
...@@ -500,7 +499,7 @@ func (s *session) run() { ...@@ -500,7 +499,7 @@ func (s *session) run() {
s.grNum.Add(1) s.grNum.Add(1)
if _, err := defaultTimerWheel.AddTimer(sessionTimerLoop, gxtime.TimerLoop, s.period, s); err != nil { if _, err := defaultTimerWheel.AddTimer(sessionTimerLoop, gxtime.TimerLoop, s.period, s); err != nil {
panic(fmt.Sprintf("failed to add session %s to defaultTimerWheel", s.Stat())) panic(fmt.Sprintf("failed to add session %s to defaultTimerWheel err:%v", s.Stat(), err))
} }
// start read gr // start read gr
go s.handlePackage() go s.handlePackage()
...@@ -608,7 +607,7 @@ func (s *session) handleTCPPackage() error { ...@@ -608,7 +607,7 @@ func (s *session) handleTCPPackage() error {
break break
} }
if perrors.Cause(err) == io.EOF { if perrors.Cause(err) == io.EOF {
log.Infof("%s, [session.conn.read] = error:%+v", s.sessionToken(), perrors.WithStack(err)) log.Infof("%s, session.conn read EOF, client send over, session exit", s.sessionToken())
err = nil err = nil
exit = true exit = true
break break
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment