Commit 8eec657e authored by AlexStocks's avatar AlexStocks

Add: RPC

parent 3f3fa13a
...@@ -14,9 +14,13 @@ ...@@ -14,9 +14,13 @@
## develop history ## ## develop history ##
--- ---
- 2018/07/01
> Feature
* Add RPC
- 2018/06/25 - 2018/06/25
> buf fix > buf fix
* delete juju/errors on read/write in case of network i/o timeout * Using juju/errors.Cause on read/write in case of network i/o timeout
- 2018/03/29 - 2018/03/29
> improvement > improvement
......
...@@ -185,7 +185,7 @@ func (c *client) dialUDP() Session { ...@@ -185,7 +185,7 @@ func (c *client) dialUDP() Session {
} }
conn.SetReadDeadline(wheel.Now().Add(1e9)) conn.SetReadDeadline(wheel.Now().Add(1e9))
length, err = conn.Read(buf) length, err = conn.Read(buf)
if netErr, ok := err.(net.Error); ok && netErr.Timeout() { if netErr, ok := jerrors.Cause(err).(net.Error); ok && netErr.Timeout() {
err = nil err = nil
} }
if err != nil { if err != nil {
......
...@@ -249,8 +249,8 @@ func (t *gettyTCPConn) read(p []byte) (int, error) { ...@@ -249,8 +249,8 @@ func (t *gettyTCPConn) read(p []byte) (int, error) {
length, err = t.reader.Read(p) length, err = t.reader.Read(p)
log.Debug("now:%s, length:%d, err:%s", currentTime, length, err) log.Debug("now:%s, length:%d, err:%s", currentTime, length, err)
atomic.AddUint32(&t.readBytes, uint32(length)) atomic.AddUint32(&t.readBytes, uint32(length))
//return length, jerrors.Trace(err) return length, jerrors.Trace(err)
return length, err //return length, err
} }
// tcp connection write // tcp connection write
...@@ -283,8 +283,8 @@ func (t *gettyTCPConn) Write(pkg interface{}) (int, error) { ...@@ -283,8 +283,8 @@ func (t *gettyTCPConn) Write(pkg interface{}) (int, error) {
atomic.AddUint32(&t.writeBytes, (uint32)(len(p))) atomic.AddUint32(&t.writeBytes, (uint32)(len(p)))
} }
log.Debug("now:%s, length:%d, err:%s", currentTime, length, err) log.Debug("now:%s, length:%d, err:%s", currentTime, length, err)
//return length, jerrors.Trace(err) return length, jerrors.Trace(err)
return length, err //return length, err
} }
// close tcp connection // close tcp connection
...@@ -402,8 +402,8 @@ func (u *gettyUDPConn) read(p []byte) (int, *net.UDPAddr, error) { ...@@ -402,8 +402,8 @@ func (u *gettyUDPConn) read(p []byte) (int, *net.UDPAddr, error) {
atomic.AddUint32(&u.readBytes, uint32(length)) atomic.AddUint32(&u.readBytes, uint32(length))
} }
return length, addr, err //return length, addr, err
// return length, addr, jerrors.Trace(err) return length, addr, jerrors.Trace(err)
} }
// write udp packet, @ctx should be of type UDPContext // write udp packet, @ctx should be of type UDPContext
...@@ -449,8 +449,8 @@ func (u *gettyUDPConn) Write(udpCtx interface{}) (int, error) { ...@@ -449,8 +449,8 @@ func (u *gettyUDPConn) Write(udpCtx interface{}) (int, error) {
} }
log.Debug("WriteMsgUDP(peerAddr:%s) = {length:%d, error:%s}", peerAddr, length, err) log.Debug("WriteMsgUDP(peerAddr:%s) = {length:%d, error:%s}", peerAddr, length, err)
// return length, jerrors.Trace(err) return length, jerrors.Trace(err)
return length, err //return length, err
} }
// close udp connection // close udp connection
...@@ -547,8 +547,8 @@ func (w *gettyWSConn) read() ([]byte, error) { ...@@ -547,8 +547,8 @@ func (w *gettyWSConn) read() ([]byte, error) {
} }
} }
// return b, jerrors.Trace(e) return b, jerrors.Trace(e)
return b, e //return b, e
} }
func (w *gettyWSConn) updateWriteDeadline() error { func (w *gettyWSConn) updateWriteDeadline() error {
...@@ -589,8 +589,8 @@ func (w *gettyWSConn) Write(pkg interface{}) (int, error) { ...@@ -589,8 +589,8 @@ func (w *gettyWSConn) Write(pkg interface{}) (int, error) {
if err = w.conn.WriteMessage(websocket.BinaryMessage, p); err == nil { if err = w.conn.WriteMessage(websocket.BinaryMessage, p); err == nil {
atomic.AddUint32(&w.writeBytes, (uint32)(len(p))) atomic.AddUint32(&w.writeBytes, (uint32)(len(p)))
} }
// return len(p), jerrors.Trace(err) return len(p), jerrors.Trace(err)
return len(p), err //return len(p), err
} }
func (w *gettyWSConn) writePing() error { func (w *gettyWSConn) writePing() error {
......
package rpc package rpc
import ( import (
"errors"
"fmt" "fmt"
"math/rand" "math/rand"
"net" "net"
"sync" "sync"
"sync/atomic" "sync/atomic"
"time" "time"
)
import (
jerrors "github.com/juju/errors"
)
import (
"github.com/AlexStocks/getty" "github.com/AlexStocks/getty"
"github.com/AlexStocks/goext/net" "github.com/AlexStocks/goext/net"
log "github.com/AlexStocks/log4go" log "github.com/AlexStocks/log4go"
) )
var ( var (
errInvalidAddress = errors.New("remote address invalid or empty") errInvalidAddress = jerrors.New("remote address invalid or empty")
errSessionNotExist = errors.New("session not exist") errSessionNotExist = jerrors.New("session not exist")
errClientClosed = jerrors.New("client closed")
) )
func init() { func init() {
...@@ -24,6 +30,7 @@ func init() { ...@@ -24,6 +30,7 @@ func init() {
} }
type Client struct { type Client struct {
conf *Config
lock sync.RWMutex lock sync.RWMutex
sessions []*rpcSession sessions []*rpcSession
gettyClient getty.Client gettyClient getty.Client
...@@ -36,22 +43,15 @@ type Client struct { ...@@ -36,22 +43,15 @@ type Client struct {
sendLock sync.Mutex sendLock sync.Mutex
} }
func NewClient() *Client { func NewClient(conf *Config) *Client {
c := &Client{ c := &Client{
pendingResponses: make(map[uint64]*PendingResponse), pendingResponses: make(map[uint64]*PendingResponse),
conf: conf,
gettyClient: getty.NewTCPClient(
getty.WithServerAddress(gxnet.HostAddress(conf.ServerHost, conf.ServerPort)),
getty.WithConnectionNumber((int)(conf.ConnectionNum)),
),
} }
c.Init()
return c
}
func (c *Client) Init() {
initConf(defaultClientConfFile)
initLog(defaultClientLogConfFile)
initProfiling()
c.gettyClient = getty.NewTCPClient(
getty.WithServerAddress(gxnet.HostAddress(conf.ServerHost, conf.ServerPort)),
getty.WithConnectionNumber((int)(conf.ConnectionNum)),
)
c.gettyClient.RunEventLoop(c.newSession) c.gettyClient.RunEventLoop(c.newSession)
for { for {
if c.isAvailable() { if c.isAvailable() {
...@@ -60,6 +60,8 @@ func (c *Client) Init() { ...@@ -60,6 +60,8 @@ func (c *Client) Init() {
time.Sleep(1e6) time.Sleep(1e6)
} }
log.Info("client init ok") log.Info("client init ok")
return c
} }
func (c *Client) newSession(session getty.Session) error { func (c *Client) newSession(session getty.Session) error {
...@@ -86,8 +88,8 @@ func (c *Client) newSession(session getty.Session) error { ...@@ -86,8 +88,8 @@ func (c *Client) newSession(session getty.Session) error {
session.SetName(conf.GettySessionParam.SessionName) session.SetName(conf.GettySessionParam.SessionName)
session.SetMaxMsgLen(conf.GettySessionParam.MaxMsgLen) session.SetMaxMsgLen(conf.GettySessionParam.MaxMsgLen)
session.SetPkgHandler(NewRpcClientPacketHandler()) // session.SetPkgHandler(NewRpcClientPacketHandler())
session.SetEventListener(NewRpcClientHandler(c)) // session.SetEventListener(NewRpcClientHandler(c))
session.SetRQLen(conf.GettySessionParam.PkgRQSize) session.SetRQLen(conf.GettySessionParam.PkgRQSize)
session.SetWQLen(conf.GettySessionParam.PkgWQSize) session.SetWQLen(conf.GettySessionParam.PkgWQSize)
session.SetReadTimeout(conf.GettySessionParam.tcpReadTimeout) session.SetReadTimeout(conf.GettySessionParam.tcpReadTimeout)
...@@ -123,6 +125,7 @@ func (c *Client) Call(service, method string, args interface{}, reply interface{ ...@@ -123,6 +125,7 @@ func (c *Client) Call(service, method string, args interface{}, reply interface{
<-resp.done <-resp.done
return resp.err return resp.err
} }
return errSessionNotExist return errSessionNotExist
} }
...@@ -130,27 +133,39 @@ func (c *Client) isAvailable() bool { ...@@ -130,27 +133,39 @@ func (c *Client) isAvailable() bool {
if c.selectSession() == nil { if c.selectSession() == nil {
return false return false
} }
return true return true
} }
func (c *Client) Close() { func (c *Client) Close() {
var sessions *[]*rpcSession
c.lock.Lock() c.lock.Lock()
if c.gettyClient != nil { if c.gettyClient != nil {
for _, s := range c.sessions { sessions = &(c.sessions)
log.Info("close client session{%s, last active:%s, request number:%d}", c.sessions = nil
s.session.Stat(), s.session.GetActive().String(), s.reqNum)
s.session.Close()
}
c.gettyClient.Close() c.gettyClient.Close()
c.gettyClient = nil c.gettyClient = nil
c.sessions = c.sessions[:0] c.sessions = c.sessions[:0]
} }
c.lock.Unlock() c.lock.Unlock()
if sessions != nil {
for _, s := range *sessions {
log.Info("close client session{%s, last active:%s, request number:%d}",
s.session.Stat(), s.session.GetActive().String(), s.reqNum)
s.session.Close()
}
}
} }
func (c *Client) selectSession() getty.Session { func (c *Client) selectSession() getty.Session {
c.lock.RLock() c.lock.RLock()
defer c.lock.RUnlock() defer c.lock.RUnlock()
if c.sessions == nil {
return nil
}
count := len(c.sessions) count := len(c.sessions)
if count == 0 { if count == 0 {
return nil return nil
...@@ -165,15 +180,25 @@ func (c *Client) addSession(session getty.Session) { ...@@ -165,15 +180,25 @@ func (c *Client) addSession(session getty.Session) {
} }
c.lock.Lock() c.lock.Lock()
defer c.lock.Unlock()
if c.sessions == nil {
return
}
c.sessions = append(c.sessions, &rpcSession{session: session}) c.sessions = append(c.sessions, &rpcSession{session: session})
c.lock.Unlock()
} }
func (c *Client) removeSession(session getty.Session) { func (c *Client) removeSession(session getty.Session) {
if session == nil { if session == nil {
return return
} }
c.lock.Lock() c.lock.Lock()
defer c.lock.Unlock()
if c.sessions == nil {
return
}
for i, s := range c.sessions { for i, s := range c.sessions {
if s.session == session { if s.session == session {
c.sessions = append(c.sessions[:i], c.sessions[i+1:]...) c.sessions = append(c.sessions[:i], c.sessions[i+1:]...)
...@@ -182,7 +207,6 @@ func (c *Client) removeSession(session getty.Session) { ...@@ -182,7 +207,6 @@ func (c *Client) removeSession(session getty.Session) {
} }
} }
log.Info("after remove session{%s}, left session number:%d", session.Stat(), len(c.sessions)) log.Info("after remove session{%s}, left session number:%d", session.Stat(), len(c.sessions))
c.lock.Unlock()
} }
func (c *Client) updateSession(session getty.Session) { func (c *Client) updateSession(session getty.Session) {
...@@ -190,13 +214,17 @@ func (c *Client) updateSession(session getty.Session) { ...@@ -190,13 +214,17 @@ func (c *Client) updateSession(session getty.Session) {
return return
} }
c.lock.Lock() c.lock.Lock()
defer c.lock.Unlock()
if c.sessions == nil {
return
}
for i, s := range c.sessions { for i, s := range c.sessions {
if s.session == session { if s.session == session {
c.sessions[i].reqNum++ c.sessions[i].reqNum++
break break
} }
} }
c.lock.Unlock()
} }
func (c *Client) getClientRpcSession(session getty.Session) (rpcSession, error) { func (c *Client) getClientRpcSession(session getty.Session) (rpcSession, error) {
...@@ -205,6 +233,11 @@ func (c *Client) getClientRpcSession(session getty.Session) (rpcSession, error) ...@@ -205,6 +233,11 @@ func (c *Client) getClientRpcSession(session getty.Session) (rpcSession, error)
rpcSession rpcSession rpcSession rpcSession
) )
c.lock.Lock() c.lock.Lock()
defer c.lock.Unlock()
if c.sessions == nil {
return rpcSession, errClientClosed
}
err = errSessionNotExist err = errSessionNotExist
for _, s := range c.sessions { for _, s := range c.sessions {
if s.session == session { if s.session == session {
...@@ -213,7 +246,6 @@ func (c *Client) getClientRpcSession(session getty.Session) (rpcSession, error) ...@@ -213,7 +246,6 @@ func (c *Client) getClientRpcSession(session getty.Session) (rpcSession, error)
break break
} }
} }
c.lock.Unlock()
return rpcSession, err return rpcSession, err
} }
......
package rpc package rpc
import ( import (
"fmt"
"path"
"time" "time"
) )
import (
log "github.com/AlexStocks/log4go"
config "github.com/koding/multiconfig"
)
const (
defaultClientConfFile string = "client_config.toml"
defaultClientLogConfFile string = "client_log.xml"
defaultServerConfFile string = "server_config.toml"
defaultServerLogConfFile string = "server_log.xml"
)
var (
conf *Config
)
type ( type (
GettySessionParam struct { GettySessionParam struct {
CompressEncoding bool `default:"false"` CompressEncoding bool `default:"false"`
...@@ -75,51 +57,3 @@ type ( ...@@ -75,51 +57,3 @@ type (
GettySessionParam GettySessionParam `required:"true"` GettySessionParam GettySessionParam `required:"true"`
} }
) )
func initConf(confFile string) {
var err error
if path.Ext(confFile) != ".toml" {
panic(fmt.Sprintf("application configure file name{%v} suffix must be .toml", confFile))
}
conf = new(Config)
config.MustLoadWithPath(confFile, conf)
conf.heartbeatPeriod, err = time.ParseDuration(conf.HeartbeatPeriod)
if err != nil {
panic(fmt.Sprintf("time.ParseDuration(HeartbeatPeroid{%#v}) = error{%v}", conf.HeartbeatPeriod, err))
}
conf.sessionTimeout, err = time.ParseDuration(conf.SessionTimeout)
if err != nil {
panic(fmt.Sprintf("time.ParseDuration(SessionTimeout{%#v}) = error{%v}", conf.SessionTimeout, err))
}
conf.failFastTimeout, err = time.ParseDuration(conf.FailFastTimeout)
if err != nil {
panic(fmt.Sprintf("time.ParseDuration(FailFastTimeout{%#v}) = error{%v}", conf.FailFastTimeout, err))
}
conf.GettySessionParam.keepAlivePeriod, err = time.ParseDuration(conf.GettySessionParam.KeepAlivePeriod)
if err != nil {
panic(fmt.Sprintf("time.ParseDuration(KeepAlivePeriod{%#v}) = error{%v}", conf.GettySessionParam.KeepAlivePeriod, err))
}
conf.GettySessionParam.tcpReadTimeout, err = time.ParseDuration(conf.GettySessionParam.TcpReadTimeout)
if err != nil {
panic(fmt.Sprintf("time.ParseDuration(TcpReadTimeout{%#v}) = error{%v}", conf.GettySessionParam.TcpReadTimeout, err))
}
conf.GettySessionParam.tcpWriteTimeout, err = time.ParseDuration(conf.GettySessionParam.TcpWriteTimeout)
if err != nil {
panic(fmt.Sprintf("time.ParseDuration(TcpWriteTimeout{%#v}) = error{%v}", conf.GettySessionParam.TcpWriteTimeout, err))
}
conf.GettySessionParam.waitTimeout, err = time.ParseDuration(conf.GettySessionParam.WaitTimeout)
if err != nil {
panic(fmt.Sprintf("time.ParseDuration(WaitTimeout{%#v}) = error{%v}", conf.GettySessionParam.WaitTimeout, err))
}
return
}
func initLog(logFile string) {
if path.Ext(logFile) != ".xml" {
panic(fmt.Sprintf("log configure file name{%v} suffix must be .xml", logFile))
}
log.LoadConfiguration(logFile)
log.Info("config{%#v}", conf)
}
...@@ -2,12 +2,14 @@ package rpc ...@@ -2,12 +2,14 @@ package rpc
import ( import (
"encoding/json" "encoding/json"
"errors"
"reflect" "reflect"
"sync" "sync"
"time" "time"
)
import (
"github.com/AlexStocks/getty" "github.com/AlexStocks/getty"
jerrors "github.com/juju/errors"
log "github.com/AlexStocks/log4go" log "github.com/AlexStocks/log4go"
) )
...@@ -19,7 +21,7 @@ const ( ...@@ -19,7 +21,7 @@ const (
) )
var ( var (
errTooManySessions = errors.New("too many echo sessions") errTooManySessions = jerrors.New("too many echo sessions")
) )
type rpcSession struct { type rpcSession struct {
...@@ -29,23 +31,26 @@ type rpcSession struct { ...@@ -29,23 +31,26 @@ type rpcSession struct {
} }
type RpcServerHandler struct { type RpcServerHandler struct {
sessionMap map[getty.Session]*rpcSession maxSessionNum int
rwlock sync.RWMutex sessionTimeout time.Duration
sessionMap map[getty.Session]*rpcSession
rwlock sync.RWMutex
sendLock sync.Mutex sendLock sync.Mutex
} }
func NewRpcServerHandler() *RpcServerHandler { func NewRpcServerHandler(maxSessionNum int, sessionTimeout time.Duration) *RpcServerHandler {
r := &RpcServerHandler{ return &RpcServerHandler{
sessionMap: make(map[getty.Session]*rpcSession), maxSessionNum: maxSessionNum,
sessionTimeout: sessionTimeout,
sessionMap: make(map[getty.Session]*rpcSession),
} }
return r
} }
func (h *RpcServerHandler) OnOpen(session getty.Session) error { func (h *RpcServerHandler) OnOpen(session getty.Session) error {
var err error var err error
h.rwlock.RLock() h.rwlock.RLock()
if conf.SessionNumber < len(h.sessionMap) { if h.maxSessionNum < len(h.sessionMap) {
err = errTooManySessions err = errTooManySessions
} }
h.rwlock.RUnlock() h.rwlock.RUnlock()
...@@ -101,7 +106,7 @@ func (h *RpcServerHandler) OnCron(session getty.Session) { ...@@ -101,7 +106,7 @@ func (h *RpcServerHandler) OnCron(session getty.Session) {
h.rwlock.RLock() h.rwlock.RLock()
if _, ok := h.sessionMap[session]; ok { if _, ok := h.sessionMap[session]; ok {
active = session.GetActive() active = session.GetActive()
if conf.sessionTimeout.Nanoseconds() < time.Since(active).Nanoseconds() { if h.sessionTimeout.Nanoseconds() < time.Since(active).Nanoseconds() {
flag = true flag = true
log.Warn("session{%s} timeout{%s}, reqNum{%d}", log.Warn("session{%s} timeout{%s}, reqNum{%d}",
session.Stat(), time.Since(active).String(), h.sessionMap[session].reqNum) session.Stat(), time.Since(active).String(), h.sessionMap[session].reqNum)
...@@ -198,13 +203,13 @@ func (h *RpcClientHandler) OnMessage(session getty.Session, pkg interface{}) { ...@@ -198,13 +203,13 @@ func (h *RpcClientHandler) OnMessage(session getty.Session, pkg interface{}) {
return return
} }
if len(p.header.Error) > 0 { if len(p.header.Error) > 0 {
pendingResponse.err = errors.New(p.header.Error) pendingResponse.err = jerrors.New(p.header.Error)
} }
err := json.Unmarshal(p.body.([]byte), pendingResponse.reply) err := json.Unmarshal(p.body.([]byte), pendingResponse.reply)
if err != nil { if err != nil {
pendingResponse.err = err pendingResponse.err = err
} }
pendingResponse.done <- true pendingResponse.done <- struct{}{}
} }
func (h *RpcClientHandler) OnCron(session getty.Session) { func (h *RpcClientHandler) OnCron(session getty.Session) {
......
...@@ -4,10 +4,13 @@ import ( ...@@ -4,10 +4,13 @@ import (
"bytes" "bytes"
"encoding/binary" "encoding/binary"
"encoding/json" "encoding/json"
"errors"
"reflect" "reflect"
) )
import (
jerrors "github.com/juju/errors"
)
const ( const (
MaxPacketLen = 16 * 1024 MaxPacketLen = 16 * 1024
RequestSendOnly int16 = 1 RequestSendOnly int16 = 1
...@@ -18,9 +21,9 @@ const ( ...@@ -18,9 +21,9 @@ const (
) )
var ( var (
ErrNotEnoughStream = errors.New("packet stream is not enough") ErrNotEnoughStream = jerrors.New("packet stream is not enough")
ErrTooLargePackage = errors.New("package length is exceed the echo package's legal maximum length.") ErrTooLargePackage = jerrors.New("package length is exceed the echo package's legal maximum length.")
ErrNotFoundServiceOrMethod = errors.New("server invalid service or method") ErrNotFoundServiceOrMethod = jerrors.New("server invalid service or method")
) )
type RequestHeader struct { type RequestHeader struct {
...@@ -279,9 +282,9 @@ type PendingResponse struct { ...@@ -279,9 +282,9 @@ type PendingResponse struct {
seq uint64 seq uint64
err error err error
reply interface{} reply interface{}
done chan bool done chan struct{}
} }
func NewPendingResponse() *PendingResponse { func NewPendingResponse() *PendingResponse {
return &PendingResponse{done: make(chan bool)} return &PendingResponse{done: make(chan struct{})}
} }
...@@ -2,12 +2,12 @@ package rpc ...@@ -2,12 +2,12 @@ package rpc
import ( import (
"bytes" "bytes"
"errors"
) )
import ( import (
"github.com/AlexStocks/getty" "github.com/AlexStocks/getty"
log "github.com/AlexStocks/log4go" log "github.com/AlexStocks/log4go"
jerrors "github.com/juju/errors"
) )
type RpcServerPacketHandler struct { type RpcServerPacketHandler struct {
...@@ -49,7 +49,7 @@ func (p *RpcServerPacketHandler) Write(ss getty.Session, pkg interface{}) error ...@@ -49,7 +49,7 @@ func (p *RpcServerPacketHandler) Write(ss getty.Session, pkg interface{}) error
if resp, ok = pkg.(*RpcResponse); !ok { if resp, ok = pkg.(*RpcResponse); !ok {
log.Error("illegal pkg:%+v\n", pkg) log.Error("illegal pkg:%+v\n", pkg)
return errors.New("invalid rpc response") return jerrors.New("invalid rpc response")
} }
buf, err = resp.Marshal() buf, err = resp.Marshal()
...@@ -99,7 +99,7 @@ func (p *RpcClientPacketHandler) Write(ss getty.Session, pkg interface{}) error ...@@ -99,7 +99,7 @@ func (p *RpcClientPacketHandler) Write(ss getty.Session, pkg interface{}) error
if req, ok = pkg.(*RpcRequest); !ok { if req, ok = pkg.(*RpcRequest); !ok {
log.Error("illegal pkg:%+v\n", pkg) log.Error("illegal pkg:%+v\n", pkg)
return errors.New("invalid rpc request") return jerrors.New("invalid rpc request")
} }
buf, err = req.Marshal() buf, err = req.Marshal()
......
...@@ -2,17 +2,52 @@ package rpc ...@@ -2,17 +2,52 @@ package rpc
import ( import (
"reflect" "reflect"
"sync"
"unicode" "unicode"
"unicode/utf8" "unicode/utf8"
)
import (
log "github.com/AlexStocks/log4go" log "github.com/AlexStocks/log4go"
) )
var typeOfError = reflect.TypeOf((*error)(nil)).Elem() var (
typeOfError = reflect.TypeOf((*error)(nil)).Elem()
)
type methodType struct {
sync.Mutex
method reflect.Method
ArgType reflect.Type
ReplyType reflect.Type
numCalls uint
}
type service struct {
name string
rcvr reflect.Value
typ reflect.Type
method map[string]*methodType
}
// Is this an exported - upper case - name
func isExported(name string) bool {
rune, _ := utf8.DecodeRuneInString(name)
return unicode.IsUpper(rune)
}
// suitableMethods returns suitable Rpc methods of typ, it will report // Is this type exported or a builtin?
// error using log if reportErr is true. func isExportedOrBuiltinType(t reflect.Type) bool {
func suitableMethods(typ reflect.Type, reportErr bool) map[string]*methodType { for t.Kind() == reflect.Ptr {
t = t.Elem()
}
// PkgPath will be non-empty even for an exported type,
// so we need to check the type name as well.
return isExported(t.Name()) || t.PkgPath() == ""
}
// prepareMethods returns suitable Rpc methods of typ
func prepareMethods(typ reflect.Type) map[string]*methodType {
methods := make(map[string]*methodType) methods := make(map[string]*methodType)
for m := 0; m < typ.NumMethod(); m++ { for m := 0; m < typ.NumMethod(); m++ {
method := typ.Method(m) method := typ.Method(m)
...@@ -24,65 +59,37 @@ func suitableMethods(typ reflect.Type, reportErr bool) map[string]*methodType { ...@@ -24,65 +59,37 @@ func suitableMethods(typ reflect.Type, reportErr bool) map[string]*methodType {
} }
// Method needs three ins: receiver, *args, *reply. // Method needs three ins: receiver, *args, *reply.
if mtype.NumIn() != 3 { if mtype.NumIn() != 3 {
if reportErr { log.Warn("method %s has wrong number of ins %d which should be 3", mname, mtype.NumIn())
log.Warn("method", mname, "has wrong number of ins:", mtype.NumIn())
}
continue continue
} }
// First arg need not be a pointer. // First arg need not be a pointer.
argType := mtype.In(1) argType := mtype.In(1)
if !isExportedOrBuiltinType(argType) { if !isExportedOrBuiltinType(argType) {
if reportErr { log.Error("method{%s} argument type not exported{%v}", mname, argType)
log.Warn(mname, "argument type not exported:", argType)
}
continue continue
} }
// Second arg must be a pointer. // Second arg must be a pointer.
replyType := mtype.In(2) replyType := mtype.In(2)
if replyType.Kind() != reflect.Ptr { if replyType.Kind() != reflect.Ptr {
if reportErr { log.Error("method{%s} reply type not a pointer{%v}", mname, replyType)
log.Warn("method", mname, "reply type not a pointer:", replyType)
}
continue continue
} }
// Reply type must be exported. // Reply type must be exported.
if !isExportedOrBuiltinType(replyType) { if !isExportedOrBuiltinType(replyType) {
if reportErr { log.Error("method{%s} reply type not exported{%v}", mname, replyType)
log.Warn("method", mname, "reply type not exported:", replyType)
}
continue continue
} }
// Method needs one out. // Method needs one out.
if mtype.NumOut() != 1 { if mtype.NumOut() != 1 {
if reportErr { log.Error("method{%s} has wrong number of out parameters{%d}", mname, mtype.NumOut())
log.Warn("method", mname, "has wrong number of outs:", mtype.NumOut())
}
continue continue
} }
// The return type of the method must be error. // The return type of the method must be error.
if returnType := mtype.Out(0); returnType != typeOfError { if returnType := mtype.Out(0); returnType != typeOfError {
if reportErr { log.Error("method{%s}'s return type{%s} is not error", mname, returnType.String())
log.Warn("method", mname, "returns", returnType.String(), "not error")
}
continue continue
} }
methods[mname] = &methodType{method: method, ArgType: argType, ReplyType: replyType} methods[mname] = &methodType{method: method, ArgType: argType, ReplyType: replyType}
} }
return methods return methods
} }
// Is this an exported - upper case - name
func isExported(name string) bool {
rune, _ := utf8.DecodeRuneInString(name)
return unicode.IsUpper(rune)
}
// Is this type exported or a builtin?
func isExportedOrBuiltinType(t reflect.Type) bool {
for t.Kind() == reflect.Ptr {
t = t.Elem()
}
// PkgPath will be non-empty even for an exported type,
// so we need to check the type name as well.
return isExported(t.Name()) || t.PkgPath() == ""
}
package rpc package rpc
import ( import (
"errors"
"fmt" "fmt"
"net" "net"
"os" "os"
...@@ -9,10 +8,12 @@ import ( ...@@ -9,10 +8,12 @@ import (
"reflect" "reflect"
"syscall" "syscall"
"time" "time"
)
import (
"github.com/AlexStocks/getty" "github.com/AlexStocks/getty"
"github.com/AlexStocks/goext/log"
"github.com/AlexStocks/goext/net" "github.com/AlexStocks/goext/net"
jerrors "github.com/juju/errors"
log "github.com/AlexStocks/log4go" log "github.com/AlexStocks/log4go"
) )
...@@ -20,72 +21,72 @@ import ( ...@@ -20,72 +21,72 @@ import (
type Server struct { type Server struct {
tcpServerList []getty.Server tcpServerList []getty.Server
serviceMap map[string]*service serviceMap map[string]*service
conf *Config
} }
func NewServer() *Server { func NewServer(conf *Config) *Server {
s := &Server{ s := &Server{
serviceMap: make(map[string]*service), serviceMap: make(map[string]*service),
conf: conf,
} }
return s return s
} }
func (server *Server) Run() { func (s *Server) Run() {
initConf(defaultServerConfFile) s.Init()
initLog(defaultServerLogConfFile)
initProfiling()
server.Init()
gxlog.CInfo("%s starts successfull! its version=%s, its listen ends=%s:%s\n",
conf.AppName, Version, conf.Host, conf.Ports)
log.Info("%s starts successfull! its version=%s, its listen ends=%s:%s\n", log.Info("%s starts successfull! its version=%s, its listen ends=%s:%s\n",
conf.AppName, Version, conf.Host, conf.Ports) s.conf.AppName, getty.Version, s.conf.Host, s.conf.Ports)
server.initSignal() s.initSignal()
} }
func (server *Server) Register(rcvr interface{}) error { func (s *Server) Register(rcvr interface{}) error {
s := new(service) svc := &service{
s.typ = reflect.TypeOf(rcvr) typ: reflect.TypeOf(rcvr),
s.rcvr = reflect.ValueOf(rcvr) rcvr: reflect.ValueOf(rcvr),
sname := reflect.Indirect(s.rcvr).Type().Name() name: reflect.Indirect(reflect.ValueOf(rcvr)).Type().Name(),
if sname == "" { // Install the methods
s := "rpc.Register: no service name for type " + s.typ.String() method: prepareMethods(reflect.TypeOf(rcvr)),
}
if svc.name == "" {
s := "rpc.Register: no service name for type " + svc.typ.String()
log.Error(s) log.Error(s)
return errors.New(s) return jerrors.New(s)
} }
if !isExported(sname) { if !isExported(svc.name) {
s := "rpc.Register: type " + sname + " is not exported" s := "rpc.Register: type " + svc.name + " is not exported"
log.Error(s) log.Error(s)
return errors.New(s) return jerrors.New(s)
} }
if _, present := server.serviceMap[sname]; present { if _, present := s.serviceMap[svc.name]; present {
return errors.New("rpc: service already defined: " + sname) return jerrors.New("rpc: service already defined: " + svc.name)
} }
s.name = sname
// Install the methods if len(svc.method) == 0 {
s.method = suitableMethods(s.typ, true)
if len(s.method) == 0 {
str := ""
// To help the user, see if a pointer receiver would work. // To help the user, see if a pointer receiver would work.
method := suitableMethods(reflect.PtrTo(s.typ), false) method := prepareMethods(reflect.PtrTo(svc.typ))
str := "rpc.Register: type " + svc.name + " has no exported methods of suitable type"
if len(method) != 0 { if len(method) != 0 {
str = "rpc.Register: type " + sname + " has no exported methods of suitable type (hint: pass a pointer to value of that type)" str = "rpc.Register: type " + svc.name + " has no exported methods of suitable type (" +
} else { "hint: pass a pointer to value of that type)"
str = "rpc.Register: type " + sname + " has no exported methods of suitable type"
} }
log.Error(s) log.Error(str)
return errors.New(str)
return jerrors.New(str)
} }
server.serviceMap[s.name] = s
s.serviceMap[svc.name] = svc
return nil return nil
} }
func (server *Server) newSession(session getty.Session) error { func (s *Server) newSession(session getty.Session) error {
var ( var (
ok bool ok bool
tcpConn *net.TCPConn tcpConn *net.TCPConn
) )
if conf.GettySessionParam.CompressEncoding { if s.conf.GettySessionParam.CompressEncoding {
session.SetCompressType(getty.CompressZip) session.SetCompressType(getty.CompressZip)
} }
...@@ -93,59 +94,59 @@ func (server *Server) newSession(session getty.Session) error { ...@@ -93,59 +94,59 @@ func (server *Server) newSession(session getty.Session) error {
panic(fmt.Sprintf("%s, session.conn{%#v} is not tcp connection\n", session.Stat(), session.Conn())) panic(fmt.Sprintf("%s, session.conn{%#v} is not tcp connection\n", session.Stat(), session.Conn()))
} }
tcpConn.SetNoDelay(conf.GettySessionParam.TcpNoDelay) tcpConn.SetNoDelay(s.conf.GettySessionParam.TcpNoDelay)
tcpConn.SetKeepAlive(conf.GettySessionParam.TcpKeepAlive) tcpConn.SetKeepAlive(s.conf.GettySessionParam.TcpKeepAlive)
if conf.GettySessionParam.TcpKeepAlive { if s.conf.GettySessionParam.TcpKeepAlive {
tcpConn.SetKeepAlivePeriod(conf.GettySessionParam.keepAlivePeriod) tcpConn.SetKeepAlivePeriod(s.conf.GettySessionParam.keepAlivePeriod)
} }
tcpConn.SetReadBuffer(conf.GettySessionParam.TcpRBufSize) tcpConn.SetReadBuffer(s.conf.GettySessionParam.TcpRBufSize)
tcpConn.SetWriteBuffer(conf.GettySessionParam.TcpWBufSize) tcpConn.SetWriteBuffer(s.conf.GettySessionParam.TcpWBufSize)
session.SetName(conf.GettySessionParam.SessionName) session.SetName(s.conf.GettySessionParam.SessionName)
session.SetMaxMsgLen(conf.GettySessionParam.MaxMsgLen) session.SetMaxMsgLen(s.conf.GettySessionParam.MaxMsgLen)
session.SetPkgHandler(NewRpcServerPacketHandler(server)) // session.SetPkgHandler(NewRpcServerPacketHandler(s)) //
session.SetEventListener(NewRpcServerHandler()) // session.SetEventListener(NewRpcServerHandler()) //
session.SetRQLen(conf.GettySessionParam.PkgRQSize) session.SetRQLen(s.conf.GettySessionParam.PkgRQSize)
session.SetWQLen(conf.GettySessionParam.PkgWQSize) session.SetWQLen(s.conf.GettySessionParam.PkgWQSize)
session.SetReadTimeout(conf.GettySessionParam.tcpReadTimeout) session.SetReadTimeout(s.conf.GettySessionParam.tcpReadTimeout)
session.SetWriteTimeout(conf.GettySessionParam.tcpWriteTimeout) session.SetWriteTimeout(s.conf.GettySessionParam.tcpWriteTimeout)
session.SetCronPeriod((int)(conf.sessionTimeout.Nanoseconds() / 1e6)) session.SetCronPeriod((int)(s.conf.sessionTimeout.Nanoseconds() / 1e6))
session.SetWaitTime(conf.GettySessionParam.waitTimeout) session.SetWaitTime(s.conf.GettySessionParam.waitTimeout)
log.Debug("app accepts new session:%s\n", session.Stat()) log.Debug("app accepts new session:%s\n", session.Stat())
return nil return nil
} }
func (server *Server) Init() { func (s *Server) Init() {
var ( var (
addr string addr string
portList []string portList []string
tcpServer getty.Server tcpServer getty.Server
) )
portList = conf.Ports portList = s.conf.Ports
if len(portList) == 0 { if len(portList) == 0 {
panic("portList is nil") panic("portList is nil")
} }
for _, port := range portList { for _, port := range portList {
addr = gxnet.HostAddress2(conf.Host, port) addr = gxnet.HostAddress2(s.conf.Host, port)
tcpServer = getty.NewTCPServer( tcpServer = getty.NewTCPServer(
getty.WithLocalAddress(addr), getty.WithLocalAddress(addr),
) )
// run server // run s
tcpServer.RunEventLoop(server.newSession) tcpServer.RunEventLoop(s.newSession)
log.Debug("server bind addr{%s} ok!", addr) log.Debug("s bind addr{%s} ok!", addr)
server.tcpServerList = append(server.tcpServerList, tcpServer) s.tcpServerList = append(s.tcpServerList, tcpServer)
} }
} }
func (server *Server) Stop() { func (s *Server) Stop() {
for _, tcpServer := range server.tcpServerList { for _, tcpServer := range s.tcpServerList {
tcpServer.Close() tcpServer.Close()
} }
} }
func (server *Server) initSignal() { func (s *Server) initSignal() {
// signal.Notify的ch信道是阻塞的(signal.Notify不会阻塞发送信号), 需要设置缓冲 // signal.Notify的ch信道是阻塞的(signal.Notify不会阻塞发送信号), 需要设置缓冲
signals := make(chan os.Signal, 1) signals := make(chan os.Signal, 1)
// It is not possible to block SIGKILL or syscall.SIGSTOP // It is not possible to block SIGKILL or syscall.SIGSTOP
...@@ -157,7 +158,7 @@ func (server *Server) initSignal() { ...@@ -157,7 +158,7 @@ func (server *Server) initSignal() {
case syscall.SIGHUP: case syscall.SIGHUP:
// reload() // reload()
default: default:
go time.AfterFunc(conf.failFastTimeout, func() { go time.AfterFunc(s.conf.failFastTimeout, func() {
// log.Warn("app exit now by force...") // log.Warn("app exit now by force...")
// os.Exit(1) // os.Exit(1)
log.Exit("app exit now by force...") log.Exit("app exit now by force...")
...@@ -165,7 +166,7 @@ func (server *Server) initSignal() { ...@@ -165,7 +166,7 @@ func (server *Server) initSignal() {
}) })
// 要么survialTimeout时间内执行完毕下面的逻辑然后程序退出,要么执行上面的超时函数程序强行退出 // 要么survialTimeout时间内执行完毕下面的逻辑然后程序退出,要么执行上面的超时函数程序强行退出
server.Stop() s.Stop()
// fmt.Println("app exit now...") // fmt.Println("app exit now...")
log.Exit("app exit now...") log.Exit("app exit now...")
log.Close() log.Close()
......
package rpc
import (
"reflect"
"sync"
)
type methodType struct {
sync.Mutex
method reflect.Method
ArgType reflect.Type
ReplyType reflect.Type
numCalls uint
}
type service struct {
name string
rcvr reflect.Value
typ reflect.Type
method map[string]*methodType
}
package rpc
import (
"net/http"
"github.com/AlexStocks/goext/net"
log "github.com/AlexStocks/log4go"
)
const (
pprofPath = "/debug/pprof/"
)
func initProfiling() {
var (
addr string
)
// addr = *host + ":" + "10000"
addr = gxnet.HostAddress(conf.Host, conf.ProfilePort)
log.Info("App Profiling startup on address{%v}", addr+pprofPath)
go func() {
log.Info(http.ListenAndServe(addr, nil))
}()
}
package rpc
var (
Version = "0.8.2"
)
...@@ -594,7 +594,7 @@ func (s *session) handleTCPPackage() error { ...@@ -594,7 +594,7 @@ func (s *session) handleTCPPackage() error {
// s.conn.SetReadTimeout(time.Now().Add(s.rTimeout)) // s.conn.SetReadTimeout(time.Now().Add(s.rTimeout))
bufLen, err = conn.read(buf) bufLen, err = conn.read(buf)
if err != nil { if err != nil {
if netError, ok = err.(net.Error); ok && netError.Timeout() { if netError, ok = jerrors.Cause(err).(net.Error); ok && netError.Timeout() {
break break
} }
log.Error("%s, [session.conn.read] = error{%s}", s.sessionToken(), jerrors.ErrorStack(err)) log.Error("%s, [session.conn.read] = error{%s}", s.sessionToken(), jerrors.ErrorStack(err))
...@@ -670,7 +670,7 @@ func (s *session) handleUDPPackage() error { ...@@ -670,7 +670,7 @@ func (s *session) handleUDPPackage() error {
bufLen, addr, err = conn.read(buf) bufLen, addr, err = conn.read(buf)
log.Debug("conn.read() = bufLen:%d, addr:%#v, err:%s", bufLen, addr, jerrors.ErrorStack(err)) log.Debug("conn.read() = bufLen:%d, addr:%#v, err:%s", bufLen, addr, jerrors.ErrorStack(err))
if netError, ok = err.(net.Error); ok && netError.Timeout() { if netError, ok = jerrors.Cause(err).(net.Error); ok && netError.Timeout() {
continue continue
} }
if err != nil { if err != nil {
...@@ -730,7 +730,7 @@ func (s *session) handleWSPackage() error { ...@@ -730,7 +730,7 @@ func (s *session) handleWSPackage() error {
break break
} }
pkg, err = conn.read() pkg, err = conn.read()
if netError, ok = err.(net.Error); ok && netError.Timeout() { if netError, ok = jerrors.Cause(err).(net.Error); ok && netError.Timeout() {
continue continue
} }
if err != nil { if err != nil {
......
...@@ -10,9 +10,9 @@ ...@@ -10,9 +10,9 @@
package getty package getty
const ( const (
Version = "0.8.4" Version = "0.9.1"
DATE = "2018/06/25" DATE = "2018/06/30"
GETTY_MAJOR = 0 GETTY_MAJOR = 0
GETTY_MINOR = 8 GETTY_MINOR = 9
GETTY_BUILD = 5 GETTY_BUILD = 1
) )
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment