231 changes: 152 additions & 79 deletions pkg/engine/compat.go
@@ -9,14 +9,14 @@ import (
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/promql"

"github.com/grafana/loki/pkg/push"

"github.com/grafana/loki/v3/pkg/engine/internal/semconv"
"github.com/grafana/loki/v3/pkg/engine/internal/types"
"github.com/grafana/loki/v3/pkg/logproto"
"github.com/grafana/loki/v3/pkg/logqlmodel"
"github.com/grafana/loki/v3/pkg/logqlmodel/metadata"
"github.com/grafana/loki/v3/pkg/logqlmodel/stats"

"github.com/grafana/loki/pkg/push"
)

type ResultBuilder interface {
@@ -33,119 +33,192 @@

func newStreamsResultBuilder() *streamsResultBuilder {
return &streamsResultBuilder{
data: make(logqlmodel.Streams, 0),
streams: make(map[string]int),
data: make(logqlmodel.Streams, 0),
streams: make(map[string]int),
rowBuilders: nil,
}
}

type streamsResultBuilder struct {
streams map[string]int
data logqlmodel.Streams
count int
}

func (b *streamsResultBuilder) CollectRecord(rec arrow.Record) {
for row := range int(rec.NumRows()) {
stream, entry := b.collectRow(rec, row)

// Ignore rows that don't have stream labels, log line, or timestamp
if stream.IsEmpty() || entry.Line == "" || entry.Timestamp.Equal(time.Time{}) {
continue
}
// rowBuilders buffers per-row values for the record currently being collected
rowBuilders []rowBuilder
}

// Add the entry to the result builder
key := stream.String()
idx, ok := b.streams[key]
if !ok {
idx = len(b.data)
b.streams[key] = idx
b.data = append(b.data, push.Stream{Labels: key})
}
b.data[idx].Entries = append(b.data[idx].Entries, entry)
b.count++
}
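// rowBuilder buffers the decoded values for a single row of an arrow.Record
// until it can be materialized as a stream entry.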
type rowBuilder struct {
timestamp time.Time
line string
lbsBuilder *labels.Builder
metadataBuilder *labels.Builder
parsedBuilder *labels.Builder
}

func (b *streamsResultBuilder) collectRow(rec arrow.Record, i int) (labels.Labels, logproto.Entry) {
var entry logproto.Entry
lbs := labels.NewBuilder(labels.EmptyLabels())
metadata := labels.NewBuilder(labels.EmptyLabels())
parsed := labels.NewBuilder(labels.EmptyLabels())
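// CollectRecord decodes an arrow.Record column by column into per-row
// builders and appends the resulting entries to their streams.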
func (b *streamsResultBuilder) CollectRecord(rec arrow.Record) {
numRows := int(rec.NumRows())
if numRows == 0 {
return
}

// let's say we have the following log entries in rec:
// - {labelenv="prod-1", metadatatrace="123-1", parsed="v1"} ts1 line 1
// - {labelenv="prod-2", metadatatrace="123-2", parsed="v2"} ts2 line 2
// - {labelenv="prod-3", metadatatrace="123-3", parsed="v3"} ts3 line 3
// we pre-initialize slices to store column values for all the rows, e.g.:
// rows | 1 | 2 | 3 | ...
// ==============+=========+=========+=========+====
// timestamps | r1 ts | r2 ts | r3 ts | ...
// lines | r1 line | r2 line | r3 line | ...
// ...
// We iterate over the columns and convert the values to our format column by column, e.g.,
// first all the timestamps, then all the log lines, etc.
// After all the values are collected and converted, we transform the columnar representation to a row-based one.

b.ensureRowBuilders(numRows)

// Convert arrow values to our format column by column
for colIdx := range int(rec.NumCols()) {
col := rec.Column(colIdx)
// Ignore column values that are NULL or invalid
if col.IsNull(i) || !col.IsValid(i) {
continue
}

field := rec.Schema().Field(colIdx)
ident, err := semconv.ParseFQN(field.Name)
if err != nil {
continue
}

shortName := ident.ShortName()

// Extract line
if ident.Equal(semconv.ColumnIdentMessage) {
entry.Line = col.(*array.String).Value(i)
continue
switch {

// Log line
case ident.Equal(semconv.ColumnIdentMessage):
lineCol := col.(*array.String)
forEachNotNullRowColValue(numRows, lineCol, func(rowIdx int) {
b.rowBuilders[rowIdx].line = lineCol.Value(rowIdx)
})

// Timestamp
case ident.Equal(semconv.ColumnIdentTimestamp):
tsCol := col.(*array.Timestamp)
forEachNotNullRowColValue(numRows, tsCol, func(rowIdx int) {
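// The column value is interpreted as Unix nanoseconds.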
b.rowBuilders[rowIdx].timestamp = time.Unix(0, int64(tsCol.Value(rowIdx)))
})

// One of the label columns
case ident.ColumnType() == types.ColumnTypeLabel:
labelCol := col.(*array.String)
forEachNotNullRowColValue(numRows, labelCol, func(rowIdx int) {
b.rowBuilders[rowIdx].lbsBuilder.Set(shortName, labelCol.Value(rowIdx))
})

// One of the metadata columns
case ident.ColumnType() == types.ColumnTypeMetadata:
metadataCol := col.(*array.String)
forEachNotNullRowColValue(numRows, metadataCol, func(rowIdx int) {
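// Structured metadata is also included in the stream labels.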
val := metadataCol.Value(rowIdx)
b.rowBuilders[rowIdx].metadataBuilder.Set(shortName, val)
b.rowBuilders[rowIdx].lbsBuilder.Set(shortName, val)
})

// One of the parsed columns
case ident.ColumnType() == types.ColumnTypeParsed:
parsedCol := col.(*array.String)

// TODO: keep errors if --strict is set
// These are reserved column names used to track parsing errors. We are dropping them until
// we add support for --strict parsing.
if shortName == types.ColumnNameError || shortName == types.ColumnNameErrorDetails {
continue
}

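// The first parsed value for a name wins; a parsed label also
// overrides structured metadata with the same name.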
forEachNotNullRowColValue(numRows, parsedCol, func(rowIdx int) {
parsedVal := parsedCol.Value(rowIdx)
if b.rowBuilders[rowIdx].parsedBuilder.Get(shortName) != "" {
return
}
b.rowBuilders[rowIdx].parsedBuilder.Set(shortName, parsedVal)
b.rowBuilders[rowIdx].lbsBuilder.Set(shortName, parsedVal)
if b.rowBuilders[rowIdx].metadataBuilder.Get(shortName) != "" {
b.rowBuilders[rowIdx].metadataBuilder.Del(shortName)
}
})
}
}

// Extract timestamp
if ident.Equal(semconv.ColumnIdentTimestamp) {
entry.Timestamp = time.Unix(0, int64(col.(*array.Timestamp).Value(i)))
// Convert columnar representation to a row-based one
for rowIdx := range numRows {
lbs := b.rowBuilders[rowIdx].lbsBuilder.Labels()
ts := b.rowBuilders[rowIdx].timestamp
line := b.rowBuilders[rowIdx].line
// Ignore rows that don't have stream labels, log line, or timestamp
if line == "" || ts.IsZero() || lbs.IsEmpty() {
b.resetRowBuilder(rowIdx)
continue
}

// Extract label
if ident.ColumnType() == types.ColumnTypeLabel {
switch arr := col.(type) {
case *array.String:
lbs.Set(shortName, arr.Value(i))
}
continue
entry := logproto.Entry{
Timestamp: ts,
Line: line,
StructuredMetadata: logproto.FromLabelsToLabelAdapters(b.rowBuilders[rowIdx].metadataBuilder.Labels()),
Parsed: logproto.FromLabelsToLabelAdapters(b.rowBuilders[rowIdx].parsedBuilder.Labels()),
}
b.resetRowBuilder(rowIdx)

// Extract metadata
if ident.ColumnType() == types.ColumnTypeMetadata {
switch arr := col.(type) {
case *array.String:
metadata.Set(shortName, arr.Value(i))
// include structured metadata in stream labels
lbs.Set(shortName, arr.Value(i))
}
continue
// Add entry to appropriate stream
key := lbs.String()
idx, ok := b.streams[key]
if !ok {
idx = len(b.data)
b.streams[key] = idx
b.data = append(b.data, push.Stream{Labels: key})
}
b.data[idx].Entries = append(b.data[idx].Entries, entry)
b.count++
}
}
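// A minimal usage sketch (illustrative assumption, not part of this diff):
// records, statsResult, and metadataCtx are hypothetical inputs supplied by
// the caller. The builder collects each arrow.Record and then materializes
// the final logqlmodel.Result:
//
//	b := newStreamsResultBuilder()
//	for _, rec := range records {
//		b.CollectRecord(rec)
//	}
//	result := b.Build(statsResult, metadataCtx)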

// Extract parsed
if ident.ColumnType() == types.ColumnTypeParsed {
switch arr := col.(type) {
case *array.String:
// TODO: keep errors if --strict is set
// These are reserved column names used to track parsing errors. We are dropping them until
// we add support for --strict parsing.
if shortName == types.ColumnNameError || shortName == types.ColumnNameErrorDetails {
continue
}
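// ensureRowBuilders resizes b.rowBuilders to hold exactly newLen rows,
// reusing already-allocated builders and allocating new ones as needed.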
func (b *streamsResultBuilder) ensureRowBuilders(newLen int) {
if newLen == len(b.rowBuilders) {
return
}

if parsed.Get(shortName) != "" {
continue
}
if newLen < len(b.rowBuilders) {
// free unused items at the end of the slice so they can be GC-ed
clear(b.rowBuilders[newLen:len(b.rowBuilders)])
b.rowBuilders = b.rowBuilders[:newLen]

parsed.Set(shortName, arr.Value(i))
lbs.Set(shortName, arr.Value(i))
if metadata.Get(shortName) != "" {
metadata.Del(shortName)
}
}
return
}

// newLen > len(b.rowBuilders): grow the buffer and initialize the new builders
numRowsToAdd := newLen - len(b.rowBuilders)
oldLen := len(b.rowBuilders)
b.rowBuilders = append(b.rowBuilders, make([]rowBuilder, numRowsToAdd)...)
for i := oldLen; i < newLen; i++ {
b.rowBuilders[i] = rowBuilder{
lbsBuilder: labels.NewBuilder(labels.EmptyLabels()),
metadataBuilder: labels.NewBuilder(labels.EmptyLabels()),
parsedBuilder: labels.NewBuilder(labels.EmptyLabels()),
}
}
entry.StructuredMetadata = logproto.FromLabelsToLabelAdapters(metadata.Labels())
entry.Parsed = logproto.FromLabelsToLabelAdapters(parsed.Labels())
}

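// resetRowBuilder clears the builders for row i so they can be reused
// for the next record.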
func (b *streamsResultBuilder) resetRowBuilder(i int) {
b.rowBuilders[i].timestamp = time.Time{}
b.rowBuilders[i].line = ""
b.rowBuilders[i].lbsBuilder.Reset(labels.EmptyLabels())
b.rowBuilders[i].metadataBuilder.Reset(labels.EmptyLabels())
b.rowBuilders[i].parsedBuilder.Reset(labels.EmptyLabels())
}

return lbs.Labels(), entry
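// forEachNotNullRowColValue calls f for every row index in [0, numRows)
// whose value in col is not null.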
func forEachNotNullRowColValue(numRows int, col arrow.Array, f func(rowIdx int)) {
for rowIdx := 0; rowIdx < numRows; rowIdx++ {
if col.IsNull(rowIdx) {
continue
}
f(rowIdx)
}
}

func (b *streamsResultBuilder) Build(s stats.Result, md *metadata.Context) logqlmodel.Result {