Commit 8abb0aa7 authored by AlexStocks's avatar AlexStocks

add Session interface

parent f7505d10
......@@ -11,6 +11,11 @@
## develop history ##
---
- 2017/02/03
> 1 Session struct -> session struct and add Session interface
>
> 2 version: 0.7.0
- 2016/11/19
> 1 add conn.go:(*gettyWSConn) setCompressType to add zip compress feature for ws connection
>
......
......@@ -21,6 +21,7 @@ import (
)
import (
"github.com/AlexStocks/goext/sync"
log "github.com/AlexStocks/log4go"
"github.com/gorilla/websocket"
)
......@@ -42,10 +43,10 @@ type Client struct {
interval time.Duration
addr string
newSession NewSessionCallback
sessionMap map[*Session]empty
sessionMap map[Session]gxsync.Empty
sync.Once
done chan empty
done chan gxsync.Empty
wg sync.WaitGroup
// for wss client
......@@ -68,8 +69,8 @@ func NewClient(connNum int, connInterval time.Duration, serverAddr string) *Clie
number: connNum,
interval: connInterval,
addr: serverAddr,
sessionMap: make(map[*Session]empty, connNum),
done: make(chan empty),
sessionMap: make(map[Session]gxsync.Empty, connNum),
done: make(chan gxsync.Empty),
}
}
......@@ -90,13 +91,13 @@ func NewWSSClient(connNum int, connInterval time.Duration, serverAddr string, ce
number: connNum,
interval: connInterval,
addr: serverAddr,
sessionMap: make(map[*Session]empty, connNum),
done: make(chan empty),
sessionMap: make(map[Session]gxsync.Empty, connNum),
done: make(chan gxsync.Empty),
certFile: cert,
}
}
func (this *Client) dialTCP() *Session {
func (this *Client) dialTCP() Session {
var (
err error
conn net.Conn
......@@ -120,12 +121,12 @@ func (this *Client) dialTCP() *Session {
}
}
func (this *Client) dialWS() *Session {
func (this *Client) dialWS() Session {
var (
err error
dialer websocket.Dialer
conn *websocket.Conn
session *Session
session Session
)
dialer.EnableCompression = true
......@@ -139,8 +140,8 @@ func (this *Client) dialWS() *Session {
}
if err == nil {
session = NewWSSession(conn)
if session.maxMsgLen > 0 {
conn.SetReadLimit(int64(session.maxMsgLen))
if session.(*session).maxMsgLen > 0 {
conn.SetReadLimit(int64(session.(*session).maxMsgLen))
}
return session
......@@ -152,14 +153,14 @@ func (this *Client) dialWS() *Session {
}
}
func (this *Client) dialWSS() *Session {
func (this *Client) dialWSS() Session {
var (
err error
certPem []byte
certPool *x509.CertPool
dialer websocket.Dialer
conn *websocket.Conn
session *Session
session Session
)
dialer.EnableCompression = true
......@@ -184,8 +185,8 @@ func (this *Client) dialWSS() *Session {
}
if err == nil {
session = NewWSSession(conn)
if session.maxMsgLen > 0 {
conn.SetReadLimit(int64(session.maxMsgLen))
if session.(*session).maxMsgLen > 0 {
conn.SetReadLimit(int64(session.(*session).maxMsgLen))
}
return session
......@@ -197,7 +198,7 @@ func (this *Client) dialWSS() *Session {
}
}
func (this *Client) dial() *Session {
func (this *Client) dial() Session {
if strings.HasPrefix(this.addr, "ws") {
return this.dialWS()
} else if strings.HasPrefix(this.addr, "wss") {
......@@ -225,7 +226,7 @@ func (this *Client) sessionNum() int {
func (this *Client) connect() {
var (
err error
session *Session
session Session
)
for {
......@@ -237,9 +238,9 @@ func (this *Client) connect() {
err = this.newSession(session)
if err == nil {
// session.RunEventLoop()
session.run()
session.(*session).run()
this.Lock()
this.sessionMap[session] = empty{}
this.sessionMap[session] = gxsync.Empty{}
this.Unlock()
break
}
......
......@@ -12,19 +12,19 @@ package getty
// NewSessionCallback will be invoked when server accepts a new client connection or client connects to server successfully.
// if there are too many client connections or u do not want to connect a server again, u can return non-nil error. And
// then getty will close the new session.
type NewSessionCallback func(*Session) error
type NewSessionCallback func(Session) error
// Reader is used to unmarshal a complete pkg from buffer
type Reader interface {
// Parse tcp pkg from buffer and if possible return a complete pkg
// If length of buf is not long enough, u should return {nil,0, nil}
// The second return value is the length of the pkg.
Read(*Session, []byte) (interface{}, int, error)
Read(Session, []byte) (interface{}, int, error)
}
// Writer is used to marshal pkg and write to session
type Writer interface {
Write(*Session, interface{}) error
Write(Session, interface{}) error
}
// tcp package handler interface
......@@ -37,19 +37,19 @@ type ReadWriter interface {
type EventListener interface {
// invoked when session opened
// If the return error is not nil, @Session will be closed.
OnOpen(*Session) error
OnOpen(Session) error
// invoked when session closed.
OnClose(*Session)
OnClose(Session)
// invoked when got error.
OnError(*Session, error)
OnError(Session, error)
// invoked periodically, its period can be set by (Session)SetCronPeriod
OnCron(*Session)
OnCron(Session)
// invoked when receive packge. Pls attention that do not handle long time logic processing in this func.
// Y'd better set the package's maximum length. If the message's length is greater than it, u should
// should return err in Reader{Read} and getty will close this connection soon.
OnMessage(*Session, interface{})
OnMessage(Session, interface{})
}
......@@ -48,20 +48,24 @@ const (
// connection interfacke
/////////////////////////////////////////
type iConn interface {
id() uint32
setCompressType(t CompressType)
localAddr() string
remoteAddr() string
type Connection interface {
ID() uint32
SetCompressType(t CompressType)
LocalAddr() string
RemoteAddr() string
incReadPkgCount()
incWritePkgCount()
updateActive()
getActive() time.Time
// update session's active time
UpdateActive()
// get session's active time
GetActive() time.Time
readDeadline() time.Duration
setReadDeadline(time.Duration)
// SetReadDeadline sets deadline for the future read calls.
SetReadDeadline(time.Duration)
writeDeadline() time.Duration
setWriteDeadline(time.Duration)
write(p []byte) error
// SetWriteDeadlile sets deadline for the future read calls.
SetWriteDeadline(time.Duration)
Write(p []byte) error
// don't distinguish between tcp connection and websocket connection. Because
// gorilla/websocket/conn.go:(Conn)Close also invoke net.Conn.Close
close(int)
......@@ -76,7 +80,7 @@ var (
)
type gettyConn struct {
ID uint32
id uint32
compress CompressType
padding1 uint8
padding2 uint16
......@@ -91,15 +95,15 @@ type gettyConn struct {
peer string // peer address
}
func (this *gettyConn) id() uint32 {
return this.ID
func (this *gettyConn) ID() uint32 {
return this.id
}
func (this *gettyConn) localAddr() string {
func (this *gettyConn) LocalAddr() string {
return this.local
}
func (this *gettyConn) remoteAddr() string {
func (this *gettyConn) RemoteAddr() string {
return this.peer
}
......@@ -111,15 +115,15 @@ func (this *gettyConn) incWritePkgCount() {
atomic.AddUint32(&this.writePkgCount, 1)
}
func (this *gettyConn) updateActive() {
func (this *gettyConn) UpdateActive() {
atomic.StoreInt64(&(this.active), int64(time.Since(launchTime)))
}
func (this *gettyConn) getActive() time.Time {
func (this *gettyConn) GetActive() time.Time {
return launchTime.Add(time.Duration(atomic.LoadInt64(&(this.active))))
}
func (this *gettyConn) write([]byte) error {
func (this *gettyConn) Write([]byte) error {
return nil
}
......@@ -129,7 +133,7 @@ func (this gettyConn) readDeadline() time.Duration {
return this.rDeadline
}
func (this *gettyConn) setReadDeadline(rDeadline time.Duration) {
func (this *gettyConn) SetReadDeadline(rDeadline time.Duration) {
if rDeadline < 1 {
panic("@rDeadline < 1")
}
......@@ -144,7 +148,7 @@ func (this gettyConn) writeDeadline() time.Duration {
return this.wDeadline
}
func (this *gettyConn) setWriteDeadline(wDeadline time.Duration) {
func (this *gettyConn) SetWriteDeadline(wDeadline time.Duration) {
if wDeadline < 1 {
panic("@wDeadline < 1")
}
......@@ -182,7 +186,7 @@ func newGettyTCPConn(conn net.Conn) *gettyTCPConn {
reader: io.Reader(conn),
writer: io.Writer(conn),
gettyConn: gettyConn{
ID: atomic.AddUint32(&connID, 1),
id: atomic.AddUint32(&connID, 1),
local: localAddr,
peer: peerAddr,
compress: CompressNone,
......@@ -213,7 +217,7 @@ func (this *writeFlusher) Write(p []byte) (int, error) {
}
// set compress type(tcp: zip/snappy, websocket:zip)
func (this *gettyTCPConn) setCompressType(t CompressType) {
func (this *gettyTCPConn) SetCompressType(t CompressType) {
switch {
case t == CompressZip:
this.reader = flate.NewReader(this.conn)
......@@ -244,7 +248,7 @@ func (this *gettyTCPConn) read(p []byte) (int, error) {
}
// tcp connection write
func (this *gettyTCPConn) write(p []byte) error {
func (this *gettyTCPConn) Write(p []byte) error {
// if this.conn == nil {
// return 0, ErrInvalidConnection
// }
......@@ -296,7 +300,7 @@ func newGettyWSConn(conn *websocket.Conn) *gettyWSConn {
gettyWSConn := &gettyWSConn{
conn: conn,
gettyConn: gettyConn{
ID: atomic.AddUint32(&connID, 1),
id: atomic.AddUint32(&connID, 1),
local: localAddr,
peer: peerAddr,
},
......@@ -309,7 +313,7 @@ func newGettyWSConn(conn *websocket.Conn) *gettyWSConn {
}
// set compress type(tcp: zip/snappy, websocket:zip)
func (this *gettyWSConn) setCompressType(t CompressType) {
func (this *gettyWSConn) SetCompressType(t CompressType) {
switch {
case t == CompressZip:
this.conn.EnableWriteCompression(true)
......@@ -328,14 +332,14 @@ func (this *gettyWSConn) handlePing(message string) error {
err = nil
}
if err == nil {
this.updateActive()
this.UpdateActive()
}
return err
}
func (this *gettyWSConn) handlePong(string) error {
this.updateActive()
this.UpdateActive()
return nil
}
......@@ -356,7 +360,7 @@ func (this *gettyWSConn) read() ([]byte, error) {
}
// websocket connection write
func (this *gettyWSConn) write(p []byte) error {
func (this *gettyWSConn) Write(p []byte) error {
// atomic.AddUint32(&this.writeCount, 1)
atomic.AddUint32(&this.writeCount, (uint32)(len(p)))
// this.conn.SetWriteDeadline(time.Now().Add(this.wDeadline))
......
......@@ -20,6 +20,7 @@ import (
import (
"github.com/AlexStocks/goext/net"
"github.com/AlexStocks/goext/sync"
log "github.com/AlexStocks/log4go"
"github.com/gorilla/websocket"
)
......@@ -34,12 +35,12 @@ type Server struct {
listener net.Listener
sync.Once
done chan empty
done chan gxsync.Empty
wg sync.WaitGroup
}
func NewServer() *Server {
return &Server{done: make(chan empty)}
return &Server{done: make(chan gxsync.Empty)}
}
func (this *Server) stop() {
......@@ -94,7 +95,7 @@ func (this *Server) RunEventloop(newSession NewSessionCallback) {
defer this.wg.Done()
var (
err error
client *Session
client Session
delay time.Duration
)
for {
......@@ -123,7 +124,7 @@ func (this *Server) RunEventloop(newSession NewSessionCallback) {
}
delay = 0
// client.RunEventLoop()
client.run()
client.(*session).run()
}
}()
}
......@@ -178,11 +179,11 @@ func (this *wsHandler) serveWSRequest(w http.ResponseWriter, r *http.Request) {
log.Warn("Server{%s}.newSession(session{%#v}) = err {%#v}", this.server.addr, session, err)
return
}
if session.maxMsgLen > 0 {
conn.SetReadLimit(int64(session.maxMsgLen))
if session.(*session).maxMsgLen > 0 {
conn.SetReadLimit(int64(session.(*session).maxMsgLen))
}
// session.RunEventLoop()
session.run()
session.(*session).run()
}
// RunWSEventLoop serve websocket client request
......@@ -253,7 +254,7 @@ func (this *Server) Listener() net.Listener {
return this.listener
}
func (this *Server) accept(newSession NewSessionCallback) (*Session, error) {
func (this *Server) accept(newSession NewSessionCallback) (Session, error) {
conn, err := this.listener.Accept()
if err != nil {
return nil, err
......
......@@ -21,6 +21,7 @@ import (
)
import (
"github.com/AlexStocks/goext/sync"
"github.com/AlexStocks/goext/time"
log "github.com/AlexStocks/log4go"
"github.com/gorilla/websocket"
......@@ -31,7 +32,7 @@ const (
netIOTimeout = 1e9 // 1s
period = 60 * 1e9 // 1 minute
pendingDuration = 3e9
defaultSessionName = "Session"
defaultSessionName = "session"
outputFormat = "session %s, Read Count: %d, Write Count: %d, Read Pkg Count: %d, Write Pkg Count: %d"
)
......@@ -40,8 +41,8 @@ const (
/////////////////////////////////////////
var (
ErrSessionClosed = errors.New("Session Already Closed")
ErrSessionBlocked = errors.New("Session Full Blocked")
ErrSessionClosed = errors.New("session Already Closed")
ErrSessionBlocked = errors.New("session Full Blocked")
ErrMsgTooLong = errors.New("Message Too Long")
)
......@@ -49,20 +50,46 @@ var (
wheel = gxtime.NewWheel(gxtime.TimeMillisecondDuration(100), 1200) // wheel longest span is 2 minute
)
type empty struct{}
type Session interface {
Connection
Reset()
Conn() net.Conn
Stat() string
IsClosed() bool
SetMaxMsgLen(int)
SetName(string)
SetEventListener(EventListener)
SetPkgHandler(ReadWriter)
SetReader(Reader)
SetWriter(Writer)
SetCronPeriod(int)
SetRQLen(int)
SetWQLen(int)
SetWaitTime(time.Duration)
GetAttribute(string) interface{}
SetAttribute(string, interface{})
RemoveAttribute(string)
WritePkg(interface{}) error
WriteBytes([]byte) error
WriteBytesArray(...[]byte) error
Close()
}
// getty base session
type Session struct {
type session struct {
name string
maxMsgLen int32
// net read write
iConn
// net read Write
Connection
// pkgHandler ReadWriter
reader Reader // @reader should be nil when @conn is a gettyWSConn object.
writer Writer
listener EventListener
once sync.Once
done chan empty
done chan gxsync.Empty
// errFlag bool
period time.Duration
......@@ -77,113 +104,94 @@ type Session struct {
lock sync.RWMutex
}
func NewSession() *Session {
session := &Session{
func NewSession() Session {
session := &session{
name: defaultSessionName,
done: make(chan empty),
done: make(chan gxsync.Empty),
period: period,
wait: pendingDuration,
attrs: make(map[string]interface{}),
}
session.setWriteDeadline(netIOTimeout)
session.setReadDeadline(netIOTimeout)
session.SetWriteDeadline(netIOTimeout)
session.SetReadDeadline(netIOTimeout)
return session
}
func NewTCPSession(conn net.Conn) *Session {
session := &Session{
name: defaultSessionName,
iConn: newGettyTCPConn(conn),
done: make(chan empty),
period: period,
wait: pendingDuration,
attrs: make(map[string]interface{}),
func NewTCPSession(conn net.Conn) Session {
session := &session{
name: defaultSessionName,
Connection: newGettyTCPConn(conn),
done: make(chan gxsync.Empty),
period: period,
wait: pendingDuration,
attrs: make(map[string]interface{}),
}
session.setWriteDeadline(netIOTimeout)
session.setReadDeadline(netIOTimeout)
session.SetWriteDeadline(netIOTimeout)
session.SetReadDeadline(netIOTimeout)
return session
}
func NewWSSession(conn *websocket.Conn) *Session {
session := &Session{
name: defaultSessionName,
iConn: newGettyWSConn(conn),
done: make(chan empty),
period: period,
wait: pendingDuration,
attrs: make(map[string]interface{}),
func NewWSSession(conn *websocket.Conn) Session {
session := &session{
name: defaultSessionName,
Connection: newGettyWSConn(conn),
done: make(chan gxsync.Empty),
period: period,
wait: pendingDuration,
attrs: make(map[string]interface{}),
}
session.setWriteDeadline(netIOTimeout)
session.setReadDeadline(netIOTimeout)
session.SetWriteDeadline(netIOTimeout)
session.SetReadDeadline(netIOTimeout)
return session
}
func (this *Session) Reset() {
func (this *session) Reset() {
this.name = defaultSessionName
this.once = sync.Once{}
this.done = make(chan empty)
this.done = make(chan gxsync.Empty)
// this.errFlag = false
this.period = period
this.wait = pendingDuration
this.attrs = make(map[string]interface{})
this.grNum = 0
this.setWriteDeadline(netIOTimeout)
this.setReadDeadline(netIOTimeout)
this.SetWriteDeadline(netIOTimeout)
this.SetReadDeadline(netIOTimeout)
}
// func (this *Session) SetConn(conn net.Conn) { this.gettyConn = newGettyConn(conn) }
func (this *Session) Conn() net.Conn {
if tc, ok := this.iConn.(*gettyTCPConn); ok {
// func (this *session) SetConn(conn net.Conn) { this.gettyConn = newGettyConn(conn) }
func (this *session) Conn() net.Conn {
if tc, ok := this.Connection.(*gettyTCPConn); ok {
return tc.conn
}
if wc, ok := this.iConn.(*gettyWSConn); ok {
if wc, ok := this.Connection.(*gettyWSConn); ok {
return wc.conn.UnderlyingConn()
}
return nil
}
func (this *Session) gettyConn() *gettyConn {
if tc, ok := this.iConn.(*gettyTCPConn); ok {
func (this *session) gettyConn() *gettyConn {
if tc, ok := this.Connection.(*gettyTCPConn); ok {
return &(tc.gettyConn)
}
if wc, ok := this.iConn.(*gettyWSConn); ok {
if wc, ok := this.Connection.(*gettyWSConn); ok {
return &(wc.gettyConn)
}
return nil
}
// get session ID
func (this *Session) ID() uint32 {
return this.iConn.id()
}
func (this *Session) SetCompressType(t CompressType) {
this.iConn.setCompressType(t)
}
// get local address
func (this *Session) LocalAddr() string {
return this.iConn.localAddr()
}
// get peer address
func (this *Session) RemoteAddr() string {
return this.iConn.remoteAddr()
}
// return the connect statistic data
func (this *Session) Stat() string {
func (this *session) Stat() string {
var conn *gettyConn
if conn = this.gettyConn(); conn == nil {
return ""
......@@ -199,7 +207,7 @@ func (this *Session) Stat() string {
}
// check whether the session has been closed.
func (this *Session) IsClosed() bool {
func (this *session) IsClosed() bool {
select {
case <-this.done:
return true
......@@ -210,35 +218,35 @@ func (this *Session) IsClosed() bool {
}
// set maximum pacakge length of every pacakge in (EventListener)OnMessage(@pkgs)
func (this *Session) SetMaxMsgLen(length int) { this.maxMsgLen = int32(length) }
func (this *session) SetMaxMsgLen(length int) { this.maxMsgLen = int32(length) }
// set session name
func (this *Session) SetName(name string) { this.name = name }
func (this *session) SetName(name string) { this.name = name }
// set EventListener
func (this *Session) SetEventListener(listener EventListener) {
func (this *session) SetEventListener(listener EventListener) {
this.listener = listener
}
// set package handler
func (this *Session) SetPkgHandler(handler ReadWriter) {
func (this *session) SetPkgHandler(handler ReadWriter) {
this.reader = handler
this.writer = handler
// this.pkgHandler = handler
}
// set Reader
func (this *Session) SetReader(reader Reader) {
func (this *session) SetReader(reader Reader) {
this.reader = reader
}
// set Writer
func (this *Session) SetWriter(writer Writer) {
func (this *session) SetWriter(writer Writer) {
this.writer = writer
}
// period is in millisecond. Websocket session will send ping frame automatically every peroid.
func (this *Session) SetCronPeriod(period int) {
func (this *session) SetCronPeriod(period int) {
if period < 1 {
panic("@period < 1")
}
......@@ -248,8 +256,8 @@ func (this *Session) SetCronPeriod(period int) {
this.lock.Unlock()
}
// set @Session's read queue size
func (this *Session) SetRQLen(readQLen int) {
// set @session's read queue size
func (this *session) SetRQLen(readQLen int) {
if readQLen < 1 {
panic("@readQLen < 1")
}
......@@ -260,8 +268,8 @@ func (this *Session) SetRQLen(readQLen int) {
this.lock.Unlock()
}
// set @Session's write queue size
func (this *Session) SetWQLen(writeQLen int) {
// set @session's Write queue size
func (this *session) SetWQLen(writeQLen int) {
if writeQLen < 1 {
panic("@writeQLen < 1")
}
......@@ -272,22 +280,8 @@ func (this *Session) SetWQLen(writeQLen int) {
this.lock.Unlock()
}
// SetReadDeadline sets deadline for the future read calls.
func (this *Session) SetReadDeadline(rDeadline time.Duration) {
this.lock.Lock()
this.setReadDeadline(rDeadline)
this.lock.Unlock()
}
// SetWriteDeadlile sets deadline for the future read calls.
func (this *Session) SetWriteDeadline(wDeadline time.Duration) {
this.lock.Lock()
this.setWriteDeadline(wDeadline)
this.lock.Unlock()
}
// set maximum wait time when session got error or got exit signal
func (this *Session) SetWaitTime(waitTime time.Duration) {
func (this *session) SetWaitTime(waitTime time.Duration) {
if waitTime < 1 {
panic("@wait < 1")
}
......@@ -298,7 +292,7 @@ func (this *Session) SetWaitTime(waitTime time.Duration) {
}
// set attribute of key @session:key
func (this *Session) GetAttribute(key string) interface{} {
func (this *session) GetAttribute(key string) interface{} {
var ret interface{}
this.lock.RLock()
ret = this.attrs[key]
......@@ -307,35 +301,25 @@ func (this *Session) GetAttribute(key string) interface{} {
}
// get attribute of key @session:key
func (this *Session) SetAttribute(key string, value interface{}) {
func (this *session) SetAttribute(key string, value interface{}) {
this.lock.Lock()
this.attrs[key] = value
this.lock.Unlock()
}
// delete attribute of key @session:key
func (this *Session) RemoveAttribute(key string) {
func (this *session) RemoveAttribute(key string) {
this.lock.Lock()
delete(this.attrs, key)
this.lock.Unlock()
}
// update session's active time
func (this *Session) UpdateActive() {
this.updateActive()
}
// get session's active time
func (this *Session) GetActive() time.Time {
return this.getActive()
}
func (this *Session) sessionToken() string {
return fmt.Sprintf("{%s:%d:%s<->%s}", this.name, this.iConn.id(), this.iConn.localAddr(), this.iConn.remoteAddr())
func (this *session) sessionToken() string {
return fmt.Sprintf("{%s:%d:%s<->%s}", this.name, this.ID(), this.LocalAddr(), this.RemoteAddr())
}
// Queued write, for handler
func (this *Session) WritePkg(pkg interface{}) error {
// Queued Write, for handler
func (this *session) WritePkg(pkg interface{}) error {
if this.IsClosed() {
return ErrSessionClosed
}
......@@ -369,24 +353,24 @@ func (this *Session) WritePkg(pkg interface{}) error {
}
// for codecs
func (this *Session) WriteBytes(pkg []byte) error {
func (this *session) WriteBytes(pkg []byte) error {
if this.IsClosed() {
return ErrSessionClosed
}
// this.conn.SetWriteDeadline(time.Now().Add(this.wDeadline))
return this.iConn.write(pkg)
return this.Connection.Write(pkg)
}
// write multiple packages at once
func (this *Session) WriteBytesArray(pkgs ...[]byte) error {
// Write multiple packages at once
func (this *session) WriteBytesArray(pkgs ...[]byte) error {
if this.IsClosed() {
return ErrSessionClosed
}
// this.conn.SetWriteDeadline(time.Now().Add(this.wDeadline))
if len(pkgs) == 1 {
return this.iConn.write(pkgs[0])
return this.Connection.Write(pkgs[0])
}
// get len
......@@ -408,27 +392,27 @@ func (this *Session) WriteBytesArray(pkgs ...[]byte) error {
l += len(pkgs[i])
}
// return this.iConn.write(arr)
// return this.Connection.Write(arr)
return this.WriteBytes(arr)
}
// func (this *Session) RunEventLoop() {
func (this *Session) run() {
// func (this *session) RunEventLoop() {
func (this *session) run() {
if this.rQ == nil || this.wQ == nil {
errStr := fmt.Sprintf("Session{name:%s, rQ:%#v, wQ:%#v}",
errStr := fmt.Sprintf("session{name:%s, rQ:%#v, wQ:%#v}",
this.name, this.rQ, this.wQ)
log.Error(errStr)
panic(errStr)
}
if this.iConn == nil || this.listener == nil || this.writer == nil {
errStr := fmt.Sprintf("Session{name:%s, conn:%#v, listener:%#v, writer:%#v}",
this.name, this.iConn, this.listener, this.writer)
if this.Connection == nil || this.listener == nil || this.writer == nil {
errStr := fmt.Sprintf("session{name:%s, conn:%#v, listener:%#v, writer:%#v}",
this.name, this.Connection, this.listener, this.writer)
log.Error(errStr)
panic(errStr)
}
// call session opened
this.updateActive()
this.UpdateActive()
if err := this.listener.OnOpen(this); err != nil {
this.Close()
return
......@@ -439,7 +423,7 @@ func (this *Session) run() {
go this.handlePackage()
}
func (this *Session) handleLoop() {
func (this *session) handleLoop() {
var (
err error
flag bool
......@@ -471,8 +455,8 @@ func (this *Session) handleLoop() {
this.gc()
}()
wsConn, wsFlag = this.iConn.(*gettyWSConn)
flag = true // do not do any read/write/cron operation while got write error
wsConn, wsFlag = this.Connection.(*gettyWSConn)
flag = true // do not do any read/Write/cron operation while got Write error
// ticker = time.NewTicker(this.period) // use wheel instead, 2016/09/26
LOOP:
for {
......@@ -480,9 +464,9 @@ LOOP:
// It choose one at random if multiple are ready. Otherwise it choose default branch if none is ready.
select {
case <-this.done:
// 这个分支确保(Session)handleLoop gr在(Session)handlePackage gr之后退出
// 这个分支确保(session)handleLoop gr在(session)handlePackage gr之后退出
// once.Do(func() { ticker.Stop() }) // use wheel instead, 2016/09/26
if atomic.LoadInt32(&(this.grNum)) == 1 { // make sure @(Session)handlePackage goroutine has been closed.
if atomic.LoadInt32(&(this.grNum)) == 1 { // make sure @(session)handlePackage goroutine has been closed.
if len(this.rQ) == 0 && len(this.wQ) == 0 {
log.Info("%s, [session.handleLoop] got done signal. Both rQ and wQ are nil.", this.Stat())
break LOOP
......@@ -496,7 +480,7 @@ LOOP:
}
case inPkg = <-this.rQ:
// 这个条件分支通过(Session)rQ排空确保(Session)handlePackage gr不会阻塞在(Session)rQ上
// 这个条件分支通过(session)rQ排空确保(session)handlePackage gr不会阻塞在(session)rQ上
if flag {
this.listener.OnMessage(this, inPkg)
this.incReadPkgCount()
......@@ -534,7 +518,7 @@ LOOP:
// once.Do(func() { ticker.Stop() }) // use wheel instead, 2016/09/26
}
func (this *Session) handlePackage() {
func (this *session) handlePackage() {
var (
err error
)
......@@ -559,21 +543,21 @@ func (this *Session) handlePackage() {
}
}()
if _, ok := this.iConn.(*gettyTCPConn); ok {
if _, ok := this.Connection.(*gettyTCPConn); ok {
if this.reader == nil {
errStr := fmt.Sprintf("Session{name:%s, conn:%#v, reader:%#v}", this.name, this.iConn, this.reader)
errStr := fmt.Sprintf("session{name:%s, conn:%#v, reader:%#v}", this.name, this.Connection, this.reader)
log.Error(errStr)
panic(errStr)
}
err = this.handleTCPPackage()
} else if _, ok := this.iConn.(*gettyWSConn); ok {
} else if _, ok := this.Connection.(*gettyWSConn); ok {
err = this.handleWSPackage()
}
}
// get package from tcp stream(packet)
func (this *Session) handleTCPPackage() error {
func (this *session) handleTCPPackage() error {
var (
ok bool
err error
......@@ -589,7 +573,7 @@ func (this *Session) handleTCPPackage() error {
buf = make([]byte, maxReadBufLen)
pktBuf = new(bytes.Buffer)
conn = this.iConn.(*gettyTCPConn)
conn = this.Connection.(*gettyTCPConn)
for {
if this.IsClosed() {
err = nil
......@@ -638,7 +622,7 @@ func (this *Session) handleTCPPackage() error {
if pkg == nil {
break
}
this.updateActive()
this.UpdateActive()
this.rQ <- pkg
pktBuf.Next(pkgLen)
}
......@@ -651,7 +635,7 @@ func (this *Session) handleTCPPackage() error {
}
// get package from websocket stream
func (this *Session) handleWSPackage() error {
func (this *session) handleWSPackage() error {
var (
ok bool
err error
......@@ -662,7 +646,7 @@ func (this *Session) handleWSPackage() error {
unmarshalPkg interface{}
)
conn = this.iConn.(*gettyWSConn)
conn = this.Connection.(*gettyWSConn)
for {
if this.IsClosed() {
break
......@@ -676,7 +660,7 @@ func (this *Session) handleWSPackage() error {
// this.errFlag = true
return err
}
this.updateActive()
this.UpdateActive()
if this.reader != nil {
unmarshalPkg, length, err = this.reader.Read(this, pkg)
if err == nil && this.maxMsgLen > 0 && length > int(this.maxMsgLen) {
......@@ -695,14 +679,14 @@ func (this *Session) handleWSPackage() error {
return nil
}
func (this *Session) stop() {
func (this *session) stop() {
select {
case <-this.done: // this.done is a blocked channel. if it has not been closed, the default branch will be invoked.
return
default:
this.once.Do(func() {
// let read/write timeout asap
// let read/Write timeout asap
if conn := this.Conn(); conn != nil {
conn.SetReadDeadline(time.Now().Add(this.readDeadline()))
conn.SetWriteDeadline(time.Now().Add(this.writeDeadline()))
......@@ -712,7 +696,7 @@ func (this *Session) stop() {
}
}
func (this *Session) gc() {
func (this *session) gc() {
this.lock.Lock()
if this.attrs != nil {
this.attrs = nil
......@@ -720,14 +704,14 @@ func (this *Session) gc() {
this.wQ = nil
close(this.rQ)
this.rQ = nil
this.iConn.close((int)((int64)(this.wait)))
this.Connection.close((int)((int64)(this.wait)))
}
this.lock.Unlock()
}
// this function will be invoked by NewSessionCallback(if return error is not nil) or (Session)handleLoop automatically.
// this function will be invoked by NewSessionCallback(if return error is not nil) or (session)handleLoop automatically.
// It is goroutine-safe to be invoked many times.
func (this *Session) Close() {
func (this *session) Close() {
this.stop()
log.Info("%s closed now, its current gr num %d", this.sessionToken(), atomic.LoadInt32(&(this.grNum)))
}
......@@ -10,9 +10,9 @@
package getty
const (
Version = "0.6.2"
DATE = "2016/11/19"
Version = "0.7.0"
DATE = "2017/02/03"
GETTY_MAJOR = 0
GETTY_MINOR = 6
GETTY_BUILD = 2
GETTY_MINOR = 7
GETTY_BUILD = 0
)
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment