Commit b302c8cf authored by AlexStocks's avatar AlexStocks

Update: Notify -> CallOneway

parent 69758dca
...@@ -55,7 +55,14 @@ func CallMeta(k, v interface{}) CallOption { ...@@ -55,7 +55,14 @@ func CallMeta(k, v interface{}) CallOption {
} }
} }
type AsyncHandler func(reply interface{}, opts CallOptions) type AsyncResponse struct {
Opts CallOptions
Cause error
Start time.Time
Reply interface{}
}
type AsyncCallback func(response AsyncResponse)
type Client struct { type Client struct {
conf ClientConfig conf ClientConfig
...@@ -80,7 +87,8 @@ func NewClient(conf *ClientConfig) (*Client, error) { ...@@ -80,7 +87,8 @@ func NewClient(conf *ClientConfig) (*Client, error) {
return c, nil return c, nil
} }
func (c *Client) Notify(typ CodecType, addr, service, method string, args interface{}, opts ...CallOption) error { // call one way
func (c *Client) CallOneway(typ CodecType, addr, service, method string, args interface{}, opts ...CallOption) error {
var copts CallOptions var copts CallOptions
for _, o := range opts { for _, o := range opts {
...@@ -106,18 +114,19 @@ func (c *Client) Call(typ CodecType, addr, service, method string, args, reply i ...@@ -106,18 +114,19 @@ func (c *Client) Call(typ CodecType, addr, service, method string, args, reply i
return jerrors.Trace(c.call(ct, typ, addr, service, method, args, reply, nil, copts)) return jerrors.Trace(c.call(ct, typ, addr, service, method, args, reply, nil, copts))
} }
func (c *Client) AsyncCall(typ CodecType, addr, service, method string, args, reply interface{}, handler AsyncHandler, opts ...CallOption) error { func (c *Client) AsyncCall(typ CodecType, addr, service, method string, args interface{},
var copts CallOptions callback AsyncCallback, reply interface{}, opts ...CallOption) error {
var copts CallOptions
for _, o := range opts { for _, o := range opts {
o(&copts) o(&copts)
} }
return jerrors.Trace(c.call(CT_TwoWay, typ, addr, service, method, args, reply, handler, copts)) return jerrors.Trace(c.call(CT_TwoWay, typ, addr, service, method, args, reply, callback, copts))
} }
func (c *Client) call(ct CallType, typ CodecType, addr, service, method string, func (c *Client) call(ct CallType, typ CodecType, addr, service, method string,
args, reply interface{}, handler AsyncHandler, opts CallOptions) error { args, reply interface{}, callback AsyncCallback, opts CallOptions) error {
if opts.RequestTimeout == 0 { if opts.RequestTimeout == 0 {
opts.RequestTimeout = c.conf.GettySessionParam.tcpWriteTimeout opts.RequestTimeout = c.conf.GettySessionParam.tcpWriteTimeout
...@@ -139,7 +148,7 @@ func (c *Client) call(ct CallType, typ CodecType, addr, service, method string, ...@@ -139,7 +148,7 @@ func (c *Client) call(ct CallType, typ CodecType, addr, service, method string,
if ct != CT_OneWay { if ct != CT_OneWay {
rsp = NewPendingResponse() rsp = NewPendingResponse()
rsp.reply = reply rsp.reply = reply
rsp.handler = handler rsp.callback = callback
rsp.opts = opts rsp.opts = opts
} }
...@@ -158,7 +167,7 @@ func (c *Client) call(ct CallType, typ CodecType, addr, service, method string, ...@@ -158,7 +167,7 @@ func (c *Client) call(ct CallType, typ CodecType, addr, service, method string,
return jerrors.Trace(err) return jerrors.Trace(err)
} }
if ct == CT_OneWay || handler != nil { if ct == CT_OneWay || callback != nil {
return nil return nil
} }
......
...@@ -5,6 +5,7 @@ import ( ...@@ -5,6 +5,7 @@ import (
"encoding/binary" "encoding/binary"
"fmt" "fmt"
"reflect" "reflect"
"time"
"unsafe" "unsafe"
) )
...@@ -507,14 +508,27 @@ func (resp *GettyRPCResponse) GetHeader() interface{} { ...@@ -507,14 +508,27 @@ func (resp *GettyRPCResponse) GetHeader() interface{} {
//////////////////////////////////////////// ////////////////////////////////////////////
type PendingResponse struct { type PendingResponse struct {
seq uint64 seq uint64
err error err error
handler AsyncHandler start time.Time
reply interface{} callback AsyncCallback
opts CallOptions reply interface{}
done chan struct{} opts CallOptions
done chan struct{}
} }
func NewPendingResponse() *PendingResponse { func NewPendingResponse() *PendingResponse {
return &PendingResponse{done: make(chan struct{})} return &PendingResponse{
start: time.Now(),
done: make(chan struct{}),
}
}
func (r PendingResponse) GenAsyncResponse() AsyncResponse {
return AsyncResponse{
Opts: r.opts,
Cause: r.err,
Start: r.start,
Reply: r.reply,
}
} }
package rpc package rpc
import ( import (
"reflect" "reflect"
"sync" "sync"
"time" "time"
) )
...@@ -103,7 +103,7 @@ func (h *RpcServerHandler) OnMessage(session getty.Session, pkg interface{}) { ...@@ -103,7 +103,7 @@ func (h *RpcServerHandler) OnMessage(session getty.Session, pkg interface{}) {
} }
err := h.callService(session, req, req.service, req.methodType, req.argv, req.replyv) err := h.callService(session, req, req.service, req.methodType, req.argv, req.replyv)
if err != nil { if err != nil {
log.Error("h.callService(session:%#v, req:%#v) = %s", session, req, jerrors.ErrorStack(err)) log.Error("h.callService(session:%#v, req:%#v) = %s", session, req, jerrors.ErrorStack(err))
} }
} }
...@@ -205,7 +205,7 @@ func (h *RpcClientHandler) OnMessage(session getty.Session, pkg interface{}) { ...@@ -205,7 +205,7 @@ func (h *RpcClientHandler) OnMessage(session getty.Session, pkg interface{}) {
log.Error("illegal packge{%#v}", pkg) log.Error("illegal packge{%#v}", pkg)
return return
} }
log.Debug("get rpc response{%s}", p) log.Debug("get rpc response{%#v}", p)
h.conn.updateSession(session) h.conn.updateSession(session)
pendingResponse := h.conn.pool.rpcClient.removePendingResponse(p.H.Sequence) pendingResponse := h.conn.pool.rpcClient.removePendingResponse(p.H.Sequence)
...@@ -217,7 +217,11 @@ func (h *RpcClientHandler) OnMessage(session getty.Session, pkg interface{}) { ...@@ -217,7 +217,11 @@ func (h *RpcClientHandler) OnMessage(session getty.Session, pkg interface{}) {
} }
if p.H.Code == GettyFail && len(p.header.Error) > 0 { if p.H.Code == GettyFail && len(p.header.Error) > 0 {
pendingResponse.err = jerrors.New(p.header.Error) pendingResponse.err = jerrors.New(p.header.Error)
pendingResponse.done <- struct{}{} if pendingResponse.callback == nil {
pendingResponse.done <- struct{}{}
} else {
pendingResponse.callback(pendingResponse.GenAsyncResponse())
}
return return
} }
codec := Codecs[p.H.CodecType] codec := Codecs[p.H.CodecType]
...@@ -227,16 +231,12 @@ func (h *RpcClientHandler) OnMessage(session getty.Session, pkg interface{}) { ...@@ -227,16 +231,12 @@ func (h *RpcClientHandler) OnMessage(session getty.Session, pkg interface{}) {
return return
} }
err := codec.Decode(p.body, pendingResponse.reply) err := codec.Decode(p.body, pendingResponse.reply)
if err != nil { pendingResponse.err = err
if pendingResponse.handler == nil { if pendingResponse.callback == nil {
pendingResponse.err = err pendingResponse.done <- struct{}{}
pendingResponse.done <- struct{}{} } else {
} else { pendingResponse.callback(pendingResponse.GenAsyncResponse())
pendingResponse.handler(pendingResponse.reply, pendingResponse.opts)
}
return
} }
pendingResponse.done <- struct{}{}
} }
func (h *RpcClientHandler) OnCron(session getty.Session) { func (h *RpcClientHandler) OnCron(session getty.Session) {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment