Commit f5f97ad

Merge pull request #108 from hikhvar/fix-reloading-with-big-set-of-targets
Fix bugs introduced by hot reloading of targets
2 parents (64b4119 + 436cd06), commit f5f97ad

File tree: collector.go, main.go, rttscale.go, target.go (4 files changed: +128 -74 lines)
collector.go (+57 -40)

@@ -12,48 +12,66 @@ import (
 	"github.com/czerwonk/ping_exporter/config"
 )

-var (
-	labelNames []string
+type pingCollector struct {
+	monitor                 *mon.Monitor
+	enableDeprecatedMetrics bool
+	rttUnit                 rttUnit
+
+	cfg *config.Config
+
+	mutex sync.RWMutex
+
+	customLabels *customLabelSet
+	metrics      map[string]*mon.Metrics
+
 	rttDesc    scaledMetrics
 	bestDesc   scaledMetrics
 	worstDesc  scaledMetrics
 	meanDesc   scaledMetrics
 	stddevDesc scaledMetrics
 	lossDesc   *prometheus.Desc
 	progDesc   *prometheus.Desc
-	mutex      *sync.Mutex
-)
+}

-type pingCollector struct {
-	cfg          *config.Config
-	customLabels *customLabelSet
-	monitor      *mon.Monitor
-	metrics      map[string]*mon.Metrics
+func NewPingCollector(enableDeprecatedMetrics bool, unit rttUnit, monitor *mon.Monitor, cfg *config.Config) *pingCollector {
+	ret := &pingCollector{
+		monitor:                 monitor,
+		enableDeprecatedMetrics: enableDeprecatedMetrics,
+		rttUnit:                 unit,
+		cfg:                     cfg,
+	}
+	ret.customLabels = newCustomLabelSet(cfg.Targets)
+	ret.createDesc()
+	return ret
 }

-func (p *pingCollector) Describe(ch chan<- *prometheus.Desc) {
-	p.createDesc()
+func (p *pingCollector) UpdateConfig(cfg *config.Config) {
+	p.mutex.Lock()
+	defer p.mutex.Unlock()
+	p.cfg.Targets = cfg.Targets
+}

-	if enableDeprecatedMetrics {
-		rttDesc.Describe(ch)
+func (p *pingCollector) Describe(ch chan<- *prometheus.Desc) {
+	if p.enableDeprecatedMetrics {
+		p.rttDesc.Describe(ch)
 	}
-	bestDesc.Describe(ch)
-	worstDesc.Describe(ch)
-	meanDesc.Describe(ch)
-	stddevDesc.Describe(ch)
-	ch <- lossDesc
-	ch <- progDesc
+	p.bestDesc.Describe(ch)
+	p.worstDesc.Describe(ch)
+	p.meanDesc.Describe(ch)
+	p.stddevDesc.Describe(ch)
+	ch <- p.lossDesc
+	ch <- p.progDesc
 }

 func (p *pingCollector) Collect(ch chan<- prometheus.Metric) {
-	mutex.Lock()
-	defer mutex.Unlock()
+	p.mutex.Lock()
+	defer p.mutex.Unlock()

 	if m := p.monitor.Export(); len(m) > 0 {
 		p.metrics = m
 	}

-	ch <- prometheus.MustNewConstMetric(progDesc, prometheus.GaugeValue, 1)
+	ch <- prometheus.MustNewConstMetric(p.progDesc, prometheus.GaugeValue, 1)

 	for target, metrics := range p.metrics {
 		l := strings.SplitN(target, " ", 3)
@@ -63,35 +81,34 @@ func (p *pingCollector) Collect(ch chan<- prometheus.Metric) {

 		if metrics.PacketsSent > metrics.PacketsLost {
 			if enableDeprecatedMetrics {
-				rttDesc.Collect(ch, metrics.Best, append(l, "best")...)
-				rttDesc.Collect(ch, metrics.Worst, append(l, "worst")...)
-				rttDesc.Collect(ch, metrics.Mean, append(l, "mean")...)
-				rttDesc.Collect(ch, metrics.StdDev, append(l, "std_dev")...)
+				p.rttDesc.Collect(ch, metrics.Best, append(l, "best")...)
+				p.rttDesc.Collect(ch, metrics.Worst, append(l, "worst")...)
+				p.rttDesc.Collect(ch, metrics.Mean, append(l, "mean")...)
+				p.rttDesc.Collect(ch, metrics.StdDev, append(l, "std_dev")...)
 			}

-			bestDesc.Collect(ch, metrics.Best, l...)
-			worstDesc.Collect(ch, metrics.Worst, l...)
-			meanDesc.Collect(ch, metrics.Mean, l...)
-			stddevDesc.Collect(ch, metrics.StdDev, l...)
+			p.bestDesc.Collect(ch, metrics.Best, l...)
+			p.worstDesc.Collect(ch, metrics.Worst, l...)
+			p.meanDesc.Collect(ch, metrics.Mean, l...)
+			p.stddevDesc.Collect(ch, metrics.StdDev, l...)
 		}

 		loss := float64(metrics.PacketsLost) / float64(metrics.PacketsSent)
-		ch <- prometheus.MustNewConstMetric(lossDesc, prometheus.GaugeValue, loss, l...)
+		ch <- prometheus.MustNewConstMetric(p.lossDesc, prometheus.GaugeValue, loss, l...)
 	}
 }

 func (p *pingCollector) createDesc() {
-	labelNames = []string{"target", "ip", "ip_version"}
+	labelNames := []string{"target", "ip", "ip_version"}
 	labelNames = append(labelNames, p.customLabels.labelNames()...)

-	rttDesc = newScaledDesc("rtt", "Round trip time", append(labelNames, "type"))
-	bestDesc = newScaledDesc("rtt_best", "Best round trip time", labelNames)
-	worstDesc = newScaledDesc("rtt_worst", "Worst round trip time", labelNames)
-	meanDesc = newScaledDesc("rtt_mean", "Mean round trip time", labelNames)
-	stddevDesc = newScaledDesc("rtt_std_deviation", "Standard deviation", labelNames)
-	lossDesc = newDesc("loss_ratio", "Packet loss from 0.0 to 1.0", labelNames, nil)
-	progDesc = newDesc("up", "ping_exporter version", nil, prometheus.Labels{"version": version})
-	mutex = &sync.Mutex{}
+	p.rttDesc = newScaledDesc("rtt", "Round trip time", p.rttUnit, append(labelNames, "type"))
+	p.bestDesc = newScaledDesc("rtt_best", "Best round trip time", p.rttUnit, labelNames)
+	p.worstDesc = newScaledDesc("rtt_worst", "Worst round trip time", p.rttUnit, labelNames)
+	p.meanDesc = newScaledDesc("rtt_mean", "Mean round trip time", p.rttUnit, labelNames)
+	p.stddevDesc = newScaledDesc("rtt_std_deviation", "Standard deviation", p.rttUnit, labelNames)
+	p.lossDesc = newDesc("loss_ratio", "Packet loss from 0.0 to 1.0", labelNames, nil)
+	p.progDesc = newDesc("up", "ping_exporter version", nil, prometheus.Labels{"version": version})
 }

 func newDesc(name, help string, variableLabels []string, constLabels prometheus.Labels) *prometheus.Desc {
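
The collector now owns its descriptors and configuration instead of sharing package-level globals, and the mutex-guarded UpdateConfig lets the config watcher swap targets while scrapes are running. Moving createDesc into the constructor also means the descriptors are built once rather than on every Describe call. A minimal, self-contained sketch of the locking pattern; demoCollector and its fields are illustrative, not part of ping_exporter, and a read lock is enough here, while the real Collect takes a full lock because it also caches monitor.Export() results:

package main

import (
	"sync"

	"github.com/prometheus/client_golang/prometheus"
)

// demoCollector guards its configuration with a sync.RWMutex so scrapes and
// config reloads can run concurrently.
type demoCollector struct {
	mutex   sync.RWMutex
	targets []string
	desc    *prometheus.Desc
}

func newDemoCollector(targets []string) *demoCollector {
	return &demoCollector{
		targets: targets,
		// Descriptors are created once, in the constructor.
		desc: prometheus.NewDesc("demo_targets", "Number of configured targets", nil, nil),
	}
}

// UpdateConfig swaps the target list under the write lock, mirroring the
// pingCollector.UpdateConfig added above.
func (c *demoCollector) UpdateConfig(targets []string) {
	c.mutex.Lock()
	defer c.mutex.Unlock()
	c.targets = targets
}

func (c *demoCollector) Describe(ch chan<- *prometheus.Desc) { ch <- c.desc }

func (c *demoCollector) Collect(ch chan<- prometheus.Metric) {
	c.mutex.RLock()
	defer c.mutex.RUnlock()
	ch <- prometheus.MustNewConstMetric(c.desc, prometheus.GaugeValue, float64(len(c.targets)))
}

func main() {
	c := newDemoCollector([]string{"example.com"})
	reg := prometheus.NewRegistry()
	reg.MustRegister(c)

	// A reload can change the configuration while the registry keeps scraping c.
	c.UpdateConfig([]string{"example.com", "example.org"})
}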

main.go (+51 -29)

@@ -11,6 +11,7 @@ import (
 	"net/http"
 	"os"
 	"strings"
+	"sync"
 	"time"

 	"github.com/digineo/go-ping"
@@ -116,13 +117,18 @@ func main() {
 		kingpin.FatalUsage("No targets specified")
 	}

-	m, err := startMonitor(cfg)
+	resolver := setupResolver(cfg)
+
+	m, err := startMonitor(cfg, resolver)
 	if err != nil {
 		log.Errorln(err)
 		os.Exit(2)
 	}

-	startServer(cfg, m)
+	collector := NewPingCollector(enableDeprecatedMetrics, rttMetricsScale, m, cfg)
+	go watchConfig(desiredTargets, resolver, m, collector)
+
+	startServer(cfg, collector)
 }

 func printVersion() {
@@ -132,8 +138,7 @@ func printVersion() {
 	fmt.Println("Metric exporter for go-icmp")
 }

-func startMonitor(cfg *config.Config) (*mon.Monitor, error) {
-	resolver := setupResolver(cfg)
+func startMonitor(cfg *config.Config, resolver *net.Resolver) (*mon.Monitor, error) {
 	var bind4, bind6 string
 	if ln, err := net.Listen("tcp4", "127.0.0.1:0"); err == nil {
 		// ipv4 enabled
@@ -165,42 +170,50 @@ func startMonitor(cfg *config.Config) (*mon.Monitor, error) {
 	}

 	go startDNSAutoRefresh(cfg.DNS.Refresh.Duration(), desiredTargets, monitor, cfg)
-	go watchConfig(desiredTargets, resolver, monitor)
 	return monitor, nil
 }

 func upsertTargets(globalTargets *targets, resolver *net.Resolver, cfg *config.Config, monitor *mon.Monitor) error {
 	oldTargets := globalTargets.Targets()
 	newTargets := make([]*target, len(cfg.Targets))
+	var wg sync.WaitGroup
 	for i, t := range cfg.Targets {
-		t := &target{
-			host:      t.Addr,
-			addresses: make([]net.IPAddr, 0),
-			delay:     time.Duration(10*i) * time.Millisecond,
-			resolver:  resolver,
-		}
-		newTargets[i] = t
-
-		err := t.addOrUpdateMonitor(monitor, targetOpts{
-			disableIPv4: cfg.Options.DisableIPv4,
-			disableIPv6: cfg.Options.DisableIPv6,
-		})
-		if err != nil {
-			return fmt.Errorf("failed to setup target: %w", err)
+		newTarget := globalTargets.Get(t.Addr)
+		if newTarget == nil {
+			newTarget = &target{
+				host:      t.Addr,
+				addresses: make([]net.IPAddr, 0),
+				delay:     time.Duration(10*i) * time.Millisecond,
+				resolver:  resolver,
+			}
 		}
-	}

+		newTargets[i] = newTarget
+
+		wg.Add(1)
+		go func() {
+			err := newTarget.addOrUpdateMonitor(monitor, targetOpts{
+				disableIPv4: cfg.Options.DisableIPv4,
+				disableIPv6: cfg.Options.DisableIPv6,
+			})
+			if err != nil {
+				log.Errorf("failed to setup target: %v", err)
+			}
+			wg.Done()
+		}()
+	}
+	wg.Wait()
 	globalTargets.SetTargets(newTargets)

 	removed := removedTargets(oldTargets, globalTargets)
 	for _, removedTarget := range removed {
-		log.Infof("remove target: %s\n", removedTarget.host)
+		log.Infof("remove target: %s", removedTarget.host)
 		removedTarget.removeFromMonitor(monitor)
 	}
 	return nil
 }

-func watchConfig(globalTargets *targets, resolver *net.Resolver, monitor *mon.Monitor) {
+func watchConfig(globalTargets *targets, resolver *net.Resolver, monitor *mon.Monitor, collector *pingCollector) {
 	watcher, err := inotify.NewWatcher()
 	if err != nil {
 		log.Fatalf("unable to create file watcher: %v", err)
@@ -212,17 +225,30 @@ func watchConfig(globalTargets *targets, resolver *net.Resolver, monitor *mon.Mo
 	}
 	for {
 		select {
-		case <-watcher.Events:
+		case event := <-watcher.Events:
+			log.Debugf("Got file inotify event: %s", event)
+			// If the file is removed, the inotify watcher will lose track of the file. Add it again.
+			if event.Op == inotify.Remove {
+				if err = watcher.Add(*configFile); err != nil {
+					log.Fatalf("failed to renew watch for file: %v", err)
+				}
+			}
 			cfg, err := loadConfig()
 			if err != nil {
 				log.Errorf("unable to load config: %v", err)
 				continue
 			}
+			// We get zero targets if the file was truncated. This happens if an automation tool rewrites
+			// the complete file, instead of alternating only parts of it.
+			if len(cfg.Targets) == 0 {
+				continue
+			}
 			log.Infof("reloading config file %s", *configFile)
 			if err := upsertTargets(globalTargets, resolver, cfg, monitor); err != nil {
 				log.Errorf("failed to reload config: %v", err)
 				continue
 			}
+			collector.UpdateConfig(cfg)
 		case err := <-watcher.Errors:
 			log.Errorf("watching file failed: %v", err)
 		}
@@ -264,19 +290,15 @@ func refreshDNS(tar *targets, monitor *mon.Monitor, cfg *config.Config) {
 	}
 }

-func startServer(cfg *config.Config, monitor *mon.Monitor) {
+func startServer(cfg *config.Config, collector *pingCollector) {
 	var err error
 	log.Infof("Starting ping exporter (Version: %s)", version)
 	http.HandleFunc("/", func(w http.ResponseWriter, _ *http.Request) {
 		fmt.Fprintf(w, indexHTML, *metricsPath)
 	})

 	reg := prometheus.NewRegistry()
-	reg.MustRegister(&pingCollector{
-		cfg:          cfg,
-		monitor:      monitor,
-		customLabels: newCustomLabelSet(cfg.Targets),
-	})
+	reg.MustRegister(collector)

 	l := log.New()
 	l.Level = log.ErrorLevel
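
upsertTargets now reuses targets it already knows via globalTargets.Get and runs the remaining setup in one goroutine per target, so a reload with a large target list no longer performs every addOrUpdateMonitor call back to back; setup errors are logged instead of aborting the whole reload. A minimal, self-contained sketch of that fan-out, where setupTarget and the host list are stand-ins rather than ping_exporter code:

package main

import (
	"log"
	"sync"
	"time"
)

// setupTarget stands in for target.addOrUpdateMonitor; the delay mimics the
// staggering the real code applies via target.delay.
func setupTarget(host string, delay time.Duration) error {
	time.Sleep(delay)
	log.Printf("set up %s", host)
	return nil
}

func main() {
	hosts := []string{"example.com", "example.org", "example.net"}

	var wg sync.WaitGroup
	for i, host := range hosts {
		host, delay := host, time.Duration(10*i)*time.Millisecond // capture per-iteration values for the goroutine
		wg.Add(1)
		go func() {
			defer wg.Done()
			if err := setupTarget(host, delay); err != nil {
				log.Printf("failed to set up %s: %v", host, err) // log and keep going, as the reload now does
			}
		}()
	}
	wg.Wait() // every target has been handled (or logged an error) before the reload continues
}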
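
watchConfig now re-adds the watch when the config file is removed (editors and configuration-management tools often replace the file, which silently drops the watch) and skips reloads that would leave zero targets, as happens when the file is momentarily truncated during a rewrite. A self-contained sketch of the same loop, assuming the inotify import in main.go is an alias for github.com/fsnotify/fsnotify; loadTargets is a placeholder for the exporter's loadConfig:

package main

import (
	"log"
	"os"

	"github.com/fsnotify/fsnotify"
)

// loadTargets is a placeholder: a real implementation would parse the config file.
func loadTargets(path string) ([]string, error) {
	if _, err := os.Stat(path); err != nil {
		return nil, err
	}
	return []string{"example.com"}, nil
}

func main() {
	const configFile = "config.yml"

	watcher, err := fsnotify.NewWatcher()
	if err != nil {
		log.Fatalf("unable to create file watcher: %v", err)
	}
	defer watcher.Close()
	if err := watcher.Add(configFile); err != nil {
		log.Fatalf("unable to watch %s: %v", configFile, err)
	}

	for {
		select {
		case event := <-watcher.Events:
			// A removed (replaced) file drops the watch; re-add it so later
			// edits are still seen.
			if event.Op&fsnotify.Remove == fsnotify.Remove {
				if err := watcher.Add(configFile); err != nil {
					log.Fatalf("failed to renew watch for file: %v", err)
				}
			}
			targets, err := loadTargets(configFile)
			if err != nil || len(targets) == 0 {
				continue // an unreadable or truncated file must not wipe all targets
			}
			log.Printf("reloaded %d targets", len(targets))
		case err := <-watcher.Errors:
			log.Printf("watching file failed: %v", err)
		}
	}
}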

rttscale.go (+7 -5)

@@ -29,28 +29,30 @@ func rttUnitFromString(s string) rttUnit {
 type scaledMetrics struct {
 	Millis  *prometheus.Desc
 	Seconds *prometheus.Desc
+	scale   rttUnit
 }

 func (s *scaledMetrics) Describe(ch chan<- *prometheus.Desc) {
-	if rttMetricsScale == rttInMills || rttMetricsScale == rttBoth {
+	if s.scale == rttInMills || s.scale == rttBoth {
 		ch <- s.Millis
 	}
-	if rttMetricsScale == rttInSeconds || rttMetricsScale == rttBoth {
+	if s.scale == rttInSeconds || s.scale == rttBoth {
 		ch <- s.Seconds
 	}
 }

 func (s *scaledMetrics) Collect(ch chan<- prometheus.Metric, value float32, labelValues ...string) {
-	if rttMetricsScale == rttInMills || rttMetricsScale == rttBoth {
+	if s.scale == rttInMills || s.scale == rttBoth {
 		ch <- prometheus.MustNewConstMetric(s.Millis, prometheus.GaugeValue, float64(value), labelValues...)
 	}
-	if rttMetricsScale == rttInSeconds || rttMetricsScale == rttBoth {
+	if s.scale == rttInSeconds || s.scale == rttBoth {
 		ch <- prometheus.MustNewConstMetric(s.Seconds, prometheus.GaugeValue, float64(value)/1000, labelValues...)
 	}
 }

-func newScaledDesc(name, help string, variableLabels []string) scaledMetrics {
+func newScaledDesc(name, help string, scale rttUnit, variableLabels []string) scaledMetrics {
 	return scaledMetrics{
+		scale:   scale,
 		Millis:  newDesc(name+"_ms", help+" in millis (deprecated)", variableLabels, nil),
 		Seconds: newDesc(name+"_seconds", help+" in seconds", variableLabels, nil),
 	}
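
The scale now travels with each scaledMetrics value instead of being read from the package-level rttMetricsScale flag, so a descriptor always emits the unit(s) it was created for. A small, self-contained sketch of that idea, using illustrative names (unit, emit) and plain printing where the real code writes Prometheus const metrics:

package main

import "fmt"

type unit int

const (
	millis unit = iota
	seconds
	both
)

// emit reports an RTT measured in milliseconds in the configured unit(s);
// seconds are derived the same way as in scaledMetrics.Collect (value / 1000).
func emit(u unit, name string, valueMs float64) {
	if u == millis || u == both {
		fmt.Printf("%s_ms %v\n", name, valueMs)
	}
	if u == seconds || u == both {
		fmt.Printf("%s_seconds %v\n", name, valueMs/1000)
	}
}

func main() {
	emit(both, "ping_rtt_best", 12.5) // prints ping_rtt_best_ms 12.5 and ping_rtt_best_seconds 0.0125
}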

target.go (+13 -0)

@@ -37,6 +37,8 @@ func (t *targets) SetTargets(tar []*target) {
 }

 func (t *targets) Contains(tar *target) bool {
+	t.mutex.RLock()
+	defer t.mutex.RUnlock()
 	for _, ta := range t.t {
 		if ta.host == tar.host {
 			return true
@@ -45,6 +47,17 @@ func (t *targets) Contains(tar *target) bool {
 	return false
 }

+func (t *targets) Get(host string) *target {
+	t.mutex.RLock()
+	defer t.mutex.RUnlock()
+	for _, ta := range t.t {
+		if ta.host == host {
+			return ta
+		}
+	}
+	return nil
+}
+
 func (t *targets) Targets() []*target {
 	t.mutex.RLock()
 	defer t.mutex.RUnlock()
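
Contains now takes the read lock and the new Get lets the reload path look up an existing target by host before creating a new one, without racing against a concurrent SetTargets. A minimal, self-contained sketch of the same shape; registry and its methods are illustrative, not ping_exporter's types:

package main

import (
	"fmt"
	"sync"
)

type registry struct {
	mutex sync.RWMutex
	hosts []string
}

// Set replaces the stored hosts under the write lock, like SetTargets.
func (r *registry) Set(hosts []string) {
	r.mutex.Lock()
	defer r.mutex.Unlock()
	r.hosts = hosts
}

// Get returns the stored host or "" if it is unknown; the read lock allows
// concurrent lookups but blocks while Set is rewriting the slice.
func (r *registry) Get(host string) string {
	r.mutex.RLock()
	defer r.mutex.RUnlock()
	for _, h := range r.hosts {
		if h == host {
			return h
		}
	}
	return ""
}

func main() {
	r := &registry{}
	r.Set([]string{"example.com"})
	if existing := r.Get("example.com"); existing != "" {
		fmt.Println("reuse", existing) // the reload path reuses existing targets the same way
	}
}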
