diff --git a/src/v/iceberg/avroschemas/manifest_file.schema.json b/src/v/iceberg/avroschemas/manifest_file.schema.json
index 6886935d3e5f6..15c21eeda50e2 100644
--- a/src/v/iceberg/avroschemas/manifest_file.schema.json
+++ b/src/v/iceberg/avroschemas/manifest_file.schema.json
@@ -45,19 +45,22 @@
       "field-id": 503
     },
     {
-      "name": "added_data_files_count",
+      "name": "added_files_count",
+      "aliases": ["added_data_files_count"],
       "type": "int",
       "doc": "Added entry count",
       "field-id": 504
     },
     {
-      "name": "existing_data_files_count",
+      "name": "existing_files_count",
+      "aliases": ["existing_data_files_count"],
       "type": "int",
       "doc": "Existing entry count",
       "field-id": 505
     },
     {
-      "name": "deleted_data_files_count",
+      "name": "deleted_files_count",
+      "aliases": ["deleted_data_files_count"],
       "type": "int",
       "doc": "Deleted entry count",
       "field-id": 506
diff --git a/src/v/iceberg/manifest_list_avro.cc b/src/v/iceberg/manifest_list_avro.cc
index 8696f857bb0a2..e12917efaf920 100644
--- a/src/v/iceberg/manifest_list_avro.cc
+++ b/src/v/iceberg/manifest_list_avro.cc
@@ -96,10 +96,9 @@ avrogen::manifest_file file_to_avro(const manifest_file& f) {
     ret.min_sequence_number = f.min_seq_number();
     ret.added_snapshot_id = f.added_snapshot_id();
-    ret.added_data_files_count = static_cast<int32_t>(f.added_files_count);
-    ret.existing_data_files_count = static_cast<int32_t>(
-      f.existing_files_count);
-    ret.deleted_data_files_count = static_cast<int32_t>(f.deleted_files_count);
+    ret.added_files_count = static_cast<int32_t>(f.added_files_count);
+    ret.existing_files_count = static_cast<int32_t>(f.existing_files_count);
+    ret.deleted_files_count = static_cast<int32_t>(f.deleted_files_count);
     ret.added_rows_count = static_cast<int64_t>(f.added_rows_count);
     ret.existing_rows_count = static_cast<int64_t>(f.existing_rows_count);
@@ -128,9 +127,9 @@ manifest_file file_from_avro(const avrogen::manifest_file& f) {
     ret.min_seq_number = sequence_number{f.min_sequence_number};
     ret.added_snapshot_id = snapshot_id{f.added_snapshot_id};
-    ret.added_files_count = f.added_data_files_count;
-    ret.existing_files_count = f.existing_data_files_count;
-    ret.deleted_files_count = f.deleted_data_files_count;
+    ret.added_files_count = f.added_files_count;
+    ret.existing_files_count = f.existing_files_count;
+    ret.deleted_files_count = f.deleted_files_count;
     ret.added_rows_count = f.added_rows_count;
     ret.existing_rows_count = f.existing_rows_count;
diff --git a/src/v/iceberg/tests/manifest_serialization_test.cc b/src/v/iceberg/tests/manifest_serialization_test.cc
index 026403a81bcec..417381b4cc506 100644
--- a/src/v/iceberg/tests/manifest_serialization_test.cc
+++ b/src/v/iceberg/tests/manifest_serialization_test.cc
@@ -163,9 +163,9 @@ TEST(ManifestSerializationTest, TestManifestFile) {
     manifest.sequence_number = 3;
     manifest.min_sequence_number = 4;
     manifest.added_snapshot_id = 5;
-    manifest.added_data_files_count = 6;
-    manifest.existing_data_files_count = 7;
-    manifest.deleted_data_files_count = 8;
+    manifest.added_files_count = 6;
+    manifest.existing_files_count = 7;
+    manifest.deleted_files_count = 8;
     manifest.added_rows_count = 9;
     manifest.existing_rows_count = 10;
     manifest.deleted_rows_count = 11;
@@ -198,12 +198,9 @@ TEST(ManifestSerializationTest, TestManifestFile) {
     EXPECT_EQ(manifest.sequence_number, dmanifest.sequence_number);
     EXPECT_EQ(manifest.min_sequence_number, dmanifest.min_sequence_number);
     EXPECT_EQ(manifest.added_snapshot_id, dmanifest.added_snapshot_id);
-    EXPECT_EQ(
-      manifest.added_data_files_count, dmanifest.added_data_files_count);
-    EXPECT_EQ(
-      manifest.existing_data_files_count, dmanifest.existing_data_files_count);
-    EXPECT_EQ(
-      manifest.deleted_data_files_count, dmanifest.deleted_data_files_count);
+    EXPECT_EQ(manifest.added_files_count, dmanifest.added_files_count);
+    EXPECT_EQ(manifest.existing_files_count, dmanifest.existing_files_count);
+    EXPECT_EQ(manifest.deleted_files_count, dmanifest.deleted_files_count);
     EXPECT_EQ(manifest.added_rows_count, dmanifest.added_rows_count);
     EXPECT_EQ(manifest.existing_rows_count, dmanifest.existing_rows_count);
     EXPECT_EQ(manifest.deleted_rows_count, dmanifest.deleted_rows_count);
@@ -218,9 +215,9 @@ TEST(ManifestSerializationTest, TestManifestAvroReaderWriter) {
     manifest.sequence_number = 3;
     manifest.min_sequence_number = 4;
     manifest.added_snapshot_id = 5;
-    manifest.added_data_files_count = 6;
-    manifest.existing_data_files_count = 7;
-    manifest.deleted_data_files_count = 8;
+    manifest.added_files_count = 6;
+    manifest.existing_files_count = 7;
+    manifest.deleted_files_count = 8;
     manifest.added_rows_count = 9;
     manifest.existing_rows_count = 10;
     manifest.deleted_rows_count = 11;
@@ -264,12 +261,9 @@ TEST(ManifestSerializationTest, TestManifestAvroReaderWriter) {
     EXPECT_EQ(manifest.sequence_number, dmanifest.sequence_number);
     EXPECT_EQ(manifest.min_sequence_number, dmanifest.min_sequence_number);
     EXPECT_EQ(manifest.added_snapshot_id, dmanifest.added_snapshot_id);
-    EXPECT_EQ(
-      manifest.added_data_files_count, dmanifest.added_data_files_count);
-    EXPECT_EQ(
-      manifest.existing_data_files_count, dmanifest.existing_data_files_count);
-    EXPECT_EQ(
-      manifest.deleted_data_files_count, dmanifest.deleted_data_files_count);
+    EXPECT_EQ(manifest.added_files_count, dmanifest.added_files_count);
+    EXPECT_EQ(manifest.existing_files_count, dmanifest.existing_files_count);
+    EXPECT_EQ(manifest.deleted_files_count, dmanifest.deleted_files_count);
     EXPECT_EQ(manifest.added_rows_count, dmanifest.added_rows_count);
     EXPECT_EQ(manifest.existing_rows_count, dmanifest.existing_rows_count);
     EXPECT_EQ(manifest.deleted_rows_count, dmanifest.deleted_rows_count);
diff --git a/tests/rptest/services/spark_service.py b/tests/rptest/services/spark_service.py
index 2e23a3d6ed722..28a5c48267b4e 100644
--- a/tests/rptest/services/spark_service.py
+++ b/tests/rptest/services/spark_service.py
@@ -142,6 +142,9 @@ def clean_node(self, node, **_):
         self.stop_node(node, allow_fail=True)
         node.account.remove(SparkService.LOGS_DIR, allow_fail=True)
 
+    def escape_identifier(self, table: str) -> str:
+        return f"`{table}`"
+
     @staticmethod
     def engine_name():
         return QueryEngineType.SPARK
diff --git a/tests/rptest/services/trino_service.py b/tests/rptest/services/trino_service.py
index 144d52b1185eb..3db4b4f357b2d 100644
--- a/tests/rptest/services/trino_service.py
+++ b/tests/rptest/services/trino_service.py
@@ -136,6 +136,9 @@ def make_client(self):
                                   port=self.trino_port,
                                   catalog="redpanda")
 
+    def escape_identifier(self, table: str) -> str:
+        return f'"{table}"'
+
     @staticmethod
     def dict_to_conf(d: dict[str, Optional[str | bool]]):
         """
diff --git a/tests/rptest/tests/datalake/datalake_e2e_test.py b/tests/rptest/tests/datalake/datalake_e2e_test.py
index baf47abfaf5e2..07f612dfc9a03 100644
--- a/tests/rptest/tests/datalake/datalake_e2e_test.py
+++ b/tests/rptest/tests/datalake/datalake_e2e_test.py
@@ -136,6 +136,31 @@ def test_avro_schema(self, cloud_storage_type, query_engine,
         assert spark_describe_out == spark_expected_out, str(
             spark_describe_out)
 
+    @cluster(num_nodes=4)
+    @matrix(cloud_storage_type=supported_storage_types())
+    def test_upload_after_external_update(self, cloud_storage_type):
+        table_name = f"redpanda.{self.topic_name}"
+        with DatalakeServices(self.test_ctx,
+                              redpanda=self.redpanda,
+                              filesystem_catalog_mode=True,
+                              include_query_engines=[QueryEngineType.SPARK
+                                                     ]) as dl:
+            count = 100
+            dl.create_iceberg_enabled_topic(self.topic_name, partitions=1)
+            dl.produce_to_topic(self.topic_name, 1024, count)
+            dl.wait_for_translation(self.topic_name, count)
+            spark = dl.spark()
+            spark.make_client().cursor().execute(f"delete from {table_name}")
+            count_after_del = spark.count_table("redpanda", self.topic_name)
+            assert count_after_del == 0, f"{count_after_del} rows, expected 0"
+
+            dl.produce_to_topic(self.topic_name, 1024, count)
+            dl.wait_for_translation_until_offset(self.topic_name,
+                                                 2 * count - 1)
+            count_after_produce = spark.count_table("redpanda",
+                                                    self.topic_name)
+            assert count_after_produce == count, f"{count_after_produce} rows, expected {count}"
+
     @cluster(num_nodes=4)
     @matrix(cloud_storage_type=supported_storage_types(),
             filesystem_catalog_mode=[True, False])
diff --git a/tests/rptest/tests/datalake/datalake_services.py b/tests/rptest/tests/datalake/datalake_services.py
index a5082d3f21d14..f5fcd40646592 100644
--- a/tests/rptest/tests/datalake/datalake_services.py
+++ b/tests/rptest/tests/datalake/datalake_services.py
@@ -150,13 +150,12 @@ def wait_for_translation_until_offset(self,
                                           timeout=30,
                                           backoff_sec=5):
         self.wait_for_iceberg_table("redpanda", topic, timeout, backoff_sec)
-        table_name = f"redpanda.{topic}"
 
         def translation_done():
             offsets = dict(
                 map(
-                    lambda e: (e.engine_name(),
-                               e.max_translated_offset(table_name, partition)),
+                    lambda e: (e.engine_name(
+                    ), e.max_translated_offset("redpanda", topic, partition)),
                     self.query_engines))
             self.redpanda.logger.debug(
                 f"Current translated offsets: {offsets}")
@@ -179,11 +178,12 @@ def wait_for_translation(self,
                              timeout=30,
                              backoff_sec=5):
         self.wait_for_iceberg_table("redpanda", topic, timeout, backoff_sec)
-        table_name = f"redpanda.{topic}"
 
         def translation_done():
             counts = dict(
-                map(lambda e: (e.engine_name(), e.count_table(table_name)),
+                map(
+                    lambda e:
+                    (e.engine_name(), e.count_table("redpanda", topic)),
                     self.query_engines))
             self.redpanda.logger.debug(f"Current counts: {counts}")
             return all([c == msg_count for _, c in counts.items()])
diff --git a/tests/rptest/tests/datalake/query_engine_base.py b/tests/rptest/tests/datalake/query_engine_base.py
index b3d701f245255..ff03c4ec44a28 100644
--- a/tests/rptest/tests/datalake/query_engine_base.py
+++ b/tests/rptest/tests/datalake/query_engine_base.py
@@ -42,16 +42,20 @@ def run_query(self, query):
         finally:
             client.close()
 
+    @abstractmethod
+    def escape_identifier(self, table: str) -> str:
+        raise NotImplementedError
+
     def run_query_fetch_all(self, query):
         with self.run_query(query) as cursor:
             return cursor.fetchall()
 
-    def count_table(self, table) -> int:
-        query = f"select count(*) from {table}"
+    def count_table(self, namespace, table) -> int:
+        query = f"select count(*) from {namespace}.{self.escape_identifier(table)}"
        with self.run_query(query) as cursor:
             return cursor.fetchone()[0]
 
-    def max_translated_offset(self, table, partition) -> int:
-        query = f"select max(redpanda.offset) from {table} where redpanda.partition={partition}"
+    def max_translated_offset(self, namespace, table, partition) -> int:
+        query = f"select max(redpanda.offset) from {namespace}.{self.escape_identifier(table)} where redpanda.partition={partition}"
         with self.run_query(query) as cursor:
             return cursor.fetchone()[0]
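Note (illustration only, not part of the patch): a minimal sketch of how the per-engine escape_identifier implementations combine with the new (namespace, table) split when QueryEngineBase builds its count query. The SparkLike/TrinoLike classes and the sample topic name below are hypothetical stand-ins for SparkService/TrinoService; the real services execute the statement through run_query rather than returning it.

from abc import ABC, abstractmethod


class QueryEngineSketch(ABC):
    """Hypothetical stand-in for QueryEngineBase; only query construction is shown."""
    @abstractmethod
    def escape_identifier(self, table: str) -> str:
        raise NotImplementedError

    def count_table_query(self, namespace: str, table: str) -> str:
        # The namespace stays bare; only the table identifier gets
        # engine-specific quoting, mirroring count_table() above.
        return f"select count(*) from {namespace}.{self.escape_identifier(table)}"


class SparkLike(QueryEngineSketch):
    def escape_identifier(self, table: str) -> str:
        return f"`{table}`"  # Spark SQL quotes identifiers with backticks


class TrinoLike(QueryEngineSketch):
    def escape_identifier(self, table: str) -> str:
        return f'"{table}"'  # Trino uses ANSI double quotes


# A topic name that would break the statement if interpolated unquoted.
topic = "my-topic.v2"
print(SparkLike().count_table_query("redpanda", topic))
# -> select count(*) from redpanda.`my-topic.v2`
print(TrinoLike().count_table_query("redpanda", topic))
# -> select count(*) from redpanda."my-topic.v2"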