diff --git a/data/persistent/expression_filter/data/00000-0-1406cdaa-c3e4-4e6d-a22b-d85e4a813169-00001.parquet b/data/persistent/expression_filter/data/00000-0-1406cdaa-c3e4-4e6d-a22b-d85e4a813169-00001.parquet
new file mode 100644
index 00000000..6fc304b5
Binary files /dev/null and b/data/persistent/expression_filter/data/00000-0-1406cdaa-c3e4-4e6d-a22b-d85e4a813169-00001.parquet differ
diff --git a/data/persistent/expression_filter/metadata/00000-acdf842e-3a9d-4b9b-ad87-daf78583a550.metadata.json b/data/persistent/expression_filter/metadata/00000-acdf842e-3a9d-4b9b-ad87-daf78583a550.metadata.json
new file mode 100644
index 00000000..dadfb1d6
--- /dev/null
+++ b/data/persistent/expression_filter/metadata/00000-acdf842e-3a9d-4b9b-ad87-daf78583a550.metadata.json
@@ -0,0 +1 @@
+{"format-version":2,"table-uuid":"d6293bed-4757-4504-9342-f69a447b7759","location":"data/persistent/expression_filter","last-sequence-number":0,"last-updated-ms":1757676428493,"last-column-id":2,"current-schema-id":0,"schemas":[{"type":"struct","schema-id":0,"fields":[{"id":1,"name":"id","required":true,"type":"long"},{"id":2,"name":"value","required":false,"type":"string"}]}],"default-spec-id":0,"partition-specs":[{"spec-id":0,"fields":[]}],"last-partition-id":999,"default-sort-order-id":0,"sort-orders":[{"order-id":0,"fields":[]}],"properties":{"write.parquet.compression-codec":"zstd"},"current-snapshot-id":-1,"refs":{},"snapshots":[],"statistics":[],"partition-statistics":[],"snapshot-log":[],"metadata-log":[]}
\ No newline at end of file
diff --git a/data/persistent/expression_filter/metadata/00001-19739cda-f528-4429-84cc-377ffdd24c75.metadata.json b/data/persistent/expression_filter/metadata/00001-19739cda-f528-4429-84cc-377ffdd24c75.metadata.json
new file mode 100644
index 00000000..d89b3360
--- /dev/null
+++ b/data/persistent/expression_filter/metadata/00001-19739cda-f528-4429-84cc-377ffdd24c75.metadata.json
@@ -0,0 +1 @@
+{"format-version":2,"table-uuid":"d6293bed-4757-4504-9342-f69a447b7759","location":"data/persistent/expression_filter","last-sequence-number":1,"last-updated-ms":1757676429141,"last-column-id":2,"current-schema-id":0,"schemas":[{"type":"struct","schema-id":0,"fields":[{"id":1,"name":"id","required":true,"type":"long"},{"id":2,"name":"value","required":false,"type":"string"}]}],"default-spec-id":0,"partition-specs":[{"spec-id":0,"fields":[]}],"last-partition-id":999,"default-sort-order-id":0,"sort-orders":[{"order-id":0,"fields":[]}],"properties":{"write.parquet.compression-codec":"zstd"},"current-snapshot-id":8096310958539014181,"refs":{"main":{"snapshot-id":8096310958539014181,"type":"branch"}},"snapshots":[{"sequence-number":1,"snapshot-id":8096310958539014181,"timestamp-ms":1757676429141,"summary":{"operation":"append","added-data-files":"1","added-records":"3","added-files-size":"705","changed-partition-count":"1","total-records":"3","total-files-size":"705","total-data-files":"1","total-delete-files":"0","total-position-deletes":"0","total-equality-deletes":"0","iceberg-version":"Apache Iceberg 1.9.2 (commit 071d5606bc6199a0be9b3f274ec7fbf111d88821)"},"manifest-list":"data/persistent/expression_filter/metadata/snap-8096310958539014181-1-8d30f58e-7333-4451-983d-eaf657a21a11.avro","schema-id":0}],"statistics":[],"partition-statistics":[],"snapshot-log":[{"timestamp-ms":1757676429141,"snapshot-id":8096310958539014181}],"metadata-log":[{"timestamp-ms":1757676428493,"metadata-file":"data/persistent/expression_filter/metadata/00000-acdf842e-3a9d-4b9b-ad87-daf78583a550.metadata.json"}]}
\ No newline at end of file
diff --git a/data/persistent/expression_filter/metadata/8d30f58e-7333-4451-983d-eaf657a21a11-m0.avro b/data/persistent/expression_filter/metadata/8d30f58e-7333-4451-983d-eaf657a21a11-m0.avro
new file mode 100644
index 00000000..84415555
Binary files /dev/null and b/data/persistent/expression_filter/metadata/8d30f58e-7333-4451-983d-eaf657a21a11-m0.avro differ
diff --git a/data/persistent/expression_filter/metadata/snap-8096310958539014181-1-8d30f58e-7333-4451-983d-eaf657a21a11.avro b/data/persistent/expression_filter/metadata/snap-8096310958539014181-1-8d30f58e-7333-4451-983d-eaf657a21a11.avro
new file mode 100644
index 00000000..6d39862f
Binary files /dev/null and b/data/persistent/expression_filter/metadata/snap-8096310958539014181-1-8d30f58e-7333-4451-983d-eaf657a21a11.avro differ
diff --git a/data/persistent/expression_filter/metadata/version-hint.text b/data/persistent/expression_filter/metadata/version-hint.text
new file mode 100644
index 00000000..d5cccecd
--- /dev/null
+++ b/data/persistent/expression_filter/metadata/version-hint.text
@@ -0,0 +1 @@
+00001-19739cda-f528-4429-84cc-377ffdd24c75
\ No newline at end of file
diff --git a/src/aws.cpp b/src/aws.cpp
index 413e16a7..8f796ea6 100644
--- a/src/aws.cpp
+++ b/src/aws.cpp
@@ -122,6 +122,18 @@ std::shared_ptr<Aws::Http::HttpRequest> AWSInput::CreateSignedRequest(Aws::Http:
 	// return request;
 }
 
+static string GetPayloadHash(const char *buffer, idx_t buffer_len) {
+	if (buffer_len > 0) {
+		hash_bytes payload_hash_bytes;
+		hash_str payload_hash_str;
+		sha256(buffer, buffer_len, payload_hash_bytes);
+		hex256(payload_hash_bytes, payload_hash_str);
+		return string((char *)payload_hash_str, sizeof(payload_hash_str));
+	} else {
+		return "";
+	}
+}
+
 unique_ptr<HTTPResponse> AWSInput::ExecuteRequest(ClientContext &context, Aws::Http::HttpMethod method,
                                                   const string body, string content_type) {
 
@@ -140,6 +152,11 @@ unique_ptr<HTTPResponse> AWSInput::ExecuteRequest(ClientContext &context, Aws::H
 
 	// If access key is not set, we don't set the headers at all to allow accessing public files through s3 urls
 	string payload_hash = "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855"; // Empty payload hash
+
+	if (!body.empty()) {
+		payload_hash = GetPayloadHash(body.c_str(), body.size());
+	}
+
 	// key_id, secret, session_token
 	// we can pass date/time but this is mostly useful in testing. normally we just get the current datetime
 	// here.
@@ -161,6 +178,9 @@ unique_ptr<HTTPResponse> AWSInput::ExecuteRequest(ClientContext &context, Aws::H
 	hash_str canonical_request_hash_str;
 	if (content_type.length() > 0) {
 		signed_headers += "content-type;";
+#ifdef EMSCRIPTEN
+		res["content-type"] = content_type;
+#endif
 	}
 	signed_headers += "host;x-amz-content-sha256;x-amz-date";
 	if (session_token.length() > 0) {
@@ -244,19 +264,14 @@ unique_ptr<HTTPResponse> AWSInput::ExecuteRequest(ClientContext &context, Aws::H
 
 	params = http_util.InitializeParameters(context, request_url);
 
-	if (!body.empty()) {
-		throw NotImplementedException("CreateSignedRequest with non-empty body is not supported at this time");
-		/*
-		auto bodyStream = Aws::MakeShared<Aws::StringStream>("");
-		*bodyStream << body;
-		request->AddContentBody(bodyStream);
-		request->SetContentLength(std::to_string(body.size()));
-		if (!content_type.empty()) {
-			request->SetHeaderValue("Content-Type", content_type);
-		}
-		*/
+	if (method == Aws::Http::HttpMethod::HTTP_HEAD) {
+		HeadRequestInfo head_request(request_url, res, *params);
+		return http_util.Request(head_request);
+	}
+	if (method == Aws::Http::HttpMethod::HTTP_DELETE) {
+		DeleteRequestInfo delete_request(request_url, res, *params);
+		return http_util.Request(delete_request);
 	}
-
 	if (method == Aws::Http::HttpMethod::HTTP_GET) {
 		GetRequestInfo get_request(request_url, res, *params, nullptr, nullptr);
 		return http_util.Request(get_request);
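The aws.cpp hunks above replace the hard-coded empty-payload digest: a non-empty request body is now hashed with SHA-256 and hex-encoded before being folded into the SigV4 canonical request, which is what lets signed requests carry a body at all. Below is a minimal standalone sketch of that derivation, assuming OpenSSL's SHA256() is available (the extension itself uses DuckDB's bundled sha256/hex256 helpers, and PayloadHash/EMPTY_PAYLOAD_HASH are illustrative names, not the extension's):

    // Sketch of deriving the x-amz-content-sha256 value (compile with -lcrypto).
    #include <openssl/sha.h>

    #include <cstdio>
    #include <string>

    // SHA-256 of the empty string; SigV4 uses this constant for empty payloads.
    static const std::string EMPTY_PAYLOAD_HASH =
        "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855";

    static std::string PayloadHash(const std::string &body) {
        if (body.empty()) {
            return EMPTY_PAYLOAD_HASH;
        }
        unsigned char digest[SHA256_DIGEST_LENGTH];
        SHA256(reinterpret_cast<const unsigned char *>(body.data()), body.size(), digest);
        char hex[2 * SHA256_DIGEST_LENGTH + 1];
        for (int i = 0; i < SHA256_DIGEST_LENGTH; i++) {
            std::snprintf(hex + 2 * i, 3, "%02x", digest[i]);
        }
        return std::string(hex, 2 * SHA256_DIGEST_LENGTH);
    }

    int main() {
        // Empty body keeps the well-known constant, mirroring ExecuteRequest above.
        std::printf("%s\n", PayloadHash("").c_str());
        // Non-empty bodies are hashed, so request payloads can now be signed.
        std::printf("%s\n", PayloadHash("{\"name\":\"t\"}").c_str());
    }

The empty-string digest printed first matches the constant kept in ExecuteRequest, so the fallback path is unchanged for body-less requests.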
diff --git a/src/iceberg_predicate.cpp b/src/iceberg_predicate.cpp
index db9031ff..60a563a8 100644
--- a/src/iceberg_predicate.cpp
+++ b/src/iceberg_predicate.cpp
@@ -1,4 +1,5 @@
 #include "iceberg_predicate.hpp"
+#include "duckdb/planner/expression/bound_operator_expression.hpp"
 #include "duckdb/planner/filter/constant_filter.hpp"
 #include "duckdb/planner/filter/conjunction_filter.hpp"
 #include "duckdb/planner/filter/null_filter.hpp"
@@ -80,16 +81,34 @@ bool MatchBoundsTemplated(const TableFilter &filter, const IcebergPredicateStats
 		return MatchBoundsIsNotNullFilter(stats, transform);
 	}
 	case TableFilterType::EXPRESSION_FILTER: {
+		//! Expressions can be arbitrarily complex, and we currently only support IS NULL/IS NOT NULL checks against the
+		//! column itself, i.e. where the expression is a BOUND_OPERATOR with type OPERATOR_IS_NULL/_IS_NOT_NULL with a
+		//! single child expression of type BOUND_REF.
+		//!
+		//! See duckdb/duckdb-iceberg#464
 		auto &expression_filter = filter.Cast<ExpressionFilter>();
 		auto &expr = *expression_filter.expr;
+
+		if (expr.type != ExpressionType::OPERATOR_IS_NULL && expr.type != ExpressionType::OPERATOR_IS_NOT_NULL) {
+			return true;
+		}
+
+		D_ASSERT(expr.GetExpressionClass() == ExpressionClass::BOUND_OPERATOR);
+		auto &bound_operator_expr = expr.Cast<BoundOperatorExpression>();
+
+		D_ASSERT(bound_operator_expr.children.size() == 1);
+		auto &child_expr = bound_operator_expr.children[0];
+		if (child_expr->type != ExpressionType::BOUND_REF) {
+			//! We can't evaluate expressions that aren't direct column references
+			return true;
+		}
+
 		if (expr.type == ExpressionType::OPERATOR_IS_NULL) {
 			return MatchBoundsIsNullFilter(stats, transform);
-		}
-		if (expr.type == ExpressionType::OPERATOR_IS_NOT_NULL) {
+		} else {
+			D_ASSERT(expr.type == ExpressionType::OPERATOR_IS_NOT_NULL);
 			return MatchBoundsIsNotNullFilter(stats, transform);
 		}
-		//! Any other expression can not be filtered
-		return true;
 	}
 	default:
 		//! Conservative approach: we don't know what this is, just say it doesn't filter anything
diff --git a/src/include/storage/iceberg_table_information.hpp b/src/include/storage/iceberg_table_information.hpp
index 3c0ea2a4..9ed9240e 100644
--- a/src/include/storage/iceberg_table_information.hpp
+++ b/src/include/storage/iceberg_table_information.hpp
@@ -45,11 +45,12 @@ struct IcebergTableInformation {
 	IRCSchemaEntry &schema;
 	string name;
 	string table_id;
-	// bool deleted;
 	rest_api_objects::LoadTableResult load_table_result;
 	IcebergTableMetadata table_metadata;
 	unordered_map<int32_t, unique_ptr<ICTableEntry>> schema_versions;
+	// Dummy entry that records the table's existence before any schema versions are loaded
+	unique_ptr<CatalogEntry> dummy_entry;
 
 public:
 	unique_ptr<IcebergTransactionData> transaction_data;
 
diff --git a/src/include/storage/irc_catalog.hpp b/src/include/storage/irc_catalog.hpp
index d6f8b8be..f84b0359 100644
--- a/src/include/storage/irc_catalog.hpp
+++ b/src/include/storage/irc_catalog.hpp
@@ -48,6 +48,17 @@ class IRCatalog : public Catalog {
 	bool SetCachedValue(const string &url, const string &value, const rest_api_objects::LoadTableResult &result);
 	static void SetAWSCatalogOptions(IcebergAttachOptions &attach_options,
 	                                 case_insensitive_set_t &set_by_attach_options);
+	//! Whether or not this catalog should search a specific type with the standard priority
+	CatalogLookupBehavior CatalogTypeLookupRule(CatalogType type) const override {
+		switch (type) {
+		case CatalogType::TABLE_FUNCTION_ENTRY:
+		case CatalogType::SCALAR_FUNCTION_ENTRY:
+		case CatalogType::AGGREGATE_FUNCTION_ENTRY:
+			return CatalogLookupBehavior::NEVER_LOOKUP;
+		default:
+			return CatalogLookupBehavior::STANDARD;
+		}
+	}
 
 public:
 	static unique_ptr<Catalog> Attach(optional_ptr<StorageExtensionInfo> storage_info, ClientContext &context,
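The iceberg_predicate.cpp change narrows EXPRESSION_FILTER pruning to the one shape that is provably safe: IS NULL / IS NOT NULL applied directly to a column reference; every other expression now conservatively keeps the data file. A standalone model of that guard follows, with ExprType, Expr, NullCountStats, and MatchExpressionFilter as simplified, hypothetical stand-ins for DuckDB's ExpressionType, BoundOperatorExpression, and the Iceberg column statistics:

    #include <memory>
    #include <vector>

    enum class ExprType { OPERATOR_IS_NULL, OPERATOR_IS_NOT_NULL, BOUND_REF, OTHER };

    struct Expr {
        ExprType type;
        std::vector<std::unique_ptr<Expr>> children;
    };

    struct NullCountStats {
        bool has_null;     // file contains at least one NULL in this column
        bool has_non_null; // file contains at least one non-NULL value
    };

    // Returns true when the data file might contain matching rows (keep it),
    // false when it provably cannot (prune it).
    bool MatchExpressionFilter(const Expr &expr, const NullCountStats &stats) {
        if (expr.type != ExprType::OPERATOR_IS_NULL && expr.type != ExprType::OPERATOR_IS_NOT_NULL) {
            return true; // arbitrary expression: never prune on it
        }
        if (expr.children.size() != 1 || expr.children[0]->type != ExprType::BOUND_REF) {
            return true; // IS NULL over a computed value (e.g. a CASE): never prune
        }
        // IS (NOT) NULL over a bare column: the file's null counts are decisive.
        return expr.type == ExprType::OPERATOR_IS_NULL ? stats.has_null : stats.has_non_null;
    }

With this shape check, WHERE role IS NULL over a computed CASE column (the regression tracked in #464) falls through to the conservative `return true` instead of pruning files whose underlying column happens to contain no NULLs.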
diff --git a/src/storage/irc_table_set.cpp b/src/storage/irc_table_set.cpp
index f1a9c22c..07af1855 100644
--- a/src/storage/irc_table_set.cpp
+++ b/src/storage/irc_table_set.cpp
@@ -31,7 +31,6 @@ ICTableSet::ICTableSet(IRCSchemaEntry &schema) : schema(schema), catalog(schema.
 
 bool ICTableSet::FillEntry(ClientContext &context, IcebergTableInformation &table) {
 	if (!table.schema_versions.empty()) {
-		//! Already filled
 		return true;
 	}
 
@@ -67,13 +66,31 @@ void ICTableSet::Scan(ClientContext &context, const std::function<void(CatalogEntry &)> &callback) {
 
 	for (auto &entry : entries) {
 		auto &table_info = entry.second;
-		FillEntry(context, table_info);
 		if (!table_info.schema_versions.empty()) {
 			auto &optional = table_info.schema_versions.begin()->second->Cast<ICTableEntry>();
 			callback(optional);
 			continue;
 		}
+
+		// create a table entry with fake schema data to avoid calling the LoadTableInformation endpoint for every
+		// table while listing schemas
+		CreateTableInfo info(schema, table_info.name);
+		vector<ColumnDefinition> columns;
+		auto col = ColumnDefinition(string("__"), LogicalType::UNKNOWN);
+		columns.push_back(std::move(col));
+		info.columns = ColumnList(std::move(columns));
+		auto table_entry = make_uniq<ICTableEntry>(table_info, catalog, schema, info);
+		if (!table_entry->internal) {
+			table_entry->internal = schema.internal;
+		}
+		auto result = table_entry.get();
+		if (result->name.empty()) {
+			throw InternalException("ICTableSet::CreateEntry called with empty name");
+		}
+		table_info.dummy_entry = std::move(table_entry);
+		auto &optional = table_info.dummy_entry.get()->Cast<ICTableEntry>();
+		callback(optional);
 	}
 
 	// erase not iceberg tables
 	for (auto &entry : non_iceberg_tables) {
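The Scan() change above swaps eager per-table metadata loading for a placeholder entry: listing a schema now costs one namespace request instead of one LoadTableInformation round trip per table, while FillEntry() still materializes the real schema on first genuine access. A sketch of the pattern, where TableEntry, TableInfo, and the LoadTableInformation comment are hypothetical stand-ins for the extension's actual types and REST calls:

    #include <functional>
    #include <map>
    #include <memory>
    #include <string>

    struct TableEntry {
        std::string name;
        bool placeholder; // true while only the name is known
    };

    struct TableInfo {
        std::string name;
        std::unique_ptr<TableEntry> dummy_entry;             // listing-only placeholder
        std::map<int, std::unique_ptr<TableEntry>> versions; // filled on first real access
    };

    // Listing tables (SHOW ALL TABLES): hand out placeholders, no request per table.
    void Scan(std::map<std::string, TableInfo> &tables, const std::function<void(TableEntry &)> &callback) {
        for (auto &entry : tables) {
            auto &info = entry.second;
            if (!info.versions.empty()) {
                callback(*info.versions.begin()->second); // already fully loaded
                continue;
            }
            if (!info.dummy_entry) {
                info.dummy_entry = std::make_unique<TableEntry>(TableEntry {info.name, true});
            }
            callback(*info.dummy_entry);
        }
    }

    // First real access: only now pay for the per-table metadata request.
    TableEntry &FillEntry(TableInfo &info) {
        if (info.versions.empty()) {
            // stands in for the real LoadTableInformation round trip
            info.versions[0] = std::make_unique<TableEntry>(TableEntry {info.name, false});
        }
        return *info.versions[0];
    }

The placeholder's single UNKNOWN column is enough for name-only operations such as SHOW ALL TABLES; anything that inspects columns goes through FillEntry and replaces the dummy with real schema versions.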
diff --git a/test/sql/local/iceberg_scans/expression_filter.test b/test/sql/local/iceberg_scans/expression_filter.test
new file mode 100644
index 00000000..e78dee42
--- /dev/null
+++ b/test/sql/local/iceberg_scans/expression_filter.test
@@ -0,0 +1,40 @@
+# name: test/sql/local/iceberg_scans/expression_filter.test
+# group: [iceberg_scans]
+
+require-env DUCKDB_ICEBERG_HAVE_GENERATED_DATA
+
+require avro
+
+require parquet
+
+require iceberg
+
+statement ok
+attach ':memory:' as my_datalake;
+
+statement ok
+create schema my_datalake.default;
+
+statement ok
+create view my_datalake.default.expression_filter as select * from ICEBERG_SCAN('__WORKING_DIRECTORY__/data/persistent/expression_filter');
+
+# baseline: select all rows
+query II
+SELECT * FROM my_datalake.default.expression_filter ORDER BY id ASC;
+----
+1	foo
+2	bar
+3	baz
+
+# CASE expression should not exclude any data files (prior to #464, IS NULL on a CASE expression column resulted in exclusion)
+query II
+SELECT id, CASE WHEN value = 'foo' THEN 'not null' ELSE NULL END AS role FROM my_datalake.default.expression_filter WHERE role IS NULL ORDER BY id ASC;
+----
+2	NULL
+3	NULL
+
+# Complement to previous query
+query II
+SELECT id, CASE WHEN value = 'foo' THEN 'not null' ELSE NULL END AS role FROM my_datalake.default.expression_filter WHERE role IS NOT NULL ORDER BY id ASC;
+----
+1	not null
diff --git a/test/sql/local/irc/iceberg_catalog_read.test b/test/sql/local/irc/iceberg_catalog_read.test
index dc1d7dfe..6fcd215b 100644
--- a/test/sql/local/irc/iceberg_catalog_read.test
+++ b/test/sql/local/irc/iceberg_catalog_read.test
@@ -19,7 +19,7 @@ statement ok
 CALL enable_logging('HTTP');
 
 statement ok
-set logging_level='debug'
+set logging_level='debug';
 
 statement ok
 CREATE SECRET (
diff --git a/test/sql/local/irc/test_duckdb_catalog_functions_and_iceberg.test b/test/sql/local/irc/test_duckdb_catalog_functions_and_iceberg.test
index 2841b937..921d5e25 100644
--- a/test/sql/local/irc/test_duckdb_catalog_functions_and_iceberg.test
+++ b/test/sql/local/irc/test_duckdb_catalog_functions_and_iceberg.test
@@ -51,21 +51,18 @@ select count(*) from duckdb_logs_parsed('HTTP');
 query I
 select count(*) from duckdb_logs_parsed('HTTP');
 ----
-5
+3
 
 statement ok
 use memory;
 
-# 3 more requests are made,
-# 2 from previous duckdb_logs_parsed call for 'main', 'default',
-# and 1 for 'memory'
-# requests no longer go up
+# namespace 'memory' is looked up in the iceberg catalog
 query I
 select count(*) from duckdb_logs_parsed('HTTP');
 ----
-8
+4
 
 query I
 select count(*) from duckdb_logs_parsed('HTTP');
 ----
-8
\ No newline at end of file
+4
\ No newline at end of file
diff --git a/test/sql/local/irc/test_table_information_requests.test b/test/sql/local/irc/test_table_information_requests.test
new file mode 100644
index 00000000..19f8a270
--- /dev/null
+++ b/test/sql/local/irc/test_table_information_requests.test
@@ -0,0 +1,110 @@
+# name: test/sql/local/irc/test_table_information_requests.test
+# description: test the number of HTTP requests made when reading table information
+# group: [irc]
+
+require-env ICEBERG_SERVER_AVAILABLE
+
+require avro
+
+require parquet
+
+require iceberg
+
+require httpfs
+
+# Do not ignore 'HTTP' error messages!
+set ignore_error_messages
+
+statement ok
+set enable_logging=true
+
+statement ok
+set logging_level='debug'
+
+statement ok
+CALL enable_logging('HTTP');
+
+statement ok
+CREATE SECRET (
+    TYPE S3,
+    KEY_ID 'admin',
+    SECRET 'password',
+    ENDPOINT '127.0.0.1:9000',
+    URL_STYLE 'path',
+    USE_SSL 0
+);
+
+
+statement ok
+ATTACH '' AS my_datalake (
+    TYPE ICEBERG,
+    CLIENT_ID 'admin',
+    CLIENT_SECRET 'password',
+    ENDPOINT 'http://127.0.0.1:8181'
+);
+
+query I
+select count(*) > 10 from (show all tables);
+----
+1
+
+# 1 call for oauth, 1 call for config
+# 1 call to list namespaces
+# 1 call to list tables in default
+# 1 call to list tables in level1 namespace (no recursive namespace calls)
+query I
+select count(*) from duckdb_logs_parsed('HTTP');
+----
+5
+
+statement ok
+call truncate_duckdb_logs();
+
+query II
+select column_name, column_type from (describe my_datalake.default.supplier);
+----
+s_suppkey	BIGINT
+s_name	VARCHAR
+s_address	VARCHAR
+s_nationkey	INTEGER
+s_phone	VARCHAR
+s_acctbal	DECIMAL(15,2)
+s_comment	VARCHAR
+
+# one request to verify the default schema
+# another request to verify table default.supplier
+# another request to the table information endpoint
+# FIXME: apparently there is also a request to an avro file
+query I
+select count(*) from duckdb_logs_parsed('HTTP');
+----
+4
+
+statement ok
+begin;
+
+statement ok
+show all tables;
+
+query I
+select distinct(s_nationkey) from my_datalake.default.supplier order by all limit 5;
+----
+0
+1
+2
+3
+4
+
+statement ok
+commit;
+
+# 5 calls to list the namespaces
+# 1 call to the GetTableInformationEndpoint for supplier
+# (FIXME) 1 call to an avro file in the warehouse
+# 1 call to the manifest file
+# 1 call to the manifest list
+# 2 calls to read parquet files
+query I
+select count(*) from duckdb_logs_parsed('HTTP');
+----
+11
\ No newline at end of file
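The request counts asserted in these tests drop because of the CatalogTypeLookupRule override added in irc_catalog.hpp: function-name resolution no longer consults the Iceberg catalog at all. A compilable model of that dispatch decision, where CatalogKind and LookupBehavior are simplified stand-ins for DuckDB's CatalogType and CatalogLookupBehavior:

    #include <cstdio>

    enum class CatalogKind { TABLE_ENTRY, SCALAR_FUNCTION_ENTRY, TABLE_FUNCTION_ENTRY, AGGREGATE_FUNCTION_ENTRY };
    enum class LookupBehavior { STANDARD, NEVER_LOOKUP };

    // An Iceberg REST catalog can never own functions, so resolving one against
    // it would only spend an HTTP round trip before falling back elsewhere.
    LookupBehavior TypeLookupRule(CatalogKind kind) {
        switch (kind) {
        case CatalogKind::TABLE_FUNCTION_ENTRY:
        case CatalogKind::SCALAR_FUNCTION_ENTRY:
        case CatalogKind::AGGREGATE_FUNCTION_ENTRY:
            return LookupBehavior::NEVER_LOOKUP;
        default:
            return LookupBehavior::STANDARD;
        }
    }

    int main() {
        // duckdb_logs_parsed is itself a table function: the attached Iceberg
        // catalog is skipped outright, which is why the counted requests stay flat.
        bool skipped = TypeLookupRule(CatalogKind::TABLE_FUNCTION_ENTRY) == LookupBehavior::NEVER_LOOKUP;
        std::printf("iceberg catalog %s for table functions\n", skipped ? "skipped" : "consulted");
    }

Table lookups keep STANDARD priority, so actual data access still reaches the REST catalog as before.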
diff --git a/test/sql/local/test_iceberg_and_ducklake.test b/test/sql/local/test_iceberg_and_ducklake.test
new file mode 100644
index 00000000..cc51ffea
--- /dev/null
+++ b/test/sql/local/test_iceberg_and_ducklake.test
@@ -0,0 +1,56 @@
+# name: test/sql/local/test_iceberg_and_ducklake.test
+# description: test that an iceberg catalog and a ducklake catalog can be attached side by side
+# group: [local]
+
+require-env ICEBERG_SERVER_AVAILABLE
+
+require avro
+
+require parquet
+
+require iceberg
+
+require httpfs
+
+require ducklake
+
+# Do not ignore 'HTTP' error messages!
+set ignore_error_messages
+
+statement ok
+pragma threads=1;
+
+statement ok
+CALL enable_logging('HTTP');
+
+statement ok
+set logging_level='debug';
+
+statement ok
+CREATE SECRET (
+    TYPE S3,
+    KEY_ID 'admin',
+    SECRET 'password',
+    ENDPOINT '127.0.0.1:9000',
+    URL_STYLE 'path',
+    USE_SSL 0
+);
+
+
+statement ok
+ATTACH '' AS my_datalake (
+    TYPE ICEBERG,
+    CLIENT_ID 'admin',
+    CLIENT_SECRET 'password',
+    ENDPOINT 'http://127.0.0.1:8181'
+);
+
+statement ok
+ATTACH 'ducklake:duckdb:__TEST_DIR__/ducklake.duckdb' as my_ducklake (DATA_PATH '__TEST_DIR__/data_path');
+
+# 2 requests to the iceberg catalog for oauth and config
+# 3 requests when attaching ducklake, because the ducklake attach queries duckdb_tables()
+query I
+select count(*) from duckdb_logs_parsed('HTTP');
+----
+5
\ No newline at end of file