Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix stepping down on timeout #24590

Open
wants to merge 7 commits into
base: dev
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/v/compat/raft_generator.h
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,7 @@ struct instance_generator<raft::append_entries_reply> {
{raft::reply_result::success,
raft::reply_result::failure,
raft::reply_result::group_unavailable,
raft::reply_result::timeout}),
raft::reply_result::follower_busy}),
.may_recover = tests::random_bool(),
};
}
Expand Down
2 changes: 1 addition & 1 deletion src/v/compat/raft_json.h
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ inline void read_value(const json::Value& rd, raft::append_entries_reply& out) {
obj.result = raft::reply_result::group_unavailable;
break;
case 3:
obj.result = raft::reply_result::timeout;
obj.result = raft::reply_result::follower_busy;
break;
default:
vassert(false, "invalid result {}", result);
Expand Down
13 changes: 7 additions & 6 deletions src/v/raft/consensus.cc
Original file line number Diff line number Diff line change
Expand Up @@ -338,6 +338,13 @@ consensus::success_reply consensus::update_follower_index(
// current node may change it.
return success_reply::yes;
}

if (unlikely(r.value().result == reply_result::follower_busy)) {
// ignore this response, timed out on the receiver node
vlog(_ctxlog.trace, "Follower busy on node {}", node.id());
return success_reply::no;
}
mmaslankaprv marked this conversation as resolved.
Show resolved Hide resolved

const auto& config = _configuration_manager.get_latest();
if (!config.contains(node)) {
// We might have sent an append_entries just before removing
Expand Down Expand Up @@ -373,12 +380,6 @@ consensus::success_reply consensus::update_follower_index(
physical_node);
return success_reply::no;
}

if (unlikely(reply.result == reply_result::timeout)) {
// ignore this response, timed out on the receiver node
vlog(_ctxlog.trace, "Append entries request timedout at node {}", node);
return success_reply::no;
}
if (unlikely(reply.result == reply_result::group_unavailable)) {
// ignore this response since group is not yet bootstrapped at the
// follower
Expand Down
2 changes: 1 addition & 1 deletion src/v/raft/fundamental.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ enum class reply_result : uint8_t {
success,
failure,
group_unavailable,
timeout
follower_busy
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I a bit confused, is the timeout case no longer possible?

Don't we have two cases at least: follow replies immediately with "busy", and also follower never replies?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see, I guess the timeout case never ends using the reply_result, it will be handled in a different path: these codes are only for cases where an RPC was actually received, right?

};

/**
Expand Down
29 changes: 18 additions & 11 deletions src/v/raft/heartbeat_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -373,14 +373,18 @@ void heartbeat_manager::process_reply(
n);
return;
}

if (unlikely(result == reply_result::timeout)) {
/**
* This is here for completeness, it should never be triggered as the
* follower do not reply with busy error code when processing
* lightweight heartbeats
*/
if (unlikely(result == reply_result::follower_busy)) {
vlog(
hbeatlog.debug,
"Heartbeat request for group {} timed out on the node {}",
hbeatlog.error,
"Follower reported busy for group {} on node {} when processing "
"lightweight heartbeat",
group,
n);
return;
}
if (unlikely(target != consensus->self().id())) {
vlog(
Expand Down Expand Up @@ -438,14 +442,17 @@ void heartbeat_manager::process_reply(
n);
continue;
}

if (unlikely(m.result == reply_result::timeout)) {
/**
* Follower being busy is updating the last received reply timestamp as
* it is indicating the receiving replica is alive and is able to
* process request, it may simply be slow and its oplock is contended.
*/
if (unlikely(m.result == reply_result::follower_busy)) {
vlog(
hbeatlog.debug,
"Heartbeat request for group {} timed out on the node {}",
hbeatlog.trace,
"Follower busy when processing full heartbeat for group {} on {}",
m.group,
n);
continue;
}

if (unlikely(reply.target() != consensus->self().id())) {
Expand Down Expand Up @@ -555,7 +562,7 @@ void heartbeat_manager::process_reply(
continue;
}

if (unlikely(m.result == reply_result::timeout)) {
if (unlikely(m.result == reply_result::follower_busy)) {
vlog(
hbeatlog.debug,
"Heartbeat request for group {} timed out on the node {}",
Expand Down
12 changes: 6 additions & 6 deletions src/v/raft/service.h
Original file line number Diff line number Diff line change
Expand Up @@ -411,7 +411,7 @@ class service final : public raftgen_service {
return ss::with_timeout(timeout, std::move(f))
.handle_exception_type([group](const ss::timed_out_error&) {
return append_entries_reply{
.group = group, .result = reply_result::timeout};
.group = group, .result = reply_result::follower_busy};
});
});

Expand Down Expand Up @@ -447,11 +447,11 @@ class service final : public raftgen_service {
auto f = dispatch_full_heartbeat(
source_node, target_node, m, full_hb);
f = ss::with_timeout(timeout, std::move(f))
.handle_exception_type(
[group = full_hb.group](const ss::timed_out_error&) {
return full_heartbeat_reply{
.group = group, .result = reply_result::timeout};
});
.handle_exception_type([group = full_hb.group](
const ss::timed_out_error&) {
return full_heartbeat_reply{
.group = group, .result = reply_result::follower_busy};
});
futures.push_back(std::move(f));
}

Expand Down
17 changes: 17 additions & 0 deletions src/v/raft/tests/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,22 @@ redpanda_test_cc_library(
],
)

redpanda_test_cc_library(
name = "failure_injectable_log",
srcs = ["failure_injectable_log.cc"],
hdrs = [
"failure_injectable_log.h",
],
include_prefix = "raft/tests",
visibility = ["//visibility:public"],
deps = [
"//src/v/base",
"//src/v/storage",
"//src/v/test_utils:gtest",
"@seastar",
],
)

redpanda_test_cc_library(
name = "raft_fixture",
srcs = ["raft_fixture.cc"],
Expand All @@ -81,6 +97,7 @@ redpanda_test_cc_library(
include_prefix = "raft/tests",
visibility = ["//visibility:public"],
deps = [
":failure_injectable_log",
"//src/v/base",
"//src/v/bytes:iobuf",
"//src/v/bytes:iobuf_parser",
Expand Down
14 changes: 8 additions & 6 deletions src/v/raft/tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,14 @@ rp_test(
ARGS "-- -c 8"
)


v_cc_library(
NAME raft_fixture
SRCS raft_fixture.cc
failure_injectable_log.cc
DEPS Seastar::seastar v::raft v::gtest_main)

set(gsrcs
raft_fixture.cc
basic_raft_fixture_test.cc
stm_manager_test.cc
raft_reconfiguration_test.cc
Expand All @@ -53,7 +59,7 @@ rp_test(
TIMEOUT 2000
BINARY_NAME gtest_raft
SOURCES ${gsrcs}
LIBRARIES v::raft v::storage_test_utils v::model_test_utils v::features v::gtest_main
LIBRARIES v::raft v::storage_test_utils v::model_test_utils v::features v::gtest_main v::raft_fixture
LABELS raft
ARGS "-- -c 8 -m 4G"
)
Expand All @@ -68,7 +74,3 @@ rp_test(
LABELS raft
)

v_cc_library(
NAME raft_fixture
SRCS raft_fixture.cc
DEPS Seastar::seastar v::raft v::gtest_main)
26 changes: 26 additions & 0 deletions src/v/raft/tests/basic_raft_fixture_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -836,3 +836,29 @@ TEST_F_CORO(raft_fixture, leadership_transfer_delay) {
ASSERT_LE_CORO(election_time * 1.0, transfer_time * tolerance_multiplier);
ASSERT_GE_CORO(election_time * 1.0, transfer_time / tolerance_multiplier);
}

TEST_F_CORO(raft_fixture, test_no_stepdown_on_append_entries_timeout) {
config::shard_local_cfg().replicate_append_timeout_ms.set_value(1s);
co_await create_simple_group(3);
auto leader_id = co_await wait_for_leader(10s);
for (auto& [id, n] : nodes()) {
if (id != leader_id) {
n->f_injectable_log()->set_append_delay([]() { return 5s; });
}
}

auto& leader_node = node(leader_id);
auto term_before = leader_node.raft()->term();
auto r = co_await leader_node.raft()->replicate(
make_batches(1, 10, 128),
replicate_options(consistency_level::quorum_ack, 10s));
ASSERT_FALSE_CORO(r.has_error());
for (auto& [_, n] : nodes()) {
n->f_injectable_log()->set_append_delay(std::nullopt);
}

leader_id = co_await wait_for_leader(10s);
auto& new_leader_node = node(leader_id);
ASSERT_EQ_CORO(term_before, new_leader_node.raft()->term());
ASSERT_TRUE_CORO(new_leader_node.raft()->is_leader());
}
Loading
Loading