From 765352659bcf60b1ea2e86ae479ec954c68ce7e8 Mon Sep 17 00:00:00 2001 From: Shankar Nair Date: Tue, 24 Oct 2023 23:15:39 -0700 Subject: [PATCH 1/4] Process only changed metrics --- scope.go | 86 ++++++++++++++++++++++++++++++++++++----------- scope_registry.go | 3 ++ stats.go | 60 +++++++++++++++++++++++---------- stats_test.go | 4 +-- 4 files changed, 114 insertions(+), 39 deletions(-) diff --git a/scope.go b/scope.go index 0b301040..de59dcaa 100644 --- a/scope.go +++ b/scope.go @@ -102,6 +102,11 @@ type scope struct { done chan struct{} wg sync.WaitGroup root bool + + counterChangeNotifyCh chan *counter + gaugeChangeNotifyCh chan *gauge + changedCounters []*counter + changedGauges []*gauge } // ScopeOptions is a set of options to construct a scope. @@ -166,23 +171,27 @@ func newRootScope(opts ScopeOptions, interval time.Duration) *scope { } s := &scope{ - baseReporter: baseReporter, - bucketCache: newBucketCache(), - cachedReporter: opts.CachedReporter, - counters: make(map[string]*counter), - countersSlice: make([]*counter, 0, _defaultInitialSliceSize), - defaultBuckets: opts.DefaultBuckets, - done: make(chan struct{}), - gauges: make(map[string]*gauge), - gaugesSlice: make([]*gauge, 0, _defaultInitialSliceSize), - histograms: make(map[string]*histogram), - histogramsSlice: make([]*histogram, 0, _defaultInitialSliceSize), - prefix: sanitizer.Name(opts.Prefix), - reporter: opts.Reporter, - sanitizer: sanitizer, - separator: sanitizer.Name(opts.Separator), - timers: make(map[string]*timer), - root: true, + baseReporter: baseReporter, + bucketCache: newBucketCache(), + cachedReporter: opts.CachedReporter, + counters: make(map[string]*counter), + countersSlice: make([]*counter, 0, _defaultInitialSliceSize), + defaultBuckets: opts.DefaultBuckets, + done: make(chan struct{}), + gauges: make(map[string]*gauge), + gaugesSlice: make([]*gauge, 0, _defaultInitialSliceSize), + histograms: make(map[string]*histogram), + histogramsSlice: make([]*histogram, 0, _defaultInitialSliceSize), + prefix: sanitizer.Name(opts.Prefix), + reporter: opts.Reporter, + sanitizer: sanitizer, + separator: sanitizer.Name(opts.Separator), + timers: make(map[string]*timer), + root: true, + counterChangeNotifyCh: make(chan *counter, 1024), + gaugeChangeNotifyCh: make(chan *gauge, 1024), + changedCounters: make([]*counter, 0, _defaultInitialSliceSize), + changedGauges: make([]*gauge, 0, _defaultInitialSliceSize), } // NB(r): Take a copy of the tags on creation @@ -196,7 +205,7 @@ func newRootScope(opts ScopeOptions, interval time.Duration) *scope { s.wg.Add(1) go func() { defer s.wg.Done() - s.reportLoop(interval) + s.processLoop(interval) }() } @@ -281,6 +290,35 @@ func (s *scope) reportRegistry() { } } +func (s *scope) processLoop(interval time.Duration) { + ticker := time.NewTicker(interval) + defer ticker.Stop() + for { + select { + case c := <-s.counterChangeNotifyCh: + s.changedCounters = append(s.changedCounters, c) + case g := <-s.gaugeChangeNotifyCh: + s.changedGauges = append(s.changedGauges, g) + case <-ticker.C: + s.reportChanges(s.changedCounters, s.changedGauges) + // Reset the changed counters and gauges + s.changedCounters = s.changedCounters[:0] + s.changedGauges = s.changedGauges[:0] + default: + return + } + } +} + +func (s *scope) reportChanges(counters []*counter, gauges []*gauge) { + for _, c := range counters { + c.cachedReport() + } + for _, g := range gauges { + g.cachedReport() + } +} + func (s *scope) Counter(name string) Counter { name = s.sanitizer.Name(name) if c, ok := s.counter(name); ok { @@ -295,14 +333,18 @@ func (s *scope) Counter(name string) Counter { } var cachedCounter CachedCount + var changeNotifyFn func(c *counter) if s.cachedReporter != nil { cachedCounter = s.cachedReporter.AllocateCounter( s.fullyQualifiedName(name), s.tags, ) + changeNotifyFn = func(c *counter) { + s.counterChangeNotifyCh <- c + } } - c := newCounter(cachedCounter) + c := newCounter(cachedCounter, changeNotifyFn) s.counters[name] = c s.countersSlice = append(s.countersSlice, c) @@ -331,13 +373,17 @@ func (s *scope) Gauge(name string) Gauge { } var cachedGauge CachedGauge + var changeNotifyFn func(g *gauge) if s.cachedReporter != nil { cachedGauge = s.cachedReporter.AllocateGauge( s.fullyQualifiedName(name), s.tags, ) + changeNotifyFn = func(g *gauge) { + s.gaugeChangeNotifyCh <- g + } } - g := newGauge(cachedGauge) + g := newGauge(cachedGauge, changeNotifyFn) s.gauges[name] = g s.gaugesSlice = append(s.gaugesSlice, g) diff --git a/scope_registry.go b/scope_registry.go index c52514db..f2d2d224 100644 --- a/scope_registry.go +++ b/scope_registry.go @@ -196,6 +196,9 @@ func (r *scopeRegistry) Subscope(parent *scope, prefix string, tags map[string]s timers: make(map[string]*timer), bucketCache: parent.bucketCache, done: make(chan struct{}), + + counterChangeNotifyCh: parent.counterChangeNotifyCh, + gaugeChangeNotifyCh: parent.gaugeChangeNotifyCh, } subscopeBucket.s[key] = subscope if _, ok := r.lockedLookup(subscopeBucket, preSanitizeKey); !ok { diff --git a/stats.go b/stats.go index 204c3067..99b14139 100644 --- a/stats.go +++ b/stats.go @@ -60,17 +60,30 @@ func (c *capabilities) Tagging() bool { } type counter struct { - prev int64 - curr int64 - cachedCount CachedCount -} - -func newCounter(cachedCount CachedCount) *counter { - return &counter{cachedCount: cachedCount} + prev int64 + curr int64 + updated uint64 + cachedCount CachedCount + changeNotifyFn func(c *counter) +} + +func newCounter( + cachedCount CachedCount, + changeNotifyFn func(c *counter), +) *counter { + return &counter{ + cachedCount: cachedCount, + changeNotifyFn: changeNotifyFn, + } } func (c *counter) Inc(v int64) { atomic.AddInt64(&c.curr, v) + if c.changeNotifyFn != nil { + if atomic.SwapUint64(&c.updated, 1) == 0 { + c.changeNotifyFn(c) + } + } } func (c *counter) value() int64 { @@ -99,7 +112,9 @@ func (c *counter) cachedReport() { return } - c.cachedCount.ReportCount(delta) + if atomic.SwapUint64(&c.updated, 0) == 1 { + c.cachedCount.ReportCount(delta) + } } func (c *counter) snapshot() int64 { @@ -107,18 +122,29 @@ func (c *counter) snapshot() int64 { } type gauge struct { - updated uint64 - curr uint64 - cachedGauge CachedGauge -} - -func newGauge(cachedGauge CachedGauge) *gauge { - return &gauge{cachedGauge: cachedGauge} + updated uint64 + curr uint64 + cachedGauge CachedGauge + changeNotifyFn func(g *gauge) +} + +func newGauge( + cachedGauge CachedGauge, + changeNotifyFn func(g *gauge), +) *gauge { + return &gauge{ + cachedGauge: cachedGauge, + changeNotifyFn: changeNotifyFn, + } } func (g *gauge) Update(v float64) { atomic.StoreUint64(&g.curr, math.Float64bits(v)) - atomic.StoreUint64(&g.updated, 1) + if atomic.SwapUint64(&g.updated, 1) == 0 { + if g.changeNotifyFn != nil { + g.changeNotifyFn(g) + } + } } func (g *gauge) value() float64 { @@ -297,7 +323,7 @@ func newHistogram( } for i := range h.samples { - h.samples[i].counter = newCounter(nil) + h.samples[i].counter = newCounter(nil, nil) if cachedHistogram != nil { switch htype { diff --git a/stats_test.go b/stats_test.go index 7d29e90e..0967f87e 100644 --- a/stats_test.go +++ b/stats_test.go @@ -84,7 +84,7 @@ func (r *statsTestReporter) Capabilities() Capabilities { func (r *statsTestReporter) Flush() {} func TestCounter(t *testing.T) { - counter := newCounter(nil) + counter := newCounter(nil, nil) r := newStatsTestReporter() counter.Inc(1) @@ -101,7 +101,7 @@ func TestCounter(t *testing.T) { } func TestGauge(t *testing.T) { - gauge := newGauge(nil) + gauge := newGauge(nil, nil) r := newStatsTestReporter() gauge.Update(42) From 99dbd5452d043b9f4f19e5a7f49ea58cd56d80ce Mon Sep 17 00:00:00 2001 From: Shankar Nair Date: Thu, 26 Oct 2023 16:51:29 -0700 Subject: [PATCH 2/4] optimize maintaining of changed counters and gauges --- scope.go | 17 ++++++++--------- 1 file changed, 8 insertions(+), 9 deletions(-) diff --git a/scope.go b/scope.go index de59dcaa..d09c8d89 100644 --- a/scope.go +++ b/scope.go @@ -105,8 +105,6 @@ type scope struct { counterChangeNotifyCh chan *counter gaugeChangeNotifyCh chan *gauge - changedCounters []*counter - changedGauges []*gauge } // ScopeOptions is a set of options to construct a scope. @@ -190,8 +188,6 @@ func newRootScope(opts ScopeOptions, interval time.Duration) *scope { root: true, counterChangeNotifyCh: make(chan *counter, 1024), gaugeChangeNotifyCh: make(chan *gauge, 1024), - changedCounters: make([]*counter, 0, _defaultInitialSliceSize), - changedGauges: make([]*gauge, 0, _defaultInitialSliceSize), } // NB(r): Take a copy of the tags on creation @@ -292,18 +288,21 @@ func (s *scope) reportRegistry() { func (s *scope) processLoop(interval time.Duration) { ticker := time.NewTicker(interval) + counters := make([]*counter, 0, _defaultInitialSliceSize) + gauges := make([]*gauge, 0, _defaultInitialSliceSize) + defer ticker.Stop() for { select { case c := <-s.counterChangeNotifyCh: - s.changedCounters = append(s.changedCounters, c) + counters = append(counters, c) case g := <-s.gaugeChangeNotifyCh: - s.changedGauges = append(s.changedGauges, g) + gauges = append(gauges, g) case <-ticker.C: - s.reportChanges(s.changedCounters, s.changedGauges) + s.reportChanges(counters, gauges) // Reset the changed counters and gauges - s.changedCounters = s.changedCounters[:0] - s.changedGauges = s.changedGauges[:0] + counters = counters[:0] + gauges = gauges[:0] default: return } From 706152f4fd142cf77c6ae320f76cce0cf769b36b Mon Sep 17 00:00:00 2001 From: Shankar Nair Date: Thu, 26 Oct 2023 17:02:54 -0700 Subject: [PATCH 3/4] fix Flush --- scope.go | 1 + 1 file changed, 1 insertion(+) diff --git a/scope.go b/scope.go index d09c8d89..48f87f04 100644 --- a/scope.go +++ b/scope.go @@ -300,6 +300,7 @@ func (s *scope) processLoop(interval time.Duration) { gauges = append(gauges, g) case <-ticker.C: s.reportChanges(counters, gauges) + s.cachedReporter.Flush() // Reset the changed counters and gauges counters = counters[:0] gauges = gauges[:0] From 3bfe23dc2063bfcde7b17ae10295d30a04d96b59 Mon Sep 17 00:00:00 2001 From: Shankar Nair Date: Mon, 30 Oct 2023 15:50:12 -0700 Subject: [PATCH 4/4] fix reset of slices --- scope.go | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/scope.go b/scope.go index 48f87f04..dd310181 100644 --- a/scope.go +++ b/scope.go @@ -302,7 +302,16 @@ func (s *scope) processLoop(interval time.Duration) { s.reportChanges(counters, gauges) s.cachedReporter.Flush() // Reset the changed counters and gauges + var zeroCounter *counter + for i := range counters { + counters[i] = zeroCounter + } counters = counters[:0] + + var zeroGauge *gauge + for i := range gauges { + gauges[i] = zeroGauge + } gauges = gauges[:0] default: return