Skip to content

Commit 07e9ea0

Browse files
committed
add priority queue
1 parent d6512e2 commit 07e9ea0

File tree

5 files changed

+595
-0
lines changed

5 files changed

+595
-0
lines changed

examples/pq/main.go

+38
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
package main
2+
3+
import (
4+
"fmt"
5+
6+
pq "github.com/sunvim/utils/priorityqueue"
7+
)
8+
9+
func main() {
10+
q := pq.NewPriorityQueue(5, false)
11+
12+
q.Put(&Node{Number: 5, Name: "hello"})
13+
q.Put(&Node{Number: 3, Name: "world"})
14+
q.Put(&Node{Number: 4, Name: "sky"})
15+
q.Put(&Node{Number: 1, Name: "mobus"})
16+
q.Put(&Node{Number: 2, Name: "sunqc"})
17+
18+
for i := 0; i < 5; i++ {
19+
v, _ := q.Get(i)
20+
fmt.Printf("item: %+v \n", v)
21+
}
22+
23+
}
24+
25+
type Node struct {
26+
Number uint64
27+
Name string
28+
}
29+
30+
func (n *Node) Compare(other pq.Item) int {
31+
o := other.(*Node)
32+
if o.Number > n.Number {
33+
return 1
34+
} else if o.Number == n.Number {
35+
return 0
36+
}
37+
return -1
38+
}

priorityqueue/error.go

+16
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
package priorityqueue
2+
3+
import "errors"
4+
5+
var (
6+
// ErrDisposed is returned when an operation is performed on a disposed
7+
// queue.
8+
ErrDisposed = errors.New(`queue: disposed`)
9+
10+
// ErrTimeout is returned when an applicable queue operation times out.
11+
ErrTimeout = errors.New(`queue: poll timed out`)
12+
13+
// ErrEmptyQueue is returned when an non-applicable queue operation was called
14+
// due to the queue's empty item state
15+
ErrEmptyQueue = errors.New(`queue: empty queue`)
16+
)

priorityqueue/pq.go

+242
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,242 @@
1+
package priorityqueue
2+
3+
import "sync"
4+
5+
// Item is an item that can be added to the priority queue.
6+
type Item interface {
7+
// Compare returns a bool that can be used to determine
8+
// ordering in the priority queue. Assuming the queue
9+
// is in ascending order, this should return > logic.
10+
// Return 1 to indicate this object is greater than the
11+
// the other logic, 0 to indicate equality, and -1 to indicate
12+
// less than other.
13+
Compare(other Item) int
14+
}
15+
16+
type priorityItems []Item
17+
18+
func (items *priorityItems) swap(i, j int) {
19+
(*items)[i], (*items)[j] = (*items)[j], (*items)[i]
20+
}
21+
22+
func (items *priorityItems) pop() Item {
23+
size := len(*items)
24+
25+
// Move last leaf to root, and 'pop' the last item.
26+
items.swap(size-1, 0)
27+
item := (*items)[size-1] // Item to return.
28+
(*items)[size-1], *items = nil, (*items)[:size-1]
29+
30+
// 'Bubble down' to restore heap property.
31+
index := 0
32+
childL, childR := 2*index+1, 2*index+2
33+
for len(*items) > childL {
34+
child := childL
35+
if len(*items) > childR && (*items)[childR].Compare((*items)[childL]) < 0 {
36+
child = childR
37+
}
38+
39+
if (*items)[child].Compare((*items)[index]) < 0 {
40+
items.swap(index, child)
41+
42+
index = child
43+
childL, childR = 2*index+1, 2*index+2
44+
} else {
45+
break
46+
}
47+
}
48+
49+
return item
50+
}
51+
52+
func (items *priorityItems) get(number int) []Item {
53+
returnItems := make([]Item, 0, number)
54+
for i := 0; i < number; i++ {
55+
if len(*items) == 0 {
56+
break
57+
}
58+
59+
returnItems = append(returnItems, items.pop())
60+
}
61+
62+
return returnItems
63+
}
64+
65+
func (items *priorityItems) push(item Item) {
66+
// Stick the item as the end of the last level.
67+
*items = append(*items, item)
68+
69+
// 'Bubble up' to restore heap property.
70+
index := len(*items) - 1
71+
parent := int((index - 1) / 2)
72+
for parent >= 0 && (*items)[parent].Compare(item) > 0 {
73+
items.swap(index, parent)
74+
75+
index = parent
76+
parent = int((index - 1) / 2)
77+
}
78+
}
79+
80+
// PriorityQueue is similar to queue except that it takes
81+
// items that implement the Item interface and adds them
82+
// to the queue in priority order.
83+
type PriorityQueue struct {
84+
waiters waiters
85+
items priorityItems
86+
itemMap map[Item]struct{}
87+
lock sync.Mutex
88+
disposeLock sync.Mutex
89+
disposed bool
90+
allowDuplicates bool
91+
}
92+
93+
// Put adds items to the queue.
94+
func (pq *PriorityQueue) Put(items ...Item) error {
95+
if len(items) == 0 {
96+
return nil
97+
}
98+
99+
pq.lock.Lock()
100+
defer pq.lock.Unlock()
101+
102+
if pq.disposed {
103+
return ErrDisposed
104+
}
105+
106+
for _, item := range items {
107+
if pq.allowDuplicates {
108+
pq.items.push(item)
109+
} else if _, ok := pq.itemMap[item]; !ok {
110+
pq.itemMap[item] = struct{}{}
111+
pq.items.push(item)
112+
}
113+
}
114+
115+
for {
116+
sema := pq.waiters.get()
117+
if sema == nil {
118+
break
119+
}
120+
121+
sema.response.Add(1)
122+
sema.ready <- true
123+
sema.response.Wait()
124+
if len(pq.items) == 0 {
125+
break
126+
}
127+
}
128+
129+
return nil
130+
}
131+
132+
// Get retrieves items from the queue. If the queue is empty,
133+
// this call blocks until the next item is added to the queue. This
134+
// will attempt to retrieve number of items.
135+
func (pq *PriorityQueue) Get(number int) ([]Item, error) {
136+
if number < 1 {
137+
return nil, nil
138+
}
139+
140+
pq.lock.Lock()
141+
142+
if pq.disposed {
143+
pq.lock.Unlock()
144+
return nil, ErrDisposed
145+
}
146+
147+
var items []Item
148+
149+
// Remove references to popped items.
150+
deleteItems := func(items []Item) {
151+
for _, item := range items {
152+
delete(pq.itemMap, item)
153+
}
154+
}
155+
156+
if len(pq.items) == 0 {
157+
sema := newSema()
158+
pq.waiters.put(sema)
159+
pq.lock.Unlock()
160+
161+
<-sema.ready
162+
163+
if pq.Disposed() {
164+
return nil, ErrDisposed
165+
}
166+
167+
items = pq.items.get(number)
168+
if !pq.allowDuplicates {
169+
deleteItems(items)
170+
}
171+
sema.response.Done()
172+
return items, nil
173+
}
174+
175+
items = pq.items.get(number)
176+
deleteItems(items)
177+
pq.lock.Unlock()
178+
return items, nil
179+
}
180+
181+
// Peek will look at the next item without removing it from the queue.
182+
func (pq *PriorityQueue) Peek() Item {
183+
pq.lock.Lock()
184+
defer pq.lock.Unlock()
185+
if len(pq.items) > 0 {
186+
return pq.items[0]
187+
}
188+
return nil
189+
}
190+
191+
// Empty returns a bool indicating if there are any items left
192+
// in the queue.
193+
func (pq *PriorityQueue) Empty() bool {
194+
pq.lock.Lock()
195+
defer pq.lock.Unlock()
196+
197+
return len(pq.items) == 0
198+
}
199+
200+
// Len returns a number indicating how many items are in the queue.
201+
func (pq *PriorityQueue) Len() int {
202+
pq.lock.Lock()
203+
defer pq.lock.Unlock()
204+
205+
return len(pq.items)
206+
}
207+
208+
// Disposed returns a bool indicating if this queue has been disposed.
209+
func (pq *PriorityQueue) Disposed() bool {
210+
pq.disposeLock.Lock()
211+
defer pq.disposeLock.Unlock()
212+
213+
return pq.disposed
214+
}
215+
216+
// Dispose will prevent any further reads/writes to this queue
217+
// and frees available resources.
218+
func (pq *PriorityQueue) Dispose() {
219+
pq.lock.Lock()
220+
defer pq.lock.Unlock()
221+
222+
pq.disposeLock.Lock()
223+
defer pq.disposeLock.Unlock()
224+
225+
pq.disposed = true
226+
for _, waiter := range pq.waiters {
227+
waiter.response.Add(1)
228+
waiter.ready <- true
229+
}
230+
231+
pq.items = nil
232+
pq.waiters = nil
233+
}
234+
235+
// NewPriorityQueue is the constructor for a priority queue.
236+
func NewPriorityQueue(hint int, allowDuplicates bool) *PriorityQueue {
237+
return &PriorityQueue{
238+
items: make(priorityItems, 0, hint),
239+
itemMap: make(map[Item]struct{}, hint),
240+
allowDuplicates: allowDuplicates,
241+
}
242+
}

0 commit comments

Comments
 (0)