Skip to content

Commit 951f1f2

Browse files
authored
Merge pull request #24563 from vbotbuildovich/backport-pr-24555-v24.3.x-909
[v24.3.x] dl/coordinator: add debug logging in coordinator
2 parents 2abf7b0 + 09e2917 commit 951f1f2

File tree

4 files changed

+75
-6
lines changed

src/v/datalake/coordinator/iceberg_file_committer.cc

+40
Original file line numberDiff line numberDiff line change
@@ -99,17 +99,24 @@ ss::future<
9999
checked<chunked_vector<mark_files_committed_update>, file_committer::errc>>
100100
iceberg_file_committer::commit_topic_files_to_catalog(
101101
model::topic topic, const topics_state& state) const {
102+
vlog(datalake_log.debug, "Beginning commit for topic {}", topic);
102103
auto tp_it = state.topic_to_state.find(topic);
103104
if (
104105
tp_it == state.topic_to_state.end()
105106
|| !tp_it->second.has_pending_entries()) {
107+
vlog(datalake_log.debug, "Topic {} has no pending entries", topic);
106108
co_return chunked_vector<mark_files_committed_update>{};
107109
}
108110
auto topic_revision = tp_it->second.revision;
109111

110112
auto table_id = table_id_for_topic(topic);
111113
auto table_res = co_await load_table(table_id);
112114
if (table_res.has_error()) {
115+
vlog(
116+
datalake_log.warn,
117+
"Error loading table {} for committing from topic {}",
118+
table_id,
119+
topic);
113120
co_return table_res.error();
114121
}
115122
auto& table = table_res.value();
@@ -130,6 +137,13 @@ iceberg_file_committer::commit_topic_files_to_catalog(
130137
// tp_it may equal end() here, so the "missing" case is handled on its own:
// reading tp_it->second to log the current revision is only valid once the
// iterator is known to point at an entry.
if (tp_it == state.topic_to_state.end()) {
    vlog(
      datalake_log.debug,
      "Commit returning early, topic {} state is missing (expected "
      "revision {})",
      topic,
      topic_revision);
    co_return chunked_vector<mark_files_committed_update>{};
}
// The entry exists; bail out if its revision no longer matches the one
// captured earlier in topic_revision.
if (tp_it->second.revision != topic_revision) {
    vlog(
      datalake_log.debug,
      "Commit returning early, topic {} revision has changed: current {} vs "
      "expected {}",
      topic,
      tp_it->second.revision,
      topic_revision);
    co_return chunked_vector<mark_files_committed_update>{};
}
135149

@@ -148,6 +162,16 @@ iceberg_file_committer::commit_topic_files_to_catalog(
148162
// replicate the fact that it was committed previously, but
149163
// don't construct a data_file to send to Iceberg as it is
150164
// already committed.
165+
vlog(
166+
datalake_log.debug,
167+
"Skipping entry for topic {} revision {} added at "
168+
"coordinator offset {} because table {} has data including "
169+
"coordinator offset {}",
170+
topic,
171+
topic_revision,
172+
e.added_pending_at,
173+
table_id,
174+
*iceberg_commit_meta_opt);
151175
continue;
152176
}
153177
new_committed_offset = std::max(
@@ -168,6 +192,11 @@ iceberg_file_committer::commit_topic_files_to_catalog(
168192
}
169193
}
170194
if (pending_commits.empty()) {
195+
vlog(
196+
datalake_log.debug,
197+
"No new data to mark committed for topic {} revision {}",
198+
topic,
199+
topic_revision);
171200
co_return chunked_vector<mark_files_committed_update>{};
172201
}
173202
chunked_vector<mark_files_committed_update> updates;
@@ -187,6 +216,12 @@ iceberg_file_committer::commit_topic_files_to_catalog(
187216
}
188217
if (icb_files.empty()) {
189218
// All files are deduplicated.
219+
vlog(
220+
datalake_log.debug,
221+
"All committed files were deduplicated for topic {} revision {}, "
222+
"returning without updating Iceberg catalog",
223+
topic,
224+
topic_revision);
190225
co_return updates;
191226
}
192227
vassert(
@@ -195,6 +230,11 @@ iceberg_file_committer::commit_topic_files_to_catalog(
195230
const auto commit_meta = commit_offset_metadata{
196231
.offset = *new_committed_offset,
197232
};
233+
vlog(
234+
datalake_log.debug,
235+
"Adding {} files to Iceberg table {}",
236+
icb_files.size(),
237+
table_id);
198238
iceberg::transaction txn(std::move(table));
199239
auto icb_append_res = co_await txn.merge_append(
200240
io_,

src/v/datalake/coordinator/state_machine.cc

+2-4
Original file line numberDiff line numberDiff line change
@@ -110,17 +110,15 @@ ss::future<> coordinator_stm::do_apply(const model::record_batch& b) {
110110
// TODO: make updates a variant so we can share code more easily?
111111
case update_key::add_files: {
112112
auto update = co_await serde::read_async<add_files_update>(val_p);
113-
vlog(
114-
_log.debug, "Applying {} from offset {}: {}", key, o, update.tp);
113+
vlog(_log.debug, "Applying {} from offset {}: {}", key, o, update);
115114
auto res = update.apply(state_, o);
116115
maybe_log_update_error(_log, key, o, res);
117116
continue;
118117
}
119118
case update_key::mark_files_committed: {
120119
auto update
121120
= co_await serde::read_async<mark_files_committed_update>(val_p);
122-
vlog(
123-
_log.debug, "Applying {} from offset {}: {}", key, o, update.tp);
121+
vlog(_log.debug, "Applying {} from offset {}: {}", key, o, update);
124122
auto res = update.apply(state_);
125123
maybe_log_update_error(_log, key, o, res);
126124
continue;

src/v/datalake/coordinator/state_update.cc

+28-1
Original file line numberDiff line numberDiff line change
@@ -284,7 +284,34 @@ topic_lifecycle_update::apply(topics_state& state) {
284284
return true;
285285
}
286286

287-
std::ostream& operator<<(std::ostream& o, topic_lifecycle_update u) {
287+
std::ostream& operator<<(std::ostream& o, const add_files_update& u) {
288+
fmt::print(o, "{{tp: {}, revision: {}, entries: [", u.tp, u.topic_revision);
289+
static constexpr size_t max_to_log = 6;
290+
static constexpr size_t halved = max_to_log / 2;
291+
const auto& e = u.entries;
292+
if (e.size() <= max_to_log) {
293+
fmt::print(o, "{}", fmt::join(e, ", "));
294+
} else {
295+
fmt::print(o, "{}", fmt::join(e.begin(), e.begin() + halved, ", "));
296+
o << "...";
297+
fmt::print(o, "{}", fmt::join(e.end() - halved, e.end(), ", "));
298+
}
299+
fmt::print(o, "] ({} entries)}}", e.size());
300+
return o;
301+
}
302+
303+
std::ostream&
304+
operator<<(std::ostream& o, const mark_files_committed_update& u) {
305+
fmt::print(
306+
o,
307+
"{{tp: {}, revision: {}, new_committed: {}}}",
308+
u.tp,
309+
u.topic_revision,
310+
u.new_committed);
311+
return o;
312+
}
313+
314+
std::ostream& operator<<(std::ostream& o, const topic_lifecycle_update& u) {
288315
fmt::print(
289316
o,
290317
"{{topic: {}, revision: {}, new_state: {}}}",

src/v/datalake/coordinator/state_update.h

+5-1
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ struct add_files_update
4545
checked<std::nullopt_t, stm_update_error> can_apply(const topics_state&);
4646
checked<std::nullopt_t, stm_update_error>
4747
apply(topics_state&, model::offset);
48+
friend std::ostream& operator<<(std::ostream&, const add_files_update&);
4849

4950
model::topic_partition tp;
5051
model::revision_id topic_revision;
@@ -68,6 +69,8 @@ struct mark_files_committed_update
6869

6970
checked<std::nullopt_t, stm_update_error> can_apply(const topics_state&);
7071
checked<std::nullopt_t, stm_update_error> apply(topics_state&);
72+
friend std::ostream&
73+
operator<<(std::ostream&, const mark_files_committed_update&);
7174

7275
model::topic_partition tp;
7376
model::revision_id topic_revision;
@@ -90,7 +93,8 @@ struct topic_lifecycle_update
9093
checked<bool, stm_update_error> can_apply(const topics_state&);
9194
checked<bool, stm_update_error> apply(topics_state&);
9295

93-
friend std::ostream& operator<<(std::ostream&, topic_lifecycle_update);
96+
friend std::ostream&
97+
operator<<(std::ostream&, const topic_lifecycle_update&);
9498

9599
model::topic topic;
96100
model::revision_id revision;

0 commit comments

Comments
 (0)