Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add backward compatibility configs for tally to otel migration #7235

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions common/metrics/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,17 @@ type (
// - "milliseconds"
// - "bytes"
PerUnitHistogramBoundaries map[string][]float64 `yaml:"perUnitHistogramBoundaries"`

// Following configs are added for backwards compatibility when switching from tally to opentelemetry
// Both configs should be set to true when using opentelemetry framework to have the same behavior as tally.

// WithoutUnits controls if metric unit should be added to the metric name as a suffix.
// This config only takes effect when using opentelemetry framework.
WithoutUnits bool `yaml:"withoutUnits"`
// RecordTimerInSeconds controls if Timer metric should be emitted as number of seconds
// (instead of milliseconds).
// This config only takes effect when using opentelemetry framework.
RecordTimerInSeconds bool `yaml:"timerInSeconds"`
}

// StatsdConfig contains the config items for statsd metrics reporter
Expand Down Expand Up @@ -389,6 +400,12 @@ func setDefaultPerUnitHistogramBoundaries(clientConfig *ClientConfig) {
buckets[Bytes] = bucket
}

bucketInSeconds := make([]float64, len(buckets[Milliseconds]))
for idx, boundary := range buckets[Milliseconds] {
bucketInSeconds[idx] = boundary / float64(time.Second/time.Millisecond)
}
buckets[Seconds] = bucketInSeconds

clientConfig.PerUnitHistogramBoundaries = buckets
}

Expand Down
22 changes: 14 additions & 8 deletions common/metrics/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,22 +135,28 @@ func (s *MetricsSuite) TestSetDefaultPerUnitHistogramBoundaries() {
expectResult map[string][]float64
}

customizedBoundaries := map[string][]float64{
Dimensionless: {1},
Milliseconds: defaultPerUnitHistogramBoundaries[Milliseconds],
Bytes: defaultPerUnitHistogramBoundaries[Bytes],
}
testCases := []histogramTest{
{
input: nil,
expectResult: defaultPerUnitHistogramBoundaries,
input: nil,
expectResult: map[string][]float64{
Dimensionless: defaultPerUnitHistogramBoundaries[Dimensionless],
Milliseconds: defaultPerUnitHistogramBoundaries[Milliseconds],
Seconds: {0.001, 0.002, 0.005, 0.010, 0.020, 0.050, 0.100, 0.200, 0.500, 1, 2, 5, 10, 20, 50, 100, 200, 500, 1_000},
Bytes: defaultPerUnitHistogramBoundaries[Bytes],
},
},
{
input: map[string][]float64{
UnitNameDimensionless: {1},
UnitNameMilliseconds: {10, 1000, 2000},
"notDefine": {1},
},
expectResult: customizedBoundaries,
expectResult: map[string][]float64{
Dimensionless: {1},
Milliseconds: {10, 1000, 2000},
Seconds: {0.01, 1, 2},
Bytes: defaultPerUnitHistogramBoundaries[Bytes],
},
},
}

Expand Down
1 change: 1 addition & 0 deletions common/metrics/consts.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,4 +32,5 @@ const (
Dimensionless = "1"
Milliseconds = "ms"
Bytes = "By"
Seconds = "s"
)
2 changes: 1 addition & 1 deletion common/metrics/defs.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ type (
func NewTimerDef(name string, opts ...Option) timerDefinition {
// This line cannot be combined with others!
// This ensures the stack trace has information of the caller.
def := newMetricDefinition(name, append(opts, WithUnit(Milliseconds))...)
def := newMetricDefinition(name, opts...)
globalRegistry.register(def)
return timerDefinition{def}
}
Expand Down
9 changes: 7 additions & 2 deletions common/metrics/opentelemetry_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,14 +59,19 @@ func NewOpenTelemetryProvider(
clientConfig *ClientConfig,
) (*openTelemetryProviderImpl, error) {
reg := prometheus.NewRegistry()
exporter, err := exporters.New(exporters.WithRegisterer(reg))
exporterOpts := []exporters.Option{exporters.WithRegisterer(reg)}
if clientConfig.WithoutUnits {
exporterOpts = append(exporterOpts, exporters.WithoutUnits())
}
exporter, err := exporters.New(exporterOpts...)

if err != nil {
logger.Error("Failed to initialize prometheus exporter.", tag.Error(err))
return nil, err
}

var views []sdkmetrics.View
for _, u := range []string{Dimensionless, Bytes, Milliseconds} {
for _, u := range []string{Dimensionless, Bytes, Milliseconds, Seconds} {
views = append(views, sdkmetrics.NewView(
sdkmetrics.Instrument{
Kind: sdkmetrics.InstrumentKindHistogram,
Expand Down
52 changes: 38 additions & 14 deletions common/metrics/otel_metrics_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,13 @@ import (
// otelMetricsHandler is an adapter around an OpenTelemetry [metric.Meter] that implements the [Handler] interface.
type (
otelMetricsHandler struct {
l log.Logger
set attribute.Set
provider OpenTelemetryProvider
excludeTags map[string]map[string]struct{}
catalog catalog
gauges *sync.Map // string -> *gaugeAdapter. note: shared between multiple otelMetricsHandlers
l log.Logger
set attribute.Set
provider OpenTelemetryProvider
excludeTags map[string]map[string]struct{}
catalog catalog
gauges *sync.Map // string -> *gaugeAdapter. note: shared between multiple otelMetricsHandlers
recordTimerInSeconds bool
}

// This is to work around the lack of synchronous gauge:
Expand Down Expand Up @@ -84,13 +85,15 @@ func NewOtelMetricsHandler(
if err != nil {
return nil, fmt.Errorf("failed to build metrics catalog: %w", err)
}

return &otelMetricsHandler{
l: l,
set: makeInitialSet(cfg.Tags),
provider: o,
excludeTags: configExcludeTags(cfg),
catalog: c,
gauges: new(sync.Map),
l: l,
set: makeInitialSet(cfg.Tags),
provider: o,
excludeTags: configExcludeTags(cfg),
catalog: c,
gauges: new(sync.Map),
recordTimerInSeconds: cfg.RecordTimerInSeconds,
}, nil
}

Expand Down Expand Up @@ -172,7 +175,14 @@ func (g *gaugeAdapterGauge) Record(v float64, tags ...Tag) {

// Timer obtains a timer for the given name.
func (omp *otelMetricsHandler) Timer(timer string) TimerIface {
opts := addOptions(omp, histogramOptions{metric.WithUnit(Milliseconds)}, timer)
if omp.recordTimerInSeconds {
return omp.timerInSeconds(timer)
}
return omp.timerInMilliseconds(timer)
}

func (omp *otelMetricsHandler) timerInMilliseconds(timer string) TimerIface {
opts := addOptions(omp, int64HistogramOptions{metric.WithUnit(Milliseconds)}, timer)
c, err := omp.provider.GetMeter().Int64Histogram(timer, opts...)
if err != nil {
omp.l.Error("error getting metric", tag.NewStringTag("MetricName", timer), tag.Error(err))
Expand All @@ -185,9 +195,23 @@ func (omp *otelMetricsHandler) Timer(timer string) TimerIface {
})
}

func (omp *otelMetricsHandler) timerInSeconds(timer string) TimerIface {
opts := addOptions(omp, float64HistogramOptions{metric.WithUnit(Seconds)}, timer)
c, err := omp.provider.GetMeter().Float64Histogram(timer, opts...)
if err != nil {
omp.l.Error("error getting metric", tag.NewStringTag("MetricName", timer), tag.Error(err))
return TimerFunc(func(i time.Duration, t ...Tag) {})
}

return TimerFunc(func(i time.Duration, t ...Tag) {
option := metric.WithAttributeSet(omp.makeSet(t))
c.Record(context.Background(), i.Seconds(), option)
})
}

// Histogram obtains a histogram for the given name.
func (omp *otelMetricsHandler) Histogram(histogram string, unit MetricUnit) HistogramIface {
opts := addOptions(omp, histogramOptions{metric.WithUnit(string(unit))}, histogram)
opts := addOptions(omp, int64HistogramOptions{metric.WithUnit(string(unit))}, histogram)
c, err := omp.provider.GetMeter().Int64Histogram(histogram, opts...)
if err != nil {
omp.l.Error("error getting metric", tag.NewStringTag("MetricName", histogram), tag.Error(err))
Expand Down
96 changes: 84 additions & 12 deletions common/metrics/otel_metrics_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,33 +68,33 @@ func TestMeter(t *testing.T) {
sdkmetrics.NewView(
sdkmetrics.Instrument{
Kind: sdkmetrics.InstrumentKindHistogram,
Unit: "By",
Unit: Bytes,
},
sdkmetrics.Stream{
Aggregation: sdkmetrics.AggregationExplicitBucketHistogram{
Boundaries: defaultConfig.PerUnitHistogramBoundaries["By"],
Boundaries: defaultConfig.PerUnitHistogramBoundaries[Bytes],
},
},
),
sdkmetrics.NewView(
sdkmetrics.Instrument{
Kind: sdkmetrics.InstrumentKindHistogram,
Unit: "1",
Unit: Dimensionless,
},
sdkmetrics.Stream{
Aggregation: sdkmetrics.AggregationExplicitBucketHistogram{
Boundaries: defaultConfig.PerUnitHistogramBoundaries["1"],
Boundaries: defaultConfig.PerUnitHistogramBoundaries[Dimensionless],
},
},
),
sdkmetrics.NewView(
sdkmetrics.Instrument{
Kind: sdkmetrics.InstrumentKindHistogram,
Unit: "ms",
Unit: Milliseconds,
},
sdkmetrics.Stream{
Aggregation: sdkmetrics.AggregationExplicitBucketHistogram{
Boundaries: defaultConfig.PerUnitHistogramBoundaries["ms"],
Boundaries: defaultConfig.PerUnitHistogramBoundaries[Milliseconds],
},
},
),
Expand Down Expand Up @@ -171,7 +171,7 @@ func TestMeter(t *testing.T) {
},
Temporality: metricdata.CumulativeTemporality,
},
Unit: "ms",
Unit: Milliseconds,
},
{
Name: "temp",
Expand Down Expand Up @@ -200,7 +200,7 @@ func TestMeter(t *testing.T) {
},
Temporality: metricdata.CumulativeTemporality,
},
Unit: "By",
Unit: Bytes,
},
}
if diff := cmp.Diff(want, got.ScopeMetrics[0].Metrics,
Expand All @@ -223,22 +223,94 @@ func TestMeter(t *testing.T) {
}
}

func TestMeter_TimerInSeconds(t *testing.T) {
ctx := context.Background()
rdr := sdkmetrics.NewManualReader()
provider := sdkmetrics.NewMeterProvider(
sdkmetrics.WithReader(rdr),
sdkmetrics.WithView(
sdkmetrics.NewView(
sdkmetrics.Instrument{
Kind: sdkmetrics.InstrumentKindHistogram,
Unit: Seconds,
},
sdkmetrics.Stream{
Aggregation: sdkmetrics.AggregationExplicitBucketHistogram{
Boundaries: defaultConfig.PerUnitHistogramBoundaries[Seconds],
},
},
),
),
)

timerInSecondsConfig := defaultConfig
timerInSecondsConfig.RecordTimerInSeconds = true
p, err := NewOtelMetricsHandler(
log.NewTestLogger(),
&testProvider{meter: provider.Meter("test")},
timerInSecondsConfig,
)
require.NoError(t, err)
recordTimer(p)

var got metricdata.ResourceMetrics
err = rdr.Collect(ctx, &got)
assert.Nil(t, err)

want := []metricdata.Metrics{
{
Name: "latency",
Data: metricdata.Histogram[float64]{
DataPoints: []metricdata.HistogramDataPoint[float64]{
{
Count: 2,
BucketCounts: []uint64{0, 0, 0, 1, 1, 0},
Min: metricdata.NewExtrema[float64](float64(minLatency) / 1000),
Max: metricdata.NewExtrema[float64](float64(maxLatency) / 1000),
Sum: (minLatency + maxLatency) / 1000,
Exemplars: []metricdata.Exemplar[float64]{},
},
},
Temporality: metricdata.CumulativeTemporality,
},
Unit: Seconds,
},
}
if diff := cmp.Diff(want, got.ScopeMetrics[0].Metrics,
cmp.Comparer(func(e1, e2 metricdata.Extrema[float64]) bool {
v1, ok1 := e1.Value()
v2, ok2 := e2.Value()
return ok1 && ok2 && v1 == v2
}),
cmp.Comparer(func(a1, a2 attribute.Set) bool {
return a1.Equals(&a2)
}),
cmpopts.IgnoreFields(metricdata.HistogramDataPoint[float64]{}, "StartTime", "Time", "Bounds"),
); diff != "" {
t.Errorf("mismatch (-want, +got):\n%s", diff)
}
}

func recordMetrics(mp Handler) {
hitsCounter := mp.Counter("hits")
gauge := mp.Gauge("temp")

timer := mp.Timer("latency")
histogram := mp.Histogram("transmission", Bytes)
hitsTaggedCounter := mp.Counter("hits-tagged")
hitsTaggedExcludedCounter := mp.Counter("hits-tagged-excluded")

hitsCounter.Record(8)
gauge.Record(100, StringTag("location", "Mare Imbrium"))
timer.Record(time.Duration(minLatency) * time.Millisecond)
timer.Record(time.Duration(maxLatency) * time.Millisecond)
histogram.Record(int64(testBytes))
hitsTaggedCounter.Record(11, UnsafeTaskQueueTag("__sticky__"))
hitsTaggedExcludedCounter.Record(14, UnsafeTaskQueueTag("filtered"))

recordTimer(mp)
}

func recordTimer(mp Handler) {
timer := mp.Timer("latency")
timer.Record(time.Duration(minLatency) * time.Millisecond)
timer.Record(time.Duration(maxLatency) * time.Millisecond)
}

type erroneousMeter struct {
Expand Down
13 changes: 9 additions & 4 deletions common/metrics/otel_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,10 @@ type (
optionSet[T any] interface {
addOption(option metric.InstrumentOption) T
}
counterOptions []metric.Int64CounterOption
gaugeOptions []metric.Float64ObservableGaugeOption
histogramOptions []metric.Int64HistogramOption
counterOptions []metric.Int64CounterOption
gaugeOptions []metric.Float64ObservableGaugeOption
float64HistogramOptions []metric.Float64HistogramOption
int64HistogramOptions []metric.Int64HistogramOption
)

func addOptions[T optionSet[T]](omp *otelMetricsHandler, opts T, metricName string) T {
Expand Down Expand Up @@ -68,6 +69,10 @@ func (opts gaugeOptions) addOption(option metric.InstrumentOption) gaugeOptions
return append(opts, option)
}

func (opts histogramOptions) addOption(option metric.InstrumentOption) histogramOptions {
func (opts float64HistogramOptions) addOption(option metric.InstrumentOption) float64HistogramOptions {
return append(opts, option)
}

func (opts int64HistogramOptions) addOption(option metric.InstrumentOption) int64HistogramOptions {
return append(opts, option)
}
Loading
Loading