Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .lycheeignore
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
http://localhost
https://localhost
http://jaeger-collector
https://github.com/open-telemetry/opentelemetry-go/milestone/
https://github.com/open-telemetry/opentelemetry-go/projects
Expand Down
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ The next release will require at least [Go 1.24].
- Add experimental self-observability trace exporter metrics in `go.opentelemetry.io/otel/exporters/stdout/stdouttrace`.
Check the `go.opentelemetry.io/otel/exporters/stdout/stdouttrace/internal/x` package documentation for more information. (#7133)
- Support testing of [Go 1.25]. (#7187)
- Add experimental self-observability metrics in `go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc` (#7120)

### Changed

Expand Down
3 changes: 3 additions & 0 deletions exporters/otlp/otlpmetric/otlpmetricgrpc/doc.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,9 @@ default aggregation to use for histogram instruments. Supported values:

The configuration can be overridden by [WithAggregationSelector] option.

See [go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc/internal/x] for information about
the experimental features.

[W3C Baggage HTTP Header Content Format]: https://www.w3.org/TR/baggage/#header-content
[Explicit Bucket Histogram Aggregation]: https://github.com/open-telemetry/opentelemetry-specification/blob/v1.26.0/specification/metrics/sdk.md#explicit-bucket-histogram-aggregation
[Base2 Exponential Bucket Histogram Aggregation]: https://github.com/open-telemetry/opentelemetry-specification/blob/v1.26.0/specification/metrics/sdk.md#base2-exponential-bucket-histogram-aggregation
Expand Down
23 changes: 21 additions & 2 deletions exporters/otlp/otlpmetric/otlpmetricgrpc/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,12 @@ import (
metricpb "go.opentelemetry.io/proto/otlp/metrics/v1"

"go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc/internal/oconf"
"go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc/internal/selfobservability"
"go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc/internal/transform"
"go.opentelemetry.io/otel/internal/global"
"go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
"go.opentelemetry.io/otel/semconv/v1.36.0/otelconv"
)

// Exporter is a OpenTelemetry metric Exporter using gRPC.
Expand All @@ -31,6 +33,9 @@ type Exporter struct {
aggregationSelector metric.AggregationSelector

shutdownOnce sync.Once

// Self-observability metrics
metrics *selfobservability.ExporterMetrics
}

func newExporter(c *client, cfg oconf.Config) (*Exporter, error) {
Expand All @@ -46,11 +51,20 @@ func newExporter(c *client, cfg oconf.Config) (*Exporter, error) {
as = metric.DefaultAggregationSelector
}

// Extract server address and port from endpoint for self-observability
serverAddress, serverPort := selfobservability.ParseEndpoint(cfg.Metrics.Endpoint)

return &Exporter{
client: c,

temporalitySelector: ts,
aggregationSelector: as,

metrics: selfobservability.NewExporterMetrics(
string(otelconv.ComponentTypeOtlpGRPCMetricExporter),
serverAddress,
serverPort,
),
}, nil
}

Expand All @@ -68,19 +82,24 @@ func (e *Exporter) Aggregation(k metric.InstrumentKind) metric.Aggregation {
//
// This method returns an error if called after Shutdown.
// This method returns an error if the method is canceled by the passed context.
func (e *Exporter) Export(ctx context.Context, rm *metricdata.ResourceMetrics) error {
func (e *Exporter) Export(ctx context.Context, rm *metricdata.ResourceMetrics) (finalErr error) {
defer global.Debug("OTLP/gRPC exporter export", "Data", rm)

// Track export operation for self-observability
finishTracking := e.metrics.TrackExport(ctx, rm)
defer func() { finishTracking(finalErr) }()

otlpRm, err := transform.ResourceMetrics(rm)
// Best effort upload of transformable metrics.
e.clientMu.Lock()
upErr := e.client.UploadMetrics(ctx, otlpRm)
e.clientMu.Unlock()

// Return the appropriate error
if upErr != nil {
if err == nil {
return fmt.Errorf("failed to upload metrics: %w", upErr)
}
// Merge the two errors.
return fmt.Errorf("failed to upload incomplete metrics (%w): %w", err, upErr)
}
return err
Expand Down
4 changes: 2 additions & 2 deletions exporters/otlp/otlpmetric/otlpmetricgrpc/exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,8 @@ func TestExporterClientConcurrentSafe(t *testing.T) {
}

someWork.Wait()
assert.NoError(t, exp.Shutdown(ctx))
assert.ErrorIs(t, exp.Shutdown(ctx), errShutdown)
require.NoError(t, exp.Shutdown(ctx))
require.ErrorIs(t, exp.Shutdown(ctx), errShutdown)

close(done)
wg.Wait()
Expand Down
2 changes: 1 addition & 1 deletion exporters/otlp/otlpmetric/otlpmetricgrpc/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ require (
github.com/google/go-cmp v0.7.0
github.com/stretchr/testify v1.10.0
go.opentelemetry.io/otel v1.37.0
go.opentelemetry.io/otel/metric v1.37.0
go.opentelemetry.io/otel/sdk v1.37.0
go.opentelemetry.io/otel/sdk/metric v1.37.0
go.opentelemetry.io/proto/otlp v1.7.1
Expand All @@ -25,7 +26,6 @@ require (
github.com/grpc-ecosystem/grpc-gateway/v2 v2.27.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
go.opentelemetry.io/auto/sdk v1.1.0 // indirect
go.opentelemetry.io/otel/metric v1.37.0 // indirect
go.opentelemetry.io/otel/trace v1.37.0 // indirect
golang.org/x/net v0.43.0 // indirect
golang.org/x/sys v0.35.0 // indirect
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,188 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

// Package selfobservability provides self-observability metrics for OTLP metric exporters.
// This is an experimental feature controlled by the x.SelfObservability feature flag.
package selfobservability // import "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc/internal/selfobservability"

import (
"context"
"fmt"
"net/url"
"strconv"
"strings"
"sync/atomic"
"time"

"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/sdk"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
semconv "go.opentelemetry.io/otel/semconv/v1.36.0"
"go.opentelemetry.io/otel/semconv/v1.36.0/otelconv"

"go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc/internal/x"
)

// exporterIDCounter is used to generate unique component names for exporters.
var exporterIDCounter atomic.Uint64

// nextExporterID returns the next unique exporter ID.
func nextExporterID() uint64 {
return exporterIDCounter.Add(1) - 1
}

// ExporterMetrics holds the self-observability metric instruments for an OTLP metric exporter.
type ExporterMetrics struct {
exported otelconv.SDKExporterMetricDataPointExported
inflight otelconv.SDKExporterMetricDataPointInflight
duration otelconv.SDKExporterOperationDuration
attrs []attribute.KeyValue
enabled bool
}

// NewExporterMetrics creates a new ExporterMetrics instance.
// If self-observability is disabled, returns a no-op instance.
func NewExporterMetrics(componentType, serverAddress string, serverPort int) *ExporterMetrics {
em := &ExporterMetrics{
enabled: x.SelfObservability.Enabled(),
}

if !em.enabled {
return em
}

meter := otel.GetMeterProvider().Meter(
"go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc",
metric.WithInstrumentationVersion(sdk.Version()),
metric.WithSchemaURL(semconv.SchemaURL),
)

var err error
em.exported, err = otelconv.NewSDKExporterMetricDataPointExported(meter)
if err != nil {
em.enabled = false
return em
}

em.inflight, err = otelconv.NewSDKExporterMetricDataPointInflight(meter)
if err != nil {
em.enabled = false
return em
}

em.duration, err = otelconv.NewSDKExporterOperationDuration(meter)
if err != nil {
em.enabled = false
return em
}

// Set up common attributes
componentName := fmt.Sprintf("%s/%d", componentType, nextExporterID())
em.attrs = []attribute.KeyValue{
semconv.OTelComponentTypeKey.String(componentType),
semconv.OTelComponentName(componentName),
semconv.ServerAddress(serverAddress),
semconv.ServerPort(serverPort),
}

return em
}

// TrackExport tracks an export operation and returns a function to complete the tracking.
// The returned function should be called when the export operation completes.
func (em *ExporterMetrics) TrackExport(ctx context.Context, rm *metricdata.ResourceMetrics) func(error) {
if !em.enabled {
return func(error) {}
}

dataPointCount := countDataPoints(rm)
startTime := time.Now()

em.inflight.Add(ctx, dataPointCount, em.attrs...)

return func(err error) {
em.inflight.Add(ctx, -dataPointCount, em.attrs...)

duration := time.Since(startTime).Seconds()

attrs := make([]attribute.KeyValue, len(em.attrs), len(em.attrs)+1)
copy(attrs, em.attrs)
if err != nil {
attrs = append(attrs, semconv.ErrorTypeOther)
}
em.duration.Inst().Record(ctx, duration, metric.WithAttributes(attrs...))

if err == nil {
em.exported.Add(ctx, dataPointCount, em.attrs...)
}
}
}

// countDataPoints counts the total number of data points in a ResourceMetrics.
func countDataPoints(rm *metricdata.ResourceMetrics) int64 {
if rm == nil {
return 0
}

var total int64
for _, sm := range rm.ScopeMetrics {
for _, m := range sm.Metrics {
switch data := m.Data.(type) {
case metricdata.Gauge[int64]:
total += int64(len(data.DataPoints))
case metricdata.Gauge[float64]:
total += int64(len(data.DataPoints))
case metricdata.Sum[int64]:
total += int64(len(data.DataPoints))
case metricdata.Sum[float64]:
total += int64(len(data.DataPoints))
case metricdata.Histogram[int64]:
total += int64(len(data.DataPoints))
case metricdata.Histogram[float64]:
total += int64(len(data.DataPoints))
case metricdata.ExponentialHistogram[int64]:
total += int64(len(data.DataPoints))
case metricdata.ExponentialHistogram[float64]:
total += int64(len(data.DataPoints))
case metricdata.Summary:
total += int64(len(data.DataPoints))
}
}
}
return total
}

// ParseEndpoint extracts server address and port from an endpoint URL.
// Returns defaults if parsing fails or endpoint is empty.
func ParseEndpoint(endpoint string) (address string, port int) {
address = "localhost"
port = 4317

if endpoint == "" {
return
}

// Handle endpoint without scheme
if !strings.Contains(endpoint, "://") {
endpoint = "http://" + endpoint
}

u, err := url.Parse(endpoint)
if err != nil {
return
}

if u.Hostname() != "" {
address = u.Hostname()
}

if u.Port() != "" {
if p, err := strconv.Atoi(u.Port()); err == nil {
port = p
}
}

return
}
Loading
Loading