Skip to content

Commit 3f2410e

Browse files
peterpeter
authored andcommitted
exploit iceberg row-count metadata
Examine the row-counts in the manifest and count how many rows are in the (existing or added) data an deletion files. Use these two counts to pass the new named_parameter "explicit_cardinality" into the generated parquet_scans. Note that because iceberg is passing in an explicit schema into the parquet_scan, it does not open any file during bind. That means the parquet_scans it generates did not have any cardinality information during query optimization. That can cause rather bad query plans. This PR (pb/explicit-iceberg-cardinality) fixes that. Note that this PR needs the DuckDB PR pb/exicit-parquet-cardinality that adds the "explicit_cardinality" named_parameter to parquet_scan.
1 parent 3f6d753 commit 3f2410e

File tree

1 file changed

+25
-2
lines changed

1 file changed

+25
-2
lines changed

src/iceberg_functions/iceberg_scan.cpp

Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -129,18 +129,26 @@ static Value GetParquetSchemaParam(vector<IcebergColumnDefinition> &schema) {
129129

130130
//! Build the Parquet Scan expression for the files we need to scan
131131
static unique_ptr<TableRef> MakeScanExpression(vector<Value> &data_file_values, vector<Value> &delete_file_values,
132-
vector<IcebergColumnDefinition> &schema, bool allow_moved_paths, string metadata_compression_codec, bool skip_schema_inference) {
132+
vector<IcebergColumnDefinition> &schema, bool allow_moved_paths,
133+
string metadata_compression_codec, bool skip_schema_inference,
134+
int64_t data_cardinality, int64_t delete_cardinality) {
135+
136+
auto cardinality = make_uniq<ComparisonExpression>(ExpressionType::COMPARE_EQUAL, make_uniq<ColumnRefExpression>("explicit_cardinality"),
137+
make_uniq<ConstantExpression>(Value(data_cardinality)));
138+
133139
// No deletes, just return a TableFunctionRef for a parquet scan of the data files
134140
if (delete_file_values.empty()) {
135141
auto table_function_ref_data = make_uniq<TableFunctionRef>();
136142
table_function_ref_data->alias = "iceberg_scan_data";
137143
vector<unique_ptr<ParsedExpression>> left_children;
138144
left_children.push_back(make_uniq<ConstantExpression>(Value::LIST(data_file_values)));
145+
left_children.push_back(move(cardinality));
139146
if (!skip_schema_inference) {
140147
left_children.push_back(
141148
make_uniq<ComparisonExpression>(ExpressionType::COMPARE_EQUAL, make_uniq<ColumnRefExpression>("schema"),
142149
make_uniq<ConstantExpression>(GetParquetSchemaParam(schema))));
143150
}
151+
144152
table_function_ref_data->function = make_uniq<FunctionExpression>("parquet_scan", std::move(left_children));
145153
return std::move(table_function_ref_data);
146154
}
@@ -165,6 +173,7 @@ static unique_ptr<TableRef> MakeScanExpression(vector<Value> &data_file_values,
165173
table_function_ref_data->alias = "iceberg_scan_data";
166174
vector<unique_ptr<ParsedExpression>> left_children;
167175
left_children.push_back(make_uniq<ConstantExpression>(Value::LIST(data_file_values)));
176+
left_children.push_back(move(cardinality));
168177
left_children.push_back(make_uniq<ComparisonExpression>(ExpressionType::COMPARE_EQUAL,
169178
make_uniq<ColumnRefExpression>("filename"),
170179
make_uniq<ConstantExpression>(Value(1))));
@@ -184,6 +193,8 @@ static unique_ptr<TableRef> MakeScanExpression(vector<Value> &data_file_values,
184193
table_function_ref_deletes->alias = "iceberg_scan_deletes";
185194
vector<unique_ptr<ParsedExpression>> right_children;
186195
right_children.push_back(make_uniq<ConstantExpression>(Value::LIST(delete_file_values)));
196+
right_children.push_back(make_uniq<ComparisonExpression>(ExpressionType::COMPARE_EQUAL, make_uniq<ColumnRefExpression>("explicit_cardinality"),
197+
make_uniq<ConstantExpression>(Value(delete_cardinality))));
187198
table_function_ref_deletes->function = make_uniq<FunctionExpression>("parquet_scan", std::move(right_children));
188199
join_node->right = std::move(table_function_ref_deletes);
189200

@@ -269,7 +280,19 @@ static unique_ptr<TableRef> IcebergScanBindReplace(ClientContext &context, Table
269280
if (mode == "list_files") {
270281
return MakeListFilesExpression(data_file_values, delete_file_values);
271282
} else if (mode == "default") {
272-
return MakeScanExpression(data_file_values, delete_file_values, snapshot_to_scan.schema, allow_moved_paths, metadata_compression_codec, skip_schema_inference);
283+
int64_t data_cardinality = 0, delete_cardinality = 0;
284+
for(auto &manifest : iceberg_table.entries) {
285+
for(auto &entry : manifest.manifest_entries) {
286+
if (entry.status != IcebergManifestEntryStatusType::DELETED) {
287+
if (entry.content == IcebergManifestEntryContentType::DATA) {
288+
data_cardinality += entry.record_count;
289+
} else { // DELETES
290+
delete_cardinality += entry.record_count;
291+
}
292+
}
293+
}
294+
}
295+
return MakeScanExpression(data_file_values, delete_file_values, snapshot_to_scan.schema, allow_moved_paths, metadata_compression_codec, skip_schema_inference, data_cardinality, delete_cardinality);
273296
} else {
274297
throw NotImplementedException("Unknown mode type for ICEBERG_SCAN bind : '" + mode + "'");
275298
}

0 commit comments

Comments
 (0)