Commit 4fb3660f authored by hudangwei's avatar hudangwei

add rpc

parent 592b383b
# rpc
\ No newline at end of file
package rpc
import (
"errors"
"fmt"
"math/rand"
"net"
"sync"
"sync/atomic"
"time"
"github.com/AlexStocks/getty"
"github.com/AlexStocks/goext/net"
log "github.com/AlexStocks/log4go"
)
var (
errInvalidAddress = errors.New("remote address invalid or empty")
errSessionNotExist = errors.New("session not exist")
)
func init() {
rand.Seed(time.Now().UnixNano())
}
type Client struct {
lock sync.RWMutex
sessions []*rpcSession
gettyClient getty.Client
sequence uint64
pendingLock sync.RWMutex
pendingResponses map[uint64]*PendingResponse
sendLock sync.Mutex
}
func NewClient() *Client {
c := &Client{
pendingResponses: make(map[uint64]*PendingResponse),
}
c.Init()
return c
}
func (c *Client) Init() {
initConf(defaultClientConfFile)
initLog(defaultClientLogConfFile)
initProfiling()
c.gettyClient = getty.NewTCPClient(
getty.WithServerAddress(gxnet.HostAddress(conf.ServerHost, conf.ServerPort)),
getty.WithConnectionNumber((int)(conf.ConnectionNum)),
)
c.gettyClient.RunEventLoop(c.newSession)
for {
if c.isAvailable() {
break
}
time.Sleep(1e6)
}
log.Info("client init ok")
}
func (c *Client) newSession(session getty.Session) error {
var (
ok bool
tcpConn *net.TCPConn
)
if conf.GettySessionParam.CompressEncoding {
session.SetCompressType(getty.CompressZip)
}
if tcpConn, ok = session.Conn().(*net.TCPConn); !ok {
panic(fmt.Sprintf("%s, session.conn{%#v} is not tcp connection\n", session.Stat(), session.Conn()))
}
tcpConn.SetNoDelay(conf.GettySessionParam.TcpNoDelay)
tcpConn.SetKeepAlive(conf.GettySessionParam.TcpKeepAlive)
if conf.GettySessionParam.TcpKeepAlive {
tcpConn.SetKeepAlivePeriod(conf.GettySessionParam.keepAlivePeriod)
}
tcpConn.SetReadBuffer(conf.GettySessionParam.TcpRBufSize)
tcpConn.SetWriteBuffer(conf.GettySessionParam.TcpWBufSize)
session.SetName(conf.GettySessionParam.SessionName)
session.SetMaxMsgLen(conf.GettySessionParam.MaxMsgLen)
session.SetPkgHandler(NewRpcClientPacketHandler()) //
session.SetEventListener(NewRpcClientHandler(c)) //
session.SetRQLen(conf.GettySessionParam.PkgRQSize)
session.SetWQLen(conf.GettySessionParam.PkgWQSize)
session.SetReadTimeout(conf.GettySessionParam.tcpReadTimeout)
session.SetWriteTimeout(conf.GettySessionParam.tcpWriteTimeout)
session.SetCronPeriod((int)(conf.heartbeatPeriod.Nanoseconds() / 1e6))
session.SetWaitTime(conf.GettySessionParam.waitTimeout)
log.Debug("client new session:%s\n", session.Stat())
return nil
}
func (c *Client) Sequence() uint64 {
return atomic.AddUint64(&c.sequence, 1)
}
func (c *Client) Call(service, method string, args interface{}, reply interface{}) error {
req := NewRpcRequest(nil)
req.header.Service = service
req.header.Method = method
if reply == nil {
req.header.CallType = RequestSendOnly
}
req.body = args
resp := NewPendingResponse()
resp.reply = reply
session := c.selectSession()
if session != nil {
if err := c.transfer(session, req, resp); err != nil {
return err
}
<-resp.done
return resp.err
}
return errSessionNotExist
}
func (c *Client) isAvailable() bool {
if c.selectSession() == nil {
return false
}
return true
}
func (c *Client) Close() {
c.lock.Lock()
if c.gettyClient != nil {
for _, s := range c.sessions {
log.Info("close client session{%s, last active:%s, request number:%d}",
s.session.Stat(), s.session.GetActive().String(), s.reqNum)
s.session.Close()
}
c.gettyClient.Close()
c.gettyClient = nil
c.sessions = c.sessions[:0]
}
c.lock.Unlock()
}
func (c *Client) selectSession() getty.Session {
c.lock.RLock()
defer c.lock.RUnlock()
count := len(c.sessions)
if count == 0 {
return nil
}
return c.sessions[rand.Int31n(int32(count))].session
}
func (c *Client) addSession(session getty.Session) {
log.Debug("add session{%s}", session.Stat())
if session == nil {
return
}
c.lock.Lock()
c.sessions = append(c.sessions, &rpcSession{session: session})
c.lock.Unlock()
}
func (c *Client) removeSession(session getty.Session) {
if session == nil {
return
}
c.lock.Lock()
for i, s := range c.sessions {
if s.session == session {
c.sessions = append(c.sessions[:i], c.sessions[i+1:]...)
log.Debug("delete session{%s}, its index{%d}", session.Stat(), i)
break
}
}
log.Info("after remove session{%s}, left session number:%d", session.Stat(), len(c.sessions))
c.lock.Unlock()
}
func (c *Client) updateSession(session getty.Session) {
if session == nil {
return
}
c.lock.Lock()
for i, s := range c.sessions {
if s.session == session {
c.sessions[i].reqNum++
break
}
}
c.lock.Unlock()
}
func (c *Client) getClientRpcSession(session getty.Session) (rpcSession, error) {
var (
err error
rpcSession rpcSession
)
c.lock.Lock()
err = errSessionNotExist
for _, s := range c.sessions {
if s.session == session {
rpcSession = *s
err = nil
break
}
}
c.lock.Unlock()
return rpcSession, err
}
func (c *Client) ping(session getty.Session) error {
req := NewRpcRequest(nil)
req.header.Service = "go"
req.header.Method = "ping"
req.header.CallType = RequestSendOnly
req.body = nil
resp := NewPendingResponse()
return c.transfer(session, req, resp)
}
func (c *Client) transfer(session getty.Session, req *RpcRequest, resp *PendingResponse) error {
var (
sequence uint64
err error
)
sequence = c.Sequence()
req.header.Seq = sequence
resp.seq = sequence
c.AddPendingResponse(resp)
c.sendLock.Lock()
defer c.sendLock.Unlock()
err = session.WritePkg(req, 0)
if err != nil {
c.RemovePendingResponse(resp.seq)
}
return err
}
func (c *Client) PendingResponseCount() int {
c.pendingLock.RLock()
defer c.pendingLock.RUnlock()
return len(c.pendingResponses)
}
func (c *Client) AddPendingResponse(pr *PendingResponse) {
c.pendingLock.Lock()
defer c.pendingLock.Unlock()
c.pendingResponses[pr.seq] = pr
}
func (c *Client) RemovePendingResponse(seq uint64) *PendingResponse {
c.pendingLock.Lock()
defer c.pendingLock.Unlock()
if c.pendingResponses == nil {
return nil
}
if presp, ok := c.pendingResponses[seq]; ok {
delete(c.pendingResponses, seq)
return presp
}
return nil
}
func (c *Client) ClearPendingResponses() map[uint64]*PendingResponse {
c.pendingLock.Lock()
defer c.pendingLock.Unlock()
presps := c.pendingResponses
c.pendingResponses = nil
return presps
}
package rpc
import (
"fmt"
"path"
"time"
)
import (
log "github.com/AlexStocks/log4go"
config "github.com/koding/multiconfig"
)
const (
defaultClientConfFile string = "client_config.toml"
defaultClientLogConfFile string = "client_log.xml"
defaultServerConfFile string = "server_config.toml"
defaultServerLogConfFile string = "server_log.xml"
)
var (
conf *Config
)
type (
GettySessionParam struct {
CompressEncoding bool `default:"false"`
TcpNoDelay bool `default:"true"`
TcpKeepAlive bool `default:"true"`
KeepAlivePeriod string `default:"180s"`
keepAlivePeriod time.Duration
TcpRBufSize int `default:"262144"`
TcpWBufSize int `default:"65536"`
PkgRQSize int `default:"1024"`
PkgWQSize int `default:"1024"`
TcpReadTimeout string `default:"1s"`
tcpReadTimeout time.Duration
TcpWriteTimeout string `default:"5s"`
tcpWriteTimeout time.Duration
WaitTimeout string `default:"7s"`
waitTimeout time.Duration
MaxMsgLen int `default:"1024"`
SessionName string `default:"echo-client"`
}
// Config holds supported types by the multiconfig package
Config struct {
// local address
AppName string `default:"echo-server"`
Host string `default:"127.0.0.1"`
Ports []string `default:["10000"]`
// server
ServerHost string `default:"127.0.0.1"`
ServerPort int `default:"10000"`
ProfilePort int `default:"10086"`
// session pool
ConnectionNum int `default:"16"`
// heartbeat
HeartbeatPeriod string `default:"15s"`
heartbeatPeriod time.Duration
// session
SessionTimeout string `default:"60s"`
sessionTimeout time.Duration
SessionNumber int `default:"1000"`
// app
FailFastTimeout string `default:"5s"`
failFastTimeout time.Duration
// session tcp parameters
GettySessionParam GettySessionParam `required:"true"`
}
)
func initConf(confFile string) {
var err error
if path.Ext(confFile) != ".toml" {
panic(fmt.Sprintf("application configure file name{%v} suffix must be .toml", confFile))
}
conf = new(Config)
config.MustLoadWithPath(confFile, conf)
conf.heartbeatPeriod, err = time.ParseDuration(conf.HeartbeatPeriod)
if err != nil {
panic(fmt.Sprintf("time.ParseDuration(HeartbeatPeroid{%#v}) = error{%v}", conf.HeartbeatPeriod, err))
}
conf.sessionTimeout, err = time.ParseDuration(conf.SessionTimeout)
if err != nil {
panic(fmt.Sprintf("time.ParseDuration(SessionTimeout{%#v}) = error{%v}", conf.SessionTimeout, err))
}
conf.failFastTimeout, err = time.ParseDuration(conf.FailFastTimeout)
if err != nil {
panic(fmt.Sprintf("time.ParseDuration(FailFastTimeout{%#v}) = error{%v}", conf.FailFastTimeout, err))
}
conf.GettySessionParam.keepAlivePeriod, err = time.ParseDuration(conf.GettySessionParam.KeepAlivePeriod)
if err != nil {
panic(fmt.Sprintf("time.ParseDuration(KeepAlivePeriod{%#v}) = error{%v}", conf.GettySessionParam.KeepAlivePeriod, err))
}
conf.GettySessionParam.tcpReadTimeout, err = time.ParseDuration(conf.GettySessionParam.TcpReadTimeout)
if err != nil {
panic(fmt.Sprintf("time.ParseDuration(TcpReadTimeout{%#v}) = error{%v}", conf.GettySessionParam.TcpReadTimeout, err))
}
conf.GettySessionParam.tcpWriteTimeout, err = time.ParseDuration(conf.GettySessionParam.TcpWriteTimeout)
if err != nil {
panic(fmt.Sprintf("time.ParseDuration(TcpWriteTimeout{%#v}) = error{%v}", conf.GettySessionParam.TcpWriteTimeout, err))
}
conf.GettySessionParam.waitTimeout, err = time.ParseDuration(conf.GettySessionParam.WaitTimeout)
if err != nil {
panic(fmt.Sprintf("time.ParseDuration(WaitTimeout{%#v}) = error{%v}", conf.GettySessionParam.WaitTimeout, err))
}
return
}
func initLog(logFile string) {
if path.Ext(logFile) != ".xml" {
panic(fmt.Sprintf("log configure file name{%v} suffix must be .xml", logFile))
}
log.LoadConfiguration(logFile)
log.Info("config{%#v}", conf)
}
package main
import (
"time"
"github.com/AlexStocks/getty/rpc"
"github.com/AlexStocks/getty/rpc/example/data"
log "github.com/AlexStocks/log4go"
)
func main() {
client := rpc.NewClient()
defer client.Close()
for i := 0; i < 100; i++ {
go func() {
var res string
err := client.Call("TestRpc", "Test", data.TestABC{"aaa", "bbb", "ccc"}, &res)
if err != nil {
log.Error(err)
return
}
log.Info(res)
}()
}
for i := 0; i < 100; i++ {
go func() {
var result int
err := client.Call("TestRpc", "Add", 1, &result)
if err != nil {
log.Error(err)
return
}
log.Info(result)
}()
}
var errInt int
err := client.Call("TestRpc", "Err", 2, &errInt)
if err != nil {
log.Error(err)
}
time.Sleep(20 * time.Second)
}
# toml configure file
# toml中key的首字母可以小写,但是对应的golang中的struct成员首字母必须大写
AppName = "ECHO-CLIENT"
# host
LocalHost = "127.0.0.1"
# server
# ServerHost = "192.168.8.3"
ServerHost = "127.0.0.1"
ServerPort = 10000
ProfilePort = 10080
# connection pool
# 连接池连接数目
ConnectionNum = 2
# session
# client与server之间连接的心跳周期
HeartbeatPeriod = "10s"
# client与server之间连接的超时时间
SessionTimeout = "20s"
# client
# client echo request string
EchoString = "Hello, getty!"
# 发送echo请求次数
EchoTimes = 10000
# app fail fast
FailFastTimeout = "3s"
# tcp
[GettySessionParam]
CompressEncoding = true
TcpNoDelay = true
TcpKeepAlive = true
KeepAlivePeriod = "120s"
TcpRBufSize = 262144
TcpWBufSize = 65536
PkgRQSize = 512
PkgWQSize = 256
TcpReadTimeout = "1s"
TcpWriteTimeout = "5s"
WaitTimeout = "1s"
MaxMsgLen = 128
SessionName = "echo-client"
<logging>
<filter enabled="true">
<tag>stdout</tag>
<type>console</type>
<!-- level is (:?FINEST|FINE|DEBUG|TRACE|INFO|WARNING|ERROR) -->
<level>DEBUG</level>
</filter>
<filter enabled="false">
<tag>debug_file</tag>
<type>file</type>
<level>DEBUG</level>
<property name="filename">logs/debug.log</property>
<property name="format">[%D %T] [%L] [%S] %M</property>
<property name="rotate">true</property> <!-- true enables log rotation, otherwise append -->
<property name="maxsize">0M</property> <!-- \d+[KMG]? Suffixes are in terms of 2**10 -->
<property name="maxlines">0K</property> <!-- \d+[KMG]? Suffixes are in terms of thousands -->
<property name="daily">true</property> <!-- Automatically rotates when a log message is written after midnight -->
</filter>
<filter enabled="true">
<tag>info_file</tag>
<type>file</type>
<level>INFO</level>
<property name="filename">logs/info.log</property>
<!--
%T - Time (15:04:05 MST)
%t - Time (15:04)
%D - Date (2006/01/02)
%d - Date (01/02/06)
%L - Level (FNST, FINE, DEBG, TRAC, WARN, EROR, CRIT)
%S - Source
%M - Message
It ignores unknown format strings (and removes them)
Recommended: "[%D %T] [%L] (%S) %M"
-->
<property name="format">[%D %T] [%L] [%S] %M</property>
<property name="rotate">true</property> <!-- true enables log rotation, otherwise append -->
<property name="maxsize">0M</property> <!-- \d+[KMG]? Suffixes are in terms of 2**10 -->
<property name="maxlines">0K</property> <!-- \d+[KMG]? Suffixes are in terms of thousands -->
<property name="daily">true</property> <!-- Automatically rotates when a log message is written after midnight -->
</filter>
<filter enabled="true">
<tag>warn_file</tag>
<type>file</type>
<level>WARNING</level>
<property name="filename">logs/warn.log</property>
<property name="format">[%D %T] [%L] [%S] %M</property>
<property name="rotate">true</property> <!-- true enables log rotation, otherwise append -->
<property name="maxsize">0M</property> <!-- \d+[KMG]? Suffixes are in terms of 2**10 -->
<property name="maxlines">0K</property> <!-- \d+[KMG]? Suffixes are in terms of thousands -->
<property name="daily">true</property> <!-- Automatically rotates when a log message is written after midnight -->
</filter>
<filter enabled="true">
<tag>error_file</tag>
<type>file</type>
<level>ERROR</level>
<property name="filename">logs/error.log</property>
<property name="format">[%D %T] [%L] [%S] %M</property>
<property name="rotate">true</property> <!-- true enables log rotation, otherwise append -->
<property name="maxsize">0M</property> <!-- \d+[KMG]? Suffixes are in terms of 2**10 -->
<property name="maxlines">0K</property> <!-- \d+[KMG]? Suffixes are in terms of thousands -->
<property name="daily">true</property> <!-- Automatically rotates when a log message is written after midnight -->
</filter>
</logging>
package data
import (
"errors"
log "github.com/AlexStocks/log4go"
)
type TestABC struct {
A, B, C string
}
type TestRpc struct {
i int
}
func (r *TestRpc) Test(arg TestABC, res *string) error {
log.Debug("arg:%+v", arg)
*res = "this is a test"
return nil
}
func (r *TestRpc) Add(n int, res *int) error {
r.i += n
*res = r.i + 100
return nil
}
func (r *TestRpc) Err(n int, res *int) error {
return errors.New("this is a error test")
}
package main
import (
"github.com/AlexStocks/getty/rpc"
"github.com/AlexStocks/getty/rpc/example/data"
)
func main() {
srv := rpc.NewServer()
srv.Register(new(data.TestRpc))
srv.Run()
}
# toml configure file
# toml中key的首字母可以小写,但是对应的golang中的struct成员首字母必须大写
AppName = "ECHO-SERVER"
Host = "127.0.0.1"
# Host = "192.168.35.1"
# Host = "192.168.8.3"
Ports = ["10000", "20000"]
ProfilePort = 10086
# session
# client与server之间连接的超时时间
SessionTimeout = "20s"
SessionNumber = 700
# app
FailFastTimeout = "3s"
# tcp
[GettySessionParam]
CompressEncoding = true
TcpNoDelay = true
TcpKeepAlive = true
KeepAlivePeriod = "120s"
TcpRBufSize = 262144
TcpWBufSize = 524288
PkgRQSize = 1024
PkgWQSize = 512
TcpReadTimeout = "1s"
TcpWriteTimeout = "5s"
WaitTimeout = "1s"
MaxMsgLen = 128
SessionName = "echo-server"
<logging>
<filter enabled="true">
<tag>stdout</tag>
<type>console</type>
<!-- level is (:?FINEST|FINE|DEBUG|TRACE|INFO|WARNING|ERROR) -->
<level>DEBUG</level>
</filter>
<filter enabled="false">
<tag>debug_file</tag>
<type>file</type>
<level>DEBUG</level>
<property name="filename">logs/debug.log</property>
<property name="format">[%D %T] [%L] [%S] %M</property>
<property name="rotate">true</property> <!-- true enables log rotation, otherwise append -->
<property name="maxsize">0M</property> <!-- \d+[KMG]? Suffixes are in terms of 2**10 -->
<property name="maxlines">0K</property> <!-- \d+[KMG]? Suffixes are in terms of thousands -->
<property name="daily">true</property> <!-- Automatically rotates when a log message is written after midnight -->
</filter>
<filter enabled="true">
<tag>info_file</tag>
<type>file</type>
<level>INFO</level>
<property name="filename">logs/info.log</property>
<!--
%T - Time (15:04:05 MST)
%t - Time (15:04)
%D - Date (2006/01/02)
%d - Date (01/02/06)
%L - Level (FNST, FINE, DEBG, TRAC, WARN, EROR, CRIT)
%S - Source
%M - Message
It ignores unknown format strings (and removes them)
Recommended: "[%D %T] [%L] (%S) %M"
-->
<property name="format">[%D %T] [%L] [%S] %M</property>
<property name="rotate">true</property> <!-- true enables log rotation, otherwise append -->
<property name="maxsize">0M</property> <!-- \d+[KMG]? Suffixes are in terms of 2**10 -->
<property name="maxlines">0K</property> <!-- \d+[KMG]? Suffixes are in terms of thousands -->
<property name="daily">true</property> <!-- Automatically rotates when a log message is written after midnight -->
</filter>
<filter enabled="true">
<tag>warn_file</tag>
<type>file</type>
<level>WARNING</level>
<property name="filename">logs/warn.log</property>
<property name="format">[%D %T] [%L] [%S] %M</property>
<property name="rotate">true</property> <!-- true enables log rotation, otherwise append -->
<property name="maxsize">0M</property> <!-- \d+[KMG]? Suffixes are in terms of 2**10 -->
<property name="maxlines">0K</property> <!-- \d+[KMG]? Suffixes are in terms of thousands -->
<property name="daily">true</property> <!-- Automatically rotates when a log message is written after midnight -->
</filter>
<filter enabled="true">
<tag>error_file</tag>
<type>file</type>
<level>ERROR</level>
<property name="filename">logs/error.log</property>
<property name="format">[%D %T] [%L] [%S] %M</property>
<property name="rotate">true</property> <!-- true enables log rotation, otherwise append -->
<property name="maxsize">0M</property> <!-- \d+[KMG]? Suffixes are in terms of 2**10 -->
<property name="maxlines">0K</property> <!-- \d+[KMG]? Suffixes are in terms of thousands -->
<property name="daily">true</property> <!-- Automatically rotates when a log message is written after midnight -->
</filter>
</logging>
package rpc
import (
"encoding/json"
"errors"
"reflect"
"sync"
"time"
"github.com/AlexStocks/getty"
log "github.com/AlexStocks/log4go"
)
const (
CmdTypePing = "ping"
CmdTypeErr = "err"
CmdTypeAck = "ack"
)
var (
errTooManySessions = errors.New("too many echo sessions")
)
type rpcSession struct {
session getty.Session
active time.Time
reqNum int32
}
type RpcServerHandler struct {
sessionMap map[getty.Session]*rpcSession
rwlock sync.RWMutex
sendLock sync.Mutex
}
func NewRpcServerHandler() *RpcServerHandler {
r := &RpcServerHandler{
sessionMap: make(map[getty.Session]*rpcSession),
}
return r
}
func (h *RpcServerHandler) OnOpen(session getty.Session) error {
var err error
h.rwlock.RLock()
if conf.SessionNumber < len(h.sessionMap) {
err = errTooManySessions
}
h.rwlock.RUnlock()
if err != nil {
return err
}
log.Info("got session:%s", session.Stat())
h.rwlock.Lock()
h.sessionMap[session] = &rpcSession{session: session}
h.rwlock.Unlock()
return nil
}
func (h *RpcServerHandler) OnError(session getty.Session, err error) {
log.Info("session{%s} got error{%v}, will be closed.", session.Stat(), err)
h.rwlock.Lock()
delete(h.sessionMap, session)
h.rwlock.Unlock()
}
func (h *RpcServerHandler) OnClose(session getty.Session) {
log.Info("session{%s} is closing......", session.Stat())
h.rwlock.Lock()
delete(h.sessionMap, session)
h.rwlock.Unlock()
}
func (h *RpcServerHandler) OnMessage(session getty.Session, pkg interface{}) {
p, ok := pkg.(*RpcRequest)
if !ok {
log.Error("illegal packge{%#v}", pkg)
return
}
if p.header.IsPing() {
h.replyCmd(session, p.header.Seq, "", CmdTypePing)
return
}
if p.header.CallType == RequestSendOnly {
h.asyncCallService(session, p.header.Seq, p.service, p.methodType, p.argv, p.replyv)
return
}
h.callService(session, p.header.Seq, p.service, p.methodType, p.argv, p.replyv)
}
func (h *RpcServerHandler) OnCron(session getty.Session) {
var (
flag bool
active time.Time
)
h.rwlock.RLock()
if _, ok := h.sessionMap[session]; ok {
active = session.GetActive()
if conf.sessionTimeout.Nanoseconds() < time.Since(active).Nanoseconds() {
flag = true
log.Warn("session{%s} timeout{%s}, reqNum{%d}",
session.Stat(), time.Since(active).String(), h.sessionMap[session].reqNum)
}
}
h.rwlock.RUnlock()
if flag {
h.rwlock.Lock()
delete(h.sessionMap, session)
h.rwlock.Unlock()
session.Close()
}
}
func (h *RpcServerHandler) replyCmd(session getty.Session, seq uint64, err string, cmd string) {
resp := NewRpcResponse()
resp.header.Seq = seq
switch cmd {
case CmdTypePing:
resp.header.ReplyType = ReplyTypePong
case CmdTypeAck:
resp.header.ReplyType = ReplyTypeAck
case CmdTypeErr:
resp.header.ReplyType = ReplyTypeAck
resp.header.Error = err
}
h.sendLock.Lock()
defer h.sendLock.Unlock()
session.WritePkg(resp, 0)
}
func (h *RpcServerHandler) asyncCallService(session getty.Session, seq uint64, service *service, methodType *methodType, argv, replyv reflect.Value) {
h.replyCmd(session, seq, "", CmdTypeAck)
function := methodType.method.Func
function.Call([]reflect.Value{service.rcvr, argv, replyv})
return
}
func (h *RpcServerHandler) callService(session getty.Session, seq uint64, service *service, methodType *methodType, argv, replyv reflect.Value) {
function := methodType.method.Func
returnValues := function.Call([]reflect.Value{service.rcvr, argv, replyv})
errInter := returnValues[0].Interface()
resp := NewRpcResponse()
resp.header.ReplyType = ReplyTypeData
resp.header.Seq = seq
if errInter != nil {
h.replyCmd(session, seq, errInter.(error).Error(), CmdTypeErr)
return
}
resp.body = replyv.Interface()
h.sendLock.Lock()
defer h.sendLock.Unlock()
session.WritePkg(resp, 0)
}
type RpcClientHandler struct {
client *Client
}
func NewRpcClientHandler(client *Client) *RpcClientHandler {
h := &RpcClientHandler{
client: client,
}
return h
}
func (h *RpcClientHandler) OnOpen(session getty.Session) error {
h.client.addSession(session)
return nil
}
func (h *RpcClientHandler) OnError(session getty.Session, err error) {
log.Info("session{%s} got error{%v}, will be closed.", session.Stat(), err)
h.client.removeSession(session)
}
func (h *RpcClientHandler) OnClose(session getty.Session) {
log.Info("session{%s} is closing......", session.Stat())
h.client.removeSession(session)
}
func (h *RpcClientHandler) OnMessage(session getty.Session, pkg interface{}) {
p, ok := pkg.(*RpcResponse)
if !ok {
log.Error("illegal packge{%#v}", pkg)
return
}
log.Debug("get rpc response{%s}", p)
h.client.updateSession(session)
pendingResponse := h.client.RemovePendingResponse(p.header.Seq)
if p.header.ReplyType == ReplyTypePong {
return
}
if len(p.header.Error) > 0 {
pendingResponse.err = errors.New(p.header.Error)
}
err := json.Unmarshal(p.body.([]byte), pendingResponse.reply)
if err != nil {
pendingResponse.err = err
}
pendingResponse.done <- true
}
func (h *RpcClientHandler) OnCron(session getty.Session) {
rpcSession, err := h.client.getClientRpcSession(session)
if err != nil {
log.Error("client.getClientSession(session{%s}) = error{%#v}", session.Stat(), err)
return
}
if conf.sessionTimeout.Nanoseconds() < time.Since(session.GetActive()).Nanoseconds() {
log.Warn("session{%s} timeout{%s}, reqNum{%d}",
session.Stat(), time.Since(session.GetActive()).String(), rpcSession.reqNum)
h.client.removeSession(session)
return
}
h.client.ping(session)
}
package rpc
import (
"bytes"
"encoding/binary"
"encoding/json"
"errors"
"reflect"
)
const (
MaxPacketLen = 16 * 1024
RequestSendOnly int16 = 1
ReplyTypeData = 0x01
ReplyTypePong = 0x10
ReplyTypeAck = 0x100
)
var (
ErrNotEnoughStream = errors.New("packet stream is not enough")
ErrTooLargePackage = errors.New("package length is exceed the echo package's legal maximum length.")
ErrNotFoundServiceOrMethod = errors.New("server invalid service or method")
)
type RequestHeader struct {
Seq uint64
Service string
Method string
CallType int16
}
func NewRequestHeader() *RequestHeader {
return &RequestHeader{}
}
func (reqHeader *RequestHeader) IsPing() bool {
if reqHeader.Service == "go" && reqHeader.Method == "ping" {
return true
}
return false
}
type RpcRequest struct {
server *Server
header *RequestHeader
body interface{}
service *service
methodType *methodType
argv reflect.Value
replyv reflect.Value
}
func NewRpcRequest(server *Server) *RpcRequest {
return &RpcRequest{
server: server,
header: NewRequestHeader(),
}
}
func (req *RpcRequest) Marshal() (*bytes.Buffer, error) {
var err error
var buf *bytes.Buffer
buf = &bytes.Buffer{}
headerData, err := json.Marshal(req.header)
if err != nil {
return nil, err
}
bodyData, err := json.Marshal(req.body)
if err != nil {
return nil, err
}
//前2字节总长度,header长度2字节+header数据,body长度2字节+body数据
packLen := 2 + 2 + len(headerData) + 2 + len(bodyData)
err = binary.Write(buf, binary.LittleEndian, uint16(packLen))
if err != nil {
return nil, err
}
err = binary.Write(buf, binary.LittleEndian, uint16(len(headerData)))
if err != nil {
return nil, err
}
err = binary.Write(buf, binary.LittleEndian, headerData)
if err != nil {
return nil, err
}
err = binary.Write(buf, binary.LittleEndian, uint16(len(bodyData)))
if err != nil {
return nil, err
}
err = binary.Write(buf, binary.LittleEndian, bodyData)
if err != nil {
return nil, err
}
return buf, nil
}
func (req *RpcRequest) Unmarshal(buf *bytes.Buffer) (int, error) {
var err error
if buf.Len() < 7 {
return 0, ErrNotEnoughStream
}
var packLen uint16
err = binary.Read(buf, binary.LittleEndian, &packLen)
if err != nil {
return 0, err
}
if packLen > MaxPacketLen {
return 0, ErrTooLargePackage
}
var headerLen uint16
err = binary.Read(buf, binary.LittleEndian, &headerLen)
if err != nil {
return 0, err
}
header := make([]byte, headerLen)
err = binary.Read(buf, binary.LittleEndian, header)
if err != nil {
return 0, err
}
var bodyLen uint16
err = binary.Read(buf, binary.LittleEndian, &bodyLen)
if err != nil {
return 0, err
}
body := make([]byte, bodyLen)
err = binary.Read(buf, binary.LittleEndian, body)
if err != nil {
return 0, err
}
err = json.Unmarshal(header, req.header)
if err != nil {
return 0, err
}
if req.header.IsPing() {
return int(packLen), nil
}
req.service = req.server.serviceMap[req.header.Service]
if req.service != nil {
req.methodType = req.service.method[req.header.Method]
}
if req.service == nil || req.methodType == nil {
return 0, ErrNotFoundServiceOrMethod
}
argIsValue := false
if req.methodType.ArgType.Kind() == reflect.Ptr {
req.argv = reflect.New(req.methodType.ArgType.Elem())
} else {
req.argv = reflect.New(req.methodType.ArgType)
argIsValue = true
}
err = json.Unmarshal(body, req.argv.Interface())
if err != nil {
return 0, err
}
if argIsValue {
req.argv = req.argv.Elem()
}
req.replyv = reflect.New(req.methodType.ReplyType.Elem())
return int(packLen), nil
}
type ResponseHeader struct {
Seq uint64
ReplyType int16
Error string
}
func NewResponseHeader() *ResponseHeader {
return &ResponseHeader{}
}
type RpcResponse struct {
header *ResponseHeader
body interface{}
}
func NewRpcResponse() *RpcResponse {
r := &RpcResponse{
header: NewResponseHeader(),
}
return r
}
func (resp *RpcResponse) Marshal() (*bytes.Buffer, error) {
var err error
var buf *bytes.Buffer
buf = &bytes.Buffer{}
headerData, err := json.Marshal(resp.header)
if err != nil {
return nil, err
}
bodyData, err := json.Marshal(resp.body)
if err != nil {
return nil, err
}
//前2字节总长度,header长度2字节+header数据,body长度2字节+body数据
packLen := 2 + 2 + len(headerData) + 2 + len(bodyData)
err = binary.Write(buf, binary.LittleEndian, uint16(packLen))
if err != nil {
return nil, err
}
err = binary.Write(buf, binary.LittleEndian, uint16(len(headerData)))
if err != nil {
return nil, err
}
err = binary.Write(buf, binary.LittleEndian, headerData)
if err != nil {
return nil, err
}
err = binary.Write(buf, binary.LittleEndian, uint16(len(bodyData)))
if err != nil {
return nil, err
}
err = binary.Write(buf, binary.LittleEndian, bodyData)
if err != nil {
return nil, err
}
return buf, nil
}
func (resp *RpcResponse) Unmarshal(buf *bytes.Buffer) (int, error) {
var err error
if buf.Len() < 7 {
return 0, ErrNotEnoughStream
}
var packLen uint16
err = binary.Read(buf, binary.LittleEndian, &packLen)
if err != nil {
return 0, err
}
if packLen > MaxPacketLen {
return 0, ErrTooLargePackage
}
var headerLen uint16
err = binary.Read(buf, binary.LittleEndian, &headerLen)
if err != nil {
return 0, err
}
header := make([]byte, headerLen)
err = binary.Read(buf, binary.LittleEndian, header)
if err != nil {
return 0, err
}
var bodyLen uint16
err = binary.Read(buf, binary.LittleEndian, &bodyLen)
if err != nil {
return 0, err
}
body := make([]byte, bodyLen)
err = binary.Read(buf, binary.LittleEndian, body)
if err != nil {
return 0, err
}
resp.body = body
err = json.Unmarshal(header, resp.header)
if err != nil {
return 0, err
}
// err = json.Unmarshal(body, resp.body)
// if err != nil {
// return 0, err
// }
return int(packLen), nil
}
type PendingResponse struct {
seq uint64
err error
reply interface{}
done chan bool
}
func NewPendingResponse() *PendingResponse {
return &PendingResponse{done: make(chan bool)}
}
package rpc
import (
"bytes"
"errors"
)
import (
"github.com/AlexStocks/getty"
log "github.com/AlexStocks/log4go"
)
type RpcServerPacketHandler struct {
server *Server
}
func NewRpcServerPacketHandler(server *Server) *RpcServerPacketHandler {
return &RpcServerPacketHandler{
server: server,
}
}
func (p *RpcServerPacketHandler) Read(ss getty.Session, data []byte) (interface{}, int, error) {
var (
err error
len int
buf *bytes.Buffer
)
buf = bytes.NewBuffer(data)
req := NewRpcRequest(p.server)
len, err = req.Unmarshal(buf)
if err != nil {
if err == ErrNotEnoughStream {
return nil, 0, nil
}
return nil, 0, err
}
return req, len, nil
}
func (p *RpcServerPacketHandler) Write(ss getty.Session, pkg interface{}) error {
var (
ok bool
err error
resp *RpcResponse
buf *bytes.Buffer
)
if resp, ok = pkg.(*RpcResponse); !ok {
log.Error("illegal pkg:%+v\n", pkg)
return errors.New("invalid rpc response")
}
buf, err = resp.Marshal()
if err != nil {
log.Warn("binary.Write(resp{%#v}) = err{%#v}", resp, err)
return err
}
err = ss.WriteBytes(buf.Bytes())
return err
}
type RpcClientPacketHandler struct {
}
func NewRpcClientPacketHandler() *RpcClientPacketHandler {
return &RpcClientPacketHandler{}
}
func (p *RpcClientPacketHandler) Read(ss getty.Session, data []byte) (interface{}, int, error) {
var (
err error
len int
buf *bytes.Buffer
)
buf = bytes.NewBuffer(data)
resp := NewRpcResponse()
len, err = resp.Unmarshal(buf)
if err != nil {
if err == ErrNotEnoughStream {
return nil, 0, nil
}
return nil, 0, err
}
return resp, len, nil
}
func (p *RpcClientPacketHandler) Write(ss getty.Session, pkg interface{}) error {
var (
ok bool
err error
req *RpcRequest
buf *bytes.Buffer
)
if req, ok = pkg.(*RpcRequest); !ok {
log.Error("illegal pkg:%+v\n", pkg)
return errors.New("invalid rpc request")
}
buf, err = req.Marshal()
if err != nil {
log.Warn("binary.Write(req{%#v}) = err{%#v}", req, err)
return err
}
err = ss.WriteBytes(buf.Bytes())
return err
}
package rpc
import (
"reflect"
"unicode"
"unicode/utf8"
log "github.com/AlexStocks/log4go"
)
var typeOfError = reflect.TypeOf((*error)(nil)).Elem()
// suitableMethods returns suitable Rpc methods of typ, it will report
// error using log if reportErr is true.
func suitableMethods(typ reflect.Type, reportErr bool) map[string]*methodType {
methods := make(map[string]*methodType)
for m := 0; m < typ.NumMethod(); m++ {
method := typ.Method(m)
mtype := method.Type
mname := method.Name
// Method must be exported.
if method.PkgPath != "" {
continue
}
// Method needs three ins: receiver, *args, *reply.
if mtype.NumIn() != 3 {
if reportErr {
log.Warn("method", mname, "has wrong number of ins:", mtype.NumIn())
}
continue
}
// First arg need not be a pointer.
argType := mtype.In(1)
if !isExportedOrBuiltinType(argType) {
if reportErr {
log.Warn(mname, "argument type not exported:", argType)
}
continue
}
// Second arg must be a pointer.
replyType := mtype.In(2)
if replyType.Kind() != reflect.Ptr {
if reportErr {
log.Warn("method", mname, "reply type not a pointer:", replyType)
}
continue
}
// Reply type must be exported.
if !isExportedOrBuiltinType(replyType) {
if reportErr {
log.Warn("method", mname, "reply type not exported:", replyType)
}
continue
}
// Method needs one out.
if mtype.NumOut() != 1 {
if reportErr {
log.Warn("method", mname, "has wrong number of outs:", mtype.NumOut())
}
continue
}
// The return type of the method must be error.
if returnType := mtype.Out(0); returnType != typeOfError {
if reportErr {
log.Warn("method", mname, "returns", returnType.String(), "not error")
}
continue
}
methods[mname] = &methodType{method: method, ArgType: argType, ReplyType: replyType}
}
return methods
}
// Is this an exported - upper case - name
func isExported(name string) bool {
rune, _ := utf8.DecodeRuneInString(name)
return unicode.IsUpper(rune)
}
// Is this type exported or a builtin?
func isExportedOrBuiltinType(t reflect.Type) bool {
for t.Kind() == reflect.Ptr {
t = t.Elem()
}
// PkgPath will be non-empty even for an exported type,
// so we need to check the type name as well.
return isExported(t.Name()) || t.PkgPath() == ""
}
package rpc
import (
"errors"
"fmt"
"net"
"os"
"os/signal"
"reflect"
"syscall"
"time"
"github.com/AlexStocks/getty"
"github.com/AlexStocks/goext/log"
"github.com/AlexStocks/goext/net"
log "github.com/AlexStocks/log4go"
)
type Server struct {
tcpServerList []getty.Server
serviceMap map[string]*service
}
func NewServer() *Server {
s := &Server{
serviceMap: make(map[string]*service),
}
return s
}
func (server *Server) Run() {
initConf(defaultServerConfFile)
initLog(defaultServerLogConfFile)
initProfiling()
server.Init()
gxlog.CInfo("%s starts successfull! its version=%s, its listen ends=%s:%s\n",
conf.AppName, Version, conf.Host, conf.Ports)
log.Info("%s starts successfull! its version=%s, its listen ends=%s:%s\n",
conf.AppName, Version, conf.Host, conf.Ports)
server.initSignal()
}
func (server *Server) Register(rcvr interface{}) error {
s := new(service)
s.typ = reflect.TypeOf(rcvr)
s.rcvr = reflect.ValueOf(rcvr)
sname := reflect.Indirect(s.rcvr).Type().Name()
if sname == "" {
s := "rpc.Register: no service name for type " + s.typ.String()
log.Error(s)
return errors.New(s)
}
if !isExported(sname) {
s := "rpc.Register: type " + sname + " is not exported"
log.Error(s)
return errors.New(s)
}
if _, present := server.serviceMap[sname]; present {
return errors.New("rpc: service already defined: " + sname)
}
s.name = sname
// Install the methods
s.method = suitableMethods(s.typ, true)
if len(s.method) == 0 {
str := ""
// To help the user, see if a pointer receiver would work.
method := suitableMethods(reflect.PtrTo(s.typ), false)
if len(method) != 0 {
str = "rpc.Register: type " + sname + " has no exported methods of suitable type (hint: pass a pointer to value of that type)"
} else {
str = "rpc.Register: type " + sname + " has no exported methods of suitable type"
}
log.Error(s)
return errors.New(str)
}
server.serviceMap[s.name] = s
return nil
}
func (server *Server) newSession(session getty.Session) error {
var (
ok bool
tcpConn *net.TCPConn
)
if conf.GettySessionParam.CompressEncoding {
session.SetCompressType(getty.CompressZip)
}
if tcpConn, ok = session.Conn().(*net.TCPConn); !ok {
panic(fmt.Sprintf("%s, session.conn{%#v} is not tcp connection\n", session.Stat(), session.Conn()))
}
tcpConn.SetNoDelay(conf.GettySessionParam.TcpNoDelay)
tcpConn.SetKeepAlive(conf.GettySessionParam.TcpKeepAlive)
if conf.GettySessionParam.TcpKeepAlive {
tcpConn.SetKeepAlivePeriod(conf.GettySessionParam.keepAlivePeriod)
}
tcpConn.SetReadBuffer(conf.GettySessionParam.TcpRBufSize)
tcpConn.SetWriteBuffer(conf.GettySessionParam.TcpWBufSize)
session.SetName(conf.GettySessionParam.SessionName)
session.SetMaxMsgLen(conf.GettySessionParam.MaxMsgLen)
session.SetPkgHandler(NewRpcServerPacketHandler(server)) //
session.SetEventListener(NewRpcServerHandler()) //
session.SetRQLen(conf.GettySessionParam.PkgRQSize)
session.SetWQLen(conf.GettySessionParam.PkgWQSize)
session.SetReadTimeout(conf.GettySessionParam.tcpReadTimeout)
session.SetWriteTimeout(conf.GettySessionParam.tcpWriteTimeout)
session.SetCronPeriod((int)(conf.sessionTimeout.Nanoseconds() / 1e6))
session.SetWaitTime(conf.GettySessionParam.waitTimeout)
log.Debug("app accepts new session:%s\n", session.Stat())
return nil
}
func (server *Server) Init() {
var (
addr string
portList []string
tcpServer getty.Server
)
portList = conf.Ports
if len(portList) == 0 {
panic("portList is nil")
}
for _, port := range portList {
addr = gxnet.HostAddress2(conf.Host, port)
tcpServer = getty.NewTCPServer(
getty.WithLocalAddress(addr),
)
// run server
tcpServer.RunEventLoop(server.newSession)
log.Debug("server bind addr{%s} ok!", addr)
server.tcpServerList = append(server.tcpServerList, tcpServer)
}
}
func (server *Server) Stop() {
for _, tcpServer := range server.tcpServerList {
tcpServer.Close()
}
}
func (server *Server) initSignal() {
// signal.Notify的ch信道是阻塞的(signal.Notify不会阻塞发送信号), 需要设置缓冲
signals := make(chan os.Signal, 1)
// It is not possible to block SIGKILL or syscall.SIGSTOP
signal.Notify(signals, os.Interrupt, os.Kill, syscall.SIGHUP, syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT)
for {
sig := <-signals
log.Info("get signal %s", sig.String())
switch sig {
case syscall.SIGHUP:
// reload()
default:
go time.AfterFunc(conf.failFastTimeout, func() {
// log.Warn("app exit now by force...")
// os.Exit(1)
log.Exit("app exit now by force...")
log.Close()
})
// 要么survialTimeout时间内执行完毕下面的逻辑然后程序退出,要么执行上面的超时函数程序强行退出
server.Stop()
// fmt.Println("app exit now...")
log.Exit("app exit now...")
log.Close()
return
}
}
}
package rpc
import (
"reflect"
"sync"
)
type methodType struct {
sync.Mutex
method reflect.Method
ArgType reflect.Type
ReplyType reflect.Type
numCalls uint
}
type service struct {
name string
rcvr reflect.Value
typ reflect.Type
method map[string]*methodType
}
package rpc
import (
"net/http"
"github.com/AlexStocks/goext/net"
log "github.com/AlexStocks/log4go"
)
const (
pprofPath = "/debug/pprof/"
)
func initProfiling() {
var (
addr string
)
// addr = *host + ":" + "10000"
addr = gxnet.HostAddress(conf.Host, conf.ProfilePort)
log.Info("App Profiling startup on address{%v}", addr+pprofPath)
go func() {
log.Info(http.ListenAndServe(addr, nil))
}()
}
package rpc
var (
Version = "0.8.2"
)
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment