Commit 4b3836b3 authored by AlexStocks's avatar AlexStocks

Fix: double unlock

parent 82715a2e
...@@ -29,7 +29,7 @@ type CallOptions struct { ...@@ -29,7 +29,7 @@ type CallOptions struct {
RequestTimeout time.Duration RequestTimeout time.Duration
// response timeout // response timeout
ResponseTimeout time.Duration ResponseTimeout time.Duration
Meta map[interface{}]interface{} Meta map[interface{}]interface{}
} }
type CallOption func(*CallOptions) type CallOption func(*CallOptions)
...@@ -82,49 +82,48 @@ func NewClient(conf *ClientConfig) (*Client, error) { ...@@ -82,49 +82,48 @@ func NewClient(conf *ClientConfig) (*Client, error) {
func (c *Client) Notify(typ CodecType, addr, service, method string, args interface{}, opts ...CallOption) error { func (c *Client) Notify(typ CodecType, addr, service, method string, args interface{}, opts ...CallOption) error {
var copts CallOptions var copts CallOptions
for _, o := range opts { for _, o := range opts {
o(&copts) o(&copts)
} }
return jerrors.Trace(c.call(CT_OneWay, typ, addr, service, method, args, nil, nil, copts)) return jerrors.Trace(c.call(CT_OneWay, typ, addr, service, method, args, nil, nil, copts))
} }
// if @reply is nil, the transport layer will get the response without notify the invoker. // if @reply is nil, the transport layer will get the response without notify the invoker.
func (c *Client) Call(typ CodecType, addr, service, method string, args, reply interface{}, opts ...CallOption) error { func (c *Client) Call(typ CodecType, addr, service, method string, args, reply interface{}, opts ...CallOption) error {
var copts CallOptions var copts CallOptions
for _, o := range opts { for _, o := range opts {
o(&copts) o(&copts)
} }
ct := CT_TwoWay ct := CT_TwoWay
if reply == nil { if reply == nil {
ct = CT_TwoWayNoReply ct = CT_TwoWayNoReply
} }
return jerrors.Trace(c.call(ct, typ, addr, service, method, args, reply, nil, copts)) return jerrors.Trace(c.call(ct, typ, addr, service, method, args, reply, nil, copts))
} }
func (c *Client) AsyncCall(typ CodecType, addr, service, method string, func (c *Client) AsyncCall(typ CodecType, addr, service, method string, args, reply interface{}, handler AsyncHandler, opts ...CallOption) error {
args, reply interface{}, handler AsyncHandler, opts ...CallOption) error {
var copts CallOptions var copts CallOptions
for _, o := range opts { for _, o := range opts {
o(&copts) o(&copts)
} }
return jerrors.Trace(c.call(CT_TwoWay, typ, addr, service, method, args, reply, handler, copts)) return jerrors.Trace(c.call(CT_TwoWay, typ, addr, service, method, args, reply, handler, copts))
} }
func (c *Client) call(ct CallType, typ CodecType, addr, service, method string, func (c *Client) call(ct CallType, typ CodecType, addr, service, method string,
args, reply interface{}, handler AsyncHandler, opts CallOptions) error { args, reply interface{}, handler AsyncHandler, opts CallOptions) error {
if opts.RequestTimeout == 0 { if opts.RequestTimeout == 0 {
opts.RequestTimeout = c.conf.GettySessionParam.tcpWriteTimeout opts.RequestTimeout = c.conf.GettySessionParam.tcpWriteTimeout
} }
if opts.ResponseTimeout == 0 { if opts.ResponseTimeout == 0 {
opts.ResponseTimeout = c.conf.GettySessionParam.tcpReadTimeout opts.ResponseTimeout = c.conf.GettySessionParam.tcpReadTimeout
} }
if !typ.CheckValidity() { if !typ.CheckValidity() {
return errInvalidCodecType return errInvalidCodecType
...@@ -135,7 +134,7 @@ func (c *Client) call(ct CallType, typ CodecType, addr, service, method string, ...@@ -135,7 +134,7 @@ func (c *Client) call(ct CallType, typ CodecType, addr, service, method string,
b.header.Method = method b.header.Method = method
b.header.CallType = ct b.header.CallType = ct
b.body = args b.body = args
var rsp *PendingResponse var rsp *PendingResponse
if ct != CT_OneWay { if ct != CT_OneWay {
rsp = NewPendingResponse() rsp = NewPendingResponse()
...@@ -158,9 +157,9 @@ func (c *Client) call(ct CallType, typ CodecType, addr, service, method string, ...@@ -158,9 +157,9 @@ func (c *Client) call(ct CallType, typ CodecType, addr, service, method string,
if err = c.transfer(session, typ, b, rsp, opts); err != nil { if err = c.transfer(session, typ, b, rsp, opts); err != nil {
return jerrors.Trace(err) return jerrors.Trace(err)
} }
if ct == CT_OneWay || handler != nil { if ct == CT_OneWay || handler != nil {
return nil return nil
} }
select { select {
...@@ -193,9 +192,9 @@ func (c *Client) heartbeat(session getty.Session, typ CodecType) error { ...@@ -193,9 +192,9 @@ func (c *Client) heartbeat(session getty.Session, typ CodecType) error {
return c.transfer(session, typ, nil, NewPendingResponse(), CallOptions{}) return c.transfer(session, typ, nil, NewPendingResponse(), CallOptions{})
} }
func (c *Client) transfer(session getty.Session, typ CodecType,req *GettyRPCRequest, func (c *Client) transfer(session getty.Session, typ CodecType, req *GettyRPCRequest,
rsp *PendingResponse, opts CallOptions) error { rsp *PendingResponse, opts CallOptions) error {
var ( var (
sequence uint64 sequence uint64
err error err error
...@@ -212,7 +211,7 @@ func (c *Client) transfer(session getty.Session, typ CodecType,req *GettyRPCRequ ...@@ -212,7 +211,7 @@ func (c *Client) transfer(session getty.Session, typ CodecType,req *GettyRPCRequ
pkg.H.Command = gettyCmdRPCRequest pkg.H.Command = gettyCmdRPCRequest
pkg.B = req pkg.B = req
} }
if rsp != nil { if rsp != nil {
rsp.seq = sequence rsp.seq = sequence
c.addPendingResponse(rsp) c.addPendingResponse(rsp)
......
...@@ -309,7 +309,6 @@ func (p *gettyRPCClientPool) release(conn *gettyRPCClient, err error) { ...@@ -309,7 +309,6 @@ func (p *gettyRPCClientPool) release(conn *gettyRPCClient, err error) {
connArray := p.connMap[key] connArray := p.connMap[key]
if len(connArray) >= p.size { if len(connArray) >= p.size {
p.Unlock()
conn.close() conn.close()
return return
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment