Merge pull request #24462 from vbotbuildovich/backport-pr-24378-v24.2.x-590

[v24.2.x] CORE-8394 cluster: consider shard0 reserve in check_cluster_limits
pgellert authored Dec 10, 2024
2 parents 0578f53 + 5fe9620 commit ee0c765
Showing 6 changed files with 67 additions and 25 deletions.
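Summary of the change: internal topics now bypass check_cluster_limits entirely, and the per-core partition ceiling now holds back the shard0 reservation on every broker. Below is a condensed sketch of the resulting check, assembled from the diff that follows; the accounting that derives broker_count, effective_cpu_count, and proposed_total_partitions is elided, so treat this as an illustration rather than the exact source.

// Condensed sketch of the new check_cluster_limits behavior (simplified;
// see the full diff below for the real accounting and error paths).
std::error_code partition_allocator::check_cluster_limits(
  const uint64_t new_partitions_replicas_requested,
  const model::topic_namespace& topic) const {
    // New: internal topics (redpanda/kafka_internal namespaces, plus the
    // configured internal kafka topics) skip cluster-wide limit checks.
    if (allocation_node::is_internal_topic(_internal_kafka_topics, topic)) {
        return errc::success;
    }
    // ... existing partition/memory/fd accounting elided ...
    // New: the core-based ceiling reserves partitions_reserve_shard0
    // slots on every broker instead of ignoring the shard0 reservation.
    const uint64_t core_limit = (effective_cpu_count * _partitions_per_shard())
                                - (broker_count * _partitions_reserve_shard0());
    if (proposed_total_partitions > core_limit) {
        return errc::topic_invalid_partitions_core_limit;
    }
    return errc::success;
}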
30 changes: 16 additions & 14 deletions src/v/cluster/scheduling/allocation_node.cc
@@ -50,24 +50,26 @@ allocation_node::allocation_node(
     });
 }
 
-bool allocation_node::is_full(
-  const model::ntp& ntp, bool will_add_allocation) const {
-    // Internal topics are excluded from checks to prevent allocation failures
-    // when creating them. This is okay because they are fairly small in number
-    // compared to kafka user topic partitions.
+bool allocation_node::is_internal_topic(
+  const config::binding<std::vector<ss::sstring>>& internal_kafka_topics,
+  model::topic_namespace_view ntp) {
     auto is_internal_ns = ntp.ns == model::redpanda_ns
                           || ntp.ns == model::kafka_internal_namespace;
     if (is_internal_ns) {
-        return false;
+        return true;
     }
-    const auto& internal_topics = _internal_kafka_topics();
-    auto is_internal_topic = ntp.ns == model::kafka_namespace
-                             && std::any_of(
-                               internal_topics.cbegin(),
-                               internal_topics.cend(),
-                               [&ntp](const ss::sstring& topic) {
-                                   return topic == ntp.tp.topic();
-                               });
+    const auto& internal_topics = internal_kafka_topics();
+    return ntp.ns == model::kafka_namespace
+           && std::any_of(
+             internal_topics.cbegin(),
+             internal_topics.cend(),
+             [&ntp](const ss::sstring& topic) { return topic == ntp.tp; });
+}
+
+bool allocation_node::is_full(
+  const model::ntp& ntp, bool will_add_allocation) const {
+    auto is_internal_topic = allocation_node::is_internal_topic(
+      _internal_kafka_topics, model::topic_namespace_view{ntp});
 
     auto count = _allocated_partitions;
     if (will_add_allocation) {
7 changes: 7 additions & 0 deletions src/v/cluster/scheduling/allocation_node.h
@@ -126,6 +126,13 @@ class allocation_node {
     }
     bool is_full(const model::ntp&, bool will_add_allocation) const;
 
+    // Internal topics are excluded from checks to prevent allocation failures
+    // when creating them. This is okay because they are fairly small in number
+    // compared to kafka user topic partitions.
+    static bool is_internal_topic(
+      const config::binding<std::vector<ss::sstring>>& internal_kafka_topics,
+      model::topic_namespace_view ntp);
+
 private:
     friend allocation_state;
 
21 changes: 15 additions & 6 deletions src/v/cluster/scheduling/partition_allocator.cc
@@ -109,11 +109,16 @@ allocation_constraints partition_allocator::default_constraints() {
  * with partitions that cannot be re-accommodated on smaller peers).
  */
 std::error_code partition_allocator::check_cluster_limits(
-  const uint64_t new_partitions_replicas_requested) const {
+  const uint64_t new_partitions_replicas_requested,
+  const model::topic_namespace& topic) const {
     if (_members.local().nodes().empty()) {
         // Empty members table, we're probably running in a unit test
         return errc::success;
     }
+    if (allocation_node::is_internal_topic(_internal_kafka_topics, topic)) {
+        return errc::success;
+    }
+
     // Calculate how many partition-replicas already exist, so that we can
     // check if the new topic would take us past any limits.
     uint64_t existent_partitions{0};
@@ -169,15 +174,19 @@ std::error_code partition_allocator::check_cluster_limits(
 
     // Refuse to create a partition count that would violate the per-core
     // limit.
-    const uint64_t core_limit = (effective_cpu_count * _partitions_per_shard());
+    const uint64_t core_limit = (effective_cpu_count * _partitions_per_shard())
+                                - (broker_count * _partitions_reserve_shard0());
     if (proposed_total_partitions > core_limit) {
         vlog(
           clusterlog.warn,
           "Refusing to create {} partitions as total partition count {} would "
-          "exceed core limit {}",
+          "exceed the core-based limit {} (per-shard limit: {}, shard0 "
+          "reservation: {})",
          new_partitions_replicas_requested,
          proposed_total_partitions,
-          effective_cpu_count * _partitions_per_shard());
+          core_limit,
+          _partitions_per_shard(),
+          _partitions_reserve_shard0());
         return errc::topic_invalid_partitions_core_limit;
     }
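For concreteness, a worked example using the fixture values from the tests below (3 brokers with 6 cores each, partitions_per_shard = 1000, partitions_reserve_shard0 = 2, and assuming effective_cpu_count resolves to the cluster's 18 cores in total):

    core_limit = (18 * 1000) - (3 * 2) = 17,994 partition replicas

Before this change the ceiling would have been the full 18,000; the six shard0 slots are now excluded, consistent with the per-node reservation that max_capacity() already subtracts (see the updating_nodes_properties assertion below).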

@@ -243,7 +252,7 @@ partition_allocator::allocate(simple_allocation_request simple_req) {
     const uint64_t create_count
       = static_cast<uint64_t>(simple_req.additional_partitions)
       * static_cast<uint64_t>(simple_req.replication_factor);
-    auto cluster_errc = check_cluster_limits(create_count);
+    auto cluster_errc = check_cluster_limits(create_count, simple_req.tp_ns);
     if (cluster_errc) {
         co_return cluster_errc;
     }
@@ -276,7 +285,7 @@ partition_allocator::allocate(allocation_request request) {
         }
     }
 
-    auto cluster_errc = check_cluster_limits(create_count);
+    auto cluster_errc = check_cluster_limits(create_count, request._nt);
     if (cluster_errc) {
         co_return cluster_errc;
     }
4 changes: 3 additions & 1 deletion src/v/cluster/scheduling/partition_allocator.h
@@ -18,6 +18,7 @@
 #include "cluster/scheduling/types.h"
 #include "config/property.h"
 #include "features/fwd.h"
+#include "model/metadata.h"
 
 namespace cluster {
 
@@ -152,7 +153,8 @@ class partition_allocator {
     // new_partitions_replicas_requested represents the total number of
     // partitions requested by a request. i.e. partitions * replicas requested.
     std::error_code check_cluster_limits(
-      const uint64_t new_partitions_replicas_requested) const;
+      const uint64_t new_partitions_replicas_requested,
+      const model::topic_namespace& topic) const;
 
     ss::future<result<allocation_units::pointer>>
     do_allocate(allocation_request);
6 changes: 3 additions & 3 deletions src/v/cluster/tests/partition_allocator_fixture.h
@@ -32,7 +32,6 @@
 
 struct partition_allocator_fixture {
     static constexpr uint32_t partitions_per_shard = 1000;
-    static constexpr uint32_t partitions_reserve_shard0 = 2;
 
     partition_allocator_fixture()
       : partition_allocator_fixture(std::nullopt, std::nullopt) {}
@@ -68,7 +67,7 @@ struct partition_allocator_fixture {
           broker.id(),
           broker.properties().cores,
           config::mock_binding<uint32_t>(uint32_t{partitions_per_shard}),
-          config::mock_binding<uint32_t>(uint32_t{partitions_reserve_shard0}),
+          partitions_reserve_shard0.bind(),
           kafka_internal_topics.bind()));
     }
 
@@ -138,6 +137,7 @@ struct partition_allocator_fixture {
     cluster::partition_allocator& allocator() { return _allocator.local(); }
 
     config::mock_property<std::vector<ss::sstring>> kafka_internal_topics{{}};
+    config::mock_property<uint32_t> partitions_reserve_shard0{2};
     model::topic_namespace tn{model::kafka_namespace, model::topic{"test"}};
     ss::sharded<cluster::members_table> members;
     ss::sharded<features::feature_table> features;
@@ -158,7 +158,7 @@ struct partition_allocator_fixture {
           config::mock_binding<std::optional<size_t>>(memory_per_partition),
           config::mock_binding<std::optional<int32_t>>(fds_per_partition),
           config::mock_binding<uint32_t>(uint32_t{partitions_per_shard}),
-          config::mock_binding<uint32_t>(uint32_t{partitions_reserve_shard0}),
+          partitions_reserve_shard0.bind(),
           kafka_internal_topics.bind(),
           config::mock_binding<bool>(true))
           .get();
24 changes: 23 additions & 1 deletion src/v/cluster/tests/partition_allocator_tests.cc
@@ -194,6 +194,28 @@ FIXTURE_TEST(allocation_over_capacity, partition_allocator_fixture) {
       allocator().allocate(make_allocation_request(int_2, 1, 1)).get());
 }
 
+FIXTURE_TEST(
+  allocation_over_capacity_without_shard0, partition_allocator_fixture) {
+    // Disable shard0 reservations
+    partitions_reserve_shard0.update(0);
+
+    register_node(0, 6);
+    register_node(1, 6);
+    register_node(2, 6);
+
+    saturate_all_machines();
+    auto gr = allocator().state().last_group_id();
+    BOOST_REQUIRE(
+      allocator().allocate(make_allocation_request(1, 1)).get().has_error());
+    // group id hasn't changed
+    BOOST_REQUIRE_EQUAL(allocator().state().last_group_id(), gr);
+
+    // Make the topic internal and retry, should work.
+    kafka_internal_topics.update({tn.tp()});
+    BOOST_REQUIRE(allocator().allocate(make_allocation_request(1, 1)).get());
+    BOOST_REQUIRE_GT(allocator().state().last_group_id(), gr);
+}
+
 FIXTURE_TEST(max_allocation, partition_allocator_fixture) {
     register_node(0, 2);
     register_node(1, 2);
@@ -530,7 +552,7 @@ FIXTURE_TEST(updating_nodes_properties, partition_allocator_fixture) {
     BOOST_REQUIRE_EQUAL(
       it->second->max_capacity(),
       10 * partition_allocator_fixture::partitions_per_shard
-        - partition_allocator_fixture::partitions_reserve_shard0);
+        - partitions_reserve_shard0());
 }
 
 FIXTURE_TEST(change_replication_factor, partition_allocator_fixture) {
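The new fixture test exercises both halves of the change: with partitions_reserve_shard0 set to zero and every machine saturated, an ordinary allocation still fails and last_group_id() is unchanged, while adding the topic to kafka_internal_topics makes the same request succeed, confirming that internal topics bypass check_cluster_limits.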
