Unverified Commit 168f412b authored by Xin.Zh's avatar Xin.Zh Committed by GitHub

Ftr: delete session.handleLoop (#56)

* delete session.handleLoop

* fix error codes

* fix unit test

* feat: benckmark

* feat: 添加压测程序

* feat: remove no need info

* feat: del exec file

* feat: fix client's session

* feat: update gomod

* feat: fix imports

* feat: update gomod

* feat: change sessionTimerLoop to heartbeat

* feat: go fmt

* feat: update benchmark client send message

* feat: remove loop_client unreachable code

* increase grNum before starting goroutine

* feat: format

* feat: add dubbo-getty alias

* feat: fix gettyUDPConn recv err
Co-authored-by: 's avatarwatermelon <80680489@qq.com>
Co-authored-by: 's avatargeorgehao <haohongfan@gmail.com>
Co-authored-by: 's avatarwangoo <wongoo@apache.org>
parent 03651cac
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package main
import (
"encoding/binary"
"errors"
"flag"
"fmt"
"log"
"net"
"sync"
"sync/atomic"
"time"
)
import (
getty "github.com/apache/dubbo-getty"
"github.com/dubbogo/gost/sync"
"github.com/montanaflynn/stats"
)
var (
concurrency = flag.Int("c", 1, "concurrency")
total = flag.Int("n", 1, "total requests for all clients")
ip = flag.String("ip", "127.0.0.1:8090", "server IP")
connections = flag.Int("conn", 1, "number of tcp connections")
taskPoolMode = flag.Bool("taskPool", false, "task pool mode")
taskPoolSize = flag.Int("task_pool_size", 2000, "task poll size")
pprofPort = flag.Int("pprof_port", 65431, "pprof http port")
)
var taskPool gxsync.GenericTaskPool
const (
CronPeriod = 20e9
WritePkgTimeout = 1e8
)
func main() {
flag.Parse()
n := *concurrency
m := *total / n
log.Printf("Servers: %+v\n\n", *ip)
log.Printf("concurrency: %d\nrequests per client: %d\n\n", n, m)
var wg sync.WaitGroup
wg.Add(n * m)
d := make([][]int64, n, n)
var trans uint64
var transOK uint64
totalT := time.Now().UnixNano()
for i := 0; i < n; i++ {
dt := make([]int64, 0, m)
d = append(d, dt)
go func(ii int) {
client := getty.NewTCPClient(
getty.WithServerAddress(*ip),
getty.WithConnectionNumber(*connections),
getty.WithClientTaskPool(taskPool),
)
var tmpSession getty.Session
NewHelloClientSession := func(session getty.Session) (err error) {
pkgHandler := &PackageHandler{}
EventListener := &MessageHandler{}
EventListener.SessionOnOpen = func(session getty.Session) {
tmpSession = session
}
tcpConn, ok := session.Conn().(*net.TCPConn)
if !ok {
panic(fmt.Sprintf("newSession: %s, session.conn{%#v} is not tcp connection", session.Stat(), session.Conn()))
}
if err = tcpConn.SetNoDelay(true); err != nil {
return err
}
if err = tcpConn.SetKeepAlive(true); err != nil {
return err
}
if err = tcpConn.SetKeepAlivePeriod(10 * time.Second); err != nil {
return err
}
if err = tcpConn.SetReadBuffer(262144); err != nil {
return err
}
if err = tcpConn.SetWriteBuffer(524288); err != nil {
return err
}
session.SetName("hello")
session.SetMaxMsgLen(128 * 1024) // max message package length is 128k
session.SetReadTimeout(time.Second)
session.SetWriteTimeout(5 * time.Second)
session.SetCronPeriod(int(CronPeriod / 1e6))
session.SetWaitTime(time.Second)
session.SetPkgHandler(pkgHandler)
session.SetEventListener(EventListener)
return nil
}
client.RunEventLoop(NewHelloClientSession)
for j := 0; j < m; j++ {
atomic.AddUint64(&trans, 1)
t := time.Now().UnixNano()
msg := buildSendMsg()
_, _, err := tmpSession.WritePkg(msg, WritePkgTimeout)
if err != nil {
log.Printf("Err:session.WritePkg(session{%s}, error{%v}", tmpSession.Stat(), err)
}
atomic.AddUint64(&transOK, 1)
t = time.Now().UnixNano() - t
d[ii] = append(d[ii], t)
wg.Done()
}
client.Close()
}(i)
}
wg.Wait()
totalT = time.Now().UnixNano() - totalT
totalT = totalT / 1000000
log.Printf("took %d ms for %d requests", totalT, n*m)
totalD := make([]int64, 0, n*m)
for _, k := range d {
totalD = append(totalD, k...)
}
totalD2 := make([]float64, 0, n*m)
for _, k := range totalD {
totalD2 = append(totalD2, float64(k))
}
mean, _ := stats.Mean(totalD2)
median, _ := stats.Median(totalD2)
max, _ := stats.Max(totalD2)
min, _ := stats.Min(totalD2)
p99, _ := stats.Percentile(totalD2, 99.9)
log.Printf("sent requests : %d\n", n*m)
log.Printf("received requests : %d\n", atomic.LoadUint64(&trans))
log.Printf("received requests_OK : %d\n", atomic.LoadUint64(&transOK))
log.Printf("throughput (TPS) : %d\n", int64(n*m)*1000/totalT)
log.Printf("mean: %.f ns, median: %.f ns, max: %.f ns, min: %.f ns, p99: %.f ns\n", mean, median, max, min, p99)
log.Printf("mean: %d ms, median: %d ms, max: %d ms, min: %d ms, p99: %d ms\n", int64(mean/1000000), int64(median/1000000), int64(max/1000000), int64(min/1000000), int64(p99/1000000))
}
type MessageHandler struct {
SessionOnOpen func(session getty.Session)
}
func (h *MessageHandler) OnOpen(session getty.Session) error {
log.Printf("OnOpen session{%s} open", session.Stat())
if h.SessionOnOpen != nil {
h.SessionOnOpen(session)
}
return nil
}
func (h *MessageHandler) OnError(session getty.Session, err error) {
log.Printf("OnError session{%s} got error{%v}, will be closed.", session.Stat(), err)
}
func (h *MessageHandler) OnClose(session getty.Session) {
log.Printf("hhf OnClose session{%s} is closing......", session.Stat())
}
func (h *MessageHandler) OnMessage(session getty.Session, pkg interface{}) {
log.Printf("OnMessage....")
s, ok := pkg.(string)
if !ok {
log.Printf("illegal package{%#v}", pkg)
return
}
log.Printf("OnMessage: %s", s)
}
func (h *MessageHandler) OnCron(session getty.Session) {
log.Printf("OnCron....")
}
type PackageHandler struct{}
func (h *PackageHandler) Read(ss getty.Session, data []byte) (interface{}, int, error) {
dataLen := len(data)
if dataLen < 4 {
return nil, 0, nil
}
start := 0
pos := start + 4
pkgLen := int(binary.LittleEndian.Uint32(data[start:pos]))
if dataLen < pos+pkgLen {
return nil, pos + pkgLen, nil
}
start = pos
pos = start + pkgLen
s := string(data[start:pos])
return s, pos, nil
}
func (h *PackageHandler) Write(ss getty.Session, p interface{}) ([]byte, error) {
pkg, ok := p.(string)
if !ok {
log.Printf("illegal pkg:%+v", p)
return nil, errors.New("invalid package")
}
pkgLen := int32(len(pkg))
pkgStreams := make([]byte, 0, 4+len(pkg))
// pkg len
start := 0
pos := start + 4
binary.LittleEndian.PutUint32(pkgStreams[start:pos], uint32(pkgLen))
start = pos
// pkg
pos = start + int(pkgLen)
copy(pkgStreams[start:pos], pkg[:])
return pkgStreams[:pos], nil
}
func buildSendMsg() string {
return "Now we know what the itables look like, but where do they come from? Go's dynamic type conversions mean that it isn't reasonable for the compiler or linker to precompute all possible itables: there are too many (interface type, concrete type) pairs, and most won't be needed. Instead, the compiler generates a type description structure for each concrete type like Binary or int or func(map[int]string). Among other metadata, the type description structure contains a list of the methods implemented by that type. Similarly, the compiler generates a (different) type description structure for each interface type like Stringer; it too contains a method list. The interface runtime computes the itable by looking for each method listed in the interface type's method table in the concrete type's method table. The runtime caches the itable after generating it, so that this correspondence need only be computed once。"
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package main
import (
"encoding/binary"
"errors"
"flag"
"fmt"
"log"
"net"
"time"
)
import (
getty "github.com/apache/dubbo-getty"
"github.com/dubbogo/gost/sync"
)
var (
concurrency = flag.Int("c", 1, "concurrency")
total = flag.Int("n", 1, "total requests for all clients")
ip = flag.String("ip", "127.0.0.1:8090", "server IP")
connections = flag.Int("conn", 1, "number of tcp connections")
taskPoolMode = flag.Bool("taskPool", false, "task pool mode")
taskPoolSize = flag.Int("task_pool_size", 2000, "task poll size")
pprofPort = flag.Int("pprof_port", 65431, "pprof http port")
)
var taskPool gxsync.GenericTaskPool
const (
CronPeriod = 20e9
WritePkgTimeout = 1e8
)
func main() {
flag.Parse()
n := *concurrency
log.Printf("Servers: %+v\n\n", *ip)
for i := 0; i < n; i++ {
go func(ii int) {
client := getty.NewTCPClient(
getty.WithServerAddress(*ip),
getty.WithConnectionNumber(*connections),
getty.WithClientTaskPool(taskPool),
)
var tmpSession getty.Session
NewHelloClientSession := func(session getty.Session) (err error) {
pkgHandler := &PackageHandler{}
EventListener := &MessageHandler{}
EventListener.SessionOnOpen = func(session getty.Session) {
tmpSession = session
}
tcpConn, ok := session.Conn().(*net.TCPConn)
if !ok {
panic(fmt.Sprintf("newSession: %s, session.conn{%#v} is not tcp connection", session.Stat(), session.Conn()))
}
if err = tcpConn.SetNoDelay(true); err != nil {
return err
}
if err = tcpConn.SetKeepAlive(true); err != nil {
return err
}
if err = tcpConn.SetKeepAlivePeriod(10 * time.Second); err != nil {
return err
}
if err = tcpConn.SetReadBuffer(262144); err != nil {
return err
}
if err = tcpConn.SetWriteBuffer(524288); err != nil {
return err
}
session.SetName("hello")
session.SetMaxMsgLen(128 * 1024) // max message package length is 128k
session.SetReadTimeout(time.Second)
session.SetWriteTimeout(5 * time.Second)
session.SetCronPeriod(int(CronPeriod / 1e6))
session.SetWaitTime(time.Second)
session.SetPkgHandler(pkgHandler)
session.SetEventListener(EventListener)
return nil
}
client.RunEventLoop(NewHelloClientSession)
for {
msg := buildSendMsg()
_, _, err := tmpSession.WritePkg(msg, WritePkgTimeout)
if err != nil {
log.Printf("Err:session.WritePkg(session{%s}, error{%v}", tmpSession.Stat(), err)
tmpSession.Close()
}
}
}(i)
}
c := make(chan int)
<-c
}
type MessageHandler struct {
SessionOnOpen func(session getty.Session)
}
func (h *MessageHandler) OnOpen(session getty.Session) error {
log.Printf("OnOpen session{%s} open", session.Stat())
if h.SessionOnOpen != nil {
h.SessionOnOpen(session)
}
return nil
}
func (h *MessageHandler) OnError(session getty.Session, err error) {
log.Printf("OnError session{%s} got error{%v}, will be closed.", session.Stat(), err)
}
func (h *MessageHandler) OnClose(session getty.Session) {
log.Printf("hhf OnClose session{%s} is closing......", session.Stat())
}
func (h *MessageHandler) OnMessage(session getty.Session, pkg interface{}) {
log.Printf("OnMessage....")
s, ok := pkg.(string)
if !ok {
log.Printf("illegal package{%#v}", pkg)
return
}
log.Printf("OnMessage: %s", s)
}
func (h *MessageHandler) OnCron(session getty.Session) {
log.Printf("OnCron....")
}
type PackageHandler struct{}
func (h *PackageHandler) Read(ss getty.Session, data []byte) (interface{}, int, error) {
dataLen := len(data)
if dataLen < 4 {
return nil, 0, nil
}
start := 0
pos := start + 4
pkgLen := int(binary.LittleEndian.Uint32(data[start:pos]))
if dataLen < pos+pkgLen {
return nil, pos + pkgLen, nil
}
start = pos
pos = start + pkgLen
s := string(data[start:pos])
return s, pos, nil
}
func (h *PackageHandler) Write(ss getty.Session, p interface{}) ([]byte, error) {
pkg, ok := p.(string)
if !ok {
log.Printf("illegal pkg:%+v", p)
return nil, errors.New("invalid package")
}
pkgLen := int32(len(pkg))
pkgStreams := make([]byte, 0, 4+len(pkg))
// pkg len
start := 0
pos := start + 4
binary.LittleEndian.PutUint32(pkgStreams[start:pos], uint32(pkgLen))
start = pos
// pkg
pos = start + int(pkgLen)
copy(pkgStreams[start:pos], pkg[:])
return pkgStreams[:pos], nil
}
func buildSendMsg() string {
return "Now we know what the itables look like, but where do they come from? Go's dynamic type conversions mean that it isn't reasonable for the compiler or linker to precompute all possible itables: there are too many (interface type, concrete type) pairs, and most won't be needed. Instead, the compiler generates a type description structure for each concrete type like Binary or int or func(map[int]string). Among other metadata, the type description structure contains a list of the methods implemented by that type. Similarly, the compiler generates a (different) type description structure for each interface type like Stringer; it too contains a method list. The interface runtime computes the itable by looking for each method listed in the interface type's method table in the concrete type's method table. The runtime caches the itable after generating it, so that this correspondence need only be computed once"
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package main
import (
"encoding/binary"
"errors"
"flag"
"fmt"
"log"
"net"
"net/http"
_ "net/http/pprof"
"os"
"os/signal"
"syscall"
"time"
)
import (
getty "github.com/apache/dubbo-getty"
gxsync "github.com/dubbogo/gost/sync"
)
var (
taskPoolMode = flag.Bool("taskPool", false, "task pool mode")
taskPoolSize = flag.Int("task_pool_size", 2000, "task poll size")
pprofPort = flag.Int("pprof_port", 65432, "pprof http port")
)
var taskPool gxsync.GenericTaskPool
const CronPeriod = 20e9
func main() {
flag.Parse()
go func() {
http.ListenAndServe(fmt.Sprintf(":%d", *pprofPort), nil)
}()
options := []getty.ServerOption{getty.WithLocalAddress(":8090")}
if *taskPoolMode {
taskPool = gxsync.NewTaskPoolSimple(*taskPoolSize)
options = append(options, getty.WithServerTaskPool(taskPool))
}
server := getty.NewTCPServer(options...)
go server.RunEventLoop(NewHelloServerSession)
log.Printf("getty server start, listening at: 8090")
signals := make(chan os.Signal, 1)
signal.Notify(signals, os.Interrupt, syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT)
<-signals
server.Close()
}
func NewHelloServerSession(session getty.Session) (err error) {
pkgHandler := &PackageHandler{}
EventListener := &MessageHandler{}
tcpConn, ok := session.Conn().(*net.TCPConn)
if !ok {
panic(fmt.Sprintf("newSession: %s, session.conn{%#v} is not tcp connection", session.Stat(), session.Conn()))
}
if err = tcpConn.SetNoDelay(true); err != nil {
return err
}
if err = tcpConn.SetKeepAlive(true); err != nil {
return err
}
if err = tcpConn.SetKeepAlivePeriod(10 * time.Second); err != nil {
return err
}
if err = tcpConn.SetReadBuffer(262144); err != nil {
return err
}
if err = tcpConn.SetWriteBuffer(524288); err != nil {
return err
}
session.SetName("hello")
session.SetMaxMsgLen(128 * 1024) // max message package length is 128k
session.SetReadTimeout(time.Second)
session.SetWriteTimeout(5 * time.Second)
session.SetCronPeriod(int(CronPeriod / 1e6))
session.SetWaitTime(time.Second)
session.SetPkgHandler(pkgHandler)
session.SetEventListener(EventListener)
return nil
}
type MessageHandler struct {
SessionOnOpen func(session getty.Session)
}
func (h *MessageHandler) OnOpen(session getty.Session) error {
log.Printf("OnOpen session{%s} open", session.Stat())
if h.SessionOnOpen != nil {
h.SessionOnOpen(session)
}
return nil
}
func (h *MessageHandler) OnError(session getty.Session, err error) {
log.Printf("OnError session{%s} got error{%v}, will be closed.", session.Stat(), err)
}
func (h *MessageHandler) OnClose(session getty.Session) {
log.Printf("OnClose session{%s} is closing......", session.Stat())
}
func (h *MessageHandler) OnMessage(session getty.Session, pkg interface{}) {
log.Printf("OnMessage....")
//s, ok := pkg.(string)
//if !ok {
// log.Printf("illegal package{%#v}", pkg)
// return
//}
//log.Printf("OnMessage: %s", s)
}
func (h *MessageHandler) OnCron(session getty.Session) {
log.Printf("OnCron....")
}
type PackageHandler struct{}
func (h *PackageHandler) Read(ss getty.Session, data []byte) (interface{}, int, error) {
dataLen := len(data)
if dataLen < 4 {
return nil, 0, nil
}
start := 0
pos := start + 4
pkgLen := int(binary.LittleEndian.Uint32(data[start:pos]))
if dataLen < pos+pkgLen {
return nil, pos + pkgLen, nil
}
start = pos
pos = start + pkgLen
s := string(data[start:pos])
return s, pos, nil
}
func (h *PackageHandler) Write(ss getty.Session, p interface{}) ([]byte, error) {
pkg, ok := p.(string)
if !ok {
log.Printf("illegal pkg:%+v", p)
return nil, errors.New("invalid package")
}
pkgLen := int32(len(pkg))
pkgStreams := make([]byte, 0, 4+len(pkg))
// pkg len
start := 0
pos := start + 4
binary.LittleEndian.PutUint32(pkgStreams[start:pos], uint32(pkgLen))
start = pos
// pkg
pos = start + int(pkgLen)
copy(pkgStreams[start:pos], pkg[:])
return pkgStreams[:pos], nil
}
......@@ -34,6 +34,7 @@ import (
"github.com/dubbogo/gost/bytes"
"github.com/dubbogo/gost/net"
gxsync "github.com/dubbogo/gost/sync"
gxtime "github.com/dubbogo/gost/time"
"github.com/gorilla/websocket"
perrors "github.com/pkg/errors"
)
......@@ -168,7 +169,7 @@ func (c *client) dialTCP() Session {
}
log.Infof("net.DialTimeout(addr:%s, timeout:%v) = error:%+v", c.addr, connectTimeout, perrors.WithStack(err))
<-wheel.After(connectInterval)
<-gxtime.After(connectInterval)
}
}
......@@ -200,7 +201,7 @@ func (c *client) dialUDP() Session {
}
if err != nil {
log.Warnf("net.DialTimeout(addr:%s, timeout:%v) = error:%+v", c.addr, perrors.WithStack(err))
<-wheel.After(connectInterval)
<-gxtime.After(connectInterval)
continue
}
......@@ -209,7 +210,7 @@ func (c *client) dialUDP() Session {
if length, err = conn.Write(connectPingPackage[:]); err != nil {
conn.Close()
log.Warnf("conn.Write(%s) = {length:%d, err:%+v}", string(connectPingPackage), length, perrors.WithStack(err))
<-wheel.After(connectInterval)
<-gxtime.After(connectInterval)
continue
}
conn.SetReadDeadline(time.Now().Add(1e9))
......@@ -220,7 +221,7 @@ func (c *client) dialUDP() Session {
if err != nil {
log.Infof("conn{%#v}.Read() = {length:%d, err:%+v}", conn, length, perrors.WithStack(err))
conn.Close()
<-wheel.After(connectInterval)
<-gxtime.After(connectInterval)
continue
}
// if err == nil {
......@@ -258,7 +259,7 @@ func (c *client) dialWS() Session {
}
log.Infof("websocket.dialer.Dial(addr:%s) = error:%+v", c.addr, perrors.WithStack(err))
<-wheel.After(connectInterval)
<-gxtime.After(connectInterval)
}
}
......@@ -336,7 +337,7 @@ func (c *client) dialWSS() Session {
}
log.Infof("websocket.dialer.Dial(addr:%s) = error:%+v", c.addr, perrors.WithStack(err))
<-wheel.After(connectInterval)
<-gxtime.After(connectInterval)
}
}
......@@ -443,7 +444,7 @@ func (c *client) reConnect() {
if maxTimes < times {
times = maxTimes
}
<-wheel.After(time.Duration(int64(times) * int64(interval)))
<-gxtime.After(time.Duration(int64(times) * int64(interval)))
}
}
......
......@@ -24,13 +24,11 @@ import (
"os"
"strconv"
"sync"
"sync/atomic"
"testing"
"time"
)
import (
perrors "github.com/pkg/errors"
"github.com/stretchr/testify/assert"
)
......@@ -96,7 +94,6 @@ func newSessionCallback(session Session, handler *MessageHandler) error {
}
func TestTCPClient(t *testing.T) {
assert.NotNil(t, GetTimeWheel())
listenLocalServer := func() (net.Listener, error) {
listener, err := net.Listen("tcp", ":0")
if err != nil {
......@@ -135,31 +132,39 @@ func TestTCPClient(t *testing.T) {
ss.SetCompressType(CompressNone)
conn := ss.(*session).Connection.(*gettyTCPConn)
assert.True(t, conn.compress == CompressNone)
beforeWriteBytes := atomic.LoadUint32(&conn.writeBytes)
beforeWritePkgNum := atomic.LoadUint32(&conn.writePkgNum)
beforeWriteBytes := conn.writeBytes
beforeWritePkgNum := conn.writePkgNum
l, err := conn.send([]byte("hello"))
assert.Nil(t, err)
assert.True(t, l == 5)
assert.Equal(t, beforeWritePkgNum+1, atomic.LoadUint32(&conn.writePkgNum))
assert.Equal(t, beforeWriteBytes+5, atomic.LoadUint32(&conn.writeBytes))
beforeWritePkgNum.Add(1)
beforeWriteBytes.Add(5)
assert.Equal(t, beforeWritePkgNum, conn.writePkgNum)
assert.Equal(t, beforeWriteBytes, conn.writeBytes)
l, err = ss.WriteBytes([]byte("hello"))
assert.Nil(t, err)
assert.True(t, l == 5)
assert.Equal(t, beforeWriteBytes+10, atomic.LoadUint32(&conn.writeBytes))
assert.Equal(t, beforeWritePkgNum+2, atomic.LoadUint32(&conn.writePkgNum))
beforeWriteBytes.Add(5)
beforeWritePkgNum.Add(1)
assert.Equal(t, beforeWriteBytes, conn.writeBytes)
assert.Equal(t, beforeWritePkgNum, conn.writePkgNum)
var pkgs [][]byte
pkgs = append(pkgs, []byte("hello"), []byte("hello"))
l, err = conn.send(pkgs)
assert.Nil(t, err)
assert.True(t, l == 10)
assert.Equal(t, beforeWritePkgNum+4, atomic.LoadUint32(&conn.writePkgNum))
assert.Equal(t, beforeWriteBytes+20, atomic.LoadUint32(&conn.writeBytes))
beforeWritePkgNum.Add(2)
beforeWriteBytes.Add(10)
assert.Equal(t, beforeWritePkgNum, conn.writePkgNum)
assert.Equal(t, beforeWriteBytes, conn.writeBytes)
ss.SetCompressType(CompressSnappy)
l, err = ss.WriteBytesArray(pkgs...)
assert.Nil(t, err)
assert.True(t, l == 10)
assert.Equal(t, beforeWritePkgNum+6, atomic.LoadUint32(&conn.writePkgNum))
assert.Equal(t, beforeWriteBytes+30, atomic.LoadUint32(&conn.writeBytes))
beforeWritePkgNum.Add(2)
beforeWriteBytes.Add(10)
assert.Equal(t, beforeWritePkgNum, conn.writePkgNum)
assert.Equal(t, beforeWriteBytes, conn.writeBytes)
assert.True(t, conn.compress == CompressSnappy)
clt.Close()
......@@ -208,7 +213,7 @@ func TestUDPClient(t *testing.T) {
assert.True(t, sendLen == 0)
assert.True(t, totalLen == 0)
totalLen, sendLen, err = ss.WritePkg([]byte("hello"), 0)
assert.NotNil(t, perrors.Cause(err))
assert.NotNil(t, err)
assert.True(t, sendLen == 0)
assert.True(t, totalLen == 0)
l, err := ss.WriteBytes([]byte("hello"))
......@@ -236,14 +241,16 @@ func TestUDPClient(t *testing.T) {
_, err = udpConn.send(udpCtx)
assert.NotNil(t, err)
udpCtx.Pkg = []byte("hello")
beforeWriteBytes := atomic.LoadUint32(&udpConn.writeBytes)
beforeWriteBytes := udpConn.writeBytes
_, err = udpConn.send(udpCtx)
assert.Equal(t, beforeWriteBytes+5, atomic.LoadUint32(&udpConn.writeBytes))
beforeWriteBytes.Add(5)
assert.Equal(t, beforeWriteBytes, udpConn.writeBytes)
assert.Nil(t, err)
beforeWritePkgNum := atomic.LoadUint32(&udpConn.writePkgNum)
beforeWritePkgNum := udpConn.writePkgNum
totalLen, sendLen, err = ss.WritePkg(udpCtx, 0)
assert.Equal(t, beforeWritePkgNum+1, atomic.LoadUint32(&udpConn.writePkgNum))
beforeWritePkgNum.Add(1)
assert.Equal(t, beforeWritePkgNum, udpConn.writePkgNum)
assert.Nil(t, err)
assert.True(t, sendLen == 0)
assert.True(t, totalLen == 0)
......@@ -297,19 +304,22 @@ func TestNewWSClient(t *testing.T) {
l, err := conn.send("hello")
assert.NotNil(t, err)
assert.True(t, l == 0)
beforeWriteBytes := atomic.LoadUint32(&conn.writeBytes)
beforeWriteBytes := conn.writeBytes
_, err = conn.send([]byte("hello"))
assert.Nil(t, err)
assert.Equal(t, beforeWriteBytes+5, atomic.LoadUint32(&conn.writeBytes))
beforeWritePkgNum := atomic.LoadUint32(&conn.writePkgNum)
beforeWriteBytes.Add(5)
assert.Equal(t, beforeWriteBytes, conn.writeBytes)
beforeWritePkgNum := conn.writePkgNum
l, err = ss.WriteBytes([]byte("hello"))
assert.Nil(t, err)
assert.True(t, l == 5)
assert.Equal(t, beforeWritePkgNum+1, atomic.LoadUint32(&conn.writePkgNum))
beforeWritePkgNum.Add(1)
assert.Equal(t, beforeWritePkgNum, conn.writePkgNum)
l, err = ss.WriteBytesArray([]byte("hello"), []byte("hello"))
assert.Nil(t, err)
assert.True(t, l == 10)
assert.Equal(t, beforeWritePkgNum+3, atomic.LoadUint32(&conn.writePkgNum))
beforeWritePkgNum.Add(2)
assert.Equal(t, beforeWritePkgNum, conn.writePkgNum)
err = conn.writePing()
assert.Nil(t, err)
......
......@@ -24,33 +24,33 @@ import (
"io"
"net"
"sync"
"sync/atomic"
"time"
"github.com/golang/snappy"
"github.com/gorilla/websocket"
perrors "github.com/pkg/errors"
uatomic "go.uber.org/atomic"
)
var launchTime = time.Now() // ErrInvalidConnection = perrors.New("connection has been closed.")
var launchTime = time.Now()
// ///////////////////////////////////////
// getty connection
// ///////////////////////////////////////
var connID uint32
var connID uatomic.Uint32
type gettyConn struct {
id uint32
compress CompressType
padding1 uint8
padding2 uint16
readBytes uint32 // read bytes
writeBytes uint32 // write bytes
readPkgNum uint32 // send pkg number
writePkgNum uint32 // recv pkg number
active int64 // last active, in milliseconds
rTimeout time.Duration // network current limiting
readBytes uatomic.Uint32 // read bytes
writeBytes uatomic.Uint32 // write bytes
readPkgNum uatomic.Uint32 // send pkg number
writePkgNum uatomic.Uint32 // recv pkg number
active uatomic.Int64 // last active, in milliseconds
rTimeout time.Duration // network current limiting
wTimeout time.Duration
rLastDeadline int64 // lastest network read time
wLastDeadline int64 // lastest network write time
......@@ -72,19 +72,19 @@ func (c *gettyConn) RemoteAddr() string {
}
func (c *gettyConn) incReadPkgNum() {
atomic.AddUint32(&c.readPkgNum, 1)
c.readPkgNum.Add(1)
}
func (c *gettyConn) incWritePkgNum() {
atomic.AddUint32(&c.writePkgNum, 1)
c.writePkgNum.Add(1)
}
func (c *gettyConn) UpdateActive() {
atomic.StoreInt64(&(c.active), int64(time.Since(launchTime)))
c.active.Store(int64(time.Since(launchTime)))
}
func (c *gettyConn) GetActive() time.Time {
return launchTime.Add(time.Duration(atomic.LoadInt64(&(c.active))))
return launchTime.Add(time.Duration(c.active.Load()))
}
func (c *gettyConn) send(interface{}) (int, error) {
......@@ -165,7 +165,7 @@ func newGettyTCPConn(conn net.Conn) *gettyTCPConn {
reader: io.Reader(conn),
writer: io.Writer(conn),
gettyConn: gettyConn{
id: atomic.AddUint32(&connID, 1),
id: connID.Add(1),
rTimeout: netIOTimeout,
wTimeout: netIOTimeout,
local: localAddr,
......@@ -249,10 +249,8 @@ func (t *gettyTCPConn) recv(p []byte) (int, error) {
}
length, err = t.reader.Read(p)
// log.Debugf("now:%s, length:%d, err:%v", currentTime, length, err)
atomic.AddUint32(&t.readBytes, uint32(length))
t.readBytes.Add(uint32(length))
return length, perrors.WithStack(err)
// return length, err
}
// tcp connection write
......@@ -271,7 +269,7 @@ func (t *gettyTCPConn) send(pkg interface{}) (int, error) {
// of the last write deadline exceeded.
// See https://github.com/golang/go/issues/15133 for details.
currentTime = time.Now()
if currentTime.Unix() - t.wLastDeadline > int64(t.wTimeout >> 2) {
if currentTime.Unix()-t.wLastDeadline > int64(t.wTimeout>>2) {
if err = t.conn.SetWriteDeadline(currentTime.Add(t.wTimeout)); err != nil {
return 0, perrors.WithStack(err)
}
......@@ -283,8 +281,8 @@ func (t *gettyTCPConn) send(pkg interface{}) (int, error) {
netBuf := net.Buffers(buffers)
lg, err = netBuf.WriteTo(t.conn)
if err == nil {
atomic.AddUint32(&t.writeBytes, (uint32)(lg))
atomic.AddUint32(&t.writePkgNum, (uint32)(len(buffers)))
t.writeBytes.Add((uint32)(lg))
t.writePkgNum.Add((uint32)(len(buffers)))
}
log.Debugf("localAddr: %s, remoteAddr:%s, now:%s, length:%d, err:%s",
t.conn.LocalAddr(), t.conn.RemoteAddr(), currentTime, length, err)
......@@ -294,10 +292,10 @@ func (t *gettyTCPConn) send(pkg interface{}) (int, error) {
if p, ok = pkg.([]byte); ok {
length, err = t.writer.Write(p)
if err == nil {
atomic.AddUint32(&t.writeBytes, (uint32)(len(p)))
atomic.AddUint32(&t.writePkgNum, 1)
t.writeBytes.Add((uint32)(len(p)))
t.writePkgNum.Add(1)
}
log.Debugf("localAddr: %s, remoteAddr:%s, now:%s, length:%d, err:%s",
log.Debugf("localAddr: %s, remoteAddr:%s, now:%s, length:%d, err:%v",
t.conn.LocalAddr(), t.conn.RemoteAddr(), currentTime, length, err)
return length, perrors.WithStack(err)
}
......@@ -365,7 +363,7 @@ func newGettyUDPConn(conn *net.UDPConn) *gettyUDPConn {
return &gettyUDPConn{
conn: conn,
gettyConn: gettyConn{
id: atomic.AddUint32(&connID, 1),
id: connID.Add(1),
rTimeout: netIOTimeout,
wTimeout: netIOTimeout,
local: localAddr,
......@@ -387,33 +385,25 @@ func (u *gettyUDPConn) SetCompressType(c CompressType) {
// udp connection read
func (u *gettyUDPConn) recv(p []byte) (int, *net.UDPAddr, error) {
var (
err error
currentTime time.Time
length int
addr *net.UDPAddr
)
if u.rTimeout > 0 {
// Optimization: update read deadline only if more than 25%
// of the last read deadline exceeded.
// See https://github.com/golang/go/issues/15133 for details.
currentTime = time.Now()
if currentTime.Unix() - u.rLastDeadline > int64(u.rTimeout >> 2) {
if err = u.conn.SetReadDeadline(currentTime.Add(u.rTimeout)); err != nil {
currentTime := time.Now()
if currentTime.Unix()-u.rLastDeadline > int64(u.rTimeout>>2) {
if err := u.conn.SetReadDeadline(currentTime.Add(u.rTimeout)); err != nil {
return 0, nil, perrors.WithStack(err)
}
u.rLastDeadline = currentTime.Unix()
}
}
length, addr, err = u.conn.ReadFromUDP(p) // connected udp also can get return @addr
log.Debugf("ReadFromUDP() = {length:%d, peerAddr:%s, error:%v}", length, addr, err)
length, addr, err := u.conn.ReadFromUDP(p) // connected udp also can get return @addr
log.Debugf("ReadFromUDP(p:%d) = {length:%d, peerAddr:%s, error:%v}", len(p), length, addr, err)
if err == nil {
atomic.AddUint32(&u.readBytes, uint32(length))
u.readBytes.Add(uint32(length))
}
// return length, addr, err
return length, addr, perrors.WithStack(err)
}
......@@ -447,7 +437,7 @@ func (u *gettyUDPConn) send(udpCtx interface{}) (int, error) {
// of the last write deadline exceeded.
// See https://github.com/golang/go/issues/15133 for details.
currentTime = time.Now()
if currentTime.Unix() - u.wLastDeadline > int64(u.wTimeout >> 2) {
if currentTime.Unix()-u.wLastDeadline > int64(u.wTimeout>>2) {
if err = u.conn.SetWriteDeadline(currentTime.Add(u.wTimeout)); err != nil {
return 0, perrors.WithStack(err)
}
......@@ -456,8 +446,8 @@ func (u *gettyUDPConn) send(udpCtx interface{}) (int, error) {
}
if length, _, err = u.conn.WriteMsgUDP(buf, nil, peerAddr); err == nil {
atomic.AddUint32(&u.writeBytes, (uint32)(len(buf)))
atomic.AddUint32(&u.writePkgNum, 1)
u.writeBytes.Add((uint32)(len(buf)))
u.writePkgNum.Add(1)
}
log.Debugf("WriteMsgUDP(peerAddr:%s) = {length:%d, error:%v}", peerAddr, length, err)
......@@ -498,7 +488,7 @@ func newGettyWSConn(conn *websocket.Conn) *gettyWSConn {
gettyWSConn := &gettyWSConn{
conn: conn,
gettyConn: gettyConn{
id: atomic.AddUint32(&connID, 1),
id: connID.Add(1),
rTimeout: netIOTimeout,
wTimeout: netIOTimeout,
local: localAddr,
......@@ -551,7 +541,7 @@ func (w *gettyWSConn) recv() ([]byte, error) {
// gorilla/websocket/conn.go:NextReader will always fail when got a timeout error.
_, b, e := w.conn.ReadMessage() // the first return value is message type.
if e == nil {
atomic.AddUint32(&w.readBytes, (uint32)(len(b)))
w.readBytes.Add((uint32)(len(b)))
} else {
if websocket.IsUnexpectedCloseError(e, websocket.CloseGoingAway) {
log.Warnf("websocket unexpected close error: %v", e)
......@@ -559,7 +549,6 @@ func (w *gettyWSConn) recv() ([]byte, error) {
}
return b, perrors.WithStack(e)
// return b, e
}
func (w *gettyWSConn) updateWriteDeadline() error {
......@@ -573,7 +562,7 @@ func (w *gettyWSConn) updateWriteDeadline() error {
// of the last write deadline exceeded.
// See https://github.com/golang/go/issues/15133 for details.
currentTime = time.Now()
if currentTime.Unix() - w.wLastDeadline > int64(w.wTimeout >> 2) {
if currentTime.Unix()-w.wLastDeadline > int64(w.wTimeout>>2) {
if err = w.conn.SetWriteDeadline(currentTime.Add(w.wTimeout)); err != nil {
return perrors.WithStack(err)
}
......@@ -598,8 +587,8 @@ func (w *gettyWSConn) send(pkg interface{}) (int, error) {
w.updateWriteDeadline()
if err = w.conn.WriteMessage(websocket.BinaryMessage, p); err == nil {
atomic.AddUint32(&w.writeBytes, (uint32)(len(p)))
atomic.AddUint32(&w.writePkgNum, 1)
w.writeBytes.Add((uint32)(len(p)))
w.writePkgNum.Add(1)
}
return len(p), perrors.WithStack(err)
}
......
......@@ -23,11 +23,11 @@ import (
)
import (
"github.com/apache/dubbo-getty"
"github.com/dubbogo/gost/sync"
)
import (
"github.com/apache/dubbo-getty"
"github.com/apache/dubbo-getty/demo/hello"
tls "github.com/apache/dubbo-getty/demo/hello/tls"
"github.com/apache/dubbo-getty/demo/util"
......
......@@ -20,16 +20,15 @@ package main
import (
"flag"
"path/filepath"
tls "github.com/apache/dubbo-getty/demo/hello/tls"
)
import (
"github.com/apache/dubbo-getty"
gxsync "github.com/dubbogo/gost/sync"
)
import (
"github.com/apache/dubbo-getty"
tls "github.com/apache/dubbo-getty/demo/hello/tls"
"github.com/apache/dubbo-getty/demo/util"
)
......
......@@ -3,10 +3,12 @@ module github.com/apache/dubbo-getty
go 1.14
require (
github.com/dubbogo/gost v1.10.1
github.com/dubbogo/gost v1.11.12
github.com/golang/snappy v0.0.1
github.com/gorilla/websocket v1.4.2
github.com/montanaflynn/stats v0.6.6
github.com/pkg/errors v0.9.1
github.com/stretchr/testify v1.6.1
go.uber.org/zap v1.15.0
github.com/stretchr/testify v1.7.0
go.uber.org/atomic v1.7.0
go.uber.org/zap v1.16.0
)
This diff is collapsed.
......@@ -27,21 +27,23 @@ import (
"net/http"
"strings"
"sync"
"sync/atomic"
"time"
)
import (
gxnet "github.com/dubbogo/gost/net"
gxsync "github.com/dubbogo/gost/sync"
gxtime "github.com/dubbogo/gost/time"
"github.com/gorilla/websocket"
perrors "github.com/pkg/errors"
uatomic "go.uber.org/atomic"
)
var (
errSelfConnect = perrors.New("connect self!")
serverFastFailTimeout = time.Second * 1
serverID = EndPointID(0)
serverID uatomic.Int32
)
type server struct {
......@@ -69,7 +71,7 @@ func (s *server) init(opts ...ServerOption) {
func newServer(t EndPointType, opts ...ServerOption) *server {
s := &server{
endPointID: atomic.AddInt32(&serverID, 1),
endPointID: serverID.Add(1),
endPointType: t,
done: make(chan struct{}),
}
......@@ -264,11 +266,11 @@ func (s *server) runTcpEventLoop(newSession NewSessionCallback) {
)
for {
if s.IsClosed() {
log.Warnf("server{%s} stop accepting client connect request.", s.addr)
log.Infof("server{%s} stop accepting client connect request.", s.addr)
return
}
if delay != 0 {
<-wheel.After(delay)
<-gxtime.After(delay)
}
client, err = s.accept(newSession)
if err != nil {
......
......@@ -25,7 +25,6 @@ import (
"net"
"runtime"
"sync"
"sync/atomic"
"time"
)
......@@ -36,6 +35,7 @@ import (
"github.com/gorilla/websocket"
perrors "github.com/pkg/errors"
uatomic "go.uber.org/atomic"
)
const (
......@@ -58,16 +58,11 @@ const (
// session
/////////////////////////////////////////
var wheel *gxtime.Wheel
var defaultTimerWheel *gxtime.TimerWheel
func init() {
span := 100e6 // 100ms
buckets := MaxWheelTimeSpan / span
wheel = gxtime.NewWheel(time.Duration(span), int(buckets)) // wheel longest span is 15 minute
}
func GetTimeWheel() *gxtime.Wheel {
return wheel
gxtime.InitDefaultTimerWheel()
defaultTimerWheel = gxtime.GetDefaultTimerWheel()
}
// getty base session
......@@ -99,9 +94,7 @@ type session struct {
attrs *gxcontext.ValuesContext
// goroutines sync
grNum int32
// read goroutines done signal
rDone chan struct{}
grNum uatomic.Int32
lock sync.RWMutex
}
......@@ -120,7 +113,6 @@ func newSession(endPoint EndPoint, conn Connection) *session {
done: make(chan struct{}),
wait: pendingDuration,
attrs: gxcontext.NewValuesContext(context.Background()),
rDone: make(chan struct{}),
}
ss.Connection.setSession(ss)
......@@ -162,7 +154,6 @@ func (s *session) Reset() {
period: period,
wait: pendingDuration,
attrs: gxcontext.NewValuesContext(context.Background()),
rDone: make(chan struct{}),
}
}
......@@ -212,10 +203,10 @@ func (s *session) Stat() string {
return fmt.Sprintf(
outputFormat,
s.sessionToken(),
atomic.LoadUint32(&(conn.readBytes)),
atomic.LoadUint32(&(conn.writeBytes)),
atomic.LoadUint32(&(conn.readPkgNum)),
atomic.LoadUint32(&(conn.writePkgNum)),
conn.readBytes.Load(),
conn.writeBytes.Load(),
conn.readPkgNum.Load(),
conn.writePkgNum.Load(),
)
}
......@@ -460,6 +451,33 @@ func (s *session) WriteBytesArray(pkgs ...[]byte) (int, error) {
return wlg, nil
}
func heartbeat(_ gxtime.TimerID, _ time.Time, arg interface{}) error {
ss, _ := arg.(*session)
if ss == nil || ss.IsClosed() {
return ErrSessionClosed
}
f := func() {
wsConn, wsFlag := ss.Connection.(*gettyWSConn)
if wsFlag {
err := wsConn.writePing()
if err != nil {
log.Warnf("wsConn.writePing() = error:%+v", perrors.WithStack(err))
}
}
ss.listener.OnCron(ss)
}
// if enable task pool, run @f asynchronously.
if taskPool := ss.EndPoint().GetTaskPool(); taskPool != nil {
taskPool.AddTaskAlways(f)
return nil
}
f()
return nil
}
// func (s *session) RunEventLoop() {
func (s *session) run() {
if s.Connection == nil || s.listener == nil || s.writer == nil {
......@@ -477,56 +495,13 @@ func (s *session) run() {
return
}
// start read/write gr
atomic.AddInt32(&(s.grNum), 2)
go s.handleLoop()
go s.handlePackage()
}
func (s *session) handleLoop() {
var (
wsFlag bool
wsConn *gettyWSConn
counter gxtime.CountWatch
)
defer func() {
if r := recover(); r != nil {
const size = 64 << 10
rBuf := make([]byte, size)
rBuf = rBuf[:runtime.Stack(rBuf, false)]
log.Errorf("[session.handleLoop] panic session %s: err=%s\n%s", s.sessionToken(), r, rBuf)
}
grNum := atomic.AddInt32(&(s.grNum), -1)
s.listener.OnClose(s)
log.Infof("%s, [session.handleLoop] goroutine exit now, left gr num %d", s.Stat(), grNum)
s.gc()
}()
wsConn, wsFlag = s.Connection.(*gettyWSConn)
LOOP:
for {
select {
case <-s.done:
// this case branch assure the (session)handleLoop gr will exit after (session)handlePackage gr.
<-s.rDone
counter.Start()
if counter.Count() > s.wait.Nanoseconds() {
log.Infof("%s, [session.handleLoop] got done signal ", s.Stat())
break LOOP
}
case <-wheel.After(s.period):
if wsFlag {
err := wsConn.writePing()
if err != nil {
log.Warnf("wsConn.writePing() = error:%+v", perrors.WithStack(err))
}
}
s.listener.OnCron(s)
}
if _, err := defaultTimerWheel.AddTimer(heartbeat, gxtime.TimerLoop, s.period, s); err != nil {
panic(fmt.Sprintf("failed to add session %s to defaultTimerWheel err:%v", s.Stat(), err))
}
s.grNum.Add(1)
// start read gr
go s.handlePackage()
}
func (s *session) addTask(pkg interface{}) {
......@@ -551,9 +526,7 @@ func (s *session) handlePackage() {
rBuf = rBuf[:runtime.Stack(rBuf, false)]
log.Errorf("[session.handlePackage] panic session %s: err=%s\n%s", s.sessionToken(), r, rBuf)
}
close(s.rDone)
grNum := atomic.AddInt32(&(s.grNum), -1)
grNum := s.grNum.Add(-1)
log.Infof("%s, [session.handlePackage] gr will exit now, left gr num %d", s.sessionToken(), grNum)
s.stop()
if err != nil {
......@@ -562,6 +535,9 @@ func (s *session) handlePackage() {
s.listener.OnError(s, err)
}
}
s.listener.OnClose(s)
s.gc()
}()
if _, ok := s.Connection.(*gettyTCPConn); ok {
......@@ -628,7 +604,7 @@ func (s *session) handleTCPPackage() error {
break
}
if perrors.Cause(err) == io.EOF {
log.Infof("%s, [session.conn.read] = error:%+v", s.sessionToken(), perrors.WithStack(err))
log.Infof("%s, session.conn read EOF, client send over, session exit", s.sessionToken())
err = nil
exit = true
break
......@@ -700,8 +676,8 @@ func (s *session) handleUDPPackage() error {
if int(s.maxMsgLen<<1) < bufLen {
maxBufLen = int(s.maxMsgLen << 1)
}
bufp = gxbytes.GetBytes(maxBufLen)
defer gxbytes.PutBytes(bufp)
bufp = gxbytes.AcquireBytes(maxBufLen)
defer gxbytes.ReleaseBytes(bufp)
buf = *bufp
for {
if s.IsClosed() {
......@@ -843,6 +819,5 @@ func (s *session) gc() {
// or (session)handleLoop automatically. It's thread safe.
func (s *session) Close() {
s.stop()
log.Infof("%s closed now. its current gr num is %d",
s.sessionToken(), atomic.LoadInt32(&(s.grNum)))
log.Infof("%s closed now. its current gr num is %d", s.sessionToken(), s.grNum.Load())
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment