Commit c22d4d22 authored by alexstocks's avatar alexstocks Committed by watermelo

Imp: close connection asynchronously in session.gc

parent 4709210b
...@@ -91,7 +91,7 @@ type session struct { ...@@ -91,7 +91,7 @@ type session struct {
// done // done
wait time.Duration wait time.Duration
once sync.Once once *sync.Once
done chan struct{} done chan struct{}
// attribute // attribute
...@@ -115,6 +115,7 @@ func newSession(endPoint EndPoint, conn Connection) *session { ...@@ -115,6 +115,7 @@ func newSession(endPoint EndPoint, conn Connection) *session {
period: period, period: period,
once: &sync.Once{},
done: make(chan struct{}), done: make(chan struct{}),
wait: pendingDuration, wait: pendingDuration,
attrs: gxcontext.NewValuesContext(nil), attrs: gxcontext.NewValuesContext(nil),
...@@ -153,14 +154,15 @@ func newWSSession(conn *websocket.Conn, endPoint EndPoint) Session { ...@@ -153,14 +154,15 @@ func newWSSession(conn *websocket.Conn, endPoint EndPoint) Session {
} }
func (s *session) Reset() { func (s *session) Reset() {
s.name = defaultSessionName *s = session{
s.once = sync.Once{} name: defaultSessionName,
s.done = make(chan struct{}) once: &sync.Once{},
s.period = period done: make(chan struct{}),
s.wait = pendingDuration period: period,
s.attrs = gxcontext.NewValuesContext(nil) wait: pendingDuration,
s.rDone = make(chan struct{}) attrs: gxcontext.NewValuesContext(nil),
s.grNum = 0 rDone: make(chan struct{}),
}
s.SetWriteTimeout(netIOTimeout) s.SetWriteTimeout(netIOTimeout)
s.SetReadTimeout(netIOTimeout) s.SetReadTimeout(netIOTimeout)
...@@ -360,6 +362,10 @@ func (s *session) RemoveAttribute(key interface{}) { ...@@ -360,6 +362,10 @@ func (s *session) RemoveAttribute(key interface{}) {
} }
func (s *session) sessionToken() string { func (s *session) sessionToken() string {
if s.IsClosed() || s.Connection == nil {
return "session-closed"
}
return fmt.Sprintf("{%s:%s:%d:%s<->%s}", return fmt.Sprintf("{%s:%s:%d:%s<->%s}",
s.name, s.EndPoint().EndPointType(), s.ID(), s.LocalAddr(), s.RemoteAddr()) s.name, s.EndPoint().EndPointType(), s.ID(), s.LocalAddr(), s.RemoteAddr())
} }
...@@ -663,8 +669,10 @@ func (s *session) handlePackage() { ...@@ -663,8 +669,10 @@ func (s *session) handlePackage() {
s.stop() s.stop()
if err != nil { if err != nil {
log.Errorf("%s, [session.handlePackage] error:%+v", s.sessionToken(), err) log.Errorf("%s, [session.handlePackage] error:%+v", s.sessionToken(), err)
if s != nil || s.listener != nil {
s.listener.OnError(s, err) s.listener.OnError(s, err)
} }
}
}() }()
if _, ok := s.Connection.(*gettyTCPConn); ok { if _, ok := s.Connection.(*gettyTCPConn); ok {
...@@ -730,12 +738,12 @@ func (s *session) handleTCPPackage() error { ...@@ -730,12 +738,12 @@ func (s *session) handleTCPPackage() error {
if netError, ok = perrors.Cause(err).(net.Error); ok && netError.Timeout() { if netError, ok = perrors.Cause(err).(net.Error); ok && netError.Timeout() {
break break
} }
log.Errorf("%s, [session.conn.read] = error:%+v", s.sessionToken(), err)
if perrors.Cause(err) == io.EOF { if perrors.Cause(err) == io.EOF {
err = nil err = nil
exit = true exit = true
break break
} }
log.Errorf("%s, [session.conn.read] = error:%+v", s.sessionToken(), err)
exit = true exit = true
} }
break break
...@@ -759,7 +767,7 @@ func (s *session) handleTCPPackage() error { ...@@ -759,7 +767,7 @@ func (s *session) handleTCPPackage() error {
// handle case 1 // handle case 1
if err != nil { if err != nil {
log.Warnf("%s, [session.handleTCPPackage] = len{%d}, error:%+v", log.Warnf("%s, [session.handleTCPPackage] = len{%d}, error:%+v",
s.sessionToken(), pkgLen, err) s.sessionToken(), pkgLen, perrors.WithStack(err))
exit = true exit = true
break break
} }
...@@ -877,7 +885,7 @@ func (s *session) handleWSPackage() error { ...@@ -877,7 +885,7 @@ func (s *session) handleWSPackage() error {
} }
if err != nil { if err != nil {
log.Warnf("%s, [session.handleWSPackage] = error{%+s}", log.Warnf("%s, [session.handleWSPackage] = error{%+s}",
s.sessionToken(), err) s.sessionToken(), perrors.WithStack(err))
return perrors.WithStack(err) return perrors.WithStack(err)
} }
s.UpdateActive() s.UpdateActive()
...@@ -924,16 +932,28 @@ func (s *session) stop() { ...@@ -924,16 +932,28 @@ func (s *session) stop() {
} }
func (s *session) gc() { func (s *session) gc() {
var (
wQ chan interface{}
conn Connection
)
s.lock.Lock() s.lock.Lock()
if s.attrs != nil { if s.attrs != nil {
s.attrs = nil s.attrs = nil
if s.wQ != nil { if s.wQ != nil {
close(s.wQ) wQ = s.wQ
s.wQ = nil s.wQ = nil
} }
s.Connection.close((int)((int64)(s.wait))) conn = s.Connection
} }
s.lock.Unlock() s.lock.Unlock()
go func() {
if wQ != nil {
conn.close((int)((int64)(s.wait)))
close(wQ)
}
}()
} }
// Close will be invoked by NewSessionCallback(if return error is not nil) // Close will be invoked by NewSessionCallback(if return error is not nil)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment