Commit e3cfbea1 authored by AlexStocks's avatar AlexStocks

Add: protobuf

parent 3ba63bda
...@@ -115,7 +115,6 @@ type Connection interface { ...@@ -115,7 +115,6 @@ type Connection interface {
var ( var (
ErrSessionClosed = errors.New("session Already Closed") ErrSessionClosed = errors.New("session Already Closed")
ErrSessionBlocked = errors.New("session Full Blocked") ErrSessionBlocked = errors.New("session Full Blocked")
ErrMsgTooLong = errors.New("Message Too Long")
ErrNullPeerAddr = errors.New("peer address is nil") ErrNullPeerAddr = errors.New("peer address is nil")
) )
......
...@@ -57,9 +57,9 @@ func (c *Client) Call(typ CodecType, addr, service, method string, args interfac ...@@ -57,9 +57,9 @@ func (c *Client) Call(typ CodecType, addr, service, method string, args interfac
b := &GettyRPCRequest{} b := &GettyRPCRequest{}
b.header.Service = service b.header.Service = service
b.header.Method = method b.header.Method = method
b.header.CallType = gettyTwoWay b.header.CallType = CT_TwoWay
if reply == nil { if reply == nil {
b.header.CallType = gettyTwoWayNoReply b.header.CallType = CT_TwoWayNoReply
} }
b.body = args b.body = args
......
...@@ -11,7 +11,6 @@ import ( ...@@ -11,7 +11,6 @@ import (
import ( import (
log "github.com/AlexStocks/log4go" log "github.com/AlexStocks/log4go"
proto "github.com/gogo/protobuf/proto" proto "github.com/gogo/protobuf/proto"
pb "github.com/golang/protobuf/proto"
"github.com/json-iterator/go" "github.com/json-iterator/go"
jerrors "github.com/juju/errors" jerrors "github.com/juju/errors"
) )
...@@ -43,18 +42,6 @@ func (c gettyCommand) String() string { ...@@ -43,18 +42,6 @@ func (c gettyCommand) String() string {
} }
//////////////////////////////////////////// ////////////////////////////////////////////
// getty call type
////////////////////////////////////////////
type gettyCallType uint32
const (
gettyOneWay gettyCallType = 0x01
gettyTwoWay = 0x02
gettyTwoWayNoReply = 0x03
)
////////////////////////////////////////////
// getty error code // getty error code
//////////////////////////////////////////// ////////////////////////////////////////////
...@@ -117,58 +104,64 @@ func GetCodecType(codecType string) CodecType { ...@@ -117,58 +104,64 @@ func GetCodecType(codecType string) CodecType {
return CodecUnknown return CodecUnknown
} }
// Codec defines the interface that decode/encode body.
type Codec interface { type Codec interface {
Encode(i interface{}) ([]byte, error) Encode(interface{}) ([]byte, error)
Decode(data []byte, i interface{}) error Decode([]byte, interface{}) error
} }
// JSONCodec uses json marshaler and unmarshaler.
type JSONCodec struct{} type JSONCodec struct{}
var ( var (
jsonstd = jsoniter.ConfigCompatibleWithStandardLibrary jsonstd = jsoniter.ConfigCompatibleWithStandardLibrary
) )
// Encode encodes an object into slice of bytes.
func (c JSONCodec) Encode(i interface{}) ([]byte, error) { func (c JSONCodec) Encode(i interface{}) ([]byte, error) {
// return json.Marshal(i) // return json.Marshal(i)
return jsonstd.Marshal(i) return jsonstd.Marshal(i)
} }
// Decode decodes an object from slice of bytes.
func (c JSONCodec) Decode(data []byte, i interface{}) error { func (c JSONCodec) Decode(data []byte, i interface{}) error {
// return json.Unmarshal(data, i) // return json.Unmarshal(data, i)
return jsonstd.Unmarshal(data, i) return jsonstd.Unmarshal(data, i)
} }
// PBCodec uses protobuf marshaler and unmarshaler.
type PBCodec struct{} type PBCodec struct{}
// Encode encodes an object into slice of bytes. // Encode takes the protocol buffer
func (c PBCodec) Encode(i interface{}) ([]byte, error) { // and encodes it into the wire format, returning the data.
if m, ok := i.(proto.Marshaler); ok { func (c PBCodec) Encode(msg interface{}) ([]byte, error) {
return m.Marshal() // Can the object marshal itself?
if pb, ok := msg.(proto.Marshaler); ok {
pbBuf, err := pb.Marshal()
return pbBuf, jerrors.Trace(err)
} }
if m, ok := i.(pb.Message); ok { if pb, ok := msg.(proto.Message); ok {
return pb.Marshal(m) p := proto.NewBuffer(nil)
err := p.Marshal(pb)
return p.Bytes(), jerrors.Trace(err)
} }
return nil, fmt.Errorf("%T is not a proto.Marshaler", i) return nil, fmt.Errorf("protobuf can not marshal %T", msg)
} }
// Decode decodes an object from slice of bytes. // Decode parses the protocol buffer representation in buf and
func (c PBCodec) Decode(data []byte, i interface{}) error { // writes the decoded result to pb. If the struct underlying pb does not match
if m, ok := i.(proto.Unmarshaler); ok { // the data in buf, the results can be unpredictable.
return m.Unmarshal(data) //
// UnmarshalMerge merges into existing data in pb.
// Most code should use Unmarshal instead.
func (c PBCodec) Decode(buf []byte, msg interface{}) error {
// If the object can unmarshal itself, let it.
if u, ok := msg.(proto.Unmarshaler); ok {
return jerrors.Trace(u.Unmarshal(buf))
} }
if m, ok := i.(pb.Message); ok { if pb, ok := msg.(proto.Message); ok {
return pb.Unmarshal(data, m) return jerrors.Trace(proto.NewBuffer(nil).Unmarshal(pb))
} }
return fmt.Errorf("%T is not a proto.Unmarshaler", i) return fmt.Errorf("protobuf can not unmarshal %T", msg)
} }
//////////////////////////////////////////// ////////////////////////////////////////////
...@@ -185,7 +178,6 @@ var ( ...@@ -185,7 +178,6 @@ var (
ErrNotEnoughStream = jerrors.New("packet stream is not enough") ErrNotEnoughStream = jerrors.New("packet stream is not enough")
ErrTooLargePackage = jerrors.New("package length is exceed the getty package's legal maximum length.") ErrTooLargePackage = jerrors.New("package length is exceed the getty package's legal maximum length.")
ErrInvalidPackage = jerrors.New("invalid rpc package") ErrInvalidPackage = jerrors.New("invalid rpc package")
ErrNotFoundServiceOrMethod = jerrors.New("server invalid service or method")
ErrIllegalMagic = jerrors.New("package magic is not right.") ErrIllegalMagic = jerrors.New("package magic is not right.")
) )
...@@ -301,12 +293,12 @@ func (p *GettyPackage) Unmarshal(buf *bytes.Buffer) (int, error) { ...@@ -301,12 +293,12 @@ func (p *GettyPackage) Unmarshal(buf *bytes.Buffer) (int, error) {
type GettyRPCHeaderLenType uint16 type GettyRPCHeaderLenType uint16
// easyjson:json // // easyjson:json
type GettyRPCRequestHeader struct { // type GettyRPCRequestHeader struct {
Service string // Service string
Method string // Method string
CallType gettyCallType // CallType gettyCallType
} // }
type GettyRPCRequest struct { type GettyRPCRequest struct {
header GettyRPCRequestHeader header GettyRPCRequestHeader
...@@ -331,7 +323,7 @@ func (req *GettyRPCRequest) Marshal(sz CodecType, buf *bytes.Buffer) (int, error ...@@ -331,7 +323,7 @@ func (req *GettyRPCRequest) Marshal(sz CodecType, buf *bytes.Buffer) (int, error
if codec == nil { if codec == nil {
return 0, jerrors.Errorf("can not find codec for %s", sz) return 0, jerrors.Errorf("can not find codec for %s", sz)
} }
headerData, err := codec.Encode(req.header) headerData, err := codec.Encode(&req.header)
if err != nil { if err != nil {
return 0, jerrors.Trace(err) return 0, jerrors.Trace(err)
} }
...@@ -410,9 +402,9 @@ func (req *GettyRPCRequest) GetHeader() interface{} { ...@@ -410,9 +402,9 @@ func (req *GettyRPCRequest) GetHeader() interface{} {
// GettyRPCResponse // GettyRPCResponse
//////////////////////////////////////////// ////////////////////////////////////////////
type GettyRPCResponseHeader struct { // type GettyRPCResponseHeader struct {
Error string // Error string
} // }
type GettyRPCResponse struct { type GettyRPCResponse struct {
header GettyRPCResponseHeader header GettyRPCResponseHeader
...@@ -434,7 +426,7 @@ func (resp *GettyRPCResponse) Marshal(sz CodecType, buf *bytes.Buffer) (int, err ...@@ -434,7 +426,7 @@ func (resp *GettyRPCResponse) Marshal(sz CodecType, buf *bytes.Buffer) (int, err
if codec == nil { if codec == nil {
return 0, jerrors.Errorf("can not find codec for %d", sz) return 0, jerrors.Errorf("can not find codec for %d", sz)
} }
headerData, err := codec.Encode(resp.header) headerData, err := codec.Encode(&resp.header)
if err != nil { if err != nil {
return 0, jerrors.Trace(err) return 0, jerrors.Trace(err)
} }
......
This diff is collapsed.
...@@ -90,7 +90,7 @@ func (h *RpcServerHandler) OnMessage(session getty.Session, pkg interface{}) { ...@@ -90,7 +90,7 @@ func (h *RpcServerHandler) OnMessage(session getty.Session, pkg interface{}) {
h.replyCmd(session, req, gettyCmdHbResponse, "") h.replyCmd(session, req, gettyCmdHbResponse, "")
return return
} }
if req.header.CallType == gettyTwoWayNoReply { if req.header.CallType == CT_TwoWayNoReply {
h.replyCmd(session, req, gettyCmdRPCResponse, "") h.replyCmd(session, req, gettyCmdRPCResponse, "")
function := req.methodType.method.Func function := req.methodType.method.Func
function.Call([]reflect.Value{req.service.rcvr, req.argv, req.replyv}) function.Call([]reflect.Value{req.service.rcvr, req.argv, req.replyv})
......
...@@ -35,3 +35,7 @@ message GettyRPCRequestHeader { ...@@ -35,3 +35,7 @@ message GettyRPCRequestHeader {
optional string Method = 2 [(gogoproto.nullable) = false]; optional string Method = 2 [(gogoproto.nullable) = false];
optional CallType CallType = 3 [(gogoproto.nullable) = false]; optional CallType CallType = 3 [(gogoproto.nullable) = false];
} }
message GettyRPCResponseHeader {
optional string Error = 1 [(gogoproto.nullable) = false];
}
...@@ -9,8 +9,6 @@ ...@@ -9,8 +9,6 @@
# FILE : pb.sh # FILE : pb.sh
# ****************************************************** # ******************************************************
mkdir ./src
# descriptor.proto # descriptor.proto
gopath=~/test/golang/lib/src/github.com/gogo/protobuf/protobuf gopath=~/test/golang/lib/src/github.com/gogo/protobuf/protobuf
# If you are using any gogo.proto extensions you will need to specify the # If you are using any gogo.proto extensions you will need to specify the
...@@ -21,4 +19,4 @@ gogopath=~/test/golang/lib/src/ ...@@ -21,4 +19,4 @@ gogopath=~/test/golang/lib/src/
# protoc -I=$gopath:$gogopath:/Users/alex/test/golang/lib/src/github.com/AlexStocks/goext/database/redis/:./ --gogoslick_out=Mredis_meta.proto="github.com/AlexStocks/goext/database/redis":../app/ cluster_meta.proto # protoc -I=$gopath:$gogopath:/Users/alex/test/golang/lib/src/github.com/AlexStocks/goext/database/redis/:./ --gogoslick_out=Mredis_meta.proto="github.com/AlexStocks/goext/database/redis":../app/ cluster_meta.proto
# protoc -I=$gopath:$gogopath:/Users/alex/test/golang/lib/src/github.com/AlexStocks/goext/database/redis/:./ --gogoslick_out=Mredis_meta.proto="github.com/AlexStocks/goext/database/redis":../app/ response.proto # protoc -I=$gopath:$gogopath:/Users/alex/test/golang/lib/src/github.com/AlexStocks/goext/database/redis/:./ --gogoslick_out=Mredis_meta.proto="github.com/AlexStocks/goext/database/redis":../app/ response.proto
# protoc -I=$gopath:$gogopath:./ --gogoslick_out=Mrole.proto="github.com/AlexStocks/goext/database/registry":./src/ service.proto # protoc -I=$gopath:$gogopath:./ --gogoslick_out=Mrole.proto="github.com/AlexStocks/goext/database/registry":./src/ service.proto
protoc -I=$gopath:$gogopath:./ --gogoslick_out=./src/ rpc.proto protoc -I=$gopath:$gogopath:./ --gogoslick_out=../ codec.proto
...@@ -7,6 +7,7 @@ import ( ...@@ -7,6 +7,7 @@ import (
import ( import (
"github.com/AlexStocks/getty" "github.com/AlexStocks/getty"
"github.com/AlexStocks/goext/log"
log "github.com/AlexStocks/log4go" log "github.com/AlexStocks/log4go"
jerrors "github.com/juju/errors" jerrors "github.com/juju/errors"
) )
...@@ -47,12 +48,16 @@ func (p *RpcServerPackageHandler) Read(ss getty.Session, data []byte) (interface ...@@ -47,12 +48,16 @@ func (p *RpcServerPackageHandler) Read(ss getty.Session, data []byte) (interface
return req, length, nil return req, length, nil
} }
// get service & method // get service & method
gxlog.CError("req service:%s, service map:%#v", req.header.Service, p.server.serviceMap)
req.service = p.server.serviceMap[req.header.Service] req.service = p.server.serviceMap[req.header.Service]
if req.service != nil { if req.service != nil {
req.methodType = req.service.method[req.header.Method] req.methodType = req.service.method[req.header.Method]
} }
if req.service == nil || req.methodType == nil { if req.service == nil {
return nil, 0, ErrNotFoundServiceOrMethod return nil, 0, jerrors.Errorf("request service is nil")
}
if req.methodType == nil {
return nil, 0, jerrors.Errorf("request method is nil")
} }
// get args // get args
argIsValue := false argIsValue := false
......
...@@ -504,7 +504,7 @@ LOOP: ...@@ -504,7 +504,7 @@ LOOP:
case outPkg = <-s.wQ: case outPkg = <-s.wQ:
if flag { if flag {
if err = s.writer.Write(s, outPkg); err != nil { if err = s.writer.Write(s, outPkg); err != nil {
log.Error("%s, [session.handleLoop] = error{%s}", s.sessionToken(), err) log.Error("%s, [session.handleLoop] = error{%s}", s.sessionToken(), jerrors.ErrorStack(err))
s.stop() s.stop()
flag = false flag = false
// break LOOP // break LOOP
...@@ -550,7 +550,7 @@ func (s *session) handlePackage() { ...@@ -550,7 +550,7 @@ func (s *session) handlePackage() {
log.Info("%s, [session.handlePackage] gr will exit now, left gr num %d", s.sessionToken(), grNum) log.Info("%s, [session.handlePackage] gr will exit now, left gr num %d", s.sessionToken(), grNum)
s.stop() s.stop()
if err != nil { if err != nil {
log.Error("%s, [session.handlePackage] error{%s}", s.sessionToken(), err) log.Error("%s, [session.handlePackage] error{%s}", s.sessionToken(), jerrors.ErrorStack(err))
s.listener.OnError(s, err) s.listener.OnError(s, err)
} }
}() }()
...@@ -628,7 +628,7 @@ func (s *session) handleTCPPackage() error { ...@@ -628,7 +628,7 @@ func (s *session) handleTCPPackage() error {
// pkg, err = s.pkgHandler.Read(s, pktBuf) // pkg, err = s.pkgHandler.Read(s, pktBuf)
pkg, pkgLen, err = s.reader.Read(s, pktBuf.Bytes()) pkg, pkgLen, err = s.reader.Read(s, pktBuf.Bytes())
if err == nil && s.maxMsgLen > 0 && pkgLen > int(s.maxMsgLen) { if err == nil && s.maxMsgLen > 0 && pkgLen > int(s.maxMsgLen) {
err = ErrMsgTooLong err = jerrors.Errorf("Message Too Long, pkgLen %d, session max message len %d", pkgLen, s.maxMsgLen)
} }
if err != nil { if err != nil {
log.Warn("%s, [session.handleTCPPackage] = len{%d}, error{%s}", log.Warn("%s, [session.handleTCPPackage] = len{%d}, error{%s}",
...@@ -703,7 +703,7 @@ func (s *session) handleUDPPackage() error { ...@@ -703,7 +703,7 @@ func (s *session) handleUDPPackage() error {
pkg, pkgLen, err = s.reader.Read(s, buf[:bufLen]) pkg, pkgLen, err = s.reader.Read(s, buf[:bufLen])
log.Debug("s.reader.Read() = pkg:%#v, pkgLen:%d, err:%s", pkg, pkgLen, jerrors.ErrorStack(err)) log.Debug("s.reader.Read() = pkg:%#v, pkgLen:%d, err:%s", pkg, pkgLen, jerrors.ErrorStack(err))
if err == nil && s.maxMsgLen > 0 && bufLen > int(s.maxMsgLen) { if err == nil && s.maxMsgLen > 0 && bufLen > int(s.maxMsgLen) {
err = ErrMsgTooLong err = jerrors.Errorf("Message Too Long, bufLen %d, session max message len %d", bufLen, s.maxMsgLen)
} }
if err != nil { if err != nil {
log.Warn("%s, [session.handleUDPPackage] = len{%d}, error{%s}", log.Warn("%s, [session.handleUDPPackage] = len{%d}, error{%s}",
...@@ -753,7 +753,7 @@ func (s *session) handleWSPackage() error { ...@@ -753,7 +753,7 @@ func (s *session) handleWSPackage() error {
if s.reader != nil { if s.reader != nil {
unmarshalPkg, length, err = s.reader.Read(s, pkg) unmarshalPkg, length, err = s.reader.Read(s, pkg)
if err == nil && s.maxMsgLen > 0 && length > int(s.maxMsgLen) { if err == nil && s.maxMsgLen > 0 && length > int(s.maxMsgLen) {
err = ErrMsgTooLong err = jerrors.Errorf("Message Too Long, length %d, session max message len %d", length, s.maxMsgLen)
} }
if err != nil { if err != nil {
log.Warn("%s, [session.handleWSPackage] = len{%d}, error{%s}", log.Warn("%s, [session.handleWSPackage] = len{%d}, error{%s}",
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment