-
Notifications
You must be signed in to change notification settings - Fork 0
/
radio.go
145 lines (113 loc) · 3.14 KB
/
radio.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
package radio
import (
"errors"
"runtime"
)
// A Broadcaster should be able to take messages an broadcast them into multiple
// channels.
type Broadcaster interface {
Broadcast(message interface{}) error
Listen() (<-chan interface{}, uint)
Stop(listener uint) error
Call(listener uint, message interface{}) error
}
// A Radio is a simple implementation of a Broadcaster. It is capable of
// broadcasting messages to multiple channels and sending messages to individual
// channels.
type Radio struct {
listeners map[uint]chan interface{}
antenna chan interface{}
lastID uint
live bool
}
// Noise is a simple message that we sent through a Radio will cause it to stop
// broadcasting messages. Future messages will return errors.
type Noise struct{}
// NewRadio creates a new instance of a Radio.
func NewRadio() *Radio {
instance := &Radio{
antenna: make(chan interface{}),
listeners: make(map[uint]chan interface{}),
}
instance.powerup()
runtime.SetFinalizer(instance, func(radio *Radio) {
instance.antenna <- Noise{}
})
return instance
}
func repeat(listener chan interface{}, message interface{}) {
listener <- message
}
func (r *Radio) powerup() {
r.live = true
go func() {
defer close(r.antenna)
for message := range r.antenna {
if _, ok := message.(Noise); ok {
r.live = false
r.Alienate()
break
}
for _, listener := range r.listeners {
go repeat(listener, message)
}
}
}()
}
// Broadcast repeats the same message to all the Radio listeners.
func (r *Radio) Broadcast(message interface{}) error {
if !r.live {
return errors.New("This radio channel is offline")
}
r.antenna <- message
return nil
}
// Listen creates a new read-only listener channel that will receive messages
// broadcast on this Radio. This function also returns an identifier that can be
// used to reference to this listerner on other Radio functions.
func (r *Radio) Listen() (<-chan interface{}, uint) {
listener := make(chan interface{})
r.lastID = r.lastID + 1
r.listeners[r.lastID] = listener
return listener, r.lastID
}
// Count returns the number of active listeners.
func (r *Radio) Count() int {
return len(r.listeners)
}
// IsLive returns whether or not the radio is broadcasting.
func (r *Radio) IsLive() bool {
return r.live
}
// WakeUp attempts to restart a radio that was taken offline.
func (r *Radio) WakeUp() error {
if r.live {
return errors.New("This radio is already broadcasting")
}
r.antenna = make(chan interface{})
r.powerup()
return nil
}
// Alienate closes all listener channels attached to this radio.
func (r *Radio) Alienate() {
for listener := range r.listeners {
r.Stop(listener)
}
}
// Call sends a message only to the specified listener.
func (r *Radio) Call(listener uint, message interface{}) error {
if ch, ok := r.listeners[listener]; ok {
go repeat(ch, message)
return nil
}
return errors.New("Listener does not exist")
}
// Stop removes a listener and closes their channel.
func (r *Radio) Stop(listener uint) error {
if ch, ok := r.listeners[listener]; ok {
delete(r.listeners, listener)
close(ch)
return nil
}
return errors.New("Listener does not exist")
}