Commit 0de4979

Merge pull request duckdb#74 from motherduckdb/pb/explicit-iceberg-cardinality
exploit iceberg row-count metadata
2 parents: 51bacf0 + f7bc1de

2 files changed: +32 −3 lines
src/iceberg_functions/iceberg_scan.cpp

Lines changed: 25 additions & 2 deletions
@@ -129,18 +129,26 @@ static Value GetParquetSchemaParam(vector<IcebergColumnDefinition> &schema) {
 
 //! Build the Parquet Scan expression for the files we need to scan
 static unique_ptr<TableRef> MakeScanExpression(vector<Value> &data_file_values, vector<Value> &delete_file_values,
-                                               vector<IcebergColumnDefinition> &schema, bool allow_moved_paths, string metadata_compression_codec, bool skip_schema_inference) {
+                                               vector<IcebergColumnDefinition> &schema, bool allow_moved_paths,
+                                               string metadata_compression_codec, bool skip_schema_inference,
+                                               int64_t data_cardinality, int64_t delete_cardinality) {
+
+    auto cardinality = make_uniq<ComparisonExpression>(ExpressionType::COMPARE_EQUAL, make_uniq<ColumnRefExpression>("explicit_cardinality"),
+                                                       make_uniq<ConstantExpression>(Value(data_cardinality)));
+
     // No deletes, just return a TableFunctionRef for a parquet scan of the data files
     if (delete_file_values.empty()) {
         auto table_function_ref_data = make_uniq<TableFunctionRef>();
         table_function_ref_data->alias = "iceberg_scan_data";
         vector<unique_ptr<ParsedExpression>> left_children;
         left_children.push_back(make_uniq<ConstantExpression>(Value::LIST(data_file_values)));
+        left_children.push_back(std::move(cardinality));
         if (!skip_schema_inference) {
             left_children.push_back(
                 make_uniq<ComparisonExpression>(ExpressionType::COMPARE_EQUAL, make_uniq<ColumnRefExpression>("schema"),
                                                 make_uniq<ConstantExpression>(GetParquetSchemaParam(schema))));
         }
+
         table_function_ref_data->function = make_uniq<FunctionExpression>("parquet_scan", std::move(left_children));
         return std::move(table_function_ref_data);
     }
@@ -165,6 +173,7 @@ static unique_ptr<TableRef> MakeScanExpression(vector<Value> &data_file_values,
     table_function_ref_data->alias = "iceberg_scan_data";
     vector<unique_ptr<ParsedExpression>> left_children;
     left_children.push_back(make_uniq<ConstantExpression>(Value::LIST(data_file_values)));
+    left_children.push_back(std::move(cardinality));
     left_children.push_back(make_uniq<ComparisonExpression>(ExpressionType::COMPARE_EQUAL,
                                                             make_uniq<ColumnRefExpression>("filename"),
                                                             make_uniq<ConstantExpression>(Value(1))));
@@ -184,6 +193,8 @@ static unique_ptr<TableRef> MakeScanExpression(vector<Value> &data_file_values,
     table_function_ref_deletes->alias = "iceberg_scan_deletes";
     vector<unique_ptr<ParsedExpression>> right_children;
     right_children.push_back(make_uniq<ConstantExpression>(Value::LIST(delete_file_values)));
+    right_children.push_back(make_uniq<ComparisonExpression>(ExpressionType::COMPARE_EQUAL, make_uniq<ColumnRefExpression>("explicit_cardinality"),
+                                                             make_uniq<ConstantExpression>(Value(delete_cardinality))));
     table_function_ref_deletes->function = make_uniq<FunctionExpression>("parquet_scan", std::move(right_children));
     join_node->right = std::move(table_function_ref_deletes);
 
@@ -269,7 +280,19 @@ static unique_ptr<TableRef> IcebergScanBindReplace(ClientContext &context, Table
     if (mode == "list_files") {
         return MakeListFilesExpression(data_file_values, delete_file_values);
     } else if (mode == "default") {
-        return MakeScanExpression(data_file_values, delete_file_values, snapshot_to_scan.schema, allow_moved_paths, metadata_compression_codec, skip_schema_inference);
+        int64_t data_cardinality = 0, delete_cardinality = 0;
+        for(auto &manifest : iceberg_table.entries) {
+            for(auto &entry : manifest.manifest_entries) {
+                if (entry.status != IcebergManifestEntryStatusType::DELETED) {
+                    if (entry.content == IcebergManifestEntryContentType::DATA) {
+                        data_cardinality += entry.record_count;
+                    } else { // DELETES
+                        delete_cardinality += entry.record_count;
+                    }
+                }
+            }
+        }
+        return MakeScanExpression(data_file_values, delete_file_values, snapshot_to_scan.schema, allow_moved_paths, metadata_compression_codec, skip_schema_inference, data_cardinality, delete_cardinality);
     } else {
         throw NotImplementedException("Unknown mode type for ICEBERG_SCAN bind : '" + mode + "'");
     }
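When delete files are present, the same explicit_cardinality hint is attached to both the data scan and the positional-delete scan on either side of the anti-join, which is what the plan regex in the test below matches (ANTI, then two PARQUET_SCANs, each reporting Rows). A loose SQL approximation, assuming the join keys are the Iceberg positional-delete columns file_path/pos matched against parquet_scan's filename/file_row_number virtual columns (the extension builds this as a parsed expression tree, and the file names and counts here are invented):

    SELECT count(*)
    FROM parquet_scan(['data-0.parquet'], explicit_cardinality=100000,
                      filename=1, file_row_number=1) iceberg_scan_data
    ANTI JOIN parquet_scan(['delete-0.parquet'], explicit_cardinality=500) iceberg_scan_deletes
        ON iceberg_scan_deletes.file_path = iceberg_scan_data.filename   -- assumed join keys
       AND iceberg_scan_deletes.pos = iceberg_scan_data.file_row_number;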

test/sql/iceberg_scan_generated_data_0_001.test

Lines changed: 7 additions & 1 deletion
@@ -1,4 +1,4 @@
-# name: test/sql/iceberg_scan_generated_data_0_001.test_slow
+# name: test/sql/iceberg_scan_generated_data_0_001.test
 # description: test iceberg extension with the sf0.001 generated test set
 # group: [iceberg]
 
@@ -106,3 +106,9 @@ statement error
 DESCRIBE SELECT schema_evol_added_col_1 FROM ICEBERG_SCAN('data/iceberg/generated_spec2_0_001/pyspark_iceberg_table/metadata/v6.metadata.json') ORDER BY uuid;
 ----
 Binder Error
+
+# Check that the cardinality is injected
+query II
+EXPLAIN SELECT count(*) FROM ICEBERG_SCAN('data/iceberg/generated_spec2_0_001/pyspark_iceberg_table');
+----
+physical_plan <REGEX>:.* ANTI .*PARQUET_SCAN.*Rows.*Rows.*
