Commit d98ff0fe authored by AlexStocks's avatar AlexStocks

add unconnected udp endpoint

parent 33719044
......@@ -95,8 +95,8 @@ func newSession(session getty.Session) error {
tcpConn.SetReadBuffer(conf.GettySessionParam.TcpRBufSize)
tcpConn.SetWriteBuffer(conf.GettySessionParam.TcpWBufSize)
session.SetMaxMsgLen(conf.GettySessionParam.MaxMsgLen)
session.SetName(conf.GettySessionParam.SessionName)
session.SetMaxMsgLen(conf.GettySessionParam.MaxMsgLen)
session.SetPkgHandler(NewEchoPackageHandler())
session.SetEventListener(newEchoMessageHandler())
session.SetRQLen(conf.GettySessionParam.PkgRQSize)
......
......@@ -96,8 +96,8 @@ func newSession(session getty.Session) error {
tcpConn.SetReadBuffer(conf.GettySessionParam.TcpRBufSize)
tcpConn.SetWriteBuffer(conf.GettySessionParam.TcpWBufSize)
session.SetMaxMsgLen(conf.GettySessionParam.MaxMsgLen)
session.SetName(conf.GettySessionParam.SessionName)
session.SetMaxMsgLen(conf.GettySessionParam.MaxMsgLen)
session.SetPkgHandler(NewEchoPackageHandler())
session.SetEventListener(newEchoMessageHandler())
session.SetRQLen(conf.GettySessionParam.PkgRQSize)
......
......@@ -11,6 +11,7 @@ package main
import (
"math/rand"
"net"
"sync"
"sync/atomic"
"time"
......@@ -19,7 +20,6 @@ import (
import (
"github.com/AlexStocks/getty"
log "github.com/AlexStocks/log4go"
"net"
)
var (
......@@ -51,18 +51,18 @@ func (this *EchoClient) isAvailable() bool {
}
func (this *EchoClient) close() {
client.lock.Lock()
if client.gettyClient != nil {
this.lock.Lock()
if this.gettyClient != nil {
for _, s := range this.sessions {
log.Info("close client session{%s, last active:%s, request number:%d}",
s.session.Stat(), s.session.GetActive().String(), s.reqNum)
s.session.Close()
}
client.gettyClient.Close()
client.gettyClient = nil
client.sessions = client.sessions[:0]
this.gettyClient.Close()
this.gettyClient = nil
this.sessions = this.sessions[:0]
}
client.lock.Unlock()
this.lock.Unlock()
}
func (this *EchoClient) selectSession() getty.Session {
......@@ -86,6 +86,7 @@ func (this *EchoClient) addSession(session getty.Session) {
this.lock.Lock()
this.sessions = append(this.sessions, &clientEchoSession{session: session})
log.Debug("after add session{%s}, session number:%d", session.Stat(), len(this.sessions))
this.lock.Unlock()
}
......@@ -170,4 +171,5 @@ func (this *EchoClient) heartbeat(session getty.Session) {
this.removeSession(session)
}
log.Debug("session.WritePkg(session{%s}, context{%#v})", session.Stat(), ctx)
}
......@@ -58,7 +58,7 @@ type (
ServerPort int `default:"10000"`
ProfilePort int `default:"10086"`
// session pool
// client session pool
ConnectionNum int `default:"16"`
// heartbeat
......
......@@ -140,6 +140,10 @@ func (this *EchoPackage) Unmarshal(buf *bytes.Buffer) (int, error) {
return 0, nil
}
this.B = (string)(buf.Next((int)(len)))
//if strings.HasPrefix(this.B, "Hello, getty!") {
// gxlog.CError("idx:%d, body:%s", idx, this.B)
// idx++
//}
return (int)(this.H.Len) + echoPkgHeaderLen, nil
}
......@@ -33,26 +33,27 @@ type clientEchoSession struct {
}
type EchoMessageHandler struct {
client *EchoClient
}
func newEchoMessageHandler() *EchoMessageHandler {
return &EchoMessageHandler{}
func newEchoMessageHandler(client *EchoClient) *EchoMessageHandler {
return &EchoMessageHandler{client: client}
}
func (this *EchoMessageHandler) OnOpen(session getty.Session) error {
client.addSession(session)
this.client.addSession(session)
return nil
}
func (this *EchoMessageHandler) OnError(session getty.Session, err error) {
log.Info("session{%s} got error{%v}, will be closed.", session.Stat(), err)
client.removeSession(session)
this.client.removeSession(session)
}
func (this *EchoMessageHandler) OnClose(session getty.Session) {
log.Info("session{%s} is closing......", session.Stat())
client.removeSession(session)
this.client.removeSession(session)
}
func (this *EchoMessageHandler) OnMessage(session getty.Session, udpCtx interface{}) {
......@@ -68,11 +69,12 @@ func (this *EchoMessageHandler) OnMessage(session getty.Session, udpCtx interfac
}
log.Debug("get echo package{%s}", p)
client.updateSession(session)
this.client.updateSession(session)
}
func (this *EchoMessageHandler) OnCron(session getty.Session) {
clientEchoSession, err := client.getClientEchoSession(session)
clientEchoSession, err := this.client.getClientEchoSession(session)
if err != nil {
log.Error("client.getClientSession(session{%s}) = error{%#v}", session.Stat(), err)
return
......@@ -80,9 +82,9 @@ func (this *EchoMessageHandler) OnCron(session getty.Session) {
if conf.sessionTimeout.Nanoseconds() < time.Since(session.GetActive()).Nanoseconds() {
log.Warn("session{%s} timeout{%s}, reqNum{%d}",
session.Stat(), time.Since(session.GetActive()).String(), clientEchoSession.reqNum)
// client.removeSession(session)
// return
this.client.removeSession(session)
return
}
client.heartbeat(session)
this.client.heartbeat(session)
}
......@@ -36,11 +36,12 @@ const (
)
const (
WritePkgTimeout = 1e8
WritePkgTimeout = 1e9
)
var (
client EchoClient
connectedClient EchoClient
unconnectedClient EchoClient
)
////////////////////////////////////////////////////////////////////
......@@ -53,8 +54,8 @@ func main() {
initProfiling()
initClient()
gxlog.CInfo("%s starts successfull! its version=%s\n", conf.AppName, Version)
log.Info("%s starts successfull! its version=%s\n", conf.AppName, Version)
gxlog.CInfo("%s starts successfull! its version=%s\n", conf.AppName, Version)
go test()
......@@ -77,23 +78,40 @@ func newSession(session getty.Session) error {
var (
ok bool
udpConn *net.UDPConn
gettyClient getty.Client
client *EchoClient
)
if gettyClient, ok = session.EndPoint().(getty.Client); !ok {
panic(fmt.Sprintf("the endpoint type of session{%#v} is not getty.Client", session))
}
switch gettyClient {
case connectedClient.gettyClient:
client = &connectedClient
case unconnectedClient.gettyClient:
client = &unconnectedClient
default:
panic(fmt.Sprintf("illegal session{%#v} endpoint", session))
}
if conf.GettySessionParam.CompressEncoding {
session.SetCompressType(getty.CompressZip)
}
if udpConn, ok = session.Conn().(*net.UDPConn); !ok {
panic(fmt.Sprintf("%s, session.conn{%#v} is not tcp connection\n", session.Stat(), session.Conn()))
panic(fmt.Sprintf("%s, session.conn{%#v} is not udp connection\n", session.Stat(), session.Conn()))
}
udpConn.SetReadBuffer(conf.GettySessionParam.UdpRBufSize)
udpConn.SetWriteBuffer(conf.GettySessionParam.UdpWBufSize)
session.SetMaxMsgLen(conf.MaxMsgLen)
session.SetName(conf.GettySessionParam.SessionName)
session.SetMaxMsgLen(conf.GettySessionParam.MaxMsgLen)
session.SetPkgHandler(NewEchoPackageHandler())
session.SetEventListener(newEchoMessageHandler())
session.SetEventListener(newEchoMessageHandler(client))
session.SetRQLen(conf.GettySessionParam.PkgRQSize)
session.SetWQLen(conf.GettySessionParam.PkgWQSize)
session.SetReadTimeout(conf.GettySessionParam.udpReadTimeout)
......@@ -101,21 +119,28 @@ func newSession(session getty.Session) error {
session.SetCronPeriod((int)(conf.heartbeatPeriod.Nanoseconds() / 1e6))
session.SetWaitTime(conf.GettySessionParam.waitTimeout)
log.Debug("client new session:%s\n", session.Stat())
gxlog.CDebug("client new session:%s\n", session.Stat())
return nil
}
func initClient() {
client.gettyClient = getty.NewUDPClient(
unconnectedClient.gettyClient = getty.NewUDPPEndPoint(
getty.WithLocalAddress(gxnet.HostAddress(net.IPv4zero.String(), 0)),
)
unconnectedClient.gettyClient.RunEventLoop(newSession)
unconnectedClient.serverAddr = net.UDPAddr{IP: net.ParseIP(conf.ServerHost), Port: conf.ServerPort}
connectedClient.gettyClient = getty.NewUDPClient(
getty.WithServerAddress(gxnet.HostAddress(conf.ServerHost, conf.ServerPort)),
getty.WithConnectionNumber((int)(conf.ConnectionNum)),
)
client.gettyClient.RunEventLoop(newSession)
client.serverAddr = net.UDPAddr{IP: net.ParseIP(conf.ServerHost), Port: conf.ServerPort}
connectedClient.gettyClient.RunEventLoop(newSession)
}
func uninitClient() {
client.close()
connectedClient.close()
unconnectedClient.close()
}
func initSignal() {
......@@ -147,7 +172,7 @@ func initSignal() {
}
}
func echo(serverAddr *net.UDPAddr) {
func echo(client *EchoClient) {
var (
pkg EchoPackage
ctx getty.UDPContext
......@@ -162,7 +187,7 @@ func echo(serverAddr *net.UDPAddr) {
pkg.H.Len = (uint16)(len(pkg.B)) + 1
ctx.Pkg = &pkg
ctx.PeerAddr = serverAddr
ctx.PeerAddr = &(client.serverAddr)
if session := client.selectSession(); session != nil {
err := session.WritePkg(ctx, WritePkgTimeout)
......@@ -174,27 +199,29 @@ func echo(serverAddr *net.UDPAddr) {
}
}
func test() {
func testEchoClient(client *EchoClient) {
var (
cost int64
serverAddr net.UDPAddr
counter gxtime.CountWatch
)
for {
if client.isAvailable() {
if unconnectedClient.isAvailable() {
break
}
time.Sleep(1e6)
time.Sleep(3e9)
}
serverAddr = net.UDPAddr{IP: net.ParseIP(conf.ServerHost), Port: conf.ServerPort}
counter.Start()
for i := 0; i < conf.EchoTimes; i++ {
echo(&serverAddr)
echo(&unconnectedClient)
}
cost = counter.Count()
log.Info("after loop %d times, echo cost %d ms", conf.EchoTimes, cost/1e6)
gxlog.CInfo("after loop %d times, echo cost %d ms", conf.EchoTimes, cost/1e6)
log.Info("after loop %d times, client:%s echo cost %d ms",
conf.EchoTimes, client.gettyClient.EndPointType(), cost/1e6)
}
func test() {
testEchoClient(&unconnectedClient)
testEchoClient(&connectedClient)
}
......@@ -38,7 +38,7 @@ FailFastTimeout = "3s"
UdpWBufSize = 65536
PkgRQSize = 512
PkgWQSize = 256
UdpReadTimeout = "1s"
UdpReadTimeout = "10s"
UdpWriteTimeout = "5s"
WaitTimeout = "1s"
MaxMsgLen = 128
......
......@@ -92,8 +92,8 @@ func newSession(session getty.Session) error {
udpConn.SetReadBuffer(conf.GettySessionParam.UdpRBufSize)
udpConn.SetWriteBuffer(conf.GettySessionParam.UdpWBufSize)
session.SetMaxMsgLen(conf.GettySessionParam.MaxMsgLen)
session.SetName(conf.GettySessionParam.SessionName)
session.SetMaxMsgLen(conf.GettySessionParam.MaxMsgLen)
session.SetPkgHandler(NewEchoPackageHandler())
session.SetEventListener(newEchoMessageHandler())
session.SetRQLen(conf.GettySessionParam.PkgRQSize)
......@@ -129,7 +129,7 @@ func initServer() {
}
for _, port := range portList {
addr = gxnet.HostAddress2(conf.Host, port)
server = getty.NewUDPPServer(
server = getty.NewUDPPEndPoint(
getty.WithLocalAddress(addr),
)
// run server
......
......@@ -25,7 +25,7 @@ FailFastTimeout = "3s"
UdpWBufSize = 524288
PkgRQSize = 1024
PkgWQSize = 512
UdpReadTimeout = "1s"
UdpReadTimeout = "10s"
UdpWriteTimeout = "5s"
WaitTimeout = "1s"
MaxMsgLen = 128
......
......@@ -96,8 +96,8 @@ func newSession(session getty.Session) error {
tcpConn.SetWriteBuffer(conf.GettySessionParam.TcpWBufSize)
}
session.SetMaxMsgLen(conf.GettySessionParam.MaxMsgLen)
session.SetName(conf.GettySessionParam.SessionName)
session.SetMaxMsgLen(conf.GettySessionParam.MaxMsgLen)
session.SetPkgHandler(NewEchoPackageHandler())
session.SetEventListener(newEchoMessageHandler())
session.SetRQLen(conf.GettySessionParam.PkgRQSize)
......
......@@ -101,8 +101,8 @@ func newSession(session getty.Session) error {
tcpConn.SetWriteBuffer(conf.GettySessionParam.TcpWBufSize)
}
session.SetMaxMsgLen(conf.GettySessionParam.MaxMsgLen)
session.SetName(conf.GettySessionParam.SessionName)
session.SetMaxMsgLen(conf.GettySessionParam.MaxMsgLen)
session.SetPkgHandler(NewEchoPackageHandler())
session.SetEventListener(newEchoMessageHandler())
session.SetRQLen(conf.GettySessionParam.PkgRQSize)
......
......@@ -96,8 +96,8 @@ func newSession(session getty.Session) error {
tcpConn.SetWriteBuffer(conf.GettySessionParam.TcpWBufSize)
}
session.SetMaxMsgLen(conf.GettySessionParam.MaxMsgLen)
session.SetName(conf.GettySessionParam.SessionName)
session.SetMaxMsgLen(conf.GettySessionParam.MaxMsgLen)
session.SetPkgHandler(NewEchoPackageHandler())
session.SetEventListener(newEchoMessageHandler())
session.SetRQLen(conf.GettySessionParam.PkgRQSize)
......
......@@ -101,8 +101,8 @@ func newSession(session getty.Session) error {
tcpConn.SetWriteBuffer(conf.GettySessionParam.TcpWBufSize)
}
session.SetMaxMsgLen(conf.GettySessionParam.MaxMsgLen)
session.SetName(conf.GettySessionParam.SessionName)
session.SetMaxMsgLen(conf.GettySessionParam.MaxMsgLen)
session.SetPkgHandler(NewEchoPackageHandler())
session.SetEventListener(newEchoMessageHandler())
session.SetRQLen(conf.GettySessionParam.PkgRQSize)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment