Commit 2c81cca2 authored by AlexStocks's avatar AlexStocks

check udp connection alive

parent b796edc5
......@@ -14,6 +14,11 @@
## develop history ##
---
- 2018/03/18
> improvement
* nerr -> netError
* check udp connection alive after connect()
- 2018/03/17
> improvement
* add end point type
......@@ -284,7 +289,7 @@
>
> 4 reconstruct session output token string session.go:(Session)sessionToken
>
> 5 use err instead of nerr in session.go:(Session)handlePackage:defer:OnError
> 5 use err instead of netError in session.go:(Session)handlePackage:defer:OnError
>
> 6 version: 0.2.07
......
......@@ -22,6 +22,7 @@ import (
)
import (
"github.com/AlexStocks/goext/log"
"github.com/AlexStocks/goext/sync"
log "github.com/AlexStocks/log4go"
"github.com/gorilla/websocket"
......@@ -31,6 +32,7 @@ const (
connInterval = 3e9 // 3s
connectTimeout = 5e9
maxTimes = 10
pingPacket = "ping"
)
/////////////////////////////////////////
......@@ -143,8 +145,11 @@ func (c *client) dialUDP() Session {
conn *net.UDPConn
localAddr *net.UDPAddr
peerAddr *net.UDPAddr
length int
buf []byte
)
buf = make([]byte, 128)
localAddr = &net.UDPAddr{IP: net.IPv4zero, Port: 0}
peerAddr, _ = net.ResolveUDPAddr("udp", c.addr)
for {
......@@ -155,6 +160,23 @@ func (c *client) dialUDP() Session {
if err == nil && conn.LocalAddr().String() == conn.RemoteAddr().String() {
err = errSelfConnect
}
// check connection alive by write/read action
copy(buf, []byte(pingPacket))
conn.SetWriteDeadline(wheel.Now().Add(1e9))
if length, err = conn.Write(buf[:len(pingPacket)]); err != nil {
log.Warn("conn.Write(%s) = {length:%d, err:%s}", pingPacket, length, err)
continue
}
conn.SetReadDeadline(wheel.Now().Add(1e9))
length, err = conn.Read(buf)
if netErr, ok := err.(net.Error); ok && netErr.Timeout() {
err = nil
}
if err != nil {
log.Info("conn{%#v}.Read() = err{%s}", conn, err)
continue
}
if err == nil {
return newUDPSession(conn, c.endPointType)
}
......
......@@ -21,7 +21,6 @@ import (
import (
"github.com/AlexStocks/goext/context"
"github.com/AlexStocks/goext/log"
"github.com/AlexStocks/goext/sync"
"github.com/AlexStocks/goext/time"
log "github.com/AlexStocks/log4go"
......@@ -461,7 +460,7 @@ LOOP:
case inPkg = <-s.rQ:
// 这个条件分支通过(session)rQ排空确保(session)handlePackage gr不会阻塞在(session)rQ上
if flag {
gxlog.CInfo("%#v <-s.rQ", inPkg)
log.Debug("%#v <-s.rQ", inPkg)
s.listener.OnMessage(s, inPkg)
s.incReadPkgCount()
} else {
......@@ -515,7 +514,6 @@ func (s *session) handlePackage() {
grNum = atomic.AddInt32(&(s.grNum), -1)
log.Info("%s, [session.handlePackage] gr will exit now, left gr num %d", s.sessionToken(), grNum)
s.stop()
// if s.errFlag {
if err != nil {
log.Error("%s, [session.handlePackage] error{%s}", s.sessionToken(), err)
s.listener.OnError(s, err)
......@@ -542,16 +540,16 @@ func (s *session) handlePackage() {
// get package from tcp stream(packet)
func (s *session) handleTCPPackage() error {
var (
ok bool
err error
nerr net.Error
conn *gettyTCPConn
exit bool
bufLen int
pkgLen int
buf []byte
pktBuf *bytes.Buffer
pkg interface{}
ok bool
err error
netError net.Error
conn *gettyTCPConn
exit bool
bufLen int
pkgLen int
buf []byte
pktBuf *bytes.Buffer
pkg interface{}
)
buf = make([]byte, maxReadBufLen)
......@@ -569,7 +567,7 @@ func (s *session) handleTCPPackage() error {
// s.conn.SetReadTimeout(time.Now().Add(s.rTimeout))
bufLen, err = conn.read(buf)
if err != nil {
if nerr, ok = err.(net.Error); ok && nerr.Timeout() {
if netError, ok = err.(net.Error); ok && netError.Timeout() {
break
}
log.Error("%s, [session.conn.read] = error{%s}", s.sessionToken(), err)
......@@ -620,15 +618,15 @@ func (s *session) handleTCPPackage() error {
// get package from udp packet
func (s *session) handleUDPPackage() error {
var (
ok bool
err error
nerr net.Error
conn *gettyUDPConn
bufLen int
buf []byte
addr *net.UDPAddr
pkgLen int
pkg interface{}
ok bool
err error
netError net.Error
conn *gettyUDPConn
bufLen int
buf []byte
addr *net.UDPAddr
pkgLen int
pkg interface{}
)
conn = s.Connection.(*gettyUDPConn)
......@@ -643,8 +641,8 @@ func (s *session) handleUDPPackage() error {
}
bufLen, addr, err = conn.read(buf)
gxlog.CInfo("conn.read() = bufLen:%d, addr:%#v, err:%s", bufLen, pkgLen, err)
if nerr, ok = err.(net.Error); ok && nerr.Timeout() {
log.Debug("conn.read() = bufLen:%d, addr:%#v, err:%s", bufLen, pkgLen, err)
if netError, ok = err.(net.Error); ok && netError.Timeout() {
continue
}
if err != nil {
......@@ -653,7 +651,7 @@ func (s *session) handleUDPPackage() error {
}
pkg, pkgLen, err = s.reader.Read(s, buf[:bufLen])
gxlog.CInfo("s.reader.Read() = pkg:%#v, pkgLen:%d, err:%s", pkg, pkgLen, err)
log.Debug("s.reader.Read() = pkg:%#v, pkgLen:%d, err:%s", pkg, pkgLen, err)
if err == nil && s.maxMsgLen > 0 && bufLen > int(s.maxMsgLen) {
err = ErrMsgTooLong
}
......@@ -673,7 +671,7 @@ func (s *session) handleWSPackage() error {
var (
ok bool
err error
nerr net.Error
netError net.Error
length int
conn *gettyWSConn
pkg []byte
......@@ -686,8 +684,7 @@ func (s *session) handleWSPackage() error {
break
}
pkg, err = conn.read()
if nerr, ok = err.(net.Error); ok && nerr.Timeout() {
if netError, ok = err.(net.Error); ok && netError.Timeout() {
continue
}
if err != nil {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment