Commit aaf21345 authored by AlexStocks's avatar AlexStocks

Mod: update codec

parent 8eec657e
# rpc
\ No newline at end of file
......@@ -23,6 +23,7 @@ var (
errInvalidAddress = jerrors.New("remote address invalid or empty")
errSessionNotExist = jerrors.New("session not exist")
errClientClosed = jerrors.New("client closed")
src = rand.NewSource(time.Now().UnixNano())
)
func init() {
......@@ -30,7 +31,7 @@ func init() {
}
type Client struct {
conf *Config
conf *ClientConfig
lock sync.RWMutex
sessions []*rpcSession
gettyClient getty.Client
......@@ -43,7 +44,7 @@ type Client struct {
sendLock sync.Mutex
}
func NewClient(conf *Config) *Client {
func NewClient(conf *ClientConfig) *Client {
c := &Client{
pendingResponses: make(map[uint64]*PendingResponse),
conf: conf,
......@@ -53,10 +54,16 @@ func NewClient(conf *Config) *Client {
),
}
c.gettyClient.RunEventLoop(c.newSession)
idx := 1
for {
idx++
if c.isAvailable() {
break
}
if idx > 12000 {
panic("failed to create client in 2 minutes")
}
time.Sleep(1e6)
}
log.Info("client init ok")
......@@ -70,7 +77,7 @@ func (c *Client) newSession(session getty.Session) error {
tcpConn *net.TCPConn
)
if conf.GettySessionParam.CompressEncoding {
if c.conf.GettySessionParam.CompressEncoding {
session.SetCompressType(getty.CompressZip)
}
......@@ -78,24 +85,24 @@ func (c *Client) newSession(session getty.Session) error {
panic(fmt.Sprintf("%s, session.conn{%#v} is not tcp connection\n", session.Stat(), session.Conn()))
}
tcpConn.SetNoDelay(conf.GettySessionParam.TcpNoDelay)
tcpConn.SetKeepAlive(conf.GettySessionParam.TcpKeepAlive)
if conf.GettySessionParam.TcpKeepAlive {
tcpConn.SetKeepAlivePeriod(conf.GettySessionParam.keepAlivePeriod)
tcpConn.SetNoDelay(c.conf.GettySessionParam.TcpNoDelay)
tcpConn.SetKeepAlive(c.conf.GettySessionParam.TcpKeepAlive)
if c.conf.GettySessionParam.TcpKeepAlive {
tcpConn.SetKeepAlivePeriod(c.conf.GettySessionParam.keepAlivePeriod)
}
tcpConn.SetReadBuffer(conf.GettySessionParam.TcpRBufSize)
tcpConn.SetWriteBuffer(conf.GettySessionParam.TcpWBufSize)
tcpConn.SetReadBuffer(c.conf.GettySessionParam.TcpRBufSize)
tcpConn.SetWriteBuffer(c.conf.GettySessionParam.TcpWBufSize)
session.SetName(conf.GettySessionParam.SessionName)
session.SetMaxMsgLen(conf.GettySessionParam.MaxMsgLen)
session.SetPkgHandler(NewRpcClientPacketHandler())
session.SetName(c.conf.GettySessionParam.SessionName)
session.SetMaxMsgLen(c.conf.GettySessionParam.MaxMsgLen)
session.SetPkgHandler(NewRpcClientPackageHandler())
session.SetEventListener(NewRpcClientHandler(c))
session.SetRQLen(conf.GettySessionParam.PkgRQSize)
session.SetWQLen(conf.GettySessionParam.PkgWQSize)
session.SetReadTimeout(conf.GettySessionParam.tcpReadTimeout)
session.SetWriteTimeout(conf.GettySessionParam.tcpWriteTimeout)
session.SetCronPeriod((int)(conf.heartbeatPeriod.Nanoseconds() / 1e6))
session.SetWaitTime(conf.GettySessionParam.waitTimeout)
session.SetRQLen(c.conf.GettySessionParam.PkgRQSize)
session.SetWQLen(c.conf.GettySessionParam.PkgWQSize)
session.SetReadTimeout(c.conf.GettySessionParam.tcpReadTimeout)
session.SetWriteTimeout(c.conf.GettySessionParam.tcpWriteTimeout)
session.SetCronPeriod((int)(c.conf.heartbeatPeriod.Nanoseconds() / 1e6))
session.SetWaitTime(c.conf.GettySessionParam.waitTimeout)
log.Debug("client new session:%s\n", session.Stat())
return nil
......@@ -106,11 +113,12 @@ func (c *Client) Sequence() uint64 {
}
func (c *Client) Call(service, method string, args interface{}, reply interface{}) error {
req := NewRpcRequest(nil)
req := NewGettyRPCRequest(nil)
req.header.Service = service
req.header.Method = method
req.header.CallType = gettyTwoWay
if reply == nil {
req.header.CallType = RequestSendOnly
req.header.CallType = gettyTwoWayNoReply
}
req.body = args
......@@ -118,15 +126,16 @@ func (c *Client) Call(service, method string, args interface{}, reply interface{
resp.reply = reply
session := c.selectSession()
if session != nil {
if session == nil {
return errSessionNotExist
}
if err := c.transfer(session, req, resp); err != nil {
return err
return jerrors.Trace(err)
}
<-resp.done
return resp.err
}
return errSessionNotExist
return jerrors.Trace(resp.err)
}
func (c *Client) isAvailable() bool {
......@@ -250,34 +259,36 @@ func (c *Client) getClientRpcSession(session getty.Session) (rpcSession, error)
}
func (c *Client) ping(session getty.Session) error {
req := NewRpcRequest(nil)
req.header.Service = "go"
req.header.Method = "ping"
req.header.CallType = RequestSendOnly
req.body = nil
resp := NewPendingResponse()
return c.transfer(session, req, resp)
return c.transfer(session, nil, nil)
}
func (c *Client) transfer(session getty.Session, req *RpcRequest, resp *PendingResponse) error {
func (c *Client) transfer(session getty.Session, req *GettyRPCRequest, resp *PendingResponse) error {
var (
sequence uint64
err error
pkg GettyPackage
)
sequence = c.Sequence()
req.header.Seq = sequence
pkg.H.Magic = gettyPackageMagic
pkg.H.LogID = (uint32)(src.Int63())
pkg.H.Sequence = sequence
pkg.H.Command = gettyCmdHbRequest
if req != nil && resp != nil {
pkg.H.Command = gettyCmdRPCRequest
pkg.B = req
resp.seq = sequence
c.AddPendingResponse(resp)
}
c.sendLock.Lock()
defer c.sendLock.Unlock()
err = session.WritePkg(req, 0)
if err != nil {
err = session.WritePkg(pkg, 0)
if err != nil && resp != nil {
c.RemovePendingResponse(resp.seq)
}
return err
return jerrors.Trace(err)
}
func (c *Client) PendingResponseCount() int {
......@@ -285,6 +296,7 @@ func (c *Client) PendingResponseCount() int {
defer c.pendingLock.RUnlock()
return len(c.pendingResponses)
}
func (c *Client) AddPendingResponse(pr *PendingResponse) {
c.pendingLock.Lock()
defer c.pendingLock.Unlock()
......
package rpc
import (
"bytes"
"encoding/binary"
"encoding/json"
"fmt"
"reflect"
"unsafe"
)
import (
log "github.com/AlexStocks/log4go"
jerrors "github.com/juju/errors"
)
////////////////////////////////////////////
// getty command
////////////////////////////////////////////
type gettyCommand uint32
const (
gettyDefaultCmd gettyCommand = 0x00
gettyCmdHbRequest = 0x01
gettyCmdHbResponse = 0x02
gettyCmdRPCRequest = 0x03
gettyCmdRPCResponse = 0x04
)
var gettyCommandStrings = [...]string{
"getty-default",
"getty-heartbeat-request",
"getty-heartbeat-response",
"getty-request",
"getty-response",
}
func (c gettyCommand) String() string {
return gettyCommandStrings[c]
}
////////////////////////////////////////////
// getty call type
////////////////////////////////////////////
type gettyCallType uint32
const (
gettyOneWay gettyCallType = 0x01
gettyTwoWay = 0x02
gettyTwoWayNoReply = 0x03
)
////////////////////////////////////////////
// GettyPackageHandler
////////////////////////////////////////////
const (
gettyPackageMagic = 0x20160905
maxPackageLen = 1024 * 1024
ReplyTypeData = 0x01
ReplyTypeAck = 0x03
)
var (
ErrNotEnoughStream = jerrors.New("packet stream is not enough")
ErrTooLargePackage = jerrors.New("package length is exceed the getty package's legal maximum length.")
ErrNotFoundServiceOrMethod = jerrors.New("server invalid service or method")
ErrIllegalMagic = jerrors.New("package magic is not right.")
)
var (
gettyPackageHeaderLen int
gettyRPCRequestMinLen int
gettyRPCResponseMinLen int
)
func init() {
gettyPackageHeaderLen = (int)((uint)(unsafe.Sizeof(GettyPackageHeader{})))
gettyRPCRequestMinLen = (int)((uint)(unsafe.Sizeof(GettyRPCRequestHeader{}))) + 2
gettyRPCResponseMinLen = (int)((uint)(unsafe.Sizeof(GettyRPCResponseHeader{}))) + 2
}
type RPCPackage interface {
Marshal(*bytes.Buffer) error
// @buf length should be equal to GettyPkg.GettyPackageHeader.Len
Unmarshal(buf *bytes.Buffer) error
}
type GettyPackageHeader struct {
Magic uint32
LogID uint32 // log id
Sequence uint64 // request/response sequence
Command gettyCommand // operation command code
Code int32 // error code
ServiceID uint32 // service id
Len uint32 // body length
}
type GettyPackage struct {
H GettyPackageHeader
B RPCPackage
}
func (p GettyPackage) String() string {
return fmt.Sprintf("log id:%d, sequence:%d, command:%s",
p.H.LogID, p.H.Sequence, (gettyCommand(p.H.Command)).String())
}
func (p GettyPackage) Marshal() (*bytes.Buffer, error) {
var (
err error
length, size int
buf, buf0 *bytes.Buffer
)
buf = &bytes.Buffer{}
err = binary.Write(buf, binary.LittleEndian, p.H)
if err != nil {
return nil, err
}
if p.B != nil {
if err = p.B.Marshal(buf); err != nil {
return nil, jerrors.Trace(err)
}
length = buf.Len() - gettyPackageHeaderLen
size = (int)((uint)(unsafe.Sizeof(p.H.Len)))
buf0 = bytes.NewBuffer(buf.Bytes()[gettyPackageHeaderLen-size:])
binary.Write(buf0, binary.LittleEndian, length)
}
return buf, nil
}
func (p *GettyPackage) Unmarshal(buf *bytes.Buffer) (int, error) {
if buf.Len() < gettyPackageHeaderLen {
return 0, ErrNotEnoughStream
}
// header
err := binary.Read(buf, binary.LittleEndian, &(p.H))
if err != nil {
return 0, jerrors.Trace(err)
}
if p.H.Magic != gettyPackageMagic {
log.Error("@p.H.Magic{%x}, right magic{%x}", p.H.Magic, gettyPackageMagic)
return 0, ErrIllegalMagic
}
if buf.Len() < (int)(p.H.Len) {
return 0, ErrNotEnoughStream
}
if maxPackageLen < p.H.Len {
return 0, ErrTooLargePackage
}
if p.H.Len != 0 {
if err = p.B.Unmarshal(bytes.NewBuffer(buf.Next(int(p.H.Len)))); err != nil {
return 0, jerrors.Trace(err)
}
}
return (int)(p.H.Len) + gettyPackageHeaderLen, nil
}
////////////////////////////////////////////
// GettyRPCRequest
////////////////////////////////////////////
type GettyRPCRequestHeader struct {
Service string `json:"service,omitempty"`
Method string `json:"method,omitempty"`
CallType gettyCallType `json:"call_type,omitempty"`
}
type GettyRPCRequest struct {
server *Server
header GettyRPCRequestHeader
body interface{}
service *service
methodType *methodType
argv reflect.Value
replyv reflect.Value
}
// json rpc stream format
// |-- 2B (GettyRPCRequestHeader length) --|-- GettyRPCRequestHeader --|
func NewGettyRPCRequest(server *Server) *GettyRPCRequest {
return &GettyRPCRequest{
server: server,
}
}
func (req *GettyRPCRequest) Marshal(buf *bytes.Buffer) error {
headerData, err := json.Marshal(req.header)
if err != nil {
return jerrors.Trace(err)
}
bodyData, err := json.Marshal(req.body)
if err != nil {
return jerrors.Trace(err)
}
err = binary.Write(buf, binary.LittleEndian, uint16(len(headerData)))
if err != nil {
return jerrors.Trace(err)
}
err = binary.Write(buf, binary.LittleEndian, headerData)
if err != nil {
return jerrors.Trace(err)
}
err = binary.Write(buf, binary.LittleEndian, bodyData)
if err != nil {
return jerrors.Trace(err)
}
return nil
}
// @buf length should be equal to GettyPkg.GettyPackageHeader.Len
func (req *GettyRPCRequest) Unmarshal(buf *bytes.Buffer) error {
if buf.Len() < gettyRPCRequestMinLen {
return ErrNotEnoughStream
}
var headerLen uint16
err := binary.Read(buf, binary.LittleEndian, &headerLen)
if err != nil {
return jerrors.Trace(err)
}
header := buf.Next(int(headerLen))
body := buf.Next(buf.Len())
err = json.Unmarshal(header, &req.header)
if err != nil {
return jerrors.Trace(err)
}
// get service & method
req.service = req.server.serviceMap[req.header.Service]
if req.service != nil {
req.methodType = req.service.method[req.header.Method]
}
if req.service == nil || req.methodType == nil {
return ErrNotFoundServiceOrMethod
}
// get args
argIsValue := false
if req.methodType.ArgType.Kind() == reflect.Ptr {
req.argv = reflect.New(req.methodType.ArgType.Elem())
} else {
req.argv = reflect.New(req.methodType.ArgType)
argIsValue = true
}
err = json.Unmarshal(body, req.argv.Interface())
if err != nil {
return jerrors.Trace(err)
}
if argIsValue {
req.argv = req.argv.Elem()
}
// get reply
req.replyv = reflect.New(req.methodType.ReplyType.Elem())
return nil
}
////////////////////////////////////////////
// GettyRPCResponse
////////////////////////////////////////////
type GettyRPCResponseHeader struct {
Error string `json:"error,omitempty"`
}
type GettyRPCResponse struct {
header GettyRPCResponseHeader `json:"header,omitempty"`
body interface{} `json:"body,omitempty"`
}
func (resp *GettyRPCResponse) Marshal(buf *bytes.Buffer) error {
headerData, err := json.Marshal(resp.header)
if err != nil {
return jerrors.Trace(err)
}
bodyData, err := json.Marshal(resp.body)
if err != nil {
return jerrors.Trace(err)
}
err = binary.Write(buf, binary.LittleEndian, uint16(len(headerData)))
if err != nil {
return err
}
_, err = buf.Write(headerData)
if err != nil {
return jerrors.Trace(err)
}
_, err = buf.Write(bodyData)
if err != nil {
return jerrors.Trace(err)
}
return nil
}
// @buf length should be equal to GettyPkg.GettyPackageHeader.Len
func (resp *GettyRPCResponse) Unmarshal(buf *bytes.Buffer) error {
if buf.Len() < gettyRPCResponseMinLen {
return ErrNotEnoughStream
}
var headerLen uint16
err := binary.Read(buf, binary.LittleEndian, &headerLen)
if err != nil {
return err
}
header := buf.Next(int(headerLen))
if len(header) != int(headerLen) {
return ErrNotEnoughStream
}
body := buf.Next(int(buf.Len()))
if len(body) == 0 {
return ErrNotEnoughStream
}
resp.body = body
err = json.Unmarshal(header, resp.header)
if err != nil {
return jerrors.Trace(err)
}
return nil
}
////////////////////////////////////////////
// PendingResponse
////////////////////////////////////////////
type PendingResponse struct {
seq uint64
err error
reply interface{}
done chan struct{}
}
func NewPendingResponse() *PendingResponse {
return &PendingResponse{done: make(chan struct{})}
}
......@@ -6,54 +6,73 @@ import (
type (
GettySessionParam struct {
CompressEncoding bool `default:"false"`
TcpNoDelay bool `default:"true"`
TcpKeepAlive bool `default:"true"`
KeepAlivePeriod string `default:"180s"`
CompressEncoding bool `default:"false" yaml:"compress_encoding" json:"compress_encoding,omitempty"`
TcpNoDelay bool `default:"true" yaml:"tcp_no_delay" json:"tcp_no_delay,omitempty"`
TcpKeepAlive bool `default:"true" yaml:"tcp_keep_alive" json:"tcp_keep_alive,omitempty"`
KeepAlivePeriod string `default:"180s" yaml:"keep_alive_period" json:"keep_alive_period,omitempty"`
keepAlivePeriod time.Duration
TcpRBufSize int `default:"262144"`
TcpWBufSize int `default:"65536"`
PkgRQSize int `default:"1024"`
PkgWQSize int `default:"1024"`
TcpReadTimeout string `default:"1s"`
TcpRBufSize int `default:"262144" yaml:"tcp_r_buf_size" json:"tcp_r_buf_size,omitempty"`
TcpWBufSize int `default:"65536" yaml:"tcp_w_buf_size" json:"tcp_w_buf_size,omitempty"`
PkgRQSize int `default:"1024" yaml:"pkg_rq_size" json:"pkg_rq_size,omitempty"`
PkgWQSize int `default:"1024" yaml:"pkg_wq_size" json:"pkg_wq_size,omitempty"`
TcpReadTimeout string `default:"1s" yaml:"tcp_read_timeout" json:"tcp_read_timeout,omitempty"`
tcpReadTimeout time.Duration
TcpWriteTimeout string `default:"5s"`
TcpWriteTimeout string `default:"5s" yaml:"tcp_write_timeout" json:"tcp_write_timeout,omitempty"`
tcpWriteTimeout time.Duration
WaitTimeout string `default:"7s"`
WaitTimeout string `default:"7s" yaml:"wait_timeout" json:"wait_timeout,omitempty"`
waitTimeout time.Duration
MaxMsgLen int `default:"1024"`
SessionName string `default:"echo-client"`
MaxMsgLen int `default:"1024" yaml:"max_msg_len" json:"max_msg_len,omitempty"`
SessionName string `default:"rpc" yaml:"session_name" json:"session_name,omitempty"`
}
// Config holds supported types by the multiconfig package
Config struct {
ServerConfig struct {
// local address
AppName string `default:"echo-server"`
Host string `default:"127.0.0.1"`
Ports []string `default:["10000"]`
AppName string `default:"rcp-server" yaml:"app_name" json:"app_name,omitempty"`
Host string `default:"127.0.0.1" yaml:"host" json:"host,omitempty"`
Ports []string `yaml:"ports" json:"ports,omitempty"` // `default:["10000"]`
// session
SessionTimeout string `default:"60s" yaml:"session_timeout" json:"session_timeout,omitempty"`
sessionTimeout time.Duration
SessionNumber int `default:"1000" yaml:"session_number" json:"session_number,omitempty"`
// app
FailFastTimeout string `default:"5s" yaml:"fail_fast_timeout" json:"fail_fast_timeout,omitempty"`
failFastTimeout time.Duration
// session tcp parameters
GettySessionParam GettySessionParam `required:"true" yaml:"getty_session_param" json:"getty_session_param,omitempty"`
}
// Config holds supported types by the multiconfig package
ClientConfig struct {
// local address
AppName string `default:"rcp-client" yaml:"app_name" json:"app_name,omitempty"`
Host string `default:"127.0.0.1" yaml:"host" json:"host,omitempty"`
Ports []string `yaml:"ports" json:"ports,omitempty"` // `default:["10000"]`
// server
ServerHost string `default:"127.0.0.1"`
ServerPort int `default:"10000"`
ProfilePort int `default:"10086"`
ServerHost string `default:"127.0.0.1" yaml:"server_host" json:"server_host,omitempty"`
ServerPort int `default:"10000" yaml:"server_port" json:"server_port,omitempty"`
ProfilePort int `default:"10086" yaml:"profile_port" json:"profile_port,omitempty"`
// session pool
ConnectionNum int `default:"16"`
ConnectionNum int `default:"16" yaml:"connection_num" json:"connection_num,omitempty"`
// heartbeat
HeartbeatPeriod string `default:"15s"`
HeartbeatPeriod string `default:"15s" yaml:"heartbeat_period" json:"heartbeat_period,omitempty"`
heartbeatPeriod time.Duration
// session
SessionTimeout string `default:"60s"`
SessionTimeout string `default:"60s" yaml:"session_timeout" json:"session_timeout,omitempty"`
sessionTimeout time.Duration
SessionNumber int `default:"1000"`
// app
FailFastTimeout string `default:"5s"`
FailFastTimeout string `default:"5s" yaml:"fail_fast_timeout" json:"fail_fast_timeout,omitempty"`
failFastTimeout time.Duration
// session tcp parameters
GettySessionParam GettySessionParam `required:"true"`
GettySessionParam GettySessionParam `required:"true" yaml:"getty_session_param" json:"getty_session_param,omitempty"`
}
)
......@@ -15,7 +15,6 @@ import (
)
const (
CmdTypePing = "ping"
CmdTypeErr = "err"
CmdTypeAck = "ack"
)
......@@ -26,10 +25,13 @@ var (
type rpcSession struct {
session getty.Session
active time.Time
reqNum int32
}
////////////////////////////////////////////
// RpcServerHandler
////////////////////////////////////////////
type RpcServerHandler struct {
maxSessionNum int
sessionTimeout time.Duration
......@@ -80,22 +82,35 @@ func (h *RpcServerHandler) OnClose(session getty.Session) {
}
func (h *RpcServerHandler) OnMessage(session getty.Session, pkg interface{}) {
p, ok := pkg.(*RpcRequest)
h.rwlock.Lock()
if _, ok := h.sessionMap[session]; ok {
h.sessionMap[session].reqNum++
}
h.rwlock.Unlock()
p, ok := pkg.(*GettyPackage)
if !ok {
log.Error("illegal packge{%#v}", pkg)
return
}
req, ok := p.B.(*GettyRPCRequest)
if !ok {
log.Error("illegal request{%#v}", p.B)
return
}
if p.header.IsPing() {
h.replyCmd(session, p.header.Seq, "", CmdTypePing)
if p.H.Command == gettyCmdHbRequest {
h.replyCmd(session, p, gettyCmdHbResponse, "")
return
}
if p.header.CallType == RequestSendOnly {
h.asyncCallService(session, p.header.Seq, p.service, p.methodType, p.argv, p.replyv)
if req.header.CallType == gettyTwoWayNoReply {
h.replyCmd(session, p, gettyCmdRPCResponse, "")
function := req.methodType.method.Func
function.Call([]reflect.Value{req.service.rcvr, req.argv, req.replyv})
return
}
h.callService(session, p.header.Seq, p.service, p.methodType, p.argv, p.replyv)
h.callService(session, p, req.service, req.methodType, req.argv, req.replyv)
}
func (h *RpcServerHandler) OnCron(session getty.Session) {
......@@ -103,6 +118,7 @@ func (h *RpcServerHandler) OnCron(session getty.Session) {
flag bool
active time.Time
)
h.rwlock.RLock()
if _, ok := h.sessionMap[session]; ok {
active = session.GetActive()
......@@ -113,6 +129,7 @@ func (h *RpcServerHandler) OnCron(session getty.Session) {
}
}
h.rwlock.RUnlock()
if flag {
h.rwlock.Lock()
delete(h.sessionMap, session)
......@@ -121,57 +138,56 @@ func (h *RpcServerHandler) OnCron(session getty.Session) {
}
}
func (h *RpcServerHandler) replyCmd(session getty.Session, seq uint64, err string, cmd string) {
resp := NewRpcResponse()
resp.header.Seq = seq
switch cmd {
case CmdTypePing:
resp.header.ReplyType = ReplyTypePong
case CmdTypeAck:
resp.header.ReplyType = ReplyTypeAck
case CmdTypeErr:
resp.header.ReplyType = ReplyTypeAck
resp.header.Error = err
func (h *RpcServerHandler) replyCmd(session getty.Session, reqPkg *GettyPackage, cmd gettyCommand, err string) {
rspPkg := *reqPkg
rspPkg.H.Code = 0
rspPkg.H.Command = gettyCmdRPCResponse
if len(err) != 0 {
rspPkg.B = &GettyRPCResponse{
header: GettyRPCResponseHeader{
Error: err,
},
}
}
h.sendLock.Lock()
defer h.sendLock.Unlock()
session.WritePkg(resp, 0)
session.WritePkg(&rspPkg, 0)
}
func (h *RpcServerHandler) asyncCallService(session getty.Session, seq uint64, service *service, methodType *methodType, argv, replyv reflect.Value) {
h.replyCmd(session, seq, "", CmdTypeAck)
function := methodType.method.Func
function.Call([]reflect.Value{service.rcvr, argv, replyv})
return
}
func (h *RpcServerHandler) callService(session getty.Session, reqPkg *GettyPackage, service *service,
methodType *methodType, argv, replyv reflect.Value) {
func (h *RpcServerHandler) callService(session getty.Session, seq uint64, service *service, methodType *methodType, argv, replyv reflect.Value) {
function := methodType.method.Func
returnValues := function.Call([]reflect.Value{service.rcvr, argv, replyv})
errInter := returnValues[0].Interface()
resp := NewRpcResponse()
resp.header.ReplyType = ReplyTypeData
resp.header.Seq = seq
if errInter != nil {
h.replyCmd(session, seq, errInter.(error).Error(), CmdTypeErr)
h.replyCmd(session, reqPkg, gettyCmdRPCResponse, errInter.(error).Error())
return
}
resp.body = replyv.Interface()
rspPkg := *reqPkg
rspPkg.H.Code = 0
rspPkg.H.Command = gettyCmdRPCResponse
rspPkg.B = &GettyRPCResponse{
body: replyv.Interface(),
}
h.sendLock.Lock()
defer h.sendLock.Unlock()
session.WritePkg(resp, 0)
session.WritePkg(&rspPkg, 0)
}
////////////////////////////////////////////
// RpcClientHandler
////////////////////////////////////////////
type RpcClientHandler struct {
client *Client
}
func NewRpcClientHandler(client *Client) *RpcClientHandler {
h := &RpcClientHandler{
client: client,
}
return h
return &RpcClientHandler{client: client}
}
func (h *RpcClientHandler) OnOpen(session getty.Session) error {
......@@ -190,7 +206,7 @@ func (h *RpcClientHandler) OnClose(session getty.Session) {
}
func (h *RpcClientHandler) OnMessage(session getty.Session, pkg interface{}) {
p, ok := pkg.(*RpcResponse)
p, ok := pkg.(*GettyPackage)
if !ok {
log.Error("illegal packge{%#v}", pkg)
return
......@@ -198,14 +214,23 @@ func (h *RpcClientHandler) OnMessage(session getty.Session, pkg interface{}) {
log.Debug("get rpc response{%s}", p)
h.client.updateSession(session)
pendingResponse := h.client.RemovePendingResponse(p.header.Seq)
if p.header.ReplyType == ReplyTypePong {
pendingResponse := h.client.RemovePendingResponse(p.H.Sequence)
if p.H.Command == gettyCmdHbResponse {
return
}
if p.B == nil {
log.Error("response:{%#v} body is nil", p)
return
}
rsp, ok := p.B.(*GettyRPCResponse)
if !ok {
log.Error("response body:{%#v} type is not *GettyRPCResponse", p.B)
return
}
if len(p.header.Error) > 0 {
pendingResponse.err = jerrors.New(p.header.Error)
if len(rsp.header.Error) > 0 {
pendingResponse.err = jerrors.New(rsp.header.Error)
}
err := json.Unmarshal(p.body.([]byte), pendingResponse.reply)
err := json.Unmarshal(rsp.body.([]byte), pendingResponse.reply)
if err != nil {
pendingResponse.err = err
}
......@@ -218,7 +243,7 @@ func (h *RpcClientHandler) OnCron(session getty.Session) {
log.Error("client.getClientSession(session{%s}) = error{%#v}", session.Stat(), err)
return
}
if conf.sessionTimeout.Nanoseconds() < time.Since(session.GetActive()).Nanoseconds() {
if h.client.conf.sessionTimeout.Nanoseconds() < time.Since(session.GetActive()).Nanoseconds() {
log.Warn("session{%s} timeout{%s}, reqNum{%d}",
session.Stat(), time.Since(session.GetActive()).String(), rpcSession.reqNum)
h.client.removeSession(session)
......
package rpc
import (
"bytes"
"encoding/binary"
"encoding/json"
"reflect"
)
import (
jerrors "github.com/juju/errors"
)
const (
MaxPacketLen = 16 * 1024
RequestSendOnly int16 = 1
ReplyTypeData = 0x01
ReplyTypePong = 0x10
ReplyTypeAck = 0x100
)
var (
ErrNotEnoughStream = jerrors.New("packet stream is not enough")
ErrTooLargePackage = jerrors.New("package length is exceed the echo package's legal maximum length.")
ErrNotFoundServiceOrMethod = jerrors.New("server invalid service or method")
)
type RequestHeader struct {
Seq uint64
Service string
Method string
CallType int16
}
func NewRequestHeader() *RequestHeader {
return &RequestHeader{}
}
func (reqHeader *RequestHeader) IsPing() bool {
if reqHeader.Service == "go" && reqHeader.Method == "ping" {
return true
}
return false
}
type RpcRequest struct {
server *Server
header *RequestHeader
body interface{}
service *service
methodType *methodType
argv reflect.Value
replyv reflect.Value
}
func NewRpcRequest(server *Server) *RpcRequest {
return &RpcRequest{
server: server,
header: NewRequestHeader(),
}
}
func (req *RpcRequest) Marshal() (*bytes.Buffer, error) {
var err error
var buf *bytes.Buffer
buf = &bytes.Buffer{}
headerData, err := json.Marshal(req.header)
if err != nil {
return nil, err
}
bodyData, err := json.Marshal(req.body)
if err != nil {
return nil, err
}
//前2字节总长度,header长度2字节+header数据,body长度2字节+body数据
packLen := 2 + 2 + len(headerData) + 2 + len(bodyData)
err = binary.Write(buf, binary.LittleEndian, uint16(packLen))
if err != nil {
return nil, err
}
err = binary.Write(buf, binary.LittleEndian, uint16(len(headerData)))
if err != nil {
return nil, err
}
err = binary.Write(buf, binary.LittleEndian, headerData)
if err != nil {
return nil, err
}
err = binary.Write(buf, binary.LittleEndian, uint16(len(bodyData)))
if err != nil {
return nil, err
}
err = binary.Write(buf, binary.LittleEndian, bodyData)
if err != nil {
return nil, err
}
return buf, nil
}
func (req *RpcRequest) Unmarshal(buf *bytes.Buffer) (int, error) {
var err error
if buf.Len() < 7 {
return 0, ErrNotEnoughStream
}
var packLen uint16
err = binary.Read(buf, binary.LittleEndian, &packLen)
if err != nil {
return 0, err
}
if packLen > MaxPacketLen {
return 0, ErrTooLargePackage
}
var headerLen uint16
err = binary.Read(buf, binary.LittleEndian, &headerLen)
if err != nil {
return 0, err
}
header := make([]byte, headerLen)
err = binary.Read(buf, binary.LittleEndian, header)
if err != nil {
return 0, err
}
var bodyLen uint16
err = binary.Read(buf, binary.LittleEndian, &bodyLen)
if err != nil {
return 0, err
}
body := make([]byte, bodyLen)
err = binary.Read(buf, binary.LittleEndian, body)
if err != nil {
return 0, err
}
err = json.Unmarshal(header, req.header)
if err != nil {
return 0, err
}
if req.header.IsPing() {
return int(packLen), nil
}
req.service = req.server.serviceMap[req.header.Service]
if req.service != nil {
req.methodType = req.service.method[req.header.Method]
}
if req.service == nil || req.methodType == nil {
return 0, ErrNotFoundServiceOrMethod
}
argIsValue := false
if req.methodType.ArgType.Kind() == reflect.Ptr {
req.argv = reflect.New(req.methodType.ArgType.Elem())
} else {
req.argv = reflect.New(req.methodType.ArgType)
argIsValue = true
}
err = json.Unmarshal(body, req.argv.Interface())
if err != nil {
return 0, err
}
if argIsValue {
req.argv = req.argv.Elem()
}
req.replyv = reflect.New(req.methodType.ReplyType.Elem())
return int(packLen), nil
}
type ResponseHeader struct {
Seq uint64
ReplyType int16
Error string
}
func NewResponseHeader() *ResponseHeader {
return &ResponseHeader{}
}
type RpcResponse struct {
header *ResponseHeader
body interface{}
}
func NewRpcResponse() *RpcResponse {
r := &RpcResponse{
header: NewResponseHeader(),
}
return r
}
func (resp *RpcResponse) Marshal() (*bytes.Buffer, error) {
var err error
var buf *bytes.Buffer
buf = &bytes.Buffer{}
headerData, err := json.Marshal(resp.header)
if err != nil {
return nil, err
}
bodyData, err := json.Marshal(resp.body)
if err != nil {
return nil, err
}
//前2字节总长度,header长度2字节+header数据,body长度2字节+body数据
packLen := 2 + 2 + len(headerData) + 2 + len(bodyData)
err = binary.Write(buf, binary.LittleEndian, uint16(packLen))
if err != nil {
return nil, err
}
err = binary.Write(buf, binary.LittleEndian, uint16(len(headerData)))
if err != nil {
return nil, err
}
err = binary.Write(buf, binary.LittleEndian, headerData)
if err != nil {
return nil, err
}
err = binary.Write(buf, binary.LittleEndian, uint16(len(bodyData)))
if err != nil {
return nil, err
}
err = binary.Write(buf, binary.LittleEndian, bodyData)
if err != nil {
return nil, err
}
return buf, nil
}
func (resp *RpcResponse) Unmarshal(buf *bytes.Buffer) (int, error) {
var err error
if buf.Len() < 7 {
return 0, ErrNotEnoughStream
}
var packLen uint16
err = binary.Read(buf, binary.LittleEndian, &packLen)
if err != nil {
return 0, err
}
if packLen > MaxPacketLen {
return 0, ErrTooLargePackage
}
var headerLen uint16
err = binary.Read(buf, binary.LittleEndian, &headerLen)
if err != nil {
return 0, err
}
header := make([]byte, headerLen)
err = binary.Read(buf, binary.LittleEndian, header)
if err != nil {
return 0, err
}
var bodyLen uint16
err = binary.Read(buf, binary.LittleEndian, &bodyLen)
if err != nil {
return 0, err
}
body := make([]byte, bodyLen)
err = binary.Read(buf, binary.LittleEndian, body)
if err != nil {
return 0, err
}
resp.body = body
err = json.Unmarshal(header, resp.header)
if err != nil {
return 0, err
}
// err = json.Unmarshal(body, resp.body)
// if err != nil {
// return 0, err
// }
return int(packLen), nil
}
type PendingResponse struct {
seq uint64
err error
reply interface{}
done chan struct{}
}
func NewPendingResponse() *PendingResponse {
return &PendingResponse{done: make(chan struct{})}
}
......@@ -10,44 +10,49 @@ import (
jerrors "github.com/juju/errors"
)
type RpcServerPacketHandler struct {
////////////////////////////////////////////
// RpcServerPackageHandler
////////////////////////////////////////////
type RpcServerPackageHandler struct {
server *Server
}
func NewRpcServerPacketHandler(server *Server) *RpcServerPacketHandler {
return &RpcServerPacketHandler{
func NewRpcServerPackageHandler(server *Server) *RpcServerPackageHandler {
return &RpcServerPackageHandler{
server: server,
}
}
func (p *RpcServerPacketHandler) Read(ss getty.Session, data []byte) (interface{}, int, error) {
func (p *RpcServerPackageHandler) Read(ss getty.Session, data []byte) (interface{}, int, error) {
var (
err error
len int
length int
pkg GettyPackage
buf *bytes.Buffer
)
buf = bytes.NewBuffer(data)
req := NewRpcRequest(p.server)
len, err = req.Unmarshal(buf)
length, err = pkg.Unmarshal(buf)
if err != nil {
if err == ErrNotEnoughStream {
if jerrors.Cause(err) == ErrNotEnoughStream {
return nil, 0, nil
}
return nil, 0, err
return nil, 0, jerrors.Trace(err)
}
return req, len, nil
return &pkg, length, nil
}
func (p *RpcServerPacketHandler) Write(ss getty.Session, pkg interface{}) error {
func (p *RpcServerPackageHandler) Write(ss getty.Session, pkg interface{}) error {
var (
ok bool
err error
resp *RpcResponse
resp *GettyPackage
buf *bytes.Buffer
)
if resp, ok = pkg.(*RpcResponse); !ok {
if resp, ok = pkg.(*GettyPackage); !ok {
log.Error("illegal pkg:%+v\n", pkg)
return jerrors.New("invalid rpc response")
}
......@@ -55,49 +60,52 @@ func (p *RpcServerPacketHandler) Write(ss getty.Session, pkg interface{}) error
buf, err = resp.Marshal()
if err != nil {
log.Warn("binary.Write(resp{%#v}) = err{%#v}", resp, err)
return err
return jerrors.Trace(err)
}
err = ss.WriteBytes(buf.Bytes())
return err
return jerrors.Trace(ss.WriteBytes(buf.Bytes()))
}
type RpcClientPacketHandler struct {
////////////////////////////////////////////
// RpcClientPackageHandler
////////////////////////////////////////////
type RpcClientPackageHandler struct {
}
func NewRpcClientPacketHandler() *RpcClientPacketHandler {
return &RpcClientPacketHandler{}
func NewRpcClientPackageHandler() *RpcClientPackageHandler {
return &RpcClientPackageHandler{}
}
func (p *RpcClientPacketHandler) Read(ss getty.Session, data []byte) (interface{}, int, error) {
func (p *RpcClientPackageHandler) Read(ss getty.Session, data []byte) (interface{}, int, error) {
var (
err error
len int
length int
pkg GettyPackage
buf *bytes.Buffer
)
buf = bytes.NewBuffer(data)
resp := NewRpcResponse()
len, err = resp.Unmarshal(buf)
length, err = pkg.Unmarshal(buf)
if err != nil {
if err == ErrNotEnoughStream {
return nil, 0, nil
}
return nil, 0, err
return nil, 0, jerrors.Trace(err)
}
return resp, len, nil
return &pkg, length, nil
}
func (p *RpcClientPacketHandler) Write(ss getty.Session, pkg interface{}) error {
func (p *RpcClientPackageHandler) Write(ss getty.Session, pkg interface{}) error {
var (
ok bool
err error
req *RpcRequest
req *GettyPackage
buf *bytes.Buffer
)
if req, ok = pkg.(*RpcRequest); !ok {
if req, ok = pkg.(*GettyPackage); !ok {
log.Error("illegal pkg:%+v\n", pkg)
return jerrors.New("invalid rpc request")
}
......@@ -105,10 +113,8 @@ func (p *RpcClientPacketHandler) Write(ss getty.Session, pkg interface{}) error
buf, err = req.Marshal()
if err != nil {
log.Warn("binary.Write(req{%#v}) = err{%#v}", req, err)
return err
return jerrors.Trace(err)
}
err = ss.WriteBytes(buf.Bytes())
return err
return jerrors.Trace(ss.WriteBytes(buf.Bytes()))
}
......@@ -46,8 +46,8 @@ func isExportedOrBuiltinType(t reflect.Type) bool {
return isExported(t.Name()) || t.PkgPath() == ""
}
// prepareMethods returns suitable Rpc methods of typ
func prepareMethods(typ reflect.Type) map[string]*methodType {
// suitableMethods returns suitable Rpc methods of typ
func suitableMethods(typ reflect.Type) map[string]*methodType {
methods := make(map[string]*methodType)
for m := 0; m < typ.NumMethod(); m++ {
method := typ.Method(m)
......
......@@ -19,12 +19,12 @@ import (
)
type Server struct {
tcpServerList []getty.Server
conf *ServerConfig
serviceMap map[string]*service
conf *Config
tcpServerList []getty.Server
}
func NewServer(conf *Config) *Server {
func NewServer(conf *ServerConfig) *Server {
s := &Server{
serviceMap: make(map[string]*service),
conf: conf,
......@@ -64,7 +64,7 @@ func (s *Server) Register(rcvr interface{}) error {
if len(svc.method) == 0 {
// To help the user, see if a pointer receiver would work.
method := prepareMethods(reflect.PtrTo(svc.typ))
method := suitableMethods(reflect.PtrTo(svc.typ))
str := "rpc.Register: type " + svc.name + " has no exported methods of suitable type"
if len(method) != 0 {
str = "rpc.Register: type " + svc.name + " has no exported methods of suitable type (" +
......@@ -104,8 +104,8 @@ func (s *Server) newSession(session getty.Session) error {
session.SetName(s.conf.GettySessionParam.SessionName)
session.SetMaxMsgLen(s.conf.GettySessionParam.MaxMsgLen)
session.SetPkgHandler(NewRpcServerPacketHandler(s)) //
session.SetEventListener(NewRpcServerHandler()) //
session.SetPkgHandler(NewRpcServerPacketHandler(s))
session.SetEventListener(NewRpcServerHandler(s.conf.SessionNumber, s.conf.sessionTimeout))
session.SetRQLen(s.conf.GettySessionParam.PkgRQSize)
session.SetWQLen(s.conf.GettySessionParam.PkgWQSize)
session.SetReadTimeout(s.conf.GettySessionParam.tcpReadTimeout)
......@@ -133,7 +133,6 @@ func (s *Server) Init() {
tcpServer = getty.NewTCPServer(
getty.WithLocalAddress(addr),
)
// run s
tcpServer.RunEventLoop(s.newSession)
log.Debug("s bind addr{%s} ok!", addr)
s.tcpServerList = append(s.tcpServerList, tcpServer)
......@@ -147,9 +146,8 @@ func (s *Server) Stop() {
}
func (s *Server) initSignal() {
// signal.Notify的ch信道是阻塞的(signal.Notify不会阻塞发送信号), 需要设置缓冲
signals := make(chan os.Signal, 1)
// It is not possible to block SIGKILL or syscall.SIGSTOP
// It is impossible to block SIGKILL or syscall.SIGSTOP
signal.Notify(signals, os.Interrupt, os.Kill, syscall.SIGHUP, syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT)
for {
sig := <-signals
......@@ -159,15 +157,12 @@ func (s *Server) initSignal() {
// reload()
default:
go time.AfterFunc(s.conf.failFastTimeout, func() {
// log.Warn("app exit now by force...")
// os.Exit(1)
log.Exit("app exit now by force...")
log.Close()
})
// 要么survialTimeout时间内执行完毕下面的逻辑然后程序退出,要么执行上面的超时函数程序强行退出
// if @s can not stop in s.conf.failFastTimeout, getty will Force Quit.
s.Stop()
// fmt.Println("app exit now...")
log.Exit("app exit now...")
log.Close()
return
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment