From 7bfe787cb74dc1dcb8f54dcc7c20b0f1d6516146 Mon Sep 17 00:00:00 2001 From: Yichao Yang Date: Tue, 4 Feb 2025 11:01:15 -0800 Subject: [PATCH 1/3] Add backward compatibility configs for tally to otel migration --- common/metrics/config.go | 17 ++++ common/metrics/config_test.go | 22 +++-- common/metrics/consts.go | 1 + common/metrics/defs.go | 2 +- common/metrics/opentelemetry_provider.go | 9 +- common/metrics/otel_metrics_handler.go | 52 ++++++++--- common/metrics/otel_metrics_handler_test.go | 96 +++++++++++++++++--- common/metrics/otel_options.go | 13 ++- common/metrics/otel_options_test.go | 19 ++-- common/metrics/tally_metrics_handler_test.go | 9 +- 10 files changed, 190 insertions(+), 50 deletions(-) diff --git a/common/metrics/config.go b/common/metrics/config.go index 3b896f3ac16..5a4b4759e56 100644 --- a/common/metrics/config.go +++ b/common/metrics/config.go @@ -72,6 +72,17 @@ type ( // - "milliseconds" // - "bytes" PerUnitHistogramBoundaries map[string][]float64 `yaml:"perUnitHistogramBoundaries"` + + // Following configs are added for backwards compatibility when switching from tally to opentelemetry + // Both configs should be set to true when using opentelemetry framework to have the same behavior as tally. + + // WithoutUnits controls if metric unit should be added to the metric name as a suffix. + // This config only takes effect when using opentelemetry framework. + WithoutUnits bool `yaml:"withoutUnits"` + // RecordTimerInSeconds controls if Timer metric should be emitted as number of seconds + // (instead of milliseconds). + // This config only takes effect when using opentelemetry framework. + RecordTimerInSeconds bool `yaml:"timerInSeconds"` } // StatsdConfig contains the config items for statsd metrics reporter @@ -389,6 +400,12 @@ func setDefaultPerUnitHistogramBoundaries(clientConfig *ClientConfig) { buckets[Bytes] = bucket } + bucketInSeconds := make([]float64, len(buckets[Milliseconds])) + for idx, boundary := range buckets[Milliseconds] { + bucketInSeconds[idx] = boundary / float64(time.Second/time.Millisecond) + } + buckets[Seconds] = bucketInSeconds + clientConfig.PerUnitHistogramBoundaries = buckets } diff --git a/common/metrics/config_test.go b/common/metrics/config_test.go index 800ad49fcc5..0d426b6acc4 100644 --- a/common/metrics/config_test.go +++ b/common/metrics/config_test.go @@ -135,22 +135,28 @@ func (s *MetricsSuite) TestSetDefaultPerUnitHistogramBoundaries() { expectResult map[string][]float64 } - customizedBoundaries := map[string][]float64{ - Dimensionless: {1}, - Milliseconds: defaultPerUnitHistogramBoundaries[Milliseconds], - Bytes: defaultPerUnitHistogramBoundaries[Bytes], - } testCases := []histogramTest{ { - input: nil, - expectResult: defaultPerUnitHistogramBoundaries, + input: nil, + expectResult: map[string][]float64{ + Dimensionless: defaultPerUnitHistogramBoundaries[Dimensionless], + Milliseconds: defaultPerUnitHistogramBoundaries[Milliseconds], + Seconds: {0.001, 0.002, 0.005, 0.010, 0.020, 0.050, 0.100, 0.200, 0.500, 1, 2, 5, 10, 20, 50, 100, 200, 500, 1_000}, + Bytes: defaultPerUnitHistogramBoundaries[Bytes], + }, }, { input: map[string][]float64{ UnitNameDimensionless: {1}, + UnitNameMilliseconds: {10, 1000, 2000}, "notDefine": {1}, }, - expectResult: customizedBoundaries, + expectResult: map[string][]float64{ + Dimensionless: {1}, + Milliseconds: {10, 1000, 2000}, + Seconds: {0.01, 1, 2}, + Bytes: defaultPerUnitHistogramBoundaries[Bytes], + }, }, } diff --git a/common/metrics/consts.go b/common/metrics/consts.go index 6070e6c2a89..bae3563b0bb 100644 --- a/common/metrics/consts.go +++ b/common/metrics/consts.go @@ -32,4 +32,5 @@ const ( Dimensionless = "1" Milliseconds = "ms" Bytes = "By" + Seconds = "s" ) diff --git a/common/metrics/defs.go b/common/metrics/defs.go index e01dc2c638f..2799fb4a832 100644 --- a/common/metrics/defs.go +++ b/common/metrics/defs.go @@ -46,7 +46,7 @@ type ( func NewTimerDef(name string, opts ...Option) timerDefinition { // This line cannot be combined with others! // This ensures the stack trace has information of the caller. - def := newMetricDefinition(name, append(opts, WithUnit(Milliseconds))...) + def := newMetricDefinition(name, opts...) globalRegistry.register(def) return timerDefinition{def} } diff --git a/common/metrics/opentelemetry_provider.go b/common/metrics/opentelemetry_provider.go index 325f405a186..64d1c3f6593 100644 --- a/common/metrics/opentelemetry_provider.go +++ b/common/metrics/opentelemetry_provider.go @@ -59,14 +59,19 @@ func NewOpenTelemetryProvider( clientConfig *ClientConfig, ) (*openTelemetryProviderImpl, error) { reg := prometheus.NewRegistry() - exporter, err := exporters.New(exporters.WithRegisterer(reg)) + exporterOpts := []exporters.Option{exporters.WithRegisterer(reg)} + if clientConfig.WithoutUnits { + exporterOpts = append(exporterOpts, exporters.WithoutUnits()) + } + exporter, err := exporters.New(exporterOpts...) + if err != nil { logger.Error("Failed to initialize prometheus exporter.", tag.Error(err)) return nil, err } var views []sdkmetrics.View - for _, u := range []string{Dimensionless, Bytes, Milliseconds} { + for _, u := range []string{Dimensionless, Bytes, Milliseconds, Seconds} { views = append(views, sdkmetrics.NewView( sdkmetrics.Instrument{ Kind: sdkmetrics.InstrumentKindHistogram, diff --git a/common/metrics/otel_metrics_handler.go b/common/metrics/otel_metrics_handler.go index 3d52064c16c..cbf466883d9 100644 --- a/common/metrics/otel_metrics_handler.go +++ b/common/metrics/otel_metrics_handler.go @@ -39,12 +39,13 @@ import ( // otelMetricsHandler is an adapter around an OpenTelemetry [metric.Meter] that implements the [Handler] interface. type ( otelMetricsHandler struct { - l log.Logger - set attribute.Set - provider OpenTelemetryProvider - excludeTags map[string]map[string]struct{} - catalog catalog - gauges *sync.Map // string -> *gaugeAdapter. note: shared between multiple otelMetricsHandlers + l log.Logger + set attribute.Set + provider OpenTelemetryProvider + excludeTags map[string]map[string]struct{} + catalog catalog + gauges *sync.Map // string -> *gaugeAdapter. note: shared between multiple otelMetricsHandlers + recordTimerInSeconds bool } // This is to work around the lack of synchronous gauge: @@ -84,13 +85,15 @@ func NewOtelMetricsHandler( if err != nil { return nil, fmt.Errorf("failed to build metrics catalog: %w", err) } + return &otelMetricsHandler{ - l: l, - set: makeInitialSet(cfg.Tags), - provider: o, - excludeTags: configExcludeTags(cfg), - catalog: c, - gauges: new(sync.Map), + l: l, + set: makeInitialSet(cfg.Tags), + provider: o, + excludeTags: configExcludeTags(cfg), + catalog: c, + gauges: new(sync.Map), + recordTimerInSeconds: cfg.RecordTimerInSeconds, }, nil } @@ -172,7 +175,14 @@ func (g *gaugeAdapterGauge) Record(v float64, tags ...Tag) { // Timer obtains a timer for the given name. func (omp *otelMetricsHandler) Timer(timer string) TimerIface { - opts := addOptions(omp, histogramOptions{metric.WithUnit(Milliseconds)}, timer) + if omp.recordTimerInSeconds { + return omp.timerInSeconds(timer) + } + return omp.timerInMilliseconds(timer) +} + +func (omp *otelMetricsHandler) timerInMilliseconds(timer string) TimerIface { + opts := addOptions(omp, int64HistogramOptions{metric.WithUnit(Milliseconds)}, timer) c, err := omp.provider.GetMeter().Int64Histogram(timer, opts...) if err != nil { omp.l.Error("error getting metric", tag.NewStringTag("MetricName", timer), tag.Error(err)) @@ -185,9 +195,23 @@ func (omp *otelMetricsHandler) Timer(timer string) TimerIface { }) } +func (omp *otelMetricsHandler) timerInSeconds(timer string) TimerIface { + opts := addOptions(omp, float64HistogramOptions{metric.WithUnit(Seconds)}, timer) + c, err := omp.provider.GetMeter().Float64Histogram(timer, opts...) + if err != nil { + omp.l.Error("error getting metric", tag.NewStringTag("MetricName", timer), tag.Error(err)) + return TimerFunc(func(i time.Duration, t ...Tag) {}) + } + + return TimerFunc(func(i time.Duration, t ...Tag) { + option := metric.WithAttributeSet(omp.makeSet(t)) + c.Record(context.Background(), i.Seconds(), option) + }) +} + // Histogram obtains a histogram for the given name. func (omp *otelMetricsHandler) Histogram(histogram string, unit MetricUnit) HistogramIface { - opts := addOptions(omp, histogramOptions{metric.WithUnit(string(unit))}, histogram) + opts := addOptions(omp, int64HistogramOptions{metric.WithUnit(string(unit))}, histogram) c, err := omp.provider.GetMeter().Int64Histogram(histogram, opts...) if err != nil { omp.l.Error("error getting metric", tag.NewStringTag("MetricName", histogram), tag.Error(err)) diff --git a/common/metrics/otel_metrics_handler_test.go b/common/metrics/otel_metrics_handler_test.go index 7af93588b7f..575a859b905 100644 --- a/common/metrics/otel_metrics_handler_test.go +++ b/common/metrics/otel_metrics_handler_test.go @@ -68,33 +68,33 @@ func TestMeter(t *testing.T) { sdkmetrics.NewView( sdkmetrics.Instrument{ Kind: sdkmetrics.InstrumentKindHistogram, - Unit: "By", + Unit: Bytes, }, sdkmetrics.Stream{ Aggregation: sdkmetrics.AggregationExplicitBucketHistogram{ - Boundaries: defaultConfig.PerUnitHistogramBoundaries["By"], + Boundaries: defaultConfig.PerUnitHistogramBoundaries[Bytes], }, }, ), sdkmetrics.NewView( sdkmetrics.Instrument{ Kind: sdkmetrics.InstrumentKindHistogram, - Unit: "1", + Unit: Dimensionless, }, sdkmetrics.Stream{ Aggregation: sdkmetrics.AggregationExplicitBucketHistogram{ - Boundaries: defaultConfig.PerUnitHistogramBoundaries["1"], + Boundaries: defaultConfig.PerUnitHistogramBoundaries[Dimensionless], }, }, ), sdkmetrics.NewView( sdkmetrics.Instrument{ Kind: sdkmetrics.InstrumentKindHistogram, - Unit: "ms", + Unit: Milliseconds, }, sdkmetrics.Stream{ Aggregation: sdkmetrics.AggregationExplicitBucketHistogram{ - Boundaries: defaultConfig.PerUnitHistogramBoundaries["ms"], + Boundaries: defaultConfig.PerUnitHistogramBoundaries[Milliseconds], }, }, ), @@ -171,7 +171,7 @@ func TestMeter(t *testing.T) { }, Temporality: metricdata.CumulativeTemporality, }, - Unit: "ms", + Unit: Milliseconds, }, { Name: "temp", @@ -200,7 +200,7 @@ func TestMeter(t *testing.T) { }, Temporality: metricdata.CumulativeTemporality, }, - Unit: "By", + Unit: Bytes, }, } if diff := cmp.Diff(want, got.ScopeMetrics[0].Metrics, @@ -223,22 +223,94 @@ func TestMeter(t *testing.T) { } } +func TestMeter_TimerInSeconds(t *testing.T) { + ctx := context.Background() + rdr := sdkmetrics.NewManualReader() + provider := sdkmetrics.NewMeterProvider( + sdkmetrics.WithReader(rdr), + sdkmetrics.WithView( + sdkmetrics.NewView( + sdkmetrics.Instrument{ + Kind: sdkmetrics.InstrumentKindHistogram, + Unit: Seconds, + }, + sdkmetrics.Stream{ + Aggregation: sdkmetrics.AggregationExplicitBucketHistogram{ + Boundaries: defaultConfig.PerUnitHistogramBoundaries[Seconds], + }, + }, + ), + ), + ) + + timerInSecondsConfig := defaultConfig + timerInSecondsConfig.RecordTimerInSeconds = true + p, err := NewOtelMetricsHandler( + log.NewTestLogger(), + &testProvider{meter: provider.Meter("test")}, + timerInSecondsConfig, + ) + require.NoError(t, err) + recordTimer(p) + + var got metricdata.ResourceMetrics + err = rdr.Collect(ctx, &got) + assert.Nil(t, err) + + want := []metricdata.Metrics{ + { + Name: "latency", + Data: metricdata.Histogram[float64]{ + DataPoints: []metricdata.HistogramDataPoint[float64]{ + { + Count: 2, + BucketCounts: []uint64{0, 0, 0, 1, 1, 0}, + Min: metricdata.NewExtrema[float64](float64(minLatency) / 1000), + Max: metricdata.NewExtrema[float64](float64(maxLatency) / 1000), + Sum: (minLatency + maxLatency) / 1000, + Exemplars: []metricdata.Exemplar[float64]{}, + }, + }, + Temporality: metricdata.CumulativeTemporality, + }, + Unit: Seconds, + }, + } + if diff := cmp.Diff(want, got.ScopeMetrics[0].Metrics, + cmp.Comparer(func(e1, e2 metricdata.Extrema[float64]) bool { + v1, ok1 := e1.Value() + v2, ok2 := e2.Value() + return ok1 && ok2 && v1 == v2 + }), + cmp.Comparer(func(a1, a2 attribute.Set) bool { + return a1.Equals(&a2) + }), + cmpopts.IgnoreFields(metricdata.HistogramDataPoint[float64]{}, "StartTime", "Time", "Bounds"), + ); diff != "" { + t.Errorf("mismatch (-want, +got):\n%s", diff) + } +} + func recordMetrics(mp Handler) { hitsCounter := mp.Counter("hits") gauge := mp.Gauge("temp") - - timer := mp.Timer("latency") histogram := mp.Histogram("transmission", Bytes) hitsTaggedCounter := mp.Counter("hits-tagged") hitsTaggedExcludedCounter := mp.Counter("hits-tagged-excluded") hitsCounter.Record(8) gauge.Record(100, StringTag("location", "Mare Imbrium")) - timer.Record(time.Duration(minLatency) * time.Millisecond) - timer.Record(time.Duration(maxLatency) * time.Millisecond) histogram.Record(int64(testBytes)) hitsTaggedCounter.Record(11, UnsafeTaskQueueTag("__sticky__")) hitsTaggedExcludedCounter.Record(14, UnsafeTaskQueueTag("filtered")) + + recordTimer(mp) +} + +func recordTimer(mp Handler) { + timer := mp.Timer("latency") + timer.Record(time.Duration(minLatency) * time.Millisecond) + timer.Record(time.Duration(maxLatency) * time.Millisecond) } type erroneousMeter struct { diff --git a/common/metrics/otel_options.go b/common/metrics/otel_options.go index 41db0cb1236..f7e29683ce6 100644 --- a/common/metrics/otel_options.go +++ b/common/metrics/otel_options.go @@ -38,9 +38,10 @@ type ( optionSet[T any] interface { addOption(option metric.InstrumentOption) T } - counterOptions []metric.Int64CounterOption - gaugeOptions []metric.Float64ObservableGaugeOption - histogramOptions []metric.Int64HistogramOption + counterOptions []metric.Int64CounterOption + gaugeOptions []metric.Float64ObservableGaugeOption + float64HistogramOptions []metric.Float64HistogramOption + int64HistogramOptions []metric.Int64HistogramOption ) func addOptions[T optionSet[T]](omp *otelMetricsHandler, opts T, metricName string) T { @@ -68,6 +69,10 @@ func (opts gaugeOptions) addOption(option metric.InstrumentOption) gaugeOptions return append(opts, option) } -func (opts histogramOptions) addOption(option metric.InstrumentOption) histogramOptions { +func (opts float64HistogramOptions) addOption(option metric.InstrumentOption) float64HistogramOptions { + return append(opts, option) +} + +func (opts int64HistogramOptions) addOption(option metric.InstrumentOption) int64HistogramOptions { return append(opts, option) } diff --git a/common/metrics/otel_options_test.go b/common/metrics/otel_options_test.go index 8b968a60b15..5ddc5eee0f7 100644 --- a/common/metrics/otel_options_test.go +++ b/common/metrics/otel_options_test.go @@ -81,25 +81,30 @@ func TestAddOptions(t *testing.T) { handler := &otelMetricsHandler{catalog: c.catalog} var ( - counter counterOptions - gauge gaugeOptions - hist histogramOptions + counter counterOptions + gauge gaugeOptions + int64hist int64HistogramOptions + float64hist float64HistogramOptions ) for _, opt := range inputOpts { counter = append(counter, opt.(metric.Int64CounterOption)) gauge = append(gauge, opt.(metric.Float64ObservableGaugeOption)) - hist = append(hist, opt.(metric.Int64HistogramOption)) + int64hist = append(int64hist, opt.(metric.Int64HistogramOption)) + float64hist = append(float64hist, opt.(metric.Float64HistogramOption)) } counter = addOptions(handler, counter, metricName) gauge = addOptions(handler, gauge, metricName) - hist = addOptions(handler, hist, metricName) + int64hist = addOptions(handler, int64hist, metricName) + float64hist = addOptions(handler, float64hist, metricName) require.Len(t, counter, len(c.expectedOpts)) require.Len(t, gauge, len(c.expectedOpts)) - require.Len(t, hist, len(c.expectedOpts)) + require.Len(t, int64hist, len(c.expectedOpts)) + require.Len(t, float64hist, len(c.expectedOpts)) for i, opt := range c.expectedOpts { assert.Equal(t, opt, counter[i]) assert.Equal(t, opt, gauge[i]) - assert.Equal(t, opt, hist[i]) + assert.Equal(t, opt, int64hist[i]) + assert.Equal(t, opt, float64hist[i]) } }) } diff --git a/common/metrics/tally_metrics_handler_test.go b/common/metrics/tally_metrics_handler_test.go index 40b374f88c0..92e354171f3 100644 --- a/common/metrics/tally_metrics_handler_test.go +++ b/common/metrics/tally_metrics_handler_test.go @@ -41,8 +41,13 @@ var defaultConfig = ClientConfig{ "activityType": {}, "workflowType": {}, }, - Prefix: "", - PerUnitHistogramBoundaries: map[string][]float64{Dimensionless: {0, 10, 100}, Bytes: {1024, 2048}, Milliseconds: {10, 500, 1000, 5000, 10000}}, + Prefix: "", + PerUnitHistogramBoundaries: map[string][]float64{ + Dimensionless: {0, 10, 100}, + Bytes: {1024, 2048}, + Milliseconds: {10, 500, 1000, 5000, 10000}, + Seconds: {0.01, 0.5, 1, 5, 10}, + }, } func TestTallyScope(t *testing.T) { From b9068baa293815f3552d7c374403cfee4be6e0ef Mon Sep 17 00:00:00 2001 From: Yichao Yang Date: Tue, 4 Feb 2025 17:38:37 -0800 Subject: [PATCH 2/3] without counter suffix --- common/metrics/config.go | 9 ++++++--- common/metrics/opentelemetry_provider.go | 5 ++++- 2 files changed, 10 insertions(+), 4 deletions(-) diff --git a/common/metrics/config.go b/common/metrics/config.go index 5a4b4759e56..95c6b97283a 100644 --- a/common/metrics/config.go +++ b/common/metrics/config.go @@ -74,11 +74,14 @@ type ( PerUnitHistogramBoundaries map[string][]float64 `yaml:"perUnitHistogramBoundaries"` // Following configs are added for backwards compatibility when switching from tally to opentelemetry - // Both configs should be set to true when using opentelemetry framework to have the same behavior as tally. + // All configs should be set to true when using opentelemetry framework to have the same behavior as tally. - // WithoutUnits controls if metric unit should be added to the metric name as a suffix. + // WithoutUnitSuffix controls the additional of unit suffixes to metric names. // This config only takes effect when using opentelemetry framework. - WithoutUnits bool `yaml:"withoutUnits"` + WithoutUnitSuffix bool `yaml:"withoutUnitSuffix"` + // WithoutCounterSuffix controls the additional of _total suffixes to counter metric names. + // This config only takes effect when using opentelemetry framework. + WithoutCounterSuffix bool `yaml:"withoutCounterSuffix"` // RecordTimerInSeconds controls if Timer metric should be emitted as number of seconds // (instead of milliseconds). // This config only takes effect when using opentelemetry framework. diff --git a/common/metrics/opentelemetry_provider.go b/common/metrics/opentelemetry_provider.go index 64d1c3f6593..1ee81fdebe5 100644 --- a/common/metrics/opentelemetry_provider.go +++ b/common/metrics/opentelemetry_provider.go @@ -60,9 +60,12 @@ func NewOpenTelemetryProvider( ) (*openTelemetryProviderImpl, error) { reg := prometheus.NewRegistry() exporterOpts := []exporters.Option{exporters.WithRegisterer(reg)} - if clientConfig.WithoutUnits { + if clientConfig.WithoutUnitSuffix { exporterOpts = append(exporterOpts, exporters.WithoutUnits()) } + if clientConfig.WithoutCounterSuffix { + exporterOpts = append(exporterOpts, exporters.WithoutCounterSuffixes()) + } exporter, err := exporters.New(exporterOpts...) if err != nil { From ff6a253fbb59f1e3e763c43c774a70b4705e6a51 Mon Sep 17 00:00:00 2001 From: Yichao Yang Date: Tue, 4 Feb 2025 17:41:27 -0800 Subject: [PATCH 3/3] fix yaml --- common/metrics/config.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/common/metrics/config.go b/common/metrics/config.go index 95c6b97283a..9834d23d7da 100644 --- a/common/metrics/config.go +++ b/common/metrics/config.go @@ -85,7 +85,7 @@ type ( // RecordTimerInSeconds controls if Timer metric should be emitted as number of seconds // (instead of milliseconds). // This config only takes effect when using opentelemetry framework. - RecordTimerInSeconds bool `yaml:"timerInSeconds"` + RecordTimerInSeconds bool `yaml:"recordTimerInSeconds"` } // StatsdConfig contains the config items for statsd metrics reporter