diff --git a/scope.go b/scope.go index 0b30104..dd31018 100644 --- a/scope.go +++ b/scope.go @@ -102,6 +102,9 @@ type scope struct { done chan struct{} wg sync.WaitGroup root bool + + counterChangeNotifyCh chan *counter + gaugeChangeNotifyCh chan *gauge } // ScopeOptions is a set of options to construct a scope. @@ -166,23 +169,25 @@ func newRootScope(opts ScopeOptions, interval time.Duration) *scope { } s := &scope{ - baseReporter: baseReporter, - bucketCache: newBucketCache(), - cachedReporter: opts.CachedReporter, - counters: make(map[string]*counter), - countersSlice: make([]*counter, 0, _defaultInitialSliceSize), - defaultBuckets: opts.DefaultBuckets, - done: make(chan struct{}), - gauges: make(map[string]*gauge), - gaugesSlice: make([]*gauge, 0, _defaultInitialSliceSize), - histograms: make(map[string]*histogram), - histogramsSlice: make([]*histogram, 0, _defaultInitialSliceSize), - prefix: sanitizer.Name(opts.Prefix), - reporter: opts.Reporter, - sanitizer: sanitizer, - separator: sanitizer.Name(opts.Separator), - timers: make(map[string]*timer), - root: true, + baseReporter: baseReporter, + bucketCache: newBucketCache(), + cachedReporter: opts.CachedReporter, + counters: make(map[string]*counter), + countersSlice: make([]*counter, 0, _defaultInitialSliceSize), + defaultBuckets: opts.DefaultBuckets, + done: make(chan struct{}), + gauges: make(map[string]*gauge), + gaugesSlice: make([]*gauge, 0, _defaultInitialSliceSize), + histograms: make(map[string]*histogram), + histogramsSlice: make([]*histogram, 0, _defaultInitialSliceSize), + prefix: sanitizer.Name(opts.Prefix), + reporter: opts.Reporter, + sanitizer: sanitizer, + separator: sanitizer.Name(opts.Separator), + timers: make(map[string]*timer), + root: true, + counterChangeNotifyCh: make(chan *counter, 1024), + gaugeChangeNotifyCh: make(chan *gauge, 1024), } // NB(r): Take a copy of the tags on creation @@ -196,7 +201,7 @@ func newRootScope(opts ScopeOptions, interval time.Duration) *scope { s.wg.Add(1) go func() { defer s.wg.Done() - s.reportLoop(interval) + s.processLoop(interval) }() } @@ -281,6 +286,48 @@ func (s *scope) reportRegistry() { } } +func (s *scope) processLoop(interval time.Duration) { + ticker := time.NewTicker(interval) + counters := make([]*counter, 0, _defaultInitialSliceSize) + gauges := make([]*gauge, 0, _defaultInitialSliceSize) + + defer ticker.Stop() + for { + select { + case c := <-s.counterChangeNotifyCh: + counters = append(counters, c) + case g := <-s.gaugeChangeNotifyCh: + gauges = append(gauges, g) + case <-ticker.C: + s.reportChanges(counters, gauges) + s.cachedReporter.Flush() + // Reset the changed counters and gauges + var zeroCounter *counter + for i := range counters { + counters[i] = zeroCounter + } + counters = counters[:0] + + var zeroGauge *gauge + for i := range gauges { + gauges[i] = zeroGauge + } + gauges = gauges[:0] + default: + return + } + } +} + +func (s *scope) reportChanges(counters []*counter, gauges []*gauge) { + for _, c := range counters { + c.cachedReport() + } + for _, g := range gauges { + g.cachedReport() + } +} + func (s *scope) Counter(name string) Counter { name = s.sanitizer.Name(name) if c, ok := s.counter(name); ok { @@ -295,14 +342,18 @@ func (s *scope) Counter(name string) Counter { } var cachedCounter CachedCount + var changeNotifyFn func(c *counter) if s.cachedReporter != nil { cachedCounter = s.cachedReporter.AllocateCounter( s.fullyQualifiedName(name), s.tags, ) + changeNotifyFn = func(c *counter) { + s.counterChangeNotifyCh <- c + } } - c := newCounter(cachedCounter) + c := newCounter(cachedCounter, changeNotifyFn) s.counters[name] = c s.countersSlice = append(s.countersSlice, c) @@ -331,13 +382,17 @@ func (s *scope) Gauge(name string) Gauge { } var cachedGauge CachedGauge + var changeNotifyFn func(g *gauge) if s.cachedReporter != nil { cachedGauge = s.cachedReporter.AllocateGauge( s.fullyQualifiedName(name), s.tags, ) + changeNotifyFn = func(g *gauge) { + s.gaugeChangeNotifyCh <- g + } } - g := newGauge(cachedGauge) + g := newGauge(cachedGauge, changeNotifyFn) s.gauges[name] = g s.gaugesSlice = append(s.gaugesSlice, g) diff --git a/scope_registry.go b/scope_registry.go index c52514d..f2d2d22 100644 --- a/scope_registry.go +++ b/scope_registry.go @@ -196,6 +196,9 @@ func (r *scopeRegistry) Subscope(parent *scope, prefix string, tags map[string]s timers: make(map[string]*timer), bucketCache: parent.bucketCache, done: make(chan struct{}), + + counterChangeNotifyCh: parent.counterChangeNotifyCh, + gaugeChangeNotifyCh: parent.gaugeChangeNotifyCh, } subscopeBucket.s[key] = subscope if _, ok := r.lockedLookup(subscopeBucket, preSanitizeKey); !ok { diff --git a/stats.go b/stats.go index 204c306..99b1413 100644 --- a/stats.go +++ b/stats.go @@ -60,17 +60,30 @@ func (c *capabilities) Tagging() bool { } type counter struct { - prev int64 - curr int64 - cachedCount CachedCount -} - -func newCounter(cachedCount CachedCount) *counter { - return &counter{cachedCount: cachedCount} + prev int64 + curr int64 + updated uint64 + cachedCount CachedCount + changeNotifyFn func(c *counter) +} + +func newCounter( + cachedCount CachedCount, + changeNotifyFn func(c *counter), +) *counter { + return &counter{ + cachedCount: cachedCount, + changeNotifyFn: changeNotifyFn, + } } func (c *counter) Inc(v int64) { atomic.AddInt64(&c.curr, v) + if c.changeNotifyFn != nil { + if atomic.SwapUint64(&c.updated, 1) == 0 { + c.changeNotifyFn(c) + } + } } func (c *counter) value() int64 { @@ -99,7 +112,9 @@ func (c *counter) cachedReport() { return } - c.cachedCount.ReportCount(delta) + if atomic.SwapUint64(&c.updated, 0) == 1 { + c.cachedCount.ReportCount(delta) + } } func (c *counter) snapshot() int64 { @@ -107,18 +122,29 @@ func (c *counter) snapshot() int64 { } type gauge struct { - updated uint64 - curr uint64 - cachedGauge CachedGauge -} - -func newGauge(cachedGauge CachedGauge) *gauge { - return &gauge{cachedGauge: cachedGauge} + updated uint64 + curr uint64 + cachedGauge CachedGauge + changeNotifyFn func(g *gauge) +} + +func newGauge( + cachedGauge CachedGauge, + changeNotifyFn func(g *gauge), +) *gauge { + return &gauge{ + cachedGauge: cachedGauge, + changeNotifyFn: changeNotifyFn, + } } func (g *gauge) Update(v float64) { atomic.StoreUint64(&g.curr, math.Float64bits(v)) - atomic.StoreUint64(&g.updated, 1) + if atomic.SwapUint64(&g.updated, 1) == 0 { + if g.changeNotifyFn != nil { + g.changeNotifyFn(g) + } + } } func (g *gauge) value() float64 { @@ -297,7 +323,7 @@ func newHistogram( } for i := range h.samples { - h.samples[i].counter = newCounter(nil) + h.samples[i].counter = newCounter(nil, nil) if cachedHistogram != nil { switch htype { diff --git a/stats_test.go b/stats_test.go index 7d29e90..0967f87 100644 --- a/stats_test.go +++ b/stats_test.go @@ -84,7 +84,7 @@ func (r *statsTestReporter) Capabilities() Capabilities { func (r *statsTestReporter) Flush() {} func TestCounter(t *testing.T) { - counter := newCounter(nil) + counter := newCounter(nil, nil) r := newStatsTestReporter() counter.Inc(1) @@ -101,7 +101,7 @@ func TestCounter(t *testing.T) { } func TestGauge(t *testing.T) { - gauge := newGauge(nil) + gauge := newGauge(nil, nil) r := newStatsTestReporter() gauge.Update(42)