Skip to content

Commit 576cb11

Browse files
committed
Add OTLP for traces
Signed-off-by: Pavol Loffay <[email protected]>
1 parent bac4016 commit 576cb11

File tree

6 files changed

+281
-115
lines changed

6 files changed

+281
-115
lines changed

api/traces/v1/http.go

+34-1
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,7 @@ func (n nopInstrumentHandler) NewHandler(labels prometheus.Labels, handler http.
109109
// The web UI handler is able to rewrite
110110
// HTML to change the <base> attribute so that it works with the Observatorium-style
111111
// "/api/v1/traces/{tenant}/" URLs.
112-
func NewV2Handler(read *url.URL, readTemplate string, tempo *url.URL, upstreamCA []byte, upstreamCert *stdtls.Certificate, opts ...HandlerOption) http.Handler {
112+
func NewV2Handler(read *url.URL, readTemplate string, tempo *url.URL, writeOTLPHttp *url.URL, upstreamCA []byte, upstreamCert *stdtls.Certificate, opts ...HandlerOption) http.Handler {
113113

114114
if read == nil && readTemplate == "" && tempo == nil {
115115
panic("missing Jaeger read url")
@@ -191,6 +191,39 @@ func NewV2Handler(read *url.URL, readTemplate string, tempo *url.URL, upstreamCA
191191
})
192192
}
193193

194+
if writeOTLPHttp != nil {
195+
middlewares := proxy.Middlewares(
196+
proxy.MiddlewareSetUpstream(writeOTLPHttp),
197+
proxy.MiddlewareSetPrefixHeader(),
198+
proxy.MiddlewareLogger(c.logger),
199+
middlewareMetrics,
200+
)
201+
202+
t := &http.Transport{
203+
DialContext: (&net.Dialer{
204+
Timeout: dialTimeout,
205+
}).DialContext,
206+
TLSClientConfig: tls.NewClientConfig(upstreamCA, upstreamCert),
207+
}
208+
209+
proxyOTLP := &httputil.ReverseProxy{
210+
Director: middlewares,
211+
ErrorLog: proxy.Logger(c.logger),
212+
Transport: otelhttp.NewTransport(t),
213+
214+
// This is a key piece, it changes <base href=> tags on text/html content
215+
ModifyResponse: jaegerUIResponseModifier,
216+
}
217+
218+
r.Group(func(r chi.Router) {
219+
r.Use(c.tempoMiddlewares...)
220+
r.Post("/v1/traces", c.instrument.NewHandler(
221+
prometheus.Labels{"group": "tracesotlphttpv1api", "handler": "traces"},
222+
proxyOTLP))
223+
})
224+
225+
}
226+
194227
// if tempo upstream is enabled, configure proxy and route
195228
if tempo != nil {
196229
level.Debug(c.logger).Log("msg", "Configuring upstream Tempo", "queryv2", tempo)

main.go

+32-17
Original file line numberDiff line numberDiff line change
@@ -178,14 +178,15 @@ type tracesConfig struct {
178178
// readTemplateEndpoint is of the form "http://jaeger-{tenant}-query:16686".
179179
readTemplateEndpoint string
180180

181-
readEndpoint *url.URL
182-
writeEndpoint string
183-
tempoEndpoint *url.URL
184-
upstreamWriteTimeout time.Duration
185-
upstreamCAFile string
186-
upstreamCertFile string
187-
upstreamKeyFile string
188-
tenantHeader string
181+
readEndpoint *url.URL
182+
writeOTLPGRPCEndpoint string
183+
writeOTLPHTTPEndpoint *url.URL
184+
tempoEndpoint *url.URL
185+
upstreamWriteTimeout time.Duration
186+
upstreamCAFile string
187+
upstreamCertFile string
188+
upstreamKeyFile string
189+
tenantHeader string
189190
// enable traces if readTemplateEndpoint, readEndpoint, or writeEndpoint is provided.
190191
enabled bool
191192
}
@@ -802,6 +803,7 @@ func main() {
802803
cfg.traces.readEndpoint,
803804
cfg.traces.readTemplateEndpoint,
804805
cfg.traces.tempoEndpoint,
806+
cfg.traces.writeOTLPHTTPEndpoint,
805807
tracesUpstreamCACert,
806808
tracesUpstreamClientCert,
807809
tracesv1.Logger(logger),
@@ -1052,7 +1054,8 @@ func parseFlags() (config, error) {
10521054
rawLogsAuthExtractSelectors string
10531055
rawTracesReadEndpoint string
10541056
rawTracesTempoEndpoint string
1055-
rawTracesWriteEndpoint string
1057+
rawTracesWriteOTLPGRPCEndpoint string
1058+
rawTracesWriteOTLPHTTPEndpoint string
10561059
rawTracingEndpointType string
10571060
)
10581061

@@ -1142,8 +1145,10 @@ func parseFlags() (config, error) {
11421145
"The endpoint against which to make HTTP read requests for traces using traceQL (tempo API).")
11431146
flag.StringVar(&cfg.traces.readTemplateEndpoint, "experimental.traces.read.endpoint-template", "",
11441147
"A template replacing --read.traces.endpoint, such as http://jaeger-{tenant}-query:16686")
1145-
flag.StringVar(&rawTracesWriteEndpoint, "traces.write.endpoint", "",
1146-
"The endpoint against which to make gRPC write requests for traces.")
1148+
flag.StringVar(&rawTracesWriteOTLPGRPCEndpoint, "traces.write.otlpgrpc.endpoint", "",
1149+
"The endpoint against which to make OTLP gRPC write requests for traces.")
1150+
flag.StringVar(&rawTracesWriteOTLPHTTPEndpoint, "traces.write.otlphttp.endpoint", "",
1151+
"The endpoint against which to make OTLP HTTP write requests for traces.")
11471152
flag.DurationVar(&cfg.traces.upstreamWriteTimeout, "traces.write-timeout", tracesMiddlewareTimeout,
11481153
"The HTTP write timeout for proxied requests to the traces endpoint.")
11491154
flag.StringVar(&cfg.traces.upstreamCAFile, "traces.tls.ca-file", "",
@@ -1344,19 +1349,29 @@ func parseFlags() (config, error) {
13441349
cfg.traces.tempoEndpoint = tracesTempoEndpoint
13451350
}
13461351

1347-
if rawTracesWriteEndpoint != "" {
1352+
if rawTracesWriteOTLPGRPCEndpoint != "" {
1353+
cfg.traces.enabled = true
1354+
1355+
_, _, err := net.SplitHostPort(rawTracesWriteOTLPGRPCEndpoint)
1356+
if err != nil {
1357+
return cfg, fmt.Errorf("--traces.write.otlpgrpc.endpoint %q is invalid: %w", rawTracesWriteOTLPGRPCEndpoint, err)
1358+
}
1359+
1360+
cfg.traces.writeOTLPGRPCEndpoint = rawTracesWriteOTLPGRPCEndpoint
1361+
}
1362+
if rawTracesWriteOTLPHTTPEndpoint != "" {
13481363
cfg.traces.enabled = true
13491364

1350-
_, _, err := net.SplitHostPort(rawTracesWriteEndpoint)
1365+
tracesOTLPHTTPEndpoint, err := url.ParseRequestURI(rawTracesWriteOTLPHTTPEndpoint)
13511366
if err != nil {
1352-
return cfg, fmt.Errorf("--traces.write.endpoint %q is invalid: %w", rawTracesWriteEndpoint, err)
1367+
return cfg, fmt.Errorf("--traces.write.otlphttp.endpoint %q is invalid: %w", rawTracesWriteOTLPHTTPEndpoint, err)
13531368
}
13541369

1355-
cfg.traces.writeEndpoint = rawTracesWriteEndpoint
1370+
cfg.traces.writeOTLPHTTPEndpoint = tracesOTLPHTTPEndpoint
13561371
}
13571372

13581373
if cfg.traces.enabled && cfg.server.grpcListen == "" {
1359-
return cfg, fmt.Errorf("-traces.write.endpoint is set to %q but -grpc.listen is not set", cfg.traces.writeEndpoint)
1374+
return cfg, fmt.Errorf("-traces.write.endpoint is set to %q but -grpc.listen is not set", cfg.traces.writeOTLPGRPCEndpoint)
13601375
}
13611376

13621377
if !cfg.traces.enabled && cfg.server.grpcListen != "" {
@@ -1446,7 +1461,7 @@ func newGRPCServer(cfg *config, tenantHeader string, tenantIDs map[string]string
14461461
authorizers map[string]rbac.Authorizer, logger log.Logger, tracesUpstreamCA []byte, tracesUpstreamCert *stdtls.Certificate,
14471462
) (*grpc.Server, error) {
14481463
connOtel, err := tracesv1.NewOTelConnection(
1449-
cfg.traces.writeEndpoint,
1464+
cfg.traces.writeOTLPGRPCEndpoint,
14501465
tracesv1.WithLogger(logger),
14511466
tracesv1.WithUpstreamTLS(tracesUpstreamCA, tracesUpstreamCert),
14521467
)

test/e2e/configs.go

+13-10
Original file line numberDiff line numberDiff line change
@@ -228,15 +228,17 @@ func createRulesYAML(
228228

229229
const otelConfigTpl = `
230230
receivers:
231-
otlp/grpc:
231+
otlp:
232232
protocols:
233233
grpc:
234234
endpoint: "0.0.0.0:4317"
235+
http:
236+
endpoint: "0.0.0.0:4318"
235237
236238
exporters:
237-
logging:
238-
logLevel: debug
239-
jaeger:
239+
debug:
240+
verbosity: detailed
241+
otlp:
240242
endpoint: %[1]s
241243
tls:
242244
insecure: true
@@ -250,8 +252,8 @@ service:
250252
251253
pipelines:
252254
traces/grpc:
253-
receivers: [otlp/grpc]
254-
exporters: [logging,jaeger]
255+
receivers: [otlp]
256+
exporters: [debug,otlp]
255257
`
256258

257259
// createOtelCollectorConfigYAML() creates YAML for an Open Telemetry collector inside the Observatorium API boundary.
@@ -288,8 +290,8 @@ receivers:
288290
endpoint: 0.0.0.0:4317
289291
290292
exporters:
291-
logging:
292-
logLevel: debug
293+
debug:
294+
verbosity: detailed
293295
otlp:
294296
endpoint: %[1]s
295297
# auth:
@@ -310,12 +312,13 @@ service:
310312
extensions: [health_check]
311313
telemetry:
312314
metrics:
313-
address: localhost:8889
315+
address: :8888
316+
level: detailed
314317
# extensions: [oauth2client]
315318
pipelines:
316319
traces:
317320
receivers: [otlp]
318-
exporters: [logging,otlp]
321+
exporters: [debug,otlp]
319322
`
320323

321324
// createOtelForwardingCollectorConfigYAML() creates YAML for an Open Telemetry collector outside the

test/e2e/interactive_test.go

+3-2
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ func TestInteractiveSetup(t *testing.T) {
2323
readEndpoint, writeEndpoint, readExtEndpoint := startServicesForMetrics(t, e)
2424
logsEndpoint, logsExtEndpoint := startServicesForLogs(t, e)
2525
rulesEndpoint := startServicesForRules(t, e)
26-
internalOtlpEndpoint, httpExternalQueryEndpoint, httpInternalQueryEndpoint := startServicesForTraces(t, e)
26+
internalOTLPGRPCEndpoint, internalOTLPHTTPEndpoint, httpExternalQueryEndpoint, httpInternalQueryEndpoint := startServicesForTraces(t, e)
2727

2828
api, err := newObservatoriumAPIService(
2929
e,
@@ -32,7 +32,8 @@ func TestInteractiveSetup(t *testing.T) {
3232
withRulesEndpoint("http://"+rulesEndpoint),
3333
withRateLimiter(rateLimiterAddr),
3434
withGRPCListenEndpoint(":8317"),
35-
withOtelTraceEndpoint(internalOtlpEndpoint),
35+
withOTLPGRPCTraceEndpoint(internalOTLPGRPCEndpoint),
36+
withOTLPHTTPTraceEndpoint(internalOTLPHTTPEndpoint),
3637
withJaegerEndpoint("http://"+httpInternalQueryEndpoint),
3738
)
3839
testutil.Ok(t, err)

test/e2e/services.go

+32-27
Original file line numberDiff line numberDiff line change
@@ -26,15 +26,10 @@ const (
2626
upImage = "quay.io/observatorium/up:master-2022-10-27-d8bb06f"
2727
alertmanagerImage = "quay.io/prometheus/alertmanager:v0.25.0"
2828

29-
jaegerAllInOneImage = "jaegertracing/all-in-one:1.31"
30-
otelCollectorImage = "otel/opentelemetry-collector:0.45.0"
29+
jaegerAllInOneImage = "jaegertracing/all-in-one:1.57.0"
30+
otelCollectorImage = "ghcr.io/open-telemetry/opentelemetry-collector-releases/opentelemetry-collector-contrib:0.101.0"
3131
tempoImage = "grafana/tempo:2.2.4"
3232

33-
// Note that if the forwarding collector uses OIDC flow instead of hard-coding
34-
// the bearer token we would need
35-
// "otel/opentelemetry-collector-contrib:0.45.0" instead.
36-
otelFwdCollectorImage = "otel/opentelemetry-collector:0.45.0"
37-
3833
dexImage = "dexidp/dex:v2.30.0"
3934
opaImage = "openpolicyagent/opa:0.47.4-static"
4035
gubernatorImage = "ghcr.io/mailgun/gubernator:v2.0.0-rc.36"
@@ -96,16 +91,16 @@ func startServicesForLogs(t *testing.T, e e2e.Environment) (
9691
return loki.InternalEndpoint("http"), loki.Endpoint("http")
9792
}
9893

99-
func startServicesForTraces(t *testing.T, e e2e.Environment) (otlpGRPCEndpoint, jaegerExternalHttpEndpoint, jaegerInternalHttpEndpoint string) {
94+
func startServicesForTraces(t *testing.T, e e2e.Environment) (otlpGRPCEndpoint, otlpHTTPEndpoint, jaegerExternalHttpEndpoint, jaegerInternalHttpEndpoint string) {
10095
prometheus := e2edb.NewPrometheus(e, "prometheus")
10196
testutil.Ok(t, e2e.StartAndWaitReady(prometheus))
10297

10398
jaeger := e.Runnable("jaeger").
10499
WithPorts(
105100
map[string]int{
106-
"jaeger.grpc": 14250, // Receives traces
107-
"grpc.query": 16685, // Query
108-
"http.query": 16686, // Query
101+
"otlp.grpc": 4317, // Receiving traces
102+
"grpc.query": 16685, // Query
103+
"http.query": 16686, // Query
109104
}).
110105
Init(e2e.StartOptions{
111106
Image: jaegerAllInOneImage,
@@ -115,7 +110,7 @@ func startServicesForTraces(t *testing.T, e e2e.Environment) (otlpGRPCEndpoint,
115110
},
116111
})
117112

118-
createOtelCollectorConfigYAML(t, e, jaeger.InternalEndpoint("jaeger.grpc"))
113+
createOtelCollectorConfigYAML(t, e, jaeger.InternalEndpoint("otlp.grpc"))
119114

120115
otel := e.Runnable("otel-collector").
121116
WithPorts(
@@ -139,7 +134,7 @@ func startServicesForTraces(t *testing.T, e e2e.Environment) (otlpGRPCEndpoint,
139134
testutil.Ok(t, e2e.StartAndWaitReady(jaeger))
140135
testutil.Ok(t, e2e.StartAndWaitReady(otel))
141136

142-
return otel.InternalEndpoint("grpc"), jaeger.Endpoint("http.query"), jaeger.InternalEndpoint("http.query")
137+
return otel.InternalEndpoint("grpc"), otel.InternalEndpoint("http"), jaeger.Endpoint("http.query"), jaeger.InternalEndpoint("http.query")
143138
}
144139

145140
func startTempoServicesForTraces(t *testing.T, e e2e.Environment) (tempoDistributorEndpoint, internalTempoQueryEndpoint, tempoQueryEndpoint string) {
@@ -348,16 +343,17 @@ func newOPAService(e e2e.Environment) *e2emon.InstrumentedRunnable {
348343
}
349344

350345
type apiOptions struct {
351-
logsEndpoint string
352-
metricsReadEndpoint string
353-
metricsWriteEndpoint string
354-
metricsRulesEndpoint string
355-
alertmanagerEndpoint string
356-
ratelimiterAddr string
357-
tracesWriteEndpoint string
358-
gRPCListenEndpoint string
359-
jaegerQueryEndpoint string
360-
tempoEndpoint string
346+
logsEndpoint string
347+
metricsReadEndpoint string
348+
metricsWriteEndpoint string
349+
metricsRulesEndpoint string
350+
alertmanagerEndpoint string
351+
ratelimiterAddr string
352+
tracesWriteOTLPGRPCEndpoint string
353+
tracesWriteOTLPHTTPEndpoint string
354+
gRPCListenEndpoint string
355+
jaegerQueryEndpoint string
356+
tempoEndpoint string
361357

362358
// "experimental.traces.read.endpoint-template" value.
363359
tracesExperimentalTemplateReadEndpoint string
@@ -378,9 +374,15 @@ func withMetricsEndpoints(readEndpoint string, writeEndpoint string) apiOption {
378374
}
379375
}
380376

381-
func withOtelTraceEndpoint(exportEndpoint string) apiOption {
377+
func withOTLPGRPCTraceEndpoint(exportEndpoint string) apiOption {
378+
return func(o *apiOptions) {
379+
o.tracesWriteOTLPGRPCEndpoint = exportEndpoint
380+
}
381+
}
382+
383+
func withOTLPHTTPTraceEndpoint(exportEndpoint string) apiOption {
382384
return func(o *apiOptions) {
383-
o.tracesWriteEndpoint = exportEndpoint
385+
o.tracesWriteOTLPHTTPEndpoint = exportEndpoint
384386
}
385387
}
386388

@@ -476,8 +478,11 @@ func newObservatoriumAPIService(
476478
args = append(args, "--middleware.rate-limiter.grpc-address="+opts.ratelimiterAddr)
477479
}
478480

479-
if opts.tracesWriteEndpoint != "" {
480-
args = append(args, "--traces.write.endpoint="+opts.tracesWriteEndpoint)
481+
if opts.tracesWriteOTLPGRPCEndpoint != "" {
482+
args = append(args, "--traces.write.otlpgrpc.endpoint="+opts.tracesWriteOTLPGRPCEndpoint)
483+
}
484+
if opts.tracesWriteOTLPHTTPEndpoint != "" {
485+
args = append(args, "--traces.write.otlphttp.endpoint="+opts.tracesWriteOTLPHTTPEndpoint)
481486
}
482487

483488
if opts.tracesExperimentalTemplateReadEndpoint != "" {

0 commit comments

Comments
 (0)