Commit 5f47c4d9 authored by pantianying's avatar pantianying Committed by watermelo

change grNum to chan

parent 293a60d4
......@@ -86,9 +86,10 @@ type session struct {
// attribute
attrs *ValuesContext
// goroutines sync
grNum int32
lock sync.RWMutex
// goroutines done signal
handleLoopDone chan struct{}
handlePackageDone chan struct{}
lock sync.RWMutex
}
func newSession(endPoint EndPoint, conn Connection) *session {
......@@ -102,9 +103,11 @@ func newSession(endPoint EndPoint, conn Connection) *session {
period: period,
done: make(chan struct{}),
wait: pendingDuration,
attrs: NewValuesContext(nil),
done: make(chan struct{}),
wait: pendingDuration,
attrs: NewValuesContext(nil),
handleLoopDone: make(chan struct{}),
handlePackageDone: make(chan struct{}),
}
ss.Connection.setSession(ss)
......@@ -145,7 +148,8 @@ func (s *session) Reset() {
s.period = period
s.wait = pendingDuration
s.attrs = NewValuesContext(nil)
s.grNum = 0
s.handleLoopDone = make(chan struct{})
s.handlePackageDone = make(chan struct{})
s.SetWriteTimeout(netIOTimeout)
s.SetReadTimeout(netIOTimeout)
......@@ -470,7 +474,6 @@ func (s *session) run() {
}
// start read/write gr
atomic.AddInt32(&(s.grNum), 2)
go s.handleLoop()
go s.handlePackage()
}
......@@ -496,7 +499,7 @@ func (s *session) handleLoop() {
log.Errorf("[session.handleLoop] panic session %s: err=%s\n%s", s.sessionToken(), r, rBuf)
}
grNum = atomic.AddInt32(&(s.grNum), -1)
close(s.handleLoopDone)
s.listener.OnClose(s)
log.Info("%s, [session.handleLoop] goroutine exit now, left gr num %d", s.Stat(), grNum)
s.gc()
......@@ -511,19 +514,17 @@ LOOP:
select {
case <-s.done:
// this case branch assure the (session)handleLoop gr will exit before (session)handlePackage gr.
if atomic.LoadInt32(&(s.grNum)) == 1 { // make sure @(session)handlePackage goroutine has been closed.
if len(s.wQ) == 0 {
log.Infof("%s, [session.handleLoop] got done signal. wQ is nil.", s.Stat())
break LOOP
}
counter.Start()
// if time.Since(start).Nanoseconds() >= s.wait.Nanoseconds() {
if counter.Count() > s.wait.Nanoseconds() {
log.Infof("%s, [session.handleLoop] got done signal ", s.Stat())
break LOOP
}
<-s.handlePackageDone
if len(s.wQ) == 0 {
log.Infof("%s, [session.handleLoop] got done signal. wQ is nil.", s.Stat())
break LOOP
}
counter.Start()
// if time.Since(start).Nanoseconds() >= s.wait.Nanoseconds() {
if counter.Count() > s.wait.Nanoseconds() {
log.Infof("%s, [session.handleLoop] got done signal ", s.Stat())
break LOOP
}
case outPkg = <-s.wQ:
if flag {
if err = s.writer.Write(s, outPkg); err != nil {
......@@ -577,7 +578,7 @@ func (s *session) handlePackage() {
log.Errorf("[session.handlePackage] panic session %s: err=%s\n%s", s.sessionToken(), r, rBuf)
}
grNum = atomic.AddInt32(&(s.grNum), -1)
close(s.handlePackageDone)
log.Infof("%s, [session.handlePackage] gr will exit now, left gr num %d", s.sessionToken(), grNum)
s.stop()
if err != nil {
......@@ -856,5 +857,5 @@ func (s *session) gc() {
func (s *session) Close() {
s.stop()
log.Info("%s closed now. its current gr num is %d",
s.sessionToken(), atomic.LoadInt32(&(s.grNum)))
s.sessionToken())
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment