diff --git a/collectors/delta_counter.go b/collectors/delta_counter.go index 4a5c6f60..6e72a30d 100644 --- a/collectors/delta_counter.go +++ b/collectors/delta_counter.go @@ -42,10 +42,13 @@ type DeltaCounterStore interface { ListMetrics(metricDescriptorName string) map[string][]*CollectedMetric } -type metricEntry = map[uint64]*CollectedMetric +type metricEntry struct { + collected map[uint64]*CollectedMetric + mutex *sync.RWMutex +} type inMemoryDeltaCounterStore struct { - store map[string]metricEntry + store map[string]*metricEntry ttl time.Duration storeMutex *sync.RWMutex logger log.Logger @@ -53,38 +56,44 @@ type inMemoryDeltaCounterStore struct { // NewInMemoryDeltaCounterStore returns an implementation of DeltaCounterStore which is persisted in-memory func NewInMemoryDeltaCounterStore(logger log.Logger, ttl time.Duration) DeltaCounterStore { - return inMemoryDeltaCounterStore{ - store: map[string]metricEntry{}, + return &inMemoryDeltaCounterStore{ + store: map[string]*metricEntry{}, storeMutex: &sync.RWMutex{}, logger: logger, ttl: ttl, } } -func (s inMemoryDeltaCounterStore) Increment(metricDescriptor *monitoring.MetricDescriptor, currentValue *ConstMetric) { +func (s *inMemoryDeltaCounterStore) Increment(metricDescriptor *monitoring.MetricDescriptor, currentValue *ConstMetric) { if currentValue == nil { return } - var entry metricEntry + var entry *metricEntry s.storeMutex.Lock() if _, exists := s.store[metricDescriptor.Name]; !exists { - s.store[metricDescriptor.Name] = metricEntry{} + s.store[metricDescriptor.Name] = &metricEntry{ + collected: map[uint64]*CollectedMetric{}, + mutex: &sync.RWMutex{}, + } } entry = s.store[metricDescriptor.Name] s.storeMutex.Unlock() key := toCounterKey(currentValue) - existing := entry[key] + + entry.mutex.Lock() + defer entry.mutex.Unlock() + existing := entry.collected[key] if existing == nil { level.Debug(s.logger).Log("msg", "Tracking new counter", "fqName", currentValue.fqName, "key", key, "current_value", currentValue.value, "incoming_time", currentValue.reportTime) - entry[key] = &CollectedMetric{currentValue, time.Now()} + entry.collected[key] = &CollectedMetric{currentValue, time.Now()} return } if existing.metric.reportTime.Before(currentValue.reportTime) { - level.Debug(s.logger).Log("msg", "Incrementing existing counter", "fqName", currentValue.fqName, "key", key, "current_value", existing.metric.value, "adding", currentValue.value, "last_reported_time", entry[key].metric.reportTime, "incoming_time", currentValue.reportTime) + level.Debug(s.logger).Log("msg", "Incrementing existing counter", "fqName", currentValue.fqName, "key", key, "current_value", existing.metric.value, "adding", currentValue.value, "last_reported_time", entry.collected[key].metric.reportTime, "incoming_time", currentValue.reportTime) currentValue.value = currentValue.value + existing.metric.value existing.metric = currentValue existing.lastCollectedAt = time.Now() @@ -113,24 +122,26 @@ func toCounterKey(c *ConstMetric) uint64 { return h } -func (s inMemoryDeltaCounterStore) ListMetrics(metricDescriptorName string) map[string][]*CollectedMetric { +func (s *inMemoryDeltaCounterStore) ListMetrics(metricDescriptorName string) map[string][]*CollectedMetric { output := map[string][]*CollectedMetric{} now := time.Now() ttlWindowStart := now.Add(-s.ttl) s.storeMutex.Lock() - metric := s.store[metricDescriptorName] - if metric == nil { + entry := s.store[metricDescriptorName] + if entry == nil { s.storeMutex.Unlock() return output } s.storeMutex.Unlock() - for key, collected := range metric { + entry.mutex.Lock() + defer entry.mutex.Unlock() + for key, collected := range entry.collected { //Scan and remove metrics which are outside the TTL if ttlWindowStart.After(collected.lastCollectedAt) { level.Debug(s.logger).Log("msg", "Deleting counter entry outside of TTL", "key", key, "fqName", collected.metric.fqName) - delete(metric, key) + delete(entry.collected, key) continue } diff --git a/collectors/delta_distribution.go b/collectors/delta_distribution.go index fe1b8db6..7086b0c5 100644 --- a/collectors/delta_distribution.go +++ b/collectors/delta_distribution.go @@ -42,10 +42,13 @@ type DeltaDistributionStore interface { ListMetrics(metricDescriptorName string) map[string][]*CollectedHistogram } -type histogramEntry = map[uint64]*CollectedHistogram +type histogramEntry struct { + collected map[uint64]*CollectedHistogram + mutex *sync.RWMutex +} type inMemoryDeltaDistributionStore struct { - store map[string]histogramEntry + store map[string]*histogramEntry ttl time.Duration storeMutex *sync.RWMutex logger log.Logger @@ -53,33 +56,39 @@ type inMemoryDeltaDistributionStore struct { // NewInMemoryDeltaDistributionStore returns an implementation of DeltaDistributionStore which is persisted in-memory func NewInMemoryDeltaDistributionStore(logger log.Logger, ttl time.Duration) DeltaDistributionStore { - return inMemoryDeltaDistributionStore{ - store: map[string]histogramEntry{}, + return &inMemoryDeltaDistributionStore{ + store: map[string]*histogramEntry{}, storeMutex: &sync.RWMutex{}, logger: logger, ttl: ttl, } } -func (s inMemoryDeltaDistributionStore) Increment(metricDescriptor *monitoring.MetricDescriptor, currentValue *HistogramMetric) { +func (s *inMemoryDeltaDistributionStore) Increment(metricDescriptor *monitoring.MetricDescriptor, currentValue *HistogramMetric) { if currentValue == nil { return } - var entry histogramEntry + var entry *histogramEntry s.storeMutex.Lock() if _, exists := s.store[metricDescriptor.Name]; !exists { - s.store[metricDescriptor.Name] = histogramEntry{} + s.store[metricDescriptor.Name] = &histogramEntry{ + collected: map[uint64]*CollectedHistogram{}, + mutex: &sync.RWMutex{}, + } } entry = s.store[metricDescriptor.Name] s.storeMutex.Unlock() key := toHistogramKey(currentValue) - existing := entry[key] + + entry.mutex.Lock() + defer entry.mutex.Unlock() + existing := entry.collected[key] if existing == nil { level.Debug(s.logger).Log("msg", "Tracking new histogram", "fqName", currentValue.fqName, "key", key, "incoming_time", currentValue.reportTime) - entry[key] = &CollectedHistogram{histogram: currentValue, lastCollectedAt: time.Now()} + entry.collected[key] = &CollectedHistogram{histogram: currentValue, lastCollectedAt: time.Now()} return } @@ -133,7 +142,7 @@ func mergeHistograms(existing *HistogramMetric, current *HistogramMetric) *Histo return current } -func (s inMemoryDeltaDistributionStore) ListMetrics(metricDescriptorName string) map[string][]*CollectedHistogram { +func (s *inMemoryDeltaDistributionStore) ListMetrics(metricDescriptorName string) map[string][]*CollectedHistogram { output := map[string][]*CollectedHistogram{} now := time.Now() ttlWindowStart := now.Add(-s.ttl) @@ -146,11 +155,13 @@ func (s inMemoryDeltaDistributionStore) ListMetrics(metricDescriptorName string) } s.storeMutex.Unlock() - for key, collected := range entry { + entry.mutex.Lock() + defer entry.mutex.Unlock() + for key, collected := range entry.collected { //Scan and remove metrics which are outside the TTL if ttlWindowStart.After(collected.lastCollectedAt) { level.Debug(s.logger).Log("msg", "Deleting histogram entry outside of TTL", "key", key, "fqName", collected.histogram.fqName) - delete(entry, key) + delete(entry.collected, key) continue }