Unverified Commit 0895f959 authored by georgehao's avatar georgehao Committed by GitHub

fix timewheel error (#60)

* feat: update github action format

* feat: 修复timewheel相关问题

* feat: del println log
parent fd9b88f3
...@@ -49,8 +49,9 @@ jobs: ...@@ -49,8 +49,9 @@ jobs:
go get -v -t -d ./... go get -v -t -d ./...
fi fi
- name: Go Fmt - name: format
run: go fmt ./... && [[ -z `git status -s` ]] run: |
gofmt -l -w . && [[ -z `git status -s` ]]
- name: License Check - name: License Check
run: | run: |
......
...@@ -179,6 +179,8 @@ func NewTimerWheel() *TimerWheel { ...@@ -179,6 +179,8 @@ func NewTimerWheel() *TimerWheel {
ticker: time.NewTicker(time.Duration(minTickerInterval)), ticker: time.NewTicker(time.Duration(minTickerInterval)),
timerQ: make(chan *timerNodeAction, timerNodeQueueSize), timerQ: make(chan *timerNodeAction, timerNodeQueueSize),
} }
w.enable.Store(true)
w.start = w.clock w.start = w.clock
for i := 0; i < maxTimerLevel; i++ { for i := 0; i < maxTimerLevel; i++ {
...@@ -200,46 +202,42 @@ func NewTimerWheel() *TimerWheel { ...@@ -200,46 +202,42 @@ func NewTimerWheel() *TimerWheel {
if !w.enable.Load() { if !w.enable.Load() {
break LOOP break LOOP
} }
select { select {
case t, cFlag = <-w.ticker.C: case t, cFlag = <-w.ticker.C:
atomic.StoreInt64(&curGxTime, t.UnixNano()) if !cFlag {
if cFlag && 0 != w.number.Load() { break LOOP
ret := w.timerUpdate(t)
if ret == 0 {
w.run()
}
continue
} }
break LOOP atomic.StoreInt64(&curGxTime, t.UnixNano())
ret := w.timerUpdate(t)
if ret == 0 {
w.run()
}
case nodeAction, qFlag = <-w.timerQ: case nodeAction, qFlag = <-w.timerQ:
// just one w.timerQ channel to ensure the exec sequence of timer event. if !qFlag {
if qFlag { break LOOP
switch {
case nodeAction.action == TimerActionAdd:
w.number.Add(1)
w.insertTimerNode(nodeAction.node)
case nodeAction.action == TimerActionDel:
w.number.Add(-1)
w.deleteTimerNode(nodeAction.node)
case nodeAction.action == TimerActionReset:
// log.CInfo("node action:%#v", nodeAction)
w.resetTimerNode(nodeAction.node)
default:
w.number.Add(1)
w.insertTimerNode(nodeAction.node)
}
continue
} }
break LOOP // just one w.timerQ channel to ensure the exec sequence of timer event.
switch {
case nodeAction.action == TimerActionAdd:
w.number.Add(1)
w.insertTimerNode(nodeAction.node)
case nodeAction.action == TimerActionDel:
w.number.Add(-1)
w.deleteTimerNode(nodeAction.node)
case nodeAction.action == TimerActionReset:
// log.CInfo("node action:%#v", nodeAction)
w.resetTimerNode(nodeAction.node)
default:
w.number.Add(1)
w.insertTimerNode(nodeAction.node)
}
} }
} }
log.Printf("the timeWheel runner exit, current timer node num:%d", w.number.Load())
}() }()
w.enable.Store(true)
return w return w
} }
...@@ -262,11 +260,11 @@ func (w *TimerWheel) Now() time.Time { ...@@ -262,11 +260,11 @@ func (w *TimerWheel) Now() time.Time {
func (w *TimerWheel) run() { func (w *TimerWheel) run() {
var ( var (
clock int64 clock int64
err error err error
node *timerNode node *timerNode
slot *list.List slot *list.List
array []*timerNode reinsertNodes []*timerNode
) )
slot = w.slot[0] slot = w.slot[0]
...@@ -280,7 +278,7 @@ func (w *TimerWheel) run() { ...@@ -280,7 +278,7 @@ func (w *TimerWheel) run() {
err = node.timerRun(node.ID, UnixNano2Time(clock), node.arg) err = node.timerRun(node.ID, UnixNano2Time(clock), node.arg)
if err == nil && node.typ == TimerLoop { if err == nil && node.typ == TimerLoop {
array = append(array, node) reinsertNodes = append(reinsertNodes, node)
// w.insertTimerNode(node) // w.insertTimerNode(node)
} else { } else {
w.number.Add(-1) w.number.Add(-1)
...@@ -289,9 +287,10 @@ func (w *TimerWheel) run() { ...@@ -289,9 +287,10 @@ func (w *TimerWheel) run() {
next = e.Next() next = e.Next()
slot.Remove(e) slot.Remove(e)
} }
for idx := range array[:] {
array[idx].trig += array[idx].period for _, reinsertNode := range reinsertNodes {
w.insertTimerNode(array[idx]) reinsertNode.trig += reinsertNode.period
w.insertTimerNode(reinsertNode)
} }
} }
......
...@@ -63,3 +63,10 @@ func TestGetEndTime(t *testing.T) { ...@@ -63,3 +63,10 @@ func TestGetEndTime(t *testing.T) {
yearEndTime := GetEndTime("year") yearEndTime := GetEndTime("year")
t.Logf("this year end time %q", yearEndTime) t.Logf("this year end time %q", yearEndTime)
} }
func TestTimerConsumerGoroutine(t *testing.T) {
InitDefaultTimerWheel()
time.Sleep(time.Second * 2)
println("sleep over")
<-defaultTimerWheel.After(1)
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment