Commit bc4d8526 authored by AlexStocks's avatar AlexStocks

Add: server registry

parent 97d432dd
...@@ -23,6 +23,43 @@ import ( ...@@ -23,6 +23,43 @@ import (
// getty command // getty command
//////////////////////////////////////////// ////////////////////////////////////////////
type gettyCodecType uint32
const (
gettyCodecUnknown gettyCodecType = 0x00
gettyJson = 0x01
gettyProtobuf = 0x02
)
var gettyCodecTypeStrings = [...]string{
"unknown",
"json",
"protobuf",
}
func (c gettyCodecType) String() string {
if c == gettyJson || c == gettyProtobuf {
return gettyCodecTypeStrings[c]
}
return gettyCodecTypeStrings[gettyCodecUnknown]
}
func String2CodecType(codecType string) gettyCodecType {
switch codecType {
case gettyCodecTypeStrings[gettyJson]:
return gettyJson
case gettyCodecTypeStrings[gettyProtobuf]:
return gettyProtobuf
}
return gettyCodecUnknown
}
////////////////////////////////////////////
// getty command
////////////////////////////////////////////
type gettyCommand uint32 type gettyCommand uint32
const ( const (
......
...@@ -30,12 +30,24 @@ type ( ...@@ -30,12 +30,24 @@ type (
SessionName string `default:"rpc" yaml:"session_name" json:"session_name,omitempty"` SessionName string `default:"rpc" yaml:"session_name" json:"session_name,omitempty"`
} }
RegistryConfig struct {
Type string `default:"etcd" yaml:"type" json:"type,omitempty"`
Addr string `default:"127.0.0.1:2379" yaml:"addr" json:"addr,omitempty"`
KeepaliveTimeout int `default:"5" yaml:"keepalive_time" json:"keepalive_timeout,omitempty"`
Root string `default:"getty" yaml:"keepalive_time" json:"keepalive_timeout,omitempty"`
IDC string `default:"idc-bj" yaml:"idc" json:"idc,omitempty"`
NodeID string `default:"node0" yaml:"node_id" json:"node_id,omitempty"`
}
// Config holds supported types by the multiconfig package // Config holds supported types by the multiconfig package
ServerConfig struct { ServerConfig struct {
// local address // local address
AppName string `default:"rcp-server" yaml:"app_name" json:"app_name,omitempty"` AppName string `default:"rcp-server" yaml:"app_name" json:"app_name,omitempty"`
Host string `default:"127.0.0.1" yaml:"host" json:"host,omitempty"` Host string `default:"127.0.0.1" yaml:"host" json:"host,omitempty"`
Ports []string `yaml:"ports" json:"ports,omitempty"` // `default:["10000"]` Ports []string `yaml:"ports" json:"ports,omitempty"` // `default:["10000"]`
ProfilePort int `default:"10086" yaml:"profile_port" json:"profile_port,omitempty"`
CodecType string `default:"json" yaml:"codec_type" json:"codec_type,omitempty"`
codecType gettyCodecType
// session // session
SessionTimeout string `default:"60s" yaml:"session_timeout" json:"session_timeout,omitempty"` SessionTimeout string `default:"60s" yaml:"session_timeout" json:"session_timeout,omitempty"`
...@@ -48,6 +60,9 @@ type ( ...@@ -48,6 +60,9 @@ type (
// session tcp parameters // session tcp parameters
GettySessionParam GettySessionParam `required:"true" yaml:"getty_session_param" json:"getty_session_param,omitempty"` GettySessionParam GettySessionParam `required:"true" yaml:"getty_session_param" json:"getty_session_param,omitempty"`
// registry center
Registry RegistryConfig `required:"true" yaml:"registry_config" json:"registry_config,omitempty"`
} }
// Config holds supported types by the multiconfig package // Config holds supported types by the multiconfig package
...@@ -56,11 +71,11 @@ type ( ...@@ -56,11 +71,11 @@ type (
AppName string `default:"rcp-client" yaml:"app_name" json:"app_name,omitempty"` AppName string `default:"rcp-client" yaml:"app_name" json:"app_name,omitempty"`
Host string `default:"127.0.0.1" yaml:"host" json:"host,omitempty"` Host string `default:"127.0.0.1" yaml:"host" json:"host,omitempty"`
Ports []string `yaml:"ports" json:"ports,omitempty"` // `default:["10000"]` Ports []string `yaml:"ports" json:"ports,omitempty"` // `default:["10000"]`
ProfilePort int `default:"10086" yaml:"profile_port" json:"profile_port,omitempty"`
// server // server
ServerHost string `default:"127.0.0.1" yaml:"server_host" json:"server_host,omitempty"` ServerHost string `default:"127.0.0.1" yaml:"server_host" json:"server_host,omitempty"`
ServerPort int `default:"10000" yaml:"server_port" json:"server_port,omitempty"` ServerPort int `default:"10000" yaml:"server_port" json:"server_port,omitempty"`
ProfilePort int `default:"10086" yaml:"profile_port" json:"profile_port,omitempty"`
// session pool // session pool
ConnectionNum int `default:"16" yaml:"connection_num" json:"connection_num,omitempty"` ConnectionNum int `default:"16" yaml:"connection_num" json:"connection_num,omitempty"`
...@@ -79,6 +94,9 @@ type ( ...@@ -79,6 +94,9 @@ type (
// session tcp parameters // session tcp parameters
GettySessionParam GettySessionParam `required:"true" yaml:"getty_session_param" json:"getty_session_param,omitempty"` GettySessionParam GettySessionParam `required:"true" yaml:"getty_session_param" json:"getty_session_param,omitempty"`
// registry center
Registry RegistryConfig `required:"true" yaml:"registry_config" json:"registry_config,omitempty"`
} }
) )
......
...@@ -14,6 +14,14 @@ type TestRpc struct { ...@@ -14,6 +14,14 @@ type TestRpc struct {
i int i int
} }
func (r *TestRpc) Service() string {
return "TestRpc"
}
func (r *TestRpc) Version() string {
return "v1.0"
}
func (r *TestRpc) Test(arg TestABC, res *string) error { func (r *TestRpc) Test(arg TestABC, res *string) error {
log.Debug("arg:%+v", arg) log.Debug("arg:%+v", arg)
*res = "this is a test" *res = "this is a test"
......
...@@ -4,11 +4,15 @@ import ( ...@@ -4,11 +4,15 @@ import (
"github.com/AlexStocks/getty/rpc" "github.com/AlexStocks/getty/rpc"
"github.com/AlexStocks/getty/rpc/example/data" "github.com/AlexStocks/getty/rpc/example/data"
log "github.com/AlexStocks/log4go" log "github.com/AlexStocks/log4go"
jerrors "github.com/juju/errors"
) )
func main() { func main() {
log.LoadConfiguration("server_log.xml") log.LoadConfiguration("/Users/alex/test/golang/lib/src/github.com/AlexStocks/getty/rpc/example/server/server_log.xml")
srv := rpc.NewServer("server_config.toml") srv, err := rpc.NewServer("/Users/alex/test/golang/lib/src/github.com/AlexStocks/getty/rpc/example/server/server_config.toml")
srv.Register(new(data.TestRpc)) if err != nil {
panic(jerrors.ErrorStack(err))
}
err = srv.Register(new(data.TestRpc))
srv.Run() srv.Run()
} }
...@@ -8,6 +8,7 @@ Host = "127.0.0.1" ...@@ -8,6 +8,7 @@ Host = "127.0.0.1"
# Host = "192.168.8.3" # Host = "192.168.8.3"
Ports = ["10000", "20000"] Ports = ["10000", "20000"]
ProfilePort = 10086 ProfilePort = 10086
CodecType = "json"
# session # session
# client与server之间连接的超时时间 # client与server之间连接的超时时间
...@@ -32,3 +33,12 @@ FailFastTimeout = "3s" ...@@ -32,3 +33,12 @@ FailFastTimeout = "3s"
WaitTimeout = "1s" WaitTimeout = "1s"
MaxMsgLen = 128 MaxMsgLen = 128
SessionName = "rpc-server" SessionName = "rpc-server"
# registry
[Registry]
Type = "etcd"
Addr = "127.0.0.1:2379"
KeepaliveTimeout = 5
Root = "/getty"
IDC = "bj-unicom"
NodeID = "n147"
...@@ -15,6 +15,11 @@ var ( ...@@ -15,6 +15,11 @@ var (
typeOfError = reflect.TypeOf((*error)(nil)).Elem() typeOfError = reflect.TypeOf((*error)(nil)).Elem()
) )
type GettyRPCService interface {
Service() string // Service Interface
Version() string
}
type methodType struct { type methodType struct {
sync.Mutex sync.Mutex
method reflect.Method method reflect.Method
......
...@@ -6,32 +6,92 @@ import ( ...@@ -6,32 +6,92 @@ import (
"os" "os"
"os/signal" "os/signal"
"reflect" "reflect"
"strconv"
"strings"
"syscall" "syscall"
"time" "time"
) )
import ( import (
"github.com/AlexStocks/getty" "github.com/AlexStocks/getty"
"github.com/AlexStocks/goext/database/registry"
"github.com/AlexStocks/goext/database/registry/etcdv3"
"github.com/AlexStocks/goext/database/registry/zookeeper"
"github.com/AlexStocks/goext/net" "github.com/AlexStocks/goext/net"
jerrors "github.com/juju/errors"
log "github.com/AlexStocks/log4go" log "github.com/AlexStocks/log4go"
jerrors "github.com/juju/errors"
) )
type Server struct { type Server struct {
conf *ServerConfig conf *ServerConfig
serviceMap map[string]*service serviceMap map[string]*service
tcpServerList []getty.Server tcpServerList []getty.Server
registry gxregistry.Registry
sa gxregistry.ServiceAttr
nodes []*gxregistry.Node
} }
func NewServer(confFile string) *Server { var (
ErrIllegalCodecType = jerrors.New("illegal codec type")
)
func NewServer(confFile string) (*Server, error) {
conf := loadServerConf(confFile) conf := loadServerConf(confFile)
if conf.codecType = String2CodecType(conf.CodecType); conf.codecType == gettyCodecUnknown {
return nil, ErrIllegalCodecType
}
s := &Server{ s := &Server{
serviceMap: make(map[string]*service), serviceMap: make(map[string]*service),
conf: conf, conf: conf,
} }
return s var err error
var registry gxregistry.Registry
if len(s.conf.Registry.Addr) != 0 {
addrList := strings.Split(s.conf.Registry.Addr, ",")
switch s.conf.Registry.Type {
case "etcd":
registry, err = gxetcd.NewRegistry(
gxregistry.WithAddrs(addrList...),
gxregistry.WithTimeout(time.Duration(int(time.Second)*s.conf.Registry.KeepaliveTimeout)),
gxregistry.WithRoot(s.conf.Registry.Root),
)
case "zookeeper":
registry, err = gxzookeeper.NewRegistry(
gxregistry.WithAddrs(addrList...),
gxregistry.WithTimeout(time.Duration(int(time.Second)*s.conf.Registry.KeepaliveTimeout)),
gxregistry.WithRoot(s.conf.Registry.Root),
)
}
if err != nil {
return nil, jerrors.Trace(err)
}
if registry != nil {
s.registry = registry
s.sa = gxregistry.ServiceAttr{
Group: s.conf.Registry.IDC,
Role: gxregistry.SRT_Provider,
Protocol: s.conf.CodecType,
}
for _, p := range s.conf.Ports {
port, err := strconv.Atoi(p)
if err != nil {
return nil, jerrors.New(fmt.Sprintf("illegal port %s", p))
}
s.nodes = append(s.nodes,
&gxregistry.Node{
ID: s.conf.Registry.NodeID + "-" + net.JoinHostPort(s.conf.Host, p),
Address: s.conf.Host,
Port: int32(port)})
}
}
}
return s, nil
} }
func (s *Server) Run() { func (s *Server) Run() {
...@@ -41,7 +101,7 @@ func (s *Server) Run() { ...@@ -41,7 +101,7 @@ func (s *Server) Run() {
s.initSignal() s.initSignal()
} }
func (s *Server) Register(rcvr interface{}) error { func (s *Server) Register(rcvr GettyRPCService) error {
svc := &service{ svc := &service{
typ: reflect.TypeOf(rcvr), typ: reflect.TypeOf(rcvr),
rcvr: reflect.ValueOf(rcvr), rcvr: reflect.ValueOf(rcvr),
...@@ -77,6 +137,15 @@ func (s *Server) Register(rcvr interface{}) error { ...@@ -77,6 +137,15 @@ func (s *Server) Register(rcvr interface{}) error {
} }
s.serviceMap[svc.name] = svc s.serviceMap[svc.name] = svc
if s.registry != nil {
sa := s.sa
sa.Service = rcvr.Service()
sa.Version = rcvr.Version()
service := gxregistry.Service{Attr: &sa, Nodes: s.nodes}
if err := s.registry.Register(service); err != nil {
return jerrors.Trace(err)
}
}
return nil return nil
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment