Skip to content

Commit 7f9c82d

Browse files
authored
Merge pull request #381 from Tmonster/only_post_valid_iceberg_tables
Wrap LoadTableInformation response so we can ignore non-iceberg tables
2 parents 51e9430 + 1167fb5 commit 7f9c82d

File tree

8 files changed

+182
-23
lines changed

8 files changed

+182
-23
lines changed

src/catalog_api.cpp

Lines changed: 31 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -105,8 +105,8 @@ bool IRCAPI::VerifyTableExistence(ClientContext &context, IRCatalog &catalog, co
105105
return false;
106106
}
107107

108-
static string GetTableMetadata(ClientContext &context, IRCatalog &catalog, const IRCSchemaEntry &schema,
109-
const string &table) {
108+
static unique_ptr<HTTPResponse> GetTableMetadata(ClientContext &context, IRCatalog &catalog,
109+
const IRCSchemaEntry &schema, const string &table) {
110110
auto schema_name = IRCAPI::GetEncodedSchemaName(schema.namespace_items);
111111

112112
auto url_builder = catalog.GetBaseUrl();
@@ -117,20 +117,28 @@ static string GetTableMetadata(ClientContext &context, IRCatalog &catalog, const
117117
url_builder.AddPathComponent(table);
118118

119119
auto url = url_builder.GetURL();
120-
auto response = catalog.auth_handler->GetRequest(context, url_builder);
121-
if (!response->Success()) {
122-
ThrowException(url, *response, "GET");
123-
}
124-
125-
return response->body;
120+
return catalog.auth_handler->GetRequest(context, url_builder);
126121
}
127122

128-
rest_api_objects::LoadTableResult IRCAPI::GetTable(ClientContext &context, IRCatalog &catalog,
129-
const IRCSchemaEntry &schema, const string &table_name) {
123+
APIResult<rest_api_objects::LoadTableResult> IRCAPI::GetTable(ClientContext &context, IRCatalog &catalog,
124+
const IRCSchemaEntry &schema, const string &table_name) {
125+
auto ret = APIResult<rest_api_objects::LoadTableResult>();
130126
auto result = GetTableMetadata(context, catalog, schema, table_name);
131-
std::unique_ptr<yyjson_doc, YyjsonDocDeleter> doc(ICUtils::api_result_to_doc(result));
127+
if (result->status != HTTPStatusCode::OK_200) {
128+
yyjson_val *error_obj = ICUtils::get_error_message(result->body);
129+
if (error_obj == nullptr) {
130+
throw InvalidConfigurationException(result->body);
131+
}
132+
ret.has_error = true;
133+
ret.status_ = result->status;
134+
ret.error_ = rest_api_objects::IcebergErrorResponse::FromJSON(error_obj);
135+
return ret;
136+
}
137+
ret.has_error = false;
138+
std::unique_ptr<yyjson_doc, YyjsonDocDeleter> doc(ICUtils::api_result_to_doc(result->body));
132139
auto *metadata_root = yyjson_doc_get_root(doc.get());
133-
return rest_api_objects::LoadTableResult::FromJSON(metadata_root);
140+
ret.result_ = rest_api_objects::LoadTableResult::FromJSON(metadata_root);
141+
return ret;
134142
}
135143

136144
vector<rest_api_objects::TableIdentifier> IRCAPI::GetTables(ClientContext &context, IRCatalog &catalog,
@@ -150,6 +158,12 @@ vector<rest_api_objects::TableIdentifier> IRCAPI::GetTables(ClientContext &conte
150158
}
151159
auto response = catalog.auth_handler->GetRequest(context, url_builder);
152160
if (!response->Success()) {
161+
if (response->status == HTTPStatusCode::Forbidden_403 ||
162+
response->status == HTTPStatusCode::Unauthorized_401) {
163+
// return empty result if user cannot list tables for a schema.
164+
vector<rest_api_objects::TableIdentifier> ret;
165+
return ret;
166+
}
153167
auto url = url_builder.GetURL();
154168
ThrowException(url, *response, "GET");
155169
}
@@ -191,6 +205,11 @@ vector<IRCAPISchema> IRCAPI::GetSchemas(ClientContext &context, IRCatalog &catal
191205
}
192206
auto response = catalog.auth_handler->GetRequest(context, endpoint_builder);
193207
if (!response->Success()) {
208+
if (response->status == HTTPStatusCode::Forbidden_403 ||
209+
response->status == HTTPStatusCode::Unauthorized_401) {
210+
// return empty result if user cannot list schemas.
211+
return result;
212+
}
194213
auto url = endpoint_builder.GetURL();
195214
ThrowException(url, *response, "GET");
196215
}

src/catalog_utils.cpp

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,22 @@
44

55
namespace duckdb {
66

7+
yyjson_val *ICUtils::get_error_message(const string &api_result) {
8+
auto *doc = yyjson_read(api_result.c_str(), api_result.size(), 0);
9+
auto *root = yyjson_doc_get_root(doc);
10+
auto *error = yyjson_obj_get(root, "error");
11+
if (error == nullptr) {
12+
return nullptr;
13+
}
14+
auto message = yyjson_obj_get(error, "message");
15+
auto type = yyjson_obj_get(error, "type");
16+
auto code = yyjson_obj_get(error, "code");
17+
if (message != nullptr && type != nullptr && code != nullptr) {
18+
return root;
19+
}
20+
return nullptr;
21+
}
22+
723
yyjson_doc *ICUtils::api_result_to_doc(const string &api_result) {
824
auto *doc = yyjson_read(api_result.c_str(), api_result.size(), 0);
925
auto *root = yyjson_doc_get_root(doc);

src/include/catalog_api.hpp

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,21 @@ struct IRCAPISchema {
2020
string catalog_name;
2121
};
2222

23+
// Some API responses have error messages that need to be checked before being raised
24+
// to the user, since sometimes is does not mean whole operation has failed.
25+
// Ex: Glue will return an error when trying to get the metadata for a non-iceberg table during a list tables operation
26+
// The complete operation did not fail, just getting metadata for one table
27+
template <typename T>
28+
class APIResult {
29+
public:
30+
APIResult() {};
31+
32+
T result_;
33+
HTTPStatusCode status_;
34+
bool has_error;
35+
rest_api_objects::IcebergErrorResponse error_;
36+
};
37+
2338
class IRCAPI {
2439
public:
2540
static const string API_VERSION_1;
@@ -31,8 +46,8 @@ class IRCAPI {
3146
static vector<string> ParseSchemaName(const string &namespace_name);
3247
static string GetSchemaName(const vector<string> &items);
3348
static string GetEncodedSchemaName(const vector<string> &items);
34-
static rest_api_objects::LoadTableResult GetTable(ClientContext &context, IRCatalog &catalog,
35-
const IRCSchemaEntry &schema, const string &table_name);
49+
static APIResult<rest_api_objects::LoadTableResult>
50+
GetTable(ClientContext &context, IRCatalog &catalog, const IRCSchemaEntry &schema, const string &table_name);
3651
static vector<IRCAPISchema> GetSchemas(ClientContext &context, IRCatalog &catalog, const vector<string> &parent);
3752
static void CommitTableUpdate(ClientContext &context, IRCatalog &catalog, const vector<string> &schema,
3853
const string &table_name, const string &body);

src/include/catalog_utils.hpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ struct YyjsonDocDeleter {
2121

2222
class ICUtils {
2323
public:
24+
static yyjson_val *get_error_message(const string &api_result);
2425
static yyjson_doc *api_result_to_doc(const string &api_result);
2526
static string JsonToString(std::unique_ptr<yyjson_mut_doc, YyjsonDocDeleter> doc);
2627
};

src/include/storage/irc_table_set.hpp

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,9 @@ class ICTableSet {
2525

2626
public:
2727
void LoadEntries(ClientContext &context);
28-
void FillEntry(ClientContext &context, IcebergTableInformation &table);
28+
//! return true if request to LoadTableInformation was successful and entry has been filled
29+
//! or if entry is already filled. Returns False otherwise
30+
bool FillEntry(ClientContext &context, IcebergTableInformation &table);
2931

3032
public:
3133
IRCSchemaEntry &schema;

src/storage/irc_table_set.cpp

Lines changed: 33 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
11
#include "catalog_api.hpp"
22
#include "catalog_utils.hpp"
3+
#include "iceberg_logging.hpp"
34

45
#include "storage/irc_catalog.hpp"
56
#include "storage/irc_table_set.hpp"
6-
77
#include "storage/irc_transaction.hpp"
88
#include "metadata/iceberg_partition_spec.hpp"
99
#include "duckdb/parser/constraints/not_null_constraint.hpp"
@@ -12,6 +12,8 @@
1212
#include "duckdb/planner/parsed_data/bound_create_table_info.hpp"
1313
#include "duckdb/parser/parsed_data/drop_info.hpp"
1414
#include "duckdb/catalog/dependency_list.hpp"
15+
#include "duckdb/common/enums/http_status_code.hpp"
16+
#include "duckdb/common/exception/http_exception.hpp"
1517
#include "duckdb/parser/parsed_data/create_table_info.hpp"
1618
#include "duckdb/parser/constraints/list.hpp"
1719
#include "storage/irc_schema_entry.hpp"
@@ -27,13 +29,26 @@ namespace duckdb {
2729
ICTableSet::ICTableSet(IRCSchemaEntry &schema) : schema(schema), catalog(schema.ParentCatalog()) {
2830
}
2931

30-
void ICTableSet::FillEntry(ClientContext &context, IcebergTableInformation &table) {
32+
bool ICTableSet::FillEntry(ClientContext &context, IcebergTableInformation &table) {
3133
if (!table.schema_versions.empty()) {
32-
return;
34+
//! Already filled
35+
return true;
3336
}
3437

3538
auto &ic_catalog = catalog.Cast<IRCatalog>();
36-
table.load_table_result = IRCAPI::GetTable(context, ic_catalog, schema, table.name);
39+
40+
auto get_table_result = IRCAPI::GetTable(context, ic_catalog, schema, table.name);
41+
if (get_table_result.has_error) {
42+
if (get_table_result.error_._error.type == "NoSuchIcebergTableException") {
43+
return false;
44+
}
45+
if (get_table_result.status_ == HTTPStatusCode::Forbidden_403 ||
46+
get_table_result.status_ == HTTPStatusCode::Unauthorized_401) {
47+
return false;
48+
}
49+
throw HTTPException(get_table_result.error_._error.message);
50+
}
51+
table.load_table_result = std::move(get_table_result.result_);
3752
table.table_metadata = IcebergTableMetadata::FromTableMetadata(table.load_table_result.metadata);
3853
auto &schemas = table.table_metadata.schemas;
3954

@@ -42,24 +57,34 @@ void ICTableSet::FillEntry(ClientContext &context, IcebergTableInformation &tabl
4257
for (auto &table_schema : schemas) {
4358
table.CreateSchemaVersion(*table_schema.second);
4459
}
60+
return true;
4561
}
4662

4763
void ICTableSet::Scan(ClientContext &context, const std::function<void(CatalogEntry &)> &callback) {
4864
lock_guard<mutex> l(entry_lock);
4965
LoadEntries(context);
66+
case_insensitive_set_t non_iceberg_tables;
67+
auto table_namespace = IRCAPI::GetEncodedSchemaName(schema.namespace_items);
5068
for (auto &entry : entries) {
5169
auto &table_info = entry.second;
52-
FillEntry(context, table_info);
53-
auto schema_id = table_info.table_metadata.current_schema_id;
54-
callback(*table_info.schema_versions[schema_id]);
70+
if (FillEntry(context, table_info)) {
71+
auto schema_id = table_info.table_metadata.current_schema_id;
72+
callback(*table_info.schema_versions[schema_id]);
73+
} else {
74+
DUCKDB_LOG(context, IcebergLogType, "Table %s.%s not an Iceberg Table", table_namespace, entry.first);
75+
non_iceberg_tables.insert(entry.first);
76+
}
77+
}
78+
// erase not iceberg tables
79+
for (auto &entry : non_iceberg_tables) {
80+
entries.erase(entry);
5581
}
5682
}
5783

5884
void ICTableSet::LoadEntries(ClientContext &context) {
5985
if (listed) {
6086
return;
6187
}
62-
6388
auto &ic_catalog = catalog.Cast<IRCatalog>();
6489
auto tables = IRCAPI::GetTables(context, ic_catalog, schema);
6590

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
# name: test/sql/cloud/test_glue_catalog_with_other_tables.test
2+
# group: [cloud]
3+
4+
require-env ICEBERG_AWS_REMOTE_AVAILABLE
5+
6+
require-env AWS_ACCESS_KEY_ID
7+
8+
require-env AWS_SECRET_ACCESS_KEY
9+
10+
require avro
11+
12+
require parquet
13+
14+
require iceberg
15+
16+
require httpfs
17+
18+
require aws
19+
20+
statement ok
21+
CREATE SECRET iceberg_secret (
22+
TYPE S3,
23+
PROVIDER credential_chain
24+
);
25+
26+
statement ok
27+
attach '840140254803' as glue_catalog (
28+
type iceberg,
29+
endpoint 'glue.us-east-1.amazonaws.com/iceberg',
30+
AUTHORIZATION_TYPE 'sigv4'
31+
);
32+
33+
34+
statement ok
35+
show all tables;
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
# name: test/sql/cloud/test_snowflake_with_diff_permissions.test
2+
# description: test integration with iceberg catalog read
3+
# group: [cloud]
4+
5+
require-env ICEBERG_AWS_REMOTE_AVAILABLE
6+
7+
require-env SNOWFLAKE_CATALOG_URI_GCS
8+
9+
require-env SNOWFLAKE_SECRET_KEY_S3_PERMISSION_TEST
10+
11+
require-env SNOWFLAKE_KEY_ID_S3_PERMISSION_TEST
12+
13+
require avro
14+
15+
require parquet
16+
17+
require iceberg
18+
19+
require httpfs
20+
21+
require aws
22+
23+
# Do not ignore 'HTTP' error messages!
24+
set ignore_error_messages
25+
26+
statement ok
27+
create secret test_namespace_permissions (
28+
TYPE ICEBERG,
29+
CLIENT_ID '${SNOWFLAKE_KEY_ID_S3_PERMISSION_TEST}',
30+
CLIENT_SECRET '${SNOWFLAKE_SECRET_KEY_S3_PERMISSION_TEST}',
31+
ENDPOINT '${SNOWFLAKE_CATALOG_URI_GCS}'
32+
);
33+
34+
statement ok
35+
attach 's3-catalog' as my_datalake (
36+
type ICEBERG,
37+
default_region 'eu-west-2',
38+
secret test_namespace_permissions,
39+
ENDPOINT '${SNOWFLAKE_CATALOG_URI_GCS}',
40+
support_nested_namespaces true
41+
);
42+
43+
query I
44+
select count(*) from (show all tables);
45+
----
46+
4

0 commit comments

Comments
 (0)