Commit c8ccd9d0 authored by AlexStocks's avatar AlexStocks

Add: client connection pool

parent bc4d8526
package rpc package rpc
import ( import (
"fmt"
"math/rand" "math/rand"
"net" "strings"
"sync" "sync"
"sync/atomic"
"time" "time"
) )
import ( import (
jerrors "github.com/juju/errors"
)
import (
"github.com/AlexStocks/getty" "github.com/AlexStocks/getty"
"github.com/AlexStocks/goext/database/filter"
"github.com/AlexStocks/goext/database/filter/pool"
"github.com/AlexStocks/goext/database/registry"
"github.com/AlexStocks/goext/database/registry/etcdv3"
"github.com/AlexStocks/goext/database/registry/zookeeper"
"github.com/AlexStocks/goext/net" "github.com/AlexStocks/goext/net"
log "github.com/AlexStocks/log4go" "github.com/AlexStocks/goext/sync/atomic"
jerrors "github.com/juju/errors"
) )
var ( var (
...@@ -30,90 +30,89 @@ func init() { ...@@ -30,90 +30,89 @@ func init() {
} }
type Client struct { type Client struct {
conf *ClientConfig conf *ClientConfig
lock sync.RWMutex pool *gettyRPCClientConnPool
sessions []*rpcSession sequence gxatomic.Uint64
gettyClient getty.Client
codecType SerializeType
sequence uint64
pendingLock sync.RWMutex pendingLock sync.RWMutex
pendingResponses map[uint64]*PendingResponse pendingResponses map[uint64]*PendingResponse
// registry
registry gxregistry.Registry
sa gxregistry.ServiceAttr
filter gxfilter.Filter
} }
func NewClient(confFile string) *Client { func NewClient(confFile string) (*Client, error) {
conf := loadClientConf(confFile) conf := loadClientConf(confFile)
c := &Client{ c := &Client{
pendingResponses: make(map[uint64]*PendingResponse), pendingResponses: make(map[uint64]*PendingResponse),
conf: conf, conf: conf,
gettyClient: getty.NewTCPClient(
getty.WithServerAddress(gxnet.HostAddress(conf.ServerHost, conf.ServerPort)),
getty.WithConnectionNumber((int)(conf.ConnectionNum)),
),
codecType: JSON,
} }
c.gettyClient.RunEventLoop(c.newSession) c.pool = newGettyRPCClientConnPool(c, conf.PoolSize, time.Duration(int(time.Second)*conf.PoolTTL))
idx := 1
for {
idx++
if c.isAvailable() {
break
}
if idx > 12000 { if len(c.conf.Registry.Addr) == 0 {
panic("failed to create client in 2 minutes") if conf.codecType = String2CodecType(conf.CodecType); conf.codecType == gettyCodecUnknown {
return nil, ErrIllegalSerialType
} }
time.Sleep(1e6)
}
log.Info("client init ok")
return c
}
func (c *Client) SetCodecType(st SerializeType) {
c.codecType = st
}
func (c *Client) newSession(session getty.Session) error {
var (
ok bool
tcpConn *net.TCPConn
)
if c.conf.GettySessionParam.CompressEncoding {
session.SetCompressType(getty.CompressZip)
}
if tcpConn, ok = session.Conn().(*net.TCPConn); !ok { _, err := c.pool.getConn(conf.CodecType, gxnet.HostAddress(conf.ServerHost, conf.ServerPort))
panic(fmt.Sprintf("%s, session.conn{%#v} is not tcp connection\n", session.Stat(), session.Conn()))
} return c, jerrors.Trace(err)
}
var err error
var registry gxregistry.Registry
addrList := strings.Split(c.conf.Registry.Addr, ",")
switch c.conf.Registry.Type {
case "etcd":
registry, err = gxetcd.NewRegistry(
gxregistry.WithAddrs(addrList...),
gxregistry.WithTimeout(time.Duration(int(time.Second)*c.conf.Registry.KeepaliveTimeout)),
gxregistry.WithRoot(c.conf.Registry.Root),
)
case "zookeeper":
registry, err = gxzookeeper.NewRegistry(
gxregistry.WithAddrs(addrList...),
gxregistry.WithTimeout(time.Duration(int(time.Second)*c.conf.Registry.KeepaliveTimeout)),
gxregistry.WithRoot(c.conf.Registry.Root),
)
}
if err != nil {
return nil, jerrors.Trace(err)
}
if registry != nil {
c.registry = registry
c.filter, err = gxpool.NewFilter(
gxfilter.WithBalancerMode(gxfilter.SM_Hash),
gxfilter.WithRegistry(c.registry),
gxpool.WithTTL(time.Duration(int(time.Second)*c.conf.Registry.KeepaliveTimeout)),
)
if err == nil {
return nil, jerrors.Trace(err)
}
tcpConn.SetNoDelay(c.conf.GettySessionParam.TcpNoDelay) service := gxregistry.Service{
tcpConn.SetKeepAlive(c.conf.GettySessionParam.TcpKeepAlive) Attr: &gxregistry.ServiceAttr{
if c.conf.GettySessionParam.TcpKeepAlive { Group: c.conf.Registry.IDC,
tcpConn.SetKeepAlivePeriod(c.conf.GettySessionParam.keepAlivePeriod) Role: gxregistry.SRT_Consumer,
Protocol: c.conf.CodecType,
},
Nodes: []*gxregistry.Node{&gxregistry.Node{
ID: c.conf.Registry.NodeID,
Address: c.conf.Host,
Port: 0,
},
},
}
if err = c.registry.Register(service); err != nil {
return nil, jerrors.Trace(err)
}
} }
tcpConn.SetReadBuffer(c.conf.GettySessionParam.TcpRBufSize)
tcpConn.SetWriteBuffer(c.conf.GettySessionParam.TcpWBufSize)
session.SetName(c.conf.GettySessionParam.SessionName)
session.SetMaxMsgLen(c.conf.GettySessionParam.MaxMsgLen)
session.SetPkgHandler(NewRpcClientPackageHandler())
session.SetEventListener(NewRpcClientHandler(c))
session.SetRQLen(c.conf.GettySessionParam.PkgRQSize)
session.SetWQLen(c.conf.GettySessionParam.PkgWQSize)
session.SetReadTimeout(c.conf.GettySessionParam.tcpReadTimeout)
session.SetWriteTimeout(c.conf.GettySessionParam.tcpWriteTimeout)
session.SetCronPeriod((int)(c.conf.heartbeatPeriod.Nanoseconds() / 1e6))
session.SetWaitTime(c.conf.GettySessionParam.waitTimeout)
log.Debug("client new session:%s\n", session.Stat())
return nil return c, nil
}
func (c *Client) Sequence() uint64 {
return atomic.AddUint64(&c.sequence, 1)
} }
func (c *Client) Call(service, method string, args interface{}, reply interface{}) error { func (c *Client) Call(service, method string, args interface{}, reply interface{}) error {
...@@ -142,115 +141,13 @@ func (c *Client) Call(service, method string, args interface{}, reply interface{ ...@@ -142,115 +141,13 @@ func (c *Client) Call(service, method string, args interface{}, reply interface{
return jerrors.Trace(resp.err) return jerrors.Trace(resp.err)
} }
func (c *Client) isAvailable() bool {
if c.selectSession() == nil {
return false
}
return true
}
func (c *Client) Close() { func (c *Client) Close() {
c.lock.Lock() c.pool.close()
if c.gettyClient != nil { c.registry.Close()
for _, s := range c.sessions {
log.Info("close client session{%s, last active:%s, request number:%d}",
s.session.Stat(), s.session.GetActive().String(), s.reqNum)
s.session.Close()
}
c.gettyClient.Close()
c.gettyClient = nil
c.sessions = c.sessions[:0]
}
c.lock.Unlock()
} }
func (c *Client) selectSession() getty.Session { func (c *Client) selectSession() getty.Session {
c.lock.RLock() return nil
defer c.lock.RUnlock()
if c.sessions == nil {
return nil
}
count := len(c.sessions)
if count == 0 {
return nil
}
return c.sessions[rand.Int31n(int32(count))].session
}
func (c *Client) addSession(session getty.Session) {
log.Debug("add session{%s}", session.Stat())
if session == nil {
return
}
c.lock.Lock()
c.sessions = append(c.sessions, &rpcSession{session: session})
c.lock.Unlock()
}
func (c *Client) removeSession(session getty.Session) {
if session == nil {
return
}
c.lock.Lock()
defer c.lock.Unlock()
if c.sessions == nil {
return
}
for i, s := range c.sessions {
if s.session == session {
c.sessions = append(c.sessions[:i], c.sessions[i+1:]...)
log.Debug("delete session{%s}, its index{%d}", session.Stat(), i)
break
}
}
log.Info("after remove session{%s}, left session number:%d", session.Stat(), len(c.sessions))
}
func (c *Client) updateSession(session getty.Session) {
if session == nil {
return
}
c.lock.Lock()
defer c.lock.Unlock()
if c.sessions == nil {
return
}
for i, s := range c.sessions {
if s.session == session {
c.sessions[i].reqNum++
break
}
}
}
func (c *Client) getClientRpcSession(session getty.Session) (rpcSession, error) {
var (
err error
rpcSession rpcSession
)
c.lock.Lock()
defer c.lock.Unlock()
if c.sessions == nil {
return rpcSession, errClientClosed
}
err = errSessionNotExist
for _, s := range c.sessions {
if s.session == session {
rpcSession = *s
err = nil
break
}
}
return rpcSession, jerrors.Trace(err)
} }
func (c *Client) heartbeat(session getty.Session) error { func (c *Client) heartbeat(session getty.Session) error {
...@@ -265,12 +162,12 @@ func (c *Client) transfer(session getty.Session, req *GettyRPCRequest, resp *Pen ...@@ -265,12 +162,12 @@ func (c *Client) transfer(session getty.Session, req *GettyRPCRequest, resp *Pen
pkg GettyPackage pkg GettyPackage
) )
sequence = c.Sequence() sequence = c.sequence.Add(1)
pkg.H.Magic = gettyPackageMagic pkg.H.Magic = gettyPackageMagic
pkg.H.LogID = (uint32)(randomID()) pkg.H.LogID = (uint32)(randomID())
pkg.H.Sequence = sequence pkg.H.Sequence = sequence
pkg.H.Command = gettyCmdHbRequest pkg.H.Command = gettyCmdHbRequest
pkg.H.CodecType = c.codecType pkg.H.CodecType = c.conf.codecType
if req != nil { if req != nil {
pkg.H.Command = gettyCmdRPCRequest pkg.H.Command = gettyCmdRPCRequest
pkg.B = req pkg.B = req
......
...@@ -23,43 +23,6 @@ import ( ...@@ -23,43 +23,6 @@ import (
// getty command // getty command
//////////////////////////////////////////// ////////////////////////////////////////////
type gettyCodecType uint32
const (
gettyCodecUnknown gettyCodecType = 0x00
gettyJson = 0x01
gettyProtobuf = 0x02
)
var gettyCodecTypeStrings = [...]string{
"unknown",
"json",
"protobuf",
}
func (c gettyCodecType) String() string {
if c == gettyJson || c == gettyProtobuf {
return gettyCodecTypeStrings[c]
}
return gettyCodecTypeStrings[gettyCodecUnknown]
}
func String2CodecType(codecType string) gettyCodecType {
switch codecType {
case gettyCodecTypeStrings[gettyJson]:
return gettyJson
case gettyCodecTypeStrings[gettyProtobuf]:
return gettyProtobuf
}
return gettyCodecUnknown
}
////////////////////////////////////////////
// getty command
////////////////////////////////////////////
type gettyCommand uint32 type gettyCommand uint32
const ( const (
...@@ -105,20 +68,50 @@ const ( ...@@ -105,20 +68,50 @@ const (
GettyFail = 0x01 GettyFail = 0x01
) )
type SerializeType byte ////////////////////////////////////////////
// getty codec type
////////////////////////////////////////////
type gettyCodecType uint32
const ( const (
JSON SerializeType = iota gettyCodecUnknown gettyCodecType = 0x00
ProtoBuffer gettyCodecJson = 0x01
gettyCodecProtobuf = 0x02
) )
var ( var (
Codecs = map[SerializeType]Codec{ gettyCodecStrings = [...]string{
JSON: &JSONCodec{}, "unknown",
ProtoBuffer: &PBCodec{}, "json",
"protobuf",
}
Codecs = map[gettyCodecType]Codec{
gettyCodecJson: &JSONCodec{},
gettyCodecProtobuf: &PBCodec{},
} }
) )
func (c gettyCodecType) String() string {
if c == gettyCodecJson || c == gettyCodecProtobuf {
return gettyCodecStrings[c]
}
return gettyCodecStrings[gettyCodecUnknown]
}
func String2CodecType(codecType string) gettyCodecType {
switch codecType {
case gettyCodecStrings[gettyCodecJson]:
return gettyCodecJson
case gettyCodecStrings[gettyCodecProtobuf]:
return gettyCodecProtobuf
}
return gettyCodecUnknown
}
// Codec defines the interface that decode/encode body. // Codec defines the interface that decode/encode body.
type Codec interface { type Codec interface {
Encode(i interface{}) ([]byte, error) Encode(i interface{}) ([]byte, error)
...@@ -194,9 +187,9 @@ func init() { ...@@ -194,9 +187,9 @@ func init() {
} }
type RPCPackage interface { type RPCPackage interface {
Marshal(SerializeType, *bytes.Buffer) (int, error) Marshal(gettyCodecType, *bytes.Buffer) (int, error)
// @buf length should be equal to GettyPkg.GettyPackageHeader.Len // @buf length should be equal to GettyPkg.GettyPackageHeader.Len
Unmarshal(sz SerializeType, buf *bytes.Buffer) error Unmarshal(sz gettyCodecType, buf *bytes.Buffer) error
GetBody() []byte GetBody() []byte
GetHeader() interface{} GetHeader() interface{}
} }
...@@ -210,7 +203,7 @@ type GettyPackageHeader struct { ...@@ -210,7 +203,7 @@ type GettyPackageHeader struct {
Code GettyErrorCode // error code Code GettyErrorCode // error code
ServiceID uint32 // service id ServiceID uint32 // service id
CodecType SerializeType CodecType gettyCodecType
} }
type GettyPackage struct { type GettyPackage struct {
...@@ -322,7 +315,7 @@ func NewGettyRPCRequest() RPCPackage { ...@@ -322,7 +315,7 @@ func NewGettyRPCRequest() RPCPackage {
return &GettyRPCRequest{} return &GettyRPCRequest{}
} }
func (req *GettyRPCRequest) Marshal(sz SerializeType, buf *bytes.Buffer) (int, error) { func (req *GettyRPCRequest) Marshal(sz gettyCodecType, buf *bytes.Buffer) (int, error) {
codec := Codecs[sz] codec := Codecs[sz]
if codec == nil { if codec == nil {
return 0, jerrors.Errorf("can not find codec for %d", sz) return 0, jerrors.Errorf("can not find codec for %d", sz)
...@@ -355,8 +348,7 @@ func (req *GettyRPCRequest) Marshal(sz SerializeType, buf *bytes.Buffer) (int, e ...@@ -355,8 +348,7 @@ func (req *GettyRPCRequest) Marshal(sz SerializeType, buf *bytes.Buffer) (int, e
return 2 + len(headerData) + 2 + len(bodyData), nil return 2 + len(headerData) + 2 + len(bodyData), nil
} }
func (req *GettyRPCRequest) Unmarshal(sz SerializeType, buf *bytes.Buffer) error { func (req *GettyRPCRequest) Unmarshal(sz gettyCodecType, buf *bytes.Buffer) error {
var headerLen uint16 var headerLen uint16
err := binary.Read(buf, binary.LittleEndian, &headerLen) err := binary.Read(buf, binary.LittleEndian, &headerLen)
if err != nil { if err != nil {
...@@ -426,7 +418,7 @@ func NewGettyRPCResponse() RPCPackage { ...@@ -426,7 +418,7 @@ func NewGettyRPCResponse() RPCPackage {
return &GettyRPCResponse{} return &GettyRPCResponse{}
} }
func (resp *GettyRPCResponse) Marshal(sz SerializeType, buf *bytes.Buffer) (int, error) { func (resp *GettyRPCResponse) Marshal(sz gettyCodecType, buf *bytes.Buffer) (int, error) {
codec := Codecs[sz] codec := Codecs[sz]
if codec == nil { if codec == nil {
return 0, jerrors.Errorf("can not find codec for %d", sz) return 0, jerrors.Errorf("can not find codec for %d", sz)
...@@ -461,8 +453,7 @@ func (resp *GettyRPCResponse) Marshal(sz SerializeType, buf *bytes.Buffer) (int, ...@@ -461,8 +453,7 @@ func (resp *GettyRPCResponse) Marshal(sz SerializeType, buf *bytes.Buffer) (int,
return 2 + len(headerData) + 2 + len(bodyData), nil return 2 + len(headerData) + 2 + len(bodyData), nil
} }
func (resp *GettyRPCResponse) Unmarshal(sz SerializeType, buf *bytes.Buffer) error { func (resp *GettyRPCResponse) Unmarshal(sz gettyCodecType, buf *bytes.Buffer) error {
var headerLen uint16 var headerLen uint16
err := binary.Read(buf, binary.LittleEndian, &headerLen) err := binary.Read(buf, binary.LittleEndian, &headerLen)
if err != nil { if err != nil {
......
...@@ -74,8 +74,11 @@ type ( ...@@ -74,8 +74,11 @@ type (
ProfilePort int `default:"10086" yaml:"profile_port" json:"profile_port,omitempty"` ProfilePort int `default:"10086" yaml:"profile_port" json:"profile_port,omitempty"`
// server // server
// !!! Attention: If u wanna use registry, the ServerHost & ServerPort could be nil.
ServerHost string `default:"127.0.0.1" yaml:"server_host" json:"server_host,omitempty"` ServerHost string `default:"127.0.0.1" yaml:"server_host" json:"server_host,omitempty"`
ServerPort int `default:"10000" yaml:"server_port" json:"server_port,omitempty"` ServerPort int `default:"10000" yaml:"server_port" json:"server_port,omitempty"`
CodecType string `default:"json" yaml:"codec_type" json:"codec_type,omitempty"`
codecType gettyCodecType
// session pool // session pool
ConnectionNum int `default:"16" yaml:"connection_num" json:"connection_num,omitempty"` ConnectionNum int `default:"16" yaml:"connection_num" json:"connection_num,omitempty"`
...@@ -92,6 +95,10 @@ type ( ...@@ -92,6 +95,10 @@ type (
FailFastTimeout string `default:"5s" yaml:"fail_fast_timeout" json:"fail_fast_timeout,omitempty"` FailFastTimeout string `default:"5s" yaml:"fail_fast_timeout" json:"fail_fast_timeout,omitempty"`
failFastTimeout time.Duration failFastTimeout time.Duration
// Connection Pool
PoolSize int `default:"2" yaml:"pool_size" json:"pool_size,omitempty"`
PoolTTL int `default:"180" yaml:"pool_ttl" json:"pool_ttl,omitempty"`
// session tcp parameters // session tcp parameters
GettySessionParam GettySessionParam `required:"true" yaml:"getty_session_param" json:"getty_session_param,omitempty"` GettySessionParam GettySessionParam `required:"true" yaml:"getty_session_param" json:"getty_session_param,omitempty"`
......
...@@ -8,8 +8,8 @@ import ( ...@@ -8,8 +8,8 @@ import (
) )
func main() { func main() {
log.LoadConfiguration("/Users/alex/test/golang/lib/src/github.com/AlexStocks/getty/rpc/example/server/server_log.xml") log.LoadConfiguration("./server_log.xml")
srv, err := rpc.NewServer("/Users/alex/test/golang/lib/src/github.com/AlexStocks/getty/rpc/example/server/server_config.toml") srv, err := rpc.NewServer("./server_config.toml")
if err != nil { if err != nil {
panic(jerrors.ErrorStack(err)) panic(jerrors.ErrorStack(err))
} }
......
...@@ -169,26 +169,26 @@ func (h *RpcServerHandler) callService(session getty.Session, req GettyRPCReques ...@@ -169,26 +169,26 @@ func (h *RpcServerHandler) callService(session getty.Session, req GettyRPCReques
//////////////////////////////////////////// ////////////////////////////////////////////
type RpcClientHandler struct { type RpcClientHandler struct {
client *Client conn *gettyRPCClientConn
} }
func NewRpcClientHandler(client *Client) *RpcClientHandler { func NewRpcClientHandler(client *gettyRPCClientConn) *RpcClientHandler {
return &RpcClientHandler{client: client} return &RpcClientHandler{conn: client}
} }
func (h *RpcClientHandler) OnOpen(session getty.Session) error { func (h *RpcClientHandler) OnOpen(session getty.Session) error {
h.client.addSession(session) h.conn.addSession(session)
return nil return nil
} }
func (h *RpcClientHandler) OnError(session getty.Session, err error) { func (h *RpcClientHandler) OnError(session getty.Session, err error) {
log.Info("session{%s} got error{%v}, will be closed.", session.Stat(), err) log.Info("session{%s} got error{%v}, will be closed.", session.Stat(), err)
h.client.removeSession(session) h.conn.removeSession(session)
} }
func (h *RpcClientHandler) OnClose(session getty.Session) { func (h *RpcClientHandler) OnClose(session getty.Session) {
log.Info("session{%s} is closing......", session.Stat()) log.Info("session{%s} is closing......", session.Stat())
h.client.removeSession(session) h.conn.removeSession(session)
} }
func (h *RpcClientHandler) OnMessage(session getty.Session, pkg interface{}) { func (h *RpcClientHandler) OnMessage(session getty.Session, pkg interface{}) {
...@@ -198,9 +198,9 @@ func (h *RpcClientHandler) OnMessage(session getty.Session, pkg interface{}) { ...@@ -198,9 +198,9 @@ func (h *RpcClientHandler) OnMessage(session getty.Session, pkg interface{}) {
return return
} }
log.Debug("get rpc response{%s}", p) log.Debug("get rpc response{%s}", p)
h.client.updateSession(session) h.conn.updateSession(session)
pendingResponse := h.client.RemovePendingResponse(p.H.Sequence) pendingResponse := h.conn.pool.rpcClient.RemovePendingResponse(p.H.Sequence)
if pendingResponse == nil { if pendingResponse == nil {
return return
} }
...@@ -228,17 +228,17 @@ func (h *RpcClientHandler) OnMessage(session getty.Session, pkg interface{}) { ...@@ -228,17 +228,17 @@ func (h *RpcClientHandler) OnMessage(session getty.Session, pkg interface{}) {
} }
func (h *RpcClientHandler) OnCron(session getty.Session) { func (h *RpcClientHandler) OnCron(session getty.Session) {
rpcSession, err := h.client.getClientRpcSession(session) rpcSession, err := h.conn.getClientRpcSession(session)
if err != nil { if err != nil {
log.Error("client.getClientSession(session{%s}) = error{%#v}", session.Stat(), err) log.Error("client.getClientSession(session{%s}) = error{%#v}", session.Stat(), err)
return return
} }
if h.client.conf.sessionTimeout.Nanoseconds() < time.Since(session.GetActive()).Nanoseconds() { if h.conn.pool.rpcClient.conf.sessionTimeout.Nanoseconds() < time.Since(session.GetActive()).Nanoseconds() {
log.Warn("session{%s} timeout{%s}, reqNum{%d}", log.Warn("session{%s} timeout{%s}, reqNum{%d}",
session.Stat(), time.Since(session.GetActive()).String(), rpcSession.reqNum) session.Stat(), time.Since(session.GetActive()).String(), rpcSession.reqNum)
h.client.removeSession(session) h.conn.removeSession(session) // -> h.conn.close() -> h.conn.pool.remove(h.conn)
return return
} }
h.client.heartbeat(session) h.conn.pool.rpcClient.heartbeat(session)
} }
package rpc
import (
"fmt"
"math/rand"
"net"
"strings"
"sync"
"time"
)
import (
"github.com/AlexStocks/getty"
log "github.com/AlexStocks/log4go"
jerrors "github.com/juju/errors"
)
type gettyRPCClientConn struct {
once sync.Once
protocol string
addr string
created int64 // 为0,则说明没有被创建或者被销毁了
pool *gettyRPCClientConnPool
lock sync.RWMutex
gettyClient getty.Client
sessions []*rpcSession
}
func newGettyRPCClientConn(pool *gettyRPCClientConnPool, protocol, addr string) (*gettyRPCClientConn, error) {
c := &gettyRPCClientConn{
protocol: protocol,
addr: addr,
pool: pool,
gettyClient: getty.NewTCPClient(
getty.WithServerAddress(addr),
getty.WithConnectionNumber((int)(pool.rpcClient.conf.ConnectionNum)),
),
}
c.gettyClient.RunEventLoop(c.newSession)
idx := 1
for {
idx++
if c.isAvailable() {
break
}
if idx > 5000 {
return nil, jerrors.New(fmt.Sprintf("failed to create client connection to %s in 5 seconds", addr))
}
time.Sleep(1e6)
}
log.Info("client init ok")
c.created = time.Now().Unix()
return c, nil
}
func (c *gettyRPCClientConn) newSession(session getty.Session) error {
var (
ok bool
tcpConn *net.TCPConn
conf *ClientConfig
)
conf = c.pool.rpcClient.conf
if conf.GettySessionParam.CompressEncoding {
session.SetCompressType(getty.CompressZip)
}
if tcpConn, ok = session.Conn().(*net.TCPConn); !ok {
panic(fmt.Sprintf("%s, session.conn{%#v} is not tcp connection\n", session.Stat(), session.Conn()))
}
tcpConn.SetNoDelay(conf.GettySessionParam.TcpNoDelay)
tcpConn.SetKeepAlive(conf.GettySessionParam.TcpKeepAlive)
if conf.GettySessionParam.TcpKeepAlive {
tcpConn.SetKeepAlivePeriod(conf.GettySessionParam.keepAlivePeriod)
}
tcpConn.SetReadBuffer(conf.GettySessionParam.TcpRBufSize)
tcpConn.SetWriteBuffer(conf.GettySessionParam.TcpWBufSize)
session.SetName(conf.GettySessionParam.SessionName)
session.SetMaxMsgLen(conf.GettySessionParam.MaxMsgLen)
session.SetPkgHandler(NewRpcClientPackageHandler())
session.SetEventListener(NewRpcClientHandler(c))
session.SetRQLen(conf.GettySessionParam.PkgRQSize)
session.SetWQLen(conf.GettySessionParam.PkgWQSize)
session.SetReadTimeout(conf.GettySessionParam.tcpReadTimeout)
session.SetWriteTimeout(conf.GettySessionParam.tcpWriteTimeout)
session.SetCronPeriod((int)(conf.heartbeatPeriod.Nanoseconds() / 1e6))
session.SetWaitTime(conf.GettySessionParam.waitTimeout)
log.Debug("client new session:%s\n", session.Stat())
return nil
}
func (c *gettyRPCClientConn) selectSession() getty.Session {
c.lock.RLock()
defer c.lock.RUnlock()
if c.sessions == nil {
return nil
}
count := len(c.sessions)
if count == 0 {
return nil
}
return c.sessions[rand.Int31n(int32(count))].session
}
func (c *gettyRPCClientConn) addSession(session getty.Session) {
log.Debug("add session{%s}", session.Stat())
if session == nil {
return
}
c.lock.Lock()
c.sessions = append(c.sessions, &rpcSession{session: session})
c.lock.Unlock()
}
func (c *gettyRPCClientConn) removeSession(session getty.Session) {
if session == nil {
return
}
c.lock.Lock()
defer c.lock.Unlock()
if c.sessions == nil {
return
}
for i, s := range c.sessions {
if s.session == session {
c.sessions = append(c.sessions[:i], c.sessions[i+1:]...)
log.Debug("delete session{%s}, its index{%d}", session.Stat(), i)
break
}
}
log.Info("after remove session{%s}, left session number:%d", session.Stat(), len(c.sessions))
if len(c.sessions) == 0 {
c.close() // -> pool.remove(c)
}
}
func (c *gettyRPCClientConn) updateSession(session getty.Session) {
if session == nil {
return
}
c.lock.Lock()
defer c.lock.Unlock()
if c.sessions == nil {
return
}
for i, s := range c.sessions {
if s.session == session {
c.sessions[i].reqNum++
break
}
}
}
func (c *gettyRPCClientConn) getClientRpcSession(session getty.Session) (rpcSession, error) {
var (
err error
rpcSession rpcSession
)
c.lock.Lock()
defer c.lock.Unlock()
if c.sessions == nil {
return rpcSession, errClientClosed
}
err = errSessionNotExist
for _, s := range c.sessions {
if s.session == session {
rpcSession = *s
err = nil
break
}
}
return rpcSession, jerrors.Trace(err)
}
func (c *gettyRPCClientConn) isAvailable() bool {
if c.selectSession() == nil {
return false
}
return true
}
func (c *gettyRPCClientConn) close() error {
err := jerrors.Errorf("close gettyRPCClientConn{%#v} again", c)
c.once.Do(func() {
for _, s := range c.sessions {
log.Info("close client session{%s, last active:%s, request number:%d}",
s.session.Stat(), s.session.GetActive().String(), s.reqNum)
s.session.Close()
}
c.gettyClient.Close()
c.gettyClient = nil
c.sessions = c.sessions[:0]
c.created = 0
err = nil
c.pool.remove(c)
})
return err
}
type gettyRPCClientConnPool struct {
rpcClient *Client
size int // []*gettyRPCClientConn数组的size
ttl int64 // 每个gettyRPCClientConn的有效期时间. pool对象会在getConn时执行ttl检查
sync.Mutex
connMap map[string][]*gettyRPCClientConn // 从[]*gettyRPCClientConn 可见key是连接地址,而value是对应这个地址的连接数组
}
func newGettyRPCClientConnPool(rpcClient *Client, size int, ttl time.Duration) *gettyRPCClientConnPool {
return &gettyRPCClientConnPool{
rpcClient: rpcClient,
size: size,
ttl: int64(ttl.Seconds()),
connMap: make(map[string][]*gettyRPCClientConn),
}
}
func (p *gettyRPCClientConnPool) close() {
p.Lock()
connMap := p.connMap
p.connMap = nil
p.Unlock()
for _, connArray := range connMap {
for _, conn := range connArray {
conn.close()
}
}
}
func (p *gettyRPCClientConnPool) getConn(protocol, addr string) (*gettyRPCClientConn, error) {
p.Lock()
var builder strings.Builder
builder.WriteString(addr)
builder.WriteString("@")
builder.WriteString(protocol)
key := builder.String()
connArray := p.connMap[key]
now := time.Now().Unix()
for len(connArray) > 0 {
conn := connArray[len(connArray)-1]
connArray = connArray[:len(connArray)-1]
p.connMap[key] = connArray
if d := now - conn.created; d > p.ttl {
conn.close()
continue
}
p.Unlock()
return conn, nil
}
p.Unlock()
// create new conn
return newGettyRPCClientConn(p, protocol, addr)
}
func (p *gettyRPCClientConnPool) release(conn *gettyRPCClientConn, err error) {
if conn == nil || conn.created == 0 {
return
}
if err != nil {
conn.close()
return
}
var builder strings.Builder
builder.WriteString(conn.addr)
builder.WriteString("@")
builder.WriteString(conn.protocol)
key := builder.String()
p.Lock()
connArray := p.connMap[key]
if len(connArray) >= p.size {
p.Unlock()
conn.close()
return
}
p.connMap[key] = append(connArray, conn)
p.Unlock()
}
func (p *gettyRPCClientConnPool) remove(conn *gettyRPCClientConn) {
if conn == nil || conn.created == 0 {
return
}
var builder strings.Builder
builder.WriteString(conn.addr)
builder.WriteString("@")
builder.WriteString(conn.protocol)
key := builder.String()
p.Lock()
connArray := p.connMap[key]
if len(connArray) > 0 {
for idx, c := range connArray {
if conn == c {
p.connMap[key] = append(connArray[:idx], connArray[idx+1:]...)
break
}
}
}
p.Unlock()
}
...@@ -26,19 +26,21 @@ type Server struct { ...@@ -26,19 +26,21 @@ type Server struct {
conf *ServerConfig conf *ServerConfig
serviceMap map[string]*service serviceMap map[string]*service
tcpServerList []getty.Server tcpServerList []getty.Server
registry gxregistry.Registry
sa gxregistry.ServiceAttr // registry
nodes []*gxregistry.Node registry gxregistry.Registry
sa gxregistry.ServiceAttr
nodes []*gxregistry.Node
} }
var ( var (
ErrIllegalCodecType = jerrors.New("illegal codec type") ErrIllegalSerialType = jerrors.New("illegal codec type")
) )
func NewServer(confFile string) (*Server, error) { func NewServer(confFile string) (*Server, error) {
conf := loadServerConf(confFile) conf := loadServerConf(confFile)
if conf.codecType = String2CodecType(conf.CodecType); conf.codecType == gettyCodecUnknown { if conf.codecType = String2CodecType(conf.CodecType); conf.codecType == gettyCodecUnknown {
return nil, ErrIllegalCodecType return nil, ErrIllegalSerialType
} }
s := &Server{ s := &Server{
...@@ -46,9 +48,9 @@ func NewServer(confFile string) (*Server, error) { ...@@ -46,9 +48,9 @@ func NewServer(confFile string) (*Server, error) {
conf: conf, conf: conf,
} }
var err error
var registry gxregistry.Registry
if len(s.conf.Registry.Addr) != 0 { if len(s.conf.Registry.Addr) != 0 {
var err error
var registry gxregistry.Registry
addrList := strings.Split(s.conf.Registry.Addr, ",") addrList := strings.Split(s.conf.Registry.Addr, ",")
switch s.conf.Registry.Type { switch s.conf.Registry.Type {
case "etcd": case "etcd":
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment