Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions src/v/datalake/coordinator/coordinator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -865,8 +865,12 @@ ss::sstring coordinator::get_effective_default_partition_spec(
using_glue_catalog()
&& current_spec == cfg.iceberg_default_partition_spec.default_value()) {
// Glue can't partition on nested fields like redpanda.timestamp.
vlog(
datalake_log.warn,
static constexpr auto rate_limit = std::chrono::seconds(5);
static thread_local ss::logger::rate_limit rate(rate_limit);
vloglr(
datalake_log,
ss::log_level::warn,
rate,
"Overriding default partition spec to '()' for AWS Glue "
"compatibility");
return "()";
Expand Down
3 changes: 3 additions & 0 deletions src/v/iceberg/rest_client/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,9 @@ redpanda_cc_library(
hdrs = [
"catalog_client.h",
],
implementation_deps = [
"//src/v/bytes:iobuf_parser",
],
visibility = ["//visibility:public"],
deps = [
":client_probe",
Expand Down
137 changes: 103 additions & 34 deletions src/v/iceberg/rest_client/catalog_client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@

#include "absl/strings/str_join.h"
#include "absl/strings/strip.h"
#include "bytes/iobuf_parser.h"
#include "bytes/streambuf.h"
#include "config/types.h"
#include "datalake/credential_manager.h"
Expand Down Expand Up @@ -51,6 +52,19 @@ iobuf serialize_payload_as_json(const T& payload) {

return std::move(buf).as_iobuf();
}

template<typename T>
void maybe_log_payload_as_json(
ss::logger& l, ss::log_level lvl, std::string_view msg, const T& payload) {
if (!l.is_enabled(lvl)) {
return;
}
auto buf = serialize_payload_as_json(payload);
iobuf_parser p(std::move(buf));
const auto logged_size = std::min(p.bytes_left(), 4_KiB);
vlogl(l, lvl, "{}: {}", msg, p.read_string_safe(logged_size));
}

static constexpr std::string_view json_content_type = "application/json";
static constexpr std::string_view oauth_token_endpoint = "oauth/tokens";
static constexpr std::string_view config_endpoint = "config";
Expand Down Expand Up @@ -217,12 +231,23 @@ catalog_client::acquire_token(retry_chain_node& rtc) {
{"client_secret", creds.client_secret},
{"scope", creds.oauth2_scope},
});
co_return (co_await perform_request(
rtc,
token_request,
custom_oauth2_server ? *creds.oauth2_server_uri : _endpoint,
client_probe::endpoint::oauth_token,
std::move(payload)))
auto req_res = co_await perform_request(
rtc,
token_request,
custom_oauth2_server ? *creds.oauth2_server_uri : _endpoint,
client_probe::endpoint::oauth_token,
std::move(payload));
if (!req_res.has_value()) {
vlogl(
log,
ss::log_level::trace,
"Failed to perform oauth_token request with payload: client_id={}, "
"scope={}",
creds.client_id,
creds.oauth2_scope);
co_return tl::unexpected(req_res.error());
}
co_return std::move(req_res)
.and_then(parse_json)
.and_then(parse_as_expected("oauth_token", parse_oauth_token));
}
Expand Down Expand Up @@ -413,12 +438,21 @@ catalog_client::create_namespace(
co_return tl::unexpected(auth_result.error());
}

co_return (co_await perform_request(
rtc,
http_request,
_endpoint,
client_probe::endpoint::create_namespace,
serialize_payload_as_json(req)))
auto req_res = co_await perform_request(
rtc,
http_request,
_endpoint,
client_probe::endpoint::create_namespace,
serialize_payload_as_json(req));
if (!req_res.has_value()) {
maybe_log_payload_as_json(
log,
ss::log_level::trace,
"Failed to perform create_namespace request",
req);
co_return tl::unexpected(req_res.error());
}
co_return std::move(req_res)
.and_then(parse_json)
.and_then(
parse_as_expected("create_namespace", parse_create_namespace_response));
Expand All @@ -444,12 +478,21 @@ ss::future<expected<load_table_result>> catalog_client::create_table(
co_return tl::unexpected(auth_result.error());
}

co_return (co_await perform_request(
rtc,
http_request,
_endpoint,
client_probe::endpoint::create_table,
serialize_payload_as_json(req)))
auto req_res = co_await perform_request(
rtc,
http_request,
_endpoint,
client_probe::endpoint::create_table,
serialize_payload_as_json(req));
if (!req_res.has_value()) {
maybe_log_payload_as_json(
log,
ss::log_level::trace,
"Failed to perform create_table request",
req);
co_return tl::unexpected(req_res.error());
}
co_return std::move(req_res)
.and_then(parse_json)
.and_then(parse_as_expected("create_table", parse_load_table_result));
}
Expand All @@ -473,9 +516,18 @@ ss::future<expected<load_table_result>> catalog_client::load_table(
co_return tl::unexpected(auth_result.error());
}

co_return (
co_await perform_request(
rtc, http_request, _endpoint, client_probe::endpoint::load_table))
auto req_res = co_await perform_request(
rtc, http_request, _endpoint, client_probe::endpoint::load_table);
if (!req_res.has_value()) {
vlog(
log.trace,
"Failed to perform load_table request for table '{}' in namespace "
"'{}'",
table_name,
absl::StrJoin(ns, "."));
co_return tl::unexpected(req_res.error());
}
co_return std::move(req_res)
.and_then(parse_json)
.and_then(parse_as_expected("load_table", parse_load_table_result));
}
Expand Down Expand Up @@ -507,13 +559,21 @@ ss::future<expected<std::monostate>> catalog_client::drop_table(
co_return tl::unexpected(auth_result.error());
}

co_return (
co_await perform_request(
rtc, http_request, _endpoint, client_probe::endpoint::drop_table))
.map([](iobuf&&) {
// we expect empty response, discard it
return std::monostate{};
});
auto req_res = co_await perform_request(
rtc, http_request, _endpoint, client_probe::endpoint::drop_table);
if (!req_res.has_value()) {
vlog(
log.trace,
"Failed to perform drop_table request for table '{}' in namespace "
"'{}'",
table_name,
absl::StrJoin(ns, "."));
co_return tl::unexpected(req_res.error());
}
co_return std::move(req_res).map([](iobuf&&) {
// we expect empty response, discard it
return std::monostate{};
});
}

ss::future<expected<commit_table_response>> catalog_client::commit_table_update(
Expand All @@ -535,12 +595,21 @@ ss::future<expected<commit_table_response>> catalog_client::commit_table_update(
co_return tl::unexpected(auth_result.error());
}

co_return (co_await perform_request(
rtc,
http_request,
_endpoint,
client_probe::endpoint::commit_table_update,
serialize_payload_as_json(commit_request)))
auto req_res = co_await perform_request(
rtc,
http_request,
_endpoint,
client_probe::endpoint::commit_table_update,
serialize_payload_as_json(commit_request));
if (!req_res.has_value()) {
maybe_log_payload_as_json(
log,
ss::log_level::trace,
"Failed to perform commit_table_update request",
commit_request);
co_return tl::unexpected(req_res.error());
}
co_return std::move(req_res)
.and_then(parse_json)
.and_then(
parse_as_expected("commit_table_update", parse_commit_table_response));
Expand Down