Unverified Commit 66f25061 authored by Xin.Zh's avatar Xin.Zh Committed by GitHub

Merge pull request #80 from takewofly/bug/race

Opt: change timeout to atomic
parents d58f9f35 04c5bef7
...@@ -29,11 +29,8 @@ import ( ...@@ -29,11 +29,8 @@ import (
import ( import (
"github.com/golang/snappy" "github.com/golang/snappy"
"github.com/gorilla/websocket" "github.com/gorilla/websocket"
perrors "github.com/pkg/errors" perrors "github.com/pkg/errors"
uatomic "go.uber.org/atomic" uatomic "go.uber.org/atomic"
) )
...@@ -77,17 +74,17 @@ type gettyConn struct { ...@@ -77,17 +74,17 @@ type gettyConn struct {
compress CompressType compress CompressType
padding1 uint8 padding1 uint8
padding2 uint16 padding2 uint16
readBytes uatomic.Uint32 // read bytes readBytes uatomic.Uint32 // read bytes
writeBytes uatomic.Uint32 // write bytes writeBytes uatomic.Uint32 // write bytes
readPkgNum uatomic.Uint32 // send pkg number readPkgNum uatomic.Uint32 // send pkg number
writePkgNum uatomic.Uint32 // recv pkg number writePkgNum uatomic.Uint32 // recv pkg number
active uatomic.Int64 // last active, in milliseconds active uatomic.Int64 // last active, in milliseconds
rTimeout time.Duration // network current limiting rTimeout uatomic.Duration // network current limiting
wTimeout time.Duration wTimeout uatomic.Duration
rLastDeadline time.Time // last network read time rLastDeadline uatomic.Time // last network read time
wLastDeadline time.Time // last network write time wLastDeadline uatomic.Time // last network write time
local string // local address local string // local address
peer string // peer address peer string // peer address
ss Session ss Session
} }
...@@ -126,7 +123,7 @@ func (c *gettyConn) send(interface{}) (int, error) { ...@@ -126,7 +123,7 @@ func (c *gettyConn) send(interface{}) (int, error) {
func (c *gettyConn) close(int) {} func (c *gettyConn) close(int) {}
func (c gettyConn) readTimeout() time.Duration { func (c gettyConn) readTimeout() time.Duration {
return c.rTimeout return c.rTimeout.Load()
} }
func (c *gettyConn) setSession(ss Session) { func (c *gettyConn) setSession(ss Session) {
...@@ -142,14 +139,14 @@ func (c *gettyConn) SetReadTimeout(rTimeout time.Duration) { ...@@ -142,14 +139,14 @@ func (c *gettyConn) SetReadTimeout(rTimeout time.Duration) {
panic("@rTimeout < 1") panic("@rTimeout < 1")
} }
c.rTimeout = rTimeout c.rTimeout.Store(rTimeout)
if c.wTimeout == 0 { if c.wTimeout.Load() == 0 {
c.wTimeout = rTimeout c.wTimeout.Store(rTimeout)
} }
} }
func (c gettyConn) writeTimeout() time.Duration { func (c gettyConn) writeTimeout() time.Duration {
return c.wTimeout return c.wTimeout.Load()
} }
// SetWriteTimeout Pls do not set write deadline for websocket connection. AlexStocks 20180310 // SetWriteTimeout Pls do not set write deadline for websocket connection. AlexStocks 20180310
...@@ -161,9 +158,9 @@ func (c *gettyConn) SetWriteTimeout(wTimeout time.Duration) { ...@@ -161,9 +158,9 @@ func (c *gettyConn) SetWriteTimeout(wTimeout time.Duration) {
panic("@wTimeout < 1") panic("@wTimeout < 1")
} }
c.wTimeout = wTimeout c.wTimeout.Store(wTimeout)
if c.rTimeout == 0 { if c.rTimeout.Load() == 0 {
c.rTimeout = wTimeout c.rTimeout.Store(wTimeout)
} }
} }
...@@ -198,8 +195,8 @@ func newGettyTCPConn(conn net.Conn) *gettyTCPConn { ...@@ -198,8 +195,8 @@ func newGettyTCPConn(conn net.Conn) *gettyTCPConn {
writer: io.Writer(conn), writer: io.Writer(conn),
gettyConn: gettyConn{ gettyConn: gettyConn{
id: connID.Add(1), id: connID.Add(1),
rTimeout: netIOTimeout, rTimeout: *uatomic.NewDuration(netIOTimeout),
wTimeout: netIOTimeout, wTimeout: *uatomic.NewDuration(netIOTimeout),
local: localAddr, local: localAddr,
peer: peerAddr, peer: peerAddr,
compress: CompressNone, compress: CompressNone,
...@@ -266,17 +263,17 @@ func (t *gettyTCPConn) recv(p []byte) (int, error) { ...@@ -266,17 +263,17 @@ func (t *gettyTCPConn) recv(p []byte) (int, error) {
) )
// set read timeout deadline // set read timeout deadline
if t.compress == CompressNone && t.rTimeout > 0 { if t.compress == CompressNone && t.rTimeout.Load() > 0 {
// Optimization: update read deadline only if more than 25% // Optimization: update read deadline only if more than 25%
// of the last read deadline exceeded. // of the last read deadline exceeded.
// See https://github.com/golang/go/issues/15133 for details. // See https://github.com/golang/go/issues/15133 for details.
currentTime = time.Now() currentTime = time.Now()
if currentTime.Sub(t.rLastDeadline) > t.rTimeout>>2 { if currentTime.Sub(t.rLastDeadline.Load()) > t.rTimeout.Load()>>2 {
if err = t.conn.SetReadDeadline(currentTime.Add(t.rTimeout)); err != nil { if err = t.conn.SetReadDeadline(currentTime.Add(t.rTimeout.Load())); err != nil {
// just a timeout error // just a timeout error
return 0, perrors.WithStack(err) return 0, perrors.WithStack(err)
} }
t.rLastDeadline = currentTime t.rLastDeadline.Store(currentTime)
} }
} }
...@@ -296,16 +293,16 @@ func (t *gettyTCPConn) send(pkg interface{}) (int, error) { ...@@ -296,16 +293,16 @@ func (t *gettyTCPConn) send(pkg interface{}) (int, error) {
lg int64 lg int64
) )
if t.compress == CompressNone && t.wTimeout > 0 { if t.compress == CompressNone && t.wTimeout.Load() > 0 {
// Optimization: update write deadline only if more than 25% // Optimization: update write deadline only if more than 25%
// of the last write deadline exceeded. // of the last write deadline exceeded.
// See https://github.com/golang/go/issues/15133 for details. // See https://github.com/golang/go/issues/15133 for details.
currentTime = time.Now() currentTime = time.Now()
if currentTime.Sub(t.wLastDeadline) > t.wTimeout>>2 { if currentTime.Sub(t.wLastDeadline.Load()) > t.wTimeout.Load()>>2 {
if err = t.conn.SetWriteDeadline(currentTime.Add(t.wTimeout)); err != nil { if err = t.conn.SetWriteDeadline(currentTime.Add(t.wTimeout.Load())); err != nil {
return 0, perrors.WithStack(err) return 0, perrors.WithStack(err)
} }
t.wLastDeadline = currentTime t.wLastDeadline.Store(currentTime)
} }
} }
...@@ -396,8 +393,8 @@ func newGettyUDPConn(conn *net.UDPConn) *gettyUDPConn { ...@@ -396,8 +393,8 @@ func newGettyUDPConn(conn *net.UDPConn) *gettyUDPConn {
conn: conn, conn: conn,
gettyConn: gettyConn{ gettyConn: gettyConn{
id: connID.Add(1), id: connID.Add(1),
rTimeout: netIOTimeout, rTimeout: *uatomic.NewDuration(netIOTimeout),
wTimeout: netIOTimeout, wTimeout: *uatomic.NewDuration(netIOTimeout),
local: localAddr, local: localAddr,
peer: peerAddr, peer: peerAddr,
compress: CompressNone, compress: CompressNone,
...@@ -417,16 +414,16 @@ func (u *gettyUDPConn) SetCompressType(c CompressType) { ...@@ -417,16 +414,16 @@ func (u *gettyUDPConn) SetCompressType(c CompressType) {
// udp connection read // udp connection read
func (u *gettyUDPConn) recv(p []byte) (int, *net.UDPAddr, error) { func (u *gettyUDPConn) recv(p []byte) (int, *net.UDPAddr, error) {
if u.rTimeout > 0 { if u.rTimeout.Load() > 0 {
// Optimization: update read deadline only if more than 25% // Optimization: update read deadline only if more than 25%
// of the last read deadline exceeded. // of the last read deadline exceeded.
// See https://github.com/golang/go/issues/15133 for details. // See https://github.com/golang/go/issues/15133 for details.
currentTime := time.Now() currentTime := time.Now()
if currentTime.Sub(u.rLastDeadline) > u.rTimeout>>2 { if currentTime.Sub(u.rLastDeadline.Load()) > u.rTimeout.Load()>>2 {
if err := u.conn.SetReadDeadline(currentTime.Add(u.rTimeout)); err != nil { if err := u.conn.SetReadDeadline(currentTime.Add(u.rTimeout.Load())); err != nil {
return 0, nil, perrors.WithStack(err) return 0, nil, perrors.WithStack(err)
} }
u.rLastDeadline = currentTime u.rLastDeadline.Store(currentTime)
} }
} }
...@@ -464,16 +461,16 @@ func (u *gettyUDPConn) send(udpCtx interface{}) (int, error) { ...@@ -464,16 +461,16 @@ func (u *gettyUDPConn) send(udpCtx interface{}) (int, error) {
} }
} }
if u.wTimeout > 0 { if u.wTimeout.Load() > 0 {
// Optimization: update write deadline only if more than 25% // Optimization: update write deadline only if more than 25%
// of the last write deadline exceeded. // of the last write deadline exceeded.
// See https://github.com/golang/go/issues/15133 for details. // See https://github.com/golang/go/issues/15133 for details.
currentTime = time.Now() currentTime = time.Now()
if currentTime.Sub(u.wLastDeadline) > u.wTimeout>>2 { if currentTime.Sub(u.wLastDeadline.Load()) > u.wTimeout.Load()>>2 {
if err = u.conn.SetWriteDeadline(currentTime.Add(u.wTimeout)); err != nil { if err = u.conn.SetWriteDeadline(currentTime.Add(u.wTimeout.Load())); err != nil {
return 0, perrors.WithStack(err) return 0, perrors.WithStack(err)
} }
u.wLastDeadline = currentTime u.wLastDeadline.Store(currentTime)
} }
} }
...@@ -521,8 +518,8 @@ func newGettyWSConn(conn *websocket.Conn) *gettyWSConn { ...@@ -521,8 +518,8 @@ func newGettyWSConn(conn *websocket.Conn) *gettyWSConn {
conn: conn, conn: conn,
gettyConn: gettyConn{ gettyConn: gettyConn{
id: connID.Add(1), id: connID.Add(1),
rTimeout: netIOTimeout, rTimeout: *uatomic.NewDuration(netIOTimeout),
wTimeout: netIOTimeout, wTimeout: *uatomic.NewDuration(netIOTimeout),
local: localAddr, local: localAddr,
peer: peerAddr, peer: peerAddr,
compress: CompressNone, compress: CompressNone,
...@@ -589,16 +586,16 @@ func (w *gettyWSConn) updateWriteDeadline() error { ...@@ -589,16 +586,16 @@ func (w *gettyWSConn) updateWriteDeadline() error {
currentTime time.Time currentTime time.Time
) )
if w.wTimeout > 0 { if w.wTimeout.Load() > 0 {
// Optimization: update write deadline only if more than 25% // Optimization: update write deadline only if more than 25%
// of the last write deadline exceeded. // of the last write deadline exceeded.
// See https://github.com/golang/go/issues/15133 for details. // See https://github.com/golang/go/issues/15133 for details.
currentTime = time.Now() currentTime = time.Now()
if currentTime.Sub(w.wLastDeadline) > w.wTimeout>>2 { if currentTime.Sub(w.wLastDeadline.Load()) > w.wTimeout.Load()>>2 {
if err = w.conn.SetWriteDeadline(currentTime.Add(w.wTimeout)); err != nil { if err = w.conn.SetWriteDeadline(currentTime.Add(w.wTimeout.Load())); err != nil {
return perrors.WithStack(err) return perrors.WithStack(err)
} }
w.wLastDeadline = currentTime w.wLastDeadline.Store(currentTime)
} }
} }
......
...@@ -9,6 +9,6 @@ require ( ...@@ -9,6 +9,6 @@ require (
github.com/montanaflynn/stats v0.6.6 github.com/montanaflynn/stats v0.6.6
github.com/pkg/errors v0.9.1 github.com/pkg/errors v0.9.1
github.com/stretchr/testify v1.7.0 github.com/stretchr/testify v1.7.0
go.uber.org/atomic v1.7.0 go.uber.org/atomic v1.9.0
go.uber.org/zap v1.16.0 go.uber.org/zap v1.16.0
) )
...@@ -81,6 +81,8 @@ github.com/dgryski/go-sip13 v0.0.0-20181026042036-e10d5fee7954/go.mod h1:vAd38F8 ...@@ -81,6 +81,8 @@ github.com/dgryski/go-sip13 v0.0.0-20181026042036-e10d5fee7954/go.mod h1:vAd38F8
github.com/dubbogo/go-zookeeper v1.0.3/go.mod h1:fn6n2CAEer3novYgk9ULLwAjuV8/g4DdC2ENwRb6E+c= github.com/dubbogo/go-zookeeper v1.0.3/go.mod h1:fn6n2CAEer3novYgk9ULLwAjuV8/g4DdC2ENwRb6E+c=
github.com/dubbogo/gost v1.11.12 h1:e3861DxHWe509whpMxS6mFBmgmm7r9+bT5iJ/PRufcw= github.com/dubbogo/gost v1.11.12 h1:e3861DxHWe509whpMxS6mFBmgmm7r9+bT5iJ/PRufcw=
github.com/dubbogo/gost v1.11.12/go.mod h1:vIcP9rqz2KsXHPjsAwIUtfJIJjppQLQDcYaZTy/61jI= github.com/dubbogo/gost v1.11.12/go.mod h1:vIcP9rqz2KsXHPjsAwIUtfJIJjppQLQDcYaZTy/61jI=
github.com/dubbogo/gost v1.11.19 h1:R1rZ3TNJKV9W5XHLMv+GDO2Wy6UDnwGQtVWbsWYvo0A=
github.com/dubbogo/gost v1.11.19/go.mod h1:vIcP9rqz2KsXHPjsAwIUtfJIJjppQLQDcYaZTy/61jI=
github.com/dubbogo/jsonparser v1.0.1/go.mod h1:tYAtpctvSP/tWw4MeelsowSPgXQRVHHWbqL6ynps8jU= github.com/dubbogo/jsonparser v1.0.1/go.mod h1:tYAtpctvSP/tWw4MeelsowSPgXQRVHHWbqL6ynps8jU=
github.com/dustin/go-humanize v0.0.0-20171111073723-bb3d318650d4/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk= github.com/dustin/go-humanize v0.0.0-20171111073723-bb3d318650d4/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk=
github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk= github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk=
...@@ -413,8 +415,9 @@ go.uber.org/atomic v1.3.2/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= ...@@ -413,8 +415,9 @@ go.uber.org/atomic v1.3.2/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
go.uber.org/atomic v1.5.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ= go.uber.org/atomic v1.5.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ=
go.uber.org/atomic v1.6.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ= go.uber.org/atomic v1.6.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ=
go.uber.org/atomic v1.7.0 h1:ADUqmZGgLDDfbSL9ZmPxKTybcoEYHgpYfELNoN+7hsw=
go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
go.uber.org/atomic v1.9.0 h1:ECmE8Bn/WFTYwEW/bpKD3M8VtR/zQVbavAoalC1PYyE=
go.uber.org/atomic v1.9.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0= go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0=
go.uber.org/multierr v1.3.0/go.mod h1:VgVr7evmIr6uPjLBxg28wmKNXyqE9akIJ5XnfpiKl+4= go.uber.org/multierr v1.3.0/go.mod h1:VgVr7evmIr6uPjLBxg28wmKNXyqE9akIJ5XnfpiKl+4=
go.uber.org/multierr v1.5.0 h1:KCa4XfM8CWFCpxXRGok+Q0SS/0XBhMDbHHGABQLvD2A= go.uber.org/multierr v1.5.0 h1:KCa4XfM8CWFCpxXRGok+Q0SS/0XBhMDbHHGABQLvD2A=
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment