Commit fc8c917d authored by AlexStocks's avatar AlexStocks

Add: use multiple service config in service configuration

parent 8de316ee
......@@ -14,6 +14,12 @@
## develop history ##
---
- 2018/08/19
> Feature
* use multiple service config in service configuration
* do not register consumer
* version: v1.0.2
- 2018/08/16
> Feature
* Add gxpool.watcher filter
......
......@@ -45,7 +45,7 @@ type Client struct {
}
// NewServer initialize a micro service consumer
func NewClient(conf *rpc.ClientConfig, regConf *RegistryConfig, opts ...ClientOption) (*Client, error) {
func NewClient(conf *rpc.ClientConfig, regConf *ConsumerRegistryConfig, opts ...ClientOption) (*Client, error) {
var (
err error
rpcClient *rpc.Client
......@@ -61,17 +61,17 @@ func NewClient(conf *rpc.ClientConfig, regConf *RegistryConfig, opts ...ClientOp
return nil, jerrors.Trace(err)
}
addrList := strings.Split(regConf.Addr, ",")
regAddrList := strings.Split(regConf.RegAddr, ",")
switch regConf.Type {
case "etcd":
registry, err = gxetcd.NewRegistry(
gxregistry.WithAddrs(addrList...),
gxregistry.WithAddrs(regAddrList...),
gxregistry.WithTimeout(time.Duration(1e9*regConf.KeepaliveTimeout)),
gxregistry.WithRoot(regConf.Root),
)
case "zookeeper":
registry, err = gxzookeeper.NewRegistry(
gxregistry.WithAddrs(addrList...),
gxregistry.WithAddrs(regAddrList...),
gxregistry.WithTimeout(time.Duration(1e9*regConf.KeepaliveTimeout)),
gxregistry.WithRoot(regConf.Root),
)
......@@ -83,8 +83,7 @@ func NewClient(conf *rpc.ClientConfig, regConf *RegistryConfig, opts ...ClientOp
}
serviceAttrFilter := gxregistry.ServiceAttr{
Group: regConf.IDC,
Role: gxregistry.SRT_Provider,
Role: gxregistry.SRT_Provider,
}
gxctx := gxcontext.NewValuesContext(nil)
gxctx.Set(gxpool.GxfilterServiceAttrKey, serviceAttrFilter)
......@@ -97,29 +96,29 @@ func NewClient(conf *rpc.ClientConfig, regConf *RegistryConfig, opts ...ClientOp
return nil, jerrors.Trace(err)
}
service := gxregistry.Service{
Attr: &gxregistry.ServiceAttr{
Group: regConf.IDC,
Role: gxregistry.SRT_Consumer,
Protocol: regConf.Codec,
},
Nodes: []*gxregistry.Node{
&gxregistry.Node{
ID: regConf.NodeID,
Address: conf.Host,
// Port: 0,
},
},
}
if err = registry.Register(service); err != nil {
return nil, jerrors.Trace(err)
}
// service := gxregistry.Service{
// Attr: &gxregistry.ServiceAttr{
// Group: regConf.IDC,
// Role: gxregistry.SRT_Consumer,
// Protocol: regConf.Codec,
// },
// Nodes: []*gxregistry.Node{
// &gxregistry.Node{
// ID: regConf.NodeID,
// Address: conf.Host,
// // Port: 0,
// },
// },
// }
// if err = registry.Register(service); err != nil {
// return nil, jerrors.Trace(err)
// }
clt := &Client{
Client: rpcClient,
registry: registry,
attr: gxregistry.ServiceAttr{
Group: regConf.IDC,
Group: regConf.Group,
},
filter: filter,
svcMap: make(map[gxregistry.ServiceAttr]*gxfilter.ServiceArray),
......
......@@ -2,8 +2,8 @@ package micro
import (
"github.com/AlexStocks/getty/rpc"
"github.com/AlexStocks/goext/container/set/strset"
jerrors "github.com/juju/errors"
"github.com/scylladb/go-set/strset"
)
const (
......@@ -14,16 +14,55 @@ var (
registryArray = strset.New("zookeeper", "etcd")
)
type ServiceConfig struct {
LocalHost string `default:"127.0.0.1" yaml:"local_host" json:"local_host, omitempty"`
LocalPort int `default:"10001" yaml:"local_port" json:"local_port, omitempty"`
Group string `default:"idc-bj" yaml:"group" json:"group,omitempty"`
NodeID string `default:"node0" yaml:"node_id" json:"node_id,omitempty"`
Protocol string `default:"json" yaml:"protocol" json:"protocol,omitempty"`
Service string `default:"test" yaml:"service" json:"service,omitempty"`
Version string `default:"v1" yaml:"version" json:"version,omitempty"`
}
// CheckValidity check parameter validity
func (c *ServiceConfig) CheckValidity() error {
if len(c.LocalHost) == 0 {
return jerrors.Errorf(ErrIllegalConf+"local host %s", c.LocalHost)
}
if c.LocalPort <= 0 || 65535 < c.LocalPort {
return jerrors.Errorf(ErrIllegalConf+"local port %s", c.LocalPort)
}
if len(c.Group) == 0 {
return jerrors.Errorf(ErrIllegalConf+"group %s", c.Group)
}
if len(c.NodeID) == 0 {
return jerrors.Errorf(ErrIllegalConf+"node id %s", c.NodeID)
}
if codec := rpc.GetCodecType(c.Protocol); codec == rpc.CodecUnknown {
return jerrors.Errorf(ErrIllegalConf+"protocol type %s", c.Protocol)
}
if len(c.Service) == 0 {
return jerrors.Errorf(ErrIllegalConf+"service %s", c.Service)
}
if len(c.Version) == 0 {
return jerrors.Errorf(ErrIllegalConf+"service version %s", c.Version)
}
return nil
}
// RegistryConfig provides configuration for registry
type RegistryConfig struct {
Type string `default:"etcd" yaml:"type" json:"type,omitempty"`
Addr string `default:"" yaml:"addr" json:"addr,omitempty"`
KeepaliveTimeout int `default:"5" yaml:"keepalive_time" json:"keepalive_timeout,omitempty"`
Root string `default:"getty" yaml:"root" json:"root,omitempty"`
IDC string `default:"idc-bj" yaml:"idc" json:"idc,omitempty"`
NodeID string `default:"node0" yaml:"node_id" json:"node_id,omitempty"`
Codec string `default:"json" yaml:"codec" json:"codec,omitempty"`
codec rpc.CodecType
RegAddr string `default:"127.0.0.1:2181" yaml:"reg_addr" json:"reg_addr,omitempty"`
KeepaliveTimeout int `default:"5" yaml:"keepalive_timeout" json:"keepalive_timeout,omitempty"`
Root string `default:"/getty" yaml:"root" json:"root,omitempty"`
}
// CheckValidity check parameter validity
......@@ -31,9 +70,8 @@ func (c *RegistryConfig) CheckValidity() error {
if !registryArray.Has(c.Type) {
return jerrors.Errorf(ErrIllegalConf+"registry type %s", c.Type)
}
if len(c.Addr) == 0 {
return jerrors.Errorf(ErrIllegalConf+"registry addr %s", c.Addr)
if len(c.RegAddr) == 0 {
return jerrors.Errorf(ErrIllegalConf+"registry addr %s", c.RegAddr)
}
if c.KeepaliveTimeout < 0 {
......@@ -44,12 +82,45 @@ func (c *RegistryConfig) CheckValidity() error {
return jerrors.Errorf(ErrIllegalConf+"root %s", c.Root)
}
if len(c.NodeID) == 0 {
return jerrors.Errorf(ErrIllegalConf+"node id %s", c.NodeID)
return nil
}
// ProviderRegistryConfig provides provider configuration for registry
type ProviderRegistryConfig struct {
RegistryConfig `yaml:"basic" json:"basic,omitempty"`
ServiceArray []ServiceConfig `default:"" yaml:"service_array" json:"service_array,omitempty"`
}
// CheckValidity check parameter validity
func (c *ProviderRegistryConfig) CheckValidity() error {
if err := c.RegistryConfig.CheckValidity(); err != nil {
return jerrors.Trace(err)
}
for idx := 0; idx < len(c.ServiceArray); idx++ {
if err := c.ServiceArray[idx].CheckValidity(); err != nil {
return jerrors.Errorf(ErrIllegalConf+"service reference config, idx:%d, err:%s",
idx, jerrors.ErrorStack(err))
}
}
return nil
}
// ConsumerRegistryConfig provides consumer configuration for registry
type ConsumerRegistryConfig struct {
RegistryConfig `yaml:"basic" json:"basic,omitempty"`
Group string `default:"idc-bj" yaml:"group" json:"group,omitempty"`
}
// CheckValidity check parameter validity
func (c *ConsumerRegistryConfig) CheckValidity() error {
if err := c.RegistryConfig.CheckValidity(); err != nil {
return jerrors.Trace(err)
}
if c.codec = rpc.GetCodecType(c.Codec); c.codec == rpc.CodecUnknown {
return jerrors.Errorf(ErrIllegalConf+"codec type %s", c.Codec)
if len(c.Group) == 0 {
return jerrors.Errorf(ErrIllegalConf+"group %s", c.Group)
}
return nil
......
......@@ -2,6 +2,8 @@ package micro
import (
"github.com/AlexStocks/goext/net"
"github.com/AlexStocks/goext/strings"
"net"
"strconv"
"strings"
"time"
......@@ -19,19 +21,17 @@ import (
type Server struct {
*rpc.Server
// registry
regConf RegistryConfig
regConf ProviderRegistryConfig
registry gxregistry.Registry
attr gxregistry.ServiceAttr
nodes []*gxregistry.Node
}
// NewServer initialize a micro service provider
func NewServer(conf *rpc.ServerConfig, regConf *RegistryConfig) (*Server, error) {
func NewServer(conf *rpc.ServerConfig, regConf *ProviderRegistryConfig) (*Server, error) {
var (
err error
rpcServer *rpc.Server
registry gxregistry.Registry
nodes []*gxregistry.Node
)
if err = regConf.CheckValidity(); err != nil {
......@@ -42,17 +42,17 @@ func NewServer(conf *rpc.ServerConfig, regConf *RegistryConfig) (*Server, error)
return nil, jerrors.Trace(err)
}
addrList := strings.Split(regConf.Addr, ",")
regAddrList := strings.Split(regConf.RegAddr, ",")
switch regConf.Type {
case "etcd":
registry, err = gxetcd.NewRegistry(
gxregistry.WithAddrs(addrList...),
gxregistry.WithAddrs(regAddrList...),
gxregistry.WithTimeout(time.Duration(1e9*regConf.KeepaliveTimeout)),
gxregistry.WithRoot(regConf.Root),
)
case "zookeeper":
registry, err = gxzookeeper.NewRegistry(
gxregistry.WithAddrs(addrList...),
gxregistry.WithAddrs(regAddrList...),
gxregistry.WithTimeout(time.Duration(1e9*regConf.KeepaliveTimeout)),
gxregistry.WithRoot(regConf.Root),
)
......@@ -63,47 +63,70 @@ func NewServer(conf *rpc.ServerConfig, regConf *RegistryConfig) (*Server, error)
return nil, jerrors.Trace(err)
}
var localAddrArr []string
for _, p := range conf.Ports {
port, err := strconv.Atoi(p)
if err != nil {
return nil, jerrors.Trace(err)
}
if port <= 0 || 65535 < port {
return nil, jerrors.Errorf("illegal port %s", p)
}
nodes = append(nodes,
&gxregistry.Node{
// use host port as part of NodeID to defeat the case: on process listens on many ports
ID: regConf.NodeID + "@" + gxnet.HostAddress(conf.Host, port),
Address: conf.Host,
Port: int32(port),
},
)
localAddrArr = append(localAddrArr, net.JoinHostPort(conf.Host, p))
}
for _, svr := range regConf.ServiceArray {
addr := gxnet.HostAddress(svr.LocalHost, svr.LocalPort)
if ok := gxstrings.Contains(localAddrArr, addr); !ok {
return nil, jerrors.Errorf("can not find ServiceConfig addr %s in conf address array %#v",
addr, localAddrArr)
}
}
return &Server{
Server: rpcServer,
regConf: *regConf,
registry: registry,
nodes: nodes,
attr: gxregistry.ServiceAttr{
Group: regConf.IDC,
Role: gxregistry.SRT_Provider,
Protocol: regConf.Codec,
},
}, nil
}
// Register the @rcvr
func (s *Server) Register(rcvr rpc.GettyRPCService) error {
if err := s.Server.Register(rcvr); err != nil {
return jerrors.Trace(err)
}
var (
flag bool
attr gxregistry.ServiceAttr
)
attr := s.attr
attr.Role = gxregistry.SRT_Provider
attr.Service = rcvr.Service()
attr.Version = rcvr.Version()
service := gxregistry.Service{Attr: &attr, Nodes: s.nodes}
if err := s.registry.Register(service); err != nil {
for _, c := range s.regConf.ServiceArray {
if c.Service == rcvr.Service() && c.Version == rcvr.Version() {
flag = true
attr.Group = c.Group
attr.Protocol = c.Protocol
service := gxregistry.Service{Attr: &attr}
service.Nodes = append(service.Nodes,
&gxregistry.Node{
ID: c.NodeID,
Address: c.LocalHost,
Port: int32(c.LocalPort),
},
)
if err := s.registry.Register(service); err != nil {
return jerrors.Trace(err)
}
}
}
if !flag {
return jerrors.Errorf("can not find @rcvr{service:%s, version:%s} in registry config:%#v",
rcvr.Service(), rcvr.Version(), s.regConf)
}
if err := s.Server.Register(rcvr); err != nil {
return jerrors.Trace(err)
}
......
......@@ -32,7 +32,7 @@ type (
// Config holds supported types by the multiconfig package
ServerConfig struct {
// local address
AppName string `default:"rcp-server" yaml:"app_name" json:"app_name,omitempty"`
AppName string `default:"rpc-server" yaml:"app_name" json:"app_name,omitempty"`
Host string `default:"127.0.0.1" yaml:"host" json:"host,omitempty"`
Ports []string `yaml:"ports" json:"ports,omitempty"` // `default:["10000"]`
ProfilePort int `default:"10086" yaml:"profile_port" json:"profile_port,omitempty"`
......@@ -53,12 +53,12 @@ type (
// Config holds supported types by the multiconfig package
ClientConfig struct {
// local address
AppName string `default:"rcp-client" yaml:"app_name" json:"app_name,omitempty"`
AppName string `default:"rpc-client" yaml:"app_name" json:"app_name,omitempty"`
Host string `default:"127.0.0.1" yaml:"host" json:"host,omitempty"`
ProfilePort int `default:"10086" yaml:"profile_port" json:"profile_port,omitempty"`
// session pool
ConnectionNum int `default:"16" yaml:"connection_num" json:"connection_num,omitempty"`
ConnectionNum int `default:"16" yaml:"connection_number" json:"connection_number,omitempty"`
// heartbeat
HeartbeatPeriod string `default:"15s" yaml:"heartbeat_period" json:"heartbeat_period,omitempty"`
......
......@@ -39,13 +39,12 @@ func (p *RpcServerPackageHandler) Read(ss getty.Session, data []byte) (interface
return nil, 0, jerrors.Trace(err)
}
req := GettyRPCRequestPackage{
H: pkg.H,
header: pkg.B.GetHeader().(GettyRPCRequestHeader),
}
if req.H.Command == gettyCmdHbRequest {
req := GettyRPCRequestPackage{H: pkg.H}
if pkg.H.Command == gettyCmdHbRequest {
return req, length, nil
}
req.header = pkg.B.GetHeader().(GettyRPCRequestHeader)
// get service & method
req.service = p.server.serviceMap[req.header.Service]
if req.service != nil {
......
......@@ -618,7 +618,7 @@ func (s *session) handleTCPPackage() error {
break
}
if 0 == bufLen {
continue // just continue if connection has read no more stream bytes.
continue // just continue if session can not read no more stream bytes.
}
pktBuf.Write(buf[:bufLen])
for {
......@@ -628,7 +628,7 @@ func (s *session) handleTCPPackage() error {
// pkg, err = s.pkgHandler.Read(s, pktBuf)
pkg, pkgLen, err = s.reader.Read(s, pktBuf.Bytes())
if err == nil && s.maxMsgLen > 0 && pkgLen > int(s.maxMsgLen) {
err = jerrors.Errorf("Message Too Long, pkgLen %d, session max message len %d", pkgLen, s.maxMsgLen)
err = jerrors.Errorf("pkgLen %d > session max message len %d", pkgLen, s.maxMsgLen)
}
if err != nil {
log.Warn("%s, [session.handleTCPPackage] = len{%d}, error{%s}",
......
......@@ -10,9 +10,9 @@
package getty
const (
Version = "1.0.1"
DATE = "2018/08/13"
Version = "1.0.2"
DATE = "2018/08/19"
GETTY_MAJOR = 1
GETTY_MINOR = 0
GETTY_BUILD = 1
GETTY_BUILD = 2
)
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment