Commit 8b6ec175 authored by AlexStocks's avatar AlexStocks

delete session.handleLoop

parent ef4aaa4a
......@@ -34,6 +34,8 @@ import (
"github.com/dubbogo/gost/bytes"
"github.com/dubbogo/gost/net"
gxsync "github.com/dubbogo/gost/sync"
gxtime "github.com/dubbogo/gost/time"
"github.com/gorilla/websocket"
perrors "github.com/pkg/errors"
)
......@@ -170,7 +172,7 @@ func (c *client) dialTCP() Session {
}
log.Infof("net.DialTimeout(addr:%s, timeout:%v) = error:%+v", c.addr, connectTimeout, perrors.WithStack(err))
<-wheel.After(connectInterval)
<-gxtime.After(connectInterval)
}
}
......@@ -202,7 +204,7 @@ func (c *client) dialUDP() Session {
}
if err != nil {
log.Warnf("net.DialTimeout(addr:%s, timeout:%v) = error:%+v", c.addr, perrors.WithStack(err))
<-wheel.After(connectInterval)
<-gxtime.After(connectInterval)
continue
}
......@@ -211,7 +213,7 @@ func (c *client) dialUDP() Session {
if length, err = conn.Write(connectPingPackage[:]); err != nil {
conn.Close()
log.Warnf("conn.Write(%s) = {length:%d, err:%+v}", string(connectPingPackage), length, perrors.WithStack(err))
<-wheel.After(connectInterval)
<-gxtime.After(connectInterval)
continue
}
conn.SetReadDeadline(time.Now().Add(1e9))
......@@ -222,7 +224,7 @@ func (c *client) dialUDP() Session {
if err != nil {
log.Infof("conn{%#v}.Read() = {length:%d, err:%+v}", conn, length, perrors.WithStack(err))
conn.Close()
<-wheel.After(connectInterval)
<-gxtime.After(connectInterval)
continue
}
//if err == nil {
......@@ -260,7 +262,7 @@ func (c *client) dialWS() Session {
}
log.Infof("websocket.dialer.Dial(addr:%s) = error:%+v", c.addr, perrors.WithStack(err))
<-wheel.After(connectInterval)
<-gxtime.After(connectInterval)
}
}
......@@ -338,7 +340,7 @@ func (c *client) dialWSS() Session {
}
log.Infof("websocket.dialer.Dial(addr:%s) = error:%+v", c.addr, perrors.WithStack(err))
<-wheel.After(connectInterval)
<-gxtime.After(connectInterval)
}
}
......@@ -445,7 +447,7 @@ func (c *client) reConnect() {
if maxTimes < times {
times = maxTimes
}
<-wheel.After(time.Duration(int64(times) * int64(interval)))
<-gxtime.After(time.Duration(int64(times) * int64(interval)))
}
}
......
......@@ -24,7 +24,6 @@ import (
"os"
"strconv"
"sync"
"sync/atomic"
"testing"
"time"
)
......@@ -96,7 +95,6 @@ func newSessionCallback(session Session, handler *MessageHandler) error {
}
func TestTCPClient(t *testing.T) {
assert.NotNil(t, GetTimeWheel())
listenLocalServer := func() (net.Listener, error) {
listener, err := net.Listen("tcp", ":0")
if err != nil {
......@@ -137,27 +135,27 @@ func TestTCPClient(t *testing.T) {
ss.SetCompressType(CompressNone)
conn := ss.(*session).Connection.(*gettyTCPConn)
assert.True(t, conn.compress == CompressNone)
beforeWriteBytes := atomic.LoadUint32(&conn.writeBytes)
beforeWritePkgNum := atomic.LoadUint32(&conn.writePkgNum)
beforeWriteBytes := conn.writeBytes.Load()
beforeWritePkgNum := conn.writePkgNum.Load()
_, err = conn.send([]byte("hello"))
assert.Equal(t, beforeWritePkgNum+1, atomic.LoadUint32(&conn.writePkgNum))
assert.Equal(t, beforeWritePkgNum+1, conn.writePkgNum.Load())
assert.Nil(t, err)
assert.Equal(t, beforeWriteBytes+5, atomic.LoadUint32(&conn.writeBytes))
assert.Equal(t, beforeWriteBytes+5, conn.writeBytes.Load())
err = ss.WriteBytes([]byte("hello"))
assert.Equal(t, beforeWriteBytes+10, atomic.LoadUint32(&conn.writeBytes))
assert.Equal(t, beforeWritePkgNum+2, atomic.LoadUint32(&conn.writePkgNum))
assert.Equal(t, beforeWriteBytes+10, conn.writeBytes.Load())
assert.Equal(t, beforeWritePkgNum+2, conn.writePkgNum.Load())
assert.Nil(t, err)
var pkgs [][]byte
pkgs = append(pkgs, []byte("hello"), []byte("hello"))
_, err = conn.send(pkgs)
assert.Equal(t, beforeWritePkgNum+4, atomic.LoadUint32(&conn.writePkgNum))
assert.Equal(t, beforeWriteBytes+20, atomic.LoadUint32(&conn.writeBytes))
assert.Equal(t, beforeWritePkgNum+4, conn.writePkgNum.Load())
assert.Equal(t, beforeWriteBytes+20, conn.writeBytes.Load())
assert.Nil(t, err)
ss.SetCompressType(CompressSnappy)
err = ss.WriteBytesArray(pkgs...)
assert.Nil(t, err)
assert.Equal(t, beforeWritePkgNum+6, atomic.LoadUint32(&conn.writePkgNum))
assert.Equal(t, beforeWriteBytes+30, atomic.LoadUint32(&conn.writeBytes))
assert.Equal(t, beforeWritePkgNum+6, conn.writePkgNum.Load())
assert.Equal(t, beforeWriteBytes+30, conn.writeBytes.Load())
assert.True(t, conn.compress == CompressSnappy)
clt.Close()
......@@ -227,14 +225,14 @@ func TestUDPClient(t *testing.T) {
_, err = udpConn.send(udpCtx)
assert.NotNil(t, err)
udpCtx.Pkg = []byte("hello")
beforeWriteBytes := atomic.LoadUint32(&udpConn.writeBytes)
beforeWriteBytes := udpConn.writeBytes.Load()
_, err = udpConn.send(udpCtx)
assert.Equal(t, beforeWriteBytes+5, atomic.LoadUint32(&udpConn.writeBytes))
assert.Equal(t, beforeWriteBytes+5, udpConn.writeBytes.Load())
assert.Nil(t, err)
beforeWritePkgNum := atomic.LoadUint32(&udpConn.writePkgNum)
beforeWritePkgNum := udpConn.writePkgNum.Load()
err = ss.WritePkg(udpCtx, 0)
assert.Equal(t, beforeWritePkgNum+1, atomic.LoadUint32(&udpConn.writePkgNum))
assert.Equal(t, beforeWritePkgNum+1, udpConn.writePkgNum.Load())
assert.Nil(t, err)
clt.Close()
......@@ -287,17 +285,17 @@ func TestNewWSClient(t *testing.T) {
assert.Nil(t, err)
_, err = conn.send("hello")
assert.NotNil(t, err)
beforeWriteBytes := atomic.LoadUint32(&conn.writeBytes)
beforeWriteBytes := conn.writeBytes.Load()
_, err = conn.send([]byte("hello"))
assert.Nil(t, err)
assert.Equal(t, beforeWriteBytes+5, atomic.LoadUint32(&conn.writeBytes))
beforeWritePkgNum := atomic.LoadUint32(&conn.writePkgNum)
assert.Equal(t, beforeWriteBytes+5, conn.writeBytes.Load())
beforeWritePkgNum := conn.writePkgNum.Load()
err = ss.WriteBytes([]byte("hello"))
assert.Nil(t, err)
assert.Equal(t, beforeWritePkgNum+1, atomic.LoadUint32(&conn.writePkgNum))
assert.Equal(t, beforeWritePkgNum+1, conn.writePkgNum.Load())
err = ss.WriteBytesArray([]byte("hello"), []byte("hello"))
assert.Nil(t, err)
assert.Equal(t, beforeWritePkgNum+3, atomic.LoadUint32(&conn.writePkgNum))
assert.Equal(t, beforeWritePkgNum+3, conn.writePkgNum.Load())
err = conn.writePing()
assert.Nil(t, err)
......
......@@ -24,20 +24,19 @@ import (
"io"
"net"
"sync"
"sync/atomic"
"time"
)
import (
"github.com/golang/snappy"
"github.com/gorilla/websocket"
perrors "github.com/pkg/errors"
uatomic "go.uber.org/atomic"
)
var (
launchTime = time.Now()
// ErrInvalidConnection = perrors.New("connection has been closed.")
)
/////////////////////////////////////////
......@@ -45,7 +44,7 @@ var (
/////////////////////////////////////////
var (
connID uint32
connID uatomic.Uint32
)
type gettyConn struct {
......@@ -53,11 +52,11 @@ type gettyConn struct {
compress CompressType
padding1 uint8
padding2 uint16
readBytes uint32 // read bytes
writeBytes uint32 // write bytes
readPkgNum uint32 // send pkg number
writePkgNum uint32 // recv pkg number
active int64 // last active, in milliseconds
readBytes uatomic.Uint32 // read bytes
writeBytes uatomic.Uint32 // write bytes
readPkgNum uatomic.Uint32 // send pkg number
writePkgNum uatomic.Uint32 // recv pkg number
active uatomic.Int64 // last active, in milliseconds
rTimeout time.Duration // network current limiting
wTimeout time.Duration
rLastDeadline time.Time // lastest network read time
......@@ -80,19 +79,19 @@ func (c *gettyConn) RemoteAddr() string {
}
func (c *gettyConn) incReadPkgNum() {
atomic.AddUint32(&c.readPkgNum, 1)
c.readPkgNum.Add(1)
}
func (c *gettyConn) incWritePkgNum() {
atomic.AddUint32(&c.writePkgNum, 1)
c.writePkgNum.Add(1)
}
func (c *gettyConn) UpdateActive() {
atomic.StoreInt64(&(c.active), int64(time.Since(launchTime)))
c.active.Store(int64(time.Since(launchTime)))
}
func (c *gettyConn) GetActive() time.Time {
return launchTime.Add(time.Duration(atomic.LoadInt64(&(c.active))))
return launchTime.Add(time.Duration(c.active.Load()))
}
func (c *gettyConn) send(interface{}) (int, error) {
......@@ -173,7 +172,7 @@ func newGettyTCPConn(conn net.Conn) *gettyTCPConn {
reader: io.Reader(conn),
writer: io.Writer(conn),
gettyConn: gettyConn{
id: atomic.AddUint32(&connID, 1),
id: connID.Add(1),
rTimeout: netIOTimeout,
wTimeout: netIOTimeout,
local: localAddr,
......@@ -257,10 +256,8 @@ func (t *gettyTCPConn) recv(p []byte) (int, error) {
}
length, err = t.reader.Read(p)
// log.Debugf("now:%s, length:%d, err:%v", currentTime, length, err)
atomic.AddUint32(&t.readBytes, uint32(length))
t.readBytes.Add(uint32(length))
return length, perrors.WithStack(err)
//return length, err
}
// tcp connection write
......@@ -289,8 +286,8 @@ func (t *gettyTCPConn) send(pkg interface{}) (int, error) {
if buffers, ok := pkg.([][]byte); ok {
netBuf := net.Buffers(buffers)
if length, err := netBuf.WriteTo(t.conn); err == nil {
atomic.AddUint32(&t.writeBytes, (uint32)(length))
atomic.AddUint32(&t.writePkgNum, (uint32)(len(buffers)))
t.writeBytes.Add((uint32)(length))
t.writePkgNum.Add((uint32)(len(buffers)))
}
log.Debugf("localAddr: %s, remoteAddr:%s, now:%s, length:%d, err:%s",
t.conn.LocalAddr(), t.conn.RemoteAddr(), currentTime, length, err)
......@@ -299,8 +296,8 @@ func (t *gettyTCPConn) send(pkg interface{}) (int, error) {
if p, ok = pkg.([]byte); ok {
if length, err = t.writer.Write(p); err == nil {
atomic.AddUint32(&t.writeBytes, (uint32)(len(p)))
atomic.AddUint32(&t.writePkgNum, 1)
t.writeBytes.Add((uint32)(len(p)))
t.writePkgNum.Add(1)
}
log.Debugf("localAddr: %s, remoteAddr:%s, now:%s, length:%d, err:%s",
t.conn.LocalAddr(), t.conn.RemoteAddr(), currentTime, length, err)
......@@ -371,7 +368,7 @@ func newGettyUDPConn(conn *net.UDPConn) *gettyUDPConn {
return &gettyUDPConn{
conn: conn,
gettyConn: gettyConn{
id: atomic.AddUint32(&connID, 1),
id: connID.Add(1),
rTimeout: netIOTimeout,
wTimeout: netIOTimeout,
local: localAddr,
......@@ -393,33 +390,25 @@ func (u *gettyUDPConn) SetCompressType(c CompressType) {
// udp connection read
func (u *gettyUDPConn) recv(p []byte) (int, *net.UDPAddr, error) {
var (
err error
currentTime time.Time
length int
addr *net.UDPAddr
)
if u.rTimeout > 0 {
// Optimization: update read deadline only if more than 25%
// of the last read deadline exceeded.
// See https://github.com/golang/go/issues/15133 for details.
currentTime = time.Now()
currentTime := time.Now()
if currentTime.Sub(u.rLastDeadline) > (u.rTimeout >> 2) {
if err = u.conn.SetReadDeadline(currentTime.Add(u.rTimeout)); err != nil {
if err := u.conn.SetReadDeadline(currentTime.Add(u.rTimeout)); err != nil {
return 0, nil, perrors.WithStack(err)
}
u.rLastDeadline = currentTime
}
}
length, addr, err = u.conn.ReadFromUDP(p) // connected udp also can get return @addr
log.Debugf("ReadFromUDP() = {length:%d, peerAddr:%s, error:%v}", length, addr, err)
length, addr, err := u.conn.ReadFromUDP(p) // connected udp also can get return @addr
log.Debugf("ReadFromUDP(p:%d) = {length:%d, peerAddr:%s, error:%v}", len(p), length, addr, err)
if err == nil {
atomic.AddUint32(&u.readBytes, uint32(length))
u.readBytes.Add(uint32(length))
}
//return length, addr, err
return length, addr, perrors.WithStack(err)
}
......@@ -462,8 +451,8 @@ func (u *gettyUDPConn) send(udpCtx interface{}) (int, error) {
}
if length, _, err = u.conn.WriteMsgUDP(buf, nil, peerAddr); err == nil {
atomic.AddUint32(&u.writeBytes, (uint32)(len(buf)))
atomic.AddUint32(&u.writePkgNum, 1)
u.writeBytes.Add((uint32)(len(buf)))
u.writePkgNum.Add(1)
}
log.Debugf("WriteMsgUDP(peerAddr:%s) = {length:%d, error:%v}", peerAddr, length, err)
......@@ -504,7 +493,7 @@ func newGettyWSConn(conn *websocket.Conn) *gettyWSConn {
gettyWSConn := &gettyWSConn{
conn: conn,
gettyConn: gettyConn{
id: atomic.AddUint32(&connID, 1),
id: connID.Add(1),
rTimeout: netIOTimeout,
wTimeout: netIOTimeout,
local: localAddr,
......@@ -557,7 +546,7 @@ func (w *gettyWSConn) recv() ([]byte, error) {
// gorilla/websocket/conn.go:NextReader will always fail when got a timeout error.
_, b, e := w.conn.ReadMessage() // the first return value is message type.
if e == nil {
atomic.AddUint32(&w.readBytes, (uint32)(len(b)))
w.readBytes.Add((uint32)(len(b)))
} else {
if websocket.IsUnexpectedCloseError(e, websocket.CloseGoingAway) {
log.Warnf("websocket unexpected close error: %v", e)
......@@ -565,7 +554,6 @@ func (w *gettyWSConn) recv() ([]byte, error) {
}
return b, perrors.WithStack(e)
//return b, e
}
func (w *gettyWSConn) updateWriteDeadline() error {
......@@ -604,8 +592,8 @@ func (w *gettyWSConn) send(pkg interface{}) (int, error) {
w.updateWriteDeadline()
if err = w.conn.WriteMessage(websocket.BinaryMessage, p); err == nil {
atomic.AddUint32(&w.writeBytes, (uint32)(len(p)))
atomic.AddUint32(&w.writePkgNum, 1)
w.writeBytes.Add((uint32)(len(p)))
w.writePkgNum.Add(1)
}
return len(p), perrors.WithStack(err)
}
......
......@@ -3,10 +3,11 @@ module github.com/apache/dubbo-getty
go 1.14
require (
github.com/dubbogo/gost v1.10.1
github.com/dubbogo/gost v1.11.0
github.com/golang/snappy v0.0.1
github.com/gorilla/websocket v1.4.2
github.com/pkg/errors v0.9.1
github.com/stretchr/testify v1.6.1
go.uber.org/atomic v1.7.0
go.uber.org/zap v1.15.0
)
......@@ -5,8 +5,8 @@ github.com/StackExchange/wmi v0.0.0-20190523213315-cbe66965904d/go.mod h1:3eOhrU
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dubbogo/gost v1.10.1 h1:39kF9Cd5JOiMpmwG6dX1/aLWNFqFv9gHp8HrhzMmjLY=
github.com/dubbogo/gost v1.10.1/go.mod h1:+mQGS51XQEUWZP2JeGZTxJwipjRKtJO7Tr+FOg+72rI=
github.com/dubbogo/gost v1.11.0 h1:9KtyWQz1gMlAfwzen5iyhMdoe08SPBBUVhco4rdgJ9I=
github.com/dubbogo/gost v1.11.0/go.mod h1:w8Yw29eDWtRVo3tx9nPpHkNZnOi4SRx1fZf7eVlAAU4=
github.com/dubbogo/jsonparser v1.0.1/go.mod h1:tYAtpctvSP/tWw4MeelsowSPgXQRVHHWbqL6ynps8jU=
github.com/go-ole/go-ole v1.2.4 h1:nNBDSCOigTSiarFpYE9J/KtEA1IOW4CNeqT9TQDqCxI=
github.com/go-ole/go-ole v1.2.4/go.mod h1:XCwSNxSkXRo4vlyPy93sltvi/qJq0jqQhjqQNIwKuxM=
......@@ -15,7 +15,9 @@ github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEW
github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI=
github.com/gorilla/websocket v1.4.2 h1:+/TMaTYc4QFitKJxsQ7Yye35DkWvkdLcvGKqM+x0Ufc=
github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/k0kubun/colorstring v0.0.0-20150214042306-9440f1994b88 h1:uC1QfSlInpQF+M0ao65imhwqKnz3Q2z/d8PWZRMQvDM=
github.com/k0kubun/colorstring v0.0.0-20150214042306-9440f1994b88/go.mod h1:3w7q1U84EfirKl04SVQ/s7nPm1ZPhiXd34z40TNz36k=
github.com/k0kubun/pp v3.0.1+incompatible h1:3tqvf7QgUnZ5tXO6pNAZlrvHgl6DvifjDrd9g2S9Z40=
github.com/k0kubun/pp v3.0.1+incompatible/go.mod h1:GWse8YhT0p8pT4ir3ZgBbfZild3tgzSScAn6HmfYukg=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI=
......@@ -23,7 +25,9 @@ github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORN
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/mattn/go-colorable v0.1.7 h1:bQGKb3vps/j0E9GfJQ03JyhRuxsvdAanXlT9BTw3mdw=
github.com/mattn/go-colorable v0.1.7/go.mod h1:u6P/XSegPjTcexA+o6vUJrdnUu04hMope9wVRipJSqc=
github.com/mattn/go-isatty v0.0.12 h1:wuysRhFDzyxgEmMf5xjvJ2M9dZoWAXNNr5LSBS7uHXY=
github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
......@@ -38,8 +42,9 @@ github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UV
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd0=
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
go.uber.org/atomic v1.6.0 h1:Ezj3JGmsOnG1MoRWQkPBsKLe9DwWD9QeXzTRzzldNVk=
go.uber.org/atomic v1.6.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ=
go.uber.org/atomic v1.7.0 h1:ADUqmZGgLDDfbSL9ZmPxKTybcoEYHgpYfELNoN+7hsw=
go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
go.uber.org/multierr v1.5.0 h1:KCa4XfM8CWFCpxXRGok+Q0SS/0XBhMDbHHGABQLvD2A=
go.uber.org/multierr v1.5.0/go.mod h1:FeouvMocqHpRaaGuG9EjoKcStLC43Zu/fmqdUMPcKYU=
go.uber.org/tools v0.0.0-20190618225709-2cfd321de3ee h1:0mgffUl7nfd+FpvXMVz4IDEaUSmT1ysygQC7qYo7sG4=
......
......@@ -27,21 +27,24 @@ import (
"net/http"
"strings"
"sync"
"sync/atomic"
"time"
)
import (
gxnet "github.com/dubbogo/gost/net"
gxsync "github.com/dubbogo/gost/sync"
gxtime "github.com/dubbogo/gost/time"
"github.com/gorilla/websocket"
perrors "github.com/pkg/errors"
uatomic "go.uber.org/atomic"
)
var (
errSelfConnect = perrors.New("connect self!")
serverFastFailTimeout = time.Second * 1
serverID = EndPointID(0)
serverID uatomic.Int32
)
type server struct {
......@@ -69,7 +72,7 @@ func (s *server) init(opts ...ServerOption) {
func newServer(t EndPointType, opts ...ServerOption) *server {
s := &server{
endPointID: atomic.AddInt32(&serverID, 1),
endPointID: serverID.Add(1),
endPointType: t,
done: make(chan struct{}),
}
......@@ -268,7 +271,7 @@ func (s *server) runTcpEventLoop(newSession NewSessionCallback) {
return
}
if delay != 0 {
<-wheel.After(delay)
<-gxtime.After(delay)
}
client, err = s.accept(newSession)
if err != nil {
......
......@@ -25,7 +25,6 @@ import (
"net"
"runtime"
"sync"
"sync/atomic"
"time"
)
......@@ -36,6 +35,7 @@ import (
"github.com/gorilla/websocket"
perrors "github.com/pkg/errors"
uatomic "go.uber.org/atomic"
)
const (
......@@ -59,17 +59,12 @@ const (
/////////////////////////////////////////
var (
wheel *gxtime.Wheel
defaultTimerWheel *gxtime.TimerWheel
)
func init() {
span := 100e6 // 100ms
buckets := MaxWheelTimeSpan / span
wheel = gxtime.NewWheel(time.Duration(span), int(buckets)) // wheel longest span is 15 minute
}
func GetTimeWheel() *gxtime.Wheel {
return wheel
gxtime.InitDefaultTimerWheel()
defaultTimerWheel = gxtime.GetDefaultTimerWheel()
}
// getty base session
......@@ -101,9 +96,7 @@ type session struct {
attrs *gxcontext.ValuesContext
// goroutines sync
grNum int32
// read goroutines done signal
rDone chan struct{}
grNum uatomic.Int32
lock sync.RWMutex
}
......@@ -122,7 +115,6 @@ func newSession(endPoint EndPoint, conn Connection) *session {
done: make(chan struct{}),
wait: pendingDuration,
attrs: gxcontext.NewValuesContext(context.Background()),
rDone: make(chan struct{}),
}
ss.Connection.setSession(ss)
......@@ -164,7 +156,6 @@ func (s *session) Reset() {
period: period,
wait: pendingDuration,
attrs: gxcontext.NewValuesContext(context.Background()),
rDone: make(chan struct{}),
}
}
......@@ -214,10 +205,10 @@ func (s *session) Stat() string {
return fmt.Sprintf(
outputFormat,
s.sessionToken(),
atomic.LoadUint32(&(conn.readBytes)),
atomic.LoadUint32(&(conn.writeBytes)),
atomic.LoadUint32(&(conn.readPkgNum)),
atomic.LoadUint32(&(conn.writePkgNum)),
conn.readBytes.Load(),
conn.writeBytes.Load(),
conn.readPkgNum.Load(),
conn.writePkgNum.Load(),
)
}
......@@ -460,6 +451,34 @@ func (s *session) WriteBytesArray(pkgs ...[]byte) error {
return nil
}
func sessionTimerLoop(_ gxtime.TimerID, _ time.Time, arg interface{}) error {
ss, _ := arg.(*session)
if ss != nil && ss.IsClosed() {
return ErrSessionClosed
}
f := func() {
wsConn, wsFlag := ss.Connection.(*gettyWSConn)
if wsFlag {
err := wsConn.writePing()
if err != nil {
log.Warnf("wsConn.writePing() = error:%+v", perrors.WithStack(err))
}
}
ss.listener.OnCron(ss)
}
// if enable task pool, run @f asynchronously.
if taskPool := ss.EndPoint().GetTaskPool(); taskPool != nil {
taskPool.AddTaskAlways(f)
return nil
}
f()
return nil
}
// func (s *session) RunEventLoop() {
func (s *session) run() {
if s.Connection == nil || s.listener == nil || s.writer == nil {
......@@ -477,56 +496,12 @@ func (s *session) run() {
return
}
// start read/write gr
atomic.AddInt32(&(s.grNum), 2)
go s.handleLoop()
go s.handlePackage()
}
func (s *session) handleLoop() {
var (
wsFlag bool
wsConn *gettyWSConn
counter gxtime.CountWatch
)
defer func() {
if r := recover(); r != nil {
const size = 64 << 10
rBuf := make([]byte, size)
rBuf = rBuf[:runtime.Stack(rBuf, false)]
log.Errorf("[session.handleLoop] panic session %s: err=%s\n%s", s.sessionToken(), r, rBuf)
}
grNum := atomic.AddInt32(&(s.grNum), -1)
s.listener.OnClose(s)
log.Infof("%s, [session.handleLoop] goroutine exit now, left gr num %d", s.Stat(), grNum)
s.gc()
}()
wsConn, wsFlag = s.Connection.(*gettyWSConn)
LOOP:
for {
select {
case <-s.done:
// this case branch assure the (session)handleLoop gr will exit after (session)handlePackage gr.
<-s.rDone
counter.Start()
if counter.Count() > s.wait.Nanoseconds() {
log.Infof("%s, [session.handleLoop] got done signal ", s.Stat())
break LOOP
}
case <-wheel.After(s.period):
if wsFlag {
err := wsConn.writePing()
if err != nil {
log.Warnf("wsConn.writePing() = error:%+v", perrors.WithStack(err))
}
}
s.listener.OnCron(s)
}
s.grNum.Add(1)
if _, err := defaultTimerWheel.AddTimer(sessionTimerLoop, gxtime.TimerLoop, s.period, s); err != nil {
panic(fmt.Sprintf("failed to add session %s to defaultTimerWheel", s.Stat()))
}
// start read gr
go s.handlePackage()
}
func (s *session) addTask(pkg interface{}) {
......@@ -553,9 +528,7 @@ func (s *session) handlePackage() {
rBuf = rBuf[:runtime.Stack(rBuf, false)]
log.Errorf("[session.handlePackage] panic session %s: err=%s\n%s", s.sessionToken(), r, rBuf)
}
close(s.rDone)
grNum := atomic.AddInt32(&(s.grNum), -1)
grNum := s.grNum.Add(1)
log.Infof("%s, [session.handlePackage] gr will exit now, left gr num %d", s.sessionToken(), grNum)
s.stop()
if err != nil {
......@@ -564,6 +537,9 @@ func (s *session) handlePackage() {
s.listener.OnError(s, err)
}
}
s.listener.OnClose(s)
s.gc()
}()
if _, ok := s.Connection.(*gettyTCPConn); ok {
......@@ -702,8 +678,8 @@ func (s *session) handleUDPPackage() error {
if int(s.maxMsgLen<<1) < bufLen {
maxBufLen = int(s.maxMsgLen << 1)
}
bufp = gxbytes.GetBytes(maxBufLen)
defer gxbytes.PutBytes(bufp)
bufp = gxbytes.AcquireBytes(maxBufLen)
defer gxbytes.ReleaseBytes(bufp)
buf = *bufp
for {
if s.IsClosed() {
......@@ -847,6 +823,5 @@ func (s *session) gc() {
// or (session)handleLoop automatically. It's thread safe.
func (s *session) Close() {
s.stop()
log.Infof("%s closed now. its current gr num is %d",
s.sessionToken(), atomic.LoadInt32(&(s.grNum)))
log.Infof("%s closed now. its current gr num is %d", s.sessionToken(), s.grNum.Load())
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment