
Commit b0f5ca7

chore: Dedup log objects during build (#19378)

This commit deduplicates log records while data objects are built: buildTable now skips consecutive records whose stream ID, timestamp, metadata, and line all match, and mergeTables skips equal rows coming out of the loser-tree merge, so duplicates that span multiple input tables are dropped as well.

1 parent c47fe46 commit b0f5ca7
3 files changed: +83 -9 lines changed

pkg/dataobj/sections/logs/table_build.go

Lines changed: 28 additions & 5 deletions
@@ -1,6 +1,7 @@
 package logs
 
 import (
+    "bytes"
     "cmp"
     "slices"
 
@@ -22,21 +23,30 @@ func buildTable(buf *tableBuffer, pageSize, pageRowCount int, compressionOpts da
         messageBuilder = buf.Message(pageSize, pageRowCount, compressionOpts)
     )
 
-    for i, record := range records {
+    var prev Record
+    row := 0
+    for _, record := range records {
+        if equalRecords(prev, record) {
+            // Skip equal records
+            continue
+        }
+        prev = record
+
         // Append only fails if given out-of-order data, where the provided row
         // number is less than the previous row number. That can't happen here, so
         // to keep the code readable we ignore the error values.
 
-        _ = streamIDBuilder.Append(i, dataset.Int64Value(record.StreamID))
-        _ = timestampBuilder.Append(i, dataset.Int64Value(record.Timestamp.UnixNano()))
-        _ = messageBuilder.Append(i, dataset.BinaryValue(record.Line))
+        _ = streamIDBuilder.Append(row, dataset.Int64Value(record.StreamID))
+        _ = timestampBuilder.Append(row, dataset.Int64Value(record.Timestamp.UnixNano()))
+        _ = messageBuilder.Append(row, dataset.BinaryValue(record.Line))
 
         record.Metadata.Range(func(md labels.Label) {
             // Passing around md.Value as an unsafe slice is safe here: appending
             // values is always read-only and the byte slice will never be mutated.
             metadataBuilder := buf.Metadata(md.Name, pageSize, pageRowCount, compressionOpts)
-            _ = metadataBuilder.Append(i, dataset.BinaryValue(unsafeSlice(md.Value, 0)))
+            _ = metadataBuilder.Append(row, dataset.BinaryValue(unsafeSlice(md.Value, 0)))
         })
+        row++
     }
 
     table, err := buf.Flush()
@@ -56,3 +66,16 @@ func sortRecords(records []Record) {
         return cmp.Compare(a.StreamID, b.StreamID)
     })
 }
+
+func equalRecords(a, b Record) bool {
+    if a.StreamID != b.StreamID {
+        return false
+    }
+    if a.Timestamp != b.Timestamp {
+        return false
+    }
+    if !labels.Equal(a.Metadata, b.Metadata) {
+        return false
+    }
+    return bytes.Equal(a.Line, b.Line)
+}
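buildTable receives records that sortRecords has already ordered by timestamp and stream ID, so comparing each record only against its immediate predecessor is enough to drop every duplicate. Below is a minimal standalone sketch of that compare-with-previous pattern; the record fields mirror the diff, but the type itself, equal, and dedupSorted are hypothetical stand-ins (metadata is omitted for brevity):

package main

import (
    "bytes"
    "fmt"
    "time"
)

// record is a stripped-down stand-in for logs.Record, carrying only the
// fields the build-side dedup check compares.
type record struct {
    streamID  int64
    timestamp time.Time
    line      []byte
}

func equal(a, b record) bool {
    return a.streamID == b.streamID &&
        a.timestamp.Equal(b.timestamp) &&
        bytes.Equal(a.line, b.line)
}

// dedupSorted drops consecutive duplicates from an already-sorted slice,
// the same compare-with-previous pattern buildTable now uses while
// appending rows.
func dedupSorted(recs []record) []record {
    out := make([]record, 0, len(recs))
    for i, r := range recs {
        if i > 0 && equal(recs[i-1], r) {
            continue // skip equal records
        }
        out = append(out, r)
    }
    return out
}

func main() {
    recs := []record{
        {3, time.Unix(3, 0), []byte("hello")},
        {3, time.Unix(3, 0), []byte("hello")}, // consecutive duplicate: dropped
        {1, time.Unix(2, 0), []byte("world")},
    }
    fmt.Println(len(dedupSorted(recs))) // 2
}

Note this only works because the input is sorted: duplicates are guaranteed to be adjacent, so a single retained predecessor is all the state the loop needs.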

pkg/dataobj/sections/logs/table_merge.go

Lines changed: 48 additions & 3 deletions
@@ -1,6 +1,7 @@
 package logs
 
 import (
+    "bytes"
     "cmp"
     "context"
     "errors"
@@ -9,6 +10,7 @@ import (
     "math"
 
     "github.com/grafana/loki/v3/pkg/dataobj/internal/dataset"
+    "github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/datasetmd"
    "github.com/grafana/loki/v3/pkg/dataobj/internal/result"
     "github.com/grafana/loki/v3/pkg/util/loser"
 )
@@ -60,9 +62,7 @@ func mergeTables(buf *tableBuffer, pageSize, pageRowCount int, compressionOpts d
         messageBuilder = buf.Message(pageSize, pageRowCount, compressionOpts)
     )
 
-    var (
-        tableSequences = make([]*tableSequence, 0, len(tables))
-    )
+    tableSequences := make([]*tableSequence, 0, len(tables))
     for _, t := range tables {
         dsetColumns, err := result.Collect(t.ListColumns(context.Background()))
         if err != nil {
@@ -96,6 +96,7 @@ func mergeTables(buf *tableBuffer, pageSize, pageRowCount int, compressionOpts d
     tree := loser.New(tableSequences, maxValue, tableSequenceAt, CompareForSortOrder(sort), tableSequenceClose)
     defer tree.Close()
 
+    var prev dataset.Row
     for tree.Next() {
         seq := tree.Winner()
 
@@ -104,6 +105,12 @@ func mergeTables(buf *tableBuffer, pageSize, pageRowCount int, compressionOpts d
             return nil, err
         }
 
+        if equalRows(prev, row) {
+            // Skip equal rows
+            continue
+        }
+        prev = row
+
         for i, column := range seq.columns {
             // column is guaranteed to be a *tableColumn since we got it from *table.
             column := column.(*tableColumn)
@@ -258,3 +265,41 @@ func CompareRows(a, b dataset.Row) int {
     }
     return cmp.Compare(bTimestamp, aTimestamp)
 }
+
+// equalRows compares two rows for equality, column by column. A row is
+// considered equal if all of its columns are equal.
+func equalRows(a, b dataset.Row) bool {
+    if len(a.Values) != len(b.Values) {
+        return false
+    }
+
+    // The first two columns of each row are *always* stream ID and timestamp,
+    // so they will be checked first. This means equalRows will exit quickly
+    // for rows with different timestamps without reading the rest of the
+    // columns.
+    for i := 0; i < len(a.Values); i++ {
+        aType, bType := a.Values[i].Type(), b.Values[i].Type()
+        if aType != bType {
+            return false
+        }
+
+        switch aType {
+        case datasetmd.PHYSICAL_TYPE_INT64:
+            if a.Values[i].Int64() != b.Values[i].Int64() {
+                return false
+            }
+        case datasetmd.PHYSICAL_TYPE_UINT64:
+            if a.Values[i].Uint64() != b.Values[i].Uint64() {
+                return false
+            }
+        case datasetmd.PHYSICAL_TYPE_BINARY:
+            if !bytes.Equal(a.Values[i].Binary(), b.Values[i].Binary()) {
+                return false
+            }
+        case datasetmd.PHYSICAL_TYPE_UNSPECIFIED:
+            continue
+        default:
+            return false
+        }
+    }
+
+    return true
+}
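equalRows dispatches on each value's physical type, and because stream ID and timestamp are always the first two columns, mismatched rows usually fail within the first comparison or two. Below is a self-contained sketch of that tagged-union comparison; the value struct and constants are toy stand-ins for dataset.Value and the datasetmd enum (every name here is hypothetical):

package main

import (
    "bytes"
    "fmt"
)

// physicalType mimics the datasetmd physical-type enum the diff switches on.
type physicalType int

const (
    typeUnspecified physicalType = iota
    typeInt64
    typeUint64
    typeBinary
)

// value is a toy tagged union over the physical types compared in equalRows.
type value struct {
    typ physicalType
    i64 int64
    u64 uint64
    bin []byte
}

// equalValues mirrors one iteration of the equalRows loop: values of
// different types are never equal, and each type uses its own comparison.
func equalValues(a, b value) bool {
    if a.typ != b.typ {
        return false
    }
    switch a.typ {
    case typeInt64:
        return a.i64 == b.i64
    case typeUint64:
        return a.u64 == b.u64
    case typeBinary:
        return bytes.Equal(a.bin, b.bin)
    case typeUnspecified:
        return true // two NULLs compare equal, matching the diff's continue
    default:
        return false
    }
}

func main() {
    a := value{typ: typeBinary, bin: []byte("hello")}
    b := value{typ: typeBinary, bin: []byte("hello")}
    fmt.Println(equalValues(a, b)) // true
}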

pkg/dataobj/sections/logs/table_test.go

Lines changed: 7 additions & 1 deletion
@@ -49,7 +49,8 @@ func initBuffer(buf *tableBuffer) {
 func Test_mergeTables(t *testing.T) {
     var buf tableBuffer
 
-    // tables need to be sorted by Timestamp DESC and StreamID ASC
+    // tables need to be sorted by Timestamp DESC and StreamID ASC.
+    // duplicates are added to ensure the resulting objects are deduplicated if all attributes match.
     var (
         tableA = buildTable(&buf, pageSize, pageRows, dataset.CompressionOptions{}, []Record{
             {StreamID: 3, Timestamp: time.Unix(3, 0), Line: []byte("hello")},
@@ -58,16 +59,21 @@ func Test_mergeTables(t *testing.T) {
         })
 
         tableB = buildTable(&buf, pageSize, pageRows, dataset.CompressionOptions{}, []Record{
+            {StreamID: 3, Timestamp: time.Unix(3, 0), Line: []byte("hello")}, // Duplicate in tableA
             {StreamID: 1, Timestamp: time.Unix(2, 0), Line: []byte("world")},
             {StreamID: 3, Timestamp: time.Unix(1, 0), Line: []byte("goodbye")},
         })
 
         tableC = buildTable(&buf, pageSize, pageRows, dataset.CompressionOptions{}, []Record{
             {StreamID: 3, Timestamp: time.Unix(2, 0), Line: []byte("are")},
+            {StreamID: 3, Timestamp: time.Unix(2, 0), Line: []byte("are")}, // Duplicate within tableC
             {StreamID: 2, Timestamp: time.Unix(1, 0), Line: []byte("doing?")},
         })
     )
 
+    // TableC should have been initially deduped by buildTable
+    require.Equal(t, tableC.Timestamp.Desc.RowsCount, 2)
+
     mergedTable, err := mergeTables(&buf, pageSize, pageRows, dataset.CompressionOptions{}, []*table{tableA, tableB, tableC}, SortTimestampDESC)
     require.NoError(t, err)
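The build-side check only sees one table at a time, so the row tableB shares with tableA can only be dropped during mergeTables, once all inputs are interleaved into a single globally sorted stream. A rough sketch of that merge-side dedup over plain sorted integers, with a simple sort standing in for the loser tree (mergeDedup and everything else below are hypothetical):

package main

import (
    "fmt"
    "sort"
)

// mergeDedup merges pre-sorted sequences and drops duplicates by comparing
// each element with the previously emitted one, mirroring how mergeTables
// tracks a prev dataset.Row while draining the loser tree.
func mergeDedup(seqs ...[]int) []int {
    var all []int
    for _, s := range seqs {
        all = append(all, s...)
    }
    sort.Ints(all) // stand-in for the loser-tree merge order

    out := make([]int, 0, len(all))
    for i, v := range all {
        if i > 0 && v == all[i-1] {
            continue // skip equal rows
        }
        out = append(out, v)
    }
    return out
}

func main() {
    a := []int{1, 3}
    b := []int{1, 2} // 1 duplicates a value from the first sequence
    fmt.Println(mergeDedup(a, b)) // [1 2 3]
}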
