Skip to content

Commit 2dac5ef

Browse files
authored
chore: Write empty message values in logs section writer (#19359)
1 parent 6ea1a53 commit 2dac5ef

File tree

7 files changed: +20 -22 lines changed

pkg/dataobj/internal/dataset/column_test.go

Lines changed: 1 addition & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -55,7 +55,7 @@ func TestColumnBuilder_ReadWrite(t *testing.T) {
5555
require.NoError(t, err)
5656
require.Equal(t, ColumnType{Physical: datasetmd.PHYSICAL_TYPE_BINARY, Logical: "data"}, col.Desc.Type)
5757
require.Equal(t, len(in), col.Desc.RowsCount)
58-
require.Equal(t, len(in)-2, col.Desc.ValuesCount) // -2 for the empty strings
58+
require.Equal(t, len(in), col.Desc.ValuesCount)
5959
require.GreaterOrEqual(t, len(col.Pages), len(in)/pageMaxRows)
6060

6161
t.Log("Uncompressed size:", col.Desc.UncompressedSize)
@@ -90,10 +90,6 @@ func TestColumnBuilder_ReadWrite(t *testing.T) {
9090

9191
func TestColumnBuilder_MinMax(t *testing.T) {
9292
var (
93-
// We include the null string in the test to ensure that it's never
94-
// considered in min/max ranges.
95-
nullString = ""
96-
9793
aString = strings.Repeat("a", 100)
9894
bString = strings.Repeat("b", 100)
9995
cString = strings.Repeat("c", 100)
@@ -104,8 +100,6 @@ func TestColumnBuilder_MinMax(t *testing.T) {
104100
)
105101

106102
in := []string{
107-
nullString,
108-
109103
// We append strings out-of-order below to ensure that the min/max
110104
// comparisons are working properly.
111105
//

pkg/dataobj/internal/dataset/page_builder.go

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -99,7 +99,7 @@ func (b *pageBuilder) canAppend(n, valueSize int) bool {
9999
// Append appends value into the pageBuilder. Append returns true if the data
100100
// was appended; false if the pageBuilder is full.
101101
func (b *pageBuilder) Append(value Value) bool {
102-
if value.IsNil() || value.IsZero() {
102+
if value.IsNil() {
103103
return b.AppendNull()
104104
}
105105

pkg/dataobj/internal/dataset/page_reader_test.go

Lines changed: 4 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -34,7 +34,7 @@ func Test_pageReader(t *testing.T) {
3434

3535
page := buildPage(t, opts, pageReaderTestStrings)
3636
require.Equal(t, len(pageReaderTestStrings), page.Desc.RowCount)
37-
require.Equal(t, len(pageReaderTestStrings)-2, page.Desc.ValuesCount) // -2 for the empty strings
37+
require.Equal(t, len(pageReaderTestStrings), page.Desc.ValuesCount)
3838

3939
t.Log("Uncompressed size: ", page.Desc.UncompressedSize)
4040
t.Log("Compressed size: ", page.Desc.CompressedSize)
@@ -57,7 +57,7 @@ func Test_pageReader_SeekToStart(t *testing.T) {
5757

5858
page := buildPage(t, opts, pageReaderTestStrings)
5959
require.Equal(t, len(pageReaderTestStrings), page.Desc.RowCount)
60-
require.Equal(t, len(pageReaderTestStrings)-2, page.Desc.ValuesCount) // -2 for the empty strings
60+
require.Equal(t, len(pageReaderTestStrings), page.Desc.ValuesCount)
6161

6262
t.Log("Uncompressed size: ", page.Desc.UncompressedSize)
6363
t.Log("Compressed size: ", page.Desc.CompressedSize)
@@ -87,7 +87,7 @@ func Test_pageReader_Reset(t *testing.T) {
8787

8888
page := buildPage(t, opts, pageReaderTestStrings)
8989
require.Equal(t, len(pageReaderTestStrings), page.Desc.RowCount)
90-
require.Equal(t, len(pageReaderTestStrings)-2, page.Desc.ValuesCount) // -2 for the empty strings
90+
require.Equal(t, len(pageReaderTestStrings), page.Desc.ValuesCount)
9191

9292
t.Log("Uncompressed size: ", page.Desc.UncompressedSize)
9393
t.Log("Compressed size: ", page.Desc.CompressedSize)
@@ -116,7 +116,7 @@ func Test_pageReader_SkipRows(t *testing.T) {
116116

117117
page := buildPage(t, opts, pageReaderTestStrings)
118118
require.Equal(t, len(pageReaderTestStrings), page.Desc.RowCount)
119-
require.Equal(t, len(pageReaderTestStrings)-2, page.Desc.ValuesCount) // -2 for the empty strings
119+
require.Equal(t, len(pageReaderTestStrings), page.Desc.ValuesCount)
120120

121121
t.Log("Uncompressed size: ", page.Desc.UncompressedSize)
122122
t.Log("Compressed size: ", page.Desc.CompressedSize)

pkg/dataobj/internal/dataset/page_test.go

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -155,7 +155,7 @@ func Test_pageBuilder_WriteRead(t *testing.T) {
155155
page, err := b.Flush()
156156
require.NoError(t, err)
157157
require.Equal(t, len(in), page.Desc.RowCount)
158-
require.Equal(t, len(in)-2, page.Desc.ValuesCount) // -2 for the empty strings
158+
require.Equal(t, len(in), page.Desc.ValuesCount)
159159

160160
t.Log("Uncompressed size: ", page.Desc.UncompressedSize)
161161
t.Log("Compressed size: ", page.Desc.CompressedSize)

pkg/dataobj/internal/dataset/reader_basic_test.go

Lines changed: 1 addition & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -114,7 +114,7 @@ func Test_basicReader_ReadColumns(t *testing.T) {
114114
if testPerson.middleName != "" {
115115
require.Equal(t, testPerson.middleName, string(row.Values[1].Binary()), "middle_name mismatch")
116116
} else {
117-
require.True(t, row.Values[1].IsNil(), "middle_name should be nil")
117+
require.True(t, row.Values[1].IsZero(), "middle_name should be nil")
118118
}
119119
require.Equal(t, testPerson.birthYear, row.Values[3].Int64(), "birth_year mismatch")
120120

@@ -212,7 +212,6 @@ func Test_partitionRows(t *testing.T) {
212212
require.Equal(t, tc.expect, actual)
213213
})
214214
}
215-
216215
}
217216

218217
func Test_basicReader_Reset(t *testing.T) {

pkg/dataobj/sections/logs/reader.go

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -179,6 +179,7 @@ func (r *Reader) Read(ctx context.Context, batchSize int) (arrow.Record, error)
179179
}
180180

181181
columnBuilder := builder.Field(columnIndex)
182+
columnType := r.opts.Columns[columnIndex].Type
182183

183184
if val.IsNil() {
184185
columnBuilder.AppendNull()
@@ -193,7 +194,6 @@ func (r *Reader) Read(ctx context.Context, batchSize int) (arrow.Record, error)
193194
// Passing our byte slices to [array.StringBuilder.BinaryBuilder.Append] are safe; it
194195
// will copy the contents of the value and we can reuse the buffer on the
195196
// next call to [dataset.Reader.Read].
196-
columnType := r.opts.Columns[columnIndex].Type
197197
switch columnType {
198198
case ColumnTypeInvalid:
199199
columnBuilder.AppendNull() // Unsupported column

pkg/dataobj/sections/logs/reader_test.go

Lines changed: 11 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -31,18 +31,22 @@ func TestReader(t *testing.T) {
3131
{StreamID: 2, Timestamp: unixTime(30), Metadata: labels.FromStrings("trace_id", "123456"), Line: []byte("foo bar")},
3232
{StreamID: 1, Timestamp: unixTime(20), Metadata: labels.FromStrings("trace_id", "abcdef"), Line: []byte("goodbye, world!")},
3333
{StreamID: 1, Timestamp: unixTime(10), Metadata: labels.EmptyLabels(), Line: []byte("hello, world!")},
34+
{StreamID: 1, Timestamp: unixTime(5), Metadata: labels.FromStrings("trace_id", "abcdef", "foo", ""), Line: []byte("")},
3435
})
3536

3637
var (
3738
streamID = sec.Columns()[0]
38-
traceID = sec.Columns()[2]
39-
message = sec.Columns()[3]
39+
foo = sec.Columns()[2]
40+
traceID = sec.Columns()[3]
41+
message = sec.Columns()[4]
4042
)
4143

4244
require.Equal(t, "", streamID.Name)
4345
require.Equal(t, logs.ColumnTypeStreamID, streamID.Type)
4446
require.Equal(t, "trace_id", traceID.Name)
4547
require.Equal(t, logs.ColumnTypeMetadata, traceID.Type)
48+
require.Equal(t, "foo", foo.Name)
49+
require.Equal(t, logs.ColumnTypeMetadata, foo.Type)
4650
require.Equal(t, "", message.Name)
4751
require.Equal(t, logs.ColumnTypeMessage, message.Type)
4852

@@ -53,10 +57,11 @@ func TestReader(t *testing.T) {
5357
}{
5458
{
5559
name: "basic reads with predicate",
56-
columns: []*logs.Column{streamID, traceID, message},
60+
columns: []*logs.Column{streamID, traceID, foo, message},
5761
expected: arrowtest.Rows{
58-
{"stream_id.int64": int64(2), "trace_id.metadata.utf8": "123456", "message.utf8": "foo bar"},
59-
{"stream_id.int64": int64(1), "trace_id.metadata.utf8": "abcdef", "message.utf8": "goodbye, world!"},
62+
{"stream_id.int64": int64(2), "foo.metadata.utf8": nil, "trace_id.metadata.utf8": "123456", "message.utf8": "foo bar"},
63+
{"stream_id.int64": int64(1), "foo.metadata.utf8": nil, "trace_id.metadata.utf8": "abcdef", "message.utf8": "goodbye, world!"},
64+
{"stream_id.int64": int64(1), "foo.metadata.utf8": "", "trace_id.metadata.utf8": "abcdef", "message.utf8": ""},
6065
},
6166
},
6267
// tests that the reader evaluates predicates correctly even when predicate columns are not projected.
@@ -66,6 +71,7 @@ func TestReader(t *testing.T) {
6671
expected: arrowtest.Rows{
6772
{"stream_id.int64": int64(2), "message.utf8": "foo bar"},
6873
{"stream_id.int64": int64(1), "message.utf8": "goodbye, world!"},
74+
{"stream_id.int64": int64(1), "message.utf8": ""},
6975
},
7076
},
7177
} {
@@ -105,7 +111,6 @@ func TestReader(t *testing.T) {
105111
require.NoError(t, err, "failed to get rows from table")
106112
require.Equal(t, tt.expected, actual)
107113
})
108-
109114
}
110115
}
111116

0 commit comments

Comments
 (0)