Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 43 additions & 9 deletions src/functions/ducklake_compaction_functions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -196,15 +196,19 @@ struct DuckLakeCompactionCandidates {
};

struct DuckLakeCompactionGroup {
	//! Grouping key for compaction: a file may only be merged with files that
	//! share the same key (same schema version and same partition).
	//! Unset when cross-schema merging is allowed, so files from different
	//! schema_versions fall into the same bucket.
	optional_idx schema_version;
	//! Partition spec id the files were written under; only hashed when set.
	optional_idx partition_id;
	//! Concrete partition values - files in different partitions never merge.
	vector<string> partition_values;
};

struct DuckLakeCompactionGroupHash {
uint64_t operator()(const DuckLakeCompactionGroup &group) const {
uint64_t hash = 0;
hash ^= std::hash<idx_t>()(group.schema_version);
if (group.schema_version.IsValid()) {
hash ^= std::hash<idx_t>()(group.schema_version.GetIndex());
}
if (group.partition_id.IsValid()) {
hash ^= std::hash<idx_t>()(group.partition_id.GetIndex());
}
Expand Down Expand Up @@ -261,7 +265,14 @@ void DuckLakeCompactor::GenerateCompactions(DuckLakeTableEntry &table,
}
// construct the compaction group for this file - i.e. the set of candidate files we can compact it with
DuckLakeCompactionGroup group;
group.schema_version = candidate.schema_version;
if (!options.allow_cross_schema || type != CompactionType::MERGE_ADJACENT_TABLES) {
// by default, files written under different schema_versions must stay in
// separate groups. With allow_cross_schema we let them merge - the reader
// will project each file into the latest schema via its mapping_id.
// Cross-schema merging is only supported for MERGE_ADJACENT_TABLES; the
// REWRITE_DELETES path assumes the source and target schemas match.
group.schema_version = candidate.schema_version;
}
group.partition_id = candidate.file.partition_id;
group.partition_values = candidate.file.partition_values;

Expand Down Expand Up @@ -387,9 +398,20 @@ unique_ptr<LogicalOperator> DuckLakeCompactor::InsertSort(Binder &binder, unique

unique_ptr<LogicalOperator>
DuckLakeCompactor::GenerateCompactionCommand(vector<DuckLakeCompactionFileEntry> source_files) {
// get the table entry at the specified snapshot
auto snapshot_id = source_files[0].file.begin_snapshot;
DuckLakeSnapshot snapshot(snapshot_id, source_files[0].schema_version, 0, 0);
// Determine which snapshot to bind the scan + output schema against.
// Normally this is the schema_version that the source files were written
// under (they all share one, enforced by DuckLakeCompactionGroup). When
// allow_cross_schema is set for MERGE_ADJACENT_TABLES, the group may contain
// files from multiple schema_versions; in that case we bind against the
// latest snapshot so the merged output is written under the current schema,
// and let DuckLakeMultiFileReader project each source file via its
// per-file mapping_id.
const bool use_latest_snapshot =
options.allow_cross_schema && type == CompactionType::MERGE_ADJACENT_TABLES;
DuckLakeSnapshot snapshot = use_latest_snapshot
? transaction.GetSnapshot()
: DuckLakeSnapshot(source_files[0].file.begin_snapshot,
source_files[0].schema_version, 0, 0);

auto entry = catalog.GetEntryById(transaction, snapshot, table_id);
if (!entry) {
Expand Down Expand Up @@ -639,13 +661,14 @@ static void GenerateCompaction(ClientContext &context, DuckLakeTransaction &tran
DuckLakeCatalog &ducklake_catalog, TableFunctionBindInput &input,
DuckLakeTableEntry &cur_table, CompactionType type, double delete_threshold,
uint64_t max_files, optional_idx min_file_size, optional_idx max_file_size,
vector<unique_ptr<LogicalOperator>> &compactions) {
bool allow_cross_schema, vector<unique_ptr<LogicalOperator>> &compactions) {
switch (type) {
case CompactionType::MERGE_ADJACENT_TABLES: {
DuckLakeMergeAdjacentOptions options;
options.max_files = max_files;
options.min_file_size = min_file_size;
options.max_file_size = max_file_size;
options.allow_cross_schema = allow_cross_schema;
DuckLakeCompactor compactor(context, ducklake_catalog, transaction, *input.binder, cur_table.GetTableId(),
options);
compactor.GenerateCompactions(cur_table, compactions);
Expand Down Expand Up @@ -727,6 +750,15 @@ unique_ptr<LogicalOperator> BindCompaction(ClientContext &context, TableFunction
throw BinderException("The min_file_size must be less than max_file_size.");
}

bool allow_cross_schema = false;
auto allow_cross_schema_entry = input.named_parameters.find("allow_cross_schema");
if (allow_cross_schema_entry != input.named_parameters.end()) {
if (allow_cross_schema_entry->second.IsNull()) {
throw BinderException("The allow_cross_schema option must be a non-null boolean.");
}
allow_cross_schema = BooleanValue::Get(allow_cross_schema_entry->second);
}

if (input.inputs.size() == 1) {
// No default schema/table, we will perform rewrites on deletes in the whole database
auto schemas = ducklake_catalog.GetSchemas(context);
Expand All @@ -739,7 +771,8 @@ unique_ptr<LogicalOperator> BindCompaction(ClientContext &context, TableFunction
cur_table.GetTableId(), "true") == "true") {
auto delete_threshold = GetDeleteThreshold(&dl_cur_schema, cur_table, ducklake_catalog, input);
GenerateCompaction(context, transaction, ducklake_catalog, input, cur_table, type,
delete_threshold, max_files, min_file_size, max_file_size, compactions);
delete_threshold, max_files, min_file_size, max_file_size,
allow_cross_schema, compactions);
}
}
});
Expand Down Expand Up @@ -773,7 +806,7 @@ unique_ptr<LogicalOperator> BindCompaction(ClientContext &context, TableFunction
if (auto_compact) {
auto delete_threshold = GetDeleteThreshold(dl_schema, ducklake_table, ducklake_catalog, input);
GenerateCompaction(context, transaction, ducklake_catalog, input, ducklake_table, type, delete_threshold,
max_files, min_file_size, max_file_size, compactions);
max_files, min_file_size, max_file_size, allow_cross_schema, compactions);
}

return GenerateCompactionOperator(input, bind_index, compactions);
Expand All @@ -797,6 +830,7 @@ TableFunctionSet DuckLakeMergeAdjacentFilesFunction::GetFunctions() {
function.named_parameters["min_file_size"] = LogicalType::UBIGINT;
function.named_parameters["max_file_size"] = LogicalType::UBIGINT;
function.named_parameters["max_compacted_files"] = LogicalType::UBIGINT;
function.named_parameters["allow_cross_schema"] = LogicalType::BOOLEAN;
if (type.size() == 2) {
function.named_parameters["schema"] = LogicalType::VARCHAR;
}
Expand Down
4 changes: 4 additions & 0 deletions src/include/storage/ducklake_metadata_info.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -482,6 +482,10 @@ struct DuckLakeMergeAdjacentOptions {
uint64_t max_files;
optional_idx min_file_size;
optional_idx max_file_size;
//! When true, files written under different schema_versions may be merged
//! together into the current schema. Relies on the per-file mapping_id +
//! DuckLakeMultiFileReader to project each source into the latest schema.
bool allow_cross_schema = false;
};

struct DuckLakeFileSizeOptions {
Expand Down
186 changes: 186 additions & 0 deletions test/sql/compaction/merge_adjacent_cross_schema.test
Original file line number Diff line number Diff line change
@@ -0,0 +1,186 @@
# name: test/sql/compaction/merge_adjacent_cross_schema.test
# description: test allow_cross_schema flag on ducklake_merge_adjacent_files
# group: [compaction]

require ducklake

require parquet

test-env DUCKLAKE_CONNECTION {TEST_DIR}/{UUID}.db

test-env DATA_PATH {TEST_DIR}

statement ok
ATTACH 'ducklake:{DUCKLAKE_CONNECTION}' AS ducklake (DATA_PATH '{DATA_PATH}/ducklake_cross_schema/')

statement ok
USE ducklake;

#
# Core scenario: ADD COLUMN between inserts. With allow_cross_schema => true,
# files written under both schemas merge into a single file under the latest
# schema. Old rows take NULL for the new column.
#

statement ok
CREATE TABLE t AS SELECT range AS id FROM range(5);

statement ok
CALL ducklake_flush_inlined_data('ducklake');

statement ok
INSERT INTO t VALUES (5), (6);

statement ok
CALL ducklake_flush_inlined_data('ducklake');

statement ok
ALTER TABLE t ADD COLUMN label VARCHAR;

statement ok
INSERT INTO t VALUES (7, 'seven'), (8, 'eight');

statement ok
CALL ducklake_flush_inlined_data('ducklake');

# three files exist, spanning two schema_versions
query I
SELECT COUNT(*) FROM ducklake_list_files('ducklake', 't')
----
3

# merge everything across the schema_version boundary
query III
SELECT table_name, files_processed, files_created FROM ducklake_merge_adjacent_files('ducklake', 't', allow_cross_schema => true)
----
t	3	1

statement ok
CALL ducklake_cleanup_old_files('ducklake', cleanup_all => true);

query I
SELECT COUNT(*) FROM ducklake_list_files('ducklake', 't')
----
1

# data integrity: old rows fill NULL on the new column, new rows keep their values
query IT
SELECT id, label FROM t ORDER BY id
----
0	NULL
1	NULL
2	NULL
3	NULL
4	NULL
5	NULL
6	NULL
7	seven
8	eight

#
# Backward compat: without the flag, old + new schema files stay in separate
# groups and only same-schema groups merge. This is the current behavior.
#

statement ok
CREATE TABLE t_compat AS SELECT range AS id FROM range(3);

statement ok
CALL ducklake_flush_inlined_data('ducklake');

statement ok
INSERT INTO t_compat VALUES (3), (4);

statement ok
CALL ducklake_flush_inlined_data('ducklake');

statement ok
ALTER TABLE t_compat ADD COLUMN label VARCHAR;

statement ok
INSERT INTO t_compat VALUES (5, 'a'), (6, 'b');

statement ok
CALL ducklake_flush_inlined_data('ducklake');

statement ok
INSERT INTO t_compat VALUES (7, 'c'), (8, 'd');

statement ok
CALL ducklake_flush_inlined_data('ducklake');

# 4 files: 2 pre-ALTER + 2 post-ALTER
query I
SELECT COUNT(*) FROM ducklake_list_files('ducklake', 't_compat')
----
4

# default call: one merge per schema_version group (2 + 2 -> 1 + 1)
query III
SELECT table_name, files_processed, files_created FROM ducklake_merge_adjacent_files('ducklake', 't_compat') ORDER BY ALL
----
t_compat	2	1
t_compat	2	1

statement ok
CALL ducklake_cleanup_old_files('ducklake', cleanup_all => true);

# two files remain - one per schema_version
query I
SELECT COUNT(*) FROM ducklake_list_files('ducklake', 't_compat')
----
2

#
# Partition isolation: even with allow_cross_schema => true, files written
# under different partition specs must not be merged together. The grouping
# key still includes partition_id, and SET PARTITIONED BY mints a new one.
#

statement ok
CREATE TABLE t_part (id INTEGER, part INTEGER);

statement ok
ALTER TABLE t_part SET PARTITIONED BY (part);

statement ok
INSERT INTO t_part VALUES (1, 0), (2, 0);

statement ok
CALL ducklake_flush_inlined_data('ducklake');

statement ok
INSERT INTO t_part VALUES (3, 0), (4, 0);

statement ok
CALL ducklake_flush_inlined_data('ducklake');

# repartition - bumps partition_id (and schema_version)
statement ok
ALTER TABLE t_part SET PARTITIONED BY (id);

statement ok
INSERT INTO t_part VALUES (5, 0), (6, 0);

statement ok
CALL ducklake_flush_inlined_data('ducklake');

statement ok
INSERT INTO t_part VALUES (7, 0), (8, 0);

statement ok
CALL ducklake_flush_inlined_data('ducklake');

# with allow_cross_schema => true, the two partition specs still isolate:
# old spec has part=0 bucket with 2 files; new spec has 4 per-id buckets
# with 1 file each. Only the old-spec (part=0) bucket has >=2 files and merges.
query III
SELECT table_name, files_processed, files_created FROM ducklake_merge_adjacent_files('ducklake', 't_part', allow_cross_schema => true) ORDER BY ALL
----
t_part	2	1

# row count preserved
query I
SELECT COUNT(*) FROM t_part
----
8