Skip to content
Closed
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
- Add native histogram exemplar support in `go.opentelemetry.io/otel/exporters/prometheus`. (#6772)
- Add experimental self-observability log metrics in `go.opentelemetry.io/otel/sdk/log`.
Check the `go.opentelemetry.io/otel/sdk/log/internal/x` package documentation for more information. (#7121)
- Add experimental self-observability trace exporter metrics in `go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc`. (#7133)

### Changed

Expand Down
89 changes: 88 additions & 1 deletion exporters/otlp/otlptrace/otlptracegrpc/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@ package otlptracegrpc // import "go.opentelemetry.io/otel/exporters/otlp/otlptra
import (
"context"
"errors"
"fmt"
"sync"
"sync/atomic"
"time"

coltracepb "go.opentelemetry.io/proto/otlp/collector/trace/v1"
Expand All @@ -18,10 +20,16 @@ import (
"google.golang.org/grpc/status"

"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc/internal"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc/internal/otlpconfig"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc/internal/retry"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc/internal/x"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/sdk"
semconv "go.opentelemetry.io/otel/semconv/v1.36.0"
"go.opentelemetry.io/otel/semconv/v1.36.0/otelconv"
)

type client struct {
Expand All @@ -45,6 +53,12 @@ type client struct {
conn *grpc.ClientConn
tscMu sync.RWMutex
tsc coltracepb.TraceServiceClient

selfObservabilityEnabled bool
spanInFlightMetric otelconv.SDKExporterSpanInflight
spanExportedMetric otelconv.SDKExporterSpanExported
operationDurationMetric otelconv.SDKExporterOperationDuration
selfObservabilityAttrs []attribute.KeyValue
}

// Compile time check *client implements otlptrace.Client.
Expand Down Expand Up @@ -74,6 +88,8 @@ func newClient(opts ...Option) *client {
c.metadata = metadata.New(cfg.Traces.Headers)
}

c.initSelfObservability()

Comment on lines +95 to +96
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Flattening initSelfObservability seems appropriate. This is the only call site and this function is scoped to setup a new client, which includes telemetry.

return c
}

Expand Down Expand Up @@ -189,10 +205,43 @@ func (c *client) UploadTraces(ctx context.Context, protoSpans []*tracepb.Resourc
ctx, cancel := c.exportContext(ctx)
defer cancel()

return c.requestFunc(ctx, func(iCtx context.Context) error {
return c.requestFunc(ctx, func(iCtx context.Context) (err error) {
if c.selfObservabilityEnabled {
count := 0
for _, ps := range protoSpans {
for _, ss := range ps.ScopeSpans {
count += len(ss.Spans)
}
}
c.spanInFlightMetric.Add(context.Background(), int64(count), c.selfObservabilityAttrs...)

defer func(start time.Time) {
duration := time.Since(start)
durationAttrs := append([]attribute.KeyValue{
c.operationDurationMetric.AttrRPCGRPCStatusCode(otelconv.RPCGRPCStatusCodeAttr(status.Code(err))),
},
c.selfObservabilityAttrs...,
)
exportedAttrs := append([]attribute.KeyValue{}, c.selfObservabilityAttrs...)

if err != nil {
durationAttrs = append(durationAttrs, semconv.ErrorType(err))
exportedAttrs = append(exportedAttrs, semconv.ErrorType(err))
}

c.operationDurationMetric.Record(context.Background(), duration.Seconds(),
durationAttrs...,
)

c.spanExportedMetric.Add(context.Background(), int64(count), exportedAttrs...)
c.spanInFlightMetric.Add(context.Background(), int64(-count), c.selfObservabilityAttrs...)
}(time.Now())
}

resp, err := c.tsc.Export(iCtx, &coltracepb.ExportTraceServiceRequest{
ResourceSpans: protoSpans,
})

if resp != nil && resp.PartialSuccess != nil {
msg := resp.PartialSuccess.GetErrorMessage()
n := resp.PartialSuccess.GetRejectedSpans()
Expand All @@ -201,6 +250,7 @@ func (c *client) UploadTraces(ctx context.Context, protoSpans []*tracepb.Resourc
otel.Handle(err)
}
}

// nil is converted to OK.
if status.Code(err) == codes.OK {
// Success.
Expand Down Expand Up @@ -298,3 +348,40 @@ func (c *client) MarshalLog() any {
Endpoint: c.endpoint,
}
}

func (c *client) initSelfObservability() {
if !x.SelfObservability.Enabled() {
return
}

c.selfObservabilityEnabled = true

c.selfObservabilityAttrs = []attribute.KeyValue{
semconv.OTelComponentName(fmt.Sprintf("%s/%d", otelconv.ComponentTypeOtlpGRPCSpanExporter, nextExporterID())),
semconv.OTelComponentTypeOtlpGRPCSpanExporter,
}

mp := otel.GetMeterProvider()
m := mp.Meter("go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc",
metric.WithInstrumentationVersion(sdk.Version()),
metric.WithSchemaURL(semconv.SchemaURL))
Comment on lines +381 to +383
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
m := mp.Meter("go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc",
metric.WithInstrumentationVersion(sdk.Version()),
metric.WithSchemaURL(semconv.SchemaURL))
m := mp.Meter(
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc",
metric.WithInstrumentationVersion(sdk.Version()),
metric.WithSchemaURL(semconv.SchemaURL),
)


var err error
if c.spanInFlightMetric, err = otelconv.NewSDKExporterSpanInflight(m); err != nil {
otel.Handle(err)
}
if c.spanExportedMetric, err = otelconv.NewSDKExporterSpanExported(m); err != nil {
otel.Handle(err)
}
if c.operationDurationMetric, err = otelconv.NewSDKExporterOperationDuration(m); err != nil {
otel.Handle(err)
}
}

var exporterIDCounter atomic.Int64

// nextExporterID returns a new unique ID for an exporter.
// the starting value is 0, and it increments by 1 for each call.
Comment on lines +445 to +446
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit

Suggested change
// nextExporterID returns a new unique ID for an exporter.
// the starting value is 0, and it increments by 1 for each call.
// nextExporterID returns a monotonically increasing int64 starting at 0

func nextExporterID() int64 {
return exporterIDCounter.Add(1) - 1
}
222 changes: 222 additions & 0 deletions exporters/otlp/otlptrace/otlptracegrpc/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,15 @@ import (
"go.opentelemetry.io/otel/exporters/otlp/otlptrace"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc/internal/otlptracetest"
"go.opentelemetry.io/otel/sdk"
"go.opentelemetry.io/otel/sdk/instrumentation"
"go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
"go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
"go.opentelemetry.io/otel/sdk/trace/tracetest"
semconv "go.opentelemetry.io/otel/semconv/v1.36.0"
"go.opentelemetry.io/otel/semconv/v1.36.0/otelconv"
)

func TestMain(m *testing.M) {
Expand Down Expand Up @@ -433,3 +440,218 @@ func TestCustomUserAgent(t *testing.T) {
headers := mc.getHeaders()
require.Contains(t, headers.Get("user-agent")[0], customUserAgent)
}

func TestSelfObservability(t *testing.T) {
defaultCallExportSpans := func(t *testing.T, exporter *otlptrace.Exporter) {
require.NoError(t, exporter.ExportSpans(context.Background(), tracetest.SpanStubs{
{Name: "/foo"},
{Name: "/bar"},
}.Snapshots()))
}

tests := []struct {
name string
enabled bool
callExportSpans func(t *testing.T, exporter *otlptrace.Exporter)
assertMetrics func(t *testing.T, rm metricdata.ResourceMetrics)
}{
{
name: "Disabled",
enabled: false,
callExportSpans: defaultCallExportSpans,
assertMetrics: func(t *testing.T, rm metricdata.ResourceMetrics) {
assert.Empty(t, rm.ScopeMetrics)
},
},
{
name: "Enabled",
enabled: true,
callExportSpans: defaultCallExportSpans,
assertMetrics: func(t *testing.T, rm metricdata.ResourceMetrics) {
t.Helper()
require.Len(t, rm.ScopeMetrics, 1)

sm := rm.ScopeMetrics[0]
require.Len(t, sm.Metrics, 3)

assert.Equal(t, instrumentation.Scope{
Name: "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc",
Version: sdk.Version(),
SchemaURL: semconv.SchemaURL,
}, sm.Scope)

metricdatatest.AssertEqual(t, metricdata.Metrics{
Name: otelconv.SDKExporterSpanInflight{}.Name(),
Description: otelconv.SDKExporterSpanInflight{}.Description(),
Unit: otelconv.SDKExporterSpanInflight{}.Unit(),
Data: metricdata.Sum[int64]{
Temporality: metricdata.CumulativeTemporality,
DataPoints: []metricdata.DataPoint[int64]{
{
Attributes: attribute.NewSet(
semconv.OTelComponentName("otlp_grpc_span_exporter/0"),
semconv.OTelComponentTypeKey.String("otlp_grpc_span_exporter"),
),
Value: 0,
},
},
},
}, sm.Metrics[0], metricdatatest.IgnoreTimestamp())

metricdatatest.AssertEqual(t, metricdata.Metrics{
Name: otelconv.SDKExporterSpanExported{}.Name(),
Description: otelconv.SDKExporterSpanExported{}.Description(),
Unit: otelconv.SDKExporterSpanExported{}.Unit(),
Data: metricdata.Sum[int64]{
Temporality: metricdata.CumulativeTemporality,
IsMonotonic: true,
DataPoints: []metricdata.DataPoint[int64]{
{
Attributes: attribute.NewSet(
semconv.OTelComponentName("otlp_grpc_span_exporter/0"),
semconv.OTelComponentTypeKey.String("otlp_grpc_span_exporter"),
),
Value: 2,
},
},
},
}, sm.Metrics[1], metricdatatest.IgnoreTimestamp())

metricdatatest.AssertEqual(t, metricdata.Metrics{
Name: otelconv.SDKExporterOperationDuration{}.Name(),
Description: otelconv.SDKExporterOperationDuration{}.Description(),
Unit: otelconv.SDKExporterOperationDuration{}.Unit(),
Data: metricdata.Histogram[float64]{
Temporality: metricdata.CumulativeTemporality,
DataPoints: []metricdata.HistogramDataPoint[float64]{
{
Attributes: attribute.NewSet(
semconv.OTelComponentName("otlp_grpc_span_exporter/0"),
semconv.OTelComponentTypeKey.String("otlp_grpc_span_exporter"),
semconv.RPCGRPCStatusCodeOk,
),
},
},
},
}, sm.Metrics[2], metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreValue())
},
},
{
name: "Enabled, but ExportSpans returns error",
enabled: true,
callExportSpans: func(t *testing.T, exporter *otlptrace.Exporter) {
t.Helper()
ctx, cancel := context.WithCancel(context.Background())
cancel()

err := exporter.ExportSpans(ctx, tracetest.SpanStubs{
{Name: "/foo"},
{Name: "/bar"},
}.Snapshots())
require.Error(t, err)
},
assertMetrics: func(t *testing.T, rm metricdata.ResourceMetrics) {
t.Helper()
require.Len(t, rm.ScopeMetrics, 1)

sm := rm.ScopeMetrics[0]
require.Len(t, sm.Metrics, 3)

assert.Equal(t, instrumentation.Scope{
Name: "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc",
Version: sdk.Version(),
SchemaURL: semconv.SchemaURL,
}, sm.Scope)

metricdatatest.AssertEqual(t, metricdata.Metrics{
Name: otelconv.SDKExporterSpanInflight{}.Name(),
Description: otelconv.SDKExporterSpanInflight{}.Description(),
Unit: otelconv.SDKExporterSpanInflight{}.Unit(),
Data: metricdata.Sum[int64]{
Temporality: metricdata.CumulativeTemporality,
DataPoints: []metricdata.DataPoint[int64]{
{
Attributes: attribute.NewSet(
semconv.OTelComponentName("otlp_grpc_span_exporter/1"),
semconv.OTelComponentTypeKey.String("otlp_grpc_span_exporter"),
),
Value: 0,
},
},
},
}, sm.Metrics[0], metricdatatest.IgnoreTimestamp())

metricdatatest.AssertEqual(t, metricdata.Metrics{
Name: otelconv.SDKExporterSpanExported{}.Name(),
Description: otelconv.SDKExporterSpanExported{}.Description(),
Unit: otelconv.SDKExporterSpanExported{}.Unit(),
Data: metricdata.Sum[int64]{
Temporality: metricdata.CumulativeTemporality,
IsMonotonic: true,
DataPoints: []metricdata.DataPoint[int64]{
{
Attributes: attribute.NewSet(
semconv.OTelComponentName("otlp_grpc_span_exporter/1"),
semconv.OTelComponentTypeKey.String("otlp_grpc_span_exporter"),
semconv.ErrorType(status.Error(codes.Canceled, "")),
),
Value: 2,
},
},
},
}, sm.Metrics[1], metricdatatest.IgnoreTimestamp())

metricdatatest.AssertEqual(t, metricdata.Metrics{
Name: otelconv.SDKExporterOperationDuration{}.Name(),
Description: otelconv.SDKExporterOperationDuration{}.Description(),
Unit: otelconv.SDKExporterOperationDuration{}.Unit(),
Data: metricdata.Histogram[float64]{
Temporality: metricdata.CumulativeTemporality,
DataPoints: []metricdata.HistogramDataPoint[float64]{
{
Attributes: attribute.NewSet(
semconv.OTelComponentName("otlp_grpc_span_exporter/1"),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This relies on test execution order. It is brittle and will break when test are run in parallel or new cases are added. The generator needs to be reset per test case or this needs to not be evaluated as strictly.

semconv.OTelComponentTypeKey.String("otlp_grpc_span_exporter"),
semconv.ErrorType(status.Error(codes.Canceled, "")),
semconv.RPCGRPCStatusCodeCancelled,
),
},
},
},
}, sm.Metrics[2], metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreValue())
},
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if tt.enabled {
t.Setenv("OTEL_GO_X_SELF_OBSERVABILITY", "true")
}
Comment on lines +655 to +657
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are two test cases and this conditional splits them. They should be made into their own tests to just remove the complexity being added to accommodate everything here.


original := otel.GetMeterProvider()
defer otel.SetMeterProvider(original)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
defer otel.SetMeterProvider(original)
t.Cleanup(func() { otel.SetMeterProvider(original) })


r := metric.NewManualReader()
mp := metric.NewMeterProvider(metric.WithReader(r))
otel.SetMeterProvider(mp)

mc := runMockCollector(t)
t.Cleanup(func() { require.NoError(t, mc.stop()) })
exp := newGRPCExporter(t, context.Background(), mc.endpoint)
t.Cleanup(func() { require.NoError(t, exp.Shutdown(context.Background())) })

// exporter, err := stdouttrace.New(
// stdouttrace.WithWriter(io.Discard))

// require.NoError(t, err)

tt.callExportSpans(t, exp)

var rm metricdata.ResourceMetrics
require.NoError(t, r.Collect(context.Background(), &rm))

tt.assertMetrics(t, rm)
})
}
}
Loading
Loading