Commit f40d3daf authored by AlexStocks's avatar AlexStocks

Imp: use bytes pool in session.go;

Fix: taskPoll -> taskPool;
Add: http pprof
parent ded69492
......@@ -23,6 +23,7 @@ import (
)
import (
gxbytes "github.com/divebomb/gost/bytes"
"github.com/gorilla/websocket"
perrors "github.com/pkg/errors"
)
......@@ -163,10 +164,14 @@ func (c *client) dialUDP() Session {
localAddr *net.UDPAddr
peerAddr *net.UDPAddr
length int
bufp *[]byte
buf []byte
)
buf = make([]byte, 128)
// buf = make([]byte, 128)
bufp = gxbytes.GetBytes(128)
defer gxbytes.PutBytes(bufp)
buf = *bufp
localAddr = &net.UDPAddr{IP: net.IPv4zero, Port: 0}
peerAddr, _ = net.ResolveUDPAddr("udp", c.addr)
for {
......
# Run Hello Demo
## 1. prepare
## 1. prepare
```bash
git clone https://github.com/dubbogo/getty.git
......@@ -10,19 +10,30 @@ cd getty/demo/hello
## 2. run server
run server:
run server:
`go run tcp/server/server.go`
Or run server in task pool mode:
```bash
go run tcp/server/server.go -taskPool=true \
-task_queue_length=100 \
-task_queue_number=4 \
-task_pool_size=2000
-task_queue_length=128 \
-task_queue_number=16 \
-task_pool_size=2000 \
-pprof_port=60000
```
## 3. run client
```bash
go run tcp/client/client.go
```
\ No newline at end of file
```
Or run client in task pool mode:
```bash
go run tcp/client/client.go -taskPool=true \
-task_queue_length=100 \
-task_queue_number=4 \
-task_pool_size=50 \
-pprof_port=60001
```
......@@ -13,6 +13,7 @@ import (
import (
"github.com/dubbogo/getty"
"github.com/dubbogo/gost/sync"
)
import (
......@@ -24,12 +25,33 @@ import (
var (
ip = flag.String("ip", "127.0.0.1", "server IP")
connections = flag.Int("conn", 1, "number of tcp connections")
taskPoolMode = flag.Bool("taskPool", false, "task pool mode")
taskPoolQueueLength = flag.Int("task_queue_length", 100, "task queue length")
taskPoolQueueNumber = flag.Int("task_queue_number", 4, "task queue number")
taskPoolSize = flag.Int("task_pool_size", 2000, "task poll size")
pprofPort = flag.Int("pprof_port", 65431, "pprof http port")
)
var (
taskPool *gxsync.TaskPool
)
func main() {
flag.Parse()
util.SetLimit()
util.Profiling(*pprofPort)
if *taskPoolMode {
taskPool = gxsync.NewTaskPool(
gxsync.WithTaskPoolTaskQueueLength(*taskPoolQueueLength),
gxsync.WithTaskPoolTaskQueueNumber(*taskPoolQueueNumber),
gxsync.WithTaskPoolTaskPoolSize(*taskPoolSize),
)
}
client := getty.NewTCPClient(
getty.WithServerAddress(*ip+":8090"),
getty.WithConnectionNumber(*connections),
......@@ -41,3 +63,4 @@ func main() {
util.WaitCloseSignals(client)
}
......@@ -26,11 +26,16 @@ var (
eventListener = &hello.MessageHandler{}
)
func NewHelloClientSession(session getty.Session) (err error) {
func NewHelloClientSession(session getty.Session, taskPool *gxsync.TaskPool) (err error) {
eventListener.SessionOnOpen = func(session getty.Session) {
hello.Sessions = append(hello.Sessions, session)
}
return InitialSession(session)
err = InitialSession(session)
if err != nil {
return
}
session.SetTaskPool(taskPool)
return
}
func InitialSession(session getty.Session) (err error) {
......
......@@ -22,14 +22,15 @@ import (
)
var (
taskPollMode = flag.Bool("taskPool", false, "task pool mode")
taskPollQueueLength = flag.Int("task_queue_length", 100, "task queue length")
taskPollQueueNumber = flag.Int("task_queue_number", 4, "task queue number")
taskPollSize = flag.Int("task_pool_size", 2000, "task poll size")
taskPoolMode = flag.Bool("taskPool", false, "task pool mode")
taskPoolQueueLength = flag.Int("task_queue_length", 100, "task queue length")
taskPoolQueueNumber = flag.Int("task_queue_number", 4, "task queue number")
taskPoolSize = flag.Int("task_pool_size", 2000, "task poll size")
pprofPort = flag.Int("pprof_port", 65432, "pprof http port")
)
var (
taskPoll *gxsync.TaskPool
taskPool *gxsync.TaskPool
)
func main() {
......@@ -37,13 +38,15 @@ func main() {
util.SetLimit()
util.Profiling(*pprofPort)
options := []getty.ServerOption{getty.WithLocalAddress(":8090")}
if *taskPollMode {
taskPoll = gxsync.NewTaskPool(
gxsync.WithTaskPoolTaskQueueLength(*taskPollQueueLength),
gxsync.WithTaskPoolTaskQueueNumber(*taskPollQueueNumber),
gxsync.WithTaskPoolTaskPoolSize(*taskPollSize),
if *taskPoolMode {
taskPool = gxsync.NewTaskPool(
gxsync.WithTaskPoolTaskQueueLength(*taskPoolQueueLength),
gxsync.WithTaskPoolTaskQueueNumber(*taskPoolQueueNumber),
gxsync.WithTaskPoolTaskPoolSize(*taskPoolSize),
)
}
......@@ -59,6 +62,6 @@ func NewHelloServerSession(session getty.Session) (err error) {
if err != nil {
return
}
session.SetTaskPool(taskPoll)
session.SetTaskPool(taskPool)
return
}
/******************************************************
# MAINTAINER : Alex Stocks
# LICENCE : Apache License 2.0
# EMAIL : alexstocks@foxmail.com
# MOD : 2019-07-25
******************************************************/
package util
import (
"fmt"
"net/http"
_ "net/http/pprof"
)
func Profiling(port int) {
go func() {
http.ListenAndServe(fmt.Sprintf(":%d", port), nil)
}()
}
......@@ -20,6 +20,7 @@ import (
)
import (
gxbytes "github.com/dubbogo/gost/bytes"
gxsync "github.com/dubbogo/gost/sync"
gxtime "github.com/dubbogo/gost/time"
"github.com/gorilla/websocket"
......@@ -413,16 +414,20 @@ func (s *session) WriteBytesArray(pkgs ...[]byte) error {
var (
l int
err error
length uint32
length int
arrp *[]byte
arr []byte
)
length = 0
length = 64
for i := 0; i < len(pkgs); i++ {
length += uint32(len(pkgs[i]))
length += len(pkgs[i])
}
// merge the pkgs
arr = make([]byte, length)
// arr = make([]byte, length)
arrp = gxbytes.GetBytes(length)
defer gxbytes.PutBytes(arrp)
arr = *arrp
l = 0
for i := 0; i < len(pkgs); i++ {
copy(arr[l:], pkgs[i])
......@@ -608,12 +613,16 @@ func (s *session) handleTCPPackage() error {
exit bool
bufLen int
pkgLen int
bufp *[]byte
buf []byte
pktBuf *bytes.Buffer
pkg interface{}
)
buf = make([]byte, maxReadBufLen)
// buf = make([]byte, maxReadBufLen)
bufp = gxbytes.GetBytes(maxReadBufLen)
defer gxbytes.PutBytes(bufp)
buf = *bufp
pktBuf = new(bytes.Buffer)
conn = s.Connection.(*gettyTCPConn)
for {
......@@ -689,6 +698,7 @@ func (s *session) handleUDPPackage() error {
conn *gettyUDPConn
bufLen int
maxBufLen int
bufp *[]byte
buf []byte
addr *net.UDPAddr
pkgLen int
......@@ -700,7 +710,9 @@ func (s *session) handleUDPPackage() error {
if int(s.maxMsgLen<<1) < bufLen {
maxBufLen = int(s.maxMsgLen << 1)
}
buf = make([]byte, maxBufLen)
bufp = gxbytes.GetBytes(maxBufLen) //make([]byte, maxBufLen)
defer gxbytes.PutBytes(bufp)
buf = *bufp
for {
if s.IsClosed() {
break
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment