From 419c29b80b559ae093f22b74355ebd2fe184e924 Mon Sep 17 00:00:00 2001
From: Stephan Dollberg
Date: Fri, 6 Dec 2024 10:56:49 +0000
Subject: [PATCH] partition_allocator: Make topic_memory_per_partition memory group aware

Previously `topic_memory_per_partition` was little more than a rough
guess and its value was far too large. That was partly because we
lacked a better estimate, but also because not all memory can be used
by partitions anyway.

After analysing metrics and memory-profiler data we now have a much
better idea of the real value, so we lower the default aggressively.

At the same time we make the partition_allocator memory limit check
memory group aware. It no longer compares `topic_memory_per_partition`
against total memory, as if partitions could use all of it. Instead it
compares against the memory that the memory groups reserve for
partitions.

We use a heuristic (explained in more detail in the code comment) to
guard against cases where this change would reduce partition density.

The new defaults are:

- topic_memory_per_partition: 200KiB (this already assumes some
  to-be-merged optimizations in the seastar metrics stack). It is still
  fairly conservative; the real value is probably closer to 150KiB.
- topic_partitions_max_memory_allocation_share: 10%
- Together, these effectively double the partition density compared to
  the old calculation with a 4MiB `topic_memory_per_partition` (a
  worked example follows the diff).
---
 src/v/cluster/BUILD                           |  14 ++
 .../cluster/scheduling/partition_allocator.cc |  91 ++++++++++---
 .../topic_memory_per_partition_default.h      |  32 +++++
 src/v/cluster/tests/BUILD                     |   4 +
 .../tests/partition_allocator_fixture.h       |  23 ++--
 .../tests/partition_allocator_tests.cc        | 124 ++++++++++++++++--
 .../partition_balancer_planner_fixture.h      |   4 +-
 src/v/cluster/tests/topic_table_fixture.h     |   4 +-
 src/v/config/BUILD                            |   3 +
 src/v/config/configuration.cc                 |   9 +-
 tests/rptest/tests/resource_limits_test.py    |   2 +-
 11 files changed, 270 insertions(+), 40 deletions(-)
 create mode 100644 src/v/cluster/scheduling/topic_memory_per_partition_default.h

diff --git a/src/v/cluster/BUILD b/src/v/cluster/BUILD
index 6399a3740bcba..64601a25b6f36 100644
--- a/src/v/cluster/BUILD
+++ b/src/v/cluster/BUILD
@@ -215,6 +215,18 @@ redpanda_cc_library(
     ],
 )
 
+redpanda_cc_library(
+    name = "topic_memory_per_partition_default",
+    hdrs = [
+        "scheduling/topic_memory_per_partition_default.h",
+    ],
+    include_prefix = "cluster",
+    visibility = ["//visibility:public"],
+    deps = [
+        "//src/v/base",
+    ],
+)
+
 # TODO the following headers are the only headers in redpanda which are excluded
 # from clang-tidy. if you remove the exclusion then you'll observe a tangled web
 # of header dependencies. after many hours those have not yet been resolved, so
@@ -617,7 +629,9 @@ redpanda_cc_library(
         "version.h",
     ],
     implementation_deps = [
+        ":topic_memory_per_partition_default",
         "//src/v/features:enterprise_feature_messages",
+        "//src/v/resource_mgmt:memory_groups",
     ],
     include_prefix = "cluster",
     visibility = ["//visibility:public"],
diff --git a/src/v/cluster/scheduling/partition_allocator.cc b/src/v/cluster/scheduling/partition_allocator.cc
index 2f4a4519b31fd..2a140dc558690 100644
--- a/src/v/cluster/scheduling/partition_allocator.cc
+++ b/src/v/cluster/scheduling/partition_allocator.cc
@@ -15,12 +15,14 @@
 #include "cluster/logger.h"
 #include "cluster/members_table.h"
 #include "cluster/scheduling/constraints.h"
+#include "cluster/scheduling/topic_memory_per_partition_default.h"
 #include "cluster/scheduling/types.h"
 #include "cluster/types.h"
 #include "config/configuration.h"
 #include "features/feature_table.h"
 #include "model/metadata.h"
 #include "random/generators.h"
+#include "resource_mgmt/memory_groups.h"
 #include "ssx/async_algorithm.h"
 #include "utils/human.h"
 
@@ -79,6 +81,52 @@ allocation_constraints partition_allocator::default_constraints() {
     return req;
 }
 
+bool guess_is_memory_group_aware_memory_limit() {
+    // Here we are trying to guess whether `topic_memory_per_partition` is
+    // memory group aware. Originally the property was little more than a
+    // guess of how much memory each partition would statically use. It was
+    // first set to 10MiB and later reduced to 4MiB. Neither value was
+    // measured; both roughly aligned with `topic_partitions_per_shard`.
+    //
+    // These days, based on memory profiler and metrics analysis, we have a
+    // better understanding of how much memory a partition actually uses at
+    // rest. Hence we now set it to a more realistic few hundred KiB.
+    //
+    // At the same time we also made the memory_groups reserve a percentage of
+    // the total memory space for partitions. Instead of dividing the whole
+    // memory space by `topic_memory_per_partition` we now only divide the
+    // reserved amount.
+    //
+    // Because of the latter, making this change isn't trivial. Users might
+    // have lowered the `topic_memory_per_partition` property to gain higher
+    // partition density. Checking memory limits against only the reserved
+    // memory space could then actually result in lower partition density.
+    // Hence, we need to guess whether the property was set before this
+    // change or after. If it was set before, we keep using the old rules
+    // (while still reserving a part of the memory) when checking memory
+    // limits so as not to break existing behaviour.
+    //
+    // Note that getting this guess wrong isn't immediately fatal. The check
+    // only becomes active when creating new topics or modifying existing
+    // topics, and at that point users can increase the memory reservation
+    // if needed.
+
+    // not overridden, so definitely memory group aware
+    if (!config::shard_local_cfg().topic_memory_per_partition.is_overriden()) {
+        return true;
+    }
+
+    auto value = config::shard_local_cfg().topic_memory_per_partition.value();
+
+    // The previous default was 4MiB (and even higher before that). The new
+    // one is more than 10x smaller. We assume it's unlikely someone would
+    // have lowered the value that far themselves and hence treat any value
+    // larger than twice the new default as an old value. Equally, in the new
+    // world it's unlikely anybody would increase the value (at all really).
+
+    return value < 2 * ORIGINAL_MEMORY_GROUP_AWARE_TMPP;
+}
+
 std::error_code partition_allocator::check_memory_limits(
   const uint64_t new_partitions_replicas_requested,
   uint64_t proposed_total_partitions,
@@ -88,23 +136,32 @@ std::error_code partition_allocator::check_memory_limits(
 
     auto memory_per_partition_replica
       = config::shard_local_cfg().topic_memory_per_partition();
-    if (
-      memory_per_partition_replica.has_value()
-      && memory_per_partition_replica.value() > 0) {
-        const uint64_t memory_limit = effective_cluster_memory
-                                      / memory_per_partition_replica.value();
-
-        if (proposed_total_partitions > memory_limit) {
-            vlog(
-              clusterlog.warn,
-              "Refusing to create {} new partitions as total partition count "
-              "{} "
-              "would exceed memory limit {}",
-              new_partitions_replicas_requested,
-              proposed_total_partitions,
-              memory_limit);
-            return errc::topic_invalid_partitions_memory_limit;
-        }
+    if (!memory_per_partition_replica.has_value()) {
+        return errc::success;
+    }
+
+    bool is_memory_aware = guess_is_memory_group_aware_memory_limit();
+
+    uint64_t memory_limit;
+    if (is_memory_aware) {
+        memory_limit = effective_cluster_memory
+                       * memory_groups().partitions_max_memory_share()
+                       / memory_per_partition_replica.value();
+    } else {
+        memory_limit = effective_cluster_memory
+                       / memory_per_partition_replica.value();
+    }
+
+    if (proposed_total_partitions > memory_limit) {
+        vlog(
+          clusterlog.warn,
+          "Refusing to create {} new partitions as total partition count "
+          "{} "
+          "would exceed memory limit {}",
+          new_partitions_replicas_requested,
+          proposed_total_partitions,
+          memory_limit);
+        return errc::topic_invalid_partitions_memory_limit;
     }
 
     return errc::success;
diff --git a/src/v/cluster/scheduling/topic_memory_per_partition_default.h b/src/v/cluster/scheduling/topic_memory_per_partition_default.h
new file mode 100644
index 0000000000000..f0cb3800306ff
--- /dev/null
+++ b/src/v/cluster/scheduling/topic_memory_per_partition_default.h
@@ -0,0 +1,32 @@
+/*
+ * Copyright 2024 Redpanda Data, Inc.
+ *
+ * Use of this software is governed by the Business Source License
+ * included in the file licenses/BSL.md
+ *
+ * As of the Change Date specified in that file, in accordance with
+ * the Business Source License, use of this software will be governed
+ * by the Apache License, Version 2.0
+ */
+#pragma once
+
+#include "base/units.h"
+
+#include <cstddef>
+
+namespace cluster {
+
+// Default value for `topic_memory_per_partition`. Kept in a constant here so
+// that it can be referred to more easily.
+inline constexpr size_t DEFAULT_TOPIC_MEMORY_PER_PARTITION = 200_KiB;
+
+// DO NOT CHANGE
+//
+// Original default for `topic_memory_per_partition` when we made it memory
+// group aware. This constant is used by a heuristic to determine if the user
+// had overridden the default when it was non-memory group aware. Hence, this
+// value should NOT be changed and must stay the same even if the (above)
+// default is changed.
+inline constexpr size_t ORIGINAL_MEMORY_GROUP_AWARE_TMPP = 200_KiB;
+
+} // namespace cluster
diff --git a/src/v/cluster/tests/BUILD b/src/v/cluster/tests/BUILD
index ae3d8bfe3dcc5..5b9427b3fb1ac 100644
--- a/src/v/cluster/tests/BUILD
+++ b/src/v/cluster/tests/BUILD
@@ -323,13 +323,17 @@ redpanda_cc_btest(
         "partition_allocator_tests.cc",
     ],
     deps = [
+        "//src/v/base",
         "//src/v/cluster",
+        "//src/v/cluster:topic_memory_per_partition_default",
         "//src/v/cluster/tests:partition_allocator_fixture",
+        "//src/v/config",
         "//src/v/model",
         "//src/v/raft",
        "//src/v/raft:fundamental",
         "//src/v/random:fast_prng",
         "//src/v/random:generators",
+        "//src/v/resource_mgmt:memory_groups",
         "//src/v/test_utils:seastar_boost",
         "@abseil-cpp//absl/container:flat_hash_set",
         "@boost//:test",
diff --git a/src/v/cluster/tests/partition_allocator_fixture.h b/src/v/cluster/tests/partition_allocator_fixture.h
index 2e815ade14050..7ff9ef120804c 100644
--- a/src/v/cluster/tests/partition_allocator_fixture.h
+++ b/src/v/cluster/tests/partition_allocator_fixture.h
@@ -28,13 +28,14 @@
 
 #include
+#include
 #include
 
 struct partition_allocator_fixture {
-    static constexpr uint32_t partitions_per_shard = 1000;
+    static constexpr uint32_t gb_per_core = 5;
 
     partition_allocator_fixture()
-      : partition_allocator_fixture(std::nullopt, std::nullopt) {}
+      : partition_allocator_fixture(std::nullopt, std::nullopt, 1000) {}
 
     ~partition_allocator_fixture() {
         _allocator.stop().get();
@@ -53,7 +54,7 @@ struct partition_allocator_fixture {
           std::move(rack),
           model::broker_properties{
             .cores = core_count,
-            .available_memory_gb = 5 * core_count,
+            .available_memory_gb = gb_per_core * core_count,
             .available_disk_gb = 10 * core_count});
 
         auto ec = members.local().apply(
@@ -142,27 +143,32 @@ struct partition_allocator_fixture {
 
     fast_prng prng;
 
+    uint32_t partitions_per_shard;
+
 protected:
     explicit partition_allocator_fixture(
       std::optional memory_per_partition,
-      std::optional fds_per_partition) {
+      std::optional fds_per_partition,
+      uint32_t partitions_per_shard)
+      : partitions_per_shard(partitions_per_shard) {
         members.start().get();
         features.start().get();
         _allocator
          .start_single(
            std::ref(members),
            std::ref(features),
-           config::mock_binding>(memory_per_partition),
            config::mock_binding>(fds_per_partition),
            config::mock_binding(uint32_t{partitions_per_shard}),
            partitions_reserve_shard0.bind(),
            kafka_internal_topics.bind(),
            config::mock_binding(true))
          .get();
-        ss::smp::invoke_on_all([] {
+        ss::smp::invoke_on_all([memory_per_partition] {
            config::shard_local_cfg()
              .get("partition_autobalancing_mode")
              .set_value(model::partition_autobalancing_mode::node_add);
+           config::shard_local_cfg().topic_memory_per_partition.set_value(
+             memory_per_partition);
        }).get();
    }
 };
@@ -172,7 +178,8 @@ struct partition_allocator_memory_limited_fixture
     static constexpr size_t memory_per_partition = std::numeric_limits::max();
 
     partition_allocator_memory_limited_fixture()
-      : partition_allocator_fixture(memory_per_partition, std::nullopt) {}
+      : partition_allocator_fixture(memory_per_partition, std::nullopt, 10000) {
+      }
 };
 
 struct partition_allocator_fd_limited_fixture
@@ -181,5 +188,5 @@
       = std::numeric_limits::max();
 
     partition_allocator_fd_limited_fixture()
-      : partition_allocator_fixture(std::nullopt, fds_per_partition) {}
+      : partition_allocator_fixture(std::nullopt, fds_per_partition, 10000) {}
 };
diff --git a/src/v/cluster/tests/partition_allocator_tests.cc b/src/v/cluster/tests/partition_allocator_tests.cc
index 8ef3382213671..bdb7d43b6130f 100644
--- a/src/v/cluster/tests/partition_allocator_tests.cc
+++ b/src/v/cluster/tests/partition_allocator_tests.cc
@@ -7,15 +7,19 @@
 // the Business Source License, use of this software will be governed
 // by the Apache License, Version 2.0
 
+#include "base/units.h"
 #include "cluster/cluster_utils.h"
 #include "cluster/scheduling/allocation_node.h"
 #include "cluster/scheduling/constraints.h"
+#include "cluster/scheduling/topic_memory_per_partition_default.h"
 #include "cluster/scheduling/types.h"
 #include "cluster/tests/partition_allocator_fixture.h"
+#include "config/configuration.h"
 #include "model/metadata.h"
 #include "raft/fundamental.h"
 #include "random/fast_prng.h"
 #include "random/generators.h"
+#include "resource_mgmt/memory_groups.h"
 #include "test_utils/fixture.h"
 
 #include
@@ -87,8 +91,7 @@ FIXTURE_TEST(unregister_node, partition_allocator_fixture) {
 }
 
 FIXTURE_TEST(allocation_over_core_capacity, partition_allocator_fixture) {
-    const auto partition_count
-      = partition_allocator_fixture::partitions_per_shard + 1;
+    const auto partition_count = this->partitions_per_shard + 1;
     register_node(0, 1);
 
     {
@@ -116,20 +119,126 @@
 }
 
 FIXTURE_TEST(
-  allocation_over_memory_capacity, partition_allocator_memory_limited_fixture) {
+  allocation_memory_limited, partition_allocator_memory_limited_fixture) {
+    memory_groups_holder().reset();
+    const auto partitions_share = 10;
+    ss::smp::invoke_on_all([partitions_share] {
+        config::shard_local_cfg()
+          .topic_partitions_max_memory_allocation_share.set_value(
+            partitions_share);
+        config::shard_local_cfg().topic_memory_per_partition.set_value(200_KiB);
+    }).get();
+
     register_node(0, 1);
+
+    const auto allowed_partitions
+      = gb_per_core * GiB / 100.0 * partitions_share
+        / config::shard_local_cfg().topic_memory_per_partition().value();
+
     {
-        auto result = allocator().allocate(make_allocation_request(1, 1)).get();
+        auto result = allocator()
+                        .allocate(
+                          make_allocation_request(allowed_partitions + 1, 1))
+                        .get();
         BOOST_REQUIRE(result.has_error());
         BOOST_REQUIRE_EQUAL(
           result.assume_error(),
           cluster::make_error_code(
             cluster::errc::topic_invalid_partitions_memory_limit));
     }
+
     {
-        auto result
-          = allocator().allocate(make_simple_allocation_request(1, 1)).get();
+        auto result = allocator()
+                        .allocate(make_simple_allocation_request(
+                          allowed_partitions + 1, 1))
+                        .get();
+        BOOST_REQUIRE(result.has_error());
+        BOOST_REQUIRE_EQUAL(
+          result.assume_error(),
+          cluster::make_error_code(
+            cluster::errc::topic_invalid_partitions_memory_limit));
+    }
+}
+
+FIXTURE_TEST(
+  allocation_memory_limited_legacy_config_overwrites,
+  partition_allocator_memory_limited_fixture) {
+    memory_groups_holder().reset();
+    const auto partitions_share = 1;
+    ss::smp::invoke_on_all([partitions_share] {
+        config::shard_local_cfg()
+          .topic_partitions_max_memory_allocation_share.set_value(
+            partitions_share);
+        config::shard_local_cfg().topic_memory_per_partition.set_value(1_MiB);
+    }).get();
+
+    register_node(0, 1);
+
+    const auto allowed_partitions
+      = gb_per_core * GiB / 100.0 * partitions_share
+        / config::shard_local_cfg().topic_memory_per_partition().value();
+
+    const auto legacy_allowed_partitions
+      = gb_per_core * GiB
+        / config::shard_local_cfg().topic_memory_per_partition().value();
+
+    BOOST_REQUIRE_GT(legacy_allowed_partitions, allowed_partitions);
+
+    {
+        auto result = allocator()
+                        .allocate(
+                          make_allocation_request(allowed_partitions + 1, 1))
+                        .get();
+        BOOST_REQUIRE(!result.has_error());
+    }
+
+    {
+        auto result = allocator()
+                        .allocate(make_simple_allocation_request(
+                          allowed_partitions + 1, 1))
+                        .get();
+        BOOST_REQUIRE(!result.has_error());
+    }
+}
+
+FIXTURE_TEST(
+  allocation_memory_limited_lowering_mem_limit,
+  partition_allocator_memory_limited_fixture) {
+    memory_groups_holder().reset();
+    const auto partitions_share = 5;
+    const auto topic_memory_per_partition
+      = cluster::DEFAULT_TOPIC_MEMORY_PER_PARTITION / 2;
+
+    ss::smp::invoke_on_all([partitions_share, topic_memory_per_partition] {
+        config::shard_local_cfg()
+          .topic_partitions_max_memory_allocation_share.set_value(
+            partitions_share);
+        config::shard_local_cfg().topic_memory_per_partition.set_value(
+          topic_memory_per_partition);
+    }).get();
+
+    register_node(0, 1);
+
+    const auto allowed_partitions = gb_per_core * GiB / 100.0 * partitions_share
+                                    / topic_memory_per_partition;
+
+    {
+        auto result = allocator()
+                        .allocate(
+                          make_allocation_request(allowed_partitions + 1, 1))
+                        .get();
+        BOOST_REQUIRE(result.has_error());
+        BOOST_REQUIRE_EQUAL(
+          result.assume_error(),
+          cluster::make_error_code(
+            cluster::errc::topic_invalid_partitions_memory_limit));
+    }
+
+    {
+        auto result = allocator()
+                        .allocate(make_simple_allocation_request(
+                          allowed_partitions + 1, 1))
+                        .get();
         BOOST_REQUIRE(result.has_error());
         BOOST_REQUIRE_EQUAL(
           result.assume_error(),
@@ -546,8 +655,7 @@ FIXTURE_TEST(updating_nodes_properties, partition_allocator_fixture) {
     BOOST_REQUIRE_EQUAL(it->second->allocated_partitions(), allocated);
     BOOST_REQUIRE_EQUAL(
       it->second->max_capacity(),
-      10 * partition_allocator_fixture::partitions_per_shard
-        - partitions_reserve_shard0());
+      10 * this->partitions_per_shard - partitions_reserve_shard0());
 }
 
 FIXTURE_TEST(change_replication_factor, partition_allocator_fixture) {
diff --git a/src/v/cluster/tests/partition_balancer_planner_fixture.h b/src/v/cluster/tests/partition_balancer_planner_fixture.h
index 4699b9f7f4eb0..139b501543a2d 100644
--- a/src/v/cluster/tests/partition_balancer_planner_fixture.h
+++ b/src/v/cluster/tests/partition_balancer_planner_fixture.h
@@ -24,6 +24,7 @@
 #include "cluster/tests/utils.h"
 #include "cluster/topic_updates_dispatcher.h"
 #include "cluster/types.h"
+#include "config/configuration.h"
 #include "container/fragmented_vector.h"
 #include "features/feature_table.h"
 #include "model/metadata.h"
@@ -78,7 +79,6 @@ struct controller_workers {
           .start_single(
             std::ref(members),
             std::ref(features),
-            config::mock_binding>(std::nullopt),
             config::mock_binding>(std::nullopt),
             config::mock_binding(uint32_t{partitions_per_shard}),
             config::mock_binding(uint32_t{partitions_reserve_shard0}),
@@ -89,6 +89,8 @@ struct controller_workers {
               "_schemas"}}),
             config::mock_binding(true))
           .get();
+        config::shard_local_cfg().topic_memory_per_partition.set_value(
+          std::nullopt);
         // use node status that is not used in test as self is always available
         node_status_table.start_single(model::node_id{123}).get();
         state
diff --git a/src/v/cluster/tests/topic_table_fixture.h b/src/v/cluster/tests/topic_table_fixture.h
index e13b235027cb8..df80f03934396 100644
--- a/src/v/cluster/tests/topic_table_fixture.h
+++ b/src/v/cluster/tests/topic_table_fixture.h
@@ -20,6 +20,7 @@
 #include "cluster/scheduling/partition_allocator.h"
 #include "cluster/tests/utils.h"
 #include "cluster/topic_table.h"
+#include "config/configuration.h"
 #include "config/property.h"
 #include "features/feature_table.h"
 #include "model/metadata.h"
@@ -44,13 +45,14 @@ struct topic_table_fixture {
         .start_single(
           std::ref(members),
           std::ref(features),
-          config::mock_binding>(std::nullopt),
           config::mock_binding>(std::nullopt),
           config::mock_binding(uint32_t{partitions_per_shard}),
           config::mock_binding(uint32_t{partitions_reserve_shard0}),
           config::mock_binding>({}),
           config::mock_binding(false))
         .get();
+        config::shard_local_cfg().topic_memory_per_partition.set_value(
+          std::nullopt);
         allocator.local().register_node(
           create_allocation_node(model::node_id(1), 8));
         allocator.local().register_node(
diff --git a/src/v/config/BUILD b/src/v/config/BUILD
index 69c15f7b64900..e95b0c0f2c0ef 100644
--- a/src/v/config/BUILD
+++ b/src/v/config/BUILD
@@ -43,6 +43,9 @@ redpanda_cc_library(
         "validation_error.h",
         "validators.h",
     ],
+    implementation_deps = [
+        "//src/v/cluster:topic_memory_per_partition_default",
+    ],
     include_prefix = "config",
     visibility = ["//visibility:public"],
     deps = [
diff --git a/src/v/config/configuration.cc b/src/v/config/configuration.cc
index 0216da60758e8..fefb9162ac6d6 100644
--- a/src/v/config/configuration.cc
+++ b/src/v/config/configuration.cc
@@ -10,6 +10,7 @@
 #include "config/configuration.h"
 
 #include "base/units.h"
+#include "cluster/scheduling/topic_memory_per_partition_default.h"
 #include "config/base_property.h"
 #include "config/bounded_property.h"
 #include "config/node_config.h"
@@ -290,11 +291,11 @@ configuration::configuration()
       "topic_memory_per_partition",
       "Required memory per partition when creating topics.",
       {.needs_restart = needs_restart::no, .visibility = visibility::tunable},
-      4_MiB,
+      cluster::DEFAULT_TOPIC_MEMORY_PER_PARTITION,
       {
        .min = 1, // Must be nonzero, it's a divisor
        .max = 100_MiB // Rough 'sanity' limit: a machine with 1GB RAM must be
-                      // able to create at least 10 partitions})
+                      // able to create at least 10 partitions })
 
   , topic_fds_per_partition(
       *this,
@@ -313,7 +314,7 @@ configuration::configuration()
       "Maximum number of partitions which may be allocated to one shard (CPU "
       "core).",
       {.needs_restart = needs_restart::no, .visibility = visibility::tunable},
-      1000,
+      5000,
       {
        .min = 16, // Forbid absurdly small values that would prevent most
                   // practical workloads from running
@@ -340,7 +341,7 @@ configuration::configuration()
       "topic_partitions_max_memory_allocation_share",
       "foo",
       {.needs_restart = needs_restart::yes, .visibility = visibility::tunable},
-      20,
+      10,
       {
        .min = 1,
       })
diff --git a/tests/rptest/tests/resource_limits_test.py b/tests/rptest/tests/resource_limits_test.py
index 729c1df75bef5..6d005d2a2e450 100644
--- a/tests/rptest/tests/resource_limits_test.py
+++ b/tests/rptest/tests/resource_limits_test.py
@@ -91,7 +91,7 @@ def test_cpu_limited(self):
         # Three nodes, each with 1 core, 1000 partition-replicas
         # per core, so with replicas=3, 1000 partitions should be the limit
         try:
-            rpk.create_topic("toobig", partitions=1500, replicas=3)
+            rpk.create_topic("toobig", partitions=5500, replicas=3)
         except RpkException as e:
             assert 'INVALID_PARTITIONS' in e.msg
         else:
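
Editor's note (not part of the patch): the commit message claims the new defaults roughly double the allowed partition count. The short standalone C++ sketch below reproduces the before/after arithmetic of check_memory_limits() for a hypothetical 16GiB node. It assumes that memory_groups().partitions_max_memory_share() simply exposes topic_partitions_max_memory_allocation_share as a fraction (0.10 for the new 10% default); the 4MiB and 200KiB figures come from the commit message.

// Standalone sketch, not part of the patch. Values and the share-as-fraction
// conversion are assumptions taken from the commit message for illustration.
#include <cstdint>
#include <iostream>

int main() {
    constexpr uint64_t KiB = 1024, MiB = 1024 * KiB, GiB = 1024 * MiB;
    constexpr uint64_t effective_cluster_memory = 16 * GiB; // example node size

    // Old rule: divide all memory by the old 4MiB per-partition guess.
    constexpr uint64_t old_limit = effective_cluster_memory / (4 * MiB);

    // New rule: divide only the 10% reserved for partitions by the new,
    // measured ~200KiB per-partition cost.
    constexpr double partitions_share = 0.10;
    const auto new_limit = static_cast<uint64_t>(
      effective_cluster_memory * partitions_share / (200 * KiB));

    std::cout << "old partition limit: " << old_limit << "\n"; // 4096
    std::cout << "new partition limit: " << new_limit << "\n"; // 8388, ~2x
    return 0;
}

With those numbers the old rule allows 16GiB / 4MiB = 4096 partition replicas, while the new rule allows 0.10 * 16GiB / 200KiB ≈ 8388, i.e. roughly twice as many, which matches the density claim in the commit message.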