Unverified Commit 0c7e1c6a authored by 望哥's avatar 望哥 Committed by GitHub

Merge pull request #20 from fangyincheng/del-rQ

Mod: don't use read queue
parents c5e0d906 ef0bddd6
...@@ -154,7 +154,10 @@ type Session interface { ...@@ -154,7 +154,10 @@ type Session interface {
SetReader(Reader) SetReader(Reader)
SetWriter(Writer) SetWriter(Writer)
SetCronPeriod(int) SetCronPeriod(int)
// Deprecated: don't use read queue.
SetRQLen(int) SetRQLen(int)
SetWQLen(int) SetWQLen(int)
SetWaitTime(time.Duration) SetWaitTime(time.Duration)
SetTaskPool(*gxsync.TaskPool) SetTaskPool(*gxsync.TaskPool)
......
...@@ -66,8 +66,7 @@ type session struct { ...@@ -66,8 +66,7 @@ type session struct {
reader Reader // @reader should be nil when @conn is a gettyWSConn object. reader Reader // @reader should be nil when @conn is a gettyWSConn object.
writer Writer writer Writer
// read & write // write
rQ chan interface{}
wQ chan interface{} wQ chan interface{}
// handle logic // handle logic
...@@ -275,17 +274,8 @@ func (s *session) SetCronPeriod(period int) { ...@@ -275,17 +274,8 @@ func (s *session) SetCronPeriod(period int) {
s.period = time.Duration(period) * time.Millisecond s.period = time.Duration(period) * time.Millisecond
} }
// set @session's read queue size // Deprecated: don't use read queue.
func (s *session) SetRQLen(readQLen int) { func (s *session) SetRQLen(readQLen int) {}
if readQLen < 1 {
panic("@readQLen < 1")
}
s.lock.Lock()
defer s.lock.Unlock()
s.rQ = make(chan interface{}, readQLen)
log.Debug("%s, [session.SetRQLen] rQ{len:%d, cap:%d}", s.Stat(), len(s.rQ), cap(s.rQ))
}
// set @session's Write queue size // set @session's Write queue size
func (s *session) SetWQLen(writeQLen int) { func (s *session) SetWQLen(writeQLen int) {
...@@ -466,10 +456,6 @@ func (s *session) run() { ...@@ -466,10 +456,6 @@ func (s *session) run() {
s.wQ = make(chan interface{}, defaultQLen) s.wQ = make(chan interface{}, defaultQLen)
} }
if s.rQ == nil && s.tPool == nil {
s.rQ = make(chan interface{}, defaultQLen)
}
// call session opened // call session opened
s.UpdateActive() s.UpdateActive()
if err := s.listener.OnOpen(s); err != nil { if err := s.listener.OnOpen(s); err != nil {
...@@ -492,7 +478,6 @@ func (s *session) handleLoop() { ...@@ -492,7 +478,6 @@ func (s *session) handleLoop() {
wsConn *gettyWSConn wsConn *gettyWSConn
// start time.Time // start time.Time
counter gxtime.CountWatch counter gxtime.CountWatch
inPkg interface{}
outPkg interface{} outPkg interface{}
) )
...@@ -522,8 +507,8 @@ LOOP: ...@@ -522,8 +507,8 @@ LOOP:
case <-s.done: case <-s.done:
// this case branch assure the (session)handleLoop gr will exit before (session)handlePackage gr. // this case branch assure the (session)handleLoop gr will exit before (session)handlePackage gr.
if atomic.LoadInt32(&(s.grNum)) == 1 { // make sure @(session)handlePackage goroutine has been closed. if atomic.LoadInt32(&(s.grNum)) == 1 { // make sure @(session)handlePackage goroutine has been closed.
if len(s.rQ) == 0 && len(s.wQ) == 0 { if len(s.wQ) == 0 {
log.Infof("%s, [session.handleLoop] got done signal. Both rQ and wQ are nil.", s.Stat()) log.Infof("%s, [session.handleLoop] got done signal. wQ is nil.", s.Stat())
break LOOP break LOOP
} }
counter.Start() counter.Start()
...@@ -534,18 +519,6 @@ LOOP: ...@@ -534,18 +519,6 @@ LOOP:
} }
} }
case inPkg = <-s.rQ:
// read the s.rQ and assure (session)handlePackage gr will not block by (session)rQ.
if flag {
log.Debugf("%#v <-s.rQ", inPkg)
pkg := inPkg
// go s.listener.OnMessage(s, pkg)
s.listener.OnMessage(s, pkg)
s.incReadPkgNum()
} else {
log.Infof("[session.handleLoop] drop readin package{%#v}", inPkg)
}
case outPkg = <-s.wQ: case outPkg = <-s.wQ:
if flag { if flag {
if err = s.writer.Write(s, outPkg); err != nil { if err = s.writer.Write(s, outPkg); err != nil {
...@@ -573,14 +546,15 @@ LOOP: ...@@ -573,14 +546,15 @@ LOOP:
} }
func (s *session) addTask(pkg interface{}) { func (s *session) addTask(pkg interface{}) {
f := func() {
s.listener.OnMessage(s, pkg)
s.incReadPkgNum()
}
if s.tPool != nil { if s.tPool != nil {
s.tPool.AddTask(func() { s.tPool.AddTask(f)
s.listener.OnMessage(s, pkg) return
s.incReadPkgNum()
})
} else {
s.rQ <- pkg
} }
f()
} }
func (s *session) handlePackage() { func (s *session) handlePackage() {
...@@ -853,10 +827,6 @@ func (s *session) gc() { ...@@ -853,10 +827,6 @@ func (s *session) gc() {
close(s.wQ) close(s.wQ)
s.wQ = nil s.wQ = nil
} }
if s.rQ != nil {
close(s.rQ)
s.rQ = nil
}
s.Connection.close((int)((int64)(s.wait))) s.Connection.close((int)((int64)(s.wait)))
} }
s.lock.Unlock() s.lock.Unlock()
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment