Unverified Commit d6ee0beb authored by Laurence's avatar Laurence Committed by GitHub

ftr: add chanx (#63)

parent c1449770
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package chanx
import (
"errors"
)
var ErrIsEmpty = errors.New("ringbuffer is empty")
// RingBuffer is a ring buffer for common types.
// It never is full and always grows if it will be full.
// It is not thread-safe(goroutine-safe) so you must use Lock to use it in multiple writers and multiple readers.
type RingBuffer struct {
buf []T
initialSize int
size int
r int // read pointer
w int // write pointer
}
func NewRingBuffer(initialSize int) *RingBuffer {
if initialSize <= 0 {
panic("initial size must be great than zero")
}
// initial size must >= 2
if initialSize == 1 {
initialSize = 2
}
return &RingBuffer{
buf: make([]T, initialSize),
initialSize: initialSize,
size: initialSize,
}
}
func (r *RingBuffer) Read() (T, error) {
if r.r == r.w {
return nil, ErrIsEmpty
}
v := r.buf[r.r]
r.r++
if r.r == r.size {
r.r = 0
}
return v, nil
}
func (r *RingBuffer) Pop() T {
v, err := r.Read()
if err == ErrIsEmpty { // Empty
panic(ErrIsEmpty.Error())
}
return v
}
func (r *RingBuffer) Peek() T {
if r.r == r.w { // Empty
panic(ErrIsEmpty.Error())
}
v := r.buf[r.r]
return v
}
func (r *RingBuffer) Write(v T) {
r.buf[r.w] = v
r.w++
if r.w == r.size {
r.w = 0
}
if r.w == r.r { // full
r.grow()
}
}
func (r *RingBuffer) grow() {
var size int
if r.size < 1024 {
size = r.size * 2
} else {
size = r.size + r.size/4
}
buf := make([]T, size)
copy(buf[0:], r.buf[r.r:])
copy(buf[r.size-r.r:], r.buf[0:r.r])
r.r = 0
r.w = r.size
r.size = size
r.buf = buf
}
func (r *RingBuffer) IsEmpty() bool {
return r.r == r.w
}
// Capacity returns the size of the underlying buffer.
func (r *RingBuffer) Capacity() int {
return r.size
}
func (r *RingBuffer) Len() int {
if r.r == r.w {
return 0
}
if r.w > r.r {
return r.w - r.r
}
return r.size - r.r + r.w
}
func (r *RingBuffer) Reset() {
r.r = 0
r.w = 0
r.size = r.initialSize
r.buf = make([]T, r.initialSize)
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package chanx
import (
"testing"
)
import (
"github.com/stretchr/testify/assert"
)
func TestRingBuffer(t *testing.T) {
rb := NewRingBuffer(10)
v, err := rb.Read()
assert.Nil(t, v)
assert.Error(t, err, ErrIsEmpty)
write := 0
read := 0
// write one and read it
rb.Write(0)
v, err = rb.Read()
assert.NoError(t, err)
assert.Equal(t, 0, v)
assert.Equal(t, 1, rb.r)
assert.Equal(t, 1, rb.w)
assert.True(t, rb.IsEmpty())
// then write 10
for i := 0; i < 9; i++ {
rb.Write(i)
write += i
}
assert.Equal(t, 10, rb.Capacity())
assert.Equal(t, 9, rb.Len())
// write one more, the buffer is full so it grows
rb.Write(10)
write += 10
assert.Equal(t, 20, rb.Capacity())
assert.Equal(t, 10, rb.Len())
for i := 0; i < 90; i++ {
rb.Write(i)
write += i
}
assert.Equal(t, 160, rb.Capacity())
assert.Equal(t, 100, rb.Len())
for {
v, err := rb.Read()
if err == ErrIsEmpty {
break
}
read += v.(int)
}
assert.Equal(t, write, read)
rb.Reset()
assert.Equal(t, 10, rb.Capacity())
assert.Equal(t, 0, rb.Len())
assert.True(t, rb.IsEmpty())
}
func TestRingBuffer_One(t *testing.T) {
rb := NewRingBuffer(1)
v, err := rb.Read()
assert.Nil(t, v)
assert.Error(t, err, ErrIsEmpty)
write := 0
read := 0
// write one and read it
rb.Write(0)
v, err = rb.Read()
assert.NoError(t, err)
assert.Equal(t, 0, v)
assert.Equal(t, 1, rb.r)
assert.Equal(t, 1, rb.w)
assert.True(t, rb.IsEmpty())
// then write 10
for i := 0; i < 9; i++ {
rb.Write(i)
write += i
}
assert.Equal(t, 16, rb.Capacity())
assert.Equal(t, 9, rb.Len())
// write one more, the buffer is full so it grows
rb.Write(10)
write += 10
assert.Equal(t, 16, rb.Capacity())
assert.Equal(t, 10, rb.Len())
for i := 0; i < 90; i++ {
rb.Write(i)
write += i
}
assert.Equal(t, 128, rb.Capacity())
assert.Equal(t, 100, rb.Len())
for {
v, err := rb.Read()
if err == ErrIsEmpty {
break
}
read += v.(int)
}
assert.Equal(t, write, read)
rb.Reset()
assert.Equal(t, 2, rb.Capacity())
assert.Equal(t, 0, rb.Len())
assert.True(t, rb.IsEmpty())
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package chanx
// T defines interface{}, and will be used for generic type after go 1.18 is released.
type T interface{}
// UnboundedChan is an unbounded chan.
// In is used to write without blocking, which supports multiple writers.
// and Out is used to read, which supports multiple readers.
// You can close the in channel if you want.
type UnboundedChan struct {
In chan<- T // channel for write
Out <-chan T // channel for read
buffer *RingBuffer // buffer
}
// Len returns len of In plus len of Out plus len of buffer.
func (c UnboundedChan) Len() int {
return len(c.In) + c.buffer.Len() + len(c.Out)
}
// BufLen returns len of the buffer.
func (c UnboundedChan) BufLen() int {
return c.buffer.Len()
}
// NewUnboundedChan creates the unbounded chan.
// in is used to write without blocking, which supports multiple writers.
// and out is used to read, which supports multiple readers.
// You can close the in channel if you want.
func NewUnboundedChan(initCapacity int) UnboundedChan {
return NewUnboundedChanSize(initCapacity, initCapacity, initCapacity)
}
// NewUnboundedChanSize is like NewUnboundedChan but you can set initial capacity for In, Out, Buffer.
func NewUnboundedChanSize(initInCapacity, initOutCapacity, initBufCapacity int) UnboundedChan {
in := make(chan T, initInCapacity)
out := make(chan T, initOutCapacity)
ch := UnboundedChan{In: in, Out: out, buffer: NewRingBuffer(initBufCapacity)}
go process(in, out, ch)
return ch
}
func process(in, out chan T, ch UnboundedChan) {
defer close(out)
loop:
for {
val, ok := <-in
if !ok { // in is closed
break loop
}
// out is not full
select {
case out <- val:
continue
default:
}
// out is full
ch.buffer.Write(val)
for !ch.buffer.IsEmpty() {
select {
case val, ok := <-in:
if !ok { // in is closed
break loop
}
ch.buffer.Write(val)
case out <- ch.buffer.Peek():
ch.buffer.Pop()
if ch.buffer.IsEmpty() && ch.buffer.size > ch.buffer.initialSize { // after burst
ch.buffer.Reset()
}
}
}
}
// drain
for !ch.buffer.IsEmpty() {
out <- ch.buffer.Pop()
}
ch.buffer.Reset()
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package chanx
import (
"sync"
"testing"
)
func TestMakeUnboundedChan(t *testing.T) {
ch := NewUnboundedChan(100)
for i := 1; i < 200; i++ {
ch.In <- int64(i)
}
var count int64
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
for v := range ch.Out {
count += v.(int64)
}
}()
for i := 200; i <= 1000; i++ {
ch.In <- int64(i)
}
close(ch.In)
wg.Wait()
if count != 500500 {
t.Fatalf("expected 500500 but got %d", count)
}
}
func TestMakeUnboundedChanSize(t *testing.T) {
ch := NewUnboundedChanSize(10, 50, 100)
for i := 1; i < 200; i++ {
ch.In <- int64(i)
}
var count int64
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
for v := range ch.Out {
count += v.(int64)
}
}()
for i := 200; i <= 1000; i++ {
ch.In <- int64(i)
}
close(ch.In)
wg.Wait()
if count != 500500 {
t.Fatalf("expected 500500 but got %d", count)
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment