Commit 54b2d545 authored by AlexStocks's avatar AlexStocks

Mod: package format form {2 Bytes Header len + Header + 2 Body len + Body} to {Header + Body}

parent b79f0126
...@@ -32,7 +32,7 @@ type Client struct { ...@@ -32,7 +32,7 @@ type Client struct {
sequence gxatomic.Uint64 sequence gxatomic.Uint64
pendingLock sync.RWMutex pendingLock sync.RWMutex
pendingResponses map[uint64]*PendingResponse pendingResponses map[SequenceType]*PendingResponse
} }
func NewClient(conf *ClientConfig) (*Client, error) { func NewClient(conf *ClientConfig) (*Client, error) {
...@@ -41,7 +41,7 @@ func NewClient(conf *ClientConfig) (*Client, error) { ...@@ -41,7 +41,7 @@ func NewClient(conf *ClientConfig) (*Client, error) {
} }
c := &Client{ c := &Client{
pendingResponses: make(map[uint64]*PendingResponse), pendingResponses: make(map[SequenceType]*PendingResponse),
conf: *conf, conf: *conf,
} }
c.pool = newGettyRPCClientConnPool(c, conf.PoolSize, time.Duration(int(time.Second)*conf.PoolTTL)) c.pool = newGettyRPCClientConnPool(c, conf.PoolSize, time.Duration(int(time.Second)*conf.PoolTTL))
...@@ -85,7 +85,7 @@ func (c *Client) Call(typ CodecType, addr, service, method string, args interfac ...@@ -85,7 +85,7 @@ func (c *Client) Call(typ CodecType, addr, service, method string, args interfac
select { select {
case <-getty.GetTimeWheel().After(c.conf.GettySessionParam.tcpReadTimeout): case <-getty.GetTimeWheel().After(c.conf.GettySessionParam.tcpReadTimeout):
err = errClientReadTimeout err = errClientReadTimeout
c.RemovePendingResponse(rsp.seq) c.RemovePendingResponse(SequenceType(rsp.seq))
case <-rsp.done: case <-rsp.done:
err = rsp.err err = rsp.err
} }
...@@ -121,9 +121,9 @@ func (c *Client) transfer(session getty.Session, typ CodecType, req *GettyRPCReq ...@@ -121,9 +121,9 @@ func (c *Client) transfer(session getty.Session, typ CodecType, req *GettyRPCReq
) )
sequence = c.sequence.Add(1) sequence = c.sequence.Add(1)
pkg.H.Magic = gettyPackageMagic pkg.H.Magic = MagicType(gettyPackageMagic)
pkg.H.LogID = (uint32)(randomID()) pkg.H.LogID = LogIDType(randomID())
pkg.H.Sequence = sequence pkg.H.Sequence = SequenceType(sequence)
pkg.H.Command = gettyCmdHbRequest pkg.H.Command = gettyCmdHbRequest
pkg.H.CodecType = typ pkg.H.CodecType = typ
if req != nil { if req != nil {
...@@ -136,7 +136,7 @@ func (c *Client) transfer(session getty.Session, typ CodecType, req *GettyRPCReq ...@@ -136,7 +136,7 @@ func (c *Client) transfer(session getty.Session, typ CodecType, req *GettyRPCReq
err = session.WritePkg(pkg, 0) err = session.WritePkg(pkg, 0)
if err != nil { if err != nil {
c.RemovePendingResponse(rsp.seq) c.RemovePendingResponse(SequenceType(rsp.seq))
} }
return jerrors.Trace(err) return jerrors.Trace(err)
...@@ -151,10 +151,10 @@ func (c *Client) PendingResponseCount() int { ...@@ -151,10 +151,10 @@ func (c *Client) PendingResponseCount() int {
func (c *Client) AddPendingResponse(pr *PendingResponse) { func (c *Client) AddPendingResponse(pr *PendingResponse) {
c.pendingLock.Lock() c.pendingLock.Lock()
defer c.pendingLock.Unlock() defer c.pendingLock.Unlock()
c.pendingResponses[pr.seq] = pr c.pendingResponses[SequenceType(pr.seq)] = pr
} }
func (c *Client) RemovePendingResponse(seq uint64) *PendingResponse { func (c *Client) RemovePendingResponse(seq SequenceType) *PendingResponse {
c.pendingLock.Lock() c.pendingLock.Lock()
defer c.pendingLock.Unlock() defer c.pendingLock.Unlock()
if c.pendingResponses == nil { if c.pendingResponses == nil {
...@@ -167,7 +167,7 @@ func (c *Client) RemovePendingResponse(seq uint64) *PendingResponse { ...@@ -167,7 +167,7 @@ func (c *Client) RemovePendingResponse(seq uint64) *PendingResponse {
return nil return nil
} }
func (c *Client) ClearPendingResponses() map[uint64]*PendingResponse { func (c *Client) ClearPendingResponses() map[SequenceType]*PendingResponse {
c.pendingLock.Lock() c.pendingLock.Lock()
defer c.pendingLock.Unlock() defer c.pendingLock.Unlock()
presps := c.pendingResponses presps := c.pendingResponses
......
...@@ -19,7 +19,7 @@ import ( ...@@ -19,7 +19,7 @@ import (
// getty command // getty command
//////////////////////////////////////////// ////////////////////////////////////////////
type gettyCommand uint32 type gettyCommand int32
const ( const (
gettyDefaultCmd gettyCommand = 0x00 gettyDefaultCmd gettyCommand = 0x00
...@@ -45,7 +45,7 @@ func (c gettyCommand) String() string { ...@@ -45,7 +45,7 @@ func (c gettyCommand) String() string {
// getty error code // getty error code
//////////////////////////////////////////// ////////////////////////////////////////////
type GettyErrorCode int32 type GettyErrorCode int16
const ( const (
GettyOK GettyErrorCode = 0x00 GettyOK GettyErrorCode = 0x00
...@@ -56,7 +56,7 @@ const ( ...@@ -56,7 +56,7 @@ const (
// getty codec type // getty codec type
//////////////////////////////////////////// ////////////////////////////////////////////
type CodecType uint32 type CodecType int16
const ( const (
CodecUnknown CodecType = 0x00 CodecUnknown CodecType = 0x00
...@@ -104,6 +104,10 @@ func GetCodecType(codecType string) CodecType { ...@@ -104,6 +104,10 @@ func GetCodecType(codecType string) CodecType {
return CodecUnknown return CodecUnknown
} }
////////////////////////////////////////////
// getty codec
////////////////////////////////////////////
type Codec interface { type Codec interface {
Encode(interface{}) ([]byte, error) Encode(interface{}) ([]byte, error)
Decode([]byte, interface{}) error Decode([]byte, interface{}) error
...@@ -170,8 +174,8 @@ func (c PBCodec) Decode(buf []byte, msg interface{}) error { ...@@ -170,8 +174,8 @@ func (c PBCodec) Decode(buf []byte, msg interface{}) error {
const ( const (
gettyPackageMagic = 0x20160905 gettyPackageMagic = 0x20160905
maxPackageLen = 1024 * 1024 maxPackageLen = 4 * 1024 * 1024
rpcPackagePlaceholderLen = 2 gettyPackageHeaderLen = (int)((uint)(unsafe.Sizeof(GettyPackageHeader{})))
) )
var ( var (
...@@ -181,14 +185,6 @@ var ( ...@@ -181,14 +185,6 @@ var (
ErrIllegalMagic = jerrors.New("package magic is not right.") ErrIllegalMagic = jerrors.New("package magic is not right.")
) )
var (
gettyPackageHeaderLen int
)
func init() {
gettyPackageHeaderLen = (int)((uint)(unsafe.Sizeof(GettyPackageHeader{})))
}
type RPCPackage interface { type RPCPackage interface {
Marshal(CodecType, *bytes.Buffer) (int, error) Marshal(CodecType, *bytes.Buffer) (int, error)
// @buf length should be equal to GettyPkg.GettyPackageHeader.Len // @buf length should be equal to GettyPkg.GettyPackageHeader.Len
...@@ -197,16 +193,25 @@ type RPCPackage interface { ...@@ -197,16 +193,25 @@ type RPCPackage interface {
GetHeader() interface{} GetHeader() interface{}
} }
type (
MagicType int32
LogIDType int32
SequenceType uint64
ServiceIDType int32
PkgLenType int32
)
type GettyPackageHeader struct { type GettyPackageHeader struct {
Magic uint32 // magic number Magic MagicType // magic number
LogID uint32 // log id LogID LogIDType // log id
Sequence uint64 // request/response sequence Sequence SequenceType // request/response sequence
Command gettyCommand // operation command code Command gettyCommand // operation command code
Code GettyErrorCode // error code ServiceID ServiceIDType // service id
ServiceID uint32 // service id Code GettyErrorCode // error code
CodecType CodecType CodecType CodecType
PkgLen PkgLenType
} }
type GettyPackage struct { type GettyPackage struct {
...@@ -222,69 +227,59 @@ func (p GettyPackage) String() string { ...@@ -222,69 +227,59 @@ func (p GettyPackage) String() string {
func (p *GettyPackage) Marshal() (*bytes.Buffer, error) { func (p *GettyPackage) Marshal() (*bytes.Buffer, error) {
var ( var (
err error err error
packLen, length int headerBuf, buf *bytes.Buffer
buf *bytes.Buffer
) )
packLen = rpcPackagePlaceholderLen + gettyPackageHeaderLen buf = bytes.NewBuffer(make([]byte, gettyPackageHeaderLen))
// body
if p.B != nil { if p.B != nil {
buf = &bytes.Buffer{} length, err := p.B.Marshal(p.H.CodecType, buf)
length, err = p.B.Marshal(p.H.CodecType, buf)
if err != nil { if err != nil {
return nil, jerrors.Trace(err) return nil, jerrors.Trace(err)
} }
packLen = rpcPackagePlaceholderLen + gettyPackageHeaderLen + length p.H.PkgLen = PkgLenType(length)
}
buf0 := &bytes.Buffer{}
err = binary.Write(buf0, binary.LittleEndian, uint16(packLen))
if err != nil {
return nil, jerrors.Trace(err)
} }
err = binary.Write(buf0, binary.LittleEndian, p.H)
// header
headerBuf = bytes.NewBuffer(nil)
err = binary.Write(headerBuf, binary.LittleEndian, p.H)
if err != nil { if err != nil {
return nil, jerrors.Trace(err) return nil, jerrors.Trace(err)
} }
if p.B != nil { copy(buf.Bytes(), headerBuf.Bytes()[:gettyPackageHeaderLen])
if err = binary.Write(buf0, binary.LittleEndian, buf.Bytes()); err != nil {
return nil, jerrors.Trace(err) return buf, nil
}
}
return buf0, nil
} }
func (p *GettyPackage) Unmarshal(buf *bytes.Buffer) (int, error) { func (p *GettyPackage) Unmarshal(buf *bytes.Buffer) (int, error) {
var err error bufLen := buf.Len()
if buf.Len() < rpcPackagePlaceholderLen+gettyPackageHeaderLen { if bufLen < gettyPackageHeaderLen {
return 0, ErrNotEnoughStream return 0, ErrNotEnoughStream
} }
var packLen uint16
err = binary.Read(buf, binary.LittleEndian, &packLen)
if err != nil {
return 0, jerrors.Trace(err)
}
if int(packLen) > maxPackageLen {
return 0, ErrTooLargePackage
}
if int(packLen) < gettyPackageHeaderLen {
return 0, ErrInvalidPackage
}
// header // header
if err := binary.Read(buf, binary.LittleEndian, &(p.H)); err != nil { if err := binary.Read(buf, binary.LittleEndian, &(p.H)); err != nil {
return 0, jerrors.Trace(err) return 0, jerrors.Trace(err)
} }
if p.H.Magic != gettyPackageMagic { if p.H.Magic != gettyPackageMagic {
log.Error("@p.H.Magic{%x}, right magic{%x}", p.H.Magic, gettyPackageMagic) log.Error("@p.H.Magic{%x}, right magic{%x}", p.H.Magic, gettyPackageMagic)
return 0, ErrIllegalMagic return 0, ErrIllegalMagic
} }
if int(packLen) > rpcPackagePlaceholderLen+gettyPackageHeaderLen { totalLen := int(PkgLenType(gettyPackageHeaderLen) + p.H.PkgLen)
if err := p.B.Unmarshal(p.H.CodecType, bytes.NewBuffer(buf.Next(int(packLen)-rpcPackagePlaceholderLen-gettyPackageHeaderLen))); err != nil { if totalLen > maxPackageLen {
return 0, ErrTooLargePackage
}
if bufLen >= totalLen {
if err := p.B.Unmarshal(p.H.CodecType, bytes.NewBuffer(buf.Next(int(p.H.PkgLen)))); err != nil {
return 0, jerrors.Trace(err) return 0, jerrors.Trace(err)
} }
} }
return int(packLen), nil return totalLen, nil
} }
//////////////////////////////////////////// ////////////////////////////////////////////
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment