Commit 6d88c0f0 authored by xujianhai666's avatar xujianhai666

add queue

parent bf2907b3
/*
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package container
import (
"errors"
"sync"
)
// minQueueLen is smallest capacity that queue may have.
// Must be power of 2 for bitwise modulus: x % n == x & (n - 1).
const (
defaultQueueLen = 16
)
var (
ErrEmpty = errors.New("")
)
// Queue represents a single instance of the queue data structure.
type Queue struct {
buf []interface{}
head, tail, count int
lock sync.Mutex
}
// New constructs and returns a new Queue.
func NewQueue() *Queue {
return &Queue{
buf: make([]interface{}, defaultQueueLen),
}
}
// Length returns the number of elements currently stored in the queue.
func (q *Queue) Length() int {
q.lock.Lock()
defer q.lock.Unlock()
return q.count
}
// resizes the queue to fit exactly twice its current contents
// this can result in shrinking if the queue is less than half-full
func (q *Queue) resize() {
newBuf := make([]interface{}, q.count<<1)
if q.tail > q.head {
copy(newBuf, q.buf[q.head:q.tail])
} else {
n := copy(newBuf, q.buf[q.head:])
copy(newBuf[n:], q.buf[:q.tail])
}
q.head = 0
q.tail = q.count
q.buf = newBuf
}
// Add puts an element on the end of the queue.
func (q *Queue) Add(elem interface{}) {
q.lock.Lock()
defer q.lock.Unlock()
if q.count == len(q.buf) {
q.resize()
}
q.buf[q.tail] = elem
q.tail = (q.tail + 1) & (len(q.buf) - 1)
q.count++
}
// Peek returns the element at the head of the queue. return ErrEmpty
// if the queue is empty.
func (q *Queue) Peek() (interface{}, error) {
q.lock.Lock()
defer q.lock.Unlock()
if q.count <= 0 {
return nil, ErrEmpty
}
return q.buf[q.head], nil
}
// Remove removes element from the front of queue. If the
// queue is empty, return ErrEmpty
func (q *Queue) Remove() (interface{}, error) {
q.lock.Lock()
defer q.lock.Unlock()
if q.count <= 0 {
return nil, ErrEmpty
}
ret := q.buf[q.head]
q.buf[q.head] = nil
q.head = (q.head + 1) & (len(q.buf) - 1)
q.count--
// Resize down if buffer 1/4 full.
if len(q.buf) > defaultQueueLen && (q.count<<2) == len(q.buf) {
q.resize()
}
return ret, nil
}
/*
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package container
import (
"testing"
)
import (
"github.com/stretchr/testify/assert"
)
func TestQueueSimple(t *testing.T) {
q := NewQueue()
for i := 0; i < defaultQueueLen; i++ {
q.Add(i)
}
for i := 0; i < defaultQueueLen; i++ {
v, _ := q.Peek()
assert.Equal(t, i, v.(int))
x, _ := q.Remove()
assert.Equal(t, i, x)
}
}
func TestQueueWrapping(t *testing.T) {
q := NewQueue()
for i := 0; i < defaultQueueLen; i++ {
q.Add(i)
}
for i := 0; i < 3; i++ {
q.Remove()
q.Add(defaultQueueLen + i)
}
for i := 0; i < defaultQueueLen; i++ {
v, _ := q.Peek()
assert.Equal(t, i+3, v.(int))
q.Remove()
}
}
func TestQueueLength(t *testing.T) {
q := NewQueue()
assert.Equal(t, 0, q.Length(), "empty queue length should be 0")
for i := 0; i < 1000; i++ {
q.Add(i)
assert.Equal(t, i+1, q.Length())
}
for i := 0; i < 1000; i++ {
q.Remove()
assert.Equal(t, 1000-i-1, q.Length())
}
}
func TestQueuePeekOutOfRangePanics(t *testing.T) {
q := NewQueue()
_, err := q.Peek()
assert.Equal(t, ErrEmpty, err)
q.Add(1)
_, err = q.Remove()
assert.Nil(t, err)
_, err = q.Peek()
assert.Equal(t, ErrEmpty, err)
}
func TestQueueRemoveOutOfRangePanics(t *testing.T) {
q := NewQueue()
_, err := q.Remove()
assert.Equal(t, ErrEmpty, err)
q.Add(1)
_, err = q.Remove()
assert.Nil(t, err)
_, err = q.Remove()
assert.Equal(t, ErrEmpty, err)
}
// General warning: Go's benchmark utility (go test -bench .) increases the number of
// iterations until the benchmarks take a reasonable amount of time to run; memory usage
// is *NOT* considered. On my machine, these benchmarks hit around ~1GB before they've had
// enough, but if you have less than that available and start swapping, then all bets are off.
func BenchmarkQueueSerial(b *testing.B) {
q := NewQueue()
for i := 0; i < b.N; i++ {
q.Add(nil)
}
for i := 0; i < b.N; i++ {
q.Peek()
q.Remove()
}
}
func BenchmarkQueueTickTock(b *testing.B) {
q := NewQueue()
for i := 0; i < b.N; i++ {
q.Add(nil)
q.Peek()
q.Remove()
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment