-
Notifications
You must be signed in to change notification settings - Fork 0
/
shimqueue.go
63 lines (51 loc) · 1.14 KB
/
shimqueue.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
package eventbus
// Simple expanding queue structure that can be thrown between a sender/receiver.
// This can effectively emulate a channel with unrestricted buffering.
type shimQueue struct {
// Listener channel
input <-chan Message
// Emitter channel
output chan<- Message
}
// Create a queue shim
func CreateShimQueue(input <-chan Message, output chan<- Message) {
var shim shimQueue
shim.input = input
shim.output = output
go shim.run()
}
func (shim *shimQueue) run() {
// Main loop -- recv+queue messages, send+deque messages
queue := make([]Message, 0)
done := false
var outMsg Message
var outChan chan<- Message
// Event loop with items in queue
for !done {
if len(queue) > 0 {
outMsg = queue[0]
outChan = shim.output
} else {
outChan = nil
}
select {
case msg, ok := <-shim.input:
// Activity on input queue
if ok {
queue = append(queue, msg)
} else {
done = true
}
case outChan <- outMsg:
// Item sent on receive queue
queue = queue[1:]
}
}
// Drain queue
for _, msg := range queue {
shim.output <- msg
}
// Tear down some things
close(shim.output)
queue = []Message{}
}