-
Notifications
You must be signed in to change notification settings - Fork 0
/
lib.go
155 lines (126 loc) · 4.75 KB
/
lib.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
package libkflow
import (
"errors"
"fmt"
"net"
"github.com/kentik/libkflow/api"
)
// Sentinel errors returned by this package. They are plain values, so callers
// can compare a returned error against them directly.
var (
// ErrInvalidAuth is returned when a device lookup is rejected by the API
// with HTTP status 401.
ErrInvalidAuth = errors.New("invalid API email/token")
// ErrInvalidConfig reports an invalid configuration.
// NOTE(review): nothing in this file returns it; confirm where it is used.
ErrInvalidConfig = errors.New("invalid config")
// ErrInvalidDevice is returned when a device lookup fails with HTTP
// status 404.
ErrInvalidDevice = errors.New("invalid device")
)
// NewSenderWithDeviceID looks up the device identified by did and starts a
// flow Sender for it using cfg. The errors channel is handed to the Sender
// on start.
//
// A failed lookup yields ErrInvalidAuth, ErrInvalidDevice, or a generic
// device lookup error, and no Sender is started.
func NewSenderWithDeviceID(did int, errors chan<- error, cfg *Config) (*Sender, error) {
	apiClient := cfg.client()

	device, err := lookupdev(apiClient.GetDeviceByID(did))
	if err != nil {
		return nil, err
	}

	return cfg.start(apiClient, device, errors)
}
// NewSenderWithDeviceIF looks up the device that owns the interface named
// dif and starts a flow Sender for it using cfg. The errors channel is handed
// to the Sender on start.
//
// A failed lookup yields ErrInvalidAuth, ErrInvalidDevice, or a generic
// device lookup error, and no Sender is started.
func NewSenderWithDeviceIF(dif string, errors chan<- error, cfg *Config) (*Sender, error) {
	apiClient := cfg.client()

	device, err := lookupdev(apiClient.GetDeviceByIF(dif))
	if err != nil {
		return nil, err
	}

	return cfg.start(apiClient, device, errors)
}
// NewSenderWithDeviceIP looks up the device with IP address dip and starts a
// flow Sender for it using cfg. The errors channel is handed to the Sender
// on start.
//
// A failed lookup yields ErrInvalidAuth, ErrInvalidDevice, or a generic
// device lookup error, and no Sender is started.
func NewSenderWithDeviceIP(dip net.IP, errors chan<- error, cfg *Config) (*Sender, error) {
	apiClient := cfg.client()

	device, err := lookupdev(apiClient.GetDeviceByIP(dip))
	if err != nil {
		return nil, err
	}

	return cfg.start(apiClient, device, errors)
}
// NewSenderWithDeviceName looks up the device called name and starts a flow
// Sender for it using cfg. The errors channel is handed to the Sender on
// start.
//
// A failed lookup yields ErrInvalidAuth, ErrInvalidDevice, or a generic
// device lookup error, and no Sender is started.
func NewSenderWithDeviceName(name string, errors chan<- error, cfg *Config) (*Sender, error) {
	apiClient := cfg.client()

	device, err := lookupdev(apiClient.GetDeviceByName(name))
	if err != nil {
		return nil, err
	}

	return cfg.start(apiClient, device, errors)
}
// NewSenderWithDeviceNameWithErrors looks up the device called name and
// starts a flow Sender for it using cfg. Instead of accepting an error
// channel it returns one. The channel is closed after Sender.Stop is called
// and all flow has been dispatched.
//
// A failed lookup yields ErrInvalidAuth, ErrInvalidDevice, or a generic
// device lookup error; the Sender and channel are then both nil.
func NewSenderWithDeviceNameWithErrors(name string, cfg *Config) (*Sender, <-chan error, error) {
	apiClient := cfg.client()

	device, err := lookupdev(apiClient.GetDeviceByName(name))
	if err != nil {
		return nil, nil, err
	}

	return cfg.startWithInternalErrors(apiClient, device)
}
// NewSenderWithNewDevice creates a device from the creation parameters in
// dev and then starts a flow Sender for the new device using cfg. The errors
// channel is handed to the Sender on start.
//
// If device creation fails, the API client's error is returned unchanged and
// no Sender is started.
func NewSenderWithNewDevice(dev *api.DeviceCreate, errors chan<- error, cfg *Config) (*Sender, error) {
	apiClient := cfg.client()

	created, err := apiClient.CreateDevice(dev)
	if err != nil {
		return nil, err
	}

	return cfg.start(apiClient, created, errors)
}
// NewSenderWithNewDeviceWithErrors creates a device from the creation
// parameters in dev and starts a flow Sender for it using cfg. It returns the
// Sender together with a channel that reports errors generated by the Sender.
// The channel is closed after Sender.Stop is called and all flow has been
// dispatched.
//
// If device creation fails, the API client's error is returned unchanged and
// the Sender and channel are both nil.
func NewSenderWithNewDeviceWithErrors(dev *api.DeviceCreate, cfg *Config) (*Sender, <-chan error, error) {
	apiClient := cfg.client()

	created, err := apiClient.CreateDevice(dev)
	if err != nil {
		return nil, nil, err
	}

	return cfg.startWithInternalErrors(apiClient, created)
}
// NewSenderWithNewSiteAndDevice creates a new site and device from
// siteAndDevice and then starts a flow Sender for the new device using cfg.
// The errors channel is handed to the Sender on start.
//
// If creation fails, the API client's error is returned unchanged and no
// Sender is started.
func NewSenderWithNewSiteAndDevice(siteAndDevice *api.SiteAndDeviceCreate, errors chan<- error, cfg *Config) (*Sender, error) {
	apiClient := cfg.client()

	created, err := apiClient.CreateDeviceAndSite(siteAndDevice)
	if err != nil {
		return nil, err
	}

	return cfg.start(apiClient, created, errors)
}
// NewSenderWithNewSiteAndDeviceWithErrors is the same as
// NewSenderWithNewSiteAndDevice except that, rather than taking a channel to
// receive errors, it returns one. The channel is closed after Sender.Stop is
// called and all flow has been dispatched.
//
// If creation fails, the API client's error is returned unchanged and the
// Sender and channel are both nil.
func NewSenderWithNewSiteAndDeviceWithErrors(siteAndDevice *api.SiteAndDeviceCreate, cfg *Config) (*Sender, <-chan error, error) {
	apiClient := cfg.client()

	created, err := apiClient.CreateDeviceAndSite(siteAndDevice)
	if err != nil {
		return nil, nil, err
	}

	return cfg.startWithInternalErrors(apiClient, created)
}
// NewSenderFromDevice starts a flow Sender for the already-resolved Device d
// using cfg. No device lookup or creation is performed. The errors channel is
// handed to the Sender on start.
func NewSenderFromDevice(d *api.Device, errors chan<- error, cfg *Config) (*Sender, error) {
	return cfg.start(cfg.client(), d, errors)
}
// NewSenderFromDeviceWithErrors starts a flow Sender for the already-resolved
// Device d using cfg, returning the Sender together with an error channel. No
// device lookup or creation is performed.
func NewSenderFromDeviceWithErrors(d *api.Device, cfg *Config) (*Sender, <-chan error, error) {
	return cfg.startWithInternalErrors(cfg.client(), d)
}
// lookupdev translates the (device, error) result of an API device lookup
// into this package's error vocabulary. It is written to wrap a lookup call
// directly, e.g. lookupdev(client.GetDeviceByID(id)).
//
// When err is nil, dev is returned unchanged. Otherwise the device is dropped
// and the error is mapped as follows:
//
//   - HTTP status 401 becomes ErrInvalidAuth.
//   - HTTP status 404 becomes ErrInvalidDevice.
//   - Anything else becomes a "device lookup error" that wraps the original
//     error, so callers can still inspect it with errors.Is and errors.As.
func lookupdev(dev *api.Device, err error) (*api.Device, error) {
	switch {
	case err == nil:
		return dev, nil
	case api.IsErrorWithStatusCode(err, 401):
		return nil, ErrInvalidAuth
	case api.IsErrorWithStatusCode(err, 404):
		return nil, ErrInvalidDevice
	default:
		// %w rather than %s: identical message text, but the cause stays
		// reachable through the error chain.
		return nil, fmt.Errorf("device lookup error: %w", err)
	}
}