Skip to content

Commit

Permalink
cluster: use is_cancelled_state where appropriate
Browse files Browse the repository at this point in the history
Previously, we determined the end state of the update with ad-hoc code
each time. This is not reliable, as we can miss some state values (and
indeed, the code for updating replicas revisions missed the case for
force_update). Use is_cancelled_state predicate everywhere instead.

(cherry picked from commit 69fd894)
  • Loading branch information
ztlpn committed Nov 23, 2023
1 parent 04ddecb commit aa411b2
Show file tree
Hide file tree
Showing 5 changed files with 12 additions and 30 deletions.
2 changes: 1 addition & 1 deletion src/v/cluster/partition_balancer_state.cc
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ partition_balancer_state::apply_snapshot(const controller_snapshot& snap) {

if (auto it = topic.updates.find(p_id); it != topic.updates.end()) {
const auto& update = it->second;
if (update.state == reconfiguration_state::in_progress) {
if (!is_cancelled_state(update.state)) {
replicas = &update.target_assignment;
}
}
Expand Down
11 changes: 3 additions & 8 deletions src/v/cluster/scheduling/partition_allocator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -498,15 +498,10 @@ partition_allocator::apply_snapshot(const controller_snapshot& snap) {
}

// final counts depend on the update state
switch (update.state) {
case reconfiguration_state::in_progress:
case reconfiguration_state::force_update:
final_replicas = &update.target_assignment;
break;
case reconfiguration_state::cancelled:
case reconfiguration_state::force_cancelled:
if (is_cancelled_state(update.state)) {
final_replicas = &partition.replicas;
break;
} else {
final_replicas = &update.target_assignment;
}
} else {
final_replicas = &partition.replicas;
Expand Down
10 changes: 3 additions & 7 deletions src/v/cluster/topic_table.cc
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,7 @@ topic_table::apply(finish_moving_partition_replicas_cmd cmd, model::offset o) {
return ss::make_ready_future<std::error_code>(
errc::partition_not_exists);
}
if (it->second.get_state() == reconfiguration_state::in_progress) {
if (!is_cancelled_state(it->second.get_state())) {
// update went through and the cancellation didn't happen, we must
// update replicas_revisions.
p_meta_it->second.replicas_revisions = update_replicas_revisions(
Expand Down Expand Up @@ -468,9 +468,7 @@ topic_table::apply(revert_cancel_partition_move_cmd cmd, model::offset o) {
if (in_progress_it == _updates_in_progress.end()) {
co_return errc::no_update_in_progress;
}
if (
in_progress_it->second.get_state()
== reconfiguration_state::in_progress) {
if (!is_cancelled_state(in_progress_it->second.get_state())) {
co_return errc::no_update_in_progress;
}

Expand Down Expand Up @@ -991,9 +989,7 @@ class topic_table::snapshot_applier {
update_it != topic.updates.end()) {
const auto& update = update_it->second;

if (
update.state == reconfiguration_state::in_progress
|| update.state == reconfiguration_state::force_update) {
if (!is_cancelled_state(update.state)) {
cur_assignment.replicas = update_it->second.target_assignment;
}

Expand Down
8 changes: 2 additions & 6 deletions src/v/cluster/topic_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -132,9 +132,7 @@ class topic_table {

~in_progress_update() {
_probe.handle_update_finish(_previous_replicas, _target_replicas);
if (
_state == reconfiguration_state::cancelled
|| _state == reconfiguration_state::force_cancelled) {
if (is_cancelled_state(_state)) {
_probe.handle_update_cancel_finish(
_previous_replicas, _target_replicas);
;
Expand All @@ -149,9 +147,7 @@ class topic_table {
const reconfiguration_state& get_state() const { return _state; }

void set_state(reconfiguration_state state, model::revision_id rev) {
if (
_state == reconfiguration_state::in_progress
&& (state == reconfiguration_state::cancelled || state == reconfiguration_state::force_cancelled)) {
if (!is_cancelled_state(_state) && is_cancelled_state(state)) {
_probe.handle_update_cancel(
_previous_replicas, _target_replicas);
}
Expand Down
11 changes: 3 additions & 8 deletions src/v/cluster/topic_updates_dispatcher.cc
Original file line number Diff line number Diff line change
Expand Up @@ -459,15 +459,10 @@ topic_updates_dispatcher::collect_in_progress(
continue;
}
const auto state = it->second.get_state();
if (state == reconfiguration_state::in_progress) {
in_progress[p.id] = it->second.get_previous_replicas();
} else {
vassert(
state == reconfiguration_state::cancelled
|| state == reconfiguration_state::force_cancelled,
"Invalid reconfiguration state: {}",
state);
if (is_cancelled_state(state)) {
in_progress[p.id] = it->second.get_target_replicas();
} else {
in_progress[p.id] = it->second.get_previous_replicas();
}
}
return in_progress;
Expand Down

0 comments on commit aa411b2

Please sign in to comment.