Commit 7c9883a3 authored by xujianhai666's avatar xujianhai666

use disposed int32

parent ee6d7d0e
...@@ -140,7 +140,7 @@ type Queue struct { ...@@ -140,7 +140,7 @@ type Queue struct {
waiters waiters waiters waiters
items items items items
lock sync.Mutex lock sync.Mutex
disposed bool disposed int32
} }
// New is a constructor for a new threadsafe queue. // New is a constructor for a new threadsafe queue.
...@@ -159,7 +159,7 @@ func (q *Queue) Put(items ...interface{}) error { ...@@ -159,7 +159,7 @@ func (q *Queue) Put(items ...interface{}) error {
q.lock.Lock() q.lock.Lock()
defer q.lock.Unlock() defer q.lock.Unlock()
if q.disposed { if atomic.LoadInt32(&q.disposed) == 1 {
return ErrDisposed return ErrDisposed
} }
...@@ -205,7 +205,7 @@ func (q *Queue) Poll(number int64, timeout time.Duration) ([]interface{}, error) ...@@ -205,7 +205,7 @@ func (q *Queue) Poll(number int64, timeout time.Duration) ([]interface{}, error)
q.lock.Lock() q.lock.Lock()
if q.disposed { if atomic.LoadInt32(&q.disposed) == 1 {
q.lock.Unlock() q.lock.Unlock()
return nil, ErrDisposed return nil, ErrDisposed
} }
...@@ -224,7 +224,7 @@ func (q *Queue) Poll(number int64, timeout time.Duration) ([]interface{}, error) ...@@ -224,7 +224,7 @@ func (q *Queue) Poll(number int64, timeout time.Duration) ([]interface{}, error)
select { select {
case <-sema.ready: case <-sema.ready:
// we are now inside the put's lock // we are now inside the put's lock
if q.disposed { if atomic.LoadInt32(&q.disposed) == 1 {
return nil, ErrDisposed return nil, ErrDisposed
} }
items = q.items.get(number) items = q.items.get(number)
...@@ -258,7 +258,7 @@ func (q *Queue) Peek() (interface{}, error) { ...@@ -258,7 +258,7 @@ func (q *Queue) Peek() (interface{}, error) {
q.lock.Lock() q.lock.Lock()
defer q.lock.Unlock() defer q.lock.Unlock()
if q.disposed { if atomic.LoadInt32(&q.disposed) == 1 {
return nil, ErrDisposed return nil, ErrDisposed
} }
...@@ -270,17 +270,17 @@ func (q *Queue) Peek() (interface{}, error) { ...@@ -270,17 +270,17 @@ func (q *Queue) Peek() (interface{}, error) {
return peekItem, nil return peekItem, nil
} }
// TakeUntil takes a function and returns a list of items that // GetUntil gets a function and returns a list of items that
// match the checker until the checker returns false. This does not // match the checker until the checker returns false. This does not
// wait if there are no items in the queue. // wait if there are no items in the queue.
func (q *Queue) TakeUntil(checker func(item interface{}) bool) ([]interface{}, error) { func (q *Queue) GetUntil(checker func(item interface{}) bool) ([]interface{}, error) {
if checker == nil { if checker == nil {
return nil, nil return nil, nil
} }
q.lock.Lock() q.lock.Lock()
if q.disposed { if atomic.LoadInt32(&q.disposed) == 1 {
q.lock.Unlock() q.lock.Unlock()
return nil, ErrDisposed return nil, ErrDisposed
} }
...@@ -312,7 +312,7 @@ func (q *Queue) Disposed() bool { ...@@ -312,7 +312,7 @@ func (q *Queue) Disposed() bool {
q.lock.Lock() q.lock.Lock()
defer q.lock.Unlock() defer q.lock.Unlock()
return q.disposed return atomic.LoadInt32(&q.disposed) == 1
} }
// Dispose will dispose of this queue and returns // Dispose will dispose of this queue and returns
...@@ -322,7 +322,7 @@ func (q *Queue) Dispose() []interface{} { ...@@ -322,7 +322,7 @@ func (q *Queue) Dispose() []interface{} {
q.lock.Lock() q.lock.Lock()
defer q.lock.Unlock() defer q.lock.Unlock()
q.disposed = true atomic.StoreInt32(&q.disposed, 1)
for _, waiter := range q.waiters { for _, waiter := range q.waiters {
waiter.response.Add(1) waiter.response.Add(1)
select { select {
...@@ -352,9 +352,9 @@ func ExecuteInParallel(q *Queue, fn func(interface{})) { ...@@ -352,9 +352,9 @@ func ExecuteInParallel(q *Queue, fn func(interface{})) {
q.lock.Lock() // so no one touches anything in the middle q.lock.Lock() // so no one touches anything in the middle
// of this process // of this process
todo, done := uint64(len(q.items)), int64(-1) length, count := uint64(len(q.items)), int64(-1)
// this is important or we might face an infinite loop // this is important or we might face an infinite loop
if todo == 0 { if length == 0 {
return return
} }
...@@ -370,8 +370,8 @@ func ExecuteInParallel(q *Queue, fn func(interface{})) { ...@@ -370,8 +370,8 @@ func ExecuteInParallel(q *Queue, fn func(interface{})) {
for i := 0; i < numCPU; i++ { for i := 0; i < numCPU; i++ {
go func() { go func() {
for { for {
index := atomic.AddInt64(&done, 1) index := atomic.AddInt64(&count, 1)
if index >= int64(todo) { if index >= int64(length) {
wg.Done() wg.Done()
break break
} }
......
...@@ -181,6 +181,7 @@ func TestGetEmpty(t *testing.T) { ...@@ -181,6 +181,7 @@ func TestGetEmpty(t *testing.T) {
q := New(10) q := New(10)
go func() { go func() {
time.Sleep(time.Second)
q.Put(`a`) q.Put(`a`)
}() }()
...@@ -223,7 +224,7 @@ func TestMultipleGetEmpty(t *testing.T) { ...@@ -223,7 +224,7 @@ func TestMultipleGetEmpty(t *testing.T) {
if assert.Len(t, results[0], 1) && assert.Len(t, results[1], 1) { if assert.Len(t, results[0], 1) && assert.Len(t, results[1], 1) {
assert.True(t, (results[0][0] == `a` && results[1][0] == `b`) || assert.True(t, (results[0][0] == `a` && results[1][0] == `b`) ||
(results[0][0] == `b` && results[1][0] == `a`), (results[0][0] == `b` && results[1][0] == `a`),
`The array should be a, b or b, a`) `The array should be a, b or b, a`)
} }
} }
...@@ -370,10 +371,10 @@ func TestPeekOnDisposedQueue(t *testing.T) { ...@@ -370,10 +371,10 @@ func TestPeekOnDisposedQueue(t *testing.T) {
assert.IsType(t, ErrDisposed, err) assert.IsType(t, ErrDisposed, err)
} }
func TestTakeUntil(t *testing.T) { func TestGetUntil(t *testing.T) {
q := New(10) q := New(10)
q.Put(`a`, `b`, `c`) q.Put(`a`, `b`, `c`)
result, err := q.TakeUntil(func(item interface{}) bool { result, err := q.GetUntil(func(item interface{}) bool {
return item != `c` return item != `c`
}) })
...@@ -385,9 +386,9 @@ func TestTakeUntil(t *testing.T) { ...@@ -385,9 +386,9 @@ func TestTakeUntil(t *testing.T) {
assert.Equal(t, expected, result) assert.Equal(t, expected, result)
} }
func TestTakeUntilEmptyQueue(t *testing.T) { func TestGetUntilEmptyQueue(t *testing.T) {
q := New(10) q := New(10)
result, err := q.TakeUntil(func(item interface{}) bool { result, err := q.GetUntil(func(item interface{}) bool {
return item != `c` return item != `c`
}) })
...@@ -399,10 +400,10 @@ func TestTakeUntilEmptyQueue(t *testing.T) { ...@@ -399,10 +400,10 @@ func TestTakeUntilEmptyQueue(t *testing.T) {
assert.Equal(t, expected, result) assert.Equal(t, expected, result)
} }
func TestTakeUntilThenGet(t *testing.T) { func TestGetUntilThenGet(t *testing.T) {
q := New(10) q := New(10)
q.Put(`a`, `b`, `c`) q.Put(`a`, `b`, `c`)
takeItems, _ := q.TakeUntil(func(item interface{}) bool { takeItems, _ := q.GetUntil(func(item interface{}) bool {
return item != `c` return item != `c`
}) })
...@@ -411,10 +412,10 @@ func TestTakeUntilThenGet(t *testing.T) { ...@@ -411,10 +412,10 @@ func TestTakeUntilThenGet(t *testing.T) {
assert.Equal(t, []interface{}{`c`}, restItems) assert.Equal(t, []interface{}{`c`}, restItems)
} }
func TestTakeUntilNoMatches(t *testing.T) { func TestGetUntilNoMatches(t *testing.T) {
q := New(10) q := New(10)
q.Put(`a`, `b`, `c`) q.Put(`a`, `b`, `c`)
takeItems, _ := q.TakeUntil(func(item interface{}) bool { takeItems, _ := q.GetUntil(func(item interface{}) bool {
return item != `a` return item != `a`
}) })
...@@ -423,10 +424,10 @@ func TestTakeUntilNoMatches(t *testing.T) { ...@@ -423,10 +424,10 @@ func TestTakeUntilNoMatches(t *testing.T) {
assert.Equal(t, []interface{}{`a`, `b`, `c`}, restItems) assert.Equal(t, []interface{}{`a`, `b`, `c`}, restItems)
} }
func TestTakeUntilOnDisposedQueue(t *testing.T) { func TestGetUntilOnDisposedQueue(t *testing.T) {
q := New(10) q := New(10)
q.Dispose() q.Dispose()
result, err := q.TakeUntil(func(item interface{}) bool { result, err := q.GetUntil(func(item interface{}) bool {
return true return true
}) })
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment