Commit 2ec937f9 authored by AlexStocks's avatar AlexStocks

use UDPPackage for udp client

parent 2fdf2ab2
...@@ -146,7 +146,11 @@ func (this *EchoClient) getClientEchoSession(session getty.Session) (clientEchoS ...@@ -146,7 +146,11 @@ func (this *EchoClient) getClientEchoSession(session getty.Session) (clientEchoS
} }
func (this *EchoClient) heartbeat(session getty.Session) { func (this *EchoClient) heartbeat(session getty.Session) {
var pkg EchoPackage var (
pkg EchoPackage
ctx getty.UDPContext
)
pkg.H.Magic = echoPkgMagic pkg.H.Magic = echoPkgMagic
pkg.H.LogID = (uint32)(src.Int63()) pkg.H.LogID = (uint32)(src.Int63())
pkg.H.Sequence = atomic.AddUint32(&reqID, 1) pkg.H.Sequence = atomic.AddUint32(&reqID, 1)
...@@ -155,8 +159,11 @@ func (this *EchoClient) heartbeat(session getty.Session) { ...@@ -155,8 +159,11 @@ func (this *EchoClient) heartbeat(session getty.Session) {
pkg.B = echoHeartbeatRequestString pkg.B = echoHeartbeatRequestString
pkg.H.Len = (uint16)(len(pkg.B) + 1) pkg.H.Len = (uint16)(len(pkg.B) + 1)
if err := session.WritePkg(&pkg, WritePkgTimeout); err != nil { ctx.Pkg = &pkg
log.Warn("session.WritePkg(session{%s}, pkg{%s}) = error{%v}", session.Stat(), pkg, err) ctx.PeerAddr = &(conf.serverAddr)
if err := session.WritePkg(ctx, WritePkgTimeout); err != nil {
log.Warn("session.WritePkg(session{%s}, context{%#v}) = error{%v}", session.Stat(), ctx, err)
session.Close() session.Close()
this.removeSession(session) this.removeSession(session)
......
...@@ -19,6 +19,7 @@ import ( ...@@ -19,6 +19,7 @@ import (
import ( import (
log "github.com/AlexStocks/log4go" log "github.com/AlexStocks/log4go"
config "github.com/koding/multiconfig" config "github.com/koding/multiconfig"
"net"
) )
const ( const (
...@@ -56,6 +57,7 @@ type ( ...@@ -56,6 +57,7 @@ type (
ServerHost string `default:"127.0.0.1"` ServerHost string `default:"127.0.0.1"`
ServerPort int `default:"10000"` ServerPort int `default:"10000"`
ProfilePort int `default:"10086"` ProfilePort int `default:"10086"`
serverAddr net.UDPAddr
// session pool // session pool
ConnectionNum int `default:"16"` ConnectionNum int `default:"16"`
...@@ -101,6 +103,9 @@ func initConf() { ...@@ -101,6 +103,9 @@ func initConf() {
} }
conf = new(Config) conf = new(Config)
config.MustLoadWithPath(confFile, conf) config.MustLoadWithPath(confFile, conf)
conf.serverAddr = net.UDPAddr{IP: net.ParseIP(conf.ServerHost), Port: conf.ServerPort}
conf.connectInterval, err = time.ParseDuration(conf.ConnectInterval) conf.connectInterval, err = time.ParseDuration(conf.ConnectInterval)
if err != nil { if err != nil {
panic(fmt.Sprintf("time.ParseDuration(ConnectionInterval{%#v}) = error{%v}", conf.ConnectInterval, err)) panic(fmt.Sprintf("time.ParseDuration(ConnectionInterval{%#v}) = error{%v}", conf.ConnectInterval, err))
......
...@@ -55,10 +55,15 @@ func (this *EchoMessageHandler) OnClose(session getty.Session) { ...@@ -55,10 +55,15 @@ func (this *EchoMessageHandler) OnClose(session getty.Session) {
client.removeSession(session) client.removeSession(session)
} }
func (this *EchoMessageHandler) OnMessage(session getty.Session, pkg interface{}) { func (this *EchoMessageHandler) OnMessage(session getty.Session, udpCtx interface{}) {
p, ok := pkg.(*EchoPackage) ctx, ok := udpCtx.(getty.UDPContext)
if !ok { if !ok {
log.Error("illegal packge{%#v}", pkg) log.Error("illegal UDPContext{%#v}", udpCtx)
return
}
p, ok := ctx.Pkg.(*EchoPackage)
if !ok {
log.Error("illegal packge{%#v}", ctx.Pkg)
return return
} }
......
...@@ -147,7 +147,11 @@ func initSignal() { ...@@ -147,7 +147,11 @@ func initSignal() {
} }
func echo() { func echo() {
var pkg EchoPackage var (
pkg EchoPackage
ctx getty.UDPContext
)
pkg.H.Magic = echoPkgMagic pkg.H.Magic = echoPkgMagic
pkg.H.LogID = (uint32)(src.Int63()) pkg.H.LogID = (uint32)(src.Int63())
pkg.H.Sequence = atomic.AddUint32(&reqID, 1) pkg.H.Sequence = atomic.AddUint32(&reqID, 1)
...@@ -156,10 +160,13 @@ func echo() { ...@@ -156,10 +160,13 @@ func echo() {
pkg.B = conf.EchoString pkg.B = conf.EchoString
pkg.H.Len = (uint16)(len(pkg.B)) + 1 pkg.H.Len = (uint16)(len(pkg.B)) + 1
ctx.Pkg = &pkg
ctx.PeerAddr = &(conf.serverAddr)
if session := client.selectSession(); session != nil { if session := client.selectSession(); session != nil {
err := session.WritePkg(&pkg, WritePkgTimeout) err := session.WritePkg(ctx, WritePkgTimeout)
if err != nil { if err != nil {
log.Warn("session.WritePkg(session{%s}, pkg{%s}) = error{%v}", session.Stat(), pkg, err) log.Warn("session.WritePkg(session{%s}, UDPContext{%#v}) = error{%v}", session.Stat(), ctx, err)
session.Close() session.Close()
client.removeSession(session) client.removeSession(session)
} }
......
...@@ -12,6 +12,7 @@ package main ...@@ -12,6 +12,7 @@ package main
import ( import (
"bytes" "bytes"
"errors" "errors"
"fmt"
"time" "time"
) )
...@@ -48,18 +49,25 @@ func (this *EchoPackageHandler) Read(ss getty.Session, data []byte) (interface{} ...@@ -48,18 +49,25 @@ func (this *EchoPackageHandler) Read(ss getty.Session, data []byte) (interface{}
return &pkg, len, nil return &pkg, len, nil
} }
func (this *EchoPackageHandler) Write(ss getty.Session, pkg interface{}) error { func (this *EchoPackageHandler) Write(ss getty.Session, udpCtx interface{}) error {
var ( var (
ok bool ok bool
err error err error
startTime time.Time startTime time.Time
echoPkg *EchoPackage echoPkg *EchoPackage
buf *bytes.Buffer buf *bytes.Buffer
ctx getty.UDPContext
) )
ctx, ok = udpCtx.(getty.UDPContext)
if !ok {
log.Error("illegal UDPContext{%#v}", udpCtx)
return fmt.Errorf("illegal @udpCtx{%#v}", udpCtx)
}
startTime = time.Now() startTime = time.Now()
if echoPkg, ok = pkg.(*EchoPackage); !ok { if echoPkg, ok = ctx.Pkg.(*EchoPackage); !ok {
log.Error("illegal pkg:%+v\n", pkg) log.Error("illegal pkg:%+v\n", ctx.Pkg)
return errors.New("invalid echo package!") return errors.New("invalid echo package!")
} }
...@@ -69,7 +77,7 @@ func (this *EchoPackageHandler) Write(ss getty.Session, pkg interface{}) error { ...@@ -69,7 +77,7 @@ func (this *EchoPackageHandler) Write(ss getty.Session, pkg interface{}) error {
return err return err
} }
err = ss.WriteBytes(buf.Bytes()) _, err = ss.Write(getty.UDPContext{Pkg: buf.Bytes(), PeerAddr: ctx.PeerAddr})
log.Info("WriteEchoPkgTimeMs = %s", time.Since(startTime).String()) log.Info("WriteEchoPkgTimeMs = %s", time.Since(startTime).String())
return err return err
......
...@@ -12,11 +12,11 @@ package main ...@@ -12,11 +12,11 @@ package main
import ( import (
"bytes" "bytes"
"errors" "errors"
"fmt"
"time" "time"
) )
import ( import (
"fmt"
"github.com/AlexStocks/getty" "github.com/AlexStocks/getty"
log "github.com/AlexStocks/log4go" log "github.com/AlexStocks/log4go"
) )
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment