Commit ee6d7d0e authored by xujianhai666's avatar xujianhai666

improve code

parent 010739b7
...@@ -38,6 +38,7 @@ var ( ...@@ -38,6 +38,7 @@ var (
ErrEmptyQueue = errors.New(`queue: empty queue`) ErrEmptyQueue = errors.New(`queue: empty queue`)
) )
// waiters is the struct responsible for store sema(waiter better) of queue.
type waiters []*sema type waiters []*sema
func (w *waiters) get() *sema { func (w *waiters) get() *sema {
...@@ -60,38 +61,34 @@ func (w *waiters) remove(sema *sema) { ...@@ -60,38 +61,34 @@ func (w *waiters) remove(sema *sema) {
if len(*w) == 0 { if len(*w) == 0 {
return return
} }
ws := *w
for i := range *w { for i := range *w {
if ws[i] == sema { if (*w)[i] == sema {
*w = append(ws[:i], ws[i+1:]...) *w = append((*w)[:i], (*w)[i+1:]...)
return return
} }
} }
} }
// items is the struct responsible for store queue data
type items []interface{} type items []interface{}
func (items *items) get(number int64) []interface{} { func (items *items) get(number int64) []interface{} {
returnItems := make([]interface{}, 0, number) index := int(number)
index := int64(0) if int(number) > len(*items) {
for i := int64(0); i < number; i++ { index = len(*items)
if i >= int64(len(*items)) {
break
} }
returnItems = append(returnItems, (*items)[i]) returnItems := make([]interface{}, 0, index)
(*items)[i] = nil returnItems = returnItems[:index]
index++
} copy(returnItems[:index], (*items))
*items = (*items)[index:] *items = (*items)[index:]
return returnItems return returnItems
} }
func (items *items) peek() (interface{}, bool) { func (items *items) peek() (interface{}, bool) {
length := len(*items) if len(*items) == 0 {
if length == 0 {
return nil, false return nil, false
} }
...@@ -123,6 +120,8 @@ func (items *items) getUntil(checker func(item interface{}) bool) []interface{} ...@@ -123,6 +120,8 @@ func (items *items) getUntil(checker func(item interface{}) bool) []interface{}
return returnItems return returnItems
} }
// sema is the struct responsible for tracking the state
// of waiter. blocking poll if no data, notify if new data comes in.
type sema struct { type sema struct {
ready chan bool ready chan bool
response *sync.WaitGroup response *sync.WaitGroup
...@@ -144,6 +143,13 @@ type Queue struct { ...@@ -144,6 +143,13 @@ type Queue struct {
disposed bool disposed bool
} }
// New is a constructor for a new threadsafe queue.
func New(hint int64) *Queue {
return &Queue{
items: make([]interface{}, 0, hint),
}
}
// Put will add the specified items to the queue. // Put will add the specified items to the queue.
func (q *Queue) Put(items ...interface{}) error { func (q *Queue) Put(items ...interface{}) error {
if len(items) == 0 { if len(items) == 0 {
...@@ -335,13 +341,6 @@ func (q *Queue) Dispose() []interface{} { ...@@ -335,13 +341,6 @@ func (q *Queue) Dispose() []interface{} {
return disposedItems return disposedItems
} }
// New is a constructor for a new threadsafe queue.
func New(hint int64) *Queue {
return &Queue{
items: make([]interface{}, 0, hint),
}
}
// ExecuteInParallel will (in parallel) call the provided function // ExecuteInParallel will (in parallel) call the provided function
// with each item in the queue until the queue is exhausted. When the queue // with each item in the queue until the queue is exhausted. When the queue
// is exhausted execution is complete and all goroutines will be killed. // is exhausted execution is complete and all goroutines will be killed.
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment