Unverified Commit 43189243 authored by Xin.Zh's avatar Xin.Zh Committed by GitHub

Merge pull request #6 from hudangwei/feature/rpc

LGTM
parents 6fbdcda8 1ba7f1ba
...@@ -120,7 +120,7 @@ func NewClient(confFile string) (*Client, error) { ...@@ -120,7 +120,7 @@ func NewClient(confFile string) (*Client, error) {
return c, nil return c, nil
} }
func (c *Client) Call(service, method string, args interface{}, reply interface{}) error { func (c *Client) Call(addr, protocol, service, method string, args interface{}, reply interface{}) error {
b := &GettyRPCRequest{} b := &GettyRPCRequest{}
b.header.Service = service b.header.Service = service
b.header.Method = method b.header.Method = method
...@@ -133,7 +133,7 @@ func (c *Client) Call(service, method string, args interface{}, reply interface{ ...@@ -133,7 +133,7 @@ func (c *Client) Call(service, method string, args interface{}, reply interface{
resp := NewPendingResponse() resp := NewPendingResponse()
resp.reply = reply resp.reply = reply
session := c.selectSession() session := c.selectSession(protocol, addr)
if session == nil { if session == nil {
return errSessionNotExist return errSessionNotExist
} }
...@@ -157,8 +157,12 @@ func (c *Client) Close() { ...@@ -157,8 +157,12 @@ func (c *Client) Close() {
c.registry = nil c.registry = nil
} }
func (c *Client) selectSession() getty.Session { func (c *Client) selectSession(protocol, addr string) getty.Session {
return nil rpcConn, err := c.pool.getConn(protocol, addr)
if err != nil {
return nil
}
return rpcConn.selectSession()
} }
func (c *Client) heartbeat(session getty.Session) error { func (c *Client) heartbeat(session getty.Session) error {
......
...@@ -16,9 +16,7 @@ import ( ...@@ -16,9 +16,7 @@ import (
) )
import ( import (
"github.com/AlexStocks/goext/log"
log "github.com/AlexStocks/log4go" log "github.com/AlexStocks/log4go"
"time"
) )
//////////////////////////////////////////// ////////////////////////////////////////////
...@@ -225,14 +223,14 @@ func (p *GettyPackage) Marshal() (*bytes.Buffer, error) { ...@@ -225,14 +223,14 @@ func (p *GettyPackage) Marshal() (*bytes.Buffer, error) {
buf *bytes.Buffer buf *bytes.Buffer
) )
packLen = gettyPackageHeaderLen packLen = rpcPackagePlaceholderLen + gettyPackageHeaderLen
if p.B != nil { if p.B != nil {
buf = &bytes.Buffer{} buf = &bytes.Buffer{}
length, err = p.B.Marshal(p.H.CodecType, buf) length, err = p.B.Marshal(p.H.CodecType, buf)
if err != nil { if err != nil {
return nil, jerrors.Trace(err) return nil, jerrors.Trace(err)
} }
packLen = gettyPackageHeaderLen + length packLen = rpcPackagePlaceholderLen + gettyPackageHeaderLen + length
} }
buf0 := &bytes.Buffer{} buf0 := &bytes.Buffer{}
err = binary.Write(buf0, binary.LittleEndian, uint16(packLen)) err = binary.Write(buf0, binary.LittleEndian, uint16(packLen))
...@@ -278,7 +276,7 @@ func (p *GettyPackage) Unmarshal(buf *bytes.Buffer) (int, error) { ...@@ -278,7 +276,7 @@ func (p *GettyPackage) Unmarshal(buf *bytes.Buffer) (int, error) {
} }
if int(packLen) > rpcPackagePlaceholderLen+gettyPackageHeaderLen { if int(packLen) > rpcPackagePlaceholderLen+gettyPackageHeaderLen {
if err := p.B.Unmarshal(p.H.CodecType, bytes.NewBuffer(buf.Next(int(packLen)-gettyPackageHeaderLen))); err != nil { if err := p.B.Unmarshal(p.H.CodecType, bytes.NewBuffer(buf.Next(int(packLen)-rpcPackagePlaceholderLen-gettyPackageHeaderLen))); err != nil {
return 0, jerrors.Trace(err) return 0, jerrors.Trace(err)
} }
} }
...@@ -473,7 +471,6 @@ func (resp *GettyRPCResponse) Unmarshal(sz gettyCodecType, buf *bytes.Buffer) er ...@@ -473,7 +471,6 @@ func (resp *GettyRPCResponse) Unmarshal(sz gettyCodecType, buf *bytes.Buffer) er
if err != nil { if err != nil {
return jerrors.Trace(err) return jerrors.Trace(err)
} }
body := make([]byte, bodyLen) body := make([]byte, bodyLen)
err = binary.Read(buf, binary.LittleEndian, body) err = binary.Read(buf, binary.LittleEndian, body)
if err != nil { if err != nil {
...@@ -495,8 +492,6 @@ func (resp *GettyRPCResponse) Unmarshal(sz gettyCodecType, buf *bytes.Buffer) er ...@@ -495,8 +492,6 @@ func (resp *GettyRPCResponse) Unmarshal(sz gettyCodecType, buf *bytes.Buffer) er
} }
func (resp *GettyRPCResponse) GetBody() []byte { func (resp *GettyRPCResponse) GetBody() []byte {
gxlog.CWarn("resp body %p", resp.body)
time.Sleep(5e9)
return resp.body.([]byte) return resp.body.([]byte)
} }
......
...@@ -23,7 +23,7 @@ func main() { ...@@ -23,7 +23,7 @@ func main() {
for i := 0; i < 100; i++ { for i := 0; i < 100; i++ {
go func() { go func() {
var res string var res string
err := client.Call("TestRpc", "Test", data.TestABC{"aaa", "bbb", "ccc"}, &res) err := client.Call("127.0.0.1:20000", "json", "TestRpc", "Test", data.TestABC{"aaa", "bbb", "ccc"}, &res)
if err != nil { if err != nil {
log.Error(err) log.Error(err)
return return
...@@ -35,7 +35,7 @@ func main() { ...@@ -35,7 +35,7 @@ func main() {
for i := 0; i < 100; i++ { for i := 0; i < 100; i++ {
go func() { go func() {
var result int var result int
err := client.Call("TestRpc", "Add", 1, &result) err := client.Call("127.0.0.1:20000", "json", "TestRpc", "Add", 1, &result)
if err != nil { if err != nil {
log.Error(err) log.Error(err)
return return
...@@ -45,7 +45,7 @@ func main() { ...@@ -45,7 +45,7 @@ func main() {
} }
var errInt int var errInt int
err = client.Call("TestRpc", "Err", 2, &errInt) err = client.Call("127.0.0.1:20000", "json", "TestRpc", "Err", 2, &errInt)
if err != nil { if err != nil {
log.Error(jerrors.ErrorStack(err)) log.Error(jerrors.ErrorStack(err))
} }
......
...@@ -123,7 +123,9 @@ func (p *RpcClientPackageHandler) Read(ss getty.Session, data []byte) (interface ...@@ -123,7 +123,9 @@ func (p *RpcClientPackageHandler) Read(ss getty.Session, data []byte) (interface
resp := &GettyRPCResponsePackage{ resp := &GettyRPCResponsePackage{
H: pkg.H, H: pkg.H,
header: pkg.B.GetHeader().(GettyRPCResponseHeader), header: pkg.B.GetHeader().(GettyRPCResponseHeader),
body: pkg.B.GetBody(), }
if pkg.H.Command != gettyCmdHbResponse {
resp.body = pkg.B.GetBody()
} }
return resp, length, nil return resp, length, nil
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment