-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathevents.go
380 lines (312 loc) · 10.2 KB
/
events.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
package securityspy
import (
"bufio"
"bytes"
"fmt"
"io"
"net/url"
"strconv"
"strings"
"time"
)
// String provides a description of an event.
func (e *Event) String() string {
txt := EventName(e.Type)
if txt == "" {
return UnknownEventText
}
return txt
}
/* Events methods follow. */
// BindFunc binds a call-back function to an Event in SecuritySpy.
// Use this to receive incoming events via a callback method in a go routine.
func (e *Events) BindFunc(event EventType, callBack func(Event)) {
if callBack == nil {
return
}
e.binds.Lock()
defer e.binds.Unlock()
if val, ok := e.eventBinds[event]; ok {
e.eventBinds[event] = append(val, callBack)
return
}
e.eventBinds[event] = []func(Event){callBack}
}
// BindChan binds a receiving channel to an Event in SecuritySpy.
// Use this to receive incoming events over a channel.
// Avoid using unbuffered channels as they may block further event processing.
func (e *Events) BindChan(event EventType, channel chan Event) {
if channel == nil {
return
}
e.chans.Lock()
defer e.chans.Unlock()
if val, ok := e.eventChans[event]; ok {
e.eventChans[event] = append(val, channel)
return
}
e.eventChans[event] = []chan Event{channel}
}
// Stop stops Watch() loops and disconnects from the event stream.
// No further callback messages will fire after this is called.
// Closes all channels that were passed to BindChan if closeChans=true.
// Stop writing to the channels with Custom() before calling Stop().
func (e *Events) Stop(closeChans bool) {
defer func() { e.Running = false }()
if e.Running {
e.custom(eventStreamStop, -1, -1, "") // signal
if e.stream != nil {
_ = e.stream.Close()
e.stream = nil
}
close(e.eventChan)
}
if !closeChans {
return
}
for _, chans := range e.eventChans {
for i := range chans {
close(chans[i])
}
}
}
// UnbindAll removes all event bindings and channels.
func (e *Events) UnbindAll() {
e.binds.Lock()
e.chans.Lock()
defer func() {
e.binds.Unlock()
e.chans.Unlock()
}()
e.eventBinds = make(map[EventType][]func(Event))
e.eventChans = make(map[EventType][]chan Event)
}
// UnbindChan removes all bound channels for a particular event.
func (e *Events) UnbindChan(event EventType) {
e.chans.Lock()
defer e.chans.Unlock()
delete(e.eventChans, event)
}
// UnbindFunc removes all bound callbacks for a particular event.
// EventType is a set of constants that begin with Event*.
func (e *Events) UnbindFunc(event EventType) {
e.binds.Lock()
defer e.binds.Unlock()
delete(e.eventBinds, event)
}
// Watch kicks off the routines to watch the eventStream and fire callback bindings.
// If your application relies on event stream messages, call this at least once
// to connect the stream. If you have no call back functions or channels then do not
// call this. Call Stop() to close the connection when you're done with it.
func (e *Events) Watch(retryInterval time.Duration, refreshOnConfigChange bool) {
e.Running = true
e.eventChan = make(chan Event, EventBuffer) // allow 1000 events to buffer
go e.eventStreamSelector(refreshOnConfigChange, retryInterval)
go e.eventStreamScanner()
}
// Custom fires an event into the running event Watcher. Any functions or
// channels bound to the CUSTOM Event type will also be called.
func (e *Events) Custom(cameraNum int, msg string) {
e.custom(EventStreamCustom, -11000, cameraNum, msg)
}
// custom allows a quick way to make events.
func (e *Events) custom(eventType EventType, eventID int, cam int, msg string) {
if !e.Running {
return
}
e.eventChan <- Event{
Time: time.Now().Round(time.Second),
When: time.Now().Round(time.Second),
ID: eventID,
Msg: string(eventType) + " " + msg,
Type: eventType,
Camera: e.server.Cameras.ByNum(cam),
}
}
/* INTERFACE HELPER METHODS FOLLOW */
// eventStreamScanner connects to the securityspy event stream and fires events into a channel.
func (e *Events) eventStreamScanner() {
defer e.custom(EventStreamDisconnect, -10000, -1, "Connection Closed")
if err := e.eventStreamConnect(); err != nil {
return
}
defer func() {
e.stream.Close()
e.stream = nil
}()
scanner := bufio.NewScanner(e.stream)
scanner.Split(scanLinesCR)
for scanner.Scan() {
// Constantly scan for new events, then report them to the event channel.
if text := scanner.Text(); strings.Count(text, " ") > 2 { //nolint:gomnd,nolintlint
e.eventChan <- e.UnmarshalEvent(text)
}
}
}
// eventStreamConnect establishes a connection to the event stream and passes off the http Reader.
func (e *Events) eventStreamConnect() error {
client := e.server.HTTPClient()
client.Timeout = 0
resp, err := e.server.GetClient("++eventStream", url.Values{"version": []string{"3"}}, client)
if err != nil {
return fmt.Errorf("connecting event stream: %w", err)
}
e.stream = resp.Body
e.custom(EventStreamConnect, -9999, -1, EventName(EventStreamConnect))
return nil
}
// eventStreamSelector watches the event channel.
// Fires bound event call back functions.
// Also reconnects to the event stream if the connection fails.
// There is a "loop" that occurs among the eventStream* methods.
// Stop() properly handles the shutdown of the loop, so if can be safely restarted w/ Watch().
func (e *Events) eventStreamSelector(refreshOnConfigChange bool, retryInterval time.Duration) {
Loop:
for event := range e.eventChan {
switch event.Type { //nolint:exhaustive
case eventStreamStop:
break Loop // Stop() called.
case EventConfigChange:
if refreshOnConfigChange {
e.serverRefresh()
}
case EventStreamDisconnect:
// reconnect to event stream
go func() {
time.Sleep(retryInterval)
e.eventStreamScanner()
}()
}
// All events run binds.
e.binds.RLock()
event.callBacks(e.eventBinds)
e.binds.RUnlock()
e.chans.RLock()
event.eventChans(e.eventChans)
e.chans.RUnlock()
}
}
func (e *Events) serverRefresh() {
if err := e.server.Refresh(); err != nil {
e.custom(EventWatcherRefreshFail, -9997, -1, err.Error())
return
}
e.custom(EventWatcherRefreshed, -9998, -1, EventName(EventWatcherRefreshed))
}
//nolint:dupword
/* Example Event Stream Flow:
(new, v5)
20190927092026 3 3 CLASSIFY HUMAN 99
20190927092026 4 3 TRIGGER_M 9
20190927092036 5 3 CLASSIFY HUMAN 5 VEHICLE 95
20190927092040 5 X NULL
20190927092050 6 3 FILE /Volumes/VolName/Cam/2019-07-26/26-07-2019 15-52-00 C Cam.m4v
20190927092055 7 3 DISARM_M
20190927092056 8 3 OFFLINE
*/
// UnmarshalEvent turns raw text into an Event that can fire callbacks.
// You generally shouldn't need to call this method, it's exposed for convenience.
/* [TIME] is specified in the order: "year, month, day, hour, minute, second" and is always 14 characters long.
* [EVENT NUMBER] increases by 1 for each subsequent event.
* [CAMERA NUMBER] specifies the camera that this event relates to, for example CAM15 for camera number 15.
* [EVENT] describes the event: ARM_C, DISARM_C, ARM_M, DISARM_M, ARM_A, DISARM_A, ERROR,
CONFIGCHANGE, MOTION, OFFLINE, ONLINE */
func (e *Events) UnmarshalEvent(text string) Event { //nolint:funlen,cyclop
var (
err error
parts = strings.SplitN(text, " ", 4) //nolint:gomnd
newEvent = Event{Msg: parts[3], ID: -1, Time: time.Now()}
// Parse the time stamp; append the Offset from ++systemInfo to get the right time-location.
eventTime = fmt.Sprintf("%v%+03.0f", parts[0], e.server.Info.GmtOffset.Hours())
)
if newEvent.When, err = time.ParseInLocation(EventTimeFormat+"-07", eventTime, time.Local); err != nil {
newEvent.When = time.Now()
newEvent.Errors = append(newEvent.Errors, ErrDateParseFail)
}
// Parse the ID
if newEvent.ID, err = strconv.Atoi(parts[1]); err != nil {
newEvent.ID = BadID
newEvent.Errors = append(newEvent.Errors, ErrIDParseFail)
}
// Parse the camera number.
parts[2] = strings.TrimPrefix(parts[2], "CAM")
if parts[2] != "X" {
if cameraNum, err := strconv.Atoi(parts[2]); err != nil {
newEvent.Errors = append(newEvent.Errors, ErrCAMParseFail)
} else if newEvent.Camera = e.server.Cameras.ByNum(cameraNum); newEvent.Camera == nil {
newEvent.Errors = append(newEvent.Errors, ErrCAMMissing)
}
}
// Parse and convert the type string to EventType.
parts = strings.Split(newEvent.Msg, " ")
newEvent.Type = EventType(parts[0])
// Check if the type we just converted is a known event.
if name := EventName(newEvent.Type); name == "" {
newEvent.Errors = append(newEvent.Errors, ErrUnknownEvent)
newEvent.Type = EventUnknownEvent
}
// If this is a trigger-type event, add the trigger reason(s)
if newEvent.Type == EventTriggerAction || newEvent.Type == EventTriggerMotion && len(parts) == 2 {
b, _ := strconv.Atoi(parts[1])
msg := ""
// Check if this bitmask contains any of our known reasons.
for flag, txt := range Reasons {
if b&int(flag) != 0 {
if msg != "" {
msg += ", "
}
newEvent.Reasons = append(newEvent.Reasons, flag)
msg += txt
}
}
newEvent.Msg += " - Reasons: " + msg
if msg == "" {
newEvent.Msg += UnknownReasonText
}
}
return newEvent
}
// callBacks is run for each event to execute callback functions.
func (e *Event) callBacks(binds map[EventType][]func(Event)) {
callbacks := func(callbacks []func(Event)) {
for _, callBack := range callbacks {
if callBack != nil {
go callBack(*e) // Send it off!
}
}
}
if _, ok := binds[e.Type]; ok {
callbacks(binds[e.Type])
} else if _, ok := binds[EventUnknownEvent]; ok && e.Type != EventUnknownEvent {
callbacks(binds[EventUnknownEvent])
}
if _, ok := binds[EventAllEvents]; ok {
callbacks(binds[EventAllEvents])
}
}
// eventChans is run for each event to notify external channels.
func (e *Event) eventChans(chans map[EventType][]chan Event) {
for _, t := range []EventType{e.Type, EventAllEvents} {
if chans, ok := chans[t]; ok {
for i := range chans {
chans[i] <- *e
}
}
}
}
// scanLinesCR is a custom bufio.Scanner to read SecuritySpy eventStream.
func scanLinesCR(data []byte, atEOF bool) (int, []byte, error) {
if atEOF && len(data) == 0 {
return 0, nil, ErrDisconnect
}
if i := bytes.IndexByte(data, '\r'); i >= 0 {
// We have a full CR-terminated line.
return i + 1, data[0:i], nil
}
// If we're at EOF, we have a final, non-terminated line. Return it.
if atEOF {
return len(data), data, io.ErrShortBuffer
}
// Request more data.
return 0, nil, nil
}