12
12
13
13
#include " absl/strings/str_join.h"
14
14
#include " absl/strings/strip.h"
15
+ #include " bytes/iobuf_parser.h"
15
16
#include " bytes/streambuf.h"
16
17
#include " config/types.h"
17
18
#include " datalake/credential_manager.h"
@@ -51,6 +52,19 @@ iobuf serialize_payload_as_json(const T& payload) {
51
52
52
53
return std::move (buf).as_iobuf ();
53
54
}
55
+
56
+ template <typename T>
57
+ void maybe_log_payload_as_json (
58
+ ss::logger& l, ss::log_level lvl, std::string_view msg, const T& payload) {
59
+ if (!l.is_enabled (lvl)) {
60
+ return ;
61
+ }
62
+ auto buf = serialize_payload_as_json (payload);
63
+ iobuf_parser p (std::move (buf));
64
+ const auto logged_size = std::min (p.bytes_left (), 4_KiB);
65
+ vlogl (l, lvl, " {}: {}" , msg, p.read_string_safe (logged_size));
66
+ }
67
+
54
68
static constexpr std::string_view json_content_type = " application/json" ;
55
69
static constexpr std::string_view oauth_token_endpoint = " oauth/tokens" ;
56
70
static constexpr std::string_view config_endpoint = " config" ;
@@ -217,12 +231,23 @@ catalog_client::acquire_token(retry_chain_node& rtc) {
217
231
{" client_secret" , creds.client_secret },
218
232
{" scope" , creds.oauth2_scope },
219
233
});
220
- co_return (co_await perform_request (
221
- rtc,
222
- token_request,
223
- custom_oauth2_server ? *creds.oauth2_server_uri : _endpoint,
224
- client_probe::endpoint::oauth_token,
225
- std::move (payload)))
234
+ auto req_res = co_await perform_request (
235
+ rtc,
236
+ token_request,
237
+ custom_oauth2_server ? *creds.oauth2_server_uri : _endpoint,
238
+ client_probe::endpoint::oauth_token,
239
+ std::move (payload));
240
+ if (!req_res.has_value ()) {
241
+ vlogl (
242
+ log,
243
+ ss::log_level::trace,
244
+ " Failed to perform oauth_token request with payload: client_id={}, "
245
+ " scope={}" ,
246
+ creds.client_id ,
247
+ creds.oauth2_scope );
248
+ co_return tl::unexpected (req_res.error ());
249
+ }
250
+ co_return std::move (req_res)
226
251
.and_then (parse_json)
227
252
.and_then (parse_as_expected (" oauth_token" , parse_oauth_token));
228
253
}
@@ -413,12 +438,21 @@ catalog_client::create_namespace(
413
438
co_return tl::unexpected (auth_result.error ());
414
439
}
415
440
416
- co_return (co_await perform_request (
417
- rtc,
418
- http_request,
419
- _endpoint,
420
- client_probe::endpoint::create_namespace,
421
- serialize_payload_as_json (req)))
441
+ auto req_res = co_await perform_request (
442
+ rtc,
443
+ http_request,
444
+ _endpoint,
445
+ client_probe::endpoint::create_namespace,
446
+ serialize_payload_as_json (req));
447
+ if (!req_res.has_value ()) {
448
+ maybe_log_payload_as_json (
449
+ log,
450
+ ss::log_level::trace,
451
+ " Failed to perform create_namespace request" ,
452
+ req);
453
+ co_return tl::unexpected (req_res.error ());
454
+ }
455
+ co_return std::move (req_res)
422
456
.and_then (parse_json)
423
457
.and_then (
424
458
parse_as_expected (" create_namespace" , parse_create_namespace_response));
@@ -444,12 +478,21 @@ ss::future<expected<load_table_result>> catalog_client::create_table(
444
478
co_return tl::unexpected (auth_result.error ());
445
479
}
446
480
447
- co_return (co_await perform_request (
448
- rtc,
449
- http_request,
450
- _endpoint,
451
- client_probe::endpoint::create_table,
452
- serialize_payload_as_json (req)))
481
+ auto req_res = co_await perform_request (
482
+ rtc,
483
+ http_request,
484
+ _endpoint,
485
+ client_probe::endpoint::create_table,
486
+ serialize_payload_as_json (req));
487
+ if (!req_res.has_value ()) {
488
+ maybe_log_payload_as_json (
489
+ log,
490
+ ss::log_level::trace,
491
+ " Failed to perform create_table request" ,
492
+ req);
493
+ co_return tl::unexpected (req_res.error ());
494
+ }
495
+ co_return std::move (req_res)
453
496
.and_then (parse_json)
454
497
.and_then (parse_as_expected (" create_table" , parse_load_table_result));
455
498
}
@@ -473,9 +516,18 @@ ss::future<expected<load_table_result>> catalog_client::load_table(
473
516
co_return tl::unexpected (auth_result.error ());
474
517
}
475
518
476
- co_return (
477
- co_await perform_request (
478
- rtc, http_request, _endpoint, client_probe::endpoint::load_table))
519
+ auto req_res = co_await perform_request (
520
+ rtc, http_request, _endpoint, client_probe::endpoint::load_table);
521
+ if (!req_res.has_value ()) {
522
+ vlog (
523
+ log.trace ,
524
+ " Failed to perform load_table request for table '{}' in namespace "
525
+ " '{}'" ,
526
+ table_name,
527
+ absl::StrJoin (ns, " ." ));
528
+ co_return tl::unexpected (req_res.error ());
529
+ }
530
+ co_return std::move (req_res)
479
531
.and_then (parse_json)
480
532
.and_then (parse_as_expected (" load_table" , parse_load_table_result));
481
533
}
@@ -507,13 +559,21 @@ ss::future<expected<std::monostate>> catalog_client::drop_table(
507
559
co_return tl::unexpected (auth_result.error ());
508
560
}
509
561
510
- co_return (
511
- co_await perform_request (
512
- rtc, http_request, _endpoint, client_probe::endpoint::drop_table))
513
- .map ([](iobuf&&) {
514
- // we expect empty response, discard it
515
- return std::monostate{};
516
- });
562
+ auto req_res = co_await perform_request (
563
+ rtc, http_request, _endpoint, client_probe::endpoint::drop_table);
564
+ if (!req_res.has_value ()) {
565
+ vlog (
566
+ log.trace ,
567
+ " Failed to perform drop_table request for table '{}' in namespace "
568
+ " '{}'" ,
569
+ table_name,
570
+ absl::StrJoin (ns, " ." ));
571
+ co_return tl::unexpected (req_res.error ());
572
+ }
573
+ co_return std::move (req_res).map ([](iobuf&&) {
574
+ // we expect empty response, discard it
575
+ return std::monostate{};
576
+ });
517
577
}
518
578
519
579
ss::future<expected<commit_table_response>> catalog_client::commit_table_update (
@@ -535,12 +595,21 @@ ss::future<expected<commit_table_response>> catalog_client::commit_table_update(
535
595
co_return tl::unexpected (auth_result.error ());
536
596
}
537
597
538
- co_return (co_await perform_request (
539
- rtc,
540
- http_request,
541
- _endpoint,
542
- client_probe::endpoint::commit_table_update,
543
- serialize_payload_as_json (commit_request)))
598
+ auto req_res = co_await perform_request (
599
+ rtc,
600
+ http_request,
601
+ _endpoint,
602
+ client_probe::endpoint::commit_table_update,
603
+ serialize_payload_as_json (commit_request));
604
+ if (!req_res.has_value ()) {
605
+ maybe_log_payload_as_json (
606
+ log,
607
+ ss::log_level::trace,
608
+ " Failed to perform commit_table_update request" ,
609
+ commit_request);
610
+ co_return tl::unexpected (req_res.error ());
611
+ }
612
+ co_return std::move (req_res)
544
613
.and_then (parse_json)
545
614
.and_then (
546
615
parse_as_expected (" commit_table_update" , parse_commit_table_response));
0 commit comments