diff --git a/pkg/engine/compat.go b/pkg/engine/compat.go
index 25e637227cadc..7a92eb44d42ff 100644
--- a/pkg/engine/compat.go
+++ b/pkg/engine/compat.go
@@ -9,14 +9,14 @@ import (
 	"github.com/prometheus/prometheus/model/labels"
 	"github.com/prometheus/prometheus/promql"
 
+	"github.com/grafana/loki/pkg/push"
+
 	"github.com/grafana/loki/v3/pkg/engine/internal/semconv"
 	"github.com/grafana/loki/v3/pkg/engine/internal/types"
 	"github.com/grafana/loki/v3/pkg/logproto"
 	"github.com/grafana/loki/v3/pkg/logqlmodel"
 	"github.com/grafana/loki/v3/pkg/logqlmodel/metadata"
 	"github.com/grafana/loki/v3/pkg/logqlmodel/stats"
-
-	"github.com/grafana/loki/pkg/push"
 )
 
 type ResultBuilder interface {
@@ -33,8 +33,9 @@ var (
 
 func newStreamsResultBuilder() *streamsResultBuilder {
 	return &streamsResultBuilder{
-		data:    make(logqlmodel.Streams, 0),
-		streams: make(map[string]int),
+		data:        make(logqlmodel.Streams, 0),
+		streams:     make(map[string]int),
+		rowBuilders: nil,
 	}
 }
 
@@ -42,110 +43,182 @@ type streamsResultBuilder struct {
 	streams map[string]int
 	data    logqlmodel.Streams
 	count   int
+	// buffer for rows, reused across records
+	rowBuilders []rowBuilder
 }
 
-func (b *streamsResultBuilder) CollectRecord(rec arrow.Record) {
-	for row := range int(rec.NumRows()) {
-		stream, entry := b.collectRow(rec, row)
-		// Ignore rows that don't have stream labels, log line, or timestamp
-		if stream.IsEmpty() || entry.Line == "" || entry.Timestamp.Equal(time.Time{}) {
-			continue
-		}
-
-		// Add the entry to the result builder
-		key := stream.String()
-		idx, ok := b.streams[key]
-		if !ok {
-			idx = len(b.data)
-			b.streams[key] = idx
-			b.data = append(b.data, push.Stream{Labels: key})
-		}
-		b.data[idx].Entries = append(b.data[idx].Entries, entry)
-		b.count++
-	}
-}
+type rowBuilder struct {
+	timestamp       time.Time
+	line            string
+	lbsBuilder      *labels.Builder
+	metadataBuilder *labels.Builder
+	parsedBuilder   *labels.Builder
+}
 
-func (b *streamsResultBuilder) collectRow(rec arrow.Record, i int) (labels.Labels, logproto.Entry) {
-	var entry logproto.Entry
-	lbs := labels.NewBuilder(labels.EmptyLabels())
-	metadata := labels.NewBuilder(labels.EmptyLabels())
-	parsed := labels.NewBuilder(labels.EmptyLabels())
+func (b *streamsResultBuilder) CollectRecord(rec arrow.Record) {
+	numRows := int(rec.NumRows())
+	if numRows == 0 {
+		return
+	}
+	// Let's say we have the following log entries in rec (env is a label,
+	// trace is structured metadata, and v is a parsed field):
+	//   - {env="prod-1", trace="123-1", v="v1"} ts1 line 1
+	//   - {env="prod-2", trace="123-2", v="v2"} ts2 line 2
+	//   - {env="prod-3", trace="123-3", v="v3"} ts3 line 3
+	// We pre-initialize per-row builders to store the column values for all rows, e.g.:
+	// rows          |    1    |    2    |    3    | ...
+	// ==============+=========+=========+=========+====
+	// timestamps    |  r1 ts  |  r2 ts  |  r3 ts  | ...
+	// lines         | r1 line | r2 line | r3 line | ...
+	// ...
+	// We then iterate over the columns and convert the values to our format
+	// column by column: first all the timestamps, then all the log lines, etc.
+	// After all the values are collected and converted, we transform the
+	// columnar representation into a row-based one.
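+
+	// Resize the reusable per-row builders to match this record's row count.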
+	b.ensureRowBuilders(numRows)
+
+	// Convert arrow values to our format column by column
 	for colIdx := range int(rec.NumCols()) {
 		col := rec.Column(colIdx)
-		// Ignore column values that are NULL or invalid
-		if col.IsNull(i) || !col.IsValid(i) {
-			continue
-		}
 		field := rec.Schema().Field(colIdx)
 		ident, err := semconv.ParseFQN(field.Name)
 		if err != nil {
 			continue
 		}
-
 		shortName := ident.ShortName()
-		// Extract line
-		if ident.Equal(semconv.ColumnIdentMessage) {
-			entry.Line = col.(*array.String).Value(i)
-			continue
-		}
+		switch {
+
+		// Log line
+		case ident.Equal(semconv.ColumnIdentMessage):
+			lineCol := col.(*array.String)
+			forEachNotNullRowColValue(numRows, lineCol, func(rowIdx int) {
+				b.rowBuilders[rowIdx].line = lineCol.Value(rowIdx)
+			})
+
+		// Timestamp
+		case ident.Equal(semconv.ColumnIdentTimestamp):
+			tsCol := col.(*array.Timestamp)
+			forEachNotNullRowColValue(numRows, tsCol, func(rowIdx int) {
+				b.rowBuilders[rowIdx].timestamp = time.Unix(0, int64(tsCol.Value(rowIdx)))
+			})
+
+		// One of the label columns
+		case ident.ColumnType() == types.ColumnTypeLabel:
+			labelCol := col.(*array.String)
+			forEachNotNullRowColValue(numRows, labelCol, func(rowIdx int) {
+				b.rowBuilders[rowIdx].lbsBuilder.Set(shortName, labelCol.Value(rowIdx))
+			})
+
+		// One of the metadata columns
+		case ident.ColumnType() == types.ColumnTypeMetadata:
+			metadataCol := col.(*array.String)
+			forEachNotNullRowColValue(numRows, metadataCol, func(rowIdx int) {
+				val := metadataCol.Value(rowIdx)
+				b.rowBuilders[rowIdx].metadataBuilder.Set(shortName, val)
+				b.rowBuilders[rowIdx].lbsBuilder.Set(shortName, val)
+			})
+
+		// One of the parsed columns
+		case ident.ColumnType() == types.ColumnTypeParsed:
+			parsedCol := col.(*array.String)
+
+			// TODO: keep errors if --strict is set
+			// These are reserved column names used to track parsing errors. We are dropping them until
+			// we add support for --strict parsing.
+			if shortName == types.ColumnNameError || shortName == types.ColumnNameErrorDetails {
+				continue
+			}
+
+			forEachNotNullRowColValue(numRows, parsedCol, func(rowIdx int) {
+				parsedVal := parsedCol.Value(rowIdx)
+				if b.rowBuilders[rowIdx].parsedBuilder.Get(shortName) != "" {
+					return
+				}
+				b.rowBuilders[rowIdx].parsedBuilder.Set(shortName, parsedVal)
+				b.rowBuilders[rowIdx].lbsBuilder.Set(shortName, parsedVal)
+				if b.rowBuilders[rowIdx].metadataBuilder.Get(shortName) != "" {
+					b.rowBuilders[rowIdx].metadataBuilder.Del(shortName)
+				}
+			})
+		}
+	}
 
-		// Extract timestamp
-		if ident.Equal(semconv.ColumnIdentTimestamp) {
-			entry.Timestamp = time.Unix(0, int64(col.(*array.Timestamp).Value(i)))
-			continue
-		}
-
-		// Extract label
-		if ident.ColumnType() == types.ColumnTypeLabel {
-			switch arr := col.(type) {
-			case *array.String:
-				lbs.Set(shortName, arr.Value(i))
-			}
-			continue
-		}
-
-		// Extract metadata
-		if ident.ColumnType() == types.ColumnTypeMetadata {
-			switch arr := col.(type) {
-			case *array.String:
-				metadata.Set(shortName, arr.Value(i))
-				// include structured metadata in stream labels
-				lbs.Set(shortName, arr.Value(i))
-			}
-			continue
-		}
-
-		// Extract parsed
-		if ident.ColumnType() == types.ColumnTypeParsed {
-			switch arr := col.(type) {
-			case *array.String:
-				// TODO: keep errors if --strict is set
-				// These are reserved column names used to track parsing errors. We are dropping them until
-				// we add support for --strict parsing.
-				if shortName == types.ColumnNameError || shortName == types.ColumnNameErrorDetails {
-					continue
-				}
-
-				if parsed.Get(shortName) != "" {
-					continue
-				}
-
-				parsed.Set(shortName, arr.Value(i))
-				lbs.Set(shortName, arr.Value(i))
-				if metadata.Get(shortName) != "" {
-					metadata.Del(shortName)
-				}
-			}
-		}
-	}
-
-	entry.StructuredMetadata = logproto.FromLabelsToLabelAdapters(metadata.Labels())
-	entry.Parsed = logproto.FromLabelsToLabelAdapters(parsed.Labels())
-
-	return lbs.Labels(), entry
-}
+	// Convert columnar representation to a row-based one
+	for rowIdx := range numRows {
+		lbs := b.rowBuilders[rowIdx].lbsBuilder.Labels()
+		ts := b.rowBuilders[rowIdx].timestamp
+		line := b.rowBuilders[rowIdx].line
+		// Ignore rows that don't have stream labels, log line, or timestamp
+		if line == "" || ts.IsZero() || lbs.IsEmpty() {
+			b.resetRowBuilder(rowIdx)
+			continue
+		}
+
+		entry := logproto.Entry{
+			Timestamp:          ts,
+			Line:               line,
+			StructuredMetadata: logproto.FromLabelsToLabelAdapters(b.rowBuilders[rowIdx].metadataBuilder.Labels()),
+			Parsed:             logproto.FromLabelsToLabelAdapters(b.rowBuilders[rowIdx].parsedBuilder.Labels()),
+		}
+		b.resetRowBuilder(rowIdx)
 
+		// Add entry to appropriate stream
+		key := lbs.String()
+		idx, ok := b.streams[key]
+		if !ok {
+			idx = len(b.data)
+			b.streams[key] = idx
+			b.data = append(b.data, push.Stream{Labels: key})
+		}
+		b.data[idx].Entries = append(b.data[idx].Entries, entry)
+		b.count++
+	}
+}
+
+// ensureRowBuilders resizes the reusable per-row buffer to exactly newLen rows.
+func (b *streamsResultBuilder) ensureRowBuilders(newLen int) {
+	if newLen == len(b.rowBuilders) {
+		return
+	}
+
+	if newLen < len(b.rowBuilders) {
+		// Free the unused items at the end of the slice so they can be GC-ed.
+		clear(b.rowBuilders[newLen:])
+		b.rowBuilders = b.rowBuilders[:newLen]
+
+		return
+	}
+
+	// newLen > len(b.rowBuilders): grow the buffer.
+	numRowsToAdd := newLen - len(b.rowBuilders)
+	oldLen := len(b.rowBuilders)
+	b.rowBuilders = append(b.rowBuilders, make([]rowBuilder, numRowsToAdd)...)
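+	// The appended slots are zero values; give each new slot its own label builders.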
+	for i := oldLen; i < newLen; i++ {
+		b.rowBuilders[i] = rowBuilder{
+			lbsBuilder:      labels.NewBuilder(labels.EmptyLabels()),
+			metadataBuilder: labels.NewBuilder(labels.EmptyLabels()),
+			parsedBuilder:   labels.NewBuilder(labels.EmptyLabels()),
+		}
+	}
+}
+
+func (b *streamsResultBuilder) resetRowBuilder(i int) {
+	b.rowBuilders[i].timestamp = time.Time{}
+	b.rowBuilders[i].line = ""
+	b.rowBuilders[i].lbsBuilder.Reset(labels.EmptyLabels())
+	b.rowBuilders[i].metadataBuilder.Reset(labels.EmptyLabels())
+	b.rowBuilders[i].parsedBuilder.Reset(labels.EmptyLabels())
+}
+
+// forEachNotNullRowColValue calls f for every row whose value in col is not NULL.
+func forEachNotNullRowColValue(numRows int, col arrow.Array, f func(rowIdx int)) {
+	for rowIdx := 0; rowIdx < numRows; rowIdx++ {
+		if col.IsNull(rowIdx) {
+			continue
+		}
+		f(rowIdx)
+	}
+}
 
 func (b *streamsResultBuilder) Build(s stats.Result, md *metadata.Context) logqlmodel.Result {
diff --git a/pkg/engine/compat_bench_test.go b/pkg/engine/compat_bench_test.go
new file mode 100644
index 0000000000000..f8b8fcd34ca7e
--- /dev/null
+++ b/pkg/engine/compat_bench_test.go
@@ -0,0 +1,165 @@
+package engine
+
+import (
+	"fmt"
+	"testing"
+	"time"
+
+	"github.com/apache/arrow-go/v18/arrow"
+	"github.com/apache/arrow-go/v18/arrow/memory"
+
+	"github.com/grafana/loki/v3/pkg/engine/internal/semconv"
+	"github.com/grafana/loki/v3/pkg/engine/internal/types"
+	"github.com/grafana/loki/v3/pkg/util/arrowtest"
+)
+
+func BenchmarkStreamsResultBuilder(b *testing.B) {
+	alloc := memory.NewGoAllocator()
+
+	benchmarks := []struct {
+		name                string
+		numRowsFirstRecord  int
+		numRowsSecondRecord int
+		numLabels           int
+		numMeta             int
+		numParsed           int
+	}{
+		{
+			name:                "records_equal_size",
+			numRowsFirstRecord:  1000,
+			numRowsSecondRecord: 1000,
+			numLabels:           10,
+			numMeta:             5,
+			numParsed:           8,
+		},
+		{
+			name:                "record_two_bigger",
+			numRowsFirstRecord:  1000,
+			numRowsSecondRecord: 2000,
+			numLabels:           10,
+			numMeta:             5,
+			numParsed:           8,
+		},
+		{
+			name:                "record_two_smaller",
+			numRowsFirstRecord:  1000,
+			numRowsSecondRecord: 500,
+			numLabels:           10,
+			numMeta:             5,
+			numParsed:           8,
+		},
+	}
+
+	for _, bm := range benchmarks {
+		b.Run(bm.name, func(b *testing.B) {
+			schema, labelIdents, metaIdents, parsedIdents := prepareSchema(bm.numLabels, bm.numMeta, bm.numParsed)
+			baseTime := time.Unix(0, 1620000000000000000).UTC()
+
+			rows1 := generateRows(labelIdents, metaIdents, parsedIdents, bm.numRowsFirstRecord, baseTime)
+			record1 := rows1.Record(alloc, schema)
+			defer record1.Release()
+
+			rows2 := generateRows(labelIdents, metaIdents, parsedIdents, bm.numRowsSecondRecord, baseTime)
+			record2 := rows2.Record(alloc, schema)
+			defer record2.Release()
+
+			b.ResetTimer()
+			b.ReportAllocs()
+
+			for i := 0; i < b.N; i++ {
+				rb := newStreamsResultBuilder()
+				// Collect records twice on purpose to see how efficient CollectRecord is
+				// when the builder already has some data
+				rb.CollectRecord(record1)
+				rb.CollectRecord(record2)
+
+				// Ensure the result is used to prevent compiler optimizations
+				if rb.Len() != bm.numRowsFirstRecord+bm.numRowsSecondRecord {
+					b.Fatalf("expected %d entries, got %d", bm.numRowsFirstRecord+bm.numRowsSecondRecord, rb.Len())
+				}
+			}
+		})
+	}
+}
+
+func prepareSchema(numLabels int, numMeta int, numParsed int) (*arrow.Schema, []*semconv.Identifier, []*semconv.Identifier, []*semconv.Identifier) {
+	// Build schema
+	colTs := semconv.ColumnIdentTimestamp
+	colMsg := semconv.ColumnIdentMessage
+	fields := []arrow.Field{
+		semconv.FieldFromIdent(colTs, false),
+		semconv.FieldFromIdent(colMsg, false),
+	}
+
+	// Add label columns
+	labelIdents := make([]*semconv.Identifier, numLabels)
+	for i := 0; i < numLabels; i++ {
+		ident := semconv.NewIdentifier(
+			fmt.Sprintf("label_%d", i),
+			types.ColumnTypeLabel,
+			types.Loki.String,
+		)
+		labelIdents[i] = ident
+		fields = append(fields, semconv.FieldFromIdent(ident, false))
+	}
+
+	// Add metadata columns
+	metaIdents := make([]*semconv.Identifier, numMeta)
+	for i := 0; i < numMeta; i++ {
+		ident := semconv.NewIdentifier(
+			fmt.Sprintf("meta_%d", i),
+			types.ColumnTypeMetadata,
+			types.Loki.String,
+		)
+		metaIdents[i] = ident
+		fields = append(fields, semconv.FieldFromIdent(ident, false))
+	}
+
+	// Add parsed columns
+	parsedIdents := make([]*semconv.Identifier, numParsed)
+	for i := 0; i < numParsed; i++ {
+		ident := semconv.NewIdentifier(
+			fmt.Sprintf("parsed_%d", i),
+			types.ColumnTypeParsed,
+			types.Loki.String,
+		)
+		parsedIdents[i] = ident
+		fields = append(fields, semconv.FieldFromIdent(ident, false))
+	}
+
+	return arrow.NewSchema(fields, nil), labelIdents, metaIdents, parsedIdents
+}
+
+func generateRows(
+	labelIdents []*semconv.Identifier,
+	metaIdents []*semconv.Identifier,
+	parsedIdents []*semconv.Identifier,
+	numRows int,
+	baseTime time.Time,
+) arrowtest.Rows {
+	rows := make(arrowtest.Rows, numRows)
+	for rowIdx := 0; rowIdx < numRows; rowIdx++ {
+		row := make(map[string]any)
+		row[semconv.ColumnIdentTimestamp.FQN()] = baseTime.Add(time.Duration(rowIdx) * time.Nanosecond)
+		row[semconv.ColumnIdentMessage.FQN()] = fmt.Sprintf("log line %d with some additional text to make it more realistic", rowIdx)
+
+		// Add label values
+		for labelIdx, ident := range labelIdents {
+			row[ident.FQN()] = fmt.Sprintf("label_%d_value_%d", labelIdx, rowIdx%10)
+		}
+
+		// Add metadata values
+		for metaIdx, ident := range metaIdents {
+			row[ident.FQN()] = fmt.Sprintf("meta_%d_value_%d", metaIdx, rowIdx%5)
+		}
+
+		// Add parsed values
+		for parsedIdx, ident := range parsedIdents {
+			row[ident.FQN()] = fmt.Sprintf("parsed_%d_value_%d", parsedIdx, rowIdx%3)
+		}
+
+		rows[rowIdx] = row
+	}
+	return rows
+}
diff --git a/pkg/engine/compat_test.go b/pkg/engine/compat_test.go
index 31eb77b1d5766..ca526cc7ee610 100644
--- a/pkg/engine/compat_test.go
+++ b/pkg/engine/compat_test.go
@@ -91,6 +91,13 @@ func TestStreamsResultBuilder(t *testing.T) {
 			nil,
 		)
 		rows := arrowtest.Rows{
+			{
+				colTs.FQN():  nil,
+				colMsg.FQN(): "log line 0 (must be skipped)",
+				colEnv.FQN(): "dev",
+				colNs.FQN():  "loki-dev-001",
+				colTid.FQN(): "860e403fcf754312",
+			},
 			{
 				colTs.FQN():  time.Unix(0, 1620000000000000001).UTC(),
 				colMsg.FQN(): "log line 1",
@@ -126,6 +133,13 @@
 				colNs.FQN():  "loki-dev-002",
 				colTid.FQN(): "0cf883f112ad239b",
 			},
+			{
+				colTs.FQN():  time.Unix(0, 1620000000000000006).UTC(),
+				colMsg.FQN(): "log line 6",
+				colEnv.FQN(): "dev",
+				colNs.FQN():  nil,
+				colTid.FQN(): "9de325g124ad230b",
+			},
 		}
 
 		record := rows.Record(memory.DefaultAllocator, schema)
@@ -137,11 +151,11 @@
 		err := collectResult(context.Background(), pipeline, builder)
 		require.NoError(t, err)
 
-		require.Equal(t, 5, builder.Len())
+		require.Equal(t, 6, builder.Len())
 
 		md, _ := metadata.NewContext(t.Context())
 		result := builder.Build(stats.Result{}, md)
-		require.Equal(t, 5, result.Data.(logqlmodel.Streams).Len())
+		require.Equal(t, 6, result.Data.(logqlmodel.Streams).Len())
 
 		expected := logqlmodel.Streams{
 			push.Stream{
@@ -162,6 +176,12 @@
 					{Line: "log line 3", Timestamp: time.Unix(0, 1620000000000000003), StructuredMetadata: logproto.FromLabelsToLabelAdapters(labels.FromStrings("traceID", "61330481e1e59b18")), Parsed: logproto.FromLabelsToLabelAdapters(labels.Labels{})},
 				},
 			},
+			push.Stream{
+				Labels: labels.FromStrings("env", "dev", "traceID", "9de325g124ad230b").String(),
+				Entries: []logproto.Entry{
+					{Line: "log line 6", Timestamp: time.Unix(0, 1620000000000000006), StructuredMetadata: logproto.FromLabelsToLabelAdapters(labels.FromStrings("traceID", "9de325g124ad230b")), Parsed: logproto.FromLabelsToLabelAdapters(labels.Labels{})},
+				},
+			},
 			push.Stream{
 				Labels: labels.FromStrings("env", "prod", "namespace", "loki-prod-001", "traceID", "40e50221e284b9d2").String(),
 				Entries: []logproto.Entry{
@@ -177,6 +197,229 @@
 		}
 		require.Equal(t, expected, result.Data.(logqlmodel.Streams))
 	})
+
+	t.Run("multiple records with different streams are accumulated correctly", func(t *testing.T) {
+		colTs := semconv.ColumnIdentTimestamp
+		colMsg := semconv.ColumnIdentMessage
+		colEnv := semconv.NewIdentifier("env", types.ColumnTypeLabel, types.Loki.String)
+
+		schema := arrow.NewSchema(
+			[]arrow.Field{
+				semconv.FieldFromIdent(colTs, false),
+				semconv.FieldFromIdent(colMsg, false),
+				semconv.FieldFromIdent(colEnv, false),
+			},
+			nil,
+		)
+
+		// First record: prod and dev streams
+		rows1 := arrowtest.Rows{
+			{
+				colTs.FQN():  time.Unix(0, 1620000000000000001).UTC(),
+				colMsg.FQN(): "log line 1",
+				colEnv.FQN(): "prod",
+			},
+			{
+				colTs.FQN():  time.Unix(0, 1620000000000000002).UTC(),
+				colMsg.FQN(): "log line 2",
+				colEnv.FQN(): "dev",
+			},
+		}
+		record1 := rows1.Record(memory.DefaultAllocator, schema)
+		defer record1.Release()
+
+		// Second record: prod and staging streams
+		rows2 := arrowtest.Rows{
+			{
+				colTs.FQN():  time.Unix(0, 1620000000000000003).UTC(),
+				colMsg.FQN(): "log line 3",
+				colEnv.FQN(): "prod",
+			},
+			{
+				colTs.FQN():  time.Unix(0, 1620000000000000004).UTC(),
+				colMsg.FQN(): "log line 4",
+				colEnv.FQN(): "staging",
+			},
+		}
+		record2 := rows2.Record(memory.DefaultAllocator, schema)
+		defer record2.Release()
+
+		builder := newStreamsResultBuilder()
+
+		// Collect first record
+		builder.CollectRecord(record1)
+		require.Equal(t, 2, builder.Len(), "should have 2 entries after first record")
+
+		// Collect second record
+		builder.CollectRecord(record2)
+		require.Equal(t, 4, builder.Len(), "should have 4 entries total after second record")
+
+		md, _ := metadata.NewContext(t.Context())
+		result := builder.Build(stats.Result{}, md)
+		streams := result.Data.(logqlmodel.Streams)
+		// Note: 3 unique streams (dev, prod, staging), but 4 total entries.
+		// The prod stream has 2 entries (one from each record).
+		require.Equal(t, 3, len(streams), "should have 3 unique streams")
+
+		// Verify stream grouping - the prod stream should have entries from both records
+		expected := logqlmodel.Streams{
+			push.Stream{
+				Labels: labels.FromStrings("env", "dev").String(),
+				Entries: []logproto.Entry{
+					{Line: "log line 2", Timestamp: time.Unix(0, 1620000000000000002), StructuredMetadata: logproto.FromLabelsToLabelAdapters(labels.Labels{}), Parsed: logproto.FromLabelsToLabelAdapters(labels.Labels{})},
+				},
+			},
+			push.Stream{
+				Labels: labels.FromStrings("env", "prod").String(),
+				Entries: []logproto.Entry{
+					{Line: "log line 1", Timestamp: time.Unix(0, 1620000000000000001), StructuredMetadata: logproto.FromLabelsToLabelAdapters(labels.Labels{}), Parsed: logproto.FromLabelsToLabelAdapters(labels.Labels{})},
+					{Line: "log line 3", Timestamp: time.Unix(0, 1620000000000000003), StructuredMetadata: logproto.FromLabelsToLabelAdapters(labels.Labels{}), Parsed: logproto.FromLabelsToLabelAdapters(labels.Labels{})},
+				},
+			},
+			push.Stream{
+				Labels: labels.FromStrings("env", "staging").String(),
+				Entries: []logproto.Entry{
+					{Line: "log line 4", Timestamp: time.Unix(0, 1620000000000000004), StructuredMetadata: logproto.FromLabelsToLabelAdapters(labels.Labels{}), Parsed: logproto.FromLabelsToLabelAdapters(labels.Labels{})},
+				},
+			},
+		}
+		require.Equal(t, expected, streams)
+	})
+
+	t.Run("buffer reuse with varying record sizes", func(t *testing.T) {
+		colTs := semconv.ColumnIdentTimestamp
+		colMsg := semconv.ColumnIdentMessage
+		colEnv := semconv.NewIdentifier("env", types.ColumnTypeLabel, types.Loki.String)
+
+		schema := arrow.NewSchema(
+			[]arrow.Field{
+				semconv.FieldFromIdent(colTs, false),
+				semconv.FieldFromIdent(colMsg, false),
+				semconv.FieldFromIdent(colEnv, false),
+			},
+			nil,
+		)
+
+		builder := newStreamsResultBuilder()
+
+		// First record: 5 rows (buffer grows to 5)
+		rows1 := make(arrowtest.Rows, 5)
+		for i := 0; i < 5; i++ {
+			rows1[i] = arrowtest.Row{
+				colTs.FQN():  time.Unix(0, int64(1620000000000000001+i)).UTC(),
+				colMsg.FQN(): "log line",
+				colEnv.FQN(): "prod",
+			}
+		}
+		record1 := rows1.Record(memory.DefaultAllocator, schema)
+		builder.CollectRecord(record1)
+		record1.Release()
+		require.Equal(t, 5, builder.Len())
+		require.Equal(t, 5, len(builder.rowBuilders), "buffer should have 5 rowBuilders")
+
+		// Second record: 2 rows (buffer shrinks to 2)
+		rows2 := make(arrowtest.Rows, 2)
+		for i := 0; i < 2; i++ {
+			rows2[i] = arrowtest.Row{
+				colTs.FQN():  time.Unix(0, int64(1620000000000000010+i)).UTC(),
+				colMsg.FQN(): "log line",
+				colEnv.FQN(): "dev",
+			}
+		}
+		record2 := rows2.Record(memory.DefaultAllocator, schema)
+		builder.CollectRecord(record2)
+		record2.Release()
+		require.Equal(t, 7, builder.Len())
+		require.Equal(t, 2, len(builder.rowBuilders), "buffer should shrink to 2 rowBuilders")
+
+		// Third record: 10 rows (buffer grows to 10)
+		rows3 := make(arrowtest.Rows, 10)
+		for i := 0; i < 10; i++ {
+			rows3[i] = arrowtest.Row{
+				colTs.FQN():  time.Unix(0, int64(1620000000000000020+i)).UTC(),
+				colMsg.FQN(): "log line",
+				colEnv.FQN(): "staging",
+			}
+		}
+		record3 := rows3.Record(memory.DefaultAllocator, schema)
+		builder.CollectRecord(record3)
+		record3.Release()
+		require.Equal(t, 17, builder.Len())
+		require.Equal(t, 10, len(builder.rowBuilders), "buffer should grow to 10 rowBuilders")
+
+		// Verify all rowBuilders are properly initialized
+		for i := 0; i < len(builder.rowBuilders); i++ {
+			require.NotNil(t, builder.rowBuilders[i].lbsBuilder, "lbsBuilder should be initialized")
+			require.NotNil(t, builder.rowBuilders[i].metadataBuilder, "metadataBuilder should be initialized")
+			require.NotNil(t, builder.rowBuilders[i].parsedBuilder, "parsedBuilder should be initialized")
+		}
+	})
+
+	t.Run("empty records mixed with valid records", func(t *testing.T) {
+		colTs := semconv.ColumnIdentTimestamp
+		colMsg := semconv.ColumnIdentMessage
+		colEnv := semconv.NewIdentifier("env", types.ColumnTypeLabel, types.Loki.String)
+
+		schema := arrow.NewSchema(
+			[]arrow.Field{
+				semconv.FieldFromIdent(colTs, false),
+				semconv.FieldFromIdent(colMsg, false),
+				semconv.FieldFromIdent(colEnv, false),
+			},
+			nil,
+		)
+
+		builder := newStreamsResultBuilder()
+
+		// First record: 3 valid rows
+		rows1 := make(arrowtest.Rows, 3)
+		for i := 0; i < 3; i++ {
+			rows1[i] = arrowtest.Row{
+				colTs.FQN():  time.Unix(0, int64(1620000000000000001+i)).UTC(),
+				colMsg.FQN(): "log line",
+				colEnv.FQN(): "prod",
+			}
+		}
+		record1 := rows1.Record(memory.DefaultAllocator, schema)
+		builder.CollectRecord(record1)
+		record1.Release()
+		require.Equal(t, 3, builder.Len())
+
+		// Second record: empty (0 rows)
+		rows2 := arrowtest.Rows{}
+		record2 := rows2.Record(memory.DefaultAllocator, schema)
+		builder.CollectRecord(record2)
+		record2.Release()
+		require.Equal(t, 3, builder.Len(), "empty record should not change count")
+
+		// Third record: 2 valid rows
+		rows3 := make(arrowtest.Rows, 2)
+		for i := 0; i < 2; i++ {
+			rows3[i] = arrowtest.Row{
+				colTs.FQN():  time.Unix(0, int64(1620000000000000010+i)).UTC(),
+				colMsg.FQN(): "log line",
+				colEnv.FQN(): "dev",
+			}
+		}
+		record3 := rows3.Record(memory.DefaultAllocator, schema)
+		builder.CollectRecord(record3)
+		record3.Release()
+		require.Equal(t, 5, builder.Len(), "should have 5 total entries")
+
+		// Verify final result
+		md, _ := metadata.NewContext(t.Context())
+		result := builder.Build(stats.Result{}, md)
+		streams := result.Data.(logqlmodel.Streams)
+		// Note: 2 unique streams (prod with 3 entries, dev with 2 entries) = 5 total entries
+		require.Equal(t, 2, len(streams), "should have 2 unique streams")
+
+		// Verify the streams have the correct number of entries
+		var totalEntries int
+		for _, stream := range streams {
+			totalEntries += len(stream.Entries)
+		}
+		require.Equal(t, 5, totalEntries, "should have 5 total entries across both streams")
+	})
 }
 
 func TestVectorResultBuilder(t *testing.T) {