Commit 1ca66977 authored by alexstocks's avatar alexstocks

add client

parent 45ae4ff3
...@@ -9,6 +9,14 @@ ...@@ -9,6 +9,14 @@
## develop history ## ## develop history ##
--- ---
- 2016/09/02
> 1 add session.go:(gettyConn)close and session.go:(Session)dispose
>
> 2 add client.go:Client
>
> 3 version: 0.3.00
- 2016/08/29 - 2016/08/29
> 1 rename reconnect to errFlag in function session.go:(Session)handlePackage > 1 rename reconnect to errFlag in function session.go:(Session)handlePackage
> >
......
/******************************************************
# DESC : getty client
# MAINTAINER : Alex Stocks
# LICENCE : Apache License 2.0
# EMAIL : alexstocks@foxmail.com
# MOD : 2016-09-01 21:32
# FILE : client.go
******************************************************/
package getty
import (
"net"
"sync"
"time"
)
import (
log "github.com/AlexStocks/log4go"
)
const (
defaultInterval = 3e9 // 3s
)
type empty struct{}
type Client struct {
// net
sync.Mutex
number int
interval time.Duration
addr string
newSession SessionCallback
sessionMap map[*Session]empty
sync.Once
done chan struct{}
wg sync.WaitGroup
}
// NewClient function builds a client.
// @connNum is connection number.
// @connInterval is reconnect sleep interval when getty fails to connect the server.
// @serverAddr is server address.
func NewClient(connNum int, connInterval time.Duration, serverAddr string) *Client {
if connNum < 0 {
connNum = 1
}
if connInterval < defaultInterval {
connInterval = defaultInterval
}
return &Client{
number: connNum,
interval: connInterval,
addr: serverAddr,
sessionMap: make(map[*Session]empty, connNum),
done: make(chan struct{}),
}
}
func (this *Client) dial() net.Conn {
var (
err error
conn net.Conn
)
for {
if this.IsClosed() {
return nil
}
conn, err = net.DialTimeout("tcp", this.addr, this.interval)
if err == nil {
return conn
}
log.Info("net.Connect(addr:%s, timeout:%v) = error{%v}", this.addr, err)
time.Sleep(this.interval)
continue
}
}
func (this *Client) getSessionNum() int {
var num int
this.Lock()
for s := range this.sessionMap {
if s.IsClosed() {
delete(this.sessionMap, s)
}
}
num = len(this.sessionMap)
this.Unlock()
return num
}
func (this *Client) connect() {
var (
err error
conn net.Conn
ss *Session
)
for {
conn = this.dial()
if conn == nil {
// client has been closed
break
}
ss = NewSession(conn)
err = this.newSession(ss)
if err == nil {
ss.RunEventloop()
this.Lock()
this.sessionMap[ss] = empty{}
this.Unlock()
break
}
conn.Close()
}
}
func (this *Client) RunEventLoop(newSession SessionCallback) {
this.Lock()
this.newSession = newSession
this.Unlock()
this.wg.Add(1)
go func() {
var num, max int
defer this.wg.Done()
this.Lock()
max = this.number
this.Unlock()
for {
if this.IsClosed() {
log.Warn("client{peer:%s} goroutine exit now.", this.addr)
break
}
num = this.getSessionNum()
if max <= num {
time.Sleep(this.interval)
continue
}
this.connect()
}
}()
}
func (this *Client) stop() {
select {
case <-this.done:
return
default:
this.Once.Do(func() {
close(this.done)
this.Lock()
for s := range this.sessionMap {
s.Close()
}
this.sessionMap = nil
this.Unlock()
})
}
}
func (this *Client) IsClosed() bool {
select {
case <-this.done:
return true
default:
return false
}
}
func (this *Client) Close() {
this.stop()
this.wg.Wait()
}
...@@ -45,5 +45,7 @@ type EventListener interface { ...@@ -45,5 +45,7 @@ type EventListener interface {
OnCron(*Session) OnCron(*Session)
// invoked when receive packge. Pls attention that do not handle long time logic processing in this func. // invoked when receive packge. Pls attention that do not handle long time logic processing in this func.
// Y'd better set the package's maximum length. If the message's length is greater than it, u should
// should return err and getty will close this connection soon.
OnMessage(*Session, interface{}) OnMessage(*Session, interface{})
} }
...@@ -20,7 +20,7 @@ import ( ...@@ -20,7 +20,7 @@ import (
log "github.com/AlexStocks/log4go" log "github.com/AlexStocks/log4go"
) )
type SessionCallback func(*Session) type SessionCallback func(*Session) error
type Server struct { type Server struct {
// net // net
...@@ -76,7 +76,7 @@ func (this *Server) Bind(network string, host string, port int) error { ...@@ -76,7 +76,7 @@ func (this *Server) Bind(network string, host string, port int) error {
return nil return nil
} }
func (this *Server) RunEventloop(newSessionCallback func(*Session)) { func (this *Server) RunEventloop(newSession SessionCallback) {
this.wg.Add(1) this.wg.Add(1)
go func() { go func() {
defer this.wg.Done() defer this.wg.Done()
...@@ -93,9 +93,9 @@ func (this *Server) RunEventloop(newSessionCallback func(*Session)) { ...@@ -93,9 +93,9 @@ func (this *Server) RunEventloop(newSessionCallback func(*Session)) {
if delay != 0 { if delay != 0 {
time.Sleep(delay) time.Sleep(delay)
} }
client, err = this.Accept(newSessionCallback) client, err = this.Accept(newSession)
if err != nil { if err != nil {
if ne, ok := err.(net.Error); ok && ne.Temporary() { if netErr, ok := err.(net.Error); ok && netErr.Temporary() {
if delay == 0 { if delay == 0 {
delay = 5 * time.Millisecond delay = 5 * time.Millisecond
} else { } else {
...@@ -119,14 +119,18 @@ func (this *Server) Listener() net.Listener { ...@@ -119,14 +119,18 @@ func (this *Server) Listener() net.Listener {
return this.listener return this.listener
} }
func (this *Server) Accept(newSessionCallback func(*Session)) (*Session, error) { func (this *Server) Accept(newSession SessionCallback) (*Session, error) {
conn, err := this.listener.Accept() conn, err := this.listener.Accept()
if err != nil { if err != nil {
return nil, err return nil, err
} }
session := NewSession(conn) session := NewSession(conn)
newSessionCallback(session) err = newSession(session)
if err != nil {
conn.Close()
return nil, err
}
return session, nil return session, nil
} }
......
...@@ -67,6 +67,14 @@ func (this *gettyConn) write(p []byte) (int, error) { ...@@ -67,6 +67,14 @@ func (this *gettyConn) write(p []byte) (int, error) {
return this.conn.Write(p) return this.conn.Write(p)
} }
func (this *gettyConn) close(waitSec int) {
// if tcpConn, ok := this.conn.(*net.TCPConn); ok {
// tcpConn.SetLinger(0)
// }
this.conn.(*net.TCPConn).SetLinger(waitSec)
this.conn.Close()
}
func (this *gettyConn) incReadPkgCount() { func (this *gettyConn) incReadPkgCount() {
atomic.AddUint32(&this.readPkgCount, 1) atomic.AddUint32(&this.readPkgCount, 1)
} }
...@@ -294,7 +302,10 @@ func (this *Session) WriteBytes(pkg []byte) error { ...@@ -294,7 +302,10 @@ func (this *Session) WriteBytes(pkg []byte) error {
} }
func (this *Session) dispose() { func (this *Session) dispose() {
this.conn.Close() this.lock.Lock()
// this.conn.Close()
this.gettyConn.close((int)((int64)(this.wait)))
this.lock.Unlock()
} }
func (this *Session) RunEventloop() { func (this *Session) RunEventloop() {
......
...@@ -10,5 +10,5 @@ ...@@ -10,5 +10,5 @@
package getty package getty
var ( var (
Version = "0.2.07" Version = "0.3.00"
) )
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment