Commit 82715a2e authored by AlexStocks's avatar AlexStocks

U: add CallOptions & AsyncCall

parent 8471e55d
...@@ -24,6 +24,39 @@ func init() { ...@@ -24,6 +24,39 @@ func init() {
rand.Seed(time.Now().UnixNano()) rand.Seed(time.Now().UnixNano())
} }
type CallOptions struct {
// request timeout
RequestTimeout time.Duration
// response timeout
ResponseTimeout time.Duration
Meta map[interface{}]interface{}
}
type CallOption func(*CallOptions)
func CallRequestTimeout(d time.Duration) CallOption {
return func(o *CallOptions) {
o.RequestTimeout = d
}
}
func CallResponseTimeout(d time.Duration) CallOption {
return func(o *CallOptions) {
o.ResponseTimeout = d
}
}
func CallMeta(k, v interface{}) CallOption {
return func(o *CallOptions) {
if o.Meta == nil {
o.Meta = make(map[interface{}]interface{})
}
o.Meta[k] = v
}
}
type AsyncHandler func(reply interface{}, opts CallOptions)
type Client struct { type Client struct {
conf ClientConfig conf ClientConfig
pool *gettyRPCClientPool pool *gettyRPCClientPool
...@@ -47,7 +80,52 @@ func NewClient(conf *ClientConfig) (*Client, error) { ...@@ -47,7 +80,52 @@ func NewClient(conf *ClientConfig) (*Client, error) {
return c, nil return c, nil
} }
func (c *Client) Call(typ CodecType, addr, service, method string, args interface{}, reply interface{}) error { func (c *Client) Notify(typ CodecType, addr, service, method string, args interface{}, opts ...CallOption) error {
var copts CallOptions
for _, o := range opts {
o(&copts)
}
return jerrors.Trace(c.call(CT_OneWay, typ, addr, service, method, args, nil, nil, copts))
}
// if @reply is nil, the transport layer will get the response without notify the invoker.
func (c *Client) Call(typ CodecType, addr, service, method string, args, reply interface{}, opts ...CallOption) error {
var copts CallOptions
for _, o := range opts {
o(&copts)
}
ct := CT_TwoWay
if reply == nil {
ct = CT_TwoWayNoReply
}
return jerrors.Trace(c.call(ct, typ, addr, service, method, args, reply, nil, copts))
}
func (c *Client) AsyncCall(typ CodecType, addr, service, method string,
args, reply interface{}, handler AsyncHandler, opts ...CallOption) error {
var copts CallOptions
for _, o := range opts {
o(&copts)
}
return jerrors.Trace(c.call(CT_TwoWay, typ, addr, service, method, args, reply, handler, copts))
}
func (c *Client) call(ct CallType, typ CodecType, addr, service, method string,
args, reply interface{}, handler AsyncHandler, opts CallOptions) error {
if opts.RequestTimeout == 0 {
opts.RequestTimeout = c.conf.GettySessionParam.tcpWriteTimeout
}
if opts.ResponseTimeout == 0 {
opts.ResponseTimeout = c.conf.GettySessionParam.tcpReadTimeout
}
if !typ.CheckValidity() { if !typ.CheckValidity() {
return errInvalidCodecType return errInvalidCodecType
} }
...@@ -55,14 +133,16 @@ func (c *Client) Call(typ CodecType, addr, service, method string, args interfac ...@@ -55,14 +133,16 @@ func (c *Client) Call(typ CodecType, addr, service, method string, args interfac
b := &GettyRPCRequest{} b := &GettyRPCRequest{}
b.header.Service = service b.header.Service = service
b.header.Method = method b.header.Method = method
b.header.CallType = CT_TwoWay b.header.CallType = ct
if reply == nil {
b.header.CallType = CT_TwoWayNoReply
}
b.body = args b.body = args
rsp := NewPendingResponse() var rsp *PendingResponse
rsp.reply = reply if ct != CT_OneWay {
rsp = NewPendingResponse()
rsp.reply = reply
rsp.handler = handler
rsp.opts = opts
}
var ( var (
err error err error
...@@ -75,12 +155,16 @@ func (c *Client) Call(typ CodecType, addr, service, method string, args interfac ...@@ -75,12 +155,16 @@ func (c *Client) Call(typ CodecType, addr, service, method string, args interfac
} }
defer c.pool.release(conn, err) defer c.pool.release(conn, err)
if err = c.transfer(session, typ, b, rsp); err != nil { if err = c.transfer(session, typ, b, rsp, opts); err != nil {
return jerrors.Trace(err) return jerrors.Trace(err)
} }
if ct == CT_OneWay || handler != nil {
return nil
}
select { select {
case <-getty.GetTimeWheel().After(c.conf.GettySessionParam.tcpReadTimeout): case <-getty.GetTimeWheel().After(opts.ResponseTimeout):
err = errClientReadTimeout err = errClientReadTimeout
c.removePendingResponse(SequenceType(rsp.seq)) c.removePendingResponse(SequenceType(rsp.seq))
case <-rsp.done: case <-rsp.done:
...@@ -106,11 +190,12 @@ func (c *Client) selectSession(typ CodecType, addr string) (*gettyRPCClient, get ...@@ -106,11 +190,12 @@ func (c *Client) selectSession(typ CodecType, addr string) (*gettyRPCClient, get
} }
func (c *Client) heartbeat(session getty.Session, typ CodecType) error { func (c *Client) heartbeat(session getty.Session, typ CodecType) error {
rsp := NewPendingResponse() return c.transfer(session, typ, nil, NewPendingResponse(), CallOptions{})
return c.transfer(session, typ, nil, rsp)
} }
func (c *Client) transfer(session getty.Session, typ CodecType, req *GettyRPCRequest, rsp *PendingResponse) error { func (c *Client) transfer(session getty.Session, typ CodecType,req *GettyRPCRequest,
rsp *PendingResponse, opts CallOptions) error {
var ( var (
sequence uint64 sequence uint64
err error err error
...@@ -127,11 +212,13 @@ func (c *Client) transfer(session getty.Session, typ CodecType, req *GettyRPCReq ...@@ -127,11 +212,13 @@ func (c *Client) transfer(session getty.Session, typ CodecType, req *GettyRPCReq
pkg.H.Command = gettyCmdRPCRequest pkg.H.Command = gettyCmdRPCRequest
pkg.B = req pkg.B = req
} }
if rsp != nil {
rsp.seq = sequence
c.addPendingResponse(rsp)
}
rsp.seq = sequence err = session.WritePkg(pkg, opts.RequestTimeout)
c.addPendingResponse(rsp)
err = session.WritePkg(pkg, 0)
if err != nil { if err != nil {
c.removePendingResponse(SequenceType(rsp.seq)) c.removePendingResponse(SequenceType(rsp.seq))
} }
......
...@@ -507,10 +507,12 @@ func (resp *GettyRPCResponse) GetHeader() interface{} { ...@@ -507,10 +507,12 @@ func (resp *GettyRPCResponse) GetHeader() interface{} {
//////////////////////////////////////////// ////////////////////////////////////////////
type PendingResponse struct { type PendingResponse struct {
seq uint64 seq uint64
err error err error
reply interface{} handler AsyncHandler
done chan struct{} reply interface{}
opts CallOptions
done chan struct{}
} }
func NewPendingResponse() *PendingResponse { func NewPendingResponse() *PendingResponse {
......
package rpc package rpc
import ( import (
"reflect" "reflect"
"sync" "sync"
"time" "time"
) )
...@@ -90,13 +90,21 @@ func (h *RpcServerHandler) OnMessage(session getty.Session, pkg interface{}) { ...@@ -90,13 +90,21 @@ func (h *RpcServerHandler) OnMessage(session getty.Session, pkg interface{}) {
h.replyCmd(session, req, gettyCmdHbResponse, "") h.replyCmd(session, req, gettyCmdHbResponse, "")
return return
} }
if req.header.CallType == CT_OneWay {
function := req.methodType.method.Func
function.Call([]reflect.Value{req.service.rcvr, req.argv, req.replyv})
return
}
if req.header.CallType == CT_TwoWayNoReply { if req.header.CallType == CT_TwoWayNoReply {
h.replyCmd(session, req, gettyCmdRPCResponse, "") h.replyCmd(session, req, gettyCmdRPCResponse, "")
function := req.methodType.method.Func function := req.methodType.method.Func
function.Call([]reflect.Value{req.service.rcvr, req.argv, req.replyv}) function.Call([]reflect.Value{req.service.rcvr, req.argv, req.replyv})
return return
} }
h.callService(session, req, req.service, req.methodType, req.argv, req.replyv) err := h.callService(session, req, req.service, req.methodType, req.argv, req.replyv)
if err != nil {
log.Error("h.callService(session:%#v, req:%#v) = %s", session, req, jerrors.ErrorStack(err))
}
} }
func (h *RpcServerHandler) OnCron(session getty.Session) { func (h *RpcServerHandler) OnCron(session getty.Session) {
...@@ -142,14 +150,14 @@ func (h *RpcServerHandler) replyCmd(session getty.Session, req GettyRPCRequestPa ...@@ -142,14 +150,14 @@ func (h *RpcServerHandler) replyCmd(session getty.Session, req GettyRPCRequestPa
} }
func (h *RpcServerHandler) callService(session getty.Session, req GettyRPCRequestPackage, func (h *RpcServerHandler) callService(session getty.Session, req GettyRPCRequestPackage,
service *service, methodType *methodType, argv, replyv reflect.Value) { service *service, methodType *methodType, argv, replyv reflect.Value) error {
function := methodType.method.Func function := methodType.method.Func
returnValues := function.Call([]reflect.Value{service.rcvr, argv, replyv}) returnValues := function.Call([]reflect.Value{service.rcvr, argv, replyv})
errInter := returnValues[0].Interface() errInter := returnValues[0].Interface()
if errInter != nil { if errInter != nil {
h.replyCmd(session, req, gettyCmdRPCResponse, errInter.(error).Error()) h.replyCmd(session, req, gettyCmdRPCResponse, errInter.(error).Error())
return return nil
} }
resp := GettyPackage{ resp := GettyPackage{
...@@ -161,7 +169,7 @@ func (h *RpcServerHandler) callService(session getty.Session, req GettyRPCReques ...@@ -161,7 +169,7 @@ func (h *RpcServerHandler) callService(session getty.Session, req GettyRPCReques
body: replyv.Interface(), body: replyv.Interface(),
} }
session.WritePkg(resp, 5*time.Second) return jerrors.Trace(session.WritePkg(resp, 5*time.Second))
} }
//////////////////////////////////////////// ////////////////////////////////////////////
...@@ -220,8 +228,12 @@ func (h *RpcClientHandler) OnMessage(session getty.Session, pkg interface{}) { ...@@ -220,8 +228,12 @@ func (h *RpcClientHandler) OnMessage(session getty.Session, pkg interface{}) {
} }
err := codec.Decode(p.body, pendingResponse.reply) err := codec.Decode(p.body, pendingResponse.reply)
if err != nil { if err != nil {
pendingResponse.err = err if pendingResponse.handler == nil {
pendingResponse.done <- struct{}{} pendingResponse.err = err
pendingResponse.done <- struct{}{}
} else {
pendingResponse.handler(pendingResponse.reply, pendingResponse.opts)
}
return return
} }
pendingResponse.done <- struct{}{} pendingResponse.done <- struct{}{}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment