Commit 6fbdcda8 authored by AlexStocks's avatar AlexStocks

Mod: mod conf to test p2p

parent a2eb5128
package rpc package rpc
import ( import (
"fmt"
"math/rand" "math/rand"
"strings" "strings"
"sync" "sync"
...@@ -53,7 +54,10 @@ func NewClient(confFile string) (*Client, error) { ...@@ -53,7 +54,10 @@ func NewClient(confFile string) (*Client, error) {
if len(c.conf.Registry.Addr) == 0 { if len(c.conf.Registry.Addr) == 0 {
if conf.codecType = String2CodecType(conf.CodecType); conf.codecType == gettyCodecUnknown { if conf.codecType = String2CodecType(conf.CodecType); conf.codecType == gettyCodecUnknown {
return nil, ErrIllegalSerialType return nil, jerrors.New(fmt.Sprintf(ErrIllegalConf+"codec type %s", conf.CodecType))
}
if conf.ServerPort == 0 {
return nil, jerrors.New(fmt.Sprintf(ErrIllegalConf + "both registry addr and ServerPort is nil"))
} }
_, err := c.pool.getConn(conf.CodecType, gxnet.HostAddress(conf.ServerHost, conf.ServerPort)) _, err := c.pool.getConn(conf.CodecType, gxnet.HostAddress(conf.ServerHost, conf.ServerPort))
...@@ -77,14 +81,15 @@ func NewClient(confFile string) (*Client, error) { ...@@ -77,14 +81,15 @@ func NewClient(confFile string) (*Client, error) {
gxregistry.WithTimeout(time.Duration(int(time.Second)*c.conf.Registry.KeepaliveTimeout)), gxregistry.WithTimeout(time.Duration(int(time.Second)*c.conf.Registry.KeepaliveTimeout)),
gxregistry.WithRoot(c.conf.Registry.Root), gxregistry.WithRoot(c.conf.Registry.Root),
) )
default:
return nil, jerrors.New(fmt.Sprintf(ErrIllegalConf+"registry type %s", c.conf.Registry.Type))
} }
if err != nil { if err != nil {
return nil, jerrors.Trace(err) return nil, jerrors.Trace(err)
} }
if registry != nil { c.registry = registry
c.registry = registry if c.registry != nil {
c.filter, err = gxpool.NewFilter( c.filter, err = gxpool.NewFilter(
gxfilter.WithBalancerMode(gxfilter.SM_Hash), gxfilter.WithBalancerMode(gxfilter.SM_Hash),
gxfilter.WithRegistry(c.registry), gxfilter.WithRegistry(c.registry),
...@@ -142,8 +147,14 @@ func (c *Client) Call(service, method string, args interface{}, reply interface{ ...@@ -142,8 +147,14 @@ func (c *Client) Call(service, method string, args interface{}, reply interface{
} }
func (c *Client) Close() { func (c *Client) Close() {
c.pool.close() if c.pool != nil {
c.registry.Close() c.pool.close()
}
c.pool = nil
if c.registry != nil {
c.registry.Close()
}
c.registry = nil
} }
func (c *Client) selectSession() getty.Session { func (c *Client) selectSession() getty.Session {
......
...@@ -16,7 +16,9 @@ import ( ...@@ -16,7 +16,9 @@ import (
) )
import ( import (
"github.com/AlexStocks/goext/log"
log "github.com/AlexStocks/log4go" log "github.com/AlexStocks/log4go"
"time"
) )
//////////////////////////////////////////// ////////////////////////////////////////////
...@@ -493,6 +495,8 @@ func (resp *GettyRPCResponse) Unmarshal(sz gettyCodecType, buf *bytes.Buffer) er ...@@ -493,6 +495,8 @@ func (resp *GettyRPCResponse) Unmarshal(sz gettyCodecType, buf *bytes.Buffer) er
} }
func (resp *GettyRPCResponse) GetBody() []byte { func (resp *GettyRPCResponse) GetBody() []byte {
gxlog.CWarn("resp body %p", resp.body)
time.Sleep(5e9)
return resp.body.([]byte) return resp.body.([]byte)
} }
......
...@@ -32,7 +32,7 @@ type ( ...@@ -32,7 +32,7 @@ type (
RegistryConfig struct { RegistryConfig struct {
Type string `default:"etcd" yaml:"type" json:"type,omitempty"` Type string `default:"etcd" yaml:"type" json:"type,omitempty"`
Addr string `default:"127.0.0.1:2379" yaml:"addr" json:"addr,omitempty"` Addr string `default:"" yaml:"addr" json:"addr,omitempty"`
KeepaliveTimeout int `default:"5" yaml:"keepalive_time" json:"keepalive_timeout,omitempty"` KeepaliveTimeout int `default:"5" yaml:"keepalive_time" json:"keepalive_timeout,omitempty"`
Root string `default:"getty" yaml:"keepalive_time" json:"keepalive_timeout,omitempty"` Root string `default:"getty" yaml:"keepalive_time" json:"keepalive_timeout,omitempty"`
IDC string `default:"idc-bj" yaml:"idc" json:"idc,omitempty"` IDC string `default:"idc-bj" yaml:"idc" json:"idc,omitempty"`
......
...@@ -8,11 +8,15 @@ import ( ...@@ -8,11 +8,15 @@ import (
"github.com/AlexStocks/getty/rpc" "github.com/AlexStocks/getty/rpc"
"github.com/AlexStocks/getty/rpc/example/data" "github.com/AlexStocks/getty/rpc/example/data"
log "github.com/AlexStocks/log4go" log "github.com/AlexStocks/log4go"
jerrors "github.com/juju/errors"
) )
func main() { func main() {
log.LoadConfiguration("client_log.xml") log.LoadConfiguration("client_log.xml")
client := rpc.NewClient("client_config.toml") client, err := rpc.NewClient("client_config.toml")
if err != nil {
panic(jerrors.ErrorStack(err))
}
// client.SetCodecType(rpc.ProtoBuffer)//默认是json序列化 // client.SetCodecType(rpc.ProtoBuffer)//默认是json序列化
defer client.Close() defer client.Close()
...@@ -41,9 +45,9 @@ func main() { ...@@ -41,9 +45,9 @@ func main() {
} }
var errInt int var errInt int
err := client.Call("TestRpc", "Err", 2, &errInt) err = client.Call("TestRpc", "Err", 2, &errInt)
if err != nil { if err != nil {
log.Error(err) log.Error(jerrors.ErrorStack(err))
} }
time.Sleep(20 * time.Second) time.Sleep(20 * time.Second)
......
...@@ -39,4 +39,13 @@ FailFastTimeout = "3s" ...@@ -39,4 +39,13 @@ FailFastTimeout = "3s"
TcpWriteTimeout = "5s" TcpWriteTimeout = "5s"
WaitTimeout = "1s" WaitTimeout = "1s"
MaxMsgLen = 128 MaxMsgLen = 128
SessionName = "rpc-client" SessionName = "getty-rpc-client"
# registry
[Registry]
Type = "etcd"
# Addr = "127.0.0.1:2379"
KeepaliveTimeout = 5
Root = "/getty"
IDC = "bj-unicom"
NodeID = "n147"
...@@ -32,12 +32,12 @@ FailFastTimeout = "3s" ...@@ -32,12 +32,12 @@ FailFastTimeout = "3s"
TcpWriteTimeout = "5s" TcpWriteTimeout = "5s"
WaitTimeout = "1s" WaitTimeout = "1s"
MaxMsgLen = 128 MaxMsgLen = 128
SessionName = "rpc-server" SessionName = "getty-rpc-server"
# registry # registry
[Registry] [Registry]
Type = "etcd" Type = "etcd"
Addr = "127.0.0.1:2379" # Addr = "127.0.0.1:2379"
KeepaliveTimeout = 5 KeepaliveTimeout = 5
Root = "/getty" Root = "/getty"
IDC = "bj-unicom" IDC = "bj-unicom"
......
...@@ -28,6 +28,10 @@ type gettyRPCClientConn struct { ...@@ -28,6 +28,10 @@ type gettyRPCClientConn struct {
sessions []*rpcSession sessions []*rpcSession
} }
var (
errClientPoolClosed = jerrors.New("client pool closed")
)
func newGettyRPCClientConn(pool *gettyRPCClientConnPool, protocol, addr string) (*gettyRPCClientConn, error) { func newGettyRPCClientConn(pool *gettyRPCClientConnPool, protocol, addr string) (*gettyRPCClientConn, error) {
c := &gettyRPCClientConn{ c := &gettyRPCClientConn{
protocol: protocol, protocol: protocol,
...@@ -198,6 +202,8 @@ func (c *gettyRPCClientConn) isAvailable() bool { ...@@ -198,6 +202,8 @@ func (c *gettyRPCClientConn) isAvailable() bool {
func (c *gettyRPCClientConn) close() error { func (c *gettyRPCClientConn) close() error {
err := jerrors.Errorf("close gettyRPCClientConn{%#v} again", c) err := jerrors.Errorf("close gettyRPCClientConn{%#v} again", c)
c.once.Do(func() { c.once.Do(func() {
// delete @c from client pool
c.pool.remove(c)
for _, s := range c.sessions { for _, s := range c.sessions {
log.Info("close client session{%s, last active:%s, request number:%d}", log.Info("close client session{%s, last active:%s, request number:%d}",
s.session.Stat(), s.session.GetActive().String(), s.reqNum) s.session.Stat(), s.session.GetActive().String(), s.reqNum)
...@@ -209,7 +215,6 @@ func (c *gettyRPCClientConn) close() error { ...@@ -209,7 +215,6 @@ func (c *gettyRPCClientConn) close() error {
c.created = 0 c.created = 0
err = nil err = nil
c.pool.remove(c)
}) })
return err return err
} }
...@@ -245,7 +250,6 @@ func (p *gettyRPCClientConnPool) close() { ...@@ -245,7 +250,6 @@ func (p *gettyRPCClientConnPool) close() {
} }
func (p *gettyRPCClientConnPool) getConn(protocol, addr string) (*gettyRPCClientConn, error) { func (p *gettyRPCClientConnPool) getConn(protocol, addr string) (*gettyRPCClientConn, error) {
p.Lock()
var builder strings.Builder var builder strings.Builder
builder.WriteString(addr) builder.WriteString(addr)
...@@ -254,6 +258,12 @@ func (p *gettyRPCClientConnPool) getConn(protocol, addr string) (*gettyRPCClient ...@@ -254,6 +258,12 @@ func (p *gettyRPCClientConnPool) getConn(protocol, addr string) (*gettyRPCClient
key := builder.String() key := builder.String()
p.Lock()
defer p.Unlock()
if p.connMap == nil {
return nil, errClientPoolClosed
}
connArray := p.connMap[key] connArray := p.connMap[key]
now := time.Now().Unix() now := time.Now().Unix()
...@@ -263,7 +273,7 @@ func (p *gettyRPCClientConnPool) getConn(protocol, addr string) (*gettyRPCClient ...@@ -263,7 +273,7 @@ func (p *gettyRPCClientConnPool) getConn(protocol, addr string) (*gettyRPCClient
p.connMap[key] = connArray p.connMap[key] = connArray
if d := now - conn.created; d > p.ttl { if d := now - conn.created; d > p.ttl {
conn.close() conn.close() // -> pool.remove(c)
continue continue
} }
...@@ -272,8 +282,6 @@ func (p *gettyRPCClientConnPool) getConn(protocol, addr string) (*gettyRPCClient ...@@ -272,8 +282,6 @@ func (p *gettyRPCClientConnPool) getConn(protocol, addr string) (*gettyRPCClient
return conn, nil return conn, nil
} }
p.Unlock()
// create new conn // create new conn
return newGettyRPCClientConn(p, protocol, addr) return newGettyRPCClientConn(p, protocol, addr)
} }
...@@ -283,7 +291,7 @@ func (p *gettyRPCClientConnPool) release(conn *gettyRPCClientConn, err error) { ...@@ -283,7 +291,7 @@ func (p *gettyRPCClientConnPool) release(conn *gettyRPCClientConn, err error) {
return return
} }
if err != nil { if err != nil {
conn.close() conn.close() //
return return
} }
...@@ -296,6 +304,11 @@ func (p *gettyRPCClientConnPool) release(conn *gettyRPCClientConn, err error) { ...@@ -296,6 +304,11 @@ func (p *gettyRPCClientConnPool) release(conn *gettyRPCClientConn, err error) {
key := builder.String() key := builder.String()
p.Lock() p.Lock()
defer p.Unlock()
if p.connMap == nil {
return
}
connArray := p.connMap[key] connArray := p.connMap[key]
if len(connArray) >= p.size { if len(connArray) >= p.size {
p.Unlock() p.Unlock()
...@@ -303,7 +316,6 @@ func (p *gettyRPCClientConnPool) release(conn *gettyRPCClientConn, err error) { ...@@ -303,7 +316,6 @@ func (p *gettyRPCClientConnPool) release(conn *gettyRPCClientConn, err error) {
return return
} }
p.connMap[key] = append(connArray, conn) p.connMap[key] = append(connArray, conn)
p.Unlock()
} }
func (p *gettyRPCClientConnPool) remove(conn *gettyRPCClientConn) { func (p *gettyRPCClientConnPool) remove(conn *gettyRPCClientConn) {
...@@ -320,6 +332,11 @@ func (p *gettyRPCClientConnPool) remove(conn *gettyRPCClientConn) { ...@@ -320,6 +332,11 @@ func (p *gettyRPCClientConnPool) remove(conn *gettyRPCClientConn) {
key := builder.String() key := builder.String()
p.Lock() p.Lock()
defer p.Unlock()
if p.connMap == nil {
return
}
connArray := p.connMap[key] connArray := p.connMap[key]
if len(connArray) > 0 { if len(connArray) > 0 {
for idx, c := range connArray { for idx, c := range connArray {
...@@ -329,5 +346,4 @@ func (p *gettyRPCClientConnPool) remove(conn *gettyRPCClientConn) { ...@@ -329,5 +346,4 @@ func (p *gettyRPCClientConnPool) remove(conn *gettyRPCClientConn) {
} }
} }
} }
p.Unlock()
} }
...@@ -34,13 +34,13 @@ type Server struct { ...@@ -34,13 +34,13 @@ type Server struct {
} }
var ( var (
ErrIllegalSerialType = jerrors.New("illegal codec type") ErrIllegalConf = "illegal conf: "
) )
func NewServer(confFile string) (*Server, error) { func NewServer(confFile string) (*Server, error) {
conf := loadServerConf(confFile) conf := loadServerConf(confFile)
if conf.codecType = String2CodecType(conf.CodecType); conf.codecType == gettyCodecUnknown { if conf.codecType = String2CodecType(conf.CodecType); conf.codecType == gettyCodecUnknown {
return nil, ErrIllegalSerialType return nil, jerrors.New(fmt.Sprintf(ErrIllegalConf+"codec type %s", conf.CodecType))
} }
s := &Server{ s := &Server{
...@@ -65,13 +65,15 @@ func NewServer(confFile string) (*Server, error) { ...@@ -65,13 +65,15 @@ func NewServer(confFile string) (*Server, error) {
gxregistry.WithTimeout(time.Duration(int(time.Second)*s.conf.Registry.KeepaliveTimeout)), gxregistry.WithTimeout(time.Duration(int(time.Second)*s.conf.Registry.KeepaliveTimeout)),
gxregistry.WithRoot(s.conf.Registry.Root), gxregistry.WithRoot(s.conf.Registry.Root),
) )
default:
return nil, jerrors.New(fmt.Sprintf(ErrIllegalConf+"registry type %s", s.conf.Registry.Type))
} }
if err != nil { if err != nil {
return nil, jerrors.Trace(err) return nil, jerrors.Trace(err)
} }
if registry != nil { s.registry = registry
s.registry = registry if s.registry != nil {
s.sa = gxregistry.ServiceAttr{ s.sa = gxregistry.ServiceAttr{
Group: s.conf.Registry.IDC, Group: s.conf.Registry.IDC,
Role: gxregistry.SRT_Provider, Role: gxregistry.SRT_Provider,
...@@ -88,7 +90,9 @@ func NewServer(confFile string) (*Server, error) { ...@@ -88,7 +90,9 @@ func NewServer(confFile string) (*Server, error) {
&gxregistry.Node{ &gxregistry.Node{
ID: s.conf.Registry.NodeID + "-" + net.JoinHostPort(s.conf.Host, p), ID: s.conf.Registry.NodeID + "-" + net.JoinHostPort(s.conf.Host, p),
Address: s.conf.Host, Address: s.conf.Host,
Port: int32(port)}) Port: int32(port),
},
)
} }
} }
} }
...@@ -212,8 +216,15 @@ func (s *Server) Init() { ...@@ -212,8 +216,15 @@ func (s *Server) Init() {
} }
func (s *Server) Stop() { func (s *Server) Stop() {
for _, tcpServer := range s.tcpServerList { if s.registry != nil {
tcpServer.Close() s.registry.Close()
}
list := s.tcpServerList
s.tcpServerList = nil
if list != nil {
for _, tcpServer := range list {
tcpServer.Close()
}
} }
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment