Skip to content

Commit 3e318fc

Browse files
committed
slight cleanup of logic, deduplication of the logic to select the relevant equality deletes for a given data file
1 parent cb13c01 commit 3e318fc

File tree

5 files changed

+83
-72
lines changed

5 files changed

+83
-72
lines changed

src/deletes/equality_delete.cpp

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,11 +61,13 @@ void IcebergMultiFileList::ScanEqualityDeleteFile(const IcebergManifestEntry &en
6161
id_to_global_column[col.identifier.GetValue<int32_t>()] = i;
6262
}
6363

64-
std::vector<ColumnIndex> new_column_indexes = column_indexes;
64+
auto new_column_indexes = column_indexes;
6565
for (auto field_id : entry.equality_ids) {
6666
auto global_column_id = id_to_global_column[field_id];
6767
ColumnIndex equality_index(global_column_id);
68+
//! Check if the column needed by the equality delete is present
6869
if (std::find(column_indexes.begin(), column_indexes.end(), equality_index) == column_indexes.end()) {
70+
//! Column isn't being selected, add the column so it can be used for the equality delete
6971
new_column_indexes.push_back(equality_index);
7072
}
7173
}
@@ -94,6 +96,7 @@ void IcebergMultiFileList::ScanEqualityDeleteFile(const IcebergManifestEntry &en
9496
auto &vec = result.data[col_idx];
9597

9698
auto it = global_id_to_result_id.find(global_column_id);
99+
D_ASSERT(it != global_id_to_result_id.end());
97100
global_column_id = it->second;
98101

99102
for (idx_t i = 0; i < count; i++) {

src/iceberg_functions/iceberg_multi_file_list.cpp

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -528,6 +528,38 @@ bool IcebergMultiFileList::ManifestMatchesFilter(const IcebergManifest &manifest
528528
return true;
529529
}
530530

531+
vector<reference<const IcebergEqualityDeleteRow>>
532+
IcebergMultiFileList::GetEqualityDeletesForFile(const IcebergManifestEntry &data_file) const {
533+
vector<reference<const IcebergEqualityDeleteRow>> result;
534+
535+
//! Look through all the equality delete files with a *higher* sequence number
536+
auto &metadata = GetMetadata();
537+
auto it = equality_delete_data.upper_bound(data_file.sequence_number);
538+
for (; it != equality_delete_data.end(); it++) {
539+
auto &files = it->second->files;
540+
for (auto &file : files) {
541+
auto &partition_spec = metadata.partition_specs.at(file.partition_spec_id);
542+
if (partition_spec.IsPartitioned()) {
543+
if (file.partition_spec_id != data_file.partition_spec_id) {
544+
//! Not unpartitioned and the data does not share the same partition spec as the delete, skip the
545+
//! delete file.
546+
continue;
547+
}
548+
D_ASSERT(file.partition_values.size() == data_file.partition_values.size());
549+
for (idx_t i = 0; i < file.partition_values.size(); i++) {
550+
if (file.partition_values[i] != data_file.partition_values[i]) {
551+
//! Same partition spec id, but the partitioning information doesn't match, delete file doesn't
552+
//! apply.
553+
continue;
554+
}
555+
}
556+
}
557+
result.insert(result.end(), file.rows.begin(), file.rows.end());
558+
}
559+
}
560+
return result;
561+
}
562+
531563
void IcebergMultiFileList::InitializeFiles(lock_guard<mutex> &guard) {
532564
if (initialized) {
533565
return;

src/iceberg_functions/iceberg_multi_file_reader.cpp

Lines changed: 44 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -260,34 +260,32 @@ ReaderInitializeType IcebergMultiFileReader::InitializeReader(MultiFileReaderDat
260260
FinalizeBind(reader_data, bind_data.file_options, bind_data.reader_bind, global_columns, global_column_ids, context,
261261
gstate.multi_file_reader_state.get());
262262

263+
//! Create a mapping from field_id -> column index
263264
unordered_map<int32_t, column_t> id_to_global_column;
264265
for (column_t i = 0; i < global_columns.size(); i++) {
265266
auto &col = global_columns[i];
266267
D_ASSERT(!col.identifier.IsNull());
267268
id_to_global_column[col.identifier.GetValue<int32_t>()] = i;
268269
}
269270

270-
set<int32_t> equality_delete_ids;
271-
const auto &multi_file_list = dynamic_cast<const IcebergMultiFileList &>(gstate.file_list);
271+
//! Get the data file that we're preparing to scan
272+
const auto &multi_file_list = gstate.file_list.Cast<IcebergMultiFileList>();
272273
auto &reader = *reader_data.reader;
273274
auto file_id = reader.file_list_idx.GetIndex();
274275
auto &data_file = multi_file_list.data_files[file_id];
275276

276-
auto delete_data_it = multi_file_list.equality_delete_data.upper_bound(data_file.sequence_number);
277-
for (; delete_data_it != multi_file_list.equality_delete_data.end(); delete_data_it++) {
278-
auto &files = delete_data_it->second->files;
279-
for (auto &file : files) {
280-
auto &rows = file.rows;
281-
for (auto &row : rows) {
282-
auto &filters = row.filters;
283-
for (auto &filter : filters) {
284-
equality_delete_ids.insert(filter.first);
285-
}
286-
}
277+
//! Collect all the equality delete ids needed
278+
unordered_set<int32_t> equality_delete_ids;
279+
auto delete_rows = multi_file_list.GetEqualityDeletesForFile(data_file);
280+
for (auto &row : delete_rows) {
281+
auto &filters = row.get().filters;
282+
for (auto &filter : filters) {
283+
equality_delete_ids.insert(filter.first);
287284
}
288285
}
289286

290-
vector<ColumnIndex> new_global_column_ids = global_column_ids;
287+
//! Add the columns needed by the equality deletes if not present
288+
auto new_global_column_ids = global_column_ids;
291289
for (auto field_id : equality_delete_ids) {
292290
auto global_column_id = id_to_global_column[field_id];
293291
ColumnIndex equality_index(global_column_id);
@@ -342,34 +340,8 @@ void IcebergMultiFileReader::ApplyEqualityDeletes(ClientContext &context, DataCh
342340
const IcebergMultiFileList &multi_file_list,
343341
const IcebergManifestEntry &data_file,
344342
const vector<MultiFileColumnDefinition> &local_columns,
345-
unordered_map<idx_t, idx_t> field_id_to_result_id) {
346-
vector<reference<IcebergEqualityDeleteRow>> delete_rows;
347-
348-
auto &metadata = multi_file_list.GetMetadata();
349-
auto delete_data_it = multi_file_list.equality_delete_data.upper_bound(data_file.sequence_number);
350-
//! Look through all the equality delete files with a *higher* sequence number
351-
for (; delete_data_it != multi_file_list.equality_delete_data.end(); delete_data_it++) {
352-
auto &files = delete_data_it->second->files;
353-
for (auto &file : files) {
354-
auto &partition_spec = metadata.partition_specs.at(file.partition_spec_id);
355-
if (partition_spec.IsPartitioned()) {
356-
if (file.partition_spec_id != data_file.partition_spec_id) {
357-
//! Not unpartitioned and the data does not share the same partition spec as the delete, skip the
358-
//! delete file.
359-
continue;
360-
}
361-
D_ASSERT(file.partition_values.size() == data_file.partition_values.size());
362-
for (idx_t i = 0; i < file.partition_values.size(); i++) {
363-
if (file.partition_values[i] != data_file.partition_values[i]) {
364-
//! Same partition spec id, but the partitioning information doesn't match, delete file doesn't
365-
//! apply.
366-
continue;
367-
}
368-
}
369-
}
370-
delete_rows.insert(delete_rows.end(), file.rows.begin(), file.rows.end());
371-
}
372-
}
343+
const unordered_map<idx_t, idx_t> &field_id_to_result_id) {
344+
auto delete_rows = multi_file_list.GetEqualityDeletesForFile(data_file);
373345

374346
if (delete_rows.empty()) {
375347
return;
@@ -410,26 +382,26 @@ void IcebergMultiFileReader::ApplyEqualityDeletes(ClientContext &context, DataCh
410382
} else {
411383
equalities.push_back(make_uniq<BoundConstantExpression>(Value::BOOLEAN(true)));
412384
}
413-
} else {
414-
if (field_id_to_result_id.empty()) {
415-
equalities.push_back(expression->Copy());
416-
} else {
417-
idx_t index = field_id_to_result_id[field_id];
418-
if (expression->type == ExpressionType::COMPARE_NOTEQUAL) {
419-
auto &expr = expression->Cast<BoundComparisonExpression>();
420-
auto bound_ref = make_uniq<BoundReferenceExpression>(expr.left->return_type, index);
421-
unique_ptr<Expression> equality_filter = make_uniq<BoundComparisonExpression>(
422-
ExpressionType::COMPARE_NOTEQUAL, std::move(bound_ref), expr.right->Copy());
423-
equalities.push_back(std::move(equality_filter));
424-
} else if (expression->type == ExpressionType::OPERATOR_IS_NOT_NULL) {
425-
auto &expr = expression->Cast<BoundOperatorExpression>();
426-
auto bound_ref = make_uniq<BoundReferenceExpression>(expr.children[0]->return_type, index);
427-
auto is_not_null = make_uniq<BoundOperatorExpression>(ExpressionType::OPERATOR_IS_NOT_NULL,
428-
LogicalType::BOOLEAN);
429-
is_not_null->children.push_back(std::move(bound_ref));
430-
equalities.push_back(std::move(is_not_null));
431-
}
432-
}
385+
continue;
386+
}
387+
if (field_id_to_result_id.empty()) {
388+
equalities.push_back(expression->Copy());
389+
continue;
390+
}
391+
idx_t index = field_id_to_result_id.at(field_id);
392+
if (expression->type == ExpressionType::COMPARE_NOTEQUAL) {
393+
auto &expr = expression->Cast<BoundComparisonExpression>();
394+
auto bound_ref = make_uniq<BoundReferenceExpression>(expr.left->return_type, index);
395+
unique_ptr<Expression> equality_filter = make_uniq<BoundComparisonExpression>(
396+
ExpressionType::COMPARE_NOTEQUAL, std::move(bound_ref), expr.right->Copy());
397+
equalities.push_back(std::move(equality_filter));
398+
} else if (expression->type == ExpressionType::OPERATOR_IS_NOT_NULL) {
399+
auto &expr = expression->Cast<BoundOperatorExpression>();
400+
auto bound_ref = make_uniq<BoundReferenceExpression>(expr.children[0]->return_type, index);
401+
auto is_not_null =
402+
make_uniq<BoundOperatorExpression>(ExpressionType::OPERATOR_IS_NOT_NULL, LogicalType::BOOLEAN);
403+
is_not_null->children.push_back(std::move(bound_ref));
404+
equalities.push_back(std::move(is_not_null));
433405
}
434406
}
435407

@@ -469,7 +441,7 @@ void IcebergMultiFileReader::FinalizeChunk(ClientContext &context, const MultiFi
469441
ExpressionExecutor &executor,
470442
optional_ptr<MultiFileReaderGlobalState> global_state) {
471443

472-
// add the extra equality delete fields to output chunk.
444+
//! Add the extra equality delete fields to output chunk.
473445
int32_t diff = 0;
474446
if (executor.expressions.size() != output_chunk.ColumnCount()) {
475447
diff = executor.expressions.size() - output_chunk.ColumnCount();
@@ -479,22 +451,24 @@ void IcebergMultiFileReader::FinalizeChunk(ClientContext &context, const MultiFi
479451
}
480452
}
481453

482-
// Base class finalization first
454+
//! Base class finalization first
483455
MultiFileReader::FinalizeChunk(context, bind_data, reader, reader_data, input_chunk, output_chunk, executor,
484456
global_state);
485457

458+
//! Map from index into local_columns -> field_id
486459
auto &local_columns = reader.columns;
487460
unordered_map<idx_t, idx_t> column_index_to_field_id;
488461
for (idx_t i = 0; i < local_columns.size(); i++) {
489462
auto &col = local_columns[i];
490463
column_index_to_field_id[i] = col.identifier.GetValue<int32_t>();
491464
}
465+
466+
//! Map from field_id -> index in 'output_chunk'
492467
unordered_map<idx_t, idx_t> field_id_to_result_id;
493-
vector<ColumnIndex> column_indexes = reader.column_indexes;
494-
int32_t result_id = executor.expressions.size() - 1;
495-
for (int32_t i = column_indexes.size() - 1; i >= 0; i--) {
496-
ColumnIndex column_index = column_indexes[i];
497-
field_id_to_result_id[column_index_to_field_id[column_index.GetPrimaryIndex()]] = result_id--;
468+
auto &column_indexes = reader.column_indexes;
469+
auto result_id = executor.expressions.size() - column_indexes.size();
470+
for (auto &column_index : column_indexes) {
471+
field_id_to_result_id[column_index_to_field_id[column_index.GetPrimaryIndex()]] = result_id++;
498472
}
499473

500474
D_ASSERT(global_state);
@@ -504,7 +478,7 @@ void IcebergMultiFileReader::FinalizeChunk(ClientContext &context, const MultiFi
504478
auto &data_file = multi_file_list.data_files[file_id];
505479
ApplyEqualityDeletes(context, output_chunk, multi_file_list, data_file, local_columns, field_id_to_result_id);
506480

507-
// delete the equality delete fields for result
481+
//! Remove the extra columns we added to perform the equality delete filtering
508482
for (idx_t i = 0; i < diff; i++) {
509483
output_chunk.data.pop_back();
510484
}

src/include/iceberg_multi_file_list.hpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,8 @@ struct IcebergMultiFileList : public MultiFileList {
5353
unique_ptr<DeleteFilter> GetPositionalDeletesForFile(const string &file_path) const;
5454
void ProcessDeletes(const vector<MultiFileColumnDefinition> &global_columns,
5555
const vector<ColumnIndex> &column_indexes) const;
56+
vector<reference<const IcebergEqualityDeleteRow>>
57+
GetEqualityDeletesForFile(const IcebergManifestEntry &data_file) const;
5658

5759
public:
5860
//! MultiFileList API

src/include/iceberg_multi_file_reader.hpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ struct IcebergMultiFileReader : public MultiFileReader {
6565
void ApplyEqualityDeletes(ClientContext &context, DataChunk &output_chunk,
6666
const IcebergMultiFileList &multi_file_list, const IcebergManifestEntry &data_file,
6767
const vector<MultiFileColumnDefinition> &local_columns,
68-
unordered_map<idx_t, idx_t> field_id_to_result_id);
68+
const unordered_map<idx_t, idx_t> &field_id_to_result_id);
6969
bool ParseOption(const string &key, const Value &val, MultiFileOptions &options, ClientContext &context) override;
7070

7171
public:

0 commit comments

Comments
 (0)