Commit 2929d9cd authored by AlexStocks's avatar AlexStocks

Add: memory pool

parent c84d02bf
## copy from github.com/sofastack/sofa-mosn
## 自定义结构体复用
##### 请求维度的内存申请复用
* 模板
```
package example
import (
"context"
"sofastack.io/sofa-mosn/pkg/buffer"
"net/http"
)
var ins exampleBufferCtx
// 注册buffer类型到内存复用框架
func init() {
buffer.RegisterBuffer(&ins)
}
// 需要包含 buffer.TempBufferCtx 到自定义的Ctx, 且要放到第一位
type exampleBufferCtx struct{
buffer.TempBufferCtx
}
// 实现New()函数, 用于生成自定义buffer
func (ctx exampleBufferCtx) New() interface{} {
buffer := new(exampleBuffers)
return buffer
}
// 实现Reset()函数, 用于回收buffer之前,重置buffer内复用的结构体
func (ctx exampleBufferCtx) Reset(i interface{}) {
buf := i.(*exampleBufferCtx)
*buf = exampleBufferCtx{}
}
// 自定义buffer结构体,包含需要复用的结构体
type exampleBuffers struct {
req http.Request
rsp http.Response
}
// 通过ctx获取复用buffer
func exampleBuffersByContext(ctx context.Context) *exampleBuffers {
poolCtx := buffer.PoolContext(ctx)
return poolCtx.Find(&ins, nil).(*exampleBuffers)
}
```
* 使用方式
```
func run(ctx context.Context) {
// 通过ctx获取内存块
buffer := exampleBuffersByContext(ctx)
// 通过指针使用
req := &buffer.req
rsp := &buffer.rsp
}
```
## IoBuffer复用
```
// GetIoBuffer returns IoBuffer from pool
func GetIoBuffer(size int) Buffer {
return ibPool.take(size)
}
// PutIoBuffer returns IoBuffer to pool
func PutIoBuffer(buf Buffer) {
if buf.Count(-1) != 0 {
return
}
ibPool.give(buf)
}
```
## Byte复用
```
// GetBytes returns *[]byte from byteBufferPool
func GetBytes(size int) *[]byte {
p := getByteBufferPool()
return p.take(size)
}
// PutBytes Put *[]byte to byteBufferPool
func PutBytes(buf *[]byte) {
p := getByteBufferPool()
p.give(buf)
}
```
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package gxbytes
import (
"context"
"sync"
"sync/atomic"
"unsafe"
)
// ContextKey type
type ContextKey int
// Context key types(built-in)
const (
ContextKeyStart ContextKey = iota
ContextKeyBufferPoolCtx
ContextKeyEnd
)
const maxBufferPool = 16
var (
index int32
bPool = bufferPoolArray[:]
vPool = new(valuePool)
bufferPoolArray [maxBufferPool]bufferPool
nullBufferValue [maxBufferPool]interface{}
)
// TempBufferCtx is template for BufferPoolCtx
type TempBufferCtx struct {
index int
}
func (t *TempBufferCtx) Index() int {
return t.index
}
func (t *TempBufferCtx) New() interface{} {
return nil
}
func (t *TempBufferCtx) Reset(x interface{}) {
}
// ifaceWords is interface internal representation.
type ifaceWords struct {
typ unsafe.Pointer
data unsafe.Pointer
}
// setIdex sets index, poolCtx must embedded TempBufferCtx
func setIndex(poolCtx BufferPoolCtx, i int) {
p := (*ifaceWords)(unsafe.Pointer(&poolCtx))
temp := (*TempBufferCtx)(p.data)
temp.index = i
}
func RegisterBuffer(poolCtx BufferPoolCtx) {
// frist index is 1
i := atomic.AddInt32(&index, 1)
if i >= maxBufferPool {
panic("bufferSize over full")
}
bPool[i].ctx = poolCtx
setIndex(poolCtx, int(i))
}
// bufferPool is buffer pool
type bufferPool struct {
ctx BufferPoolCtx
sync.Pool
}
type valuePool struct {
sync.Pool
}
// Take returns a buffer from buffer pool
func (p *bufferPool) take() (value interface{}) {
value = p.Get()
if value == nil {
value = p.ctx.New()
}
return
}
// Give returns a buffer to buffer pool
func (p *bufferPool) give(value interface{}) {
p.ctx.Reset(value)
p.Put(value)
}
// bufferValue is buffer pool's Value
type bufferValue struct {
value [maxBufferPool]interface{}
transmit [maxBufferPool]interface{}
}
// NewBufferPoolContext returns a context with bufferValue
func NewBufferPoolContext(ctx context.Context) context.Context {
return WithValue(ctx, ContextKeyBufferPoolCtx, newBufferValue())
}
// TransmitBufferPoolContext copy a context
func TransmitBufferPoolContext(dst context.Context, src context.Context) {
sValue := PoolContext(src)
if sValue.value == nullBufferValue {
return
}
dValue := PoolContext(dst)
dValue.transmit = sValue.value
sValue.value = nullBufferValue
}
// newBufferValue returns bufferValue
func newBufferValue() (value *bufferValue) {
v := vPool.Get()
if v == nil {
value = new(bufferValue)
} else {
value = v.(*bufferValue)
}
return
}
// Find returns buffer from bufferValue
func (bv *bufferValue) Find(poolCtx BufferPoolCtx, x interface{}) interface{} {
i := poolCtx.Index()
if i <= 0 || i > int(index) {
panic("buffer should call buffer.RegisterBuffer()")
}
if bv.value[i] != nil {
return bv.value[i]
}
return bv.Take(poolCtx)
}
// Take returns buffer from buffer pools
func (bv *bufferValue) Take(poolCtx BufferPoolCtx) (value interface{}) {
i := poolCtx.Index()
value = bPool[i].take()
bv.value[i] = value
return
}
// Give returns buffer to buffer pools
func (bv *bufferValue) Give() {
if index <= 0 {
return
}
// first index is 1
for i := 1; i <= int(index); i++ {
value := bv.value[i]
if value != nil {
bPool[i].give(value)
}
value = bv.transmit[i]
if value != nil {
bPool[i].give(value)
}
}
bv.value = nullBufferValue
bv.transmit = nullBufferValue
// Give bufferValue to Pool
vPool.Put(bv)
}
// PoolContext returns bufferValue by context
func PoolContext(ctx context.Context) *bufferValue {
if ctx != nil {
if val := Get(ctx, ContextKeyBufferPoolCtx); val != nil {
return val.(*bufferValue)
}
}
return newBufferValue()
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package gxbytes
import (
"context"
"testing"
"runtime/debug"
)
const Size = 2048
// Test byteBufferPool
func testbytepool() *[]byte {
b := GetBytes(Size)
buf := *b
for i := 0; i < Size; i++ {
buf[i] = 1
}
return b
}
func testbyte() []byte {
buf := make([]byte, Size)
for i := 0; i < Size; i++ {
buf[i] = 1
}
return buf
}
func BenchmarkBytePool(b *testing.B) {
for i := 0; i < b.N; i++ {
buf := testbytepool()
PutBytes(buf)
}
}
func BenchmarkByteMake(b *testing.B) {
for i := 0; i < b.N; i++ {
testbyte()
}
}
// Test IoBufferPool
var TestBuffer [Size]byte
func testiobufferpool() Buffer {
b := GetIoBuffer(Size)
b.Write(TestBuffer[:])
return b
}
func testiobuffer() Buffer {
b := NewIoBuffer(Size)
b.Write(TestBuffer[:])
return b
}
func BenchmarkIoBufferPool(b *testing.B) {
for i := 0; i < b.N; i++ {
buf := testiobufferpool()
PutIoBuffer(buf)
}
}
func BenchmarkIoBuffer(b *testing.B) {
for i := 0; i < b.N; i++ {
testiobuffer()
}
}
func Test_IoBufferPool(t *testing.T) {
str := "IoBufferPool Test"
buffer := GetIoBuffer(len(str))
buffer.Write([]byte(str))
b := make([]byte, 32)
_, err := buffer.Read(b)
if err != nil {
t.Fatal(err)
}
PutIoBuffer(buffer)
if string(b[:len(str)]) != str {
t.Fatal("IoBufferPool Test Failed")
}
t.Log("IoBufferPool Test Sucess")
}
func Test_IoBufferPool_Slice_Increase(t *testing.T) {
str := "IoBufferPool Test"
// []byte slice increase
buffer := GetIoBuffer(1)
buffer.Write([]byte(str))
b := make([]byte, 32)
_, err := buffer.Read(b)
if err != nil {
t.Fatal(err)
}
PutIoBuffer(buffer)
if string(b[:len(str)]) != str {
t.Fatal("IoBufferPool Test Slice Increase Failed")
}
t.Log("IoBufferPool Test Slice Increase Sucess")
}
func Test_IoBufferPool_Alloc_Free(t *testing.T) {
str := "IoBufferPool Test"
buffer := GetIoBuffer(100)
buffer.Free()
buffer.Alloc(1)
buffer.Write([]byte(str))
b := make([]byte, 32)
_, err := buffer.Read(b)
if err != nil {
t.Fatal(err)
}
PutIoBuffer(buffer)
if string(b[:len(str)]) != str {
t.Fatal("IoBufferPool Test Alloc Free Failed")
}
t.Log("IoBufferPool Test Alloc Free Sucess")
}
func Test_ByteBufferPool(t *testing.T) {
str := "ByteBufferPool Test"
b := GetBytes(len(str))
buf := *b
copy(buf, str)
if string(buf) != str {
t.Fatal("ByteBufferPool Test Failed")
}
PutBytes(b)
b = GetBytes(len(str))
buf = *b
copy(buf, str)
if string(buf) != str {
t.Fatal("ByteBufferPool Test Failed")
}
PutBytes(b)
t.Log("ByteBufferPool Test Sucess")
}
//test bufferpool
var mock mock_bufferctx
type mock_bufferctx struct {
TempBufferCtx
}
func (ctx *mock_bufferctx) New() interface{} {
return new(mock_buffers)
}
func (ctx *mock_bufferctx) Reset(x interface{}) {
buf := x.(*mock_buffers)
*buf = mock_buffers{}
}
type mock_buffers struct {
m [10]int
}
func mock_BuffersByContext(ctx context.Context) *mock_buffers {
poolCtx := PoolContext(ctx)
return poolCtx.Find(&mock, nil).(*mock_buffers)
}
func Test_BufferPool_Register(t *testing.T) {
defer func() {
if p := recover(); p != nil {
t.Log("expected panic")
}
}()
ctx1 := NewBufferPoolContext(context.Background())
mock_BuffersByContext(ctx1)
t.Errorf("should panic")
}
func Test_BufferPool(t *testing.T) {
// close GC
debug.SetGCPercent(100000)
var null [10]int
RegisterBuffer(&mock)
// first
ctx1 := NewBufferPoolContext(context.Background())
buf1 := mock_BuffersByContext(ctx1)
for i := 0; i < 10; i++ {
buf1.m[i] = i
}
t.Log(buf1.m)
PoolContext(ctx1).Give()
if buf1.m != null {
t.Errorf("test bufferPool Error: Reset() failed")
}
t.Log(buf1.m)
t.Logf("%p", buf1)
// second
ctx2 := NewBufferPoolContext(context.Background())
buf2 := mock_BuffersByContext(ctx2)
t.Logf("%p", buf2)
if buf1 != buf2 {
t.Errorf("test bufferPool Error: Reuse failed")
}
debug.SetGCPercent(100)
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package gxbytes
import (
"context"
"sync"
)
var ins = ByteBufferCtx{}
func init() {
RegisterBuffer(&ins)
}
const minShift = 6
const maxShift = 18
const errSlot = -1
var bbPool *byteBufferPool
func init() {
bbPool = newByteBufferPool()
}
// byteBufferPool is []byte pools
type byteBufferPool struct {
minShift int
minSize int
maxSize int
pool []*bufferSlot
}
type bufferSlot struct {
defaultSize int
pool sync.Pool
}
// newByteBufferPool returns byteBufferPool
func newByteBufferPool() *byteBufferPool {
p := &byteBufferPool{
minShift: minShift,
minSize: 1 << minShift,
maxSize: 1 << maxShift,
}
for i := 0; i <= maxShift-minShift; i++ {
slab := &bufferSlot{
defaultSize: 1 << (uint)(i+minShift),
}
p.pool = append(p.pool, slab)
}
return p
}
func (p *byteBufferPool) slot(size int) int {
if size > p.maxSize {
return errSlot
}
slot := 0
shift := 0
if size > p.minSize {
size--
for size > 0 {
size = size >> 1
shift++
}
slot = shift - p.minShift
}
return slot
}
func newBytes(size int) []byte {
return make([]byte, size)
}
// take returns *[]byte from byteBufferPool
func (p *byteBufferPool) take(size int) *[]byte {
slot := p.slot(size)
if slot == errSlot {
b := newBytes(size)
return &b
}
v := p.pool[slot].pool.Get()
if v == nil {
b := newBytes(p.pool[slot].defaultSize)
b = b[0:size]
return &b
}
b := v.(*[]byte)
*b = (*b)[0:size]
return b
}
// give returns *[]byte to byteBufferPool
func (p *byteBufferPool) give(buf *[]byte) {
if buf == nil {
return
}
size := cap(*buf)
slot := p.slot(size)
if slot == errSlot {
return
}
if size != int(p.pool[slot].defaultSize) {
return
}
p.pool[slot].pool.Put(buf)
}
type ByteBufferCtx struct {
TempBufferCtx
}
type ByteBufferPoolContainer struct {
bytes []*[]byte
*byteBufferPool
}
func (ctx ByteBufferCtx) New() interface{} {
return &ByteBufferPoolContainer{
byteBufferPool: bbPool,
}
}
func (ctx ByteBufferCtx) Reset(i interface{}) {
p := i.(*ByteBufferPoolContainer)
for _, buf := range p.bytes {
p.give(buf)
}
p.bytes = p.bytes[:0]
}
// GetBytesByContext returns []byte from byteBufferPool by context
func GetBytesByContext(context context.Context, size int) *[]byte {
p := PoolContext(context).Find(&ins, nil).(*ByteBufferPoolContainer)
buf := p.take(size)
p.bytes = append(p.bytes, buf)
return buf
}
// GetBytes returns *[]byte from byteBufferPool
func GetBytes(size int) *[]byte {
return bbPool.take(size)
}
// PutBytes Put *[]byte to byteBufferPool
func PutBytes(buf *[]byte) {
bbPool.give(buf)
}
package gxbytes
import (
"math/rand"
"testing"
"time"
)
func init() {
rand.Seed(time.Now().UnixNano())
}
func intRange(min, max int) int {
return rand.Intn(max-min) + min
}
func intN(n int) int {
return rand.Intn(n) + 1
}
func TestByteBufferPoolSmallBytes(t *testing.T) {
pool := newByteBufferPool()
for i := 0; i < 1024; i++ {
size := intN(1 << minShift)
bp := pool.take(size)
if cap(*bp) != 1<<minShift {
t.Errorf("Expect get the %d bytes from pool, but got %d", size, cap(*bp))
}
// Puts the bytes to pool
pool.give(bp)
}
}
func TestBytesBufferPoolMediumBytes(t *testing.T) {
pool := newByteBufferPool()
for i := minShift; i < maxShift; i++ {
size := intRange((1<<uint(i))+1, 1<<uint(i+1))
bp := pool.take(size)
if cap(*bp) != 1<<uint(i+1) {
t.Errorf("Expect get the slab size (%d) from pool, but got %d", 1<<uint(i+1), cap(*bp))
}
//Puts the bytes to pool
pool.give(bp)
}
}
func TestBytesBufferPoolLargeBytes(t *testing.T) {
pool := newByteBufferPool()
for i := 0; i < 1024; i++ {
size := 1<<maxShift + intN(i+1)
bp := pool.take(size)
if cap(*bp) != size {
t.Errorf("Expect get the %d bytes from pool, but got %d", size, cap(*bp))
}
// Puts the bytes to pool
pool.give(bp)
}
}
func TestBytesSlot(t *testing.T) {
pool := newByteBufferPool()
if pool.slot(pool.minSize-1) != 0 {
t.Errorf("Expect get the 0 slot")
}
if pool.slot(pool.minSize) != 0 {
t.Errorf("Expect get the 0 slot")
}
if pool.slot(pool.minSize+1) != 1 {
t.Errorf("Expect get the 1 slot")
}
if pool.slot(pool.maxSize-1) != maxShift-minShift {
t.Errorf("Expect get the %d slot", maxShift-minShift)
}
if pool.slot(pool.maxSize) != maxShift-minShift {
t.Errorf("Expect get the %d slot", maxShift-minShift)
}
if pool.slot(pool.maxSize+1) != errSlot {
t.Errorf("Expect get errSlot")
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package gxbytes
import (
"errors"
"io"
"net"
"time"
"sync/atomic"
)
const MinRead = 1 << 9
const MaxRead = 1 << 17
const ResetOffMark = -1
const DefaultSize = 1 << 4
var nullByte []byte
var (
EOF = errors.New("EOF")
ErrTooLarge = errors.New("io buffer: too large")
ErrNegativeCount = errors.New("io buffer: negative count")
ErrInvalidWriteCount = errors.New("io buffer: invalid write count")
)
// IoBuffer
type IoBuffer struct {
buf []byte // contents: buf[off : len(buf)]
off int // read from &buf[off], write to &buf[len(buf)]
offMark int
count int32
eof bool
b *[]byte
}
func (b *IoBuffer) Read(p []byte) (n int, err error) {
if b.off >= len(b.buf) {
b.Reset()
if len(p) == 0 {
return
}
return 0, io.EOF
}
n = copy(p, b.buf[b.off:])
b.off += n
return
}
// Default connection arguments
const (
DefaultConnReadTimeout = 15 * time.Second
)
func (b *IoBuffer) ReadOnce(r io.Reader) (n int64, err error) {
var (
m int
e error
zeroTime time.Time
conn net.Conn
loop, ok, first = true, true, true
)
if conn, ok = r.(net.Conn); !ok {
loop = false
}
if b.off >= len(b.buf) {
b.Reset()
}
if b.off > 0 && len(b.buf)-b.off < 4*MinRead {
b.copy(0)
}
for {
if !first {
if free := cap(b.buf) - len(b.buf); free < MinRead {
// not enough space at end
if b.off+free < MinRead {
// not enough space using beginning of buffer;
// double buffer capacity
b.copy(MinRead)
} else {
b.copy(0)
}
}
}
l := cap(b.buf) - len(b.buf)
if conn != nil {
if first {
// TODO: support configure
conn.SetReadDeadline(time.Now().Add(DefaultConnReadTimeout))
} else {
conn.SetReadDeadline(time.Now().Add(10 * time.Millisecond))
}
m, e = r.Read(b.buf[len(b.buf):cap(b.buf)])
// Reset read deadline
conn.SetReadDeadline(zeroTime)
} else {
m, e = r.Read(b.buf[len(b.buf):cap(b.buf)])
}
if m > 0 {
b.buf = b.buf[0 : len(b.buf)+m]
n += int64(m)
}
if e != nil {
if te, ok := err.(net.Error); ok && te.Timeout() && !first {
return n, nil
}
return n, e
}
if l != m {
loop = false
}
if n > MaxRead {
loop = false
}
if !loop {
break
}
first = false
}
return n, nil
}
func (b *IoBuffer) ReadFrom(r io.Reader) (n int64, err error) {
if b.off >= len(b.buf) {
b.Reset()
}
for {
if free := cap(b.buf) - len(b.buf); free < MinRead {
// not enough space at end
if b.off+free < MinRead {
// not enough space using beginning of buffer;
// double buffer capacity
b.copy(MinRead)
} else {
b.copy(0)
}
}
m, e := r.Read(b.buf[len(b.buf):cap(b.buf)])
b.buf = b.buf[0 : len(b.buf)+m]
n += int64(m)
if e == io.EOF {
break
}
if m == 0 {
break
}
if e != nil {
return n, e
}
}
return
}
func (b *IoBuffer) Write(p []byte) (n int, err error) {
m, ok := b.tryGrowByReslice(len(p))
if !ok {
m = b.grow(len(p))
}
return copy(b.buf[m:], p), nil
}
func (b *IoBuffer) WriteString(s string) (n int, err error) {
m, ok := b.tryGrowByReslice(len(s))
if !ok {
m = b.grow(len(s))
}
return copy(b.buf[m:], s), nil
}
func (b *IoBuffer) tryGrowByReslice(n int) (int, bool) {
if l := len(b.buf); l+n <= cap(b.buf) {
b.buf = b.buf[:l+n]
return l, true
}
return 0, false
}
func (b *IoBuffer) grow(n int) int {
m := b.Len()
// If buffer is empty, reset to recover space.
if m == 0 && b.off != 0 {
b.Reset()
}
// Try to grow by means of a reslice.
if i, ok := b.tryGrowByReslice(n); ok {
return i
}
if m+n <= cap(b.buf)/2 {
// We can slide things down instead of allocating a new
// slice. We only need m+n <= cap(b.buf) to slide, but
// we instead let capacity get twice as large so we
// don't spend all our time copying.
b.copy(0)
} else {
// Not enough space anywhere, we need to allocate.
b.copy(n)
}
// Restore b.off and len(b.buf).
b.off = 0
b.buf = b.buf[:m+n]
return m
}
func (b *IoBuffer) WriteTo(w io.Writer) (n int64, err error) {
for b.off < len(b.buf) {
nBytes := b.Len()
m, e := w.Write(b.buf[b.off:])
if m > nBytes {
panic(ErrInvalidWriteCount)
}
b.off += m
n += int64(m)
if e != nil {
return n, e
}
if m == 0 || m == nBytes {
return n, nil
}
}
return
}
func (b *IoBuffer) Append(data []byte) error {
if b.off >= len(b.buf) {
b.Reset()
}
dataLen := len(data)
if free := cap(b.buf) - len(b.buf); free < dataLen {
// not enough space at end
if b.off+free < dataLen {
// not enough space using beginning of buffer;
// double buffer capacity
b.copy(dataLen)
} else {
b.copy(0)
}
}
m := copy(b.buf[len(b.buf):len(b.buf)+dataLen], data)
b.buf = b.buf[0 : len(b.buf)+m]
return nil
}
func (b *IoBuffer) AppendByte(data byte) error {
return b.Append([]byte{data})
}
func (b *IoBuffer) Peek(n int) []byte {
if len(b.buf)-b.off < n {
return nil
}
return b.buf[b.off : b.off+n]
}
func (b *IoBuffer) Mark() {
b.offMark = b.off
}
func (b *IoBuffer) Restore() {
if b.offMark != ResetOffMark {
b.off = b.offMark
b.offMark = ResetOffMark
}
}
func (b *IoBuffer) Bytes() []byte {
return b.buf[b.off:]
}
func (b *IoBuffer) Cut(offset int) Buffer {
if b.off+offset > len(b.buf) {
return nil
}
buf := make([]byte, offset)
copy(buf, b.buf[b.off:b.off+offset])
b.off += offset
b.offMark = ResetOffMark
return &IoBuffer{
buf: buf,
off: 0,
}
}
func (b *IoBuffer) Drain(offset int) {
if b.off+offset > len(b.buf) {
return
}
b.off += offset
b.offMark = ResetOffMark
}
func (b *IoBuffer) String() string {
return string(b.buf[b.off:])
}
func (b *IoBuffer) Len() int {
return len(b.buf) - b.off
}
func (b *IoBuffer) Cap() int {
return cap(b.buf)
}
func (b *IoBuffer) Reset() {
b.buf = b.buf[:0]
b.off = 0
b.offMark = ResetOffMark
b.eof = false
}
func (b *IoBuffer) available() int {
return len(b.buf) - b.off
}
func (b *IoBuffer) Clone() Buffer {
buf := GetIoBuffer(b.Len())
buf.Write(b.Bytes())
buf.SetEOF(b.EOF())
return buf
}
func (b *IoBuffer) Free() {
b.Reset()
b.giveSlice()
}
func (b *IoBuffer) Alloc(size int) {
if b.buf != nil {
b.Free()
}
if size <= 0 {
size = DefaultSize
}
b.b = b.makeSlice(size)
b.buf = *b.b
b.buf = b.buf[:0]
}
func (b *IoBuffer) Count(count int32) int32 {
return atomic.AddInt32(&b.count, count)
}
func (b *IoBuffer) EOF() bool {
return b.eof
}
func (b *IoBuffer) SetEOF(eof bool) {
b.eof = eof
}
func (b *IoBuffer) copy(expand int) {
var newBuf []byte
var bufp *[]byte
if expand > 0 {
bufp = b.makeSlice(2*cap(b.buf) + expand)
newBuf = *bufp
copy(newBuf, b.buf[b.off:])
PutBytes(b.b)
b.b = bufp
} else {
newBuf = b.buf
copy(newBuf, b.buf[b.off:])
}
b.buf = newBuf[:len(b.buf)-b.off]
b.off = 0
}
func (b *IoBuffer) makeSlice(n int) *[]byte {
return GetBytes(n)
}
func (b *IoBuffer) giveSlice() {
if b.b != nil {
PutBytes(b.b)
b.b = nil
b.buf = nullByte
}
}
func NewIoBuffer(capacity int) Buffer {
buffer := &IoBuffer{
offMark: ResetOffMark,
count: 1,
}
if capacity <= 0 {
capacity = DefaultSize
}
buffer.b = GetBytes(capacity)
buffer.buf = (*buffer.b)[:0]
return buffer
}
func NewIoBufferString(s string) Buffer {
if s == "" {
return NewIoBuffer(0)
}
return &IoBuffer{
buf: []byte(s),
offMark: ResetOffMark,
count: 1,
}
}
func NewIoBufferBytes(bytes []byte) Buffer {
if bytes == nil {
return NewIoBuffer(0)
}
return &IoBuffer{
buf: bytes,
offMark: ResetOffMark,
count: 1,
}
}
func NewIoBufferEOF() Buffer {
buf := NewIoBuffer(0)
buf.SetEOF(true)
return buf
}
package gxbytes
import (
"bytes"
"io"
"math/rand"
"testing"
"time"
)
var letters = []rune("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ")
func init() {
rand.Seed(time.Now().UnixNano())
}
func randString(n int) string {
b := make([]rune, n)
for i := range b {
b[i] = letters[rand.Intn(len(letters))]
}
return string(b)
}
func randN(n int) int {
return rand.Intn(n) + 1
}
func TestNewIoBufferString(t *testing.T) {
for i := 0; i < 1024; i++ {
s := randString(i)
b := NewIoBufferString(s)
if b.String() != s {
t.Errorf("Expect %s but got %s", s, b.String())
}
}
}
func TestNewIoBufferBytes(t *testing.T) {
for i := 0; i < 1024; i++ {
s := randString(i)
b := NewIoBufferBytes([]byte(s))
if !bytes.Equal(b.Bytes(), []byte(s)) {
t.Errorf("Expect %s but got %s", s, b.String())
}
}
}
func TestIoBufferCopy(t *testing.T) {
bi := NewIoBuffer(1)
b := bi.(*IoBuffer)
n := randN(1024) + 1
b.copy(n)
if cap(b.buf) < 2*1+n {
t.Errorf("b.copy(%d) should expand to at least %d, but got %d", n, 2*1+n, cap(b.buf))
}
}
func TestIoBufferWrite(t *testing.T) {
b := NewIoBuffer(1)
n := randN(64)
for i := 0; i < n; i++ {
s := randString(i + 16)
n, err := b.Write([]byte(s))
if err != nil {
t.Fatal(err)
}
if n != len(s) {
t.Errorf("Expect write %d bytes, but got %d", len(s), n)
}
if !bytes.Equal(b.Peek(len(s)), []byte(s)) {
t.Errorf("Expect peek %s but got %s", s, string(b.Peek(len(s))))
}
b.Drain(len(s))
}
input := make([]byte, 0, 1024)
writer := bytes.NewBuffer(nil)
for i := 0; i < n; i++ {
s := randString(i + 16)
n, err := b.Write([]byte(s))
if err != nil {
t.Fatal(err)
}
if n != len(s) {
t.Errorf("Expect write %d bytes, but got %d", len(s), n)
}
input = append(input, []byte(s)...)
}
_, err := b.WriteTo(writer)
if err != nil {
t.Fatal(err)
}
if !bytes.Equal(writer.Bytes(), input) {
t.Errorf("Expect %s but got %s", input, writer.String())
}
}
func TestIoBufferAppend(t *testing.T) {
bi := NewIoBuffer(1)
b := bi.(*IoBuffer)
n := randN(64)
for i := 0; i < n; i++ {
s := randString(i + 16)
err := b.Append([]byte(s))
if err != nil {
t.Fatal(err)
}
if !bytes.Equal(b.Peek(len(s)), []byte(s)) {
t.Errorf("Expect peek %s but got %s", s, string(b.Peek(len(s))))
}
b.Drain(len(s))
}
}
func TestIoBufferAppendByte(t *testing.T) {
bi := NewIoBuffer(1)
b := bi.(*IoBuffer)
input := make([]byte, 0, 1024)
n := randN(1024)
for i := 0; i < n; i++ {
err := b.AppendByte(byte(i))
if err != nil {
t.Fatal(err)
}
input = append(input, byte(i))
}
if b.Len() != n {
t.Errorf("Expect %d bytes, but got %d", n, b.Len())
}
if !bytes.Equal(b.Peek(n), input) {
t.Errorf("Expect %x, but got %x", input, b.Peek(n))
}
}
func TestIoBufferRead(t *testing.T) {
b := NewIoBuffer(0)
data := make([]byte, 1)
n, err := b.Read(data)
if err != io.EOF {
t.Errorf("Expect io.EOF but got %s", err)
}
if n != 0 {
t.Errorf("Expect 0 bytes but got %d", n)
}
n, err = b.Read(nil)
if n != 0 || err != nil {
t.Errorf("Expect (0, nil) but got (%d, %s)", n, err)
}
b = NewIoBuffer(1)
s := randString(1024)
reader := bytes.NewReader([]byte(s))
nr, err := b.ReadFrom(reader)
if err != nil {
t.Errorf("Expect nil but got %s", err)
}
if nr != int64(len(s)) {
t.Errorf("Expect %d bytes but got %d", len(s), nr)
}
if !bytes.Equal(b.Peek(len(s)), []byte(s)) {
t.Errorf("Expect peek %s but got %s", s, string(b.Peek(len(s))))
}
}
func TestIoBufferReadOnce(t *testing.T) {
b := NewIoBuffer(1)
s := randString(1024)
input := make([]byte, 0, 1024)
reader := bytes.NewReader([]byte(s))
for {
n, err := b.ReadOnce(reader)
if err != nil {
if err == io.EOF {
break
}
t.Fatal(err)
}
if n != 1<<minShift {
t.Errorf("Expect %d bytes but got %d", len(s), n)
}
input = append(input, b.Peek(int(n))...)
b.Drain(int(n))
}
if !bytes.Equal(input, []byte(s)) {
t.Errorf("Expect got %s but got %s", s, string(input))
}
}
func TestIoBufferClone(t *testing.T) {
for i := 16; i < 1024+16; i++ {
s := randString(i)
buffer := NewIoBufferString(s)
nb := buffer.Clone()
if nb.String() != s {
t.Errorf("Clone() expect %s but got %s", s, nb.String())
}
}
}
func TestIoBufferCut(t *testing.T) {
for i := 16; i < 1024+16; i++ {
s := randString(i)
bi := NewIoBufferString(s)
b := bi.(*IoBuffer)
offset := randN(i) - 1
nb := b.Cut(offset)
if nb.String() != s[:offset] {
t.Errorf("Cut(%d) expect %s but got %s", offset, s[:offset], nb.String())
}
}
}
func TestIoBufferAllocAndFree(t *testing.T) {
b := NewIoBuffer(0)
for i := 0; i < 1024; i++ {
b.Alloc(i)
if b.Cap() < i {
t.Errorf("Expect alloc at least %d bytes but allocated %d", i, b.Cap())
}
}
b.Reset()
for i := 0; i < 1024; i++ {
b.Alloc(i)
if b.Cap() < i {
t.Errorf("Expect alloc at least %d bytes but allocated %d", i, b.Cap())
}
b.Free()
if b.Cap() != 0 {
t.Errorf("Expect free to 0 bytes but got %d", b.Cap())
}
}
}
func TestIoBufferZero(t *testing.T) {
writer := bytes.NewBuffer(nil)
b := NewIoBuffer(0)
_, err := b.WriteTo(writer)
if err != nil {
t.Fatal(err)
}
if len(writer.Bytes()) != 0 {
t.Errorf("Expect 0, but got %s", writer.String())
}
b = NewIoBufferBytes(nil)
_, err = b.WriteTo(writer)
if err != nil {
t.Fatal(err)
}
if len(writer.Bytes()) != 0 {
t.Errorf("Expect 0, but got %s", writer.String())
}
b = NewIoBufferString("")
_, err = b.WriteTo(writer)
if err != nil {
t.Fatal(err)
}
if len(writer.Bytes()) != 0 {
t.Errorf("Expect 0, but got %s", writer.String())
}
b = NewIoBufferEOF()
_, err = b.WriteTo(writer)
if err != nil {
t.Fatal(err)
}
if len(writer.Bytes()) != 0 {
t.Errorf("Expect 0, but got %s", writer.String())
}
b = NewIoBuffer(0)
if b.String() != "" {
t.Errorf("Expect \"\", but got %s", string(b.String()))
}
if len(b.Bytes()) != 0 {
t.Errorf("Expect 0, but got %d", len(b.Bytes()))
}
if len(b.Peek(0)) != 0 {
t.Errorf("Expect 0, but got %d", len(b.Bytes()))
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package gxbytes
import (
"errors"
"sync"
)
var ibPool IoBufferPool
// IoBufferPool is Iobuffer Pool
type IoBufferPool struct {
pool sync.Pool
}
// take returns IoBuffer from IoBufferPool
func (p *IoBufferPool) take(size int) (buf Buffer) {
v := p.pool.Get()
if v == nil {
buf = NewIoBuffer(size)
} else {
buf = v.(Buffer)
buf.Alloc(size)
buf.Count(1)
}
return
}
// give returns IoBuffer to IoBufferPool
func (p *IoBufferPool) give(buf Buffer) {
buf.Free()
p.pool.Put(buf)
}
// GetIoBuffer returns IoBuffer from pool
func GetIoBuffer(size int) Buffer {
return ibPool.take(size)
}
// PutIoBuffer returns IoBuffer to pool
func PutIoBuffer(buf Buffer) error {
count := buf.Count(-1)
if count > 0 {
return nil
} else if count < 0 {
return errors.New("PutIoBuffer duplicate")
}
ibPool.give(buf)
return nil
}
package gxbytes
import (
"testing"
)
func TestIoBufferPoolWithCount(t *testing.T) {
buf := GetIoBuffer(0)
bytes := []byte{0x00, 0x01, 0x02, 0x03, 0x04}
buf.Write(bytes)
if buf.Len() != len(bytes) {
t.Error("iobuffer len not match write bytes' size")
}
// Add a count, need put twice to free buffer
buf.Count(1)
PutIoBuffer(buf)
if buf.Len() != len(bytes) {
t.Error("iobuffer expected put ignore")
}
PutIoBuffer(buf)
if buf.Len() != 0 {
t.Error("iobuffer expected put success")
}
}
func TestIoBufferPooPutduplicate(t *testing.T) {
buf := GetIoBuffer(0)
err := PutIoBuffer(buf)
if err != nil {
t.Errorf("iobuffer put error:%v", err)
}
err = PutIoBuffer(buf)
if err == nil {
t.Errorf("iobuffer should be error: Put IoBuffer duplicate")
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package gxbytes
import (
"context"
"io"
)
// BufferPoolCtx is the bufferpool's context
type BufferPoolCtx interface {
// Index returns the bufferpool's Index
Index() int
// New returns the buffer
New() interface{}
// Reset resets the buffer
Reset(interface{})
}
type Buffer interface {
// Read reads the next len(p) bytes from the buffer or until the buffer
// is drained. The return value n is the number of bytes read. If the
// buffer has no data to return, err is io.EOF (unless len(p) is zero);
// otherwise it is nil.
Read(p []byte) (n int, err error)
// ReadOnce make a one-shot read and appends it to the buffer, growing
// the buffer as needed. The return value n is the number of bytes read. Any
// error except io.EOF encountered during the read is also returned. If the
// buffer becomes too large, ReadFrom will panic with ErrTooLarge.
ReadOnce(r io.Reader) (n int64, err error)
// ReadFrom reads data from r until EOF and appends it to the buffer, growing
// the buffer as needed. The return value n is the number of bytes read. Any
// error except io.EOF encountered during the read is also returned. If the
// buffer becomes too large, ReadFrom will panic with ErrTooLarge.
ReadFrom(r io.Reader) (n int64, err error)
// Write appends the contents of p to the buffer, growing the buffer as
// needed. The return value n is the length of p; err is always nil. If the
// buffer becomes too large, Write will panic with ErrTooLarge.
Write(p []byte) (n int, err error)
// WriteString appends the string to the buffer, growing the buffer as
// needed. The return value n is the length of s; err is always nil. If the
// buffer becomes too large, Write will panic with ErrTooLarge.
WriteString(s string) (n int, err error)
// WriteTo writes data to w until the buffer is drained or an error occurs.
// The return value n is the number of bytes written; it always fits into an
// int, but it is int64 to match the io.WriterTo interface. Any error
// encountered during the write is also returned.
WriteTo(w io.Writer) (n int64, err error)
// Peek returns n bytes from buffer, without draining any buffered data.
// If n > readable buffer, nil will be returned.
// It can be used in codec to check first-n-bytes magic bytes
// Note: do not change content in return bytes, use write instead
Peek(n int) []byte
// Bytes returns all bytes from buffer, without draining any buffered data.
// It can be used to get fixed-length content, such as headers, body.
// Note: do not change content in return bytes, use write instead
Bytes() []byte
// Drain drains a offset length of bytes in buffer.
// It can be used with Bytes(), after consuming a fixed-length of data
Drain(offset int)
// Len returns the number of bytes of the unread portion of the buffer;
// b.Len() == len(b.Bytes()).
Len() int
// Cap returns the capacity of the buffer's underlying byte slice, that is, the
// total space allocated for the buffer's data.
Cap() int
// Reset resets the buffer to be empty,
// but it retains the underlying storage for use by future writes.
Reset()
// Clone makes a copy of IoBuffer struct
Clone() Buffer
// String returns the contents of the unread portion of the buffer
// as a string. If the Buffer is a nil pointer, it returns "<nil>".
String() string
// Alloc alloc bytes from BytePoolBuffer
Alloc(int)
// Free free bytes to BytePoolBuffer
Free()
// Count sets and returns reference count
Count(int32) int32
// EOF returns whether Io is EOF on the connection
EOF() bool
//SetEOF sets the IoBuffer EOF
SetEOF(eof bool)
}
func Get(ctx context.Context, key ContextKey) interface{} {
if mosnCtx, ok := ctx.(*valueCtx); ok {
return mosnCtx.builtin[key]
}
return ctx.Value(key)
}
// WithValue add the given key-value pair into the existed value context, or create a new value context contains the pair.
// This Function should not be used along the official context.WithValue !!
// The following context topology will leads to existed pair {'foo':'bar'} NOT FOUND, cause recursive lookup for
// key-type=ContextKey is not supported by mosn.valueCtx.
//
// topology: context.Background -> mosn.valueCtx{'foo':'bar'} -> context.valueCtx -> mosn.valueCtx{'hmm':'haa'}
func WithValue(parent context.Context, key ContextKey, value interface{}) context.Context {
if mosnCtx, ok := parent.(*valueCtx); ok {
mosnCtx.builtin[key] = value
return mosnCtx
}
// create new valueCtx
mosnCtx := &valueCtx{Context: parent}
mosnCtx.builtin[key] = value
return mosnCtx
}
// Clone copy the origin mosn value context(if it is), and return new one
func Clone(parent context.Context) context.Context {
if mosnCtx, ok := parent.(*valueCtx); ok {
clone := &valueCtx{Context: mosnCtx}
// array copy assign
clone.builtin = mosnCtx.builtin
return clone
}
return parent
}
type valueCtx struct {
context.Context
builtin [ContextKeyEnd]interface{}
// TODO
//variables map[string]Variable
}
func (c *valueCtx) Value(key interface{}) interface{} {
if contextKey, ok := key.(ContextKey); ok {
return c.builtin[contextKey]
}
return c.Context.Value(key)
}
......@@ -2,10 +2,6 @@
// All rights reserved. Use of this source code is
// governed by Apache License 2.0.
// http://blog.csdn.net/siddontang/article/details/23541587
// the difference btw reflect.StringHeader and reflect.SliceHeader is that there is a 'cap' in reflect.SliceHeader.
//
// The following codes is from a vitness project.
package gxstrings
import (
......@@ -13,27 +9,6 @@ import (
"unsafe"
)
// String converts slice to a string object just by convert its pointer type
// on the same memory heap without copying.
func String(b []byte) (s string) {
pbytes := (*reflect.SliceHeader)(unsafe.Pointer(&b))
pstring := (*reflect.StringHeader)(unsafe.Pointer(&s))
pstring.Data = pbytes.Data
pstring.Len = pbytes.Len
return
}
// String converts string to a slice object just by convert its pointer type
// on the same memory heap without copying.
func Slice(s string) (b []byte) {
pbytes := (*reflect.SliceHeader)(unsafe.Pointer(&b))
pstring := (*reflect.StringHeader)(unsafe.Pointer(&s))
pbytes.Data = pstring.Data
pbytes.Len = pstring.Len
pbytes.Cap = pstring.Len
return
}
// returns &s[0], which is not allowed in go
func StringPointer(s string) unsafe.Pointer {
p := (*reflect.StringHeader)(unsafe.Pointer(&s))
......
package gxstrings
import (
"reflect"
"testing"
)
// go test -v slice_test.go slice.go
func TestString(t *testing.T) {
b := []byte("hello world")
// After converting slice to string, the string value will change
// when the slice got new value.
a := String(b)
b[0] = 'a'
if !reflect.DeepEqual("aello world", a) {
t.Errorf("a:%+v != `aello world`", a)
}
}
// BenchmarkString0-8 2000000000 0.27 ns/op
func BenchmarkString0(b *testing.B) {
b.StartTimer()
for i := 0; i < 100000000; i++ {
bs := []byte("hello world")
_ = string(bs)
bs[0] = 'a'
}
b.StopTimer()
bs := []byte("hello world")
a := string(bs)
bs[0] = 'a'
if reflect.DeepEqual("aello world", a) {
b.Errorf("a:%+v != `aello world`", a)
}
}
// BenchmarkString-8 1 1722255064 ns/op
func BenchmarkString(b *testing.B) {
b.StartTimer()
for i := 0; i < 100000000; i++ {
bs := []byte("hello world")
_ = String(bs)
bs[0] = 'a'
}
b.StopTimer()
bs := []byte("hello world")
a := String(bs)
bs[0] = 'a'
if !reflect.DeepEqual("aello world", a) {
b.Errorf("a:%+v != `aello world`", a)
}
}
func TestSlice(t *testing.T) {
a := string([]byte("hello world"))
b := Slice(a)
b = append(b, "hello world"...)
println(String(b))
if !reflect.DeepEqual([]byte("hello worldhello world"), b) {
t.Errorf("a:%+v != `hello worldhello world`", string(b))
}
}
// BenchmarkSlice0-8 1 1187713598 ns/op
func BenchmarkSlice0(b *testing.B) {
for i := 0; i < 100000000; i++ {
a := string([]byte("hello world"))
bs := ([]byte)(a)
_ = append(bs, "hello world"...)
}
}
// BenchmarkSlice-8 1 4895001383 ns/op
func BenchmarkSlice(b *testing.B) {
for i := 0; i < 100000000; i++ {
a := string([]byte("hello world"))
bs := Slice(a)
_ = append(bs, "hello world"...)
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment