Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion src/include/storage/iceberg_table_information.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,13 @@ struct IcebergTableInformation {
IRCSchemaEntry &schema;
string name;
string table_id;
// bool deleted;
bool filled = false;

rest_api_objects::LoadTableResult load_table_result;
IcebergTableMetadata table_metadata;
unordered_map<int32_t, unique_ptr<ICTableEntry>> schema_versions;
// dummy entry to hold existence of a table, but no schema versions
unique_ptr<ICTableEntry> dummy_entry;

public:
unique_ptr<IcebergTransactionData> transaction_data;
Expand Down
11 changes: 11 additions & 0 deletions src/include/storage/irc_catalog.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,17 @@ class IRCatalog : public Catalog {
bool SetCachedValue(const string &url, const string &value, const rest_api_objects::LoadTableResult &result);
static void SetAWSCatalogOptions(IcebergAttachOptions &attach_options,
case_insensitive_set_t &set_by_attach_options);
//! Whether or not this catalog should search a specific type with the standard priority
CatalogLookupBehavior CatalogTypeLookupRule(CatalogType type) const override {
	// Function entries (table/scalar/aggregate) are never served by this catalog,
	// so tell the binder to skip it entirely for those entry types.
	const bool is_function_entry = type == CatalogType::TABLE_FUNCTION_ENTRY ||
	                               type == CatalogType::SCALAR_FUNCTION_ENTRY ||
	                               type == CatalogType::AGGREGATE_FUNCTION_ENTRY;
	if (is_function_entry) {
		return CatalogLookupBehavior::NEVER_LOOKUP;
	}
	// Everything else (tables, schemas, ...) is looked up with normal priority.
	return CatalogLookupBehavior::STANDARD;
}

public:
static unique_ptr<Catalog> Attach(optional_ptr<StorageExtensionInfo> storage_info, ClientContext &context,
Expand Down
3 changes: 2 additions & 1 deletion src/storage/iceberg_table_information.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,7 @@ optional_ptr<CatalogEntry> IcebergTableInformation::CreateSchemaVersion(IcebergT

optional_ptr<CatalogEntry> IcebergTableInformation::GetSchemaVersion(optional_ptr<BoundAtClause> at) {
D_ASSERT(!schema_versions.empty());
D_ASSERT(filled);
auto snapshot_lookup = IcebergSnapshotLookup::FromAtClause(at);

int32_t schema_id;
Expand All @@ -232,7 +233,7 @@ optional_ptr<CatalogEntry> IcebergTableInformation::GetSchemaVersion(optional_pt
}

//! Construct table information for a named table in a schema.
//! Starts out unfilled: `filled` is false until the table's metadata is fetched.
//! NOTE: the diff residue duplicated the initializer list (old + new line);
//! only the post-change list (with `filled(false)`) is kept here.
IcebergTableInformation::IcebergTableInformation(IRCatalog &catalog, IRCSchemaEntry &schema, const string &name)
    : catalog(catalog), schema(schema), name(name), filled(false) {
	// Synthetic identifier derived from the schema and table names.
	table_id = "uuid-" + schema.name + "-" + name;
}

Expand Down
35 changes: 27 additions & 8 deletions src/storage/irc_table_set.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,11 @@ ICTableSet::ICTableSet(IRCSchemaEntry &schema) : schema(schema), catalog(schema.
}

bool ICTableSet::FillEntry(ClientContext &context, IcebergTableInformation &table) {
if (!table.schema_versions.empty()) {
//! Already filled
if (table.filled) {
return true;
}
// The table has not been filled yet, clear all dummy schema versions
table.schema_versions.clear();

auto &ic_catalog = catalog.Cast<IRCatalog>();

Expand All @@ -57,6 +58,7 @@ bool ICTableSet::FillEntry(ClientContext &context, IcebergTableInformation &tabl
for (auto &table_schema : schemas) {
table.CreateSchemaVersion(*table_schema.second);
}
table.filled = true;
return true;
}

Expand All @@ -67,13 +69,28 @@ void ICTableSet::Scan(ClientContext &context, const std::function<void(CatalogEn
auto table_namespace = IRCAPI::GetEncodedSchemaName(schema.namespace_items);
for (auto &entry : entries) {
auto &table_info = entry.second;
if (FillEntry(context, table_info)) {
auto schema_id = table_info.table_metadata.current_schema_id;
callback(*table_info.schema_versions[schema_id]);
} else {
DUCKDB_LOG(context, IcebergLogType, "Table %s.%s not an Iceberg Table", table_namespace, entry.first);
non_iceberg_tables.insert(entry.first);
if (table_info.dummy_entry) {
continue;
}

// create a table entry with fake schema data
// filled stays false
CreateTableInfo info(schema, table_info.name);
vector<ColumnDefinition> columns;
auto col = ColumnDefinition(string("__"), LogicalType::UNKNOWN);
columns.push_back(std::move(col));
info.columns = ColumnList(std::move(columns));
auto table_entry = make_uniq<ICTableEntry>(table_info, catalog, schema, info);
if (!table_entry->internal) {
table_entry->internal = schema.internal;
}
auto result = table_entry.get();
if (result->name.empty()) {
throw InternalException("ICTableSet::CreateEntry called with empty name");
}
table_info.dummy_entry = std::move(table_entry);
auto &optional = table_info.dummy_entry.get()->Cast<CatalogEntry>();
callback(optional);
}
// erase not iceberg tables
for (auto &entry : non_iceberg_tables) {
Expand Down Expand Up @@ -116,6 +133,8 @@ bool ICTableSet::CreateNewEntry(ClientContext &context, IRCatalog &catalog, IRCS

auto table_entry = make_uniq<ICTableEntry>(table_info, catalog, schema, info);
auto optional_entry = table_entry.get();
// Since we are creating the schema, we set the table information to filled
table_info.filled = true;

optional_entry->table_info.schema_versions[0] = std::move(table_entry);
optional_entry->table_info.table_metadata.schemas[0] =
Expand Down
2 changes: 1 addition & 1 deletion test/sql/local/irc/iceberg_catalog_read.test
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ statement ok
CALL enable_logging('HTTP');

statement ok
set logging_level='debug'
set logging_level='debug';

statement ok
CREATE SECRET (
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,21 +51,18 @@ select count(*) from duckdb_logs_parsed('HTTP');
query I
select count(*) from duckdb_logs_parsed('HTTP');
----
5
3

statement ok
use memory;

# 3 more requests are made,
# 2 from previous duckdb_logs_parsed call for 'main', 'default',
# and 1 for 'memory'
# requests no longer go up
# namespace 'memory' is looked up in the iceberg catalog
query I
select count(*) from duckdb_logs_parsed('HTTP');
----
8
4

query I
select count(*) from duckdb_logs_parsed('HTTP');
----
8
4
108 changes: 108 additions & 0 deletions test/sql/local/irc/test_table_information_requests.test
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
# name: test/sql/local/irc/test_table_information_requests.test
# description: test integration with iceberg catalog read
# group: [irc]

require-env ICEBERG_SERVER_AVAILABLE

require avro

require parquet

require iceberg

require httpfs

# Do not ignore 'HTTP' error messages!
set ignore_error_messages

statement ok
set enable_logging=true

statement ok
set logging_level='debug'

statement ok
CALL enable_logging('HTTP');

statement ok
CREATE SECRET (
TYPE S3,
KEY_ID 'admin',
SECRET 'password',
ENDPOINT '127.0.0.1:9000',
URL_STYLE 'path',
USE_SSL 0
);


statement ok
ATTACH '' AS my_datalake (
TYPE ICEBERG,
CLIENT_ID 'admin',
CLIENT_SECRET 'password',
ENDPOINT 'http://127.0.0.1:8181'
);

statement ok
show all tables;

# 1 call for oauth, 1 call for config
# 1 call to list namespaces
# 1 call to list tables in default
# 1 call to list tables in level1 namespace (no recursive namespace calls)
query I
select count(*) from duckdb_logs_parsed('HTTP');
----
5

statement ok
call truncate_duckdb_logs();

query II
select column_name, column_type from (describe my_datalake.default.supplier);
----
s_suppkey BIGINT
s_name VARCHAR
s_address VARCHAR
s_nationkey INTEGER
s_phone VARCHAR
s_acctbal DECIMAL(15,2)
s_comment VARCHAR

# one request to verify the default schema
# another request to verify table default.supplier
# another request to the table information endpoint
# FIXME: apparently there is also a request to an avro file
query I
select count(*) from duckdb_logs_parsed('HTTP');
----
4

statement ok
begin;

statement ok
show all tables;

query I
select distinct(s_nationkey) from my_datalake.default.supplier order by all limit 5;
----
0
1
2
3
4

statement ok
commit;

# 5 calls to list the namespaces
# 1 call to the GetTableInformation endpoint for supplier
# (FIXME) 1 call to an avro file in the warehouse
# 1 call to the manifest file
# 1 call to the manifest list
# 2 calls to read parquet files
query I
select count(*) from duckdb_logs_parsed('HTTP');
----
11
56 changes: 56 additions & 0 deletions test/sql/local/test_iceberg_and_ducklake.test
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
# name: test/sql/local/test_iceberg_and_ducklake.test
# description: test attaching an Iceberg REST catalog and a DuckLake catalog side by side
# group: [local]

require-env ICEBERG_SERVER_AVAILABLE

require avro

require parquet

require iceberg

require httpfs

require ducklake

# Do not ignore 'HTTP' error messages!
set ignore_error_messages

statement ok
pragma threads=1;

statement ok
CALL enable_logging('HTTP');

statement ok
set logging_level='debug';

statement ok
CREATE SECRET (
TYPE S3,
KEY_ID 'admin',
SECRET 'password',
ENDPOINT '127.0.0.1:9000',
URL_STYLE 'path',
USE_SSL 0
);


statement ok
ATTACH '' AS my_datalake (
TYPE ICEBERG,
CLIENT_ID 'admin',
CLIENT_SECRET 'password',
ENDPOINT 'http://127.0.0.1:8181'
);

statement ok
ATTACH 'ducklake:duckdb:__TEST_DIR__/ducklake.duckdb' as my_ducklake (DATA_PATH '__TEST_DIR__/data_path');

# 2 requests to the iceberg catalog for oauth and config
# 3 requests when attaching ducklake, because the ducklake attach triggers a call to duckdb_tables()
query I
select count(*) from duckdb_logs_parsed('HTTP');
----
5
Loading