From 71790b2e398a34b88d7f8c57be3090d896fd77c9 Mon Sep 17 00:00:00 2001 From: Ivan Kalita Date: Wed, 15 Oct 2025 14:20:26 +0200 Subject: [PATCH 1/7] chore(engine): Refactor streams result builder to use column-oriented processing Change streamsResultBuilder.CollectRecord() from row-by-row to column-by-column iteration. This better aligns with Arrow's columnar memory layout, improving CPU cache locality by reading contiguous memory regions. --- pkg/engine/compat.go | 201 ++++++++++++++++++++++++-------------- pkg/engine/compat_test.go | 24 ++++- 2 files changed, 148 insertions(+), 77 deletions(-) diff --git a/pkg/engine/compat.go b/pkg/engine/compat.go index 25e637227cadc..e537a5c6f52e6 100644 --- a/pkg/engine/compat.go +++ b/pkg/engine/compat.go @@ -45,107 +45,158 @@ type streamsResultBuilder struct { } func (b *streamsResultBuilder) CollectRecord(rec arrow.Record) { - for row := range int(rec.NumRows()) { - stream, entry := b.collectRow(rec, row) - - // Ignore rows that don't have stream labels, log line, or timestamp - if stream.IsEmpty() || entry.Line == "" || entry.Timestamp.Equal(time.Time{}) { - continue - } - - // Add the entry to the result builder - key := stream.String() - idx, ok := b.streams[key] - if !ok { - idx = len(b.data) - b.streams[key] = idx - b.data = append(b.data, push.Stream{Labels: key}) - } - b.data[idx].Entries = append(b.data[idx].Entries, entry) - b.count++ + numRows := int(rec.NumRows()) + if numRows == 0 { + return } -} -func (b *streamsResultBuilder) collectRow(rec arrow.Record, i int) (labels.Labels, logproto.Entry) { - var entry logproto.Entry - lbs := labels.NewBuilder(labels.EmptyLabels()) - metadata := labels.NewBuilder(labels.EmptyLabels()) - parsed := labels.NewBuilder(labels.EmptyLabels()) + // let's say we have following log entries in rec: + // - {labelenv="prod-1", metadatatrace="123-1", parsed="v1"} ts1 line 1 + // - {labelenv="prod-2", metadatatrace="123-2", parsed="v2"} ts2 line 2 + // - {labelenv="prod-3", metadatatrace="123-3", parsed="v3"} ts3 line 3 + // we pre-initialize slices to store column values for all the rows, e.g.: + // rows | 1 | 2 | 3 | ... + // ==============+=========+=========+=========+==== + // timestamps | r1 ts | r2 ts | r3 ts | ... + // lines | r1 line | r2 line | r3 line | ... + // ... + // We iterate over the columns and convert the values to our format column by column, e.g., + // first all the timestamps, then all the log lines, etc. + // After all the values are collected and converted we transform the columnar representation to a row-based one. + + timestamps := make([]time.Time, numRows) + lines := make([]string, numRows) + lbsBuilders := make([]*labels.Builder, numRows) + metadataBuilders := make([]*labels.Builder, numRows) + parsedBuilders := make([]*labels.Builder, numRows) + + for rowIdx := range numRows { + lbsBuilders[rowIdx] = labels.NewBuilder(labels.EmptyLabels()) + metadataBuilders[rowIdx] = labels.NewBuilder(labels.EmptyLabels()) + parsedBuilders[rowIdx] = labels.NewBuilder(labels.EmptyLabels()) + } + // Convert arrow values to our format column by column for colIdx := range int(rec.NumCols()) { col := rec.Column(colIdx) - // Ignore column values that are NULL or invalid - if col.IsNull(i) || !col.IsValid(i) { - continue - } field := rec.Schema().Field(colIdx) ident, err := semconv.ParseFQN(field.Name) if err != nil { continue } - shortName := ident.ShortName() - // Extract line - if ident.Equal(semconv.ColumnIdentMessage) { - entry.Line = col.(*array.String).Value(i) - continue - } + switch true { - // Extract timestamp - if ident.Equal(semconv.ColumnIdentTimestamp) { - entry.Timestamp = time.Unix(0, int64(col.(*array.Timestamp).Value(i))) - continue - } + // Log line + case ident.Equal(semconv.ColumnIdentMessage): + lineCol, ok := col.(*array.String) + if !ok { + continue + } + forEachNotNullRowColValue(numRows, lineCol, func(rowIdx int) { + lines[rowIdx] = lineCol.Value(rowIdx) + }) - // Extract label - if ident.ColumnType() == types.ColumnTypeLabel { - switch arr := col.(type) { - case *array.String: - lbs.Set(shortName, arr.Value(i)) + // Timestamp + case ident.Equal(semconv.ColumnIdentTimestamp): + tsCol, ok := col.(*array.Timestamp) + if !ok { + continue } - continue - } + forEachNotNullRowColValue(numRows, tsCol, func(rowIdx int) { + timestamps[rowIdx] = time.Unix(0, int64(tsCol.Value(rowIdx))) + }) - // Extract metadata - if ident.ColumnType() == types.ColumnTypeMetadata { - switch arr := col.(type) { - case *array.String: - metadata.Set(shortName, arr.Value(i)) + // One of the label columns + case ident.ColumnType() == types.ColumnTypeLabel: + labelCol, ok := col.(*array.String) + if !ok { + continue + } + forEachNotNullRowColValue(numRows, labelCol, func(rowIdx int) { + lbsBuilders[rowIdx].Set(shortName, labelCol.Value(rowIdx)) + }) + + // One of the metadata columns + case ident.ColumnType() == types.ColumnTypeMetadata: + metadataCol, ok := col.(*array.String) + if !ok { + continue + } + forEachNotNullRowColValue(numRows, metadataCol, func(rowIdx int) { + val := metadataCol.Value(rowIdx) + metadataBuilders[rowIdx].Set(shortName, val) // include structured metadata in stream labels - lbs.Set(shortName, arr.Value(i)) + lbsBuilders[rowIdx].Set(shortName, val) + }) + + // One of the parsed columns + case ident.ColumnType() == types.ColumnTypeParsed: + parsedCol, ok := col.(*array.String) + if !ok { + continue } - continue - } - // Extract parsed - if ident.ColumnType() == types.ColumnTypeParsed { - switch arr := col.(type) { - case *array.String: - // TODO: keep errors if --strict is set - // These are reserved column names used to track parsing errors. We are dropping them until - // we add support for --strict parsing. - if shortName == types.ColumnNameError || shortName == types.ColumnNameErrorDetails { - continue - } + // TODO: keep errors if --strict is set + // These are reserved column names used to track parsing errors. We are dropping them until + // we add support for --strict parsing. + if shortName == types.ColumnNameError || shortName == types.ColumnNameErrorDetails { + continue + } - if parsed.Get(shortName) != "" { - continue + forEachNotNullRowColValue(numRows, parsedCol, func(rowIdx int) { + parsedVal := parsedCol.Value(rowIdx) + if parsedBuilders[rowIdx].Get(shortName) != "" { + return } - - parsed.Set(shortName, arr.Value(i)) - lbs.Set(shortName, arr.Value(i)) - if metadata.Get(shortName) != "" { - metadata.Del(shortName) + parsedBuilders[rowIdx].Set(shortName, parsedVal) + lbsBuilders[rowIdx].Set(shortName, parsedVal) + if metadataBuilders[rowIdx].Get(shortName) != "" { + metadataBuilders[rowIdx].Del(shortName) } - } + }) } } - entry.StructuredMetadata = logproto.FromLabelsToLabelAdapters(metadata.Labels()) - entry.Parsed = logproto.FromLabelsToLabelAdapters(parsed.Labels()) - return lbs.Labels(), entry + // Convert columnar representation to row-based one + for rowIdx := 0; rowIdx < numRows; rowIdx++ { + lbs := lbsBuilders[rowIdx].Labels() + ts := timestamps[rowIdx] + line := lines[rowIdx] + // Ignore rows that don't have stream labels, log line, or timestamp + if line == "" || ts.IsZero() || lbs.IsEmpty() { + continue + } + + entry := logproto.Entry{ + Timestamp: ts, + Line: line, + StructuredMetadata: logproto.FromLabelsToLabelAdapters(metadataBuilders[rowIdx].Labels()), + Parsed: logproto.FromLabelsToLabelAdapters(parsedBuilders[rowIdx].Labels()), + } + + // Add entry to appropriate stream + key := lbs.String() + idx, ok := b.streams[key] + if !ok { + idx = len(b.data) + b.streams[key] = idx + b.data = append(b.data, push.Stream{Labels: key}) + } + b.data[idx].Entries = append(b.data[idx].Entries, entry) + b.count++ + } +} + +func forEachNotNullRowColValue(numRows int, col arrow.Array, f func(rowIdx int)) { + for rowIdx := 0; rowIdx < numRows; rowIdx++ { + if col.IsNull(rowIdx) { + continue + } + f(rowIdx) + } } func (b *streamsResultBuilder) Build(s stats.Result, md *metadata.Context) logqlmodel.Result { diff --git a/pkg/engine/compat_test.go b/pkg/engine/compat_test.go index 31eb77b1d5766..9c5fc5881f420 100644 --- a/pkg/engine/compat_test.go +++ b/pkg/engine/compat_test.go @@ -91,6 +91,13 @@ func TestStreamsResultBuilder(t *testing.T) { nil, ) rows := arrowtest.Rows{ + { + colTs.FQN(): nil, + colMsg.FQN(): "log line 0 (must be skipped)", + colEnv.FQN(): "dev", + colNs.FQN(): "loki-dev-001", + colTid.FQN(): "860e403fcf754312", + }, { colTs.FQN(): time.Unix(0, 1620000000000000001).UTC(), colMsg.FQN(): "log line 1", @@ -126,6 +133,13 @@ func TestStreamsResultBuilder(t *testing.T) { colNs.FQN(): "loki-dev-002", colTid.FQN(): "0cf883f112ad239b", }, + { + colTs.FQN(): time.Unix(0, 1620000000000000006).UTC(), + colMsg.FQN(): "log line 6", + colEnv.FQN(): "dev", + colNs.FQN(): nil, + colTid.FQN(): "9de325g124ad230b", + }, } record := rows.Record(memory.DefaultAllocator, schema) @@ -137,11 +151,11 @@ func TestStreamsResultBuilder(t *testing.T) { err := collectResult(context.Background(), pipeline, builder) require.NoError(t, err) - require.Equal(t, 5, builder.Len()) + require.Equal(t, 6, builder.Len()) md, _ := metadata.NewContext(t.Context()) result := builder.Build(stats.Result{}, md) - require.Equal(t, 5, result.Data.(logqlmodel.Streams).Len()) + require.Equal(t, 6, result.Data.(logqlmodel.Streams).Len()) expected := logqlmodel.Streams{ push.Stream{ @@ -162,6 +176,12 @@ func TestStreamsResultBuilder(t *testing.T) { {Line: "log line 3", Timestamp: time.Unix(0, 1620000000000000003), StructuredMetadata: logproto.FromLabelsToLabelAdapters(labels.FromStrings("traceID", "61330481e1e59b18")), Parsed: logproto.FromLabelsToLabelAdapters(labels.Labels{})}, }, }, + push.Stream{ + Labels: labels.FromStrings("env", "dev", "traceID", "9de325g124ad230b").String(), + Entries: []logproto.Entry{ + {Line: "log line 6", Timestamp: time.Unix(0, 1620000000000000006), StructuredMetadata: logproto.FromLabelsToLabelAdapters(labels.FromStrings("traceID", "9de325g124ad230b")), Parsed: logproto.FromLabelsToLabelAdapters(labels.Labels{})}, + }, + }, push.Stream{ Labels: labels.FromStrings("env", "prod", "namespace", "loki-prod-001", "traceID", "40e50221e284b9d2").String(), Entries: []logproto.Entry{ From 50c823a31bf2c5089d26a815c43866b84388d55d Mon Sep 17 00:00:00 2001 From: Ivan Kalita Date: Thu, 16 Oct 2025 09:24:12 +0200 Subject: [PATCH 2/7] do not try cast --- pkg/engine/compat.go | 27 ++++++--------------------- 1 file changed, 6 insertions(+), 21 deletions(-) diff --git a/pkg/engine/compat.go b/pkg/engine/compat.go index e537a5c6f52e6..2e8346e12deb7 100644 --- a/pkg/engine/compat.go +++ b/pkg/engine/compat.go @@ -91,40 +91,28 @@ func (b *streamsResultBuilder) CollectRecord(rec arrow.Record) { // Log line case ident.Equal(semconv.ColumnIdentMessage): - lineCol, ok := col.(*array.String) - if !ok { - continue - } + lineCol := col.(*array.String) forEachNotNullRowColValue(numRows, lineCol, func(rowIdx int) { lines[rowIdx] = lineCol.Value(rowIdx) }) // Timestamp case ident.Equal(semconv.ColumnIdentTimestamp): - tsCol, ok := col.(*array.Timestamp) - if !ok { - continue - } + tsCol := col.(*array.Timestamp) forEachNotNullRowColValue(numRows, tsCol, func(rowIdx int) { timestamps[rowIdx] = time.Unix(0, int64(tsCol.Value(rowIdx))) }) // One of the label columns case ident.ColumnType() == types.ColumnTypeLabel: - labelCol, ok := col.(*array.String) - if !ok { - continue - } + labelCol := col.(*array.String) forEachNotNullRowColValue(numRows, labelCol, func(rowIdx int) { lbsBuilders[rowIdx].Set(shortName, labelCol.Value(rowIdx)) }) // One of the metadata columns case ident.ColumnType() == types.ColumnTypeMetadata: - metadataCol, ok := col.(*array.String) - if !ok { - continue - } + metadataCol := col.(*array.String) forEachNotNullRowColValue(numRows, metadataCol, func(rowIdx int) { val := metadataCol.Value(rowIdx) metadataBuilders[rowIdx].Set(shortName, val) @@ -134,10 +122,7 @@ func (b *streamsResultBuilder) CollectRecord(rec arrow.Record) { // One of the parsed columns case ident.ColumnType() == types.ColumnTypeParsed: - parsedCol, ok := col.(*array.String) - if !ok { - continue - } + parsedCol := col.(*array.String) // TODO: keep errors if --strict is set // These are reserved column names used to track parsing errors. We are dropping them until @@ -160,7 +145,7 @@ func (b *streamsResultBuilder) CollectRecord(rec arrow.Record) { } } - // Convert columnar representation to row-based one + // Convert columnar representation to a row-based one for rowIdx := 0; rowIdx < numRows; rowIdx++ { lbs := lbsBuilders[rowIdx].Labels() ts := timestamps[rowIdx] From d4cc24d9df9151dc6defbaeea928f5a324f3cfee Mon Sep 17 00:00:00 2001 From: Ivan Kalita Date: Thu, 16 Oct 2025 11:37:09 +0200 Subject: [PATCH 3/7] Reuse buffers between CollectRecord calls --- pkg/engine/compat.go | 94 +++++++++++++----- pkg/engine/compat_bench_test.go | 171 ++++++++++++++++++++++++++++++++ 2 files changed, 238 insertions(+), 27 deletions(-) create mode 100644 pkg/engine/compat_bench_test.go diff --git a/pkg/engine/compat.go b/pkg/engine/compat.go index 2e8346e12deb7..554deb1514fcc 100644 --- a/pkg/engine/compat.go +++ b/pkg/engine/compat.go @@ -42,6 +42,56 @@ type streamsResultBuilder struct { streams map[string]int data logqlmodel.Streams count int + + // buffer of rows to be reused between calls to CollectRecord to reduce reallocations of slices and builders + rowsBuffer rowsBuffer +} + +type rowsBuffer struct { + len int + timestamps []time.Time + lines []string + lbsBuilders []*labels.Builder + metadataBuilders []*labels.Builder + parsedBuilders []*labels.Builder +} + +func (p *rowsBuffer) prepareFor(newLen int) { + if newLen <= p.len { + p.timestamps = p.timestamps[:newLen] + clear(p.timestamps) + + p.lines = p.lines[:newLen] + clear(p.lines) + + p.lbsBuilders = p.lbsBuilders[:newLen] + p.metadataBuilders = p.metadataBuilders[:newLen] + p.parsedBuilders = p.parsedBuilders[:newLen] + + for i := range newLen { + p.lbsBuilders[i].Reset(labels.EmptyLabels()) + p.metadataBuilders[i].Reset(labels.EmptyLabels()) + p.parsedBuilders[i].Reset(labels.EmptyLabels()) + } + + p.len = newLen + + return + } + + p.timestamps = make([]time.Time, newLen) + p.lines = make([]string, newLen) + p.lbsBuilders = make([]*labels.Builder, newLen) + p.metadataBuilders = make([]*labels.Builder, newLen) + p.parsedBuilders = make([]*labels.Builder, newLen) + + for i := range newLen { + p.lbsBuilders[i] = labels.NewBuilder(labels.EmptyLabels()) + p.metadataBuilders[i] = labels.NewBuilder(labels.EmptyLabels()) + p.parsedBuilders[i] = labels.NewBuilder(labels.EmptyLabels()) + } + + p.len = newLen } func (b *streamsResultBuilder) CollectRecord(rec arrow.Record) { @@ -50,7 +100,7 @@ func (b *streamsResultBuilder) CollectRecord(rec arrow.Record) { return } - // let's say we have following log entries in rec: + // let's say we have the following log entries in rec: // - {labelenv="prod-1", metadatatrace="123-1", parsed="v1"} ts1 line 1 // - {labelenv="prod-2", metadatatrace="123-2", parsed="v2"} ts2 line 2 // - {labelenv="prod-3", metadatatrace="123-3", parsed="v3"} ts3 line 3 @@ -64,17 +114,7 @@ func (b *streamsResultBuilder) CollectRecord(rec arrow.Record) { // first all the timestamps, then all the log lines, etc. // After all the values are collected and converted we transform the columnar representation to a row-based one. - timestamps := make([]time.Time, numRows) - lines := make([]string, numRows) - lbsBuilders := make([]*labels.Builder, numRows) - metadataBuilders := make([]*labels.Builder, numRows) - parsedBuilders := make([]*labels.Builder, numRows) - - for rowIdx := range numRows { - lbsBuilders[rowIdx] = labels.NewBuilder(labels.EmptyLabels()) - metadataBuilders[rowIdx] = labels.NewBuilder(labels.EmptyLabels()) - parsedBuilders[rowIdx] = labels.NewBuilder(labels.EmptyLabels()) - } + b.rowsBuffer.prepareFor(numRows) // Convert arrow values to our format column by column for colIdx := range int(rec.NumCols()) { @@ -93,21 +133,21 @@ func (b *streamsResultBuilder) CollectRecord(rec arrow.Record) { case ident.Equal(semconv.ColumnIdentMessage): lineCol := col.(*array.String) forEachNotNullRowColValue(numRows, lineCol, func(rowIdx int) { - lines[rowIdx] = lineCol.Value(rowIdx) + b.rowsBuffer.lines[rowIdx] = lineCol.Value(rowIdx) }) // Timestamp case ident.Equal(semconv.ColumnIdentTimestamp): tsCol := col.(*array.Timestamp) forEachNotNullRowColValue(numRows, tsCol, func(rowIdx int) { - timestamps[rowIdx] = time.Unix(0, int64(tsCol.Value(rowIdx))) + b.rowsBuffer.timestamps[rowIdx] = time.Unix(0, int64(tsCol.Value(rowIdx))) }) // One of the label columns case ident.ColumnType() == types.ColumnTypeLabel: labelCol := col.(*array.String) forEachNotNullRowColValue(numRows, labelCol, func(rowIdx int) { - lbsBuilders[rowIdx].Set(shortName, labelCol.Value(rowIdx)) + b.rowsBuffer.lbsBuilders[rowIdx].Set(shortName, labelCol.Value(rowIdx)) }) // One of the metadata columns @@ -115,9 +155,9 @@ func (b *streamsResultBuilder) CollectRecord(rec arrow.Record) { metadataCol := col.(*array.String) forEachNotNullRowColValue(numRows, metadataCol, func(rowIdx int) { val := metadataCol.Value(rowIdx) - metadataBuilders[rowIdx].Set(shortName, val) + b.rowsBuffer.metadataBuilders[rowIdx].Set(shortName, val) // include structured metadata in stream labels - lbsBuilders[rowIdx].Set(shortName, val) + b.rowsBuffer.lbsBuilders[rowIdx].Set(shortName, val) }) // One of the parsed columns @@ -133,13 +173,13 @@ func (b *streamsResultBuilder) CollectRecord(rec arrow.Record) { forEachNotNullRowColValue(numRows, parsedCol, func(rowIdx int) { parsedVal := parsedCol.Value(rowIdx) - if parsedBuilders[rowIdx].Get(shortName) != "" { + if b.rowsBuffer.parsedBuilders[rowIdx].Get(shortName) != "" { return } - parsedBuilders[rowIdx].Set(shortName, parsedVal) - lbsBuilders[rowIdx].Set(shortName, parsedVal) - if metadataBuilders[rowIdx].Get(shortName) != "" { - metadataBuilders[rowIdx].Del(shortName) + b.rowsBuffer.parsedBuilders[rowIdx].Set(shortName, parsedVal) + b.rowsBuffer.lbsBuilders[rowIdx].Set(shortName, parsedVal) + if b.rowsBuffer.metadataBuilders[rowIdx].Get(shortName) != "" { + b.rowsBuffer.metadataBuilders[rowIdx].Del(shortName) } }) } @@ -147,9 +187,9 @@ func (b *streamsResultBuilder) CollectRecord(rec arrow.Record) { // Convert columnar representation to a row-based one for rowIdx := 0; rowIdx < numRows; rowIdx++ { - lbs := lbsBuilders[rowIdx].Labels() - ts := timestamps[rowIdx] - line := lines[rowIdx] + lbs := b.rowsBuffer.lbsBuilders[rowIdx].Labels() + ts := b.rowsBuffer.timestamps[rowIdx] + line := b.rowsBuffer.lines[rowIdx] // Ignore rows that don't have stream labels, log line, or timestamp if line == "" || ts.IsZero() || lbs.IsEmpty() { continue @@ -158,8 +198,8 @@ func (b *streamsResultBuilder) CollectRecord(rec arrow.Record) { entry := logproto.Entry{ Timestamp: ts, Line: line, - StructuredMetadata: logproto.FromLabelsToLabelAdapters(metadataBuilders[rowIdx].Labels()), - Parsed: logproto.FromLabelsToLabelAdapters(parsedBuilders[rowIdx].Labels()), + StructuredMetadata: logproto.FromLabelsToLabelAdapters(b.rowsBuffer.metadataBuilders[rowIdx].Labels()), + Parsed: logproto.FromLabelsToLabelAdapters(b.rowsBuffer.parsedBuilders[rowIdx].Labels()), } // Add entry to appropriate stream diff --git a/pkg/engine/compat_bench_test.go b/pkg/engine/compat_bench_test.go new file mode 100644 index 0000000000000..425d73ca357a0 --- /dev/null +++ b/pkg/engine/compat_bench_test.go @@ -0,0 +1,171 @@ +package engine + +import ( + "fmt" + "testing" + "time" + + "github.com/apache/arrow-go/v18/arrow" + "github.com/apache/arrow-go/v18/arrow/memory" + + "github.com/grafana/loki/v3/pkg/engine/internal/semconv" + "github.com/grafana/loki/v3/pkg/engine/internal/types" + "github.com/grafana/loki/v3/pkg/util/arrowtest" +) + +func BenchmarkStreamsResultBuilder(b *testing.B) { + alloc := memory.NewGoAllocator() + + benchmarks := []struct { + name string + numRows int + numLabels int + numMeta int + numParsed int + }{ + { + name: "small_simple", + numRows: 10, + numLabels: 2, + numMeta: 1, + numParsed: 0, + }, + { + name: "medium_simple", + numRows: 100, + numLabels: 2, + numMeta: 1, + numParsed: 0, + }, + { + name: "large_simple", + numRows: 1000, + numLabels: 2, + numMeta: 1, + numParsed: 0, + }, + { + name: "xlarge_simple", + numRows: 10000, + numLabels: 2, + numMeta: 1, + numParsed: 0, + }, + { + name: "medium_many_labels", + numRows: 100, + numLabels: 10, + numMeta: 3, + numParsed: 0, + }, + { + name: "medium_with_parsed", + numRows: 100, + numLabels: 2, + numMeta: 2, + numParsed: 5, + }, + { + name: "large_complex", + numRows: 1000, + numLabels: 10, + numMeta: 5, + numParsed: 8, + }, + } + + for _, bm := range benchmarks { + b.Run(bm.name, func(b *testing.B) { + // Build schema + colTs := semconv.ColumnIdentTimestamp + colMsg := semconv.ColumnIdentMessage + fields := []arrow.Field{ + semconv.FieldFromIdent(colTs, false), + semconv.FieldFromIdent(colMsg, false), + } + + // Add label columns + labelIdents := make([]*semconv.Identifier, bm.numLabels) + for i := 0; i < bm.numLabels; i++ { + ident := semconv.NewIdentifier( + fmt.Sprintf("label_%d", i), + types.ColumnTypeLabel, + types.Loki.String, + ) + labelIdents[i] = ident + fields = append(fields, semconv.FieldFromIdent(ident, false)) + } + + // Add metadata columns + metaIdents := make([]*semconv.Identifier, bm.numMeta) + for i := 0; i < bm.numMeta; i++ { + ident := semconv.NewIdentifier( + fmt.Sprintf("meta_%d", i), + types.ColumnTypeMetadata, + types.Loki.String, + ) + metaIdents[i] = ident + fields = append(fields, semconv.FieldFromIdent(ident, false)) + } + + // Add parsed columns + parsedIdents := make([]*semconv.Identifier, bm.numParsed) + for i := 0; i < bm.numParsed; i++ { + ident := semconv.NewIdentifier( + fmt.Sprintf("parsed_%d", i), + types.ColumnTypeParsed, + types.Loki.String, + ) + parsedIdents[i] = ident + fields = append(fields, semconv.FieldFromIdent(ident, false)) + } + + schema := arrow.NewSchema(fields, nil) + + // Build rows + rows := make(arrowtest.Rows, bm.numRows) + baseTime := time.Unix(0, 1620000000000000000).UTC() + for rowIdx := 0; rowIdx < bm.numRows; rowIdx++ { + row := make(map[string]any) + row[colTs.FQN()] = baseTime.Add(time.Duration(rowIdx) * time.Nanosecond) + row[colMsg.FQN()] = fmt.Sprintf("log line %d with some additional text to make it more realistic", rowIdx) + + // Add label values + for labelIdx, ident := range labelIdents { + row[ident.FQN()] = fmt.Sprintf("label_%d_value_%d", labelIdx, rowIdx%10) + } + + // Add metadata values + for metaIdx, ident := range metaIdents { + row[ident.FQN()] = fmt.Sprintf("meta_%d_value_%d", metaIdx, rowIdx%5) + } + + // Add parsed values + for parsedIdx, ident := range parsedIdents { + row[ident.FQN()] = fmt.Sprintf("parsed_%d_value_%d", parsedIdx, rowIdx%3) + } + + rows[rowIdx] = row + } + + record := rows.Record(alloc, schema) + defer record.Release() + + b.ResetTimer() + b.ReportAllocs() + + for i := 0; i < b.N; i++ { + rb := newStreamsResultBuilder() + // Collect records twice on purpose to see how efficient CollectRecord is when the builder already has + // some data + rb.CollectRecord(record) + rb.CollectRecord(record) + + // Ensure the result is used to prevent compiler optimizations + if rb.Len() != bm.numRows*2 { + b.Fatalf("expected %d entries, got %d", bm.numRows*2, rb.Len()) + } + } + }) + } +} From 32353c440b588cd623f722fcac617b6d39acd553 Mon Sep 17 00:00:00 2001 From: Ivan Kalita Date: Thu, 16 Oct 2025 13:18:04 +0200 Subject: [PATCH 4/7] Update benchmark to test records of a different size --- pkg/engine/compat_bench_test.go | 238 ++++++++++++++++---------------- 1 file changed, 116 insertions(+), 122 deletions(-) diff --git a/pkg/engine/compat_bench_test.go b/pkg/engine/compat_bench_test.go index 425d73ca357a0..f8b8fcd34ca7e 100644 --- a/pkg/engine/compat_bench_test.go +++ b/pkg/engine/compat_bench_test.go @@ -17,139 +17,52 @@ func BenchmarkStreamsResultBuilder(b *testing.B) { alloc := memory.NewGoAllocator() benchmarks := []struct { - name string - numRows int - numLabels int - numMeta int - numParsed int + name string + numRowsFirstRecord int + numRowsSecondRecord int + numLabels int + numMeta int + numParsed int }{ { - name: "small_simple", - numRows: 10, - numLabels: 2, - numMeta: 1, - numParsed: 0, + name: "records_equal_size", + numRowsFirstRecord: 1000, + numRowsSecondRecord: 1000, + numLabels: 10, + numMeta: 5, + numParsed: 8, }, { - name: "medium_simple", - numRows: 100, - numLabels: 2, - numMeta: 1, - numParsed: 0, + name: "record_two_bigger", + numRowsFirstRecord: 1000, + numRowsSecondRecord: 2000, + numLabels: 10, + numMeta: 5, + numParsed: 8, }, { - name: "large_simple", - numRows: 1000, - numLabels: 2, - numMeta: 1, - numParsed: 0, - }, - { - name: "xlarge_simple", - numRows: 10000, - numLabels: 2, - numMeta: 1, - numParsed: 0, - }, - { - name: "medium_many_labels", - numRows: 100, - numLabels: 10, - numMeta: 3, - numParsed: 0, - }, - { - name: "medium_with_parsed", - numRows: 100, - numLabels: 2, - numMeta: 2, - numParsed: 5, - }, - { - name: "large_complex", - numRows: 1000, - numLabels: 10, - numMeta: 5, - numParsed: 8, + name: "record_two_smaller", + numRowsFirstRecord: 1000, + numRowsSecondRecord: 500, + numLabels: 10, + numMeta: 5, + numParsed: 8, }, } for _, bm := range benchmarks { b.Run(bm.name, func(b *testing.B) { - // Build schema - colTs := semconv.ColumnIdentTimestamp - colMsg := semconv.ColumnIdentMessage - fields := []arrow.Field{ - semconv.FieldFromIdent(colTs, false), - semconv.FieldFromIdent(colMsg, false), - } - // Add label columns - labelIdents := make([]*semconv.Identifier, bm.numLabels) - for i := 0; i < bm.numLabels; i++ { - ident := semconv.NewIdentifier( - fmt.Sprintf("label_%d", i), - types.ColumnTypeLabel, - types.Loki.String, - ) - labelIdents[i] = ident - fields = append(fields, semconv.FieldFromIdent(ident, false)) - } - - // Add metadata columns - metaIdents := make([]*semconv.Identifier, bm.numMeta) - for i := 0; i < bm.numMeta; i++ { - ident := semconv.NewIdentifier( - fmt.Sprintf("meta_%d", i), - types.ColumnTypeMetadata, - types.Loki.String, - ) - metaIdents[i] = ident - fields = append(fields, semconv.FieldFromIdent(ident, false)) - } - - // Add parsed columns - parsedIdents := make([]*semconv.Identifier, bm.numParsed) - for i := 0; i < bm.numParsed; i++ { - ident := semconv.NewIdentifier( - fmt.Sprintf("parsed_%d", i), - types.ColumnTypeParsed, - types.Loki.String, - ) - parsedIdents[i] = ident - fields = append(fields, semconv.FieldFromIdent(ident, false)) - } - - schema := arrow.NewSchema(fields, nil) - - // Build rows - rows := make(arrowtest.Rows, bm.numRows) + schema, labelIdents, metaIdents, parsedIdents := prepareSchema(bm.numLabels, bm.numMeta, bm.numParsed) baseTime := time.Unix(0, 1620000000000000000).UTC() - for rowIdx := 0; rowIdx < bm.numRows; rowIdx++ { - row := make(map[string]any) - row[colTs.FQN()] = baseTime.Add(time.Duration(rowIdx) * time.Nanosecond) - row[colMsg.FQN()] = fmt.Sprintf("log line %d with some additional text to make it more realistic", rowIdx) - - // Add label values - for labelIdx, ident := range labelIdents { - row[ident.FQN()] = fmt.Sprintf("label_%d_value_%d", labelIdx, rowIdx%10) - } - // Add metadata values - for metaIdx, ident := range metaIdents { - row[ident.FQN()] = fmt.Sprintf("meta_%d_value_%d", metaIdx, rowIdx%5) - } + rows1 := generateRows(labelIdents, metaIdents, parsedIdents, bm.numRowsFirstRecord, baseTime) + record1 := rows1.Record(alloc, schema) + defer record1.Release() - // Add parsed values - for parsedIdx, ident := range parsedIdents { - row[ident.FQN()] = fmt.Sprintf("parsed_%d_value_%d", parsedIdx, rowIdx%3) - } - - rows[rowIdx] = row - } - - record := rows.Record(alloc, schema) - defer record.Release() + rows2 := generateRows(labelIdents, metaIdents, parsedIdents, bm.numRowsSecondRecord, baseTime) + record2 := rows2.Record(alloc, schema) + defer record2.Release() b.ResetTimer() b.ReportAllocs() @@ -158,14 +71,95 @@ func BenchmarkStreamsResultBuilder(b *testing.B) { rb := newStreamsResultBuilder() // Collect records twice on purpose to see how efficient CollectRecord is when the builder already has // some data - rb.CollectRecord(record) - rb.CollectRecord(record) + rb.CollectRecord(record1) + rb.CollectRecord(record2) // Ensure the result is used to prevent compiler optimizations - if rb.Len() != bm.numRows*2 { - b.Fatalf("expected %d entries, got %d", bm.numRows*2, rb.Len()) + if rb.Len() != bm.numRowsFirstRecord+bm.numRowsSecondRecord { + b.Fatalf("expected %d entries, got %d", bm.numRowsFirstRecord+bm.numRowsSecondRecord, rb.Len()) } } }) } } + +func prepareSchema(numLabels int, numMeta int, numParsed int) (*arrow.Schema, []*semconv.Identifier, []*semconv.Identifier, []*semconv.Identifier) { + // Build schema + colTs := semconv.ColumnIdentTimestamp + colMsg := semconv.ColumnIdentMessage + fields := []arrow.Field{ + semconv.FieldFromIdent(colTs, false), + semconv.FieldFromIdent(colMsg, false), + } + + // Add label columns + labelIdents := make([]*semconv.Identifier, numLabels) + for i := 0; i < numLabels; i++ { + ident := semconv.NewIdentifier( + fmt.Sprintf("label_%d", i), + types.ColumnTypeLabel, + types.Loki.String, + ) + labelIdents[i] = ident + fields = append(fields, semconv.FieldFromIdent(ident, false)) + } + + // Add metadata columns + metaIdents := make([]*semconv.Identifier, numMeta) + for i := 0; i < numMeta; i++ { + ident := semconv.NewIdentifier( + fmt.Sprintf("meta_%d", i), + types.ColumnTypeMetadata, + types.Loki.String, + ) + metaIdents[i] = ident + fields = append(fields, semconv.FieldFromIdent(ident, false)) + } + + // Add parsed columns + parsedIdents := make([]*semconv.Identifier, numParsed) + for i := 0; i < numParsed; i++ { + ident := semconv.NewIdentifier( + fmt.Sprintf("parsed_%d", i), + types.ColumnTypeParsed, + types.Loki.String, + ) + parsedIdents[i] = ident + fields = append(fields, semconv.FieldFromIdent(ident, false)) + } + + return arrow.NewSchema(fields, nil), labelIdents, metaIdents, parsedIdents +} + +func generateRows( + labelIdents []*semconv.Identifier, + metaIdents []*semconv.Identifier, + parsedIdents []*semconv.Identifier, + numRows int, + baseTime time.Time, +) arrowtest.Rows { + rows := make(arrowtest.Rows, numRows) + for rowIdx := 0; rowIdx < numRows; rowIdx++ { + row := make(map[string]any) + row[semconv.ColumnIdentTimestamp.FQN()] = baseTime.Add(time.Duration(rowIdx) * time.Nanosecond) + row[semconv.ColumnIdentMessage.FQN()] = fmt.Sprintf("log line %d with some additional text to make it more realistic", rowIdx) + + // Add label values + for labelIdx, ident := range labelIdents { + row[ident.FQN()] = fmt.Sprintf("label_%d_value_%d", labelIdx, rowIdx%10) + } + + // Add metadata values + for metaIdx, ident := range metaIdents { + row[ident.FQN()] = fmt.Sprintf("meta_%d_value_%d", metaIdx, rowIdx%5) + } + + // Add parsed values + for parsedIdx, ident := range parsedIdents { + row[ident.FQN()] = fmt.Sprintf("parsed_%d_value_%d", parsedIdx, rowIdx%3) + } + + rows[rowIdx] = row + } + return rows +} From 60fe15a6548536a58652cea01e2fee3a2cb570c2 Mon Sep 17 00:00:00 2001 From: Ivan Kalita Date: Thu, 16 Oct 2025 17:28:21 +0200 Subject: [PATCH 5/7] Reset the rows, fix memory leaks, reduce reallocations --- pkg/engine/compat.go | 73 ++++++++++++++++++++++++++------------------ 1 file changed, 43 insertions(+), 30 deletions(-) diff --git a/pkg/engine/compat.go b/pkg/engine/compat.go index 554deb1514fcc..6e986858a9ad1 100644 --- a/pkg/engine/compat.go +++ b/pkg/engine/compat.go @@ -33,8 +33,9 @@ var ( func newStreamsResultBuilder() *streamsResultBuilder { return &streamsResultBuilder{ - data: make(logqlmodel.Streams, 0), - streams: make(map[string]int), + data: make(logqlmodel.Streams, 0), + streams: make(map[string]int), + rowsBuffer: &rowsBuffer{}, } } @@ -44,7 +45,7 @@ type streamsResultBuilder struct { count int // buffer of rows to be reused between calls to CollectRecord to reduce reallocations of slices and builders - rowsBuffer rowsBuffer + rowsBuffer *rowsBuffer } type rowsBuffer struct { @@ -56,42 +57,52 @@ type rowsBuffer struct { parsedBuilders []*labels.Builder } -func (p *rowsBuffer) prepareFor(newLen int) { - if newLen <= p.len { - p.timestamps = p.timestamps[:newLen] - clear(p.timestamps) - - p.lines = p.lines[:newLen] - clear(p.lines) +func (buf *rowsBuffer) prepareFor(newLen int) { + if newLen == buf.len { + return + } - p.lbsBuilders = p.lbsBuilders[:newLen] - p.metadataBuilders = p.metadataBuilders[:newLen] - p.parsedBuilders = p.parsedBuilders[:newLen] + if newLen < buf.len { + // free not used items at the end of the slices so they can be GC-ed + clear(buf.timestamps[newLen:buf.len]) + clear(buf.lines[newLen:buf.len]) + clear(buf.lbsBuilders[newLen:buf.len]) + clear(buf.metadataBuilders[newLen:buf.len]) + clear(buf.parsedBuilders[newLen:buf.len]) - for i := range newLen { - p.lbsBuilders[i].Reset(labels.EmptyLabels()) - p.metadataBuilders[i].Reset(labels.EmptyLabels()) - p.parsedBuilders[i].Reset(labels.EmptyLabels()) - } + // shrink to the new length, no need to zero the items as it was done before via resetRow(i) + buf.timestamps = buf.timestamps[:newLen] + buf.lines = buf.lines[:newLen] + buf.lbsBuilders = buf.lbsBuilders[:newLen] + buf.metadataBuilders = buf.metadataBuilders[:newLen] + buf.parsedBuilders = buf.parsedBuilders[:newLen] - p.len = newLen + buf.len = newLen return } - p.timestamps = make([]time.Time, newLen) - p.lines = make([]string, newLen) - p.lbsBuilders = make([]*labels.Builder, newLen) - p.metadataBuilders = make([]*labels.Builder, newLen) - p.parsedBuilders = make([]*labels.Builder, newLen) - - for i := range newLen { - p.lbsBuilders[i] = labels.NewBuilder(labels.EmptyLabels()) - p.metadataBuilders[i] = labels.NewBuilder(labels.EmptyLabels()) - p.parsedBuilders[i] = labels.NewBuilder(labels.EmptyLabels()) + // newLen > buf.len + numRowsToAdd := newLen - buf.len + buf.timestamps = append(buf.timestamps, make([]time.Time, numRowsToAdd)...) + buf.lines = append(buf.lines, make([]string, numRowsToAdd)...) + buf.lbsBuilders = append(buf.lbsBuilders, make([]*labels.Builder, numRowsToAdd)...) + buf.metadataBuilders = append(buf.metadataBuilders, make([]*labels.Builder, numRowsToAdd)...) + buf.parsedBuilders = append(buf.parsedBuilders, make([]*labels.Builder, numRowsToAdd)...) + for i := buf.len; i < newLen; i++ { + buf.lbsBuilders[i] = labels.NewBuilder(labels.EmptyLabels()) + buf.metadataBuilders[i] = labels.NewBuilder(labels.EmptyLabels()) + buf.parsedBuilders[i] = labels.NewBuilder(labels.EmptyLabels()) } + buf.len = newLen +} - p.len = newLen +func (buf *rowsBuffer) resetRow(i int) { + buf.timestamps[i] = time.Time{} + buf.lines[i] = "" + buf.lbsBuilders[i].Reset(labels.EmptyLabels()) + buf.metadataBuilders[i].Reset(labels.EmptyLabels()) + buf.parsedBuilders[i].Reset(labels.EmptyLabels()) } func (b *streamsResultBuilder) CollectRecord(rec arrow.Record) { @@ -192,6 +203,7 @@ func (b *streamsResultBuilder) CollectRecord(rec arrow.Record) { line := b.rowsBuffer.lines[rowIdx] // Ignore rows that don't have stream labels, log line, or timestamp if line == "" || ts.IsZero() || lbs.IsEmpty() { + b.rowsBuffer.resetRow(rowIdx) continue } @@ -201,6 +213,7 @@ func (b *streamsResultBuilder) CollectRecord(rec arrow.Record) { StructuredMetadata: logproto.FromLabelsToLabelAdapters(b.rowsBuffer.metadataBuilders[rowIdx].Labels()), Parsed: logproto.FromLabelsToLabelAdapters(b.rowsBuffer.parsedBuilders[rowIdx].Labels()), } + b.rowsBuffer.resetRow(rowIdx) // Add entry to appropriate stream key := lbs.String() From 118d8714870f48387fe36a2ccca932b903959f00 Mon Sep 17 00:00:00 2001 From: Ivan Kalita Date: Mon, 20 Oct 2025 16:07:38 +0200 Subject: [PATCH 6/7] Use slice of structs --- pkg/engine/compat.go | 148 +++++++++++-------------- pkg/engine/compat_test.go | 223 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 289 insertions(+), 82 deletions(-) diff --git a/pkg/engine/compat.go b/pkg/engine/compat.go index 6e986858a9ad1..a83362bea0923 100644 --- a/pkg/engine/compat.go +++ b/pkg/engine/compat.go @@ -9,14 +9,14 @@ import ( "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/promql" + "github.com/grafana/loki/pkg/push" + "github.com/grafana/loki/v3/pkg/logproto" + "github.com/grafana/loki/v3/pkg/engine/internal/semconv" "github.com/grafana/loki/v3/pkg/engine/internal/types" - "github.com/grafana/loki/v3/pkg/logproto" "github.com/grafana/loki/v3/pkg/logqlmodel" "github.com/grafana/loki/v3/pkg/logqlmodel/metadata" "github.com/grafana/loki/v3/pkg/logqlmodel/stats" - - "github.com/grafana/loki/pkg/push" ) type ResultBuilder interface { @@ -33,9 +33,9 @@ var ( func newStreamsResultBuilder() *streamsResultBuilder { return &streamsResultBuilder{ - data: make(logqlmodel.Streams, 0), - streams: make(map[string]int), - rowsBuffer: &rowsBuffer{}, + data: make(logqlmodel.Streams, 0), + streams: make(map[string]int), + rowBuilders: nil, } } @@ -44,65 +44,16 @@ type streamsResultBuilder struct { data logqlmodel.Streams count int - // buffer of rows to be reused between calls to CollectRecord to reduce reallocations of slices and builders - rowsBuffer *rowsBuffer + // buffer for rows + rowBuilders []rowBuilder } -type rowsBuffer struct { - len int - timestamps []time.Time - lines []string - lbsBuilders []*labels.Builder - metadataBuilders []*labels.Builder - parsedBuilders []*labels.Builder -} - -func (buf *rowsBuffer) prepareFor(newLen int) { - if newLen == buf.len { - return - } - - if newLen < buf.len { - // free not used items at the end of the slices so they can be GC-ed - clear(buf.timestamps[newLen:buf.len]) - clear(buf.lines[newLen:buf.len]) - clear(buf.lbsBuilders[newLen:buf.len]) - clear(buf.metadataBuilders[newLen:buf.len]) - clear(buf.parsedBuilders[newLen:buf.len]) - - // shrink to the new length, no need to zero the items as it was done before via resetRow(i) - buf.timestamps = buf.timestamps[:newLen] - buf.lines = buf.lines[:newLen] - buf.lbsBuilders = buf.lbsBuilders[:newLen] - buf.metadataBuilders = buf.metadataBuilders[:newLen] - buf.parsedBuilders = buf.parsedBuilders[:newLen] - - buf.len = newLen - - return - } - - // newLen > buf.len - numRowsToAdd := newLen - buf.len - buf.timestamps = append(buf.timestamps, make([]time.Time, numRowsToAdd)...) - buf.lines = append(buf.lines, make([]string, numRowsToAdd)...) - buf.lbsBuilders = append(buf.lbsBuilders, make([]*labels.Builder, numRowsToAdd)...) - buf.metadataBuilders = append(buf.metadataBuilders, make([]*labels.Builder, numRowsToAdd)...) - buf.parsedBuilders = append(buf.parsedBuilders, make([]*labels.Builder, numRowsToAdd)...) - for i := buf.len; i < newLen; i++ { - buf.lbsBuilders[i] = labels.NewBuilder(labels.EmptyLabels()) - buf.metadataBuilders[i] = labels.NewBuilder(labels.EmptyLabels()) - buf.parsedBuilders[i] = labels.NewBuilder(labels.EmptyLabels()) - } - buf.len = newLen -} - -func (buf *rowsBuffer) resetRow(i int) { - buf.timestamps[i] = time.Time{} - buf.lines[i] = "" - buf.lbsBuilders[i].Reset(labels.EmptyLabels()) - buf.metadataBuilders[i].Reset(labels.EmptyLabels()) - buf.parsedBuilders[i].Reset(labels.EmptyLabels()) +type rowBuilder struct { + timestamp time.Time + line string + lbsBuilder *labels.Builder + metadataBuilder *labels.Builder + parsedBuilder *labels.Builder } func (b *streamsResultBuilder) CollectRecord(rec arrow.Record) { @@ -125,7 +76,7 @@ func (b *streamsResultBuilder) CollectRecord(rec arrow.Record) { // first all the timestamps, then all the log lines, etc. // After all the values are collected and converted we transform the columnar representation to a row-based one. - b.rowsBuffer.prepareFor(numRows) + b.ensureRowBuilders(numRows) // Convert arrow values to our format column by column for colIdx := range int(rec.NumCols()) { @@ -144,21 +95,21 @@ func (b *streamsResultBuilder) CollectRecord(rec arrow.Record) { case ident.Equal(semconv.ColumnIdentMessage): lineCol := col.(*array.String) forEachNotNullRowColValue(numRows, lineCol, func(rowIdx int) { - b.rowsBuffer.lines[rowIdx] = lineCol.Value(rowIdx) + b.rowBuilders[rowIdx].line = lineCol.Value(rowIdx) }) // Timestamp case ident.Equal(semconv.ColumnIdentTimestamp): tsCol := col.(*array.Timestamp) forEachNotNullRowColValue(numRows, tsCol, func(rowIdx int) { - b.rowsBuffer.timestamps[rowIdx] = time.Unix(0, int64(tsCol.Value(rowIdx))) + b.rowBuilders[rowIdx].timestamp = time.Unix(0, int64(tsCol.Value(rowIdx))) }) // One of the label columns case ident.ColumnType() == types.ColumnTypeLabel: labelCol := col.(*array.String) forEachNotNullRowColValue(numRows, labelCol, func(rowIdx int) { - b.rowsBuffer.lbsBuilders[rowIdx].Set(shortName, labelCol.Value(rowIdx)) + b.rowBuilders[rowIdx].lbsBuilder.Set(shortName, labelCol.Value(rowIdx)) }) // One of the metadata columns @@ -166,9 +117,8 @@ func (b *streamsResultBuilder) CollectRecord(rec arrow.Record) { metadataCol := col.(*array.String) forEachNotNullRowColValue(numRows, metadataCol, func(rowIdx int) { val := metadataCol.Value(rowIdx) - b.rowsBuffer.metadataBuilders[rowIdx].Set(shortName, val) - // include structured metadata in stream labels - b.rowsBuffer.lbsBuilders[rowIdx].Set(shortName, val) + b.rowBuilders[rowIdx].metadataBuilder.Set(shortName, val) + b.rowBuilders[rowIdx].lbsBuilder.Set(shortName, val) }) // One of the parsed columns @@ -184,13 +134,13 @@ func (b *streamsResultBuilder) CollectRecord(rec arrow.Record) { forEachNotNullRowColValue(numRows, parsedCol, func(rowIdx int) { parsedVal := parsedCol.Value(rowIdx) - if b.rowsBuffer.parsedBuilders[rowIdx].Get(shortName) != "" { + if b.rowBuilders[rowIdx].parsedBuilder.Get(shortName) != "" { return } - b.rowsBuffer.parsedBuilders[rowIdx].Set(shortName, parsedVal) - b.rowsBuffer.lbsBuilders[rowIdx].Set(shortName, parsedVal) - if b.rowsBuffer.metadataBuilders[rowIdx].Get(shortName) != "" { - b.rowsBuffer.metadataBuilders[rowIdx].Del(shortName) + b.rowBuilders[rowIdx].parsedBuilder.Set(shortName, parsedVal) + b.rowBuilders[rowIdx].lbsBuilder.Set(shortName, parsedVal) + if b.rowBuilders[rowIdx].metadataBuilder.Get(shortName) != "" { + b.rowBuilders[rowIdx].metadataBuilder.Del(shortName) } }) } @@ -198,22 +148,22 @@ func (b *streamsResultBuilder) CollectRecord(rec arrow.Record) { // Convert columnar representation to a row-based one for rowIdx := 0; rowIdx < numRows; rowIdx++ { - lbs := b.rowsBuffer.lbsBuilders[rowIdx].Labels() - ts := b.rowsBuffer.timestamps[rowIdx] - line := b.rowsBuffer.lines[rowIdx] + lbs := b.rowBuilders[rowIdx].lbsBuilder.Labels() + ts := b.rowBuilders[rowIdx].timestamp + line := b.rowBuilders[rowIdx].line // Ignore rows that don't have stream labels, log line, or timestamp if line == "" || ts.IsZero() || lbs.IsEmpty() { - b.rowsBuffer.resetRow(rowIdx) + b.resetRowBuilder(rowIdx) continue } entry := logproto.Entry{ Timestamp: ts, Line: line, - StructuredMetadata: logproto.FromLabelsToLabelAdapters(b.rowsBuffer.metadataBuilders[rowIdx].Labels()), - Parsed: logproto.FromLabelsToLabelAdapters(b.rowsBuffer.parsedBuilders[rowIdx].Labels()), + StructuredMetadata: logproto.FromLabelsToLabelAdapters(b.rowBuilders[rowIdx].metadataBuilder.Labels()), + Parsed: logproto.FromLabelsToLabelAdapters(b.rowBuilders[rowIdx].parsedBuilder.Labels()), } - b.rowsBuffer.resetRow(rowIdx) + b.resetRowBuilder(rowIdx) // Add entry to appropriate stream key := lbs.String() @@ -228,6 +178,40 @@ func (b *streamsResultBuilder) CollectRecord(rec arrow.Record) { } } +func (b *streamsResultBuilder) ensureRowBuilders(newLen int) { + if newLen == len(b.rowBuilders) { + return + } + + if newLen < len(b.rowBuilders) { + // free not used items at the end of the slices so they can be GC-ed + clear(b.rowBuilders[newLen:len(b.rowBuilders)]) + b.rowBuilders = b.rowBuilders[:newLen] + + return + } + + // newLen > buf.len + numRowsToAdd := newLen - len(b.rowBuilders) + oldLen := len(b.rowBuilders) + b.rowBuilders = append(b.rowBuilders, make([]rowBuilder, numRowsToAdd)...) + for i := oldLen; i < newLen; i++ { + b.rowBuilders[i] = rowBuilder{ + lbsBuilder: labels.NewBuilder(labels.EmptyLabels()), + metadataBuilder: labels.NewBuilder(labels.EmptyLabels()), + parsedBuilder: labels.NewBuilder(labels.EmptyLabels()), + } + } +} + +func (b *streamsResultBuilder) resetRowBuilder(i int) { + b.rowBuilders[i].timestamp = time.Time{} + b.rowBuilders[i].line = "" + b.rowBuilders[i].lbsBuilder.Reset(labels.EmptyLabels()) + b.rowBuilders[i].metadataBuilder.Reset(labels.EmptyLabels()) + b.rowBuilders[i].parsedBuilder.Reset(labels.EmptyLabels()) +} + func forEachNotNullRowColValue(numRows int, col arrow.Array, f func(rowIdx int)) { for rowIdx := 0; rowIdx < numRows; rowIdx++ { if col.IsNull(rowIdx) { diff --git a/pkg/engine/compat_test.go b/pkg/engine/compat_test.go index 9c5fc5881f420..4893d9a48533d 100644 --- a/pkg/engine/compat_test.go +++ b/pkg/engine/compat_test.go @@ -197,6 +197,229 @@ func TestStreamsResultBuilder(t *testing.T) { } require.Equal(t, expected, result.Data.(logqlmodel.Streams)) }) + + t.Run("multiple records with different streams are accumulated correctly", func(t *testing.T) { + colTs := semconv.ColumnIdentTimestamp + colMsg := semconv.ColumnIdentMessage + colEnv := semconv.NewIdentifier("env", types.ColumnTypeLabel, types.Loki.String) + + schema := arrow.NewSchema( + []arrow.Field{ + semconv.FieldFromIdent(colTs, false), + semconv.FieldFromIdent(colMsg, false), + semconv.FieldFromIdent(colEnv, false), + }, + nil, + ) + + // First record: prod and dev streams + rows1 := arrowtest.Rows{ + { + colTs.FQN(): time.Unix(0, 1620000000000000001).UTC(), + colMsg.FQN(): "log line 1", + colEnv.FQN(): "prod", + }, + { + colTs.FQN(): time.Unix(0, 1620000000000000002).UTC(), + colMsg.FQN(): "log line 2", + colEnv.FQN(): "dev", + }, + } + record1 := rows1.Record(alloc, schema) + defer record1.Release() + + // Second record: prod and staging streams + rows2 := arrowtest.Rows{ + { + colTs.FQN(): time.Unix(0, 1620000000000000003).UTC(), + colMsg.FQN(): "log line 3", + colEnv.FQN(): "prod", + }, + { + colTs.FQN(): time.Unix(0, 1620000000000000004).UTC(), + colMsg.FQN(): "log line 4", + colEnv.FQN(): "staging", + }, + } + record2 := rows2.Record(alloc, schema) + defer record2.Release() + + builder := newStreamsResultBuilder() + + // Collect first record + builder.CollectRecord(record1) + require.Equal(t, 2, builder.Len(), "should have 2 entries after first record") + + // Collect second record + builder.CollectRecord(record2) + require.Equal(t, 4, builder.Len(), "should have 4 entries total after second record") + + md, _ := metadata.NewContext(t.Context()) + result := builder.Build(stats.Result{}, md) + streams := result.Data.(logqlmodel.Streams) + // Note: 3 unique streams (dev, prod, staging), but 4 total entries + // The prod stream has 2 entries (one from each record) + require.Equal(t, 3, len(streams), "should have 3 unique streams") + + // Verify stream grouping - prod stream should have entries from both records + expected := logqlmodel.Streams{ + push.Stream{ + Labels: labels.FromStrings("env", "dev").String(), + Entries: []logproto.Entry{ + {Line: "log line 2", Timestamp: time.Unix(0, 1620000000000000002), StructuredMetadata: logproto.FromLabelsToLabelAdapters(labels.Labels{}), Parsed: logproto.FromLabelsToLabelAdapters(labels.Labels{})}, + }, + }, + push.Stream{ + Labels: labels.FromStrings("env", "prod").String(), + Entries: []logproto.Entry{ + {Line: "log line 1", Timestamp: time.Unix(0, 1620000000000000001), StructuredMetadata: logproto.FromLabelsToLabelAdapters(labels.Labels{}), Parsed: logproto.FromLabelsToLabelAdapters(labels.Labels{})}, + {Line: "log line 3", Timestamp: time.Unix(0, 1620000000000000003), StructuredMetadata: logproto.FromLabelsToLabelAdapters(labels.Labels{}), Parsed: logproto.FromLabelsToLabelAdapters(labels.Labels{})}, + }, + }, + push.Stream{ + Labels: labels.FromStrings("env", "staging").String(), + Entries: []logproto.Entry{ + {Line: "log line 4", Timestamp: time.Unix(0, 1620000000000000004), StructuredMetadata: logproto.FromLabelsToLabelAdapters(labels.Labels{}), Parsed: logproto.FromLabelsToLabelAdapters(labels.Labels{})}, + }, + }, + } + require.Equal(t, expected, streams) + }) + + t.Run("buffer reuse with varying record sizes", func(t *testing.T) { + colTs := semconv.ColumnIdentTimestamp + colMsg := semconv.ColumnIdentMessage + colEnv := semconv.NewIdentifier("env", types.ColumnTypeLabel, types.Loki.String) + + schema := arrow.NewSchema( + []arrow.Field{ + semconv.FieldFromIdent(colTs, false), + semconv.FieldFromIdent(colMsg, false), + semconv.FieldFromIdent(colEnv, false), + }, + nil, + ) + + builder := newStreamsResultBuilder() + + // First record: 5 rows (buffer grows to 5) + rows1 := make(arrowtest.Rows, 5) + for i := 0; i < 5; i++ { + rows1[i] = arrowtest.Row{ + colTs.FQN(): time.Unix(0, int64(1620000000000000001+i)).UTC(), + colMsg.FQN(): "log line", + colEnv.FQN(): "prod", + } + } + record1 := rows1.Record(alloc, schema) + builder.CollectRecord(record1) + record1.Release() + require.Equal(t, 5, builder.Len()) + require.Equal(t, 5, len(builder.rowBuilders), "buffer should have 5 rowBuilders") + + // Second record: 2 rows (buffer shrinks to 2) + rows2 := make(arrowtest.Rows, 2) + for i := 0; i < 2; i++ { + rows2[i] = arrowtest.Row{ + colTs.FQN(): time.Unix(0, int64(1620000000000000010+i)).UTC(), + colMsg.FQN(): "log line", + colEnv.FQN(): "dev", + } + } + record2 := rows2.Record(alloc, schema) + builder.CollectRecord(record2) + record2.Release() + require.Equal(t, 7, builder.Len()) + require.Equal(t, 2, len(builder.rowBuilders), "buffer should shrink to 2 rowBuilders") + + // Third record: 10 rows (buffer grows to 10) + rows3 := make(arrowtest.Rows, 10) + for i := 0; i < 10; i++ { + rows3[i] = arrowtest.Row{ + colTs.FQN(): time.Unix(0, int64(1620000000000000020+i)).UTC(), + colMsg.FQN(): "log line", + colEnv.FQN(): "staging", + } + } + record3 := rows3.Record(alloc, schema) + builder.CollectRecord(record3) + record3.Release() + require.Equal(t, 17, builder.Len()) + require.Equal(t, 10, len(builder.rowBuilders), "buffer should grow to 10 rowBuilders") + + // Verify all rowBuilders are properly initialized + for i := 0; i < len(builder.rowBuilders); i++ { + require.NotNil(t, builder.rowBuilders[i].lbsBuilder, "lbsBuilder should be initialized") + require.NotNil(t, builder.rowBuilders[i].metadataBuilder, "metadataBuilder should be initialized") + require.NotNil(t, builder.rowBuilders[i].parsedBuilder, "parsedBuilder should be initialized") + } + }) + + t.Run("empty records mixed with valid records", func(t *testing.T) { + colTs := semconv.ColumnIdentTimestamp + colMsg := semconv.ColumnIdentMessage + colEnv := semconv.NewIdentifier("env", types.ColumnTypeLabel, types.Loki.String) + + schema := arrow.NewSchema( + []arrow.Field{ + semconv.FieldFromIdent(colTs, false), + semconv.FieldFromIdent(colMsg, false), + semconv.FieldFromIdent(colEnv, false), + }, + nil, + ) + + builder := newStreamsResultBuilder() + + // First record: 3 valid rows + rows1 := make(arrowtest.Rows, 3) + for i := 0; i < 3; i++ { + rows1[i] = arrowtest.Row{ + colTs.FQN(): time.Unix(0, int64(1620000000000000001+i)).UTC(), + colMsg.FQN(): "log line", + colEnv.FQN(): "prod", + } + } + record1 := rows1.Record(alloc, schema) + builder.CollectRecord(record1) + record1.Release() + require.Equal(t, 3, builder.Len()) + + // Second record: empty (0 rows) + rows2 := arrowtest.Rows{} + record2 := rows2.Record(alloc, schema) + builder.CollectRecord(record2) + record2.Release() + require.Equal(t, 3, builder.Len(), "empty record should not change count") + + // Third record: 2 valid rows + rows3 := make(arrowtest.Rows, 2) + for i := 0; i < 2; i++ { + rows3[i] = arrowtest.Row{ + colTs.FQN(): time.Unix(0, int64(1620000000000000010+i)).UTC(), + colMsg.FQN(): "log line", + colEnv.FQN(): "dev", + } + } + record3 := rows3.Record(alloc, schema) + builder.CollectRecord(record3) + record3.Release() + require.Equal(t, 5, builder.Len(), "should have 5 total entries") + + // Verify final result + md, _ := metadata.NewContext(t.Context()) + result := builder.Build(stats.Result{}, md) + streams := result.Data.(logqlmodel.Streams) + // Note: 2 unique streams (prod with 3 entries, dev with 2 entries) = 5 total entries + require.Equal(t, 2, len(streams), "should have 2 unique streams") + + // Verify the streams have the correct number of entries + var totalEntries int + for _, stream := range streams { + totalEntries += len(stream.Entries) + } + require.Equal(t, 5, totalEntries, "should have 5 total entries across both streams") + }) } func TestVectorResultBuilder(t *testing.T) { From 3bfcf9ceda1fbe438c254c02e41b076919590ae9 Mon Sep 17 00:00:00 2001 From: Ivan Kalita Date: Thu, 23 Oct 2025 10:24:40 +0200 Subject: [PATCH 7/7] review comments and rebase fixes --- pkg/engine/compat.go | 4 ++-- pkg/engine/compat_test.go | 16 ++++++++-------- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/pkg/engine/compat.go b/pkg/engine/compat.go index a83362bea0923..7a92eb44d42ff 100644 --- a/pkg/engine/compat.go +++ b/pkg/engine/compat.go @@ -10,10 +10,10 @@ import ( "github.com/prometheus/prometheus/promql" "github.com/grafana/loki/pkg/push" - "github.com/grafana/loki/v3/pkg/logproto" "github.com/grafana/loki/v3/pkg/engine/internal/semconv" "github.com/grafana/loki/v3/pkg/engine/internal/types" + "github.com/grafana/loki/v3/pkg/logproto" "github.com/grafana/loki/v3/pkg/logqlmodel" "github.com/grafana/loki/v3/pkg/logqlmodel/metadata" "github.com/grafana/loki/v3/pkg/logqlmodel/stats" @@ -147,7 +147,7 @@ func (b *streamsResultBuilder) CollectRecord(rec arrow.Record) { } // Convert columnar representation to a row-based one - for rowIdx := 0; rowIdx < numRows; rowIdx++ { + for rowIdx := range numRows { lbs := b.rowBuilders[rowIdx].lbsBuilder.Labels() ts := b.rowBuilders[rowIdx].timestamp line := b.rowBuilders[rowIdx].line diff --git a/pkg/engine/compat_test.go b/pkg/engine/compat_test.go index 4893d9a48533d..ca526cc7ee610 100644 --- a/pkg/engine/compat_test.go +++ b/pkg/engine/compat_test.go @@ -225,7 +225,7 @@ func TestStreamsResultBuilder(t *testing.T) { colEnv.FQN(): "dev", }, } - record1 := rows1.Record(alloc, schema) + record1 := rows1.Record(memory.DefaultAllocator, schema) defer record1.Release() // Second record: prod and staging streams @@ -241,7 +241,7 @@ func TestStreamsResultBuilder(t *testing.T) { colEnv.FQN(): "staging", }, } - record2 := rows2.Record(alloc, schema) + record2 := rows2.Record(memory.DefaultAllocator, schema) defer record2.Release() builder := newStreamsResultBuilder() @@ -311,7 +311,7 @@ func TestStreamsResultBuilder(t *testing.T) { colEnv.FQN(): "prod", } } - record1 := rows1.Record(alloc, schema) + record1 := rows1.Record(memory.DefaultAllocator, schema) builder.CollectRecord(record1) record1.Release() require.Equal(t, 5, builder.Len()) @@ -326,7 +326,7 @@ func TestStreamsResultBuilder(t *testing.T) { colEnv.FQN(): "dev", } } - record2 := rows2.Record(alloc, schema) + record2 := rows2.Record(memory.DefaultAllocator, schema) builder.CollectRecord(record2) record2.Release() require.Equal(t, 7, builder.Len()) @@ -341,7 +341,7 @@ func TestStreamsResultBuilder(t *testing.T) { colEnv.FQN(): "staging", } } - record3 := rows3.Record(alloc, schema) + record3 := rows3.Record(memory.DefaultAllocator, schema) builder.CollectRecord(record3) record3.Release() require.Equal(t, 17, builder.Len()) @@ -380,14 +380,14 @@ func TestStreamsResultBuilder(t *testing.T) { colEnv.FQN(): "prod", } } - record1 := rows1.Record(alloc, schema) + record1 := rows1.Record(memory.DefaultAllocator, schema) builder.CollectRecord(record1) record1.Release() require.Equal(t, 3, builder.Len()) // Second record: empty (0 rows) rows2 := arrowtest.Rows{} - record2 := rows2.Record(alloc, schema) + record2 := rows2.Record(memory.DefaultAllocator, schema) builder.CollectRecord(record2) record2.Release() require.Equal(t, 3, builder.Len(), "empty record should not change count") @@ -401,7 +401,7 @@ func TestStreamsResultBuilder(t *testing.T) { colEnv.FQN(): "dev", } } - record3 := rows3.Record(alloc, schema) + record3 := rows3.Record(memory.DefaultAllocator, schema) builder.CollectRecord(record3) record3.Release() require.Equal(t, 5, builder.Len(), "should have 5 total entries")