Skip to content

Commit

Permalink
add custom tags to internal metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
shaan420 committed Oct 3, 2023
1 parent be77342 commit 0c4e7fe
Show file tree
Hide file tree
Showing 4 changed files with 55 additions and 0 deletions.
5 changes: 5 additions & 0 deletions m3/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,10 @@ type Configuration struct {
// HistogramBucketTagPrecision is precision to use when formatting the metric tag
// with the histogram bucket bound values.
HistogramBucketTagPrecision uint `yaml:"histogramBucketTagPrecision"`

// CommonTagsInternal are tags that should be added to all internal metrics
// emitted by the reporter.
CommonTagsInternal map[string]string `yaml:"commonTagsInternal"`
}

// NewReporter creates a new M3 reporter from this configuration.
Expand All @@ -66,5 +70,6 @@ func (c Configuration) NewReporter() (Reporter, error) {
MaxPacketSizeBytes: c.PacketSize,
IncludeHost: c.IncludeHost,
HistogramBucketTagPrecision: c.HistogramBucketTagPrecision,
CommonTagsInternal: c.CommonTagsInternal,
})
}
1 change: 1 addition & 0 deletions m3/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ func TestConfigSimple(t *testing.T) {
assert.True(t, ok)
assert.True(t, tagEquals(reporter.commonTags, "service", "my-service"))
assert.True(t, tagEquals(reporter.commonTags, "env", "test"))
assert.Equal(t, 0, len(c.CommonTagsInternal))
}

func TestConfigMulti(t *testing.T) {
Expand Down
6 changes: 6 additions & 0 deletions m3/reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ type Options struct {
HistogramBucketIDName string
HistogramBucketName string
HistogramBucketTagPrecision uint
CommonTagsInternal map[string]string
}

// NewReporter creates a new M3 reporter.
Expand Down Expand Up @@ -288,6 +289,11 @@ func NewReporter(opts Options) (Reporter, error) {
internalTags := map[string]string{
"version": tally.Version,
}

for k, v := range opts.CommonTagsInternal {
internalTags[k] = v
}

r.batchSizeHistogram = r.AllocateHistogram("tally.internal.batch-size", internalTags, buckets)
r.numBatchesCounter = r.AllocateCounter("tally.internal.num-batches", internalTags)
r.numMetricsCounter = r.AllocateCounter("tally.internal.num-metrics", internalTags)
Expand Down
43 changes: 43 additions & 0 deletions m3/reporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -561,6 +561,42 @@ func TestReporterResetTagsAfterReturnToPool(t *testing.T) {
require.Equal(t, 0, len(filtered[1].GetTags()))
}

func TestReporterCommmonTagsInternal(t *testing.T) {
var wg sync.WaitGroup
server := newFakeM3Server(t, &wg, false, Compact)
go server.Serve()
defer server.Close()

commonTagsInternal := map[string]string{
"internal1": "test1",
"internal2": "test2",
}

r, err := NewReporter(Options{
HostPorts: []string{server.Addr},
Service: "test-service",
CommonTags: defaultCommonTags,
MaxQueueSize: queueSize,
MaxPacketSizeBytes: maxPacketSize,
CommonTagsInternal: commonTagsInternal,
})
require.NoError(t, err)
defer r.Close()

wg.Add(internalMetrics)
r.Flush()
wg.Wait()

metrics := server.Service.getMetrics()
require.Equal(t, internalMetrics, len(metrics))
for _, metric := range metrics {
require.True(t, strings.HasPrefix(metric.Name, "tally.internal"))
for k, v := range commonTagsInternal {
require.True(t, tagEquals(metric.Tags, k, v))
}
}
}

func TestReporterHasReportingAndTaggingCapability(t *testing.T) {
r, err := NewReporter(Options{
HostPorts: []string{"127.0.0.1:9052"},
Expand Down Expand Up @@ -589,6 +625,13 @@ type fakeM3ServerPackets struct {
values [][]byte
}

// newFakeM3Server creates a new fake M3 server that listens on a random port
// and returns the address.
// The server will wait for the given wait group to be done before returning.
// If countBatches is true, the server will wait for a batch to be received
// before returning.
// But if countBatches is false, the server will wait for a metric to be received
// before returning.
func newFakeM3Server(t *testing.T, wg *sync.WaitGroup, countBatches bool, protocol Protocol) *fakeM3Server {
service := newFakeM3Service(wg, countBatches)
processor := m3thrift.NewM3Processor(service)
Expand Down

0 comments on commit 0c4e7fe

Please sign in to comment.