Commit a615e175 authored by AlexStocks's avatar AlexStocks

Imp: add micro CallOneway/AsyncCall

parent a7eaa4af
......@@ -16,7 +16,7 @@
- 2018/10/16
> Feature
* add CallOneway/AsyncCall
* add rpc/micro CallOneway/AsyncCall
* version: v1.0.3
- 2018/08/19
......
......@@ -155,15 +155,15 @@ func NewClient(conf *rpc.ClientConfig, regConf *ConsumerRegistryConfig, opts ...
return clt, nil
}
func (c *Client) Call(ctx context.Context, typ rpc.CodecType, service, version, method string,
args interface{}, reply interface{}, opts ...rpc.CallOption) error {
func (c *Client) getServiceAddr(ctx context.Context, typ rpc.CodecType, service, version string) (string, error) {
attr := c.attr
attr.Service = service
attr.Protocol = typ.String()
attr.Role = gxregistry.SRT_Provider
attr.Version = version
var addr string
flag := false
svcArray, ok := c.svcMap[attr]
if !ok {
......@@ -176,7 +176,7 @@ func (c *Client) Call(ctx context.Context, typ rpc.CodecType, service, version,
if flag {
var err error
if svcArray, err = c.filter.Filter(attr); err != nil {
return jerrors.Trace(err)
return addr, jerrors.Trace(err)
}
c.svcMap[attr] = svcArray
......@@ -184,18 +184,48 @@ func (c *Client) Call(ctx context.Context, typ rpc.CodecType, service, version,
svc, err := svcArray.Select(ctx, c.ClientOptions.hash)
if err != nil {
return jerrors.Trace(err)
return addr, jerrors.Trace(err)
}
if len(svc.Nodes) != 1 {
return jerrors.Errorf("illegal service %#v", svc)
return addr, jerrors.Errorf("illegal service %#v", svc)
}
return gxnet.HostAddress(svc.Nodes[0].Address, int(svc.Nodes[0].Port)), nil
}
func (c *Client) CallOneway(ctx context.Context, typ rpc.CodecType, service, version, method string,
args interface{}, opts ...rpc.CallOption) error {
addr, err := c.getServiceAddr(ctx, typ, service, version)
if err != nil {
return jerrors.Trace(err)
}
addr := gxnet.HostAddress(svc.Nodes[0].Address, int(svc.Nodes[0].Port))
return jerrors.Trace(c.Client.CallOneway(typ, addr, service, method, args, opts...))
}
func (c *Client) Call(ctx context.Context, typ rpc.CodecType, service, version, method string,
args interface{}, reply interface{}, opts ...rpc.CallOption) error {
addr, err := c.getServiceAddr(ctx, typ, service, version)
if err != nil {
return jerrors.Trace(err)
}
return jerrors.Trace(c.Client.Call(typ, addr, service, method, args, reply, opts...))
}
func (c *Client) AsyncCall(ctx context.Context, typ rpc.CodecType, service, version, method string,
args interface{}, callback rpc.AsyncCallback, reply interface{}, opts ...rpc.CallOption) error {
addr, err := c.getServiceAddr(ctx, typ, service, version)
if err != nil {
return jerrors.Trace(err)
}
return jerrors.Trace(c.Client.AsyncCall(typ, addr, service, method, args, callback, reply, opts...))
}
func (c *Client) Close() {
c.filter.Close()
c.registry.Close()
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment