@@ -132,8 +132,13 @@ ss::future<> recovery_stm::do_recover(ss::io_priority_class iopc) {
132
132
co_return ;
133
133
}
134
134
135
- // wait for another round
136
- if (meta.value ()->last_sent_offset >= lstats.dirty_offset ) {
135
+ /* *
136
+ * If expected_log_end_offset is indicating that all the requests were
137
+ * already dispatched to the follower wait for append entries responses. The
138
+ * responses will trigger the follower state condition variable and
139
+ * recovery_stm will redo the check if follower still needs to be recovered.
140
+ */
141
+ if (meta.value ()->expected_log_end_offset >= lstats.dirty_offset ) {
137
142
co_await meta.value ()
138
143
->follower_state_change .wait ()
139
144
.handle_exception_type ([this ](const ss::broken_condition_variable&) {
@@ -160,16 +165,6 @@ ss::future<> recovery_stm::do_recover(ss::io_priority_class iopc) {
160
165
meta = get_follower_meta ();
161
166
}
162
167
163
- bool recovery_stm::state_changed () {
164
- auto meta = get_follower_meta ();
165
- if (!meta) {
166
- return true ;
167
- }
168
- auto lstats = _ptr->_log ->offsets ();
169
- return lstats.dirty_offset > meta.value ()->last_dirty_log_index
170
- || meta.value ()->last_sent_offset == lstats.dirty_offset ;
171
- }
172
-
173
168
flush_after_append
174
169
recovery_stm::should_flush (model::offset follower_committed_match_index) const {
175
170
constexpr size_t checkpoint_flush_size = 1_MiB;
@@ -319,7 +314,15 @@ ss::future<> recovery_stm::send_install_snapshot_request() {
319
314
.dirty_offset = _ptr->dirty_offset ()};
320
315
321
316
_sent_snapshot_bytes += chunk_size;
322
-
317
+ if (req.done ) {
318
+ auto meta = get_follower_meta ();
319
+ if (!meta) {
320
+ // stop recovery when node was removed
321
+ _stop_requested = true ;
322
+ return ss::make_ready_future<>();
323
+ }
324
+ (*meta)->expected_log_end_offset = _ptr->_last_snapshot_index ;
325
+ }
323
326
vlog (_ctxlog.trace , " sending install_snapshot request: {}" , req);
324
327
auto hb_guard = _ptr->suppress_heartbeats (_node_id);
325
328
return _ptr->_client_protocol
@@ -375,7 +378,6 @@ ss::future<> recovery_stm::handle_install_snapshot_reply(
375
378
// snapshot received by the follower, continue with recovery
376
379
(*meta)->match_index = _ptr->_last_snapshot_index ;
377
380
(*meta)->next_index = model::next_offset (_ptr->_last_snapshot_index );
378
- (*meta)->last_sent_offset = _ptr->_last_snapshot_index ;
379
381
return close_snapshot_reader ();
380
382
}
381
383
@@ -444,7 +446,11 @@ ss::future<> recovery_stm::replicate(
444
446
_stop_requested = true ;
445
447
return ss::now ();
446
448
}
447
- meta.value ()->last_sent_offset = _last_batch_offset;
449
+ /* *
450
+ * Update follower expected log end. It is equal to the last batch in a set
451
+ * of batches read for this recovery round.
452
+ */
453
+ meta.value ()->expected_log_end_offset = _last_batch_offset;
448
454
meta.value ()->last_sent_protocol_meta = r.metadata ();
449
455
_ptr->update_node_append_timestamp (_node_id);
450
456
@@ -493,7 +499,7 @@ ss::future<> recovery_stm::replicate(
493
499
}
494
500
meta.value ()->next_index = std::max (
495
501
model::offset (0 ), model::prev_offset (_base_batch_offset));
496
- meta. value ()-> last_sent_offset = model::offset{};
502
+
497
503
vlog (
498
504
_ctxlog.trace ,
499
505
" Move next index {} backward" ,
0 commit comments