Commit 05ebc1c5 authored by alexstocks's avatar alexstocks

add error return for OnOpen

parent 38afa856
...@@ -11,6 +11,13 @@ ...@@ -11,6 +11,13 @@
## develop history ## ## develop history ##
--- ---
- 2016/09/06
> 1 codec.go:(Reader)Read(*Session, []byte) (interface{}, error) -> codec.go:(Reader)Read(*Session, []byte) (interface{}, int, error)
>
> 2 codec.go:(EventListener)OnOpen(*Session) -> codec.go:(EventListener)OnOpen(*Session) error
>
> 3 version: 0.3.04
- 2016/09/05 - 2016/09/05
> 1 add 'errFlag = true' when got err in pkgHandler.Read loop clause in session.go:(Session)handlePackage > 1 add 'errFlag = true' when got err in pkgHandler.Read loop clause in session.go:(Session)handlePackage
> >
...@@ -41,7 +48,7 @@ ...@@ -41,7 +48,7 @@
- 2016/09/02 - 2016/09/02
> 1 add session.go:(gettyConn)close and session.go:(Session)dispose > 1 add session.go:(gettyConn)close and session.go:(Session)dispose
> >
> 2 modify return value of server.go:SessionCallback from void to err > 2 modify return value of server.go:NewSessionCallback from void to err
> >
> 3 add client.go:Client > 3 add client.go:Client
> >
......
...@@ -33,7 +33,7 @@ type Client struct { ...@@ -33,7 +33,7 @@ type Client struct {
number int number int
interval time.Duration interval time.Duration
addr string addr string
newSession SessionCallback newSession NewSessionCallback
sessionMap map[*Session]empty sessionMap map[*Session]empty
sync.Once sync.Once
...@@ -100,9 +100,9 @@ func (this *Client) sessionNum() int { ...@@ -100,9 +100,9 @@ func (this *Client) sessionNum() int {
func (this *Client) connect() { func (this *Client) connect() {
var ( var (
err error err error
conn net.Conn conn net.Conn
ss *Session session *Session
) )
for { for {
...@@ -111,12 +111,12 @@ func (this *Client) connect() { ...@@ -111,12 +111,12 @@ func (this *Client) connect() {
// client has been closed // client has been closed
break break
} }
ss = NewSession(conn) session = NewSession(conn)
err = this.newSession(ss) err = this.newSession(session)
if err == nil { if err == nil {
ss.RunEventloop() session.RunEventLoop()
this.Lock() this.Lock()
this.sessionMap[ss] = empty{} this.sessionMap[session] = empty{}
this.Unlock() this.Unlock()
break break
} }
...@@ -124,7 +124,7 @@ func (this *Client) connect() { ...@@ -124,7 +124,7 @@ func (this *Client) connect() {
} }
} }
func (this *Client) RunEventLoop(newSession SessionCallback) { func (this *Client) RunEventLoop(newSession NewSessionCallback) {
this.Lock() this.Lock()
this.newSession = newSession this.newSession = newSession
this.Unlock() this.Unlock()
......
...@@ -9,16 +9,17 @@ ...@@ -9,16 +9,17 @@
package getty package getty
// SessionCallback will be invoked when server accepts a new client connection or client connects to server successfully. // NewSessionCallback will be invoked when server accepts a new client connection or client connects to server successfully.
// if there are too many client connections or u do not want to connect a server again, u can return non-nil error. And // if there are too many client connections or u do not want to connect a server again, u can return non-nil error. And
// then getty will close the new session. // then getty will close the new session.
type SessionCallback func(*Session) error type NewSessionCallback func(*Session) error
// Reader is used to unmarshal a complete pkg from buffer // Reader is used to unmarshal a complete pkg from buffer
type Reader interface { type Reader interface {
// Parse pkg from buffer and if possible return a complete pkg // Parse pkg from buffer and if possible return a complete pkg
// If length of buf is not long enough, u should return {nil, nil} // If length of buf is not long enough, u should return {nil,0, nil}
Read(*Session, []byte) (interface{}, error) // The second return value is the length of the pkg.
Read(*Session, []byte) (interface{}, int, error)
} }
// Writer is used to marshal pkg and write to session // Writer is used to marshal pkg and write to session
...@@ -35,7 +36,7 @@ type ReadWriter interface { ...@@ -35,7 +36,7 @@ type ReadWriter interface {
// EventListener is used to process pkg that recved from remote session // EventListener is used to process pkg that recved from remote session
type EventListener interface { type EventListener interface {
// invoked when session opened // invoked when session opened
OnOpen(*Session) OnOpen(*Session) error
// invoked when session closed // invoked when session closed
OnClose(*Session) OnClose(*Session)
......
...@@ -77,7 +77,7 @@ func (this *Server) Listen(network string, addr string) error { ...@@ -77,7 +77,7 @@ func (this *Server) Listen(network string, addr string) error {
return nil return nil
} }
func (this *Server) RunEventloop(newSession SessionCallback) { func (this *Server) RunEventloop(newSession NewSessionCallback) {
this.wg.Add(1) this.wg.Add(1)
go func() { go func() {
defer this.wg.Done() defer this.wg.Done()
...@@ -111,7 +111,7 @@ func (this *Server) RunEventloop(newSession SessionCallback) { ...@@ -111,7 +111,7 @@ func (this *Server) RunEventloop(newSession SessionCallback) {
continue continue
} }
delay = 0 delay = 0
client.RunEventloop() client.RunEventLoop()
} }
}() }()
} }
...@@ -120,7 +120,7 @@ func (this *Server) Listener() net.Listener { ...@@ -120,7 +120,7 @@ func (this *Server) Listener() net.Listener {
return this.listener return this.listener
} }
func (this *Server) Accept(newSession SessionCallback) (*Session, error) { func (this *Server) Accept(newSession NewSessionCallback) (*Session, error) {
conn, err := this.listener.Accept() conn, err := this.listener.Accept()
if err != nil { if err != nil {
return nil, err return nil, err
......
...@@ -17,7 +17,9 @@ import ( ...@@ -17,7 +17,9 @@ import (
"sync" "sync"
"sync/atomic" "sync/atomic"
"time" "time"
)
import (
log "github.com/AlexStocks/log4go" log "github.com/AlexStocks/log4go"
) )
...@@ -30,6 +32,10 @@ const ( ...@@ -30,6 +32,10 @@ const (
outputFormat = "session %s, Read Count: %d, Write Count: %d, Read Pkg Count: %d, Write Pkg Count: %d" outputFormat = "session %s, Read Count: %d, Write Count: %d, Read Pkg Count: %d, Write Pkg Count: %d"
) )
var (
ErrInvalidConnection = errors.New("connection has been closed.")
)
///////////////////////////////////////// /////////////////////////////////////////
// getty connection // getty connection
///////////////////////////////////////// /////////////////////////////////////////
...@@ -52,12 +58,19 @@ func newGettyConn(conn net.Conn) gettyConn { ...@@ -52,12 +58,19 @@ func newGettyConn(conn net.Conn) gettyConn {
} }
func (this *gettyConn) read(p []byte) (int, error) { func (this *gettyConn) read(p []byte) (int, error) {
// if this.conn == nil {
// return 0, ErrInvalidConnection
// }
// atomic.AddUint32(&this.readCount, 1) // atomic.AddUint32(&this.readCount, 1)
atomic.AddUint32(&this.readCount, (uint32)(len(p))) atomic.AddUint32(&this.readCount, (uint32)(len(p)))
return this.conn.Read(p) return this.conn.Read(p)
} }
func (this *gettyConn) write(p []byte) (int, error) { func (this *gettyConn) write(p []byte) (int, error) {
// if this.conn == nil {
// return 0, ErrInvalidConnection
// }
// atomic.AddUint32(&this.writeCount, 1) // atomic.AddUint32(&this.writeCount, 1)
atomic.AddUint32(&this.writeCount, (uint32)(len(p))) atomic.AddUint32(&this.writeCount, (uint32)(len(p)))
return this.conn.Write(p) return this.conn.Write(p)
...@@ -67,8 +80,11 @@ func (this *gettyConn) close(waitSec int) { ...@@ -67,8 +80,11 @@ func (this *gettyConn) close(waitSec int) {
// if tcpConn, ok := this.conn.(*net.TCPConn); ok { // if tcpConn, ok := this.conn.(*net.TCPConn); ok {
// tcpConn.SetLinger(0) // tcpConn.SetLinger(0)
// } // }
this.conn.(*net.TCPConn).SetLinger(waitSec) if this.conn != nil {
this.conn.Close() this.conn.(*net.TCPConn).SetLinger(waitSec)
this.conn.Close()
this.conn = nil
}
} }
func (this *gettyConn) incReadPkgCount() { func (this *gettyConn) incReadPkgCount() {
...@@ -141,6 +157,7 @@ func (this *Session) Reset() { ...@@ -141,6 +157,7 @@ func (this *Session) Reset() {
func (this *Session) SetConn(conn net.Conn) { this.gettyConn = newGettyConn(conn) } func (this *Session) SetConn(conn net.Conn) { this.gettyConn = newGettyConn(conn) }
func (this *Session) Conn() net.Conn { return this.conn } func (this *Session) Conn() net.Conn { return this.conn }
// return the connect statistic data
func (this *Session) Stat() string { func (this *Session) Stat() string {
return fmt.Sprintf( return fmt.Sprintf(
outputFormat, outputFormat,
...@@ -152,6 +169,7 @@ func (this *Session) Stat() string { ...@@ -152,6 +169,7 @@ func (this *Session) Stat() string {
) )
} }
// check whether the session has been closed.
func (this *Session) IsClosed() bool { func (this *Session) IsClosed() bool {
select { select {
case <-this.done: case <-this.done:
...@@ -205,6 +223,7 @@ func (this *Session) SetWQLen(writeQLen int) { ...@@ -205,6 +223,7 @@ func (this *Session) SetWQLen(writeQLen int) {
this.lock.Unlock() this.lock.Unlock()
} }
// SetReadDeadline sets deadline for the future read calls.
func (this *Session) SetReadDeadline(rDeadline time.Duration) { func (this *Session) SetReadDeadline(rDeadline time.Duration) {
if rDeadline < 1 { if rDeadline < 1 {
panic("@rDeadline < 1") panic("@rDeadline < 1")
...@@ -218,6 +237,7 @@ func (this *Session) SetReadDeadline(rDeadline time.Duration) { ...@@ -218,6 +237,7 @@ func (this *Session) SetReadDeadline(rDeadline time.Duration) {
this.lock.Unlock() this.lock.Unlock()
} }
// SetWriteDeadlile sets deadline for the future read calls.
func (this *Session) SetWriteDeadline(wDeadline time.Duration) { func (this *Session) SetWriteDeadline(wDeadline time.Duration) {
if wDeadline < 1 { if wDeadline < 1 {
panic("@wDeadline < 1") panic("@wDeadline < 1")
...@@ -259,25 +279,6 @@ func (this *Session) RemoveAttribute(key string) { ...@@ -259,25 +279,6 @@ func (this *Session) RemoveAttribute(key string) {
this.lock.Unlock() this.lock.Unlock()
} }
func (this *Session) stop() {
select {
case <-this.done:
return
default:
// defeat "panic: close of closed channel"
this.once.Do(func() { close(this.done) })
}
}
func (this *Session) Close() error {
this.stop()
this.attrs = nil
log.Info("%s closed now, its current gr num %d",
this.sessionToken(), atomic.LoadInt32(&(this.grNum)))
return nil
}
func (this *Session) sessionToken() string { func (this *Session) sessionToken() string {
return fmt.Sprintf( return fmt.Sprintf(
"{%s, %d, %s <-> %s}", "{%s, %d, %s <-> %s}",
...@@ -317,27 +318,26 @@ func (this *Session) WriteBytes(pkg []byte) error { ...@@ -317,27 +318,26 @@ func (this *Session) WriteBytes(pkg []byte) error {
return err return err
} }
func (this *Session) dispose() { func (this *Session) RunEventLoop() {
this.lock.Lock()
// this.conn.Close()
this.gettyConn.close((int)((int64)(this.wait)))
this.lock.Unlock()
}
func (this *Session) RunEventloop() {
if this.rQ == nil || this.wQ == nil { if this.rQ == nil || this.wQ == nil {
errStr := fmt.Sprintf("Session{name:%s, rQ:%#v, wQ:%#v}", errStr := fmt.Sprintf("Session{name:%s, rQ:%#v, wQ:%#v}",
this.name, this.rQ, this.wQ) this.name, this.rQ, this.wQ)
log.Error(errStr) log.Error(errStr)
panic(errStr) panic(errStr)
} }
if this.conn == nil || this.pkgHandler == nil { if this.conn == nil || this.listener == nil || this.pkgHandler == nil {
errStr := fmt.Sprintf("Session{name:%s, conn:%#v, pkgHandler:%#v}", errStr := fmt.Sprintf("Session{name:%s, conn:%#v, listener:%#v, pkgHandler:%#v}",
this.name, this.conn, this.pkgHandler) this.name, this.conn, this.listener, this.pkgHandler)
log.Error(errStr) log.Error(errStr)
panic(errStr) panic(errStr)
} }
// call session opened
if err := this.listener.OnOpen(this); err != nil {
this.Close()
return
}
atomic.AddInt32(&(this.grNum), 2) atomic.AddInt32(&(this.grNum), 2)
go this.handleLoop() go this.handleLoop()
go this.handlePackage() go this.handlePackage()
...@@ -357,22 +357,13 @@ func (this *Session) handleLoop() { ...@@ -357,22 +357,13 @@ func (this *Session) handleLoop() {
if err := recover(); err != nil { if err := recover(); err != nil {
log.Error("%s, [session.handleLoop] err=%+v\n", this.sessionToken(), err) log.Error("%s, [session.handleLoop] err=%+v\n", this.sessionToken(), err)
} }
close(this.wQ)
close(this.rQ)
grNum = atomic.AddInt32(&(this.grNum), -1) grNum = atomic.AddInt32(&(this.grNum), -1)
if this.listener != nil { this.listener.OnClose(this)
this.listener.OnClose(this)
}
// real close connection, dispose会调用(conn)Close,
this.dispose()
log.Info("statistic{%s}, [session.handleLoop] goroutine exit now, left gr num %d", this.Stat(), grNum) log.Info("statistic{%s}, [session.handleLoop] goroutine exit now, left gr num %d", this.Stat(), grNum)
this.Close()
}() }()
// call session opened
if this.listener != nil {
this.listener.OnOpen(this)
}
ticker = time.NewTicker(this.peroid) ticker = time.NewTicker(this.peroid)
LOOP: LOOP:
for { for {
...@@ -381,10 +372,10 @@ LOOP: ...@@ -381,10 +372,10 @@ LOOP:
log.Info("%s, [session.handleLoop] got done signal ", this.Stat()) log.Info("%s, [session.handleLoop] got done signal ", this.Stat())
break LOOP break LOOP
case inPkg = <-this.rQ: case inPkg = <-this.rQ:
if this.listener != nil {
this.listener.OnMessage(this, inPkg) this.listener.OnMessage(this, inPkg)
this.incReadPkgCount() this.incReadPkgCount()
}
case outPkg = <-this.wQ: case outPkg = <-this.wQ:
if err = this.pkgHandler.Write(this, outPkg); err != nil { if err = this.pkgHandler.Write(this, outPkg); err != nil {
log.Error("%s, [session.handleLoop] = error{%+v}", this.sessionToken(), err) log.Error("%s, [session.handleLoop] = error{%+v}", this.sessionToken(), err)
...@@ -392,9 +383,7 @@ LOOP: ...@@ -392,9 +383,7 @@ LOOP:
} }
this.incWritePkgCount() this.incWritePkgCount()
case <-ticker.C: case <-ticker.C:
if this.listener != nil { this.listener.OnCron(this)
this.listener.OnCron(this)
}
} }
} }
ticker.Stop() ticker.Stop()
...@@ -418,10 +407,10 @@ LAST: ...@@ -418,10 +407,10 @@ LAST:
} }
this.incWritePkgCount() this.incWritePkgCount()
case inPkg = <-this.rQ: case inPkg = <-this.rQ:
if this.listener != nil {
this.listener.OnMessage(this, inPkg) this.listener.OnMessage(this, inPkg)
this.incReadPkgCount() this.incReadPkgCount()
}
default: default:
log.Info("%s, [session.handleLoop] default", this.sessionToken()) log.Info("%s, [session.handleLoop] default", this.sessionToken())
break LAST break LAST
...@@ -452,7 +441,7 @@ func (this *Session) handlePackage() { ...@@ -452,7 +441,7 @@ func (this *Session) handlePackage() {
close(this.readerDone) close(this.readerDone)
grNum = atomic.AddInt32(&(this.grNum), -1) grNum = atomic.AddInt32(&(this.grNum), -1)
log.Info("%s, [session.handlePackage] gr will exit now, left gr num %d", this.sessionToken(), grNum) log.Info("%s, [session.handlePackage] gr will exit now, left gr num %d", this.sessionToken(), grNum)
if errFlag && this.listener != nil { if errFlag {
log.Info("%s, [session.handlePackage] errFlag", this.sessionToken()) log.Info("%s, [session.handlePackage] errFlag", this.sessionToken())
this.listener.OnError(this, err) this.listener.OnError(this, err)
} }
...@@ -492,7 +481,7 @@ func (this *Session) handlePackage() { ...@@ -492,7 +481,7 @@ func (this *Session) handlePackage() {
break break
} }
// pkg, err = this.pkgHandler.Read(this, pktBuf) // pkg, err = this.pkgHandler.Read(this, pktBuf)
pkg, err = this.pkgHandler.Read(this, pktBuf.Bytes()) pkg, len, err = this.pkgHandler.Read(this, pktBuf.Bytes())
if err != nil { if err != nil {
log.Info("%s, [session.pkgHandler.Read] = error{%+v}", this.sessionToken(), err) log.Info("%s, [session.pkgHandler.Read] = error{%+v}", this.sessionToken(), err)
errFlag = true errFlag = true
...@@ -503,9 +492,44 @@ func (this *Session) handlePackage() { ...@@ -503,9 +492,44 @@ func (this *Session) handlePackage() {
break break
} }
this.rQ <- pkg this.rQ <- pkg
pktBuf.Next(len)
} }
if exit { if exit {
break break
} }
} }
} }
func (this *Session) stop() {
select {
case <-this.done:
return
default:
this.once.Do(func() { close(this.done) })
}
}
// this function will be invoked by NewSessionCallback(if return error is not nil) or (Session)handleLoop automatically.
// It is goroutine-safe to be invoked many times.
func (this *Session) Close() error {
this.stop()
log.Info("%s closed now, its current gr num %d",
this.sessionToken(), atomic.LoadInt32(&(this.grNum)))
this.lock.Lock()
if this.attrs != nil {
this.attrs = nil
select {
case <-this.readerDone:
default:
close(this.readerDone)
}
close(this.wQ)
this.wQ = nil
close(this.rQ)
this.rQ = nil
this.gettyConn.close((int)((int64)(this.wait)))
}
this.lock.Unlock()
return nil
}
...@@ -10,8 +10,6 @@ ...@@ -10,8 +10,6 @@
package getty package getty
import ( import (
"bytes"
"encoding/binary"
"net" "net"
"strconv" "strconv"
) )
...@@ -20,23 +18,3 @@ import ( ...@@ -20,23 +18,3 @@ import (
func HostAddress(host string, port int) string { func HostAddress(host string, port int) string {
return net.JoinHostPort(host, strconv.Itoa(port)) return net.JoinHostPort(host, strconv.Itoa(port))
} }
////////////////////////////////////////
// enc/dec
////////////////////////////////////////
func Int2Bytes(x int32) []byte {
var buf = bytes.NewBuffer([]byte{})
binary.Write(buf, binary.BigEndian, x)
return buf.Bytes()
}
func Bytes2Int(b []byte) int32 {
var (
x int32
buf *bytes.Buffer
)
buf = bytes.NewBuffer(b)
binary.Read(buf, binary.BigEndian, &x)
return x
}
...@@ -10,5 +10,5 @@ ...@@ -10,5 +10,5 @@
package getty package getty
var ( var (
Version = "0.3.03" Version = "0.3.04"
) )
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment