Commit 2c8048f9 authored by AlexStocks's avatar AlexStocks

Add: service discovery

parent f8b2b44f
......@@ -30,11 +30,7 @@ Feature list:
Code example:
The rpc dir of [getty-examples](https://github.com/alexstocks/getty-examples/) shows how to build rpc client/rpc server.
##
The subdirectory rpc of [getty-examples](https://github.com/alexstocks/getty-examples/) shows how to build rpc client/rpc server.
## LICENCE
......
......@@ -14,6 +14,10 @@
## develop history ##
---
- 2018/08/13
> Feature
* Add Micro
- 2018/08/07
> Improvement
* RPC package format: {2 Bytes Header len + Header + 2 Body len + Body} ---> {Header + Body}
......
package micro
import (
"context"
"strings"
"time"
)
import (
"github.com/AlexStocks/getty/rpc"
"github.com/AlexStocks/goext/database/filter"
"github.com/AlexStocks/goext/database/filter/pool"
"github.com/AlexStocks/goext/database/registry"
"github.com/AlexStocks/goext/database/registry/etcdv3"
"github.com/AlexStocks/goext/database/registry/zookeeper"
"github.com/AlexStocks/goext/net"
jerrors "github.com/juju/errors"
)
type ClientOption func(*ClientOptions)
type ClientOptions struct {
hash gxfilter.ServiceHash
}
func WithServiceHash(hash gxfilter.ServiceHash) ClientOption {
return func(o *ClientOptions) {
o.hash = hash
}
}
type Client struct {
ClientOptions
*rpc.Client
// registry
registry gxregistry.Registry
attr gxregistry.ServiceAttr
filter gxfilter.Filter
svcMap map[gxregistry.ServiceAttr]*gxfilter.ServiceArray
}
// NewServer initialize a micro service consumer
func NewClient(conf *rpc.ClientConfig, regConf *RegistryConfig, opts ...ClientOption) (*Client, error) {
var (
err error
rpcClient *rpc.Client
registry gxregistry.Registry
filter gxfilter.Filter
)
if err = regConf.CheckValidity(); err != nil {
return nil, jerrors.Trace(err)
}
if rpcClient, err = rpc.NewClient(conf); err != nil {
return nil, jerrors.Trace(err)
}
addrList := strings.Split(regConf.Addr, ",")
switch regConf.Type {
case "etcd":
registry, err = gxetcd.NewRegistry(
gxregistry.WithAddrs(addrList...),
gxregistry.WithTimeout(time.Duration(1e9*regConf.KeepaliveTimeout)),
gxregistry.WithRoot(regConf.Root),
)
case "zookeeper":
registry, err = gxzookeeper.NewRegistry(
gxregistry.WithAddrs(addrList...),
gxregistry.WithTimeout(time.Duration(1e9*regConf.KeepaliveTimeout)),
gxregistry.WithRoot(regConf.Root),
)
default:
return nil, jerrors.Errorf(ErrIllegalConf+"registry type %s", regConf.Type)
}
if err != nil {
return nil, jerrors.Trace(err)
}
if filter, err = gxpool.NewFilter(
gxfilter.WithRegistry(registry),
gxpool.WithTTL(time.Duration(1e9*regConf.KeepaliveTimeout)),
); err != nil {
return nil, jerrors.Trace(err)
}
service := gxregistry.Service{
Attr: &gxregistry.ServiceAttr{
Group: regConf.IDC,
Role: gxregistry.SRT_Consumer,
Protocol: regConf.Codec,
},
Nodes: []*gxregistry.Node{
&gxregistry.Node{
ID: regConf.NodeID,
Address: conf.Host,
// Port: 0,
},
},
}
if err = registry.Register(service); err != nil {
return nil, jerrors.Trace(err)
}
clt := &Client{
Client: rpcClient,
registry: registry,
filter: filter,
}
for _, o := range opts {
o(&(clt.ClientOptions))
}
return clt, nil
}
func (c *Client) Call(ctx context.Context, typ rpc.CodecType, service, method string, args interface{}, reply interface{}) error {
attr := c.attr
attr.Service = service
attr.Protocol = typ.String()
flag := false
svcArray, ok := c.svcMap[attr]
if !ok {
flag = true
} else {
if ok = c.filter.CheckServiceAlive(attr, svcArray); !ok {
flag = true
}
}
var err error
if flag {
if svcArray, err = c.filter.Filter(attr); err != nil {
return jerrors.Trace(err)
}
c.svcMap[attr] = svcArray
}
svc, err := svcArray.Select(ctx, c.ClientOptions.hash)
if err != nil {
return jerrors.Trace(err)
}
return jerrors.Trace(c.Client.Call(typ,
gxnet.HostAddress(svc.Nodes[0].Address, int(svc.Nodes[0].Port)),
service, method, args, reply))
}
func (c *Client) Close() {
c.filter.Close()
c.registry.Close()
c.Client.Close()
}
package micro
import (
"github.com/AlexStocks/getty/rpc"
jerrors "github.com/juju/errors"
"github.com/scylladb/go-set/strset"
)
const (
ErrIllegalConf = "illegal conf "
)
var (
registryArray = strset.New("zookeeper", "etcd")
)
// RegistryConfig provides configuration for registry
type RegistryConfig struct {
Type string `default:"etcd" yaml:"type" json:"type,omitempty"`
Addr string `default:"" yaml:"addr" json:"addr,omitempty"`
KeepaliveTimeout int `default:"5" yaml:"keepalive_time" json:"keepalive_timeout,omitempty"`
Root string `default:"getty" yaml:"root" json:"root,omitempty"`
IDC string `default:"idc-bj" yaml:"idc" json:"idc,omitempty"`
NodeID string `default:"node0" yaml:"node_id" json:"node_id,omitempty"`
Codec string `default:"json" yaml:"codec" json:"codec,omitempty"`
codec rpc.CodecType
}
// CheckValidity check parameter validity
func (c *RegistryConfig) CheckValidity() error {
if !registryArray.Has(c.Type) {
return jerrors.Errorf(ErrIllegalConf+"registry type %s", c.Type)
}
if len(c.Addr) == 0 {
return jerrors.Errorf(ErrIllegalConf+"registry addr %s", c.Addr)
}
if c.KeepaliveTimeout < 0 {
return jerrors.Errorf(ErrIllegalConf+"keepalive timeout %d", c.KeepaliveTimeout)
}
if len(c.Root) == 0 {
return jerrors.Errorf(ErrIllegalConf+"root %s", c.Root)
}
if len(c.NodeID) == 0 {
return jerrors.Errorf(ErrIllegalConf+"node id %s", c.NodeID)
}
if c.codec = rpc.GetCodecType(c.Codec); c.codec == rpc.CodecUnknown {
return jerrors.Errorf(ErrIllegalConf+"codec type %s", c.Codec)
}
return nil
}
package micro
import (
"strconv"
"strings"
"time"
)
import (
"github.com/AlexStocks/getty/rpc"
"github.com/AlexStocks/goext/database/registry"
"github.com/AlexStocks/goext/database/registry/etcdv3"
"github.com/AlexStocks/goext/database/registry/zookeeper"
jerrors "github.com/juju/errors"
)
// Server micro service provider
type Server struct {
*rpc.Server
// registry
regConf RegistryConfig
registry gxregistry.Registry
attr gxregistry.ServiceAttr
nodes []*gxregistry.Node
}
// NewServer initialize a micro service provider
func NewServer(conf *rpc.ServerConfig, regConf *RegistryConfig) (*Server, error) {
var (
err error
rpcServer *rpc.Server
registry gxregistry.Registry
nodes []*gxregistry.Node
)
if err = regConf.CheckValidity(); err != nil {
return nil, jerrors.Trace(err)
}
if rpcServer, err = rpc.NewServer(conf); err != nil {
return nil, jerrors.Trace(err)
}
addrList := strings.Split(regConf.Addr, ",")
switch regConf.Type {
case "etcd":
registry, err = gxetcd.NewRegistry(
gxregistry.WithAddrs(addrList...),
gxregistry.WithTimeout(time.Duration(1e9*regConf.KeepaliveTimeout)),
gxregistry.WithRoot(regConf.Root),
)
case "zookeeper":
registry, err = gxzookeeper.NewRegistry(
gxregistry.WithAddrs(addrList...),
gxregistry.WithTimeout(time.Duration(1e9*regConf.KeepaliveTimeout)),
gxregistry.WithRoot(regConf.Root),
)
default:
return nil, jerrors.Errorf(ErrIllegalConf+"registry type %s", regConf.Type)
}
if err != nil {
return nil, jerrors.Trace(err)
}
for _, p := range conf.Ports {
port, err := strconv.Atoi(p)
if err != nil {
return nil, jerrors.Errorf("illegal port %s", p)
}
nodes = append(nodes,
&gxregistry.Node{
ID: regConf.NodeID,
Address: conf.Host,
Port: int32(port),
},
)
}
return &Server{
Server: rpcServer,
regConf: *regConf,
registry: registry,
nodes: nodes,
attr: gxregistry.ServiceAttr{
Group: regConf.IDC,
Role: gxregistry.SRT_Provider,
Protocol: regConf.Codec,
},
}, nil
}
// Register the @rcvr
func (s *Server) Register(rcvr rpc.GettyRPCService) error {
if err := s.Server.Register(rcvr); err != nil {
return jerrors.Trace(err)
}
attr := s.attr
attr.Service = rcvr.Service()
attr.Version = rcvr.Version()
service := gxregistry.Service{Attr: &attr, Nodes: s.nodes}
if err := s.registry.Register(service); err != nil {
return jerrors.Trace(err)
}
s.Stop()
return nil
}
// func (s *Server) Start() {
// s.Server.Start()
// }
// func (s *Server) Stop() {
// s.Server.Stop()
// }
......@@ -82,7 +82,7 @@ func (c *Client) Call(typ CodecType, addr, service, method string, args interfac
select {
case <-getty.GetTimeWheel().After(c.conf.GettySessionParam.tcpReadTimeout):
err = errClientReadTimeout
c.RemovePendingResponse(SequenceType(rsp.seq))
c.removePendingResponse(SequenceType(rsp.seq))
case <-rsp.done:
err = rsp.err
}
......@@ -129,29 +129,29 @@ func (c *Client) transfer(session getty.Session, typ CodecType, req *GettyRPCReq
}
rsp.seq = sequence
c.AddPendingResponse(rsp)
c.addPendingResponse(rsp)
err = session.WritePkg(pkg, 0)
if err != nil {
c.RemovePendingResponse(SequenceType(rsp.seq))
c.removePendingResponse(SequenceType(rsp.seq))
}
return jerrors.Trace(err)
}
func (c *Client) PendingResponseCount() int {
c.pendingLock.RLock()
defer c.pendingLock.RUnlock()
return len(c.pendingResponses)
}
// func (c *Client) PendingResponseCount() int {
// c.pendingLock.RLock()
// defer c.pendingLock.RUnlock()
// return len(c.pendingResponses)
// }
func (c *Client) AddPendingResponse(pr *PendingResponse) {
func (c *Client) addPendingResponse(pr *PendingResponse) {
c.pendingLock.Lock()
defer c.pendingLock.Unlock()
c.pendingResponses[SequenceType(pr.seq)] = pr
}
func (c *Client) RemovePendingResponse(seq SequenceType) *PendingResponse {
func (c *Client) removePendingResponse(seq SequenceType) *PendingResponse {
c.pendingLock.Lock()
defer c.pendingLock.Unlock()
if c.pendingResponses == nil {
......@@ -164,10 +164,10 @@ func (c *Client) RemovePendingResponse(seq SequenceType) *PendingResponse {
return nil
}
func (c *Client) ClearPendingResponses() map[SequenceType]*PendingResponse {
c.pendingLock.Lock()
defer c.pendingLock.Unlock()
presps := c.pendingResponses
c.pendingResponses = nil
return presps
}
// func (c *Client) ClearPendingResponses() map[SequenceType]*PendingResponse {
// c.pendingLock.Lock()
// defer c.pendingLock.Unlock()
// presps := c.pendingResponses
// c.pendingResponses = nil
// return presps
// }
......@@ -229,7 +229,7 @@ func (p *GettyPackage) Marshal() (*bytes.Buffer, error) {
headerBuf, buf *bytes.Buffer
)
buf = bytes.NewBuffer(make([]byte, gettyPackageHeaderLen))
buf = bytes.NewBuffer(make([]byte, gettyPackageHeaderLen, gettyPackageHeaderLen<<2))
// body
if p.B != nil {
......
......@@ -29,15 +29,6 @@ type (
SessionName string `default:"rpc" yaml:"session_name" json:"session_name,omitempty"`
}
RegistryConfig struct {
Type string `default:"etcd" yaml:"type" json:"type,omitempty"`
Addr string `default:"" yaml:"addr" json:"addr,omitempty"`
KeepaliveTimeout int `default:"5" yaml:"keepalive_time" json:"keepalive_timeout,omitempty"`
Root string `default:"getty" yaml:"keepalive_time" json:"keepalive_timeout,omitempty"`
IDC string `default:"idc-bj" yaml:"idc" json:"idc,omitempty"`
NodeID string `default:"node0" yaml:"node_id" json:"node_id,omitempty"`
}
// Config holds supported types by the multiconfig package
ServerConfig struct {
// local address
......@@ -70,7 +61,7 @@ type (
ConnectionNum int `default:"16" yaml:"connection_num" json:"connection_num,omitempty"`
// heartbeat
HeartbeatPeriod string `default:"15s" yaml:"heartbeat_period" json:"heartbeat_period, omitempty"`
HeartbeatPeriod string `default:"15s" yaml:"heartbeat_period" json:"heartbeat_period,omitempty"`
heartbeatPeriod time.Duration
// session
......@@ -78,7 +69,7 @@ type (
sessionTimeout time.Duration
// app
FailFastTimeout string `default:"5s" yaml:"fail_fast_timeout" json:"fail_fast_timeout, omitempty"`
FailFastTimeout string `default:"5s" yaml:"fail_fast_timeout" json:"fail_fast_timeout,omitempty"`
failFastTimeout time.Duration
// Connection Pool
......
......@@ -200,7 +200,7 @@ func (h *RpcClientHandler) OnMessage(session getty.Session, pkg interface{}) {
log.Debug("get rpc response{%s}", p)
h.conn.updateSession(session)
pendingResponse := h.conn.pool.rpcClient.RemovePendingResponse(p.H.Sequence)
pendingResponse := h.conn.pool.rpcClient.removePendingResponse(p.H.Sequence)
if pendingResponse == nil {
return
}
......
......@@ -19,10 +19,6 @@ type Server struct {
tcpServerList []getty.Server
}
var (
ErrIllegalConf = "illegal conf: "
)
func NewServer(conf *ServerConfig) (*Server, error) {
if err := conf.CheckValidity(); err != nil {
return nil, jerrors.Trace(err)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment