Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Contribute to GitLab
Sign in / Register
Toggle navigation
G
getty
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
wei.xuan
getty
Commits
388d0a67
Unverified
Commit
388d0a67
authored
Jul 06, 2018
by
Xin.Zh
Committed by
GitHub
Jul 06, 2018
Browse files
Options
Browse Files
Download
Plain Diff
Merge pull request #4 from hudangwei/master
LGTM
parents
27a8e0c8
7ea8cfb5
Hide whitespace changes
Inline
Side-by-side
Showing
17 changed files
with
534 additions
and
337 deletions
+534
-337
client.go
client.go
+1
-0
conn.go
conn.go
+4
-1
gopool.go
gopool.go
+58
-0
client.go
rpc/client.go
+31
-37
codec.go
rpc/codec.go
+235
-110
codec_easyjson.go
rpc/codec_easyjson.go
+0
-114
config.go
rpc/config.go
+72
-0
client.go
rpc/example/client/client.go
+2
-1
client_config.toml
rpc/example/client/client_config.toml
+3
-9
server.go
rpc/example/server/server.go
+3
-1
server_config.toml
rpc/example/server/server_config.toml
+2
-2
listener.go
rpc/listener.go
+40
-51
readwriter.go
rpc/readwriter.go
+55
-8
rpc.go
rpc/rpc.go
+0
-1
server.go
rpc/server.go
+2
-1
util.go
rpc/util.go
+19
-0
session.go
session.go
+7
-1
No files found.
client.go
View file @
388d0a67
...
...
@@ -383,6 +383,7 @@ func (c *client) RunEventLoop(newSession NewSessionCallback) {
c
.
newSession
=
newSession
c
.
Unlock
()
log
.
Info
(
"run"
)
c
.
wg
.
Add
(
1
)
// a for-loop goroutine to make sure the connection is valid
go
func
()
{
...
...
conn.go
View file @
388d0a67
...
...
@@ -15,6 +15,7 @@ import (
"fmt"
"io"
"net"
"sync"
"sync/atomic"
"time"
)
...
...
@@ -180,6 +181,7 @@ func newGettyTCPConn(conn net.Conn) *gettyTCPConn {
// for zip compress
type
writeFlusher
struct
{
flusher
*
flate
.
Writer
lock
sync
.
Mutex
}
func
(
t
*
writeFlusher
)
Write
(
p
[]
byte
)
(
int
,
error
)
{
...
...
@@ -187,7 +189,8 @@ func (t *writeFlusher) Write(p []byte) (int, error) {
n
int
err
error
)
t
.
lock
.
Lock
()
defer
t
.
lock
.
Unlock
()
n
,
err
=
t
.
flusher
.
Write
(
p
)
if
err
!=
nil
{
return
n
,
jerrors
.
Trace
(
err
)
...
...
gopool.go
0 → 100644
View file @
388d0a67
package
getty
import
(
"fmt"
"time"
)
var
ErrScheduleTimeout
=
fmt
.
Errorf
(
"schedule error: timed out"
)
type
Pool
struct
{
sem
chan
struct
{}
work
chan
func
()
}
func
NewPool
(
size
,
queue
,
spawn
int
)
*
Pool
{
if
spawn
<=
0
&&
queue
>
0
{
panic
(
"dead queue configuration detected"
)
}
if
spawn
>
size
{
panic
(
"spawn > workers"
)
}
p
:=
&
Pool
{
sem
:
make
(
chan
struct
{},
size
),
work
:
make
(
chan
func
(),
queue
),
}
for
i
:=
0
;
i
<
spawn
;
i
++
{
p
.
sem
<-
struct
{}{}
go
p
.
worker
(
func
()
{})
}
return
p
}
func
(
p
*
Pool
)
ScheduleTimeout
(
timeout
time
.
Duration
,
task
func
())
error
{
return
p
.
schedule
(
task
,
time
.
After
(
timeout
))
}
func
(
p
*
Pool
)
schedule
(
task
func
(),
timeout
<-
chan
time
.
Time
)
error
{
select
{
case
<-
timeout
:
return
ErrScheduleTimeout
case
p
.
work
<-
task
:
return
nil
case
p
.
sem
<-
struct
{}{}
:
go
p
.
worker
(
task
)
return
nil
}
}
func
(
p
*
Pool
)
worker
(
task
func
())
{
defer
func
()
{
<-
p
.
sem
}()
task
()
for
task
:=
range
p
.
work
{
task
()
}
}
rpc/client.go
View file @
388d0a67
...
...
@@ -23,7 +23,6 @@ var (
errInvalidAddress
=
jerrors
.
New
(
"remote address invalid or empty"
)
errSessionNotExist
=
jerrors
.
New
(
"session not exist"
)
errClientClosed
=
jerrors
.
New
(
"client closed"
)
src
=
rand
.
NewSource
(
time
.
Now
()
.
UnixNano
())
)
func
init
()
{
...
...
@@ -35,16 +34,16 @@ type Client struct {
lock
sync
.
RWMutex
sessions
[]
*
rpcSession
gettyClient
getty
.
Client
codecType
SerializeType
sequence
uint64
pendingLock
sync
.
RWMutex
pendingResponses
map
[
uint64
]
*
PendingResponse
sendLock
sync
.
Mutex
}
func
NewClient
(
conf
*
ClientConfig
)
*
Client
{
func
NewClient
(
confFile
string
)
*
Client
{
conf
:=
loadClientConf
(
confFile
)
c
:=
&
Client
{
pendingResponses
:
make
(
map
[
uint64
]
*
PendingResponse
),
conf
:
conf
,
...
...
@@ -52,6 +51,7 @@ func NewClient(conf *ClientConfig) *Client {
getty
.
WithServerAddress
(
gxnet
.
HostAddress
(
conf
.
ServerHost
,
conf
.
ServerPort
)),
getty
.
WithConnectionNumber
((
int
)(
conf
.
ConnectionNum
)),
),
codecType
:
JSON
,
}
c
.
gettyClient
.
RunEventLoop
(
c
.
newSession
)
idx
:=
1
...
...
@@ -71,6 +71,10 @@ func NewClient(conf *ClientConfig) *Client {
return
c
}
func
(
c
*
Client
)
SetCodecType
(
st
SerializeType
)
{
c
.
codecType
=
st
}
func
(
c
*
Client
)
newSession
(
session
getty
.
Session
)
error
{
var
(
ok
bool
...
...
@@ -113,14 +117,14 @@ func (c *Client) Sequence() uint64 {
}
func
(
c
*
Client
)
Call
(
service
,
method
string
,
args
interface
{},
reply
interface
{})
error
{
req
:=
NewGettyRPCRequest
(
nil
)
req
.
header
.
Service
=
service
req
.
header
.
Method
=
method
req
.
header
.
CallType
=
gettyTwoWay
b
:=
&
GettyRPCRequest
{}
b
.
header
.
Service
=
service
b
.
header
.
Method
=
method
b
.
header
.
CallType
=
gettyTwoWay
if
reply
==
nil
{
req
.
header
.
CallType
=
gettyTwoWayNoReply
b
.
header
.
CallType
=
gettyTwoWayNoReply
}
req
.
body
=
args
b
.
body
=
args
resp
:=
NewPendingResponse
()
resp
.
reply
=
reply
...
...
@@ -130,7 +134,7 @@ func (c *Client) Call(service, method string, args interface{}, reply interface{
return
errSessionNotExist
}
if
err
:=
c
.
transfer
(
session
,
req
,
resp
);
err
!=
nil
{
if
err
:=
c
.
transfer
(
session
,
b
,
resp
);
err
!=
nil
{
return
jerrors
.
Trace
(
err
)
}
<-
resp
.
done
...
...
@@ -147,30 +151,24 @@ func (c *Client) isAvailable() bool {
}
func
(
c
*
Client
)
Close
()
{
var
sessions
*
[]
*
rpcSession
c
.
lock
.
Lock
()
if
c
.
gettyClient
!=
nil
{
sessions
=
&
(
c
.
sessions
)
c
.
sessions
=
nil
c
.
gettyClient
.
Close
()
c
.
gettyClient
=
nil
c
.
sessions
=
c
.
sessions
[
:
0
]
}
c
.
lock
.
Unlock
()
if
sessions
!=
nil
{
for
_
,
s
:=
range
*
sessions
{
for
_
,
s
:=
range
c
.
sessions
{
log
.
Info
(
"close client session{%s, last active:%s, request number:%d}"
,
s
.
session
.
Stat
(),
s
.
session
.
GetActive
()
.
String
(),
s
.
reqNum
)
s
.
session
.
Close
()
}
c
.
gettyClient
.
Close
()
c
.
gettyClient
=
nil
c
.
sessions
=
c
.
sessions
[
:
0
]
}
c
.
lock
.
Unlock
()
}
func
(
c
*
Client
)
selectSession
()
getty
.
Session
{
c
.
lock
.
RLock
()
defer
c
.
lock
.
RUnlock
()
if
c
.
sessions
==
nil
{
return
nil
}
...
...
@@ -189,12 +187,8 @@ func (c *Client) addSession(session getty.Session) {
}
c
.
lock
.
Lock
()
defer
c
.
lock
.
Unlock
()
if
c
.
sessions
==
nil
{
return
}
c
.
sessions
=
append
(
c
.
sessions
,
&
rpcSession
{
session
:
session
})
c
.
lock
.
Unlock
()
}
func
(
c
*
Client
)
removeSession
(
session
getty
.
Session
)
{
...
...
@@ -259,8 +253,9 @@ func (c *Client) getClientRpcSession(session getty.Session) (rpcSession, error)
return
rpcSession
,
jerrors
.
Trace
(
err
)
}
func
(
c
*
Client
)
ping
(
session
getty
.
Session
)
error
{
return
c
.
transfer
(
session
,
nil
,
nil
)
func
(
c
*
Client
)
heartbeat
(
session
getty
.
Session
)
error
{
resp
:=
NewPendingResponse
()
return
c
.
transfer
(
session
,
nil
,
resp
)
}
func
(
c
*
Client
)
transfer
(
session
getty
.
Session
,
req
*
GettyRPCRequest
,
resp
*
PendingResponse
)
error
{
...
...
@@ -272,19 +267,18 @@ func (c *Client) transfer(session getty.Session, req *GettyRPCRequest, resp *Pen
sequence
=
c
.
Sequence
()
pkg
.
H
.
Magic
=
gettyPackageMagic
pkg
.
H
.
LogID
=
(
uint32
)(
src
.
Int63
())
pkg
.
H
.
LogID
=
(
uint32
)(
randomID
())
pkg
.
H
.
Sequence
=
sequence
pkg
.
H
.
Command
=
gettyCmdHbRequest
if
req
!=
nil
&&
resp
!=
nil
{
pkg
.
H
.
CodecType
=
c
.
codecType
if
req
!=
nil
{
pkg
.
H
.
Command
=
gettyCmdRPCRequest
pkg
.
B
=
req
resp
.
seq
=
sequence
c
.
AddPendingResponse
(
resp
)
}
c
.
sendLock
.
Lock
()
defer
c
.
sendLock
.
Unlock
()
resp
.
seq
=
sequence
c
.
AddPendingResponse
(
resp
)
err
=
session
.
WritePkg
(
pkg
,
0
)
if
err
!=
nil
&&
resp
!=
nil
{
c
.
RemovePendingResponse
(
resp
.
seq
)
...
...
rpc/codec.go
View file @
388d0a67
...
...
@@ -10,10 +10,15 @@ import (
)
import
(
log
"github.com/AlexStocks/log4go"
proto
"github.com/gogo/protobuf/proto"
pb
"github.com/golang/protobuf/proto"
jerrors
"github.com/juju/errors"
)
import
(
log
"github.com/AlexStocks/log4go"
)
////////////////////////////////////////////
// getty command
////////////////////////////////////////////
...
...
@@ -63,6 +68,68 @@ const (
GettyFail
=
0x01
)
type
SerializeType
byte
const
(
JSON
SerializeType
=
iota
ProtoBuffer
)
var
(
Codecs
=
map
[
SerializeType
]
Codec
{
JSON
:
&
JSONCodec
{},
ProtoBuffer
:
&
PBCodec
{},
}
)
// Codec defines the interface that decode/encode body.
type
Codec
interface
{
Encode
(
i
interface
{})
([]
byte
,
error
)
Decode
(
data
[]
byte
,
i
interface
{})
error
}
// JSONCodec uses json marshaler and unmarshaler.
type
JSONCodec
struct
{}
// Encode encodes an object into slice of bytes.
func
(
c
JSONCodec
)
Encode
(
i
interface
{})
([]
byte
,
error
)
{
return
json
.
Marshal
(
i
)
}
// Decode decodes an object from slice of bytes.
func
(
c
JSONCodec
)
Decode
(
data
[]
byte
,
i
interface
{})
error
{
return
json
.
Unmarshal
(
data
,
i
)
}
// PBCodec uses protobuf marshaler and unmarshaler.
type
PBCodec
struct
{}
// Encode encodes an object into slice of bytes.
func
(
c
PBCodec
)
Encode
(
i
interface
{})
([]
byte
,
error
)
{
if
m
,
ok
:=
i
.
(
proto
.
Marshaler
);
ok
{
return
m
.
Marshal
()
}
if
m
,
ok
:=
i
.
(
pb
.
Message
);
ok
{
return
pb
.
Marshal
(
m
)
}
return
nil
,
fmt
.
Errorf
(
"%T is not a proto.Marshaler"
,
i
)
}
// Decode decodes an object from slice of bytes.
func
(
c
PBCodec
)
Decode
(
data
[]
byte
,
i
interface
{})
error
{
if
m
,
ok
:=
i
.
(
proto
.
Unmarshaler
);
ok
{
return
m
.
Unmarshal
(
data
)
}
if
m
,
ok
:=
i
.
(
pb
.
Message
);
ok
{
return
pb
.
Unmarshal
(
data
,
m
)
}
return
fmt
.
Errorf
(
"%T is not a proto.Unmarshaler"
,
i
)
}
////////////////////////////////////////////
// GettyPackageHandler
////////////////////////////////////////////
...
...
@@ -75,26 +142,25 @@ const (
var
(
ErrNotEnoughStream
=
jerrors
.
New
(
"packet stream is not enough"
)
ErrTooLargePackage
=
jerrors
.
New
(
"package length is exceed the getty package's legal maximum length."
)
ErrInvalidPackage
=
jerrors
.
New
(
"invalid rpc package"
)
ErrNotFoundServiceOrMethod
=
jerrors
.
New
(
"server invalid service or method"
)
ErrIllegalMagic
=
jerrors
.
New
(
"package magic is not right."
)
)
var
(
gettyPackageHeaderLen
int
gettyRPCRequestMinLen
int
gettyRPCResponseMinLen
int
gettyPackageHeaderLen
int
)
func
init
()
{
gettyPackageHeaderLen
=
(
int
)((
uint
)(
unsafe
.
Sizeof
(
GettyPackageHeader
{})))
gettyRPCRequestMinLen
=
(
int
)((
uint
)(
unsafe
.
Sizeof
(
GettyRPCRequestHeader
{})))
+
2
gettyRPCResponseMinLen
=
(
int
)((
uint
)(
unsafe
.
Sizeof
(
GettyRPCResponseHeader
{})))
+
2
}
type
RPCPackage
interface
{
Marshal
(
*
bytes
.
Buffer
)
error
Marshal
(
SerializeType
,
*
bytes
.
Buffer
)
(
int
,
error
)
// @buf length should be equal to GettyPkg.GettyPackageHeader.Len
Unmarshal
(
buf
*
bytes
.
Buffer
)
error
Unmarshal
(
sz
SerializeType
,
buf
*
bytes
.
Buffer
)
error
GetBody
()
[]
byte
GetHeader
()
interface
{}
}
type
GettyPackageHeader
struct
{
...
...
@@ -106,7 +172,7 @@ type GettyPackageHeader struct {
Code
GettyErrorCode
// error code
ServiceID
uint32
// service id
Len
uint32
// body length
CodecType
SerializeType
}
type
GettyPackage
struct
{
...
...
@@ -119,37 +185,55 @@ func (p GettyPackage) String() string {
p
.
H
.
LogID
,
p
.
H
.
Sequence
,
(
gettyCommand
(
p
.
H
.
Command
))
.
String
())
}
func
(
p
GettyPackage
)
Marshal
()
(
*
bytes
.
Buffer
,
error
)
{
func
(
p
*
GettyPackage
)
Marshal
()
(
*
bytes
.
Buffer
,
error
)
{
var
(
err
error
length
,
size
int
buf
,
buf0
*
bytes
.
Buffer
err
error
packLen
,
length
int
buf
*
bytes
.
Buffer
)
buf
=
&
bytes
.
Buffer
{}
err
=
binary
.
Write
(
buf
,
binary
.
LittleEndian
,
p
.
H
)
packLen
=
gettyPackageHeaderLen
if
p
.
B
!=
nil
{
buf
=
&
bytes
.
Buffer
{}
length
,
err
=
p
.
B
.
Marshal
(
p
.
H
.
CodecType
,
buf
)
if
err
!=
nil
{
return
nil
,
jerrors
.
Trace
(
err
)
}
packLen
=
gettyPackageHeaderLen
+
length
}
buf0
:=
&
bytes
.
Buffer
{}
err
=
binary
.
Write
(
buf0
,
binary
.
LittleEndian
,
uint16
(
packLen
))
if
err
!=
nil
{
return
nil
,
jerrors
.
Trace
(
err
)
}
err
=
binary
.
Write
(
buf0
,
binary
.
LittleEndian
,
p
.
H
)
if
err
!=
nil
{
return
nil
,
jerrors
.
Trace
(
err
)
}
if
p
.
B
!=
nil
{
if
err
=
p
.
B
.
Marshal
(
buf
);
err
!=
nil
{
if
err
=
binary
.
Write
(
buf0
,
binary
.
LittleEndian
,
buf
.
Bytes
()
);
err
!=
nil
{
return
nil
,
jerrors
.
Trace
(
err
)
}
// body length
length
=
buf
.
Len
()
-
gettyPackageHeaderLen
size
=
(
int
)((
uint
)(
unsafe
.
Sizeof
(
p
.
H
.
Len
)))
buf0
=
bytes
.
NewBuffer
(
buf
.
Bytes
()[
gettyPackageHeaderLen
-
size
:
size
])
binary
.
Write
(
buf0
,
binary
.
LittleEndian
,
length
)
}
return
buf
,
nil
return
buf0
,
nil
}
func
(
p
*
GettyPackage
)
Unmarshal
(
buf
*
bytes
.
Buffer
)
(
int
,
error
)
{
if
buf
.
Len
()
<
gettyPackageHeaderLen
{
var
err
error
if
buf
.
Len
()
<
2
+
gettyPackageHeaderLen
{
return
0
,
ErrNotEnoughStream
}
var
packLen
uint16
err
=
binary
.
Read
(
buf
,
binary
.
LittleEndian
,
&
packLen
)
if
err
!=
nil
{
return
0
,
jerrors
.
Trace
(
err
)
}
if
int
(
packLen
)
>
maxPackageLen
{
return
0
,
ErrTooLargePackage
}
if
int
(
packLen
)
<
gettyPackageHeaderLen
{
return
0
,
ErrInvalidPackage
}
// header
if
err
:=
binary
.
Read
(
buf
,
binary
.
LittleEndian
,
&
(
p
.
H
));
err
!=
nil
{
...
...
@@ -159,20 +243,14 @@ func (p *GettyPackage) Unmarshal(buf *bytes.Buffer) (int, error) {
log
.
Error
(
"@p.H.Magic{%x}, right magic{%x}"
,
p
.
H
.
Magic
,
gettyPackageMagic
)
return
0
,
ErrIllegalMagic
}
if
buf
.
Len
()
<
(
int
)(
p
.
H
.
Len
)
{
return
0
,
ErrNotEnoughStream
}
if
maxPackageLen
<
p
.
H
.
Len
{
return
0
,
ErrTooLargePackage
}
if
p
.
H
.
Len
!=
0
{
if
err
:=
p
.
B
.
Unmarshal
(
bytes
.
NewBuffer
(
buf
.
Next
(
int
(
p
.
H
.
Len
)
)));
err
!=
nil
{
if
int
(
packLen
)
>
gettyPackageHeaderLen
{
if
err
:=
p
.
B
.
Unmarshal
(
p
.
H
.
CodecType
,
bytes
.
NewBuffer
(
buf
.
Next
(
int
(
packLen
)
-
gettyPackageHeaderLen
)));
err
!=
nil
{
return
0
,
jerrors
.
Trace
(
err
)
}
}
return
(
int
)(
p
.
H
.
Len
)
+
gettyPackageHeaderLen
,
nil
return
int
(
packLen
)
,
nil
}
////////////////////////////////////////////
...
...
@@ -183,62 +261,63 @@ type GettyRPCHeaderLenType uint16
//easyjson:json
type
GettyRPCRequestHeader
struct
{
Service
string
`json:"service,omitempty"`
Method
string
`json:"method,omitempty"`
CallType
gettyCallType
`json:"call_type,omitempty"`
Service
string
Method
string
CallType
gettyCallType
}
type
GettyRPCRequest
struct
{
server
*
Server
header
GettyRPCRequestHeader
body
interface
{}
}
type
GettyRPCRequestPackage
struct
{
H
GettyPackageHeader
header
GettyRPCRequestHeader
body
interface
{}
service
*
service
methodType
*
methodType
argv
reflect
.
Value
replyv
reflect
.
Value
}
// json rpc stream format
// |-- 2B (GettyRPCRequestHeader length) --|-- GettyRPCRequestHeader --|-- rpc body --|
func
NewGettyRPCRequest
(
server
*
Server
)
*
GettyRPCRequest
{
return
&
GettyRPCRequest
{
server
:
server
,
}
func
NewGettyRPCRequest
()
RPCPackage
{
return
&
GettyRPCRequest
{}
}
func
(
req
*
GettyRPCRequest
)
Marshal
(
buf
*
bytes
.
Buffer
)
error
{
headerData
,
err
:=
req
.
header
.
MarshalJSON
()
func
(
req
*
GettyRPCRequest
)
Marshal
(
sz
SerializeType
,
buf
*
bytes
.
Buffer
)
(
int
,
error
)
{
codec
:=
Codecs
[
sz
]
if
codec
==
nil
{
return
0
,
jerrors
.
Errorf
(
"can not find codec for %d"
,
sz
)
}
headerData
,
err
:=
codec
.
Encode
(
req
.
header
)
if
err
!=
nil
{
return
jerrors
.
Trace
(
err
)
return
0
,
jerrors
.
Trace
(
err
)
}
bodyData
,
err
:=
json
.
Marshal
(
req
.
body
)
bodyData
,
err
:=
codec
.
Encode
(
req
.
body
)
if
err
!=
nil
{
return
jerrors
.
Trace
(
err
)
return
0
,
jerrors
.
Trace
(
err
)
}
err
=
binary
.
Write
(
buf
,
binary
.
LittleEndian
,
uint16
(
len
(
headerData
)))
if
err
!=
nil
{
return
jerrors
.
Trace
(
err
)
return
0
,
jerrors
.
Trace
(
err
)
}
err
=
binary
.
Write
(
buf
,
binary
.
LittleEndian
,
headerData
)
if
err
!=
nil
{
return
jerrors
.
Trace
(
err
)
return
0
,
jerrors
.
Trace
(
err
)
}
err
=
binary
.
Write
(
buf
,
binary
.
LittleEndian
,
uint16
(
len
(
bodyData
)))
if
err
!=
nil
{
return
0
,
jerrors
.
Trace
(
err
)
}
err
=
binary
.
Write
(
buf
,
binary
.
LittleEndian
,
bodyData
)
if
err
!=
nil
{
return
jerrors
.
Trace
(
err
)
return
0
,
jerrors
.
Trace
(
err
)
}
return
nil
return
2
+
len
(
headerData
)
+
2
+
len
(
bodyData
),
nil
}
// @buf length should be equal to GettyPkg.GettyPackageHeader.Len
func
(
req
*
GettyRPCRequest
)
Unmarshal
(
buf
*
bytes
.
Buffer
)
error
{
if
buf
.
Len
()
<
gettyRPCRequestMinLen
{
return
ErrNotEnoughStream
}
func
(
req
*
GettyRPCRequest
)
Unmarshal
(
sz
SerializeType
,
buf
*
bytes
.
Buffer
)
error
{
var
headerLen
uint16
err
:=
binary
.
Read
(
buf
,
binary
.
LittleEndian
,
&
headerLen
)
...
...
@@ -246,106 +325,152 @@ func (req *GettyRPCRequest) Unmarshal(buf *bytes.Buffer) error {
return
jerrors
.
Trace
(
err
)
}
header
:=
buf
.
Next
(
int
(
headerLen
))
body
:=
buf
.
Next
(
buf
.
Len
())
err
=
(
&
req
.
header
)
.
UnmarshalJSON
(
header
)
header
:=
make
([]
byte
,
headerLen
)
err
=
binary
.
Read
(
buf
,
binary
.
LittleEndian
,
header
)
if
err
!=
nil
{
return
jerrors
.
Trace
(
err
)
}
// get service & method
req
.
service
=
req
.
server
.
serviceMap
[
req
.
header
.
Service
]
if
req
.
service
!=
nil
{
re
q
.
methodType
=
req
.
service
.
method
[
req
.
header
.
Method
]
var
bodyLen
uint16
err
=
binary
.
Read
(
buf
,
binary
.
LittleEndian
,
&
bodyLen
)
if
err
!=
nil
{
re
turn
jerrors
.
Trace
(
err
)
}
if
req
.
service
==
nil
||
req
.
methodType
==
nil
{
return
ErrNotFoundServiceOrMethod
body
:=
make
([]
byte
,
bodyLen
)
err
=
binary
.
Read
(
buf
,
binary
.
LittleEndian
,
body
)
if
err
!=
nil
{
return
jerrors
.
Trace
(
err
)
}
// get args
argIsValue
:=
false
if
req
.
methodType
.
ArgType
.
Kind
()
==
reflect
.
Ptr
{
req
.
argv
=
reflect
.
New
(
req
.
methodType
.
ArgType
.
Elem
())
}
else
{
req
.
argv
=
reflect
.
New
(
req
.
methodType
.
ArgType
)
argIsValue
=
true
codec
:=
Codecs
[
sz
]
if
codec
==
nil
{
return
jerrors
.
Errorf
(
"can not find codec for %d"
,
sz
)
}
err
=
json
.
Unmarshal
(
body
,
req
.
argv
.
Interface
())
err
=
codec
.
Decode
(
header
,
&
req
.
header
)
if
err
!=
nil
{
return
jerrors
.
Trace
(
err
)
}
if
argIsValue
{
req
.
argv
=
req
.
argv
.
Elem
()
}
// get reply
req
.
replyv
=
reflect
.
New
(
req
.
methodType
.
ReplyType
.
Elem
())
req
.
body
=
body
return
nil
}
func
(
req
*
GettyRPCRequest
)
GetBody
()
[]
byte
{
return
req
.
body
.
([]
byte
)
}
func
(
req
*
GettyRPCRequest
)
GetHeader
()
interface
{}
{
return
req
.
header
}
////////////////////////////////////////////
// GettyRPCResponse
////////////////////////////////////////////
type
GettyRPCResponseHeader
struct
{
Error
string
`json:"error,omitempty"`
// error string
Error
string
}
type
GettyRPCResponse
struct
{
header
GettyRPCResponseHeader
`json:"header,omitempty"`
body
interface
{}
`json:"body,omitempty"`
header
GettyRPCResponseHeader
body
interface
{}
}
type
GettyRPCResponsePackage
struct
{
H
GettyPackageHeader
header
GettyRPCResponseHeader
body
[]
byte
}
func
NewGettyRPCResponse
()
RPCPackage
{
return
&
GettyRPCResponse
{}
}
func
(
resp
*
GettyRPCResponse
)
Marshal
(
buf
*
bytes
.
Buffer
)
error
{
headerData
,
err
:=
json
.
Marshal
(
resp
.
header
)
func
(
resp
*
GettyRPCResponse
)
Marshal
(
sz
SerializeType
,
buf
*
bytes
.
Buffer
)
(
int
,
error
)
{
codec
:=
Codecs
[
sz
]
if
codec
==
nil
{
return
0
,
jerrors
.
Errorf
(
"can not find codec for %d"
,
sz
)
}
headerData
,
err
:=
codec
.
Encode
(
resp
.
header
)
if
err
!=
nil
{
return
jerrors
.
Trace
(
err
)
return
0
,
jerrors
.
Trace
(
err
)
}
bodyData
,
err
:=
json
.
Marshal
(
resp
.
body
)
bodyData
,
err
:=
codec
.
Encode
(
resp
.
body
)
if
err
!=
nil
{
return
jerrors
.
Trace
(
err
)
return
0
,
jerrors
.
Trace
(
err
)
}
err
=
binary
.
Write
(
buf
,
binary
.
LittleEndian
,
(
GettyRPCHeaderLenType
)
(
len
(
headerData
)))
err
=
binary
.
Write
(
buf
,
binary
.
LittleEndian
,
uint16
(
len
(
headerData
)))
if
err
!=
nil
{
return
jerrors
.
Trace
(
err
)
return
0
,
jerrors
.
Trace
(
err
)
}
if
_
,
err
=
buf
.
Write
(
headerData
);
err
!=
nil
{
return
jerrors
.
Trace
(
err
)
err
=
binary
.
Write
(
buf
,
binary
.
LittleEndian
,
headerData
)
if
err
!=
nil
{
return
0
,
jerrors
.
Trace
(
err
)
}
if
_
,
err
=
buf
.
Write
(
bodyData
);
err
!=
nil
{
return
jerrors
.
Trace
(
err
)
err
=
binary
.
Write
(
buf
,
binary
.
LittleEndian
,
uint16
(
len
(
bodyData
)))
if
err
!=
nil
{
return
0
,
jerrors
.
Trace
(
err
)
}
err
=
binary
.
Write
(
buf
,
binary
.
LittleEndian
,
bodyData
)
if
err
!=
nil
{
return
0
,
jerrors
.
Trace
(
err
)
}
return
nil
return
2
+
len
(
headerData
)
+
2
+
len
(
bodyData
),
nil
}
// @buf length should be equal to GettyPkg.GettyPackageHeader.Len
func
(
resp
*
GettyRPCResponse
)
Unmarshal
(
buf
*
bytes
.
Buffer
)
error
{
if
buf
.
Len
()
<
gettyRPCResponseMinLen
{
return
ErrNotEnoughStream
}
func
(
resp
*
GettyRPCResponse
)
Unmarshal
(
sz
SerializeType
,
buf
*
bytes
.
Buffer
)
error
{
var
headerLen
GettyRPCHeaderLenType
var
headerLen
uint16
err
:=
binary
.
Read
(
buf
,
binary
.
LittleEndian
,
&
headerLen
)
if
err
!=
nil
{
return
jerrors
.
Trace
(
err
)
}
header
:=
buf
.
Next
(
int
(
headerLen
))
if
len
(
header
)
!=
int
(
headerLen
)
{
return
ErrNotEnoughStream
header
:=
make
([]
byte
,
headerLen
)
err
=
binary
.
Read
(
buf
,
binary
.
LittleEndian
,
header
)
if
err
!=
nil
{
return
jerrors
.
Trace
(
err
)
}
var
bodyLen
uint16
err
=
binary
.
Read
(
buf
,
binary
.
LittleEndian
,
&
bodyLen
)
if
err
!=
nil
{
return
jerrors
.
Trace
(
err
)
}
body
:=
make
([]
byte
,
bodyLen
)
err
=
binary
.
Read
(
buf
,
binary
.
LittleEndian
,
body
)
if
err
!=
nil
{
return
jerrors
.
Trace
(
err
)
}
codec
:=
Codecs
[
sz
]
if
codec
==
nil
{
return
jerrors
.
Errorf
(
"can not find codec for %d"
,
sz
)
}
resp
.
body
=
buf
.
Next
(
int
(
buf
.
Len
()))
err
=
json
.
Unmarshal
(
header
,
resp
.
header
)
err
=
codec
.
Decode
(
header
,
&
resp
.
header
)
if
err
!=
nil
{
return
jerrors
.
Trace
(
err
)
}
resp
.
body
=
body
return
nil
}
func
(
resp
*
GettyRPCResponse
)
GetBody
()
[]
byte
{
return
resp
.
body
.
([]
byte
)
}
func
(
resp
*
GettyRPCResponse
)
GetHeader
()
interface
{}
{
return
resp
.
header
}
////////////////////////////////////////////
// PendingResponse
////////////////////////////////////////////
...
...
rpc/codec_easyjson.go
deleted
100644 → 0
View file @
27a8e0c8
// Code generated by easyjson for marshaling/unmarshaling. DO NOT EDIT.
package
rpc
import
(
json
"encoding/json"
easyjson
"github.com/mailru/easyjson"
jlexer
"github.com/mailru/easyjson/jlexer"
jwriter
"github.com/mailru/easyjson/jwriter"
)
// suppress unused package warning
var
(
_
*
json
.
RawMessage
_
*
jlexer
.
Lexer
_
*
jwriter
.
Writer
_
easyjson
.
Marshaler
)
func
easyjson38c57360DecodeGithubComAlexStocksGettyRpc
(
in
*
jlexer
.
Lexer
,
out
*
GettyRPCRequestHeader
)
{
isTopLevel
:=
in
.
IsStart
()
if
in
.
IsNull
()
{
if
isTopLevel
{
in
.
Consumed
()
}
in
.
Skip
()
return
}
in
.
Delim
(
'{'
)
for
!
in
.
IsDelim
(
'}'
)
{
key
:=
in
.
UnsafeString
()
in
.
WantColon
()
if
in
.
IsNull
()
{
in
.
Skip
()
in
.
WantComma
()
continue
}
switch
key
{
case
"service"
:
out
.
Service
=
string
(
in
.
String
())
case
"method"
:
out
.
Method
=
string
(
in
.
String
())
case
"call_type"
:
out
.
CallType
=
gettyCallType
(
in
.
Uint32
())
default
:
in
.
SkipRecursive
()
}
in
.
WantComma
()
}
in
.
Delim
(
'}'
)
if
isTopLevel
{
in
.
Consumed
()
}
}
func
easyjson38c57360EncodeGithubComAlexStocksGettyRpc
(
out
*
jwriter
.
Writer
,
in
GettyRPCRequestHeader
)
{
out
.
RawByte
(
'{'
)
first
:=
true
_
=
first
if
in
.
Service
!=
""
{
const
prefix
string
=
",
\"
service
\"
:"
if
first
{
first
=
false
out
.
RawString
(
prefix
[
1
:
])
}
else
{
out
.
RawString
(
prefix
)
}
out
.
String
(
string
(
in
.
Service
))
}
if
in
.
Method
!=
""
{
const
prefix
string
=
",
\"
method
\"
:"
if
first
{
first
=
false
out
.
RawString
(
prefix
[
1
:
])
}
else
{
out
.
RawString
(
prefix
)
}
out
.
String
(
string
(
in
.
Method
))
}
if
in
.
CallType
!=
0
{
const
prefix
string
=
",
\"
call_type
\"
:"
if
first
{
first
=
false
out
.
RawString
(
prefix
[
1
:
])
}
else
{
out
.
RawString
(
prefix
)
}
out
.
Uint32
(
uint32
(
in
.
CallType
))
}
out
.
RawByte
(
'}'
)
}
// MarshalJSON supports json.Marshaler interface
func
(
v
GettyRPCRequestHeader
)
MarshalJSON
()
([]
byte
,
error
)
{
w
:=
jwriter
.
Writer
{}
easyjson38c57360EncodeGithubComAlexStocksGettyRpc
(
&
w
,
v
)
return
w
.
Buffer
.
BuildBytes
(),
w
.
Error
}
// MarshalEasyJSON supports easyjson.Marshaler interface
func
(
v
GettyRPCRequestHeader
)
MarshalEasyJSON
(
w
*
jwriter
.
Writer
)
{
easyjson38c57360EncodeGithubComAlexStocksGettyRpc
(
w
,
v
)
}
// UnmarshalJSON supports json.Unmarshaler interface
func
(
v
*
GettyRPCRequestHeader
)
UnmarshalJSON
(
data
[]
byte
)
error
{
r
:=
jlexer
.
Lexer
{
Data
:
data
}
easyjson38c57360DecodeGithubComAlexStocksGettyRpc
(
&
r
,
v
)
return
r
.
Error
()
}
// UnmarshalEasyJSON supports easyjson.Unmarshaler interface
func
(
v
*
GettyRPCRequestHeader
)
UnmarshalEasyJSON
(
l
*
jlexer
.
Lexer
)
{
easyjson38c57360DecodeGithubComAlexStocksGettyRpc
(
l
,
v
)
}
rpc/config.go
View file @
388d0a67
package
rpc
import
(
"fmt"
"time"
)
import
(
config
"github.com/koding/multiconfig"
)
type
(
GettySessionParam
struct
{
CompressEncoding
bool
`default:"false" yaml:"compress_encoding" json:"compress_encoding,omitempty"`
...
...
@@ -76,3 +81,70 @@ type (
GettySessionParam
GettySessionParam
`required:"true" yaml:"getty_session_param" json:"getty_session_param,omitempty"`
}
)
func
loadClientConf
(
confFile
string
)
*
ClientConfig
{
var
err
error
conf
:=
new
(
ClientConfig
)
config
.
MustLoadWithPath
(
confFile
,
conf
)
conf
.
heartbeatPeriod
,
err
=
time
.
ParseDuration
(
conf
.
HeartbeatPeriod
)
if
err
!=
nil
{
panic
(
fmt
.
Sprintf
(
"time.ParseDuration(HeartbeatPeroid{%#v}) = error{%v}"
,
conf
.
HeartbeatPeriod
,
err
))
}
conf
.
sessionTimeout
,
err
=
time
.
ParseDuration
(
conf
.
SessionTimeout
)
if
err
!=
nil
{
panic
(
fmt
.
Sprintf
(
"time.ParseDuration(SessionTimeout{%#v}) = error{%v}"
,
conf
.
SessionTimeout
,
err
))
}
conf
.
failFastTimeout
,
err
=
time
.
ParseDuration
(
conf
.
FailFastTimeout
)
if
err
!=
nil
{
panic
(
fmt
.
Sprintf
(
"time.ParseDuration(FailFastTimeout{%#v}) = error{%v}"
,
conf
.
FailFastTimeout
,
err
))
}
conf
.
GettySessionParam
.
keepAlivePeriod
,
err
=
time
.
ParseDuration
(
conf
.
GettySessionParam
.
KeepAlivePeriod
)
if
err
!=
nil
{
panic
(
fmt
.
Sprintf
(
"time.ParseDuration(KeepAlivePeriod{%#v}) = error{%v}"
,
conf
.
GettySessionParam
.
KeepAlivePeriod
,
err
))
}
conf
.
GettySessionParam
.
tcpReadTimeout
,
err
=
time
.
ParseDuration
(
conf
.
GettySessionParam
.
TcpReadTimeout
)
if
err
!=
nil
{
panic
(
fmt
.
Sprintf
(
"time.ParseDuration(TcpReadTimeout{%#v}) = error{%v}"
,
conf
.
GettySessionParam
.
TcpReadTimeout
,
err
))
}
conf
.
GettySessionParam
.
tcpWriteTimeout
,
err
=
time
.
ParseDuration
(
conf
.
GettySessionParam
.
TcpWriteTimeout
)
if
err
!=
nil
{
panic
(
fmt
.
Sprintf
(
"time.ParseDuration(TcpWriteTimeout{%#v}) = error{%v}"
,
conf
.
GettySessionParam
.
TcpWriteTimeout
,
err
))
}
conf
.
GettySessionParam
.
waitTimeout
,
err
=
time
.
ParseDuration
(
conf
.
GettySessionParam
.
WaitTimeout
)
if
err
!=
nil
{
panic
(
fmt
.
Sprintf
(
"time.ParseDuration(WaitTimeout{%#v}) = error{%v}"
,
conf
.
GettySessionParam
.
WaitTimeout
,
err
))
}
return
conf
}
func
loadServerConf
(
confFile
string
)
*
ServerConfig
{
var
err
error
conf
:=
new
(
ServerConfig
)
config
.
MustLoadWithPath
(
confFile
,
conf
)
conf
.
sessionTimeout
,
err
=
time
.
ParseDuration
(
conf
.
SessionTimeout
)
if
err
!=
nil
{
panic
(
fmt
.
Sprintf
(
"time.ParseDuration(SessionTimeout{%#v}) = error{%v}"
,
conf
.
SessionTimeout
,
err
))
}
conf
.
failFastTimeout
,
err
=
time
.
ParseDuration
(
conf
.
FailFastTimeout
)
if
err
!=
nil
{
panic
(
fmt
.
Sprintf
(
"time.ParseDuration(FailFastTimeout{%#v}) = error{%v}"
,
conf
.
FailFastTimeout
,
err
))
}
conf
.
GettySessionParam
.
keepAlivePeriod
,
err
=
time
.
ParseDuration
(
conf
.
GettySessionParam
.
KeepAlivePeriod
)
if
err
!=
nil
{
panic
(
fmt
.
Sprintf
(
"time.ParseDuration(KeepAlivePeriod{%#v}) = error{%v}"
,
conf
.
GettySessionParam
.
KeepAlivePeriod
,
err
))
}
conf
.
GettySessionParam
.
tcpReadTimeout
,
err
=
time
.
ParseDuration
(
conf
.
GettySessionParam
.
TcpReadTimeout
)
if
err
!=
nil
{
panic
(
fmt
.
Sprintf
(
"time.ParseDuration(TcpReadTimeout{%#v}) = error{%v}"
,
conf
.
GettySessionParam
.
TcpReadTimeout
,
err
))
}
conf
.
GettySessionParam
.
tcpWriteTimeout
,
err
=
time
.
ParseDuration
(
conf
.
GettySessionParam
.
TcpWriteTimeout
)
if
err
!=
nil
{
panic
(
fmt
.
Sprintf
(
"time.ParseDuration(TcpWriteTimeout{%#v}) = error{%v}"
,
conf
.
GettySessionParam
.
TcpWriteTimeout
,
err
))
}
conf
.
GettySessionParam
.
waitTimeout
,
err
=
time
.
ParseDuration
(
conf
.
GettySessionParam
.
WaitTimeout
)
if
err
!=
nil
{
panic
(
fmt
.
Sprintf
(
"time.ParseDuration(WaitTimeout{%#v}) = error{%v}"
,
conf
.
GettySessionParam
.
WaitTimeout
,
err
))
}
return
conf
}
rpc/example/client/client.go
View file @
388d0a67
...
...
@@ -9,7 +9,8 @@ import (
)
func
main
()
{
client
:=
rpc
.
NewClient
()
log
.
LoadConfiguration
(
"client_log.xml"
)
client
:=
rpc
.
NewClient
(
"client_config.toml"
)
defer
client
.
Close
()
for
i
:=
0
;
i
<
100
;
i
++
{
...
...
rpc/example/client/client_config.toml
View file @
388d0a67
# toml configure file
# toml中key的首字母可以小写,但是对应的golang中的struct成员首字母必须大写
AppName
=
"
ECHO
-CLIENT"
AppName
=
"
RPC
-CLIENT"
# host
LocalHost
=
"127.0.0.1"
...
...
@@ -14,7 +14,7 @@ ProfilePort = 10080
# connection pool
# 连接池连接数目
ConnectionNum
=
2
ConnectionNum
=
10
# session
# client与server之间连接的心跳周期
...
...
@@ -22,12 +22,6 @@ HeartbeatPeriod = "10s"
# client与server之间连接的超时时间
SessionTimeout
=
"20s"
# client
# client echo request string
EchoString
=
"Hello, getty!"
# 发送echo请求次数
EchoTimes
=
10000
# app fail fast
FailFastTimeout
=
"3s"
...
...
@@ -45,4 +39,4 @@ FailFastTimeout = "3s"
TcpWriteTimeout
=
"5s"
WaitTimeout
=
"1s"
MaxMsgLen
=
128
SessionName
=
"
echo
-client"
SessionName
=
"
rpc
-client"
rpc/example/server/server.go
View file @
388d0a67
...
...
@@ -3,10 +3,12 @@ package main
import
(
"github.com/AlexStocks/getty/rpc"
"github.com/AlexStocks/getty/rpc/example/data"
log
"github.com/AlexStocks/log4go"
)
func
main
()
{
srv
:=
rpc
.
NewServer
()
log
.
LoadConfiguration
(
"server_log.xml"
)
srv
:=
rpc
.
NewServer
(
"server_config.toml"
)
srv
.
Register
(
new
(
data
.
TestRpc
))
srv
.
Run
()
}
rpc/example/server/server_config.toml
View file @
388d0a67
# toml configure file
# toml中key的首字母可以小写,但是对应的golang中的struct成员首字母必须大写
AppName
=
"
ECHO
-SERVER"
AppName
=
"
RPC
-SERVER"
Host
=
"127.0.0.1"
# Host = "192.168.35.1"
...
...
@@ -31,4 +31,4 @@ FailFastTimeout = "3s"
TcpWriteTimeout
=
"5s"
WaitTimeout
=
"1s"
MaxMsgLen
=
128
SessionName
=
"
echo
-server"
SessionName
=
"
rpc
-server"
rpc/listener.go
View file @
388d0a67
package
rpc
import
(
"encoding/json"
"reflect"
"sync"
"time"
...
...
@@ -14,11 +13,6 @@ import (
log
"github.com/AlexStocks/log4go"
)
const
(
CmdTypeErr
=
"err"
CmdTypeAck
=
"ack"
)
var
(
errTooManySessions
=
jerrors
.
New
(
"too many echo sessions"
)
)
...
...
@@ -37,8 +31,6 @@ type RpcServerHandler struct {
sessionTimeout
time
.
Duration
sessionMap
map
[
getty
.
Session
]
*
rpcSession
rwlock
sync
.
RWMutex
sendLock
sync
.
Mutex
}
func
NewRpcServerHandler
(
maxSessionNum
int
,
sessionTimeout
time
.
Duration
)
*
RpcServerHandler
{
...
...
@@ -88,29 +80,23 @@ func (h *RpcServerHandler) OnMessage(session getty.Session, pkg interface{}) {
}
h
.
rwlock
.
Unlock
()
p
,
ok
:=
pkg
.
(
*
Getty
Package
)
req
,
ok
:=
pkg
.
(
GettyRPCRequest
Package
)
if
!
ok
{
log
.
Error
(
"illegal packge{%#v}"
,
pkg
)
return
}
req
,
ok
:=
p
.
B
.
(
*
GettyRPCRequest
)
if
!
ok
{
log
.
Error
(
"illegal request{%#v}"
,
p
.
B
)
// heartbeat
if
req
.
H
.
Command
==
gettyCmdHbRequest
{
h
.
replyCmd
(
session
,
req
,
gettyCmdHbResponse
,
""
)
return
}
if
p
.
H
.
Command
==
gettyCmdHbRequest
{
h
.
replyCmd
(
session
,
p
,
gettyCmdHbResponse
,
""
)
return
}
if
req
.
header
.
CallType
==
gettyTwoWayNoReply
{
h
.
replyCmd
(
session
,
p
,
gettyCmdRPCResponse
,
""
)
h
.
replyCmd
(
session
,
req
,
gettyCmdRPCResponse
,
""
)
function
:=
req
.
methodType
.
method
.
Func
function
.
Call
([]
reflect
.
Value
{
req
.
service
.
rcvr
,
req
.
argv
,
req
.
replyv
})
return
}
h
.
callService
(
session
,
p
,
req
.
service
,
req
.
methodType
,
req
.
argv
,
req
.
replyv
)
h
.
callService
(
session
,
req
,
req
.
service
,
req
.
methodType
,
req
.
argv
,
req
.
replyv
)
}
func
(
h
*
RpcServerHandler
)
OnCron
(
session
getty
.
Session
)
{
...
...
@@ -138,45 +124,44 @@ func (h *RpcServerHandler) OnCron(session getty.Session) {
}
}
func
(
h
*
RpcServerHandler
)
replyCmd
(
session
getty
.
Session
,
reqPkg
*
GettyPackage
,
cmd
gettyCommand
,
err
string
)
{
rspPkg
:=
*
reqPkg
rspPkg
.
H
.
Code
=
0
rspPkg
.
H
.
Command
=
gettyCmdRPCResponse
func
(
h
*
RpcServerHandler
)
replyCmd
(
session
getty
.
Session
,
req
GettyRPCRequestPackage
,
cmd
gettyCommand
,
err
string
)
{
resp
:=
GettyPackage
{
H
:
req
.
H
,
}
resp
.
H
.
Command
=
cmd
if
len
(
err
)
!=
0
{
r
spPkg
.
H
.
Code
=
GettyFail
r
spPkg
.
B
=
&
GettyRPCResponse
{
r
esp
.
H
.
Code
=
GettyFail
r
esp
.
B
=
&
GettyRPCResponse
{
header
:
GettyRPCResponseHeader
{
Error
:
err
,
},
}
}
h
.
sendLock
.
Lock
()
defer
h
.
sendLock
.
Unlock
()
session
.
WritePkg
(
&
rspPkg
,
0
)
session
.
WritePkg
(
resp
,
5
*
time
.
Second
)
}
func
(
h
*
RpcServerHandler
)
callService
(
session
getty
.
Session
,
req
Pkg
*
GettyPackage
,
service
*
servic
e
,
methodType
*
methodType
,
argv
,
replyv
reflect
.
Value
)
{
func
(
h
*
RpcServerHandler
)
callService
(
session
getty
.
Session
,
req
GettyRPCRequestPackag
e
,
service
*
service
,
methodType
*
methodType
,
argv
,
replyv
reflect
.
Value
)
{
function
:=
methodType
.
method
.
Func
returnValues
:=
function
.
Call
([]
reflect
.
Value
{
service
.
rcvr
,
argv
,
replyv
})
errInter
:=
returnValues
[
0
]
.
Interface
()
if
errInter
!=
nil
{
h
.
replyCmd
(
session
,
req
Pkg
,
gettyCmdRPCResponse
,
errInter
.
(
error
)
.
Error
())
h
.
replyCmd
(
session
,
req
,
gettyCmdRPCResponse
,
errInter
.
(
error
)
.
Error
())
return
}
rspPkg
:=
*
reqPkg
rspPkg
.
H
.
Code
=
0
rspPkg
.
H
.
Command
=
gettyCmdRPCResponse
rspPkg
.
B
=
&
GettyRPCResponse
{
resp
:=
GettyPackage
{
H
:
req
.
H
,
}
resp
.
H
.
Code
=
GettyOK
resp
.
H
.
Command
=
gettyCmdRPCResponse
resp
.
B
=
&
GettyRPCResponse
{
body
:
replyv
.
Interface
(),
}
h
.
sendLock
.
Lock
()
defer
h
.
sendLock
.
Unlock
()
session
.
WritePkg
(
&
rspPkg
,
0
)
session
.
WritePkg
(
resp
,
5
*
time
.
Second
)
}
////////////////////////////////////////////
...
...
@@ -207,7 +192,7 @@ func (h *RpcClientHandler) OnClose(session getty.Session) {
}
func
(
h
*
RpcClientHandler
)
OnMessage
(
session
getty
.
Session
,
pkg
interface
{})
{
p
,
ok
:=
pkg
.
(
*
GettyPackage
)
p
,
ok
:=
pkg
.
(
*
Getty
RPCResponse
Package
)
if
!
ok
{
log
.
Error
(
"illegal packge{%#v}"
,
pkg
)
return
...
...
@@ -216,24 +201,28 @@ func (h *RpcClientHandler) OnMessage(session getty.Session, pkg interface{}) {
h
.
client
.
updateSession
(
session
)
pendingResponse
:=
h
.
client
.
RemovePendingResponse
(
p
.
H
.
Sequence
)
if
p
.
H
.
Command
==
gettyCmdHbResponse
{
if
p
endingResponse
==
nil
{
return
}
if
p
.
B
==
nil
{
log
.
Error
(
"response:{%#v} body is nil"
,
p
)
if
p
.
H
.
Command
==
gettyCmdHbResponse
{
return
}
rsp
,
ok
:=
p
.
B
.
(
*
GettyRPCResponse
)
if
!
ok
{
log
.
Error
(
"response body:{%#v} type is not *GettyRPCResponse"
,
p
.
B
)
if
p
.
H
.
Code
==
GettyFail
&&
len
(
p
.
header
.
Error
)
>
0
{
pendingResponse
.
err
=
jerrors
.
New
(
p
.
header
.
Error
)
pendingResponse
.
done
<-
struct
{}{}
return
}
if
p
.
H
.
Code
==
GettyFail
&&
len
(
rsp
.
header
.
Error
)
>
0
{
pendingResponse
.
err
=
jerrors
.
New
(
rsp
.
header
.
Error
)
codec
:=
Codecs
[
p
.
H
.
CodecType
]
if
codec
==
nil
{
pendingResponse
.
err
=
jerrors
.
Errorf
(
"can not find codec for %d"
,
p
.
H
.
CodecType
)
pendingResponse
.
done
<-
struct
{}{}
return
}
err
:=
json
.
Unmarshal
(
rsp
.
body
.
([]
byte
)
,
pendingResponse
.
reply
)
err
:=
codec
.
Decode
(
p
.
body
,
pendingResponse
.
reply
)
if
err
!=
nil
{
pendingResponse
.
err
=
err
pendingResponse
.
done
<-
struct
{}{}
return
}
pendingResponse
.
done
<-
struct
{}{}
}
...
...
@@ -251,5 +240,5 @@ func (h *RpcClientHandler) OnCron(session getty.Session) {
return
}
h
.
client
.
ping
(
session
)
h
.
client
.
heartbeat
(
session
)
}
rpc/readwriter.go
View file @
388d0a67
...
...
@@ -2,11 +2,12 @@ package rpc
import
(
"bytes"
)
"reflect"
import
(
"github.com/AlexStocks/getty"
log
"github.com/AlexStocks/log4go"
jerrors
"github.com/juju/errors"
)
...
...
@@ -25,7 +26,9 @@ func NewRpcServerPackageHandler(server *Server) *RpcServerPackageHandler {
}
func
(
p
*
RpcServerPackageHandler
)
Read
(
ss
getty
.
Session
,
data
[]
byte
)
(
interface
{},
int
,
error
)
{
var
pkg
GettyPackage
pkg
:=
&
GettyPackage
{
B
:
NewGettyRPCRequest
(),
}
buf
:=
bytes
.
NewBuffer
(
data
)
length
,
err
:=
pkg
.
Unmarshal
(
buf
)
...
...
@@ -36,11 +39,48 @@ func (p *RpcServerPackageHandler) Read(ss getty.Session, data []byte) (interface
return
nil
,
0
,
jerrors
.
Trace
(
err
)
}
return
&
pkg
,
length
,
nil
req
:=
GettyRPCRequestPackage
{
H
:
pkg
.
H
,
header
:
pkg
.
B
.
GetHeader
()
.
(
GettyRPCRequestHeader
),
}
if
req
.
H
.
Command
==
gettyCmdHbRequest
{
return
req
,
length
,
nil
}
// get service & method
req
.
service
=
p
.
server
.
serviceMap
[
req
.
header
.
Service
]
if
req
.
service
!=
nil
{
req
.
methodType
=
req
.
service
.
method
[
req
.
header
.
Method
]
}
if
req
.
service
==
nil
||
req
.
methodType
==
nil
{
return
nil
,
0
,
ErrNotFoundServiceOrMethod
}
// get args
argIsValue
:=
false
if
req
.
methodType
.
ArgType
.
Kind
()
==
reflect
.
Ptr
{
req
.
argv
=
reflect
.
New
(
req
.
methodType
.
ArgType
.
Elem
())
}
else
{
req
.
argv
=
reflect
.
New
(
req
.
methodType
.
ArgType
)
argIsValue
=
true
}
codec
:=
Codecs
[
req
.
H
.
CodecType
]
if
codec
==
nil
{
return
nil
,
0
,
jerrors
.
Errorf
(
"can not find codec for %d"
,
req
.
H
.
CodecType
)
}
err
=
codec
.
Decode
(
pkg
.
B
.
GetBody
(),
req
.
argv
.
Interface
())
if
err
!=
nil
{
return
nil
,
0
,
jerrors
.
Trace
(
err
)
}
if
argIsValue
{
req
.
argv
=
req
.
argv
.
Elem
()
}
// get reply
req
.
replyv
=
reflect
.
New
(
req
.
methodType
.
ReplyType
.
Elem
())
return
req
,
length
,
nil
}
func
(
p
*
RpcServerPackageHandler
)
Write
(
ss
getty
.
Session
,
pkg
interface
{})
error
{
resp
,
ok
:=
pkg
.
(
*
GettyPackage
)
resp
,
ok
:=
pkg
.
(
GettyPackage
)
if
!
ok
{
log
.
Error
(
"illegal pkg:%+v
\n
"
,
pkg
)
return
jerrors
.
New
(
"invalid rpc response"
)
...
...
@@ -67,7 +107,9 @@ func NewRpcClientPackageHandler() *RpcClientPackageHandler {
}
func
(
p
*
RpcClientPackageHandler
)
Read
(
ss
getty
.
Session
,
data
[]
byte
)
(
interface
{},
int
,
error
)
{
var
pkg
GettyPackage
pkg
:=
&
GettyPackage
{
B
:
NewGettyRPCResponse
(),
}
buf
:=
bytes
.
NewBuffer
(
data
)
length
,
err
:=
pkg
.
Unmarshal
(
buf
)
...
...
@@ -78,11 +120,16 @@ func (p *RpcClientPackageHandler) Read(ss getty.Session, data []byte) (interface
return
nil
,
0
,
jerrors
.
Trace
(
err
)
}
return
&
pkg
,
length
,
nil
resp
:=
&
GettyRPCResponsePackage
{
H
:
pkg
.
H
,
header
:
pkg
.
B
.
GetHeader
()
.
(
GettyRPCResponseHeader
),
body
:
pkg
.
B
.
GetBody
(),
}
return
resp
,
length
,
nil
}
func
(
p
*
RpcClientPackageHandler
)
Write
(
ss
getty
.
Session
,
pkg
interface
{})
error
{
req
,
ok
:=
pkg
.
(
*
GettyPackage
)
req
,
ok
:=
pkg
.
(
GettyPackage
)
if
!
ok
{
log
.
Error
(
"illegal pkg:%+v
\n
"
,
pkg
)
return
jerrors
.
New
(
"invalid rpc request"
)
...
...
rpc/rpc.go
View file @
388d0a67
...
...
@@ -20,7 +20,6 @@ type methodType struct {
method
reflect
.
Method
ArgType
reflect
.
Type
ReplyType
reflect
.
Type
numCalls
uint
}
type
service
struct
{
...
...
rpc/server.go
View file @
388d0a67
...
...
@@ -24,7 +24,8 @@ type Server struct {
tcpServerList
[]
getty
.
Server
}
func
NewServer
(
conf
*
ServerConfig
)
*
Server
{
func
NewServer
(
confFile
string
)
*
Server
{
conf
:=
loadServerConf
(
confFile
)
s
:=
&
Server
{
serviceMap
:
make
(
map
[
string
]
*
service
),
conf
:
conf
,
...
...
rpc/util.go
0 → 100644
View file @
388d0a67
package
rpc
import
(
"math/rand"
"sync"
"time"
)
var
(
seededIDGen
=
rand
.
New
(
rand
.
NewSource
(
time
.
Now
()
.
UnixNano
()))
// The golang rand generators are *not* intrinsically thread-safe.
seededIDLock
sync
.
Mutex
)
func
randomID
()
uint64
{
seededIDLock
.
Lock
()
defer
seededIDLock
.
Unlock
()
return
uint64
(
seededIDGen
.
Int63
())
}
session.go
View file @
388d0a67
...
...
@@ -74,6 +74,8 @@ type session struct {
// goroutines sync
grNum
int32
lock
sync
.
RWMutex
pool
*
Pool
}
func
newSession
(
endPoint
EndPoint
,
conn
Connection
)
*
session
{
...
...
@@ -245,6 +247,7 @@ func (s *session) SetRQLen(readQLen int) {
s
.
lock
.
Lock
()
s
.
rQ
=
make
(
chan
interface
{},
readQLen
)
log
.
Info
(
"%s, [session.SetRQLen] rQ{len:%d, cap:%d}"
,
s
.
Stat
(),
len
(
s
.
rQ
),
cap
(
s
.
rQ
))
s
.
pool
=
NewPool
(
readQLen
/
2
,
2
,
1
)
s
.
lock
.
Unlock
()
}
...
...
@@ -485,7 +488,10 @@ LOOP:
// read the s.rQ and assure (session)handlePackage gr will not block by (session)rQ.
if
flag
{
log
.
Debug
(
"%#v <-s.rQ"
,
inPkg
)
s
.
listener
.
OnMessage
(
s
,
inPkg
)
pkg
:=
inPkg
s
.
pool
.
ScheduleTimeout
(
s
.
wait
,
func
()
{
s
.
listener
.
OnMessage
(
s
,
pkg
)
})
s
.
incReadPkgNum
()
}
else
{
log
.
Info
(
"[session.handleLoop] drop readin package{%#v}"
,
inPkg
)
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment