Binary file not shown.
@@ -0,0 +1 @@
{"format-version":2,"table-uuid":"d6293bed-4757-4504-9342-f69a447b7759","location":"data/persistent/expression_filter","last-sequence-number":0,"last-updated-ms":1757676428493,"last-column-id":2,"current-schema-id":0,"schemas":[{"type":"struct","schema-id":0,"fields":[{"id":1,"name":"id","required":true,"type":"long"},{"id":2,"name":"value","required":false,"type":"string"}]}],"default-spec-id":0,"partition-specs":[{"spec-id":0,"fields":[]}],"last-partition-id":999,"default-sort-order-id":0,"sort-orders":[{"order-id":0,"fields":[]}],"properties":{"write.parquet.compression-codec":"zstd"},"current-snapshot-id":-1,"refs":{},"snapshots":[],"statistics":[],"partition-statistics":[],"snapshot-log":[],"metadata-log":[]}
@@ -0,0 +1 @@
{"format-version":2,"table-uuid":"d6293bed-4757-4504-9342-f69a447b7759","location":"data/persistent/expression_filter","last-sequence-number":1,"last-updated-ms":1757676429141,"last-column-id":2,"current-schema-id":0,"schemas":[{"type":"struct","schema-id":0,"fields":[{"id":1,"name":"id","required":true,"type":"long"},{"id":2,"name":"value","required":false,"type":"string"}]}],"default-spec-id":0,"partition-specs":[{"spec-id":0,"fields":[]}],"last-partition-id":999,"default-sort-order-id":0,"sort-orders":[{"order-id":0,"fields":[]}],"properties":{"write.parquet.compression-codec":"zstd"},"current-snapshot-id":8096310958539014181,"refs":{"main":{"snapshot-id":8096310958539014181,"type":"branch"}},"snapshots":[{"sequence-number":1,"snapshot-id":8096310958539014181,"timestamp-ms":1757676429141,"summary":{"operation":"append","added-data-files":"1","added-records":"3","added-files-size":"705","changed-partition-count":"1","total-records":"3","total-files-size":"705","total-data-files":"1","total-delete-files":"0","total-position-deletes":"0","total-equality-deletes":"0","iceberg-version":"Apache Iceberg 1.9.2 (commit 071d5606bc6199a0be9b3f274ec7fbf111d88821)"},"manifest-list":"data/persistent/expression_filter/metadata/snap-8096310958539014181-1-8d30f58e-7333-4451-983d-eaf657a21a11.avro","schema-id":0}],"statistics":[],"partition-statistics":[],"snapshot-log":[{"timestamp-ms":1757676429141,"snapshot-id":8096310958539014181}],"metadata-log":[{"timestamp-ms":1757676428493,"metadata-file":"data/persistent/expression_filter/metadata/00000-acdf842e-3a9d-4b9b-ad87-daf78583a550.metadata.json"}]}
Binary file not shown.
Binary file not shown.
@@ -0,0 +1 @@
00001-19739cda-f528-4429-84cc-377ffdd24c75
39 changes: 27 additions & 12 deletions src/aws.cpp
@@ -122,6 +122,18 @@ std::shared_ptr<Aws::Http::HttpRequest> AWSInput::CreateSignedRequest(Aws::Http:
// return request;
}

static string GetPayloadHash(const char *buffer, idx_t buffer_len) {
if (buffer_len > 0) {
hash_bytes payload_hash_bytes;
hash_str payload_hash_str;
sha256(buffer, buffer_len, payload_hash_bytes);
hex256(payload_hash_bytes, payload_hash_str);
return string((char *)payload_hash_str, sizeof(payload_hash_str));
} else {
return "";
}
}

unique_ptr<HTTPResponse> AWSInput::ExecuteRequest(ClientContext &context, Aws::Http::HttpMethod method,
const string body, string content_type) {

@@ -140,6 +152,11 @@ unique_ptr<HTTPResponse> AWSInput::ExecuteRequest(ClientContext &context, Aws::H
// If access key is not set, we don't set the headers at all to allow accessing public files through s3 urls

string payload_hash = "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855"; // Empty payload hash

if (!body.empty()) {
payload_hash = GetPayloadHash(body.c_str(), body.size());
}

// key_id, secret, session_token
// we can pass date/time but this is mostly useful in testing. normally we just get the current datetime
// here.
@@ -161,6 +178,9 @@ unique_ptr<HTTPResponse> AWSInput::ExecuteRequest(ClientContext &context, Aws::H
hash_str canonical_request_hash_str;
if (content_type.length() > 0) {
signed_headers += "content-type;";
#ifdef EMSCRIPTEN
res["content-type"] = content_type;
#endif
}
signed_headers += "host;x-amz-content-sha256;x-amz-date";
if (session_token.length() > 0) {
@@ -244,19 +264,14 @@ unique_ptr<HTTPResponse> AWSInput::ExecuteRequest(ClientContext &context, Aws::H

params = http_util.InitializeParameters(context, request_url);

if (!body.empty()) {
throw NotImplementedException("CreateSignedRequest with non-empty body is not supported at this time");
/*
auto bodyStream = Aws::MakeShared<Aws::StringStream>("");
*bodyStream << body;
request->AddContentBody(bodyStream);
request->SetContentLength(std::to_string(body.size()));
if (!content_type.empty()) {
request->SetHeaderValue("Content-Type", content_type);
}
*/
if (method == Aws::Http::HttpMethod::HTTP_HEAD) {
HeadRequestInfo head_request(request_url, res, *params);
return http_util.Request(head_request);
}
if (method == Aws::Http::HttpMethod::HTTP_DELETE) {
DeleteRequestInfo delete_request(request_url, res, *params);
return http_util.Request(delete_request);
}

if (method == Aws::Http::HttpMethod::HTTP_GET) {
GetRequestInfo get_request(request_url, res, *params, nullptr, nullptr);
return http_util.Request(get_request);
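The hard-coded value used above for the empty-payload case is simply the SHA-256 digest of a zero-length buffer, which SigV4 expects in the x-amz-content-sha256 header when a request has no body; GetPayloadHash computes the equivalent hex digest for non-empty bodies. A minimal standalone sketch that reproduces the constant (it assumes OpenSSL purely for illustration; the extension itself uses DuckDB's sha256/hex256 helpers):

// Sketch: reproduce the SigV4 "empty payload" SHA-256 constant.
// Assumes OpenSSL for the digest; not part of the extension's code.
#include <openssl/evp.h>
#include <cstdio>

int main() {
	unsigned char digest[EVP_MAX_MD_SIZE];
	unsigned int digest_len = 0;
	// Hash a zero-length buffer with SHA-256.
	EVP_Digest("", 0, digest, &digest_len, EVP_sha256(), nullptr);
	// Prints e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855
	for (unsigned int i = 0; i < digest_len; i++) {
		printf("%02x", digest[i]);
	}
	printf("\n");
	return 0;
}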
27 changes: 23 additions & 4 deletions src/iceberg_predicate.cpp
@@ -1,4 +1,5 @@
#include "iceberg_predicate.hpp"
#include "duckdb/planner/expression/bound_operator_expression.hpp"
#include "duckdb/planner/filter/constant_filter.hpp"
#include "duckdb/planner/filter/conjunction_filter.hpp"
#include "duckdb/planner/filter/null_filter.hpp"
@@ -80,16 +81,34 @@ bool MatchBoundsTemplated(const TableFilter &filter, const IcebergPredicateStats
return MatchBoundsIsNotNullFilter<TRANSFORM>(stats, transform);
}
case TableFilterType::EXPRESSION_FILTER: {
//! Expressions can be arbitrarily complex, and we currently only support IS NULL/IS NOT NULL checks against the
//! column itself, i.e. where the expression is a BOUND_OPERATOR with type OPERATOR_IS_NULL/_IS_NOT_NULL with a
//! single child expression of type BOUND_REF.
//!
//! See duckdb/duckdb-iceberg#464
auto &expression_filter = filter.Cast<ExpressionFilter>();
auto &expr = *expression_filter.expr;

if (expr.type != ExpressionType::OPERATOR_IS_NULL && expr.type != ExpressionType::OPERATOR_IS_NOT_NULL) {
return true;
}

D_ASSERT(expr.GetExpressionClass() == ExpressionClass::BOUND_OPERATOR);
auto &bound_operator_expr = expr.Cast<BoundOperatorExpression>();

D_ASSERT(bound_operator_expr.children.size() == 1);
auto &child_expr = bound_operator_expr.children[0];
if (child_expr->type != ExpressionType::BOUND_REF) {
//! We can't evaluate expressions that aren't direct column references
return true;
}

if (expr.type == ExpressionType::OPERATOR_IS_NULL) {
return MatchBoundsIsNullFilter<TRANSFORM>(stats, transform);
}
if (expr.type == ExpressionType::OPERATOR_IS_NOT_NULL) {
} else {
D_ASSERT(expr.type == ExpressionType::OPERATOR_IS_NOT_NULL);
return MatchBoundsIsNotNullFilter<TRANSFORM>(stats, transform);
}
//! Any other expression can not be filtered
return true;
}
default:
//! Conservative approach: we don't know what this is, just say it doesn't filter anything
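The predicate handling above is deliberately conservative: only IS NULL / IS NOT NULL applied directly to a bound column reference is evaluated against Iceberg column statistics, and every other expression shape returns true, meaning the data file is kept and scanned. A hedged sketch of that stats-based decision, assuming each file's statistics record whether the column contains nulls and non-null values (FileStats, MightMatchIsNull, and MightMatchIsNotNull are illustrative names, not the extension's API):

// Sketch: file pruning for IS [NOT] NULL filters based on per-file null statistics.
#include <cstdio>

struct FileStats {
	bool has_null;     // column has at least one NULL in this data file
	bool has_not_null; // column has at least one non-NULL value in this data file
};

// Returns true if the file may contain matching rows and therefore must be scanned.
static bool MightMatchIsNull(const FileStats &stats) {
	return stats.has_null;
}

static bool MightMatchIsNotNull(const FileStats &stats) {
	return stats.has_not_null;
}

int main() {
	FileStats no_nulls{false, true};
	// "WHERE col IS NULL" can safely skip a file that provably contains no NULLs,
	printf("IS NULL must scan file: %d\n", MightMatchIsNull(no_nulls));
	// while "WHERE col IS NOT NULL" still has to read it.
	printf("IS NOT NULL must scan file: %d\n", MightMatchIsNotNull(no_nulls));
	return 0;
}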
3 changes: 2 additions & 1 deletion src/include/storage/iceberg_table_information.hpp
@@ -45,11 +45,12 @@ struct IcebergTableInformation {
IRCSchemaEntry &schema;
string name;
string table_id;
// bool deleted;

rest_api_objects::LoadTableResult load_table_result;
IcebergTableMetadata table_metadata;
unordered_map<int32_t, unique_ptr<ICTableEntry>> schema_versions;
// dummy entry that records the existence of a table without any loaded schema versions
unique_ptr<ICTableEntry> dummy_entry;

public:
unique_ptr<IcebergTransactionData> transaction_data;
11 changes: 11 additions & 0 deletions src/include/storage/irc_catalog.hpp
@@ -48,6 +48,17 @@ class IRCatalog : public Catalog {
bool SetCachedValue(const string &url, const string &value, const rest_api_objects::LoadTableResult &result);
static void SetAWSCatalogOptions(IcebergAttachOptions &attach_options,
case_insensitive_set_t &set_by_attach_options);
//! Whether or not this catalog should search a specific type with the standard priority
CatalogLookupBehavior CatalogTypeLookupRule(CatalogType type) const override {
switch (type) {
case CatalogType::TABLE_FUNCTION_ENTRY:
case CatalogType::SCALAR_FUNCTION_ENTRY:
case CatalogType::AGGREGATE_FUNCTION_ENTRY:
return CatalogLookupBehavior::NEVER_LOOKUP;
default:
return CatalogLookupBehavior::STANDARD;
}
}

public:
static unique_ptr<Catalog> Attach(optional_ptr<StorageExtensionInfo> storage_info, ClientContext &context,
31 changes: 24 additions & 7 deletions src/storage/irc_table_set.cpp
@@ -31,7 +31,6 @@ ICTableSet::ICTableSet(IRCSchemaEntry &schema) : schema(schema), catalog(schema.

bool ICTableSet::FillEntry(ClientContext &context, IcebergTableInformation &table) {
if (!table.schema_versions.empty()) {
//! Already filled
return true;
}

@@ -67,13 +66,31 @@ void ICTableSet::Scan(ClientContext &context, const std::function<void(CatalogEn
auto table_namespace = IRCAPI::GetEncodedSchemaName(schema.namespace_items);
for (auto &entry : entries) {
auto &table_info = entry.second;
if (FillEntry(context, table_info)) {
auto schema_id = table_info.table_metadata.current_schema_id;
callback(*table_info.schema_versions[schema_id]);
} else {
DUCKDB_LOG(context, IcebergLogType, "Table %s.%s not an Iceberg Table", table_namespace, entry.first);
non_iceberg_tables.insert(entry.first);
if (table_info.dummy_entry) {
// FIXME: why do we need to return the same entry again?
auto &optional = table_info.dummy_entry.get()->Cast<CatalogEntry>();
callback(optional);
continue;
}

// create a table entry with fake schema data to avoid calling the LoadTableInformation endpoint for every
// table while listing schemas
CreateTableInfo info(schema, table_info.name);
vector<ColumnDefinition> columns;
auto col = ColumnDefinition(string("__"), LogicalType::UNKNOWN);
columns.push_back(std::move(col));
info.columns = ColumnList(std::move(columns));
auto table_entry = make_uniq<ICTableEntry>(table_info, catalog, schema, info);
if (!table_entry->internal) {
table_entry->internal = schema.internal;
}
auto result = table_entry.get();
if (result->name.empty()) {
throw InternalException("ICTableSet::CreateEntry called with empty name");
}
table_info.dummy_entry = std::move(table_entry);
auto &optional = table_info.dummy_entry.get()->Cast<CatalogEntry>();
callback(optional);
}
// erase non-iceberg tables
for (auto &entry : non_iceberg_tables) {
40 changes: 40 additions & 0 deletions test/sql/local/iceberg_scans/expression_filter.test
@@ -0,0 +1,40 @@
# name: test/sql/local/iceberg_scans/expression_filter.test
# group: [iceberg_scans]

require-env DUCKDB_ICEBERG_HAVE_GENERATED_DATA

require avro

require parquet

require iceberg

statement ok
attach ':memory:' as my_datalake;

statement ok
create schema my_datalake.default;

statement ok
create view my_datalake.default.expression_filter as select * from ICEBERG_SCAN('__WORKING_DIRECTORY__/data/persistent/expression_filter');

# baseline: select all rows
query II
SELECT * FROM my_datalake.default.expression_filter ORDER BY id ASC;
----
1 foo
2 bar
3 baz

# CASE expression should not exclude any data files (prior to #464, IS NULL on CASE expression column resulted in exclusion)
query II
SELECT id, CASE WHEN value = 'foo' THEN 'not null' ELSE NULL END AS role FROM my_datalake.default.expression_filter WHERE role IS NULL ORDER BY id ASC;
----
2 NULL
3 NULL

# Complement to previous query
query II
SELECT id, CASE WHEN value = 'foo' THEN 'not null' ELSE NULL END AS role FROM my_datalake.default.expression_filter WHERE role IS NOT NULL ORDER BY id ASC;
----
1 not null
2 changes: 1 addition & 1 deletion test/sql/local/irc/iceberg_catalog_read.test
@@ -19,7 +19,7 @@ statement ok
CALL enable_logging('HTTP');

statement ok
set logging_level='debug'
set logging_level='debug';

statement ok
CREATE SECRET (
@@ -51,21 +51,18 @@ select count(*) from duckdb_logs_parsed('HTTP');
query I
select count(*) from duckdb_logs_parsed('HTTP');
----
5
3

statement ok
use memory;

# 3 more requests are made,
# 2 from previous duckdb_logs_parsed call for 'main', 'default',
# and 1 for 'memory'
# requests no longer go up
# namespace 'memory' is looked up in the iceberg catalog
query I
select count(*) from duckdb_logs_parsed('HTTP');
----
8
4

query I
select count(*) from duckdb_logs_parsed('HTTP');
----
8
4
110 changes: 110 additions & 0 deletions test/sql/local/irc/test_table_information_requests.test
@@ -0,0 +1,110 @@
# name: test/sql/local/irc/test_table_information_requests.test
# description: test integration with iceberg catalog read
# group: [irc]

require-env ICEBERG_SERVER_AVAILABLE

require avro

require parquet

require iceberg

require httpfs

# Do not ignore 'HTTP' error messages!
set ignore_error_messages

statement ok
set enable_logging=true

statement ok
set logging_level='debug'

statement ok
CALL enable_logging('HTTP');

statement ok
CREATE SECRET (
TYPE S3,
KEY_ID 'admin',
SECRET 'password',
ENDPOINT '127.0.0.1:9000',
URL_STYLE 'path',
USE_SSL 0
);


statement ok
ATTACH '' AS my_datalake (
TYPE ICEBERG,
CLIENT_ID 'admin',
CLIENT_SECRET 'password',
ENDPOINT 'http://127.0.0.1:8181'
);

query I
select count(*) > 10 from (show all tables);
----
1

# 1 call for oauth, 1 call for config
# 1 call to list namespaces
# 1 call to list tables in default
# 1 call to list tables in level1 namespace (no recursive namespace calls)
query I
select count(*) from duckdb_logs_parsed('HTTP');
----
5

statement ok
call truncate_duckdb_logs();

query II
select column_name, column_type from (describe my_datalake.default.supplier);
----
s_suppkey BIGINT
s_name VARCHAR
s_address VARCHAR
s_nationkey INTEGER
s_phone VARCHAR
s_acctbal DECIMAL(15,2)
s_comment VARCHAR

# one request to verify the default schema
# another request to verify table default.supplier
# another request to the table information endpoint
# FIXME: apparently there is also a request to an avro file
query I
select count(*) from duckdb_logs_parsed('HTTP');
----
4

statement ok
begin;

statement ok
show all tables;

query I
select distinct(s_nationkey) from my_datalake.default.supplier order by all limit 5;
----
0
1
2
3
4

statement ok
commit;

# 5 calls to list the namespaces
# 1 call to the GetTableInformationEndpoint for supplier
# (FIXME) 1 call to an avro file in the warehouse
# 1 call to the manifest file
# 1 call to the manifest list
# 2 calls to read parquet files
query I
select count(*) from duckdb_logs_parsed('HTTP');
----
11