@@ -307,11 +307,10 @@ ss::future<> backend::work_once() {
307
307
_nodes_to_retry.erase (node_id);
308
308
co_await send_rpc (node_id);
309
309
}
310
- co_await ssx::async_for_each (
311
- to_schedule_topic_work, [this ](const auto & nt) {
312
- _topic_work_to_retry.erase (nt);
313
- return schedule_topic_work (nt);
314
- });
310
+ for (const auto & nt : to_schedule_topic_work) {
311
+ _topic_work_to_retry.erase (nt);
312
+ co_await schedule_topic_work (nt);
313
+ }
315
314
spawn_advances ();
316
315
if (next_tick == model::timeout_clock::time_point::max ()) {
317
316
_timer.cancel ();
@@ -1261,20 +1260,21 @@ ss::future<> backend::reconcile_migration(
1261
1260
metadata.id ,
1262
1261
mrstate.scope .sought_state );
1263
1262
co_await std::visit (
1264
- [this , &metadata, &mrstate](const auto & migration) mutable {
1263
+ [this , migration_id = metadata.id , &mrstate](
1264
+ const auto & migration) mutable {
1265
1265
return ss::do_with (
1266
- migration.topic_nts (),
1267
- [ this , &metadata, &mrstate]( const auto & nts) {
1268
- // poor man's `nts | std::views::enumerate`
1269
- auto enumerated_nts = std::views::transform (
1270
- nts, [ index = - 1 ]( const auto & nt) mutable {
1271
- return std::forward_as_tuple (++ index , nt);
1272
- });
1273
- return ssx::async_for_each (
1266
+ // poor man's ` migration.topic_nts() | std::views::enumerate`
1267
+ std::views::transform (
1268
+ migration. topic_nts (),
1269
+ [ index = - 1 ]( const auto & nt) mutable {
1270
+ return std::forward_as_tuple (++ index , nt);
1271
+ }),
1272
+ [ this , migration_id, &mrstate]( auto & enumerated_nts) {
1273
+ return ss::do_for_each (
1274
1274
enumerated_nts,
1275
- [this , &metadata , &mrstate](const auto & idx_nt) {
1275
+ [this , migration_id , &mrstate](const auto & idx_nt) {
1276
1276
auto & [idx, nt] = idx_nt;
1277
- return reconcile_topic (metadata. id , idx, nt, mrstate);
1277
+ return reconcile_topic (migration_id , idx, nt, mrstate);
1278
1278
});
1279
1279
});
1280
1280
},
0 commit comments