Commit 1b60b2bc authored by AlexStocks's avatar AlexStocks

reformat codes

parent aaf21345
...@@ -255,7 +255,8 @@ func (c *Client) getClientRpcSession(session getty.Session) (rpcSession, error) ...@@ -255,7 +255,8 @@ func (c *Client) getClientRpcSession(session getty.Session) (rpcSession, error)
break break
} }
} }
return rpcSession, err
return rpcSession, jerrors.Trace(err)
} }
func (c *Client) ping(session getty.Session) error { func (c *Client) ping(session getty.Session) error {
......
...@@ -53,15 +53,23 @@ const ( ...@@ -53,15 +53,23 @@ const (
) )
//////////////////////////////////////////// ////////////////////////////////////////////
// getty error code
////////////////////////////////////////////
type GettyErrorCode int32
const (
GettyOK GettyErrorCode = 0x00
GettyFail = 0x01
)
////////////////////////////////////////////
// GettyPackageHandler // GettyPackageHandler
//////////////////////////////////////////// ////////////////////////////////////////////
const ( const (
gettyPackageMagic = 0x20160905 gettyPackageMagic = 0x20160905
maxPackageLen = 1024 * 1024 maxPackageLen = 1024 * 1024
ReplyTypeData = 0x01
ReplyTypeAck = 0x03
) )
var ( var (
...@@ -90,12 +98,12 @@ type RPCPackage interface { ...@@ -90,12 +98,12 @@ type RPCPackage interface {
} }
type GettyPackageHeader struct { type GettyPackageHeader struct {
Magic uint32 Magic uint32 // magic number
LogID uint32 // log id LogID uint32 // log id
Sequence uint64 // request/response sequence Sequence uint64 // request/response sequence
Command gettyCommand // operation command code Command gettyCommand // operation command code
Code int32 // error code Code GettyErrorCode // error code
ServiceID uint32 // service id ServiceID uint32 // service id
Len uint32 // body length Len uint32 // body length
...@@ -121,16 +129,17 @@ func (p GettyPackage) Marshal() (*bytes.Buffer, error) { ...@@ -121,16 +129,17 @@ func (p GettyPackage) Marshal() (*bytes.Buffer, error) {
buf = &bytes.Buffer{} buf = &bytes.Buffer{}
err = binary.Write(buf, binary.LittleEndian, p.H) err = binary.Write(buf, binary.LittleEndian, p.H)
if err != nil { if err != nil {
return nil, err return nil, jerrors.Trace(err)
} }
if p.B != nil { if p.B != nil {
if err = p.B.Marshal(buf); err != nil { if err = p.B.Marshal(buf); err != nil {
return nil, jerrors.Trace(err) return nil, jerrors.Trace(err)
} }
// body length
length = buf.Len() - gettyPackageHeaderLen length = buf.Len() - gettyPackageHeaderLen
size = (int)((uint)(unsafe.Sizeof(p.H.Len))) size = (int)((uint)(unsafe.Sizeof(p.H.Len)))
buf0 = bytes.NewBuffer(buf.Bytes()[gettyPackageHeaderLen-size:]) buf0 = bytes.NewBuffer(buf.Bytes()[gettyPackageHeaderLen-size : size])
binary.Write(buf0, binary.LittleEndian, length) binary.Write(buf0, binary.LittleEndian, length)
} }
...@@ -143,8 +152,7 @@ func (p *GettyPackage) Unmarshal(buf *bytes.Buffer) (int, error) { ...@@ -143,8 +152,7 @@ func (p *GettyPackage) Unmarshal(buf *bytes.Buffer) (int, error) {
} }
// header // header
err := binary.Read(buf, binary.LittleEndian, &(p.H)) if err := binary.Read(buf, binary.LittleEndian, &(p.H)); err != nil {
if err != nil {
return 0, jerrors.Trace(err) return 0, jerrors.Trace(err)
} }
if p.H.Magic != gettyPackageMagic { if p.H.Magic != gettyPackageMagic {
...@@ -159,7 +167,7 @@ func (p *GettyPackage) Unmarshal(buf *bytes.Buffer) (int, error) { ...@@ -159,7 +167,7 @@ func (p *GettyPackage) Unmarshal(buf *bytes.Buffer) (int, error) {
} }
if p.H.Len != 0 { if p.H.Len != 0 {
if err = p.B.Unmarshal(bytes.NewBuffer(buf.Next(int(p.H.Len)))); err != nil { if err := p.B.Unmarshal(bytes.NewBuffer(buf.Next(int(p.H.Len)))); err != nil {
return 0, jerrors.Trace(err) return 0, jerrors.Trace(err)
} }
} }
...@@ -171,6 +179,8 @@ func (p *GettyPackage) Unmarshal(buf *bytes.Buffer) (int, error) { ...@@ -171,6 +179,8 @@ func (p *GettyPackage) Unmarshal(buf *bytes.Buffer) (int, error) {
// GettyRPCRequest // GettyRPCRequest
//////////////////////////////////////////// ////////////////////////////////////////////
type GettyRPCHeaderLenType uint16
type GettyRPCRequestHeader struct { type GettyRPCRequestHeader struct {
Service string `json:"service,omitempty"` Service string `json:"service,omitempty"`
Method string `json:"method,omitempty"` Method string `json:"method,omitempty"`
...@@ -188,7 +198,7 @@ type GettyRPCRequest struct { ...@@ -188,7 +198,7 @@ type GettyRPCRequest struct {
} }
// json rpc stream format // json rpc stream format
// |-- 2B (GettyRPCRequestHeader length) --|-- GettyRPCRequestHeader --| // |-- 2B (GettyRPCRequestHeader length) --|-- GettyRPCRequestHeader --|-- rpc body --|
func NewGettyRPCRequest(server *Server) *GettyRPCRequest { func NewGettyRPCRequest(server *Server) *GettyRPCRequest {
return &GettyRPCRequest{ return &GettyRPCRequest{
...@@ -277,7 +287,7 @@ func (req *GettyRPCRequest) Unmarshal(buf *bytes.Buffer) error { ...@@ -277,7 +287,7 @@ func (req *GettyRPCRequest) Unmarshal(buf *bytes.Buffer) error {
//////////////////////////////////////////// ////////////////////////////////////////////
type GettyRPCResponseHeader struct { type GettyRPCResponseHeader struct {
Error string `json:"error,omitempty"` Error string `json:"error,omitempty"` // error string
} }
type GettyRPCResponse struct { type GettyRPCResponse struct {
...@@ -296,16 +306,14 @@ func (resp *GettyRPCResponse) Marshal(buf *bytes.Buffer) error { ...@@ -296,16 +306,14 @@ func (resp *GettyRPCResponse) Marshal(buf *bytes.Buffer) error {
return jerrors.Trace(err) return jerrors.Trace(err)
} }
err = binary.Write(buf, binary.LittleEndian, uint16(len(headerData))) err = binary.Write(buf, binary.LittleEndian, (GettyRPCHeaderLenType)(len(headerData)))
if err != nil { if err != nil {
return err return jerrors.Trace(err)
} }
_, err = buf.Write(headerData) if _, err = buf.Write(headerData); err != nil {
if err != nil {
return jerrors.Trace(err) return jerrors.Trace(err)
} }
_, err = buf.Write(bodyData) if _, err = buf.Write(bodyData); err != nil {
if err != nil {
return jerrors.Trace(err) return jerrors.Trace(err)
} }
...@@ -318,21 +326,17 @@ func (resp *GettyRPCResponse) Unmarshal(buf *bytes.Buffer) error { ...@@ -318,21 +326,17 @@ func (resp *GettyRPCResponse) Unmarshal(buf *bytes.Buffer) error {
return ErrNotEnoughStream return ErrNotEnoughStream
} }
var headerLen uint16 var headerLen GettyRPCHeaderLenType
err := binary.Read(buf, binary.LittleEndian, &headerLen) err := binary.Read(buf, binary.LittleEndian, &headerLen)
if err != nil { if err != nil {
return err return jerrors.Trace(err)
} }
header := buf.Next(int(headerLen)) header := buf.Next(int(headerLen))
if len(header) != int(headerLen) { if len(header) != int(headerLen) {
return ErrNotEnoughStream return ErrNotEnoughStream
} }
body := buf.Next(int(buf.Len())) resp.body = buf.Next(int(buf.Len()))
if len(body) == 0 {
return ErrNotEnoughStream
}
resp.body = body
err = json.Unmarshal(header, resp.header) err = json.Unmarshal(header, resp.header)
if err != nil { if err != nil {
return jerrors.Trace(err) return jerrors.Trace(err)
......
...@@ -57,7 +57,7 @@ func (h *RpcServerHandler) OnOpen(session getty.Session) error { ...@@ -57,7 +57,7 @@ func (h *RpcServerHandler) OnOpen(session getty.Session) error {
} }
h.rwlock.RUnlock() h.rwlock.RUnlock()
if err != nil { if err != nil {
return err return jerrors.Trace(err)
} }
log.Info("got session:%s", session.Stat()) log.Info("got session:%s", session.Stat())
...@@ -143,6 +143,7 @@ func (h *RpcServerHandler) replyCmd(session getty.Session, reqPkg *GettyPackage, ...@@ -143,6 +143,7 @@ func (h *RpcServerHandler) replyCmd(session getty.Session, reqPkg *GettyPackage,
rspPkg.H.Code = 0 rspPkg.H.Code = 0
rspPkg.H.Command = gettyCmdRPCResponse rspPkg.H.Command = gettyCmdRPCResponse
if len(err) != 0 { if len(err) != 0 {
rspPkg.H.Code = GettyFail
rspPkg.B = &GettyRPCResponse{ rspPkg.B = &GettyRPCResponse{
header: GettyRPCResponseHeader{ header: GettyRPCResponseHeader{
Error: err, Error: err,
...@@ -227,7 +228,7 @@ func (h *RpcClientHandler) OnMessage(session getty.Session, pkg interface{}) { ...@@ -227,7 +228,7 @@ func (h *RpcClientHandler) OnMessage(session getty.Session, pkg interface{}) {
log.Error("response body:{%#v} type is not *GettyRPCResponse", p.B) log.Error("response body:{%#v} type is not *GettyRPCResponse", p.B)
return return
} }
if len(rsp.header.Error) > 0 { if p.H.Code == GettyFail && len(rsp.header.Error) > 0 {
pendingResponse.err = jerrors.New(rsp.header.Error) pendingResponse.err = jerrors.New(rsp.header.Error)
} }
err := json.Unmarshal(rsp.body.([]byte), pendingResponse.reply) err := json.Unmarshal(rsp.body.([]byte), pendingResponse.reply)
......
...@@ -25,15 +25,10 @@ func NewRpcServerPackageHandler(server *Server) *RpcServerPackageHandler { ...@@ -25,15 +25,10 @@ func NewRpcServerPackageHandler(server *Server) *RpcServerPackageHandler {
} }
func (p *RpcServerPackageHandler) Read(ss getty.Session, data []byte) (interface{}, int, error) { func (p *RpcServerPackageHandler) Read(ss getty.Session, data []byte) (interface{}, int, error) {
var ( var pkg GettyPackage
err error
length int buf := bytes.NewBuffer(data)
pkg GettyPackage length, err := pkg.Unmarshal(buf)
buf *bytes.Buffer
)
buf = bytes.NewBuffer(data)
length, err = pkg.Unmarshal(buf)
if err != nil { if err != nil {
if jerrors.Cause(err) == ErrNotEnoughStream { if jerrors.Cause(err) == ErrNotEnoughStream {
return nil, 0, nil return nil, 0, nil
...@@ -45,19 +40,13 @@ func (p *RpcServerPackageHandler) Read(ss getty.Session, data []byte) (interface ...@@ -45,19 +40,13 @@ func (p *RpcServerPackageHandler) Read(ss getty.Session, data []byte) (interface
} }
func (p *RpcServerPackageHandler) Write(ss getty.Session, pkg interface{}) error { func (p *RpcServerPackageHandler) Write(ss getty.Session, pkg interface{}) error {
var ( resp, ok := pkg.(*GettyPackage)
ok bool if !ok {
err error
resp *GettyPackage
buf *bytes.Buffer
)
if resp, ok = pkg.(*GettyPackage); !ok {
log.Error("illegal pkg:%+v\n", pkg) log.Error("illegal pkg:%+v\n", pkg)
return jerrors.New("invalid rpc response") return jerrors.New("invalid rpc response")
} }
buf, err = resp.Marshal() buf, err := resp.Marshal()
if err != nil { if err != nil {
log.Warn("binary.Write(resp{%#v}) = err{%#v}", resp, err) log.Warn("binary.Write(resp{%#v}) = err{%#v}", resp, err)
return jerrors.Trace(err) return jerrors.Trace(err)
...@@ -78,15 +67,10 @@ func NewRpcClientPackageHandler() *RpcClientPackageHandler { ...@@ -78,15 +67,10 @@ func NewRpcClientPackageHandler() *RpcClientPackageHandler {
} }
func (p *RpcClientPackageHandler) Read(ss getty.Session, data []byte) (interface{}, int, error) { func (p *RpcClientPackageHandler) Read(ss getty.Session, data []byte) (interface{}, int, error) {
var ( var pkg GettyPackage
err error
length int buf := bytes.NewBuffer(data)
pkg GettyPackage length, err := pkg.Unmarshal(buf)
buf *bytes.Buffer
)
buf = bytes.NewBuffer(data)
length, err = pkg.Unmarshal(buf)
if err != nil { if err != nil {
if err == ErrNotEnoughStream { if err == ErrNotEnoughStream {
return nil, 0, nil return nil, 0, nil
...@@ -98,19 +82,13 @@ func (p *RpcClientPackageHandler) Read(ss getty.Session, data []byte) (interface ...@@ -98,19 +82,13 @@ func (p *RpcClientPackageHandler) Read(ss getty.Session, data []byte) (interface
} }
func (p *RpcClientPackageHandler) Write(ss getty.Session, pkg interface{}) error { func (p *RpcClientPackageHandler) Write(ss getty.Session, pkg interface{}) error {
var ( req, ok := pkg.(*GettyPackage)
ok bool if !ok {
err error
req *GettyPackage
buf *bytes.Buffer
)
if req, ok = pkg.(*GettyPackage); !ok {
log.Error("illegal pkg:%+v\n", pkg) log.Error("illegal pkg:%+v\n", pkg)
return jerrors.New("invalid rpc request") return jerrors.New("invalid rpc request")
} }
buf, err = req.Marshal() buf, err := req.Marshal()
if err != nil { if err != nil {
log.Warn("binary.Write(req{%#v}) = err{%#v}", req, err) log.Warn("binary.Write(req{%#v}) = err{%#v}", req, err)
return jerrors.Trace(err) return jerrors.Trace(err)
......
...@@ -323,7 +323,7 @@ func (s *session) WritePkg(pkg interface{}, timeout time.Duration) error { ...@@ -323,7 +323,7 @@ func (s *session) WritePkg(pkg interface{}, timeout time.Duration) error {
s.incWritePkgNum() s.incWritePkgNum()
// gxlog.CError("after incWritePkgNum, ss:%s", s.Stat()) // gxlog.CError("after incWritePkgNum, ss:%s", s.Stat())
} }
return err return jerrors.Trace(err)
} }
select { select {
case s.wQ <- pkg: case s.wQ <- pkg:
...@@ -640,7 +640,7 @@ func (s *session) handleTCPPackage() error { ...@@ -640,7 +640,7 @@ func (s *session) handleTCPPackage() error {
} }
} }
return err return jerrors.Trace(err)
} }
// get package from udp packet // get package from udp packet
...@@ -709,7 +709,7 @@ func (s *session) handleUDPPackage() error { ...@@ -709,7 +709,7 @@ func (s *session) handleUDPPackage() error {
s.rQ <- UDPContext{Pkg: pkg, PeerAddr: addr} s.rQ <- UDPContext{Pkg: pkg, PeerAddr: addr}
} }
return err return jerrors.Trace(err)
} }
// get package from websocket stream // get package from websocket stream
...@@ -737,7 +737,7 @@ func (s *session) handleWSPackage() error { ...@@ -737,7 +737,7 @@ func (s *session) handleWSPackage() error {
log.Warn("%s, [session.handleWSPackage] = error{%s}", log.Warn("%s, [session.handleWSPackage] = error{%s}",
s.sessionToken(), jerrors.ErrorStack(err)) s.sessionToken(), jerrors.ErrorStack(err))
// s.errFlag = true // s.errFlag = true
return err return jerrors.Trace(err)
} }
s.UpdateActive() s.UpdateActive()
if s.reader != nil { if s.reader != nil {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment