Unverified Commit 533281db authored by fangyincheng's avatar fangyincheng Committed by GitHub

Merge pull request #1 from fangyincheng/master

Mod:AlexStocks/log4go->dubbogo/log4go
parents 1ca64ac5 84d1c961
...@@ -4,7 +4,7 @@ ...@@ -4,7 +4,7 @@
## INTRO ## INTRO
Getty is a asynchronous network I/O library in golang. Getty is based on "ngo" whose author is [sanbit](https://github.com/sanbit). Getty works on tcp/udp/websocket network protocol and supplies [a uniform interface](https://github.com/alexstocks/getty/blob/master/getty.go#L45). Getty is a asynchronous network I/O library in golang. Getty is based on "ngo" whose author is [sanbit](https://github.com/sanbit). Getty works on tcp/udp/websocket network protocol and supplies [a uniform interface](https://github.com/dubbogo/getty/blob/master/getty.go#L45).
In getty there are two goroutines in one connection(session), one reads tcp stream/udp packet/websocket package, the other handles logic process and writes response into network write buffer. If your logic process may take a long time, you should start a new logic process goroutine by yourself in codec.go:(Codec)OnMessage. In getty there are two goroutines in one connection(session), one reads tcp stream/udp packet/websocket package, the other handles logic process and writes response into network write buffer. If your logic process may take a long time, you should start a new logic process goroutine by yourself in codec.go:(Codec)OnMessage.
......
...@@ -23,7 +23,7 @@ import ( ...@@ -23,7 +23,7 @@ import (
import ( import (
"github.com/AlexStocks/goext/net" "github.com/AlexStocks/goext/net"
log "github.com/AlexStocks/log4go" log "github.com/dubbogo/log4go"
"github.com/gorilla/websocket" "github.com/gorilla/websocket"
jerrors "github.com/juju/errors" jerrors "github.com/juju/errors"
) )
......
...@@ -21,7 +21,7 @@ import ( ...@@ -21,7 +21,7 @@ import (
) )
import ( import (
log "github.com/AlexStocks/log4go" log "github.com/dubbogo/log4go"
"github.com/golang/snappy" "github.com/golang/snappy"
"github.com/gorilla/websocket" "github.com/gorilla/websocket"
jerrors "github.com/juju/errors" jerrors "github.com/juju/errors"
......
module github.com/AlexStocks/getty module github.com/dubbogo/getty
require ( require (
github.com/AlexStocks/goext v0.3.2 github.com/AlexStocks/goext v0.3.2
github.com/AlexStocks/log4go v1.0.2 github.com/AlexStocks/log4go v1.0.2 // indirect
github.com/cheekybits/is v0.0.0-20150225183255-68e9c0620927 // indirect github.com/davecgh/go-spew v1.1.1 // indirect
github.com/coreos/bbolt v1.3.2 // indirect github.com/dubbogo/log4go v0.0.0-20190406152735-41c57e1073e9
github.com/coreos/etcd v3.3.12+incompatible // indirect
github.com/coreos/go-semver v0.2.0 // indirect
github.com/coreos/go-systemd v0.0.0-20190318101727-c7c1946145b6 // indirect
github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f // indirect
github.com/dgrijalva/jwt-go v3.2.0+incompatible // indirect
github.com/fatih/set v0.2.1 // indirect
github.com/gogo/protobuf v1.2.1
github.com/golang/groupcache v0.0.0-20190129154638-5b532d6fd5ef // indirect
github.com/golang/protobuf v1.3.1 // indirect
github.com/golang/snappy v0.0.1 github.com/golang/snappy v0.0.1
github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c // indirect
github.com/gorilla/websocket v1.4.0 github.com/gorilla/websocket v1.4.0
github.com/grpc-ecosystem/go-grpc-middleware v1.0.0 // indirect
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 // indirect
github.com/grpc-ecosystem/grpc-gateway v1.8.5 // indirect
github.com/jonboulle/clockwork v0.1.0 // indirect
github.com/json-iterator/go v1.1.6
github.com/juju/clock v0.0.0-20190205081909-9c5c9712527c // indirect
github.com/juju/errors v0.0.0-20190207033735-e65537c515d7 github.com/juju/errors v0.0.0-20190207033735-e65537c515d7
github.com/juju/loggo v0.0.0-20190212223446-d976af380377 // indirect github.com/juju/loggo v0.0.0-20190212223446-d976af380377 // indirect
github.com/juju/retry v0.0.0-20180821225755-9058e192b216 // indirect
github.com/juju/testing v0.0.0-20180920084828-472a3e8b2073 // indirect github.com/juju/testing v0.0.0-20180920084828-472a3e8b2073 // indirect
github.com/juju/utils v0.0.0-20180820210520-bf9cc5bdd62d // indirect
github.com/juju/version v0.0.0-20180108022336-b64dbd566305 // indirect
github.com/k0kubun/colorstring v0.0.0-20150214042306-9440f1994b88 // indirect github.com/k0kubun/colorstring v0.0.0-20150214042306-9440f1994b88 // indirect
github.com/k0kubun/pp v3.0.0+incompatible // indirect github.com/k0kubun/pp v3.0.1+incompatible // indirect
github.com/mailru/easyjson v0.0.0-20190312143242-1de009706dbe // indirect github.com/kr/pretty v0.1.0 // indirect
github.com/mattn/go-colorable v0.1.1 // indirect github.com/mattn/go-colorable v0.1.1 // indirect
github.com/mattn/go-isatty v0.0.7 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.1 // indirect
github.com/name5566/leaf v0.0.0-20181103040206-1364c176dfbd // indirect
github.com/pkg/errors v0.8.1 // indirect
github.com/prometheus/client_golang v0.9.2 // indirect
github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90 // indirect
github.com/samuel/go-zookeeper v0.0.0-20180130194729-c4fab1ac1bec // indirect
github.com/sirupsen/logrus v1.4.0 // indirect
github.com/soheilhy/cmux v0.1.4 // indirect
github.com/stretchr/testify v1.3.0 // indirect github.com/stretchr/testify v1.3.0 // indirect
github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5 // indirect
github.com/ugorji/go/codec v0.0.0-20190320090025-2dc34c0b8780 // indirect
github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2 // indirect
go.etcd.io/bbolt v1.3.2 // indirect
go.uber.org/atomic v1.3.2 // indirect
go.uber.org/multierr v1.1.0 // indirect
go.uber.org/zap v1.9.1 // indirect
golang.org/x/net v0.0.0-20190320064053-1272bf9dcd53 golang.org/x/net v0.0.0-20190320064053-1272bf9dcd53
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4 // indirect gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 // indirect
gopkg.in/mgo.v2 v2.0.0-20180705113604-9856a29383ce // indirect gopkg.in/mgo.v2 v2.0.0-20180705113604-9856a29383ce // indirect
gopkg.in/yaml.v2 v2.2.2 // indirect gopkg.in/yaml.v2 v2.2.2 // indirect
) )
github.com/AlexStocks/goext v0.3.2 h1:Bn4C+R6/E5Yjk2Uc/voawtbGv91x9aCid92xwYL2AS0=
github.com/AlexStocks/goext v0.3.2/go.mod h1:3M5j9Pjge4CdkNg2WIjRLUeoPedJHHKwkkglDGSl3Hc=
github.com/AlexStocks/log4go v1.0.2 h1:1K5WM8KjSUECaoXUl8FSF05KGeCJDfBrhKUBsxwUvhk=
github.com/AlexStocks/log4go v1.0.2/go.mod h1:6kCCRo/orDo8mh5CEDOeuSSM674wBQ8M6E0K8dVOIz4=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dubbogo/log4go v0.0.0-20190406152735-41c57e1073e9 h1:RCRkCLJPUZNyAHLEEJvbFrNkyzmmzFnrRbk+eGvUwNQ=
github.com/dubbogo/log4go v0.0.0-20190406152735-41c57e1073e9/go.mod h1:iyyiSbUgJZcUgpt4hQs7YHZUop6982EGjQxIBeEmevQ=
github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4=
github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/gorilla/websocket v1.4.0 h1:WDFjx/TMzVgy9VdMMQi2K2Emtwi2QcUQsztZ/zLaH/Q=
github.com/gorilla/websocket v1.4.0/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ=
github.com/juju/errors v0.0.0-20190207033735-e65537c515d7 h1:dMIPRDg6gi7CUp0Kj2+HxqJ5kTr1iAdzsXYIrLCNSmU=
github.com/juju/errors v0.0.0-20190207033735-e65537c515d7/go.mod h1:W54LbzXuIE0boCoNJfwqpmkKJ1O4TCTZMetAt6jGk7Q=
github.com/juju/loggo v0.0.0-20190212223446-d976af380377 h1:n6QjW3g5JNY3xPmIjFt6z1H6tFQA6BhwOC2bvTAm1YU=
github.com/juju/loggo v0.0.0-20190212223446-d976af380377/go.mod h1:vgyd7OREkbtVEN/8IXZe5Ooef3LQePvuBm9UWj6ZL8U=
github.com/juju/testing v0.0.0-20180920084828-472a3e8b2073 h1:WQM1NildKThwdP7qWrNAFGzp4ijNLw8RlgENkaI4MJs=
github.com/juju/testing v0.0.0-20180920084828-472a3e8b2073/go.mod h1:63prj8cnj0tU0S9OHjGJn+b1h0ZghCndfnbQolrYTwA=
github.com/k0kubun/colorstring v0.0.0-20150214042306-9440f1994b88 h1:uC1QfSlInpQF+M0ao65imhwqKnz3Q2z/d8PWZRMQvDM=
github.com/k0kubun/colorstring v0.0.0-20150214042306-9440f1994b88/go.mod h1:3w7q1U84EfirKl04SVQ/s7nPm1ZPhiXd34z40TNz36k=
github.com/k0kubun/pp v3.0.1+incompatible h1:3tqvf7QgUnZ5tXO6pNAZlrvHgl6DvifjDrd9g2S9Z40=
github.com/k0kubun/pp v3.0.1+incompatible/go.mod h1:GWse8YhT0p8pT4ir3ZgBbfZild3tgzSScAn6HmfYukg=
github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/mailru/easyjson v0.0.0-20190403194419-1ea4449da983 h1:wL11wNW7dhKIcRCHSm4sHKPWz0tt4mwBsVodG7+Xyqg=
github.com/mailru/easyjson v0.0.0-20190403194419-1ea4449da983/go.mod h1:C1wdFJiN94OJF2b5HbByQZoLdCWB1Yqtg26g4irojpc=
github.com/mattn/go-colorable v0.1.1 h1:G1f5SKeVxmagw/IyvzvtZE4Gybcc4Tr1tf7I8z0XgOg=
github.com/mattn/go-colorable v0.1.1/go.mod h1:FuOcm+DKB9mbwrcAfNl7/TZVBZ6rcnceauSikq3lYCQ=
github.com/mattn/go-isatty v0.0.5/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hdxcsrc5s=
github.com/mattn/go-isatty v0.0.7 h1:UvyT9uN+3r7yLEYSlJsbQGdsaB/a0DlgWP3pql6iwOc=
github.com/mattn/go-isatty v0.0.7/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hdxcsrc5s=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/net v0.0.0-20190320064053-1272bf9dcd53 h1:kcXqo9vE6fsZY5X5Rd7R1l7fTgnWaDCVmln65REefiE=
golang.org/x/net v0.0.0-20190320064053-1272bf9dcd53/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190222072716-a9d3bda3a223 h1:DH4skfRX4EBpamg7iV4ZlCpblAHI6s6TDM39bFZumv8=
golang.org/x/sys v0.0.0-20190222072716-a9d3bda3a223/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/mgo.v2 v2.0.0-20180705113604-9856a29383ce h1:xcEWjVhvbDy+nHP67nPDDpbYrY+ILlfndk4bRioVHaU=
gopkg.in/mgo.v2 v2.0.0-20180705113604-9856a29383ce/go.mod h1:yeKp02qBN3iKW1OzL3MGk2IdtZzaj7SFntXj72NppTA=
gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
package micro
import (
"context"
"strings"
"time"
)
import (
"github.com/AlexStocks/goext/context"
"github.com/AlexStocks/goext/database/filter"
"github.com/AlexStocks/goext/database/filter/pool"
"github.com/AlexStocks/goext/database/registry"
"github.com/AlexStocks/goext/database/registry/etcdv3"
"github.com/AlexStocks/goext/database/registry/zookeeper"
"github.com/AlexStocks/goext/net"
jerrors "github.com/juju/errors"
)
import (
"github.com/AlexStocks/getty/rpc"
)
////////////////////////////////
// Options
////////////////////////////////
type ClientOptions struct {
hash gxfilter.ServiceHash
}
type ClientOption func(*ClientOptions)
func WithServiceHash(hash gxfilter.ServiceHash) ClientOption {
return func(o *ClientOptions) {
o.hash = hash
}
}
////////////////////////////////
// meta data
////////////////////////////////
const (
DefaultMetaKey = "getty-micro-meta-key"
)
func GetServiceNodeMetadata(service *gxregistry.Service) string {
if service != nil && len(service.Nodes) == 1 && service.Nodes[0].Metadata != nil {
return service.Nodes[0].Metadata[DefaultMetaKey]
}
return ""
}
////////////////////////////////
// Client
////////////////////////////////
type Client struct {
ClientOptions
*rpc.Client
// registry
registry gxregistry.Registry
attr gxregistry.ServiceAttr
filter gxfilter.Filter
svcMap map[gxregistry.ServiceAttr]*gxfilter.ServiceArray
}
// NewServer initialize a micro service consumer
func NewClient(conf *rpc.ClientConfig, regConf *ConsumerRegistryConfig, opts ...ClientOption) (*Client, error) {
var (
err error
rpcClient *rpc.Client
registry gxregistry.Registry
filter gxfilter.Filter
)
if err = regConf.CheckValidity(); err != nil {
return nil, jerrors.Trace(err)
}
if rpcClient, err = rpc.NewClient(conf); err != nil {
return nil, jerrors.Trace(err)
}
regAddrList := strings.Split(regConf.RegAddr, ",")
switch regConf.Type {
case "etcd":
registry, err = gxetcd.NewRegistry(
gxregistry.WithAddrs(regAddrList...),
gxregistry.WithTimeout(time.Duration(1e9*regConf.KeepaliveTimeout)),
gxregistry.WithRoot(regConf.Root),
)
case "zookeeper":
registry, err = gxzookeeper.NewRegistry(
gxregistry.WithAddrs(regAddrList...),
gxregistry.WithTimeout(time.Duration(1e9*regConf.KeepaliveTimeout)),
gxregistry.WithRoot(regConf.Root),
)
default:
return nil, jerrors.Errorf(ErrIllegalConf+"registry type %s", regConf.Type)
}
if err != nil {
return nil, jerrors.Trace(err)
}
serviceAttrFilter := gxregistry.ServiceAttr{
Role: gxregistry.SRT_Provider,
}
gxctx := gxcontext.NewValuesContext(nil)
gxctx.Set(gxpool.GxfilterServiceAttrKey, serviceAttrFilter)
if filter, err = gxpool.NewFilter(
gxfilter.WithRegistry(registry),
gxfilter.WithContext(gxctx),
gxpool.WithTTL(time.Duration(1e9*regConf.KeepaliveTimeout)),
); err != nil {
return nil, jerrors.Trace(err)
}
// service := gxregistry.Service{
// Attr: &gxregistry.ServiceAttr{
// Group: regConf.IDC,
// Role: gxregistry.SRT_Consumer,
// Protocol: regConf.Codec,
// },
// Nodes: []*gxregistry.Node{
// &gxregistry.Node{
// ID: regConf.NodeID,
// Address: conf.Host,
// // Port: 0,
// },
// },
// }
// if err = registry.Register(service); err != nil {
// return nil, jerrors.Trace(err)
// }
clt := &Client{
Client: rpcClient,
registry: registry,
attr: gxregistry.ServiceAttr{
Group: regConf.Group,
},
filter: filter,
svcMap: make(map[gxregistry.ServiceAttr]*gxfilter.ServiceArray),
}
for _, o := range opts {
o(&(clt.ClientOptions))
}
return clt, nil
}
func (c *Client) getServiceAddr(ctx context.Context, typ rpc.CodecType, service, version string) (string, error) {
attr := c.attr
attr.Service = service
attr.Protocol = typ.String()
attr.Role = gxregistry.SRT_Provider
attr.Version = version
var addr string
flag := false
svcArray, ok := c.svcMap[attr]
if !ok {
flag = true
} else {
if ok = c.filter.CheckServiceAlive(attr, svcArray); !ok {
flag = true
}
}
if flag {
var err error
if svcArray, err = c.filter.Filter(attr); err != nil {
return addr, jerrors.Trace(err)
}
c.svcMap[attr] = svcArray
}
svc, err := svcArray.Select(ctx, c.ClientOptions.hash)
if err != nil {
return addr, jerrors.Trace(err)
}
if len(svc.Nodes) != 1 {
return addr, jerrors.Errorf("illegal service %#v", svc)
}
return gxnet.HostAddress(svc.Nodes[0].Address, int(svc.Nodes[0].Port)), nil
}
func (c *Client) CallOneway(ctx context.Context, typ rpc.CodecType, service, version, method string,
args interface{}, opts ...rpc.CallOption) error {
addr, err := c.getServiceAddr(ctx, typ, service, version)
if err != nil {
return jerrors.Trace(err)
}
return jerrors.Trace(c.Client.CallOneway(typ, addr, service, method, args, opts...))
}
func (c *Client) Call(ctx context.Context, typ rpc.CodecType, service, version, method string,
args interface{}, reply interface{}, opts ...rpc.CallOption) error {
addr, err := c.getServiceAddr(ctx, typ, service, version)
if err != nil {
return jerrors.Trace(err)
}
return jerrors.Trace(c.Client.Call(typ, addr, service, method, args, reply, opts...))
}
func (c *Client) AsyncCall(ctx context.Context, typ rpc.CodecType, service, version, method string,
args interface{}, callback rpc.AsyncCallback, reply interface{}, opts ...rpc.CallOption) error {
addr, err := c.getServiceAddr(ctx, typ, service, version)
if err != nil {
return jerrors.Trace(err)
}
return jerrors.Trace(c.Client.AsyncCall(typ, addr, service, method, args, callback, reply, opts...))
}
func (c *Client) Close() {
c.filter.Close()
c.registry.Close()
c.Client.Close()
}
package micro
import (
"github.com/AlexStocks/getty/rpc"
"github.com/AlexStocks/goext/container/set/strset"
jerrors "github.com/juju/errors"
)
const (
ErrIllegalConf = "illegal conf "
)
var (
registryArray = strset.New("zookeeper", "etcd")
)
type ServiceConfig struct {
LocalHost string `default:"127.0.0.1" yaml:"local_host" json:"local_host, omitempty"`
LocalPort int `default:"10001" yaml:"local_port" json:"local_port, omitempty"`
Group string `default:"idc-bj" yaml:"group" json:"group,omitempty"`
NodeID string `default:"node0" yaml:"node_id" json:"node_id,omitempty"`
Protocol string `default:"json" yaml:"protocol" json:"protocol,omitempty"`
Service string `default:"test" yaml:"service" json:"service,omitempty"`
Version string `default:"v1" yaml:"version" json:"version,omitempty"`
Meta string `default:"default-meta" yaml:"meta" json:"meta,omitempty"`
}
// CheckValidity check parameter validity
func (c *ServiceConfig) CheckValidity() error {
if len(c.LocalHost) == 0 {
return jerrors.Errorf(ErrIllegalConf+"local host %s", c.LocalHost)
}
if c.LocalPort <= 0 || 65535 < c.LocalPort {
return jerrors.Errorf(ErrIllegalConf+"local port %s", c.LocalPort)
}
if len(c.Group) == 0 {
return jerrors.Errorf(ErrIllegalConf+"group %s", c.Group)
}
if len(c.NodeID) == 0 {
return jerrors.Errorf(ErrIllegalConf+"node id %s", c.NodeID)
}
if codec := rpc.GetCodecType(c.Protocol); codec == rpc.CodecUnknown {
return jerrors.Errorf(ErrIllegalConf+"protocol type %s", c.Protocol)
}
if len(c.Service) == 0 {
return jerrors.Errorf(ErrIllegalConf+"service %s", c.Service)
}
if len(c.Version) == 0 {
return jerrors.Errorf(ErrIllegalConf+"service version %s", c.Version)
}
return nil
}
// RegistryConfig provides configuration for registry
type RegistryConfig struct {
Type string `default:"etcd" yaml:"type" json:"type,omitempty"`
RegAddr string `default:"127.0.0.1:2181" yaml:"reg_addr" json:"reg_addr,omitempty"`
KeepaliveTimeout int `default:"5" yaml:"keepalive_timeout" json:"keepalive_timeout,omitempty"`
Root string `default:"/getty" yaml:"root" json:"root,omitempty"`
}
// CheckValidity check parameter validity
func (c *RegistryConfig) CheckValidity() error {
if !registryArray.Has(c.Type) {
return jerrors.Errorf(ErrIllegalConf+"registry type %s", c.Type)
}
if len(c.RegAddr) == 0 {
return jerrors.Errorf(ErrIllegalConf+"registry addr %s", c.RegAddr)
}
if c.KeepaliveTimeout < 0 {
return jerrors.Errorf(ErrIllegalConf+"keepalive timeout %d", c.KeepaliveTimeout)
}
if len(c.Root) == 0 {
return jerrors.Errorf(ErrIllegalConf+"root %s", c.Root)
}
return nil
}
// ProviderRegistryConfig provides provider configuration for registry
type ProviderRegistryConfig struct {
RegistryConfig `yaml:"basic" json:"basic,omitempty"`
ServiceArray []ServiceConfig `default:"" yaml:"service_array" json:"service_array,omitempty"`
}
// CheckValidity check parameter validity
func (c *ProviderRegistryConfig) CheckValidity() error {
if err := c.RegistryConfig.CheckValidity(); err != nil {
return jerrors.Trace(err)
}
for idx := 0; idx < len(c.ServiceArray); idx++ {
if err := c.ServiceArray[idx].CheckValidity(); err != nil {
return jerrors.Errorf(ErrIllegalConf+"service reference config, idx:%d, err:%s",
idx, jerrors.ErrorStack(err))
}
}
return nil
}
// ConsumerRegistryConfig provides consumer configuration for registry
type ConsumerRegistryConfig struct {
RegistryConfig `yaml:"basic" json:"basic,omitempty"`
Group string `default:"idc-bj" yaml:"group" json:"group,omitempty"`
}
// CheckValidity check parameter validity
func (c *ConsumerRegistryConfig) CheckValidity() error {
if err := c.RegistryConfig.CheckValidity(); err != nil {
return jerrors.Trace(err)
}
if len(c.Group) == 0 {
return jerrors.Errorf(ErrIllegalConf+"group %s", c.Group)
}
return nil
}
package micro
import (
"net"
"strconv"
"strings"
"time"
)
import (
jerrors "github.com/juju/errors"
)
import (
"github.com/AlexStocks/getty/rpc"
"github.com/AlexStocks/goext/database/registry"
"github.com/AlexStocks/goext/database/registry/etcdv3"
"github.com/AlexStocks/goext/database/registry/zookeeper"
"github.com/AlexStocks/goext/net"
"github.com/AlexStocks/goext/strings"
)
// Server micro service provider
type Server struct {
*rpc.Server
// registry
regConf ProviderRegistryConfig
registry gxregistry.Registry
}
// NewServer initialize a micro service provider
func NewServer(conf *rpc.ServerConfig, regConf *ProviderRegistryConfig) (*Server, error) {
var (
err error
rpcServer *rpc.Server
registry gxregistry.Registry
)
if err = regConf.CheckValidity(); err != nil {
return nil, jerrors.Trace(err)
}
if rpcServer, err = rpc.NewServer(conf); err != nil {
return nil, jerrors.Trace(err)
}
regAddrList := strings.Split(regConf.RegAddr, ",")
switch regConf.Type {
case "etcd":
registry, err = gxetcd.NewRegistry(
gxregistry.WithAddrs(regAddrList...),
gxregistry.WithTimeout(time.Duration(1e9*regConf.KeepaliveTimeout)),
gxregistry.WithRoot(regConf.Root),
)
case "zookeeper":
registry, err = gxzookeeper.NewRegistry(
gxregistry.WithAddrs(regAddrList...),
gxregistry.WithTimeout(time.Duration(1e9*regConf.KeepaliveTimeout)),
gxregistry.WithRoot(regConf.Root),
)
default:
return nil, jerrors.Errorf(ErrIllegalConf+"registry type %s", regConf.Type)
}
if err != nil {
return nil, jerrors.Trace(err)
}
var localAddrArr []string
for _, p := range conf.Ports {
port, err := strconv.Atoi(p)
if err != nil {
return nil, jerrors.Trace(err)
}
if port <= 0 || 65535 < port {
return nil, jerrors.Errorf("illegal port %s", p)
}
localAddrArr = append(localAddrArr, net.JoinHostPort(conf.Host, p))
}
for _, svr := range regConf.ServiceArray {
addr := gxnet.HostAddress(svr.LocalHost, svr.LocalPort)
if ok := gxstrings.Contains(gxstrings.Strings2Ifs(localAddrArr), addr); !ok {
return nil, jerrors.Errorf("can not find ServiceConfig addr %s in conf address array %#v",
addr, localAddrArr)
}
}
return &Server{
Server: rpcServer,
regConf: *regConf,
registry: registry,
}, nil
}
// Register the @rcvr
func (s *Server) Register(rcvr rpc.GettyRPCService) error {
var (
flag bool
attr gxregistry.ServiceAttr
)
attr.Role = gxregistry.SRT_Provider
attr.Service = rcvr.Service()
attr.Version = rcvr.Version()
for _, c := range s.regConf.ServiceArray {
if c.Service == rcvr.Service() && c.Version == rcvr.Version() {
flag = true
attr.Group = c.Group
attr.Protocol = c.Protocol
node := &gxregistry.Node{
ID: c.NodeID,
Address: c.LocalHost,
Port: int32(c.LocalPort),
}
if len(c.Meta) != 0 {
node.Metadata = map[string]string{DefaultMetaKey: c.Meta}
}
service := gxregistry.Service{Attr: &attr}
service.Nodes = append(service.Nodes, node)
if err := s.registry.Register(service); err != nil {
return jerrors.Trace(err)
}
}
}
if !flag {
return jerrors.Errorf("can not find @rcvr{service:%s, version:%s} in registry config:%#v",
rcvr.Service(), rcvr.Version(), s.regConf)
}
if err := s.Server.Register(rcvr); err != nil {
return jerrors.Trace(err)
}
return nil
}
// func (s *Server) Start() {
// s.Server.Start()
// }
// func (s *Server) Stop() {
// s.Server.Stop()
// }
package rpc
import (
"math/rand"
"sync"
"time"
)
import (
"github.com/AlexStocks/getty"
"github.com/AlexStocks/goext/sync/atomic"
jerrors "github.com/juju/errors"
)
var (
errInvalidCodecType = jerrors.New("illegal CodecType")
errInvalidAddress = jerrors.New("remote address invalid or empty")
errSessionNotExist = jerrors.New("session not exist")
errClientClosed = jerrors.New("client closed")
errClientReadTimeout = jerrors.New("client read timeout")
)
func init() {
rand.Seed(time.Now().UnixNano())
}
type CallOptions struct {
// request timeout
RequestTimeout time.Duration
// response timeout
ResponseTimeout time.Duration
Meta map[interface{}]interface{}
}
type CallOption func(*CallOptions)
func WithCallRequestTimeout(d time.Duration) CallOption {
return func(o *CallOptions) {
o.RequestTimeout = d
}
}
func WithCallResponseTimeout(d time.Duration) CallOption {
return func(o *CallOptions) {
o.ResponseTimeout = d
}
}
func WithCallMeta(k, v interface{}) CallOption {
return func(o *CallOptions) {
if o.Meta == nil {
o.Meta = make(map[interface{}]interface{})
}
o.Meta[k] = v
}
}
type CallResponse struct {
Opts CallOptions
Cause error
Start time.Time // invoke(call) start time == write start time
ReadStart time.Time // read start time, write duration = ReadStart - Start
Reply interface{}
}
type AsyncCallback func(response CallResponse)
type Client struct {
conf ClientConfig
pool *gettyRPCClientPool
sequence gxatomic.Uint64
pendingLock sync.RWMutex
pendingResponses map[SequenceType]*PendingResponse
}
func NewClient(conf *ClientConfig) (*Client, error) {
if err := conf.CheckValidity(); err != nil {
return nil, jerrors.Trace(err)
}
c := &Client{
pendingResponses: make(map[SequenceType]*PendingResponse),
conf: *conf,
}
c.pool = newGettyRPCClientConnPool(c, conf.PoolSize, time.Duration(int(time.Second)*conf.PoolTTL))
return c, nil
}
// call one way
func (c *Client) CallOneway(typ CodecType, addr, service, method string, args interface{}, opts ...CallOption) error {
var copts CallOptions
for _, o := range opts {
o(&copts)
}
return jerrors.Trace(c.call(CT_OneWay, typ, addr, service, method, args, nil, nil, copts))
}
// if @reply is nil, the transport layer will get the response without notify the invoker.
func (c *Client) Call(typ CodecType, addr, service, method string, args, reply interface{}, opts ...CallOption) error {
var copts CallOptions
for _, o := range opts {
o(&copts)
}
ct := CT_TwoWay
if reply == nil {
ct = CT_TwoWayNoReply
}
return jerrors.Trace(c.call(ct, typ, addr, service, method, args, reply, nil, copts))
}
func (c *Client) AsyncCall(typ CodecType, addr, service, method string, args interface{},
callback AsyncCallback, reply interface{}, opts ...CallOption) error {
var copts CallOptions
for _, o := range opts {
o(&copts)
}
return jerrors.Trace(c.call(CT_TwoWay, typ, addr, service, method, args, reply, callback, copts))
}
func (c *Client) call(ct CallType, typ CodecType, addr, service, method string,
args, reply interface{}, callback AsyncCallback, opts CallOptions) error {
if opts.RequestTimeout == 0 {
opts.RequestTimeout = c.conf.GettySessionParam.tcpWriteTimeout
}
if opts.ResponseTimeout == 0 {
opts.ResponseTimeout = c.conf.GettySessionParam.tcpReadTimeout
}
if !typ.CheckValidity() {
return errInvalidCodecType
}
b := &GettyRPCRequest{}
b.header.Service = service
b.header.Method = method
b.header.CallType = ct
b.body = args
var rsp *PendingResponse
if ct != CT_OneWay {
rsp = NewPendingResponse()
rsp.reply = reply
rsp.callback = callback
rsp.opts = opts
}
var (
err error
session getty.Session
conn *gettyRPCClient
)
conn, session, err = c.selectSession(typ, addr)
if err != nil || session == nil {
return errSessionNotExist
}
defer c.pool.release(conn, err)
if err = c.transfer(session, typ, b, rsp, opts); err != nil {
return jerrors.Trace(err)
}
if ct == CT_OneWay || callback != nil {
return nil
}
select {
case <-getty.GetTimeWheel().After(opts.ResponseTimeout):
err = errClientReadTimeout
c.removePendingResponse(SequenceType(rsp.seq))
case <-rsp.done:
err = rsp.err
}
return jerrors.Trace(err)
}
func (c *Client) Close() {
if c.pool != nil {
c.pool.close()
}
c.pool = nil
}
func (c *Client) selectSession(typ CodecType, addr string) (*gettyRPCClient, getty.Session, error) {
rpcConn, err := c.pool.getConn(typ.String(), addr)
if err != nil {
return nil, nil, jerrors.Trace(err)
}
return rpcConn, rpcConn.selectSession(), nil
}
func (c *Client) heartbeat(session getty.Session, typ CodecType) error {
return c.transfer(session, typ, nil, NewPendingResponse(), CallOptions{})
}
func (c *Client) transfer(session getty.Session, typ CodecType, req *GettyRPCRequest,
rsp *PendingResponse, opts CallOptions) error {
var (
sequence uint64
err error
pkg GettyPackage
)
sequence = c.sequence.Add(1)
pkg.H.Magic = MagicType(gettyPackageMagic)
pkg.H.LogID = LogIDType(randomID())
pkg.H.Sequence = SequenceType(sequence)
pkg.H.Command = gettyCmdHbRequest
pkg.H.CodecType = typ
if req != nil {
pkg.H.Command = gettyCmdRPCRequest
pkg.B = req
}
// cond1
if rsp != nil {
rsp.seq = sequence
c.addPendingResponse(rsp)
}
err = session.WritePkg(pkg, opts.RequestTimeout)
if err != nil {
c.removePendingResponse(SequenceType(rsp.seq))
} else if rsp != nil { // cond2
// cond2 should not merged with cond1. cause the response package may be returned very
// soon and it will be handled by other goroutine.
rsp.readStart = time.Now()
}
return jerrors.Trace(err)
}
// func (c *Client) PendingResponseCount() int {
// c.pendingLock.RLock()
// defer c.pendingLock.RUnlock()
// return len(c.pendingResponses)
// }
func (c *Client) addPendingResponse(pr *PendingResponse) {
c.pendingLock.Lock()
defer c.pendingLock.Unlock()
c.pendingResponses[SequenceType(pr.seq)] = pr
}
func (c *Client) removePendingResponse(seq SequenceType) *PendingResponse {
c.pendingLock.Lock()
defer c.pendingLock.Unlock()
if c.pendingResponses == nil {
return nil
}
if presp, ok := c.pendingResponses[seq]; ok {
delete(c.pendingResponses, seq)
return presp
}
return nil
}
// func (c *Client) ClearPendingResponses() map[SequenceType]*PendingResponse {
// c.pendingLock.Lock()
// defer c.pendingLock.Unlock()
// presps := c.pendingResponses
// c.pendingResponses = nil
// return presps
// }
package rpc
import (
"bytes"
"encoding/binary"
"fmt"
"reflect"
"time"
"unsafe"
)
import (
log "github.com/AlexStocks/log4go"
proto "github.com/gogo/protobuf/proto"
"github.com/json-iterator/go"
jerrors "github.com/juju/errors"
)
////////////////////////////////////////////
// getty command
////////////////////////////////////////////
type gettyCommand int16
const (
gettyDefaultCmd gettyCommand = 0x00
gettyCmdHbRequest = 0x01
gettyCmdHbResponse = 0x02
gettyCmdRPCRequest = 0x03
gettyCmdRPCResponse = 0x04
)
var gettyCommandStrings = [...]string{
"getty-default",
"getty-heartbeat-request",
"getty-heartbeat-response",
"getty-request",
"getty-response",
}
func (c gettyCommand) String() string {
return gettyCommandStrings[c]
}
////////////////////////////////////////////
// getty error code
////////////////////////////////////////////
type ErrorCode int16
const (
GettyOK ErrorCode = 0x00
GettyFail = 0x01
)
////////////////////////////////////////////
// getty codec type
////////////////////////////////////////////
type CodecType int16
const (
CodecUnknown CodecType = 0x00
CodecJson = 0x01
CodecProtobuf = 0x02
)
var (
gettyCodecStrings = [...]string{
"unknown",
"json",
"protobuf",
}
Codecs = map[CodecType]Codec{
CodecJson: &JSONCodec{},
CodecProtobuf: &PBCodec{},
}
)
func (c CodecType) String() string {
if c == CodecJson || c == CodecProtobuf {
return gettyCodecStrings[c]
}
return gettyCodecStrings[CodecUnknown]
}
func (c CodecType) CheckValidity() bool {
if c == CodecJson || c == CodecProtobuf {
return true
}
return false
}
func GetCodecType(codecType string) CodecType {
switch codecType {
case gettyCodecStrings[CodecJson]:
return CodecJson
case gettyCodecStrings[CodecProtobuf]:
return CodecProtobuf
}
return CodecUnknown
}
////////////////////////////////////////////
// getty codec
////////////////////////////////////////////
type Codec interface {
Encode(interface{}) ([]byte, error)
Decode([]byte, interface{}) error
}
type JSONCodec struct{}
var (
jsonstd = jsoniter.ConfigCompatibleWithStandardLibrary
)
func (c JSONCodec) Encode(i interface{}) ([]byte, error) {
// return json.Marshal(i)
return jsonstd.Marshal(i)
}
func (c JSONCodec) Decode(data []byte, i interface{}) error {
// return json.Unmarshal(data, i)
return jsonstd.Unmarshal(data, i)
}
type PBCodec struct{}
// Encode takes the protocol buffer
// and encodes it into the wire format, returning the data.
func (c PBCodec) Encode(msg interface{}) ([]byte, error) {
// Can the object marshal itself?
if pb, ok := msg.(proto.Marshaler); ok {
pbBuf, err := pb.Marshal()
return pbBuf, jerrors.Trace(err)
}
if pb, ok := msg.(proto.Message); ok {
p := proto.NewBuffer(nil)
err := p.Marshal(pb)
return p.Bytes(), jerrors.Trace(err)
}
return nil, fmt.Errorf("protobuf can not marshal %T", msg)
}
// Decode parses the protocol buffer representation in buf and
// writes the decoded result to pb. If the struct underlying pb does not match
// the data in buf, the results can be unpredictable.
//
// UnmarshalMerge merges into existing data in pb.
// Most code should use Unmarshal instead.
func (c PBCodec) Decode(buf []byte, msg interface{}) error {
// If the object can unmarshal itself, let it.
if u, ok := msg.(proto.Unmarshaler); ok {
return jerrors.Trace(u.Unmarshal(buf))
}
if pb, ok := msg.(proto.Message); ok {
return jerrors.Trace(proto.NewBuffer(nil).Unmarshal(pb))
}
return fmt.Errorf("protobuf can not unmarshal %T", msg)
}
////////////////////////////////////////////
// GettyPackageHandler
////////////////////////////////////////////
const (
gettyPackageMagic = 0x20160905
maxPackageLen = 4 * 1024 * 1024
gettyPackageHeaderLen = (int)((uint)(unsafe.Sizeof(GettyPackageHeader{})))
)
var (
ErrNotEnoughStream = jerrors.New("packet stream is not enough")
ErrTooLargePackage = jerrors.New("package length is exceed the getty package's legal maximum length.")
ErrInvalidPackage = jerrors.New("invalid rpc package")
ErrIllegalMagic = jerrors.New("package magic is not right.")
)
type RPCPackage interface {
Marshal(CodecType, *bytes.Buffer) (int, error)
// @buf length should be equal to GettyPkg.GettyPackageHeader.Len
Unmarshal(sz CodecType, buf *bytes.Buffer) error
GetBody() []byte
GetHeader() interface{}
}
type (
MagicType int32
LogIDType int64
SequenceType uint64
ServiceIDType int16
PkgLenType int32
)
type GettyPackageHeader struct {
Magic MagicType // magic number
Command gettyCommand // operation command code
ServiceID ServiceIDType // service id
Sequence SequenceType // request/response sequence
LogID LogIDType // log id
Code ErrorCode // error code
CodecType CodecType // codec type
PkgLen PkgLenType // package body length
}
type GettyPackage struct {
H GettyPackageHeader
B RPCPackage
}
func (p GettyPackage) String() string {
return fmt.Sprintf("log id:%d, sequence:%d, command:%s",
p.H.LogID, p.H.Sequence, (gettyCommand(p.H.Command)).String())
}
func (p *GettyPackage) Marshal() (*bytes.Buffer, error) {
var (
err error
headerBuf, buf *bytes.Buffer
)
buf = bytes.NewBuffer(make([]byte, gettyPackageHeaderLen, gettyPackageHeaderLen<<2))
// body
if p.B != nil {
length, err := p.B.Marshal(p.H.CodecType, buf)
if err != nil {
return nil, jerrors.Trace(err)
}
p.H.PkgLen = PkgLenType(length)
}
// header
headerBuf = bytes.NewBuffer(nil)
err = binary.Write(headerBuf, binary.LittleEndian, p.H)
if err != nil {
return nil, jerrors.Trace(err)
}
copy(buf.Bytes(), headerBuf.Bytes()[:gettyPackageHeaderLen])
return buf, nil
}
func (p *GettyPackage) Unmarshal(buf *bytes.Buffer) (int, error) {
bufLen := buf.Len()
if bufLen < gettyPackageHeaderLen {
return 0, ErrNotEnoughStream
}
// header
if err := binary.Read(buf, binary.LittleEndian, &(p.H)); err != nil {
return 0, jerrors.Trace(err)
}
if p.H.Magic != gettyPackageMagic {
log.Error("@p.H.Magic{%x}, right magic{%x}", p.H.Magic, gettyPackageMagic)
return 0, ErrIllegalMagic
}
totalLen := int(PkgLenType(gettyPackageHeaderLen) + p.H.PkgLen)
if totalLen > maxPackageLen {
return 0, ErrTooLargePackage
}
if bufLen >= totalLen && p.H.PkgLen != 0 {
if err := p.B.Unmarshal(p.H.CodecType, bytes.NewBuffer(buf.Next(int(p.H.PkgLen)))); err != nil {
return 0, jerrors.Trace(err)
}
}
return totalLen, nil
}
////////////////////////////////////////////
// GettyRPCRequest
////////////////////////////////////////////
type GettyRPCHeaderLenType uint16
// // easyjson:json
// type GettyRPCRequestHeader struct {
// Service string
// Method string
// CallType gettyCallType
// }
type GettyRPCRequest struct {
header GettyRPCRequestHeader
body interface{}
}
type GettyRPCRequestPackage struct {
H GettyPackageHeader
header GettyRPCRequestHeader
service *service
methodType *methodType
argv reflect.Value
replyv reflect.Value
}
func NewGettyRPCRequest() RPCPackage {
return &GettyRPCRequest{}
}
func (req *GettyRPCRequest) Marshal(sz CodecType, buf *bytes.Buffer) (int, error) {
codec := Codecs[sz]
if codec == nil {
return 0, jerrors.Errorf("can not find codec for %s", sz)
}
headerData, err := codec.Encode(&req.header)
if err != nil {
return 0, jerrors.Trace(err)
}
bodyData, err := codec.Encode(req.body)
if err != nil {
return 0, jerrors.Trace(err)
}
err = binary.Write(buf, binary.LittleEndian, uint16(len(headerData)))
if err != nil {
return 0, jerrors.Trace(err)
}
err = binary.Write(buf, binary.LittleEndian, headerData)
if err != nil {
return 0, jerrors.Trace(err)
}
err = binary.Write(buf, binary.LittleEndian, uint16(len(bodyData)))
if err != nil {
return 0, jerrors.Trace(err)
}
err = binary.Write(buf, binary.LittleEndian, bodyData)
if err != nil {
return 0, jerrors.Trace(err)
}
return 2 + len(headerData) + 2 + len(bodyData), nil
}
func (req *GettyRPCRequest) Unmarshal(ct CodecType, buf *bytes.Buffer) error {
var headerLen uint16
err := binary.Read(buf, binary.LittleEndian, &headerLen)
if err != nil {
return jerrors.Trace(err)
}
header := make([]byte, headerLen)
err = binary.Read(buf, binary.LittleEndian, header)
if err != nil {
return jerrors.Trace(err)
}
var bodyLen uint16
err = binary.Read(buf, binary.LittleEndian, &bodyLen)
if err != nil {
return jerrors.Trace(err)
}
body := make([]byte, bodyLen)
err = binary.Read(buf, binary.LittleEndian, body)
if err != nil {
return jerrors.Trace(err)
}
codec := Codecs[ct]
if codec == nil {
return jerrors.Errorf("can not find codec for %d", ct)
}
err = codec.Decode(header, &req.header)
if err != nil {
return jerrors.Trace(err)
}
req.body = body
return nil
}
func (req *GettyRPCRequest) GetBody() []byte {
return req.body.([]byte)
}
func (req *GettyRPCRequest) GetHeader() interface{} {
return req.header
}
////////////////////////////////////////////
// GettyRPCResponse
////////////////////////////////////////////
// type GettyRPCResponseHeader struct {
// Error string
// }
type GettyRPCResponse struct {
header GettyRPCResponseHeader
body interface{}
}
type GettyRPCResponsePackage struct {
H GettyPackageHeader
header GettyRPCResponseHeader
body []byte
}
func NewGettyRPCResponse() RPCPackage {
return &GettyRPCResponse{}
}
func (resp *GettyRPCResponse) Marshal(sz CodecType, buf *bytes.Buffer) (int, error) {
codec := Codecs[sz]
if codec == nil {
return 0, jerrors.Errorf("can not find codec for %d", sz)
}
var err error
var headerData, bodyData []byte
headerData, err = codec.Encode(&resp.header)
if err != nil {
return 0, jerrors.Trace(err)
}
if resp.body != nil {
bodyData, err = codec.Encode(resp.body)
if err != nil {
return 0, jerrors.Trace(err)
}
}
err = binary.Write(buf, binary.LittleEndian, uint16(len(headerData)))
if err != nil {
return 0, jerrors.Trace(err)
}
err = binary.Write(buf, binary.LittleEndian, headerData)
if err != nil {
return 0, jerrors.Trace(err)
}
err = binary.Write(buf, binary.LittleEndian, uint16(len(bodyData)))
if err != nil {
return 0, jerrors.Trace(err)
}
err = binary.Write(buf, binary.LittleEndian, bodyData)
if err != nil {
return 0, jerrors.Trace(err)
}
return 2 + len(headerData) + 2 + len(bodyData), nil
}
func (resp *GettyRPCResponse) Unmarshal(sz CodecType, buf *bytes.Buffer) error {
var headerLen uint16
err := binary.Read(buf, binary.LittleEndian, &headerLen)
if err != nil {
return jerrors.Trace(err)
}
header := make([]byte, headerLen)
err = binary.Read(buf, binary.LittleEndian, header)
if err != nil {
return jerrors.Trace(err)
}
var bodyLen uint16
err = binary.Read(buf, binary.LittleEndian, &bodyLen)
if err != nil {
return jerrors.Trace(err)
}
body := make([]byte, bodyLen)
err = binary.Read(buf, binary.LittleEndian, body)
if err != nil {
return jerrors.Trace(err)
}
codec := Codecs[sz]
if codec == nil {
return jerrors.Errorf("can not find codec for %d", sz)
}
err = codec.Decode(header, &resp.header)
if err != nil {
return jerrors.Trace(err)
}
resp.body = body
return nil
}
func (resp *GettyRPCResponse) GetBody() []byte {
return resp.body.([]byte)
}
func (resp *GettyRPCResponse) GetHeader() interface{} {
return resp.header
}
////////////////////////////////////////////
// PendingResponse
////////////////////////////////////////////
type PendingResponse struct {
seq uint64
err error
start time.Time
readStart time.Time
callback AsyncCallback
reply interface{}
opts CallOptions
done chan struct{}
}
func NewPendingResponse() *PendingResponse {
return &PendingResponse{
start: time.Now(),
done: make(chan struct{}),
}
}
func (r PendingResponse) GetCallResponse() CallResponse {
return CallResponse{
Opts: r.opts,
Cause: r.err,
Start: r.start,
ReadStart: r.readStart,
Reply: r.reply,
}
}
// Code generated by protoc-gen-gogo. DO NOT EDIT.
// source: codec.proto
/*
Package rpc is a generated protocol buffer package.
It is generated from these files:
codec.proto
It has these top-level messages:
GettyRPCRequestHeader
GettyRPCResponseHeader
*/
package rpc
import proto "github.com/gogo/protobuf/proto"
import fmt "fmt"
import math "math"
import _ "github.com/gogo/protobuf/gogoproto"
import strconv "strconv"
import strings "strings"
import reflect "reflect"
import io "io"
// Reference imports to suppress errors if they are not otherwise used.
var _ = proto.Marshal
var _ = fmt.Errorf
var _ = math.Inf
// This is a compile-time assertion to ensure that this generated file
// is compatible with the proto package it is being compiled against.
// A compilation error at this line likely means your copy of the
// proto package needs to be updated.
const _ = proto.GoGoProtoPackageIsVersion2 // please upgrade the proto package
type CallType int32
const (
CT_UNKOWN CallType = 0
CT_OneWay CallType = 1
CT_TwoWay CallType = 2
CT_TwoWayNoReply CallType = 3
)
var CallType_name = map[int32]string{
0: "CT_UNKOWN",
1: "CT_OneWay",
2: "CT_TwoWay",
3: "CT_TwoWayNoReply",
}
var CallType_value = map[string]int32{
"CT_UNKOWN": 0,
"CT_OneWay": 1,
"CT_TwoWay": 2,
"CT_TwoWayNoReply": 3,
}
func (x CallType) Enum() *CallType {
p := new(CallType)
*p = x
return p
}
func (x CallType) MarshalJSON() ([]byte, error) {
return proto.MarshalJSONEnum(CallType_name, int32(x))
}
func (x *CallType) UnmarshalJSON(data []byte) error {
value, err := proto.UnmarshalJSONEnum(CallType_value, data, "CallType")
if err != nil {
return err
}
*x = CallType(value)
return nil
}
func (CallType) EnumDescriptor() ([]byte, []int) { return fileDescriptorCodec, []int{0} }
type GettyRPCRequestHeader struct {
Service string `protobuf:"bytes,1,opt,name=Service" json:"Service"`
Method string `protobuf:"bytes,2,opt,name=Method" json:"Method"`
CallType CallType `protobuf:"varint,3,opt,name=CallType,enum=rpc.CallType" json:"CallType"`
}
func (m *GettyRPCRequestHeader) Reset() { *m = GettyRPCRequestHeader{} }
func (*GettyRPCRequestHeader) ProtoMessage() {}
func (*GettyRPCRequestHeader) Descriptor() ([]byte, []int) { return fileDescriptorCodec, []int{0} }
type GettyRPCResponseHeader struct {
Error string `protobuf:"bytes,1,opt,name=Error" json:"Error"`
}
func (m *GettyRPCResponseHeader) Reset() { *m = GettyRPCResponseHeader{} }
func (*GettyRPCResponseHeader) ProtoMessage() {}
func (*GettyRPCResponseHeader) Descriptor() ([]byte, []int) { return fileDescriptorCodec, []int{1} }
func init() {
proto.RegisterType((*GettyRPCRequestHeader)(nil), "rpc.GettyRPCRequestHeader")
proto.RegisterType((*GettyRPCResponseHeader)(nil), "rpc.GettyRPCResponseHeader")
proto.RegisterEnum("rpc.CallType", CallType_name, CallType_value)
}
func (x CallType) String() string {
s, ok := CallType_name[int32(x)]
if ok {
return s
}
return strconv.Itoa(int(x))
}
func (this *GettyRPCRequestHeader) VerboseEqual(that interface{}) error {
if that == nil {
if this == nil {
return nil
}
return fmt.Errorf("that == nil && this != nil")
}
that1, ok := that.(*GettyRPCRequestHeader)
if !ok {
that2, ok := that.(GettyRPCRequestHeader)
if ok {
that1 = &that2
} else {
return fmt.Errorf("that is not of type *GettyRPCRequestHeader")
}
}
if that1 == nil {
if this == nil {
return nil
}
return fmt.Errorf("that is type *GettyRPCRequestHeader but is nil && this != nil")
} else if this == nil {
return fmt.Errorf("that is type *GettyRPCRequestHeader but is not nil && this == nil")
}
if this.Service != that1.Service {
return fmt.Errorf("Service this(%v) Not Equal that(%v)", this.Service, that1.Service)
}
if this.Method != that1.Method {
return fmt.Errorf("Method this(%v) Not Equal that(%v)", this.Method, that1.Method)
}
if this.CallType != that1.CallType {
return fmt.Errorf("CallType this(%v) Not Equal that(%v)", this.CallType, that1.CallType)
}
return nil
}
func (this *GettyRPCRequestHeader) Equal(that interface{}) bool {
if that == nil {
if this == nil {
return true
}
return false
}
that1, ok := that.(*GettyRPCRequestHeader)
if !ok {
that2, ok := that.(GettyRPCRequestHeader)
if ok {
that1 = &that2
} else {
return false
}
}
if that1 == nil {
if this == nil {
return true
}
return false
} else if this == nil {
return false
}
if this.Service != that1.Service {
return false
}
if this.Method != that1.Method {
return false
}
if this.CallType != that1.CallType {
return false
}
return true
}
func (this *GettyRPCResponseHeader) VerboseEqual(that interface{}) error {
if that == nil {
if this == nil {
return nil
}
return fmt.Errorf("that == nil && this != nil")
}
that1, ok := that.(*GettyRPCResponseHeader)
if !ok {
that2, ok := that.(GettyRPCResponseHeader)
if ok {
that1 = &that2
} else {
return fmt.Errorf("that is not of type *GettyRPCResponseHeader")
}
}
if that1 == nil {
if this == nil {
return nil
}
return fmt.Errorf("that is type *GettyRPCResponseHeader but is nil && this != nil")
} else if this == nil {
return fmt.Errorf("that is type *GettyRPCResponseHeader but is not nil && this == nil")
}
if this.Error != that1.Error {
return fmt.Errorf("Error this(%v) Not Equal that(%v)", this.Error, that1.Error)
}
return nil
}
func (this *GettyRPCResponseHeader) Equal(that interface{}) bool {
if that == nil {
if this == nil {
return true
}
return false
}
that1, ok := that.(*GettyRPCResponseHeader)
if !ok {
that2, ok := that.(GettyRPCResponseHeader)
if ok {
that1 = &that2
} else {
return false
}
}
if that1 == nil {
if this == nil {
return true
}
return false
} else if this == nil {
return false
}
if this.Error != that1.Error {
return false
}
return true
}
func (this *GettyRPCRequestHeader) GoString() string {
if this == nil {
return "nil"
}
s := make([]string, 0, 7)
s = append(s, "&rpc.GettyRPCRequestHeader{")
s = append(s, "Service: "+fmt.Sprintf("%#v", this.Service)+",\n")
s = append(s, "Method: "+fmt.Sprintf("%#v", this.Method)+",\n")
s = append(s, "CallType: "+fmt.Sprintf("%#v", this.CallType)+",\n")
s = append(s, "}")
return strings.Join(s, "")
}
func (this *GettyRPCResponseHeader) GoString() string {
if this == nil {
return "nil"
}
s := make([]string, 0, 5)
s = append(s, "&rpc.GettyRPCResponseHeader{")
s = append(s, "Error: "+fmt.Sprintf("%#v", this.Error)+",\n")
s = append(s, "}")
return strings.Join(s, "")
}
func valueToGoStringCodec(v interface{}, typ string) string {
rv := reflect.ValueOf(v)
if rv.IsNil() {
return "nil"
}
pv := reflect.Indirect(rv).Interface()
return fmt.Sprintf("func(v %v) *%v { return &v } ( %#v )", typ, typ, pv)
}
func (m *GettyRPCRequestHeader) Marshal() (dAtA []byte, err error) {
size := m.Size()
dAtA = make([]byte, size)
n, err := m.MarshalTo(dAtA)
if err != nil {
return nil, err
}
return dAtA[:n], nil
}
func (m *GettyRPCRequestHeader) MarshalTo(dAtA []byte) (int, error) {
var i int
_ = i
var l int
_ = l
dAtA[i] = 0xa
i++
i = encodeVarintCodec(dAtA, i, uint64(len(m.Service)))
i += copy(dAtA[i:], m.Service)
dAtA[i] = 0x12
i++
i = encodeVarintCodec(dAtA, i, uint64(len(m.Method)))
i += copy(dAtA[i:], m.Method)
dAtA[i] = 0x18
i++
i = encodeVarintCodec(dAtA, i, uint64(m.CallType))
return i, nil
}
func (m *GettyRPCResponseHeader) Marshal() (dAtA []byte, err error) {
size := m.Size()
dAtA = make([]byte, size)
n, err := m.MarshalTo(dAtA)
if err != nil {
return nil, err
}
return dAtA[:n], nil
}
func (m *GettyRPCResponseHeader) MarshalTo(dAtA []byte) (int, error) {
var i int
_ = i
var l int
_ = l
dAtA[i] = 0xa
i++
i = encodeVarintCodec(dAtA, i, uint64(len(m.Error)))
i += copy(dAtA[i:], m.Error)
return i, nil
}
func encodeFixed64Codec(dAtA []byte, offset int, v uint64) int {
dAtA[offset] = uint8(v)
dAtA[offset+1] = uint8(v >> 8)
dAtA[offset+2] = uint8(v >> 16)
dAtA[offset+3] = uint8(v >> 24)
dAtA[offset+4] = uint8(v >> 32)
dAtA[offset+5] = uint8(v >> 40)
dAtA[offset+6] = uint8(v >> 48)
dAtA[offset+7] = uint8(v >> 56)
return offset + 8
}
func encodeFixed32Codec(dAtA []byte, offset int, v uint32) int {
dAtA[offset] = uint8(v)
dAtA[offset+1] = uint8(v >> 8)
dAtA[offset+2] = uint8(v >> 16)
dAtA[offset+3] = uint8(v >> 24)
return offset + 4
}
func encodeVarintCodec(dAtA []byte, offset int, v uint64) int {
for v >= 1<<7 {
dAtA[offset] = uint8(v&0x7f | 0x80)
v >>= 7
offset++
}
dAtA[offset] = uint8(v)
return offset + 1
}
func (m *GettyRPCRequestHeader) Size() (n int) {
var l int
_ = l
l = len(m.Service)
n += 1 + l + sovCodec(uint64(l))
l = len(m.Method)
n += 1 + l + sovCodec(uint64(l))
n += 1 + sovCodec(uint64(m.CallType))
return n
}
func (m *GettyRPCResponseHeader) Size() (n int) {
var l int
_ = l
l = len(m.Error)
n += 1 + l + sovCodec(uint64(l))
return n
}
func sovCodec(x uint64) (n int) {
for {
n++
x >>= 7
if x == 0 {
break
}
}
return n
}
func sozCodec(x uint64) (n int) {
return sovCodec(uint64((x << 1) ^ uint64((int64(x) >> 63))))
}
func (this *GettyRPCRequestHeader) String() string {
if this == nil {
return "nil"
}
s := strings.Join([]string{`&GettyRPCRequestHeader{`,
`Service:` + fmt.Sprintf("%v", this.Service) + `,`,
`Method:` + fmt.Sprintf("%v", this.Method) + `,`,
`CallType:` + fmt.Sprintf("%v", this.CallType) + `,`,
`}`,
}, "")
return s
}
func (this *GettyRPCResponseHeader) String() string {
if this == nil {
return "nil"
}
s := strings.Join([]string{`&GettyRPCResponseHeader{`,
`Error:` + fmt.Sprintf("%v", this.Error) + `,`,
`}`,
}, "")
return s
}
func valueToStringCodec(v interface{}) string {
rv := reflect.ValueOf(v)
if rv.IsNil() {
return "nil"
}
pv := reflect.Indirect(rv).Interface()
return fmt.Sprintf("*%v", pv)
}
func (m *GettyRPCRequestHeader) Unmarshal(dAtA []byte) error {
l := len(dAtA)
iNdEx := 0
for iNdEx < l {
preIndex := iNdEx
var wire uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowCodec
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
wire |= (uint64(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
fieldNum := int32(wire >> 3)
wireType := int(wire & 0x7)
if wireType == 4 {
return fmt.Errorf("proto: GettyRPCRequestHeader: wiretype end group for non-group")
}
if fieldNum <= 0 {
return fmt.Errorf("proto: GettyRPCRequestHeader: illegal tag %d (wire type %d)", fieldNum, wire)
}
switch fieldNum {
case 1:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Service", wireType)
}
var stringLen uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowCodec
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
stringLen |= (uint64(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
intStringLen := int(stringLen)
if intStringLen < 0 {
return ErrInvalidLengthCodec
}
postIndex := iNdEx + intStringLen
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.Service = string(dAtA[iNdEx:postIndex])
iNdEx = postIndex
case 2:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Method", wireType)
}
var stringLen uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowCodec
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
stringLen |= (uint64(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
intStringLen := int(stringLen)
if intStringLen < 0 {
return ErrInvalidLengthCodec
}
postIndex := iNdEx + intStringLen
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.Method = string(dAtA[iNdEx:postIndex])
iNdEx = postIndex
case 3:
if wireType != 0 {
return fmt.Errorf("proto: wrong wireType = %d for field CallType", wireType)
}
m.CallType = 0
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowCodec
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
m.CallType |= (CallType(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
default:
iNdEx = preIndex
skippy, err := skipCodec(dAtA[iNdEx:])
if err != nil {
return err
}
if skippy < 0 {
return ErrInvalidLengthCodec
}
if (iNdEx + skippy) > l {
return io.ErrUnexpectedEOF
}
iNdEx += skippy
}
}
if iNdEx > l {
return io.ErrUnexpectedEOF
}
return nil
}
func (m *GettyRPCResponseHeader) Unmarshal(dAtA []byte) error {
l := len(dAtA)
iNdEx := 0
for iNdEx < l {
preIndex := iNdEx
var wire uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowCodec
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
wire |= (uint64(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
fieldNum := int32(wire >> 3)
wireType := int(wire & 0x7)
if wireType == 4 {
return fmt.Errorf("proto: GettyRPCResponseHeader: wiretype end group for non-group")
}
if fieldNum <= 0 {
return fmt.Errorf("proto: GettyRPCResponseHeader: illegal tag %d (wire type %d)", fieldNum, wire)
}
switch fieldNum {
case 1:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Error", wireType)
}
var stringLen uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowCodec
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
stringLen |= (uint64(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
intStringLen := int(stringLen)
if intStringLen < 0 {
return ErrInvalidLengthCodec
}
postIndex := iNdEx + intStringLen
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.Error = string(dAtA[iNdEx:postIndex])
iNdEx = postIndex
default:
iNdEx = preIndex
skippy, err := skipCodec(dAtA[iNdEx:])
if err != nil {
return err
}
if skippy < 0 {
return ErrInvalidLengthCodec
}
if (iNdEx + skippy) > l {
return io.ErrUnexpectedEOF
}
iNdEx += skippy
}
}
if iNdEx > l {
return io.ErrUnexpectedEOF
}
return nil
}
func skipCodec(dAtA []byte) (n int, err error) {
l := len(dAtA)
iNdEx := 0
for iNdEx < l {
var wire uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return 0, ErrIntOverflowCodec
}
if iNdEx >= l {
return 0, io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
wire |= (uint64(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
wireType := int(wire & 0x7)
switch wireType {
case 0:
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return 0, ErrIntOverflowCodec
}
if iNdEx >= l {
return 0, io.ErrUnexpectedEOF
}
iNdEx++
if dAtA[iNdEx-1] < 0x80 {
break
}
}
return iNdEx, nil
case 1:
iNdEx += 8
return iNdEx, nil
case 2:
var length int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return 0, ErrIntOverflowCodec
}
if iNdEx >= l {
return 0, io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
length |= (int(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
iNdEx += length
if length < 0 {
return 0, ErrInvalidLengthCodec
}
return iNdEx, nil
case 3:
for {
var innerWire uint64
var start int = iNdEx
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return 0, ErrIntOverflowCodec
}
if iNdEx >= l {
return 0, io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
innerWire |= (uint64(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
innerWireType := int(innerWire & 0x7)
if innerWireType == 4 {
break
}
next, err := skipCodec(dAtA[start:])
if err != nil {
return 0, err
}
iNdEx = start + next
}
return iNdEx, nil
case 4:
return iNdEx, nil
case 5:
iNdEx += 4
return iNdEx, nil
default:
return 0, fmt.Errorf("proto: illegal wireType %d", wireType)
}
}
panic("unreachable")
}
var (
ErrInvalidLengthCodec = fmt.Errorf("proto: negative length found during unmarshaling")
ErrIntOverflowCodec = fmt.Errorf("proto: integer overflow")
)
func init() { proto.RegisterFile("codec.proto", fileDescriptorCodec) }
var fileDescriptorCodec = []byte{
// 304 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x54, 0x8e, 0x4f, 0x4b, 0x02, 0x41,
0x18, 0xc6, 0xe7, 0xd5, 0xfe, 0x39, 0x61, 0x2c, 0x4b, 0x85, 0x48, 0xbc, 0x89, 0x27, 0x09, 0x5a,
0x21, 0xfc, 0x04, 0x4a, 0x14, 0x84, 0x1a, 0x9b, 0xe1, 0x31, 0x74, 0x7c, 0x53, 0xc1, 0x9c, 0x69,
0x1c, 0x8b, 0xbd, 0x75, 0xea, 0xdc, 0xc7, 0xe8, 0xa3, 0x78, 0xf4, 0xd8, 0x29, 0xdc, 0xe9, 0xd2,
0xd1, 0x8f, 0x10, 0xad, 0xab, 0xd6, 0x6d, 0x7e, 0xbf, 0xe7, 0x19, 0xde, 0x87, 0xef, 0x0a, 0xd9,
0x21, 0xe1, 0x29, 0x2d, 0x8d, 0x74, 0x93, 0x5a, 0x89, 0xec, 0x69, 0xb7, 0x6f, 0x7a, 0xe3, 0xb6,
0x27, 0xe4, 0x43, 0xb1, 0x2b, 0xbb, 0xb2, 0x18, 0x65, 0xed, 0xf1, 0x7d, 0x44, 0x11, 0x44, 0xaf,
0xc5, 0x9f, 0xfc, 0x2b, 0xf0, 0x83, 0x0b, 0x32, 0x26, 0xf0, 0xaf, 0x2b, 0x3e, 0x3d, 0x8e, 0x69,
0x64, 0x2e, 0xa9, 0xd5, 0x21, 0xed, 0x22, 0xdf, 0xbe, 0x21, 0xfd, 0xd4, 0x17, 0x94, 0x81, 0x1c,
0x14, 0x52, 0xe5, 0x8d, 0xc9, 0xe7, 0x31, 0xf3, 0x97, 0xd2, 0x3d, 0xe2, 0x5b, 0x55, 0x32, 0x3d,
0xd9, 0xc9, 0x24, 0xfe, 0xc4, 0xb1, 0x73, 0x8b, 0x7c, 0xa7, 0xd2, 0x1a, 0x0c, 0x1a, 0x81, 0xa2,
0x4c, 0x32, 0x07, 0x85, 0xbd, 0xb3, 0xb4, 0xa7, 0x95, 0xf0, 0x96, 0x32, 0xae, 0xaf, 0x4a, 0xf9,
0x12, 0x3f, 0x5c, 0xef, 0x18, 0x29, 0x39, 0x1c, 0x51, 0x3c, 0x24, 0xcb, 0x37, 0xcf, 0xb5, 0x96,
0xfa, 0xdf, 0x8c, 0x85, 0x3a, 0xa9, 0xae, 0xcf, 0xb8, 0x69, 0x9e, 0xaa, 0x34, 0xee, 0x6e, 0x6b,
0x57, 0xf5, 0x66, 0xcd, 0x61, 0x31, 0xd6, 0x87, 0xd4, 0x6c, 0x05, 0x0e, 0xc4, 0xd8, 0x78, 0x96,
0xbf, 0x98, 0x70, 0xf7, 0xb9, 0xb3, 0xc2, 0x9a, 0xf4, 0x49, 0x0d, 0x02, 0x27, 0x59, 0x2e, 0x4d,
0x42, 0x64, 0xd3, 0x10, 0xd9, 0x47, 0x88, 0x6c, 0x16, 0x22, 0xcc, 0x43, 0x84, 0x17, 0x8b, 0xf0,
0x6e, 0x11, 0x26, 0x16, 0x61, 0x6a, 0x11, 0x66, 0x16, 0xe1, 0xdb, 0x22, 0x9b, 0x5b, 0x84, 0xb7,
0x2f, 0x64, 0x3f, 0x01, 0x00, 0x00, 0xff, 0xff, 0xa8, 0xe8, 0x87, 0x62, 0x85, 0x01, 0x00, 0x00,
}
package rpc
import (
"time"
)
import (
jerrors "github.com/juju/errors"
)
type (
GettySessionParam struct {
CompressEncoding bool `default:"false" yaml:"compress_encoding" json:"compress_encoding,omitempty"`
TcpNoDelay bool `default:"true" yaml:"tcp_no_delay" json:"tcp_no_delay,omitempty"`
TcpKeepAlive bool `default:"true" yaml:"tcp_keep_alive" json:"tcp_keep_alive,omitempty"`
KeepAlivePeriod string `default:"180s" yaml:"keep_alive_period" json:"keep_alive_period,omitempty"`
keepAlivePeriod time.Duration
TcpRBufSize int `default:"262144" yaml:"tcp_r_buf_size" json:"tcp_r_buf_size,omitempty"`
TcpWBufSize int `default:"65536" yaml:"tcp_w_buf_size" json:"tcp_w_buf_size,omitempty"`
PkgRQSize int `default:"1024" yaml:"pkg_rq_size" json:"pkg_rq_size,omitempty"`
PkgWQSize int `default:"1024" yaml:"pkg_wq_size" json:"pkg_wq_size,omitempty"`
TcpReadTimeout string `default:"1s" yaml:"tcp_read_timeout" json:"tcp_read_timeout,omitempty"`
tcpReadTimeout time.Duration
TcpWriteTimeout string `default:"5s" yaml:"tcp_write_timeout" json:"tcp_write_timeout,omitempty"`
tcpWriteTimeout time.Duration
WaitTimeout string `default:"7s" yaml:"wait_timeout" json:"wait_timeout,omitempty"`
waitTimeout time.Duration
MaxMsgLen int `default:"1024" yaml:"max_msg_len" json:"max_msg_len,omitempty"`
SessionName string `default:"rpc" yaml:"session_name" json:"session_name,omitempty"`
}
// Config holds supported types by the multiconfig package
ServerConfig struct {
// local address
AppName string `default:"rpc-server" yaml:"app_name" json:"app_name,omitempty"`
Host string `default:"127.0.0.1" yaml:"host" json:"host,omitempty"`
Ports []string `yaml:"ports" json:"ports,omitempty"` // `default:["10000"]`
ProfilePort int `default:"10086" yaml:"profile_port" json:"profile_port,omitempty"`
// session
SessionTimeout string `default:"60s" yaml:"session_timeout" json:"session_timeout,omitempty"`
sessionTimeout time.Duration
SessionNumber int `default:"1000" yaml:"session_number" json:"session_number,omitempty"`
// app
FailFastTimeout string `default:"5s" yaml:"fail_fast_timeout" json:"fail_fast_timeout,omitempty"`
failFastTimeout time.Duration
// session tcp parameters
GettySessionParam GettySessionParam `required:"true" yaml:"getty_session_param" json:"getty_session_param,omitempty"`
}
// Config holds supported types by the multiconfig package
ClientConfig struct {
// local address
AppName string `default:"rpc-client" yaml:"app_name" json:"app_name,omitempty"`
Host string `default:"127.0.0.1" yaml:"host" json:"host,omitempty"`
ProfilePort int `default:"10086" yaml:"profile_port" json:"profile_port,omitempty"`
// session pool
ConnectionNum int `default:"16" yaml:"connection_number" json:"connection_number,omitempty"`
// heartbeat
HeartbeatPeriod string `default:"15s" yaml:"heartbeat_period" json:"heartbeat_period,omitempty"`
heartbeatPeriod time.Duration
// session
SessionTimeout string `default:"60s" yaml:"session_timeout" json:"session_timeout,omitempty"`
sessionTimeout time.Duration
// app
FailFastTimeout string `default:"5s" yaml:"fail_fast_timeout" json:"fail_fast_timeout,omitempty"`
failFastTimeout time.Duration
// Connection Pool
PoolSize int `default:"2" yaml:"pool_size" json:"pool_size,omitempty"`
PoolTTL int `default:"180" yaml:"pool_ttl" json:"pool_ttl,omitempty"`
// session tcp parameters
GettySessionParam GettySessionParam `required:"true" yaml:"getty_session_param" json:"getty_session_param,omitempty"`
}
)
func (c *GettySessionParam) CheckValidity() error {
var err error
if c.keepAlivePeriod, err = time.ParseDuration(c.KeepAlivePeriod); err != nil {
return jerrors.Annotatef(err, "time.ParseDuration(KeepAlivePeriod{%#v})", c.KeepAlivePeriod)
}
if c.tcpReadTimeout, err = time.ParseDuration(c.TcpReadTimeout); err != nil {
return jerrors.Annotatef(err, "time.ParseDuration(TcpReadTimeout{%#v})", c.TcpReadTimeout)
}
if c.tcpWriteTimeout, err = time.ParseDuration(c.TcpWriteTimeout); err != nil {
return jerrors.Annotatef(err, "time.ParseDuration(TcpWriteTimeout{%#v})", c.TcpWriteTimeout)
}
if c.waitTimeout, err = time.ParseDuration(c.WaitTimeout); err != nil {
return jerrors.Annotatef(err, "time.ParseDuration(WaitTimeout{%#v})", c.WaitTimeout)
}
return nil
}
func (c *ClientConfig) CheckValidity() error {
var err error
if c.heartbeatPeriod, err = time.ParseDuration(c.HeartbeatPeriod); err != nil {
return jerrors.Annotatef(err, "time.ParseDuration(HeartbeatPeroid{%#v})", c.HeartbeatPeriod)
}
if c.sessionTimeout, err = time.ParseDuration(c.SessionTimeout); err != nil {
return jerrors.Annotatef(err, "time.ParseDuration(SessionTimeout{%#v})", c.SessionTimeout)
}
if c.failFastTimeout, err = time.ParseDuration(c.FailFastTimeout); err != nil {
return jerrors.Annotatef(err, "time.ParseDuration(FailFastTimeout{%#v})", c.FailFastTimeout)
}
return jerrors.Trace(c.GettySessionParam.CheckValidity())
}
func (c *ServerConfig) CheckValidity() error {
var err error
if c.sessionTimeout, err = time.ParseDuration(c.SessionTimeout); err != nil {
return jerrors.Annotatef(err, "time.ParseDuration(SessionTimeout{%#v})", c.SessionTimeout)
}
if c.failFastTimeout, err = time.ParseDuration(c.FailFastTimeout); err != nil {
return jerrors.Annotatef(err, "time.ParseDuration(FailFastTimeout{%#v})", c.FailFastTimeout)
}
return jerrors.Trace(c.GettySessionParam.CheckValidity())
}
package rpc
import (
"reflect"
"sync"
"time"
)
import (
"github.com/AlexStocks/getty"
jerrors "github.com/juju/errors"
log "github.com/AlexStocks/log4go"
)
var (
errTooManySessions = jerrors.New("too many sessions")
)
type rpcSession struct {
session getty.Session
reqNum int32
}
////////////////////////////////////////////
// RpcServerHandler
////////////////////////////////////////////
type RpcServerHandler struct {
maxSessionNum int
sessionTimeout time.Duration
sessionMap map[getty.Session]*rpcSession
rwlock sync.RWMutex
}
func NewRpcServerHandler(maxSessionNum int, sessionTimeout time.Duration) *RpcServerHandler {
return &RpcServerHandler{
maxSessionNum: maxSessionNum,
sessionTimeout: sessionTimeout,
sessionMap: make(map[getty.Session]*rpcSession),
}
}
func (h *RpcServerHandler) OnOpen(session getty.Session) error {
var err error
h.rwlock.RLock()
if h.maxSessionNum <= len(h.sessionMap) {
err = errTooManySessions
}
h.rwlock.RUnlock()
if err != nil {
return jerrors.Trace(err)
}
log.Info("got session:%s", session.Stat())
h.rwlock.Lock()
h.sessionMap[session] = &rpcSession{session: session}
h.rwlock.Unlock()
return nil
}
func (h *RpcServerHandler) OnError(session getty.Session, err error) {
log.Info("session{%s} got error{%v}, will be closed.", session.Stat(), err)
h.rwlock.Lock()
delete(h.sessionMap, session)
h.rwlock.Unlock()
}
func (h *RpcServerHandler) OnClose(session getty.Session) {
log.Info("session{%s} is closing......", session.Stat())
h.rwlock.Lock()
delete(h.sessionMap, session)
h.rwlock.Unlock()
}
func (h *RpcServerHandler) OnMessage(session getty.Session, pkg interface{}) {
h.rwlock.Lock()
if _, ok := h.sessionMap[session]; ok {
h.sessionMap[session].reqNum++
}
h.rwlock.Unlock()
req, ok := pkg.(GettyRPCRequestPackage)
if !ok {
log.Error("illegal package{%#v}", pkg)
return
}
// heartbeat
if req.H.Command == gettyCmdHbRequest {
h.replyCmd(session, req, gettyCmdHbResponse, "")
return
}
if req.header.CallType == CT_OneWay {
function := req.methodType.method.Func
function.Call([]reflect.Value{req.service.rcvr, req.argv})
return
}
if req.header.CallType == CT_TwoWayNoReply {
h.replyCmd(session, req, gettyCmdRPCResponse, "")
function := req.methodType.method.Func
function.Call([]reflect.Value{req.service.rcvr, req.argv, req.replyv})
return
}
err := h.callService(session, req, req.service, req.methodType, req.argv, req.replyv)
if err != nil {
log.Error("h.callService(session:%#v, req:%#v) = %s", session, req, jerrors.ErrorStack(err))
}
}
func (h *RpcServerHandler) OnCron(session getty.Session) {
var (
flag bool
active time.Time
)
h.rwlock.RLock()
if _, ok := h.sessionMap[session]; ok {
active = session.GetActive()
if h.sessionTimeout.Nanoseconds() < time.Since(active).Nanoseconds() {
flag = true
log.Warn("session{%s} timeout{%s}, reqNum{%d}",
session.Stat(), time.Since(active).String(), h.sessionMap[session].reqNum)
}
}
h.rwlock.RUnlock()
if flag {
h.rwlock.Lock()
delete(h.sessionMap, session)
h.rwlock.Unlock()
session.Close()
}
}
func (h *RpcServerHandler) replyCmd(session getty.Session, req GettyRPCRequestPackage, cmd gettyCommand, err string) {
resp := GettyPackage{
H: req.H,
}
resp.H.Command = cmd
if len(err) != 0 {
resp.H.Code = GettyFail
resp.B = &GettyRPCResponse{
header: GettyRPCResponseHeader{
Error: err,
},
}
}
session.WritePkg(resp, 5*time.Second)
}
func (h *RpcServerHandler) callService(session getty.Session, req GettyRPCRequestPackage,
service *service, methodType *methodType, argv, replyv reflect.Value) error {
function := methodType.method.Func
returnValues := function.Call([]reflect.Value{service.rcvr, argv, replyv})
errInter := returnValues[0].Interface()
if errInter != nil {
h.replyCmd(session, req, gettyCmdRPCResponse, errInter.(error).Error())
return nil
}
resp := GettyPackage{
H: req.H,
}
resp.H.Code = GettyOK
resp.H.Command = gettyCmdRPCResponse
resp.B = &GettyRPCResponse{
body: replyv.Interface(),
}
return jerrors.Trace(session.WritePkg(resp, 5*time.Second))
}
////////////////////////////////////////////
// RpcClientHandler
////////////////////////////////////////////
type RpcClientHandler struct {
conn *gettyRPCClient
}
func NewRpcClientHandler(client *gettyRPCClient) *RpcClientHandler {
return &RpcClientHandler{conn: client}
}
func (h *RpcClientHandler) OnOpen(session getty.Session) error {
h.conn.addSession(session)
return nil
}
func (h *RpcClientHandler) OnError(session getty.Session, err error) {
log.Info("session{%s} got error{%v}, will be closed.", session.Stat(), err)
h.conn.removeSession(session)
}
func (h *RpcClientHandler) OnClose(session getty.Session) {
log.Info("session{%s} is closing......", session.Stat())
h.conn.removeSession(session)
}
func (h *RpcClientHandler) OnMessage(session getty.Session, pkg interface{}) {
p, ok := pkg.(*GettyRPCResponsePackage)
if !ok {
log.Error("illegal package{%#v}", pkg)
return
}
log.Debug("get rpc response{%#v}", p)
h.conn.updateSession(session)
pendingResponse := h.conn.pool.rpcClient.removePendingResponse(p.H.Sequence)
if pendingResponse == nil {
return
}
if p.H.Command == gettyCmdHbResponse {
return
}
if p.H.Code == GettyFail && len(p.header.Error) > 0 {
pendingResponse.err = jerrors.New(p.header.Error)
if pendingResponse.callback == nil {
pendingResponse.done <- struct{}{}
} else {
pendingResponse.callback(pendingResponse.GetCallResponse())
}
return
}
codec := Codecs[p.H.CodecType]
if codec == nil {
pendingResponse.err = jerrors.Errorf("can not find codec for %d", p.H.CodecType)
pendingResponse.done <- struct{}{}
return
}
err := codec.Decode(p.body, pendingResponse.reply)
pendingResponse.err = err
if pendingResponse.callback == nil {
pendingResponse.done <- struct{}{}
} else {
pendingResponse.callback(pendingResponse.GetCallResponse())
}
}
func (h *RpcClientHandler) OnCron(session getty.Session) {
rpcSession, err := h.conn.getClientRpcSession(session)
if err != nil {
log.Error("client.getClientSession(session{%s}) = error{%s}",
session.Stat(), jerrors.ErrorStack(err))
return
}
if h.conn.pool.rpcClient.conf.sessionTimeout.Nanoseconds() < time.Since(session.GetActive()).Nanoseconds() {
log.Warn("session{%s} timeout{%s}, reqNum{%d}",
session.Stat(), time.Since(session.GetActive()).String(), rpcSession.reqNum)
h.conn.removeSession(session) // -> h.conn.close() -> h.conn.pool.remove(h.conn)
return
}
codecType := GetCodecType(h.conn.protocol)
h.conn.pool.rpcClient.heartbeat(session, codecType)
}
package rpc
import (
"fmt"
"math/rand"
"net"
"strings"
"sync"
"time"
)
import (
"github.com/AlexStocks/getty"
log "github.com/AlexStocks/log4go"
jerrors "github.com/juju/errors"
)
type gettyRPCClient struct {
once sync.Once
protocol string
addr string
created int64 // 为0,则说明没有被创建或者被销毁了
pool *gettyRPCClientPool
lock sync.RWMutex
gettyClient getty.Client
sessions []*rpcSession
}
var (
errClientPoolClosed = jerrors.New("client pool closed")
)
func newGettyRPCClientConn(pool *gettyRPCClientPool, protocol, addr string) (*gettyRPCClient, error) {
c := &gettyRPCClient{
protocol: protocol,
addr: addr,
pool: pool,
gettyClient: getty.NewTCPClient(
getty.WithServerAddress(addr),
getty.WithConnectionNumber((int)(pool.rpcClient.conf.ConnectionNum)),
),
}
c.gettyClient.RunEventLoop(c.newSession)
idx := 1
for {
idx++
if c.isAvailable() {
break
}
if idx > 5000 {
return nil, jerrors.New(fmt.Sprintf("failed to create client connection to %s in 5 seconds", addr))
}
time.Sleep(1e6)
}
log.Info("client init ok")
c.created = time.Now().Unix()
return c, nil
}
func (c *gettyRPCClient) newSession(session getty.Session) error {
var (
ok bool
tcpConn *net.TCPConn
conf ClientConfig
)
conf = c.pool.rpcClient.conf
if conf.GettySessionParam.CompressEncoding {
session.SetCompressType(getty.CompressZip)
}
if tcpConn, ok = session.Conn().(*net.TCPConn); !ok {
panic(fmt.Sprintf("%s, session.conn{%#v} is not tcp connection\n", session.Stat(), session.Conn()))
}
tcpConn.SetNoDelay(conf.GettySessionParam.TcpNoDelay)
tcpConn.SetKeepAlive(conf.GettySessionParam.TcpKeepAlive)
if conf.GettySessionParam.TcpKeepAlive {
tcpConn.SetKeepAlivePeriod(conf.GettySessionParam.keepAlivePeriod)
}
tcpConn.SetReadBuffer(conf.GettySessionParam.TcpRBufSize)
tcpConn.SetWriteBuffer(conf.GettySessionParam.TcpWBufSize)
session.SetName(conf.GettySessionParam.SessionName)
session.SetMaxMsgLen(conf.GettySessionParam.MaxMsgLen)
session.SetPkgHandler(NewRpcClientPackageHandler())
session.SetEventListener(NewRpcClientHandler(c))
session.SetRQLen(conf.GettySessionParam.PkgRQSize)
session.SetWQLen(conf.GettySessionParam.PkgWQSize)
session.SetReadTimeout(conf.GettySessionParam.tcpReadTimeout)
session.SetWriteTimeout(conf.GettySessionParam.tcpWriteTimeout)
session.SetCronPeriod((int)(conf.heartbeatPeriod.Nanoseconds() / 1e6))
session.SetWaitTime(conf.GettySessionParam.waitTimeout)
log.Debug("client new session:%s\n", session.Stat())
return nil
}
func (c *gettyRPCClient) selectSession() getty.Session {
c.lock.RLock()
defer c.lock.RUnlock()
count := len(c.sessions)
if count == 0 {
return nil
}
return c.sessions[rand.Int31n(int32(count))].session
}
func (c *gettyRPCClient) addSession(session getty.Session) {
log.Debug("add session{%s}", session.Stat())
if session == nil {
return
}
c.lock.Lock()
c.sessions = append(c.sessions, &rpcSession{session: session})
c.lock.Unlock()
}
func (c *gettyRPCClient) removeSession(session getty.Session) {
if session == nil {
return
}
c.lock.Lock()
defer c.lock.Unlock()
if c.sessions == nil {
return
}
for i, s := range c.sessions {
if s.session == session {
c.sessions = append(c.sessions[:i], c.sessions[i+1:]...)
log.Debug("delete session{%s}, its index{%d}", session.Stat(), i)
break
}
}
log.Info("after remove session{%s}, left session number:%d", session.Stat(), len(c.sessions))
if len(c.sessions) == 0 {
c.close() // -> pool.remove(c)
}
}
func (c *gettyRPCClient) updateSession(session getty.Session) {
if session == nil {
return
}
c.lock.Lock()
defer c.lock.Unlock()
if c.sessions == nil {
return
}
for i, s := range c.sessions {
if s.session == session {
c.sessions[i].reqNum++
break
}
}
}
func (c *gettyRPCClient) getClientRpcSession(session getty.Session) (rpcSession, error) {
var (
err error
rpcSession rpcSession
)
c.lock.Lock()
defer c.lock.Unlock()
if c.sessions == nil {
return rpcSession, errClientClosed
}
err = errSessionNotExist
for _, s := range c.sessions {
if s.session == session {
rpcSession = *s
err = nil
break
}
}
return rpcSession, jerrors.Trace(err)
}
func (c *gettyRPCClient) isAvailable() bool {
if c.selectSession() == nil {
return false
}
return true
}
func (c *gettyRPCClient) close() error {
err := jerrors.Errorf("close gettyRPCClient{%#v} again", c)
c.once.Do(func() {
// delete @c from client pool
c.pool.remove(c)
for _, s := range c.sessions {
log.Info("close client session{%s, last active:%s, request number:%d}",
s.session.Stat(), s.session.GetActive().String(), s.reqNum)
s.session.Close()
}
c.gettyClient.Close()
c.gettyClient = nil
c.sessions = c.sessions[:0]
c.created = 0
err = nil
})
return err
}
type gettyRPCClientPool struct {
rpcClient *Client
size int // []*gettyRPCClient数组的size
ttl int64 // 每个gettyRPCClient的有效期时间. pool对象会在getConn时执行ttl检查
sync.Mutex
connMap map[string][]*gettyRPCClient // 从[]*gettyRPCClient 可见key是连接地址,而value是对应这个地址的连接数组
}
func newGettyRPCClientConnPool(rpcClient *Client, size int, ttl time.Duration) *gettyRPCClientPool {
return &gettyRPCClientPool{
rpcClient: rpcClient,
size: size,
ttl: int64(ttl.Seconds()),
connMap: make(map[string][]*gettyRPCClient),
}
}
func (p *gettyRPCClientPool) close() {
p.Lock()
connMap := p.connMap
p.connMap = nil
p.Unlock()
for _, connArray := range connMap {
for _, conn := range connArray {
conn.close()
}
}
}
func (p *gettyRPCClientPool) getConn(protocol, addr string) (*gettyRPCClient, error) {
var builder strings.Builder
builder.WriteString(addr)
builder.WriteString("@")
builder.WriteString(protocol)
key := builder.String()
p.Lock()
defer p.Unlock()
if p.connMap == nil {
return nil, errClientPoolClosed
}
connArray := p.connMap[key]
now := time.Now().Unix()
for len(connArray) > 0 {
conn := connArray[len(connArray)-1]
connArray = connArray[:len(connArray)-1]
p.connMap[key] = connArray
if d := now - conn.created; d > p.ttl {
conn.close() // -> pool.remove(c)
continue
}
return conn, nil
}
// create new conn
return newGettyRPCClientConn(p, protocol, addr)
}
func (p *gettyRPCClientPool) release(conn *gettyRPCClient, err error) {
if conn == nil || conn.created == 0 {
return
}
if err != nil {
conn.close()
return
}
var builder strings.Builder
builder.WriteString(conn.addr)
builder.WriteString("@")
builder.WriteString(conn.protocol)
key := builder.String()
p.Lock()
defer p.Unlock()
if p.connMap == nil {
return
}
connArray := p.connMap[key]
if len(connArray) >= p.size {
conn.close()
return
}
p.connMap[key] = append(connArray, conn)
}
func (p *gettyRPCClientPool) remove(conn *gettyRPCClient) {
if conn == nil || conn.created == 0 {
return
}
var builder strings.Builder
builder.WriteString(conn.addr)
builder.WriteString("@")
builder.WriteString(conn.protocol)
key := builder.String()
p.Lock()
defer p.Unlock()
if p.connMap == nil {
return
}
connArray := p.connMap[key]
if len(connArray) > 0 {
for idx, c := range connArray {
if conn == c {
p.connMap[key] = append(connArray[:idx], connArray[idx+1:]...)
break
}
}
}
}
syntax = "proto2";
package rpc;
import "github.com/gogo/protobuf/gogoproto/gogo.proto";
option (gogoproto.gostring_all) = true;
option (gogoproto.equal_all) = true;
option (gogoproto.verbose_equal_all) = true;
// option (gogoproto.goproto_stringer_all) = false;
// option (gogoproto.stringer_all) = true;
// option (gogoproto.populate_all) = true;
// option (gogoproto.testgen_all) = true;
// option (gogoproto.benchgen_all) = true;
option (gogoproto.marshaler_all) = true;
option (gogoproto.sizer_all) = true;
option (gogoproto.unmarshaler_all) = true;
option (gogoproto.goproto_getters_all) = false;
option (gogoproto.goproto_enum_prefix_all) = false;
//////////////////////////////////////////
// Request Header
//////////////////////////////////////////
enum CallType {
CT_UNKOWN = 0;
CT_OneWay = 1;
CT_TwoWay = 2;
CT_TwoWayNoReply = 3;
}
message GettyRPCRequestHeader {
optional string Service = 1 [(gogoproto.nullable) = false];
optional string Method = 2 [(gogoproto.nullable) = false];
optional CallType CallType = 3 [(gogoproto.nullable) = false];
}
message GettyRPCResponseHeader {
optional string Error = 1 [(gogoproto.nullable) = false];
}
#!/usr/bin/env bash
# ******************************************************
# DESC :
# AUTHOR : Alex Stocks
# VERSION : 1.0
# LICENCE : Apache License 2.0
# EMAIL : alexstocks@foxmail.com
# MOD : 2017-09-04 22:53
# FILE : pb.sh
# ******************************************************
# descriptor.proto
gopath=~/test/golang/lib/src/github.com/gogo/protobuf/protobuf
# If you are using any gogo.proto extensions you will need to specify the
# proto_path to include the descriptor.proto and gogo.proto.
# gogo.proto is located in github.com/gogo/protobuf/gogoproto
gogopath=~/test/golang/lib/src/
# protoc -I=$gopath:$gogopath:/Users/alex/test/golang/lib/src/github.com/AlexStocks/goext/database/redis/:./ --gogoslick_out=Mredis_meta.proto="github.com/AlexStocks/goext/database/redis":../app/ cluster_meta.proto
# protoc -I=$gopath:$gogopath:/Users/alex/test/golang/lib/src/github.com/AlexStocks/goext/database/redis/:./ --gogoslick_out=Mredis_meta.proto="github.com/AlexStocks/goext/database/redis":../app/ response.proto
# protoc -I=$gopath:$gogopath:./ --gogoslick_out=Mrole.proto="github.com/AlexStocks/goext/database/registry":./src/ service.proto
protoc -I=$gopath:$gogopath:./ --gogoslick_out=../ codec.proto
package rpc
import (
"bytes"
"reflect"
)
import (
"github.com/AlexStocks/getty"
log "github.com/AlexStocks/log4go"
jerrors "github.com/juju/errors"
)
////////////////////////////////////////////
// RpcServerPackageHandler
////////////////////////////////////////////
type RpcServerPackageHandler struct {
server *Server
}
func NewRpcServerPackageHandler(server *Server) *RpcServerPackageHandler {
return &RpcServerPackageHandler{
server: server,
}
}
func (p *RpcServerPackageHandler) Read(ss getty.Session, data []byte) (interface{}, int, error) {
pkg := &GettyPackage{
B: NewGettyRPCRequest(),
}
buf := bytes.NewBuffer(data)
length, err := pkg.Unmarshal(buf)
if err != nil {
if jerrors.Cause(err) == ErrNotEnoughStream {
return nil, 0, nil
}
return nil, 0, jerrors.Trace(err)
}
req := GettyRPCRequestPackage{H: pkg.H}
if pkg.H.Command == gettyCmdHbRequest {
return req, length, nil
}
req.header = pkg.B.GetHeader().(GettyRPCRequestHeader)
// get service & method
req.service = p.server.serviceMap[req.header.Service]
if req.service != nil {
req.methodType = req.service.method[req.header.Method]
}
if req.service == nil {
return nil, 0, jerrors.Errorf("request service is nil")
}
if req.methodType == nil {
return nil, 0, jerrors.Errorf("request method is nil")
}
// get args
argIsValue := false
if req.methodType.ArgType.Kind() == reflect.Ptr {
req.argv = reflect.New(req.methodType.ArgType.Elem())
} else {
req.argv = reflect.New(req.methodType.ArgType)
argIsValue = true
}
codec := Codecs[req.H.CodecType]
if codec == nil {
return nil, 0, jerrors.Errorf("can not find codec for %d", req.H.CodecType)
}
err = codec.Decode(pkg.B.GetBody(), req.argv.Interface())
if err != nil {
return nil, 0, jerrors.Trace(err)
}
if argIsValue {
req.argv = req.argv.Elem()
}
// get reply
if req.methodType.ReplyType != nil {
req.replyv = reflect.New(req.methodType.ReplyType.Elem())
}
return req, length, nil
}
func (p *RpcServerPackageHandler) Write(ss getty.Session, pkg interface{}) error {
resp, ok := pkg.(GettyPackage)
if !ok {
log.Error("illegal pkg:%+v\n", pkg)
return jerrors.New("invalid rpc response")
}
buf, err := resp.Marshal()
if err != nil {
log.Warn("binary.Write(resp{%#v}) = err{%#v}", resp, err)
return jerrors.Trace(err)
}
return jerrors.Trace(ss.WriteBytes(buf.Bytes()))
}
////////////////////////////////////////////
// RpcClientPackageHandler
////////////////////////////////////////////
type RpcClientPackageHandler struct {
}
func NewRpcClientPackageHandler() *RpcClientPackageHandler {
return &RpcClientPackageHandler{}
}
func (p *RpcClientPackageHandler) Read(ss getty.Session, data []byte) (interface{}, int, error) {
pkg := &GettyPackage{
B: NewGettyRPCResponse(),
}
buf := bytes.NewBuffer(data)
length, err := pkg.Unmarshal(buf)
if err != nil {
if err == ErrNotEnoughStream {
return nil, 0, nil
}
return nil, 0, jerrors.Trace(err)
}
resp := &GettyRPCResponsePackage{
H: pkg.H,
header: pkg.B.GetHeader().(GettyRPCResponseHeader),
}
if pkg.H.Command != gettyCmdHbResponse {
resp.body = pkg.B.GetBody()
}
return resp, length, nil
}
func (p *RpcClientPackageHandler) Write(ss getty.Session, pkg interface{}) error {
req, ok := pkg.(GettyPackage)
if !ok {
log.Error("illegal pkg:%+v\n", pkg)
return jerrors.New("invalid rpc request")
}
buf, err := req.Marshal()
if err != nil {
log.Warn("binary.Write(req{%#v}) = err{%#v}", req, jerrors.ErrorStack(err))
return jerrors.Trace(err)
}
return jerrors.Trace(ss.WriteBytes(buf.Bytes()))
}
package rpc
import (
"reflect"
"sync"
"unicode"
"unicode/utf8"
)
import (
log "github.com/AlexStocks/log4go"
)
var (
typeOfError = reflect.TypeOf((*error)(nil)).Elem()
)
type GettyRPCService interface {
Service() string // Service Interface
Version() string
}
type methodType struct {
sync.Mutex
method reflect.Method
ArgType reflect.Type
ReplyType reflect.Type
}
type service struct {
name string
rcvr reflect.Value
typ reflect.Type
method map[string]*methodType
}
// Is this an exported - upper case - name
func isExported(name string) bool {
rune, _ := utf8.DecodeRuneInString(name)
return unicode.IsUpper(rune)
}
// Is this type exported or a builtin?
func isExportedOrBuiltinType(t reflect.Type) bool {
for t.Kind() == reflect.Ptr {
t = t.Elem()
}
// PkgPath will be non-empty even for an exported type,
// so we need to check the type name as well.
return isExported(t.Name()) || t.PkgPath() == ""
}
// suitableMethods returns suitable Rpc methods of typ
func suitableMethods(typ reflect.Type) map[string]*methodType {
methods := make(map[string]*methodType)
for m := 0; m < typ.NumMethod(); m++ {
method := typ.Method(m)
mtype := method.Type
mname := method.Name
// Method must be exported.
if method.PkgPath != "" {
continue
}
// service Method needs three ins: receiver, *args, *reply.
// notify Method needs two ins: receiver, *args.
mInNum := mtype.NumIn()
if mInNum != 2 && mInNum != 3 {
log.Warn("method %s has wrong number of ins %d which should be "+
"2(notify method) or 3(serive method)", mname, mtype.NumIn())
continue
}
// First arg need not be a pointer.
argType := mtype.In(1)
if !isExportedOrBuiltinType(argType) {
log.Error("method{%s} argument type not exported{%v}", mname, argType)
continue
}
var replyType reflect.Type
if mInNum == 3 {
// Second arg must be a pointer.
replyType = mtype.In(2)
if replyType.Kind() != reflect.Ptr {
log.Error("method{%s} reply type not a pointer{%v}", mname, replyType)
continue
}
// Reply type must be exported.
if !isExportedOrBuiltinType(replyType) {
log.Error("method{%s} reply type not exported{%v}", mname, replyType)
continue
}
}
// Method needs one out.
if mtype.NumOut() != 1 {
log.Error("method{%s} has wrong number of out parameters{%d}", mname, mtype.NumOut())
continue
}
// The return type of the method must be error.
if returnType := mtype.Out(0); returnType != typeOfError {
log.Error("method{%s}'s return type{%s} is not error", mname, returnType.String())
continue
}
methods[mname] = &methodType{method: method, ArgType: argType, ReplyType: replyType}
}
return methods
}
package rpc
import (
"fmt"
"net"
"reflect"
)
import (
"github.com/AlexStocks/getty"
"github.com/AlexStocks/goext/net"
log "github.com/AlexStocks/log4go"
jerrors "github.com/juju/errors"
)
type Server struct {
conf ServerConfig
serviceMap map[string]*service
tcpServerList []getty.Server
}
func NewServer(conf *ServerConfig) (*Server, error) {
if err := conf.CheckValidity(); err != nil {
return nil, jerrors.Trace(err)
}
s := &Server{
serviceMap: make(map[string]*service),
conf: *conf,
}
return s, nil
}
func (s *Server) Register(rcvr GettyRPCService) error {
svc := &service{
typ: reflect.TypeOf(rcvr),
rcvr: reflect.ValueOf(rcvr),
name: reflect.Indirect(reflect.ValueOf(rcvr)).Type().Name(),
// Install the methods
method: suitableMethods(reflect.TypeOf(rcvr)),
}
if svc.name == "" {
s := "rpc.Register: no service name for type " + svc.typ.String()
log.Error(s)
return jerrors.New(s)
}
if !isExported(svc.name) {
s := "rpc.Register: type " + svc.name + " is not exported"
log.Error(s)
return jerrors.New(s)
}
if _, present := s.serviceMap[svc.name]; present {
return jerrors.New("rpc: service already defined: " + svc.name)
}
if len(svc.method) == 0 {
// To help the user, see if a pointer receiver would work.
method := suitableMethods(reflect.PtrTo(svc.typ))
str := "rpc.Register: type " + svc.name + " has no exported methods of suitable type"
if len(method) != 0 {
str = "rpc.Register: type " + svc.name + " has no exported methods of suitable type (" +
"hint: pass a pointer to value of that type)"
}
log.Error(str)
return jerrors.New(str)
}
s.serviceMap[svc.name] = svc
return nil
}
func (s *Server) newSession(session getty.Session) error {
var (
ok bool
tcpConn *net.TCPConn
)
if s.conf.GettySessionParam.CompressEncoding {
session.SetCompressType(getty.CompressZip)
}
if tcpConn, ok = session.Conn().(*net.TCPConn); !ok {
panic(fmt.Sprintf("%s, session.conn{%#v} is not tcp connection\n", session.Stat(), session.Conn()))
}
tcpConn.SetNoDelay(s.conf.GettySessionParam.TcpNoDelay)
tcpConn.SetKeepAlive(s.conf.GettySessionParam.TcpKeepAlive)
if s.conf.GettySessionParam.TcpKeepAlive {
tcpConn.SetKeepAlivePeriod(s.conf.GettySessionParam.keepAlivePeriod)
}
tcpConn.SetReadBuffer(s.conf.GettySessionParam.TcpRBufSize)
tcpConn.SetWriteBuffer(s.conf.GettySessionParam.TcpWBufSize)
session.SetName(s.conf.GettySessionParam.SessionName)
session.SetMaxMsgLen(s.conf.GettySessionParam.MaxMsgLen)
session.SetPkgHandler(NewRpcServerPackageHandler(s))
session.SetEventListener(NewRpcServerHandler(s.conf.SessionNumber, s.conf.sessionTimeout))
session.SetRQLen(s.conf.GettySessionParam.PkgRQSize)
session.SetWQLen(s.conf.GettySessionParam.PkgWQSize)
session.SetReadTimeout(s.conf.GettySessionParam.tcpReadTimeout)
session.SetWriteTimeout(s.conf.GettySessionParam.tcpWriteTimeout)
session.SetCronPeriod((int)(s.conf.sessionTimeout.Nanoseconds() / 1e6))
session.SetWaitTime(s.conf.GettySessionParam.waitTimeout)
log.Debug("app accepts new session:%s\n", session.Stat())
return nil
}
func (s *Server) Start() {
var (
addr string
portList []string
tcpServer getty.Server
)
portList = s.conf.Ports
if len(portList) == 0 {
panic("portList is nil")
}
for _, port := range portList {
addr = gxnet.HostAddress2(s.conf.Host, port)
tcpServer = getty.NewTCPServer(
getty.WithLocalAddress(addr),
)
tcpServer.RunEventLoop(s.newSession)
log.Debug("s bind addr{%s} ok!", addr)
s.tcpServerList = append(s.tcpServerList, tcpServer)
}
}
func (s *Server) Stop() {
list := s.tcpServerList
s.tcpServerList = nil
if list != nil {
for _, tcpServer := range list {
tcpServer.Close()
}
}
}
package rpc
import (
"math/rand"
"sync"
"time"
)
var (
seededIDGen = rand.New(rand.NewSource(time.Now().UnixNano()))
// The golang rand generators are *not* intrinsically thread-safe.
seededIDLock sync.Mutex
)
func randomID() uint64 {
seededIDLock.Lock()
defer seededIDLock.Unlock()
return uint64(seededIDGen.Int63())
}
...@@ -24,7 +24,7 @@ import ( ...@@ -24,7 +24,7 @@ import (
import ( import (
"github.com/AlexStocks/goext/net" "github.com/AlexStocks/goext/net"
"github.com/AlexStocks/goext/time" "github.com/AlexStocks/goext/time"
log "github.com/AlexStocks/log4go" log "github.com/dubbogo/log4go"
"github.com/gorilla/websocket" "github.com/gorilla/websocket"
jerrors "github.com/juju/errors" jerrors "github.com/juju/errors"
) )
......
...@@ -20,7 +20,7 @@ import ( ...@@ -20,7 +20,7 @@ import (
) )
import ( import (
log "github.com/AlexStocks/log4go" log "github.com/dubbogo/log4go"
"github.com/gorilla/websocket" "github.com/gorilla/websocket"
jerrors "github.com/juju/errors" jerrors "github.com/juju/errors"
) )
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment