Commit 509f1bdd authored by AlexStocks's avatar AlexStocks

write udp pkg asap

parent c3fb7ea5
...@@ -150,6 +150,7 @@ func (this *EchoClient) getClientEchoSession(session getty.Session) (clientEchoS ...@@ -150,6 +150,7 @@ func (this *EchoClient) getClientEchoSession(session getty.Session) (clientEchoS
func (this *EchoClient) heartbeat(session getty.Session) { func (this *EchoClient) heartbeat(session getty.Session) {
var ( var (
err error
pkg EchoPackage pkg EchoPackage
ctx getty.UDPContext ctx getty.UDPContext
) )
...@@ -166,7 +167,7 @@ func (this *EchoClient) heartbeat(session getty.Session) { ...@@ -166,7 +167,7 @@ func (this *EchoClient) heartbeat(session getty.Session) {
ctx.PeerAddr = &(this.serverAddr) ctx.PeerAddr = &(this.serverAddr)
//if err := session.WritePkg(ctx, WritePkgTimeout); err != nil { //if err := session.WritePkg(ctx, WritePkgTimeout); err != nil {
if err := session.WritePkg(ctx, WritePkgASAP); err != nil { if err = session.WritePkg(ctx, WritePkgASAP); err != nil {
log.Warn("session.WritePkg(session{%s}, context{%#v}) = error{%v}", session.Stat(), ctx, err) log.Warn("session.WritePkg(session{%s}, context{%#v}) = error{%v}", session.Stat(), ctx, err)
session.Close() session.Close()
......
...@@ -16,6 +16,7 @@ import ( ...@@ -16,6 +16,7 @@ import (
import ( import (
"github.com/AlexStocks/getty" "github.com/AlexStocks/getty"
"github.com/AlexStocks/goext/log"
log "github.com/AlexStocks/log4go" log "github.com/AlexStocks/log4go"
) )
...@@ -69,6 +70,7 @@ func (this *EchoMessageHandler) OnMessage(session getty.Session, udpCtx interfac ...@@ -69,6 +70,7 @@ func (this *EchoMessageHandler) OnMessage(session getty.Session, udpCtx interfac
} }
log.Debug("get echo package{%s}", p) log.Debug("get echo package{%s}", p)
gxlog.CError("session:%s, get echo package{%s}", session.Stat(), p)
this.client.updateSession(session) this.client.updateSession(session)
} }
......
...@@ -17,7 +17,7 @@ import ( ...@@ -17,7 +17,7 @@ import (
_ "net/http/pprof" _ "net/http/pprof"
"os" "os"
"os/signal" "os/signal"
// "strings"
"sync/atomic" "sync/atomic"
"syscall" "syscall"
"time" "time"
...@@ -81,6 +81,7 @@ func newSession(session getty.Session) error { ...@@ -81,6 +81,7 @@ func newSession(session getty.Session) error {
udpConn *net.UDPConn udpConn *net.UDPConn
gettyClient getty.Client gettyClient getty.Client
client *EchoClient client *EchoClient
sessionName string
) )
if gettyClient, ok = session.EndPoint().(getty.Client); !ok { if gettyClient, ok = session.EndPoint().(getty.Client); !ok {
...@@ -90,9 +91,11 @@ func newSession(session getty.Session) error { ...@@ -90,9 +91,11 @@ func newSession(session getty.Session) error {
switch gettyClient { switch gettyClient {
case connectedClient.gettyClient: case connectedClient.gettyClient:
client = &connectedClient client = &connectedClient
sessionName = "connected-" + conf.GettySessionParam.SessionName
case unconnectedClient.gettyClient: case unconnectedClient.gettyClient:
client = &unconnectedClient client = &unconnectedClient
sessionName = "unconnected-" + conf.GettySessionParam.SessionName
default: default:
panic(fmt.Sprintf("illegal session{%#v} endpoint", session)) panic(fmt.Sprintf("illegal session{%#v} endpoint", session))
...@@ -109,7 +112,7 @@ func newSession(session getty.Session) error { ...@@ -109,7 +112,7 @@ func newSession(session getty.Session) error {
udpConn.SetReadBuffer(conf.GettySessionParam.UdpRBufSize) udpConn.SetReadBuffer(conf.GettySessionParam.UdpRBufSize)
udpConn.SetWriteBuffer(conf.GettySessionParam.UdpWBufSize) udpConn.SetWriteBuffer(conf.GettySessionParam.UdpWBufSize)
session.SetName(conf.GettySessionParam.SessionName) session.SetName(sessionName)
session.SetMaxMsgLen(conf.GettySessionParam.MaxMsgLen) session.SetMaxMsgLen(conf.GettySessionParam.MaxMsgLen)
session.SetPkgHandler(NewEchoPackageHandler()) session.SetPkgHandler(NewEchoPackageHandler())
session.SetEventListener(newEchoMessageHandler(client)) session.SetEventListener(newEchoMessageHandler(client))
...@@ -175,6 +178,7 @@ func initSignal() { ...@@ -175,6 +178,7 @@ func initSignal() {
func echo(client *EchoClient) { func echo(client *EchoClient) {
var ( var (
err error
pkg EchoPackage pkg EchoPackage
ctx getty.UDPContext ctx getty.UDPContext
) )
...@@ -191,7 +195,8 @@ func echo(client *EchoClient) { ...@@ -191,7 +195,8 @@ func echo(client *EchoClient) {
ctx.PeerAddr = &(client.serverAddr) ctx.PeerAddr = &(client.serverAddr)
if session := client.selectSession(); session != nil { if session := client.selectSession(); session != nil {
err := session.WritePkg(ctx, WritePkgTimeout) // err := session.WritePkg(ctx, WritePkgTimeout)
err = session.WritePkg(ctx, WritePkgASAP)
if err != nil { if err != nil {
log.Warn("session.WritePkg(session{%s}, UDPContext{%#v}) = error{%v}", session.Stat(), ctx, err) log.Warn("session.WritePkg(session{%s}, UDPContext{%#v}) = error{%v}", session.Stat(), ctx, err)
session.Close() session.Close()
...@@ -207,7 +212,7 @@ func testEchoClient(client *EchoClient) { ...@@ -207,7 +212,7 @@ func testEchoClient(client *EchoClient) {
) )
for { for {
if unconnectedClient.isAvailable() { if client.isAvailable() {
break break
} }
time.Sleep(3e9) time.Sleep(3e9)
...@@ -215,11 +220,10 @@ func testEchoClient(client *EchoClient) { ...@@ -215,11 +220,10 @@ func testEchoClient(client *EchoClient) {
counter.Start() counter.Start()
for i := 0; i < conf.EchoTimes; i++ { for i := 0; i < conf.EchoTimes; i++ {
echo(&unconnectedClient) echo(client)
} }
cost = counter.Count() cost = counter.Count()
log.Info("after loop %d times, client:%s echo cost %d ms", log.Info("after loop %d times, echo cost %d ms", conf.EchoTimes, cost/1e6)
conf.EchoTimes, client.gettyClient.EndPointType(), cost/1e6)
} }
func test() { func test() {
......
...@@ -66,7 +66,7 @@ func (this *EchoPackageHandler) Write(ss getty.Session, udpCtx interface{}) erro ...@@ -66,7 +66,7 @@ func (this *EchoPackageHandler) Write(ss getty.Session, udpCtx interface{}) erro
startTime = time.Now() startTime = time.Now()
if echoPkg, ok = ctx.Pkg.(*EchoPackage); !ok { if echoPkg, ok = ctx.Pkg.(*EchoPackage); !ok {
log.Error("illegal pkg:%+v\n", ctx.Pkg) log.Error("illegal pkg:%+v, its type:%T\n", ctx.Pkg, ctx.Pkg)
return errors.New("invalid echo package!") return errors.New("invalid echo package!")
} }
...@@ -76,8 +76,7 @@ func (this *EchoPackageHandler) Write(ss getty.Session, udpCtx interface{}) erro ...@@ -76,8 +76,7 @@ func (this *EchoPackageHandler) Write(ss getty.Session, udpCtx interface{}) erro
return err return err
} }
// _, err = ss.Write(getty.UDPContext{Pkg: buf.Bytes(), PeerAddr: ctx.PeerAddr}) _, err = ss.Write(getty.UDPContext{Pkg: buf.Bytes(), PeerAddr: ctx.PeerAddr})
err = ss.WritePkg(getty.UDPContext{Pkg: buf.Bytes(), PeerAddr: ctx.PeerAddr}, WritePkgASAP)
log.Info("WriteEchoPkgTimeMs = %s", time.Since(startTime).String()) log.Info("WriteEchoPkgTimeMs = %s", time.Since(startTime).String())
return err return err
......
...@@ -26,7 +26,7 @@ SessionTimeout = "20s" ...@@ -26,7 +26,7 @@ SessionTimeout = "20s"
# client echo request string # client echo request string
EchoString = "Hello, getty!" EchoString = "Hello, getty!"
# 发送echo请求次数 # 发送echo请求次数
EchoTimes = 10000 EchoTimes = 100000
# app fail fast # app fail fast
FailFastTimeout = "3s" FailFastTimeout = "3s"
......
...@@ -11,12 +11,12 @@ package main ...@@ -11,12 +11,12 @@ package main
import ( import (
"errors" "errors"
"fmt"
"sync" "sync"
"time" "time"
) )
import ( import (
"fmt"
"github.com/AlexStocks/getty" "github.com/AlexStocks/getty"
log "github.com/AlexStocks/log4go" log "github.com/AlexStocks/log4go"
) )
...@@ -42,8 +42,9 @@ type HeartbeatHandler struct{} ...@@ -42,8 +42,9 @@ type HeartbeatHandler struct{}
func (this *HeartbeatHandler) Handle(session getty.Session, ctx getty.UDPContext) error { func (this *HeartbeatHandler) Handle(session getty.Session, ctx getty.UDPContext) error {
var ( var (
pkg *EchoPackage ok bool
ok bool pkg *EchoPackage
rspPkg EchoPackage
) )
log.Debug("get echo heartbeat udp context{%#v}", ctx) log.Debug("get echo heartbeat udp context{%#v}", ctx)
...@@ -51,7 +52,6 @@ func (this *HeartbeatHandler) Handle(session getty.Session, ctx getty.UDPContext ...@@ -51,7 +52,6 @@ func (this *HeartbeatHandler) Handle(session getty.Session, ctx getty.UDPContext
return fmt.Errorf("illegal @ctx.Pkg:%#v", ctx.Pkg) return fmt.Errorf("illegal @ctx.Pkg:%#v", ctx.Pkg)
} }
var rspPkg EchoPackage
rspPkg.H = pkg.H rspPkg.H = pkg.H
rspPkg.B = echoHeartbeatResponseString rspPkg.B = echoHeartbeatResponseString
rspPkg.H.Len = uint16(len(rspPkg.B) + 1) rspPkg.H.Len = uint16(len(rspPkg.B) + 1)
......
...@@ -67,7 +67,7 @@ func (this *EchoPackageHandler) Write(ss getty.Session, udpCtx interface{}) erro ...@@ -67,7 +67,7 @@ func (this *EchoPackageHandler) Write(ss getty.Session, udpCtx interface{}) erro
startTime = time.Now() startTime = time.Now()
if echoPkg, ok = ctx.Pkg.(*EchoPackage); !ok { if echoPkg, ok = ctx.Pkg.(*EchoPackage); !ok {
log.Error("illegal pkg:%+v\n", ctx.Pkg) log.Error("illegal pkg:%+v, addr:%s\n", ctx.Pkg, ctx.PeerAddr)
return errors.New("invalid echo package!") return errors.New("invalid echo package!")
} }
...@@ -77,8 +77,7 @@ func (this *EchoPackageHandler) Write(ss getty.Session, udpCtx interface{}) erro ...@@ -77,8 +77,7 @@ func (this *EchoPackageHandler) Write(ss getty.Session, udpCtx interface{}) erro
return err return err
} }
// _, err = ss.Write(getty.UDPContext{Pkg: buf.Bytes(), PeerAddr: ctx.PeerAddr}) _, err = ss.Write(getty.UDPContext{Pkg: buf.Bytes(), PeerAddr: ctx.PeerAddr})
err = ss.WritePkg(getty.UDPContext{Pkg: buf.Bytes(), PeerAddr: ctx.PeerAddr}, WritePkgASAP)
log.Info("WriteEchoPkgTimeMs = %s", time.Since(startTime).String()) log.Info("WriteEchoPkgTimeMs = %s", time.Since(startTime).String())
return err return err
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment