Skip to content
Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions src/aws.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,10 @@ unique_ptr<HTTPResponse> AWSInput::ExecuteRequest(ClientContext &context, Aws::H
return result;
}

unique_ptr<HTTPResponse> AWSInput::HeadRequest(ClientContext &context) {
return ExecuteRequest(context, Aws::Http::HttpMethod::HTTP_HEAD);
}

unique_ptr<HTTPResponse> AWSInput::GetRequest(ClientContext &context) {
return ExecuteRequest(context, Aws::Http::HttpMethod::HTTP_GET);
}
Expand Down
56 changes: 46 additions & 10 deletions src/catalog_api.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
using namespace duckdb_yyjson;
namespace duckdb {

vector<string> IRCAPI::ParseSchemaName(string &namespace_name) {
vector<string> IRCAPI::ParseSchemaName(const string &namespace_name) {
idx_t start = 0;
idx_t end = namespace_name.find(".", start);
vector<string> ret;
Expand All @@ -40,6 +40,7 @@ string IRCAPI::GetSchemaName(const vector<string> &items) {

//! Used for the path parameters
string IRCAPI::GetEncodedSchemaName(const vector<string> &items) {
D_ASSERT(!items.empty());
static const string unit_separator = "%1F";
return StringUtil::Join(items, unit_separator);
}
Expand All @@ -63,6 +64,47 @@ string IRCAPI::GetEncodedSchemaName(const vector<string> &items) {
int(response.status));
}

bool IRCAPI::VerifySchemaExistence(ClientContext &context, IRCatalog &catalog, const string &schema) {
auto namespace_items = ParseSchemaName(schema);
auto schema_name = GetEncodedSchemaName(namespace_items);

auto url_builder = catalog.GetBaseUrl();
url_builder.AddPathComponent(catalog.prefix);
url_builder.AddPathComponent("namespaces");
url_builder.AddPathComponent(schema_name);

auto url = url_builder.GetURL();
try {
auto response = catalog.auth_handler->HeadRequest(context, url_builder);
if (response->Success()) {
return true;
}
return false;
} catch (std::exception &ex) {
//! For some reason this API likes to throw, instead of just returning a response with an error
return false;
}
}

bool IRCAPI::VerifyTableExistence(ClientContext &context, IRCatalog &catalog, const IRCSchemaEntry &schema,
const string &table) {
auto schema_name = GetEncodedSchemaName(schema.namespace_items);

auto url_builder = catalog.GetBaseUrl();
url_builder.AddPathComponent(catalog.prefix);
url_builder.AddPathComponent("namespaces");
url_builder.AddPathComponent(schema_name);
url_builder.AddPathComponent("tables");
url_builder.AddPathComponent(table);

auto url = url_builder.GetURL();
auto response = catalog.auth_handler->HeadRequest(context, url_builder);
if (response->Success()) {
return true;
}
return false;
}

static string GetTableMetadata(ClientContext &context, IRCatalog &catalog, const IRCSchemaEntry &schema,
const string &table) {
auto schema_name = IRCAPI::GetEncodedSchemaName(schema.namespace_items);
Expand All @@ -80,21 +122,15 @@ static string GetTableMetadata(ClientContext &context, IRCatalog &catalog, const
ThrowException(url, *response, "GET");
}

const auto &api_result = response->body;
std::unique_ptr<yyjson_doc, YyjsonDocDeleter> doc(ICUtils::api_result_to_doc(api_result));
auto *root = yyjson_doc_get_root(doc.get());
auto load_table_result = rest_api_objects::LoadTableResult::FromJSON(root);
catalog.SetCachedValue(url, api_result, load_table_result);
return api_result;
return response->body;
}

rest_api_objects::LoadTableResult IRCAPI::GetTable(ClientContext &context, IRCatalog &catalog,
const IRCSchemaEntry &schema, const string &table_name) {
string result = GetTableMetadata(context, catalog, schema, table_name);
auto result = GetTableMetadata(context, catalog, schema, table_name);
std::unique_ptr<yyjson_doc, YyjsonDocDeleter> doc(ICUtils::api_result_to_doc(result));
auto *metadata_root = yyjson_doc_get_root(doc.get());
auto load_table_result = rest_api_objects::LoadTableResult::FromJSON(metadata_root);
return load_table_result;
return rest_api_objects::LoadTableResult::FromJSON(metadata_root);
}

vector<rest_api_objects::TableIdentifier> IRCAPI::GetTables(ClientContext &context, IRCatalog &catalog,
Expand Down
21 changes: 21 additions & 0 deletions src/common/api_utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -100,4 +100,25 @@ unique_ptr<HTTPResponse> APIUtils::GetRequest(ClientContext &context, const IRCE
return http_util.Request(get_request);
}

unique_ptr<HTTPResponse> APIUtils::HeadRequest(ClientContext &context, const IRCEndpointBuilder &endpoint_builder,
const string &token) {
auto &db = DatabaseInstance::GetDatabase(context);

HTTPHeaders headers(db);
headers.Insert("X-Iceberg-Access-Delegation", "vended-credentials");
if (!token.empty()) {
headers.Insert("Authorization", StringUtil::Format("Bearer %s", token));
}

auto &http_util = HTTPUtil::Get(db);
unique_ptr<HTTPParams> params;

string request_url = AddHttpHostIfMissing(endpoint_builder.GetURL());

params = http_util.InitializeParameters(context, request_url);

HeadRequestInfo head_request(request_url, headers, *params);
return http_util.Request(head_request);
}

} // namespace duckdb
2 changes: 2 additions & 0 deletions src/include/api_utils.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ class APIUtils {
public:
static unique_ptr<HTTPResponse> GetRequest(ClientContext &context, const IRCEndpointBuilder &endpoint_builder,
const string &token = "");
static unique_ptr<HTTPResponse> HeadRequest(ClientContext &context, const IRCEndpointBuilder &endpoint_builder,
const string &token = "");
static unique_ptr<HTTPResponse> DeleteRequest(ClientContext &context, const IRCEndpointBuilder &endpoint_builder,
const string &token = "");
static unique_ptr<HTTPResponse> PostRequest(ClientContext &context, const string &url, const string &post_data,
Expand Down
1 change: 1 addition & 0 deletions src/include/aws.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ class AWSInput {

public:
unique_ptr<HTTPResponse> GetRequest(ClientContext &context);
unique_ptr<HTTPResponse> HeadRequest(ClientContext &context);
unique_ptr<HTTPResponse> DeleteRequest(ClientContext &context);
unique_ptr<HTTPResponse> PostRequest(ClientContext &context, string post_body);

Expand Down
9 changes: 6 additions & 3 deletions src/include/catalog_api.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,14 @@ struct IRCAPISchema {
class IRCAPI {
public:
static const string API_VERSION_1;
static vector<string> ParseSchemaName(string &namespace_name);
static string GetSchemaName(const vector<string> &items);
static string GetEncodedSchemaName(const vector<string> &items);
static vector<rest_api_objects::TableIdentifier> GetTables(ClientContext &context, IRCatalog &catalog,
const IRCSchemaEntry &schema);
static bool VerifySchemaExistence(ClientContext &context, IRCatalog &catalog, const string &schema);
static bool VerifyTableExistence(ClientContext &context, IRCatalog &catalog, const IRCSchemaEntry &schema,
const string &table);
static vector<string> ParseSchemaName(const string &namespace_name);
static string GetSchemaName(const vector<string> &items);
static string GetEncodedSchemaName(const vector<string> &items);
static rest_api_objects::LoadTableResult GetTable(ClientContext &context, IRCatalog &catalog,
const IRCSchemaEntry &schema, const string &table_name);
static vector<IRCAPISchema> GetSchemas(ClientContext &context, IRCatalog &catalog, const vector<string> &parent);
Expand Down
1 change: 1 addition & 0 deletions src/include/storage/authorization/none.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ class NoneAuthorization : public IRCAuthorization {
public:
static unique_ptr<IRCAuthorization> FromAttachOptions(IcebergAttachOptions &input);
unique_ptr<HTTPResponse> GetRequest(ClientContext &context, const IRCEndpointBuilder &endpoint_builder) override;
unique_ptr<HTTPResponse> HeadRequest(ClientContext &context, const IRCEndpointBuilder &endpoint_builder) override;
unique_ptr<HTTPResponse> DeleteRequest(ClientContext &context, const IRCEndpointBuilder &endpoint_builder) override;
unique_ptr<HTTPResponse> PostRequest(ClientContext &context, const IRCEndpointBuilder &endpoint_builder,
const string &body) override;
Expand Down
1 change: 1 addition & 0 deletions src/include/storage/authorization/oauth2.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ class OAuth2Authorization : public IRCAuthorization {
public:
static unique_ptr<OAuth2Authorization> FromAttachOptions(ClientContext &context, IcebergAttachOptions &input);
unique_ptr<HTTPResponse> GetRequest(ClientContext &context, const IRCEndpointBuilder &endpoint_builder) override;
unique_ptr<HTTPResponse> HeadRequest(ClientContext &context, const IRCEndpointBuilder &endpoint_builder) override;
unique_ptr<HTTPResponse> DeleteRequest(ClientContext &context, const IRCEndpointBuilder &endpoint_builder) override;
unique_ptr<HTTPResponse> PostRequest(ClientContext &context, const IRCEndpointBuilder &endpoint_builder,
const string &body) override;
Expand Down
1 change: 1 addition & 0 deletions src/include/storage/authorization/sigv4.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ class SIGV4Authorization : public IRCAuthorization {
public:
static unique_ptr<IRCAuthorization> FromAttachOptions(IcebergAttachOptions &input);
unique_ptr<HTTPResponse> GetRequest(ClientContext &context, const IRCEndpointBuilder &endpoint_builder) override;
unique_ptr<HTTPResponse> HeadRequest(ClientContext &context, const IRCEndpointBuilder &endpoint_builder) override;
unique_ptr<HTTPResponse> DeleteRequest(ClientContext &context, const IRCEndpointBuilder &endpoint_builder) override;
unique_ptr<HTTPResponse> PostRequest(ClientContext &context, const IRCEndpointBuilder &endpoint_builder,
const string &body) override;
Expand Down
2 changes: 2 additions & 0 deletions src/include/storage/irc_authorization.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ struct IRCAuthorization {

public:
virtual unique_ptr<HTTPResponse> GetRequest(ClientContext &context, const IRCEndpointBuilder &endpoint_builder) = 0;
virtual unique_ptr<HTTPResponse> HeadRequest(ClientContext &context,
const IRCEndpointBuilder &endpoint_builder) = 0;
virtual unique_ptr<HTTPResponse> DeleteRequest(ClientContext &context,
const IRCEndpointBuilder &endpoint_builder) = 0;
virtual unique_ptr<HTTPResponse> PostRequest(ClientContext &context, const IRCEndpointBuilder &endpoint_builder,
Expand Down
1 change: 1 addition & 0 deletions src/include/storage/irc_schema_entry.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#include "catalog_api.hpp"
#include "duckdb/catalog/catalog_entry/schema_catalog_entry.hpp"
#include "storage/irc_table_set.hpp"
#include "duckdb/common/enums/on_entry_not_found.hpp"

namespace duckdb {
class IRCTransaction;
Expand Down
4 changes: 3 additions & 1 deletion src/include/storage/irc_schema_set.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ class IRCSchemaSet {

public:
void LoadEntries(ClientContext &context);
optional_ptr<CatalogEntry> GetEntry(ClientContext &context, const string &name);
optional_ptr<CatalogEntry> GetEntry(ClientContext &context, const string &name, OnEntryNotFound if_not_found);
void Scan(ClientContext &context, const std::function<void(CatalogEntry &)> &callback);

protected:
Expand All @@ -21,6 +21,8 @@ class IRCSchemaSet {
public:
Catalog &catalog;
case_insensitive_map_t<unique_ptr<CatalogEntry>> entries;
//! Whether a listing has been done for the catalog
bool listed = false;

private:
mutex entry_lock;
Expand Down
2 changes: 2 additions & 0 deletions src/include/storage/irc_table_set.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ class ICTableSet {
IRCSchemaEntry &schema;
Catalog &catalog;
case_insensitive_map_t<IcebergTableInformation> entries;
//! Whether a listing is done for this transaction
bool listed = false;

private:
mutex entry_lock;
Expand Down
5 changes: 5 additions & 0 deletions src/storage/authorization/none.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,11 @@ unique_ptr<HTTPResponse> NoneAuthorization::GetRequest(ClientContext &context,
return APIUtils::GetRequest(context, endpoint_builder, "");
}

unique_ptr<HTTPResponse> NoneAuthorization::HeadRequest(ClientContext &context,
const IRCEndpointBuilder &endpoint_builder) {
return APIUtils::HeadRequest(context, endpoint_builder, "");
}

unique_ptr<HTTPResponse> NoneAuthorization::DeleteRequest(ClientContext &context,
const IRCEndpointBuilder &endpoint_builder) {
return APIUtils::DeleteRequest(context, endpoint_builder, "");
Expand Down
5 changes: 5 additions & 0 deletions src/storage/authorization/oauth2.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,11 @@ unique_ptr<HTTPResponse> OAuth2Authorization::GetRequest(ClientContext &context,
return APIUtils::GetRequest(context, endpoint_builder, token);
}

unique_ptr<HTTPResponse> OAuth2Authorization::HeadRequest(ClientContext &context,
const IRCEndpointBuilder &endpoint_builder) {
return APIUtils::HeadRequest(context, endpoint_builder, token);
}

unique_ptr<HTTPResponse> OAuth2Authorization::DeleteRequest(ClientContext &context,
const IRCEndpointBuilder &endpoint_builder) {
return APIUtils::DeleteRequest(context, endpoint_builder, token);
Expand Down
6 changes: 6 additions & 0 deletions src/storage/authorization/sigv4.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,12 @@ unique_ptr<HTTPResponse> SIGV4Authorization::GetRequest(ClientContext &context,
return aws_input.GetRequest(context);
}

unique_ptr<HTTPResponse> SIGV4Authorization::HeadRequest(ClientContext &context,
const IRCEndpointBuilder &endpoint_builder) {
AWSInput aws_input = CreateAWSInput(context, endpoint_builder);
return aws_input.HeadRequest(context);
}

unique_ptr<HTTPResponse> SIGV4Authorization::DeleteRequest(ClientContext &context,
const IRCEndpointBuilder &endpoint_builder) {
AWSInput aws_input = CreateAWSInput(context, endpoint_builder);
Expand Down
37 changes: 1 addition & 36 deletions src/storage/irc_catalog.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ optional_ptr<SchemaCatalogEntry> IRCatalog::LookupSchema(CatalogTransaction tran
auto &schemas = irc_transaction.GetSchemas();

auto &schema_name = schema_lookup.GetEntryName();
auto entry = schemas.GetEntry(transaction.GetContext(), schema_name);
auto entry = schemas.GetEntry(transaction.GetContext(), schema_name, if_not_found);
if (!entry && if_not_found != OnEntryNotFound::RETURN_NULL) {
throw CatalogException(schema_lookup.GetErrorContext(), "Schema with name \"%s\" not found", schema_name);
}
Expand Down Expand Up @@ -318,41 +318,6 @@ void IRCatalog::GetConfig(ClientContext &context, IcebergEndpointType &endpoint_
}
}

string IRCatalog::OptionalGetCachedValue(const string &url) {
std::lock_guard<std::mutex> lock(metadata_cache_mutex);
auto value = metadata_cache.find(url);
if (value != metadata_cache.end()) {
auto now = system_clock::now();
if (now < value->second->expires_at) {
return value->second->data;
}
}
return "";
}

bool IRCatalog::SetCachedValue(const string &url, const string &value,
const rest_api_objects::LoadTableResult &result) {
//! FIXME: shouldn't this also store the 'storage-credentials' ??
if (!result.has_config) {
return false;
}
auto &credentials = result.config;
auto expires_at_it = credentials.find("s3.session-token-expires-at-ms");
if (expires_at_it == credentials.end()) {
return false;
}

auto &expires_at = expires_at_it->second;
auto epochMillis = std::stoll(expires_at);
auto expired_time = system_clock::time_point(milliseconds(epochMillis));
auto val = make_uniq<MetadataCacheValue>(value, expired_time);
{
std::lock_guard<std::mutex> lock(metadata_cache_mutex);
metadata_cache[url] = std::move(val);
}
return true;
}

//===--------------------------------------------------------------------===//
// Attach
//===--------------------------------------------------------------------===//
Expand Down
25 changes: 21 additions & 4 deletions src/storage/irc_schema_set.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,28 @@ namespace duckdb {
IRCSchemaSet::IRCSchemaSet(Catalog &catalog) : catalog(catalog) {
}

optional_ptr<CatalogEntry> IRCSchemaSet::GetEntry(ClientContext &context, const string &name) {
LoadEntries(context);
optional_ptr<CatalogEntry> IRCSchemaSet::GetEntry(ClientContext &context, const string &name,
OnEntryNotFound if_not_found) {
lock_guard<mutex> l(entry_lock);
auto &ic_catalog = catalog.Cast<IRCatalog>();

auto entry = entries.find(name);
if (entry == entries.end()) {
return nullptr;
CreateSchemaInfo info;
if (!IRCAPI::VerifySchemaExistence(context, ic_catalog, name)) {
if (if_not_found == OnEntryNotFound::RETURN_NULL) {
return nullptr;
} else {
throw CatalogException("Iceberg namespace by the name of '%s' does not exist", name);
}
}
info.schema = name;
info.internal = false;
auto schema_entry = make_uniq<IRCSchemaEntry>(catalog, info);
schema_entry->namespace_items = {name};
CreateEntryInternal(context, std::move(schema_entry));
entry = entries.find(name);
D_ASSERT(entry != entries.end());
}
return entry->second.get();
}
Expand All @@ -34,7 +50,7 @@ static string GetSchemaName(const vector<string> &items) {
}

void IRCSchemaSet::LoadEntries(ClientContext &context) {
if (!entries.empty()) {
if (listed) {
return;
}

Expand All @@ -48,6 +64,7 @@ void IRCSchemaSet::LoadEntries(ClientContext &context) {
schema_entry->namespace_items = std::move(schema.items);
CreateEntryInternal(context, std::move(schema_entry));
}
listed = true;
}

optional_ptr<CatalogEntry> IRCSchemaSet::CreateEntryInternal(ClientContext &context, unique_ptr<CatalogEntry> entry) {
Expand Down
Loading
Loading