From 718e59634a7f888fe4cddaa9fb66ce9d0ae30b75 Mon Sep 17 00:00:00 2001 From: Jiekun Date: Sun, 5 Oct 2025 11:54:14 +0800 Subject: [PATCH 1/6] feature: [traceID distribution] Change the data distribution by traceID --- app/vtinsert/opentelemetry/opentelemetry.go | 16 ++++++---- app/vtstorage/netinsert/netinsert.go | 33 +++++++-------------- app/vtstorage/netinsert/netinsert_test.go | 3 +- 3 files changed, 22 insertions(+), 30 deletions(-) diff --git a/app/vtinsert/opentelemetry/opentelemetry.go b/app/vtinsert/opentelemetry/opentelemetry.go index c6fc45233f..ce302db40b 100644 --- a/app/vtinsert/opentelemetry/opentelemetry.go +++ b/app/vtinsert/opentelemetry/opentelemetry.go @@ -191,7 +191,6 @@ func pushFieldsFromScopeSpans(ss *otelpb.ScopeSpans, commonFields []logstorage.F func pushFieldsFromSpan(span *otelpb.Span, scopeCommonFields []logstorage.Field, lmp insertutil.LogMessageProcessor) []logstorage.Field { fields := scopeCommonFields fields = append(fields, - logstorage.Field{Name: otelpb.TraceIDField, Value: span.TraceID}, logstorage.Field{Name: otelpb.SpanIDField, Value: span.SpanID}, logstorage.Field{Name: otelpb.TraceStateField, Value: span.TraceState}, logstorage.Field{Name: otelpb.ParentSpanIDField, Value: span.ParentSpanID}, @@ -239,17 +238,22 @@ func pushFieldsFromSpan(span *otelpb.Span, scopeCommonFields []logstorage.Field, // append link attributes fields = appendKeyValuesWithPrefixSuffix(fields, link.Attributes, "", linkFieldPrefix+otelpb.LinkAttrPrefix, linkFieldSuffix) } - fields = append(fields, logstorage.Field{ - Name: "_msg", - Value: msgFieldValue, - }) + fields = append(fields, + logstorage.Field{Name: "_msg", Value: msgFieldValue}, + // MUST: always place TraceIDField at the last. The Trace ID is required for data distribution. + // Placing it at the last position helps netinsert to find it easily, without adding extra field to + // *logstorage.InsertRow structure, which is required due to the sync between logstorage and VictoriaTraces. 
+ // todo: @jiekun the trace ID field MUST be the last field. add extra ways to secure it. + logstorage.Field{Name: otelpb.TraceIDField, Value: span.TraceID}, + ) lmp.AddRow(int64(span.EndTimeUnixNano), fields, nil) // create an entity in trace-id-idx stream, if this trace_id hasn't been seen before. if !traceIDCache.Has([]byte(span.TraceID)) { lmp.AddRow(int64(span.StartTimeUnixNano), []logstorage.Field{ - {Name: otelpb.TraceIDIndexFieldName, Value: span.TraceID}, {Name: "_msg", Value: msgFieldValue}, + // todo: @jiekun the trace ID field MUST be the last field. add extra ways to secure it. + {Name: otelpb.TraceIDIndexFieldName, Value: span.TraceID}, }, []logstorage.Field{{Name: otelpb.TraceIDIndexStreamName, Value: strconv.FormatUint(xxhash.Sum64String(span.TraceID)%otelpb.TraceIDIndexPartitionCount, 10)}}) traceIDCache.Set([]byte(span.TraceID), nil) } diff --git a/app/vtstorage/netinsert/netinsert.go b/app/vtstorage/netinsert/netinsert.go index 06c3b41463..8f78490e9d 100644 --- a/app/vtstorage/netinsert/netinsert.go +++ b/app/vtstorage/netinsert/netinsert.go @@ -19,7 +19,9 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth" "github.com/VictoriaMetrics/VictoriaMetrics/lib/timerpool" + "github.com/VictoriaMetrics/fastcache" "github.com/VictoriaMetrics/metrics" + "github.com/cespare/xxhash/v2" "github.com/valyala/fastrand" ) @@ -341,7 +343,8 @@ func (s *Storage) MustStop() { // AddRow adds the given log row into s. func (s *Storage) AddRow(streamHash uint64, r *logstorage.InsertRow) { - idx := s.srt.getNodeIdx(streamHash) + // todo: @jiekun the trace ID field MUST be the last field. add extra ways to secure it. 
+ idx := s.srt.getNodeIdx(streamHash, r.Fields[len(r.Fields)-1].Value) sn := s.sns[idx] sn.addRow(r) } @@ -378,31 +381,15 @@ func newStreamRowsTracker(nodesCount int) *streamRowsTracker { } } -func (srt *streamRowsTracker) getNodeIdx(streamHash uint64) uint64 { +var ( + traceIDnodeIdxCache = fastcache.New(4 * 1024 * 1024) +) + +func (srt *streamRowsTracker) getNodeIdx(_ uint64, traceID string) uint64 { if srt.nodesCount == 1 { // Fast path for a single node. return 0 } - srt.mu.Lock() - defer srt.mu.Unlock() - - streamRows := srt.rowsPerStream[streamHash] + 1 - srt.rowsPerStream[streamHash] = streamRows - - if streamRows <= 1000 { - // Write the initial rows for the stream to a single storage node for better locality. - // This should work great for log streams containing small number of logs, since will be distributed - // evenly among available storage nodes because they have different streamHash. - return streamHash % uint64(srt.nodesCount) - } - - // The log stream contains more than 1000 rows. Distribute them among storage nodes at random - // in order to improve query performance over this stream (the data for the log stream - // can be processed in parallel on all the storage nodes). - // - // The random distribution is preferred over round-robin distribution in order to avoid possible - // dependency between the order of the ingested logs and the number of storage nodes, - // which may lead to non-uniform distribution of logs among storage nodes. 
- return uint64(fastrand.Uint32n(uint32(srt.nodesCount))) + return xxhash.Sum64String(traceID) % uint64(srt.nodesCount) } diff --git a/app/vtstorage/netinsert/netinsert_test.go b/app/vtstorage/netinsert/netinsert_test.go index 957156137a..3d4a9fea89 100644 --- a/app/vtstorage/netinsert/netinsert_test.go +++ b/app/vtstorage/netinsert/netinsert_test.go @@ -1,6 +1,7 @@ package netinsert import ( + cryptorand "crypto/rand" "fmt" "math" "math/rand" @@ -26,7 +27,7 @@ func TestStreamRowsTracker(t *testing.T) { for i := 0; i < rowsCount; i++ { streamIdx := rng.Intn(streamsCount) h := streamHashes[streamIdx] - nodeIdx := srt.getNodeIdx(h) + nodeIdx := srt.getNodeIdx(h, cryptorand.Text()) rowsPerNode[nodeIdx]++ } From 40ea3d6ba74cf0e80830c344014097725ad1b75d Mon Sep 17 00:00:00 2001 From: Jiekun Date: Mon, 6 Oct 2025 16:10:08 +0800 Subject: [PATCH 2/6] feature: [traceID distribution] make linter happy --- app/vtstorage/netinsert/netinsert.go | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/app/vtstorage/netinsert/netinsert.go b/app/vtstorage/netinsert/netinsert.go index 8f78490e9d..d0f8f39733 100644 --- a/app/vtstorage/netinsert/netinsert.go +++ b/app/vtstorage/netinsert/netinsert.go @@ -19,7 +19,6 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth" "github.com/VictoriaMetrics/VictoriaMetrics/lib/timerpool" - "github.com/VictoriaMetrics/fastcache" "github.com/VictoriaMetrics/metrics" "github.com/cespare/xxhash/v2" "github.com/valyala/fastrand" @@ -381,10 +380,7 @@ func newStreamRowsTracker(nodesCount int) *streamRowsTracker { } } -var ( - traceIDnodeIdxCache = fastcache.New(4 * 1024 * 1024) -) - +// getNodeIdx return the node index for a specific traceID func (srt *streamRowsTracker) getNodeIdx(_ uint64, traceID string) uint64 { if srt.nodesCount == 1 { // Fast path for a single node. 
From e28ad5f439b4777582d59b04c5979c6e69861845 Mon Sep 17 00:00:00 2001 From: Jiekun Date: Tue, 7 Oct 2025 16:08:50 +0800 Subject: [PATCH 3/6] feature: [traceID distribution] make AddRow function work when trace_id field does not exist --- app/vtstorage/netinsert/netinsert.go | 46 +++++++++++++++++++++++++--- 1 file changed, 42 insertions(+), 4 deletions(-) diff --git a/app/vtstorage/netinsert/netinsert.go b/app/vtstorage/netinsert/netinsert.go index d0f8f39733..2f2dd9e797 100644 --- a/app/vtstorage/netinsert/netinsert.go +++ b/app/vtstorage/netinsert/netinsert.go @@ -22,6 +22,8 @@ import ( "github.com/VictoriaMetrics/metrics" "github.com/cespare/xxhash/v2" "github.com/valyala/fastrand" + + otelpb "github.com/VictoriaMetrics/VictoriaTraces/lib/protoparser/opentelemetry/pb" ) // the maximum size of a single data block sent to storage node. @@ -342,8 +344,17 @@ func (s *Storage) MustStop() { // AddRow adds the given log row into s. func (s *Storage) AddRow(streamHash uint64, r *logstorage.InsertRow) { - // todo: @jiekun the trace ID field MUST be the last field. add extra ways to secure it. - idx := s.srt.getNodeIdx(streamHash, r.Fields[len(r.Fields)-1].Value) + // We put the trace ID in the last field. And there is a unit test to secure it. + // But for better compatibility, we will search for the trace_id in reverse order, + // instead of directly using the last one in the `r.Fields` slice.
+ var traceID string + for i := len(r.Fields) - 1; i >= 0; i-- { + if r.Fields[i].Name == otelpb.TraceIDField { + traceID = r.Fields[i].Value + break + } + } + idx := s.srt.getNodeIdx(streamHash, traceID) sn := s.sns[idx] sn.addRow(r) } @@ -381,11 +392,38 @@ func newStreamRowsTracker(nodesCount int) *streamRowsTracker { } // getNodeIdx return the node index for a specific traceID -func (srt *streamRowsTracker) getNodeIdx(_ uint64, traceID string) uint64 { +func (srt *streamRowsTracker) getNodeIdx(streamHash uint64, traceID string) uint64 { if srt.nodesCount == 1 { // Fast path for a single node. return 0 } - return xxhash.Sum64String(traceID) % uint64(srt.nodesCount) + // common path: distribute data by trace ID. + if traceID != "" { + return xxhash.Sum64String(traceID) % uint64(srt.nodesCount) + } + + // for backward compatible, keep the original random distribution logic. + // only data without trace ID will go to the following path. + srt.mu.Lock() + defer srt.mu.Unlock() + + streamRows := srt.rowsPerStream[streamHash] + 1 + srt.rowsPerStream[streamHash] = streamRows + + if streamRows <= 1000 { + // Write the initial rows for the stream to a single storage node for better locality. + // This should work great for log streams containing small number of logs, since will be distributed + // evenly among available storage nodes because they have different streamHash. + return streamHash % uint64(srt.nodesCount) + } + + // The log stream contains more than 1000 rows. Distribute them among storage nodes at random + // in order to improve query performance over this stream (the data for the log stream + // can be processed in parallel on all the storage nodes). + // + // The random distribution is preferred over round-robin distribution in order to avoid possible + // dependency between the order of the ingested logs and the number of storage nodes, + // which may lead to non-uniform distribution of logs among storage nodes. 
+ return uint64(fastrand.Uint32n(uint32(srt.nodesCount))) } From 34a70e913b7dd858f5cc6a898a71544222486fae Mon Sep 17 00:00:00 2001 From: Jiekun Date: Tue, 7 Oct 2025 16:12:22 +0800 Subject: [PATCH 4/6] feature: [traceID distribution] fix wording --- app/vtstorage/netinsert/netinsert.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/app/vtstorage/netinsert/netinsert.go b/app/vtstorage/netinsert/netinsert.go index 2f2dd9e797..ed3c194bf1 100644 --- a/app/vtstorage/netinsert/netinsert.go +++ b/app/vtstorage/netinsert/netinsert.go @@ -344,9 +344,9 @@ func (s *Storage) MustStop() { // AddRow adds the given log row into s. func (s *Storage) AddRow(streamHash uint64, r *logstorage.InsertRow) { - // We put the trace ID in the last field. And there is a unit test to secure it. - // But for better compatibility, we will search for the trace_id in reverse order, - // instead of directly using the last one in the `r.Fields` slice. + // we put the trace ID in the last field. And there is a unit test to secure it. + // but for better compatibility, we will search for the trace_id in reverse order, + // instead of using the last one in the `r.Fields` slice directly. var traceID string for i := len(r.Fields) - 1; i >= 0; i-- { if r.Fields[i].Name == otelpb.TraceIDField { From 8dade4b9519b10f41c54a666832e8019a76ba47f Mon Sep 17 00:00:00 2001 From: Jiekun Date: Mon, 13 Oct 2025 12:54:04 +0800 Subject: [PATCH 5/6] chore: update comments for this feature --- app/vtstorage/netinsert/netinsert.go | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/app/vtstorage/netinsert/netinsert.go b/app/vtstorage/netinsert/netinsert.go index ed3c194bf1..a179aa8ea8 100644 --- a/app/vtstorage/netinsert/netinsert.go +++ b/app/vtstorage/netinsert/netinsert.go @@ -344,8 +344,8 @@ func (s *Storage) MustStop() { // AddRow adds the given log row into s. 
func (s *Storage) AddRow(streamHash uint64, r *logstorage.InsertRow) { - // we put the trace ID in the last field. And there is a unit test to secure it. - // but for better compatibility, we will search for the trace_id in reverse order, + // trace ID should always be put in the last field. + // but for better compatibility, we should search for the trace_id in reverse order, // instead of using the last one in the `r.Fields` slice directly. var traceID string for i := len(r.Fields) - 1; i >= 0; i-- { if r.Fields[i].Name == otelpb.TraceIDField { @@ -400,6 +400,12 @@ func (srt *streamRowsTracker) getNodeIdx(streamHash uint64, traceID string) uint // common path: distribute data by trace ID. if traceID != "" { + // When the node count is changed after a restart, the spans of a trace might be + // distributed across different nodes. Therefore, there's no guarantee that a trace query + // can find all the spans of a trace in ONE vtstorage instance. + // + // This could potentially affect the service graph, which aggregates data within each vtstorage instance. + // However, since only a small number of traces are affected, the overall trend will remain consistent. return xxhash.Sum64String(traceID) % uint64(srt.nodesCount) } From bf39dc35c28f08b959299c4a61fb17d4defedbaa Mon Sep 17 00:00:00 2001 From: Jiekun Date: Mon, 13 Oct 2025 14:35:07 +0800 Subject: [PATCH 6/6] chore: make linter happy --- app/vtstorage/netinsert/netinsert.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/app/vtstorage/netinsert/netinsert.go b/app/vtstorage/netinsert/netinsert.go index a179aa8ea8..188603ed3b 100644 --- a/app/vtstorage/netinsert/netinsert.go +++ b/app/vtstorage/netinsert/netinsert.go @@ -344,7 +344,7 @@ func (s *Storage) MustStop() { // AddRow adds the given log row into s. func (s *Storage) AddRow(streamHash uint64, r *logstorage.InsertRow) { - // trace ID should always be put in the last field.
// but for better compatibility, we should search for the trace_id in reverse order, // instead of using the last one in the `r.Fields` slice directly. var traceID string