Commit 23196e55 authored by alexstocks's avatar alexstocks

network lib in golang

parent f938af14
# getty # getty #
*a netty like asynchronous network I/O library* ---
*a netty like asynchronous network I/O library*
## introdction ##
---
> DESC : a asynchronous network I/O library in golang
LICENCE : Apache License 2.0
AUTHOR : https://github.com/sanbit
MAINTAINER : Alex Stocks
EMAIL : alexstocks@foxmail.com
## develop history ##
---
- 2016/08/22
> rename (Session)OnIdle to (Session)OnCron
> rewrite server.go: add Server{done, wg}
> add utils.go
> version: 0.2.03
- 2016/08/21
> add name for Session
> add OnError for Codec
- 2016/08/18
> delete last clause of handleRead
> add reqQ handle case in last clause of handleLoop
> add conditon check in (*Session)RunEventLoop()
> version: 0.2.02
- 2016/08/16
> rename all structs
> add getty connection
> rewrite (Session)handleRead & (Session)handleEventLoop
> version: 0.2.01
/******************************************************
# DESC : codec interface
# MAINTAINER : Alex Stocks
# LICENCE : Apache License 2.0
# EMAIL : alexstocks@foxmail.com
# MOD : 2016-08-17 11:20
# FILE : codec.go
******************************************************/
package getty
import (
"bytes"
)
// Reader is used to unmarshal a complete pkg from buffer
type Reader interface {
// Parse pkg from buffer and if possible return a complete pkg
Read(*Session, *bytes.Buffer) (interface{}, error)
}
// Writer is used to marshal pkg and write to session
type Writer interface {
Write(*Session, interface{}) error
}
// packet handler interface
type ReadWriter interface {
Reader
Writer
}
// EventListener is used to process pkg that recved from remote session
type EventListener interface {
// invoked when session opened
OnOpen(*Session)
// invoked when session closed
OnClose(*Session)
// invoked when got error
OnError(*Session, error)
// invoked periodically, its period can be set by (Session)SetCronPeriod
OnCron(*Session)
// invoked when receive packge. Pls attention that do not handle long time logic processing in this func.
OnMessage(*Session, interface{})
}
/******************************************************
# DESC : getty server
# MAINTAINER : Alex Stocks
# LICENCE : Apache License 2.0
# EMAIL : alexstocks@foxmail.com
# MOD : 2016-08-17 11:21
# FILE : server.go
******************************************************/
package getty
import (
"errors"
"net"
"sync"
"time"
)
import (
log "github.com/AlexStocks/log4go"
)
type SessionCallback func(*Session)
type Server struct {
// net
host string
port int
listener net.Listener
sync.Once
done chan struct{}
wg sync.WaitGroup
}
func NewServer() *Server {
return &Server{done: make(chan struct{})}
}
func (this *Server) stop() {
select {
case <-this.done:
return
default:
close(this.done)
this.Once.Do(func() {
// 把listener.Close放在这里,既能防止多次关闭调用,
// 又能及时让Server因accept返回错误而从RunEventloop退出
this.listener.Close()
})
}
}
func (this *Server) IsClosed() bool {
select {
case <-this.done:
return true
default:
return false
}
return false
}
func (this *Server) Bind(network string, host string, port int) error {
if port <= 0 {
return errors.New("port<=0 illegal")
}
this.host = host
this.port = port
listener, err := net.Listen(network, HostAddress(host, port))
if err != nil {
return err
}
this.listener = listener
return nil
}
func (this *Server) RunEventloop(newSessionCallback func(*Session)) {
this.wg.Add(1)
go func() {
defer this.wg.Done()
var (
err error
client *Session
delay time.Duration
)
for {
if this.IsClosed() {
log.Warn("Server{%s:%d} stop acceptting client connect request.", this.host, this.port)
return
}
if delay != 0 {
time.Sleep(delay)
}
client, err = this.Accept(newSessionCallback)
if err != nil {
if ne, ok := err.(net.Error); ok && ne.Temporary() {
if delay == 0 {
delay = 5 * time.Millisecond
} else {
delay *= 2
}
if max := 1 * time.Second; delay > max {
delay = max
}
continue
}
log.Info("Server{%s:%d}.Accept() = err {%+v}", this.host, this.port, err)
continue
}
delay = 0
client.RunEventloop()
}
}()
}
func (this *Server) Listener() net.Listener {
return this.listener
}
func (this *Server) Accept(newSessionCallback func(*Session)) (*Session, error) {
conn, err := this.listener.Accept()
if err != nil {
return nil, err
}
session := NewSession(conn)
newSessionCallback(session)
return session, nil
}
func (this *Server) Close() {
this.stop()
this.wg.Wait()
}
/******************************************************
# DESC : session
# MAINTAINER : Alex Stocks
# LICENCE : Apache License 2.0
# EMAIL : alexstocks@foxmail.com
# MOD : 2016-08-17 11:21
# FILE : session.go
******************************************************/
package getty
import (
"bytes"
"errors"
"fmt"
"net"
"sync"
"sync/atomic"
"time"
)
import (
log "github.com/AlexStocks/log4go"
)
const (
maxReadBufLen = 4 * 1024
netIOTimeout = 100e6 // 100ms
cronPeriod = 60 * 1e9 // 1 minute
pendingDuration = 3e9
defaultSessionName = "Session"
outputFormat = "%s:%d:%s, Read Count: %d, Write Count: %d, Read Pkg Count: %d, Write Pkg Count: %d"
)
var (
connId uint32 = 0
ErrSessionClosed = errors.New("Session Already Closed")
ErrSessionBlocked = errors.New("Session full blocked")
)
/////////////////////////////////////////
// getty connection
/////////////////////////////////////////
type gettyConn struct {
conn net.Conn
Id uint32
readCount uint32 // read() count
writeCount uint32 // write() count
readPkgCount uint32 // send pkg count
writePkgCount uint32 // recv pkg count
}
func newGettyConn(conn net.Conn) *gettyConn {
return &gettyConn{conn: conn, Id: atomic.AddUint32(&connId, 1)}
}
func (this *gettyConn) read(p []byte) (int, error) {
atomic.AddUint32(&this.readCount, 1)
return this.conn.Read(p)
}
func (this *gettyConn) write(p []byte) (int, error) {
atomic.AddUint32(&this.writeCount, 1)
return this.conn.Write(p)
}
func (this *gettyConn) incReadPkgCount() {
atomic.AddUint32(&this.readPkgCount, 1)
}
func (this *gettyConn) incWritePkgCount() {
atomic.AddUint32(&this.writePkgCount, 1)
}
/////////////////////////////////////////
// session
/////////////////////////////////////////
// getty base session
type Session struct {
name string
// net read write
*gettyConn
pkgHandler ReadWriter
listener EventListener
done chan struct{}
readerDone chan struct{} // end reader
cronPeriod time.Duration
readDeadline time.Duration
writeDeadline time.Duration
closeWait time.Duration
reqQ chan interface{}
rspQ chan interface{}
// attribute
attrs map[string]interface{}
// goroutines sync
wg sync.WaitGroup
lock sync.RWMutex
}
func NewSession(conn net.Conn) *Session {
return &Session{
name: defaultSessionName,
gettyConn: newGettyConn(conn),
done: make(chan struct{}),
readerDone: make(chan struct{}),
cronPeriod: cronPeriod,
readDeadline: netIOTimeout,
writeDeadline: netIOTimeout,
closeWait: pendingDuration,
attrs: make(map[string]interface{}),
}
}
func (this *Session) Conn() net.Conn { return this.conn }
func (this *Session) Stat() string {
return fmt.Sprintf(
outputFormat,
this.name,
this.Id,
this.conn.RemoteAddr().String(),
atomic.LoadUint32(&(this.readCount)),
atomic.LoadUint32(&(this.writeCount)),
atomic.LoadUint32(&(this.readPkgCount)),
atomic.LoadUint32(&(this.writePkgCount)),
)
}
func (this *Session) IsClosed() bool {
select {
case <-this.done:
return true
default:
return false
}
return false
}
func (this *Session) SetName(name string) { this.name = name }
func (this *Session) SetEventListener(listener EventListener) {
this.listener = listener
}
// set package handler
func (this *Session) SetPkgHandler(handler ReadWriter) {
this.pkgHandler = handler
}
// period is in millisecond
func (this *Session) SetCronPeriod(period int) {
if period < 1 {
panic("@period < 1")
}
this.lock.Lock()
this.cronPeriod = time.Duration(period) * time.Millisecond
this.lock.Unlock()
}
func (this *Session) SetReadChanLen(readChanLen int) {
if readChanLen < 1 {
panic("@readChanLen < 1")
}
this.lock.Lock()
this.reqQ = make(chan interface{}, readChanLen)
this.lock.Unlock()
}
func (this *Session) SetWriteChanLen(writeChanLen int) {
if writeChanLen < 1 {
panic("@writeChanLen < 1")
}
this.lock.Lock()
this.rspQ = make(chan interface{}, writeChanLen)
this.lock.Unlock()
}
func (this *Session) SetReadDeadline(readDeadline time.Duration) {
if readDeadline < 1 {
panic("@readDeadline < 1")
}
this.lock.Lock()
this.readDeadline = readDeadline
if this.writeDeadline == 0 {
this.writeDeadline = readDeadline
}
this.lock.Unlock()
}
func (this *Session) SetWriteDeadline(writeDeadline time.Duration) {
if writeDeadline < 1 {
panic("@writeDeadline < 1")
}
this.lock.Lock()
this.writeDeadline = writeDeadline
this.lock.Unlock()
}
func (this *Session) SetCloseWait(closeWait time.Duration) {
if closeWait < 1 {
panic("@closeWait < 1")
}
this.lock.Lock()
this.closeWait = closeWait
this.lock.Unlock()
}
func (this *Session) GetAttribute(key string) interface{} {
var ret interface{}
this.lock.RLock()
ret = this.attrs[key]
this.lock.RUnlock()
return ret
}
func (this *Session) SetAttribute(key string, value interface{}) {
this.lock.Lock()
this.attrs[key] = value
this.lock.Unlock()
}
func (this *Session) RemoveAttribute(key string) {
this.lock.Lock()
delete(this.attrs, key)
this.lock.Unlock()
}
func (this *Session) stop() {
select {
case <-this.done:
return
default:
close(this.done)
}
}
func (this *Session) Close() {
this.stop()
this.wg.Wait()
}
// Queued write, for handler
func (this *Session) WritePkg(pkg interface{}) error {
if this.IsClosed() {
return ErrSessionClosed
}
defer func() {
if err := recover(); err != nil {
log.Error("%s:%d:%s [session.WritePkg] err=%+v\n",
this.name, this.Id, this.conn.RemoteAddr().String(), err)
}
}()
select {
case this.rspQ <- pkg:
break // for possible gen a new pkg
default:
return ErrSessionBlocked
}
return nil
}
// for codecs
func (this *Session) WriteBytes(pkg []byte) error {
this.conn.SetWriteDeadline(time.Now().Add(this.readDeadline))
_, err := this.write(pkg)
return err
}
func (this *Session) dispose() {
this.conn.Close()
}
func (this *Session) RunEventloop() {
if this.reqQ == nil || this.rspQ == nil {
errStr := fmt.Sprintf("Session{name:%s, reqQ:%#v, rspQ:%#v}",
this.name, this.reqQ, this.rspQ)
log.Error(errStr)
panic(errStr)
}
if this.conn == nil || this.pkgHandler == nil {
errStr := fmt.Sprintf("Session{name:%s, conn:%#v, pkgHandler:%#v}",
this.name, this.conn, this.pkgHandler)
log.Error(errStr)
panic(errStr)
}
this.wg.Add(1)
go this.handleLoop()
this.wg.Add(1)
go this.handlePackage()
}
func (this *Session) handleLoop() {
var (
err error
start time.Time
ticker *time.Ticker
reqPkg interface{}
rspPkg interface{}
)
defer func() {
if err := recover(); err != nil {
log.Error("%s:%d:%s [session.handleLoop] err=%+v\n",
this.name, this.Id, this.conn.RemoteAddr().String(), err)
}
close(this.rspQ)
close(this.reqQ)
this.wg.Done()
if this.listener != nil {
this.listener.OnClose(this)
}
// real close connection, dispose会调用(Session)Close,(Session)Close中的wg.Wait多次调用并不会引起任何问题
this.dispose()
log.Info("%s:%d:%s [session.handleLoop] goroutine end, statistic{%s}",
this.name, this.Id, this.conn.RemoteAddr().String(), this.Stat())
}()
// call session opened
if this.listener != nil {
this.listener.OnOpen(this)
}
ticker = time.NewTicker(this.cronPeriod)
LOOP:
for {
select {
case <-this.done:
log.Info("%s:%d:%s [session.handleLoop] got done signal ",
this.name, this.Id, this.conn.RemoteAddr().String())
break LOOP
case reqPkg = <-this.reqQ:
if this.listener != nil {
this.incReadPkgCount()
this.listener.OnMessage(this, reqPkg)
}
case rspPkg = <-this.rspQ:
if err = this.pkgHandler.Write(this, rspPkg); err != nil {
log.Error("%s:%d:%s [session.handleLoop] = error{%+v}",
this.name, this.Id, this.conn.RemoteAddr().String(), err)
break LOOP
}
this.incWritePkgCount()
case <-ticker.C:
if this.listener != nil {
this.listener.OnCron(this)
}
}
}
ticker.Stop()
this.stop()
// wait for reader goroutine closed
<-this.readerDone
// process pending pkg
start = time.Now()
LAST:
for {
if time.Since(start).Nanoseconds() > this.closeWait.Nanoseconds() {
break
}
select {
case rspPkg = <-this.rspQ:
if err = this.pkgHandler.Write(this, rspPkg); err != nil {
break LAST
}
this.incWritePkgCount()
case reqPkg = <-this.reqQ:
if this.listener != nil {
this.incReadPkgCount()
this.listener.OnMessage(this, reqPkg)
}
default:
log.Info("%s:%d:%s [session.handleLoop] default", this.name, this.Id, this.conn.RemoteAddr().String())
break LAST
}
}
}
// get package from tcp stream(packet)
func (this *Session) handlePackage() {
var (
err error
nerr net.Error
ok bool
exit bool
len int
buf []byte
pktBuf *bytes.Buffer
pkg interface{}
)
defer func() {
if err := recover(); err != nil {
log.Error("%s:%d:%s [session.handlePackage] = err{%+v}",
this.name, this.Id, this.conn.RemoteAddr().String(), err)
}
this.wg.Done()
this.stop()
close(this.readerDone)
log.Info("%s:%d:%s [session.handlePackage] goroutine exit......",
this.name, this.Id, this.conn.RemoteAddr().String())
}()
buf = make([]byte, maxReadBufLen)
pktBuf = new(bytes.Buffer)
for {
if this.IsClosed() {
exit = true
}
for {
if exit {
break // 退出前不再读取任何packet,跳到下个for-loop处理完pktBuf中的stream
}
this.conn.SetReadDeadline(time.Now().Add(this.readDeadline))
len, err = this.read(buf)
if err != nil {
log.Warn("%s:%d:%s [session.conn.read] = error{%s}",
this.name, this.Id, this.conn.RemoteAddr().String(), err.Error())
if nerr, ok = err.(net.Error); ok && nerr.Timeout() {
break
}
// 遇到网络错误的时候,handlePackage能够及时退出,但是handleLoop的第一个for-select因为要处理(Codec)OnMessage
// 导致程序不能及时退出,此处添加(Codec)OnError调用以及时通知getty调用者
// AS, 2016/08/21
if this.listener != nil {
this.listener.OnError(this, nerr)
}
exit = true
}
break
}
if 0 < len {
pktBuf.Write(buf[:len])
}
for {
if pktBuf.Len() <= 0 {
break
}
pkg, err = this.pkgHandler.Read(this, pktBuf)
if err != nil || pkg == nil {
log.Info("%s:%d:%s [session.pkgHandler.Read] = pkg{%#v}, error{%+v}",
this.name, this.Id, this.conn.RemoteAddr().String(), pkg, err)
exit = true
break
}
this.reqQ <- pkg
}
if exit {
break
}
}
}
/******************************************************
# DESC : getty utility
# AUTHOR : Alex Stocks
# LICENCE : Apache License 2.0
# EMAIL : alexstocks@foxmail.com
# MOD : 2016-08-22 17:44
# FILE : utils.go
******************************************************/
package getty
import (
"bytes"
"encoding/binary"
"net"
"strconv"
"time"
)
func HostAddress(host string, port int) string {
return net.JoinHostPort(host, strconv.Itoa(port))
}
func dial(addr string) (net.Conn, error) {
return net.Dial("tcp", addr)
}
func dialTimeout(addr string, timeout time.Duration) (net.Conn, error) {
return net.DialTimeout("tcp", addr, timeout)
}
////////////////////////////////////////
// enc/dec
////////////////////////////////////////
func Int2Bytes(x int32) []byte {
var buf = bytes.NewBuffer([]byte{})
binary.Write(buf, binary.BigEndian, x)
return buf.Bytes()
}
func Bytes2Int(b []byte) int32 {
var (
x int32
buf *bytes.Buffer
)
buf = bytes.NewBuffer(b)
binary.Read(buf, binary.BigEndian, &x)
return x
}
/******************************************************
# DESC : getty version
# MAINTAINER : Alex Stocks
# LICENCE : Apache License 2.0
# EMAIL : alexstocks@foxmail.com
# MOD : 2016-08-17 11:23
# FILE : version.go
******************************************************/
package getty
var (
Version = "0.2.03"
)
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment