forked from xiaonanln/go-lockfree-queue
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathqueue.go
139 lines (119 loc) · 2.68 KB
/
queue.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
package lfqueue
import (
"runtime"
"sync/atomic"
)
// Queue is a bounded queue whose Put and Get coordinate through atomic
// operations on the position counters below instead of a mutex.
type Queue struct {
// capacityMod is capacity-1. It is used as the index mask
// (pos & capacityMod) and as the maximum number of elements Put admits.
capacityMod uint32
// capacity is the number of slots in entries, as set by NewQueue.
capacity uint32
// getPos is the position the next successful Get will claim.
getPos uint32
// _1 separates getPos from putPos by 64 bytes.
// NOTE(review): presumably to keep the two counters on different cache
// lines and avoid false sharing — confirm intent.
_1 [64]byte
// putPos is the position the next successful Put will claim.
putPos uint32
// Trailing padding is currently disabled:
// _2 [64]byte
// entries is the slot array, indexed by position & capacityMod.
entries []queueEntry
}
// queueEntry is a single slot of the queue together with the two sequence
// numbers that Put and Get use to hand the slot back and forth.
type queueEntry struct {
// putPos is the queue position whose Put may write this slot next. Put adds
// the queue capacity to it after storing elem.
putPos uint32
// getPos is the queue position whose Get may read this slot next. Get adds
// the queue capacity to it after clearing elem.
getPos uint32
// elem is the stored value; Get resets it to nil once it has been taken.
elem interface{}
}
// NewQueue returns an empty queue able to hold at least capacity elements.
//
// The slot array is sized to the smallest power of two that is >= capacity+1,
// because Put refuses new elements once slots-1 are in use. A negative
// capacity is treated as 0, and a capacity above 1<<31-1 is clamped to that
// value. Without the clamping the uint32 conversion could produce an empty
// slot array, which makes the first Put index out of range.
func NewQueue(capacity int) *Queue {
	// Largest request for which capacity+1 still rounds up to a power of two
	// representable in a uint32.
	const maxCapacity = 1<<31 - 1
	if capacity < 0 {
		capacity = 0
	} else if capacity > maxCapacity {
		capacity = maxCapacity
	}

	slots := minQuantity(uint32(capacity + 1))
	q := &Queue{
		capacity:    slots,
		capacityMod: slots - 1,
		entries:     make([]queueEntry, slots),
	}
	// Slot i is first written by the Put that claims position i and first read
	// by the Get that claims position i.
	for i := range q.entries {
		q.entries[i].getPos = uint32(i)
		q.entries[i].putPos = uint32(i)
	}
	return q
}
// Put stores elem at the next write position.
//
// It returns false without storing anything when the queue already holds
// capacityMod elements. Otherwise it claims a position with a compare-and-swap
// on putPos, waits (yielding the processor) until the slot for that position
// has been released by its previous reader, stores elem and returns true.
func (q *Queue) Put(elem interface{}) (ok bool) {
	capacityMod := q.capacityMod

	var putPos uint32
	for {
		getPos := atomic.LoadUint32(&q.getPos)
		putPos = atomic.LoadUint32(&q.putPos)
		// Unsigned subtraction is already correct modulo 2^32, so the element
		// count needs no adjustment after putPos wraps past the uint32
		// boundary. Adding capacityMod in that case made the queue look full.
		if putPos-getPos >= capacityMod {
			return false
		}
		if atomic.CompareAndSwapUint32(&q.putPos, putPos, putPos+1) {
			break
		}
		// Another producer claimed this position first; yield and try again.
		runtime.Gosched()
	}

	entry := &q.entries[putPos&capacityMod]
	for {
		entryGetPos := atomic.LoadUint32(&entry.getPos)
		entryPutPos := atomic.LoadUint32(&entry.putPos)
		// The slot is ours once both of its sequence numbers equal the
		// position we claimed.
		if putPos == entryPutPos && entryPutPos == entryGetPos {
			entry.elem = elem
			// Advancing putPos by the capacity publishes the element to the
			// Get that claims this same position.
			atomic.AddUint32(&entry.putPos, q.capacity)
			return true
		}
		runtime.Gosched()
	}
}
// Get removes and returns the element at the next read position.
//
// It returns (nil, false) when getPos has caught up with putPos, i.e. the
// queue is empty. Otherwise it claims a position with a compare-and-swap on
// getPos, waits (yielding the processor) until the Put that claimed the same
// position has stored its element, clears the slot and returns (elem, true).
func (q *Queue) Get() (elem interface{}, ok bool) {
	capacity := q.capacity
	capacityMod := q.capacityMod

	var getPos uint32
	for {
		getPos = atomic.LoadUint32(&q.getPos)
		putPos := atomic.LoadUint32(&q.putPos)
		// The counters are unsigned and wrap together, so the queue is empty
		// exactly when they are equal; no wraparound adjustment is needed.
		if putPos == getPos {
			return nil, false
		}
		if atomic.CompareAndSwapUint32(&q.getPos, getPos, getPos+1) {
			break
		}
		// Another consumer claimed this position first; yield and try again.
		runtime.Gosched()
	}

	entry := &q.entries[getPos&capacityMod]
	for {
		entryGetPos := atomic.LoadUint32(&entry.getPos)
		entryPutPos := atomic.LoadUint32(&entry.putPos)
		// The slot is readable once its getPos equals our position and its
		// putPos is exactly one capacity ahead, which is what Put leaves
		// behind after storing the element.
		if getPos == entryGetPos && entryGetPos == entryPutPos-capacity {
			elem = entry.elem
			// Drop the reference so the slot does not keep the value alive.
			entry.elem = nil
			// Advancing getPos by the capacity releases the slot to the Put
			// that claims position getPos+capacity.
			atomic.AddUint32(&entry.getPos, capacity)
			return elem, true
		}
		runtime.Gosched()
	}
}
// Size returns the number of positions claimed by Put but not yet claimed by
// Get.
//
// The two counters are read with separate atomic loads, so under concurrent
// use the result is a snapshot that may already be stale when it is returned.
func (q *Queue) Size() int {
	getPos := atomic.LoadUint32(&q.getPos)
	putPos := atomic.LoadUint32(&q.putPos)
	// Unsigned subtraction is already correct modulo 2^32, so this stays right
	// after putPos wraps past the uint32 boundary. Adding capacityMod in that
	// case over-reported the size.
	return int(putPos - getPos)
}
// minQuantity returns the smallest power of two that is >= v.
//
// Two inputs are handled before the bit-smearing, which would otherwise wrap
// around and return 0 for them: 0 yields 1 (the smallest power of two), and
// any value above 1<<31 is clamped to 1<<31, the largest power of two a uint32
// can represent.
func minQuantity(v uint32) uint32 {
	const maxPow2 = 1 << 31
	switch {
	case v == 0:
		return 1
	case v > maxPow2:
		return maxPow2
	}

	// Copy the highest set bit of v-1 into every lower bit, then add one to
	// reach the next power of two. Starting from v-1 leaves exact powers of
	// two unchanged.
	v--
	v |= v >> 1
	v |= v >> 2
	v |= v >> 4
	v |= v >> 8
	v |= v >> 16
	v++
	return v
}