From 0c4e7fe662a1410358f1ea59c8bd6b721b16c406 Mon Sep 17 00:00:00 2001 From: Shankar Nair Date: Mon, 2 Oct 2023 22:53:15 -0700 Subject: [PATCH] add custom tags to internal metrics --- m3/config.go | 5 +++++ m3/config_test.go | 1 + m3/reporter.go | 6 ++++++ m3/reporter_test.go | 43 +++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 55 insertions(+) diff --git a/m3/config.go b/m3/config.go index cbeb4b6f..e8861e0d 100644 --- a/m3/config.go +++ b/m3/config.go @@ -49,6 +49,10 @@ type Configuration struct { // HistogramBucketTagPrecision is precision to use when formatting the metric tag // with the histogram bucket bound values. HistogramBucketTagPrecision uint `yaml:"histogramBucketTagPrecision"` + + // CommonTagsInternal are tags that should be added to all internal metrics + // emitted by the reporter. + CommonTagsInternal map[string]string `yaml:"commonTagsInternal"` } // NewReporter creates a new M3 reporter from this configuration. @@ -66,5 +70,6 @@ func (c Configuration) NewReporter() (Reporter, error) { MaxPacketSizeBytes: c.PacketSize, IncludeHost: c.IncludeHost, HistogramBucketTagPrecision: c.HistogramBucketTagPrecision, + CommonTagsInternal: c.CommonTagsInternal, }) } diff --git a/m3/config_test.go b/m3/config_test.go index 690faaa4..20afd121 100644 --- a/m3/config_test.go +++ b/m3/config_test.go @@ -42,6 +42,7 @@ func TestConfigSimple(t *testing.T) { assert.True(t, ok) assert.True(t, tagEquals(reporter.commonTags, "service", "my-service")) assert.True(t, tagEquals(reporter.commonTags, "env", "test")) + assert.Equal(t, 0, len(c.CommonTagsInternal)) } func TestConfigMulti(t *testing.T) { diff --git a/m3/reporter.go b/m3/reporter.go index e25e436c..8c3b1e3d 100644 --- a/m3/reporter.go +++ b/m3/reporter.go @@ -149,6 +149,7 @@ type Options struct { HistogramBucketIDName string HistogramBucketName string HistogramBucketTagPrecision uint + CommonTagsInternal map[string]string } // NewReporter creates a new M3 reporter. @@ -288,6 +289,11 @@ func NewReporter(opts Options) (Reporter, error) { internalTags := map[string]string{ "version": tally.Version, } + + for k, v := range opts.CommonTagsInternal { + internalTags[k] = v + } + r.batchSizeHistogram = r.AllocateHistogram("tally.internal.batch-size", internalTags, buckets) r.numBatchesCounter = r.AllocateCounter("tally.internal.num-batches", internalTags) r.numMetricsCounter = r.AllocateCounter("tally.internal.num-metrics", internalTags) diff --git a/m3/reporter_test.go b/m3/reporter_test.go index f9fa43fb..daa2feec 100644 --- a/m3/reporter_test.go +++ b/m3/reporter_test.go @@ -561,6 +561,42 @@ func TestReporterResetTagsAfterReturnToPool(t *testing.T) { require.Equal(t, 0, len(filtered[1].GetTags())) } +func TestReporterCommmonTagsInternal(t *testing.T) { + var wg sync.WaitGroup + server := newFakeM3Server(t, &wg, false, Compact) + go server.Serve() + defer server.Close() + + commonTagsInternal := map[string]string{ + "internal1": "test1", + "internal2": "test2", + } + + r, err := NewReporter(Options{ + HostPorts: []string{server.Addr}, + Service: "test-service", + CommonTags: defaultCommonTags, + MaxQueueSize: queueSize, + MaxPacketSizeBytes: maxPacketSize, + CommonTagsInternal: commonTagsInternal, + }) + require.NoError(t, err) + defer r.Close() + + wg.Add(internalMetrics) + r.Flush() + wg.Wait() + + metrics := server.Service.getMetrics() + require.Equal(t, internalMetrics, len(metrics)) + for _, metric := range metrics { + require.True(t, strings.HasPrefix(metric.Name, "tally.internal")) + for k, v := range commonTagsInternal { + require.True(t, tagEquals(metric.Tags, k, v)) + } + } +} + func TestReporterHasReportingAndTaggingCapability(t *testing.T) { r, err := NewReporter(Options{ HostPorts: []string{"127.0.0.1:9052"}, @@ -589,6 +625,13 @@ type fakeM3ServerPackets struct { values [][]byte } +// newFakeM3Server creates a new fake M3 server that listens on a random port +// and returns the address. +// The server will wait for the given wait group to be done before returning. +// If countBatches is true, the server will wait for a batch to be received +// before returning. +// But if countBatches is false, the server will wait for a metric to be received +// before returning. func newFakeM3Server(t *testing.T, wg *sync.WaitGroup, countBatches bool, protocol Protocol) *fakeM3Server { service := newFakeM3Service(wg, countBatches) processor := m3thrift.NewM3Processor(service)