3 changes: 2 additions & 1 deletion src/include/storage/iceberg_table_information.hpp
@@ -45,11 +45,12 @@ struct IcebergTableInformation {
IRCSchemaEntry &schema;
string name;
string table_id;
// bool deleted;

rest_api_objects::LoadTableResult load_table_result;
IcebergTableMetadata table_metadata;
unordered_map<int32_t, unique_ptr<ICTableEntry>> schema_versions;
// dummy entry to hold existence of a table, but no schema versions
unique_ptr<ICTableEntry> dummy_entry;

public:
unique_ptr<IcebergTransactionData> transaction_data;
11 changes: 11 additions & 0 deletions src/include/storage/irc_catalog.hpp
@@ -48,6 +48,17 @@ class IRCatalog : public Catalog {
bool SetCachedValue(const string &url, const string &value, const rest_api_objects::LoadTableResult &result);
static void SetAWSCatalogOptions(IcebergAttachOptions &attach_options,
case_insensitive_set_t &set_by_attach_options);
//! Whether or not this catalog should search a specific type with the standard priority
CatalogLookupBehavior CatalogTypeLookupRule(CatalogType type) const override {
switch (type) {
case CatalogType::TABLE_FUNCTION_ENTRY:
case CatalogType::SCALAR_FUNCTION_ENTRY:
case CatalogType::AGGREGATE_FUNCTION_ENTRY:
return CatalogLookupBehavior::NEVER_LOOKUP;
default:
return CatalogLookupBehavior::STANDARD;
}
}

public:
static unique_ptr<Catalog> Attach(optional_ptr<StorageExtensionInfo> storage_info, ClientContext &context,
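The CatalogTypeLookupRule override above is what keeps function resolution from ever probing the REST catalog: table, scalar, and aggregate function lookups return NEVER_LOOKUP, so only tables and schemas generate requests. A minimal sqllogictest sketch of the intended effect (it assumes the secret/ATTACH setup used in the tests further down, and the 0 is the expected outcome, not a recorded result):

statement ok
call truncate_duckdb_logs();

# resolving a built-in table function should not touch the attached
# iceberg catalog, since TABLE_FUNCTION_ENTRY maps to NEVER_LOOKUP
statement ok
select * from range(3);

query I
select count(*) from duckdb_logs_parsed('HTTP');
----
0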
31 changes: 24 additions & 7 deletions src/storage/irc_table_set.cpp
@@ -31,7 +31,6 @@ ICTableSet::ICTableSet(IRCSchemaEntry &schema) : schema(schema), catalog(schema.

bool ICTableSet::FillEntry(ClientContext &context, IcebergTableInformation &table) {
if (!table.schema_versions.empty()) {
//! Already filled
return true;
}

@@ -67,13 +66,31 @@ void ICTableSet::Scan(ClientContext &context, const std::function<void(CatalogEn
auto table_namespace = IRCAPI::GetEncodedSchemaName(schema.namespace_items);
for (auto &entry : entries) {
auto &table_info = entry.second;
if (FillEntry(context, table_info)) {
auto schema_id = table_info.table_metadata.current_schema_id;
callback(*table_info.schema_versions[schema_id]);
} else {
DUCKDB_LOG(context, IcebergLogType, "Table %s.%s not an Iceberg Table", table_namespace, entry.first);
non_iceberg_tables.insert(entry.first);
if (table_info.dummy_entry) {
// FIXME: why do we need to return the same entry again?
auto &optional = table_info.dummy_entry.get()->Cast<CatalogEntry>();
callback(optional);
continue;
}

// create a table entry with fake schema data to avoid calling the LoadTableInformation endpoint for every
// table while listing schemas
CreateTableInfo info(schema, table_info.name);
vector<ColumnDefinition> columns;
auto col = ColumnDefinition(string("__"), LogicalType::UNKNOWN);
columns.push_back(std::move(col));
info.columns = ColumnList(std::move(columns));
auto table_entry = make_uniq<ICTableEntry>(table_info, catalog, schema, info);
if (!table_entry->internal) {
table_entry->internal = schema.internal;
}
auto result = table_entry.get();
if (result->name.empty()) {
throw InternalException("ICTableSet::CreateEntry called with empty name");
}
table_info.dummy_entry = std::move(table_entry);
auto &optional = table_info.dummy_entry.get()->Cast<CatalogEntry>();
callback(optional);
}
// erase not iceberg tables
for (auto &entry : non_iceberg_tables) {
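With this change, Scan registers a lightweight placeholder per table (a single "__" column of type UNKNOWN held in dummy_entry) instead of fetching real metadata, and FillEntry only runs when a table's schema is actually needed. A sketch of the access pattern in sqllogictest terms, using the table names from the new test below (request counts omitted, since they depend on the catalog layout):

# listing tables only materializes dummy entries; no per-table
# LoadTableInformation requests are issued at this point
statement ok
show all tables;

# the first real schema access forces FillEntry for just this table
statement ok
select column_name, column_type from (describe my_datalake.default.supplier);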
2 changes: 1 addition & 1 deletion test/sql/local/irc/iceberg_catalog_read.test
@@ -19,7 +19,7 @@ statement ok
CALL enable_logging('HTTP');

statement ok
set logging_level='debug'
set logging_level='debug';

statement ok
CREATE SECRET (
@@ -51,21 +51,18 @@ select count(*) from duckdb_logs_parsed('HTTP');
query I
select count(*) from duckdb_logs_parsed('HTTP');
----
5
3

statement ok
use memory;

# 3 more requests are made,
# 2 from previous duckdb_logs_parsed call for 'main', 'default',
# and 1 for 'memory'
# requests no longer go up
# namespace 'memory' is looked up in the iceberg catalog
query I
select count(*) from duckdb_logs_parsed('HTTP');
----
8
4

query I
select count(*) from duckdb_logs_parsed('HTTP');
----
8
4
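A note on maintaining these expected counts: they drift whenever caching or listing behavior changes. When an assertion fails, running the parsed-log query interactively against the same attach setup shows which endpoints account for the total; select * is used deliberately so nothing is assumed about the column layout of duckdb_logs_parsed:

select * from duckdb_logs_parsed('HTTP');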
110 changes: 110 additions & 0 deletions test/sql/local/irc/test_table_information_requests.test
@@ -0,0 +1,110 @@
# name: test/sql/local/irc/test_table_information_requests.test
# description: test table information requests against the iceberg catalog
# group: [irc]

require-env ICEBERG_SERVER_AVAILABLE

require avro

require parquet

require iceberg

require httpfs

# Do not ignore 'HTTP' error messages!
set ignore_error_messages

statement ok
set enable_logging=true

statement ok
set logging_level='debug'

statement ok
CALL enable_logging('HTTP');

statement ok
CREATE SECRET (
TYPE S3,
KEY_ID 'admin',
SECRET 'password',
ENDPOINT '127.0.0.1:9000',
URL_STYLE 'path',
USE_SSL 0
);


statement ok
ATTACH '' AS my_datalake (
TYPE ICEBERG,
CLIENT_ID 'admin',
CLIENT_SECRET 'password',
ENDPOINT 'http://127.0.0.1:8181'
);

query I
select count(*) > 10 from (show all tables);
----
1

# 1 call for oauth, 1 call for config
# 1 call to list namespaces
# 1 call to list tables in default
# 1 call to list tables in level1 namespace (no recursive namespace calls)
query I
select count(*) from duckdb_logs_parsed('HTTP');
----
5

statement ok
call truncate_duckdb_logs();

query II
select column_name, column_type from (describe my_datalake.default.supplier);
----
s_suppkey BIGINT
s_name VARCHAR
s_address VARCHAR
s_nationkey INTEGER
s_phone VARCHAR
s_acctbal DECIMAL(15,2)
s_comment VARCHAR

# one request to verify the default schema
# another request to verify table default.supplier
# another request to the table information endpoint
# FIXME: apparently there is also a request to an avro file
query I
select count(*) from duckdb_logs_parsed('HTTP');
----
4

statement ok
begin;

statement ok
show all tables;

query I
select distinct(s_nationkey) from my_datalake.default.supplier order by all limit 5;
----
0
1
2
3
4

statement ok
commit;

# 5 calls to list the namespaces
# 1 call to the GetTableInformationEndpoint for supplier
# (FIXME) 1 call to an avro file in the warehouse
# 1 call to the manifest file
# 1 call to the manifest list
# 2 calls to read parquet files
query I
select count(*) from duckdb_logs_parsed('HTTP');
----
11
56 changes: 56 additions & 0 deletions test/sql/local/test_iceberg_and_ducklake.test
@@ -0,0 +1,56 @@
# name: test/sql/local/test_iceberg_and_ducklake.test
# description: test iceberg and ducklake catalogs attached together
# group: [local]

require-env ICEBERG_SERVER_AVAILABLE

require avro

require parquet

require iceberg

require httpfs

require ducklake

# Do not ignore 'HTTP' error messages!
set ignore_error_messages

statement ok
pragma threads=1;

statement ok
CALL enable_logging('HTTP');

statement ok
set logging_level='debug';

statement ok
CREATE SECRET (
TYPE S3,
KEY_ID 'admin',
SECRET 'password',
ENDPOINT '127.0.0.1:9000',
URL_STYLE 'path',
USE_SSL 0
);


statement ok
ATTACH '' AS my_datalake (
TYPE ICEBERG,
CLIENT_ID 'admin',
CLIENT_SECRET 'password',
ENDPOINT 'http://127.0.0.1:8181'
);

statement ok
ATTACH 'ducklake:duckdb:__TEST_DIR__/ducklake.duckdb' as my_ducklake (DATA_PATH '__TEST_DIR__/data_path');

# 2 requests to the iceberg catalog for oauth and config
# 3 requests when attaching ducklake because a ducklake attach selects from duckdb_tables()
query I
select count(*) from duckdb_logs_parsed('HTTP');
----
5