Skip to content

Commit 003eb7d

Browse files
authored
netstorage: [traceID distribution] Change the data distribution by traceID (#65)
### Describe Your Changes For cluster mode, distribute data by `trace ID` instead of `full random`. This feature will make the spans of the same trace go to the same vtstorage instance. ``` goos: darwin goarch: arm64 pkg: github.com/VictoriaMetrics/VictoriaTraces/app/vtstorage/netinsert cpu: Apple M1 Pro BenchmarkGetNodeIdx BenchmarkGetNodeIdx-8 12863986 93.03 ns/op 0 B/op 0 allocs/op BenchmarkGetNodeIdxBak BenchmarkGetNodeIdxBak-8 11386562 106.3 ns/op 0 B/op 0 allocs/op ``` A simple benchmark with 15k ingestion rate to each vtinsert: <img width="3024" height="1764" alt="image" src="https://github.com/user-attachments/assets/4e518c01-c4ce-4fbd-8911-501da811c863" /> ### Checklist The following checks are **mandatory**: - [x] My change adheres to [VictoriaMetrics contributing guidelines](https://docs.victoriametrics.com/victoriametrics/contributing/#pull-request-checklist). - [x] My change adheres to [VictoriaMetrics development goals](https://docs.victoriametrics.com/victoriametrics/goals/).
1 parent 224abdf commit 003eb7d

File tree

3 files changed

+41
-9
lines changed

3 files changed

+41
-9
lines changed

app/vtinsert/opentelemetry/opentelemetry.go

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -192,7 +192,6 @@ func pushFieldsFromScopeSpans(ss *otelpb.ScopeSpans, commonFields []logstorage.F
192192
func pushFieldsFromSpan(span *otelpb.Span, scopeCommonFields []logstorage.Field, lmp insertutil.LogMessageProcessor) []logstorage.Field {
193193
fields := scopeCommonFields
194194
fields = append(fields,
195-
logstorage.Field{Name: otelpb.TraceIDField, Value: span.TraceID},
196195
logstorage.Field{Name: otelpb.SpanIDField, Value: span.SpanID},
197196
logstorage.Field{Name: otelpb.TraceStateField, Value: span.TraceState},
198197
logstorage.Field{Name: otelpb.ParentSpanIDField, Value: span.ParentSpanID},
@@ -240,17 +239,22 @@ func pushFieldsFromSpan(span *otelpb.Span, scopeCommonFields []logstorage.Field,
240239
// append link attributes
241240
fields = appendKeyValuesWithPrefixSuffix(fields, link.Attributes, "", linkFieldPrefix+otelpb.LinkAttrPrefix, linkFieldSuffix)
242241
}
243-
fields = append(fields, logstorage.Field{
244-
Name: "_msg",
245-
Value: msgFieldValue,
246-
})
242+
fields = append(fields,
243+
logstorage.Field{Name: "_msg", Value: msgFieldValue},
244+
// MUST: always place TraceIDField at the last. The Trace ID is required for data distribution.
245+
// Placing it at the last position helps netinsert to find it easily, without adding extra field to
246+
// *logstorage.InsertRow structure, which is required due to the sync between logstorage and VictoriaTraces.
247+
// todo: @jiekun the trace ID field MUST be the last field. add extra ways to secure it.
248+
logstorage.Field{Name: otelpb.TraceIDField, Value: span.TraceID},
249+
)
247250
lmp.AddRow(int64(span.EndTimeUnixNano), fields, nil)
248251

249252
// create an entity in trace-id-idx stream, if this trace_id hasn't been seen before.
250253
if !traceIDCache.Has([]byte(span.TraceID)) {
251254
lmp.AddRow(int64(span.StartTimeUnixNano), []logstorage.Field{
252-
{Name: otelpb.TraceIDIndexFieldName, Value: span.TraceID},
253255
{Name: "_msg", Value: msgFieldValue},
256+
// todo: @jiekun the trace ID field MUST be the last field. add extra ways to secure it.
257+
{Name: otelpb.TraceIDIndexFieldName, Value: span.TraceID},
254258
}, []logstorage.Field{{Name: otelpb.TraceIDIndexStreamName, Value: strconv.FormatUint(xxhash.Sum64String(span.TraceID)%otelpb.TraceIDIndexPartitionCount, 10)}})
255259
traceIDCache.Set([]byte(span.TraceID), nil)
256260
}

app/vtstorage/netinsert/netinsert.go

Lines changed: 29 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,10 @@ import (
2020
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth"
2121
"github.com/VictoriaMetrics/VictoriaMetrics/lib/timerpool"
2222
"github.com/VictoriaMetrics/metrics"
23+
"github.com/cespare/xxhash/v2"
2324
"github.com/valyala/fastrand"
25+
26+
otelpb "github.com/VictoriaMetrics/VictoriaTraces/lib/protoparser/opentelemetry/pb"
2427
)
2528

2629
// the maximum size of a single data block sent to storage node.
@@ -341,7 +344,17 @@ func (s *Storage) MustStop() {
341344

342345
// AddRow adds the given log row into s.
343346
func (s *Storage) AddRow(streamHash uint64, r *logstorage.InsertRow) {
344-
idx := s.srt.getNodeIdx(streamHash)
347+
// trace ID should always be put in the last field.
348+
// but for better compatibility, we should search for the trace_id in reverse order,
349+
// instead of using the last one in the `r.Fields` slice directly.
350+
var traceID string
351+
for i := len(r.Fields) - 1; i >= 0; i-- {
352+
if r.Fields[i].Name == otelpb.TraceIDField {
353+
traceID = r.Fields[i].Value
354+
break
355+
}
356+
}
357+
idx := s.srt.getNodeIdx(streamHash, traceID)
345358
sn := s.sns[idx]
346359
sn.addRow(r)
347360
}
@@ -378,12 +391,26 @@ func newStreamRowsTracker(nodesCount int) *streamRowsTracker {
378391
}
379392
}
380393

381-
func (srt *streamRowsTracker) getNodeIdx(streamHash uint64) uint64 {
394+
// getNodeIdx return the node index for a specific traceID
395+
func (srt *streamRowsTracker) getNodeIdx(streamHash uint64, traceID string) uint64 {
382396
if srt.nodesCount == 1 {
383397
// Fast path for a single node.
384398
return 0
385399
}
386400

401+
// common path: distribute data by trace ID.
402+
if traceID != "" {
403+
// When the node count is changed after a restart, the spans of a trace might be
404+
// distributed across different nodes. Therefore, there's no guarantee that a trace query
405+
// can find all the spans of a trace in ONE vtstorage instance.
406+
//
407+
// This could potentially affect the service graph, which aggregates data within each vtstorage instance.
408+
// However, since only a small number of traces are affected, the overall trend will remain consistent.
409+
return xxhash.Sum64String(traceID) % uint64(srt.nodesCount)
410+
}
411+
412+
// for backward compatible, keep the original random distribution logic.
413+
// only data without trace ID will go to the following path.
387414
srt.mu.Lock()
388415
defer srt.mu.Unlock()
389416

app/vtstorage/netinsert/netinsert_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package netinsert
22

33
import (
4+
cryptorand "crypto/rand"
45
"fmt"
56
"math"
67
"math/rand"
@@ -26,7 +27,7 @@ func TestStreamRowsTracker(t *testing.T) {
2627
for i := 0; i < rowsCount; i++ {
2728
streamIdx := rng.Intn(streamsCount)
2829
h := streamHashes[streamIdx]
29-
nodeIdx := srt.getNodeIdx(h)
30+
nodeIdx := srt.getNodeIdx(h, cryptorand.Text())
3031
rowsPerNode[nodeIdx]++
3132
}
3233

0 commit comments

Comments
 (0)