Skip to content

Commit

Permalink
Merge pull request #18498 from ztlpn/v23.2.x-bp
Browse files Browse the repository at this point in the history
[v23.2.x] Fix some concurrent memory access problems in partition balancer
  • Loading branch information
ztlpn authored May 22, 2024
2 parents c827fab + 9409311 commit 9e4bddc
Show file tree
Hide file tree
Showing 5 changed files with 94 additions and 24 deletions.
21 changes: 20 additions & 1 deletion src/v/cluster/scheduling/types.cc
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

#include "cluster/logger.h"
#include "cluster/scheduling/allocation_state.h"
#include "utils/exceptions.h"
#include "utils/to_string.h"

#include <fmt/ostream.h>
Expand Down Expand Up @@ -76,6 +77,9 @@ allocation_units::allocation_units(

allocation_units::~allocation_units() {
oncore_debug_verify(_oncore);
if (unlikely(!_state)) {
return;
}
for (auto& pas : _assignments) {
for (auto& replica : pas.replicas) {
_state->remove_allocation(replica, _domain);
Expand All @@ -96,6 +100,11 @@ allocated_partition::allocated_partition(

std::optional<allocated_partition::previous_replica>
allocated_partition::prepare_move(model::node_id prev_node) {
if (unlikely(!_state)) {
throw concurrent_modification_error(
"allocation_state was concurrently replaced");
}

previous_replica prev;
auto it = std::find_if(
_replicas.begin(), _replicas.end(), [prev_node](const auto& bs) {
Expand Down Expand Up @@ -149,6 +158,11 @@ allocated_partition::prepare_move(model::node_id prev_node) {

model::broker_shard allocated_partition::add_replica(
model::node_id node, const std::optional<previous_replica>& prev) {
if (unlikely(!_state)) {
throw concurrent_modification_error(
"allocation_state was concurrently replaced");
}

if (!_original_node2shard) {
_original_node2shard.emplace();
for (const auto& bs : _replicas) {
Expand Down Expand Up @@ -225,7 +239,12 @@ bool allocated_partition::is_original(model::node_id node) const {
}

errc allocated_partition::try_revert(const reallocation_step& step) {
if (!_original_node2shard || !_state) {
if (unlikely(!_state)) {
throw concurrent_modification_error(
"allocation_state was concurrently replaced");
}

if (!_original_node2shard) {
return errc::no_update_in_progress;
}

Expand Down
31 changes: 24 additions & 7 deletions src/v/cluster/topic_table.cc
Original file line number Diff line number Diff line change
Expand Up @@ -332,6 +332,8 @@ topic_table::apply(finish_moving_partition_replicas_cmd cmd, model::offset o) {

_updates_in_progress.erase(it);

_topics_map_revision++;

partition_assignment delta_assignment{
current_assignment_it->group,
current_assignment_it->id,
Expand Down Expand Up @@ -406,6 +408,8 @@ topic_table::apply(cancel_moving_partition_replicas_cmd cmd, model::offset o) {
current_assignment_it->replicas
= in_progress_it->second.get_previous_replicas();

_topics_map_revision++;

/**
* Cancel/force abort delta contains two assignments: new_assignment is set
* to the one the partition is currently being moved from. Previous
Expand Down Expand Up @@ -471,6 +475,11 @@ topic_table::apply(revert_cancel_partition_move_cmd cmd, model::offset o) {
co_return errc::no_update_in_progress;
}

auto p_meta_it = tp->second.partitions.find(ntp.tp.partition);
if (p_meta_it == tp->second.partitions.end()) {
co_return errc::partition_not_exists;
}

// revert replica set update
current_assignment_it->replicas
= in_progress_it->second.get_target_replicas();
Expand All @@ -481,11 +490,7 @@ topic_table::apply(revert_cancel_partition_move_cmd cmd, model::offset o) {
current_assignment_it->replicas,
};

// update partition_meta object
auto p_meta_it = tp->second.partitions.find(ntp.tp.partition);
if (p_meta_it == tp->second.partitions.end()) {
co_return errc::partition_not_exists;
}
// update partition_meta object:
// the cancellation was reverted and update went through, we must
// update replicas_revisions.
p_meta_it->second.replicas_revisions = update_replicas_revisions(
Expand All @@ -497,6 +502,8 @@ topic_table::apply(revert_cancel_partition_move_cmd cmd, model::offset o) {
/// Since the update is already finished we drop in_progress state
_updates_in_progress.erase(in_progress_it);

_topics_map_revision++;

// notify backend about finished update
_pending_deltas.emplace_back(
ntp, std::move(delta_assignment), o, delta::op_type::update_finished);
Expand Down Expand Up @@ -832,12 +839,14 @@ class topic_table::snapshot_applier {
updates_t& _updates_in_progress;
fragmented_vector<delta>& _pending_deltas;
topic_table_probe& _probe;
model::revision_id& _topics_map_revision;

public:
explicit snapshot_applier(topic_table& parent)
: _updates_in_progress(parent._updates_in_progress)
, _pending_deltas(parent._pending_deltas)
, _probe(parent._probe) {}
, _probe(parent._probe)
, _topics_map_revision(parent._topics_map_revision) {}

void delete_ntp(
const model::topic_namespace& ns_tp,
Expand All @@ -846,7 +855,9 @@ class topic_table::snapshot_applier {
auto ntp = model::ntp(ns_tp.ns, ns_tp.tp, p_as.id);
vlog(
clusterlog.trace, "deleting ntp {} not in controller snapshot", ntp);
_updates_in_progress.erase(ntp);
if (_updates_in_progress.erase(ntp)) {
_topics_map_revision++;
};

_pending_deltas.emplace_back(
std::move(ntp),
Expand Down Expand Up @@ -883,6 +894,9 @@ class topic_table::snapshot_applier {
vlog(clusterlog.trace, "adding ntp {} from controller snapshot", ntp);
size_t pending_deltas_start_idx = _pending_deltas.size();

// we are going to modify md_item so increment the revision right away.
_topics_map_revision++;

const model::partition_id p_id = ntp.tp.partition;

// 1. reconcile the _topics state (the md_item object) and generate
Expand Down Expand Up @@ -1121,6 +1135,7 @@ ss::future<> topic_table::apply_snapshot(
co_await applier.delete_topic(
ns_tp, md_item, topic_snapshot.metadata.revision);
md_item = co_await applier.create_topic(ns_tp, topic_snapshot);
_topics_map_revision++;
} else {
// The topic was present in the previous set, now we need to
// reconcile individual partitions.
Expand Down Expand Up @@ -1156,6 +1171,7 @@ ss::future<> topic_table::apply_snapshot(
if (!topic_snapshot.partitions.contains(as_it_copy->id)) {
applier.delete_ntp(ns_tp, *as_it_copy, snap_revision);
md_item.get_assignments().erase(as_it_copy);
_topics_map_revision++;
}
co_await ss::coroutine::maybe_yield();
}
Expand Down Expand Up @@ -1513,6 +1529,7 @@ void topic_table::change_partition_replicas(
auto previous_assignment = current_assignment.replicas;
// replace partition replica set
current_assignment.replicas = new_assignment;
_topics_map_revision++;

// calculate delta for backend

Expand Down
23 changes: 12 additions & 11 deletions src/v/cluster/topic_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -96,21 +96,17 @@ class topic_table {
// * partition::get_revision_id()
// * raft::group_configuration::revision_id()

class concurrent_modification_error final : public std::exception {
class concurrent_modification_error final
: public ::concurrent_modification_error {
public:
concurrent_modification_error(
model::revision_id initial_revision,
model::revision_id current_revision)
: _msg(ssx::sformat(
"Topic table was modified by concurrent fiber. (initial_revision: "
"{}, current_revision: {}) ",
: ::concurrent_modification_error(ssx::sformat(
"Topic table was modified by concurrent fiber. "
"(initial_revision: {}, current_revision: {}) ",
initial_revision,
current_revision)) {}

const char* what() const noexcept final { return _msg.c_str(); }

private:
ss::sstring _msg;
};

class in_progress_update {
Expand Down Expand Up @@ -532,8 +528,13 @@ class topic_table {

updates_t _updates_in_progress;
model::revision_id _last_applied_revision_id;
// Monotonic counter that is bumped for every addition/deletion to topics
// map. Unlike other revisions this does not correspond to the command

// Monotonic counter that is bumped each time _topics, _disabled_partitions,
// or _updates_in_progress are modified in a way that makes iteration over
// them unsafe (i.e. invalidates iterators or references, including
// for nested collections like partition sets and replica sets).
//
// Unlike other revisions this does not correspond to the command
// revision that updated the map.
model::revision_id _topics_map_revision{0};

Expand Down
33 changes: 33 additions & 0 deletions src/v/utils/exceptions.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Copyright 2024 Redpanda Data, Inc.
*
* Use of this software is governed by the Business Source License
* included in the file licenses/BSL.md
*
* As of the Change Date specified in that file, in accordance with
* the Business Source License, use of this software will be governed
* by the Apache License, Version 2.0
*/
#pragma once

#include "seastarx.h"

#include <seastar/core/sstring.hh>

#include <stdexcept>

/// Some objects reference state that changes comparatively rarely (e.g.
/// topic_table state) across yield points and expect these references to remain
/// valid. In case these references are invalidated by a concurrent fiber, this
/// exception is thrown. This is a signal for the caller to restart the
/// computation with up-to-date state.
class concurrent_modification_error : public std::exception {
public:
explicit concurrent_modification_error(ss::sstring s)
: _msg(std::move(s)) {}

const char* what() const noexcept override { return _msg.c_str(); }

private:
ss::sstring _msg;
};
10 changes: 5 additions & 5 deletions src/v/utils/stable_iterator_adaptor.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,20 +11,20 @@
#pragma once

#include "seastarx.h"
#include "utils/exceptions.h"

#include <seastar/util/noncopyable_function.hh>

#include <boost/iterator/iterator_adaptor.hpp>
#include <fmt/format.h>

#include <stdexcept>
#include <string_view>
#include <version>

class iterator_stability_violation : public std::runtime_error {
class iterator_stability_violation final
: public concurrent_modification_error {
public:
explicit iterator_stability_violation(const std::string& why)
: std::runtime_error(why){};
explicit iterator_stability_violation(ss::sstring why)
: concurrent_modification_error(std::move(why)){};
};

/*
Expand Down

0 comments on commit 9e4bddc

Please sign in to comment.