Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions src/functions/ducklake_compaction_functions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -238,9 +238,10 @@ void DuckLakeCompactor::GenerateCompactions(DuckLakeTableEntry &table,
// (does not apply to REWRITE_DELETES - delete files must be rewritten regardless of data file size)
continue;
}
if ((!candidate.delete_files.empty() && type == CompactionType::MERGE_ADJACENT_TABLES) ||
candidate.file.end_snapshot.IsValid() || candidate.has_inlined_deletions) {
// Merge Adjacent Tables doesn't perform the merge if delete files are present
if (((!candidate.delete_files.empty() || !candidate.inlined_file_deletions.empty()) &&
type == CompactionType::MERGE_ADJACENT_TABLES) ||
candidate.file.end_snapshot.IsValid()) {
// Merge Adjacent Tables doesn't perform the merge if any deletes are present
continue;
}
// construct the compaction group for this file - i.e. the set of candidate files we can compact it with
Expand Down Expand Up @@ -415,9 +416,11 @@ DuckLakeCompactor::GenerateCompactionCommand(vector<DuckLakeCompactionFileEntry>
for (auto &source : source_files) {
DuckLakeFileListEntry result;
result.file = source.file.data;
result.file_id = source.file.id;
result.row_id_start = source.file.row_id_start;
result.snapshot_id = source.file.begin_snapshot;
result.mapping_id = source.file.mapping_id;
result.inlined_file_deletions = source.inlined_file_deletions;
switch (type) {
case CompactionType::REWRITE_DELETES: {
if (!source.delete_files.empty()) {
Expand Down
1 change: 1 addition & 0 deletions src/include/storage/ducklake_catalog.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ class DuckLakeCatalog : public Catalog {
PhysicalOperator &plan) override;
PhysicalOperator &PlanDelete(ClientContext &context, PhysicalPlanGenerator &planner, LogicalDelete &op,
PhysicalOperator &plan) override;
PhysicalOperator &PlanDelete(ClientContext &context, PhysicalPlanGenerator &planner, LogicalDelete &op) override;
PhysicalOperator &PlanUpdate(ClientContext &context, PhysicalPlanGenerator &planner, LogicalUpdate &op,
PhysicalOperator &plan) override;
PhysicalOperator &PlanMergeInto(ClientContext &context, PhysicalPlanGenerator &planner, LogicalMergeInto &op,
Expand Down
5 changes: 3 additions & 2 deletions src/include/storage/ducklake_metadata_info.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -446,8 +446,8 @@ struct DuckLakeCompactionFileEntry {
vector<DuckLakeCompactionDeleteFileData> delete_files;
optional_idx max_partial_file_snapshot;
idx_t schema_version;
//! Whether this file has inlined deletions (stored in metadata database rather than delete files)
bool has_inlined_deletions = false;
//! Inlined file deletions stored in the metadata database rather than delete files
set<idx_t> inlined_file_deletions;
};

struct DuckLakeRewriteFileEntry {
Expand All @@ -468,6 +468,7 @@ struct DuckLakeCompactedFileInfo {
string path;
DataFileIndex source_id;
DataFileIndex new_id;
optional_idx rewrite_snapshot;
//! Info on delete files, in case the compaction is a delete-rewrite
string delete_file_path;
DataFileIndex delete_file_id;
Expand Down
2 changes: 2 additions & 0 deletions src/include/storage/ducklake_metadata_manager.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,7 @@ class DuckLakeMetadataManager {
SnapshotDeletedFromFiles GetFilesDeletedOrDroppedAfterSnapshot(const DuckLakeSnapshot &start_snapshot) const;
virtual unique_ptr<DuckLakeSnapshot> GetSnapshot();
virtual unique_ptr<DuckLakeSnapshot> GetSnapshot(BoundAtClause &at_clause, SnapshotBound bound);
virtual unique_ptr<DuckLakeSnapshot> GetSnapshotById(idx_t snapshot_id);
virtual idx_t GetNextColumnId(TableIndex table_id);
virtual unique_ptr<QueryResult> ReadInlinedData(DuckLakeSnapshot snapshot, const string &inlined_table_name,
const vector<string> &columns_to_read);
Expand All @@ -226,6 +227,7 @@ class DuckLakeMetadataManager {
virtual shared_ptr<DuckLakeInlinedData> TransformInlinedData(QueryResult &result,
const vector<LogicalType> &expected_types);

virtual void MarkInlinedDataDeleted(DuckLakeSnapshot snapshot, const string &inlined_table_name);
virtual void DeleteInlinedData(const DuckLakeInlinedTableInfo &inlined_table);
//! We delete at the flush
virtual void DeleteFlushedInlinedData(const DuckLakeInlinedTableInfo &inlined_table, idx_t flush_snapshot_id);
Expand Down
3 changes: 3 additions & 0 deletions src/include/storage/ducklake_transaction.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ struct LocalTableChanges {
void AppendInlinedData(ClientContext &context, TableIndex table_id, unique_ptr<DuckLakeInlinedData> new_data);
void AddNewInlinedDeletes(TableIndex table_id, const string &table_name, set<idx_t> new_deletes);
void DeleteFromLocalInlinedData(ClientContext &context, TableIndex table_id, set<idx_t> new_deletes);
void TruncateLocalInlinedData(TableIndex table_id);
void AddColumnToLocalInlinedData(ClientContext &context, TableIndex table_id, const LogicalType &new_column_type,
FieldIndex new_field_index, const Value &default_value);
void RemoveColumnFromLocalInlinedData(ClientContext &context, TableIndex table_id,
Expand Down Expand Up @@ -206,6 +207,7 @@ class DuckLakeTransaction : public Transaction, public enable_shared_from_this<D
void AppendInlinedData(TableIndex table_id, unique_ptr<DuckLakeInlinedData> collection);
void AddNewInlinedDeletes(TableIndex table_id, const string &table_name, set<idx_t> new_deletes);
void DeleteFromLocalInlinedData(TableIndex table_id, set<idx_t> new_deletes);
void TruncateLocalInlinedData(TableIndex table_id);
void AddColumnToLocalInlinedData(TableIndex table_id, const LogicalType &new_column_type,
FieldIndex new_field_index, const Value &default_value = Value());
void RemoveColumnFromLocalInlinedData(TableIndex table_id, LogicalIndex removed_column_index,
Expand All @@ -226,6 +228,7 @@ class DuckLakeTransaction : public Transaction, public enable_shared_from_this<D
void DropFile(TableIndex table_id, DataFileIndex data_file_id, string path);

void DeleteSnapshots(const vector<DuckLakeSnapshotInfo> &snapshots);
void MarkInlinedDataDeleted(const string &inlined_table_name);
void DeleteInlinedData(const DuckLakeInlinedTableInfo &inlined_table);
//! Delete inlined data rows with begin_snapshot <= flush_snapshot_id
void DeleteFlushedInlinedData(const DuckLakeInlinedTableInfo &inlined_table, idx_t flush_snapshot_id);
Expand Down
35 changes: 35 additions & 0 deletions src/include/storage/ducklake_truncate.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
//===----------------------------------------------------------------------===//
// DuckDB
//
// storage/ducklake_truncate.hpp
//
//===----------------------------------------------------------------------===//

#pragma once

#include "duckdb/execution/physical_operator.hpp"
#include "storage/ducklake_table_entry.hpp"

namespace duckdb {

//! Physical operator implementing TRUNCATE for a DuckLake table.
//! Modeled as a source operator: it produces no output rows and performs the
//! truncation work when the pipeline pulls from it (see GetDataInternal).
class DuckLakeTruncate : public PhysicalOperator {
public:
	DuckLakeTruncate(PhysicalPlan &physical_plan, DuckLakeTableEntry &table);

	//! The table being truncated
	DuckLakeTableEntry &table;

public:
	//! Executes the truncation; presumably emits no data chunks — confirm against the .cpp
	SourceResultType GetDataInternal(ExecutionContext &context, DataChunk &chunk,
	                                 OperatorSourceInput &input) const override;

	//! This operator is a data source (it has no child pipeline feeding it)
	bool IsSource() const override {
		return true;
	}

	unique_ptr<GlobalSourceState> GetGlobalSourceState(ClientContext &context) const override;

	string GetName() const override;
	InsertionOrderPreservingMap<string> ParamsToString() const override;
};

} // namespace duckdb
1 change: 1 addition & 0 deletions src/storage/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ add_library(
ducklake_storage.cpp
ducklake_delete.cpp
ducklake_deletion_vector.cpp
ducklake_truncate.cpp
ducklake_multi_file_reader.cpp
ducklake_partition_data.cpp
ducklake_secret.cpp
Expand Down
2 changes: 1 addition & 1 deletion src/storage/ducklake_delete.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -681,7 +681,7 @@ PhysicalOperator &DuckLakeCatalog::PlanDelete(ClientContext &context, PhysicalPl
row_id_indexes.push_back(bound_ref.index);
}
return DuckLakeDelete::PlanDelete(context, planner, op.table.Cast<DuckLakeTableEntry>(), child_plan,
std::move(row_id_indexes), std::move(encryption_key));
std::move(row_id_indexes), std::move(encryption_key), true);
}

} // namespace duckdb
85 changes: 65 additions & 20 deletions src/storage/ducklake_metadata_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1794,9 +1794,9 @@ vector<DuckLakeCompactionFileEntry> DuckLakeMetadataManager::GetFilesForCompacti
string select_list = data_select_list + ", " + delete_select_list;
string deletion_threshold_clause;
if (type == CompactionType::REWRITE_DELETES) {
deletion_threshold_clause = StringUtil::Format(
" AND CAST(del.delete_count AS FLOAT)/CAST(data.record_count AS FLOAT) >= %f and data.end_snapshot is null",
deletion_threshold);
// Filter current data files in SQL, then apply the delete threshold in C++ so we can include
// metadata-only inlined file deletions as rewrite candidates.
deletion_threshold_clause = " AND data.end_snapshot is null";
}
// Add file size filtering for MERGE_ADJACENT_TABLES compaction
string file_size_filter_clause;
Expand Down Expand Up @@ -1901,17 +1901,31 @@ ORDER BY data.begin_snapshot, data.row_id_start, data.data_file_id, del.begin_sn
file_entry.delete_files.push_back(std::move(delete_file));
}

// Check for inlined deletions and mark affected files
// Gather file IDs first, then query only for existence
vector<idx_t> file_ids;
file_ids.reserve(files.size());
// Load inlined deletions for active files so rewrite compaction can treat them the same as delete files.
auto inlined_deletions = ReadInlinedFileDeletions(table_id, snapshot);
for (auto &file : files) {
file_ids.push_back(file.file.id.index);
auto entry = inlined_deletions.find(file.file.id.index);
if (entry != inlined_deletions.end()) {
file.inlined_file_deletions = std::move(entry->second);
}
}
auto files_with_deletions = GetFileIdsWithInlinedDeletions(table_id, snapshot, file_ids);
for (auto &file : files) {
if (files_with_deletions.count(file.file.id.index)) {
file.has_inlined_deletions = true;

if (type == CompactionType::REWRITE_DELETES) {
for (idx_t file_idx = 0; file_idx < files.size(); file_idx++) {
auto &file = files[file_idx];
idx_t active_delete_count = 0;
if (!file.delete_files.empty() && !file.delete_files.back().end_snapshot.IsValid()) {
active_delete_count = file.delete_files.back().row_count;
}
auto total_delete_count = active_delete_count + file.inlined_file_deletions.size();
double delete_ratio = 0;
if (file.file.row_count > 0) {
delete_ratio = static_cast<double>(total_delete_count) / static_cast<double>(file.file.row_count);
}
if (total_delete_count == 0 || delete_ratio < deletion_threshold) {
files.erase_at(file_idx);
file_idx--;
}
}
}

Expand Down Expand Up @@ -3556,6 +3570,25 @@ WHERE snapshot_id = (
return snapshot;
}

//! Look up a specific snapshot in the metadata catalog by its snapshot id.
//! Throws InvalidInputException when no snapshot with that id exists.
unique_ptr<DuckLakeSnapshot> DuckLakeMetadataManager::GetSnapshotById(idx_t snapshot_id) {
	// Query() requires a snapshot argument; this lookup does not depend on one,
	// so a zeroed-out placeholder is passed
	DuckLakeSnapshot placeholder_snapshot(0, 0, 0, 0);
	auto sql = StringUtil::Format(R"(
SELECT snapshot_id, schema_version, next_catalog_id, next_file_id
FROM {METADATA_CATALOG}.ducklake_snapshot
WHERE snapshot_id = %llu;)",
	                              snapshot_id);
	auto query_result = Query(placeholder_snapshot, sql);
	if (query_result->HasError()) {
		query_result->GetErrorObject().Throw(
		    StringUtil::Format("Failed to query snapshot %llu for DuckLake: ", snapshot_id));
	}
	auto snapshot_entry = TryGetSnapshotInternal(*query_result);
	if (!snapshot_entry) {
		throw InvalidInputException("Snapshot %llu not found in DuckLake", snapshot_id);
	}
	return snapshot_entry;
}

static unordered_map<idx_t, DuckLakePartitionInfo>
GetNewPartitions(const vector<DuckLakePartitionInfo> &old_partitions,
const vector<DuckLakePartitionInfo> &new_partitions) {
Expand Down Expand Up @@ -4112,20 +4145,20 @@ string DuckLakeMetadataManager::WriteDeleteRewrites(const vector<DuckLakeCompact
for (idx_t i = compactions.size(); i > 0; i--) {
auto &compaction = compactions[i - 1];
if (table_idx_last_snapshot.find(compaction.table_index.index) == table_idx_last_snapshot.end()) {
table_idx_last_snapshot[compaction.table_index.index] = compaction.delete_file_start_snapshot.GetIndex();
table_idx_last_snapshot[compaction.table_index.index] = compaction.rewrite_snapshot.GetIndex();
}
}

string batch_query;
for (idx_t i = 0; i < compactions.size(); ++i) {
auto &compaction = compactions[i];
D_ASSERT(!compaction.path.empty());
if (!compaction.delete_file_end_snapshot.IsValid()) {
if (compaction.delete_file_id.IsValid() && !compaction.delete_file_end_snapshot.IsValid()) {
batch_query += StringUtil::Format(R"(
UPDATE {METADATA_CATALOG}.ducklake_delete_file SET end_snapshot = %llu
WHERE delete_file_id = %llu;
)",
table_idx_last_snapshot[compaction.table_index.index],
UPDATE {METADATA_CATALOG}.ducklake_delete_file SET end_snapshot = %llu
WHERE delete_file_id = %llu;
)",
table_idx_last_snapshot[compaction.table_index.index],
compaction.delete_file_id.index);
}
// We must update the data file table
Expand Down Expand Up @@ -4377,8 +4410,8 @@ WHERE end_snapshot IS NOT NULL AND NOT EXISTS(

void DuckLakeMetadataManager::DeleteInlinedData(const DuckLakeInlinedTableInfo &inlined_table) {
auto result = transaction.Query(StringUtil::Format(R"(
DELETE FROM {METADATA_CATALOG}.%s
)",
DELETE FROM {METADATA_CATALOG}.%s
)",
SQLIdentifier(inlined_table.table_name)));
if (result->HasError()) {
result->GetErrorObject().Throw("Failed to delete inlined data in DuckLake from table " +
Expand Down Expand Up @@ -4408,6 +4441,18 @@ DuckLakeMetadataManager::GenerateDeleteFlushedInlinedData(const vector<FlushedIn
return result;
}

//! Soft-delete all live rows of an inlined-data table: sets end_snapshot to the
//! given snapshot id for every row that is currently visible, rather than
//! physically removing the rows.
void DuckLakeMetadataManager::MarkInlinedDataDeleted(DuckLakeSnapshot snapshot, const string &inlined_table_name) {
	auto update_sql = StringUtil::Format(R"(
UPDATE {METADATA_CATALOG}.%s
SET end_snapshot = {SNAPSHOT_ID}
WHERE end_snapshot IS NULL AND begin_snapshot <= {SNAPSHOT_ID}
)",
	                                     SQLIdentifier(inlined_table_name));
	auto query_result = transaction.Query(snapshot, update_sql);
	if (query_result->HasError()) {
		query_result->GetErrorObject().Throw("Failed to mark inlined data as deleted in DuckLake from table " +
		                                     inlined_table_name + ": ");
	}
}
string DuckLakeMetadataManager::InsertNewSchema(const DuckLakeSnapshot &snapshot, const set<TableIndex> &table_ids) {
if (table_ids.empty()) {
return {};
Expand Down
32 changes: 0 additions & 32 deletions src/storage/ducklake_multi_file_list.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -307,38 +307,6 @@ vector<DuckLakeFileListExtendedEntry> DuckLakeMultiFileList::GetFilesExtended()
file_entry.data_type = DuckLakeDataType::TRANSACTION_LOCAL_INLINED_DATA;
result.push_back(std::move(file_entry));
}
if (!read_file_list) {
// we have not read the file list yet - construct it from the extended file list
// Read committed inlined file deletions from metadata
map<idx_t, set<idx_t>> committed_inlined_deletions;
if (!read_info.table_id.IsTransactionLocal()) {
auto &metadata_manager = transaction.GetMetadataManager();
committed_inlined_deletions =
metadata_manager.ReadInlinedFileDeletions(read_info.table_id, read_info.snapshot);
}
for (auto &file : result) {
DuckLakeFileListEntry file_entry;
file_entry.file = file.file;
file_entry.row_id_start = file.row_id_start;
file_entry.delete_file = file.delete_file;
file_entry.file_id = file.file_id;
file_entry.data_type = file.data_type;
// Apply committed inlined file deletions from metadata
if (file.file_id.IsValid()) {
auto it = committed_inlined_deletions.find(file.file_id.index);
if (it != committed_inlined_deletions.end()) {
file_entry.inlined_file_deletions = std::move(it->second);
}
}
// Apply local inlined file deletes if any (merges into committed deletions)
if (file.file_id.IsValid() && transaction.HasLocalInlinedFileDeletes(read_info.table_id)) {
transaction.GetLocalInlinedFileDeletesForFile(read_info.table_id, file.file_id.index,
file_entry.inlined_file_deletions);
}
files.emplace_back(std::move(file_entry));
}
read_file_list = true;
}
return result;
}

Expand Down
3 changes: 2 additions & 1 deletion src/storage/ducklake_scan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,8 @@ TableFunction DuckLakeFunctions::GetDuckLakeScanFunction(DatabaseInstance &insta

// Constructor: holds a shared reference to the transaction so the function info
// keeps it alive. The table's name and id are copied out eagerly here —
// presumably so they stay usable independently of the catalog entry's lifetime;
// TODO confirm against the consumers of table_name/table_id.
DuckLakeFunctionInfo::DuckLakeFunctionInfo(DuckLakeTableEntry &table, DuckLakeTransaction &transaction_p,
                                           DuckLakeSnapshot snapshot)
    : table(table), transaction(transaction_p.shared_from_this()), table_name(table.name), snapshot(snapshot),
      table_id(table.GetTableId()) {
}

shared_ptr<DuckLakeFunctionInfo>
Expand Down
29 changes: 28 additions & 1 deletion src/storage/ducklake_transaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,22 @@ void LocalTableChanges::DeleteFromLocalInlinedData(ClientContext &context, Table
inlined_data.row_ids = std::move(new_row_ids);
}

//! Discard all transaction-local inlined data buffered for the given table.
//! It is an internal error to call this when no local inlined data exists.
void LocalTableChanges::TruncateLocalInlinedData(TableIndex table_id) {
	lock_guard<mutex> guard(lock);
	auto it = changes.find(table_id);
	if (it == changes.end()) {
		throw InternalException("TruncateLocalInlinedData called but no transaction-local data exists for table");
	}
	auto &change_entry = it->second;
	if (!change_entry.new_inlined_data) {
		throw InternalException("TruncateLocalInlinedData called but no inlined data exists");
	}
	// drop the locally buffered inlined rows
	change_entry.new_inlined_data.reset();
	// if that was the table's only pending local change, remove the now-empty entry
	if (change_entry.IsEmpty()) {
		changes.erase(it);
	}
}

static void RemoveFieldStats(map<FieldIndex, DuckLakeColumnStats> &column_stats, const DuckLakeFieldId &field_id) {
column_stats.erase(field_id.GetFieldIndex());
for (auto &child_id : field_id.Children()) {
Expand Down Expand Up @@ -2391,9 +2407,12 @@ CompactionInformation DuckLakeTransaction::GetCompactionChanges(DuckLakeCommitSt
if (!compacted_file.delete_files.empty()) {
row_id_limit -= compacted_file.delete_files.back().row_count;
}
row_id_limit -= compacted_file.inlined_file_deletions.size();
DuckLakeCompactedFileInfo file_info;
file_info.path = compacted_file.file.data.path;
file_info.source_id = compacted_file.file.id;
file_info.table_index = entry.GetTableIndex();
file_info.rewrite_snapshot = commit_snapshot.snapshot_id;
if (has_new_file) {
file_info.new_id = new_file.id;
}
Expand All @@ -2402,7 +2421,6 @@ CompactionInformation DuckLakeTransaction::GetCompactionChanges(DuckLakeCommitSt
file_info.delete_file_path = compacted_file.delete_files.back().data.path;
file_info.delete_file_id = compacted_file.delete_files.back().delete_file_id;
file_info.start_snapshot = compacted_file.file.begin_snapshot;
file_info.table_index = entry.GetTableIndex();
file_info.delete_file_start_snapshot = commit_snapshot.snapshot_id;
file_info.delete_file_end_snapshot = compacted_file.delete_files.back().end_snapshot;
}
Expand Down Expand Up @@ -2558,6 +2576,11 @@ void DuckLakeTransaction::DeleteSnapshots(const vector<DuckLakeSnapshotInfo> &sn
metadata_manager.DeleteSnapshots(snapshots);
}

void DuckLakeTransaction::MarkInlinedDataDeleted(const string &inlined_table_name) {
auto &metadata_manager = GetMetadataManager();
metadata_manager.MarkInlinedDataDeleted(GetSnapshot(), inlined_table_name);
}

void DuckLakeTransaction::DeleteInlinedData(const DuckLakeInlinedTableInfo &inlined_table) {
auto &metadata_manager = GetMetadataManager();
metadata_manager.DeleteInlinedData(inlined_table);
Expand Down Expand Up @@ -2710,6 +2733,10 @@ void DuckLakeTransaction::DeleteFromLocalInlinedData(TableIndex table_id, set<id
local_changes.DeleteFromLocalInlinedData(*context_ref, table_id, std::move(new_deletes));
}

//! Forward truncation of transaction-local inlined data to the local change set.
void DuckLakeTransaction::TruncateLocalInlinedData(TableIndex table_id) {
	local_changes.TruncateLocalInlinedData(table_id);
}

void DuckLakeTransaction::AddColumnToLocalInlinedData(TableIndex table_id, const LogicalType &new_column_type,
FieldIndex new_field_index, const Value &default_value) {
auto context_ref = context.lock();
Expand Down
Loading
Loading