Commit c415ef8c authored by georgehao's avatar georgehao

feat: 添加压测程序

parent 1fae9a57
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package main
import (
"encoding/binary"
"errors"
"flag"
"fmt"
"log"
"net"
"sync"
"sync/atomic"
"time"
)
import (
"github.com/apache/dubbo-getty"
"github.com/dubbogo/gost/sync"
"github.com/montanaflynn/stats"
)
var (
concurrency = flag.Int("c", 1, "concurrency")
total = flag.Int("n", 1, "total requests for all clients")
ip = flag.String("ip", "127.0.0.1:8090", "server IP")
connections = flag.Int("conn", 1, "number of tcp connections")
taskPoolMode = flag.Bool("taskPool", false, "task pool mode")
taskPoolSize = flag.Int("task_pool_size", 2000, "task poll size")
pprofPort = flag.Int("pprof_port", 65431, "pprof http port")
)
var taskPool gxsync.GenericTaskPool
var Session getty.Session
const CronPeriod = 20e9
const WritePkgTimeout = 1e8
func main() {
flag.Parse()
n := *concurrency
m := *total / n
log.Printf("Servers: %+v\n\n", *ip)
log.Printf("concurrency: %d\nrequests per client: %d\n\n", n, m)
var wg sync.WaitGroup
wg.Add(n * m)
d := make([][]int64, n, n)
var trans uint64
var transOK uint64
totalT := time.Now().UnixNano()
for i := 0; i < n; i++ {
dt := make([]int64, 0, m)
d = append(d, dt)
go func(ii int) {
client := getty.NewTCPClient(
getty.WithServerAddress(*ip),
getty.WithConnectionNumber(*connections),
getty.WithClientTaskPool(taskPool),
)
client.RunEventLoop(NewHelloClientSession)
for j := 0; j < m; j++ {
atomic.AddUint64(&trans, 1)
t := time.Now().UnixNano()
msg := buildSendMsg()
_, _, err := Session.WritePkg(msg, WritePkgTimeout)
if err != nil {
log.Printf("Err:session.WritePkg(session{%s}, error{%v}", Session.Stat(), err)
Session.Close()
}
atomic.AddUint64(&transOK, 1)
t = time.Now().UnixNano() - t
d[ii] = append(d[ii], t)
wg.Done()
}
client.Close()
}(i)
}
wg.Wait()
totalT = time.Now().UnixNano() - totalT
totalT = totalT / 1000000
log.Printf("took %d ms for %d requests", totalT, n*m)
totalD := make([]int64, 0, n*m)
for _, k := range d {
totalD = append(totalD, k...)
}
totalD2 := make([]float64, 0, n*m)
for _, k := range totalD {
totalD2 = append(totalD2, float64(k))
}
mean, _ := stats.Mean(totalD2)
median, _ := stats.Median(totalD2)
max, _ := stats.Max(totalD2)
min, _ := stats.Min(totalD2)
p99, _ := stats.Percentile(totalD2, 99.9)
log.Printf("sent requests : %d\n", n*m)
log.Printf("received requests : %d\n", atomic.LoadUint64(&trans))
log.Printf("received requests_OK : %d\n", atomic.LoadUint64(&transOK))
log.Printf("throughput (TPS) : %d\n", int64(n*m)*1000/totalT)
log.Printf("mean: %.f ns, median: %.f ns, max: %.f ns, min: %.f ns, p99: %.f ns\n", mean, median, max, min, p99)
log.Printf("mean: %d ms, median: %d ms, max: %d ms, min: %d ms, p99: %d ms\n", int64(mean/1000000), int64(median/1000000), int64(max/1000000), int64(min/1000000), int64(p99/1000000))
}
// NewHelloClientSession use for init client session
func NewHelloClientSession(session getty.Session) (err error) {
var pkgHandler = &PackageHandler{}
var EventListener = &MessageHandler{}
EventListener.SessionOnOpen = func(session getty.Session) {
Session = session
}
tcpConn, ok := session.Conn().(*net.TCPConn)
if !ok {
panic(fmt.Sprintf("newSession: %s, session.conn{%#v} is not tcp connection", session.Stat(), session.Conn()))
}
if err = tcpConn.SetNoDelay(true); err != nil {
return err
}
if err = tcpConn.SetKeepAlive(true); err != nil {
return err
}
if err = tcpConn.SetKeepAlivePeriod(10 * time.Second); err != nil {
return err
}
if err = tcpConn.SetReadBuffer(262144); err != nil {
return err
}
if err = tcpConn.SetWriteBuffer(524288); err != nil {
return err
}
session.SetName("hello")
session.SetMaxMsgLen(128 * 1024) // max message package length is 128k
session.SetReadTimeout(time.Second)
session.SetWriteTimeout(5 * time.Second)
session.SetCronPeriod(int(CronPeriod / 1e6))
session.SetWaitTime(time.Second)
session.SetPkgHandler(pkgHandler)
session.SetEventListener(EventListener)
return nil
}
type MessageHandler struct {
SessionOnOpen func(session getty.Session)
}
func (h *MessageHandler) OnOpen(session getty.Session) error {
log.Printf("OnOpen session{%s} open", session.Stat())
if h.SessionOnOpen != nil {
h.SessionOnOpen(session)
}
return nil
}
func (h *MessageHandler) OnError(session getty.Session, err error) {
log.Printf("OnError session{%s} got error{%v}, will be closed.", session.Stat(), err)
}
func (h *MessageHandler) OnClose(session getty.Session) {
log.Printf("hhf OnClose session{%s} is closing......", session.Stat())
}
func (h *MessageHandler) OnMessage(session getty.Session, pkg interface{}) {
log.Printf("OnMessage....")
s, ok := pkg.(string)
if !ok {
log.Printf("illegal package{%#v}", pkg)
return
}
log.Printf("OnMessage: %s", s)
}
func (h *MessageHandler) OnCron(session getty.Session) {
log.Printf("OnCron....")
}
type PackageHandler struct{}
func (h *PackageHandler) Read(ss getty.Session, data []byte) (interface{}, int, error) {
dataLen := len(data)
if dataLen < 4 {
return nil, 0, nil
}
start := 0
pos := start + 4
pkgLen := int(binary.LittleEndian.Uint32(data[start:pos]))
if dataLen < pos+pkgLen {
return nil, pos + pkgLen, nil
}
start = pos
pos = start + pkgLen
s := string(data[start:pos])
return s, pos, nil
}
func (h *PackageHandler) Write(ss getty.Session, p interface{}) ([]byte, error) {
pkg, ok := p.(string)
if !ok {
log.Printf("illegal pkg:%+v", p)
return nil, errors.New("invalid package")
}
pkgLen := int32(len(pkg))
pkgStreams := make([]byte, 0, 4+len(pkg))
// pkg len
start := 0
pos := start + 4
binary.LittleEndian.PutUint32(pkgStreams[start:pos], uint32(pkgLen))
start = pos
// pkg
pos = start + int(pkgLen)
copy(pkgStreams[start:pos], pkg[:])
return pkgStreams[:pos], nil
}
func buildSendMsg() string {
return "如果扫描程序匹配了一行文本并且没有遇到错误,则 sc.Scan() 方法返回 true 。因此,只有当扫描仪的缓冲区中有一行文本时,才会调用 for 循环的主体。这意味着我们修改后的 CountLines 正确处理没有换行符的情况,并且还处理文件为空的情况。"
}
...@@ -33,7 +33,9 @@ import ( ...@@ -33,7 +33,9 @@ import (
) )
var ( var (
ip = flag.String("ip", "127.0.0.1", "server IP") concurrency = flag.Int("c", 1, "concurrency")
total = flag.Int("n", 1, "total requests for all clients")
ip = flag.String("ip", "127.0.0.1:8090", "server IP")
connections = flag.Int("conn", 1, "number of tcp connections") connections = flag.Int("conn", 1, "number of tcp connections")
taskPoolMode = flag.Bool("taskPool", false, "task pool mode") taskPoolMode = flag.Bool("taskPool", false, "task pool mode")
...@@ -41,26 +43,43 @@ var ( ...@@ -41,26 +43,43 @@ var (
pprofPort = flag.Int("pprof_port", 65431, "pprof http port") pprofPort = flag.Int("pprof_port", 65431, "pprof http port")
) )
var ( var taskPool gxsync.GenericTaskPool
taskPool gxsync.GenericTaskPool var Session getty.Session
)
const CronPeriod = time.Second const CronPeriod = 20e9
const WritePkgTimeout = 1e8 const WritePkgTimeout = 1e8
func main() { func main() {
flag.Parse() flag.Parse()
client := getty.NewTCPClient( n := *concurrency
getty.WithServerAddress(*ip+":8090"),
getty.WithConnectionNumber(*connections), log.Printf("Servers: %+v\n\n", *ip)
getty.WithClientTaskPool(taskPool), for i := 0; i < n; i++ {
) go func(ii int) {
client := getty.NewTCPClient(
client.RunEventLoop(NewHelloClientSession) getty.WithServerAddress(*ip),
getty.WithConnectionNumber(*connections),
getty.WithClientTaskPool(taskPool),
)
client.RunEventLoop(NewHelloClientSession)
for {
msg := buildSendMsg()
_, _, err := Session.WritePkg(msg, WritePkgTimeout)
if err != nil {
log.Printf("Err:session.WritePkg(session{%s}, error{%v}", Session.Stat(), err)
Session.Close()
}
}
client.Close()
}(i)
}
ClientRequest() c := make(chan int)
client.Close() <-c
} }
// NewHelloClientSession use for init client session // NewHelloClientSession use for init client session
...@@ -69,7 +88,7 @@ func NewHelloClientSession(session getty.Session) (err error) { ...@@ -69,7 +88,7 @@ func NewHelloClientSession(session getty.Session) (err error) {
var EventListener = &MessageHandler{} var EventListener = &MessageHandler{}
EventListener.SessionOnOpen = func(session getty.Session) { EventListener.SessionOnOpen = func(session getty.Session) {
Sessions = append(Sessions, session) Session = session
} }
tcpConn, ok := session.Conn().(*net.TCPConn) tcpConn, ok := session.Conn().(*net.TCPConn)
...@@ -122,7 +141,7 @@ func (h *MessageHandler) OnError(session getty.Session, err error) { ...@@ -122,7 +141,7 @@ func (h *MessageHandler) OnError(session getty.Session, err error) {
} }
func (h *MessageHandler) OnClose(session getty.Session) { func (h *MessageHandler) OnClose(session getty.Session) {
log.Printf("OnClose session{%s} is closing......", session.Stat()) log.Printf("hhf OnClose session{%s} is closing......", session.Stat())
} }
func (h *MessageHandler) OnMessage(session getty.Session, pkg interface{}) { func (h *MessageHandler) OnMessage(session getty.Session, pkg interface{}) {
...@@ -137,11 +156,6 @@ func (h *MessageHandler) OnMessage(session getty.Session, pkg interface{}) { ...@@ -137,11 +156,6 @@ func (h *MessageHandler) OnMessage(session getty.Session, pkg interface{}) {
func (h *MessageHandler) OnCron(session getty.Session) { func (h *MessageHandler) OnCron(session getty.Session) {
log.Printf("OnCron....") log.Printf("OnCron....")
active := session.GetActive()
if CronPeriod < time.Since(active) {
log.Printf("OnCorn session{%s} timeout{%s}", session.Stat(), time.Since(active).String())
session.Close()
}
} }
type PackageHandler struct{} type PackageHandler struct{}
...@@ -189,18 +203,6 @@ func (h *PackageHandler) Write(ss getty.Session, p interface{}) ([]byte, error) ...@@ -189,18 +203,6 @@ func (h *PackageHandler) Write(ss getty.Session, p interface{}) ([]byte, error)
return pkgStreams[:pos], nil return pkgStreams[:pos], nil
} }
var ( func buildSendMsg() string {
Sessions []getty.Session return "如果扫描程序匹配了一行文本并且没有遇到错误,则 sc.Scan() 方法返回 true 。因此,只有当扫描仪的缓冲区中有一行文本时,才会调用 for 循环的主体。这意味着我们修改后的 CountLines 正确处理没有换行符的情况,并且还处理文件为空的情况。"
)
func ClientRequest() {
for _, session := range Sessions {
ss := session
_, _, err := ss.WritePkg("hello", WritePkgTimeout)
if err != nil {
log.Printf("session.WritePkg(session{%s}, error{%v}", ss.Stat(), err)
ss.Close()
}
}
} }
...@@ -25,6 +25,7 @@ import ( ...@@ -25,6 +25,7 @@ import (
"log" "log"
"net" "net"
"net/http" "net/http"
_ "net/http/pprof"
"os" "os"
"os/signal" "os/signal"
"syscall" "syscall"
...@@ -46,7 +47,7 @@ var ( ...@@ -46,7 +47,7 @@ var (
taskPool gxsync.GenericTaskPool taskPool gxsync.GenericTaskPool
) )
const CronPeriod = time.Second const CronPeriod = 20e9
func main() { func main() {
flag.Parse() flag.Parse()
...@@ -132,21 +133,16 @@ func (h *MessageHandler) OnClose(session getty.Session) { ...@@ -132,21 +133,16 @@ func (h *MessageHandler) OnClose(session getty.Session) {
func (h *MessageHandler) OnMessage(session getty.Session, pkg interface{}) { func (h *MessageHandler) OnMessage(session getty.Session, pkg interface{}) {
log.Printf("OnMessage....") log.Printf("OnMessage....")
s, ok := pkg.(string) //s, ok := pkg.(string)
if !ok { //if !ok {
log.Printf("illegal package{%#v}", pkg) // log.Printf("illegal package{%#v}", pkg)
return // return
} //}
log.Printf("OnMessage: %s", s) //log.Printf("OnMessage: %s", s)
} }
func (h *MessageHandler) OnCron(session getty.Session) { func (h *MessageHandler) OnCron(session getty.Session) {
log.Printf("OnCron....") log.Printf("OnCron....")
active := session.GetActive()
if CronPeriod < time.Since(active) {
log.Printf("OnCorn session{%s} timeout{%s}", session.Stat(), time.Since(active).String())
session.Close()
}
} }
type PackageHandler struct{} type PackageHandler struct{}
......
...@@ -301,7 +301,7 @@ func (t *gettyTCPConn) send(pkg interface{}) (int, error) { ...@@ -301,7 +301,7 @@ func (t *gettyTCPConn) send(pkg interface{}) (int, error) {
t.writeBytes.Add((uint32)(len(p))) t.writeBytes.Add((uint32)(len(p)))
t.writePkgNum.Add(1) t.writePkgNum.Add(1)
} }
log.Debugf("localAddr: %s, remoteAddr:%s, now:%s, length:%d, err:%s", log.Debugf("localAddr: %s, remoteAddr:%s, now:%s, length:%d, err:%v",
t.conn.LocalAddr(), t.conn.RemoteAddr(), currentTime, length, err) t.conn.LocalAddr(), t.conn.RemoteAddr(), currentTime, length, err)
return length, perrors.WithStack(err) return length, perrors.WithStack(err)
} }
......
...@@ -8,6 +8,7 @@ require ( ...@@ -8,6 +8,7 @@ require (
github.com/dubbogo/gost v1.11.0 github.com/dubbogo/gost v1.11.0
github.com/golang/snappy v0.0.1 github.com/golang/snappy v0.0.1
github.com/gorilla/websocket v1.4.2 github.com/gorilla/websocket v1.4.2
github.com/montanaflynn/stats v0.6.6
github.com/pkg/errors v0.9.1 github.com/pkg/errors v0.9.1
github.com/stretchr/testify v1.7.0 github.com/stretchr/testify v1.7.0
go.uber.org/atomic v1.7.0 go.uber.org/atomic v1.7.0
......
...@@ -263,6 +263,8 @@ github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJ ...@@ -263,6 +263,8 @@ github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJ
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0=
github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0=
github.com/montanaflynn/stats v0.6.6 h1:Duep6KMIDpY4Yo11iFsvyqJDyfzLF9+sndUKT+v64GQ=
github.com/montanaflynn/stats v0.6.6/go.mod h1:etXPPgVO6n31NxCd9KQUMvCM+ve0ruNzt6R8Bnaayow=
github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
github.com/nacos-group/nacos-sdk-go v1.0.8/go.mod h1:hlAPn3UdzlxIlSILAyOXKxjFSvDJ9oLzTJ9hLAK1KzA= github.com/nacos-group/nacos-sdk-go v1.0.8/go.mod h1:hlAPn3UdzlxIlSILAyOXKxjFSvDJ9oLzTJ9hLAK1KzA=
......
...@@ -266,7 +266,7 @@ func (s *server) runTcpEventLoop(newSession NewSessionCallback) { ...@@ -266,7 +266,7 @@ func (s *server) runTcpEventLoop(newSession NewSessionCallback) {
) )
for { for {
if s.IsClosed() { if s.IsClosed() {
log.Warnf("server{%s} stop accepting client connect request.", s.addr) log.Infof("server{%s} stop accepting client connect request.", s.addr)
return return
} }
if delay != 0 { if delay != 0 {
......
...@@ -476,7 +476,7 @@ func sessionTimerLoop(_ gxtime.TimerID, _ time.Time, arg interface{}) error { ...@@ -476,7 +476,7 @@ func sessionTimerLoop(_ gxtime.TimerID, _ time.Time, arg interface{}) error {
taskPool.AddTaskAlways(f) taskPool.AddTaskAlways(f)
return nil return nil
} }
println("timerHHHF ...")
f() f()
return nil return nil
} }
...@@ -499,9 +499,8 @@ func (s *session) run() { ...@@ -499,9 +499,8 @@ func (s *session) run() {
} }
s.grNum.Add(1) s.grNum.Add(1)
println(1111)
if _, err := defaultTimerWheel.AddTimer(sessionTimerLoop, gxtime.TimerLoop, s.period, s); err != nil { if _, err := defaultTimerWheel.AddTimer(sessionTimerLoop, gxtime.TimerLoop, s.period, s); err != nil {
panic(fmt.Sprintf("failed to add session %s to defaultTimerWheel", s.Stat())) panic(fmt.Sprintf("failed to add session %s to defaultTimerWheel err:%v", s.Stat(), err))
} }
// start read gr // start read gr
go s.handlePackage() go s.handlePackage()
...@@ -609,7 +608,7 @@ func (s *session) handleTCPPackage() error { ...@@ -609,7 +608,7 @@ func (s *session) handleTCPPackage() error {
break break
} }
if perrors.Cause(err) == io.EOF { if perrors.Cause(err) == io.EOF {
log.Infof("%s, [session.conn.read] = error:%+v", s.sessionToken(), perrors.WithStack(err)) log.Infof("%s, session.conn read EOF, client send over, session exit", s.sessionToken())
err = nil err = nil
exit = true exit = true
break break
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment