Commit 027d4c26 authored by AlexStocks's avatar AlexStocks
parent ea356d13
...@@ -22,12 +22,16 @@ Feature list: ...@@ -22,12 +22,16 @@ Feature list:
- 1 Transport: TCP(√), UDP, Websocket - 1 Transport: TCP(√), UDP, Websocket
- 2 Codec: ProtoBuf(√), JSON(√) - 2 Codec: ProtoBuf(√), JSON(√)
- 3 Service Discovery: Service Publish(√), Service Watch(√), Service Notify(√) - 3 Service Discovery: Service Publish(X), Service Watch(X), Service Notify(X)
- 4 Registry: ZooKeeper(), Etcd(x) - 4 Registry: ZooKeeper(X), Etcd(x)
- 5 Strategy: Failover(√), Failfast(√) - 5 Strategy: Failover(√), Failfast(√)
- 6 Load Balance: Random(√), RoundRobin(√) - 6 Load Balance: Random(X), RoundRobin(X)
- 7 Metrics: Invoke Statistics(x), User Auth(x) - 7 Metrics: Invoke Statistics(x), User Auth(x)
Code example:
The rpc dir of [getty-examples](https://github.com/alexstocks/getty-examples/) shows how to build rpc client/rpc server.
## ##
......
package rpc package rpc
import ( import (
"fmt"
"math/rand" "math/rand"
"strings"
"sync" "sync"
"time" "time"
)
import (
"github.com/AlexStocks/getty" "github.com/AlexStocks/getty"
"github.com/AlexStocks/goext/database/filter"
"github.com/AlexStocks/goext/database/filter/pool"
"github.com/AlexStocks/goext/database/registry"
"github.com/AlexStocks/goext/database/registry/etcdv3"
"github.com/AlexStocks/goext/database/registry/zookeeper"
"github.com/AlexStocks/goext/net"
"github.com/AlexStocks/goext/sync/atomic" "github.com/AlexStocks/goext/sync/atomic"
jerrors "github.com/juju/errors" jerrors "github.com/juju/errors"
) )
var ( var (
errInvalidCodecType = jerrors.New("illegal CodecType")
errInvalidAddress = jerrors.New("remote address invalid or empty") errInvalidAddress = jerrors.New("remote address invalid or empty")
errSessionNotExist = jerrors.New("session not exist") errSessionNotExist = jerrors.New("session not exist")
errClientClosed = jerrors.New("client closed") errClientClosed = jerrors.New("client closed")
...@@ -32,96 +24,33 @@ func init() { ...@@ -32,96 +24,33 @@ func init() {
} }
type Client struct { type Client struct {
conf *ClientConfig conf ClientConfig
pool *gettyRPCClientConnPool pool *gettyRPCClientConnPool
sequence gxatomic.Uint64 sequence gxatomic.Uint64
pendingLock sync.RWMutex pendingLock sync.RWMutex
pendingResponses map[uint64]*PendingResponse pendingResponses map[uint64]*PendingResponse
// registry
registry gxregistry.Registry
sa gxregistry.ServiceAttr
filter gxfilter.Filter
} }
func NewClient(confFile string) (*Client, error) { func NewClient(conf *ClientConfig) (*Client, error) {
conf := loadClientConf(confFile) if err := conf.CheckValidity(); err != nil {
c := &Client{
pendingResponses: make(map[uint64]*PendingResponse),
conf: conf,
}
c.pool = newGettyRPCClientConnPool(c, conf.PoolSize, time.Duration(int(time.Second)*conf.PoolTTL))
if len(c.conf.Registry.Addr) == 0 {
if conf.codecType = String2CodecType(conf.CodecType); conf.codecType == gettyCodecUnknown {
return nil, jerrors.New(fmt.Sprintf(ErrIllegalConf+"codec type %s", conf.CodecType))
}
if conf.ServerPort == 0 {
return nil, jerrors.New(fmt.Sprintf(ErrIllegalConf + "both registry addr and ServerPort is nil"))
}
_, err := c.pool.getConn(conf.CodecType, gxnet.HostAddress(conf.ServerHost, conf.ServerPort))
return c, jerrors.Trace(err)
}
var err error
var registry gxregistry.Registry
addrList := strings.Split(c.conf.Registry.Addr, ",")
switch c.conf.Registry.Type {
case "etcd":
registry, err = gxetcd.NewRegistry(
gxregistry.WithAddrs(addrList...),
gxregistry.WithTimeout(time.Duration(int(time.Second)*c.conf.Registry.KeepaliveTimeout)),
gxregistry.WithRoot(c.conf.Registry.Root),
)
case "zookeeper":
registry, err = gxzookeeper.NewRegistry(
gxregistry.WithAddrs(addrList...),
gxregistry.WithTimeout(time.Duration(int(time.Second)*c.conf.Registry.KeepaliveTimeout)),
gxregistry.WithRoot(c.conf.Registry.Root),
)
default:
return nil, jerrors.New(fmt.Sprintf(ErrIllegalConf+"registry type %s", c.conf.Registry.Type))
}
if err != nil {
return nil, jerrors.Trace(err)
}
c.registry = registry
if c.registry != nil {
c.filter, err = gxpool.NewFilter(
gxfilter.WithBalancerMode(gxfilter.SM_Hash),
gxfilter.WithRegistry(c.registry),
gxpool.WithTTL(time.Duration(int(time.Second)*c.conf.Registry.KeepaliveTimeout)),
)
if err == nil {
return nil, jerrors.Trace(err) return nil, jerrors.Trace(err)
} }
service := gxregistry.Service{ c := &Client{
Attr: &gxregistry.ServiceAttr{ pendingResponses: make(map[uint64]*PendingResponse),
Group: c.conf.Registry.IDC, conf: *conf,
Role: gxregistry.SRT_Consumer,
Protocol: c.conf.CodecType,
},
Nodes: []*gxregistry.Node{&gxregistry.Node{
ID: c.conf.Registry.NodeID,
Address: c.conf.Host,
Port: 0,
},
},
}
if err = c.registry.Register(service); err != nil {
return nil, jerrors.Trace(err)
}
} }
c.pool = newGettyRPCClientConnPool(c, conf.PoolSize, time.Duration(int(time.Second)*conf.PoolTTL))
return c, nil return c, nil
} }
func (c *Client) Call(addr, protocol, service, method string, args interface{}, reply interface{}) error { func (c *Client) Call(typ CodecType, addr, service, method string, args interface{}, reply interface{}) error {
if !typ.CheckValidity() {
return errInvalidCodecType
}
b := &GettyRPCRequest{} b := &GettyRPCRequest{}
b.header.Service = service b.header.Service = service
b.header.Method = method b.header.Method = method
...@@ -134,16 +63,17 @@ func (c *Client) Call(addr, protocol, service, method string, args interface{}, ...@@ -134,16 +63,17 @@ func (c *Client) Call(addr, protocol, service, method string, args interface{},
resp := NewPendingResponse() resp := NewPendingResponse()
resp.reply = reply resp.reply = reply
session := c.selectSession(protocol, addr) var err error
if session == nil { var session getty.Session
session, err = c.selectSession(typ, addr)
if err != nil || session == nil {
return errSessionNotExist return errSessionNotExist
} }
if err := c.transfer(session, b, resp); err != nil { if err = c.transfer(session, typ, b, resp); err != nil {
return jerrors.Trace(err) return jerrors.Trace(err)
} }
var err error
select { select {
case <-getty.GetTimeWheel().After(c.conf.GettySessionParam.tcpReadTimeout): case <-getty.GetTimeWheel().After(c.conf.GettySessionParam.tcpReadTimeout):
err = errClientReadTimeout err = errClientReadTimeout
...@@ -160,26 +90,22 @@ func (c *Client) Close() { ...@@ -160,26 +90,22 @@ func (c *Client) Close() {
c.pool.close() c.pool.close()
} }
c.pool = nil c.pool = nil
if c.registry != nil {
c.registry.Close()
}
c.registry = nil
} }
func (c *Client) selectSession(protocol, addr string) getty.Session { func (c *Client) selectSession(typ CodecType, addr string) (getty.Session, error) {
rpcConn, err := c.pool.getConn(protocol, addr) rpcConn, err := c.pool.getConn(typ.String(), addr)
if err != nil { if err != nil {
return nil return nil, jerrors.Trace(err)
} }
return rpcConn.selectSession() return rpcConn.selectSession(), nil
} }
func (c *Client) heartbeat(session getty.Session) error { func (c *Client) heartbeat(session getty.Session, typ CodecType) error {
resp := NewPendingResponse() resp := NewPendingResponse()
return c.transfer(session, nil, resp) return c.transfer(session, typ, nil, resp)
} }
func (c *Client) transfer(session getty.Session, req *GettyRPCRequest, resp *PendingResponse) error { func (c *Client) transfer(session getty.Session, typ CodecType, req *GettyRPCRequest, resp *PendingResponse) error {
var ( var (
sequence uint64 sequence uint64
err error err error
...@@ -191,7 +117,7 @@ func (c *Client) transfer(session getty.Session, req *GettyRPCRequest, resp *Pen ...@@ -191,7 +117,7 @@ func (c *Client) transfer(session getty.Session, req *GettyRPCRequest, resp *Pen
pkg.H.LogID = (uint32)(randomID()) pkg.H.LogID = (uint32)(randomID())
pkg.H.Sequence = sequence pkg.H.Sequence = sequence
pkg.H.Command = gettyCmdHbRequest pkg.H.Command = gettyCmdHbRequest
pkg.H.CodecType = c.conf.codecType pkg.H.CodecType = typ
if req != nil { if req != nil {
pkg.H.Command = gettyCmdRPCRequest pkg.H.Command = gettyCmdRPCRequest
pkg.B = req pkg.B = req
......
...@@ -72,12 +72,12 @@ const ( ...@@ -72,12 +72,12 @@ const (
// getty codec type // getty codec type
//////////////////////////////////////////// ////////////////////////////////////////////
type gettyCodecType uint32 type CodecType uint32
const ( const (
gettyCodecUnknown gettyCodecType = 0x00 CodecUnknown CodecType = 0x00
gettyCodecJson = 0x01 CodecJson = 0x01
gettyCodecProtobuf = 0x02 CodecProtobuf = 0x02
) )
var ( var (
...@@ -87,29 +87,37 @@ var ( ...@@ -87,29 +87,37 @@ var (
"protobuf", "protobuf",
} }
Codecs = map[gettyCodecType]Codec{ Codecs = map[CodecType]Codec{
gettyCodecJson: &JSONCodec{}, CodecJson: &JSONCodec{},
gettyCodecProtobuf: &PBCodec{}, CodecProtobuf: &PBCodec{},
} }
) )
func (c gettyCodecType) String() string { func (c CodecType) String() string {
if c == gettyCodecJson || c == gettyCodecProtobuf { if c == CodecJson || c == CodecProtobuf {
return gettyCodecStrings[c] return gettyCodecStrings[c]
} }
return gettyCodecStrings[gettyCodecUnknown] return gettyCodecStrings[CodecUnknown]
} }
func String2CodecType(codecType string) gettyCodecType { func (c CodecType) CheckValidity() bool {
if c == CodecJson || c == CodecProtobuf {
return true
}
return false
}
func GetCodecType(codecType string) CodecType {
switch codecType { switch codecType {
case gettyCodecStrings[gettyCodecJson]: case gettyCodecStrings[CodecJson]:
return gettyCodecJson return CodecJson
case gettyCodecStrings[gettyCodecProtobuf]: case gettyCodecStrings[CodecProtobuf]:
return gettyCodecProtobuf return CodecProtobuf
} }
return gettyCodecUnknown return CodecUnknown
} }
// Codec defines the interface that decode/encode body. // Codec defines the interface that decode/encode body.
...@@ -187,9 +195,9 @@ func init() { ...@@ -187,9 +195,9 @@ func init() {
} }
type RPCPackage interface { type RPCPackage interface {
Marshal(gettyCodecType, *bytes.Buffer) (int, error) Marshal(CodecType, *bytes.Buffer) (int, error)
// @buf length should be equal to GettyPkg.GettyPackageHeader.Len // @buf length should be equal to GettyPkg.GettyPackageHeader.Len
Unmarshal(sz gettyCodecType, buf *bytes.Buffer) error Unmarshal(sz CodecType, buf *bytes.Buffer) error
GetBody() []byte GetBody() []byte
GetHeader() interface{} GetHeader() interface{}
} }
...@@ -203,7 +211,7 @@ type GettyPackageHeader struct { ...@@ -203,7 +211,7 @@ type GettyPackageHeader struct {
Code GettyErrorCode // error code Code GettyErrorCode // error code
ServiceID uint32 // service id ServiceID uint32 // service id
CodecType gettyCodecType CodecType CodecType
} }
type GettyPackage struct { type GettyPackage struct {
...@@ -315,7 +323,7 @@ func NewGettyRPCRequest() RPCPackage { ...@@ -315,7 +323,7 @@ func NewGettyRPCRequest() RPCPackage {
return &GettyRPCRequest{} return &GettyRPCRequest{}
} }
func (req *GettyRPCRequest) Marshal(sz gettyCodecType, buf *bytes.Buffer) (int, error) { func (req *GettyRPCRequest) Marshal(sz CodecType, buf *bytes.Buffer) (int, error) {
codec := Codecs[sz] codec := Codecs[sz]
if codec == nil { if codec == nil {
return 0, jerrors.Errorf("can not find codec for %d", sz) return 0, jerrors.Errorf("can not find codec for %d", sz)
...@@ -348,7 +356,7 @@ func (req *GettyRPCRequest) Marshal(sz gettyCodecType, buf *bytes.Buffer) (int, ...@@ -348,7 +356,7 @@ func (req *GettyRPCRequest) Marshal(sz gettyCodecType, buf *bytes.Buffer) (int,
return 2 + len(headerData) + 2 + len(bodyData), nil return 2 + len(headerData) + 2 + len(bodyData), nil
} }
func (req *GettyRPCRequest) Unmarshal(sz gettyCodecType, buf *bytes.Buffer) error { func (req *GettyRPCRequest) Unmarshal(sz CodecType, buf *bytes.Buffer) error {
var headerLen uint16 var headerLen uint16
err := binary.Read(buf, binary.LittleEndian, &headerLen) err := binary.Read(buf, binary.LittleEndian, &headerLen)
if err != nil { if err != nil {
...@@ -418,7 +426,7 @@ func NewGettyRPCResponse() RPCPackage { ...@@ -418,7 +426,7 @@ func NewGettyRPCResponse() RPCPackage {
return &GettyRPCResponse{} return &GettyRPCResponse{}
} }
func (resp *GettyRPCResponse) Marshal(sz gettyCodecType, buf *bytes.Buffer) (int, error) { func (resp *GettyRPCResponse) Marshal(sz CodecType, buf *bytes.Buffer) (int, error) {
codec := Codecs[sz] codec := Codecs[sz]
if codec == nil { if codec == nil {
return 0, jerrors.Errorf("can not find codec for %d", sz) return 0, jerrors.Errorf("can not find codec for %d", sz)
...@@ -453,7 +461,7 @@ func (resp *GettyRPCResponse) Marshal(sz gettyCodecType, buf *bytes.Buffer) (int ...@@ -453,7 +461,7 @@ func (resp *GettyRPCResponse) Marshal(sz gettyCodecType, buf *bytes.Buffer) (int
return 2 + len(headerData) + 2 + len(bodyData), nil return 2 + len(headerData) + 2 + len(bodyData), nil
} }
func (resp *GettyRPCResponse) Unmarshal(sz gettyCodecType, buf *bytes.Buffer) error { func (resp *GettyRPCResponse) Unmarshal(sz CodecType, buf *bytes.Buffer) error {
var headerLen uint16 var headerLen uint16
err := binary.Read(buf, binary.LittleEndian, &headerLen) err := binary.Read(buf, binary.LittleEndian, &headerLen)
if err != nil { if err != nil {
......
package rpc package rpc
import ( import "time"
"fmt"
"time"
)
import ( import (
config "github.com/koding/multiconfig" jerrors "github.com/juju/errors"
) )
type ( type (
...@@ -46,8 +43,6 @@ type ( ...@@ -46,8 +43,6 @@ type (
Host string `default:"127.0.0.1" yaml:"host" json:"host,omitempty"` Host string `default:"127.0.0.1" yaml:"host" json:"host,omitempty"`
Ports []string `yaml:"ports" json:"ports,omitempty"` // `default:["10000"]` Ports []string `yaml:"ports" json:"ports,omitempty"` // `default:["10000"]`
ProfilePort int `default:"10086" yaml:"profile_port" json:"profile_port,omitempty"` ProfilePort int `default:"10086" yaml:"profile_port" json:"profile_port,omitempty"`
CodecType string `default:"json" yaml:"codec_type" json:"codec_type,omitempty"`
codecType gettyCodecType
// session // session
SessionTimeout string `default:"60s" yaml:"session_timeout" json:"session_timeout,omitempty"` SessionTimeout string `default:"60s" yaml:"session_timeout" json:"session_timeout,omitempty"`
...@@ -60,9 +55,6 @@ type ( ...@@ -60,9 +55,6 @@ type (
// session tcp parameters // session tcp parameters
GettySessionParam GettySessionParam `required:"true" yaml:"getty_session_param" json:"getty_session_param,omitempty"` GettySessionParam GettySessionParam `required:"true" yaml:"getty_session_param" json:"getty_session_param,omitempty"`
// registry center
Registry RegistryConfig `required:"true" yaml:"registry_config" json:"registry_config,omitempty"`
} }
// Config holds supported types by the multiconfig package // Config holds supported types by the multiconfig package
...@@ -70,21 +62,13 @@ type ( ...@@ -70,21 +62,13 @@ type (
// local address // local address
AppName string `default:"rcp-client" yaml:"app_name" json:"app_name,omitempty"` AppName string `default:"rcp-client" yaml:"app_name" json:"app_name,omitempty"`
Host string `default:"127.0.0.1" yaml:"host" json:"host,omitempty"` Host string `default:"127.0.0.1" yaml:"host" json:"host,omitempty"`
Ports []string `yaml:"ports" json:"ports,omitempty"` // `default:["10000"]`
ProfilePort int `default:"10086" yaml:"profile_port" json:"profile_port,omitempty"` ProfilePort int `default:"10086" yaml:"profile_port" json:"profile_port,omitempty"`
// server
// !!! Attention: If u wanna use registry, the ServerHost & ServerPort could be nil.
ServerHost string `default:"127.0.0.1" yaml:"server_host" json:"server_host,omitempty"`
ServerPort int `default:"10000" yaml:"server_port" json:"server_port,omitempty"`
CodecType string `default:"json" yaml:"codec_type" json:"codec_type,omitempty"`
codecType gettyCodecType
// session pool // session pool
ConnectionNum int `default:"16" yaml:"connection_num" json:"connection_num,omitempty"` ConnectionNum int `default:"16" yaml:"connection_num" json:"connection_num,omitempty"`
// heartbeat // heartbeat
HeartbeatPeriod string `default:"15s" yaml:"heartbeat_period" json:"heartbeat_period,omitempty"` HeartbeatPeriod string `default:"15s" yaml:"heartbeat_period" json:"heartbeat_period, omitempty"`
heartbeatPeriod time.Duration heartbeatPeriod time.Duration
// session // session
...@@ -92,7 +76,7 @@ type ( ...@@ -92,7 +76,7 @@ type (
sessionTimeout time.Duration sessionTimeout time.Duration
// app // app
FailFastTimeout string `default:"5s" yaml:"fail_fast_timeout" json:"fail_fast_timeout,omitempty"` FailFastTimeout string `default:"5s" yaml:"fail_fast_timeout" json:"fail_fast_timeout, omitempty"`
failFastTimeout time.Duration failFastTimeout time.Duration
// Connection Pool // Connection Pool
...@@ -101,75 +85,59 @@ type ( ...@@ -101,75 +85,59 @@ type (
// session tcp parameters // session tcp parameters
GettySessionParam GettySessionParam `required:"true" yaml:"getty_session_param" json:"getty_session_param,omitempty"` GettySessionParam GettySessionParam `required:"true" yaml:"getty_session_param" json:"getty_session_param,omitempty"`
// registry center
Registry RegistryConfig `required:"true" yaml:"registry_config" json:"registry_config,omitempty"`
} }
) )
func loadClientConf(confFile string) *ClientConfig { func (c *GettySessionParam) CheckValidity() error {
var err error var err error
conf := new(ClientConfig)
config.MustLoadWithPath(confFile, conf) if c.keepAlivePeriod, err = time.ParseDuration(c.KeepAlivePeriod); err != nil {
conf.heartbeatPeriod, err = time.ParseDuration(conf.HeartbeatPeriod) return jerrors.Annotatef(err, "time.ParseDuration(KeepAlivePeriod{%#v})", c.KeepAlivePeriod)
if err != nil {
panic(fmt.Sprintf("time.ParseDuration(HeartbeatPeroid{%#v}) = error{%v}", conf.HeartbeatPeriod, err))
}
conf.sessionTimeout, err = time.ParseDuration(conf.SessionTimeout)
if err != nil {
panic(fmt.Sprintf("time.ParseDuration(SessionTimeout{%#v}) = error{%v}", conf.SessionTimeout, err))
}
conf.failFastTimeout, err = time.ParseDuration(conf.FailFastTimeout)
if err != nil {
panic(fmt.Sprintf("time.ParseDuration(FailFastTimeout{%#v}) = error{%v}", conf.FailFastTimeout, err))
}
conf.GettySessionParam.keepAlivePeriod, err = time.ParseDuration(conf.GettySessionParam.KeepAlivePeriod)
if err != nil {
panic(fmt.Sprintf("time.ParseDuration(KeepAlivePeriod{%#v}) = error{%v}", conf.GettySessionParam.KeepAlivePeriod, err))
} }
conf.GettySessionParam.tcpReadTimeout, err = time.ParseDuration(conf.GettySessionParam.TcpReadTimeout)
if err != nil { if c.tcpReadTimeout, err = time.ParseDuration(c.TcpReadTimeout); err != nil {
panic(fmt.Sprintf("time.ParseDuration(TcpReadTimeout{%#v}) = error{%v}", conf.GettySessionParam.TcpReadTimeout, err)) return jerrors.Annotatef(err, "time.ParseDuration(TcpReadTimeout{%#v})", c.TcpReadTimeout)
} }
conf.GettySessionParam.tcpWriteTimeout, err = time.ParseDuration(conf.GettySessionParam.TcpWriteTimeout)
if err != nil { if c.tcpWriteTimeout, err = time.ParseDuration(c.TcpWriteTimeout); err != nil {
panic(fmt.Sprintf("time.ParseDuration(TcpWriteTimeout{%#v}) = error{%v}", conf.GettySessionParam.TcpWriteTimeout, err)) return jerrors.Annotatef(err, "time.ParseDuration(TcpWriteTimeout{%#v})", c.TcpWriteTimeout)
} }
conf.GettySessionParam.waitTimeout, err = time.ParseDuration(conf.GettySessionParam.WaitTimeout)
if err != nil { if c.waitTimeout, err = time.ParseDuration(c.WaitTimeout); err != nil {
panic(fmt.Sprintf("time.ParseDuration(WaitTimeout{%#v}) = error{%v}", conf.GettySessionParam.WaitTimeout, err)) return jerrors.Annotatef(err, "time.ParseDuration(WaitTimeout{%#v})", c.WaitTimeout)
} }
return conf
return nil
} }
func loadServerConf(confFile string) *ServerConfig { func (c *ClientConfig) CheckValidity() error {
var err error var err error
conf := new(ServerConfig)
config.MustLoadWithPath(confFile, conf)
conf.sessionTimeout, err = time.ParseDuration(conf.SessionTimeout) if c.heartbeatPeriod, err = time.ParseDuration(c.HeartbeatPeriod); err != nil {
if err != nil { return jerrors.Annotatef(err, "time.ParseDuration(HeartbeatPeroid{%#v})", c.HeartbeatPeriod)
panic(fmt.Sprintf("time.ParseDuration(SessionTimeout{%#v}) = error{%v}", conf.SessionTimeout, err))
}
conf.failFastTimeout, err = time.ParseDuration(conf.FailFastTimeout)
if err != nil {
panic(fmt.Sprintf("time.ParseDuration(FailFastTimeout{%#v}) = error{%v}", conf.FailFastTimeout, err))
} }
conf.GettySessionParam.keepAlivePeriod, err = time.ParseDuration(conf.GettySessionParam.KeepAlivePeriod)
if err != nil { if c.sessionTimeout, err = time.ParseDuration(c.SessionTimeout); err != nil {
panic(fmt.Sprintf("time.ParseDuration(KeepAlivePeriod{%#v}) = error{%v}", conf.GettySessionParam.KeepAlivePeriod, err)) return jerrors.Annotatef(err, "time.ParseDuration(SessionTimeout{%#v})", c.SessionTimeout)
} }
conf.GettySessionParam.tcpReadTimeout, err = time.ParseDuration(conf.GettySessionParam.TcpReadTimeout)
if err != nil { if c.failFastTimeout, err = time.ParseDuration(c.FailFastTimeout); err != nil {
panic(fmt.Sprintf("time.ParseDuration(TcpReadTimeout{%#v}) = error{%v}", conf.GettySessionParam.TcpReadTimeout, err)) return jerrors.Annotatef(err, "time.ParseDuration(FailFastTimeout{%#v})", c.FailFastTimeout)
} }
conf.GettySessionParam.tcpWriteTimeout, err = time.ParseDuration(conf.GettySessionParam.TcpWriteTimeout)
if err != nil { return jerrors.Trace(c.GettySessionParam.CheckValidity())
panic(fmt.Sprintf("time.ParseDuration(TcpWriteTimeout{%#v}) = error{%v}", conf.GettySessionParam.TcpWriteTimeout, err)) }
func (c *ServerConfig) CheckValidity() error {
var err error
if c.sessionTimeout, err = time.ParseDuration(c.SessionTimeout); err != nil {
return jerrors.Annotatef(err, "time.ParseDuration(SessionTimeout{%#v})", c.SessionTimeout)
} }
conf.GettySessionParam.waitTimeout, err = time.ParseDuration(conf.GettySessionParam.WaitTimeout)
if err != nil { if c.failFastTimeout, err = time.ParseDuration(c.FailFastTimeout); err != nil {
panic(fmt.Sprintf("time.ParseDuration(WaitTimeout{%#v}) = error{%v}", conf.GettySessionParam.WaitTimeout, err)) return jerrors.Annotatef(err, "time.ParseDuration(FailFastTimeout{%#v})", c.FailFastTimeout)
} }
return conf
return jerrors.Trace(c.GettySessionParam.CheckValidity())
} }
package main
import (
"time"
)
import (
"github.com/AlexStocks/getty/rpc"
"github.com/AlexStocks/getty/rpc/example/data"
log "github.com/AlexStocks/log4go"
jerrors "github.com/juju/errors"
)
func main() {
log.LoadConfiguration("client_log.xml")
client, err := rpc.NewClient("client_config.toml")
if err != nil {
panic(jerrors.ErrorStack(err))
}
// client.SetCodecType(rpc.ProtoBuffer)//默认是json序列化
defer client.Close()
for i := 0; i < 100; i++ {
go func() {
var res string
err := client.Call("127.0.0.1:20000", "json", "TestRpc", "Test", data.TestABC{"aaa", "bbb", "ccc"}, &res)
if err != nil {
log.Error(err)
return
}
log.Info(res)
}()
}
for i := 0; i < 100; i++ {
go func() {
var result int
err := client.Call("127.0.0.1:20000", "json", "TestRpc", "Add", 1, &result)
if err != nil {
log.Error(err)
return
}
log.Info(result)
}()
}
var errInt int
err = client.Call("127.0.0.1:20000", "json", "TestRpc", "Err", 2, &errInt)
if err != nil {
log.Error(jerrors.ErrorStack(err))
}
time.Sleep(20 * time.Second)
}
# toml configure file
# toml中key的首字母可以小写,但是对应的golang中的struct成员首字母必须大写
AppName = "RPC-CLIENT"
# host
LocalHost = "127.0.0.1"
# server
# ServerHost = "192.168.8.3"
ServerHost = "127.0.0.1"
ServerPort = 10000
ProfilePort = 10080
# connection pool
# 连接池连接数目
ConnectionNum = 10
# session
# client与server之间连接的心跳周期
HeartbeatPeriod = "10s"
# client与server之间连接的超时时间
SessionTimeout = "20s"
# app fail fast
FailFastTimeout = "3s"
# tcp
[GettySessionParam]
CompressEncoding = true
TcpNoDelay = true
TcpKeepAlive = true
KeepAlivePeriod = "120s"
TcpRBufSize = 262144
TcpWBufSize = 65536
PkgRQSize = 512
PkgWQSize = 256
TcpReadTimeout = "1s"
TcpWriteTimeout = "5s"
WaitTimeout = "1s"
MaxMsgLen = 128
SessionName = "getty-rpc-client"
# registry
[Registry]
Type = "etcd"
# Addr = "127.0.0.1:2379"
KeepaliveTimeout = 5
Root = "/getty"
IDC = "bj-unicom"
NodeID = "n147"
<logging>
<filter enabled="true">
<tag>stdout</tag>
<type>console</type>
<!-- level is (:?FINEST|FINE|DEBUG|TRACE|INFO|WARNING|ERROR) -->
<level>DEBUG</level>
</filter>
<filter enabled="false">
<tag>debug_file</tag>
<type>file</type>
<level>DEBUG</level>
<property name="filename">logs/debug.log</property>
<property name="format">[%D %T] [%L] [%S] %M</property>
<property name="rotate">true</property> <!-- true enables log rotation, otherwise append -->
<property name="maxsize">0M</property> <!-- \d+[KMG]? Suffixes are in terms of 2**10 -->
<property name="maxlines">0K</property> <!-- \d+[KMG]? Suffixes are in terms of thousands -->
<property name="daily">true</property> <!-- Automatically rotates when a log message is written after midnight -->
</filter>
<filter enabled="true">
<tag>info_file</tag>
<type>file</type>
<level>INFO</level>
<property name="filename">logs/info.log</property>
<!--
%T - Time (15:04:05 MST)
%t - Time (15:04)
%D - Date (2006/01/02)
%d - Date (01/02/06)
%L - Level (FNST, FINE, DEBG, TRAC, WARN, EROR, CRIT)
%S - Source
%M - Message
It ignores unknown format strings (and removes them)
Recommended: "[%D %T] [%L] (%S) %M"
-->
<property name="format">[%D %T] [%L] [%S] %M</property>
<property name="rotate">true</property> <!-- true enables log rotation, otherwise append -->
<property name="maxsize">0M</property> <!-- \d+[KMG]? Suffixes are in terms of 2**10 -->
<property name="maxlines">0K</property> <!-- \d+[KMG]? Suffixes are in terms of thousands -->
<property name="daily">true</property> <!-- Automatically rotates when a log message is written after midnight -->
</filter>
<filter enabled="true">
<tag>warn_file</tag>
<type>file</type>
<level>WARNING</level>
<property name="filename">logs/warn.log</property>
<property name="format">[%D %T] [%L] [%S] %M</property>
<property name="rotate">true</property> <!-- true enables log rotation, otherwise append -->
<property name="maxsize">0M</property> <!-- \d+[KMG]? Suffixes are in terms of 2**10 -->
<property name="maxlines">0K</property> <!-- \d+[KMG]? Suffixes are in terms of thousands -->
<property name="daily">true</property> <!-- Automatically rotates when a log message is written after midnight -->
</filter>
<filter enabled="true">
<tag>error_file</tag>
<type>file</type>
<level>ERROR</level>
<property name="filename">logs/error.log</property>
<property name="format">[%D %T] [%L] [%S] %M</property>
<property name="rotate">true</property> <!-- true enables log rotation, otherwise append -->
<property name="maxsize">0M</property> <!-- \d+[KMG]? Suffixes are in terms of 2**10 -->
<property name="maxlines">0K</property> <!-- \d+[KMG]? Suffixes are in terms of thousands -->
<property name="daily">true</property> <!-- Automatically rotates when a log message is written after midnight -->
</filter>
</logging>
package data
import (
"errors"
log "github.com/AlexStocks/log4go"
)
type TestABC struct {
A, B, C string
}
type TestRpc struct {
i int
}
func (r *TestRpc) Service() string {
return "TestRpc"
}
func (r *TestRpc) Version() string {
return "v1.0"
}
func (r *TestRpc) Test(arg TestABC, res *string) error {
log.Debug("arg:%+v", arg)
*res = "this is a test"
return nil
}
func (r *TestRpc) Add(n int, res *int) error {
r.i += n
*res = r.i + 100
return nil
}
func (r *TestRpc) Err(n int, res *int) error {
return errors.New("this is a error test")
}
package main
import (
"github.com/AlexStocks/getty/rpc"
"github.com/AlexStocks/getty/rpc/example/data"
log "github.com/AlexStocks/log4go"
jerrors "github.com/juju/errors"
)
func main() {
log.LoadConfiguration("./server_log.xml")
srv, err := rpc.NewServer("./server_config.toml")
if err != nil {
panic(jerrors.ErrorStack(err))
}
err = srv.Register(new(data.TestRpc))
srv.Run()
}
# toml configure file
# toml中key的首字母可以小写,但是对应的golang中的struct成员首字母必须大写
AppName = "RPC-SERVER"
Host = "127.0.0.1"
# Host = "192.168.35.1"
# Host = "192.168.8.3"
Ports = ["10000", "20000"]
ProfilePort = 10086
CodecType = "json"
# session
# client与server之间连接的超时时间
SessionTimeout = "20s"
SessionNumber = 700
# app
FailFastTimeout = "3s"
# tcp
[GettySessionParam]
CompressEncoding = true
TcpNoDelay = true
TcpKeepAlive = true
KeepAlivePeriod = "120s"
TcpRBufSize = 262144
TcpWBufSize = 524288
PkgRQSize = 1024
PkgWQSize = 512
TcpReadTimeout = "1s"
TcpWriteTimeout = "5s"
WaitTimeout = "1s"
MaxMsgLen = 128
SessionName = "getty-rpc-server"
# registry
[Registry]
Type = "etcd"
# Addr = "127.0.0.1:2379"
KeepaliveTimeout = 5
Root = "/getty"
IDC = "bj-unicom"
NodeID = "n147"
<logging>
<filter enabled="true">
<tag>stdout</tag>
<type>console</type>
<!-- level is (:?FINEST|FINE|DEBUG|TRACE|INFO|WARNING|ERROR) -->
<level>DEBUG</level>
</filter>
<filter enabled="false">
<tag>debug_file</tag>
<type>file</type>
<level>DEBUG</level>
<property name="filename">logs/debug.log</property>
<property name="format">[%D %T] [%L] [%S] %M</property>
<property name="rotate">true</property> <!-- true enables log rotation, otherwise append -->
<property name="maxsize">0M</property> <!-- \d+[KMG]? Suffixes are in terms of 2**10 -->
<property name="maxlines">0K</property> <!-- \d+[KMG]? Suffixes are in terms of thousands -->
<property name="daily">true</property> <!-- Automatically rotates when a log message is written after midnight -->
</filter>
<filter enabled="true">
<tag>info_file</tag>
<type>file</type>
<level>INFO</level>
<property name="filename">logs/info.log</property>
<!--
%T - Time (15:04:05 MST)
%t - Time (15:04)
%D - Date (2006/01/02)
%d - Date (01/02/06)
%L - Level (FNST, FINE, DEBG, TRAC, WARN, EROR, CRIT)
%S - Source
%M - Message
It ignores unknown format strings (and removes them)
Recommended: "[%D %T] [%L] (%S) %M"
-->
<property name="format">[%D %T] [%L] [%S] %M</property>
<property name="rotate">true</property> <!-- true enables log rotation, otherwise append -->
<property name="maxsize">0M</property> <!-- \d+[KMG]? Suffixes are in terms of 2**10 -->
<property name="maxlines">0K</property> <!-- \d+[KMG]? Suffixes are in terms of thousands -->
<property name="daily">true</property> <!-- Automatically rotates when a log message is written after midnight -->
</filter>
<filter enabled="true">
<tag>warn_file</tag>
<type>file</type>
<level>WARNING</level>
<property name="filename">logs/warn.log</property>
<property name="format">[%D %T] [%L] [%S] %M</property>
<property name="rotate">true</property> <!-- true enables log rotation, otherwise append -->
<property name="maxsize">0M</property> <!-- \d+[KMG]? Suffixes are in terms of 2**10 -->
<property name="maxlines">0K</property> <!-- \d+[KMG]? Suffixes are in terms of thousands -->
<property name="daily">true</property> <!-- Automatically rotates when a log message is written after midnight -->
</filter>
<filter enabled="true">
<tag>error_file</tag>
<type>file</type>
<level>ERROR</level>
<property name="filename">logs/error.log</property>
<property name="format">[%D %T] [%L] [%S] %M</property>
<property name="rotate">true</property> <!-- true enables log rotation, otherwise append -->
<property name="maxsize">0M</property> <!-- \d+[KMG]? Suffixes are in terms of 2**10 -->
<property name="maxlines">0K</property> <!-- \d+[KMG]? Suffixes are in terms of thousands -->
<property name="daily">true</property> <!-- Automatically rotates when a log message is written after midnight -->
</filter>
</logging>
...@@ -230,7 +230,8 @@ func (h *RpcClientHandler) OnMessage(session getty.Session, pkg interface{}) { ...@@ -230,7 +230,8 @@ func (h *RpcClientHandler) OnMessage(session getty.Session, pkg interface{}) {
func (h *RpcClientHandler) OnCron(session getty.Session) { func (h *RpcClientHandler) OnCron(session getty.Session) {
rpcSession, err := h.conn.getClientRpcSession(session) rpcSession, err := h.conn.getClientRpcSession(session)
if err != nil { if err != nil {
log.Error("client.getClientSession(session{%s}) = error{%#v}", session.Stat(), err) log.Error("client.getClientSession(session{%s}) = error{%s}",
session.Stat(), jerrors.ErrorStack(err))
return return
} }
if h.conn.pool.rpcClient.conf.sessionTimeout.Nanoseconds() < time.Since(session.GetActive()).Nanoseconds() { if h.conn.pool.rpcClient.conf.sessionTimeout.Nanoseconds() < time.Since(session.GetActive()).Nanoseconds() {
...@@ -240,5 +241,7 @@ func (h *RpcClientHandler) OnCron(session getty.Session) { ...@@ -240,5 +241,7 @@ func (h *RpcClientHandler) OnCron(session getty.Session) {
return return
} }
h.conn.pool.rpcClient.heartbeat(session) codecType := GetCodecType(h.conn.protocol)
h.conn.pool.rpcClient.heartbeat(session, codecType)
} }
...@@ -65,7 +65,7 @@ func (c *gettyRPCClientConn) newSession(session getty.Session) error { ...@@ -65,7 +65,7 @@ func (c *gettyRPCClientConn) newSession(session getty.Session) error {
var ( var (
ok bool ok bool
tcpConn *net.TCPConn tcpConn *net.TCPConn
conf *ClientConfig conf ClientConfig
) )
conf = c.pool.rpcClient.conf conf = c.pool.rpcClient.conf
......
...@@ -3,110 +3,39 @@ package rpc ...@@ -3,110 +3,39 @@ package rpc
import ( import (
"fmt" "fmt"
"net" "net"
"os"
"os/signal"
"reflect" "reflect"
"strconv"
"strings"
"syscall"
"time"
) )
import ( import (
"github.com/AlexStocks/getty" "github.com/AlexStocks/getty"
"github.com/AlexStocks/goext/database/registry"
"github.com/AlexStocks/goext/database/registry/etcdv3"
"github.com/AlexStocks/goext/database/registry/zookeeper"
"github.com/AlexStocks/goext/net" "github.com/AlexStocks/goext/net"
log "github.com/AlexStocks/log4go" log "github.com/AlexStocks/log4go"
jerrors "github.com/juju/errors" jerrors "github.com/juju/errors"
) )
type Server struct { type Server struct {
conf *ServerConfig conf ServerConfig
serviceMap map[string]*service serviceMap map[string]*service
tcpServerList []getty.Server tcpServerList []getty.Server
// registry
registry gxregistry.Registry
sa gxregistry.ServiceAttr
nodes []*gxregistry.Node
} }
var ( var (
ErrIllegalConf = "illegal conf: " ErrIllegalConf = "illegal conf: "
) )
func NewServer(confFile string) (*Server, error) { func NewServer(conf *ServerConfig) (*Server, error) {
conf := loadServerConf(confFile) if err := conf.CheckValidity(); err != nil {
if conf.codecType = String2CodecType(conf.CodecType); conf.codecType == gettyCodecUnknown { return nil, jerrors.Trace(err)
return nil, jerrors.New(fmt.Sprintf(ErrIllegalConf+"codec type %s", conf.CodecType))
} }
s := &Server{ s := &Server{
serviceMap: make(map[string]*service), serviceMap: make(map[string]*service),
conf: conf, conf: *conf,
}
if len(s.conf.Registry.Addr) != 0 {
var err error
var registry gxregistry.Registry
addrList := strings.Split(s.conf.Registry.Addr, ",")
switch s.conf.Registry.Type {
case "etcd":
registry, err = gxetcd.NewRegistry(
gxregistry.WithAddrs(addrList...),
gxregistry.WithTimeout(time.Duration(int(time.Second)*s.conf.Registry.KeepaliveTimeout)),
gxregistry.WithRoot(s.conf.Registry.Root),
)
case "zookeeper":
registry, err = gxzookeeper.NewRegistry(
gxregistry.WithAddrs(addrList...),
gxregistry.WithTimeout(time.Duration(int(time.Second)*s.conf.Registry.KeepaliveTimeout)),
gxregistry.WithRoot(s.conf.Registry.Root),
)
default:
return nil, jerrors.New(fmt.Sprintf(ErrIllegalConf+"registry type %s", s.conf.Registry.Type))
}
if err != nil {
return nil, jerrors.Trace(err)
}
s.registry = registry
if s.registry != nil {
s.sa = gxregistry.ServiceAttr{
Group: s.conf.Registry.IDC,
Role: gxregistry.SRT_Provider,
Protocol: s.conf.CodecType,
}
for _, p := range s.conf.Ports {
port, err := strconv.Atoi(p)
if err != nil {
return nil, jerrors.New(fmt.Sprintf("illegal port %s", p))
}
s.nodes = append(s.nodes,
&gxregistry.Node{
ID: s.conf.Registry.NodeID + "-" + net.JoinHostPort(s.conf.Host, p),
Address: s.conf.Host,
Port: int32(port),
},
)
}
}
} }
return s, nil return s, nil
} }
func (s *Server) Run() {
s.Init()
log.Info("%s starts successfull! its version=%s, its listen ends=%s:%s\n",
s.conf.AppName, getty.Version, s.conf.Host, s.conf.Ports)
s.initSignal()
}
func (s *Server) Register(rcvr GettyRPCService) error { func (s *Server) Register(rcvr GettyRPCService) error {
svc := &service{ svc := &service{
typ: reflect.TypeOf(rcvr), typ: reflect.TypeOf(rcvr),
...@@ -143,15 +72,6 @@ func (s *Server) Register(rcvr GettyRPCService) error { ...@@ -143,15 +72,6 @@ func (s *Server) Register(rcvr GettyRPCService) error {
} }
s.serviceMap[svc.name] = svc s.serviceMap[svc.name] = svc
if s.registry != nil {
sa := s.sa
sa.Service = rcvr.Service()
sa.Version = rcvr.Version()
service := gxregistry.Service{Attr: &sa, Nodes: s.nodes}
if err := s.registry.Register(service); err != nil {
return jerrors.Trace(err)
}
}
return nil return nil
} }
...@@ -193,7 +113,7 @@ func (s *Server) newSession(session getty.Session) error { ...@@ -193,7 +113,7 @@ func (s *Server) newSession(session getty.Session) error {
return nil return nil
} }
func (s *Server) Init() { func (s *Server) Start() {
var ( var (
addr string addr string
portList []string portList []string
...@@ -216,9 +136,6 @@ func (s *Server) Init() { ...@@ -216,9 +136,6 @@ func (s *Server) Init() {
} }
func (s *Server) Stop() { func (s *Server) Stop() {
if s.registry != nil {
s.registry.Close()
}
list := s.tcpServerList list := s.tcpServerList
s.tcpServerList = nil s.tcpServerList = nil
if list != nil { if list != nil {
...@@ -227,28 +144,3 @@ func (s *Server) Stop() { ...@@ -227,28 +144,3 @@ func (s *Server) Stop() {
} }
} }
} }
func (s *Server) initSignal() {
signals := make(chan os.Signal, 1)
// It is impossible to block SIGKILL or syscall.SIGSTOP
signal.Notify(signals, os.Interrupt, os.Kill, syscall.SIGHUP, syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT)
for {
sig := <-signals
log.Info("get signal %s", sig.String())
switch sig {
case syscall.SIGHUP:
// reload()
default:
go time.AfterFunc(s.conf.failFastTimeout, func() {
log.Exit("app exit now by force...")
log.Close()
})
// if @s can not stop in s.conf.failFastTimeout, getty will Force Quit.
s.Stop()
log.Exit("app exit now...")
log.Close()
return
}
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment