Commit 8402bd41 authored by yuyu's avatar yuyu

merge upstream

parents 7731d5c4 45d3d7d6
# Comment to a new issue.
issueOpened: >
Thank your for raising a issue. We will try and get back to you as soon as possible.
Please make sure you have given us as much context as possible.
pullRequestOpened: >
Thank your for raising your pull request.
Please make sure you have followed our contributing guidelines. We will review it as soon as possible
...@@ -19,6 +19,4 @@ classes ...@@ -19,6 +19,4 @@ classes
# vim # vim
*.swp *.swp
# go mod vendor/
go.mod
go.sum
language: go language: go
go: go:
- "1.11"
- "1.12"
- "1.13" - "1.13"
env:
- GO111MODULE=on
script: script:
- go fmt ./... && [[ -z `git status -s` ]] - go fmt ./... && [[ -z `git status -s` ]]
- GO111MODULE=on && go mod vendor && go test ./... -bench . -race -v - go mod vendor && go test $(go list ./... | grep -v vendor | grep -v demo) -coverprofile=coverage.txt -covermode=atomic
after_success:
- bash <(curl -s https://codecov.io/bash)
notifications:
webhooks: https://oapi.dingtalk.com/robot/send?access_token=75f4f1ec3868508aa89e5a5d6f9d342216809df3ebc8a78c8ae8722848e06166
\ No newline at end of file
...@@ -2,9 +2,15 @@ ...@@ -2,9 +2,15 @@
*a netty like asynchronous network I/O library* *a netty like asynchronous network I/O library*
[![Build Status](https://travis-ci.org/dubbogo/getty.svg?branch=master)](https://travis-ci.org/dubbogo/getty)
[![codecov](https://codecov.io/gh/dubbogo/getty/branch/master/graph/badge.svg)](https://codecov.io/gh/dubbogo/getty)
[![go.dev reference](https://img.shields.io/badge/go.dev-reference-007d9c?logo=go&logoColor=white&style=flat-square)](https://pkg.go.dev/github.com/dubbogo/getty?tab=doc)
[![Go Report Card](https://goreportcard.com/badge/github.com/dubbogo/getty)](https://goreportcard.com/report/github.com/dubbogo/getty)
![license](https://img.shields.io/badge/license-Apache--2.0-green.svg)
## INTRO ## INTRO
Getty is a asynchronous network I/O library in golang. Getty is based on "ngo" whose author is [sanbit](https://github.com/sanbit). Getty works on tcp/udp/websocket network protocol and supplies [a uniform interface](https://github.com/dubbogo/getty/blob/master/getty.go#L45). Getty is a asynchronous network I/O library in golang. Getty works on tcp/udp/websocket network protocol and supplies [a uniform interface](https://github.com/dubbogo/getty/blob/master/getty.go#L45).
In getty there are two goroutines in one connection(session), one reads tcp stream/udp packet/websocket package, the other handles logic process and writes response into network write buffer. If your logic process may take a long time, you should start a new logic process goroutine by yourself in codec.go:(Codec)OnMessage. In getty there are two goroutines in one connection(session), one reads tcp stream/udp packet/websocket package, the other handles logic process and writes response into network write buffer. If your logic process may take a long time, you should start a new logic process goroutine by yourself in codec.go:(Codec)OnMessage.
...@@ -12,8 +18,6 @@ You can also handle heartbeat logic in codec.go:(Codec):OnCron. If you use tcp/u ...@@ -12,8 +18,6 @@ You can also handle heartbeat logic in codec.go:(Codec):OnCron. If you use tcp/u
Whatever if you use websocket, you do not need to care about hearbeat request/response because Getty do this task in session.go:(Session)handleLoop by sending/received websocket ping/pong frames. You just need to check whether the websocket session has been timeout or not in codec.go:(Codec)OnCron by session.go:(Session)GetActive. Whatever if you use websocket, you do not need to care about hearbeat request/response because Getty do this task in session.go:(Session)handleLoop by sending/received websocket ping/pong frames. You just need to check whether the websocket session has been timeout or not in codec.go:(Codec)OnCron by session.go:(Session)GetActive.
You can get code example in https://github.com/AlexStocks/getty-examples.
## LICENCE ## LICENCE
Apache License 2.0 Apache License 2.0
......
This diff is collapsed.
...@@ -24,9 +24,9 @@ import ( ...@@ -24,9 +24,9 @@ import (
import ( import (
"github.com/dubbogo/gost/bytes" "github.com/dubbogo/gost/bytes"
"github.com/dubbogo/gost/net"
"github.com/gorilla/websocket" "github.com/gorilla/websocket"
perrors "github.com/pkg/errors" perrors "github.com/pkg/errors"
"github.com/dubbogo/gost/net"
) )
const ( const (
......
This diff is collapsed.
...@@ -24,8 +24,6 @@ import ( ...@@ -24,8 +24,6 @@ import (
"github.com/golang/snappy" "github.com/golang/snappy"
"github.com/gorilla/websocket" "github.com/gorilla/websocket"
perrors "github.com/pkg/errors" perrors "github.com/pkg/errors"
"golang.org/x/net/ipv4"
"golang.org/x/net/ipv6"
) )
var ( var (
...@@ -251,7 +249,7 @@ func (t *gettyTCPConn) recv(p []byte) (int, error) { ...@@ -251,7 +249,7 @@ func (t *gettyTCPConn) recv(p []byte) (int, error) {
} }
length, err = t.reader.Read(p) length, err = t.reader.Read(p)
log.Debugf("now:%s, length:%d, err:%v", currentTime, length, err) // log.Debugf("now:%s, length:%d, err:%v", currentTime, length, err)
atomic.AddUint32(&t.readBytes, uint32(length)) atomic.AddUint32(&t.readBytes, uint32(length))
return length, perrors.WithStack(err) return length, perrors.WithStack(err)
//return length, err //return length, err
...@@ -267,9 +265,6 @@ func (t *gettyTCPConn) send(pkg interface{}) (int, error) { ...@@ -267,9 +265,6 @@ func (t *gettyTCPConn) send(pkg interface{}) (int, error) {
length int length int
) )
if p, ok = pkg.([]byte); !ok {
return 0, perrors.Errorf("illegal @pkg{%#v} type", pkg)
}
if t.compress == CompressNone && t.wTimeout > 0 { if t.compress == CompressNone && t.wTimeout > 0 {
// Optimization: update write deadline only if more than 25% // Optimization: update write deadline only if more than 25%
// of the last write deadline exceeded. // of the last write deadline exceeded.
...@@ -283,12 +278,28 @@ func (t *gettyTCPConn) send(pkg interface{}) (int, error) { ...@@ -283,12 +278,28 @@ func (t *gettyTCPConn) send(pkg interface{}) (int, error) {
} }
} }
if length, err = t.writer.Write(p); err == nil { if buffers, ok := pkg.([][]byte); ok {
atomic.AddUint32(&t.writeBytes, (uint32)(len(p))) netBuf := net.Buffers(buffers)
if length, err := netBuf.WriteTo(t.conn); err == nil {
atomic.AddUint32(&t.writeBytes, (uint32)(length))
atomic.AddUint32(&t.writePkgNum, (uint32)(len(buffers)))
}
log.Debugf("localAddr: %s, remoteAddr:%s, now:%s, length:%d, err:%s",
t.conn.LocalAddr(), t.conn.RemoteAddr(), currentTime, length, err)
return int(length), perrors.WithStack(err)
} }
log.Debugf("now:%s, length:%d, err:%v", currentTime, length, err)
return length, perrors.WithStack(err) if p, ok = pkg.([]byte); ok {
//return length, err if length, err = t.writer.Write(p); err == nil {
atomic.AddUint32(&t.writeBytes, (uint32)(len(p)))
atomic.AddUint32(&t.writePkgNum, 1)
}
log.Debugf("localAddr: %s, remoteAddr:%s, now:%s, length:%d, err:%s",
t.conn.LocalAddr(), t.conn.RemoteAddr(), currentTime, length, err)
return length, perrors.WithStack(err)
}
return 0, perrors.Errorf("illegal @pkg{%#v} type", pkg)
} }
// close tcp connection // close tcp connection
...@@ -328,17 +339,6 @@ type gettyUDPConn struct { ...@@ -328,17 +339,6 @@ type gettyUDPConn struct {
conn *net.UDPConn // for server conn *net.UDPConn // for server
} }
func setUDPSocketOptions(conn *net.UDPConn) error {
// Try setting the flags for both families and ignore the errors unless they
// both error.
err6 := ipv6.NewPacketConn(conn).SetControlMessage(ipv6.FlagDst|ipv6.FlagInterface, true)
err4 := ipv4.NewPacketConn(conn).SetControlMessage(ipv4.FlagDst|ipv4.FlagInterface, true)
if err6 != nil && err4 != nil {
return perrors.WithStack(err4)
}
return nil
}
// create gettyUDPConn // create gettyUDPConn
func newGettyUDPConn(conn *net.UDPConn) *gettyUDPConn { func newGettyUDPConn(conn *net.UDPConn) *gettyUDPConn {
if conn == nil { if conn == nil {
...@@ -450,11 +450,11 @@ func (u *gettyUDPConn) send(udpCtx interface{}) (int, error) { ...@@ -450,11 +450,11 @@ func (u *gettyUDPConn) send(udpCtx interface{}) (int, error) {
if length, _, err = u.conn.WriteMsgUDP(buf, nil, peerAddr); err == nil { if length, _, err = u.conn.WriteMsgUDP(buf, nil, peerAddr); err == nil {
atomic.AddUint32(&u.writeBytes, (uint32)(len(buf))) atomic.AddUint32(&u.writeBytes, (uint32)(len(buf)))
atomic.AddUint32(&u.writePkgNum, 1)
} }
log.Debugf("WriteMsgUDP(peerAddr:%s) = {length:%d, error:%v}", peerAddr, length, err) log.Debugf("WriteMsgUDP(peerAddr:%s) = {length:%d, error:%v}", peerAddr, length, err)
return length, perrors.WithStack(err) return length, perrors.WithStack(err)
//return length, err
} }
// close udp connection // close udp connection
...@@ -544,7 +544,7 @@ func (w *gettyWSConn) recv() ([]byte, error) { ...@@ -544,7 +544,7 @@ func (w *gettyWSConn) recv() ([]byte, error) {
// gorilla/websocket/conn.go:NextReader will always fail when got a timeout error. // gorilla/websocket/conn.go:NextReader will always fail when got a timeout error.
_, b, e := w.conn.ReadMessage() // the first return value is message type. _, b, e := w.conn.ReadMessage() // the first return value is message type.
if e == nil { if e == nil {
w.incReadPkgNum() atomic.AddUint32(&w.readBytes, (uint32)(len(b)))
} else { } else {
if websocket.IsUnexpectedCloseError(e, websocket.CloseGoingAway) { if websocket.IsUnexpectedCloseError(e, websocket.CloseGoingAway) {
log.Warnf("websocket unexpected close error: %v", e) log.Warnf("websocket unexpected close error: %v", e)
...@@ -592,9 +592,9 @@ func (w *gettyWSConn) send(pkg interface{}) (int, error) { ...@@ -592,9 +592,9 @@ func (w *gettyWSConn) send(pkg interface{}) (int, error) {
w.updateWriteDeadline() w.updateWriteDeadline()
if err = w.conn.WriteMessage(websocket.BinaryMessage, p); err == nil { if err = w.conn.WriteMessage(websocket.BinaryMessage, p); err == nil {
atomic.AddUint32(&w.writeBytes, (uint32)(len(p))) atomic.AddUint32(&w.writeBytes, (uint32)(len(p)))
atomic.AddUint32(&w.writePkgNum, 1)
} }
return len(p), perrors.WithStack(err) return len(p), perrors.WithStack(err)
//return len(p), err
} }
func (w *gettyWSConn) writePing() error { func (w *gettyWSConn) writePing() error {
......
module github.com/dubbogo/getty module github.com/dubbogo/getty
require ( require (
github.com/dubbogo/gost v1.5.2 // indirect github.com/dubbogo/gost v1.5.2
github.com/golang/snappy v0.0.1 // indirect github.com/golang/snappy v0.0.1
github.com/gorilla/websocket v1.4.0 // indirect github.com/gorilla/websocket v1.4.0
github.com/juju/errors v0.0.0-20190930114154-d42613fe1ab9
github.com/pkg/errors v0.8.1
github.com/stretchr/testify v1.5.1
go.uber.org/atomic v1.4.0 // indirect go.uber.org/atomic v1.4.0 // indirect
go.uber.org/multierr v1.1.0 // indirect go.uber.org/multierr v1.1.0 // indirect
go.uber.org/zap v1.10.0 // indirect go.uber.org/zap v1.10.0
golang.org/x/net v0.0.0-20190320064053-1272bf9dcd53 // indirect
) )
go 1.13 go 1.13
github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dubbogo/gost v1.5.2 h1:ri/03971hdpnn3QeCU+4UZgnRNGDXLDGDucR/iozZm8=
github.com/dubbogo/gost v1.5.2/go.mod h1:pPTjVyoJan3aPxBPNUX0ADkXjPibLo+/Ib0/fADXSG8= github.com/dubbogo/gost v1.5.2/go.mod h1:pPTjVyoJan3aPxBPNUX0ADkXjPibLo+/Ib0/fADXSG8=
github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4=
github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/gorilla/websocket v1.4.0 h1:WDFjx/TMzVgy9VdMMQi2K2Emtwi2QcUQsztZ/zLaH/Q=
github.com/gorilla/websocket v1.4.0/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ= github.com/gorilla/websocket v1.4.0/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ=
github.com/juju/errors v0.0.0-20190930114154-d42613fe1ab9 h1:hJix6idebFclqlfZCHE7EUX7uqLCyb70nHNHH1XKGBg=
github.com/juju/errors v0.0.0-20190930114154-d42613fe1ab9/go.mod h1:W54LbzXuIE0boCoNJfwqpmkKJ1O4TCTZMetAt6jGk7Q=
github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJyk=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/stretchr/testify v1.5.1 h1:nOGnQDM7FYENwehXlg/kFVnos3rEvtKTjRvOWSzb6H4=
github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
go.uber.org/atomic v1.4.0 h1:cxzIVoETapQEqDhQu3QfnvXAV4AlzcvUCxkVUFw3+EU=
go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
go.uber.org/multierr v1.1.0 h1:HoEmRHQPVSqub6w2z2d2EOVs2fjyFRGyofhKuyDq0QI=
go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0= go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0=
go.uber.org/zap v1.10.0 h1:ORx85nbTijNz8ljznvCMR1ZBIPKFn3jQrag10X2AsuM=
go.uber.org/zap v1.10.0/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q= go.uber.org/zap v1.10.0/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
golang.org/x/net v0.0.0-20190320064053-1272bf9dcd53/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
package getty
import (
"testing"
)
import (
"github.com/stretchr/testify/assert"
)
func TestClientOptions(t *testing.T) {
addr := "127.0.0.1:0"
file := ""
clt := newClient(TCP_CLIENT,
WithServerAddress(addr),
WithReconnectInterval(5e8),
WithConnectionNumber(1),
WithRootCertificateFile(file),
)
assert.NotNil(t, clt)
assert.Equal(t, clt.endPointType, TCP_CLIENT)
assert.True(t, clt.endPointType > 0)
assert.NotNil(t, clt.done)
assert.NotNil(t, clt.ssMap)
assert.Equal(t, clt.addr, addr)
assert.NotNil(t, clt.reconnectInterval)
assert.NotNil(t, clt.cert)
assert.Equal(t, clt.number, 1)
}
func TestServerOptions(t *testing.T) {
addr := "127.0.0.1:0"
path := "/test"
cert := ""
key := "test"
srv := newServer(TCP_SERVER,
WithLocalAddress(addr),
WithWebsocketServerPath(path),
WithWebsocketServerCert(cert),
WithWebsocketServerPrivateKey(key),
WithWebsocketServerRootCert(cert),
)
assert.NotNil(t, srv)
assert.Equal(t, srv.endPointType, TCP_SERVER)
assert.True(t, srv.endPointType > 0)
assert.NotNil(t, srv.done)
assert.Equal(t, srv.addr, addr)
assert.Equal(t, srv.path, path)
assert.Equal(t, srv.cert, cert)
assert.Equal(t, srv.privateKey, key)
assert.Equal(t, srv.caCert, cert)
}
...@@ -23,9 +23,9 @@ import ( ...@@ -23,9 +23,9 @@ import (
) )
import ( import (
"github.com/dubbogo/gost/net"
"github.com/gorilla/websocket" "github.com/gorilla/websocket"
perrors "github.com/pkg/errors" perrors "github.com/pkg/errors"
"github.com/dubbogo/gost/net"
) )
var ( var (
...@@ -188,9 +188,6 @@ func (s *server) listenUDP() error { ...@@ -188,9 +188,6 @@ func (s *server) listenUDP() error {
if err != nil { if err != nil {
return perrors.Wrapf(err, "net.ListenUDP((udp, localAddr:%#v)", localAddr) return perrors.Wrapf(err, "net.ListenUDP((udp, localAddr:%#v)", localAddr)
} }
// if err = setUDPSocketOptions(pktListener); err != nil {
// return perrors.Wrapf(err, "setUDPSocketOptions(pktListener:%#v)", pktListener)
// }
s.pktListener = pktListener s.pktListener = pktListener
......
package getty
import (
"testing"
"time"
)
import (
"github.com/stretchr/testify/assert"
)
func TestTCPServer(t *testing.T) {
var (
server *server
serverMsgHandler MessageHandler
)
addr := "127.0.0.1:0"
func() {
server = newServer(
TCP_SERVER,
WithLocalAddress(addr),
)
newServerSession := func(session Session) error {
return newSessionCallback(session, &serverMsgHandler)
}
server.RunEventLoop(newServerSession)
assert.True(t, server.ID() > 0)
assert.True(t, server.EndPointType() == TCP_SERVER)
}()
time.Sleep(500e6)
addr = server.streamListener.Addr().String()
t.Logf("server addr: %v", addr)
clt := newClient(TCP_CLIENT,
WithServerAddress(addr),
WithReconnectInterval(5e8),
WithConnectionNumber(1),
)
assert.NotNil(t, clt)
assert.True(t, clt.ID() > 0)
assert.Equal(t, clt.endPointType, TCP_CLIENT)
var (
msgHandler MessageHandler
)
cb := func(session Session) error {
return newSessionCallback(session, &msgHandler)
}
clt.RunEventLoop(cb)
time.Sleep(1e9)
assert.Equal(t, 1, msgHandler.SessionNumber())
clt.Close()
assert.True(t, clt.IsClosed())
server.Close()
assert.True(t, server.IsClosed())
}
func TestUDPServer(t *testing.T) {
var (
server *server
serverMsgHandler MessageHandler
)
addr := "127.0.0.1:0"
func() {
server = newServer(
UDP_ENDPOINT,
WithLocalAddress(addr),
)
newServerSession := func(session Session) error {
return newSessionCallback(session, &serverMsgHandler)
}
server.RunEventLoop(newServerSession)
assert.True(t, server.ID() > 0)
assert.True(t, server.EndPointType() == UDP_ENDPOINT)
}()
time.Sleep(500e6)
//addr = server.streamListener.Addr().String()
//t.Logf("server addr: %v", addr)
//clt := newClient(TCP_CLIENT,
// WithServerAddress(addr),
// WithReconnectInterval(5e8),
// WithConnectionNumber(1),
//)
//assert.NotNil(t, clt)
//assert.True(t, clt.ID() > 0)
//assert.Equal(t, clt.endPointType, TCP_CLIENT)
//
//var (
// msgHandler MessageHandler
//)
//cb := func(session Session) error {
// return newSessionCallback(session, &msgHandler)
//}
//
//clt.RunEventLoop(cb)
//time.Sleep(1e9)
//
//assert.Equal(t, 1, msgHandler.SessionNumber())
//clt.Close()
//assert.True(t, clt.IsClosed())
//
//server.Close()
//assert.True(t, server.IsClosed())
}
...@@ -12,6 +12,7 @@ package getty ...@@ -12,6 +12,7 @@ package getty
import ( import (
"bytes" "bytes"
"fmt" "fmt"
"io"
"net" "net"
"runtime" "runtime"
"sync" "sync"
...@@ -29,12 +30,14 @@ import ( ...@@ -29,12 +30,14 @@ import (
) )
const ( const (
maxReadBufLen = 4 * 1024 maxReadBufLen = 4 * 1024
netIOTimeout = 1e9 // 1s netIOTimeout = 1e9 // 1s
period = 60 * 1e9 // 1 minute period = 60 * 1e9 // 1 minute
pendingDuration = 3e9 pendingDuration = 3e9
defaultQLen = 1024 defaultQLen = 1024
maxIovecNum = 10 maxIovecNum = 10
MaxWheelTimeSpan = 900e9 // 900s, 15 minute
defaultSessionName = "session" defaultSessionName = "session"
defaultTCPSessionName = "tcp-session" defaultTCPSessionName = "tcp-session"
defaultUDPSessionName = "udp-session" defaultUDPSessionName = "udp-session"
...@@ -48,9 +51,15 @@ const ( ...@@ -48,9 +51,15 @@ const (
///////////////////////////////////////// /////////////////////////////////////////
var ( var (
wheel = gxtime.NewWheel(gxtime.TimeMillisecondDuration(100), 1200) // wheel longest span is 2 minute wheel *gxtime.Wheel
) )
func init() {
span := 100e6 // 100ms
buckets := MaxWheelTimeSpan / span
wheel = gxtime.NewWheel(time.Duration(span), int(buckets)) // wheel longest span is 15 minute
}
func GetTimeWheel() *gxtime.Wheel { func GetTimeWheel() *gxtime.Wheel {
return wheel return wheel
} }
...@@ -82,7 +91,7 @@ type session struct { ...@@ -82,7 +91,7 @@ type session struct {
// done // done
wait time.Duration wait time.Duration
once sync.Once once *sync.Once
done chan struct{} done chan struct{}
// attribute // attribute
...@@ -106,6 +115,7 @@ func newSession(endPoint EndPoint, conn Connection) *session { ...@@ -106,6 +115,7 @@ func newSession(endPoint EndPoint, conn Connection) *session {
period: period, period: period,
once: &sync.Once{},
done: make(chan struct{}), done: make(chan struct{}),
wait: pendingDuration, wait: pendingDuration,
attrs: gxcontext.NewValuesContext(nil), attrs: gxcontext.NewValuesContext(nil),
...@@ -144,17 +154,15 @@ func newWSSession(conn *websocket.Conn, endPoint EndPoint) Session { ...@@ -144,17 +154,15 @@ func newWSSession(conn *websocket.Conn, endPoint EndPoint) Session {
} }
func (s *session) Reset() { func (s *session) Reset() {
s.name = defaultSessionName *s = session{
s.once = sync.Once{} name: defaultSessionName,
s.done = make(chan struct{}) once: &sync.Once{},
s.period = period done: make(chan struct{}),
s.wait = pendingDuration period: period,
s.attrs = gxcontext.NewValuesContext(nil) wait: pendingDuration,
s.rDone = make(chan struct{}) attrs: gxcontext.NewValuesContext(nil),
s.grNum = 0 rDone: make(chan struct{}),
}
s.SetWriteTimeout(netIOTimeout)
s.SetReadTimeout(netIOTimeout)
} }
// func (s *session) SetConn(conn net.Conn) { s.gettyConn = newGettyConn(conn) } // func (s *session) SetConn(conn net.Conn) { s.gettyConn = newGettyConn(conn) }
...@@ -293,7 +301,7 @@ func (s *session) SetWQLen(writeQLen int) { ...@@ -293,7 +301,7 @@ func (s *session) SetWQLen(writeQLen int) {
s.lock.Lock() s.lock.Lock()
defer s.lock.Unlock() defer s.lock.Unlock()
s.wQ = make(chan interface{}, writeQLen) s.wQ = make(chan interface{}, writeQLen)
log.Debug("%s, [session.SetWQLen] wQ{len:%d, cap:%d}", s.Stat(), len(s.wQ), cap(s.wQ)) log.Debugf("%s, [session.SetWQLen] wQ{len:%d, cap:%d}", s.Stat(), len(s.wQ), cap(s.wQ))
} }
// set maximum wait time when session got error or got exit signal // set maximum wait time when session got error or got exit signal
...@@ -351,6 +359,10 @@ func (s *session) RemoveAttribute(key interface{}) { ...@@ -351,6 +359,10 @@ func (s *session) RemoveAttribute(key interface{}) {
} }
func (s *session) sessionToken() string { func (s *session) sessionToken() string {
if s.IsClosed() || s.Connection == nil {
return "session-closed"
}
return fmt.Sprintf("{%s:%s:%d:%s<->%s}", return fmt.Sprintf("{%s:%s:%d:%s<->%s}",
s.name, s.EndPoint().EndPointType(), s.ID(), s.LocalAddr(), s.RemoteAddr()) s.name, s.EndPoint().EndPointType(), s.ID(), s.LocalAddr(), s.RemoteAddr())
} }
...@@ -392,10 +404,9 @@ func (s *session) WritePkg(pkg interface{}, timeout time.Duration) error { ...@@ -392,10 +404,9 @@ func (s *session) WritePkg(pkg interface{}, timeout time.Duration) error {
} }
_, err = s.Connection.send(pkg) _, err = s.Connection.send(pkg)
if err != nil { if err != nil {
log.Warn("%s, [session.WritePkg] @s.Connection.Write(pkg:%#v) = err:%v", s.Stat(), pkg, err) log.Warnf("%s, [session.WritePkg] @s.Connection.Write(pkg:%#v) = err:%v", s.Stat(), pkg, err)
return perrors.WithStack(err) return perrors.WithStack(err)
} }
s.incWritePkgNum()
return nil return nil
} }
select { select {
...@@ -420,9 +431,6 @@ func (s *session) WriteBytes(pkg []byte) error { ...@@ -420,9 +431,6 @@ func (s *session) WriteBytes(pkg []byte) error {
if _, err := s.Connection.send(pkg); err != nil { if _, err := s.Connection.send(pkg); err != nil {
return perrors.Wrapf(err, "s.Connection.Write(pkg len:%d)", len(pkg)) return perrors.Wrapf(err, "s.Connection.Write(pkg len:%d)", len(pkg))
} }
s.incWritePkgNum()
return nil return nil
} }
...@@ -437,39 +445,10 @@ func (s *session) WriteBytesArray(pkgs ...[]byte) error { ...@@ -437,39 +445,10 @@ func (s *session) WriteBytesArray(pkgs ...[]byte) error {
return s.WriteBytes(pkgs[0]) return s.WriteBytes(pkgs[0])
} }
// get len // TODO Currently, only TCP is supported.
var ( if _, err := s.Connection.send(pkgs); err != nil {
l int return perrors.Wrapf(err, "s.Connection.Write(pkgs num:%d)", len(pkgs))
err error
length int
arrp *[]byte
arr []byte
)
length = 0
for i := 0; i < len(pkgs); i++ {
length += len(pkgs[i])
}
// merge the pkgs
// arr = make([]byte, length)
arrp = gxbytes.GetBytes(length)
defer gxbytes.PutBytes(arrp)
arr = *arrp
l = 0
for i := 0; i < len(pkgs); i++ {
copy(arr[l:], pkgs[i])
l += len(pkgs[i])
} }
if err = s.WriteBytes(arr); err != nil {
return perrors.WithStack(err)
}
num := len(pkgs) - 1
for i := 0; i < num; i++ {
s.incWritePkgNum()
}
return nil return nil
} }
...@@ -525,7 +504,7 @@ func (s *session) handleLoop() { ...@@ -525,7 +504,7 @@ func (s *session) handleLoop() {
grNum := atomic.AddInt32(&(s.grNum), -1) grNum := atomic.AddInt32(&(s.grNum), -1)
s.listener.OnClose(s) s.listener.OnClose(s)
log.Info("%s, [session.handleLoop] goroutine exit now, left gr num %d", s.Stat(), grNum) log.Infof("%s, [session.handleLoop] goroutine exit now, left gr num %d", s.Stat(), grNum)
s.gc() s.gc()
}() }()
...@@ -555,7 +534,7 @@ LOOP: ...@@ -555,7 +534,7 @@ LOOP:
continue continue
} }
if !flag { if !flag {
log.Warn("[session.handleLoop] drop write out package %#v", outPkg) log.Warnf("[session.handleLoop] drop write out package %#v", outPkg)
continue continue
} }
...@@ -654,7 +633,9 @@ func (s *session) handlePackage() { ...@@ -654,7 +633,9 @@ func (s *session) handlePackage() {
s.stop() s.stop()
if err != nil { if err != nil {
log.Errorf("%s, [session.handlePackage] error:%+v", s.sessionToken(), err) log.Errorf("%s, [session.handlePackage] error:%+v", s.sessionToken(), err)
s.listener.OnError(s, err) if s != nil || s.listener != nil {
s.listener.OnError(s, err)
}
} }
}() }()
...@@ -721,6 +702,12 @@ func (s *session) handleTCPPackage() error { ...@@ -721,6 +702,12 @@ func (s *session) handleTCPPackage() error {
if netError, ok = perrors.Cause(err).(net.Error); ok && netError.Timeout() { if netError, ok = perrors.Cause(err).(net.Error); ok && netError.Timeout() {
break break
} }
if perrors.Cause(err) == io.EOF {
log.Infof("%s, [session.conn.read] = error:%+v", s.sessionToken(), err)
err = nil
exit = true
break
}
log.Errorf("%s, [session.conn.read] = error:%+v", s.sessionToken(), err) log.Errorf("%s, [session.conn.read] = error:%+v", s.sessionToken(), err)
exit = true exit = true
} }
...@@ -745,7 +732,7 @@ func (s *session) handleTCPPackage() error { ...@@ -745,7 +732,7 @@ func (s *session) handleTCPPackage() error {
// handle case 1 // handle case 1
if err != nil { if err != nil {
log.Warnf("%s, [session.handleTCPPackage] = len{%d}, error:%+v", log.Warnf("%s, [session.handleTCPPackage] = len{%d}, error:%+v",
s.sessionToken(), pkgLen, err) s.sessionToken(), pkgLen, perrors.WithStack(err))
exit = true exit = true
break break
} }
...@@ -863,7 +850,7 @@ func (s *session) handleWSPackage() error { ...@@ -863,7 +850,7 @@ func (s *session) handleWSPackage() error {
} }
if err != nil { if err != nil {
log.Warnf("%s, [session.handleWSPackage] = error{%+s}", log.Warnf("%s, [session.handleWSPackage] = error{%+s}",
s.sessionToken(), err) s.sessionToken(), perrors.WithStack(err))
return perrors.WithStack(err) return perrors.WithStack(err)
} }
s.UpdateActive() s.UpdateActive()
...@@ -910,22 +897,34 @@ func (s *session) stop() { ...@@ -910,22 +897,34 @@ func (s *session) stop() {
} }
func (s *session) gc() { func (s *session) gc() {
var (
wQ chan interface{}
conn Connection
)
s.lock.Lock() s.lock.Lock()
if s.attrs != nil { if s.attrs != nil {
s.attrs = nil s.attrs = nil
if s.wQ != nil { if s.wQ != nil {
close(s.wQ) wQ = s.wQ
s.wQ = nil s.wQ = nil
} }
s.Connection.close((int)((int64)(s.wait))) conn = s.Connection
} }
s.lock.Unlock() s.lock.Unlock()
go func() {
if wQ != nil {
conn.close((int)((int64)(s.wait)))
close(wQ)
}
}()
} }
// Close will be invoked by NewSessionCallback(if return error is not nil) // Close will be invoked by NewSessionCallback(if return error is not nil)
// or (session)handleLoop automatically. It's thread safe. // or (session)handleLoop automatically. It's thread safe.
func (s *session) Close() { func (s *session) Close() {
s.stop() s.stop()
log.Info("%s closed now. its current gr num is %d", log.Infof("%s closed now. its current gr num is %d",
s.sessionToken(), atomic.LoadInt32(&(s.grNum))) s.sessionToken(), atomic.LoadInt32(&(s.grNum)))
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment