diff --git a/src/functions/ducklake_compaction_functions.cpp b/src/functions/ducklake_compaction_functions.cpp index 16b3d14bfd3..0c68c8c15de 100644 --- a/src/functions/ducklake_compaction_functions.cpp +++ b/src/functions/ducklake_compaction_functions.cpp @@ -196,7 +196,9 @@ struct DuckLakeCompactionCandidates { }; struct DuckLakeCompactionGroup { - idx_t schema_version; + //! Unset when cross-schema merging is allowed, so files from different + //! schema_versions fall into the same bucket. + optional_idx schema_version; optional_idx partition_id; vector partition_values; }; @@ -204,7 +206,9 @@ struct DuckLakeCompactionGroup { struct DuckLakeCompactionGroupHash { uint64_t operator()(const DuckLakeCompactionGroup &group) const { uint64_t hash = 0; - hash ^= std::hash()(group.schema_version); + if (group.schema_version.IsValid()) { + hash ^= std::hash()(group.schema_version.GetIndex()); + } if (group.partition_id.IsValid()) { hash ^= std::hash()(group.partition_id.GetIndex()); } @@ -261,7 +265,14 @@ void DuckLakeCompactor::GenerateCompactions(DuckLakeTableEntry &table, } // construct the compaction group for this file - i.e. the set of candidate files we can compact it with DuckLakeCompactionGroup group; - group.schema_version = candidate.schema_version; + if (!options.allow_cross_schema || type != CompactionType::MERGE_ADJACENT_TABLES) { + // by default, files written under different schema_versions must stay in + // separate groups. With allow_cross_schema we let them merge - the reader + // will project each file into the latest schema via its mapping_id. + // Cross-schema merging is only supported for MERGE_ADJACENT_TABLES; the + // REWRITE_DELETES path assumes the source and target schemas match. + group.schema_version = candidate.schema_version; + } group.partition_id = candidate.file.partition_id; group.partition_values = candidate.file.partition_values; @@ -387,9 +398,20 @@ unique_ptr DuckLakeCompactor::InsertSort(Binder &binder, unique unique_ptr DuckLakeCompactor::GenerateCompactionCommand(vector source_files) { - // get the table entry at the specified snapshot - auto snapshot_id = source_files[0].file.begin_snapshot; - DuckLakeSnapshot snapshot(snapshot_id, source_files[0].schema_version, 0, 0); + // Determine which snapshot to bind the scan + output schema against. + // Normally this is the schema_version that the source files were written + // under (they all share one, enforced by DuckLakeCompactionGroup). When + // allow_cross_schema is set for MERGE_ADJACENT_TABLES, the group may contain + // files from multiple schema_versions; in that case we bind against the + // latest snapshot so the merged output is written under the current schema, + // and let DuckLakeMultiFileReader project each source file via its + // per-file mapping_id. + const bool use_latest_snapshot = + options.allow_cross_schema && type == CompactionType::MERGE_ADJACENT_TABLES; + DuckLakeSnapshot snapshot = use_latest_snapshot + ? transaction.GetSnapshot() + : DuckLakeSnapshot(source_files[0].file.begin_snapshot, + source_files[0].schema_version, 0, 0); auto entry = catalog.GetEntryById(transaction, snapshot, table_id); if (!entry) { @@ -639,13 +661,14 @@ static void GenerateCompaction(ClientContext &context, DuckLakeTransaction &tran DuckLakeCatalog &ducklake_catalog, TableFunctionBindInput &input, DuckLakeTableEntry &cur_table, CompactionType type, double delete_threshold, uint64_t max_files, optional_idx min_file_size, optional_idx max_file_size, - vector> &compactions) { + bool allow_cross_schema, vector> &compactions) { switch (type) { case CompactionType::MERGE_ADJACENT_TABLES: { DuckLakeMergeAdjacentOptions options; options.max_files = max_files; options.min_file_size = min_file_size; options.max_file_size = max_file_size; + options.allow_cross_schema = allow_cross_schema; DuckLakeCompactor compactor(context, ducklake_catalog, transaction, *input.binder, cur_table.GetTableId(), options); compactor.GenerateCompactions(cur_table, compactions); @@ -727,6 +750,15 @@ unique_ptr BindCompaction(ClientContext &context, TableFunction throw BinderException("The min_file_size must be less than max_file_size."); } + bool allow_cross_schema = false; + auto allow_cross_schema_entry = input.named_parameters.find("allow_cross_schema"); + if (allow_cross_schema_entry != input.named_parameters.end()) { + if (allow_cross_schema_entry->second.IsNull()) { + throw BinderException("The allow_cross_schema option must be a non-null boolean."); + } + allow_cross_schema = BooleanValue::Get(allow_cross_schema_entry->second); + } + if (input.inputs.size() == 1) { // No default schema/table, we will perform rewrites on deletes in the whole database auto schemas = ducklake_catalog.GetSchemas(context); @@ -739,7 +771,8 @@ unique_ptr BindCompaction(ClientContext &context, TableFunction cur_table.GetTableId(), "true") == "true") { auto delete_threshold = GetDeleteThreshold(&dl_cur_schema, cur_table, ducklake_catalog, input); GenerateCompaction(context, transaction, ducklake_catalog, input, cur_table, type, - delete_threshold, max_files, min_file_size, max_file_size, compactions); + delete_threshold, max_files, min_file_size, max_file_size, + allow_cross_schema, compactions); } } }); @@ -773,7 +806,7 @@ unique_ptr BindCompaction(ClientContext &context, TableFunction if (auto_compact) { auto delete_threshold = GetDeleteThreshold(dl_schema, ducklake_table, ducklake_catalog, input); GenerateCompaction(context, transaction, ducklake_catalog, input, ducklake_table, type, delete_threshold, - max_files, min_file_size, max_file_size, compactions); + max_files, min_file_size, max_file_size, allow_cross_schema, compactions); } return GenerateCompactionOperator(input, bind_index, compactions); @@ -797,6 +830,7 @@ TableFunctionSet DuckLakeMergeAdjacentFilesFunction::GetFunctions() { function.named_parameters["min_file_size"] = LogicalType::UBIGINT; function.named_parameters["max_file_size"] = LogicalType::UBIGINT; function.named_parameters["max_compacted_files"] = LogicalType::UBIGINT; + function.named_parameters["allow_cross_schema"] = LogicalType::BOOLEAN; if (type.size() == 2) { function.named_parameters["schema"] = LogicalType::VARCHAR; } diff --git a/src/include/storage/ducklake_metadata_info.hpp b/src/include/storage/ducklake_metadata_info.hpp index 6c755374911..f847c5ecb6a 100644 --- a/src/include/storage/ducklake_metadata_info.hpp +++ b/src/include/storage/ducklake_metadata_info.hpp @@ -482,6 +482,10 @@ struct DuckLakeMergeAdjacentOptions { uint64_t max_files; optional_idx min_file_size; optional_idx max_file_size; + //! When true, files written under different schema_versions may be merged + //! together into the current schema. Relies on the per-file mapping_id + + //! DuckLakeMultiFileReader to project each source into the latest schema. + bool allow_cross_schema = false; }; struct DuckLakeFileSizeOptions { diff --git a/test/sql/compaction/merge_adjacent_cross_schema.test b/test/sql/compaction/merge_adjacent_cross_schema.test new file mode 100644 index 00000000000..da4a448c3d1 --- /dev/null +++ b/test/sql/compaction/merge_adjacent_cross_schema.test @@ -0,0 +1,186 @@ +# name: test/sql/compaction/merge_adjacent_cross_schema.test +# description: test allow_cross_schema flag on ducklake_merge_adjacent_files +# group: [compaction] + +require ducklake + +require parquet + +test-env DUCKLAKE_CONNECTION {TEST_DIR}/{UUID}.db + +test-env DATA_PATH {TEST_DIR} + +statement ok +ATTACH 'ducklake:{DUCKLAKE_CONNECTION}' AS ducklake (DATA_PATH '{DATA_PATH}/ducklake_cross_schema/') + +statement ok +USE ducklake; + +# +# Core scenario: ADD COLUMN between inserts. With allow_cross_schema => true, +# files written under both schemas merge into a single file under the latest +# schema. Old rows take NULL for the new column. +# + +statement ok +CREATE TABLE t AS SELECT range AS id FROM range(5); + +statement ok +CALL ducklake_flush_inlined_data('ducklake'); + +statement ok +INSERT INTO t VALUES (5), (6); + +statement ok +CALL ducklake_flush_inlined_data('ducklake'); + +statement ok +ALTER TABLE t ADD COLUMN label VARCHAR; + +statement ok +INSERT INTO t VALUES (7, 'seven'), (8, 'eight'); + +statement ok +CALL ducklake_flush_inlined_data('ducklake'); + +# three files exist, spanning two schema_versions +query I +SELECT COUNT(*) FROM ducklake_list_files('ducklake', 't') +---- +3 + +# merge everything across the schema_version boundary +query III +SELECT table_name, files_processed, files_created FROM ducklake_merge_adjacent_files('ducklake', 't', allow_cross_schema => true) +---- +t 3 1 + +statement ok +CALL ducklake_cleanup_old_files('ducklake', cleanup_all => true); + +query I +SELECT COUNT(*) FROM ducklake_list_files('ducklake', 't') +---- +1 + +# data integrity: old rows fill NULL on the new column, new rows keep their values +query IT +SELECT id, label FROM t ORDER BY id; +---- +0 NULL +1 NULL +2 NULL +3 NULL +4 NULL +5 NULL +6 NULL +7 seven +8 eight + +# +# Backward compat: without the flag, old + new schema files stay in separate +# groups and only same-schema groups merge. This is the current behavior. +# + +statement ok +CREATE TABLE t_compat AS SELECT range AS id FROM range(3); + +statement ok +CALL ducklake_flush_inlined_data('ducklake'); + +statement ok +INSERT INTO t_compat VALUES (3), (4); + +statement ok +CALL ducklake_flush_inlined_data('ducklake'); + +statement ok +ALTER TABLE t_compat ADD COLUMN label VARCHAR; + +statement ok +INSERT INTO t_compat VALUES (5, 'a'), (6, 'b'); + +statement ok +CALL ducklake_flush_inlined_data('ducklake'); + +statement ok +INSERT INTO t_compat VALUES (7, 'c'), (8, 'd'); + +statement ok +CALL ducklake_flush_inlined_data('ducklake'); + +# 4 files: 2 pre-ALTER + 2 post-ALTER +query I +SELECT COUNT(*) FROM ducklake_list_files('ducklake', 't_compat') +---- +4 + +# default call: one merge per schema_version group (2 + 2 -> 1 + 1) +query III +SELECT table_name, files_processed, files_created FROM ducklake_merge_adjacent_files('ducklake', 't_compat') ORDER BY ALL +---- +t_compat 2 1 +t_compat 2 1 + +statement ok +CALL ducklake_cleanup_old_files('ducklake', cleanup_all => true); + +# two files remain - one per schema_version +query I +SELECT COUNT(*) FROM ducklake_list_files('ducklake', 't_compat') +---- +2 + +# +# Partition isolation: even with allow_cross_schema => true, files written +# under different partition specs must not be merged together. The grouping +# key still includes partition_id, and SET PARTITIONED BY mints a new one. +# + +statement ok +CREATE TABLE t_part (id INTEGER, part INTEGER); + +statement ok +ALTER TABLE t_part SET PARTITIONED BY (part); + +statement ok +INSERT INTO t_part VALUES (1, 0), (2, 0); + +statement ok +CALL ducklake_flush_inlined_data('ducklake'); + +statement ok +INSERT INTO t_part VALUES (3, 0), (4, 0); + +statement ok +CALL ducklake_flush_inlined_data('ducklake'); + +# repartition - bumps partition_id (and schema_version) +statement ok +ALTER TABLE t_part SET PARTITIONED BY (id); + +statement ok +INSERT INTO t_part VALUES (5, 0), (6, 0); + +statement ok +CALL ducklake_flush_inlined_data('ducklake'); + +statement ok +INSERT INTO t_part VALUES (7, 0), (8, 0); + +statement ok +CALL ducklake_flush_inlined_data('ducklake'); + +# with allow_cross_schema => true, the two partition specs still isolate: +# old spec has part=0 bucket with 2 files; new spec has 4 per-id buckets +# with 1 file each. Only the old-spec (part=0) bucket has >=2 files and merges. +query III +SELECT table_name, files_processed, files_created FROM ducklake_merge_adjacent_files('ducklake', 't_part', allow_cross_schema => true) ORDER BY ALL +---- +t_part 2 1 + +# row count preserved +query I +SELECT COUNT(*) FROM t_part +---- +8