Unverified Commit 293a60d4 authored by 望哥's avatar 望哥 Committed by GitHub

Merge pull request #22 from divebomb/master

Imp: use bytes pool in session.go
parents ded69492 1d7c20c9
...@@ -23,6 +23,7 @@ import ( ...@@ -23,6 +23,7 @@ import (
) )
import ( import (
"github.com/dubbogo/gost/bytes"
"github.com/gorilla/websocket" "github.com/gorilla/websocket"
perrors "github.com/pkg/errors" perrors "github.com/pkg/errors"
) )
...@@ -163,10 +164,14 @@ func (c *client) dialUDP() Session { ...@@ -163,10 +164,14 @@ func (c *client) dialUDP() Session {
localAddr *net.UDPAddr localAddr *net.UDPAddr
peerAddr *net.UDPAddr peerAddr *net.UDPAddr
length int length int
bufp *[]byte
buf []byte buf []byte
) )
buf = make([]byte, 128) // buf = make([]byte, 128)
bufp = gxbytes.GetBytes(128)
defer gxbytes.PutBytes(bufp)
buf = *bufp
localAddr = &net.UDPAddr{IP: net.IPv4zero, Port: 0} localAddr = &net.UDPAddr{IP: net.IPv4zero, Port: 0}
peerAddr, _ = net.ResolveUDPAddr("udp", c.addr) peerAddr, _ = net.ResolveUDPAddr("udp", c.addr)
for { for {
......
# Run Hello Demo # Run Hello Demo
## 1. prepare ## 1. prepare
```bash ```bash
git clone https://github.com/dubbogo/getty.git git clone https://github.com/dubbogo/getty.git
...@@ -10,19 +10,30 @@ cd getty/demo/hello ...@@ -10,19 +10,30 @@ cd getty/demo/hello
## 2. run server ## 2. run server
run server: run server:
`go run tcp/server/server.go` `go run tcp/server/server.go`
Or run server in task pool mode: Or run server in task pool mode:
```bash ```bash
go run tcp/server/server.go -taskPool=true \ go run tcp/server/server.go -taskPool=true \
-task_queue_length=100 \ -task_queue_length=128 \
-task_queue_number=4 \ -task_queue_number=16 \
-task_pool_size=2000 -task_pool_size=2000 \
-pprof_port=60000
``` ```
## 3. run client ## 3. run client
```bash ```bash
go run tcp/client/client.go go run tcp/client/client.go
``` ```
\ No newline at end of file
Or run client in task pool mode:
```bash
go run tcp/client/client.go -taskPool=true \
-task_queue_length=100 \
-task_queue_number=4 \
-task_pool_size=50 \
-pprof_port=60001
```
...@@ -13,6 +13,7 @@ import ( ...@@ -13,6 +13,7 @@ import (
import ( import (
"github.com/dubbogo/getty" "github.com/dubbogo/getty"
"github.com/dubbogo/gost/sync"
) )
import ( import (
...@@ -24,12 +25,33 @@ import ( ...@@ -24,12 +25,33 @@ import (
var ( var (
ip = flag.String("ip", "127.0.0.1", "server IP") ip = flag.String("ip", "127.0.0.1", "server IP")
connections = flag.Int("conn", 1, "number of tcp connections") connections = flag.Int("conn", 1, "number of tcp connections")
taskPoolMode = flag.Bool("taskPool", false, "task pool mode")
taskPoolQueueLength = flag.Int("task_queue_length", 100, "task queue length")
taskPoolQueueNumber = flag.Int("task_queue_number", 4, "task queue number")
taskPoolSize = flag.Int("task_pool_size", 2000, "task poll size")
pprofPort = flag.Int("pprof_port", 65431, "pprof http port")
)
var (
taskPool *gxsync.TaskPool
) )
func main() { func main() {
flag.Parse() flag.Parse()
util.SetLimit() util.SetLimit()
util.Profiling(*pprofPort)
if *taskPoolMode {
taskPool = gxsync.NewTaskPool(
gxsync.WithTaskPoolTaskQueueLength(*taskPoolQueueLength),
gxsync.WithTaskPoolTaskQueueNumber(*taskPoolQueueNumber),
gxsync.WithTaskPoolTaskPoolSize(*taskPoolSize),
)
}
client := getty.NewTCPClient( client := getty.NewTCPClient(
getty.WithServerAddress(*ip+":8090"), getty.WithServerAddress(*ip+":8090"),
getty.WithConnectionNumber(*connections), getty.WithConnectionNumber(*connections),
...@@ -41,3 +63,4 @@ func main() { ...@@ -41,3 +63,4 @@ func main() {
util.WaitCloseSignals(client) util.WaitCloseSignals(client)
} }
...@@ -26,11 +26,16 @@ var ( ...@@ -26,11 +26,16 @@ var (
eventListener = &hello.MessageHandler{} eventListener = &hello.MessageHandler{}
) )
func NewHelloClientSession(session getty.Session) (err error) { func NewHelloClientSession(session getty.Session, taskPool *gxsync.TaskPool) (err error) {
eventListener.SessionOnOpen = func(session getty.Session) { eventListener.SessionOnOpen = func(session getty.Session) {
hello.Sessions = append(hello.Sessions, session) hello.Sessions = append(hello.Sessions, session)
} }
return InitialSession(session) err = InitialSession(session)
if err != nil {
return
}
session.SetTaskPool(taskPool)
return
} }
func InitialSession(session getty.Session) (err error) { func InitialSession(session getty.Session) (err error) {
......
...@@ -22,14 +22,15 @@ import ( ...@@ -22,14 +22,15 @@ import (
) )
var ( var (
taskPollMode = flag.Bool("taskPool", false, "task pool mode") taskPoolMode = flag.Bool("taskPool", false, "task pool mode")
taskPollQueueLength = flag.Int("task_queue_length", 100, "task queue length") taskPoolQueueLength = flag.Int("task_queue_length", 100, "task queue length")
taskPollQueueNumber = flag.Int("task_queue_number", 4, "task queue number") taskPoolQueueNumber = flag.Int("task_queue_number", 4, "task queue number")
taskPollSize = flag.Int("task_pool_size", 2000, "task poll size") taskPoolSize = flag.Int("task_pool_size", 2000, "task poll size")
pprofPort = flag.Int("pprof_port", 65432, "pprof http port")
) )
var ( var (
taskPoll *gxsync.TaskPool taskPool *gxsync.TaskPool
) )
func main() { func main() {
...@@ -37,13 +38,15 @@ func main() { ...@@ -37,13 +38,15 @@ func main() {
util.SetLimit() util.SetLimit()
util.Profiling(*pprofPort)
options := []getty.ServerOption{getty.WithLocalAddress(":8090")} options := []getty.ServerOption{getty.WithLocalAddress(":8090")}
if *taskPollMode { if *taskPoolMode {
taskPoll = gxsync.NewTaskPool( taskPool = gxsync.NewTaskPool(
gxsync.WithTaskPoolTaskQueueLength(*taskPollQueueLength), gxsync.WithTaskPoolTaskQueueLength(*taskPoolQueueLength),
gxsync.WithTaskPoolTaskQueueNumber(*taskPollQueueNumber), gxsync.WithTaskPoolTaskQueueNumber(*taskPoolQueueNumber),
gxsync.WithTaskPoolTaskPoolSize(*taskPollSize), gxsync.WithTaskPoolTaskPoolSize(*taskPoolSize),
) )
} }
...@@ -59,6 +62,6 @@ func NewHelloServerSession(session getty.Session) (err error) { ...@@ -59,6 +62,6 @@ func NewHelloServerSession(session getty.Session) (err error) {
if err != nil { if err != nil {
return return
} }
session.SetTaskPool(taskPoll) session.SetTaskPool(taskPool)
return return
} }
/******************************************************
# MAINTAINER : Alex Stocks
# LICENCE : Apache License 2.0
# EMAIL : alexstocks@foxmail.com
# MOD : 2019-07-25
******************************************************/
package util
import (
"fmt"
"net/http"
_ "net/http/pprof"
)
func Profiling(port int) {
go func() {
http.ListenAndServe(fmt.Sprintf(":%d", port), nil)
}()
}
...@@ -20,6 +20,7 @@ import ( ...@@ -20,6 +20,7 @@ import (
) )
import ( import (
gxbytes "github.com/dubbogo/gost/bytes"
gxsync "github.com/dubbogo/gost/sync" gxsync "github.com/dubbogo/gost/sync"
gxtime "github.com/dubbogo/gost/time" gxtime "github.com/dubbogo/gost/time"
"github.com/gorilla/websocket" "github.com/gorilla/websocket"
...@@ -413,16 +414,20 @@ func (s *session) WriteBytesArray(pkgs ...[]byte) error { ...@@ -413,16 +414,20 @@ func (s *session) WriteBytesArray(pkgs ...[]byte) error {
var ( var (
l int l int
err error err error
length uint32 length int
arrp *[]byte
arr []byte arr []byte
) )
length = 0 length = 0
for i := 0; i < len(pkgs); i++ { for i := 0; i < len(pkgs); i++ {
length += uint32(len(pkgs[i])) length += len(pkgs[i])
} }
// merge the pkgs // merge the pkgs
arr = make([]byte, length) // arr = make([]byte, length)
arrp = gxbytes.GetBytes(length)
defer gxbytes.PutBytes(arrp)
arr = *arrp
l = 0 l = 0
for i := 0; i < len(pkgs); i++ { for i := 0; i < len(pkgs); i++ {
copy(arr[l:], pkgs[i]) copy(arr[l:], pkgs[i])
...@@ -608,13 +613,24 @@ func (s *session) handleTCPPackage() error { ...@@ -608,13 +613,24 @@ func (s *session) handleTCPPackage() error {
exit bool exit bool
bufLen int bufLen int
pkgLen int pkgLen int
bufp *[]byte
buf []byte buf []byte
pktBuf *bytes.Buffer pktBuf *bytes.Buffer
pkg interface{} pkg interface{}
) )
buf = make([]byte, maxReadBufLen) // buf = make([]byte, maxReadBufLen)
pktBuf = new(bytes.Buffer) bufp = gxbytes.GetBytes(maxReadBufLen)
buf = *bufp
// pktBuf = new(bytes.Buffer)
pktBuf = gxbytes.GetBytesBuffer()
defer func() {
gxbytes.PutBytes(bufp)
gxbytes.PutBytesBuffer(pktBuf)
}()
conn = s.Connection.(*gettyTCPConn) conn = s.Connection.(*gettyTCPConn)
for { for {
if s.IsClosed() { if s.IsClosed() {
...@@ -689,6 +705,7 @@ func (s *session) handleUDPPackage() error { ...@@ -689,6 +705,7 @@ func (s *session) handleUDPPackage() error {
conn *gettyUDPConn conn *gettyUDPConn
bufLen int bufLen int
maxBufLen int maxBufLen int
bufp *[]byte
buf []byte buf []byte
addr *net.UDPAddr addr *net.UDPAddr
pkgLen int pkgLen int
...@@ -700,7 +717,9 @@ func (s *session) handleUDPPackage() error { ...@@ -700,7 +717,9 @@ func (s *session) handleUDPPackage() error {
if int(s.maxMsgLen<<1) < bufLen { if int(s.maxMsgLen<<1) < bufLen {
maxBufLen = int(s.maxMsgLen << 1) maxBufLen = int(s.maxMsgLen << 1)
} }
buf = make([]byte, maxBufLen) bufp = gxbytes.GetBytes(maxBufLen) //make([]byte, maxBufLen)
defer gxbytes.PutBytes(bufp)
buf = *bufp
for { for {
if s.IsClosed() { if s.IsClosed() {
break break
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment