Unverified Commit c0e1af7c authored by XavierNiu's avatar XavierNiu Committed by GitHub

Ftr: Unbounded chan quota (#65)

* add quota for unbounded queue and chan

* unittests for UnboundedChan quota

* go fmt

* fix data race

* update comments

* fix Len()

* go fmt

* fix Cap() data race

* fix typo

* update comments

* encapsulate queue operations

* replace uber atomic

* go fmt

* unittests

* fix border cases
parent f45e1075
...@@ -18,23 +18,71 @@ ...@@ -18,23 +18,71 @@
package gxchan package gxchan
import ( import (
"go.uber.org/atomic"
)
import (
"github.com/dubbogo/gost/container/queue" "github.com/dubbogo/gost/container/queue"
) )
// UnboundedChan is a chan that could grow if the number of elements exceeds the capacity. // UnboundedChan is a chan that could grow if the number of elements exceeds the capacity.
type UnboundedChan struct { type UnboundedChan struct {
in chan interface{} in chan interface{}
out chan interface{} out chan interface{}
queue *gxqueue.CircularUnboundedQueue queue *gxqueue.CircularUnboundedQueue
queueLen *atomic.Int32
queueCap *atomic.Int32
} }
// NewUnboundedChan creates an instance of UnboundedChan. // NewUnboundedChan creates an instance of UnboundedChan.
func NewUnboundedChan(capacity int) *UnboundedChan { func NewUnboundedChan(capacity int) *UnboundedChan {
return NewUnboundedChanWithQuota(capacity, 0)
}
func NewUnboundedChanWithQuota(capacity, quota int) *UnboundedChan {
if capacity <= 0 {
panic("capacity should be greater than 0")
}
if quota < 0 {
panic("quota should be greater or equal to 0")
}
if quota != 0 && capacity > quota {
capacity = quota
}
var (
incap = capacity / 3
outcap = capacity / 3
qcap = capacity - 2*(capacity/3)
qquota = quota - 2*(capacity/3)
)
if capacity/3 > 0 {
incap--
} else {
qcap--
qquota--
}
// address quota if the value is not valid
if quota == 0 { // quota == 0 means no limits for queue
qquota = 0
} else { // quota != 0 means chan couldn't grow unlimitedly
if qquota == 0 {
// qquota == 0 means queue could grow unlimitedly
// in this case, the total quota will be set to quota+1
qquota = 1
}
}
ch := &UnboundedChan{ ch := &UnboundedChan{
in: make(chan interface{}, capacity/3), in: make(chan interface{}, incap),
out: make(chan interface{}, capacity/3), out: make(chan interface{}, outcap),
queue: gxqueue.NewCircularUnboundedQueue(capacity - 2*(capacity/3)), queue: gxqueue.NewCircularUnboundedQueueWithQuota(qcap, qquota),
queueLen: &atomic.Int32{},
queueCap: &atomic.Int32{},
} }
ch.queueCap.Store(int32(ch.queue.Cap()))
go ch.run() go ch.run()
...@@ -51,8 +99,14 @@ func (ch *UnboundedChan) Out() <-chan interface{} { ...@@ -51,8 +99,14 @@ func (ch *UnboundedChan) Out() <-chan interface{} {
return ch.out return ch.out
} }
// Len returns the total length of chan
func (ch *UnboundedChan) Len() int { func (ch *UnboundedChan) Len() int {
return len(ch.in) + len(ch.out) + ch.queue.Len() return len(ch.in) + len(ch.out) + int(ch.queueLen.Load())
}
// Cap returns the total capacity of chan.
func (ch *UnboundedChan) Cap() int {
return cap(ch.in) + cap(ch.out) + int(ch.queueCap.Load()) + 1
} }
func (ch *UnboundedChan) run() { func (ch *UnboundedChan) run() {
...@@ -62,44 +116,81 @@ func (ch *UnboundedChan) run() { ...@@ -62,44 +116,81 @@ func (ch *UnboundedChan) run() {
for { for {
val, ok := <-ch.in val, ok := <-ch.in
if !ok { if !ok { // `ch.in` was closed and queue has no elements
// `ch.in` was closed and queue has no elements
return return
} }
select { select {
// data was written to `ch.out` case ch.out <- val: // data was written to `ch.out`
case ch.out <- val:
continue continue
// `ch.out` is full, move the data to `ch.queue` default: // `ch.out` is full, move the data to `ch.queue`
default: if ok := ch.queuePush(val); !ok {
ch.queue.Push(val) ch.block(val)
}
} }
for !ch.queue.IsEmpty() { for !ch.queue.IsEmpty() {
select { select {
case val, ok := <-ch.in: case val, ok := <-ch.in: // `ch.in` was closed
if !ok { if !ok {
ch.closeWait() ch.closeWait()
return return
} }
ch.queue.Push(val) if ok = ch.queuePush(val); !ok { // try to push the value into queue
ch.block(val)
}
case ch.out <- ch.queue.Peek(): case ch.out <- ch.queue.Peek():
ch.queue.Pop() ch.queuePop()
} }
} }
ch.shrinkQueue()
}
}
func (ch *UnboundedChan) shrinkQueue() { if ch.queue.Cap() > ch.queue.InitialCap() {
if ch.queue.IsEmpty() && ch.queue.Cap() > ch.queue.InitialSize() { ch.queueReset()
ch.queue.Reset() }
} }
} }
// closeWait waits for being empty of `ch.queue`
func (ch *UnboundedChan) closeWait() { func (ch *UnboundedChan) closeWait() {
for !ch.queue.IsEmpty() { for !ch.queue.IsEmpty() {
ch.out <- ch.queue.Pop() ch.out <- ch.queuePop()
}
}
// block waits for having an idle space on `ch.out`
func (ch *UnboundedChan) block(val interface{}) {
// `val` is not in `ch.queue` and `ch.in`, but it is stored into `UnboundedChan`
defer func() {
ch.queueLen.Add(-1)
}()
ch.queueLen.Add(1)
if !ch.queue.IsEmpty() {
ch.out <- ch.queue.Peek()
ch.queue.Pop()
ch.queue.Push(val)
return
}
ch.out <- val
}
func (ch *UnboundedChan) queuePush(val interface{}) (ok bool) {
ok = ch.queue.Push(val)
if ok {
ch.queueLen.Add(1)
ch.queueCap.Store(int32(ch.queue.Cap()))
} }
return
}
func (ch *UnboundedChan) queueReset() {
ch.queue.Reset()
ch.queueCap.Store(int32(ch.queue.Cap()))
}
func (ch *UnboundedChan) queuePop() (t interface{}) {
t = ch.queue.Pop()
ch.queueLen.Add(-1)
return
} }
...@@ -20,6 +20,7 @@ package gxchan ...@@ -20,6 +20,7 @@ package gxchan
import ( import (
"sync" "sync"
"testing" "testing"
"time"
) )
import ( import (
...@@ -79,3 +80,148 @@ func TestUnboundedChan(t *testing.T) { ...@@ -79,3 +80,148 @@ func TestUnboundedChan(t *testing.T) {
assert.Equal(t, 720600, count) assert.Equal(t, 720600, count)
} }
func TestUnboundedChan_Quota(t *testing.T) {
t.Run("testQuota1", testQuota1)
t.Run("testQuota2", testQuota2)
}
func testQuota1(t *testing.T) {
ch := NewUnboundedChanWithQuota(10, 15)
assert.Equal(t, 2, cap(ch.in))
assert.Equal(t, 3, cap(ch.out))
assert.Equal(t, 4, ch.queue.Cap())
assert.Equal(t, 0, ch.Len())
assert.Equal(t, 10, ch.Cap())
var count int
for i := 0; i < 10; i++ {
ch.In() <- i
}
assert.True(t, 14 >= ch.Cap())
assert.True(t, 10 >= ch.Len())
for i := 0; i < 10; i++ {
v, ok := <-ch.Out()
assert.True(t, ok)
count += v.(int)
}
assert.Equal(t, 45, count)
for i := 0; i < 15; i++ {
ch.In() <- i
}
assert.True(t, 15 >= ch.Cap())
assert.True(t, 15 >= ch.Len())
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()
ch.In() <- 15
}()
assert.True(t, 15 >= ch.Cap())
assert.True(t, 15 >= ch.Len())
for i := 0; i < 16; i++ {
v, ok := <-ch.Out()
assert.True(t, ok)
count += v.(int)
}
assert.True(t, 15 >= ch.Len())
assert.True(t, 10 >= ch.Cap())
wg.Wait()
assert.Equal(t, 165, count)
}
// testQuota2 tests `ch.in` has no space
func testQuota2(t *testing.T) {
ch := NewUnboundedChanWithQuota(1, 1)
for i := 0; i < 1; i++ {
ch.In() <- i
}
time.Sleep(10 * time.Millisecond)
select {
case ch.In() <- 1:
default:
assert.Fail(t, "the chan shouldn't be blocked")
}
time.Sleep(10 * time.Millisecond)
select {
case ch.In() <- 2:
assert.Fail(t, "the chan should be blocked")
default:
}
ch = NewUnboundedChanWithQuota(1, 0)
for i := 0; i < 2; i++ {
ch.In() <- i
}
time.Sleep(10 * time.Millisecond)
select {
case ch.In() <- 2:
assert.True(t, ch.Len() <= 3)
default:
assert.Fail(t, "the chan shouldn't be blocked")
}
ch = NewUnboundedChanWithQuota(1, 2)
for i := 0; i < 1; i++ {
ch.In() <- i
}
time.Sleep(10 * time.Millisecond)
select {
case ch.In() <- 1:
default:
assert.Fail(t, "the chan shouldn't be blocked")
}
time.Sleep(10 * time.Millisecond)
select {
case ch.In() <- 1:
assert.Fail(t, "the chan should be blocked")
default:
}
ch = NewUnboundedChanWithQuota(1, 3)
for i := 0; i < 2; i++ {
ch.In() <- i
}
time.Sleep(10 * time.Millisecond)
select {
case ch.In() <- 2:
default:
assert.Fail(t, "the chan shouldn't be blocked")
}
time.Sleep(10 * time.Millisecond)
select {
case ch.In() <- 2:
assert.Fail(t, "the chan should be blocked")
default:
}
}
...@@ -22,19 +22,32 @@ const ( ...@@ -22,19 +22,32 @@ const (
) )
// CircularUnboundedQueue is a circular structure and will grow automatically if it exceeds the capacity. // CircularUnboundedQueue is a circular structure and will grow automatically if it exceeds the capacity.
// CircularUnboundedQueue is not thread-safe.
type CircularUnboundedQueue struct { type CircularUnboundedQueue struct {
data []interface{} data []interface{}
head, tail int head, tail int
isize int // initial size icap int // initial capacity
quota int // specify the maximum size of the queue, setting to 0 denotes unlimited.
} }
func NewCircularUnboundedQueue(size int) *CircularUnboundedQueue { func NewCircularUnboundedQueue(capacity int) *CircularUnboundedQueue {
if size < 0 { return NewCircularUnboundedQueueWithQuota(capacity, 0)
panic("size should be greater than zero") }
func NewCircularUnboundedQueueWithQuota(capacity, quota int) *CircularUnboundedQueue {
if capacity < 0 {
panic("capacity should be greater than zero")
}
if quota < 0 {
panic("quota should be greater or equal to zero")
}
if quota != 0 && capacity > quota {
capacity = quota
} }
return &CircularUnboundedQueue{ return &CircularUnboundedQueue{
data: make([]interface{}, size+1), data: make([]interface{}, capacity+1),
isize: size, icap: capacity,
quota: quota,
} }
} }
...@@ -42,13 +55,21 @@ func (q *CircularUnboundedQueue) IsEmpty() bool { ...@@ -42,13 +55,21 @@ func (q *CircularUnboundedQueue) IsEmpty() bool {
return q.head == q.tail return q.head == q.tail
} }
func (q *CircularUnboundedQueue) Push(t interface{}) { func (q *CircularUnboundedQueue) Push(t interface{}) bool {
q.data[q.tail] = t if nextTail := (q.tail + 1) % len(q.data); nextTail != q.head {
q.data[q.tail] = t
q.tail = nextTail
return true
}
q.tail = (q.tail + 1) % len(q.data) if q.grow() {
if q.tail == q.head { // grow succeed
q.grow() q.data[q.tail] = t
q.tail = (q.tail + 1) % len(q.data)
return true
} }
return false
} }
func (q *CircularUnboundedQueue) Pop() interface{} { func (q *CircularUnboundedQueue) Pop() interface{} {
...@@ -82,27 +103,41 @@ func (q *CircularUnboundedQueue) Len() int { ...@@ -82,27 +103,41 @@ func (q *CircularUnboundedQueue) Len() int {
} }
func (q *CircularUnboundedQueue) Reset() { func (q *CircularUnboundedQueue) Reset() {
q.data = make([]interface{}, q.isize+1) q.data = make([]interface{}, q.icap+1)
q.head, q.tail = 0, 0 q.head, q.tail = 0, 0
} }
func (q *CircularUnboundedQueue) InitialSize() int { func (q *CircularUnboundedQueue) InitialCap() int {
return q.isize return q.icap
} }
func (q *CircularUnboundedQueue) grow() { func (q *CircularUnboundedQueue) grow() bool {
oldsize := len(q.data) - 1 oldcap := q.Cap()
var newsize int if oldcap == 0 {
if oldsize < fastGrowThreshold { oldcap++
newsize = oldsize * 2 }
var newcap int
if oldcap < fastGrowThreshold {
newcap = oldcap * 2
} else { } else {
newsize = oldsize + oldsize/4 newcap = oldcap + oldcap/4
}
if q.quota != 0 && newcap > q.quota {
newcap = q.quota
}
if newcap == q.Cap() {
return false
} }
newdata := make([]interface{}, newsize+1) newdata := make([]interface{}, newcap+1)
copy(newdata[0:], q.data[q.head:]) copy(newdata[0:], q.data[q.head:])
copy(newdata[len(q.data)-q.head:], q.data[:q.head]) if q.head > q.tail {
copy(newdata[len(q.data)-q.head:], q.data[:q.head-1])
}
q.head, q.tail = 0, q.Cap()
q.data = newdata q.data = newdata
q.head, q.tail = 0, oldsize+1
return true
} }
...@@ -65,7 +65,7 @@ func TestCircularUnboundedQueueWithoutGrowing(t *testing.T) { ...@@ -65,7 +65,7 @@ func TestCircularUnboundedQueueWithoutGrowing(t *testing.T) {
assert.Equal(t, 10, queue.Cap()) assert.Equal(t, 10, queue.Cap())
} }
func TestBufferWithGrowing(t *testing.T) { func TestCircularUnboundedQueueWithGrowing(t *testing.T) {
// size < fastGrowThreshold // size < fastGrowThreshold
queue := NewCircularUnboundedQueue(10) queue := NewCircularUnboundedQueue(10)
...@@ -81,6 +81,26 @@ func TestBufferWithGrowing(t *testing.T) { ...@@ -81,6 +81,26 @@ func TestBufferWithGrowing(t *testing.T) {
assert.Equal(t, 0, queue.Len()) assert.Equal(t, 0, queue.Len())
assert.Equal(t, 10, queue.Cap()) assert.Equal(t, 10, queue.Cap())
for i := 0; i < 8; i++ {
queue.Push(i)
queue.Pop()
}
for i := 0; i < 11; i++ {
queue.Push(i)
if i == 9 {
expectedArr := []int{3, 4, 5, 6, 7, 8, 9, 7, 0, 1, 2}
for j := range queue.data {
assert.Equal(t, expectedArr[j], queue.data[j].(int))
}
}
}
assert.Equal(t, 11, queue.Len())
assert.Equal(t, 20, queue.Cap())
for i := 0; i < 11; i++ {
assert.Equal(t, i, queue.Pop())
}
queue = NewCircularUnboundedQueue(fastGrowThreshold) queue = NewCircularUnboundedQueue(fastGrowThreshold)
// write fastGrowThreshold+1 elements // write fastGrowThreshold+1 elements
...@@ -95,3 +115,32 @@ func TestBufferWithGrowing(t *testing.T) { ...@@ -95,3 +115,32 @@ func TestBufferWithGrowing(t *testing.T) {
assert.Equal(t, 0, queue.Len()) assert.Equal(t, 0, queue.Len())
assert.Equal(t, fastGrowThreshold, queue.Cap()) assert.Equal(t, fastGrowThreshold, queue.Cap())
} }
func TestCircularUnboundedQueueWithQuota(t *testing.T) {
queue := NewCircularUnboundedQueueWithQuota(10, 9)
assert.Equal(t, 0, queue.Len())
assert.Equal(t, 9, queue.Cap())
queue = NewCircularUnboundedQueueWithQuota(10, 15)
for i := 0; i < 10; i++ {
ok := queue.Push(i)
assert.True(t, ok)
}
assert.Equal(t, 10, queue.Len())
assert.Equal(t, 10, queue.Cap())
for i := 0; i < 10; i++ {
v := queue.Pop()
assert.Equal(t, i, v.(int))
}
for i := 0; i < 15; i++ {
ok := queue.Push(i)
assert.True(t, ok)
}
assert.Equal(t, 15, queue.Len())
assert.Equal(t, 15, queue.Cap())
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment