Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 10 additions & 6 deletions app/vtinsert/opentelemetry/opentelemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,6 @@ func pushFieldsFromScopeSpans(ss *otelpb.ScopeSpans, commonFields []logstorage.F
func pushFieldsFromSpan(span *otelpb.Span, scopeCommonFields []logstorage.Field, lmp insertutil.LogMessageProcessor) []logstorage.Field {
fields := scopeCommonFields
fields = append(fields,
logstorage.Field{Name: otelpb.TraceIDField, Value: span.TraceID},
logstorage.Field{Name: otelpb.SpanIDField, Value: span.SpanID},
logstorage.Field{Name: otelpb.TraceStateField, Value: span.TraceState},
logstorage.Field{Name: otelpb.ParentSpanIDField, Value: span.ParentSpanID},
Expand Down Expand Up @@ -240,17 +239,22 @@ func pushFieldsFromSpan(span *otelpb.Span, scopeCommonFields []logstorage.Field,
// append link attributes
fields = appendKeyValuesWithPrefixSuffix(fields, link.Attributes, "", linkFieldPrefix+otelpb.LinkAttrPrefix, linkFieldSuffix)
}
fields = append(fields, logstorage.Field{
Name: "_msg",
Value: msgFieldValue,
})
fields = append(fields,
logstorage.Field{Name: "_msg", Value: msgFieldValue},
// MUST: always place TraceIDField last. The trace ID is required for data distribution.
// Placing it in the last position helps netinsert find it easily without adding an extra field to the
// *logstorage.InsertRow structure; avoiding the extra field is required because of the sync between logstorage and VictoriaTraces.
// TODO(@jiekun): the trace ID field MUST be the last field; add extra ways to enforce it.
logstorage.Field{Name: otelpb.TraceIDField, Value: span.TraceID},
)
lmp.AddRow(int64(span.EndTimeUnixNano), fields, nil)

// create an entity in trace-id-idx stream, if this trace_id hasn't been seen before.
if !traceIDCache.Has([]byte(span.TraceID)) {
lmp.AddRow(int64(span.StartTimeUnixNano), []logstorage.Field{
{Name: otelpb.TraceIDIndexFieldName, Value: span.TraceID},
{Name: "_msg", Value: msgFieldValue},
// todo: @jiekun the trace ID field MUST be the last field. add extra ways to secure it.
{Name: otelpb.TraceIDIndexFieldName, Value: span.TraceID},
}, []logstorage.Field{{Name: otelpb.TraceIDIndexStreamName, Value: strconv.FormatUint(xxhash.Sum64String(span.TraceID)%otelpb.TraceIDIndexPartitionCount, 10)}})
traceIDCache.Set([]byte(span.TraceID), nil)
}
Expand Down
31 changes: 29 additions & 2 deletions app/vtstorage/netinsert/netinsert.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,10 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/timerpool"
"github.com/VictoriaMetrics/metrics"
"github.com/cespare/xxhash/v2"
"github.com/valyala/fastrand"

otelpb "github.com/VictoriaMetrics/VictoriaTraces/lib/protoparser/opentelemetry/pb"
)

// the maximum size of a single data block sent to storage node.
Expand Down Expand Up @@ -341,7 +344,17 @@ func (s *Storage) MustStop() {

// AddRow adds the given log row into s.
//
// The target storage node is chosen by s.srt.getNodeIdx from streamHash and the
// value of the otelpb.TraceIDField field found in r.Fields (an empty string is
// passed when the row has no such field). The row is then handed to that node
// via addRow.
func (s *Storage) AddRow(streamHash uint64, r *logstorage.InsertRow) {
idx := s.srt.getNodeIdx(streamHash)
// The trace ID is expected to be the last field of the row.
// For better compatibility, search for it from the end of r.Fields
// instead of using the last element of the slice directly.
var traceID string
for i := len(r.Fields) - 1; i >= 0; i-- {
if r.Fields[i].Name == otelpb.TraceIDField {
traceID = r.Fields[i].Value
// Stop at the first match when scanning backwards, i.e. the one closest to the end.
break
}
}
idx := s.srt.getNodeIdx(streamHash, traceID)
sn := s.sns[idx]
sn.addRow(r)
}
Expand Down Expand Up @@ -378,12 +391,26 @@ func newStreamRowsTracker(nodesCount int) *streamRowsTracker {
}
}

func (srt *streamRowsTracker) getNodeIdx(streamHash uint64) uint64 {
// getNodeIdx returns the node index for the given streamHash and traceID.
func (srt *streamRowsTracker) getNodeIdx(streamHash uint64, traceID string) uint64 {
if srt.nodesCount == 1 {
// Fast path for a single node.
return 0
}

// common path: distribute data by trace ID.
if traceID != "" {
// When the node count is changed after a restart, the spans of a trace might be
// distributed across different nodes. Therefore, there's no guarantee that a trace query
// can find all the spans of a trace in ONE vtstorage instance.
//
// This could potentially affect the service graph, which aggregates data within each vtstorage instance.
// However, since only a small number of traces are affected, the overall trend will remain consistent.
return xxhash.Sum64String(traceID) % uint64(srt.nodesCount)
}

// For backward compatibility, keep the original random distribution logic.
// Only data without a trace ID takes the following path.
srt.mu.Lock()
defer srt.mu.Unlock()

Expand Down
3 changes: 2 additions & 1 deletion app/vtstorage/netinsert/netinsert_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package netinsert

import (
cryptorand "crypto/rand"
"fmt"
"math"
"math/rand"
Expand All @@ -26,7 +27,7 @@ func TestStreamRowsTracker(t *testing.T) {
for i := 0; i < rowsCount; i++ {
streamIdx := rng.Intn(streamsCount)
h := streamHashes[streamIdx]
nodeIdx := srt.getNodeIdx(h)
nodeIdx := srt.getNodeIdx(h, cryptorand.Text())
rowsPerNode[nodeIdx]++
}

Expand Down