Commit a4a37fec authored by AlexStocks's avatar AlexStocks

Imp: add SetTaskPool for session interface

parent 3c7f2913
...@@ -154,6 +154,7 @@ type Session interface { ...@@ -154,6 +154,7 @@ type Session interface {
SetRQLen(int) SetRQLen(int)
SetWQLen(int) SetWQLen(int)
SetWaitTime(time.Duration) SetWaitTime(time.Duration)
SetTaskPool(*TaskPool)
GetAttribute(interface{}) interface{} GetAttribute(interface{}) interface{}
SetAttribute(interface{}, interface{}) SetAttribute(interface{}, interface{})
......
...@@ -647,8 +647,6 @@ func (s *session) handleTCPPackage() error { ...@@ -647,8 +647,6 @@ func (s *session) handleTCPPackage() error {
break break
} }
log.Errorf("%s, [session.conn.read] = error:%+v", s.sessionToken(), err) log.Errorf("%s, [session.conn.read] = error:%+v", s.sessionToken(), err)
// for (Codec)OnErr
// s.errFlag = true
exit = true exit = true
} }
break break
...@@ -787,7 +785,6 @@ func (s *session) handleWSPackage() error { ...@@ -787,7 +785,6 @@ func (s *session) handleWSPackage() error {
if err != nil { if err != nil {
log.Warnf("%s, [session.handleWSPackage] = error{%+s}", log.Warnf("%s, [session.handleWSPackage] = error{%+s}",
s.sessionToken(), err) s.sessionToken(), err)
// s.errFlag = true
return perrors.WithStack(err) return perrors.WithStack(err)
} }
s.UpdateActive() s.UpdateActive()
......
...@@ -29,12 +29,17 @@ type TaskPool struct { ...@@ -29,12 +29,17 @@ type TaskPool struct {
} }
// build a task pool // build a task pool
func newTaskPool(opts TaskPoolOptions) *TaskPool { func NewTaskPool(opts ...TaskPoolOption) *TaskPool {
opts.validate() var tOpts TaskPoolOptions
for _, opt := range opts {
opt(&tOpts)
}
tOpts.validate()
p := &TaskPool{ p := &TaskPool{
TaskPoolOptions: opts, TaskPoolOptions: tOpts,
qArray: make([]chan task, opts.tQNumber), qArray: make([]chan task, tOpts.tQNumber),
done: make(chan struct{}), done: make(chan struct{}),
} }
...@@ -107,7 +112,7 @@ func (p *TaskPool) stop() { ...@@ -107,7 +112,7 @@ func (p *TaskPool) stop() {
} }
// check whether the session has been closed. // check whether the session has been closed.
func (p *TaskPool) isClosed() bool { func (p *TaskPool) IsClosed() bool {
select { select {
case <-p.done: case <-p.done:
return true return true
...@@ -117,7 +122,7 @@ func (p *TaskPool) isClosed() bool { ...@@ -117,7 +122,7 @@ func (p *TaskPool) isClosed() bool {
} }
} }
func (p *TaskPool) close() { func (p *TaskPool) Close() {
p.stop() p.stop()
p.wg.Wait() p.wg.Wait()
for i := range p.qArray { for i := range p.qArray {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment