From fff308a353280394c7ff0788e08c6e105ffef77c Mon Sep 17 00:00:00 2001 From: Michal Maslanka Date: Tue, 19 Sep 2023 09:21:45 +0200 Subject: [PATCH 1/3] r/recovery_stm: removed unused state_changed method Signed-off-by: Michal Maslanka (cherry picked from commit fbf71007041db876fdd3f16096caf5a9ce2f6e76) --- src/v/raft/recovery_stm.cc | 10 ---------- src/v/raft/recovery_stm.h | 2 +- 2 files changed, 1 insertion(+), 11 deletions(-) diff --git a/src/v/raft/recovery_stm.cc b/src/v/raft/recovery_stm.cc index a08fd9aa9a84b..c531d80b12580 100644 --- a/src/v/raft/recovery_stm.cc +++ b/src/v/raft/recovery_stm.cc @@ -159,16 +159,6 @@ ss::future<> recovery_stm::do_recover(ss::io_priority_class iopc) { meta = get_follower_meta(); } -bool recovery_stm::state_changed() { - auto meta = get_follower_meta(); - if (!meta) { - return true; - } - auto lstats = _ptr->_log.offsets(); - return lstats.dirty_offset > meta.value()->last_dirty_log_index - || meta.value()->last_sent_offset == lstats.dirty_offset; -} - flush_after_append recovery_stm::should_flush(model::offset follower_committed_match_index) const { constexpr size_t checkpoint_flush_size = 1_MiB; diff --git a/src/v/raft/recovery_stm.h b/src/v/raft/recovery_stm.h index 32da2af396720..d57a9cd77ca2c 100644 --- a/src/v/raft/recovery_stm.h +++ b/src/v/raft/recovery_stm.h @@ -45,7 +45,7 @@ class recovery_stm { ss::future<> handle_install_snapshot_reply(result); ss::future<> open_snapshot_reader(); ss::future<> close_snapshot_reader(); - bool state_changed(); + bool is_recovery_finished(); flush_after_append should_flush(model::offset) const; consensus* _ptr; From 110462dc88e6b463eb32b6ed6d5a8508465dd7a9 Mon Sep 17 00:00:00 2001 From: Michal Maslanka Date: Tue, 19 Sep 2023 10:03:03 +0200 Subject: [PATCH 2/3] r/consensus: fixed tracking expected last offset of a follower In Redpanda Raft implementation there may more than one `append_entries_request` dispatched to the follower at the same time. Leader tracks follower expected end offset to coordinate `recovery_stm` and `append_entries_stm` and prevent delivering the same batches twice. In classic raft implementation there is always only one append entries request pending to the follower hence it is enough to update follower state when processing append entries reply. We must track the expected follower end before receiving response as the requests may already be in flight. Signed-off-by: Michal Maslanka (cherry picked from commit 299c32163da657d94fd48cb2050501fd2b370f52) --- src/v/cluster/cluster_utils.cc | 2 +- src/v/cluster/types.h | 4 ++-- src/v/raft/consensus.cc | 12 ++++++++--- src/v/raft/recovery_stm.cc | 28 +++++++++++++++++++------ src/v/raft/replicate_entries_stm.cc | 13 ++++++------ src/v/raft/types.cc | 2 +- src/v/raft/types.h | 5 ++++- src/v/redpanda/admin/api-doc/debug.json | 4 ++-- src/v/redpanda/admin_server.cc | 3 ++- 9 files changed, 50 insertions(+), 23 deletions(-) diff --git a/src/v/cluster/cluster_utils.cc b/src/v/cluster/cluster_utils.cc index 067a276e6ae80..90ad4446f8c8d 100644 --- a/src/v/cluster/cluster_utils.cc +++ b/src/v/cluster/cluster_utils.cc @@ -455,7 +455,7 @@ partition_raft_state get_partition_raft_state(consensus_ptr ptr) { state.last_flushed_log_index = md.last_flushed_log_index; state.match_index = md.match_index; state.next_index = md.next_index; - state.last_sent_offset = md.last_sent_offset; + state.expected_log_end_offset = md.expected_log_end_offset; state.heartbeats_failed = md.heartbeats_failed; state.is_learner = md.is_learner; state.is_recovering = md.is_recovering; diff --git a/src/v/cluster/types.h b/src/v/cluster/types.h index 75881f56a80cd..5d02c89225293 100644 --- a/src/v/cluster/types.h +++ b/src/v/cluster/types.h @@ -3543,7 +3543,7 @@ struct partition_raft_state model::offset last_dirty_log_index; model::offset match_index; model::offset next_index; - model::offset last_sent_offset; + model::offset expected_log_end_offset; size_t heartbeats_failed; bool is_learner; uint64_t ms_since_last_heartbeat; @@ -3560,7 +3560,7 @@ struct partition_raft_state last_dirty_log_index, match_index, next_index, - last_sent_offset, + expected_log_end_offset, heartbeats_failed, is_learner, ms_since_last_heartbeat, diff --git a/src/v/raft/consensus.cc b/src/v/raft/consensus.cc index 6c261bdf614db..07e9fa3e211f5 100644 --- a/src/v/raft/consensus.cc +++ b/src/v/raft/consensus.cc @@ -432,7 +432,7 @@ consensus::success_reply consensus::update_follower_index( successfull_append_entries_reply(idx, std::move(reply)); return success_reply::yes; } else { - idx.last_sent_offset = idx.last_dirty_log_index; + idx.expected_log_end_offset = model::offset{}; } if (idx.is_recovering) { @@ -444,7 +444,6 @@ consensus::success_reply consensus::update_follower_index( idx.last_dirty_log_index = reply.last_dirty_log_index; idx.last_flushed_log_index = reply.last_flushed_log_index; idx.next_index = model::next_offset(idx.last_dirty_log_index); - idx.last_sent_offset = model::offset{}; } return success_reply::no; } @@ -525,6 +524,13 @@ void consensus::successfull_append_entries_reply( idx.match_index = idx.last_dirty_log_index; idx.next_index = model::next_offset(idx.last_dirty_log_index); idx.last_successful_received_seq = idx.last_received_seq; + /** + * Update expected log end offset only if it is smaller than current value, + * the check is needed here as there might be pending append entries + * requests that were not yet replied by the follower. + */ + idx.expected_log_end_offset = std::max( + idx.last_dirty_log_index, idx.expected_log_end_offset); vlog( _ctxlog.trace, "Updated node {} match {} and next {} indices", @@ -559,7 +565,7 @@ void consensus::dispatch_recovery(follower_index_metadata& idx) { idx.next_index, log_max_offset); idx.next_index = log_max_offset; - idx.last_sent_offset = model::offset{}; + idx.expected_log_end_offset = model::offset{}; } idx.is_recovering = true; // background diff --git a/src/v/raft/recovery_stm.cc b/src/v/raft/recovery_stm.cc index c531d80b12580..9605f3cf2dbac 100644 --- a/src/v/raft/recovery_stm.cc +++ b/src/v/raft/recovery_stm.cc @@ -131,8 +131,13 @@ ss::future<> recovery_stm::do_recover(ss::io_priority_class iopc) { co_return; } - // wait for another round - if (meta.value()->last_sent_offset >= lstats.dirty_offset) { + /** + * If expected_log_end_offset is indicating that all the requests were + * already dispatched to the follower wait for append entries responses. The + * responses will trigger the follower state condition variable and + * recovery_stm will redo the check if follower still needs to be recovered. + */ + if (meta.value()->expected_log_end_offset >= lstats.dirty_offset) { co_await meta.value() ->follower_state_change.wait() .handle_exception_type([this](const ss::broken_condition_variable&) { @@ -307,7 +312,15 @@ ss::future<> recovery_stm::send_install_snapshot_request() { .done = (_sent_snapshot_bytes + chunk_size) == _snapshot_size}; _sent_snapshot_bytes += chunk_size; - + if (req.done) { + auto meta = get_follower_meta(); + if (!meta) { + // stop recovery when node was removed + _stop_requested = true; + return ss::make_ready_future<>(); + } + (*meta)->expected_log_end_offset = _ptr->_last_snapshot_index; + } vlog(_ctxlog.trace, "sending install_snapshot request: {}", req); auto seq = _ptr->next_follower_sequence(_node_id); _ptr->update_suppress_heartbeats( @@ -368,7 +381,6 @@ ss::future<> recovery_stm::handle_install_snapshot_reply( // snapshot received by the follower, continue with recovery (*meta)->match_index = _ptr->_last_snapshot_index; (*meta)->next_index = model::next_offset(_ptr->_last_snapshot_index); - (*meta)->last_sent_offset = _ptr->_last_snapshot_index; return close_snapshot_reader(); } @@ -433,7 +445,11 @@ ss::future<> recovery_stm::replicate( _stop_requested = true; return ss::now(); } - meta.value()->last_sent_offset = _last_batch_offset; + /** + * Update follower expected log end. It is equal to the last batch in a set + * of batches read for this recovery round. + */ + meta.value()->expected_log_end_offset = _last_batch_offset; _ptr->update_node_append_timestamp(_node_id); auto seq = _ptr->next_follower_sequence(_node_id); @@ -484,7 +500,7 @@ ss::future<> recovery_stm::replicate( } meta.value()->next_index = std::max( model::offset(0), model::prev_offset(_base_batch_offset)); - meta.value()->last_sent_offset = model::offset{}; + vlog( _ctxlog.trace, "Move next index {} backward", diff --git a/src/v/raft/replicate_entries_stm.cc b/src/v/raft/replicate_entries_stm.cc index d5cfbe59f3645..699d155ab0f03 100644 --- a/src/v/raft/replicate_entries_stm.cc +++ b/src/v/raft/replicate_entries_stm.cc @@ -238,7 +238,8 @@ inline bool replicate_entries_stm::should_skip_follower_request(vnode id) { if (auto it = _ptr->_fstats.find(id); it != _ptr->_fstats.end()) { const auto timeout = clock_type::now() - _ptr->_replicate_append_timeout; - if (it->second.last_received_reply_timestamp < timeout) { + auto& f_meta = it->second; + if (f_meta.last_received_reply_timestamp < timeout) { vlog( _ctxlog.trace, "Skipping sending append request to {} - didn't receive " @@ -246,13 +247,13 @@ inline bool replicate_entries_stm::should_skip_follower_request(vnode id) { id); return true; } - if (it->second.last_sent_offset != _meta.prev_log_index) { + if (f_meta.expected_log_end_offset != _meta.prev_log_index) { vlog( _ctxlog.trace, - "Skipping sending append request to {} - last sent offset: {}, " - "expected follower last offset: {}", + "Skipping sending append request to {} - expected follower log " + "end offset: {}, request expected last offset: {}", id, - it->second.last_sent_offset, + f_meta.expected_log_end_offset, _meta.prev_log_index); return true; } @@ -293,7 +294,7 @@ ss::future> replicate_entries_stm::apply(units_t u) { if (rni != _ptr->self()) { auto it = _ptr->_fstats.find(rni); if (it != _ptr->_fstats.end()) { - it->second.last_sent_offset = _dirty_offset; + it->second.expected_log_end_offset = _dirty_offset; } } ++_requests_count; diff --git a/src/v/raft/types.cc b/src/v/raft/types.cc index 99bf52f742660..4d54537f2496e 100644 --- a/src/v/raft/types.cc +++ b/src/v/raft/types.cc @@ -138,7 +138,7 @@ replicate_stages::replicate_stages(raft::errc ec) void follower_index_metadata::reset() { last_dirty_log_index = model::offset{}; last_flushed_log_index = model::offset{}; - last_sent_offset = model::offset{}; + expected_log_end_offset = model::offset{}; match_index = model::offset{}; next_index = model::offset{}; heartbeats_failed = 0; diff --git a/src/v/raft/types.h b/src/v/raft/types.h index 1f2e916433501..78a49de294f51 100644 --- a/src/v/raft/types.h +++ b/src/v/raft/types.h @@ -100,7 +100,10 @@ struct follower_index_metadata { } // next index to send to this follower model::offset next_index; - model::offset last_sent_offset; + // field indicating end offset of follower log after current pending + // append_entries_requests are successfully delivered and processed by the + // follower. + model::offset expected_log_end_offset; // timestamp of last append_entries_rpc call clock_type::time_point last_sent_append_entries_req_timestamp; clock_type::time_point last_received_reply_timestamp; diff --git a/src/v/redpanda/admin/api-doc/debug.json b/src/v/redpanda/admin/api-doc/debug.json index 3c89f6f99e3f5..b4183f70868ad 100644 --- a/src/v/redpanda/admin/api-doc/debug.json +++ b/src/v/redpanda/admin/api-doc/debug.json @@ -857,9 +857,9 @@ "type": "long", "description": "Next index" }, - "last_sent_offset": { + "expected_log_end_offset": { "type": "long", - "description": "Last sent offset" + "description": "Follower log end offset expected by the leader" }, "heartbeats_failed": { "type": "long", diff --git a/src/v/redpanda/admin_server.cc b/src/v/redpanda/admin_server.cc index a8209305dc036..393df9e04991c 100644 --- a/src/v/redpanda/admin_server.cc +++ b/src/v/redpanda/admin_server.cc @@ -4003,7 +4003,8 @@ void fill_raft_state( follower_state.last_dirty_log_index = f.last_dirty_log_index(); follower_state.match_index = f.match_index(); follower_state.next_index = f.next_index(); - follower_state.last_sent_offset = f.last_sent_offset(); + follower_state.expected_log_end_offset + = f.expected_log_end_offset(); follower_state.heartbeats_failed = f.heartbeats_failed; follower_state.is_learner = f.is_learner; follower_state.ms_since_last_heartbeat = f.ms_since_last_heartbeat; From 5804d29661875cdc67c93d9338984444258f35a2 Mon Sep 17 00:00:00 2001 From: Michal Maslanka Date: Tue, 19 Sep 2023 13:22:59 +0200 Subject: [PATCH 3/3] r/consensus: clamp returned offsets when stale request was received When replying to stale append entries request a request that was already delivered to the follower we must clamp returned dirty offset not to allow Raft group leader to reason about offsets which are not yet know to be matching between leader and followers. This fixes situation in which follower `match_index` may updated before its log actually matches leader. Example: (term,offset) - represent a single entry Leader log: ``` (1,0),(1,1),(1,2),(3,3),(3,4),(3,5) committed_offset: 2 ``` Follower log: ``` (1,0),(1,1),(1,2),(2,3),(2,4) committed_offset: 2 ``` There is a term inconsistency starting at offset `3` If follower would receive an append entries request with prev_log_index=1 prev_log_term=1 The request would result in a successful reply as `prev_log_term` and matches the entry at offset 1, however follower log can not be truncated so the follower will reply with success. The success reply will 'lie' to the leader that the follower log matches leader log. Signed-off-by: Michal Maslanka (cherry picked from commit d0e45c0baf88e2f0c3121a919d2051ef4e12fd8e) --- src/v/raft/consensus.cc | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/v/raft/consensus.cc b/src/v/raft/consensus.cc index 07e9fa3e211f5..e0b9ef1eeb384 100644 --- a/src/v/raft/consensus.cc +++ b/src/v/raft/consensus.cc @@ -1917,6 +1917,10 @@ consensus::do_append_entries(append_entries_request&& r) { if (request_metadata.prev_log_index < last_log_offset) { if (unlikely(request_metadata.prev_log_index < _commit_index)) { reply.result = append_entries_reply::status::success; + // clamp dirty offset to the current commit index not to allow + // leader reasoning about follower log beyond that point + reply.last_dirty_log_index = _commit_index; + reply.last_flushed_log_index = _commit_index; vlog( _ctxlog.info, "Stale append entries request processed, entry is already "