Commit 9525bb8a authored by alexstocks's avatar alexstocks

add session.go:Session{maxMsgLen}

parent 63469514
......@@ -21,6 +21,7 @@ import (
import (
"fmt"
log "github.com/AlexStocks/log4go"
"github.com/gorilla/websocket"
)
......@@ -122,9 +123,10 @@ func (this *Client) dialTCP() *Session {
func (this *Client) dialWS() *Session {
var (
err error
conn *websocket.Conn
dialer websocket.Dialer
err error
dialer websocket.Dialer
conn *websocket.Conn
session *Session
)
for {
......@@ -136,7 +138,12 @@ func (this *Client) dialWS() *Session {
err = errSelfConnect
}
if err == nil {
return NewWSSession(conn)
session = NewWSSession(conn)
if session.maxMsgLen > 0 {
conn.SetReadLimit(int64(session.maxMsgLen))
}
return session
}
log.Info("websocket.dialer.Dial(addr:%s) = error{%v}", this.addr, err)
......@@ -150,8 +157,9 @@ func (this *Client) dialWSS() *Session {
err error
certPem []byte
certPool *x509.CertPool
conn *websocket.Conn
dialer websocket.Dialer
conn *websocket.Conn
session *Session
)
certPem, err = ioutil.ReadFile(this.certFile)
......@@ -173,7 +181,12 @@ func (this *Client) dialWSS() *Session {
err = errSelfConnect
}
if err == nil {
return NewWSSession(conn)
session = NewWSSession(conn)
if session.maxMsgLen > 0 {
conn.SetReadLimit(int64(session.maxMsgLen))
}
return session
}
log.Info("websocket.dialer.Dial(addr:%s) = error{%v}", this.addr, err)
......
......@@ -16,7 +16,7 @@ type NewSessionCallback func(*Session) error
// Reader is used to unmarshal a complete pkg from buffer
type Reader interface {
// Parse pkg from buffer and if possible return a complete pkg
// Parse tcp pkg from buffer and if possible return a complete pkg
// If length of buf is not long enough, u should return {nil,0, nil}
// The second return value is the length of the pkg.
Read(*Session, []byte) (interface{}, int, error)
......@@ -27,7 +27,7 @@ type Writer interface {
Write(*Session, interface{}) error
}
// packet handler interface
// tcp packet handler interface
type ReadWriter interface {
Reader
Writer
......
......@@ -23,7 +23,7 @@ var (
// ErrInvalidConnection = errors.New("connection has been closed.")
)
type conn interface {
type iConn interface {
incReadPkgCount()
incWritePkgCount()
write(p []byte) error
......
......@@ -14,7 +14,13 @@
- 2016/10/10
> 1 delete session.go:Session{errFlag} to invoke codec.go:EventListener{OnClose&OnError} both when got a error
>
> 2 version: 0.4.01
> 2 modify session.go:(Session)SetReader
>
> 3 modify session.go:(Session)SetWriter
>
> 4 add modify session.go:(Session)maxMsgLen for websocket session
>
> 5 version: 0.4.01
- 2016/10/09
> 1 add client.go:NewWSSClient
......
......@@ -177,6 +177,9 @@ func (this *wsHandler) ServeWSRequest(w http.ResponseWriter, r *http.Request) {
log.Warn("Server{%s}.newSession(session{%#v}) = err {%#v}", this.server.addr, session, err)
return
}
if session.maxMsgLen > 0 {
conn.SetReadLimit(int64(session.maxMsgLen))
}
session.RunEventLoop()
}
......
......@@ -41,7 +41,8 @@ const (
var (
ErrSessionClosed = errors.New("Session Already Closed")
ErrSessionBlocked = errors.New("Session full blocked")
ErrSessionBlocked = errors.New("Session Full Blocked")
ErrMsgTooLong = errors.New("Message Too Long")
)
var (
......@@ -52,12 +53,13 @@ type empty struct{}
// getty base session
type Session struct {
name string
name string
maxMsgLen int
// net read write
conn
pkgHandler ReadWriter
// reader Reader // @reader should be nil when @conn is a gettyWSConn object.
// writer Writer
iConn
// pkgHandler ReadWriter
reader Reader // @reader should be nil when @conn is a gettyWSConn object.
writer Writer
listener EventListener
once sync.Once
done chan empty
......@@ -92,7 +94,7 @@ func NewSession() *Session {
func NewTCPSession(conn net.Conn) *Session {
return &Session{
name: defaultSessionName,
conn: newGettyTCPConn(conn),
iConn: newGettyTCPConn(conn),
done: make(chan empty),
period: period,
rDeadline: netIOTimeout,
......@@ -105,7 +107,7 @@ func NewTCPSession(conn net.Conn) *Session {
func NewWSSession(conn *websocket.Conn) *Session {
return &Session{
name: defaultSessionName,
conn: newGettyWSConn(conn),
iConn: newGettyWSConn(conn),
done: make(chan empty),
period: period,
rDeadline: netIOTimeout,
......@@ -130,11 +132,11 @@ func (this *Session) Reset() {
// func (this *Session) SetConn(conn net.Conn) { this.gettyConn = newGettyConn(conn) }
func (this *Session) Conn() net.Conn {
if tc, ok := this.conn.(*gettyTCPConn); ok {
if tc, ok := this.iConn.(*gettyTCPConn); ok {
return tc.conn
}
if wc, ok := this.conn.(*gettyWSConn); ok {
if wc, ok := this.iConn.(*gettyWSConn); ok {
return wc.conn.UnderlyingConn()
}
......@@ -142,11 +144,11 @@ func (this *Session) Conn() net.Conn {
}
func (this *Session) gettyConn() *gettyConn {
if tc, ok := this.conn.(*gettyTCPConn); ok {
if tc, ok := this.iConn.(*gettyTCPConn); ok {
return &(tc.gettyConn)
}
if wc, ok := this.conn.(*gettyWSConn); ok {
if wc, ok := this.iConn.(*gettyWSConn); ok {
return &(wc.gettyConn)
}
......@@ -180,7 +182,8 @@ func (this *Session) IsClosed() bool {
}
}
func (this *Session) SetName(name string) { this.name = name }
func (this *Session) SetMaxMsgLen(len int) { this.maxMsgLen = len }
func (this *Session) SetName(name string) { this.name = name }
func (this *Session) SetEventListener(listener EventListener) {
this.listener = listener
......@@ -188,18 +191,18 @@ func (this *Session) SetEventListener(listener EventListener) {
// set package handler
func (this *Session) SetPkgHandler(handler ReadWriter) {
this.pkgHandler = handler
this.reader = handler
this.writer = handler
// this.pkgHandler = handler
}
/*
func (this *Session) SetReader(reader Reader) {
this.reader = reader
}
func (this *Session) SetWriter(writer Reader) {
func (this *Session) SetWriter(writer Writer) {
this.writer = writer
}
*/
// period is in millisecond
func (this *Session) SetCronPeriod(period int) {
......@@ -338,14 +341,14 @@ func (this *Session) WritePkg(pkg interface{}) error {
// for codecs
func (this *Session) WriteBytes(pkg []byte) error {
// this.conn.SetWriteDeadline(time.Now().Add(this.wDeadline))
return this.conn.write(pkg)
return this.iConn.write(pkg)
}
func (this *Session) WriteBytesArray(pkgs ...[]byte) error {
// this.conn.SetWriteDeadline(time.Now().Add(this.wDeadline))
if len(pkgs) == 1 {
return this.conn.write(pkgs[0])
return this.iConn.write(pkgs[0])
}
// get len
......@@ -367,7 +370,7 @@ func (this *Session) WriteBytesArray(pkgs ...[]byte) error {
l += len(pkgs[i])
}
return this.conn.write(arr)
return this.iConn.write(arr)
}
func (this *Session) RunEventLoop() {
......@@ -377,9 +380,9 @@ func (this *Session) RunEventLoop() {
log.Error(errStr)
panic(errStr)
}
if this.conn == nil || this.listener == nil || this.pkgHandler == nil {
errStr := fmt.Sprintf("Session{name:%s, conn:%#v, listener:%#v, pkgHandler:%#v}",
this.name, this.conn, this.listener, this.pkgHandler)
if this.iConn == nil || this.listener == nil || this.writer == nil {
errStr := fmt.Sprintf("Session{name:%s, conn:%#v, listener:%#v, writer:%#v}",
this.name, this.iConn, this.listener, this.writer)
log.Error(errStr)
panic(errStr)
}
......@@ -457,7 +460,7 @@ LOOP:
case outPkg = <-this.wQ:
if flag {
if err = this.pkgHandler.Write(this, outPkg); err != nil {
if err = this.writer.Write(this, outPkg); err != nil {
log.Error("%s, [session.handleLoop] = error{%+v}", this.sessionToken(), err)
this.stop()
flag = false
......@@ -479,7 +482,9 @@ LOOP:
}
func (this *Session) handlePackage() {
var err error
var (
err error
)
defer func() {
var grNum int32
......@@ -501,9 +506,15 @@ func (this *Session) handlePackage() {
}
}()
if _, ok := this.conn.(*gettyTCPConn); ok {
if _, ok := this.iConn.(*gettyTCPConn); ok {
if this.reader == nil {
errStr := fmt.Sprintf("Session{name:%s, conn:%#v, reader:%#v}", this.name, this.iConn, this.reader)
log.Error(errStr)
panic(errStr)
}
err = this.handleTCPPackage()
} else if _, ok := this.conn.(*gettyWSConn); ok {
} else if _, ok := this.iConn.(*gettyWSConn); ok {
err = this.handleWSPackage()
}
}
......@@ -514,7 +525,6 @@ func (this *Session) handleTCPPackage() error {
err error
// nerr net.Error
conn *gettyTCPConn
ok bool
exit bool
bufLen int
pkgLen int
......@@ -523,9 +533,6 @@ func (this *Session) handleTCPPackage() error {
pkg interface{}
)
if conn, ok = this.conn.(*gettyTCPConn); !ok {
panic(fmt.Errorf("@this.conn{%#v} is not a gettyTCPConn object.", this.conn))
}
buf = make([]byte, maxReadBufLen)
pktBuf = new(bytes.Buffer)
for {
......@@ -561,9 +568,12 @@ func (this *Session) handleTCPPackage() error {
break
}
// pkg, err = this.pkgHandler.Read(this, pktBuf)
pkg, pkgLen, err = this.pkgHandler.Read(this, pktBuf.Bytes())
pkg, pkgLen, err = this.reader.Read(this, pktBuf.Bytes())
if err == nil && this.maxMsgLen > 0 && pkgLen > this.maxMsgLen {
err = ErrMsgTooLong
}
if err != nil {
log.Info("%s, [session.pkgHandler.Read] = error{%+v}", this.sessionToken(), err)
log.Warn("%s, [session.handleTCPPackage] = len{%d}, error{%+v}", this.sessionToken(), pkgLen, err)
// for (Codec)OnErr
// this.errFlag = true
exit = true
......@@ -586,15 +596,13 @@ func (this *Session) handleTCPPackage() error {
// get package from websocket stream
func (this *Session) handleWSPackage() error {
var (
err error
conn *gettyWSConn
ok bool
pkg []byte
err error
length int
conn *gettyWSConn
pkg []byte
unmarshalPkg interface{}
)
if conn, ok = this.conn.(*gettyWSConn); !ok {
panic(fmt.Errorf("@this.conn{%#v} is not a gettyWSConn object.", this.conn))
}
for {
if this.IsClosed() {
break // 退出前不再读取任何packet,buf中剩余的stream bytes也不可能凑够一个package, 所以直接退出
......@@ -602,11 +610,23 @@ func (this *Session) handleWSPackage() error {
pkg, err = conn.read()
if err != nil {
log.Info("%s, [session.handleWSPackage.Read] = error{%+v}", this.sessionToken(), err)
log.Warn("%s, [session.handleWSPackage] = error{%+v}", this.sessionToken(), err)
// this.errFlag = true
return err
}
this.rQ <- pkg
if this.reader != nil {
unmarshalPkg, length, err = this.reader.Read(this, pkg)
if err == nil && this.maxMsgLen > 0 && length > this.maxMsgLen {
err = ErrMsgTooLong
}
if err != nil {
log.Warn("%s, [session.handleWSPackage] = len{%d}, error{%+v}", this.sessionToken(), length, err)
} else {
this.rQ <- unmarshalPkg
}
} else {
this.rQ <- pkg
}
}
return nil
......@@ -630,7 +650,7 @@ func (this *Session) gc() {
this.wQ = nil
close(this.rQ)
this.rQ = nil
this.conn.close((int)((int64)(this.wait)))
this.iConn.close((int)((int64)(this.wait)))
}
this.lock.Unlock()
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment