Commit e545f6fa authored by AlexStocks's avatar AlexStocks

Imp: tcp stream handler

parent 8e2387f7
......@@ -8,6 +8,7 @@
package hello
import (
"encoding/binary"
"errors"
)
......@@ -18,16 +19,44 @@ import (
type PackageHandler struct{}
func (h *PackageHandler) Read(ss getty.Session, data []byte) (interface{}, int, error) {
s := string(data)
return s, len(s), nil
dataLen := len(data)
if dataLen < 4 {
return nil, 0, nil
}
start := 0
pos := start + 4
pkgLen := int(binary.LittleEndian.Uint32(data[start:pos]))
if dataLen < pos+pkgLen {
return nil, pos + pkgLen, nil
}
start = pos
pos = start + pkgLen
s := string(data[start:pos])
return s, pos, nil
}
func (h *PackageHandler) Write(ss getty.Session, pkg interface{}) ([]byte, error) {
s, ok := pkg.(string)
func (h *PackageHandler) Write(ss getty.Session, p interface{}) ([]byte, error) {
pkg, ok := p.(string)
if !ok {
log.Infof("illegal pkg:%+v", pkg)
log.Infof("illegal pkg:%+v", p)
return nil, errors.New("invalid package")
}
return []byte(s), nil
pkgLen := int32(len(pkg))
pkgStreams := make([]byte, 0, 4+len(pkg))
// pkg len
start := 0
pos := start + 4
binary.LittleEndian.PutUint32(pkgStreams[start:pos], uint32(pkgLen))
start = pos
// pkg
pos = start + int(pkgLen)
copy(pkgStreams[start:pos], pkg[:])
return pkgStreams[:pos], nil
}
......@@ -57,9 +57,21 @@ func main() {
getty.WithConnectionNumber(*connections),
)
client.RunEventLoop(tcp.NewHelloClientSession)
client.RunEventLoop(NewHelloClientSession)
go hello.ClientRequest()
util.WaitCloseSignals(client)
taskPool.Close()
}
func NewHelloClientSession(session getty.Session) (err error) {
tcp.EventListener.SessionOnOpen = func(session getty.Session) {
hello.Sessions = append(hello.Sessions, session)
}
err = tcp.InitialSession(session)
if err != nil {
return
}
return
}
......@@ -15,7 +15,6 @@ import (
import (
"github.com/dubbogo/getty"
"github.com/dubbogo/gost/sync"
)
import (
......@@ -24,21 +23,9 @@ import (
var (
pkgHandler = &hello.PackageHandler{}
eventListener = &hello.MessageHandler{}
EventListener = &hello.MessageHandler{}
)
func NewHelloClientSession(session getty.Session, taskPool *gxsync.TaskPool) (err error) {
eventListener.SessionOnOpen = func(session getty.Session) {
hello.Sessions = append(hello.Sessions, session)
}
err = InitialSession(session)
if err != nil {
return
}
session.SetTaskPool(taskPool)
return
}
func InitialSession(session getty.Session) (err error) {
session.SetCompressType(getty.CompressZip)
......@@ -65,7 +52,7 @@ func InitialSession(session getty.Session) (err error) {
session.SetName("hello")
session.SetMaxMsgLen(128)
session.SetRQLen(1024)
// session.SetRQLen(1024)
session.SetWQLen(512)
session.SetReadTimeout(time.Second)
session.SetWriteTimeout(5 * time.Second)
......@@ -73,6 +60,6 @@ func InitialSession(session getty.Session) (err error) {
session.SetWaitTime(time.Second)
session.SetPkgHandler(pkgHandler)
session.SetEventListener(eventListener)
session.SetEventListener(EventListener)
return nil
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment