Commit 7ea8cfb5 authored by hudangwei's avatar hudangwei

优化一些细节

parent 27a8e0c8
...@@ -383,6 +383,7 @@ func (c *client) RunEventLoop(newSession NewSessionCallback) { ...@@ -383,6 +383,7 @@ func (c *client) RunEventLoop(newSession NewSessionCallback) {
c.newSession = newSession c.newSession = newSession
c.Unlock() c.Unlock()
log.Info("run")
c.wg.Add(1) c.wg.Add(1)
// a for-loop goroutine to make sure the connection is valid // a for-loop goroutine to make sure the connection is valid
go func() { go func() {
......
...@@ -15,6 +15,7 @@ import ( ...@@ -15,6 +15,7 @@ import (
"fmt" "fmt"
"io" "io"
"net" "net"
"sync"
"sync/atomic" "sync/atomic"
"time" "time"
) )
...@@ -180,6 +181,7 @@ func newGettyTCPConn(conn net.Conn) *gettyTCPConn { ...@@ -180,6 +181,7 @@ func newGettyTCPConn(conn net.Conn) *gettyTCPConn {
// for zip compress // for zip compress
type writeFlusher struct { type writeFlusher struct {
flusher *flate.Writer flusher *flate.Writer
lock sync.Mutex
} }
func (t *writeFlusher) Write(p []byte) (int, error) { func (t *writeFlusher) Write(p []byte) (int, error) {
...@@ -187,7 +189,8 @@ func (t *writeFlusher) Write(p []byte) (int, error) { ...@@ -187,7 +189,8 @@ func (t *writeFlusher) Write(p []byte) (int, error) {
n int n int
err error err error
) )
t.lock.Lock()
defer t.lock.Unlock()
n, err = t.flusher.Write(p) n, err = t.flusher.Write(p)
if err != nil { if err != nil {
return n, jerrors.Trace(err) return n, jerrors.Trace(err)
......
package getty
import (
"fmt"
"time"
)
var ErrScheduleTimeout = fmt.Errorf("schedule error: timed out")
type Pool struct {
sem chan struct{}
work chan func()
}
func NewPool(size, queue, spawn int) *Pool {
if spawn <= 0 && queue > 0 {
panic("dead queue configuration detected")
}
if spawn > size {
panic("spawn > workers")
}
p := &Pool{
sem: make(chan struct{}, size),
work: make(chan func(), queue),
}
for i := 0; i < spawn; i++ {
p.sem <- struct{}{}
go p.worker(func() {})
}
return p
}
func (p *Pool) ScheduleTimeout(timeout time.Duration, task func()) error {
return p.schedule(task, time.After(timeout))
}
func (p *Pool) schedule(task func(), timeout <-chan time.Time) error {
select {
case <-timeout:
return ErrScheduleTimeout
case p.work <- task:
return nil
case p.sem <- struct{}{}:
go p.worker(task)
return nil
}
}
func (p *Pool) worker(task func()) {
defer func() { <-p.sem }()
task()
for task := range p.work {
task()
}
}
...@@ -23,7 +23,6 @@ var ( ...@@ -23,7 +23,6 @@ var (
errInvalidAddress = jerrors.New("remote address invalid or empty") errInvalidAddress = jerrors.New("remote address invalid or empty")
errSessionNotExist = jerrors.New("session not exist") errSessionNotExist = jerrors.New("session not exist")
errClientClosed = jerrors.New("client closed") errClientClosed = jerrors.New("client closed")
src = rand.NewSource(time.Now().UnixNano())
) )
func init() { func init() {
...@@ -35,16 +34,16 @@ type Client struct { ...@@ -35,16 +34,16 @@ type Client struct {
lock sync.RWMutex lock sync.RWMutex
sessions []*rpcSession sessions []*rpcSession
gettyClient getty.Client gettyClient getty.Client
codecType SerializeType
sequence uint64 sequence uint64
pendingLock sync.RWMutex pendingLock sync.RWMutex
pendingResponses map[uint64]*PendingResponse pendingResponses map[uint64]*PendingResponse
sendLock sync.Mutex
} }
func NewClient(conf *ClientConfig) *Client { func NewClient(confFile string) *Client {
conf := loadClientConf(confFile)
c := &Client{ c := &Client{
pendingResponses: make(map[uint64]*PendingResponse), pendingResponses: make(map[uint64]*PendingResponse),
conf: conf, conf: conf,
...@@ -52,6 +51,7 @@ func NewClient(conf *ClientConfig) *Client { ...@@ -52,6 +51,7 @@ func NewClient(conf *ClientConfig) *Client {
getty.WithServerAddress(gxnet.HostAddress(conf.ServerHost, conf.ServerPort)), getty.WithServerAddress(gxnet.HostAddress(conf.ServerHost, conf.ServerPort)),
getty.WithConnectionNumber((int)(conf.ConnectionNum)), getty.WithConnectionNumber((int)(conf.ConnectionNum)),
), ),
codecType: JSON,
} }
c.gettyClient.RunEventLoop(c.newSession) c.gettyClient.RunEventLoop(c.newSession)
idx := 1 idx := 1
...@@ -71,6 +71,10 @@ func NewClient(conf *ClientConfig) *Client { ...@@ -71,6 +71,10 @@ func NewClient(conf *ClientConfig) *Client {
return c return c
} }
func (c *Client) SetCodecType(st SerializeType) {
c.codecType = st
}
func (c *Client) newSession(session getty.Session) error { func (c *Client) newSession(session getty.Session) error {
var ( var (
ok bool ok bool
...@@ -113,14 +117,14 @@ func (c *Client) Sequence() uint64 { ...@@ -113,14 +117,14 @@ func (c *Client) Sequence() uint64 {
} }
func (c *Client) Call(service, method string, args interface{}, reply interface{}) error { func (c *Client) Call(service, method string, args interface{}, reply interface{}) error {
req := NewGettyRPCRequest(nil) b := &GettyRPCRequest{}
req.header.Service = service b.header.Service = service
req.header.Method = method b.header.Method = method
req.header.CallType = gettyTwoWay b.header.CallType = gettyTwoWay
if reply == nil { if reply == nil {
req.header.CallType = gettyTwoWayNoReply b.header.CallType = gettyTwoWayNoReply
} }
req.body = args b.body = args
resp := NewPendingResponse() resp := NewPendingResponse()
resp.reply = reply resp.reply = reply
...@@ -130,7 +134,7 @@ func (c *Client) Call(service, method string, args interface{}, reply interface{ ...@@ -130,7 +134,7 @@ func (c *Client) Call(service, method string, args interface{}, reply interface{
return errSessionNotExist return errSessionNotExist
} }
if err := c.transfer(session, req, resp); err != nil { if err := c.transfer(session, b, resp); err != nil {
return jerrors.Trace(err) return jerrors.Trace(err)
} }
<-resp.done <-resp.done
...@@ -147,30 +151,24 @@ func (c *Client) isAvailable() bool { ...@@ -147,30 +151,24 @@ func (c *Client) isAvailable() bool {
} }
func (c *Client) Close() { func (c *Client) Close() {
var sessions *[]*rpcSession
c.lock.Lock() c.lock.Lock()
if c.gettyClient != nil { if c.gettyClient != nil {
sessions = &(c.sessions) for _, s := range c.sessions {
c.sessions = nil
c.gettyClient.Close()
c.gettyClient = nil
c.sessions = c.sessions[:0]
}
c.lock.Unlock()
if sessions != nil {
for _, s := range *sessions {
log.Info("close client session{%s, last active:%s, request number:%d}", log.Info("close client session{%s, last active:%s, request number:%d}",
s.session.Stat(), s.session.GetActive().String(), s.reqNum) s.session.Stat(), s.session.GetActive().String(), s.reqNum)
s.session.Close() s.session.Close()
} }
c.gettyClient.Close()
c.gettyClient = nil
c.sessions = c.sessions[:0]
} }
c.lock.Unlock()
} }
func (c *Client) selectSession() getty.Session { func (c *Client) selectSession() getty.Session {
c.lock.RLock() c.lock.RLock()
defer c.lock.RUnlock() defer c.lock.RUnlock()
if c.sessions == nil { if c.sessions == nil {
return nil return nil
} }
...@@ -189,12 +187,8 @@ func (c *Client) addSession(session getty.Session) { ...@@ -189,12 +187,8 @@ func (c *Client) addSession(session getty.Session) {
} }
c.lock.Lock() c.lock.Lock()
defer c.lock.Unlock()
if c.sessions == nil {
return
}
c.sessions = append(c.sessions, &rpcSession{session: session}) c.sessions = append(c.sessions, &rpcSession{session: session})
c.lock.Unlock()
} }
func (c *Client) removeSession(session getty.Session) { func (c *Client) removeSession(session getty.Session) {
...@@ -259,8 +253,9 @@ func (c *Client) getClientRpcSession(session getty.Session) (rpcSession, error) ...@@ -259,8 +253,9 @@ func (c *Client) getClientRpcSession(session getty.Session) (rpcSession, error)
return rpcSession, jerrors.Trace(err) return rpcSession, jerrors.Trace(err)
} }
func (c *Client) ping(session getty.Session) error { func (c *Client) heartbeat(session getty.Session) error {
return c.transfer(session, nil, nil) resp := NewPendingResponse()
return c.transfer(session, nil, resp)
} }
func (c *Client) transfer(session getty.Session, req *GettyRPCRequest, resp *PendingResponse) error { func (c *Client) transfer(session getty.Session, req *GettyRPCRequest, resp *PendingResponse) error {
...@@ -272,19 +267,18 @@ func (c *Client) transfer(session getty.Session, req *GettyRPCRequest, resp *Pen ...@@ -272,19 +267,18 @@ func (c *Client) transfer(session getty.Session, req *GettyRPCRequest, resp *Pen
sequence = c.Sequence() sequence = c.Sequence()
pkg.H.Magic = gettyPackageMagic pkg.H.Magic = gettyPackageMagic
pkg.H.LogID = (uint32)(src.Int63()) pkg.H.LogID = (uint32)(randomID())
pkg.H.Sequence = sequence pkg.H.Sequence = sequence
pkg.H.Command = gettyCmdHbRequest pkg.H.Command = gettyCmdHbRequest
if req != nil && resp != nil { pkg.H.CodecType = c.codecType
if req != nil {
pkg.H.Command = gettyCmdRPCRequest pkg.H.Command = gettyCmdRPCRequest
pkg.B = req pkg.B = req
resp.seq = sequence
c.AddPendingResponse(resp)
} }
c.sendLock.Lock() resp.seq = sequence
defer c.sendLock.Unlock() c.AddPendingResponse(resp)
err = session.WritePkg(pkg, 0) err = session.WritePkg(pkg, 0)
if err != nil && resp != nil { if err != nil && resp != nil {
c.RemovePendingResponse(resp.seq) c.RemovePendingResponse(resp.seq)
......
This diff is collapsed.
// Code generated by easyjson for marshaling/unmarshaling. DO NOT EDIT.
package rpc
import (
json "encoding/json"
easyjson "github.com/mailru/easyjson"
jlexer "github.com/mailru/easyjson/jlexer"
jwriter "github.com/mailru/easyjson/jwriter"
)
// suppress unused package warning
var (
_ *json.RawMessage
_ *jlexer.Lexer
_ *jwriter.Writer
_ easyjson.Marshaler
)
func easyjson38c57360DecodeGithubComAlexStocksGettyRpc(in *jlexer.Lexer, out *GettyRPCRequestHeader) {
isTopLevel := in.IsStart()
if in.IsNull() {
if isTopLevel {
in.Consumed()
}
in.Skip()
return
}
in.Delim('{')
for !in.IsDelim('}') {
key := in.UnsafeString()
in.WantColon()
if in.IsNull() {
in.Skip()
in.WantComma()
continue
}
switch key {
case "service":
out.Service = string(in.String())
case "method":
out.Method = string(in.String())
case "call_type":
out.CallType = gettyCallType(in.Uint32())
default:
in.SkipRecursive()
}
in.WantComma()
}
in.Delim('}')
if isTopLevel {
in.Consumed()
}
}
func easyjson38c57360EncodeGithubComAlexStocksGettyRpc(out *jwriter.Writer, in GettyRPCRequestHeader) {
out.RawByte('{')
first := true
_ = first
if in.Service != "" {
const prefix string = ",\"service\":"
if first {
first = false
out.RawString(prefix[1:])
} else {
out.RawString(prefix)
}
out.String(string(in.Service))
}
if in.Method != "" {
const prefix string = ",\"method\":"
if first {
first = false
out.RawString(prefix[1:])
} else {
out.RawString(prefix)
}
out.String(string(in.Method))
}
if in.CallType != 0 {
const prefix string = ",\"call_type\":"
if first {
first = false
out.RawString(prefix[1:])
} else {
out.RawString(prefix)
}
out.Uint32(uint32(in.CallType))
}
out.RawByte('}')
}
// MarshalJSON supports json.Marshaler interface
func (v GettyRPCRequestHeader) MarshalJSON() ([]byte, error) {
w := jwriter.Writer{}
easyjson38c57360EncodeGithubComAlexStocksGettyRpc(&w, v)
return w.Buffer.BuildBytes(), w.Error
}
// MarshalEasyJSON supports easyjson.Marshaler interface
func (v GettyRPCRequestHeader) MarshalEasyJSON(w *jwriter.Writer) {
easyjson38c57360EncodeGithubComAlexStocksGettyRpc(w, v)
}
// UnmarshalJSON supports json.Unmarshaler interface
func (v *GettyRPCRequestHeader) UnmarshalJSON(data []byte) error {
r := jlexer.Lexer{Data: data}
easyjson38c57360DecodeGithubComAlexStocksGettyRpc(&r, v)
return r.Error()
}
// UnmarshalEasyJSON supports easyjson.Unmarshaler interface
func (v *GettyRPCRequestHeader) UnmarshalEasyJSON(l *jlexer.Lexer) {
easyjson38c57360DecodeGithubComAlexStocksGettyRpc(l, v)
}
package rpc package rpc
import ( import (
"fmt"
"time" "time"
) )
import (
config "github.com/koding/multiconfig"
)
type ( type (
GettySessionParam struct { GettySessionParam struct {
CompressEncoding bool `default:"false" yaml:"compress_encoding" json:"compress_encoding,omitempty"` CompressEncoding bool `default:"false" yaml:"compress_encoding" json:"compress_encoding,omitempty"`
...@@ -76,3 +81,70 @@ type ( ...@@ -76,3 +81,70 @@ type (
GettySessionParam GettySessionParam `required:"true" yaml:"getty_session_param" json:"getty_session_param,omitempty"` GettySessionParam GettySessionParam `required:"true" yaml:"getty_session_param" json:"getty_session_param,omitempty"`
} }
) )
func loadClientConf(confFile string) *ClientConfig {
var err error
conf := new(ClientConfig)
config.MustLoadWithPath(confFile, conf)
conf.heartbeatPeriod, err = time.ParseDuration(conf.HeartbeatPeriod)
if err != nil {
panic(fmt.Sprintf("time.ParseDuration(HeartbeatPeroid{%#v}) = error{%v}", conf.HeartbeatPeriod, err))
}
conf.sessionTimeout, err = time.ParseDuration(conf.SessionTimeout)
if err != nil {
panic(fmt.Sprintf("time.ParseDuration(SessionTimeout{%#v}) = error{%v}", conf.SessionTimeout, err))
}
conf.failFastTimeout, err = time.ParseDuration(conf.FailFastTimeout)
if err != nil {
panic(fmt.Sprintf("time.ParseDuration(FailFastTimeout{%#v}) = error{%v}", conf.FailFastTimeout, err))
}
conf.GettySessionParam.keepAlivePeriod, err = time.ParseDuration(conf.GettySessionParam.KeepAlivePeriod)
if err != nil {
panic(fmt.Sprintf("time.ParseDuration(KeepAlivePeriod{%#v}) = error{%v}", conf.GettySessionParam.KeepAlivePeriod, err))
}
conf.GettySessionParam.tcpReadTimeout, err = time.ParseDuration(conf.GettySessionParam.TcpReadTimeout)
if err != nil {
panic(fmt.Sprintf("time.ParseDuration(TcpReadTimeout{%#v}) = error{%v}", conf.GettySessionParam.TcpReadTimeout, err))
}
conf.GettySessionParam.tcpWriteTimeout, err = time.ParseDuration(conf.GettySessionParam.TcpWriteTimeout)
if err != nil {
panic(fmt.Sprintf("time.ParseDuration(TcpWriteTimeout{%#v}) = error{%v}", conf.GettySessionParam.TcpWriteTimeout, err))
}
conf.GettySessionParam.waitTimeout, err = time.ParseDuration(conf.GettySessionParam.WaitTimeout)
if err != nil {
panic(fmt.Sprintf("time.ParseDuration(WaitTimeout{%#v}) = error{%v}", conf.GettySessionParam.WaitTimeout, err))
}
return conf
}
func loadServerConf(confFile string) *ServerConfig {
var err error
conf := new(ServerConfig)
config.MustLoadWithPath(confFile, conf)
conf.sessionTimeout, err = time.ParseDuration(conf.SessionTimeout)
if err != nil {
panic(fmt.Sprintf("time.ParseDuration(SessionTimeout{%#v}) = error{%v}", conf.SessionTimeout, err))
}
conf.failFastTimeout, err = time.ParseDuration(conf.FailFastTimeout)
if err != nil {
panic(fmt.Sprintf("time.ParseDuration(FailFastTimeout{%#v}) = error{%v}", conf.FailFastTimeout, err))
}
conf.GettySessionParam.keepAlivePeriod, err = time.ParseDuration(conf.GettySessionParam.KeepAlivePeriod)
if err != nil {
panic(fmt.Sprintf("time.ParseDuration(KeepAlivePeriod{%#v}) = error{%v}", conf.GettySessionParam.KeepAlivePeriod, err))
}
conf.GettySessionParam.tcpReadTimeout, err = time.ParseDuration(conf.GettySessionParam.TcpReadTimeout)
if err != nil {
panic(fmt.Sprintf("time.ParseDuration(TcpReadTimeout{%#v}) = error{%v}", conf.GettySessionParam.TcpReadTimeout, err))
}
conf.GettySessionParam.tcpWriteTimeout, err = time.ParseDuration(conf.GettySessionParam.TcpWriteTimeout)
if err != nil {
panic(fmt.Sprintf("time.ParseDuration(TcpWriteTimeout{%#v}) = error{%v}", conf.GettySessionParam.TcpWriteTimeout, err))
}
conf.GettySessionParam.waitTimeout, err = time.ParseDuration(conf.GettySessionParam.WaitTimeout)
if err != nil {
panic(fmt.Sprintf("time.ParseDuration(WaitTimeout{%#v}) = error{%v}", conf.GettySessionParam.WaitTimeout, err))
}
return conf
}
...@@ -9,7 +9,8 @@ import ( ...@@ -9,7 +9,8 @@ import (
) )
func main() { func main() {
client := rpc.NewClient() log.LoadConfiguration("client_log.xml")
client := rpc.NewClient("client_config.toml")
defer client.Close() defer client.Close()
for i := 0; i < 100; i++ { for i := 0; i < 100; i++ {
......
# toml configure file # toml configure file
# toml中key的首字母可以小写,但是对应的golang中的struct成员首字母必须大写 # toml中key的首字母可以小写,但是对应的golang中的struct成员首字母必须大写
AppName = "ECHO-CLIENT" AppName = "RPC-CLIENT"
# host # host
LocalHost = "127.0.0.1" LocalHost = "127.0.0.1"
...@@ -14,7 +14,7 @@ ProfilePort = 10080 ...@@ -14,7 +14,7 @@ ProfilePort = 10080
# connection pool # connection pool
# 连接池连接数目 # 连接池连接数目
ConnectionNum = 2 ConnectionNum = 10
# session # session
# client与server之间连接的心跳周期 # client与server之间连接的心跳周期
...@@ -22,12 +22,6 @@ HeartbeatPeriod = "10s" ...@@ -22,12 +22,6 @@ HeartbeatPeriod = "10s"
# client与server之间连接的超时时间 # client与server之间连接的超时时间
SessionTimeout = "20s" SessionTimeout = "20s"
# client
# client echo request string
EchoString = "Hello, getty!"
# 发送echo请求次数
EchoTimes = 10000
# app fail fast # app fail fast
FailFastTimeout = "3s" FailFastTimeout = "3s"
...@@ -45,4 +39,4 @@ FailFastTimeout = "3s" ...@@ -45,4 +39,4 @@ FailFastTimeout = "3s"
TcpWriteTimeout = "5s" TcpWriteTimeout = "5s"
WaitTimeout = "1s" WaitTimeout = "1s"
MaxMsgLen = 128 MaxMsgLen = 128
SessionName = "echo-client" SessionName = "rpc-client"
...@@ -3,10 +3,12 @@ package main ...@@ -3,10 +3,12 @@ package main
import ( import (
"github.com/AlexStocks/getty/rpc" "github.com/AlexStocks/getty/rpc"
"github.com/AlexStocks/getty/rpc/example/data" "github.com/AlexStocks/getty/rpc/example/data"
log "github.com/AlexStocks/log4go"
) )
func main() { func main() {
srv := rpc.NewServer() log.LoadConfiguration("server_log.xml")
srv := rpc.NewServer("server_config.toml")
srv.Register(new(data.TestRpc)) srv.Register(new(data.TestRpc))
srv.Run() srv.Run()
} }
# toml configure file # toml configure file
# toml中key的首字母可以小写,但是对应的golang中的struct成员首字母必须大写 # toml中key的首字母可以小写,但是对应的golang中的struct成员首字母必须大写
AppName = "ECHO-SERVER" AppName = "RPC-SERVER"
Host = "127.0.0.1" Host = "127.0.0.1"
# Host = "192.168.35.1" # Host = "192.168.35.1"
...@@ -31,4 +31,4 @@ FailFastTimeout = "3s" ...@@ -31,4 +31,4 @@ FailFastTimeout = "3s"
TcpWriteTimeout = "5s" TcpWriteTimeout = "5s"
WaitTimeout = "1s" WaitTimeout = "1s"
MaxMsgLen = 128 MaxMsgLen = 128
SessionName = "echo-server" SessionName = "rpc-server"
package rpc package rpc
import ( import (
"encoding/json"
"reflect" "reflect"
"sync" "sync"
"time" "time"
...@@ -14,11 +13,6 @@ import ( ...@@ -14,11 +13,6 @@ import (
log "github.com/AlexStocks/log4go" log "github.com/AlexStocks/log4go"
) )
const (
CmdTypeErr = "err"
CmdTypeAck = "ack"
)
var ( var (
errTooManySessions = jerrors.New("too many echo sessions") errTooManySessions = jerrors.New("too many echo sessions")
) )
...@@ -37,8 +31,6 @@ type RpcServerHandler struct { ...@@ -37,8 +31,6 @@ type RpcServerHandler struct {
sessionTimeout time.Duration sessionTimeout time.Duration
sessionMap map[getty.Session]*rpcSession sessionMap map[getty.Session]*rpcSession
rwlock sync.RWMutex rwlock sync.RWMutex
sendLock sync.Mutex
} }
func NewRpcServerHandler(maxSessionNum int, sessionTimeout time.Duration) *RpcServerHandler { func NewRpcServerHandler(maxSessionNum int, sessionTimeout time.Duration) *RpcServerHandler {
...@@ -88,29 +80,23 @@ func (h *RpcServerHandler) OnMessage(session getty.Session, pkg interface{}) { ...@@ -88,29 +80,23 @@ func (h *RpcServerHandler) OnMessage(session getty.Session, pkg interface{}) {
} }
h.rwlock.Unlock() h.rwlock.Unlock()
p, ok := pkg.(*GettyPackage) req, ok := pkg.(GettyRPCRequestPackage)
if !ok { if !ok {
log.Error("illegal packge{%#v}", pkg) log.Error("illegal packge{%#v}", pkg)
return return
} }
req, ok := p.B.(*GettyRPCRequest) // heartbeat
if !ok { if req.H.Command == gettyCmdHbRequest {
log.Error("illegal request{%#v}", p.B) h.replyCmd(session, req, gettyCmdHbResponse, "")
return return
} }
if p.H.Command == gettyCmdHbRequest {
h.replyCmd(session, p, gettyCmdHbResponse, "")
return
}
if req.header.CallType == gettyTwoWayNoReply { if req.header.CallType == gettyTwoWayNoReply {
h.replyCmd(session, p, gettyCmdRPCResponse, "") h.replyCmd(session, req, gettyCmdRPCResponse, "")
function := req.methodType.method.Func function := req.methodType.method.Func
function.Call([]reflect.Value{req.service.rcvr, req.argv, req.replyv}) function.Call([]reflect.Value{req.service.rcvr, req.argv, req.replyv})
return return
} }
h.callService(session, p, req.service, req.methodType, req.argv, req.replyv) h.callService(session, req, req.service, req.methodType, req.argv, req.replyv)
} }
func (h *RpcServerHandler) OnCron(session getty.Session) { func (h *RpcServerHandler) OnCron(session getty.Session) {
...@@ -138,45 +124,44 @@ func (h *RpcServerHandler) OnCron(session getty.Session) { ...@@ -138,45 +124,44 @@ func (h *RpcServerHandler) OnCron(session getty.Session) {
} }
} }
func (h *RpcServerHandler) replyCmd(session getty.Session, reqPkg *GettyPackage, cmd gettyCommand, err string) { func (h *RpcServerHandler) replyCmd(session getty.Session, req GettyRPCRequestPackage, cmd gettyCommand, err string) {
rspPkg := *reqPkg resp := GettyPackage{
rspPkg.H.Code = 0 H: req.H,
rspPkg.H.Command = gettyCmdRPCResponse }
resp.H.Command = cmd
if len(err) != 0 { if len(err) != 0 {
rspPkg.H.Code = GettyFail resp.H.Code = GettyFail
rspPkg.B = &GettyRPCResponse{ resp.B = &GettyRPCResponse{
header: GettyRPCResponseHeader{ header: GettyRPCResponseHeader{
Error: err, Error: err,
}, },
} }
} }
h.sendLock.Lock() session.WritePkg(resp, 5*time.Second)
defer h.sendLock.Unlock()
session.WritePkg(&rspPkg, 0)
} }
func (h *RpcServerHandler) callService(session getty.Session, reqPkg *GettyPackage, service *service, func (h *RpcServerHandler) callService(session getty.Session, req GettyRPCRequestPackage,
methodType *methodType, argv, replyv reflect.Value) { service *service, methodType *methodType, argv, replyv reflect.Value) {
function := methodType.method.Func function := methodType.method.Func
returnValues := function.Call([]reflect.Value{service.rcvr, argv, replyv}) returnValues := function.Call([]reflect.Value{service.rcvr, argv, replyv})
errInter := returnValues[0].Interface() errInter := returnValues[0].Interface()
if errInter != nil { if errInter != nil {
h.replyCmd(session, reqPkg, gettyCmdRPCResponse, errInter.(error).Error()) h.replyCmd(session, req, gettyCmdRPCResponse, errInter.(error).Error())
return return
} }
rspPkg := *reqPkg resp := GettyPackage{
rspPkg.H.Code = 0 H: req.H,
rspPkg.H.Command = gettyCmdRPCResponse }
rspPkg.B = &GettyRPCResponse{ resp.H.Code = GettyOK
resp.H.Command = gettyCmdRPCResponse
resp.B = &GettyRPCResponse{
body: replyv.Interface(), body: replyv.Interface(),
} }
h.sendLock.Lock()
defer h.sendLock.Unlock() session.WritePkg(resp, 5*time.Second)
session.WritePkg(&rspPkg, 0)
} }
//////////////////////////////////////////// ////////////////////////////////////////////
...@@ -207,7 +192,7 @@ func (h *RpcClientHandler) OnClose(session getty.Session) { ...@@ -207,7 +192,7 @@ func (h *RpcClientHandler) OnClose(session getty.Session) {
} }
func (h *RpcClientHandler) OnMessage(session getty.Session, pkg interface{}) { func (h *RpcClientHandler) OnMessage(session getty.Session, pkg interface{}) {
p, ok := pkg.(*GettyPackage) p, ok := pkg.(*GettyRPCResponsePackage)
if !ok { if !ok {
log.Error("illegal packge{%#v}", pkg) log.Error("illegal packge{%#v}", pkg)
return return
...@@ -216,24 +201,28 @@ func (h *RpcClientHandler) OnMessage(session getty.Session, pkg interface{}) { ...@@ -216,24 +201,28 @@ func (h *RpcClientHandler) OnMessage(session getty.Session, pkg interface{}) {
h.client.updateSession(session) h.client.updateSession(session)
pendingResponse := h.client.RemovePendingResponse(p.H.Sequence) pendingResponse := h.client.RemovePendingResponse(p.H.Sequence)
if p.H.Command == gettyCmdHbResponse { if pendingResponse == nil {
return return
} }
if p.B == nil { if p.H.Command == gettyCmdHbResponse {
log.Error("response:{%#v} body is nil", p)
return return
} }
rsp, ok := p.B.(*GettyRPCResponse) if p.H.Code == GettyFail && len(p.header.Error) > 0 {
if !ok { pendingResponse.err = jerrors.New(p.header.Error)
log.Error("response body:{%#v} type is not *GettyRPCResponse", p.B) pendingResponse.done <- struct{}{}
return return
} }
if p.H.Code == GettyFail && len(rsp.header.Error) > 0 { codec := Codecs[p.H.CodecType]
pendingResponse.err = jerrors.New(rsp.header.Error) if codec == nil {
pendingResponse.err = jerrors.Errorf("can not find codec for %d", p.H.CodecType)
pendingResponse.done <- struct{}{}
return
} }
err := json.Unmarshal(rsp.body.([]byte), pendingResponse.reply) err := codec.Decode(p.body, pendingResponse.reply)
if err != nil { if err != nil {
pendingResponse.err = err pendingResponse.err = err
pendingResponse.done <- struct{}{}
return
} }
pendingResponse.done <- struct{}{} pendingResponse.done <- struct{}{}
} }
...@@ -251,5 +240,5 @@ func (h *RpcClientHandler) OnCron(session getty.Session) { ...@@ -251,5 +240,5 @@ func (h *RpcClientHandler) OnCron(session getty.Session) {
return return
} }
h.client.ping(session) h.client.heartbeat(session)
} }
...@@ -2,11 +2,12 @@ package rpc ...@@ -2,11 +2,12 @@ package rpc
import ( import (
"bytes" "bytes"
) "reflect"
import (
"github.com/AlexStocks/getty" "github.com/AlexStocks/getty"
log "github.com/AlexStocks/log4go" log "github.com/AlexStocks/log4go"
jerrors "github.com/juju/errors" jerrors "github.com/juju/errors"
) )
...@@ -25,7 +26,9 @@ func NewRpcServerPackageHandler(server *Server) *RpcServerPackageHandler { ...@@ -25,7 +26,9 @@ func NewRpcServerPackageHandler(server *Server) *RpcServerPackageHandler {
} }
func (p *RpcServerPackageHandler) Read(ss getty.Session, data []byte) (interface{}, int, error) { func (p *RpcServerPackageHandler) Read(ss getty.Session, data []byte) (interface{}, int, error) {
var pkg GettyPackage pkg := &GettyPackage{
B: NewGettyRPCRequest(),
}
buf := bytes.NewBuffer(data) buf := bytes.NewBuffer(data)
length, err := pkg.Unmarshal(buf) length, err := pkg.Unmarshal(buf)
...@@ -36,11 +39,48 @@ func (p *RpcServerPackageHandler) Read(ss getty.Session, data []byte) (interface ...@@ -36,11 +39,48 @@ func (p *RpcServerPackageHandler) Read(ss getty.Session, data []byte) (interface
return nil, 0, jerrors.Trace(err) return nil, 0, jerrors.Trace(err)
} }
return &pkg, length, nil req := GettyRPCRequestPackage{
H: pkg.H,
header: pkg.B.GetHeader().(GettyRPCRequestHeader),
}
if req.H.Command == gettyCmdHbRequest {
return req, length, nil
}
// get service & method
req.service = p.server.serviceMap[req.header.Service]
if req.service != nil {
req.methodType = req.service.method[req.header.Method]
}
if req.service == nil || req.methodType == nil {
return nil, 0, ErrNotFoundServiceOrMethod
}
// get args
argIsValue := false
if req.methodType.ArgType.Kind() == reflect.Ptr {
req.argv = reflect.New(req.methodType.ArgType.Elem())
} else {
req.argv = reflect.New(req.methodType.ArgType)
argIsValue = true
}
codec := Codecs[req.H.CodecType]
if codec == nil {
return nil, 0, jerrors.Errorf("can not find codec for %d", req.H.CodecType)
}
err = codec.Decode(pkg.B.GetBody(), req.argv.Interface())
if err != nil {
return nil, 0, jerrors.Trace(err)
}
if argIsValue {
req.argv = req.argv.Elem()
}
// get reply
req.replyv = reflect.New(req.methodType.ReplyType.Elem())
return req, length, nil
} }
func (p *RpcServerPackageHandler) Write(ss getty.Session, pkg interface{}) error { func (p *RpcServerPackageHandler) Write(ss getty.Session, pkg interface{}) error {
resp, ok := pkg.(*GettyPackage) resp, ok := pkg.(GettyPackage)
if !ok { if !ok {
log.Error("illegal pkg:%+v\n", pkg) log.Error("illegal pkg:%+v\n", pkg)
return jerrors.New("invalid rpc response") return jerrors.New("invalid rpc response")
...@@ -67,7 +107,9 @@ func NewRpcClientPackageHandler() *RpcClientPackageHandler { ...@@ -67,7 +107,9 @@ func NewRpcClientPackageHandler() *RpcClientPackageHandler {
} }
func (p *RpcClientPackageHandler) Read(ss getty.Session, data []byte) (interface{}, int, error) { func (p *RpcClientPackageHandler) Read(ss getty.Session, data []byte) (interface{}, int, error) {
var pkg GettyPackage pkg := &GettyPackage{
B: NewGettyRPCResponse(),
}
buf := bytes.NewBuffer(data) buf := bytes.NewBuffer(data)
length, err := pkg.Unmarshal(buf) length, err := pkg.Unmarshal(buf)
...@@ -78,11 +120,16 @@ func (p *RpcClientPackageHandler) Read(ss getty.Session, data []byte) (interface ...@@ -78,11 +120,16 @@ func (p *RpcClientPackageHandler) Read(ss getty.Session, data []byte) (interface
return nil, 0, jerrors.Trace(err) return nil, 0, jerrors.Trace(err)
} }
return &pkg, length, nil resp := &GettyRPCResponsePackage{
H: pkg.H,
header: pkg.B.GetHeader().(GettyRPCResponseHeader),
body: pkg.B.GetBody(),
}
return resp, length, nil
} }
func (p *RpcClientPackageHandler) Write(ss getty.Session, pkg interface{}) error { func (p *RpcClientPackageHandler) Write(ss getty.Session, pkg interface{}) error {
req, ok := pkg.(*GettyPackage) req, ok := pkg.(GettyPackage)
if !ok { if !ok {
log.Error("illegal pkg:%+v\n", pkg) log.Error("illegal pkg:%+v\n", pkg)
return jerrors.New("invalid rpc request") return jerrors.New("invalid rpc request")
......
...@@ -20,7 +20,6 @@ type methodType struct { ...@@ -20,7 +20,6 @@ type methodType struct {
method reflect.Method method reflect.Method
ArgType reflect.Type ArgType reflect.Type
ReplyType reflect.Type ReplyType reflect.Type
numCalls uint
} }
type service struct { type service struct {
......
...@@ -24,7 +24,8 @@ type Server struct { ...@@ -24,7 +24,8 @@ type Server struct {
tcpServerList []getty.Server tcpServerList []getty.Server
} }
func NewServer(conf *ServerConfig) *Server { func NewServer(confFile string) *Server {
conf := loadServerConf(confFile)
s := &Server{ s := &Server{
serviceMap: make(map[string]*service), serviceMap: make(map[string]*service),
conf: conf, conf: conf,
......
package rpc
import (
"math/rand"
"sync"
"time"
)
var (
seededIDGen = rand.New(rand.NewSource(time.Now().UnixNano()))
// The golang rand generators are *not* intrinsically thread-safe.
seededIDLock sync.Mutex
)
func randomID() uint64 {
seededIDLock.Lock()
defer seededIDLock.Unlock()
return uint64(seededIDGen.Int63())
}
...@@ -74,6 +74,8 @@ type session struct { ...@@ -74,6 +74,8 @@ type session struct {
// goroutines sync // goroutines sync
grNum int32 grNum int32
lock sync.RWMutex lock sync.RWMutex
pool *Pool
} }
func newSession(endPoint EndPoint, conn Connection) *session { func newSession(endPoint EndPoint, conn Connection) *session {
...@@ -245,6 +247,7 @@ func (s *session) SetRQLen(readQLen int) { ...@@ -245,6 +247,7 @@ func (s *session) SetRQLen(readQLen int) {
s.lock.Lock() s.lock.Lock()
s.rQ = make(chan interface{}, readQLen) s.rQ = make(chan interface{}, readQLen)
log.Info("%s, [session.SetRQLen] rQ{len:%d, cap:%d}", s.Stat(), len(s.rQ), cap(s.rQ)) log.Info("%s, [session.SetRQLen] rQ{len:%d, cap:%d}", s.Stat(), len(s.rQ), cap(s.rQ))
s.pool = NewPool(readQLen/2, 2, 1)
s.lock.Unlock() s.lock.Unlock()
} }
...@@ -485,7 +488,10 @@ LOOP: ...@@ -485,7 +488,10 @@ LOOP:
// read the s.rQ and assure (session)handlePackage gr will not block by (session)rQ. // read the s.rQ and assure (session)handlePackage gr will not block by (session)rQ.
if flag { if flag {
log.Debug("%#v <-s.rQ", inPkg) log.Debug("%#v <-s.rQ", inPkg)
s.listener.OnMessage(s, inPkg) pkg := inPkg
s.pool.ScheduleTimeout(s.wait, func() {
s.listener.OnMessage(s, pkg)
})
s.incReadPkgNum() s.incReadPkgNum()
} else { } else {
log.Info("[session.handleLoop] drop readin package{%#v}", inPkg) log.Info("[session.handleLoop] drop readin package{%#v}", inPkg)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment