Skip to content

Commit bcea8fa

Browse files
JayiceZjiekun
andauthored
feature: [vt] support json encoding in opentelemetry insert (#51)
related to #41 OpenTelemetry supports both protobuf and JSON encoding (https://opentelemetry.io/docs/specs/otlp/#otlphttp), so it would be nice if VictoriaTraces could accept JSON-format traces. Additional notes: - Regarding the mapping between JSON and protobuf, I followed the standard described in the [otel documentation](https://opentelemetry.io/docs/specs/otlp/#json-protobuf-encoding). - How did I test this feature? - I ran the [otel demo](https://github.com/open-telemetry/opentelemetry-demo) to push JSON-format traces to VictoriaTraces. - I added a new case with a JSON-format request to the integration test --------- Signed-off-by: JayiceZ <[email protected]> Co-authored-by: Jiekun <[email protected]>
1 parent 4292c24 commit bcea8fa

File tree

12 files changed

+215
-71
lines changed

12 files changed

+215
-71
lines changed

app/victoria-traces/main.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,7 @@ func requestHandler(w http.ResponseWriter, r *http.Request) bool {
9090
})
9191
return true
9292
}
93+
9394
if vtinsert.RequestHandler(w, r) {
9495
return true
9596
}

app/vtinsert/opentelemetry/opentelemetry.go

Lines changed: 79 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,12 @@ var maxRequestSize = flagutil.NewBytes("opentelemetry.traces.maxRequestSize", 64
2222

2323
var (
2424
requestsProtobufTotal = metrics.NewCounter(`vt_http_requests_total{path="/insert/opentelemetry/v1/traces",format="protobuf"}`)
25-
errorsTotal = metrics.NewCounter(`vt_http_errors_total{path="/insert/opentelemetry/v1/traces",format="protobuf"}`)
25+
errorsProtobufTotal = metrics.NewCounter(`vt_http_errors_total{path="/insert/opentelemetry/v1/traces",format="protobuf"}`)
26+
requestsJSONTotal = metrics.NewCounter(`vt_http_requests_total{path="/insert/opentelemetry/v1/traces",format="json"}`)
27+
errorsJSONTotal = metrics.NewCounter(`vt_http_errors_total{path="/insert/opentelemetry/v1/traces",format="json"}`)
2628

2729
requestProtobufDuration = metrics.NewSummary(`vt_http_request_duration_seconds{path="/insert/opentelemetry/v1/traces",format="protobuf"}`)
30+
requestJSONDuration = metrics.NewSummary(`vt_http_request_duration_seconds{path="/insert/opentelemetry/v1/traces",format="json"}`)
2831
)
2932

3033
var (
@@ -37,25 +40,37 @@ var (
3740
traceIDCache = fastcache.New(32 * 1024 * 1024)
3841
)
3942

43+
const (
44+
contentTypeProtobuf = "application/x-protobuf"
45+
contentTypeJSON = "application/json"
46+
)
47+
4048
// RequestHandler processes Opentelemetry insert requests
4149
func RequestHandler(path string, w http.ResponseWriter, r *http.Request) bool {
4250
switch path {
4351
// use the same path as opentelemetry collector
4452
// https://opentelemetry.io/docs/specs/otlp/#otlphttp-request
4553
case "/insert/opentelemetry/v1/traces":
46-
if r.Header.Get("Content-Type") == "application/json" {
47-
httpserver.Errorf(w, r, "json encoding isn't supported for opentelemetry format. Use protobuf encoding")
48-
return true
49-
}
50-
handleProtobuf(r, w)
51-
return true
54+
return handleTracesRequest(r, w)
55+
default:
56+
return false
57+
}
58+
}
59+
60+
func handleTracesRequest(r *http.Request, w http.ResponseWriter) bool {
61+
switch contentType := r.Header.Get("Content-Type"); contentType {
62+
case contentTypeProtobuf:
63+
handleProtobufRequest(r, w)
64+
case contentTypeJSON:
65+
handleJSONRequest(r, w)
5266
default:
67+
httpserver.Errorf(w, r, "Content-Type %s isn't supported for opentelemetry format. Use protobuf or JSON encoding", contentType)
5368
return false
5469
}
70+
return true
5571
}
5672

57-
// handleProtobuf handles the trace ingestion request.
58-
func handleProtobuf(r *http.Request, w http.ResponseWriter) {
73+
func handleProtobufRequest(r *http.Request, w http.ResponseWriter) {
5974
startTime := time.Now()
6075
requestsProtobufTotal.Inc()
6176

@@ -69,36 +84,81 @@ func handleProtobuf(r *http.Request, w http.ResponseWriter) {
6984
// for potentially better efficiency.
7085
cp.StreamFields = append(mandatoryStreamFields, cp.StreamFields...)
7186

72-
if err := insertutil.CanWriteData(); err != nil {
87+
if err = insertutil.CanWriteData(); err != nil {
7388
httpserver.Errorf(w, r, "%s", err)
7489
return
7590
}
7691

7792
encoding := r.Header.Get("Content-Encoding")
7893
err = protoparserutil.ReadUncompressedData(r.Body, encoding, maxRequestSize, func(data []byte) error {
79-
lmp := cp.NewLogMessageProcessor("opentelemetry_traces_protobuf", false)
80-
err := pushProtobufRequest(data, lmp)
94+
var (
95+
req otelpb.ExportTraceServiceRequest
96+
callbackErr error
97+
)
98+
lmp := cp.NewLogMessageProcessor("opentelemetry_traces", false)
99+
if callbackErr = req.UnmarshalProtobuf(data); callbackErr != nil {
100+
errorsProtobufTotal.Inc()
101+
return fmt.Errorf("cannot unmarshal request from %d protobuf bytes: %w", len(data), callbackErr)
102+
}
103+
callbackErr = pushExportTraceServiceRequest(&req, lmp)
81104
lmp.MustClose()
82-
return err
105+
return callbackErr
83106
})
84107
if err != nil {
85108
httpserver.Errorf(w, r, "cannot read OpenTelemetry protocol data: %s", err)
86109
return
87110
}
88-
89111
// update requestProtobufDuration only for successfully parsed requests
90112
// There is no need in updating requestProtobufDuration for request errors,
91113
// since their timings are usually much smaller than the timing for successful request parsing.
92114
requestProtobufDuration.UpdateDuration(startTime)
93115
}
94116

95-
func pushProtobufRequest(data []byte, lmp insertutil.LogMessageProcessor) error {
96-
var req otelpb.ExportTraceServiceRequest
97-
if err := req.UnmarshalProtobuf(data); err != nil {
98-
errorsTotal.Inc()
99-
return fmt.Errorf("cannot unmarshal request from %d bytes: %w", len(data), err)
117+
func handleJSONRequest(r *http.Request, w http.ResponseWriter) {
118+
startTime := time.Now()
119+
requestsJSONTotal.Inc()
120+
121+
cp, err := insertutil.GetCommonParams(r)
122+
if err != nil {
123+
httpserver.Errorf(w, r, "cannot parse common params from request: %s", err)
124+
return
125+
}
126+
// stream fields must contain the service name and span name.
127+
// by using arguments and headers, users can also add other fields as stream fields
128+
// for potentially better efficiency.
129+
cp.StreamFields = append(mandatoryStreamFields, cp.StreamFields...)
130+
131+
if err = insertutil.CanWriteData(); err != nil {
132+
httpserver.Errorf(w, r, "%s", err)
133+
return
100134
}
101135

136+
encoding := r.Header.Get("Content-Encoding")
137+
err = protoparserutil.ReadUncompressedData(r.Body, encoding, maxRequestSize, func(data []byte) error {
138+
var (
139+
req otelpb.ExportTraceServiceRequest
140+
callbackErr error
141+
)
142+
lmp := cp.NewLogMessageProcessor("opentelemetry_traces", false)
143+
if callbackErr = req.UnmarshalJSONCustom(data); callbackErr != nil {
144+
errorsJSONTotal.Inc()
145+
return fmt.Errorf("cannot unmarshal request from %d protobuf bytes: %w", len(data), callbackErr)
146+
}
147+
callbackErr = pushExportTraceServiceRequest(&req, lmp)
148+
lmp.MustClose()
149+
return callbackErr
150+
})
151+
if err != nil {
152+
httpserver.Errorf(w, r, "cannot read OpenTelemetry protocol data: %s", err)
153+
return
154+
}
155+
// update requestJSONDuration only for successfully parsed requests
156+
// There is no need in updating requestJSONDuration for request errors,
157+
// since their timings are usually much smaller than the timing for successful request parsing.
158+
requestJSONDuration.UpdateDuration(startTime)
159+
}
160+
161+
func pushExportTraceServiceRequest(req *otelpb.ExportTraceServiceRequest, lmp insertutil.LogMessageProcessor) error {
102162
var commonFields []logstorage.Field
103163
for _, rs := range req.ResourceSpans {
104164
commonFields = commonFields[:0]

docs/victoriatraces/changelog/CHANGELOG.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,8 @@ The following `tip` changes can be tested by building VictoriaTraces components
1717
* FEATURE: [logstorage](https://docs.victoriametrics.com/victorialogs/): Upgrade VictoriaLogs dependency from [v1.27.0 to v1.33.1](https://github.com/VictoriaMetrics/VictoriaLogs/compare/v1.27.0...v1.33.1).
1818
* FEATURE: [docker compose](https://github.com/VictoriaMetrics/VictoriaTraces/tree/master/deployment/docker): add cluster docker compose environment.
1919
* FEATURE: [dashboards](https://github.com/VictoriaMetrics/VictoriaTraces/blob/master/dashboards): update dashboard for VictoriaTraces single-node and cluster to provide more charts.
20-
20+
* FEATURE: [Single-node VictoriaTraces](https://docs.victoriametrics.com/victoriatraces/) and vtinsert in [VictoriaTraces cluster](https://docs.victoriametrics.com/victoriatraces/cluster/): support [JSON protobuf encoding](https://opentelemetry.io/docs/specs/otlp/#json-protobuf-encoding) in the OpenTelemetry protocol (OTLP) for data ingestion. See [this issue](https://github.com/VictoriaMetrics/VictoriaTraces/issues/41) for details. Thanks to @JayiceZ for the [pull request](https://github.com/VictoriaMetrics/VictoriaTraces/pull/51).
21+
*
2122
* BUGFIX: [Single-node VictoriaTraces](https://docs.victoriametrics.com/victoriatraces/) and vtinsert in [VictoriaTraces cluster](https://docs.victoriametrics.com/victoriatraces/cluster/): Rename various [HTTP headers](https://docs.victoriametrics.com/victoriatraces/data-ingestion/#http-headers) prefix from `VL-` to `VT-`. These headers help with debugging and customizing stream fields. Thank @JayiceZ for [the pull request](https://github.com/VictoriaMetrics/VictoriaTraces/pull/56).
2223
* BUGFIX: all components: properly expose metadata for summaries and histograms in VictoriaMetrics components with enabled `-metrics.exposeMetadata` cmd-line flag. See [metrics#98](https://github.com/VictoriaMetrics/metrics/issues/98) for details.
2324

lib/protoparser/opentelemetry/pb/common.go

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ import (
1010

1111
// Resource represents the corresponding OTEL protobuf message
1212
type Resource struct {
13-
Attributes []*KeyValue
13+
Attributes []*KeyValue `json:"attributes"`
1414
}
1515

1616
// marshalProtobuf marshals
@@ -49,8 +49,8 @@ func (r *Resource) unmarshalProtobuf(src []byte) (err error) {
4949

5050
// KeyValue represents the corresponding OTEL protobuf message
5151
type KeyValue struct {
52-
Key string
53-
Value *AnyValue
52+
Key string `json:"key"`
53+
Value *AnyValue `json:"value"`
5454
}
5555

5656
func (kv *KeyValue) marshalProtobuf(mm *easyproto.MessageMarshaler) {
@@ -95,13 +95,13 @@ func (kv *KeyValue) unmarshalProtobuf(src []byte) (err error) {
9595

9696
// AnyValue represents the corresponding OTEL protobuf message
9797
type AnyValue struct {
98-
StringValue *string
99-
BoolValue *bool
100-
IntValue *int64
101-
DoubleValue *float64
102-
ArrayValue *ArrayValue
103-
KeyValueList *KeyValueList
104-
BytesValue *[]byte
98+
StringValue *string `json:"stringValue"`
99+
BoolValue *bool `json:"boolValue"`
100+
IntValue *int64 `json:"intValue,string"`
101+
DoubleValue *float64 `json:"doubleValue"`
102+
ArrayValue *ArrayValue `json:"arrayValue"`
103+
KeyValueList *KeyValueList `json:"keyValueList"`
104+
BytesValue *[]byte `json:"BytesValue"`
105105
}
106106

107107
func (av *AnyValue) marshalProtobuf(mm *easyproto.MessageMarshaler) {
@@ -200,7 +200,7 @@ func (av *AnyValue) unmarshalProtobuf(src []byte) (err error) {
200200

201201
// ArrayValue represents the corresponding OTEL protobuf message
202202
type ArrayValue struct {
203-
Values []*AnyValue
203+
Values []*AnyValue `json:"values"`
204204
}
205205

206206
func (av *ArrayValue) marshalProtobuf(mm *easyproto.MessageMarshaler) {
@@ -238,7 +238,7 @@ func (av *ArrayValue) unmarshalProtobuf(src []byte) (err error) {
238238

239239
// KeyValueList represents the corresponding OTEL protobuf message
240240
type KeyValueList struct {
241-
Values []*KeyValue
241+
Values []*KeyValue `json:"values"`
242242
}
243243

244244
func (kvl *KeyValueList) marshalProtobuf(mm *easyproto.MessageMarshaler) {

lib/protoparser/opentelemetry/pb/testdata/json/1.bin

Lines changed: 1 addition & 0 deletions
Large diffs are not rendered by default.

lib/protoparser/opentelemetry/pb/testdata/json/2.bin

Lines changed: 1 addition & 0 deletions
Large diffs are not rendered by default.
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
{"resourceSpans":[{"resource":{"attributes":[{"key":"os.type","value":{"stringValue":"linux"}},{"key":"os.description","value":{"stringValue":"Debian GNU/Linux 12 (bookworm)"}},{"key":"os.build_id","value":{"stringValue":"6.10.14-linuxkit"}},{"key":"os.name","value":{"stringValue":"Debian GNU/Linux"}},{"key":"os.version","value":{"stringValue":"12"}},{"key":"host.name","value":{"stringValue":"8e296dd17f8c"}},{"key":"process.owner","value":{"stringValue":"app"}},{"key":"process.pid","value":{"intValue":"1"}},{"key":"process.runtime.description","value":{"stringValue":".NET 8.0.17"}},{"key":"process.runtime.name","value":{"stringValue":".NET"}},{"key":"process.runtime.version","value":{"stringValue":"8.0.17"}},{"key":"container.id","value":{"stringValue":"8e296dd17f8cbb2949a2483b6cc214d4e2a7faf7b9a71c71b0ea9927d6b4a4e1"}},{"key":"telemetry.distro.name","value":{"stringValue":"opentelemetry-dotnet-instrumentation"}},{"key":"telemetry.distro.version","value":{"stringValue":"1.11.0"}},{"key":"telemetry.sdk.name","value":{"stringValue":"opentelemetry"}},{"key":"telemetry.sdk.language","value":{"stringValue":"dotnet"}},{"key":"telemetry.sdk.version","value":{"stringValue":"1.11.2"}},{"key":"service.name","value":{"stringValue":"accounting"}},{"key":"service.namespace","value":{"stringValue":"opentelemetry-demo"}},{"key":"service.version","value":{"stringValue":"2.0.2"}}]},"scopeSpans":[{"scope":{"name":"OpenTelemetry.AutoInstrumentation.Kafka"},"spans":[{"traceId":"eaa99d2871f6e5d0bd9ac14bdad6629c","spanId":"5e9a917c821deb60","parentSpanId":"","flags":257,"name":"orders 
receive","kind":5,"startTimeUnixNano":"1757320933956646600","endTimeUnixNano":"1757320934273447500","attributes":[{"key":"messaging.client_id","value":{"stringValue":"rdkafka#consumer-1"}},{"key":"messaging.kafka.consumer.group","value":{"stringValue":"accounting"}},{"key":"messaging.operation","value":{"stringValue":"receive"}},{"key":"messaging.system","value":{"stringValue":"kafka"}},{"key":"messaging.destination.name","value":{"stringValue":"orders"}},{"key":"messaging.kafka.destination.partition","value":{"intValue":"0"}},{"key":"messaging.kafka.message.offset","value":{"intValue":"-1001"}}],"events":[{"timeUnixNano":"1757320934268187900","name":"exception","attributes":[{"key":"exception.message","value":{"stringValue":"Subscribed topic not available: orders: Broker: Unknown topic or partition"}},{"key":"exception.stacktrace","value":{"stringValue":"Confluent.Kafka.ConsumeException: Subscribed topic not available: orders: Broker: Unknown topic or partition\n at Confluent.Kafka.Consumer`2.Consume(Int32 millisecondsTimeout)\n at Confluent.Kafka.Consumer`2.Consume(CancellationToken cancellationToken)\n at Accounting.Consumer.StartListening() in /src/Accounting/Consumer.cs:line 41"}},{"key":"exception.type","value":{"stringValue":"Confluent.Kafka.ConsumeException"}}]}],"status":{"message":"Subscribed topic not available: orders: Broker: Unknown topic or partition","code":2}}]}]},{"resource":{"attributes":[{"key":"service.name","value":{"stringValue":"frontend-proxy"}},{"key":"service.namespace","value":{"stringValue":"opentelemetry-demo"}},{"key":"service.version","value":{"stringValue":"2.0.2"}}]},"scopeSpans":[{"scope":{},"spans":[{"traceId":"ccfc43e08430655c257ebe80d3dd2bd4","spanId":"28d244b8e8788f7e","parentSpanId":"cbb37701f41cb9ee","name":"router frontend 
egress","kind":3,"startTimeUnixNano":"1757320936665313000","endTimeUnixNano":"1757320936733610000","attributes":[{"key":"http.protocol","value":{"stringValue":"HTTP/1.1"}},{"key":"upstream_address","value":{"stringValue":"172.18.0.24:8080"}},{"key":"peer.address","value":{"stringValue":"172.18.0.24:8080"}},{"key":"component","value":{"stringValue":"proxy"}},{"key":"upstream_cluster","value":{"stringValue":"frontend"}},{"key":"upstream_cluster.name","value":{"stringValue":"frontend"}},{"key":"http.status_code","value":{"stringValue":"200"}},{"key":"response_flags","value":{"stringValue":"-"}}],"status":{}},{"traceId":"ccfc43e08430655c257ebe80d3dd2bd4","spanId":"cbb37701f41cb9ee","parentSpanId":"","name":"ingress","kind":2,"startTimeUnixNano":"1757320936664715000","endTimeUnixNano":"1757320936733661000","attributes":[{"key":"node_id","value":{"stringValue":""}},{"key":"zone","value":{"stringValue":""}},{"key":"guid:x-request-id","value":{"stringValue":"56d080ef-fc25-9c26-8e42-979ee008027b"}},{"key":"http.url","value":{"stringValue":"http://frontend-proxy:8080/_next/static/chunks/framework-8c66923947fb0c35.js"}},{"key":"http.method","value":{"stringValue":"GET"}},{"key":"downstream_cluster","value":{"stringValue":"-"}},{"key":"user_agent","value":{"stringValue":"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) HeadlessChrome/136.0.0.0 
Safari/537.36"}},{"key":"http.protocol","value":{"stringValue":"HTTP/1.1"}},{"key":"peer.address","value":{"stringValue":"172.18.0.25"}},{"key":"request_size","value":{"stringValue":"0"}},{"key":"response_size","value":{"stringValue":"57621"}},{"key":"component","value":{"stringValue":"proxy"}},{"key":"upstream_cluster","value":{"stringValue":"frontend"}},{"key":"upstream_cluster.name","value":{"stringValue":"frontend"}},{"key":"http.status_code","value":{"stringValue":"200"}},{"key":"response_flags","value":{"stringValue":"-"}}],"status":{}},{"traceId":"0f781dd1c80ba5328d134314c6003b48","spanId":"aa749a44860f70e7","parentSpanId":"55b80a9567dd47b6","name":"router frontend egress","kind":3,"startTimeUnixNano":"1757320936813439000","endTimeUnixNano":"1757320936827038000","attributes":[{"key":"http.protocol","value":{"stringValue":"HTTP/1.1"}},{"key":"upstream_address","value":{"stringValue":"172.18.0.24:8080"}},{"key":"peer.address","value":{"stringValue":"172.18.0.24:8080"}},{"key":"component","value":{"stringValue":"proxy"}},{"key":"upstream_cluster","value":{"stringValue":"frontend"}},{"key":"upstream_cluster.name","value":{"stringValue":"frontend"}},{"key":"http.status_code","value":{"stringValue":"200"}},{"key":"response_flags","value":{"stringValue":"-"}}],"status":{}},{"traceId":"0f781dd1c80ba5328d134314c6003b48","spanId":"55b80a9567dd47b6","parentSpanId":"","name":"ingress","kind":2,"startTimeUnixNano":"1757320936813279000","endTimeUnixNano":"1757320936827054000","attributes":[{"key":"node_id","value":{"stringValue":""}},{"key":"zone","value":{"stringValue":""}},{"key":"guid:x-request-id","value":{"stringValue":"ebc78a99-d4f3-9ac6-9e8c-a415e8a81df7"}},{"key":"http.url","value":{"stringValue":"http://frontend-proxy:8080/_next/static/_g5lJ7kKnYpQWGjSpoVo5/_ssgManifest.js"}},{"key":"http.method","value":{"stringValue":"GET"}},{"key":"downstream_cluster","value":{"stringValue":"-"}},{"key":"user_agent","value":{"stringValue":"Mozilla/5.0 (X11; Linux x86_64) 
AppleWebKit/537.36 (KHTML, like Gecko) HeadlessChrome/136.0.0.0 Safari/537.36"}},{"key":"http.protocol","value":{"stringValue":"HTTP/1.1"}},{"key":"peer.address","value":{"stringValue":"172.18.0.25"}},{"key":"request_size","value":{"stringValue":"0"}},{"key":"response_size","value":{"stringValue":"77"}},{"key":"component","value":{"stringValue":"proxy"}},{"key":"upstream_cluster","value":{"stringValue":"frontend"}},{"key":"upstream_cluster.name","value":{"stringValue":"frontend"}},{"key":"http.status_code","value":{"stringValue":"200"}},{"key":"response_flags","value":{"stringValue":"-"}}],"status":{}},{"traceId":"3c0a3fa93f41d1bc5f93acdc7cecb577","spanId":"d40390d9c3b5d1e5","parentSpanId":"e4ed749bdebb51a9","name":"router frontend egress","kind":3,"startTimeUnixNano":"1757320937207177000","endTimeUnixNano":"1757320937214023000","attributes":[{"key":"http.protocol","value":{"stringValue":"HTTP/1.1"}},{"key":"upstream_address","value":{"stringValue":"172.18.0.24:8080"}},{"key":"peer.address","value":{"stringValue":"172.18.0.24:8080"}},{"key":"component","value":{"stringValue":"proxy"}},{"key":"upstream_cluster","value":{"stringValue":"frontend"}},{"key":"upstream_cluster.name","value":{"stringValue":"frontend"}},{"key":"http.status_code","value":{"stringValue":"200"}},{"key":"response_flags","value":{"stringValue":"-"}}],"status":{}}]}]}]}
18.3 KB
Binary file not shown.
41.3 KB
Binary file not shown.
4.13 KB
Binary file not shown.

0 commit comments

Comments
 (0)