Skip to content

Commit da7e32f

Browse files
committed
support compression in grpc endpoint
1 parent 955d83c commit da7e32f

File tree

2 files changed

+30
-10
lines changed

2 files changed

+30
-10
lines changed

app/vtinsert/opentelemetry/grpc.go

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"encoding/binary"
55
"fmt"
66
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
7+
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
78
"github.com/VictoriaMetrics/VictoriaTraces/lib/protoparser/opentelemetry/pb"
89
"io"
910
"net/http"
@@ -29,7 +30,7 @@ func getProtobufData(r *http.Request) ([]byte, error) {
2930
return nil, &httpserver.ErrorWithStatusCode{StatusCode: http.StatusBadRequest, Err: fmt.Errorf("invalid grpc header length: %d", len(reqBody))}
3031
}
3132
grpcHeader := reqBody[:5]
32-
if isCompress := grpcHeader[0]; isCompress != 0 {
33+
if isCompress := grpcHeader[0]; isCompress != 0 && isCompress != 1 {
3334
return nil, &httpserver.ErrorWithStatusCode{StatusCode: http.StatusBadRequest, Err: fmt.Errorf("grpc compression not supporte")}
3435
}
3536
messageLength := binary.BigEndian.Uint32(grpcHeader[1:5])
@@ -55,7 +56,16 @@ func writeExportTraceResponses(w http.ResponseWriter, rejectedSpans int64, error
5556
w.Header().Set("Content-Type", "application/grpc+proto")
5657
w.Header().Set("Trailer", "grpc-status, grpc-message")
5758

58-
w.Write(grpcRespData)
59+
writtenLen, err := w.Write(grpcRespData)
60+
if writtenLen != len(grpcRespData) {
61+
logger.Errorf("unexpected write of %d bytes in replying OLTP export grpc request, expected:%d", writtenLen, len(grpcRespData))
62+
return
63+
}
64+
if err != nil {
65+
logger.Errorf("failed to reply OLTP export grpc request , error:%s", err)
66+
return
67+
}
68+
5969
w.Header().Set("Grpc-Status", "0")
6070
w.Header().Set("Grpc-Message", "")
6171

app/vtinsert/opentelemetry/opentelemetry.go

Lines changed: 18 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package opentelemetry
22

33
import (
4+
"bytes"
45
"fmt"
56
"net/http"
67
"strconv"
@@ -165,7 +166,6 @@ func GrpcRequestHandler(r *http.Request, w http.ResponseWriter) {
165166
httpserver.Errorf(w, r, "failed to process grpc request:%s", &httpserver.ErrorWithStatusCode{Err: fmt.Errorf("grpc method not found: %s", r.URL.Path), StatusCode: http.StatusNotFound})
166167
return
167168
}
168-
169169
cp, err := insertutil.GetCommonParams(r)
170170
if err != nil {
171171
httpserver.Errorf(w, r, "cannot parse common params from request: %s", err)
@@ -186,15 +186,26 @@ func GrpcRequestHandler(r *http.Request, w http.ResponseWriter) {
186186
httpserver.Errorf(w, r, "failed to get protobuf data from request, error: %s", err)
187187
return
188188
}
189+
encoding := r.Header.Get("grpc-encoding")
190+
191+
err = protoparserutil.ReadUncompressedData(bytes.NewReader(protobufData), encoding, maxRequestSize, func(data []byte) error {
192+
var (
193+
req otelpb.ExportTraceServiceRequest
194+
callbackErr error
195+
)
196+
lmp := cp.NewLogMessageProcessor("opentelemetry_traces", false)
197+
if callbackErr = req.UnmarshalProtobuf(data); callbackErr != nil {
198+
return fmt.Errorf("cannot unmarshal request from %d protobuf bytes: %w", len(data), callbackErr)
199+
}
200+
callbackErr = pushExportTraceServiceRequest(&req, lmp)
201+
lmp.MustClose()
202+
return callbackErr
203+
})
189204

190-
var req otelpb.ExportTraceServiceRequest
191-
lmp := cp.NewLogMessageProcessor("opentelemetry_traces", false)
192-
if err = req.UnmarshalProtobuf(protobufData); err != nil {
193-
httpserver.Errorf(w, r, "cannot unmarshal request from %d protobuf bytes: %w", len(protobufData), err)
205+
if err != nil {
206+
httpserver.Errorf(w, r, "cannot read OpenTelemetry protocol data: %s", err)
194207
return
195208
}
196-
_ = pushExportTraceServiceRequest(&req, lmp)
197-
lmp.MustClose()
198209

199210
writeExportTraceResponses(w, 0, "")
200211
return
@@ -232,7 +243,6 @@ func pushFieldsFromScopeSpans(ss *otelpb.ScopeSpans, commonFields []logstorage.F
232243

233244
func pushFieldsFromSpan(span *otelpb.Span, scopeCommonFields []logstorage.Field, lmp insertutil.LogMessageProcessor) []logstorage.Field {
234245
fields := scopeCommonFields
235-
println(span.TraceID)
236246
fields = append(fields,
237247
logstorage.Field{Name: otelpb.TraceIDField, Value: span.TraceID},
238248
logstorage.Field{Name: otelpb.SpanIDField, Value: span.SpanID},

0 commit comments

Comments
 (0)