Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/include/storage/ducklake_catalog.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ class DuckLakeCatalog : public Catalog {
PhysicalOperator &plan) override;
PhysicalOperator &PlanDelete(ClientContext &context, PhysicalPlanGenerator &planner, LogicalDelete &op,
PhysicalOperator &plan) override;
PhysicalOperator &PlanDelete(ClientContext &context, PhysicalPlanGenerator &planner, LogicalDelete &op) override;
PhysicalOperator &PlanUpdate(ClientContext &context, PhysicalPlanGenerator &planner, LogicalUpdate &op,
PhysicalOperator &plan) override;
PhysicalOperator &PlanMergeInto(ClientContext &context, PhysicalPlanGenerator &planner, LogicalMergeInto &op,
Expand Down
1 change: 1 addition & 0 deletions src/include/storage/ducklake_delete_filter.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ namespace duckdb {
struct DuckLakeDeleteData {
vector<idx_t> deleted_rows;
vector<idx_t> snapshot_ids;
bool delete_all = false;
//! For deletion scans: mapping from row_id to snapshot_id for rows that were deleted
//! If scan_snapshot_map_uses_row_id is true, this is indexed by global row_id (from _ducklake_internal_row_id)
//! Otherwise, it's indexed by file position
Expand Down
1 change: 1 addition & 0 deletions src/include/storage/ducklake_inlined_data.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ struct DuckLakeInlinedData {

struct DuckLakeInlinedDataDeletes {
set<idx_t> rows;
bool delete_all = false;
};

//! Stores inlined file deletions for a table
Expand Down
1 change: 1 addition & 0 deletions src/include/storage/ducklake_metadata_info.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,7 @@ struct DuckLakeDeletedInlinedDataInfo {
TableIndex table_id;
string table_name;
vector<idx_t> deleted_row_ids;
bool delete_all = false;
};

//! Info for all inlined file deletions for a single table
Expand Down
8 changes: 6 additions & 2 deletions src/include/storage/ducklake_transaction.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,10 @@ struct LocalTableChanges {
void DropTransactionLocalFile(ClientContext &context, TableIndex table_id, const string &path);
void AppendFiles(TableIndex table_id, vector<DuckLakeDataFile> files);
void AppendInlinedData(ClientContext &context, TableIndex table_id, unique_ptr<DuckLakeInlinedData> new_data);
void AddNewInlinedDeletes(TableIndex table_id, const string &table_name, set<idx_t> new_deletes);
void AddNewInlinedDeletes(TableIndex table_id, const string &table_name, set<idx_t> new_deletes,
bool delete_all = false);
void DeleteFromLocalInlinedData(ClientContext &context, TableIndex table_id, set<idx_t> new_deletes);
void TruncateLocalInlinedData(TableIndex table_id);
void AddColumnToLocalInlinedData(ClientContext &context, TableIndex table_id, const LogicalType &new_column_type,
FieldIndex new_field_index, const Value &default_value);
void RemoveColumnFromLocalInlinedData(ClientContext &context, TableIndex table_id,
Expand Down Expand Up @@ -204,8 +206,10 @@ class DuckLakeTransaction : public Transaction, public enable_shared_from_this<D
NewNameMapInfo GetNewNameMaps(DuckLakeCommitState &commit_state);

void AppendInlinedData(TableIndex table_id, unique_ptr<DuckLakeInlinedData> collection);
void AddNewInlinedDeletes(TableIndex table_id, const string &table_name, set<idx_t> new_deletes);
void AddNewInlinedDeletes(TableIndex table_id, const string &table_name, set<idx_t> new_deletes,
bool delete_all = false);
void DeleteFromLocalInlinedData(TableIndex table_id, set<idx_t> new_deletes);
void TruncateLocalInlinedData(TableIndex table_id);
void AddColumnToLocalInlinedData(TableIndex table_id, const LogicalType &new_column_type,
FieldIndex new_field_index, const Value &default_value = Value());
void RemoveColumnFromLocalInlinedData(TableIndex table_id, LogicalIndex removed_column_index,
Expand Down
35 changes: 35 additions & 0 deletions src/include/storage/ducklake_truncate.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
//===----------------------------------------------------------------------===//
// DuckDB
//
// storage/ducklake_truncate.hpp
//
//===----------------------------------------------------------------------===//

#pragma once

#include "duckdb/execution/physical_operator.hpp"
#include "storage/ducklake_table_entry.hpp"

namespace duckdb {

class DuckLakeTruncate : public PhysicalOperator {
public:
DuckLakeTruncate(PhysicalPlan &physical_plan, DuckLakeTableEntry &table);

DuckLakeTableEntry &table;

public:
SourceResultType GetDataInternal(ExecutionContext &context, DataChunk &chunk,
OperatorSourceInput &input) const override;

bool IsSource() const override {
return true;
}

unique_ptr<GlobalSourceState> GetGlobalSourceState(ClientContext &context) const override;

string GetName() const override;
InsertionOrderPreservingMap<string> ParamsToString() const override;
};

} // namespace duckdb
1 change: 1 addition & 0 deletions src/storage/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ add_library(
ducklake_storage.cpp
ducklake_delete.cpp
ducklake_deletion_vector.cpp
ducklake_truncate.cpp
ducklake_multi_file_reader.cpp
ducklake_partition_data.cpp
ducklake_secret.cpp
Expand Down
2 changes: 1 addition & 1 deletion src/storage/ducklake_delete.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -694,7 +694,7 @@ PhysicalOperator &DuckLakeCatalog::PlanDelete(ClientContext &context, PhysicalPl
row_id_indexes.push_back(bound_ref.index);
}
return DuckLakeDelete::PlanDelete(context, planner, op.table.Cast<DuckLakeTableEntry>(), child_plan,
std::move(row_id_indexes), std::move(encryption_key));
std::move(row_id_indexes), std::move(encryption_key), true);
}

} // namespace duckdb
11 changes: 11 additions & 0 deletions src/storage/ducklake_delete_filter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@ DuckLakeDeleteFilter::DuckLakeDeleteFilter() : delete_data(make_shared_ptr<DuckL

idx_t DuckLakeDeleteData::Filter(row_t start_row_index, idx_t count, SelectionVector &result_sel,
optional_idx snapshot_filter) const {
if (delete_all) {
return 0;
}
auto entry = std::lower_bound(deleted_rows.begin(), deleted_rows.end(), start_row_index);
if (entry == deleted_rows.end()) {
// no filter found for this entry
Expand Down Expand Up @@ -239,11 +242,19 @@ DeleteFileScanResult DuckLakeDeleteFilter::ScanDeleteFile(ClientContext &context

void DuckLakeDeleteFilter::Initialize(ClientContext &context, const DuckLakeFileData &delete_file) {
auto scan_result = ScanDeleteFile(context, delete_file, optional_idx(), optional_idx());
delete_data->delete_all = false;
delete_data->deleted_rows = std::move(scan_result.deleted_rows);
delete_data->snapshot_ids = std::move(scan_result.snapshot_ids);
}

void DuckLakeDeleteFilter::Initialize(const DuckLakeInlinedDataDeletes &inlined_deletes) {
delete_data->delete_all = false;
if (inlined_deletes.delete_all) {
delete_data->delete_all = true;
delete_data->deleted_rows.clear();
delete_data->snapshot_ids.clear();
return;
}
D_ASSERT(std::is_sorted(delete_data->deleted_rows.begin(), delete_data->deleted_rows.end()));
auto mid_idx = delete_data->deleted_rows.size();
for (auto &idx : inlined_deletes.rows) {
Expand Down
30 changes: 16 additions & 14 deletions src/storage/ducklake_inlined_data_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -131,23 +131,25 @@ bool DuckLakeInlinedDataReader::TryInitializeScan(ClientContext &context, Global
if (deletion_filter) {
// map the deleted row-ids to the deleted ordinals to obtain the correct deleted rows
auto &filter = reinterpret_cast<DuckLakeDeleteFilter &>(*deletion_filter);
vector<idx_t> deleted_ordinals;
auto &deleted_row_ids = filter.delete_data->deleted_rows;
idx_t current_idx = 0;
idx_t ordinal_position = 0;
for (auto &chunk : data->data->Chunks()) {
auto &row_id_vector = chunk.data.back();
auto row_id_data = FlatVector::GetData<int64_t>(row_id_vector);
for (idx_t r = 0; r < chunk.size(); r++) {
auto row_id = NumericCast<idx_t>(row_id_data[r]);
if (current_idx < deleted_row_ids.size() && deleted_row_ids[current_idx] == row_id) {
deleted_ordinals.push_back(ordinal_position);
current_idx++;
if (!filter.delete_data->delete_all) {
vector<idx_t> deleted_ordinals;
auto &deleted_row_ids = filter.delete_data->deleted_rows;
idx_t current_idx = 0;
idx_t ordinal_position = 0;
for (auto &chunk : data->data->Chunks()) {
auto &row_id_vector = chunk.data.back();
auto row_id_data = FlatVector::GetData<int64_t>(row_id_vector);
for (idx_t r = 0; r < chunk.size(); r++) {
auto row_id = NumericCast<idx_t>(row_id_data[r]);
if (current_idx < deleted_row_ids.size() && deleted_row_ids[current_idx] == row_id) {
deleted_ordinals.push_back(ordinal_position);
current_idx++;
}
ordinal_position++;
}
ordinal_position++;
}
filter.delete_data->deleted_rows = std::move(deleted_ordinals);
}
filter.delete_data->deleted_rows = std::move(deleted_ordinals);
}
data->data->InitializeScan(state);
} else {
Expand Down
9 changes: 9 additions & 0 deletions src/storage/ducklake_metadata_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2431,6 +2431,15 @@ string DuckLakeMetadataManager::WriteNewInlinedDeletes(const vector<DuckLakeDele
return batch_queries;
}
for (auto &entry : new_deletes) {
if (entry.delete_all) {
batch_queries += StringUtil::Format(R"(
UPDATE {METADATA_CATALOG}.%s
SET end_snapshot = {SNAPSHOT_ID}
WHERE end_snapshot IS NULL AND begin_snapshot != {SNAPSHOT_ID};
)",
entry.table_name);
continue;
}
// get a list of all deleted row-ids for this table
string row_id_list;
for (auto &deleted_id : entry.deleted_row_ids) {
Expand Down
3 changes: 2 additions & 1 deletion src/storage/ducklake_scan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,8 @@ TableFunction DuckLakeFunctions::GetDuckLakeScanFunction(DatabaseInstance &insta

DuckLakeFunctionInfo::DuckLakeFunctionInfo(DuckLakeTableEntry &table, DuckLakeTransaction &transaction_p,
DuckLakeSnapshot snapshot)
: table(table), transaction(transaction_p.shared_from_this()), snapshot(snapshot) {
: table(table), transaction(transaction_p.shared_from_this()), table_name(table.name), snapshot(snapshot),
table_id(table.GetTableId()) {
}

shared_ptr<DuckLakeFunctionInfo>
Expand Down
50 changes: 42 additions & 8 deletions src/storage/ducklake_transaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -227,19 +227,32 @@ void LocalTableChanges::AppendInlinedData(ClientContext &context, TableIndex tab
}
}

void LocalTableChanges::AddNewInlinedDeletes(TableIndex table_id, const string &table_name, set<idx_t> new_deletes) {
void LocalTableChanges::AddNewInlinedDeletes(TableIndex table_id, const string &table_name, set<idx_t> new_deletes,
bool delete_all) {
if (new_deletes.empty() && !delete_all) {
return;
}
lock_guard<mutex> guard(lock);
auto &table_changes = changes[table_id];
auto &table_deletes = table_changes.new_inlined_data_deletes;
auto entry = table_deletes.find(table_name);
if (entry != table_deletes.end()) {
if (delete_all) {
entry->second->delete_all = true;
entry->second->rows.clear();
return;
}
if (entry->second->delete_all) {
return;
}
// merge deletes
auto &existing_rows = entry->second->rows;
for (auto &row_idx : new_deletes) {
existing_rows.insert(row_idx);
}
} else {
auto new_data = make_uniq<DuckLakeInlinedDataDeletes>();
new_data->delete_all = delete_all;
new_data->rows = std::move(new_deletes);
table_deletes.emplace(table_name, std::move(new_data));
}
Expand Down Expand Up @@ -290,6 +303,22 @@ void LocalTableChanges::DeleteFromLocalInlinedData(ClientContext &context, Table
inlined_data.row_ids = std::move(new_row_ids);
}

void LocalTableChanges::TruncateLocalInlinedData(TableIndex table_id) {
lock_guard<mutex> guard(lock);
auto entry = changes.find(table_id);
if (entry == changes.end()) {
throw InternalException("TruncateLocalInlinedData called but no transaction-local data exists for table");
}
auto &table_changes = entry->second;
if (!table_changes.new_inlined_data) {
throw InternalException("TruncateLocalInlinedData called but no inlined data exists");
}
table_changes.new_inlined_data.reset();
if (table_changes.IsEmpty()) {
changes.erase(entry);
}
}

static void RemoveFieldStats(map<FieldIndex, DuckLakeColumnStats> &column_stats, const DuckLakeFieldId &field_id) {
column_stats.erase(field_id.GetFieldIndex());
for (auto &child_id : field_id.Children()) {
Expand Down Expand Up @@ -2174,8 +2203,11 @@ DuckLakeTransaction::GetNewInlinedDeletes(DuckLakeCommitState &commit_state) con
DuckLakeDeletedInlinedDataInfo info;
info.table_id = table_id;
info.table_name = delete_entry.first;
for (auto &row_id : delete_entry.second->rows) {
info.deleted_row_ids.push_back(row_id);
info.delete_all = delete_entry.second->delete_all;
if (!info.delete_all) {
for (auto &row_id : delete_entry.second->rows) {
info.deleted_row_ids.push_back(row_id);
}
}
result.push_back(std::move(info));
}
Expand Down Expand Up @@ -2700,18 +2732,20 @@ void DuckLakeTransaction::AppendInlinedData(TableIndex table_id, unique_ptr<Duck
local_changes.AppendInlinedData(*context_ref, table_id, std::move(new_data));
}

void DuckLakeTransaction::AddNewInlinedDeletes(TableIndex table_id, const string &table_name, set<idx_t> new_deletes) {
if (new_deletes.empty()) {
return;
}
local_changes.AddNewInlinedDeletes(table_id, table_name, std::move(new_deletes));
void DuckLakeTransaction::AddNewInlinedDeletes(TableIndex table_id, const string &table_name, set<idx_t> new_deletes,
bool delete_all) {
local_changes.AddNewInlinedDeletes(table_id, table_name, std::move(new_deletes), delete_all);
}

void DuckLakeTransaction::DeleteFromLocalInlinedData(TableIndex table_id, set<idx_t> new_deletes) {
auto context_ref = context.lock();
local_changes.DeleteFromLocalInlinedData(*context_ref, table_id, std::move(new_deletes));
}

void DuckLakeTransaction::TruncateLocalInlinedData(TableIndex table_id) {
local_changes.TruncateLocalInlinedData(table_id);
}

void DuckLakeTransaction::AddColumnToLocalInlinedData(TableIndex table_id, const LogicalType &new_column_type,
FieldIndex new_field_index, const Value &default_value) {
auto context_ref = context.lock();
Expand Down
Loading
Loading