diff --git a/app/vtinsert/opentelemetry/opentelemetry.go b/app/vtinsert/opentelemetry/opentelemetry.go index e011df515..31011d483 100644 --- a/app/vtinsert/opentelemetry/opentelemetry.go +++ b/app/vtinsert/opentelemetry/opentelemetry.go @@ -192,7 +192,6 @@ func pushFieldsFromScopeSpans(ss *otelpb.ScopeSpans, commonFields []logstorage.F func pushFieldsFromSpan(span *otelpb.Span, scopeCommonFields []logstorage.Field, lmp insertutil.LogMessageProcessor) []logstorage.Field { fields := scopeCommonFields fields = append(fields, - logstorage.Field{Name: otelpb.TraceIDField, Value: span.TraceID}, logstorage.Field{Name: otelpb.SpanIDField, Value: span.SpanID}, logstorage.Field{Name: otelpb.TraceStateField, Value: span.TraceState}, logstorage.Field{Name: otelpb.ParentSpanIDField, Value: span.ParentSpanID}, @@ -240,17 +239,22 @@ func pushFieldsFromSpan(span *otelpb.Span, scopeCommonFields []logstorage.Field, // append link attributes fields = appendKeyValuesWithPrefixSuffix(fields, link.Attributes, "", linkFieldPrefix+otelpb.LinkAttrPrefix, linkFieldSuffix) } - fields = append(fields, logstorage.Field{ - Name: "_msg", - Value: msgFieldValue, - }) + fields = append(fields, + logstorage.Field{Name: "_msg", Value: msgFieldValue}, + // MUST: always place TraceIDField last. The Trace ID is required for data distribution. + // Placing it at the last position helps netinsert find it easily, without adding an extra field to the + // *logstorage.InsertRow structure, which is required due to the sync between logstorage and VictoriaTraces. + // todo: @jiekun the trace ID field MUST be the last field. add extra ways to secure it. + logstorage.Field{Name: otelpb.TraceIDField, Value: span.TraceID}, + ) lmp.AddRow(int64(span.EndTimeUnixNano), fields, nil) // create an entity in trace-id-idx stream, if this trace_id hasn't been seen before. 
if !traceIDCache.Has([]byte(span.TraceID)) { lmp.AddRow(int64(span.StartTimeUnixNano), []logstorage.Field{ - {Name: otelpb.TraceIDIndexFieldName, Value: span.TraceID}, {Name: "_msg", Value: msgFieldValue}, + // todo: @jiekun the trace ID field MUST be the last field. add extra ways to secure it. + {Name: otelpb.TraceIDIndexFieldName, Value: span.TraceID}, }, []logstorage.Field{{Name: otelpb.TraceIDIndexStreamName, Value: strconv.FormatUint(xxhash.Sum64String(span.TraceID)%otelpb.TraceIDIndexPartitionCount, 10)}}) traceIDCache.Set([]byte(span.TraceID), nil) } diff --git a/app/vtstorage/netinsert/netinsert.go b/app/vtstorage/netinsert/netinsert.go index 06c3b4146..188603ed3 100644 --- a/app/vtstorage/netinsert/netinsert.go +++ b/app/vtstorage/netinsert/netinsert.go @@ -20,7 +20,10 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth" "github.com/VictoriaMetrics/VictoriaMetrics/lib/timerpool" "github.com/VictoriaMetrics/metrics" + "github.com/cespare/xxhash/v2" "github.com/valyala/fastrand" + + otelpb "github.com/VictoriaMetrics/VictoriaTraces/lib/protoparser/opentelemetry/pb" ) // the maximum size of a single data block sent to storage node. @@ -341,7 +344,17 @@ func (s *Storage) MustStop() { // AddRow adds the given log row into s. func (s *Storage) AddRow(streamHash uint64, r *logstorage.InsertRow) { - idx := s.srt.getNodeIdx(streamHash) + // trace ID should always be put in the last field. + // but for better compatibility, we should search for the trace_id in reverse order, + // instead of using the last one in the `r.Fields` slice directly. 
+ var traceID string + for i := len(r.Fields) - 1; i >= 0; i-- { + if r.Fields[i].Name == otelpb.TraceIDField { + traceID = r.Fields[i].Value + break + } + } + idx := s.srt.getNodeIdx(streamHash, traceID) sn := s.sns[idx] sn.addRow(r) } @@ -378,12 +391,26 @@ func newStreamRowsTracker(nodesCount int) *streamRowsTracker { } } -func (srt *streamRowsTracker) getNodeIdx(streamHash uint64) uint64 { +// getNodeIdx returns the node index for a specific traceID +func (srt *streamRowsTracker) getNodeIdx(streamHash uint64, traceID string) uint64 { if srt.nodesCount == 1 { // Fast path for a single node. return 0 } + // common path: distribute data by trace ID. + if traceID != "" { + // When the node count is changed after a restart, the spans of a trace might be + // distributed across different nodes. Therefore, there's no guarantee that a trace query + // can find all the spans of a trace in ONE vtstorage instance. + // + // This could potentially affect the service graph, which aggregates data within each vtstorage instance. + // However, since only a small number of traces are affected, the overall trend will remain consistent. + return xxhash.Sum64String(traceID) % uint64(srt.nodesCount) + } + + // for backward compatibility, keep the original random distribution logic. + // only data without trace ID will go to the following path. srt.mu.Lock() defer srt.mu.Unlock() diff --git a/app/vtstorage/netinsert/netinsert_test.go b/app/vtstorage/netinsert/netinsert_test.go index 957156137..3d4a9fea8 100644 --- a/app/vtstorage/netinsert/netinsert_test.go +++ b/app/vtstorage/netinsert/netinsert_test.go @@ -1,6 +1,7 @@ package netinsert import ( + cryptorand "crypto/rand" "fmt" "math" "math/rand" @@ -26,7 +27,7 @@ func TestStreamRowsTracker(t *testing.T) { for i := 0; i < rowsCount; i++ { streamIdx := rng.Intn(streamsCount) h := streamHashes[streamIdx] - nodeIdx := srt.getNodeIdx(h) + nodeIdx := srt.getNodeIdx(h, cryptorand.Text()) rowsPerNode[nodeIdx]++ }