Commit d7967ea5 authored by AlexStocks's avatar AlexStocks Committed by watermelo

Imp: tcp stream handler

parent 5846fc59
...@@ -8,6 +8,7 @@ ...@@ -8,6 +8,7 @@
package hello package hello
import ( import (
"encoding/binary"
"errors" "errors"
) )
...@@ -18,16 +19,44 @@ import ( ...@@ -18,16 +19,44 @@ import (
type PackageHandler struct{} type PackageHandler struct{}
func (h *PackageHandler) Read(ss getty.Session, data []byte) (interface{}, int, error) { func (h *PackageHandler) Read(ss getty.Session, data []byte) (interface{}, int, error) {
s := string(data) dataLen := len(data)
return s, len(s), nil if dataLen < 4 {
return nil, 0, nil
}
start := 0
pos := start + 4
pkgLen := int(binary.LittleEndian.Uint32(data[start:pos]))
if dataLen < pos+pkgLen {
return nil, pos + pkgLen, nil
}
start = pos
pos = start + pkgLen
s := string(data[start:pos])
return s, pos, nil
} }
func (h *PackageHandler) Write(ss getty.Session, pkg interface{}) ([]byte, error) { func (h *PackageHandler) Write(ss getty.Session, p interface{}) ([]byte, error) {
s, ok := pkg.(string) pkg, ok := p.(string)
if !ok { if !ok {
log.Infof("illegal pkg:%+v", pkg) log.Infof("illegal pkg:%+v", p)
return nil, errors.New("invalid package") return nil, errors.New("invalid package")
} }
return []byte(s), nil pkgLen := int32(len(pkg))
pkgStreams := make([]byte, 0, 4+len(pkg))
// pkg len
start := 0
pos := start + 4
binary.LittleEndian.PutUint32(pkgStreams[start:pos], uint32(pkgLen))
start = pos
// pkg
pos = start + int(pkgLen)
copy(pkgStreams[start:pos], pkg[:])
return pkgStreams[:pos], nil
} }
...@@ -57,9 +57,21 @@ func main() { ...@@ -57,9 +57,21 @@ func main() {
getty.WithConnectionNumber(*connections), getty.WithConnectionNumber(*connections),
) )
client.RunEventLoop(tcp.NewHelloClientSession) client.RunEventLoop(NewHelloClientSession)
go hello.ClientRequest() go hello.ClientRequest()
util.WaitCloseSignals(client) util.WaitCloseSignals(client)
taskPool.Close()
}
func NewHelloClientSession(session getty.Session) (err error) {
tcp.EventListener.SessionOnOpen = func(session getty.Session) {
hello.Sessions = append(hello.Sessions, session)
}
err = tcp.InitialSession(session)
if err != nil {
return
}
return
} }
...@@ -15,7 +15,6 @@ import ( ...@@ -15,7 +15,6 @@ import (
import ( import (
"github.com/dubbogo/getty" "github.com/dubbogo/getty"
"github.com/dubbogo/gost/sync"
) )
import ( import (
...@@ -24,21 +23,9 @@ import ( ...@@ -24,21 +23,9 @@ import (
var ( var (
pkgHandler = &hello.PackageHandler{} pkgHandler = &hello.PackageHandler{}
eventListener = &hello.MessageHandler{} EventListener = &hello.MessageHandler{}
) )
func NewHelloClientSession(session getty.Session, taskPool *gxsync.TaskPool) (err error) {
eventListener.SessionOnOpen = func(session getty.Session) {
hello.Sessions = append(hello.Sessions, session)
}
err = InitialSession(session)
if err != nil {
return
}
session.SetTaskPool(taskPool)
return
}
func InitialSession(session getty.Session) (err error) { func InitialSession(session getty.Session) (err error) {
session.SetCompressType(getty.CompressZip) session.SetCompressType(getty.CompressZip)
...@@ -65,7 +52,7 @@ func InitialSession(session getty.Session) (err error) { ...@@ -65,7 +52,7 @@ func InitialSession(session getty.Session) (err error) {
session.SetName("hello") session.SetName("hello")
session.SetMaxMsgLen(128) session.SetMaxMsgLen(128)
session.SetRQLen(1024) // session.SetRQLen(1024)
session.SetWQLen(512) session.SetWQLen(512)
session.SetReadTimeout(time.Second) session.SetReadTimeout(time.Second)
session.SetWriteTimeout(5 * time.Second) session.SetWriteTimeout(5 * time.Second)
...@@ -73,6 +60,6 @@ func InitialSession(session getty.Session) (err error) { ...@@ -73,6 +60,6 @@ func InitialSession(session getty.Session) (err error) {
session.SetWaitTime(time.Second) session.SetWaitTime(time.Second)
session.SetPkgHandler(pkgHandler) session.SetPkgHandler(pkgHandler)
session.SetEventListener(eventListener) session.SetEventListener(EventListener)
return nil return nil
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment