diff --git a/m3/config.go b/m3/config.go index cbeb4b6f..ccfb51ce 100644 --- a/m3/config.go +++ b/m3/config.go @@ -49,6 +49,10 @@ type Configuration struct { // HistogramBucketTagPrecision is precision to use when formatting the metric tag // with the histogram bucket bound values. HistogramBucketTagPrecision uint `yaml:"histogramBucketTagPrecision"` + + // CommonTagsInternal are tags that should be added to all internal metrics + // emitted by the reporter. + CommonTagsInternal map[string]string `yaml:"commonTagsInternal"` } // NewReporter creates a new M3 reporter from this configuration. @@ -66,5 +70,6 @@ func (c Configuration) NewReporter() (Reporter, error) { MaxPacketSizeBytes: c.PacketSize, IncludeHost: c.IncludeHost, HistogramBucketTagPrecision: c.HistogramBucketTagPrecision, + InternalTags: c.CommonTagsInternal, }) } diff --git a/m3/config_test.go b/m3/config_test.go index 690faaa4..20afd121 100644 --- a/m3/config_test.go +++ b/m3/config_test.go @@ -42,6 +42,7 @@ func TestConfigSimple(t *testing.T) { assert.True(t, ok) assert.True(t, tagEquals(reporter.commonTags, "service", "my-service")) assert.True(t, tagEquals(reporter.commonTags, "env", "test")) + assert.Equal(t, 0, len(c.CommonTagsInternal)) } func TestConfigMulti(t *testing.T) { diff --git a/m3/reporter.go b/m3/reporter.go index e25e436c..c930d5c2 100644 --- a/m3/reporter.go +++ b/m3/reporter.go @@ -149,6 +149,7 @@ type Options struct { HistogramBucketIDName string HistogramBucketName string HistogramBucketTagPrecision uint + InternalTags map[string]string } // NewReporter creates a new M3 reporter. @@ -288,6 +289,11 @@ func NewReporter(opts Options) (Reporter, error) { internalTags := map[string]string{ "version": tally.Version, } + + for k, v := range opts.InternalTags { + internalTags[k] = v + } + r.batchSizeHistogram = r.AllocateHistogram("tally.internal.batch-size", internalTags, buckets) r.numBatchesCounter = r.AllocateCounter("tally.internal.num-batches", internalTags) r.numMetricsCounter = r.AllocateCounter("tally.internal.num-metrics", internalTags) diff --git a/m3/reporter_test.go b/m3/reporter_test.go index f9fa43fb..6e5bbb0a 100644 --- a/m3/reporter_test.go +++ b/m3/reporter_test.go @@ -561,6 +561,58 @@ func TestReporterResetTagsAfterReturnToPool(t *testing.T) { require.Equal(t, 0, len(filtered[1].GetTags())) } +func TestReporterCommmonTagsInternal(t *testing.T) { + var wg sync.WaitGroup + server := newFakeM3Server(t, &wg, false, Compact) + go server.Serve() + defer server.Close() + + internalTags := map[string]string{ + "internal1": "test1", + "internal2": "test2", + } + + r, err := NewReporter(Options{ + HostPorts: []string{server.Addr}, + Service: "test-service", + CommonTags: defaultCommonTags, + MaxQueueSize: queueSize, + IncludeHost: true, + MaxPacketSizeBytes: maxPacketSize, + InternalTags: internalTags, + }) + require.NoError(t, err) + defer r.Close() + + c := r.AllocateCounter("testCounter1", nil) + c.ReportCount(1) + wg.Add(internalMetrics + 1) + r.Flush() + wg.Wait() + + numInternalMetricsActual := 0 + metrics := server.Service.getMetrics() + require.Equal(t, internalMetrics+1, len(metrics)) + for _, metric := range metrics { + if strings.HasPrefix(metric.Name, "tally.internal") { + numInternalMetricsActual++ + for k, v := range internalTags { + require.True(t, tagEquals(metric.Tags, k, v)) + } + } else { + require.Equal(t, "testCounter1", metric.Name) + require.False(t, tagIncluded(metric.Tags, "internal1")) + require.False(t, tagIncluded(metric.Tags, "internal2")) + } + // The following tags should not be present as part of the individual metrics + // as they are common tags. + require.False(t, tagIncluded(metric.Tags, "host")) + require.False(t, tagIncluded(metric.Tags, "instance")) + require.False(t, tagIncluded(metric.Tags, "service")) + } + require.Equal(t, internalMetrics, numInternalMetricsActual) +} + func TestReporterHasReportingAndTaggingCapability(t *testing.T) { r, err := NewReporter(Options{ HostPorts: []string{"127.0.0.1:9052"}, @@ -589,6 +641,13 @@ type fakeM3ServerPackets struct { values [][]byte } +// newFakeM3Server creates a new fake M3 server that listens on a random port +// and returns the server. +// The server will wait for the given wait group to be done before returning. +// If countBatches is true, the server will wait consider the wg.Add()s to be +// representing batches and will do a eg.Done() for each encountered batch. +// But if countBatches is false, the server will do the same thing but for individual +// metrics instead of batches. func newFakeM3Server(t *testing.T, wg *sync.WaitGroup, countBatches bool, protocol Protocol) *fakeM3Server { service := newFakeM3Service(wg, countBatches) processor := m3thrift.NewM3Processor(service) diff --git a/version.go b/version.go index ac578d04..07ee470a 100644 --- a/version.go +++ b/version.go @@ -21,4 +21,4 @@ package tally // Version is the current version of the library. -const Version = "4.1.8" +const Version = "4.1.9"