Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Contribute to GitLab
Sign in
Toggle navigation
G
gostnops
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
wei.xuan
gostnops
Commits
50735fff
Unverified
Commit
50735fff
authored
Nov 13, 2020
by
Xin.Zh
Committed by
GitHub
Nov 13, 2020
Browse files
Options
Browse Files
Download
Plain Diff
Merge pull request #33 from wenxuwan/master
add new poolqueue
parents
651b2933
8a39cbcb
Hide whitespace changes
Inline
Side-by-side
Showing
2 changed files
with
331 additions
and
0 deletions
+331
-0
poolqueue.go
container/queue/poolqueue.go
+211
-0
poolqueue_test.go
container/queue/poolqueue_test.go
+120
-0
No files found.
container/queue/poolqueue.go
0 → 100644
View file @
50735fff
/*
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
//refs:https://github.com/golang/go/blob/2333c6299f340a5f76a73a4fec6db23ffa388e97/src/sync/poolqueue.go
package
gxqueue
import
(
"errors"
"sync/atomic"
"unsafe"
)
// poolDequeue is a lock-free fixed-size single-producer,
// multi-consumer queue. The single producer can both push and pop
// from the head, and consumers can pop from the tail.
//
// It has the added feature that it nils out unused slots to avoid
// unnecessary retention of objects. This is important for sync.Pool,
// but not typically a property considered in the literature.
type
poolDequeue
struct
{
// headTail packs together a 32-bit head index and a 32-bit
// tail index. Both are indexes into vals modulo len(vals)-1.
//
// tail = index of oldest data in queue
// head = index of next slot to fill
//
// Slots in the range [tail, head) are owned by consumers.
// A consumer continues to own a slot outside this range until
// it nils the slot, at which point ownership passes to the
// producer.
//
// The head index is stored in the most-significant bits so
// that we can atomically add to it and the overflow is
// harmless.
headTail
uint64
// vals is a ring buffer of interface{} values stored in this
// dequeue. The size of this must be a power of 2.
//
// vals[i].typ is nil if the slot is empty and non-nil
// otherwise. A slot is still in use until *both* the tail
// index has moved beyond it and typ has been set to nil. This
// is set to nil atomically by the consumer and read
// atomically by the producer.
vals
[]
eface
}
type
eface
struct
{
typ
,
val
unsafe
.
Pointer
}
const
dequeueBits
=
32
// dequeueLimit is the maximum size of a poolDequeue.
//
// This must be at most (1<<dequeueBits)/2 because detecting fullness
// depends on wrapping around the ring buffer without wrapping around
// the index. We divide by 4 so this fits in an int on 32-bit.
const
dequeueLimit
=
(
1
<<
dequeueBits
)
/
4
// dequeueNil is used in poolDeqeue to represent interface{}(nil).
// Since we use nil to represent empty slots, we need a sentinel value
// to represent nil.
type
dequeueNil
*
struct
{}
func
(
d
*
poolDequeue
)
unpack
(
ptrs
uint64
)
(
head
,
tail
uint32
)
{
const
mask
=
1
<<
dequeueBits
-
1
head
=
uint32
((
ptrs
>>
dequeueBits
)
&
mask
)
tail
=
uint32
(
ptrs
&
mask
)
return
}
func
(
d
*
poolDequeue
)
pack
(
head
,
tail
uint32
)
uint64
{
const
mask
=
1
<<
dequeueBits
-
1
return
(
uint64
(
head
)
<<
dequeueBits
)
|
uint64
(
tail
&
mask
)
}
// PushHead adds val at the head of the queue. It returns false if the
// queue is full. It must only be called by a single producer.
func
(
d
*
poolDequeue
)
PushHead
(
val
interface
{})
bool
{
ptrs
:=
atomic
.
LoadUint64
(
&
d
.
headTail
)
head
,
tail
:=
d
.
unpack
(
ptrs
)
if
(
tail
+
uint32
(
len
(
d
.
vals
)))
&
(
1
<<
dequeueBits
-
1
)
==
head
{
// Queue is full.
return
false
}
slot
:=
&
d
.
vals
[
head
&
uint32
(
len
(
d
.
vals
)
-
1
)]
// Check if the head slot has been released by popTail.
typ
:=
atomic
.
LoadPointer
(
&
slot
.
typ
)
if
typ
!=
nil
{
// Another goroutine is still cleaning up the tail, so
// the queue is actually still full.
return
false
}
// The head slot is free, so we own it.
if
val
==
nil
{
val
=
dequeueNil
(
nil
)
}
*
(
*
interface
{})(
unsafe
.
Pointer
(
slot
))
=
val
// Increment head. This passes ownership of slot to popTail
// and acts as a store barrier for writing the slot.
atomic
.
AddUint64
(
&
d
.
headTail
,
1
<<
dequeueBits
)
return
true
}
// PopHead removes and returns the element at the head of the queue.
// It returns false if the queue is empty. It must only be called by a
// single producer.
func
(
d
*
poolDequeue
)
PopHead
()
(
interface
{},
bool
)
{
var
slot
*
eface
for
{
ptrs
:=
atomic
.
LoadUint64
(
&
d
.
headTail
)
head
,
tail
:=
d
.
unpack
(
ptrs
)
if
tail
==
head
{
// Queue is empty.
return
nil
,
false
}
// Confirm tail and decrement head. We do this before
// reading the value to take back ownership of this
// slot.
head
--
ptrs2
:=
d
.
pack
(
head
,
tail
)
if
atomic
.
CompareAndSwapUint64
(
&
d
.
headTail
,
ptrs
,
ptrs2
)
{
// We successfully took back slot.
slot
=
&
d
.
vals
[
head
&
uint32
(
len
(
d
.
vals
)
-
1
)]
break
}
}
val
:=
*
(
*
interface
{})(
unsafe
.
Pointer
(
slot
))
if
val
==
dequeueNil
(
nil
)
{
val
=
nil
}
// Zero the slot. Unlike popTail, this isn't racing with
// pushHead, so we don't need to be careful here.
*
slot
=
eface
{}
return
val
,
true
}
// PopTail removes and returns the element at the tail of the queue.
// It returns false if the queue is empty. It may be called by any
// number of consumers.
func
(
d
*
poolDequeue
)
PopTail
()
(
interface
{},
bool
)
{
var
slot
*
eface
for
{
ptrs
:=
atomic
.
LoadUint64
(
&
d
.
headTail
)
head
,
tail
:=
d
.
unpack
(
ptrs
)
if
tail
==
head
{
// Queue is empty.
return
nil
,
false
}
// Confirm head and tail (for our speculative check
// above) and increment tail. If this succeeds, then
// we own the slot at tail.
ptrs2
:=
d
.
pack
(
head
,
tail
+
1
)
if
atomic
.
CompareAndSwapUint64
(
&
d
.
headTail
,
ptrs
,
ptrs2
)
{
// Success.
slot
=
&
d
.
vals
[
tail
&
uint32
(
len
(
d
.
vals
)
-
1
)]
break
}
}
// We now own slot.
val
:=
*
(
*
interface
{})(
unsafe
.
Pointer
(
slot
))
if
val
==
dequeueNil
(
nil
)
{
val
=
nil
}
// Tell pushHead that we're done with this slot. Zeroing the
// slot is also important so we don't leave behind references
// that could keep this object live longer than necessary.
//
// We write to val first and then publish that we're done with
// this slot by atomically writing to typ.
slot
.
val
=
nil
atomic
.
StorePointer
(
&
slot
.
typ
,
nil
)
// At this point pushHead owns the slot.
return
val
,
true
}
// NewPoolDequeue new a poolDequeue instance.
func
NewPoolDequeue
(
n
int
)
(
*
poolDequeue
,
error
)
{
if
n
&
(
n
-
1
)
!=
0
{
return
nil
,
errors
.
New
(
"the size of pool must be a power of 2"
)
}
d
:=
&
poolDequeue
{
vals
:
make
([]
eface
,
n
),
}
return
d
,
nil
}
container/queue/poolqueue_test.go
0 → 100644
View file @
50735fff
/*
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
//refs:https://github.com/golang/go/blob/2333c6299f340a5f76a73a4fec6db23ffa388e97/src/sync/pool_test.go
package
gxqueue
import
(
"runtime"
"sync"
"sync/atomic"
"testing"
)
import
(
"github.com/stretchr/testify/assert"
)
func
TestCreatePoolDequeue
(
t
*
testing
.
T
)
{
_
,
err
:=
NewPoolDequeue
(
15
)
assert
.
EqualError
(
t
,
err
,
"the size of pool must be a power of 2"
)
_
,
err
=
NewPoolDequeue
(
18
)
assert
.
EqualError
(
t
,
err
,
"the size of pool must be a power of 2"
)
_
,
err
=
NewPoolDequeue
(
24
)
assert
.
EqualError
(
t
,
err
,
"the size of pool must be a power of 2"
)
_
,
err
=
NewPoolDequeue
(
8
)
assert
.
NoError
(
t
,
err
)
}
func
TestPoolDequeue
(
t
*
testing
.
T
)
{
const
P
=
10
var
N
int
=
2e6
d
,
err
:=
NewPoolDequeue
(
16
)
if
err
!=
nil
{
t
.
Errorf
(
"create poolDequeue fail"
)
}
if
testing
.
Short
()
{
N
=
1e3
}
have
:=
make
([]
int32
,
N
)
var
stop
int32
var
wg
sync
.
WaitGroup
record
:=
func
(
val
int
)
{
atomic
.
AddInt32
(
&
have
[
val
],
1
)
if
val
==
N
-
1
{
atomic
.
StoreInt32
(
&
stop
,
1
)
}
}
// Start P-1 consumers.
for
i
:=
1
;
i
<
P
;
i
++
{
wg
.
Add
(
1
)
go
func
()
{
fail
:=
0
for
atomic
.
LoadInt32
(
&
stop
)
==
0
{
val
,
ok
:=
d
.
PopTail
()
if
ok
{
fail
=
0
record
(
val
.
(
int
))
}
else
{
// Speed up the test by
// allowing the pusher to run.
if
fail
++
;
fail
%
100
==
0
{
runtime
.
Gosched
()
}
}
}
wg
.
Done
()
}()
}
// Start 1 producer.
nPopHead
:=
0
wg
.
Add
(
1
)
go
func
()
{
for
j
:=
0
;
j
<
N
;
j
++
{
for
!
d
.
PushHead
(
j
)
{
// Allow a popper to run.
runtime
.
Gosched
()
}
if
j
%
10
==
0
{
val
,
ok
:=
d
.
PopHead
()
if
ok
{
nPopHead
++
record
(
val
.
(
int
))
}
}
}
wg
.
Done
()
}()
wg
.
Wait
()
// Check results.
for
i
,
count
:=
range
have
{
if
count
!=
1
{
t
.
Errorf
(
"expected have[%d] = 1, got %d"
,
i
,
count
)
}
}
// Check that at least some PopHeads succeeded. We skip this
// check in short mode because it's common enough that the
// queue will stay nearly empty all the time and a PopTail
// will happen during the window between every PushHead and
// PopHead.
if
!
testing
.
Short
()
&&
nPopHead
==
0
{
t
.
Errorf
(
"popHead never succeeded"
)
}
}
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment