Skip to content

Commit 8b6ec45

Browse files
authored
Merge pull request #24602 from andrwng/datalake-manifest-list-fix
iceberg: fix spec inconsistency in manifest list files_count
2 parents 1c2f495 + 59cc2c3 commit 8b6ec45

File tree

5 files changed

+118
-28
lines changed

5 files changed

+118
-28
lines changed

src/v/iceberg/avroschemas/manifest_file.schema.json

+6-3
Original file line number | Diff line number | Diff line change
@@ -45,19 +45,22 @@
4545
"field-id": 503
4646
},
4747
{
48-
"name": "added_data_files_count",
48+
"name": "added_files_count",
49+
"aliases": ["added_data_files_count"],
4950
"type": "int",
5051
"doc": "Added entry count",
5152
"field-id": 504
5253
},
5354
{
54-
"name": "existing_data_files_count",
55+
"name": "existing_files_count",
56+
"aliases": ["existing_data_files_count"],
5557
"type": "int",
5658
"doc": "Existing entry count",
5759
"field-id": 505
5860
},
5961
{
60-
"name": "deleted_data_files_count",
62+
"name": "deleted_files_count",
63+
"aliases": ["deleted_data_files_count"],
6164
"type": "int",
6265
"doc": "Deleted entry count",
6366
"field-id": 506

src/v/iceberg/manifest_list_avro.cc

+6-7
Original file line number | Diff line number | Diff line change
@@ -96,10 +96,9 @@ avrogen::manifest_file file_to_avro(const manifest_file& f) {
9696
ret.min_sequence_number = f.min_seq_number();
9797
ret.added_snapshot_id = f.added_snapshot_id();
9898

99-
ret.added_data_files_count = static_cast<int32_t>(f.added_files_count);
100-
ret.existing_data_files_count = static_cast<int32_t>(
101-
f.existing_files_count);
102-
ret.deleted_data_files_count = static_cast<int32_t>(f.deleted_files_count);
99+
ret.added_files_count = static_cast<int32_t>(f.added_files_count);
100+
ret.existing_files_count = static_cast<int32_t>(f.existing_files_count);
101+
ret.deleted_files_count = static_cast<int32_t>(f.deleted_files_count);
103102

104103
ret.added_rows_count = static_cast<int32_t>(f.added_rows_count);
105104
ret.existing_rows_count = static_cast<int32_t>(f.existing_rows_count);
@@ -128,9 +127,9 @@ manifest_file file_from_avro(const avrogen::manifest_file& f) {
128127
ret.min_seq_number = sequence_number{f.min_sequence_number};
129128
ret.added_snapshot_id = snapshot_id{f.added_snapshot_id};
130129

131-
ret.added_files_count = f.added_data_files_count;
132-
ret.existing_files_count = f.existing_data_files_count;
133-
ret.deleted_files_count = f.deleted_data_files_count;
130+
ret.added_files_count = f.added_files_count;
131+
ret.existing_files_count = f.existing_files_count;
132+
ret.deleted_files_count = f.deleted_files_count;
134133

135134
ret.added_rows_count = f.added_rows_count;
136135
ret.existing_rows_count = f.existing_rows_count;

src/v/iceberg/tests/manifest_serialization_test.cc

+12-18
Original file line number | Diff line number | Diff line change
@@ -163,9 +163,9 @@ TEST(ManifestSerializationTest, TestManifestFile) {
163163
manifest.sequence_number = 3;
164164
manifest.min_sequence_number = 4;
165165
manifest.added_snapshot_id = 5;
166-
manifest.added_data_files_count = 6;
167-
manifest.existing_data_files_count = 7;
168-
manifest.deleted_data_files_count = 8;
166+
manifest.added_files_count = 6;
167+
manifest.existing_files_count = 7;
168+
manifest.deleted_files_count = 8;
169169
manifest.added_rows_count = 9;
170170
manifest.existing_rows_count = 10;
171171
manifest.deleted_rows_count = 11;
@@ -198,12 +198,9 @@ TEST(ManifestSerializationTest, TestManifestFile) {
198198
EXPECT_EQ(manifest.sequence_number, dmanifest.sequence_number);
199199
EXPECT_EQ(manifest.min_sequence_number, dmanifest.min_sequence_number);
200200
EXPECT_EQ(manifest.added_snapshot_id, dmanifest.added_snapshot_id);
201-
EXPECT_EQ(
202-
manifest.added_data_files_count, dmanifest.added_data_files_count);
203-
EXPECT_EQ(
204-
manifest.existing_data_files_count, dmanifest.existing_data_files_count);
205-
EXPECT_EQ(
206-
manifest.deleted_data_files_count, dmanifest.deleted_data_files_count);
201+
EXPECT_EQ(manifest.added_files_count, dmanifest.added_files_count);
202+
EXPECT_EQ(manifest.existing_files_count, dmanifest.existing_files_count);
203+
EXPECT_EQ(manifest.deleted_files_count, dmanifest.deleted_files_count);
207204
EXPECT_EQ(manifest.added_rows_count, dmanifest.added_rows_count);
208205
EXPECT_EQ(manifest.existing_rows_count, dmanifest.existing_rows_count);
209206
EXPECT_EQ(manifest.deleted_rows_count, dmanifest.deleted_rows_count);
@@ -218,9 +215,9 @@ TEST(ManifestSerializationTest, TestManifestAvroReaderWriter) {
218215
manifest.sequence_number = 3;
219216
manifest.min_sequence_number = 4;
220217
manifest.added_snapshot_id = 5;
221-
manifest.added_data_files_count = 6;
222-
manifest.existing_data_files_count = 7;
223-
manifest.deleted_data_files_count = 8;
218+
manifest.added_files_count = 6;
219+
manifest.existing_files_count = 7;
220+
manifest.deleted_files_count = 8;
224221
manifest.added_rows_count = 9;
225222
manifest.existing_rows_count = 10;
226223
manifest.deleted_rows_count = 11;
@@ -264,12 +261,9 @@ TEST(ManifestSerializationTest, TestManifestAvroReaderWriter) {
264261
EXPECT_EQ(manifest.sequence_number, dmanifest.sequence_number);
265262
EXPECT_EQ(manifest.min_sequence_number, dmanifest.min_sequence_number);
266263
EXPECT_EQ(manifest.added_snapshot_id, dmanifest.added_snapshot_id);
267-
EXPECT_EQ(
268-
manifest.added_data_files_count, dmanifest.added_data_files_count);
269-
EXPECT_EQ(
270-
manifest.existing_data_files_count, dmanifest.existing_data_files_count);
271-
EXPECT_EQ(
272-
manifest.deleted_data_files_count, dmanifest.deleted_data_files_count);
264+
EXPECT_EQ(manifest.added_files_count, dmanifest.added_files_count);
265+
EXPECT_EQ(manifest.existing_files_count, dmanifest.existing_files_count);
266+
EXPECT_EQ(manifest.deleted_files_count, dmanifest.deleted_files_count);
273267
EXPECT_EQ(manifest.added_rows_count, dmanifest.added_rows_count);
274268
EXPECT_EQ(manifest.existing_rows_count, dmanifest.existing_rows_count);
275269
EXPECT_EQ(manifest.deleted_rows_count, dmanifest.deleted_rows_count);

tests/rptest/tests/datalake/datalake_e2e_test.py

+25
Original file line number | Diff line number | Diff line change
@@ -134,6 +134,31 @@ def test_avro_schema(self, cloud_storage_type, query_engine):
134134
assert spark_describe_out == spark_expected_out, str(
135135
spark_describe_out)
136136

137+
@cluster(num_nodes=4)
@matrix(cloud_storage_type=supported_storage_types())
def test_upload_after_external_update(self, cloud_storage_type):
    """
    Check that translation keeps working after the Iceberg table has been
    modified by an external writer.

    The flow is: produce and translate one batch, delete every row from
    the table through Spark, then produce a second batch and verify that
    exactly that second batch is visible in the table.
    """
    table_name = f"redpanda.{self.topic_name}"
    engines = [QueryEngineType.SPARK]
    with DatalakeServices(self.test_ctx,
                          redpanda=self.redpanda,
                          filesystem_catalog_mode=True,
                          include_query_engines=engines) as dl:
        count = 100

        # First batch: written by Redpanda and translated into the table.
        dl.create_iceberg_enabled_topic(self.topic_name, partitions=1)
        dl.produce_to_topic(self.topic_name, 1024, count)
        dl.wait_for_translation(self.topic_name, count)

        # External update: wipe the table from Spark.
        spark = dl.spark()
        delete_stmt = f"delete from {table_name}"
        spark.make_client().cursor().execute(delete_stmt)
        count_after_del = spark.count_table("redpanda", self.topic_name)
        assert count_after_del == 0, f"{count_after_del} rows, expected 0"

        # Second batch: the topic now holds 2 * count records, so the last
        # offset to wait for is 2 * count - 1; only the new `count` rows
        # are expected in the table since the earlier ones were deleted.
        dl.produce_to_topic(self.topic_name, 1024, count)
        last_offset = 2 * count - 1
        dl.wait_for_translation_until_offset(self.topic_name, last_offset)
        count_after_produce = spark.count_table("redpanda", self.topic_name)
        assert count_after_produce == count, f"{count_after_produce} rows, expected {count}"
161+
137162
@cluster(num_nodes=4)
138163
@matrix(cloud_storage_type=supported_storage_types(),
139164
filesystem_catalog_mode=[True, False])
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,69 @@
1+
# Copyright 2024 Redpanda Data, Inc.
2+
#
3+
# Use of this software is governed by the Business Source License
4+
# included in the file licenses/BSL.md
5+
#
6+
# As of the Change Date specified in that file, in accordance with
7+
# the Business Source License, use of this software will be governed
8+
# by the Apache License, Version 2.0
9+
from rptest.services.cluster import cluster
10+
11+
from rptest.services.redpanda import SISettings
12+
from rptest.utils.mode_checks import skip_debug_mode
13+
from rptest.tests.redpanda_test import RedpandaTest
14+
from rptest.tests.datalake.datalake_services import DatalakeServices
15+
from rptest.tests.datalake.query_engine_base import QueryEngineType
16+
from rptest.tests.datalake.utils import supported_storage_types
17+
from ducktape.mark import matrix
18+
19+
20+
class DatalakeUpgradeTest(RedpandaTest):
    """
    Runs an Iceberg-enabled workload while the cluster is stepped through
    Redpanda versions, starting from the first release that supported
    Iceberg.
    """
    def __init__(self, test_context):
        super().__init__(test_context,
                         num_brokers=3,
                         si_settings=SISettings(test_context=test_context),
                         extra_rp_conf={
                             "iceberg_enabled": "true",
                             "iceberg_catalog_commit_interval_ms": 5000
                         })
        self.test_ctx = test_context
        self.topic_name = "upgrade_topic"

        # Initial version that supported Iceberg.
        self.initial_version = (24, 3)

    def setUp(self):
        # Only installs the initial version; it does not call the base
        # class setUp.
        # NOTE(review): presumably the cluster is started later by
        # DatalakeServices inside the test -- confirm.
        self.redpanda._installer.install(self.redpanda.nodes,
                                         self.initial_version)

    @cluster(num_nodes=6)
    @skip_debug_mode
    @matrix(cloud_storage_type=supported_storage_types(),
            query_engine=[QueryEngineType.SPARK])
    def test_upload_through_upgrade(self, cloud_storage_type, query_engine):
        """
        Test that Iceberg translation can progress through different versions
        of Redpanda (e.g. ensuring that data format changes or additional
        Iceberg fields don't block progress).
        """
        total_count = 0
        with DatalakeServices(self.test_ctx,
                              redpanda=self.redpanda,
                              filesystem_catalog_mode=True,
                              include_query_engines=[query_engine]) as dl:
            dl.create_iceberg_enabled_topic(self.topic_name, partitions=10)

            def run_workload():
                # Produce one more batch and wait until the cumulative
                # number of produced messages has been translated.
                nonlocal total_count
                count = 100
                dl.produce_to_topic(self.topic_name, 1024, msg_count=count)
                total_count += count
                dl.wait_for_translation(self.topic_name, msg_count=total_count)

            # The full range, including the initial version, is handed to
            # upgrade_through_versions together with already_running=True.
            # NOTE(review): this looks like the first entry is treated as
            # the version already running -- confirm against the helper.
            versions = self.load_version_range(self.initial_version)
            for v in self.upgrade_through_versions(versions_in=versions,
                                                   already_running=True):
                self.logger.info(f"Updated to {v}")
                run_workload()

0 commit comments

Comments
 (0)