Commit 6b4adbd5 authored by AlexStocks's avatar AlexStocks

Imp: remote go client reconnection goroutine by reconnect method in Session:Stop

parent 724bc6d7
...@@ -36,6 +36,7 @@ const ( ...@@ -36,6 +36,7 @@ const (
) )
var ( var (
sessionClientKey = "session-client-owner"
connectPingPackage = []byte("connect-ping") connectPingPackage = []byte("connect-ping")
) )
...@@ -363,6 +364,7 @@ func (c *client) connect() { ...@@ -363,6 +364,7 @@ func (c *client) connect() {
c.Lock() c.Lock()
c.ssMap[ss] = gxsync.Empty{} c.ssMap[ss] = gxsync.Empty{}
c.Unlock() c.Unlock()
ss.SetAttribute(sessionClientKey, c)
break break
} }
// don't distinguish between tcp connection and websocket connection. Because // don't distinguish between tcp connection and websocket connection. Because
...@@ -382,17 +384,16 @@ func (c *client) RunEventLoop(newSession NewSessionCallback) { ...@@ -382,17 +384,16 @@ func (c *client) RunEventLoop(newSession NewSessionCallback) {
c.Lock() c.Lock()
c.newSession = newSession c.newSession = newSession
c.Unlock() c.Unlock()
c.reConnect()
}
c.wg.Add(1) // a for-loop connect to make sure the connection pool is valid
// a for-loop goroutine to make sure the connection is valid func (c *client) reConnect() {
go func() {
var num, max, times int var num, max, times int
defer c.wg.Done()
c.Lock() // c.Lock()
max = c.number max = c.number
c.Unlock() // c.Unlock()
// log.Info("maximum client connection number:%d", max)
for { for {
if c.IsClosed() { if c.IsClosed() {
log.Warn("client{peer:%s} goroutine exit now.", c.addr) log.Warn("client{peer:%s} goroutine exit now.", c.addr)
...@@ -400,24 +401,17 @@ func (c *client) RunEventLoop(newSession NewSessionCallback) { ...@@ -400,24 +401,17 @@ func (c *client) RunEventLoop(newSession NewSessionCallback) {
} }
num = c.sessionNum() num = c.sessionNum()
// log.Info("current client connction number:%d", num)
if max <= num { if max <= num {
break
}
c.connect()
times++ times++
if maxTimes < times { if maxTimes < times {
times = maxTimes times = maxTimes
} }
// time.Sleep(time.Duration(int64(times) * connInterval)) // time.Sleep(time.Duration(int64(times) * connInterval))
<-wheel.After(time.Duration(int64(times) * connInterval)) <-wheel.After(time.Duration(int64(times) * connInterval))
continue
}
times = 0
c.connect()
//if c.endPointType == UDP_CLIENT {
// break
//}
// time.Sleep(c.interval) // build c.number connections asap
} }
}()
} }
func (c *client) stop() { func (c *client) stop() {
...@@ -429,6 +423,7 @@ func (c *client) stop() { ...@@ -429,6 +423,7 @@ func (c *client) stop() {
close(c.done) close(c.done)
c.Lock() c.Lock()
for s := range c.ssMap { for s := range c.ssMap {
s.RemoveAttribute(sessionClientKey)
s.Close() s.Close()
} }
c.ssMap = nil c.ssMap = nil
......
Subproject commit 0e6d40895587d9eb6d7c5b2ddd6bd7e1f008d0da Subproject commit abe816f76766902e9d3b1ab40962890f50fddddd
...@@ -774,6 +774,10 @@ func (s *session) stop() { ...@@ -774,6 +774,10 @@ func (s *session) stop() {
conn.SetWriteDeadline(now.Add(s.writeTimeout())) conn.SetWriteDeadline(now.Add(s.writeTimeout()))
} }
close(s.done) close(s.done)
c := s.GetAttribute(sessionClientKey)
if clt, ok := c.(*client); ok {
clt.reConnect()
}
}) })
} }
} }
...@@ -791,9 +795,9 @@ func (s *session) gc() { ...@@ -791,9 +795,9 @@ func (s *session) gc() {
s.lock.Unlock() s.lock.Unlock()
} }
// Close will be invoked by NewSessionCallback(if return error is not nil) or (session)handleLoop automatically. // Close will be invoked by NewSessionCallback(if return error is not nil)
// It's thread safe. // or (session)handleLoop automatically. It's thread safe.
func (s *session) Close() { func (s *session) Close() {
s.stop() s.stop()
log.Info("%s closed now, its current gr num %d", s.sessionToken(), atomic.LoadInt32(&(s.grNum))) log.Info("%s closed now. its current gr num is %d", s.sessionToken(), atomic.LoadInt32(&(s.grNum)))
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment