Commit f16d6d53 authored by AlexStocks's avatar AlexStocks

merge rpc

parents 11c18d3f 43189243
......@@ -16,7 +16,21 @@ You can get code example in https://github.com/AlexStocks/getty-examples.
## RPC
Support Json and Protobuf.
A open source, Go based, RPC framework.
Feature list:
- 1 Transport: TCP(√), UDP, Websocket
- 2 Codec: ProtoBuf(√), JSON(√)
- 3 Service Discovery: Service Publish(√), Service Watch(√), Service Notify(√)
- 4 Registry: ZooKeeper(√), Etcd(x)
- 5 Strategy: Failover(√), Failfast(√)
- 6 Load Balance: Random(√), RoundRobin(√)
- 7 Metrics: Invoke Statistics(x), User Auth(x)
##
## LICENCE
......
......@@ -3,20 +3,21 @@ package rpc
import (
"fmt"
"math/rand"
"net"
"strings"
"sync"
"sync/atomic"
"time"
)
import (
jerrors "github.com/juju/errors"
)
import (
"github.com/AlexStocks/getty"
"github.com/AlexStocks/goext/database/filter"
"github.com/AlexStocks/goext/database/filter/pool"
"github.com/AlexStocks/goext/database/registry"
"github.com/AlexStocks/goext/database/registry/etcdv3"
"github.com/AlexStocks/goext/database/registry/zookeeper"
"github.com/AlexStocks/goext/net"
log "github.com/AlexStocks/log4go"
"github.com/AlexStocks/goext/sync/atomic"
jerrors "github.com/juju/errors"
)
var (
......@@ -30,93 +31,96 @@ func init() {
}
type Client struct {
conf *ClientConfig
lock sync.RWMutex
sessions []*rpcSession
gettyClient getty.Client
codecType SerializeType
sequence uint64
conf *ClientConfig
pool *gettyRPCClientConnPool
sequence gxatomic.Uint64
pendingLock sync.RWMutex
pendingResponses map[uint64]*PendingResponse
// registry
registry gxregistry.Registry
sa gxregistry.ServiceAttr
filter gxfilter.Filter
}
func NewClient(confFile string) *Client {
func NewClient(confFile string) (*Client, error) {
conf := loadClientConf(confFile)
c := &Client{
pendingResponses: make(map[uint64]*PendingResponse),
conf: conf,
gettyClient: getty.NewTCPClient(
getty.WithServerAddress(gxnet.HostAddress(conf.ServerHost, conf.ServerPort)),
getty.WithConnectionNumber((int)(conf.ConnectionNum)),
),
codecType: JSON,
}
c.gettyClient.RunEventLoop(c.newSession)
idx := 1
for {
idx++
if c.isAvailable() {
break
}
c.pool = newGettyRPCClientConnPool(c, conf.PoolSize, time.Duration(int(time.Second)*conf.PoolTTL))
if idx > 12000 {
panic("failed to create client in 2 minutes")
if len(c.conf.Registry.Addr) == 0 {
if conf.codecType = String2CodecType(conf.CodecType); conf.codecType == gettyCodecUnknown {
return nil, jerrors.New(fmt.Sprintf(ErrIllegalConf+"codec type %s", conf.CodecType))
}
if conf.ServerPort == 0 {
return nil, jerrors.New(fmt.Sprintf(ErrIllegalConf + "both registry addr and ServerPort is nil"))
}
time.Sleep(1e6)
}
log.Info("client init ok")
return c
}
func (c *Client) SetCodecType(st SerializeType) {
c.codecType = st
}
func (c *Client) newSession(session getty.Session) error {
var (
ok bool
tcpConn *net.TCPConn
)
if c.conf.GettySessionParam.CompressEncoding {
session.SetCompressType(getty.CompressZip)
}
if tcpConn, ok = session.Conn().(*net.TCPConn); !ok {
panic(fmt.Sprintf("%s, session.conn{%#v} is not tcp connection\n", session.Stat(), session.Conn()))
}
_, err := c.pool.getConn(conf.CodecType, gxnet.HostAddress(conf.ServerHost, conf.ServerPort))
return c, jerrors.Trace(err)
}
var err error
var registry gxregistry.Registry
addrList := strings.Split(c.conf.Registry.Addr, ",")
switch c.conf.Registry.Type {
case "etcd":
registry, err = gxetcd.NewRegistry(
gxregistry.WithAddrs(addrList...),
gxregistry.WithTimeout(time.Duration(int(time.Second)*c.conf.Registry.KeepaliveTimeout)),
gxregistry.WithRoot(c.conf.Registry.Root),
)
case "zookeeper":
registry, err = gxzookeeper.NewRegistry(
gxregistry.WithAddrs(addrList...),
gxregistry.WithTimeout(time.Duration(int(time.Second)*c.conf.Registry.KeepaliveTimeout)),
gxregistry.WithRoot(c.conf.Registry.Root),
)
default:
return nil, jerrors.New(fmt.Sprintf(ErrIllegalConf+"registry type %s", c.conf.Registry.Type))
}
if err != nil {
return nil, jerrors.Trace(err)
}
c.registry = registry
if c.registry != nil {
c.filter, err = gxpool.NewFilter(
gxfilter.WithBalancerMode(gxfilter.SM_Hash),
gxfilter.WithRegistry(c.registry),
gxpool.WithTTL(time.Duration(int(time.Second)*c.conf.Registry.KeepaliveTimeout)),
)
if err == nil {
return nil, jerrors.Trace(err)
}
tcpConn.SetNoDelay(c.conf.GettySessionParam.TcpNoDelay)
tcpConn.SetKeepAlive(c.conf.GettySessionParam.TcpKeepAlive)
if c.conf.GettySessionParam.TcpKeepAlive {
tcpConn.SetKeepAlivePeriod(c.conf.GettySessionParam.keepAlivePeriod)
service := gxregistry.Service{
Attr: &gxregistry.ServiceAttr{
Group: c.conf.Registry.IDC,
Role: gxregistry.SRT_Consumer,
Protocol: c.conf.CodecType,
},
Nodes: []*gxregistry.Node{&gxregistry.Node{
ID: c.conf.Registry.NodeID,
Address: c.conf.Host,
Port: 0,
},
},
}
if err = c.registry.Register(service); err != nil {
return nil, jerrors.Trace(err)
}
}
tcpConn.SetReadBuffer(c.conf.GettySessionParam.TcpRBufSize)
tcpConn.SetWriteBuffer(c.conf.GettySessionParam.TcpWBufSize)
session.SetName(c.conf.GettySessionParam.SessionName)
session.SetMaxMsgLen(c.conf.GettySessionParam.MaxMsgLen)
session.SetPkgHandler(NewRpcClientPackageHandler())
session.SetEventListener(NewRpcClientHandler(c))
session.SetRQLen(c.conf.GettySessionParam.PkgRQSize)
session.SetWQLen(c.conf.GettySessionParam.PkgWQSize)
session.SetReadTimeout(c.conf.GettySessionParam.tcpReadTimeout)
session.SetWriteTimeout(c.conf.GettySessionParam.tcpWriteTimeout)
session.SetCronPeriod((int)(c.conf.heartbeatPeriod.Nanoseconds() / 1e6))
session.SetWaitTime(c.conf.GettySessionParam.waitTimeout)
log.Debug("client new session:%s\n", session.Stat())
return nil
return c, nil
}
func (c *Client) Sequence() uint64 {
return atomic.AddUint64(&c.sequence, 1)
}
func (c *Client) Call(service, method string, args interface{}, reply interface{}) error {
func (c *Client) Call(addr, protocol, service, method string, args interface{}, reply interface{}) error {
b := &GettyRPCRequest{}
b.header.Service = service
b.header.Method = method
......@@ -129,7 +133,7 @@ func (c *Client) Call(service, method string, args interface{}, reply interface{
resp := NewPendingResponse()
resp.reply = reply
session := c.selectSession()
session := c.selectSession(protocol, addr)
if session == nil {
return errSessionNotExist
}
......@@ -142,115 +146,23 @@ func (c *Client) Call(service, method string, args interface{}, reply interface{
return jerrors.Trace(resp.err)
}
func (c *Client) isAvailable() bool {
if c.selectSession() == nil {
return false
}
return true
}
func (c *Client) Close() {
c.lock.Lock()
if c.gettyClient != nil {
for _, s := range c.sessions {
log.Info("close client session{%s, last active:%s, request number:%d}",
s.session.Stat(), s.session.GetActive().String(), s.reqNum)
s.session.Close()
}
c.gettyClient.Close()
c.gettyClient = nil
c.sessions = c.sessions[:0]
}
c.lock.Unlock()
}
func (c *Client) selectSession() getty.Session {
c.lock.RLock()
defer c.lock.RUnlock()
if c.sessions == nil {
return nil
}
count := len(c.sessions)
if count == 0 {
return nil
}
return c.sessions[rand.Int31n(int32(count))].session
}
func (c *Client) addSession(session getty.Session) {
log.Debug("add session{%s}", session.Stat())
if session == nil {
return
}
c.lock.Lock()
c.sessions = append(c.sessions, &rpcSession{session: session})
c.lock.Unlock()
}
func (c *Client) removeSession(session getty.Session) {
if session == nil {
return
}
c.lock.Lock()
defer c.lock.Unlock()
if c.sessions == nil {
return
if c.pool != nil {
c.pool.close()
}
for i, s := range c.sessions {
if s.session == session {
c.sessions = append(c.sessions[:i], c.sessions[i+1:]...)
log.Debug("delete session{%s}, its index{%d}", session.Stat(), i)
break
}
c.pool = nil
if c.registry != nil {
c.registry.Close()
}
log.Info("after remove session{%s}, left session number:%d", session.Stat(), len(c.sessions))
c.registry = nil
}
func (c *Client) updateSession(session getty.Session) {
if session == nil {
return
}
c.lock.Lock()
defer c.lock.Unlock()
if c.sessions == nil {
return
}
for i, s := range c.sessions {
if s.session == session {
c.sessions[i].reqNum++
break
}
}
}
func (c *Client) getClientRpcSession(session getty.Session) (rpcSession, error) {
var (
err error
rpcSession rpcSession
)
c.lock.Lock()
defer c.lock.Unlock()
if c.sessions == nil {
return rpcSession, errClientClosed
}
err = errSessionNotExist
for _, s := range c.sessions {
if s.session == session {
rpcSession = *s
err = nil
break
}
func (c *Client) selectSession(protocol, addr string) getty.Session {
rpcConn, err := c.pool.getConn(protocol, addr)
if err != nil {
return nil
}
return rpcSession, jerrors.Trace(err)
return rpcConn.selectSession()
}
func (c *Client) heartbeat(session getty.Session) error {
......@@ -265,12 +177,12 @@ func (c *Client) transfer(session getty.Session, req *GettyRPCRequest, resp *Pen
pkg GettyPackage
)
sequence = c.Sequence()
sequence = c.sequence.Add(1)
pkg.H.Magic = gettyPackageMagic
pkg.H.LogID = (uint32)(randomID())
pkg.H.Sequence = sequence
pkg.H.Command = gettyCmdHbRequest
pkg.H.CodecType = c.codecType
pkg.H.CodecType = c.conf.codecType
if req != nil {
pkg.H.Command = gettyCmdRPCRequest
pkg.B = req
......
......@@ -23,43 +23,6 @@ import (
// getty command
////////////////////////////////////////////
type gettyCodecType uint32
const (
gettyCodecUnknown gettyCodecType = 0x00
gettyJson = 0x01
gettyProtobuf = 0x02
)
var gettyCodecTypeStrings = [...]string{
"unknown",
"json",
"protobuf",
}
func (c gettyCodecType) String() string {
if c == gettyJson || c == gettyProtobuf {
return gettyCodecTypeStrings[c]
}
return gettyCodecTypeStrings[gettyCodecUnknown]
}
func String2CodecType(codecType string) gettyCodecType {
switch codecType {
case gettyCodecTypeStrings[gettyJson]:
return gettyJson
case gettyCodecTypeStrings[gettyProtobuf]:
return gettyProtobuf
}
return gettyCodecUnknown
}
////////////////////////////////////////////
// getty command
////////////////////////////////////////////
type gettyCommand uint32
const (
......@@ -105,20 +68,50 @@ const (
GettyFail = 0x01
)
type SerializeType byte
////////////////////////////////////////////
// getty codec type
////////////////////////////////////////////
type gettyCodecType uint32
const (
JSON SerializeType = iota
ProtoBuffer
gettyCodecUnknown gettyCodecType = 0x00
gettyCodecJson = 0x01
gettyCodecProtobuf = 0x02
)
var (
Codecs = map[SerializeType]Codec{
JSON: &JSONCodec{},
ProtoBuffer: &PBCodec{},
gettyCodecStrings = [...]string{
"unknown",
"json",
"protobuf",
}
Codecs = map[gettyCodecType]Codec{
gettyCodecJson: &JSONCodec{},
gettyCodecProtobuf: &PBCodec{},
}
)
func (c gettyCodecType) String() string {
if c == gettyCodecJson || c == gettyCodecProtobuf {
return gettyCodecStrings[c]
}
return gettyCodecStrings[gettyCodecUnknown]
}
func String2CodecType(codecType string) gettyCodecType {
switch codecType {
case gettyCodecStrings[gettyCodecJson]:
return gettyCodecJson
case gettyCodecStrings[gettyCodecProtobuf]:
return gettyCodecProtobuf
}
return gettyCodecUnknown
}
// Codec defines the interface that decode/encode body.
type Codec interface {
Encode(i interface{}) ([]byte, error)
......@@ -194,9 +187,9 @@ func init() {
}
type RPCPackage interface {
Marshal(SerializeType, *bytes.Buffer) (int, error)
Marshal(gettyCodecType, *bytes.Buffer) (int, error)
// @buf length should be equal to GettyPkg.GettyPackageHeader.Len
Unmarshal(sz SerializeType, buf *bytes.Buffer) error
Unmarshal(sz gettyCodecType, buf *bytes.Buffer) error
GetBody() []byte
GetHeader() interface{}
}
......@@ -210,7 +203,7 @@ type GettyPackageHeader struct {
Code GettyErrorCode // error code
ServiceID uint32 // service id
CodecType SerializeType
CodecType gettyCodecType
}
type GettyPackage struct {
......@@ -230,14 +223,14 @@ func (p *GettyPackage) Marshal() (*bytes.Buffer, error) {
buf *bytes.Buffer
)
packLen = gettyPackageHeaderLen
packLen = rpcPackagePlaceholderLen + gettyPackageHeaderLen
if p.B != nil {
buf = &bytes.Buffer{}
length, err = p.B.Marshal(p.H.CodecType, buf)
if err != nil {
return nil, jerrors.Trace(err)
}
packLen = gettyPackageHeaderLen + length
packLen = rpcPackagePlaceholderLen + gettyPackageHeaderLen + length
}
buf0 := &bytes.Buffer{}
err = binary.Write(buf0, binary.LittleEndian, uint16(packLen))
......@@ -283,7 +276,7 @@ func (p *GettyPackage) Unmarshal(buf *bytes.Buffer) (int, error) {
}
if int(packLen) > rpcPackagePlaceholderLen+gettyPackageHeaderLen {
if err := p.B.Unmarshal(p.H.CodecType, bytes.NewBuffer(buf.Next(int(packLen)-gettyPackageHeaderLen))); err != nil {
if err := p.B.Unmarshal(p.H.CodecType, bytes.NewBuffer(buf.Next(int(packLen)-rpcPackagePlaceholderLen-gettyPackageHeaderLen))); err != nil {
return 0, jerrors.Trace(err)
}
}
......@@ -322,7 +315,7 @@ func NewGettyRPCRequest() RPCPackage {
return &GettyRPCRequest{}
}
func (req *GettyRPCRequest) Marshal(sz SerializeType, buf *bytes.Buffer) (int, error) {
func (req *GettyRPCRequest) Marshal(sz gettyCodecType, buf *bytes.Buffer) (int, error) {
codec := Codecs[sz]
if codec == nil {
return 0, jerrors.Errorf("can not find codec for %d", sz)
......@@ -355,8 +348,7 @@ func (req *GettyRPCRequest) Marshal(sz SerializeType, buf *bytes.Buffer) (int, e
return 2 + len(headerData) + 2 + len(bodyData), nil
}
func (req *GettyRPCRequest) Unmarshal(sz SerializeType, buf *bytes.Buffer) error {
func (req *GettyRPCRequest) Unmarshal(sz gettyCodecType, buf *bytes.Buffer) error {
var headerLen uint16
err := binary.Read(buf, binary.LittleEndian, &headerLen)
if err != nil {
......@@ -426,7 +418,7 @@ func NewGettyRPCResponse() RPCPackage {
return &GettyRPCResponse{}
}
func (resp *GettyRPCResponse) Marshal(sz SerializeType, buf *bytes.Buffer) (int, error) {
func (resp *GettyRPCResponse) Marshal(sz gettyCodecType, buf *bytes.Buffer) (int, error) {
codec := Codecs[sz]
if codec == nil {
return 0, jerrors.Errorf("can not find codec for %d", sz)
......@@ -461,8 +453,7 @@ func (resp *GettyRPCResponse) Marshal(sz SerializeType, buf *bytes.Buffer) (int,
return 2 + len(headerData) + 2 + len(bodyData), nil
}
func (resp *GettyRPCResponse) Unmarshal(sz SerializeType, buf *bytes.Buffer) error {
func (resp *GettyRPCResponse) Unmarshal(sz gettyCodecType, buf *bytes.Buffer) error {
var headerLen uint16
err := binary.Read(buf, binary.LittleEndian, &headerLen)
if err != nil {
......@@ -480,7 +471,6 @@ func (resp *GettyRPCResponse) Unmarshal(sz SerializeType, buf *bytes.Buffer) err
if err != nil {
return jerrors.Trace(err)
}
body := make([]byte, bodyLen)
err = binary.Read(buf, binary.LittleEndian, body)
if err != nil {
......
......@@ -32,7 +32,7 @@ type (
RegistryConfig struct {
Type string `default:"etcd" yaml:"type" json:"type,omitempty"`
Addr string `default:"127.0.0.1:2379" yaml:"addr" json:"addr,omitempty"`
Addr string `default:"" yaml:"addr" json:"addr,omitempty"`
KeepaliveTimeout int `default:"5" yaml:"keepalive_time" json:"keepalive_timeout,omitempty"`
Root string `default:"getty" yaml:"keepalive_time" json:"keepalive_timeout,omitempty"`
IDC string `default:"idc-bj" yaml:"idc" json:"idc,omitempty"`
......@@ -74,8 +74,11 @@ type (
ProfilePort int `default:"10086" yaml:"profile_port" json:"profile_port,omitempty"`
// server
// !!! Attention: If u wanna use registry, the ServerHost & ServerPort could be nil.
ServerHost string `default:"127.0.0.1" yaml:"server_host" json:"server_host,omitempty"`
ServerPort int `default:"10000" yaml:"server_port" json:"server_port,omitempty"`
CodecType string `default:"json" yaml:"codec_type" json:"codec_type,omitempty"`
codecType gettyCodecType
// session pool
ConnectionNum int `default:"16" yaml:"connection_num" json:"connection_num,omitempty"`
......@@ -92,6 +95,10 @@ type (
FailFastTimeout string `default:"5s" yaml:"fail_fast_timeout" json:"fail_fast_timeout,omitempty"`
failFastTimeout time.Duration
// Connection Pool
PoolSize int `default:"2" yaml:"pool_size" json:"pool_size,omitempty"`
PoolTTL int `default:"180" yaml:"pool_ttl" json:"pool_ttl,omitempty"`
// session tcp parameters
GettySessionParam GettySessionParam `required:"true" yaml:"getty_session_param" json:"getty_session_param,omitempty"`
......
......@@ -8,18 +8,22 @@ import (
"github.com/AlexStocks/getty/rpc"
"github.com/AlexStocks/getty/rpc/example/data"
log "github.com/AlexStocks/log4go"
jerrors "github.com/juju/errors"
)
func main() {
log.LoadConfiguration("client_log.xml")
client := rpc.NewClient("client_config.toml")
client, err := rpc.NewClient("client_config.toml")
if err != nil {
panic(jerrors.ErrorStack(err))
}
// client.SetCodecType(rpc.ProtoBuffer)//默认是json序列化
defer client.Close()
for i := 0; i < 100; i++ {
go func() {
var res string
err := client.Call("TestRpc", "Test", data.TestABC{"aaa", "bbb", "ccc"}, &res)
err := client.Call("127.0.0.1:20000", "json", "TestRpc", "Test", data.TestABC{"aaa", "bbb", "ccc"}, &res)
if err != nil {
log.Error(err)
return
......@@ -31,7 +35,7 @@ func main() {
for i := 0; i < 100; i++ {
go func() {
var result int
err := client.Call("TestRpc", "Add", 1, &result)
err := client.Call("127.0.0.1:20000", "json", "TestRpc", "Add", 1, &result)
if err != nil {
log.Error(err)
return
......@@ -41,9 +45,9 @@ func main() {
}
var errInt int
err := client.Call("TestRpc", "Err", 2, &errInt)
err = client.Call("127.0.0.1:20000", "json", "TestRpc", "Err", 2, &errInt)
if err != nil {
log.Error(err)
log.Error(jerrors.ErrorStack(err))
}
time.Sleep(20 * time.Second)
......
......@@ -39,4 +39,13 @@ FailFastTimeout = "3s"
TcpWriteTimeout = "5s"
WaitTimeout = "1s"
MaxMsgLen = 128
SessionName = "rpc-client"
SessionName = "getty-rpc-client"
# registry
[Registry]
Type = "etcd"
# Addr = "127.0.0.1:2379"
KeepaliveTimeout = 5
Root = "/getty"
IDC = "bj-unicom"
NodeID = "n147"
......@@ -32,12 +32,12 @@ FailFastTimeout = "3s"
TcpWriteTimeout = "5s"
WaitTimeout = "1s"
MaxMsgLen = 128
SessionName = "rpc-server"
SessionName = "getty-rpc-server"
# registry
[Registry]
Type = "etcd"
Addr = "127.0.0.1:2379"
# Addr = "127.0.0.1:2379"
KeepaliveTimeout = 5
Root = "/getty"
IDC = "bj-unicom"
......
......@@ -169,26 +169,26 @@ func (h *RpcServerHandler) callService(session getty.Session, req GettyRPCReques
////////////////////////////////////////////
type RpcClientHandler struct {
client *Client
conn *gettyRPCClientConn
}
func NewRpcClientHandler(client *Client) *RpcClientHandler {
return &RpcClientHandler{client: client}
func NewRpcClientHandler(client *gettyRPCClientConn) *RpcClientHandler {
return &RpcClientHandler{conn: client}
}
func (h *RpcClientHandler) OnOpen(session getty.Session) error {
h.client.addSession(session)
h.conn.addSession(session)
return nil
}
func (h *RpcClientHandler) OnError(session getty.Session, err error) {
log.Info("session{%s} got error{%v}, will be closed.", session.Stat(), err)
h.client.removeSession(session)
h.conn.removeSession(session)
}
func (h *RpcClientHandler) OnClose(session getty.Session) {
log.Info("session{%s} is closing......", session.Stat())
h.client.removeSession(session)
h.conn.removeSession(session)
}
func (h *RpcClientHandler) OnMessage(session getty.Session, pkg interface{}) {
......@@ -198,9 +198,9 @@ func (h *RpcClientHandler) OnMessage(session getty.Session, pkg interface{}) {
return
}
log.Debug("get rpc response{%s}", p)
h.client.updateSession(session)
h.conn.updateSession(session)
pendingResponse := h.client.RemovePendingResponse(p.H.Sequence)
pendingResponse := h.conn.pool.rpcClient.RemovePendingResponse(p.H.Sequence)
if pendingResponse == nil {
return
}
......@@ -228,17 +228,17 @@ func (h *RpcClientHandler) OnMessage(session getty.Session, pkg interface{}) {
}
func (h *RpcClientHandler) OnCron(session getty.Session) {
rpcSession, err := h.client.getClientRpcSession(session)
rpcSession, err := h.conn.getClientRpcSession(session)
if err != nil {
log.Error("client.getClientSession(session{%s}) = error{%#v}", session.Stat(), err)
return
}
if h.client.conf.sessionTimeout.Nanoseconds() < time.Since(session.GetActive()).Nanoseconds() {
if h.conn.pool.rpcClient.conf.sessionTimeout.Nanoseconds() < time.Since(session.GetActive()).Nanoseconds() {
log.Warn("session{%s} timeout{%s}, reqNum{%d}",
session.Stat(), time.Since(session.GetActive()).String(), rpcSession.reqNum)
h.client.removeSession(session)
h.conn.removeSession(session) // -> h.conn.close() -> h.conn.pool.remove(h.conn)
return
}
h.client.heartbeat(session)
h.conn.pool.rpcClient.heartbeat(session)
}
package rpc
import (
"fmt"
"math/rand"
"net"
"strings"
"sync"
"time"
)
import (
"github.com/AlexStocks/getty"
log "github.com/AlexStocks/log4go"
jerrors "github.com/juju/errors"
)
type gettyRPCClientConn struct {
once sync.Once
protocol string
addr string
created int64 // 为0,则说明没有被创建或者被销毁了
pool *gettyRPCClientConnPool
lock sync.RWMutex
gettyClient getty.Client
sessions []*rpcSession
}
var (
errClientPoolClosed = jerrors.New("client pool closed")
)
func newGettyRPCClientConn(pool *gettyRPCClientConnPool, protocol, addr string) (*gettyRPCClientConn, error) {
c := &gettyRPCClientConn{
protocol: protocol,
addr: addr,
pool: pool,
gettyClient: getty.NewTCPClient(
getty.WithServerAddress(addr),
getty.WithConnectionNumber((int)(pool.rpcClient.conf.ConnectionNum)),
),
}
c.gettyClient.RunEventLoop(c.newSession)
idx := 1
for {
idx++
if c.isAvailable() {
break
}
if idx > 5000 {
return nil, jerrors.New(fmt.Sprintf("failed to create client connection to %s in 5 seconds", addr))
}
time.Sleep(1e6)
}
log.Info("client init ok")
c.created = time.Now().Unix()
return c, nil
}
func (c *gettyRPCClientConn) newSession(session getty.Session) error {
var (
ok bool
tcpConn *net.TCPConn
conf *ClientConfig
)
conf = c.pool.rpcClient.conf
if conf.GettySessionParam.CompressEncoding {
session.SetCompressType(getty.CompressZip)
}
if tcpConn, ok = session.Conn().(*net.TCPConn); !ok {
panic(fmt.Sprintf("%s, session.conn{%#v} is not tcp connection\n", session.Stat(), session.Conn()))
}
tcpConn.SetNoDelay(conf.GettySessionParam.TcpNoDelay)
tcpConn.SetKeepAlive(conf.GettySessionParam.TcpKeepAlive)
if conf.GettySessionParam.TcpKeepAlive {
tcpConn.SetKeepAlivePeriod(conf.GettySessionParam.keepAlivePeriod)
}
tcpConn.SetReadBuffer(conf.GettySessionParam.TcpRBufSize)
tcpConn.SetWriteBuffer(conf.GettySessionParam.TcpWBufSize)
session.SetName(conf.GettySessionParam.SessionName)
session.SetMaxMsgLen(conf.GettySessionParam.MaxMsgLen)
session.SetPkgHandler(NewRpcClientPackageHandler())
session.SetEventListener(NewRpcClientHandler(c))
session.SetRQLen(conf.GettySessionParam.PkgRQSize)
session.SetWQLen(conf.GettySessionParam.PkgWQSize)
session.SetReadTimeout(conf.GettySessionParam.tcpReadTimeout)
session.SetWriteTimeout(conf.GettySessionParam.tcpWriteTimeout)
session.SetCronPeriod((int)(conf.heartbeatPeriod.Nanoseconds() / 1e6))
session.SetWaitTime(conf.GettySessionParam.waitTimeout)
log.Debug("client new session:%s\n", session.Stat())
return nil
}
func (c *gettyRPCClientConn) selectSession() getty.Session {
c.lock.RLock()
defer c.lock.RUnlock()
if c.sessions == nil {
return nil
}
count := len(c.sessions)
if count == 0 {
return nil
}
return c.sessions[rand.Int31n(int32(count))].session
}
func (c *gettyRPCClientConn) addSession(session getty.Session) {
log.Debug("add session{%s}", session.Stat())
if session == nil {
return
}
c.lock.Lock()
c.sessions = append(c.sessions, &rpcSession{session: session})
c.lock.Unlock()
}
func (c *gettyRPCClientConn) removeSession(session getty.Session) {
if session == nil {
return
}
c.lock.Lock()
defer c.lock.Unlock()
if c.sessions == nil {
return
}
for i, s := range c.sessions {
if s.session == session {
c.sessions = append(c.sessions[:i], c.sessions[i+1:]...)
log.Debug("delete session{%s}, its index{%d}", session.Stat(), i)
break
}
}
log.Info("after remove session{%s}, left session number:%d", session.Stat(), len(c.sessions))
if len(c.sessions) == 0 {
c.close() // -> pool.remove(c)
}
}
func (c *gettyRPCClientConn) updateSession(session getty.Session) {
if session == nil {
return
}
c.lock.Lock()
defer c.lock.Unlock()
if c.sessions == nil {
return
}
for i, s := range c.sessions {
if s.session == session {
c.sessions[i].reqNum++
break
}
}
}
func (c *gettyRPCClientConn) getClientRpcSession(session getty.Session) (rpcSession, error) {
var (
err error
rpcSession rpcSession
)
c.lock.Lock()
defer c.lock.Unlock()
if c.sessions == nil {
return rpcSession, errClientClosed
}
err = errSessionNotExist
for _, s := range c.sessions {
if s.session == session {
rpcSession = *s
err = nil
break
}
}
return rpcSession, jerrors.Trace(err)
}
func (c *gettyRPCClientConn) isAvailable() bool {
if c.selectSession() == nil {
return false
}
return true
}
func (c *gettyRPCClientConn) close() error {
err := jerrors.Errorf("close gettyRPCClientConn{%#v} again", c)
c.once.Do(func() {
// delete @c from client pool
c.pool.remove(c)
for _, s := range c.sessions {
log.Info("close client session{%s, last active:%s, request number:%d}",
s.session.Stat(), s.session.GetActive().String(), s.reqNum)
s.session.Close()
}
c.gettyClient.Close()
c.gettyClient = nil
c.sessions = c.sessions[:0]
c.created = 0
err = nil
})
return err
}
type gettyRPCClientConnPool struct {
rpcClient *Client
size int // []*gettyRPCClientConn数组的size
ttl int64 // 每个gettyRPCClientConn的有效期时间. pool对象会在getConn时执行ttl检查
sync.Mutex
connMap map[string][]*gettyRPCClientConn // 从[]*gettyRPCClientConn 可见key是连接地址,而value是对应这个地址的连接数组
}
func newGettyRPCClientConnPool(rpcClient *Client, size int, ttl time.Duration) *gettyRPCClientConnPool {
return &gettyRPCClientConnPool{
rpcClient: rpcClient,
size: size,
ttl: int64(ttl.Seconds()),
connMap: make(map[string][]*gettyRPCClientConn),
}
}
func (p *gettyRPCClientConnPool) close() {
p.Lock()
connMap := p.connMap
p.connMap = nil
p.Unlock()
for _, connArray := range connMap {
for _, conn := range connArray {
conn.close()
}
}
}
func (p *gettyRPCClientConnPool) getConn(protocol, addr string) (*gettyRPCClientConn, error) {
var builder strings.Builder
builder.WriteString(addr)
builder.WriteString("@")
builder.WriteString(protocol)
key := builder.String()
p.Lock()
defer p.Unlock()
if p.connMap == nil {
return nil, errClientPoolClosed
}
connArray := p.connMap[key]
now := time.Now().Unix()
for len(connArray) > 0 {
conn := connArray[len(connArray)-1]
connArray = connArray[:len(connArray)-1]
p.connMap[key] = connArray
if d := now - conn.created; d > p.ttl {
conn.close() // -> pool.remove(c)
continue
}
p.Unlock()
return conn, nil
}
// create new conn
return newGettyRPCClientConn(p, protocol, addr)
}
func (p *gettyRPCClientConnPool) release(conn *gettyRPCClientConn, err error) {
if conn == nil || conn.created == 0 {
return
}
if err != nil {
conn.close() //
return
}
var builder strings.Builder
builder.WriteString(conn.addr)
builder.WriteString("@")
builder.WriteString(conn.protocol)
key := builder.String()
p.Lock()
defer p.Unlock()
if p.connMap == nil {
return
}
connArray := p.connMap[key]
if len(connArray) >= p.size {
p.Unlock()
conn.close()
return
}
p.connMap[key] = append(connArray, conn)
}
func (p *gettyRPCClientConnPool) remove(conn *gettyRPCClientConn) {
if conn == nil || conn.created == 0 {
return
}
var builder strings.Builder
builder.WriteString(conn.addr)
builder.WriteString("@")
builder.WriteString(conn.protocol)
key := builder.String()
p.Lock()
defer p.Unlock()
if p.connMap == nil {
return
}
connArray := p.connMap[key]
if len(connArray) > 0 {
for idx, c := range connArray {
if conn == c {
p.connMap[key] = append(connArray[:idx], connArray[idx+1:]...)
break
}
}
}
}
......@@ -123,7 +123,9 @@ func (p *RpcClientPackageHandler) Read(ss getty.Session, data []byte) (interface
resp := &GettyRPCResponsePackage{
H: pkg.H,
header: pkg.B.GetHeader().(GettyRPCResponseHeader),
body: pkg.B.GetBody(),
}
if pkg.H.Command != gettyCmdHbResponse {
resp.body = pkg.B.GetBody()
}
return resp, length, nil
}
......
......@@ -26,19 +26,21 @@ type Server struct {
conf *ServerConfig
serviceMap map[string]*service
tcpServerList []getty.Server
registry gxregistry.Registry
sa gxregistry.ServiceAttr
nodes []*gxregistry.Node
// registry
registry gxregistry.Registry
sa gxregistry.ServiceAttr
nodes []*gxregistry.Node
}
var (
ErrIllegalCodecType = jerrors.New("illegal codec type")
ErrIllegalConf = "illegal conf: "
)
func NewServer(confFile string) (*Server, error) {
conf := loadServerConf(confFile)
if conf.codecType = String2CodecType(conf.CodecType); conf.codecType == gettyCodecUnknown {
return nil, ErrIllegalCodecType
return nil, jerrors.New(fmt.Sprintf(ErrIllegalConf+"codec type %s", conf.CodecType))
}
s := &Server{
......@@ -46,9 +48,9 @@ func NewServer(confFile string) (*Server, error) {
conf: conf,
}
var err error
var registry gxregistry.Registry
if len(s.conf.Registry.Addr) != 0 {
var err error
var registry gxregistry.Registry
addrList := strings.Split(s.conf.Registry.Addr, ",")
switch s.conf.Registry.Type {
case "etcd":
......@@ -63,13 +65,15 @@ func NewServer(confFile string) (*Server, error) {
gxregistry.WithTimeout(time.Duration(int(time.Second)*s.conf.Registry.KeepaliveTimeout)),
gxregistry.WithRoot(s.conf.Registry.Root),
)
default:
return nil, jerrors.New(fmt.Sprintf(ErrIllegalConf+"registry type %s", s.conf.Registry.Type))
}
if err != nil {
return nil, jerrors.Trace(err)
}
if registry != nil {
s.registry = registry
s.registry = registry
if s.registry != nil {
s.sa = gxregistry.ServiceAttr{
Group: s.conf.Registry.IDC,
Role: gxregistry.SRT_Provider,
......@@ -86,7 +90,9 @@ func NewServer(confFile string) (*Server, error) {
&gxregistry.Node{
ID: s.conf.Registry.NodeID + "-" + net.JoinHostPort(s.conf.Host, p),
Address: s.conf.Host,
Port: int32(port)})
Port: int32(port),
},
)
}
}
}
......@@ -210,8 +216,15 @@ func (s *Server) Init() {
}
func (s *Server) Stop() {
for _, tcpServer := range s.tcpServerList {
tcpServer.Close()
if s.registry != nil {
s.registry.Close()
}
list := s.tcpServerList
s.tcpServerList = nil
if list != nil {
for _, tcpServer := range list {
tcpServer.Close()
}
}
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment