Commit 4e0a57ed authored by AlexStocks

Mod: delete gxlog

parent 96c2170a
......@@ -176,7 +176,7 @@
END OF TERMS AND CONDITIONS
Copyright 2016 ~ 2017 Alex Stocks.
Copyright 2016 ~ 2018 Alex Stocks.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
......
# getty #
---
# getty
*a netty-like asynchronous network I/O library*
## introdction ##
---
## INTRO
Getty is an asynchronous network I/O library in golang, based on "ngo" by [sanbit](https://github.com/sanbit). It works over the tcp/udp/websocket network protocols and supplies [a uniform interface](https://github.com/alexstocks/getty/blob/master/getty.go#L45).
......@@ -15,7 +14,29 @@ Whatever if you use websocket, you do not need to care about hearbeat request/re
You can find code examples at https://github.com/AlexStocks/getty-examples.
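A minimal, hedged client-side sketch of that interface, using only APIs that appear in this repository (the commented lines stand in for your own package handler and event listener; see getty-examples for complete, working code):

```go
package main

import (
	"time"

	"github.com/AlexStocks/getty"
)

// newSession configures every freshly dialed session.
func newSession(session getty.Session) error {
	session.SetName("echo-client")
	session.SetReadTimeout(time.Second)
	session.SetWriteTimeout(5 * time.Second)
	// session.SetPkgHandler(yourPkgHandler)   // encodes/decodes your packages
	// session.SetEventListener(yourListener)  // OnOpen/OnMessage/OnClose/OnCron callbacks
	return nil
}

func main() {
	client := getty.NewTCPClient(
		getty.WithServerAddress("127.0.0.1:10000"),
		getty.WithConnectionNumber(1),
	)
	client.RunEventLoop(newSession)
	// ... your event listener drives all traffic from here on ...
	client.Close()
}
```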
## LICENCE ##
---
## RPC
An open source, Go-based RPC framework.
Feature list:
- 1 Transport: TCP(√), UDP, Websocket
- 2 Codec: ProtoBuf(√), JSON(√)
- 3 Service Discovery: Service Publish(X), Service Watch(X), Service Notify(X)
- 4 Registry: ZooKeeper(X), Etcd(X)
- 5 Strategy: Failover(√), Failfast(√)
- 6 Load Balance: Random(X), RoundRobin(X)
- 7 Metrics: Invoke Statistics(X), User Auth(X)
Code example:
The rpc dir of [getty-examples](https://github.com/alexstocks/getty-examples/) shows how to build an rpc client and an rpc server.
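A hedged client-side sketch of the framework's API as added in this commit; the import path, `TestReq`/`TestRsp` and the service/method names are illustrative placeholders:

```go
package example

import (
	"fmt"

	"github.com/AlexStocks/getty/rpc" // assumed import path of this rpc package
)

// TestReq and TestRsp are hypothetical request/response types; the real ones
// must be understood by the codec (ProtoBuf or JSON) passed to Call.
type TestReq struct{ A string }
type TestRsp struct{ A string }

func call(conf *rpc.ClientConfig, codec rpc.CodecType) error {
	client, err := rpc.NewClient(conf)
	if err != nil {
		return err
	}
	defer client.Close()

	rsp := &TestRsp{}
	// "TestService"/"Test" must match a service registered on the server via rpc.Server.Register.
	err = client.Call(codec, "127.0.0.1:10000", "TestService", "Test", &TestReq{A: "ping"}, rsp)
	if err != nil {
		return err
	}
	fmt.Println(rsp.A)
	return nil
}
```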
## LICENCE
Apache License 2.0
......@@ -14,6 +14,38 @@
## develop history ##
---
- 2018/08/07
> Improvement
* RPC package format: {2 Bytes Header Len + Header + 2 Bytes Body Len + Body} ---> {Header + Body}
> Bug Fix
* do not encode body if package body is nil
- 2018/07/01
> Feature
* Add RPC
- 2018/06/25
> bug fix
* Using juju/errors.Cause on read/write in case of network i/o timeout
- 2018/03/29
> improvement
* use juju/errors instead of pkg/errors
- 2018/03/20
> bug fix
* ignore connectPingPackage
- 2018/03/19
> improvement
* use gxnet.IsSameAddr
* send out pkg asap in WritePkg when the second parameter @timeout is not greater than 0.
* delete Chinese comments
* gettyConn:readCount -> gettyConn:readBytes
* gettyConn:writeCount -> gettyConn:writeBytes
* gettyConn:readPkgCount -> gettyConn:readPkgNum
* gettyConn:writePkgCount -> gettyConn:writePkgNum
- 2018/03/18
> improvement
* nerr -> netError
......@@ -22,6 +54,8 @@
* close net.UDPConn when connected failed
* close net.Conn when connected failed
* Session::EndPointType() -> Session::EndPoint()
* time.Sleep() -> wheel.After()
* do not check server.go:server::caCert
- 2018/03/17
> improvement
......@@ -287,9 +321,9 @@
- 2016/08/29
> 1 rename reconnect to errFlag in function session.go:(Session)handlePackage
>
> 2 session.go:(gettyConn)readCount is reconsidered as read in tcp stream bytes
> 2 session.go:(gettyConn)readBytes is reconsidered as read in tcp stream bytes
>
> 3 session.go:(gettyConn)writeCount is reconsidered as write out tcp stream bytes
> 3 session.go:(gettyConn)writeBytes is reconsidered as write out tcp stream bytes
>
> 4 reconstruct session output token string session.go:(Session)sessionToken
>
......
......@@ -22,16 +22,21 @@ import (
)
import (
"github.com/AlexStocks/goext/net"
"github.com/AlexStocks/goext/sync"
log "github.com/AlexStocks/log4go"
"github.com/gorilla/websocket"
jerrors "github.com/juju/errors"
)
const (
connInterval = 3e9 // 3s
connectTimeout = 5e9
maxTimes = 10
pingPacket = "ping"
)
var (
connectPingPackage = []byte("connect-ping")
)
/////////////////////////////////////////
......@@ -126,7 +131,7 @@ func (c *client) dialTCP() Session {
return nil
}
conn, err = net.DialTimeout("tcp", c.addr, connectTimeout)
if err == nil && conn.LocalAddr().String() == conn.RemoteAddr().String() {
if err == nil && gxnet.IsSameAddr(conn.RemoteAddr(), conn.LocalAddr()) {
conn.Close()
err = errSelfConnect
}
......@@ -134,8 +139,9 @@ func (c *client) dialTCP() Session {
return newTCPSession(conn, c)
}
log.Info("net.DialTimeout(addr:%s, timeout:%v) = error{%s}", c.addr, err)
time.Sleep(connInterval)
log.Info("net.DialTimeout(addr:%s, timeout:%v) = error{%s}", c.addr, jerrors.ErrorStack(err))
// time.Sleep(connInterval)
<-wheel.After(connInterval)
}
}
......@@ -157,34 +163,36 @@ func (c *client) dialUDP() Session {
return nil
}
conn, err = net.DialUDP("udp", localAddr, peerAddr)
if err == nil && conn.LocalAddr().String() == conn.RemoteAddr().String() {
if err == nil && gxnet.IsSameAddr(conn.RemoteAddr(), conn.LocalAddr()) {
conn.Close()
err = errSelfConnect
}
if err != nil {
log.Warn("net.DialTimeout(addr:%s, timeout:%v) = error{%s}", c.addr, err)
time.Sleep(connInterval)
log.Warn("net.DialTimeout(addr:%s, timeout:%v) = error{%s}", c.addr, jerrors.ErrorStack(err))
// time.Sleep(connInterval)
<-wheel.After(connInterval)
continue
}
// check connection alive by write/read action
copy(buf, []byte(pingPacket))
conn.SetWriteDeadline(wheel.Now().Add(1e9))
if length, err = conn.Write(buf[:len(pingPacket)]); err != nil {
if length, err = conn.Write(connectPingPackage[:]); err != nil {
conn.Close()
log.Warn("conn.Write(%s) = {length:%d, err:%s}", pingPacket, length, err)
time.Sleep(connInterval)
log.Warn("conn.Write(%s) = {length:%d, err:%s}", string(connectPingPackage), length, jerrors.ErrorStack(err))
// time.Sleep(connInterval)
<-wheel.After(connInterval)
continue
}
conn.SetReadDeadline(wheel.Now().Add(1e9))
length, err = conn.Read(buf)
if netErr, ok := err.(net.Error); ok && netErr.Timeout() {
if netErr, ok := jerrors.Cause(err).(net.Error); ok && netErr.Timeout() {
err = nil
}
if err != nil {
log.Info("conn{%#v}.Read() = {length:%d, err:%s}", conn, length, err)
log.Info("conn{%#v}.Read() = {length:%d, err:%s}", conn, length, jerrors.ErrorStack(err))
conn.Close()
time.Sleep(connInterval)
// time.Sleep(connInterval)
<-wheel.After(connInterval)
continue
}
//if err == nil {
......@@ -207,8 +215,8 @@ func (c *client) dialWS() Session {
return nil
}
conn, _, err = dialer.Dial(c.addr, nil)
log.Info("websocket.dialer.Dial(addr:%s) = error:%s", c.addr, err)
if err == nil && conn.LocalAddr().String() == conn.RemoteAddr().String() {
log.Info("websocket.dialer.Dial(addr:%s) = error:%s", c.addr, jerrors.ErrorStack(err))
if err == nil && gxnet.IsSameAddr(conn.RemoteAddr(), conn.LocalAddr()) {
conn.Close()
err = errSelfConnect
}
......@@ -221,8 +229,9 @@ func (c *client) dialWS() Session {
return ss
}
log.Info("websocket.dialer.Dial(addr:%s) = error:%s", c.addr, err)
time.Sleep(connInterval)
log.Info("websocket.dialer.Dial(addr:%s) = error:%s", c.addr, jerrors.ErrorStack(err))
// time.Sleep(connInterval)
<-wheel.After(connInterval)
}
}
......@@ -247,7 +256,7 @@ func (c *client) dialWSS() Session {
if c.cert != "" {
certPEMBlock, err := ioutil.ReadFile(c.cert)
if err != nil {
panic(fmt.Sprintf("ioutil.ReadFile(cert:%s) = error{%s}", c.cert, err))
panic(fmt.Sprintf("ioutil.ReadFile(cert:%s) = error{%s}", c.cert, jerrors.ErrorStack(err)))
}
var cert tls.Certificate
......@@ -269,7 +278,7 @@ func (c *client) dialWSS() Session {
for _, c := range config.Certificates {
roots, err = x509.ParseCertificates(c.Certificate[len(c.Certificate)-1])
if err != nil {
panic(fmt.Sprintf("error parsing server's root cert: %s\n", err))
panic(fmt.Sprintf("error parsing server's root cert: %s\n", jerrors.ErrorStack(err)))
}
for _, root = range roots {
certPool.AddCert(root)
......@@ -285,7 +294,7 @@ func (c *client) dialWSS() Session {
return nil
}
conn, _, err = dialer.Dial(c.addr, nil)
if err == nil && conn.LocalAddr().String() == conn.RemoteAddr().String() {
if err == nil && gxnet.IsSameAddr(conn.RemoteAddr(), conn.LocalAddr()) {
conn.Close()
err = errSelfConnect
}
......@@ -299,8 +308,9 @@ func (c *client) dialWSS() Session {
return ss
}
log.Info("websocket.dialer.Dial(addr:%s) = error{%s}", c.addr, err)
time.Sleep(connInterval)
log.Info("websocket.dialer.Dial(addr:%s) = error{%s}", c.addr, jerrors.ErrorStack(err))
// time.Sleep(connInterval)
<-wheel.After(connInterval)
}
}
......@@ -361,11 +371,19 @@ func (c *client) connect() {
}
}
// there are two ways to maintain the connection pool. the first approach is like
// redigo's lazy connection pool (https://github.com/gomodule/redigo/blob/master/redis/pool.go),
// in which you apply testOnBorrow to check whether a borrowed connection is still alive.
// the second way is used here: @RunEventLoop checks the aliveness of the connection
// at a regular time interval.
// the active approach may overburden the cpu slightly,
// but it gets you an active tcp connection very quickly.
func (c *client) RunEventLoop(newSession NewSessionCallback) {
c.Lock()
c.newSession = newSession
c.Unlock()
log.Info("run")
c.wg.Add(1)
// a for-loop goroutine to make sure the connection is valid
go func() {
......@@ -389,7 +407,8 @@ func (c *client) RunEventLoop(newSession NewSessionCallback) {
if maxTimes < times {
times = maxTimes
}
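// back off before the next dial attempt: the wait grows linearly with the
// failure count and is capped at maxTimes*connInterval.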
time.Sleep(time.Duration(int64(times) * connInterval))
// time.Sleep(time.Duration(int64(times) * connInterval))
<-wheel.After(time.Duration(int64(times) * connInterval))
continue
}
times = 0
......
......@@ -15,6 +15,7 @@ import (
"fmt"
"io"
"net"
"sync"
"sync/atomic"
"time"
)
......@@ -23,12 +24,13 @@ import (
log "github.com/AlexStocks/log4go"
"github.com/golang/snappy"
"github.com/gorilla/websocket"
jerrors "github.com/juju/errors"
"golang.org/x/net/ipv4"
"golang.org/x/net/ipv6"
)
var (
launchTime time.Time = time.Now()
launchTime = time.Now()
// ErrInvalidConnection = errors.New("connection has been closed.")
)
......@@ -46,10 +48,10 @@ type gettyConn struct {
compress CompressType
padding1 uint8
padding2 uint16
readCount uint32 // read count
writeCount uint32 // write count
readPkgCount uint32 // send pkg count
writePkgCount uint32 // recv pkg count
readBytes uint32 // read bytes
writeBytes uint32 // write bytes
readPkgNum uint32 // send pkg number
writePkgNum uint32 // recv pkg number
active int64 // last active, in milliseconds
rTimeout time.Duration // network current limiting
wTimeout time.Duration
......@@ -72,12 +74,12 @@ func (c *gettyConn) RemoteAddr() string {
return c.peer
}
func (c *gettyConn) incReadPkgCount() {
atomic.AddUint32(&c.readPkgCount, 1)
func (c *gettyConn) incReadPkgNum() {
atomic.AddUint32(&c.readPkgNum, 1)
}
func (c *gettyConn) incWritePkgCount() {
atomic.AddUint32(&c.writePkgCount, 1)
func (c *gettyConn) incWritePkgNum() {
atomic.AddUint32(&c.writePkgNum, 1)
}
func (c *gettyConn) UpdateActive() {
......@@ -179,6 +181,7 @@ func newGettyTCPConn(conn net.Conn) *gettyTCPConn {
// for zip compress
type writeFlusher struct {
flusher *flate.Writer
lock sync.Mutex
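// lock serializes Write+Flush below so that concurrent writers cannot
// interleave compressed output.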
}
func (t *writeFlusher) Write(p []byte) (int, error) {
......@@ -186,13 +189,14 @@ func (t *writeFlusher) Write(p []byte) (int, error) {
n int
err error
)
t.lock.Lock()
defer t.lock.Unlock()
n, err = t.flusher.Write(p)
if err != nil {
return n, err
return n, jerrors.Trace(err)
}
if err := t.flusher.Flush(); err != nil {
return 0, err
return 0, jerrors.Trace(err)
}
return n, nil
......@@ -239,7 +243,7 @@ func (t *gettyTCPConn) read(p []byte) (int, error) {
currentTime = wheel.Now()
if currentTime.Sub(t.rLastDeadline) > (t.rTimeout >> 2) {
if err = t.conn.SetReadDeadline(currentTime.Add(t.rTimeout)); err != nil {
return 0, err
return 0, jerrors.Trace(err)
}
t.rLastDeadline = currentTime
}
......@@ -247,8 +251,9 @@ func (t *gettyTCPConn) read(p []byte) (int, error) {
length, err = t.reader.Read(p)
log.Debug("now:%s, length:%d, err:%s", currentTime, length, err)
atomic.AddUint32(&t.readCount, uint32(length))
return length, err
atomic.AddUint32(&t.readBytes, uint32(length))
return length, jerrors.Trace(err)
//return length, err
}
// tcp connection write
......@@ -258,10 +263,11 @@ func (t *gettyTCPConn) Write(pkg interface{}) (int, error) {
currentTime time.Time
ok bool
p []byte
length int
)
if p, ok = pkg.([]byte); !ok {
return 0, fmt.Errorf("illegal @pkg{%#v} type", pkg)
return 0, jerrors.Errorf("illegal @pkg{%#v} type", pkg)
}
if t.compress == CompressNone && t.wTimeout > 0 {
// Optimization: update write deadline only if more than 25%
......@@ -270,16 +276,18 @@ func (t *gettyTCPConn) Write(pkg interface{}) (int, error) {
currentTime = wheel.Now()
if currentTime.Sub(t.wLastDeadline) > (t.wTimeout >> 2) {
if err = t.conn.SetWriteDeadline(currentTime.Add(t.wTimeout)); err != nil {
return 0, err
return 0, jerrors.Trace(err)
}
t.wLastDeadline = currentTime
}
}
atomic.AddUint32(&t.writeCount, (uint32)(len(p)))
length, err := t.writer.Write(p)
if length, err = t.writer.Write(p); err == nil {
atomic.AddUint32(&t.writeBytes, (uint32)(len(p)))
}
log.Debug("now:%s, length:%d, err:%s", currentTime, length, err)
return length, err
return length, jerrors.Trace(err)
//return length, err
}
// close tcp connection
......@@ -291,7 +299,7 @@ func (t *gettyTCPConn) close(waitSec int) {
if t.conn != nil {
if writer, ok := t.writer.(*snappy.Writer); ok {
if err := writer.Close(); err != nil {
log.Error("snappy.Writer.Close() = error{%s}", err)
log.Error("snappy.Writer.Close() = error{%s}", jerrors.ErrorStack(err))
}
}
t.conn.(*net.TCPConn).SetLinger(waitSec)
......@@ -309,6 +317,10 @@ type UDPContext struct {
PeerAddr *net.UDPAddr
}
func (c UDPContext) String() string {
return fmt.Sprintf("{pkg:%#v, peer addr:%s}", c.Pkg, c.PeerAddr)
}
type gettyUDPConn struct {
gettyConn
compressType CompressType
......@@ -321,7 +333,7 @@ func setUDPSocketOptions(conn *net.UDPConn) error {
err6 := ipv6.NewPacketConn(conn).SetControlMessage(ipv6.FlagDst|ipv6.FlagInterface, true)
err4 := ipv4.NewPacketConn(conn).SetControlMessage(ipv4.FlagDst|ipv4.FlagInterface, true)
if err6 != nil && err4 != nil {
return err4
return jerrors.Trace(err4)
}
return nil
}
......@@ -381,7 +393,7 @@ func (u *gettyUDPConn) read(p []byte) (int, *net.UDPAddr, error) {
currentTime = wheel.Now()
if currentTime.Sub(u.rLastDeadline) > (u.rTimeout >> 2) {
if err = u.conn.SetReadDeadline(currentTime.Add(u.rTimeout)); err != nil {
return 0, nil, err
return 0, nil, jerrors.Trace(err)
}
u.rLastDeadline = currentTime
}
......@@ -390,10 +402,11 @@ func (u *gettyUDPConn) read(p []byte) (int, *net.UDPAddr, error) {
length, addr, err = u.conn.ReadFromUDP(p) // connected udp also can get return @addr
log.Debug("ReadFromUDP() = {length:%d, peerAddr:%s, error:%s}", length, addr, err)
if err == nil {
atomic.AddUint32(&u.readCount, uint32(length))
atomic.AddUint32(&u.readBytes, uint32(length))
}
return length, addr, err
//return length, addr, err
return length, addr, jerrors.Trace(err)
}
// write udp packet, @ctx should be of type UDPContext
......@@ -409,10 +422,10 @@ func (u *gettyUDPConn) Write(udpCtx interface{}) (int, error) {
)
if ctx, ok = udpCtx.(UDPContext); !ok {
return 0, fmt.Errorf("illegal @udpCtx{%#v} type", udpCtx)
return 0, jerrors.Errorf("illegal @udpCtx{%s} type, @udpCtx type:%T", udpCtx, udpCtx)
}
if buf, ok = ctx.Pkg.([]byte); !ok {
return 0, fmt.Errorf("illegal @udpCtx.Pkg{%#v} type", udpCtx)
return 0, jerrors.Errorf("illegal @udpCtx.Pkg{%#v} type", udpCtx)
}
if u.ss.EndPoint().EndPointType() == UDP_ENDPOINT {
peerAddr = ctx.PeerAddr
......@@ -428,18 +441,19 @@ func (u *gettyUDPConn) Write(udpCtx interface{}) (int, error) {
currentTime = wheel.Now()
if currentTime.Sub(u.wLastDeadline) > (u.wTimeout >> 2) {
if err = u.conn.SetWriteDeadline(currentTime.Add(u.wTimeout)); err != nil {
return 0, err
return 0, jerrors.Trace(err)
}
u.wLastDeadline = currentTime
}
}
atomic.AddUint32(&u.writeCount, (uint32)(len(buf)))
length, _, err = u.conn.WriteMsgUDP(buf, nil, peerAddr)
if length, _, err = u.conn.WriteMsgUDP(buf, nil, peerAddr); err == nil {
atomic.AddUint32(&u.writeBytes, (uint32)(len(buf)))
}
log.Debug("WriteMsgUDP(peerAddr:%s) = {length:%d, error:%s}", peerAddr, length, err)
return length, err
return length, jerrors.Trace(err)
//return length, err
}
// close udp connection
......@@ -515,7 +529,7 @@ func (w *gettyWSConn) handlePing(message string) error {
w.UpdateActive()
}
return err
return jerrors.Trace(err)
}
func (w *gettyWSConn) handlePong(string) error {
......@@ -529,14 +543,15 @@ func (w *gettyWSConn) read() ([]byte, error) {
// gorilla/websocket/conn.go:NextReader will always fail when got a timeout error.
_, b, e := w.conn.ReadMessage() // the first return value is message type.
if e == nil {
atomic.AddUint32(&w.readPkgCount, 1)
w.incReadPkgNum()
} else {
if websocket.IsUnexpectedCloseError(e, websocket.CloseGoingAway) {
log.Warn("websocket unexpected close error: %v", e)
}
}
return b, e
return b, jerrors.Trace(e)
//return b, e
}
func (w *gettyWSConn) updateWriteDeadline() error {
......@@ -552,7 +567,7 @@ func (w *gettyWSConn) updateWriteDeadline() error {
currentTime = wheel.Now()
if currentTime.Sub(w.wLastDeadline) > (w.wTimeout >> 2) {
if err = w.conn.SetWriteDeadline(currentTime.Add(w.wTimeout)); err != nil {
return err
return jerrors.Trace(err)
}
w.wLastDeadline = currentTime
}
......@@ -564,28 +579,31 @@ func (w *gettyWSConn) updateWriteDeadline() error {
// websocket connection write
func (w *gettyWSConn) Write(pkg interface{}) (int, error) {
var (
ok bool
p []byte
err error
ok bool
p []byte
)
if p, ok = pkg.([]byte); !ok {
return 0, fmt.Errorf("illegal @pkg{%#v} type", pkg)
return 0, jerrors.Errorf("illegal @pkg{%#v} type", pkg)
}
// atomic.AddUint32(&w.writeCount, 1)
atomic.AddUint32(&w.writeCount, (uint32)(len(p)))
w.updateWriteDeadline()
return len(p), w.conn.WriteMessage(websocket.BinaryMessage, p)
if err = w.conn.WriteMessage(websocket.BinaryMessage, p); err == nil {
atomic.AddUint32(&w.writeBytes, (uint32)(len(p)))
}
return len(p), jerrors.Trace(err)
//return len(p), err
}
func (w *gettyWSConn) writePing() error {
w.updateWriteDeadline()
return w.conn.WriteMessage(websocket.PingMessage, []byte{})
return jerrors.Trace(w.conn.WriteMessage(websocket.PingMessage, []byte{}))
}
func (w *gettyWSConn) writePong(message []byte) error {
w.updateWriteDeadline()
return w.conn.WriteMessage(websocket.PongMessage, message)
return jerrors.Trace(w.conn.WriteMessage(websocket.PongMessage, message))
}
// close websocket connection
......
......@@ -88,8 +88,8 @@ type Connection interface {
SetCompressType(CompressType)
LocalAddr() string
RemoteAddr() string
incReadPkgCount()
incWritePkgCount()
incReadPkgNum()
incWritePkgNum()
// update session's active time
UpdateActive()
// get session's active time
......@@ -115,7 +115,6 @@ type Connection interface {
var (
ErrSessionClosed = errors.New("session Already Closed")
ErrSessionBlocked = errors.New("session Full Blocked")
ErrMsgTooLong = errors.New("Message Too Long")
ErrNullPeerAddr = errors.New("peer address is nil")
)
......@@ -143,9 +142,9 @@ type Session interface {
SetAttribute(interface{}, interface{})
RemoveAttribute(interface{})
// the Writer will invoke this function.
// for udp session, the first parameter should be UDPContext. Otherwise its type is []byte.
WritePkg(interface{}, time.Duration) error
// the Writer will invoke this function. Please note that if @timeout is not greater than 0, WritePkg will send out @pkg asap.
// for udp session, the first parameter should be UDPContext.
WritePkg(pkg interface{}, timeout time.Duration) error
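// e.g. (illustrative): session.WritePkg([]byte("hello"), 0) on a tcp/websocket
// session, or session.WritePkg(UDPContext{Pkg: pkg, PeerAddr: addr}, time.Second)
// on a udp session.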
WriteBytes([]byte) error
WriteBytesArray(...[]byte) error
Close()
......
package getty
import (
"fmt"
"time"
)
var ErrScheduleTimeout = fmt.Errorf("schedule error: timed out")
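// Pool is a bounded goroutine pool: sem caps the number of live workers and
// work buffers the tasks waiting for a free worker.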
type Pool struct {
sem chan struct{}
work chan func()
}
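// NewPool creates a pool running at most @size workers, with a pending-task
// queue of capacity @queue; @spawn workers are started eagerly.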
func NewPool(size, queue, spawn int) *Pool {
if spawn <= 0 && queue > 0 {
panic("dead queue configuration detected")
}
if spawn > size {
panic("spawn > workers")
}
p := &Pool{
sem: make(chan struct{}, size),
work: make(chan func(), queue),
}
for i := 0; i < spawn; i++ {
p.sem <- struct{}{}
go p.worker(func() {})
}
return p
}
func (p *Pool) ScheduleTimeout(timeout time.Duration, task func()) error {
return p.schedule(task, time.After(timeout))
}
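// schedule hands @task to an idle worker, spawns a new worker if the worker
// cap has not been reached yet, or gives up with ErrScheduleTimeout once
// @timeout fires, whichever select case becomes ready first.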
func (p *Pool) schedule(task func(), timeout <-chan time.Time) error {
select {
case <-timeout:
return ErrScheduleTimeout
case p.work <- task:
return nil
case p.sem <- struct{}{}:
go p.worker(task)
return nil
}
}
func (p *Pool) worker(task func()) {
defer func() { <-p.sem }()
task()
for task := range p.work {
task()
}
}
......@@ -71,9 +71,9 @@ type ClientOptions struct {
addr string
number int
// for wss client
// 服务端的证书文件(包含了公钥以及服务端其他一些验证信息:服务端域名、
// 服务端ip、起始有效日期、有效时长、hash算法、秘钥长度等)
// the cert file of the wss server, which may contain the server domain, the server ip,
// the validity start date, the validity duration, the hash algorithm, the private key length, etc.
// the wss client will use it.
cert string
}
......
package rpc
import (
"math/rand"
"sync"
"time"
)
import (
"github.com/AlexStocks/getty"
"github.com/AlexStocks/goext/sync/atomic"
jerrors "github.com/juju/errors"
)
var (
errInvalidCodecType = jerrors.New("illegal CodecType")
errInvalidAddress = jerrors.New("remote address invalid or empty")
errSessionNotExist = jerrors.New("session not exist")
errClientClosed = jerrors.New("client closed")
errClientReadTimeout = jerrors.New("client read timeout")
)
func init() {
rand.Seed(time.Now().UnixNano())
}
type Client struct {
conf ClientConfig
pool *gettyRPCClientConnPool
sequence gxatomic.Uint64
pendingLock sync.RWMutex
pendingResponses map[SequenceType]*PendingResponse
}
func NewClient(conf *ClientConfig) (*Client, error) {
if err := conf.CheckValidity(); err != nil {
return nil, jerrors.Trace(err)
}
c := &Client{
pendingResponses: make(map[SequenceType]*PendingResponse),
conf: *conf,
}
c.pool = newGettyRPCClientConnPool(c, conf.PoolSize, time.Duration(int(time.Second)*conf.PoolTTL))
return c, nil
}
func (c *Client) Call(typ CodecType, addr, service, method string, args interface{}, reply interface{}) error {
if !typ.CheckValidity() {
return errInvalidCodecType
}
b := &GettyRPCRequest{}
b.header.Service = service
b.header.Method = method
b.header.CallType = CT_TwoWay
if reply == nil {
b.header.CallType = CT_TwoWayNoReply
}
b.body = args
rsp := NewPendingResponse()
rsp.reply = reply
var (
err error
session getty.Session
conn *gettyRPCClientConn
)
conn, session, err = c.selectSession(typ, addr)
if err != nil || session == nil {
return errSessionNotExist
}
defer c.pool.release(conn, err)
if err = c.transfer(session, typ, b, rsp); err != nil {
return jerrors.Trace(err)
}
select {
case <-getty.GetTimeWheel().After(c.conf.GettySessionParam.tcpReadTimeout):
err = errClientReadTimeout
c.RemovePendingResponse(SequenceType(rsp.seq))
case <-rsp.done:
err = rsp.err
}
return jerrors.Trace(err)
}
func (c *Client) Close() {
if c.pool != nil {
c.pool.close()
}
c.pool = nil
}
func (c *Client) selectSession(typ CodecType, addr string) (*gettyRPCClientConn, getty.Session, error) {
rpcConn, err := c.pool.getConn(typ.String(), addr)
if err != nil {
return nil, nil, jerrors.Trace(err)
}
return rpcConn, rpcConn.selectSession(), nil
}
func (c *Client) heartbeat(session getty.Session, typ CodecType) error {
rsp := NewPendingResponse()
return c.transfer(session, typ, nil, rsp)
}
func (c *Client) transfer(session getty.Session, typ CodecType, req *GettyRPCRequest, rsp *PendingResponse) error {
var (
sequence uint64
err error
pkg GettyPackage
)
sequence = c.sequence.Add(1)
pkg.H.Magic = MagicType(gettyPackageMagic)
pkg.H.LogID = LogIDType(randomID())
pkg.H.Sequence = SequenceType(sequence)
pkg.H.Command = gettyCmdHbRequest
pkg.H.CodecType = typ
if req != nil {
pkg.H.Command = gettyCmdRPCRequest
pkg.B = req
}
rsp.seq = sequence
c.AddPendingResponse(rsp)
err = session.WritePkg(pkg, 0)
if err != nil {
c.RemovePendingResponse(SequenceType(rsp.seq))
}
return jerrors.Trace(err)
}
func (c *Client) PendingResponseCount() int {
c.pendingLock.RLock()
defer c.pendingLock.RUnlock()
return len(c.pendingResponses)
}
func (c *Client) AddPendingResponse(pr *PendingResponse) {
c.pendingLock.Lock()
defer c.pendingLock.Unlock()
c.pendingResponses[SequenceType(pr.seq)] = pr
}
func (c *Client) RemovePendingResponse(seq SequenceType) *PendingResponse {
c.pendingLock.Lock()
defer c.pendingLock.Unlock()
if c.pendingResponses == nil {
return nil
}
if presp, ok := c.pendingResponses[seq]; ok {
delete(c.pendingResponses, seq)
return presp
}
return nil
}
func (c *Client) ClearPendingResponses() map[SequenceType]*PendingResponse {
c.pendingLock.Lock()
defer c.pendingLock.Unlock()
presps := c.pendingResponses
c.pendingResponses = nil
return presps
}
package rpc
import (
"time"
)
import (
jerrors "github.com/juju/errors"
)
type (
GettySessionParam struct {
CompressEncoding bool `default:"false" yaml:"compress_encoding" json:"compress_encoding,omitempty"`
TcpNoDelay bool `default:"true" yaml:"tcp_no_delay" json:"tcp_no_delay,omitempty"`
TcpKeepAlive bool `default:"true" yaml:"tcp_keep_alive" json:"tcp_keep_alive,omitempty"`
KeepAlivePeriod string `default:"180s" yaml:"keep_alive_period" json:"keep_alive_period,omitempty"`
keepAlivePeriod time.Duration
TcpRBufSize int `default:"262144" yaml:"tcp_r_buf_size" json:"tcp_r_buf_size,omitempty"`
TcpWBufSize int `default:"65536" yaml:"tcp_w_buf_size" json:"tcp_w_buf_size,omitempty"`
PkgRQSize int `default:"1024" yaml:"pkg_rq_size" json:"pkg_rq_size,omitempty"`
PkgWQSize int `default:"1024" yaml:"pkg_wq_size" json:"pkg_wq_size,omitempty"`
TcpReadTimeout string `default:"1s" yaml:"tcp_read_timeout" json:"tcp_read_timeout,omitempty"`
tcpReadTimeout time.Duration
TcpWriteTimeout string `default:"5s" yaml:"tcp_write_timeout" json:"tcp_write_timeout,omitempty"`
tcpWriteTimeout time.Duration
WaitTimeout string `default:"7s" yaml:"wait_timeout" json:"wait_timeout,omitempty"`
waitTimeout time.Duration
MaxMsgLen int `default:"1024" yaml:"max_msg_len" json:"max_msg_len,omitempty"`
SessionName string `default:"rpc" yaml:"session_name" json:"session_name,omitempty"`
}
RegistryConfig struct {
Type string `default:"etcd" yaml:"type" json:"type,omitempty"`
Addr string `default:"" yaml:"addr" json:"addr,omitempty"`
KeepaliveTimeout int `default:"5" yaml:"keepalive_time" json:"keepalive_timeout,omitempty"`
Root string `default:"getty" yaml:"root" json:"root,omitempty"`
IDC string `default:"idc-bj" yaml:"idc" json:"idc,omitempty"`
NodeID string `default:"node0" yaml:"node_id" json:"node_id,omitempty"`
}
// Config holds supported types by the multiconfig package
ServerConfig struct {
// local address
AppName string `default:"rpc-server" yaml:"app_name" json:"app_name,omitempty"`
Host string `default:"127.0.0.1" yaml:"host" json:"host,omitempty"`
Ports []string `yaml:"ports" json:"ports,omitempty"` // `default:["10000"]`
ProfilePort int `default:"10086" yaml:"profile_port" json:"profile_port,omitempty"`
// session
SessionTimeout string `default:"60s" yaml:"session_timeout" json:"session_timeout,omitempty"`
sessionTimeout time.Duration
SessionNumber int `default:"1000" yaml:"session_number" json:"session_number,omitempty"`
// app
FailFastTimeout string `default:"5s" yaml:"fail_fast_timeout" json:"fail_fast_timeout,omitempty"`
failFastTimeout time.Duration
// session tcp parameters
GettySessionParam GettySessionParam `required:"true" yaml:"getty_session_param" json:"getty_session_param,omitempty"`
}
// Config holds supported types by the multiconfig package
ClientConfig struct {
// local address
AppName string `default:"rpc-client" yaml:"app_name" json:"app_name,omitempty"`
Host string `default:"127.0.0.1" yaml:"host" json:"host,omitempty"`
ProfilePort int `default:"10086" yaml:"profile_port" json:"profile_port,omitempty"`
// session pool
ConnectionNum int `default:"16" yaml:"connection_num" json:"connection_num,omitempty"`
// heartbeat
HeartbeatPeriod string `default:"15s" yaml:"heartbeat_period" json:"heartbeat_period,omitempty"`
heartbeatPeriod time.Duration
// session
SessionTimeout string `default:"60s" yaml:"session_timeout" json:"session_timeout,omitempty"`
sessionTimeout time.Duration
// app
FailFastTimeout string `default:"5s" yaml:"fail_fast_timeout" json:"fail_fast_timeout,omitempty"`
failFastTimeout time.Duration
// Connection Pool
PoolSize int `default:"2" yaml:"pool_size" json:"pool_size,omitempty"`
PoolTTL int `default:"180" yaml:"pool_ttl" json:"pool_ttl,omitempty"`
// session tcp parameters
GettySessionParam GettySessionParam `required:"true" yaml:"getty_session_param" json:"getty_session_param,omitempty"`
}
)
func (c *GettySessionParam) CheckValidity() error {
var err error
if c.keepAlivePeriod, err = time.ParseDuration(c.KeepAlivePeriod); err != nil {
return jerrors.Annotatef(err, "time.ParseDuration(KeepAlivePeriod{%#v})", c.KeepAlivePeriod)
}
if c.tcpReadTimeout, err = time.ParseDuration(c.TcpReadTimeout); err != nil {
return jerrors.Annotatef(err, "time.ParseDuration(TcpReadTimeout{%#v})", c.TcpReadTimeout)
}
if c.tcpWriteTimeout, err = time.ParseDuration(c.TcpWriteTimeout); err != nil {
return jerrors.Annotatef(err, "time.ParseDuration(TcpWriteTimeout{%#v})", c.TcpWriteTimeout)
}
if c.waitTimeout, err = time.ParseDuration(c.WaitTimeout); err != nil {
return jerrors.Annotatef(err, "time.ParseDuration(WaitTimeout{%#v})", c.WaitTimeout)
}
return nil
}
func (c *ClientConfig) CheckValidity() error {
var err error
if c.heartbeatPeriod, err = time.ParseDuration(c.HeartbeatPeriod); err != nil {
return jerrors.Annotatef(err, "time.ParseDuration(HeartbeatPeroid{%#v})", c.HeartbeatPeriod)
}
if c.sessionTimeout, err = time.ParseDuration(c.SessionTimeout); err != nil {
return jerrors.Annotatef(err, "time.ParseDuration(SessionTimeout{%#v})", c.SessionTimeout)
}
if c.failFastTimeout, err = time.ParseDuration(c.FailFastTimeout); err != nil {
return jerrors.Annotatef(err, "time.ParseDuration(FailFastTimeout{%#v})", c.FailFastTimeout)
}
return jerrors.Trace(c.GettySessionParam.CheckValidity())
}
func (c *ServerConfig) CheckValidity() error {
var err error
if c.sessionTimeout, err = time.ParseDuration(c.SessionTimeout); err != nil {
return jerrors.Annotatef(err, "time.ParseDuration(SessionTimeout{%#v})", c.SessionTimeout)
}
if c.failFastTimeout, err = time.ParseDuration(c.FailFastTimeout); err != nil {
return jerrors.Annotatef(err, "time.ParseDuration(FailFastTimeout{%#v})", c.FailFastTimeout)
}
return jerrors.Trace(c.GettySessionParam.CheckValidity())
}
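// An illustrative client configuration (all values are examples only); the
// exported duration fields are strings that CheckValidity parses into the
// unexported time.Duration fields:
//
//   conf := &ClientConfig{
//       AppName:         "rpc-client",
//       Host:            "127.0.0.1",
//       ConnectionNum:   2,
//       HeartbeatPeriod: "10s",
//       SessionTimeout:  "20s",
//       FailFastTimeout: "3s",
//       PoolSize:        4,
//       PoolTTL:         180,
//       GettySessionParam: GettySessionParam{
//           KeepAlivePeriod: "120s",
//           TcpReadTimeout:  "1s",
//           TcpWriteTimeout: "5s",
//           WaitTimeout:     "1s",
//           MaxMsgLen:       1024,
//           SessionName:     "rpc-client",
//       },
//   }
//   if err := conf.CheckValidity(); err != nil {
//       // handle malformed duration strings
//   }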
package rpc
import (
"reflect"
"sync"
"time"
)
import (
"github.com/AlexStocks/getty"
log "github.com/AlexStocks/log4go"
jerrors "github.com/juju/errors"
)
var (
errTooManySessions = jerrors.New("too many echo sessions")
)
type rpcSession struct {
session getty.Session
reqNum int32
}
////////////////////////////////////////////
// RpcServerHandler
////////////////////////////////////////////
type RpcServerHandler struct {
maxSessionNum int
sessionTimeout time.Duration
sessionMap map[getty.Session]*rpcSession
rwlock sync.RWMutex
}
func NewRpcServerHandler(maxSessionNum int, sessionTimeout time.Duration) *RpcServerHandler {
return &RpcServerHandler{
maxSessionNum: maxSessionNum,
sessionTimeout: sessionTimeout,
sessionMap: make(map[getty.Session]*rpcSession),
}
}
func (h *RpcServerHandler) OnOpen(session getty.Session) error {
var err error
h.rwlock.RLock()
if h.maxSessionNum < len(h.sessionMap) {
err = errTooManySessions
}
h.rwlock.RUnlock()
if err != nil {
return jerrors.Trace(err)
}
log.Info("got session:%s", session.Stat())
h.rwlock.Lock()
h.sessionMap[session] = &rpcSession{session: session}
h.rwlock.Unlock()
return nil
}
func (h *RpcServerHandler) OnError(session getty.Session, err error) {
log.Info("session{%s} got error{%v}, will be closed.", session.Stat(), err)
h.rwlock.Lock()
delete(h.sessionMap, session)
h.rwlock.Unlock()
}
func (h *RpcServerHandler) OnClose(session getty.Session) {
log.Info("session{%s} is closing......", session.Stat())
h.rwlock.Lock()
delete(h.sessionMap, session)
h.rwlock.Unlock()
}
func (h *RpcServerHandler) OnMessage(session getty.Session, pkg interface{}) {
h.rwlock.Lock()
if _, ok := h.sessionMap[session]; ok {
h.sessionMap[session].reqNum++
}
h.rwlock.Unlock()
req, ok := pkg.(GettyRPCRequestPackage)
if !ok {
log.Error("illegal packge{%#v}", pkg)
return
}
// heartbeat
if req.H.Command == gettyCmdHbRequest {
h.replyCmd(session, req, gettyCmdHbResponse, "")
return
}
if req.header.CallType == CT_TwoWayNoReply {
h.replyCmd(session, req, gettyCmdRPCResponse, "")
function := req.methodType.method.Func
function.Call([]reflect.Value{req.service.rcvr, req.argv, req.replyv})
return
}
h.callService(session, req, req.service, req.methodType, req.argv, req.replyv)
}
func (h *RpcServerHandler) OnCron(session getty.Session) {
var (
flag bool
active time.Time
)
h.rwlock.RLock()
if _, ok := h.sessionMap[session]; ok {
active = session.GetActive()
if h.sessionTimeout.Nanoseconds() < time.Since(active).Nanoseconds() {
flag = true
log.Warn("session{%s} timeout{%s}, reqNum{%d}",
session.Stat(), time.Since(active).String(), h.sessionMap[session].reqNum)
}
}
h.rwlock.RUnlock()
if flag {
h.rwlock.Lock()
delete(h.sessionMap, session)
h.rwlock.Unlock()
session.Close()
}
}
func (h *RpcServerHandler) replyCmd(session getty.Session, req GettyRPCRequestPackage, cmd gettyCommand, err string) {
resp := GettyPackage{
H: req.H,
}
resp.H.Command = cmd
if len(err) != 0 {
resp.H.Code = GettyFail
resp.B = &GettyRPCResponse{
header: GettyRPCResponseHeader{
Error: err,
},
}
}
session.WritePkg(resp, 5*time.Second)
}
func (h *RpcServerHandler) callService(session getty.Session, req GettyRPCRequestPackage,
service *service, methodType *methodType, argv, replyv reflect.Value) {
function := methodType.method.Func
returnValues := function.Call([]reflect.Value{service.rcvr, argv, replyv})
errInter := returnValues[0].Interface()
if errInter != nil {
h.replyCmd(session, req, gettyCmdRPCResponse, errInter.(error).Error())
return
}
resp := GettyPackage{
H: req.H,
}
resp.H.Code = GettyOK
resp.H.Command = gettyCmdRPCResponse
resp.B = &GettyRPCResponse{
body: replyv.Interface(),
}
session.WritePkg(resp, 5*time.Second)
}
////////////////////////////////////////////
// RpcClientHandler
////////////////////////////////////////////
type RpcClientHandler struct {
conn *gettyRPCClientConn
}
func NewRpcClientHandler(client *gettyRPCClientConn) *RpcClientHandler {
return &RpcClientHandler{conn: client}
}
func (h *RpcClientHandler) OnOpen(session getty.Session) error {
h.conn.addSession(session)
return nil
}
func (h *RpcClientHandler) OnError(session getty.Session, err error) {
log.Info("session{%s} got error{%v}, will be closed.", session.Stat(), err)
h.conn.removeSession(session)
}
func (h *RpcClientHandler) OnClose(session getty.Session) {
log.Info("session{%s} is closing......", session.Stat())
h.conn.removeSession(session)
}
func (h *RpcClientHandler) OnMessage(session getty.Session, pkg interface{}) {
p, ok := pkg.(*GettyRPCResponsePackage)
if !ok {
log.Error("illegal packge{%#v}", pkg)
return
}
log.Debug("get rpc response{%s}", p)
h.conn.updateSession(session)
pendingResponse := h.conn.pool.rpcClient.RemovePendingResponse(p.H.Sequence)
if pendingResponse == nil {
return
}
if p.H.Command == gettyCmdHbResponse {
return
}
if p.H.Code == GettyFail && len(p.header.Error) > 0 {
pendingResponse.err = jerrors.New(p.header.Error)
pendingResponse.done <- struct{}{}
return
}
codec := Codecs[p.H.CodecType]
if codec == nil {
pendingResponse.err = jerrors.Errorf("can not find codec for %d", p.H.CodecType)
pendingResponse.done <- struct{}{}
return
}
err := codec.Decode(p.body, pendingResponse.reply)
if err != nil {
pendingResponse.err = err
pendingResponse.done <- struct{}{}
return
}
pendingResponse.done <- struct{}{}
}
func (h *RpcClientHandler) OnCron(session getty.Session) {
rpcSession, err := h.conn.getClientRpcSession(session)
if err != nil {
log.Error("client.getClientSession(session{%s}) = error{%s}",
session.Stat(), jerrors.ErrorStack(err))
return
}
if h.conn.pool.rpcClient.conf.sessionTimeout.Nanoseconds() < time.Since(session.GetActive()).Nanoseconds() {
log.Warn("session{%s} timeout{%s}, reqNum{%d}",
session.Stat(), time.Since(session.GetActive()).String(), rpcSession.reqNum)
h.conn.removeSession(session) // -> h.conn.close() -> h.conn.pool.remove(h.conn)
return
}
codecType := GetCodecType(h.conn.protocol)
h.conn.pool.rpcClient.heartbeat(session, codecType)
}
package rpc
import (
"fmt"
"math/rand"
"net"
"strings"
"sync"
"time"
)
import (
"github.com/AlexStocks/getty"
log "github.com/AlexStocks/log4go"
jerrors "github.com/juju/errors"
)
type gettyRPCClientConn struct {
once sync.Once
protocol string
addr string
created int64 // zero means the connection has not been created yet or has already been destroyed
pool *gettyRPCClientConnPool
lock sync.RWMutex
gettyClient getty.Client
sessions []*rpcSession
}
var (
errClientPoolClosed = jerrors.New("client pool closed")
)
func newGettyRPCClientConn(pool *gettyRPCClientConnPool, protocol, addr string) (*gettyRPCClientConn, error) {
c := &gettyRPCClientConn{
protocol: protocol,
addr: addr,
pool: pool,
gettyClient: getty.NewTCPClient(
getty.WithServerAddress(addr),
getty.WithConnectionNumber((int)(pool.rpcClient.conf.ConnectionNum)),
),
}
c.gettyClient.RunEventLoop(c.newSession)
idx := 1
for {
idx++
if c.isAvailable() {
break
}
if idx > 5000 {
return nil, jerrors.New(fmt.Sprintf("failed to create client connection to %s in 5 seconds", addr))
}
time.Sleep(1e6)
}
log.Info("client init ok")
c.created = time.Now().Unix()
return c, nil
}
func (c *gettyRPCClientConn) newSession(session getty.Session) error {
var (
ok bool
tcpConn *net.TCPConn
conf ClientConfig
)
conf = c.pool.rpcClient.conf
if conf.GettySessionParam.CompressEncoding {
session.SetCompressType(getty.CompressZip)
}
if tcpConn, ok = session.Conn().(*net.TCPConn); !ok {
panic(fmt.Sprintf("%s, session.conn{%#v} is not tcp connection\n", session.Stat(), session.Conn()))
}
tcpConn.SetNoDelay(conf.GettySessionParam.TcpNoDelay)
tcpConn.SetKeepAlive(conf.GettySessionParam.TcpKeepAlive)
if conf.GettySessionParam.TcpKeepAlive {
tcpConn.SetKeepAlivePeriod(conf.GettySessionParam.keepAlivePeriod)
}
tcpConn.SetReadBuffer(conf.GettySessionParam.TcpRBufSize)
tcpConn.SetWriteBuffer(conf.GettySessionParam.TcpWBufSize)
session.SetName(conf.GettySessionParam.SessionName)
session.SetMaxMsgLen(conf.GettySessionParam.MaxMsgLen)
session.SetPkgHandler(NewRpcClientPackageHandler())
session.SetEventListener(NewRpcClientHandler(c))
session.SetRQLen(conf.GettySessionParam.PkgRQSize)
session.SetWQLen(conf.GettySessionParam.PkgWQSize)
session.SetReadTimeout(conf.GettySessionParam.tcpReadTimeout)
session.SetWriteTimeout(conf.GettySessionParam.tcpWriteTimeout)
session.SetCronPeriod((int)(conf.heartbeatPeriod.Nanoseconds() / 1e6))
session.SetWaitTime(conf.GettySessionParam.waitTimeout)
log.Debug("client new session:%s\n", session.Stat())
return nil
}
func (c *gettyRPCClientConn) selectSession() getty.Session {
c.lock.RLock()
defer c.lock.RUnlock()
if c.sessions == nil {
return nil
}
count := len(c.sessions)
if count == 0 {
return nil
}
return c.sessions[rand.Int31n(int32(count))].session
}
func (c *gettyRPCClientConn) addSession(session getty.Session) {
log.Debug("add session{%s}", session.Stat())
if session == nil {
return
}
c.lock.Lock()
c.sessions = append(c.sessions, &rpcSession{session: session})
c.lock.Unlock()
}
func (c *gettyRPCClientConn) removeSession(session getty.Session) {
if session == nil {
return
}
c.lock.Lock()
defer c.lock.Unlock()
if c.sessions == nil {
return
}
for i, s := range c.sessions {
if s.session == session {
c.sessions = append(c.sessions[:i], c.sessions[i+1:]...)
log.Debug("delete session{%s}, its index{%d}", session.Stat(), i)
break
}
}
log.Info("after remove session{%s}, left session number:%d", session.Stat(), len(c.sessions))
if len(c.sessions) == 0 {
c.close() // -> pool.remove(c)
}
}
func (c *gettyRPCClientConn) updateSession(session getty.Session) {
if session == nil {
return
}
c.lock.Lock()
defer c.lock.Unlock()
if c.sessions == nil {
return
}
for i, s := range c.sessions {
if s.session == session {
c.sessions[i].reqNum++
break
}
}
}
func (c *gettyRPCClientConn) getClientRpcSession(session getty.Session) (rpcSession, error) {
var (
err error
rpcSession rpcSession
)
c.lock.Lock()
defer c.lock.Unlock()
if c.sessions == nil {
return rpcSession, errClientClosed
}
err = errSessionNotExist
for _, s := range c.sessions {
if s.session == session {
rpcSession = *s
err = nil
break
}
}
return rpcSession, jerrors.Trace(err)
}
func (c *gettyRPCClientConn) isAvailable() bool {
if c.selectSession() == nil {
return false
}
return true
}
func (c *gettyRPCClientConn) close() error {
err := jerrors.Errorf("close gettyRPCClientConn{%#v} again", c)
c.once.Do(func() {
// delete @c from client pool
c.pool.remove(c)
for _, s := range c.sessions {
log.Info("close client session{%s, last active:%s, request number:%d}",
s.session.Stat(), s.session.GetActive().String(), s.reqNum)
s.session.Close()
}
c.gettyClient.Close()
c.gettyClient = nil
c.sessions = c.sessions[:0]
c.created = 0
err = nil
})
return err
}
type gettyRPCClientConnPool struct {
rpcClient *Client
size int // capacity of each []*gettyRPCClientConn array below
ttl int64 // life span of each gettyRPCClientConn, in seconds; the pool checks it in getConn
sync.Mutex
connMap map[string][]*gettyRPCClientConn // key is the connection address, value is the connection array for that address
}
func newGettyRPCClientConnPool(rpcClient *Client, size int, ttl time.Duration) *gettyRPCClientConnPool {
return &gettyRPCClientConnPool{
rpcClient: rpcClient,
size: size,
ttl: int64(ttl.Seconds()),
connMap: make(map[string][]*gettyRPCClientConn),
}
}
func (p *gettyRPCClientConnPool) close() {
p.Lock()
connMap := p.connMap
p.connMap = nil
p.Unlock()
for _, connArray := range connMap {
for _, conn := range connArray {
conn.close()
}
}
}
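// getConn pops a pooled connection keyed by "addr@protocol", closing any whose
// age exceeds the pool ttl, and dials a new gettyRPCClientConn when none is left.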
func (p *gettyRPCClientConnPool) getConn(protocol, addr string) (*gettyRPCClientConn, error) {
var builder strings.Builder
builder.WriteString(addr)
builder.WriteString("@")
builder.WriteString(protocol)
key := builder.String()
p.Lock()
defer p.Unlock()
if p.connMap == nil {
return nil, errClientPoolClosed
}
connArray := p.connMap[key]
now := time.Now().Unix()
for len(connArray) > 0 {
conn := connArray[len(connArray)-1]
connArray = connArray[:len(connArray)-1]
p.connMap[key] = connArray
if d := now - conn.created; d > p.ttl {
conn.close() // -> pool.remove(c)
continue
}
return conn, nil
}
// create new conn
return newGettyRPCClientConn(p, protocol, addr)
}
func (p *gettyRPCClientConnPool) release(conn *gettyRPCClientConn, err error) {
if conn == nil || conn.created == 0 {
return
}
if err != nil {
conn.close()
return
}
var builder strings.Builder
builder.WriteString(conn.addr)
builder.WriteString("@")
builder.WriteString(conn.protocol)
key := builder.String()
p.Lock()
if p.connMap == nil {
p.Unlock()
return
}
connArray := p.connMap[key]
if len(connArray) >= p.size {
// the pool for @key is full: drop @conn. unlock explicitly before closing,
// because conn.close() -> pool.remove() takes the pool lock again, and a
// deferred Unlock here would unlock an already-unlocked mutex.
p.Unlock()
conn.close()
return
}
p.connMap[key] = append(connArray, conn)
p.Unlock()
}
func (p *gettyRPCClientConnPool) remove(conn *gettyRPCClientConn) {
if conn == nil || conn.created == 0 {
return
}
var builder strings.Builder
builder.WriteString(conn.addr)
builder.WriteString("@")
builder.WriteString(conn.protocol)
key := builder.String()
p.Lock()
defer p.Unlock()
if p.connMap == nil {
return
}
connArray := p.connMap[key]
if len(connArray) > 0 {
for idx, c := range connArray {
if conn == c {
p.connMap[key] = append(connArray[:idx], connArray[idx+1:]...)
break
}
}
}
}
syntax = "proto2";
package rpc;
import "github.com/gogo/protobuf/gogoproto/gogo.proto";
option (gogoproto.gostring_all) = true;
option (gogoproto.equal_all) = true;
option (gogoproto.verbose_equal_all) = true;
// option (gogoproto.goproto_stringer_all) = false;
// option (gogoproto.stringer_all) = true;
// option (gogoproto.populate_all) = true;
// option (gogoproto.testgen_all) = true;
// option (gogoproto.benchgen_all) = true;
option (gogoproto.marshaler_all) = true;
option (gogoproto.sizer_all) = true;
option (gogoproto.unmarshaler_all) = true;
option (gogoproto.goproto_getters_all) = false;
option (gogoproto.goproto_enum_prefix_all) = false;
//////////////////////////////////////////
// Request Header
//////////////////////////////////////////
enum CallType {
CT_UNKOWN = 0;
CT_OneWay = 1;
CT_TwoWay = 2;
CT_TwoWayNoReply = 3;
}
message GettyRPCRequestHeader {
optional string Service = 1 [(gogoproto.nullable) = false];
optional string Method = 2 [(gogoproto.nullable) = false];
optional CallType CallType = 3 [(gogoproto.nullable) = false];
}
message GettyRPCResponseHeader {
optional string Error = 1 [(gogoproto.nullable) = false];
}
#!/usr/bin/env bash
# ******************************************************
# DESC :
# AUTHOR : Alex Stocks
# VERSION : 1.0
# LICENCE : Apache License 2.0
# EMAIL : alexstocks@foxmail.com
# MOD : 2017-09-04 22:53
# FILE : pb.sh
# ******************************************************
# descriptor.proto
gopath=~/test/golang/lib/src/github.com/gogo/protobuf/protobuf
# If you are using any gogo.proto extensions you will need to specify the
# proto_path to include the descriptor.proto and gogo.proto.
# gogo.proto is located in github.com/gogo/protobuf/gogoproto
gogopath=~/test/golang/lib/src/
# protoc -I=$gopath:$gogopath:/Users/alex/test/golang/lib/src/github.com/AlexStocks/goext/database/redis/:./ --gogoslick_out=Mredis_meta.proto="github.com/AlexStocks/goext/database/redis":../app/ cluster_meta.proto
# protoc -I=$gopath:$gogopath:/Users/alex/test/golang/lib/src/github.com/AlexStocks/goext/database/redis/:./ --gogoslick_out=Mredis_meta.proto="github.com/AlexStocks/goext/database/redis":../app/ response.proto
# protoc -I=$gopath:$gogopath:./ --gogoslick_out=Mrole.proto="github.com/AlexStocks/goext/database/registry":./src/ service.proto
protoc -I=$gopath:$gogopath:./ --gogoslick_out=../ codec.proto
package rpc
import (
"bytes"
"reflect"
)
import (
"github.com/AlexStocks/getty"
log "github.com/AlexStocks/log4go"
jerrors "github.com/juju/errors"
)
////////////////////////////////////////////
// RpcServerPackageHandler
////////////////////////////////////////////
type RpcServerPackageHandler struct {
server *Server
}
func NewRpcServerPackageHandler(server *Server) *RpcServerPackageHandler {
return &RpcServerPackageHandler{
server: server,
}
}
func (p *RpcServerPackageHandler) Read(ss getty.Session, data []byte) (interface{}, int, error) {
pkg := &GettyPackage{
B: NewGettyRPCRequest(),
}
buf := bytes.NewBuffer(data)
length, err := pkg.Unmarshal(buf)
if err != nil {
if jerrors.Cause(err) == ErrNotEnoughStream {
return nil, 0, nil
}
return nil, 0, jerrors.Trace(err)
}
req := GettyRPCRequestPackage{
H: pkg.H,
header: pkg.B.GetHeader().(GettyRPCRequestHeader),
}
if req.H.Command == gettyCmdHbRequest {
return req, length, nil
}
// get service & method
req.service = p.server.serviceMap[req.header.Service]
if req.service != nil {
req.methodType = req.service.method[req.header.Method]
}
if req.service == nil {
return nil, 0, jerrors.Errorf("request service is nil")
}
if req.methodType == nil {
return nil, 0, jerrors.Errorf("request method is nil")
}
// get args
argIsValue := false
if req.methodType.ArgType.Kind() == reflect.Ptr {
req.argv = reflect.New(req.methodType.ArgType.Elem())
} else {
req.argv = reflect.New(req.methodType.ArgType)
argIsValue = true
}
codec := Codecs[req.H.CodecType]
if codec == nil {
return nil, 0, jerrors.Errorf("can not find codec for %d", req.H.CodecType)
}
err = codec.Decode(pkg.B.GetBody(), req.argv.Interface())
if err != nil {
return nil, 0, jerrors.Trace(err)
}
if argIsValue {
req.argv = req.argv.Elem()
}
// get reply
req.replyv = reflect.New(req.methodType.ReplyType.Elem())
return req, length, nil
}
func (p *RpcServerPackageHandler) Write(ss getty.Session, pkg interface{}) error {
resp, ok := pkg.(GettyPackage)
if !ok {
log.Error("illegal pkg:%+v\n", pkg)
return jerrors.New("invalid rpc response")
}
buf, err := resp.Marshal()
if err != nil {
log.Warn("binary.Write(resp{%#v}) = err{%#v}", resp, err)
return jerrors.Trace(err)
}
return jerrors.Trace(ss.WriteBytes(buf.Bytes()))
}
////////////////////////////////////////////
// RpcClientPackageHandler
////////////////////////////////////////////
type RpcClientPackageHandler struct {
}
func NewRpcClientPackageHandler() *RpcClientPackageHandler {
return &RpcClientPackageHandler{}
}
func (p *RpcClientPackageHandler) Read(ss getty.Session, data []byte) (interface{}, int, error) {
pkg := &GettyPackage{
B: NewGettyRPCResponse(),
}
buf := bytes.NewBuffer(data)
length, err := pkg.Unmarshal(buf)
if err != nil {
if jerrors.Cause(err) == ErrNotEnoughStream {
return nil, 0, nil
}
return nil, 0, jerrors.Trace(err)
}
resp := &GettyRPCResponsePackage{
H: pkg.H,
header: pkg.B.GetHeader().(GettyRPCResponseHeader),
}
if pkg.H.Command != gettyCmdHbResponse {
resp.body = pkg.B.GetBody()
}
return resp, length, nil
}
func (p *RpcClientPackageHandler) Write(ss getty.Session, pkg interface{}) error {
req, ok := pkg.(GettyPackage)
if !ok {
log.Error("illegal pkg:%+v\n", pkg)
return jerrors.New("invalid rpc request")
}
buf, err := req.Marshal()
if err != nil {
log.Warn("binary.Write(req{%#v}) = err{%#v}", req, jerrors.ErrorStack(err))
return jerrors.Trace(err)
}
return jerrors.Trace(ss.WriteBytes(buf.Bytes()))
}
package rpc
import (
"reflect"
"sync"
"unicode"
"unicode/utf8"
)
import (
log "github.com/AlexStocks/log4go"
)
var (
typeOfError = reflect.TypeOf((*error)(nil)).Elem()
)
type GettyRPCService interface {
Service() string // Service Interface
Version() string
}
type methodType struct {
sync.Mutex
method reflect.Method
ArgType reflect.Type
ReplyType reflect.Type
}
type service struct {
name string
rcvr reflect.Value
typ reflect.Type
method map[string]*methodType
}
// Is this an exported - upper case - name
func isExported(name string) bool {
rune, _ := utf8.DecodeRuneInString(name)
return unicode.IsUpper(rune)
}
// Is this type exported or a builtin?
func isExportedOrBuiltinType(t reflect.Type) bool {
for t.Kind() == reflect.Ptr {
t = t.Elem()
}
// PkgPath will be non-empty even for an exported type,
// so we need to check the type name as well.
return isExported(t.Name()) || t.PkgPath() == ""
}
// suitableMethods returns suitable Rpc methods of typ
func suitableMethods(typ reflect.Type) map[string]*methodType {
methods := make(map[string]*methodType)
for m := 0; m < typ.NumMethod(); m++ {
method := typ.Method(m)
mtype := method.Type
mname := method.Name
// Method must be exported.
if method.PkgPath != "" {
continue
}
// Method needs three ins: receiver, *args, *reply.
if mtype.NumIn() != 3 {
log.Warn("method %s has wrong number of ins %d which should be 3", mname, mtype.NumIn())
continue
}
// First arg need not be a pointer.
argType := mtype.In(1)
if !isExportedOrBuiltinType(argType) {
log.Error("method{%s} argument type not exported{%v}", mname, argType)
continue
}
// Second arg must be a pointer.
replyType := mtype.In(2)
if replyType.Kind() != reflect.Ptr {
log.Error("method{%s} reply type not a pointer{%v}", mname, replyType)
continue
}
// Reply type must be exported.
if !isExportedOrBuiltinType(replyType) {
log.Error("method{%s} reply type not exported{%v}", mname, replyType)
continue
}
// Method needs one out.
if mtype.NumOut() != 1 {
log.Error("method{%s} has wrong number of out parameters{%d}", mname, mtype.NumOut())
continue
}
// The return type of the method must be error.
if returnType := mtype.Out(0); returnType != typeOfError {
log.Error("method{%s}'s return type{%s} is not error", mname, returnType.String())
continue
}
methods[mname] = &methodType{method: method, ArgType: argType, ReplyType: replyType}
}
return methods
}
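// An illustrative service that suitableMethods accepts; EchoService, EchoReq and
// EchoRsp are hypothetical types, registered on the server side via
// server.Register(&EchoService{}):
//
//   type EchoService struct{}
//
//   func (s *EchoService) Service() string { return "EchoService" }
//   func (s *EchoService) Version() string { return "v1.0" }
//
//   // exported name, exported arg types, pointer reply, single error return
//   func (s *EchoService) Echo(req EchoReq, rsp *EchoRsp) error {
//       rsp.Msg = req.Msg
//       return nil
//   }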
package rpc
import (
"fmt"
"net"
"reflect"
)
import (
"github.com/AlexStocks/getty"
"github.com/AlexStocks/goext/net"
log "github.com/AlexStocks/log4go"
jerrors "github.com/juju/errors"
)
type Server struct {
conf ServerConfig
serviceMap map[string]*service
tcpServerList []getty.Server
}
var (
ErrIllegalConf = "illegal conf: "
)
func NewServer(conf *ServerConfig) (*Server, error) {
if err := conf.CheckValidity(); err != nil {
return nil, jerrors.Trace(err)
}
s := &Server{
serviceMap: make(map[string]*service),
conf: *conf,
}
return s, nil
}
func (s *Server) Register(rcvr GettyRPCService) error {
svc := &service{
typ: reflect.TypeOf(rcvr),
rcvr: reflect.ValueOf(rcvr),
name: reflect.Indirect(reflect.ValueOf(rcvr)).Type().Name(),
// Install the methods
method: suitableMethods(reflect.TypeOf(rcvr)),
}
if svc.name == "" {
s := "rpc.Register: no service name for type " + svc.typ.String()
log.Error(s)
return jerrors.New(s)
}
if !isExported(svc.name) {
s := "rpc.Register: type " + svc.name + " is not exported"
log.Error(s)
return jerrors.New(s)
}
if _, present := s.serviceMap[svc.name]; present {
return jerrors.New("rpc: service already defined: " + svc.name)
}
if len(svc.method) == 0 {
// To help the user, see if a pointer receiver would work.
method := suitableMethods(reflect.PtrTo(svc.typ))
str := "rpc.Register: type " + svc.name + " has no exported methods of suitable type"
if len(method) != 0 {
str = "rpc.Register: type " + svc.name + " has no exported methods of suitable type (" +
"hint: pass a pointer to value of that type)"
}
log.Error(str)
return jerrors.New(str)
}
s.serviceMap[svc.name] = svc
return nil
}
func (s *Server) newSession(session getty.Session) error {
var (
ok bool
tcpConn *net.TCPConn
)
if s.conf.GettySessionParam.CompressEncoding {
session.SetCompressType(getty.CompressZip)
}
if tcpConn, ok = session.Conn().(*net.TCPConn); !ok {
panic(fmt.Sprintf("%s, session.conn{%#v} is not tcp connection\n", session.Stat(), session.Conn()))
}
tcpConn.SetNoDelay(s.conf.GettySessionParam.TcpNoDelay)
tcpConn.SetKeepAlive(s.conf.GettySessionParam.TcpKeepAlive)
if s.conf.GettySessionParam.TcpKeepAlive {
tcpConn.SetKeepAlivePeriod(s.conf.GettySessionParam.keepAlivePeriod)
}
tcpConn.SetReadBuffer(s.conf.GettySessionParam.TcpRBufSize)
tcpConn.SetWriteBuffer(s.conf.GettySessionParam.TcpWBufSize)
session.SetName(s.conf.GettySessionParam.SessionName)
session.SetMaxMsgLen(s.conf.GettySessionParam.MaxMsgLen)
session.SetPkgHandler(NewRpcServerPackageHandler(s))
session.SetEventListener(NewRpcServerHandler(s.conf.SessionNumber, s.conf.sessionTimeout))
session.SetRQLen(s.conf.GettySessionParam.PkgRQSize)
session.SetWQLen(s.conf.GettySessionParam.PkgWQSize)
session.SetReadTimeout(s.conf.GettySessionParam.tcpReadTimeout)
session.SetWriteTimeout(s.conf.GettySessionParam.tcpWriteTimeout)
session.SetCronPeriod((int)(s.conf.sessionTimeout.Nanoseconds() / 1e6))
session.SetWaitTime(s.conf.GettySessionParam.waitTimeout)
log.Debug("app accepts new session:%s\n", session.Stat())
return nil
}
func (s *Server) Start() {
var (
addr string
portList []string
tcpServer getty.Server
)
portList = s.conf.Ports
if len(portList) == 0 {
panic("portList is nil")
}
for _, port := range portList {
addr = gxnet.HostAddress2(s.conf.Host, port)
tcpServer = getty.NewTCPServer(
getty.WithLocalAddress(addr),
)
tcpServer.RunEventLoop(s.newSession)
log.Debug("s bind addr{%s} ok!", addr)
s.tcpServerList = append(s.tcpServerList, tcpServer)
}
}
func (s *Server) Stop() {
list := s.tcpServerList
s.tcpServerList = nil
if list != nil {
for _, tcpServer := range list {
tcpServer.Close()
}
}
}
package rpc
import (
"math/rand"
"sync"
"time"
)
var (
seededIDGen = rand.New(rand.NewSource(time.Now().UnixNano()))
// The golang rand generators are *not* intrinsically thread-safe.
seededIDLock sync.Mutex
)
func randomID() uint64 {
seededIDLock.Lock()
defer seededIDLock.Unlock()
return uint64(seededIDGen.Int63())
}
......@@ -22,15 +22,16 @@ import (
)
import (
"github.com/AlexStocks/goext/net"
"github.com/AlexStocks/goext/sync"
"github.com/AlexStocks/goext/time"
log "github.com/AlexStocks/log4go"
"github.com/gorilla/websocket"
"github.com/pkg/errors"
jerrors "github.com/juju/errors"
)
var (
errSelfConnect = errors.New("connect self!")
errSelfConnect = jerrors.New("connect self!")
serverFastFailTimeout = gxtime.TimeSecondDuration(1)
)
......@@ -89,7 +90,7 @@ func NewWSServer(opts ...ServerOption) Server {
func NewWSSServer(opts ...ServerOption) Server {
s := newServer(WSS_SERVER, opts...)
if s.addr == "" || s.cert == "" || s.privateKey == "" || s.caCert == "" {
if s.addr == "" || s.cert == "" || s.privateKey == "" {
panic(fmt.Sprintf("@addr:%s, @cert:%s, @privateKey:%s, @caCert:%s",
s.addr, s.cert, s.privateKey, s.caCert))
}
......@@ -117,16 +118,15 @@ func (s *server) stop() {
if s.server != nil {
ctx, _ = context.WithTimeout(context.Background(), serverFastFailTimeout)
if err = s.server.Shutdown(ctx); err != nil {
// 如果下面内容输出为:server shutdown ctx: context deadline exceeded,
// 则说明有未处理完的active connections。
// if the log output is "shutdown ctx: context deadline exceeded", it means that
// there are still some active connections.
log.Error("server shutdown ctx:%s error:%s", ctx, err)
}
}
s.server = nil
s.lock.Unlock()
if s.streamListener != nil {
// 把streamListener.Close放在这里,既能防止多次关闭调用,
// 又能及时让Server因accept返回错误而从RunEventLoop退出
// closing streamListener here both prevents duplicate Close calls and lets the
// server exit RunEventLoop asap once accept returns an error.
s.streamListener.Close()
s.streamListener = nil
}
......@@ -158,7 +158,7 @@ func (s *server) listenTCP() error {
streamListener, err = net.Listen("tcp", s.addr)
if err != nil {
return errors.Wrapf(err, "net.Listen(tcp, addr:%s))", s.addr)
return jerrors.Annotatef(err, "net.Listen(tcp, addr:%s))", s.addr)
}
s.streamListener = streamListener
......@@ -175,11 +175,11 @@ func (s *server) listenUDP() error {
localAddr, err = net.ResolveUDPAddr("udp", s.addr)
if err != nil {
return errors.Wrapf(err, "net.ResolveUDPAddr(udp, addr:%s)", s.addr)
return jerrors.Annotatef(err, "net.ResolveUDPAddr(udp, addr:%s)", s.addr)
}
pktListener, err = net.ListenUDP("udp", localAddr)
if err != nil {
return errors.Wrapf(err, "net.ListenUDP((udp, localAddr:%#v)", localAddr)
return jerrors.Annotatef(err, "net.ListenUDP((udp, localAddr:%#v)", localAddr)
}
// if err = setUDPSocketOptions(pktListener); err != nil {
// return errors.Wrapf(err, "setUDPSocketOptions(pktListener:%#v)", pktListener)
......@@ -194,9 +194,9 @@ func (s *server) listenUDP() error {
func (s *server) listen() error {
switch s.endPointType {
case TCP_SERVER, WS_SERVER, WSS_SERVER:
return s.listenTCP()
return jerrors.Trace(s.listenTCP())
case UDP_ENDPOINT:
return s.listenUDP()
return jerrors.Trace(s.listenUDP())
}
return nil
......@@ -205,9 +205,9 @@ func (s *server) listen() error {
func (s *server) accept(newSession NewSessionCallback) (Session, error) {
conn, err := s.streamListener.Accept()
if err != nil {
return nil, err
return nil, jerrors.Trace(err)
}
if conn.RemoteAddr().String() == conn.LocalAddr().String() {
if gxnet.IsSameAddr(conn.RemoteAddr(), conn.LocalAddr()) {
log.Warn("conn.localAddr{%s} == conn.RemoteAddr", conn.LocalAddr().String(), conn.RemoteAddr().String())
return nil, errSelfConnect
}
......@@ -216,7 +216,7 @@ func (s *server) accept(newSession NewSessionCallback) (Session, error) {
err = newSession(ss)
if err != nil {
conn.Close()
return nil, err
return nil, jerrors.Trace(err)
}
return ss, nil
......@@ -237,7 +237,8 @@ func (s *server) runTcpEventLoop(newSession NewSessionCallback) {
return
}
if delay != 0 {
time.Sleep(delay)
// time.Sleep(delay)
<-wheel.After(delay)
}
client, err = s.accept(newSession)
if err != nil {
......@@ -252,7 +253,7 @@ func (s *server) runTcpEventLoop(newSession NewSessionCallback) {
}
continue
}
log.Warn("server{%s}.Accept() = err {%#v}", s.addr, err)
log.Warn("server{%s}.Accept() = err {%#v}", s.addr, jerrors.ErrorStack(err))
continue
}
delay = 0
......@@ -355,7 +356,7 @@ func (s *server) runWSEventLoop(newSession NewSessionCallback) {
s.lock.Unlock()
err = server.Serve(s.streamListener)
if err != nil {
log.Error("http.server.Serve(addr{%s}) = err{%s}", s.addr, err)
log.Error("http.server.Serve(addr{%s}) = err{%s}", s.addr, jerrors.ErrorStack(err))
// panic(err)
}
}()
......@@ -378,11 +379,12 @@ func (s *server) runWSSEventLoop(newSession NewSessionCallback) {
defer s.wg.Done()
if certificate, err = tls.LoadX509KeyPair(s.cert, s.privateKey); err != nil {
panic(fmt.Sprintf("tls.LoadX509KeyPair(cert{%s}, privateKey{%s}) = err{%s}", s.cert, s.privateKey, err))
panic(fmt.Sprintf("tls.LoadX509KeyPair(cert{%s}, privateKey{%s}) = err{%s}",
s.cert, s.privateKey, jerrors.ErrorStack(err)))
return
}
config = &tls.Config{
InsecureSkipVerify: true, // 不对对端的证书进行校验
InsecureSkipVerify: true, // do not verify peer cert
ClientAuth: tls.NoClientCert,
NextProtos: []string{"http/1.1"},
Certificates: []tls.Certificate{certificate},
......@@ -391,7 +393,7 @@ func (s *server) runWSSEventLoop(newSession NewSessionCallback) {
if s.caCert != "" {
certPem, err = ioutil.ReadFile(s.caCert)
if err != nil {
panic(fmt.Errorf("ioutil.ReadFile(certFile{%s}) = err{%s}", s.caCert, err))
panic(fmt.Errorf("ioutil.ReadFile(certFile{%s}) = err{%s}", s.caCert, jerrors.ErrorStack(err)))
}
certPool = x509.NewCertPool()
if ok := certPool.AppendCertsFromPEM(certPem); !ok {
......@@ -416,7 +418,7 @@ func (s *server) runWSSEventLoop(newSession NewSessionCallback) {
s.lock.Unlock()
err = server.Serve(tls.NewListener(s.streamListener, config))
if err != nil {
log.Error("http.server.Serve(addr{%s}) = err{%s}", s.addr, err)
log.Error("http.server.Serve(addr{%s}) = err{%s}", s.addr, jerrors.ErrorStack(err))
panic(err)
}
}()
......@@ -426,7 +428,7 @@ func (s *server) runWSSEventLoop(newSession NewSessionCallback) {
// @newSession: new connection callback
func (s *server) RunEventLoop(newSession NewSessionCallback) {
if err := s.listen(); err != nil {
panic(fmt.Errorf("server.listen() = error:%s", err))
panic(fmt.Errorf("server.listen() = error:%s", jerrors.ErrorStack(err)))
}
switch s.endPointType {
......
......@@ -10,9 +10,9 @@
package getty
const (
Version = "0.8.2"
DATE = "2018/03/17"
Version = "0.9.3"
DATE = "2018/08/07"
GETTY_MAJOR = 0
GETTY_MINOR = 8
GETTY_MINOR = 9
GETTY_BUILD = 2
)