Commit a7eaa4af authored by AlexStocks's avatar AlexStocks

Imp: add meta of ServiceMetaConfig for load balance alg

parent f82737ca
...@@ -383,7 +383,6 @@ func (c *client) RunEventLoop(newSession NewSessionCallback) { ...@@ -383,7 +383,6 @@ func (c *client) RunEventLoop(newSession NewSessionCallback) {
c.newSession = newSession c.newSession = newSession
c.Unlock() c.Unlock()
log.Info("run")
c.wg.Add(1) c.wg.Add(1)
// a for-loop goroutine to make sure the connection is valid // a for-loop goroutine to make sure the connection is valid
go func() { go func() {
......
File moved
...@@ -22,19 +22,42 @@ import ( ...@@ -22,19 +22,42 @@ import (
"github.com/AlexStocks/getty/rpc" "github.com/AlexStocks/getty/rpc"
) )
type ClientOption func(*ClientOptions) ////////////////////////////////
// Options
////////////////////////////////
type ClientOptions struct { type ClientOptions struct {
hash gxfilter.ServiceHash hash gxfilter.ServiceHash
rpc.CallOptions
} }
type ClientOption func(*ClientOptions)
func WithServiceHash(hash gxfilter.ServiceHash) ClientOption { func WithServiceHash(hash gxfilter.ServiceHash) ClientOption {
return func(o *ClientOptions) { return func(o *ClientOptions) {
o.hash = hash o.hash = hash
} }
} }
////////////////////////////////
// meta data
////////////////////////////////
const (
DefaultMetaKey = "getty-micro-meta-key"
)
func GetServiceNodeMetadata(service *gxregistry.Service) string {
if service != nil && len(service.Nodes) == 1 && service.Nodes[0].Metadata != nil {
return service.Nodes[0].Metadata[DefaultMetaKey]
}
return ""
}
////////////////////////////////
// Client
////////////////////////////////
type Client struct { type Client struct {
ClientOptions ClientOptions
*rpc.Client *rpc.Client
...@@ -132,8 +155,8 @@ func NewClient(conf *rpc.ClientConfig, regConf *ConsumerRegistryConfig, opts ... ...@@ -132,8 +155,8 @@ func NewClient(conf *rpc.ClientConfig, regConf *ConsumerRegistryConfig, opts ...
return clt, nil return clt, nil
} }
func (c *Client) Call(ctx context.Context, typ rpc.CodecType, func (c *Client) Call(ctx context.Context, typ rpc.CodecType, service, version, method string,
service, version, method string, args interface{}, reply interface{}) error { args interface{}, reply interface{}, opts ...rpc.CallOption) error {
attr := c.attr attr := c.attr
attr.Service = service attr.Service = service
...@@ -170,7 +193,7 @@ func (c *Client) Call(ctx context.Context, typ rpc.CodecType, ...@@ -170,7 +193,7 @@ func (c *Client) Call(ctx context.Context, typ rpc.CodecType,
addr := gxnet.HostAddress(svc.Nodes[0].Address, int(svc.Nodes[0].Port)) addr := gxnet.HostAddress(svc.Nodes[0].Address, int(svc.Nodes[0].Port))
return jerrors.Trace(c.Client.Call(typ, addr, service, method, args, reply)) return jerrors.Trace(c.Client.Call(typ, addr, service, method, args, reply, opts...))
} }
func (c *Client) Close() { func (c *Client) Close() {
......
...@@ -22,6 +22,7 @@ type ServiceConfig struct { ...@@ -22,6 +22,7 @@ type ServiceConfig struct {
Protocol string `default:"json" yaml:"protocol" json:"protocol,omitempty"` Protocol string `default:"json" yaml:"protocol" json:"protocol,omitempty"`
Service string `default:"test" yaml:"service" json:"service,omitempty"` Service string `default:"test" yaml:"service" json:"service,omitempty"`
Version string `default:"v1" yaml:"version" json:"version,omitempty"` Version string `default:"v1" yaml:"version" json:"version,omitempty"`
Meta string `default:"default-meta" yaml:"meta" json:"meta,omitempty"`
} }
// CheckValidity check parameter validity // CheckValidity check parameter validity
......
package micro package micro
import ( import (
"github.com/AlexStocks/goext/net"
"github.com/AlexStocks/goext/strings"
"net" "net"
"strconv" "strconv"
"strings" "strings"
...@@ -10,11 +8,16 @@ import ( ...@@ -10,11 +8,16 @@ import (
) )
import ( import (
jerrors "github.com/juju/errors"
)
import (
"github.com/AlexStocks/getty/rpc" "github.com/AlexStocks/getty/rpc"
"github.com/AlexStocks/goext/database/registry" "github.com/AlexStocks/goext/database/registry"
"github.com/AlexStocks/goext/database/registry/etcdv3" "github.com/AlexStocks/goext/database/registry/etcdv3"
"github.com/AlexStocks/goext/database/registry/zookeeper" "github.com/AlexStocks/goext/database/registry/zookeeper"
jerrors "github.com/juju/errors" "github.com/AlexStocks/goext/net"
"github.com/AlexStocks/goext/strings"
) )
// Server micro service provider // Server micro service provider
...@@ -23,7 +26,6 @@ type Server struct { ...@@ -23,7 +26,6 @@ type Server struct {
// registry // registry
regConf ProviderRegistryConfig regConf ProviderRegistryConfig
registry gxregistry.Registry registry gxregistry.Registry
nodes []*gxregistry.Node
} }
// NewServer initialize a micro service provider // NewServer initialize a micro service provider
...@@ -108,14 +110,17 @@ func (s *Server) Register(rcvr rpc.GettyRPCService) error { ...@@ -108,14 +110,17 @@ func (s *Server) Register(rcvr rpc.GettyRPCService) error {
attr.Group = c.Group attr.Group = c.Group
attr.Protocol = c.Protocol attr.Protocol = c.Protocol
node := &gxregistry.Node{
ID: c.NodeID,
Address: c.LocalHost,
Port: int32(c.LocalPort),
}
if len(c.Meta) != 0 {
node.Metadata = map[string]string{DefaultMetaKey: c.Meta}
}
service := gxregistry.Service{Attr: &attr} service := gxregistry.Service{Attr: &attr}
service.Nodes = append(service.Nodes, service.Nodes = append(service.Nodes, node)
&gxregistry.Node{
ID: c.NodeID,
Address: c.LocalHost,
Port: int32(c.LocalPort),
},
)
if err := s.registry.Register(service); err != nil { if err := s.registry.Register(service); err != nil {
return jerrors.Trace(err) return jerrors.Trace(err)
} }
......
...@@ -34,19 +34,19 @@ type CallOptions struct { ...@@ -34,19 +34,19 @@ type CallOptions struct {
type CallOption func(*CallOptions) type CallOption func(*CallOptions)
func CallRequestTimeout(d time.Duration) CallOption { func WithCallRequestTimeout(d time.Duration) CallOption {
return func(o *CallOptions) { return func(o *CallOptions) {
o.RequestTimeout = d o.RequestTimeout = d
} }
} }
func CallResponseTimeout(d time.Duration) CallOption { func WithCallResponseTimeout(d time.Duration) CallOption {
return func(o *CallOptions) { return func(o *CallOptions) {
o.ResponseTimeout = d o.ResponseTimeout = d
} }
} }
func CallMeta(k, v interface{}) CallOption { func WithCallMeta(k, v interface{}) CallOption {
return func(o *CallOptions) { return func(o *CallOptions) {
if o.Meta == nil { if o.Meta == nil {
o.Meta = make(map[interface{}]interface{}) o.Meta = make(map[interface{}]interface{})
...@@ -55,14 +55,15 @@ func CallMeta(k, v interface{}) CallOption { ...@@ -55,14 +55,15 @@ func CallMeta(k, v interface{}) CallOption {
} }
} }
type AsyncResponse struct { type CallResponse struct {
Opts CallOptions Opts CallOptions
Cause error Cause error
Start time.Time Start time.Time // invoke(call) start time == write start time
Reply interface{} ReadStart time.Time // read start time, write duration = ReadStart - Start
Reply interface{}
} }
type AsyncCallback func(response AsyncResponse) type AsyncCallback func(response CallResponse)
type Client struct { type Client struct {
conf ClientConfig conf ClientConfig
...@@ -221,6 +222,7 @@ func (c *Client) transfer(session getty.Session, typ CodecType, req *GettyRPCReq ...@@ -221,6 +222,7 @@ func (c *Client) transfer(session getty.Session, typ CodecType, req *GettyRPCReq
pkg.B = req pkg.B = req
} }
// cond1
if rsp != nil { if rsp != nil {
rsp.seq = sequence rsp.seq = sequence
c.addPendingResponse(rsp) c.addPendingResponse(rsp)
...@@ -229,6 +231,10 @@ func (c *Client) transfer(session getty.Session, typ CodecType, req *GettyRPCReq ...@@ -229,6 +231,10 @@ func (c *Client) transfer(session getty.Session, typ CodecType, req *GettyRPCReq
err = session.WritePkg(pkg, opts.RequestTimeout) err = session.WritePkg(pkg, opts.RequestTimeout)
if err != nil { if err != nil {
c.removePendingResponse(SequenceType(rsp.seq)) c.removePendingResponse(SequenceType(rsp.seq))
} else if rsp != nil { // cond2
// cond2 should not merged with cond1. cause the response package may be returned very
// soon and it will be handled by other goroutine.
rsp.readStart = time.Now()
} }
return jerrors.Trace(err) return jerrors.Trace(err)
......
...@@ -508,13 +508,14 @@ func (resp *GettyRPCResponse) GetHeader() interface{} { ...@@ -508,13 +508,14 @@ func (resp *GettyRPCResponse) GetHeader() interface{} {
//////////////////////////////////////////// ////////////////////////////////////////////
type PendingResponse struct { type PendingResponse struct {
seq uint64 seq uint64
err error err error
start time.Time start time.Time
callback AsyncCallback readStart time.Time
reply interface{} callback AsyncCallback
opts CallOptions reply interface{}
done chan struct{} opts CallOptions
done chan struct{}
} }
func NewPendingResponse() *PendingResponse { func NewPendingResponse() *PendingResponse {
...@@ -524,11 +525,12 @@ func NewPendingResponse() *PendingResponse { ...@@ -524,11 +525,12 @@ func NewPendingResponse() *PendingResponse {
} }
} }
func (r PendingResponse) GenAsyncResponse() AsyncResponse { func (r PendingResponse) GetCallResponse() CallResponse {
return AsyncResponse{ return CallResponse{
Opts: r.opts, Opts: r.opts,
Cause: r.err, Cause: r.err,
Start: r.start, Start: r.start,
Reply: r.reply, ReadStart: r.readStart,
Reply: r.reply,
} }
} }
...@@ -220,7 +220,7 @@ func (h *RpcClientHandler) OnMessage(session getty.Session, pkg interface{}) { ...@@ -220,7 +220,7 @@ func (h *RpcClientHandler) OnMessage(session getty.Session, pkg interface{}) {
if pendingResponse.callback == nil { if pendingResponse.callback == nil {
pendingResponse.done <- struct{}{} pendingResponse.done <- struct{}{}
} else { } else {
pendingResponse.callback(pendingResponse.GenAsyncResponse()) pendingResponse.callback(pendingResponse.GetCallResponse())
} }
return return
} }
...@@ -235,7 +235,7 @@ func (h *RpcClientHandler) OnMessage(session getty.Session, pkg interface{}) { ...@@ -235,7 +235,7 @@ func (h *RpcClientHandler) OnMessage(session getty.Session, pkg interface{}) {
if pendingResponse.callback == nil { if pendingResponse.callback == nil {
pendingResponse.done <- struct{}{} pendingResponse.done <- struct{}{}
} else { } else {
pendingResponse.callback(pendingResponse.GenAsyncResponse()) pendingResponse.callback(pendingResponse.GetCallResponse())
} }
} }
......
...@@ -257,8 +257,8 @@ func (s *session) SetRQLen(readQLen int) { ...@@ -257,8 +257,8 @@ func (s *session) SetRQLen(readQLen int) {
s.lock.Lock() s.lock.Lock()
s.rQ = make(chan interface{}, readQLen) s.rQ = make(chan interface{}, readQLen)
log.Info("%s, [session.SetRQLen] rQ{len:%d, cap:%d}", s.Stat(), len(s.rQ), cap(s.rQ))
s.lock.Unlock() s.lock.Unlock()
log.Debug("%s, [session.SetRQLen] rQ{len:%d, cap:%d}", s.Stat(), len(s.rQ), cap(s.rQ))
} }
// set @session's Write queue size // set @session's Write queue size
...@@ -269,8 +269,8 @@ func (s *session) SetWQLen(writeQLen int) { ...@@ -269,8 +269,8 @@ func (s *session) SetWQLen(writeQLen int) {
s.lock.Lock() s.lock.Lock()
s.wQ = make(chan interface{}, writeQLen) s.wQ = make(chan interface{}, writeQLen)
log.Info("%s, [session.SetWQLen] wQ{len:%d, cap:%d}", s.Stat(), len(s.wQ), cap(s.wQ))
s.lock.Unlock() s.lock.Unlock()
log.Debug("%s, [session.SetWQLen] wQ{len:%d, cap:%d}", s.Stat(), len(s.wQ), cap(s.wQ))
} }
// set maximum wait time when session got error or got exit signal // set maximum wait time when session got error or got exit signal
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment