Commit 9785c0d0 authored by AlexStocks's avatar AlexStocks

Mod: gettyRPCClientConn -> gettyRPCClient, gettyRPCClientConnPool -> gettyRPCClientPool

parent fc8c917d
...@@ -26,7 +26,7 @@ func init() { ...@@ -26,7 +26,7 @@ func init() {
type Client struct { type Client struct {
conf ClientConfig conf ClientConfig
pool *gettyRPCClientConnPool pool *gettyRPCClientPool
sequence gxatomic.Uint64 sequence gxatomic.Uint64
pendingLock sync.RWMutex pendingLock sync.RWMutex
...@@ -67,7 +67,7 @@ func (c *Client) Call(typ CodecType, addr, service, method string, args interfac ...@@ -67,7 +67,7 @@ func (c *Client) Call(typ CodecType, addr, service, method string, args interfac
var ( var (
err error err error
session getty.Session session getty.Session
conn *gettyRPCClientConn conn *gettyRPCClient
) )
conn, session, err = c.selectSession(typ, addr) conn, session, err = c.selectSession(typ, addr)
if err != nil || session == nil { if err != nil || session == nil {
...@@ -97,7 +97,7 @@ func (c *Client) Close() { ...@@ -97,7 +97,7 @@ func (c *Client) Close() {
c.pool = nil c.pool = nil
} }
func (c *Client) selectSession(typ CodecType, addr string) (*gettyRPCClientConn, getty.Session, error) { func (c *Client) selectSession(typ CodecType, addr string) (*gettyRPCClient, getty.Session, error) {
rpcConn, err := c.pool.getConn(typ.String(), addr) rpcConn, err := c.pool.getConn(typ.String(), addr)
if err != nil { if err != nil {
return nil, nil, jerrors.Trace(err) return nil, nil, jerrors.Trace(err)
......
...@@ -169,10 +169,10 @@ func (h *RpcServerHandler) callService(session getty.Session, req GettyRPCReques ...@@ -169,10 +169,10 @@ func (h *RpcServerHandler) callService(session getty.Session, req GettyRPCReques
//////////////////////////////////////////// ////////////////////////////////////////////
type RpcClientHandler struct { type RpcClientHandler struct {
conn *gettyRPCClientConn conn *gettyRPCClient
} }
func NewRpcClientHandler(client *gettyRPCClientConn) *RpcClientHandler { func NewRpcClientHandler(client *gettyRPCClient) *RpcClientHandler {
return &RpcClientHandler{conn: client} return &RpcClientHandler{conn: client}
} }
......
...@@ -15,13 +15,13 @@ import ( ...@@ -15,13 +15,13 @@ import (
jerrors "github.com/juju/errors" jerrors "github.com/juju/errors"
) )
type gettyRPCClientConn struct { type gettyRPCClient struct {
once sync.Once once sync.Once
protocol string protocol string
addr string addr string
created int64 // 为0,则说明没有被创建或者被销毁了 created int64 // 为0,则说明没有被创建或者被销毁了
pool *gettyRPCClientConnPool pool *gettyRPCClientPool
lock sync.RWMutex lock sync.RWMutex
gettyClient getty.Client gettyClient getty.Client
...@@ -32,8 +32,8 @@ var ( ...@@ -32,8 +32,8 @@ var (
errClientPoolClosed = jerrors.New("client pool closed") errClientPoolClosed = jerrors.New("client pool closed")
) )
func newGettyRPCClientConn(pool *gettyRPCClientConnPool, protocol, addr string) (*gettyRPCClientConn, error) { func newGettyRPCClientConn(pool *gettyRPCClientPool, protocol, addr string) (*gettyRPCClient, error) {
c := &gettyRPCClientConn{ c := &gettyRPCClient{
protocol: protocol, protocol: protocol,
addr: addr, addr: addr,
pool: pool, pool: pool,
...@@ -61,7 +61,7 @@ func newGettyRPCClientConn(pool *gettyRPCClientConnPool, protocol, addr string) ...@@ -61,7 +61,7 @@ func newGettyRPCClientConn(pool *gettyRPCClientConnPool, protocol, addr string)
return c, nil return c, nil
} }
func (c *gettyRPCClientConn) newSession(session getty.Session) error { func (c *gettyRPCClient) newSession(session getty.Session) error {
var ( var (
ok bool ok bool
tcpConn *net.TCPConn tcpConn *net.TCPConn
...@@ -100,7 +100,7 @@ func (c *gettyRPCClientConn) newSession(session getty.Session) error { ...@@ -100,7 +100,7 @@ func (c *gettyRPCClientConn) newSession(session getty.Session) error {
return nil return nil
} }
func (c *gettyRPCClientConn) selectSession() getty.Session { func (c *gettyRPCClient) selectSession() getty.Session {
c.lock.RLock() c.lock.RLock()
defer c.lock.RUnlock() defer c.lock.RUnlock()
...@@ -115,7 +115,7 @@ func (c *gettyRPCClientConn) selectSession() getty.Session { ...@@ -115,7 +115,7 @@ func (c *gettyRPCClientConn) selectSession() getty.Session {
return c.sessions[rand.Int31n(int32(count))].session return c.sessions[rand.Int31n(int32(count))].session
} }
func (c *gettyRPCClientConn) addSession(session getty.Session) { func (c *gettyRPCClient) addSession(session getty.Session) {
log.Debug("add session{%s}", session.Stat()) log.Debug("add session{%s}", session.Stat())
if session == nil { if session == nil {
return return
...@@ -126,7 +126,7 @@ func (c *gettyRPCClientConn) addSession(session getty.Session) { ...@@ -126,7 +126,7 @@ func (c *gettyRPCClientConn) addSession(session getty.Session) {
c.lock.Unlock() c.lock.Unlock()
} }
func (c *gettyRPCClientConn) removeSession(session getty.Session) { func (c *gettyRPCClient) removeSession(session getty.Session) {
if session == nil { if session == nil {
return return
} }
...@@ -150,7 +150,7 @@ func (c *gettyRPCClientConn) removeSession(session getty.Session) { ...@@ -150,7 +150,7 @@ func (c *gettyRPCClientConn) removeSession(session getty.Session) {
} }
} }
func (c *gettyRPCClientConn) updateSession(session getty.Session) { func (c *gettyRPCClient) updateSession(session getty.Session) {
if session == nil { if session == nil {
return return
} }
...@@ -168,7 +168,7 @@ func (c *gettyRPCClientConn) updateSession(session getty.Session) { ...@@ -168,7 +168,7 @@ func (c *gettyRPCClientConn) updateSession(session getty.Session) {
} }
} }
func (c *gettyRPCClientConn) getClientRpcSession(session getty.Session) (rpcSession, error) { func (c *gettyRPCClient) getClientRpcSession(session getty.Session) (rpcSession, error) {
var ( var (
err error err error
rpcSession rpcSession rpcSession rpcSession
...@@ -191,7 +191,7 @@ func (c *gettyRPCClientConn) getClientRpcSession(session getty.Session) (rpcSess ...@@ -191,7 +191,7 @@ func (c *gettyRPCClientConn) getClientRpcSession(session getty.Session) (rpcSess
return rpcSession, jerrors.Trace(err) return rpcSession, jerrors.Trace(err)
} }
func (c *gettyRPCClientConn) isAvailable() bool { func (c *gettyRPCClient) isAvailable() bool {
if c.selectSession() == nil { if c.selectSession() == nil {
return false return false
} }
...@@ -199,8 +199,8 @@ func (c *gettyRPCClientConn) isAvailable() bool { ...@@ -199,8 +199,8 @@ func (c *gettyRPCClientConn) isAvailable() bool {
return true return true
} }
func (c *gettyRPCClientConn) close() error { func (c *gettyRPCClient) close() error {
err := jerrors.Errorf("close gettyRPCClientConn{%#v} again", c) err := jerrors.Errorf("close gettyRPCClient{%#v} again", c)
c.once.Do(func() { c.once.Do(func() {
// delete @c from client pool // delete @c from client pool
c.pool.remove(c) c.pool.remove(c)
...@@ -219,25 +219,25 @@ func (c *gettyRPCClientConn) close() error { ...@@ -219,25 +219,25 @@ func (c *gettyRPCClientConn) close() error {
return err return err
} }
type gettyRPCClientConnPool struct { type gettyRPCClientPool struct {
rpcClient *Client rpcClient *Client
size int // []*gettyRPCClientConn数组的size size int // []*gettyRPCClient数组的size
ttl int64 // 每个gettyRPCClientConn的有效期时间. pool对象会在getConn时执行ttl检查 ttl int64 // 每个gettyRPCClient的有效期时间. pool对象会在getConn时执行ttl检查
sync.Mutex sync.Mutex
connMap map[string][]*gettyRPCClientConn // 从[]*gettyRPCClientConn 可见key是连接地址,而value是对应这个地址的连接数组 connMap map[string][]*gettyRPCClient // 从[]*gettyRPCClient 可见key是连接地址,而value是对应这个地址的连接数组
} }
func newGettyRPCClientConnPool(rpcClient *Client, size int, ttl time.Duration) *gettyRPCClientConnPool { func newGettyRPCClientConnPool(rpcClient *Client, size int, ttl time.Duration) *gettyRPCClientPool {
return &gettyRPCClientConnPool{ return &gettyRPCClientPool{
rpcClient: rpcClient, rpcClient: rpcClient,
size: size, size: size,
ttl: int64(ttl.Seconds()), ttl: int64(ttl.Seconds()),
connMap: make(map[string][]*gettyRPCClientConn), connMap: make(map[string][]*gettyRPCClient),
} }
} }
func (p *gettyRPCClientConnPool) close() { func (p *gettyRPCClientPool) close() {
p.Lock() p.Lock()
connMap := p.connMap connMap := p.connMap
p.connMap = nil p.connMap = nil
...@@ -249,7 +249,7 @@ func (p *gettyRPCClientConnPool) close() { ...@@ -249,7 +249,7 @@ func (p *gettyRPCClientConnPool) close() {
} }
} }
func (p *gettyRPCClientConnPool) getConn(protocol, addr string) (*gettyRPCClientConn, error) { func (p *gettyRPCClientPool) getConn(protocol, addr string) (*gettyRPCClient, error) {
var builder strings.Builder var builder strings.Builder
builder.WriteString(addr) builder.WriteString(addr)
...@@ -284,7 +284,7 @@ func (p *gettyRPCClientConnPool) getConn(protocol, addr string) (*gettyRPCClient ...@@ -284,7 +284,7 @@ func (p *gettyRPCClientConnPool) getConn(protocol, addr string) (*gettyRPCClient
return newGettyRPCClientConn(p, protocol, addr) return newGettyRPCClientConn(p, protocol, addr)
} }
func (p *gettyRPCClientConnPool) release(conn *gettyRPCClientConn, err error) { func (p *gettyRPCClientPool) release(conn *gettyRPCClient, err error) {
if conn == nil || conn.created == 0 { if conn == nil || conn.created == 0 {
return return
} }
...@@ -316,7 +316,7 @@ func (p *gettyRPCClientConnPool) release(conn *gettyRPCClientConn, err error) { ...@@ -316,7 +316,7 @@ func (p *gettyRPCClientConnPool) release(conn *gettyRPCClientConn, err error) {
p.connMap[key] = append(connArray, conn) p.connMap[key] = append(connArray, conn)
} }
func (p *gettyRPCClientConnPool) remove(conn *gettyRPCClientConn) { func (p *gettyRPCClientPool) remove(conn *gettyRPCClient) {
if conn == nil || conn.created == 0 { if conn == nil || conn.created == 0 {
return return
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment