Skip to content

Commit 94a498d

Browse files
committed
remove the need to rewrite expressions
1 parent 3e318fc commit 94a498d

File tree

4 files changed

+42
-66
lines changed

4 files changed

+42
-66
lines changed

src/deletes/equality_delete.cpp

Lines changed: 17 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -61,26 +61,28 @@ void IcebergMultiFileList::ScanEqualityDeleteFile(const IcebergManifestEntry &en
6161
id_to_global_column[col.identifier.GetValue<int32_t>()] = i;
6262
}
6363

64-
auto new_column_indexes = column_indexes;
65-
for (auto field_id : entry.equality_ids) {
66-
auto global_column_id = id_to_global_column[field_id];
67-
ColumnIndex equality_index(global_column_id);
68-
//! Check if the column needed by the equality delete is present
69-
if (std::find(column_indexes.begin(), column_indexes.end(), equality_index) == column_indexes.end()) {
70-
//! Column isn't being selected, add the column so it can be used for the equality delete
71-
new_column_indexes.push_back(equality_index);
72-
}
73-
}
74-
7564
unordered_map<idx_t, idx_t> global_id_to_result_id;
76-
for (idx_t i = 0; i < new_column_indexes.size(); i++) {
77-
auto &column_index = new_column_indexes[i];
65+
for (idx_t i = 0; i < column_indexes.size(); i++) {
66+
auto &column_index = column_indexes[i];
7867
if (column_index.IsVirtualColumn()) {
7968
continue;
8069
}
8170
auto global_id = column_index.GetPrimaryIndex();
8271
global_id_to_result_id[global_id] = i;
8372
}
73+
//! For the column(s) that are needed but aren't referenced, add them to the map
74+
for (auto field_id : entry.equality_ids) {
75+
auto global_column_id = id_to_global_column[field_id];
76+
ColumnIndex equality_index(global_column_id);
77+
//! Check if the column needed by the equality delete is present
78+
if (std::find(column_indexes.begin(), column_indexes.end(), equality_index) != column_indexes.end()) {
79+
continue;
80+
}
81+
auto new_result_id = column_indexes.size() + equality_id_to_result_id.size();
82+
//! Create or get the result id mapping for this equality id
83+
auto result_id = equality_id_to_result_id.emplace(field_id, new_result_id).first->second;
84+
global_id_to_result_id[global_column_id] = result_id;
85+
}
8486

8587
//! Take only the relevant columns from the result
8688
InitializeFromOtherChunk(result, result_p, column_ids);
@@ -97,13 +99,13 @@ void IcebergMultiFileList::ScanEqualityDeleteFile(const IcebergManifestEntry &en
9799

98100
auto it = global_id_to_result_id.find(global_column_id);
99101
D_ASSERT(it != global_id_to_result_id.end());
100-
global_column_id = it->second;
102+
auto result_column_id = it->second;
101103

102104
for (idx_t i = 0; i < count; i++) {
103105
auto &row = rows[i];
104106
auto constant = vec.GetValue(i);
105107
unique_ptr<Expression> equality_filter;
106-
auto bound_ref = make_uniq<BoundReferenceExpression>(col.type, global_column_id);
108+
auto bound_ref = make_uniq<BoundReferenceExpression>(col.type, result_column_id);
107109
if (!constant.IsNull()) {
108110
//! Create a COMPARE_NOT_EQUAL expression
109111
equality_filter =

src/iceberg_functions/iceberg_multi_file_reader.cpp

Lines changed: 22 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -286,12 +286,18 @@ ReaderInitializeType IcebergMultiFileReader::InitializeReader(MultiFileReaderDat
286286

287287
//! Add the columns needed by the equality deletes if not present
288288
auto new_global_column_ids = global_column_ids;
289+
auto &equality_to_result_id = multi_file_list.equality_id_to_result_id;
290+
new_global_column_ids.resize(global_column_ids.size() + equality_to_result_id.size());
289291
for (auto field_id : equality_delete_ids) {
292+
auto it = equality_to_result_id.find(field_id);
293+
if (it == equality_to_result_id.end()) {
294+
//! Already selected, no need to add
295+
continue;
296+
}
290297
auto global_column_id = id_to_global_column[field_id];
291298
ColumnIndex equality_index(global_column_id);
292-
if (std::find(global_column_ids.begin(), global_column_ids.end(), equality_index) == global_column_ids.end()) {
293-
new_global_column_ids.push_back(equality_index);
294-
}
299+
//! FIXME: is this correct?
300+
new_global_column_ids[it->second] = equality_index;
295301
}
296302

297303
return CreateMapping(context, reader_data, global_columns, new_global_column_ids, table_filters, gstate.file_list,
@@ -339,8 +345,7 @@ void IcebergMultiFileReader::FinalizeBind(MultiFileReaderData &reader_data, cons
339345
void IcebergMultiFileReader::ApplyEqualityDeletes(ClientContext &context, DataChunk &output_chunk,
340346
const IcebergMultiFileList &multi_file_list,
341347
const IcebergManifestEntry &data_file,
342-
const vector<MultiFileColumnDefinition> &local_columns,
343-
const unordered_map<idx_t, idx_t> &field_id_to_result_id) {
348+
const vector<MultiFileColumnDefinition> &local_columns) {
344349
auto delete_rows = multi_file_list.GetEqualityDeletesForFile(data_file);
345350

346351
if (delete_rows.empty()) {
@@ -384,25 +389,7 @@ void IcebergMultiFileReader::ApplyEqualityDeletes(ClientContext &context, DataCh
384389
}
385390
continue;
386391
}
387-
if (field_id_to_result_id.empty()) {
388-
equalities.push_back(expression->Copy());
389-
continue;
390-
}
391-
idx_t index = field_id_to_result_id.at(field_id);
392-
if (expression->type == ExpressionType::COMPARE_NOTEQUAL) {
393-
auto &expr = expression->Cast<BoundComparisonExpression>();
394-
auto bound_ref = make_uniq<BoundReferenceExpression>(expr.left->return_type, index);
395-
unique_ptr<Expression> equality_filter = make_uniq<BoundComparisonExpression>(
396-
ExpressionType::COMPARE_NOTEQUAL, std::move(bound_ref), expr.right->Copy());
397-
equalities.push_back(std::move(equality_filter));
398-
} else if (expression->type == ExpressionType::OPERATOR_IS_NOT_NULL) {
399-
auto &expr = expression->Cast<BoundOperatorExpression>();
400-
auto bound_ref = make_uniq<BoundReferenceExpression>(expr.children[0]->return_type, index);
401-
auto is_not_null =
402-
make_uniq<BoundOperatorExpression>(ExpressionType::OPERATOR_IS_NOT_NULL, LogicalType::BOOLEAN);
403-
is_not_null->children.push_back(std::move(bound_ref));
404-
equalities.push_back(std::move(is_not_null));
405-
}
392+
equalities.push_back(expression->Copy());
406393
}
407394

408395
unique_ptr<Expression> filter;
@@ -440,43 +427,29 @@ void IcebergMultiFileReader::FinalizeChunk(ClientContext &context, const MultiFi
440427
DataChunk &input_chunk, DataChunk &output_chunk,
441428
ExpressionExecutor &executor,
442429
optional_ptr<MultiFileReaderGlobalState> global_state) {
430+
D_ASSERT(global_state);
431+
// Get the metadata for this file
432+
const auto &multi_file_list = global_state->file_list->Cast<IcebergMultiFileList>();
443433

444434
//! Add the extra equality delete fields to output chunk.
445-
int32_t diff = 0;
446-
if (executor.expressions.size() != output_chunk.ColumnCount()) {
447-
diff = executor.expressions.size() - output_chunk.ColumnCount();
448-
for (int32_t i = diff; i > 0; i--) {
449-
int32_t index = input_chunk.ColumnCount() - i;
450-
output_chunk.data.emplace_back(input_chunk.data[index]);
435+
idx_t diff = executor.expressions.size() - output_chunk.ColumnCount();
436+
(void)diff;
437+
D_ASSERT(diff == multi_file_list.equality_id_to_result_id.size());
438+
if (diff > 0) {
439+
int32_t start = input_chunk.ColumnCount() - diff;
440+
for (int32_t i = 0; i < diff; i++) {
441+
output_chunk.data.emplace_back(input_chunk.data[start + i]);
451442
}
452443
}
453444

454445
//! Base class finalization first
455446
MultiFileReader::FinalizeChunk(context, bind_data, reader, reader_data, input_chunk, output_chunk, executor,
456447
global_state);
457448

458-
//! Map from index into local_columns -> field_id
459449
auto &local_columns = reader.columns;
460-
unordered_map<idx_t, idx_t> column_index_to_field_id;
461-
for (idx_t i = 0; i < local_columns.size(); i++) {
462-
auto &col = local_columns[i];
463-
column_index_to_field_id[i] = col.identifier.GetValue<int32_t>();
464-
}
465-
466-
//! Map from field_id -> index in 'output_chunk'
467-
unordered_map<idx_t, idx_t> field_id_to_result_id;
468-
auto &column_indexes = reader.column_indexes;
469-
auto result_id = executor.expressions.size() - column_indexes.size();
470-
for (auto &column_index : column_indexes) {
471-
field_id_to_result_id[column_index_to_field_id[column_index.GetPrimaryIndex()]] = result_id++;
472-
}
473-
474-
D_ASSERT(global_state);
475-
// Get the metadata for this file
476-
const auto &multi_file_list = dynamic_cast<const IcebergMultiFileList &>(*global_state->file_list);
477450
auto file_id = reader.file_list_idx.GetIndex();
478451
auto &data_file = multi_file_list.data_files[file_id];
479-
ApplyEqualityDeletes(context, output_chunk, multi_file_list, data_file, local_columns, field_id_to_result_id);
452+
ApplyEqualityDeletes(context, output_chunk, multi_file_list, data_file, local_columns);
480453

481454
//! Remove the extra columns we added to perform the equality delete filtering
482455
for (idx_t i = 0; i < diff; i++) {

src/include/iceberg_multi_file_list.hpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,8 @@ struct IcebergMultiFileList : public MultiFileList {
119119
//! All equality deletes with sequence numbers higher than that of the data_file apply to that data_file
120120
mutable map<sequence_number_t, unique_ptr<IcebergEqualityDeleteData>> equality_delete_data;
121121
mutable mutex delete_lock;
122+
//! The columns needed by the equality deletes that aren't referenced by the scan
123+
mutable unordered_map<int32_t, column_t> equality_id_to_result_id;
122124

123125
bool initialized = false;
124126
const IcebergOptions &options;

src/include/iceberg_multi_file_reader.hpp

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -64,8 +64,7 @@ struct IcebergMultiFileReader : public MultiFileReader {
6464
ExpressionExecutor &executor, optional_ptr<MultiFileReaderGlobalState> global_state) override;
6565
void ApplyEqualityDeletes(ClientContext &context, DataChunk &output_chunk,
6666
const IcebergMultiFileList &multi_file_list, const IcebergManifestEntry &data_file,
67-
const vector<MultiFileColumnDefinition> &local_columns,
68-
const unordered_map<idx_t, idx_t> &field_id_to_result_id);
67+
const vector<MultiFileColumnDefinition> &local_columns);
6968
bool ParseOption(const string &key, const Value &val, MultiFileOptions &options, ClientContext &context) override;
7069

7170
public:

0 commit comments

Comments
 (0)