12
12
13
13
#include " absl/strings/str_join.h"
14
14
#include " absl/strings/strip.h"
15
+ #include " bytes/iobuf_parser.h"
15
16
#include " bytes/streambuf.h"
16
17
#include " config/types.h"
17
18
#include " datalake/credential_manager.h"
@@ -51,6 +52,18 @@ iobuf serialize_payload_as_json(const T& payload) {
51
52
52
53
return std::move (buf).as_iobuf ();
53
54
}
55
+
56
+ template <typename T>
57
+ void maybe_log_payload_as_json (
58
+ ss::logger& l, ss::log_level lvl, std::string_view msg, const T& payload) {
59
+ if (!l.is_enabled (lvl)) {
60
+ return ;
61
+ }
62
+ auto buf = serialize_payload_as_json (payload);
63
+ iobuf_parser p (std::move (buf));
64
+ vlogl (iceberg::log, lvl, " {}: {}" , msg, p.read_string_safe (4_KiB));
65
+ }
66
+
54
67
static constexpr std::string_view json_content_type = " application/json" ;
55
68
static constexpr std::string_view oauth_token_endpoint = " oauth/tokens" ;
56
69
static constexpr std::string_view config_endpoint = " config" ;
@@ -217,12 +230,23 @@ catalog_client::acquire_token(retry_chain_node& rtc) {
217
230
{" client_secret" , creds.client_secret },
218
231
{" scope" , creds.oauth2_scope },
219
232
});
220
- co_return (co_await perform_request (
221
- rtc,
222
- token_request,
223
- custom_oauth2_server ? *creds.oauth2_server_uri : _endpoint,
224
- client_probe::endpoint::oauth_token,
225
- std::move (payload)))
233
+ auto req_res = co_await perform_request (
234
+ rtc,
235
+ token_request,
236
+ custom_oauth2_server ? *creds.oauth2_server_uri : _endpoint,
237
+ client_probe::endpoint::oauth_token,
238
+ std::move (payload));
239
+ if (!req_res.has_value ()) {
240
+ vlogl (
241
+ log,
242
+ ss::log_level::trace,
243
+ " Failed to perform oauth_token request with payload: client_id={}, "
244
+ " scope={}" ,
245
+ creds.client_id ,
246
+ creds.oauth2_scope );
247
+ co_return tl::unexpected (req_res.error ());
248
+ }
249
+ co_return std::move (req_res)
226
250
.and_then (parse_json)
227
251
.and_then (parse_as_expected (" oauth_token" , parse_oauth_token));
228
252
}
@@ -413,12 +437,21 @@ catalog_client::create_namespace(
413
437
co_return tl::unexpected (auth_result.error ());
414
438
}
415
439
416
- co_return (co_await perform_request (
417
- rtc,
418
- http_request,
419
- _endpoint,
420
- client_probe::endpoint::create_namespace,
421
- serialize_payload_as_json (req)))
440
+ auto req_res = co_await perform_request (
441
+ rtc,
442
+ http_request,
443
+ _endpoint,
444
+ client_probe::endpoint::create_namespace,
445
+ serialize_payload_as_json (req));
446
+ if (!req_res.has_value ()) {
447
+ maybe_log_payload_as_json (
448
+ log,
449
+ ss::log_level::trace,
450
+ " Failed to perform create_namespace request" ,
451
+ req);
452
+ co_return tl::unexpected (req_res.error ());
453
+ }
454
+ co_return std::move (req_res)
422
455
.and_then (parse_json)
423
456
.and_then (
424
457
parse_as_expected (" create_namespace" , parse_create_namespace_response));
@@ -444,12 +477,21 @@ ss::future<expected<load_table_result>> catalog_client::create_table(
444
477
co_return tl::unexpected (auth_result.error ());
445
478
}
446
479
447
- co_return (co_await perform_request (
448
- rtc,
449
- http_request,
450
- _endpoint,
451
- client_probe::endpoint::create_table,
452
- serialize_payload_as_json (req)))
480
+ auto req_res = co_await perform_request (
481
+ rtc,
482
+ http_request,
483
+ _endpoint,
484
+ client_probe::endpoint::create_table,
485
+ serialize_payload_as_json (req));
486
+ if (!req_res.has_value ()) {
487
+ maybe_log_payload_as_json (
488
+ log,
489
+ ss::log_level::trace,
490
+ " Failed to perform create_table request" ,
491
+ req);
492
+ co_return tl::unexpected (req_res.error ());
493
+ }
494
+ co_return std::move (req_res)
453
495
.and_then (parse_json)
454
496
.and_then (parse_as_expected (" create_table" , parse_load_table_result));
455
497
}
@@ -473,9 +515,18 @@ ss::future<expected<load_table_result>> catalog_client::load_table(
473
515
co_return tl::unexpected (auth_result.error ());
474
516
}
475
517
476
- co_return (
477
- co_await perform_request (
478
- rtc, http_request, _endpoint, client_probe::endpoint::load_table))
518
+ auto req_res = co_await perform_request (
519
+ rtc, http_request, _endpoint, client_probe::endpoint::load_table);
520
+ if (!req_res.has_value ()) {
521
+ vlog (
522
+ log.trace ,
523
+ " Failed to perform load_table request for table '{}' in namespace "
524
+ " '{}'" ,
525
+ table_name,
526
+ absl::StrJoin (ns, " ." ));
527
+ co_return tl::unexpected (req_res.error ());
528
+ }
529
+ co_return std::move (req_res)
479
530
.and_then (parse_json)
480
531
.and_then (parse_as_expected (" load_table" , parse_load_table_result));
481
532
}
@@ -507,13 +558,21 @@ ss::future<expected<std::monostate>> catalog_client::drop_table(
507
558
co_return tl::unexpected (auth_result.error ());
508
559
}
509
560
510
- co_return (
511
- co_await perform_request (
512
- rtc, http_request, _endpoint, client_probe::endpoint::drop_table))
513
- .map ([](iobuf&&) {
514
- // we expect empty response, discard it
515
- return std::monostate{};
516
- });
561
+ auto req_res = co_await perform_request (
562
+ rtc, http_request, _endpoint, client_probe::endpoint::drop_table);
563
+ if (!req_res.has_value ()) {
564
+ vlog (
565
+ log.trace ,
566
+ " Failed to perform drop_table request for table '{}' in namespace "
567
+ " '{}'" ,
568
+ table_name,
569
+ absl::StrJoin (ns, " ." ));
570
+ co_return tl::unexpected (req_res.error ());
571
+ }
572
+ co_return std::move (req_res).map ([](iobuf&&) {
573
+ // we expect empty response, discard it
574
+ return std::monostate{};
575
+ });
517
576
}
518
577
519
578
ss::future<expected<commit_table_response>> catalog_client::commit_table_update (
@@ -535,12 +594,21 @@ ss::future<expected<commit_table_response>> catalog_client::commit_table_update(
535
594
co_return tl::unexpected (auth_result.error ());
536
595
}
537
596
538
- co_return (co_await perform_request (
539
- rtc,
540
- http_request,
541
- _endpoint,
542
- client_probe::endpoint::commit_table_update,
543
- serialize_payload_as_json (commit_request)))
597
+ auto req_res = co_await perform_request (
598
+ rtc,
599
+ http_request,
600
+ _endpoint,
601
+ client_probe::endpoint::commit_table_update,
602
+ serialize_payload_as_json (commit_request));
603
+ if (!req_res.has_value ()) {
604
+ maybe_log_payload_as_json (
605
+ log,
606
+ ss::log_level::trace,
607
+ " Failed to perform commit_table_update request" ,
608
+ commit_request);
609
+ co_return tl::unexpected (req_res.error ());
610
+ }
611
+ co_return std::move (req_res)
544
612
.and_then (parse_json)
545
613
.and_then (
546
614
parse_as_expected (" commit_table_update" , parse_commit_table_response));
0 commit comments