Unverified Commit f45e1075 authored by XavierNiu's avatar XavierNiu Committed by GitHub

Ftr: Unbounded Chan (#64)

* ftr: chanx

* go fmt

* fix apache license

* rename package

* unbounded chan

* go fmt

* remove type T
parent d6ee0beb
...@@ -15,89 +15,91 @@ ...@@ -15,89 +15,91 @@
* limitations under the License. * limitations under the License.
*/ */
package chanx package gxchan
// T defines interface{}, and will be used for generic type after go 1.18 is released. import (
type T interface{} "github.com/dubbogo/gost/container/queue"
)
// UnboundedChan is an unbounded chan. // UnboundedChan is a chan that could grow if the number of elements exceeds the capacity.
// In is used to write without blocking, which supports multiple writers.
// and Out is used to read, which supports multiple readers.
// You can close the in channel if you want.
type UnboundedChan struct { type UnboundedChan struct {
In chan<- T // channel for write in chan interface{}
Out <-chan T // channel for read out chan interface{}
buffer *RingBuffer // buffer queue *gxqueue.CircularUnboundedQueue
} }
// Len returns len of In plus len of Out plus len of buffer. // NewUnboundedChan creates an instance of UnboundedChan.
func (c UnboundedChan) Len() int { func NewUnboundedChan(capacity int) *UnboundedChan {
return len(c.In) + c.buffer.Len() + len(c.Out) ch := &UnboundedChan{
} in: make(chan interface{}, capacity/3),
out: make(chan interface{}, capacity/3),
queue: gxqueue.NewCircularUnboundedQueue(capacity - 2*(capacity/3)),
}
// BufLen returns len of the buffer. go ch.run()
func (c UnboundedChan) BufLen() int {
return c.buffer.Len()
}
// NewUnboundedChan creates the unbounded chan. return ch
// in is used to write without blocking, which supports multiple writers.
// and out is used to read, which supports multiple readers.
// You can close the in channel if you want.
func NewUnboundedChan(initCapacity int) UnboundedChan {
return NewUnboundedChanSize(initCapacity, initCapacity, initCapacity)
} }
// NewUnboundedChanSize is like NewUnboundedChan but you can set initial capacity for In, Out, Buffer. // In returns write-only chan
func NewUnboundedChanSize(initInCapacity, initOutCapacity, initBufCapacity int) UnboundedChan { func (ch *UnboundedChan) In() chan<- interface{} {
in := make(chan T, initInCapacity) return ch.in
out := make(chan T, initOutCapacity) }
ch := UnboundedChan{In: in, Out: out, buffer: NewRingBuffer(initBufCapacity)}
go process(in, out, ch) // Out returns read-only chan
func (ch *UnboundedChan) Out() <-chan interface{} {
return ch.out
}
return ch func (ch *UnboundedChan) Len() int {
return len(ch.in) + len(ch.out) + ch.queue.Len()
} }
func process(in, out chan T, ch UnboundedChan) { func (ch *UnboundedChan) run() {
defer close(out) defer func() {
loop: close(ch.out)
}()
for { for {
val, ok := <-in val, ok := <-ch.in
if !ok { // in is closed if !ok {
break loop // `ch.in` was closed and queue has no elements
return
} }
// out is not full
select { select {
case out <- val: // data was written to `ch.out`
case ch.out <- val:
continue continue
// `ch.out` is full, move the data to `ch.queue`
default: default:
ch.queue.Push(val)
} }
// out is full for !ch.queue.IsEmpty() {
ch.buffer.Write(val)
for !ch.buffer.IsEmpty() {
select { select {
case val, ok := <-in: case val, ok := <-ch.in:
if !ok { // in is closed if !ok {
break loop ch.closeWait()
} return
ch.buffer.Write(val)
case out <- ch.buffer.Peek():
ch.buffer.Pop()
if ch.buffer.IsEmpty() && ch.buffer.size > ch.buffer.initialSize { // after burst
ch.buffer.Reset()
} }
ch.queue.Push(val)
case ch.out <- ch.queue.Peek():
ch.queue.Pop()
} }
} }
ch.shrinkQueue()
} }
}
// drain func (ch *UnboundedChan) shrinkQueue() {
for !ch.buffer.IsEmpty() { if ch.queue.IsEmpty() && ch.queue.Cap() > ch.queue.InitialSize() {
out <- ch.buffer.Pop() ch.queue.Reset()
} }
}
ch.buffer.Reset() func (ch *UnboundedChan) closeWait() {
for !ch.queue.IsEmpty() {
ch.out <- ch.queue.Pop()
}
} }
...@@ -15,69 +15,67 @@ ...@@ -15,69 +15,67 @@
* limitations under the License. * limitations under the License.
*/ */
package chanx package gxchan
import ( import (
"sync" "sync"
"testing" "testing"
) )
func TestMakeUnboundedChan(t *testing.T) { import (
ch := NewUnboundedChan(100) "github.com/stretchr/testify/assert"
)
func TestUnboundedChan(t *testing.T) {
ch := NewUnboundedChan(300)
var count int
for i := 1; i < 200; i++ { for i := 1; i < 200; i++ {
ch.In <- int64(i) ch.In() <- i
}
for i := 1; i < 60; i++ {
v, _ := <-ch.Out()
count += v.(int)
} }
var count int64 assert.Equal(t, 100, ch.queue.Cap())
var wg sync.WaitGroup
for i := 200; i <= 1200; i++ {
ch.In() <- i
}
assert.Equal(t, 1600, ch.queue.Cap())
wg := sync.WaitGroup{}
wg.Add(1) wg.Add(1)
go func() { go func() {
defer wg.Done() defer wg.Done()
var icount int
for v := range ch.Out { for v := range ch.Out() {
count += v.(int64) count += v.(int)
icount++
if icount == 900 {
break
}
} }
}() }()
for i := 200; i <= 1000; i++ {
ch.In <- int64(i)
}
close(ch.In)
wg.Wait() wg.Wait()
if count != 500500 { close(ch.In())
t.Fatalf("expected 500500 but got %d", count)
}
}
func TestMakeUnboundedChanSize(t *testing.T) {
ch := NewUnboundedChanSize(10, 50, 100)
for i := 1; i < 200; i++ {
ch.In <- int64(i)
}
var count int64 // buffer should be empty
var wg sync.WaitGroup
wg.Add(1) wg.Add(1)
go func() { go func() {
defer wg.Done() defer wg.Done()
for v := range ch.Out() {
for v := range ch.Out { count += v.(int)
count += v.(int64)
} }
}() }()
for i := 200; i <= 1000; i++ {
ch.In <- int64(i)
}
close(ch.In)
wg.Wait() wg.Wait()
if count != 500500 { assert.Equal(t, 720600, count)
t.Fatalf("expected 500500 but got %d", count)
}
} }
...@@ -15,129 +15,94 @@ ...@@ -15,129 +15,94 @@
* limitations under the License. * limitations under the License.
*/ */
package chanx package gxqueue
import ( const (
"errors" fastGrowThreshold = 1024
) )
var ErrIsEmpty = errors.New("ringbuffer is empty") // CircularUnboundedQueue is a circular structure and will grow automatically if it exceeds the capacity.
type CircularUnboundedQueue struct {
// RingBuffer is a ring buffer for common types. data []interface{}
// It never is full and always grows if it will be full. head, tail int
// It is not thread-safe(goroutine-safe) so you must use Lock to use it in multiple writers and multiple readers. isize int // initial size
type RingBuffer struct {
buf []T
initialSize int
size int
r int // read pointer
w int // write pointer
} }
func NewRingBuffer(initialSize int) *RingBuffer { func NewCircularUnboundedQueue(size int) *CircularUnboundedQueue {
if initialSize <= 0 { if size < 0 {
panic("initial size must be great than zero") panic("size should be greater than zero")
} }
// initial size must >= 2 return &CircularUnboundedQueue{
if initialSize == 1 { data: make([]interface{}, size+1),
initialSize = 2 isize: size,
} }
}
return &RingBuffer{ func (q *CircularUnboundedQueue) IsEmpty() bool {
buf: make([]T, initialSize), return q.head == q.tail
initialSize: initialSize,
size: initialSize,
}
} }
func (r *RingBuffer) Read() (T, error) { func (q *CircularUnboundedQueue) Push(t interface{}) {
if r.r == r.w { q.data[q.tail] = t
return nil, ErrIsEmpty
}
v := r.buf[r.r] q.tail = (q.tail + 1) % len(q.data)
r.r++ if q.tail == q.head {
if r.r == r.size { q.grow()
r.r = 0
} }
return v, nil
} }
func (r *RingBuffer) Pop() T { func (q *CircularUnboundedQueue) Pop() interface{} {
v, err := r.Read() if q.IsEmpty() {
if err == ErrIsEmpty { // Empty panic("queue has no element")
panic(ErrIsEmpty.Error())
} }
return v t := q.data[q.head]
} q.head = (q.head + 1) % len(q.data)
func (r *RingBuffer) Peek() T { return t
if r.r == r.w { // Empty
panic(ErrIsEmpty.Error())
}
v := r.buf[r.r]
return v
} }
func (r *RingBuffer) Write(v T) { func (q *CircularUnboundedQueue) Peek() interface{} {
r.buf[r.w] = v if q.IsEmpty() {
r.w++ panic("queue has no element")
if r.w == r.size {
r.w = 0
} }
return q.data[q.head]
}
if r.w == r.r { // full func (q *CircularUnboundedQueue) Cap() int {
r.grow() return len(q.data) - 1
}
} }
func (r *RingBuffer) grow() { func (q *CircularUnboundedQueue) Len() int {
var size int head, tail := q.head, q.tail
if r.size < 1024 { if head > tail {
size = r.size * 2 tail += len(q.data)
} else {
size = r.size + r.size/4
} }
return tail - head
buf := make([]T, size)
copy(buf[0:], r.buf[r.r:])
copy(buf[r.size-r.r:], r.buf[0:r.r])
r.r = 0
r.w = r.size
r.size = size
r.buf = buf
} }
func (r *RingBuffer) IsEmpty() bool { func (q *CircularUnboundedQueue) Reset() {
return r.r == r.w q.data = make([]interface{}, q.isize+1)
q.head, q.tail = 0, 0
} }
// Capacity returns the size of the underlying buffer. func (q *CircularUnboundedQueue) InitialSize() int {
func (r *RingBuffer) Capacity() int { return q.isize
return r.size
} }
func (r *RingBuffer) Len() int { func (q *CircularUnboundedQueue) grow() {
if r.r == r.w { oldsize := len(q.data) - 1
return 0 var newsize int
} if oldsize < fastGrowThreshold {
newsize = oldsize * 2
if r.w > r.r { } else {
return r.w - r.r newsize = oldsize + oldsize/4
} }
return r.size - r.r + r.w newdata := make([]interface{}, newsize+1)
} copy(newdata[0:], q.data[q.head:])
copy(newdata[len(q.data)-q.head:], q.data[:q.head])
func (r *RingBuffer) Reset() { q.data = newdata
r.r = 0 q.head, q.tail = 0, oldsize+1
r.w = 0
r.size = r.initialSize
r.buf = make([]T, r.initialSize)
} }
...@@ -15,7 +15,7 @@ ...@@ -15,7 +15,7 @@
* limitations under the License. * limitations under the License.
*/ */
package chanx package gxqueue
import ( import (
"testing" "testing"
...@@ -25,116 +25,73 @@ import ( ...@@ -25,116 +25,73 @@ import (
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
) )
func TestRingBuffer(t *testing.T) { func TestCircularUnboundedQueueWithoutGrowing(t *testing.T) {
rb := NewRingBuffer(10) queue := NewCircularUnboundedQueue(10)
v, err := rb.Read()
assert.Nil(t, v) queue.Reset()
assert.Error(t, err, ErrIsEmpty)
// write 1 element
write := 0 queue.Push(1)
read := 0 assert.Equal(t, 1, queue.Len())
assert.Equal(t, 10, queue.Cap())
// write one and read it // peek and pop
rb.Write(0) assert.Equal(t, 1, queue.Peek())
v, err = rb.Read() assert.Equal(t, 1, queue.Pop())
assert.NoError(t, err) // inspect len and cap
assert.Equal(t, 0, v) assert.Equal(t, 0, queue.Len())
assert.Equal(t, 1, rb.r) assert.Equal(t, 10, queue.Cap())
assert.Equal(t, 1, rb.w)
assert.True(t, rb.IsEmpty()) // write 8 elements
for i := 0; i < 8; i++ {
// then write 10 queue.Push(i)
for i := 0; i < 9; i++ {
rb.Write(i)
write += i
} }
assert.Equal(t, 10, rb.Capacity()) assert.Equal(t, 8, queue.Len())
assert.Equal(t, 9, rb.Len()) assert.Equal(t, 10, queue.Cap())
// write one more, the buffer is full so it grows var v interface{}
rb.Write(10) // pop 5 elements
write += 10 for i := 0; i < 5; i++ {
assert.Equal(t, 20, rb.Capacity()) v = queue.Pop()
assert.Equal(t, 10, rb.Len()) assert.Equal(t, i, v)
for i := 0; i < 90; i++ {
rb.Write(i)
write += i
} }
assert.Equal(t, 3, queue.Len())
assert.Equal(t, 10, queue.Cap())
assert.Equal(t, 160, rb.Capacity()) // write 6 elements
assert.Equal(t, 100, rb.Len()) for i := 0; i < 6; i++ {
queue.Push(i)
for {
v, err := rb.Read()
if err == ErrIsEmpty {
break
}
read += v.(int)
} }
assert.Equal(t, 9, queue.Len())
assert.Equal(t, write, read) assert.Equal(t, 10, queue.Cap())
rb.Reset()
assert.Equal(t, 10, rb.Capacity())
assert.Equal(t, 0, rb.Len())
assert.True(t, rb.IsEmpty())
} }
func TestRingBuffer_One(t *testing.T) { func TestBufferWithGrowing(t *testing.T) {
rb := NewRingBuffer(1) // size < fastGrowThreshold
v, err := rb.Read() queue := NewCircularUnboundedQueue(10)
assert.Nil(t, v)
assert.Error(t, err, ErrIsEmpty) // write 11 elements
for i := 0; i < 11; i++ {
write := 0 queue.Push(i)
read := 0
// write one and read it
rb.Write(0)
v, err = rb.Read()
assert.NoError(t, err)
assert.Equal(t, 0, v)
assert.Equal(t, 1, rb.r)
assert.Equal(t, 1, rb.w)
assert.True(t, rb.IsEmpty())
// then write 10
for i := 0; i < 9; i++ {
rb.Write(i)
write += i
}
assert.Equal(t, 16, rb.Capacity())
assert.Equal(t, 9, rb.Len())
// write one more, the buffer is full so it grows
rb.Write(10)
write += 10
assert.Equal(t, 16, rb.Capacity())
assert.Equal(t, 10, rb.Len())
for i := 0; i < 90; i++ {
rb.Write(i)
write += i
} }
assert.Equal(t, 128, rb.Capacity()) assert.Equal(t, 11, queue.Len())
assert.Equal(t, 100, rb.Len()) assert.Equal(t, 20, queue.Cap())
queue.Reset()
assert.Equal(t, 0, queue.Len())
assert.Equal(t, 10, queue.Cap())
for { queue = NewCircularUnboundedQueue(fastGrowThreshold)
v, err := rb.Read()
if err == ErrIsEmpty {
break
}
read += v.(int) // write fastGrowThreshold+1 elements
for i := 0; i < fastGrowThreshold+1; i++ {
queue.Push(i)
} }
assert.Equal(t, write, read) assert.Equal(t, fastGrowThreshold+1, queue.Len())
assert.Equal(t, fastGrowThreshold+fastGrowThreshold/4, queue.Cap())
rb.Reset() queue.Reset()
assert.Equal(t, 2, rb.Capacity()) assert.Equal(t, 0, queue.Len())
assert.Equal(t, 0, rb.Len()) assert.Equal(t, fastGrowThreshold, queue.Cap())
assert.True(t, rb.IsEmpty())
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment