Skip to content

Commit

Permalink
Merge pull request #13561 from mmaslankaprv/v23.2.x
Browse files Browse the repository at this point in the history
[v23.2.x] Backport of #13495
  • Loading branch information
piyushredpanda authored Sep 20, 2023
2 parents 9963ffc + 5804d29 commit 0ad82db
Show file tree
Hide file tree
Showing 10 changed files with 55 additions and 34 deletions.
2 changes: 1 addition & 1 deletion src/v/cluster/cluster_utils.cc
Original file line number Diff line number Diff line change
Expand Up @@ -455,7 +455,7 @@ partition_raft_state get_partition_raft_state(consensus_ptr ptr) {
state.last_flushed_log_index = md.last_flushed_log_index;
state.match_index = md.match_index;
state.next_index = md.next_index;
state.last_sent_offset = md.last_sent_offset;
state.expected_log_end_offset = md.expected_log_end_offset;
state.heartbeats_failed = md.heartbeats_failed;
state.is_learner = md.is_learner;
state.is_recovering = md.is_recovering;
Expand Down
4 changes: 2 additions & 2 deletions src/v/cluster/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -3543,7 +3543,7 @@ struct partition_raft_state
model::offset last_dirty_log_index;
model::offset match_index;
model::offset next_index;
model::offset last_sent_offset;
model::offset expected_log_end_offset;
size_t heartbeats_failed;
bool is_learner;
uint64_t ms_since_last_heartbeat;
Expand All @@ -3560,7 +3560,7 @@ struct partition_raft_state
last_dirty_log_index,
match_index,
next_index,
last_sent_offset,
expected_log_end_offset,
heartbeats_failed,
is_learner,
ms_since_last_heartbeat,
Expand Down
16 changes: 13 additions & 3 deletions src/v/raft/consensus.cc
Original file line number Diff line number Diff line change
Expand Up @@ -432,7 +432,7 @@ consensus::success_reply consensus::update_follower_index(
successfull_append_entries_reply(idx, std::move(reply));
return success_reply::yes;
} else {
idx.last_sent_offset = idx.last_dirty_log_index;
idx.expected_log_end_offset = model::offset{};
}

if (idx.is_recovering) {
Expand All @@ -444,7 +444,6 @@ consensus::success_reply consensus::update_follower_index(
idx.last_dirty_log_index = reply.last_dirty_log_index;
idx.last_flushed_log_index = reply.last_flushed_log_index;
idx.next_index = model::next_offset(idx.last_dirty_log_index);
idx.last_sent_offset = model::offset{};
}
return success_reply::no;
}
Expand Down Expand Up @@ -525,6 +524,13 @@ void consensus::successfull_append_entries_reply(
idx.match_index = idx.last_dirty_log_index;
idx.next_index = model::next_offset(idx.last_dirty_log_index);
idx.last_successful_received_seq = idx.last_received_seq;
/**
* Advance expected log end offset only if it is smaller than the follower's
* last dirty log index. It must not be moved backward here, as there might
* be pending append entries requests that the follower has not replied to
* yet.
*/
idx.expected_log_end_offset = std::max(
idx.last_dirty_log_index, idx.expected_log_end_offset);
vlog(
_ctxlog.trace,
"Updated node {} match {} and next {} indices",
Expand Down Expand Up @@ -559,7 +565,7 @@ void consensus::dispatch_recovery(follower_index_metadata& idx) {
idx.next_index,
log_max_offset);
idx.next_index = log_max_offset;
idx.last_sent_offset = model::offset{};
idx.expected_log_end_offset = model::offset{};
}
idx.is_recovering = true;
// background
Expand Down Expand Up @@ -1911,6 +1917,10 @@ consensus::do_append_entries(append_entries_request&& r) {
if (request_metadata.prev_log_index < last_log_offset) {
if (unlikely(request_metadata.prev_log_index < _commit_index)) {
reply.result = append_entries_reply::status::success;
// clamp dirty and flushed offsets to the current commit index so that the
// leader cannot reason about the follower log beyond that point
reply.last_dirty_log_index = _commit_index;
reply.last_flushed_log_index = _commit_index;
vlog(
_ctxlog.info,
"Stale append entries request processed, entry is already "
Expand Down
38 changes: 22 additions & 16 deletions src/v/raft/recovery_stm.cc
Original file line number Diff line number Diff line change
Expand Up @@ -131,8 +131,13 @@ ss::future<> recovery_stm::do_recover(ss::io_priority_class iopc) {
co_return;
}

// wait for another round
if (meta.value()->last_sent_offset >= lstats.dirty_offset) {
/**
* If expected_log_end_offset indicates that all the requests were already
* dispatched to the follower, wait for the append entries responses. The
* responses will trigger the follower state condition variable and
* recovery_stm will then recheck whether the follower still needs to be
* recovered.
*/
if (meta.value()->expected_log_end_offset >= lstats.dirty_offset) {
co_await meta.value()
->follower_state_change.wait()
.handle_exception_type([this](const ss::broken_condition_variable&) {
Expand All @@ -159,16 +164,6 @@ ss::future<> recovery_stm::do_recover(ss::io_priority_class iopc) {
meta = get_follower_meta();
}

// Returns true when any of the following holds:
//  - follower metadata is not available (get_follower_meta() is empty),
//  - the dirty offset reported by `_ptr->_log.offsets()` is greater than the
//    follower's last_dirty_log_index,
//  - the follower's last_sent_offset is equal to that dirty offset.
// NOTE(review): judging by the name this is meant to signal that the
// follower state changed and recovery should be re-evaluated - confirm
// against callers.
bool recovery_stm::state_changed() {
auto meta = get_follower_meta();
if (!meta) {
// no metadata for the follower, report it as a state change
return true;
}
// offsets of the log owned by the consensus instance behind `_ptr`
auto lstats = _ptr->_log.offsets();
return lstats.dirty_offset > meta.value()->last_dirty_log_index
|| meta.value()->last_sent_offset == lstats.dirty_offset;
}

flush_after_append
recovery_stm::should_flush(model::offset follower_committed_match_index) const {
constexpr size_t checkpoint_flush_size = 1_MiB;
Expand Down Expand Up @@ -317,7 +312,15 @@ ss::future<> recovery_stm::send_install_snapshot_request() {
.done = (_sent_snapshot_bytes + chunk_size) == _snapshot_size};

_sent_snapshot_bytes += chunk_size;

if (req.done) {
auto meta = get_follower_meta();
if (!meta) {
// stop recovery when node was removed
_stop_requested = true;
return ss::make_ready_future<>();
}
(*meta)->expected_log_end_offset = _ptr->_last_snapshot_index;
}
vlog(_ctxlog.trace, "sending install_snapshot request: {}", req);
auto seq = _ptr->next_follower_sequence(_node_id);
_ptr->update_suppress_heartbeats(
Expand Down Expand Up @@ -378,7 +381,6 @@ ss::future<> recovery_stm::handle_install_snapshot_reply(
// snapshot received by the follower, continue with recovery
(*meta)->match_index = _ptr->_last_snapshot_index;
(*meta)->next_index = model::next_offset(_ptr->_last_snapshot_index);
(*meta)->last_sent_offset = _ptr->_last_snapshot_index;
return close_snapshot_reader();
}

Expand Down Expand Up @@ -443,7 +445,11 @@ ss::future<> recovery_stm::replicate(
_stop_requested = true;
return ss::now();
}
meta.value()->last_sent_offset = _last_batch_offset;
/**
* Update follower expected log end. It is equal to the last batch in a set
* of batches read for this recovery round.
*/
meta.value()->expected_log_end_offset = _last_batch_offset;
_ptr->update_node_append_timestamp(_node_id);

auto seq = _ptr->next_follower_sequence(_node_id);
Expand Down Expand Up @@ -494,7 +500,7 @@ ss::future<> recovery_stm::replicate(
}
meta.value()->next_index = std::max(
model::offset(0), model::prev_offset(_base_batch_offset));
meta.value()->last_sent_offset = model::offset{};

vlog(
_ctxlog.trace,
"Move next index {} backward",
Expand Down
2 changes: 1 addition & 1 deletion src/v/raft/recovery_stm.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ class recovery_stm {
ss::future<> handle_install_snapshot_reply(result<install_snapshot_reply>);
ss::future<> open_snapshot_reader();
ss::future<> close_snapshot_reader();
bool state_changed();

bool is_recovery_finished();
flush_after_append should_flush(model::offset) const;
consensus* _ptr;
Expand Down
13 changes: 7 additions & 6 deletions src/v/raft/replicate_entries_stm.cc
Original file line number Diff line number Diff line change
Expand Up @@ -238,21 +238,22 @@ inline bool replicate_entries_stm::should_skip_follower_request(vnode id) {
if (auto it = _ptr->_fstats.find(id); it != _ptr->_fstats.end()) {
const auto timeout = clock_type::now()
- _ptr->_replicate_append_timeout;
if (it->second.last_received_reply_timestamp < timeout) {
auto& f_meta = it->second;
if (f_meta.last_received_reply_timestamp < timeout) {
vlog(
_ctxlog.trace,
"Skipping sending append request to {} - didn't receive "
"follower heartbeat",
id);
return true;
}
if (it->second.last_sent_offset != _meta.prev_log_index) {
if (f_meta.expected_log_end_offset != _meta.prev_log_index) {
vlog(
_ctxlog.trace,
"Skipping sending append request to {} - last sent offset: {}, "
"expected follower last offset: {}",
"Skipping sending append request to {} - expected follower log "
"end offset: {}, request expected last offset: {}",
id,
it->second.last_sent_offset,
f_meta.expected_log_end_offset,
_meta.prev_log_index);
return true;
}
Expand Down Expand Up @@ -293,7 +294,7 @@ ss::future<result<replicate_result>> replicate_entries_stm::apply(units_t u) {
if (rni != _ptr->self()) {
auto it = _ptr->_fstats.find(rni);
if (it != _ptr->_fstats.end()) {
it->second.last_sent_offset = _dirty_offset;
it->second.expected_log_end_offset = _dirty_offset;
}
}
++_requests_count;
Expand Down
2 changes: 1 addition & 1 deletion src/v/raft/types.cc
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ replicate_stages::replicate_stages(raft::errc ec)
void follower_index_metadata::reset() {
last_dirty_log_index = model::offset{};
last_flushed_log_index = model::offset{};
last_sent_offset = model::offset{};
expected_log_end_offset = model::offset{};
match_index = model::offset{};
next_index = model::offset{};
heartbeats_failed = 0;
Expand Down
5 changes: 4 additions & 1 deletion src/v/raft/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,10 @@ struct follower_index_metadata {
}
// next index to send to this follower
model::offset next_index;
model::offset last_sent_offset;
// field indicating end offset of follower log after current pending
// append_entries_requests are successfully delivered and processed by the
// follower.
model::offset expected_log_end_offset;
// timestamp of last append_entries_rpc call
clock_type::time_point last_sent_append_entries_req_timestamp;
clock_type::time_point last_received_reply_timestamp;
Expand Down
4 changes: 2 additions & 2 deletions src/v/redpanda/admin/api-doc/debug.json
Original file line number Diff line number Diff line change
Expand Up @@ -857,9 +857,9 @@
"type": "long",
"description": "Next index"
},
"last_sent_offset": {
"expected_log_end_offset": {
"type": "long",
"description": "Last sent offset"
"description": "Follower log end offset expected by the leader"
},
"heartbeats_failed": {
"type": "long",
Expand Down
3 changes: 2 additions & 1 deletion src/v/redpanda/admin_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4003,7 +4003,8 @@ void fill_raft_state(
follower_state.last_dirty_log_index = f.last_dirty_log_index();
follower_state.match_index = f.match_index();
follower_state.next_index = f.next_index();
follower_state.last_sent_offset = f.last_sent_offset();
follower_state.expected_log_end_offset
= f.expected_log_end_offset();
follower_state.heartbeats_failed = f.heartbeats_failed;
follower_state.is_learner = f.is_learner;
follower_state.ms_since_last_heartbeat = f.ms_since_last_heartbeat;
Expand Down

0 comments on commit 0ad82db

Please sign in to comment.