Commit 0e2aa36b authored by alexstocks's avatar alexstocks

add websocket ping/pong logic & (gettyConn)UpdateActive & (gettyConn)GetActive

parent ef9876b2
......@@ -13,20 +13,33 @@ import (
// "errors"
"net"
"sync/atomic"
"time"
)
import (
log "github.com/AlexStocks/log4go"
"github.com/gorilla/websocket"
// log "github.com/AlexStocks/log4go"
)
var (
launchTime time.Time
// ErrInvalidConnection = errors.New("connection has been closed.")
)
func init() {
launchTime = time.Now()
}
/////////////////////////////////////////
// connection interfacke
/////////////////////////////////////////
type iConn interface {
incReadPkgCount()
incWritePkgCount()
UpdateActive()
GetActive() time.Time
write(p []byte) error
close(int)
}
......@@ -41,10 +54,12 @@ var (
type gettyConn struct {
ID uint32
padding uint32 // last active, in milliseconds
readCount uint32 // read() count
writeCount uint32 // write() count
readPkgCount uint32 // send pkg count
writePkgCount uint32 // recv pkg count
active int64 // active
local string // local address
peer string // peer address
}
......@@ -57,6 +72,14 @@ func (this *gettyConn) incWritePkgCount() {
atomic.AddUint32(&this.writePkgCount, 1)
}
func (this *gettyConn) UpdateActive() {
atomic.StoreInt64(&(this.active), int64(time.Since(launchTime)))
}
func (this *gettyConn) GetActive() time.Time {
return launchTime.Add(time.Duration(atomic.LoadInt64(&(this.active))))
}
func (this *gettyConn) write([]byte) error {
return nil
}
......@@ -155,7 +178,7 @@ func newGettyWSConn(conn *websocket.Conn) *gettyWSConn {
peerAddr = conn.RemoteAddr().String()
}
return &gettyWSConn{
gettyWSConn := &gettyWSConn{
conn: *conn,
gettyConn: gettyConn{
ID: atomic.AddUint32(&connID, 1),
......@@ -163,6 +186,15 @@ func newGettyWSConn(conn *websocket.Conn) *gettyWSConn {
peer: peerAddr,
},
}
conn.SetPongHandler(gettyWSConn.handlePong)
return gettyWSConn
}
func (this *gettyWSConn) handlePong(string) error {
log.Debug("get pong package")
this.UpdateActive()
return nil
}
// websocket connection read
......
......@@ -12,11 +12,15 @@
---
- 2016/10/13
> 1 add conn.go:(gettyWSConn)writePing
> 1 add conn.go:(gettyWSConn)writePing which is invoked automatically in session.go:(Session)handleLoop
>
> 2 modify session.go:(Session)handleLoop:Websocket session will send ping frame automatically every peroid.
>
> 3 version: 0.4.03
> 3 add conn.go:(gettyConn)UpdateActive, which can used as (Session)UpdateActive which is invoked by Session automatically.
>
> 4 add conn.go:(gettyConn)GetActive, which can used as (Session)GetActive
>
> 5 version: 0.4.03
- 2016/10/11
> 1 fix bug: use websocket.BinaryMessage in conn.go:(gettyWSConn)write
......
......@@ -146,7 +146,7 @@ func newWSHandler(server *Server, newSession NewSessionCallback) *wsHandler {
}
}
func (this *wsHandler) ServeWSRequest(w http.ResponseWriter, r *http.Request) {
func (this *wsHandler) serveWSRequest(w http.ResponseWriter, r *http.Request) {
log.Debug("get client request:%#v", r)
if r.Method != "GET" {
// w.WriteHeader(http.StatusMethodNotAllowed)
......@@ -195,7 +195,7 @@ func (this *Server) RunWSEventLoop(newSession NewSessionCallback, path string) {
handler *wsHandler
)
handler = newWSHandler(this, newSession)
handler.HandleFunc(path, handler.ServeWSRequest)
handler.HandleFunc(path, handler.serveWSRequest)
err = (&http.Server{
Addr: this.addr,
Handler: handler,
......@@ -231,7 +231,7 @@ func (this *Server) RunWSEventLoopWithTLS(newSession NewSessionCallback, path st
}
handler = newWSHandler(this, newSession)
handler.HandleFunc(path, handler.ServeWSRequest)
handler.HandleFunc(path, handler.serveWSRequest)
server = &http.Server{
Addr: this.addr,
Handler: handler,
......
......@@ -388,6 +388,7 @@ func (this *Session) RunEventLoop() {
}
// call session opened
this.UpdateActive()
if err := this.listener.OnOpen(this); err != nil {
this.Close()
return
......@@ -479,6 +480,7 @@ LOOP:
if flag {
if wsFlag {
err = wsConn.writePing()
log.Debug("wsConn.writePing() = error{%#v}", err)
if err != nil {
log.Warn("wsConn.writePing() = error{%#v}", err)
}
......@@ -592,6 +594,7 @@ func (this *Session) handleTCPPackage() error {
if pkg == nil {
break
}
this.UpdateActive()
this.rQ <- pkg
pktBuf.Next(pkgLen)
}
......@@ -625,6 +628,7 @@ func (this *Session) handleWSPackage() error {
// this.errFlag = true
return err
}
this.UpdateActive()
if this.reader != nil {
unmarshalPkg, length, err = this.reader.Read(this, pkg)
if err == nil && this.maxMsgLen > 0 && length > this.maxMsgLen {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment