app/vtinsert/opentelemetry/opentelemetry.go (16 changes: 10 additions & 6 deletions)

@@ -191,7 +191,6 @@ func pushFieldsFromScopeSpans(ss *otelpb.ScopeSpans, commonFields []logstorage.Field,
 func pushFieldsFromSpan(span *otelpb.Span, scopeCommonFields []logstorage.Field, lmp insertutil.LogMessageProcessor) []logstorage.Field {
     fields := scopeCommonFields
     fields = append(fields,
-        logstorage.Field{Name: otelpb.TraceIDField, Value: span.TraceID},
         logstorage.Field{Name: otelpb.SpanIDField, Value: span.SpanID},
         logstorage.Field{Name: otelpb.TraceStateField, Value: span.TraceState},
         logstorage.Field{Name: otelpb.ParentSpanIDField, Value: span.ParentSpanID},
@@ -239,17 +238,22 @@ func pushFieldsFromSpan(span *otelpb.Span, scopeCommonFields []logstorage.Field, lmp insertutil.LogMessageProcessor) []logstorage.Field {
         // append link attributes
         fields = appendKeyValuesWithPrefixSuffix(fields, link.Attributes, "", linkFieldPrefix+otelpb.LinkAttrPrefix, linkFieldSuffix)
     }
-    fields = append(fields, logstorage.Field{
-        Name:  "_msg",
-        Value: msgFieldValue,
-    })
+    fields = append(fields,
+        logstorage.Field{Name: "_msg", Value: msgFieldValue},
+        // MUST: always place TraceIDField last. The trace ID is required for data distribution.
+        // Keeping it in the last position lets netinsert find it easily, without adding an extra field to
+        // the *logstorage.InsertRow structure, which would need to be kept in sync between logstorage and VictoriaTraces.
+        // todo: @jiekun the trace ID field MUST be the last field. add extra ways to secure it.
+        logstorage.Field{Name: otelpb.TraceIDField, Value: span.TraceID},
+    )
     lmp.AddRow(int64(span.EndTimeUnixNano), fields, nil)

     // create an entry in the trace-id-idx stream if this trace_id hasn't been seen before.
     if !traceIDCache.Has([]byte(span.TraceID)) {
         lmp.AddRow(int64(span.StartTimeUnixNano), []logstorage.Field{
-            {Name: otelpb.TraceIDIndexFieldName, Value: span.TraceID},
             {Name: "_msg", Value: msgFieldValue},
+            // todo: @jiekun the trace ID field MUST be the last field. add extra ways to secure it.
+            {Name: otelpb.TraceIDIndexFieldName, Value: span.TraceID},
         }, []logstorage.Field{{Name: otelpb.TraceIDIndexStreamName, Value: strconv.FormatUint(xxhash.Sum64String(span.TraceID)%otelpb.TraceIDIndexPartitionCount, 10)}})
         traceIDCache.Set([]byte(span.TraceID), nil)
     }
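The comments in this hunk pin down the contract the PR relies on: the trace ID must be the last field of every row, because netinsert (next file) locates it by position rather than by name. Below is a minimal sketch of a guard that could back the `todo` items, assuming `logstorage.Field` is a plain name/value string pair; the local `Field` type, the `traceIDFromRow` helper, and the `"trace_id"` field name are illustrative, not part of the PR:

```go
package main

import "fmt"

// Field mirrors logstorage.Field (a name/value string pair); the real type
// lives in github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage.
type Field struct {
	Name  string
	Value string
}

// traceIDFromRow checks the invariant this PR depends on: the trace ID
// must be the last field of a row. It fails loudly instead of letting a
// misordered row be routed on the wrong value.
func traceIDFromRow(fields []Field, traceIDFieldName string) (string, error) {
	if len(fields) == 0 {
		return "", fmt.Errorf("row has no fields")
	}
	last := fields[len(fields)-1]
	if last.Name != traceIDFieldName {
		return "", fmt.Errorf("last field is %q; want %q", last.Name, traceIDFieldName)
	}
	return last.Value, nil
}

func main() {
	row := []Field{
		{Name: "_msg", Value: "GET /api/v1/query"},
		{Name: "trace_id", Value: "4bf92f3577b34da6a3ce929d0e0e4736"},
	}
	id, err := traceIDFromRow(row, "trace_id")
	fmt.Println(id, err)
}
```

Note also that the trace-id-idx row is sharded into one of `otelpb.TraceIDIndexPartitionCount` streams via `xxhash.Sum64String(span.TraceID) % otelpb.TraceIDIndexPartitionCount`, the same hash-then-modulo pattern the next file applies to node selection.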
app/vtstorage/netinsert/netinsert.go (29 changes: 6 additions & 23 deletions)

@@ -20,6 +20,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/timerpool"
"github.com/VictoriaMetrics/metrics"
"github.com/cespare/xxhash/v2"
"github.com/valyala/fastrand"
)

@@ -341,7 +342,8 @@ func (s *Storage) MustStop() {

 // AddRow adds the given log row into s.
 func (s *Storage) AddRow(streamHash uint64, r *logstorage.InsertRow) {
-    idx := s.srt.getNodeIdx(streamHash)
+    // todo: @jiekun the trace ID field MUST be the last field. add extra ways to secure it.
+    idx := s.srt.getNodeIdx(streamHash, r.Fields[len(r.Fields)-1].Value)
     sn := s.sns[idx]
     sn.addRow(r)
 }
@@ -378,31 +380,12 @@ func newStreamRowsTracker(nodesCount int) *streamRowsTracker {
     }
 }

-func (srt *streamRowsTracker) getNodeIdx(streamHash uint64) uint64 {
+// getNodeIdx returns the node index for the given traceID.
+func (srt *streamRowsTracker) getNodeIdx(_ uint64, traceID string) uint64 {
     if srt.nodesCount == 1 {
         // Fast path for a single node.
         return 0
     }

-    srt.mu.Lock()
-    defer srt.mu.Unlock()
-
-    streamRows := srt.rowsPerStream[streamHash] + 1
-    srt.rowsPerStream[streamHash] = streamRows
-
-    if streamRows <= 1000 {
-        // Write the initial rows for the stream to a single storage node for better locality.
-        // This should work great for log streams containing small number of logs, since will be distributed
-        // evenly among available storage nodes because they have different streamHash.
-        return streamHash % uint64(srt.nodesCount)
-    }
-
-    // The log stream contains more than 1000 rows. Distribute them among storage nodes at random
-    // in order to improve query performance over this stream (the data for the log stream
-    // can be processed in parallel on all the storage nodes).
-    //
-    // The random distribution is preferred over round-robin distribution in order to avoid possible
-    // dependency between the order of the ingested logs and the number of storage nodes,
-    // which may lead to non-uniform distribution of logs among storage nodes.
-    return uint64(fastrand.Uint32n(uint32(srt.nodesCount)))
+    return xxhash.Sum64String(traceID) % uint64(srt.nodesCount)
 }
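`getNodeIdx` now routes by trace ID instead of by per-stream row counters: hashing the trace ID and reducing it modulo the node count sends every span of a trace to the same storage node, and lets the tracker drop its mutex and `rowsPerStream` map entirely. The trade-off against the removed logic is that a single very large trace can no longer be spread across nodes, but all reads for one trace now touch exactly one node. A standalone sketch of the routing rule, using the same `github.com/cespare/xxhash/v2` call as the diff (the function name and trace IDs are illustrative):

```go
package main

import (
	"fmt"

	"github.com/cespare/xxhash/v2"
)

// nodeIdxForTrace reduces an xxhash of the trace ID modulo the node count,
// so spans sharing a trace ID always map to the same node index.
func nodeIdxForTrace(traceID string, nodesCount int) uint64 {
	return xxhash.Sum64String(traceID) % uint64(nodesCount)
}

func main() {
	const nodes = 3
	for _, id := range []string{"trace-a", "trace-a", "trace-b"} {
		fmt.Printf("%s -> node %d\n", id, nodeIdxForTrace(id, nodes))
	}
	// "trace-a" prints the same node index twice; "trace-b" may differ.
}
```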
app/vtstorage/netinsert/netinsert_test.go (3 changes: 2 additions & 1 deletion)

@@ -1,6 +1,7 @@
 package netinsert

 import (
+    cryptorand "crypto/rand"
     "fmt"
     "math"
     "math/rand"
@@ -26,7 +27,7 @@ func TestStreamRowsTracker(t *testing.T) {
     for i := 0; i < rowsCount; i++ {
         streamIdx := rng.Intn(streamsCount)
         h := streamHashes[streamIdx]
-        nodeIdx := srt.getNodeIdx(h)
+        nodeIdx := srt.getNodeIdx(h, cryptorand.Text())
         rowsPerNode[nodeIdx]++
     }

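The test now feeds `getNodeIdx` a fresh random trace ID per row via `crypto/rand.Text` (added in Go 1.24; it returns a 26-character random base32 string), so the existing uniformity assertions exercise the hash-based routing rather than the removed counter logic. A quick standalone probe in the same spirit, with arbitrary node and row counts:

```go
package main

import (
	cryptorand "crypto/rand"
	"fmt"

	"github.com/cespare/xxhash/v2"
)

// Hash many random trace IDs into a fixed node count and report the spread;
// with a decent hash each node should receive roughly rows/nodes entries.
func main() {
	const nodes, rows = 3, 30000
	perNode := make([]int, nodes)
	for i := 0; i < rows; i++ {
		id := cryptorand.Text() // random base32 string, at least 128 bits of entropy
		perNode[xxhash.Sum64String(id)%nodes]++
	}
	for i, n := range perNode {
		fmt.Printf("node %d: %d rows (%.1f%%)\n", i, n, 100*float64(n)/rows)
	}
}
```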