Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

iceberg: fix spec inconsistency in manifest list files_count #24602

Open
wants to merge 2 commits into
base: dev
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions src/v/iceberg/avroschemas/manifest_file.schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -45,19 +45,22 @@
"field-id": 503
},
{
"name": "added_data_files_count",
"name": "added_files_count",
"aliases": ["added_data_files_count"],
"type": "int",
"doc": "Added entry count",
"field-id": 504
},
{
"name": "existing_data_files_count",
"name": "existing_files_count",
"aliases": ["existing_data_files_count"],
"type": "int",
"doc": "Existing entry count",
"field-id": 505
},
{
"name": "deleted_data_files_count",
"name": "deleted_files_count",
"aliases": ["deleted_data_files_count"],
"type": "int",
"doc": "Deleted entry count",
"field-id": 506
Expand Down
13 changes: 6 additions & 7 deletions src/v/iceberg/manifest_list_avro.cc
Original file line number Diff line number Diff line change
Expand Up @@ -96,10 +96,9 @@ avrogen::manifest_file file_to_avro(const manifest_file& f) {
ret.min_sequence_number = f.min_seq_number();
ret.added_snapshot_id = f.added_snapshot_id();

ret.added_data_files_count = static_cast<int32_t>(f.added_files_count);
ret.existing_data_files_count = static_cast<int32_t>(
f.existing_files_count);
ret.deleted_data_files_count = static_cast<int32_t>(f.deleted_files_count);
ret.added_files_count = static_cast<int32_t>(f.added_files_count);
ret.existing_files_count = static_cast<int32_t>(f.existing_files_count);
ret.deleted_files_count = static_cast<int32_t>(f.deleted_files_count);

ret.added_rows_count = static_cast<int32_t>(f.added_rows_count);
ret.existing_rows_count = static_cast<int32_t>(f.existing_rows_count);
Expand Down Expand Up @@ -128,9 +127,9 @@ manifest_file file_from_avro(const avrogen::manifest_file& f) {
ret.min_seq_number = sequence_number{f.min_sequence_number};
ret.added_snapshot_id = snapshot_id{f.added_snapshot_id};

ret.added_files_count = f.added_data_files_count;
ret.existing_files_count = f.existing_data_files_count;
ret.deleted_files_count = f.deleted_data_files_count;
ret.added_files_count = f.added_files_count;
ret.existing_files_count = f.existing_files_count;
ret.deleted_files_count = f.deleted_files_count;

ret.added_rows_count = f.added_rows_count;
ret.existing_rows_count = f.existing_rows_count;
Expand Down
30 changes: 12 additions & 18 deletions src/v/iceberg/tests/manifest_serialization_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -163,9 +163,9 @@ TEST(ManifestSerializationTest, TestManifestFile) {
manifest.sequence_number = 3;
manifest.min_sequence_number = 4;
manifest.added_snapshot_id = 5;
manifest.added_data_files_count = 6;
manifest.existing_data_files_count = 7;
manifest.deleted_data_files_count = 8;
manifest.added_files_count = 6;
manifest.existing_files_count = 7;
manifest.deleted_files_count = 8;
manifest.added_rows_count = 9;
manifest.existing_rows_count = 10;
manifest.deleted_rows_count = 11;
Expand Down Expand Up @@ -198,12 +198,9 @@ TEST(ManifestSerializationTest, TestManifestFile) {
EXPECT_EQ(manifest.sequence_number, dmanifest.sequence_number);
EXPECT_EQ(manifest.min_sequence_number, dmanifest.min_sequence_number);
EXPECT_EQ(manifest.added_snapshot_id, dmanifest.added_snapshot_id);
EXPECT_EQ(
manifest.added_data_files_count, dmanifest.added_data_files_count);
EXPECT_EQ(
manifest.existing_data_files_count, dmanifest.existing_data_files_count);
EXPECT_EQ(
manifest.deleted_data_files_count, dmanifest.deleted_data_files_count);
EXPECT_EQ(manifest.added_files_count, dmanifest.added_files_count);
EXPECT_EQ(manifest.existing_files_count, dmanifest.existing_files_count);
EXPECT_EQ(manifest.deleted_files_count, dmanifest.deleted_files_count);
EXPECT_EQ(manifest.added_rows_count, dmanifest.added_rows_count);
EXPECT_EQ(manifest.existing_rows_count, dmanifest.existing_rows_count);
EXPECT_EQ(manifest.deleted_rows_count, dmanifest.deleted_rows_count);
Expand All @@ -218,9 +215,9 @@ TEST(ManifestSerializationTest, TestManifestAvroReaderWriter) {
manifest.sequence_number = 3;
manifest.min_sequence_number = 4;
manifest.added_snapshot_id = 5;
manifest.added_data_files_count = 6;
manifest.existing_data_files_count = 7;
manifest.deleted_data_files_count = 8;
manifest.added_files_count = 6;
manifest.existing_files_count = 7;
manifest.deleted_files_count = 8;
manifest.added_rows_count = 9;
manifest.existing_rows_count = 10;
manifest.deleted_rows_count = 11;
Expand Down Expand Up @@ -264,12 +261,9 @@ TEST(ManifestSerializationTest, TestManifestAvroReaderWriter) {
EXPECT_EQ(manifest.sequence_number, dmanifest.sequence_number);
EXPECT_EQ(manifest.min_sequence_number, dmanifest.min_sequence_number);
EXPECT_EQ(manifest.added_snapshot_id, dmanifest.added_snapshot_id);
EXPECT_EQ(
manifest.added_data_files_count, dmanifest.added_data_files_count);
EXPECT_EQ(
manifest.existing_data_files_count, dmanifest.existing_data_files_count);
EXPECT_EQ(
manifest.deleted_data_files_count, dmanifest.deleted_data_files_count);
EXPECT_EQ(manifest.added_files_count, dmanifest.added_files_count);
EXPECT_EQ(manifest.existing_files_count, dmanifest.existing_files_count);
EXPECT_EQ(manifest.deleted_files_count, dmanifest.deleted_files_count);
EXPECT_EQ(manifest.added_rows_count, dmanifest.added_rows_count);
EXPECT_EQ(manifest.existing_rows_count, dmanifest.existing_rows_count);
EXPECT_EQ(manifest.deleted_rows_count, dmanifest.deleted_rows_count);
Expand Down
25 changes: 25 additions & 0 deletions tests/rptest/tests/datalake/datalake_e2e_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,31 @@ def test_avro_schema(self, cloud_storage_type, query_engine):
assert spark_describe_out == spark_expected_out, str(
spark_describe_out)

@cluster(num_nodes=4)
@matrix(cloud_storage_type=supported_storage_types())
def test_upload_after_external_update(self, cloud_storage_type):
table_name = f"redpanda.{self.topic_name}"
with DatalakeServices(self.test_ctx,
redpanda=self.redpanda,
filesystem_catalog_mode=True,
include_query_engines=[QueryEngineType.SPARK
]) as dl:
count = 100
dl.create_iceberg_enabled_topic(self.topic_name, partitions=1)
dl.produce_to_topic(self.topic_name, 1024, count)
dl.wait_for_translation(self.topic_name, count)
spark = dl.spark()
spark.make_client().cursor().execute(f"delete from {table_name}")
count_after_del = spark.count_table("redpanda", self.topic_name)
assert count_after_del == 0, f"{count_after_del} rows, expected 0"

dl.produce_to_topic(self.topic_name, 1024, count)
dl.wait_for_translation_until_offset(self.topic_name,
2 * count - 1)
count_after_produce = spark.count_table("redpanda",
self.topic_name)
assert count_after_produce == count, f"{count_after_produce} rows, expected {count}"

@cluster(num_nodes=4)
@matrix(cloud_storage_type=supported_storage_types(),
filesystem_catalog_mode=[True, False])
Expand Down
69 changes: 69 additions & 0 deletions tests/rptest/tests/datalake/datalake_upgrade_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
# Copyright 2024 Redpanda Data, Inc.
#
# Use of this software is governed by the Business Source License
# included in the file licenses/BSL.md
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0
from rptest.services.cluster import cluster

from rptest.services.redpanda import SISettings
from rptest.utils.mode_checks import skip_debug_mode
from rptest.tests.redpanda_test import RedpandaTest
from rptest.tests.datalake.datalake_services import DatalakeServices
from rptest.tests.datalake.query_engine_base import QueryEngineType
from rptest.tests.datalake.utils import supported_storage_types
from ducktape.mark import matrix


class DatalakeUpgradeTest(RedpandaTest):
def __init__(self, test_context):
super(DatalakeUpgradeTest,
self).__init__(test_context,
num_brokers=3,
si_settings=SISettings(test_context=test_context),
extra_rp_conf={
"iceberg_enabled": "true",
"iceberg_catalog_commit_interval_ms": 5000
})
self.test_ctx = test_context
self.topic_name = "upgrade_topic"

# Initial version that supported Iceberg.
self.initial_version = (24, 3)

def setUp(self):
self.redpanda._installer.install(self.redpanda.nodes,
self.initial_version)

@cluster(num_nodes=6)
@skip_debug_mode
@matrix(cloud_storage_type=supported_storage_types(),
query_engine=[QueryEngineType.SPARK])
def test_upload_through_upgrade(self, cloud_storage_type, query_engine):
"""
Test that Iceberg translation can progress through different versions
of Redpanda (e.g. ensuring that data format changes or additional
Iceberg fields don't block progress).
"""
total_count = 0
versions = self.load_version_range(self.initial_version)[1:]
with DatalakeServices(self.test_ctx,
redpanda=self.redpanda,
filesystem_catalog_mode=True,
include_query_engines=[query_engine]) as dl:
dl.create_iceberg_enabled_topic(self.topic_name, partitions=10)

def run_workload():
nonlocal total_count
count = 100
dl.produce_to_topic(self.topic_name, 1024, msg_count=count)
total_count += count
dl.wait_for_translation(self.topic_name, msg_count=total_count)

versions = self.load_version_range(self.initial_version)
for v in self.upgrade_through_versions(versions_in=versions,
already_running=True):
self.logger.info(f"Updated to {v}")
run_workload()
Loading