diff --git a/src/v/cluster/topics_frontend.cc b/src/v/cluster/topics_frontend.cc
index 44ac566f0cb3..b4061d837b10 100644
--- a/src/v/cluster/topics_frontend.cc
+++ b/src/v/cluster/topics_frontend.cc
@@ -74,18 +74,30 @@ std::vector<ss::sstring>
 get_enterprise_features(const cluster::topic_configuration& cfg) {
     std::vector<ss::sstring> features;
     const auto si_disabled = model::shadow_indexing_mode::disabled;
-    if (cfg.properties.shadow_indexing.value_or(si_disabled) != si_disabled) {
-        features.emplace_back("tiered storage");
-    }
-    if (cfg.is_recovery_enabled()) {
-        features.emplace_back("topic recovery");
-    }
-    if (cfg.is_read_replica()) {
-        features.emplace_back("remote read replicas");
+    // Only enforce tiered storage topic config sanctions when cloud storage is
+    // enabled for the cluster
+    if (config::shard_local_cfg().cloud_storage_enabled.is_restricted()) {
+        if (
+          cfg.properties.shadow_indexing.value_or(si_disabled) != si_disabled) {
+            features.emplace_back("tiered storage");
+        }
+        if (cfg.is_recovery_enabled()) {
+            features.emplace_back("topic recovery");
+        }
+        if (cfg.is_read_replica()) {
+            features.emplace_back("remote read replicas");
+        }
     }
-    if (cfg.is_schema_id_validation_enabled()) {
-        features.emplace_back("schema ID validation");
+
+    // Only enforce schema ID validation topic configs if schema ID validation
+    // is enabled for the cluster
+    if (config::shard_local_cfg().enable_schema_id_validation.is_restricted()) {
+        if (cfg.is_schema_id_validation_enabled()) {
+            features.emplace_back("schema ID validation");
+        }
     }
+
+    // Leadership preference restrictions are always enforced
     if (const auto& leaders_pref = cfg.properties.leaders_preference;
         leaders_pref.has_value()
         && config::shard_local_cfg()
@@ -111,11 +123,15 @@ std::vector<ss::sstring> get_enterprise_features(
     std::vector<ss::sstring> features;
     const auto si_disabled = model::shadow_indexing_mode::disabled;
-    if (
-      (properties.shadow_indexing.value_or(si_disabled)
-       < updated_properties.shadow_indexing.value_or(si_disabled))
-      || (properties.remote_delete < updated_properties.remote_delete)) {
-        features.emplace_back("tiered storage");
+    // Only enforce tiered storage topic config sanctions when cloud storage is
+    // enabled for the cluster
+    if (config::shard_local_cfg().cloud_storage_enabled.is_restricted()) {
+        if (
+          (properties.shadow_indexing.value_or(si_disabled)
+           < updated_properties.shadow_indexing.value_or(si_disabled))
+          || (properties.remote_delete < updated_properties.remote_delete)) {
+            features.emplace_back("tiered storage");
+        }
     }
 
     static constexpr auto key_schema_id_validation_enabled =
@@ -163,12 +179,16 @@ std::vector<ss::sstring> get_enterprise_features(
           up.record_value_subject_name_strategy_compat));
     };
 
-    if (
-      ((key_schema_id_validation_enabled(properties)
-        < key_schema_id_validation_enabled(updated_properties))
-       || (value_schema_id_validation_enabled(properties) < value_schema_id_validation_enabled(updated_properties)))
-      || (schema_id_validation_enabled(updated_properties) && sns_modified())) {
-        features.emplace_back("schema id validation");
+    // Only enforce schema ID validation topic configs if schema ID validation
+    // is enabled for the cluster
+    if (config::shard_local_cfg().enable_schema_id_validation.is_restricted()) {
+        if (
+          ((key_schema_id_validation_enabled(properties)
+            < key_schema_id_validation_enabled(updated_properties))
+           || (value_schema_id_validation_enabled(properties) < value_schema_id_validation_enabled(updated_properties)))
+          || (schema_id_validation_enabled(updated_properties) && sns_modified())) {
+            features.emplace_back("schema id validation");
+        }
     }
 
     if (const auto& updated_pref = updated_properties.leaders_preference;
diff --git a/src/v/config/property.h b/src/v/config/property.h
index c2900d932400..a118e2c6da46 100644
--- a/src/v/config/property.h
+++ b/src/v/config/property.h
@@ -1091,6 +1091,11 @@ class enterprise : public P {
         return std::nullopt;
     }
 
+    /**
+     * @brief Checks whether the property's current value is restricted
+     */
+    bool is_restricted() const { return do_check_restricted(this->value()); }
+
 private:
     bool do_check_restricted(const T& setting) const final {
         // depending on how the restriction was defined, construct an applicable
diff --git a/src/v/config/tests/enterprise_property_test.cc b/src/v/config/tests/enterprise_property_test.cc
index 3bda3965fba5..65ea48fcb592 100644
--- a/src/v/config/tests/enterprise_property_test.cc
+++ b/src/v/config/tests/enterprise_property_test.cc
@@ -109,4 +109,34 @@ TEST(EnterprisePropertyTest, TestTypeName) {
     EXPECT_EQ(cfg.enterprise_enum.type_name(), "string");
 }
 
+TEST(EnterprisePropertyTest, TestIsRestricted) {
+    test_config cfg;
+    cfg.enterprise_bool.set_value(false);
+    EXPECT_FALSE(cfg.enterprise_bool.is_restricted());
+    cfg.enterprise_bool.set_value(true);
+    EXPECT_TRUE(cfg.enterprise_bool.is_restricted());
+
+    cfg.enterprise_str_enum.set_value("foo");
+    EXPECT_FALSE(cfg.enterprise_str_enum.is_restricted());
+    cfg.enterprise_str_enum.set_value("bar");
+    EXPECT_TRUE(cfg.enterprise_str_enum.is_restricted());
+
+    cfg.enterprise_str_vec.set_value(
+      std::vector<ss::sstring>{"foo", "bar", "baz"});
+    EXPECT_FALSE(cfg.enterprise_str_vec.is_restricted());
+    cfg.enterprise_str_vec.set_value(
+      std::vector<ss::sstring>{"foo", "bar", "baz", "GSSAPI"});
+    EXPECT_TRUE(cfg.enterprise_str_vec.is_restricted());
+
+    cfg.enterprise_opt_int.set_value(10);
+    EXPECT_FALSE(cfg.enterprise_opt_int.is_restricted());
+    cfg.enterprise_opt_int.set_value(10000);
+    EXPECT_TRUE(cfg.enterprise_opt_int.is_restricted());
+
+    cfg.enterprise_enum.set_value(tls_version::v1_0);
+    EXPECT_FALSE(cfg.enterprise_enum.is_restricted());
+    cfg.enterprise_enum.set_value(tls_version::v1_3);
+    EXPECT_TRUE(cfg.enterprise_enum.is_restricted());
+}
+
 } // namespace config
diff --git a/src/v/kafka/server/tests/alter_config_test.cc b/src/v/kafka/server/tests/alter_config_test.cc
index a5b4ecde0ca6..e07080ef05fe 100644
--- a/src/v/kafka/server/tests/alter_config_test.cc
+++ b/src/v/kafka/server/tests/alter_config_test.cc
@@ -238,6 +238,376 @@ class alter_config_test_fixture : public topic_properties_test_fixture {
             BOOST_CHECK_EQUAL(cfg_it->value, value);
         }
     }
+
+    void alter_config_no_license_test(bool enable_cluster_config) {
+        using props_t = absl::flat_hash_map<ss::sstring, ss::sstring>;
+        using alter_props_t = absl::flat_hash_map<
+          ss::sstring,
+          std::
+            pair<std::optional<ss::sstring>, kafka::config_resource_operation>>;
+        using skip_create = ss::bool_class<struct skip_create_tag>;
+        struct test_case {
+            ss::sstring tp_raw;
+            props_t props;
+            alter_props_t alteration;
+            kafka::error_code expected;
+            skip_create skip{skip_create::no};
+        };
+        std::vector<test_case> test_cases;
+
+        constexpr auto success = kafka::error_code::none;
+        constexpr auto failure = kafka::error_code::invalid_config;
+
+        constexpr auto with =
+          [](std::string_view prop, auto val) -> props_t::value_type {
+            return {ss::sstring{prop}, ssx::sformat("{}", val)};
+        };
+
+        constexpr auto set =
+          [](std::string_view prop, auto val) -> alter_props_t::value_type {
+            return {
+              ss::sstring{prop},
+              {ssx::sformat("{}", val), kafka::config_resource_operation::set}};
+        };
+
+        constexpr auto remove =
+          [](std::string_view prop) ->
alter_props_t::value_type { + return { + ss::sstring{prop}, + {std::nullopt, kafka::config_resource_operation::remove}}; + }; + + const auto enterprise_props = + [enable_cluster_config]() -> std::vector { + // If we aren't enabling Schema ID validation cluster config, + // then skip testing those topic properties + if (enable_cluster_config) { + return { + kafka::topic_property_remote_read, + kafka::topic_property_remote_write, + kafka::topic_property_record_key_schema_id_validation, + kafka::topic_property_record_key_schema_id_validation_compat, + kafka::topic_property_record_value_schema_id_validation, + kafka:: + topic_property_record_value_schema_id_validation_compat, + }; + } else { + return { + kafka::topic_property_remote_read, + kafka::topic_property_remote_write, + }; + } + }(); + + const auto non_enterprise_prop = props_t::value_type{ + kafka::topic_property_max_message_bytes, "4096"}; + + for (const auto& p : enterprise_props) { + // A topic without an enterprise property set, and then enable it + test_cases.emplace_back( + ssx::sformat("enable_{}", p), + props_t{}, + alter_props_t{{set(p, true)}}, + enable_cluster_config ? failure : success); + // A topic with an enterprise property set, and then set it to false + test_cases.emplace_back( + ssx::sformat("set_false_{}", p), + props_t{with(p, true)}, + alter_props_t{{set(p, false)}}, + success); + // A topic with an enterprise property set, and then remove it + test_cases.emplace_back( + ssx::sformat("remove_{}", p), + props_t{with(p, true)}, + alter_props_t{{remove(p)}}, + success); + // A topic with an enterprise property set, and then change + // non-enterprise property + test_cases.emplace_back( + ssx::sformat("set_other_{}", p), + props_t{with(p, true)}, + alter_props_t{{std::apply(set, non_enterprise_prop)}}, + success); + // A topic with an enterprise property set, and then remove + // non-enterprise property + test_cases.emplace_back( + ssx::sformat("remove_other_{}", p), + props_t{with(p, true), non_enterprise_prop}, + alter_props_t{{remove(non_enterprise_prop.first)}}, + success); + + // Skip creating topic. Expect no sanctions. + // Alter operations should fail downstream. + test_cases.emplace_back( + ssx::sformat("skip_create_{}", p), + props_t{}, + alter_props_t{{set(p, true)}}, + kafka::error_code::unknown_topic_or_partition, + skip_create::yes); + } + + // Specific tests for tiered storage + { + const auto full_si = props_t{ + with(kafka::topic_property_remote_read, true), + with(kafka::topic_property_remote_write, true), + with(kafka::topic_property_remote_delete, true)}; + test_cases.emplace_back( + "remove_remote.read_from_full", + full_si, + alter_props_t{{remove(kafka::topic_property_remote_read)}}, + success); + test_cases.emplace_back( + "remove_remote.write_from_full", + full_si, + alter_props_t{{remove(kafka::topic_property_remote_write)}}, + success); + test_cases.emplace_back( + "remove_remote.delete_from_full", + full_si, + alter_props_t{{remove(kafka::topic_property_remote_delete)}}, + success); + test_cases.emplace_back( + "enable_remote.delete", + props_t{with(kafka::topic_property_remote_delete, false)}, + alter_props_t{{set(kafka::topic_property_remote_delete, true)}}, + enable_cluster_config ? 
failure : success); + } + + // Specific tests for schema id validation subject name strategy + if (enable_cluster_config) { + using sns = pandaproxy::schema_registry::subject_name_strategy; + + const auto full_validation = props_t{ + with(kafka::topic_property_record_key_schema_id_validation, true), + with( + kafka::topic_property_record_value_schema_id_validation, true), + }; + test_cases.emplace_back( + "set_key_sns", + full_validation, + alter_props_t{ + {set( + kafka::topic_property_record_key_subject_name_strategy, + sns::topic_name)}, + }, + failure); + test_cases.emplace_back( + "set_value_sns", + full_validation, + alter_props_t{ + {set( + kafka::topic_property_record_value_subject_name_strategy, + sns::topic_name)}, + }, + failure); + + const auto key_validation = props_t{ + with(kafka::topic_property_record_key_schema_id_validation, true), + }; + test_cases.emplace_back( + "set_value_after_key", + key_validation, + alter_props_t{{set( + kafka::topic_property_record_value_schema_id_validation_compat, + true)}}, + failure); + test_cases.emplace_back( + "unset_key", + key_validation, + alter_props_t{{set( + kafka::topic_property_record_key_schema_id_validation, false)}}, + success); + + const auto value_validation = props_t{ + with( + kafka::topic_property_record_value_schema_id_validation, true), + }; + test_cases.emplace_back( + "set_key_after_value", + value_validation, + alter_props_t{{set( + kafka::topic_property_record_key_schema_id_validation_compat, + true)}}, + failure); + test_cases.emplace_back( + "unset_value", + value_validation, + alter_props_t{{set( + kafka::topic_property_record_value_schema_id_validation, + false)}}, + success); + + const auto validation_with_strat = props_t{ + with(kafka::topic_property_record_key_schema_id_validation, true), + with( + kafka::topic_property_record_value_schema_id_validation, true), + with( + kafka::topic_property_record_key_subject_name_strategy, + sns::topic_name), + with( + kafka::topic_property_record_value_subject_name_strategy, + sns::topic_name), + }; + test_cases.emplace_back( + "change_key_sns", + validation_with_strat, + alter_props_t{ + {set( + kafka::topic_property_record_key_subject_name_strategy, + sns::record_name)}, + }, + failure); + test_cases.emplace_back( + "change_value_sns", + validation_with_strat, + alter_props_t{ + {set( + kafka::topic_property_record_value_subject_name_strategy, + sns::record_name)}, + }, + failure); + test_cases.emplace_back( + "remove_key_sns", + validation_with_strat, + alter_props_t{{remove( + kafka::topic_property_record_key_subject_name_strategy)}}, + success); + + test_cases.emplace_back( + "remove_value_sns", + validation_with_strat, + alter_props_t{{remove( + kafka::topic_property_record_value_subject_name_strategy)}}, + success); + } + + // NOTE(oren): w/o schema validation enabled at the cluster level, + // related properties will be ignored on the topic create path. stick to + // COMPAT here because it's a superset of REDPANDA. 
+ if (enable_cluster_config) { + update_cluster_config("enable_schema_id_validation", "compat"); + update_cluster_config("cloud_storage_enabled", "true"); + } + auto unset_cluster_config = ss::defer([&] { + update_cluster_config("enable_schema_id_validation", "none"); + update_cluster_config("cloud_storage_enabled", "false"); + }); + + // Specific tests for leadership pinning + { + const config::leaders_preference no_preference{}; + const config::leaders_preference pref_a{ + .type = config::leaders_preference::type_t::racks, + .racks = {model::rack_id{"A"}}}; + const config::leaders_preference pref_b{ + .type = config::leaders_preference::type_t::racks, + .racks = {model::rack_id{"A"}, model::rack_id{"B"}}}; + + test_cases.emplace_back( + "leaders_preference.enable", + props_t{}, + alter_props_t{ + {set(kafka::topic_property_leaders_preference, pref_a)}}, + failure); + test_cases.emplace_back( + "leaders_preference.change", + props_t{with(kafka::topic_property_leaders_preference, pref_a)}, + alter_props_t{ + {set(kafka::topic_property_leaders_preference, pref_b)}}, + failure); + test_cases.emplace_back( + "leaders_preference.no_change", + props_t{with(kafka::topic_property_leaders_preference, pref_a)}, + alter_props_t{ + {set(kafka::topic_property_leaders_preference, pref_a)}}, + success); + test_cases.emplace_back( + "leaders_preference.unset", + props_t{with(kafka::topic_property_leaders_preference, pref_a)}, + alter_props_t{{remove(kafka::topic_property_leaders_preference)}}, + success); + test_cases.emplace_back( + "leaders_preference.disable", + props_t{with(kafka::topic_property_leaders_preference, pref_a)}, + alter_props_t{ + {set(kafka::topic_property_leaders_preference, no_preference)}}, + success); + } + + // Create the topics for the tests + constexpr auto inc_alter_topic = [](std::string_view tp_raw) { + return model::topic{ssx::sformat("incremental_alter_{}", tp_raw)}; + }; + constexpr auto alter_topic = [](std::string_view tp_raw) { + return model::topic{ssx::sformat("alter_{}", tp_raw)}; + }; + + for (const auto& [tp_raw, props, alteration, expected, skip] : + test_cases) { + BOOST_TEST_CONTEXT(fmt::format("topic: {}", tp_raw)) { + BOOST_REQUIRE( + skip + || !create_topic(inc_alter_topic(tp_raw), props, 3) + .data.errored()); + BOOST_REQUIRE( + skip + || !create_topic(alter_topic(tp_raw), props, 3) + .data.errored()); + } + + revoke_license(); + + // Test incremental alter config + auto tp = inc_alter_topic(tp_raw); + BOOST_TEST_CONTEXT(tp) { + auto resp = incremental_alter_configs( + make_incremental_alter_topic_config_resource_cv( + tp, alteration)); + BOOST_REQUIRE_EQUAL(resp.data.responses.size(), 1); + BOOST_CHECK_EQUAL(resp.data.responses[0].error_code, expected); + if (expected == failure) { + BOOST_CHECK( + resp.data.responses[0] + .error_message.value_or("") + .contains( + features::enterprise_error_message::required)); + } + } + + delete_topic( + model::topic_namespace{model::kafka_namespace, std::move(tp)}) + .get(); + + // Test alter config + tp = alter_topic(tp_raw); + BOOST_TEST_CONTEXT(tp) { + auto properties = props; + for (const auto& a : alteration) { + if ( + a.second.second + == kafka::config_resource_operation::remove) { + properties.erase(a.first); + } else if ( + a.second.second + == kafka::config_resource_operation::set) { + properties.insert_or_assign( + a.first, a.second.first.value()); + }; + } + + auto resp = alter_configs( + make_alter_topic_config_resource_cv(tp, properties)); + BOOST_REQUIRE_EQUAL(resp.data.responses.size(), 1); + 
BOOST_CHECK_EQUAL(resp.data.responses[0].error_code, expected); + } + delete_topic( + model::topic_namespace{model::kafka_namespace, std::move(tp)}) + .get(); + + reinstall_license(); + } + } }; FIXTURE_TEST( @@ -1206,339 +1576,10 @@ FIXTURE_TEST( } FIXTURE_TEST(test_unlicensed_alter_configs, alter_config_test_fixture) { - using props_t = absl::flat_hash_map; - using alter_props_t = absl::flat_hash_map< - ss::sstring, - std::pair, kafka::config_resource_operation>>; - using skip_create = ss::bool_class; - struct test_case { - ss::sstring tp_raw; - props_t props; - alter_props_t alteration; - kafka::error_code expected; - skip_create skip{skip_create::no}; - }; - std::vector test_cases; - - constexpr auto success = kafka::error_code::none; - constexpr auto failure = kafka::error_code::invalid_config; - - constexpr auto with = - [](std::string_view prop, auto val) -> props_t::value_type { - return {ss::sstring{prop}, ssx::sformat("{}", val)}; - }; - - constexpr auto set = - [](std::string_view prop, auto val) -> alter_props_t::value_type { - return { - ss::sstring{prop}, - {ssx::sformat("{}", val), kafka::config_resource_operation::set}}; - }; - - constexpr auto remove = - [](std::string_view prop) -> alter_props_t::value_type { - return { - ss::sstring{prop}, - {std::nullopt, kafka::config_resource_operation::remove}}; - }; - - const auto enterprise_props = { - kafka::topic_property_remote_read, - kafka::topic_property_remote_write, - kafka::topic_property_record_key_schema_id_validation, - kafka::topic_property_record_key_schema_id_validation_compat, - kafka::topic_property_record_value_schema_id_validation, - kafka::topic_property_record_value_schema_id_validation_compat, - }; - - const auto non_enterprise_prop = props_t::value_type{ - kafka::topic_property_max_message_bytes, "4096"}; - - for (const auto& p : enterprise_props) { - // A topic without an enterprise property set, and then enable it - test_cases.emplace_back( - ssx::sformat("enable_{}", p), - props_t{}, - alter_props_t{{set(p, true)}}, - failure); - // A topic with an enterprise property set, and then set it to false - test_cases.emplace_back( - ssx::sformat("set_false_{}", p), - props_t{with(p, true)}, - alter_props_t{{set(p, false)}}, - success); - // A topic with an enterprise property set, and then remove it - test_cases.emplace_back( - ssx::sformat("remove_{}", p), - props_t{with(p, true)}, - alter_props_t{{remove(p)}}, - success); - // A topic with an enterprise property set, and then change - // non-enterprise property - test_cases.emplace_back( - ssx::sformat("set_other_{}", p), - props_t{with(p, true)}, - alter_props_t{{std::apply(set, non_enterprise_prop)}}, - success); - // A topic with an enterprise property set, and then remove - // non-enterprise property - test_cases.emplace_back( - ssx::sformat("remove_other_{}", p), - props_t{with(p, true), non_enterprise_prop}, - alter_props_t{{remove(non_enterprise_prop.first)}}, - success); - - // Skip creating topic. Expect no sanctions. - // Alter operations should fail downstream. 
- test_cases.emplace_back( - ssx::sformat("skip_create_{}", p), - props_t{}, - alter_props_t{{set(p, true)}}, - kafka::error_code::unknown_topic_or_partition, - skip_create::yes); - } - - // Specific tests for tiered storage - { - const auto full_si = props_t{ - with(kafka::topic_property_remote_read, true), - with(kafka::topic_property_remote_write, true), - with(kafka::topic_property_remote_delete, true)}; - test_cases.emplace_back( - "remove_remote.read_from_full", - full_si, - alter_props_t{{remove(kafka::topic_property_remote_read)}}, - success); - test_cases.emplace_back( - "remove_remote.write_from_full", - full_si, - alter_props_t{{remove(kafka::topic_property_remote_write)}}, - success); - test_cases.emplace_back( - "remove_remote.delete_from_full", - full_si, - alter_props_t{{remove(kafka::topic_property_remote_delete)}}, - success); - test_cases.emplace_back( - "enable_remote.delete", - props_t{with(kafka::topic_property_remote_delete, false)}, - alter_props_t{{set(kafka::topic_property_remote_delete, true)}}, - failure); - } - - // Specific tests for schema id validation subject name strategy - { - using sns = pandaproxy::schema_registry::subject_name_strategy; - - const auto full_validation = props_t{ - with(kafka::topic_property_record_key_schema_id_validation, true), - with(kafka::topic_property_record_value_schema_id_validation, true), - }; - test_cases.emplace_back( - "set_key_sns", - full_validation, - alter_props_t{ - {set( - kafka::topic_property_record_key_subject_name_strategy, - sns::topic_name)}, - }, - failure); - test_cases.emplace_back( - "set_value_sns", - full_validation, - alter_props_t{ - {set( - kafka::topic_property_record_value_subject_name_strategy, - sns::topic_name)}, - }, - failure); - - const auto key_validation = props_t{ - with(kafka::topic_property_record_key_schema_id_validation, true), - }; - test_cases.emplace_back( - "set_value_after_key", - key_validation, - alter_props_t{{set( - kafka::topic_property_record_value_schema_id_validation_compat, - true)}}, - failure); - test_cases.emplace_back( - "unset_key", - key_validation, - alter_props_t{{set( - kafka::topic_property_record_key_schema_id_validation, false)}}, - success); - - const auto value_validation = props_t{ - with(kafka::topic_property_record_value_schema_id_validation, true), - }; - test_cases.emplace_back( - "set_key_after_value", - value_validation, - alter_props_t{{set( - kafka::topic_property_record_key_schema_id_validation_compat, - true)}}, - failure); - test_cases.emplace_back( - "unset_value", - value_validation, - alter_props_t{{set( - kafka::topic_property_record_value_schema_id_validation, false)}}, - success); - - const auto validation_with_strat = props_t{ - with(kafka::topic_property_record_key_schema_id_validation, true), - with(kafka::topic_property_record_value_schema_id_validation, true), - with( - kafka::topic_property_record_key_subject_name_strategy, - sns::topic_name), - with( - kafka::topic_property_record_value_subject_name_strategy, - sns::topic_name), - }; - test_cases.emplace_back( - "change_key_sns", - validation_with_strat, - alter_props_t{ - {set( - kafka::topic_property_record_key_subject_name_strategy, - sns::record_name)}, - }, - failure); - test_cases.emplace_back( - "change_value_sns", - validation_with_strat, - alter_props_t{ - {set( - kafka::topic_property_record_value_subject_name_strategy, - sns::record_name)}, - }, - failure); - test_cases.emplace_back( - "remove_key_sns", - validation_with_strat, - alter_props_t{ - 
{remove(kafka::topic_property_record_key_subject_name_strategy)}}, - success); - - test_cases.emplace_back( - "remove_value_sns", - validation_with_strat, - alter_props_t{ - {remove(kafka::topic_property_record_value_subject_name_strategy)}}, - success); - } - - // NOTE(oren): w/o schema validation enabled at the cluster level, related - // properties will be ignored on the topic create path. stick to COMPAT here - // because it's a superset of REDPANDA. - update_cluster_config("enable_schema_id_validation", "compat"); - - // Specific tests for leadership pinning - { - const config::leaders_preference no_preference{}; - const config::leaders_preference pref_a{ - .type = config::leaders_preference::type_t::racks, - .racks = {model::rack_id{"A"}}}; - const config::leaders_preference pref_b{ - .type = config::leaders_preference::type_t::racks, - .racks = {model::rack_id{"A"}, model::rack_id{"B"}}}; - - test_cases.emplace_back( - "leaders_preference.enable", - props_t{}, - alter_props_t{ - {set(kafka::topic_property_leaders_preference, pref_a)}}, - failure); - test_cases.emplace_back( - "leaders_preference.change", - props_t{with(kafka::topic_property_leaders_preference, pref_a)}, - alter_props_t{ - {set(kafka::topic_property_leaders_preference, pref_b)}}, - failure); - test_cases.emplace_back( - "leaders_preference.no_change", - props_t{with(kafka::topic_property_leaders_preference, pref_a)}, - alter_props_t{ - {set(kafka::topic_property_leaders_preference, pref_a)}}, - success); - test_cases.emplace_back( - "leaders_preference.unset", - props_t{with(kafka::topic_property_leaders_preference, pref_a)}, - alter_props_t{{remove(kafka::topic_property_leaders_preference)}}, - success); - test_cases.emplace_back( - "leaders_preference.disable", - props_t{with(kafka::topic_property_leaders_preference, pref_a)}, - alter_props_t{ - {set(kafka::topic_property_leaders_preference, no_preference)}}, - success); - } - - // Create the topics for the tests - constexpr auto inc_alter_topic = [](std::string_view tp_raw) { - return model::topic{ssx::sformat("incremental_alter_{}", tp_raw)}; - }; - constexpr auto alter_topic = [](std::string_view tp_raw) { - return model::topic{ssx::sformat("alter_{}", tp_raw)}; - }; - - for (const auto& [tp_raw, props, alteration, expected, skip] : test_cases) { - BOOST_TEST_CONTEXT(fmt::format("topic: {}", tp_raw)) { - BOOST_REQUIRE( - skip - || !create_topic(inc_alter_topic(tp_raw), props, 3) - .data.errored()); - BOOST_REQUIRE( - skip - || !create_topic(alter_topic(tp_raw), props, 3).data.errored()); - } - - revoke_license(); - - // Test incremental alter config - auto tp = inc_alter_topic(tp_raw); - BOOST_TEST_CONTEXT(tp) { - auto resp = incremental_alter_configs( - make_incremental_alter_topic_config_resource_cv(tp, alteration)); - BOOST_REQUIRE_EQUAL(resp.data.responses.size(), 1); - BOOST_CHECK_EQUAL(resp.data.responses[0].error_code, expected); - if (expected == failure) { - BOOST_CHECK( - resp.data.responses[0].error_message.value_or("").contains( - features::enterprise_error_message::required)); - } - } - - delete_topic( - model::topic_namespace{model::kafka_namespace, std::move(tp)}) - .get(); - - // Test alter config - tp = alter_topic(tp_raw); - BOOST_TEST_CONTEXT(tp) { - auto properties = props; - for (const auto& a : alteration) { - if ( - a.second.second == kafka::config_resource_operation::remove) { - properties.erase(a.first); - } else if ( - a.second.second == kafka::config_resource_operation::set) { - properties.insert_or_assign( - a.first, 
a.second.first.value()); - }; - } - - auto resp = alter_configs( - make_alter_topic_config_resource_cv(tp, properties)); - BOOST_REQUIRE_EQUAL(resp.data.responses.size(), 1); - BOOST_CHECK_EQUAL(resp.data.responses[0].error_code, expected); - } - delete_topic( - model::topic_namespace{model::kafka_namespace, std::move(tp)}) - .get(); + alter_config_no_license_test(true); +} - reinstall_license(); - } +FIXTURE_TEST( + test_unlicensed_alter_configs_no_cluster_config, alter_config_test_fixture) { + alter_config_no_license_test(false); } diff --git a/src/v/kafka/server/tests/create_partition_test.cc b/src/v/kafka/server/tests/create_partition_test.cc index fc1228c85d04..b88f335a924b 100644 --- a/src/v/kafka/server/tests/create_partition_test.cc +++ b/src/v/kafka/server/tests/create_partition_test.cc @@ -35,7 +35,13 @@ FIXTURE_TEST( prop, props_t{{ss::sstring{prop}, ssx::sformat("{}", value)}}); }; + update_cluster_config(lconf().cloud_storage_enabled.name(), "true"); update_cluster_config(lconf().enable_schema_id_validation.name(), "compat"); + auto unset_cluster_config = ss::defer([&] { + update_cluster_config(lconf().cloud_storage_enabled.name(), "false"); + update_cluster_config( + lconf().enable_schema_id_validation.name(), "none"); + }); std::initializer_list enterprise_props{ // si_props @@ -85,3 +91,48 @@ FIXTURE_TEST( } } } + +FIXTURE_TEST( + test_unlicensed_topic_prop_create_partition_no_cluster_config, + topic_properties_test_fixture) { + using props_t = absl::flat_hash_map; + using test_t = std::pair; + const auto with = [](const std::string_view prop, const auto value) { + return std::make_pair( + prop, props_t{{ss::sstring{prop}, ssx::sformat("{}", value)}}); + }; + + std::initializer_list enterprise_props{ + // si_props + // Exclude these; setting up s3_imposter is too complex for this test + // * kafka::topic_property_recovery + // * kafka::topic_property_read_replica + with(kafka::topic_property_remote_read, true), + with(kafka::topic_property_remote_write, true)}; + + const int32_t partitions = 3; + + for (const auto& [prop, props] : enterprise_props) { + BOOST_TEST_CONTEXT(fmt::format("property: {}", prop)) { + auto tp = model::topic{ssx::sformat("{}", prop)}; + + auto c_res = create_topic(tp, props, 3).data; + BOOST_REQUIRE_EQUAL(c_res.topics.size(), 1); + BOOST_REQUIRE_EQUAL( + c_res.topics[0].error_code, kafka::error_code::none); + + revoke_license(); + + auto res = create_partitions(tp, partitions + 1).data; + BOOST_REQUIRE_EQUAL(res.results.size(), 1); + BOOST_CHECK_EQUAL( + res.results[0].error_code, kafka::error_code::none); + + delete_topic( + model::topic_namespace{model::kafka_namespace, std::move(tp)}) + .get(); + + reinstall_license(); + } + } +} diff --git a/src/v/kafka/server/tests/create_topics_test.cc b/src/v/kafka/server/tests/create_topics_test.cc index 67f710b2d53f..8eb3f90c7524 100644 --- a/src/v/kafka/server/tests/create_topics_test.cc +++ b/src/v/kafka/server/tests/create_topics_test.cc @@ -7,6 +7,7 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0 +#include "config/configuration.h" #include "config/leaders_preference.h" #include "container/fragmented_vector.h" #include "features/enterprise_feature_messages.h" @@ -489,12 +490,57 @@ FIXTURE_TEST(case_insensitive_boolean_property, create_topic_fixture) { BOOST_CHECK_EQUAL(resp.data.topics[0].name, "topic1"); } +FIXTURE_TEST(unlicensed_permit_if_config_disabled, create_topic_fixture) { + lconf().enable_schema_id_validation.set_value( + 
pandaproxy::schema_registry::schema_id_validation_mode::none); + lconf().cloud_storage_enabled.set_value(false); + + revoke_license(); + + using prop_t = std::map; + const auto with = [](const std::string_view prop, const auto value) { + return std::make_pair( + prop, prop_t{{ss::sstring{prop}, ssx::sformat("{}", value)}}); + }; + std::vector> enterprise_props{ + // si_props + with(kafka::topic_property_remote_read, true), + with(kafka::topic_property_remote_write, true), + // schema id validation + with(kafka::topic_property_record_key_schema_id_validation, true), + with(kafka::topic_property_record_key_schema_id_validation_compat, true), + with(kafka::topic_property_record_value_schema_id_validation, true), + with( + kafka::topic_property_record_value_schema_id_validation_compat, true)}; + + auto client = make_kafka_client().get(); + client.connect().get(); + + for (const auto& [name, props] : enterprise_props) { + auto topic = make_topic( + ssx::sformat("topic_{}", name), std::nullopt, std::nullopt, props); + + auto resp + = client.dispatch(make_req({topic}), kafka::api_version(5)).get(); + + BOOST_CHECK_EQUAL( + resp.data.topics[0].error_code, kafka::error_code::none); + } +} + FIXTURE_TEST(unlicensed_rejected, create_topic_fixture) { // NOTE(oren): w/o schema validation enabled at the cluster level, related // properties will be ignored on the topic create path. stick to COMPAT here // because it's a superset of REDPANDA. lconf().enable_schema_id_validation.set_value( pandaproxy::schema_registry::schema_id_validation_mode::compat); + lconf().cloud_storage_enabled.set_value(true); + + auto unset_cloud_storage = ss::defer([&] { + lconf().enable_schema_id_validation.set_value( + pandaproxy::schema_registry::schema_id_validation_mode::none); + lconf().cloud_storage_enabled.set_value(false); + }); revoke_license(); using prop_t = std::map; @@ -544,6 +590,9 @@ FIXTURE_TEST(unlicensed_rejected, create_topic_fixture) { } FIXTURE_TEST(unlicensed_reject_defaults, create_topic_fixture) { + lconf().cloud_storage_enabled.set_value(true); + auto unset_cloud_storage = ss::defer( + [&] { lconf().cloud_storage_enabled.set_value(false); }); revoke_license(); const std::initializer_list si_configs{ diff --git a/tests/rptest/tests/license_enforcement_test.py b/tests/rptest/tests/license_enforcement_test.py index bb6f12bef562..8f93afeed8a9 100644 --- a/tests/rptest/tests/license_enforcement_test.py +++ b/tests/rptest/tests/license_enforcement_test.py @@ -12,9 +12,9 @@ from ducktape.mark import matrix from rptest.services.cluster import cluster -from rptest.clients.rpk import RpkTool +from rptest.clients.rpk import RpkTool, RpkException from rptest.services.admin import Admin -from rptest.services.redpanda import LoggingConfig +from rptest.services.redpanda import LoggingConfig, SISettings from rptest.tests.redpanda_test import RedpandaTest from rptest.services.redpanda_installer import RedpandaInstaller from rptest.utils.mode_checks import skip_fips_mode @@ -187,3 +187,86 @@ def test_enterprise_cluster_bootstrap(self, root_driven_bootstrap): timeout_sec=60, backoff_sec=1, err_msg="The cluster hasn't stabilized") + + +class LicenseEnforcementPermittedTopicParams(RedpandaTest): + """ + Tests that validate that topics properties whose controlling cluster config + is disabled do not cause any issues in regards to license enforcement. 
+    """
+    def __init__(self, *args, **kwargs):
+        super().__init__(*args, **kwargs)
+
+        self.rpk = RpkTool(self.redpanda)
+
+    def setUp(self):
+        pass
+
+    @cluster(num_nodes=3)
+    @matrix(enable_cloud_storage=[False, True])
+    def test_cloud_storage_topic_params(self, enable_cloud_storage):
+        """
+        Verifies that, without a license installed, topics with tiered storage
+        settings such as `redpanda.remote.write` can still be created when
+        `cloud_storage_enabled` is `False`, and are rejected when it is `True`.
+        """
+        if enable_cloud_storage:
+            si_settings = SISettings(self.test_context)
+            self.redpanda.set_si_settings(si_settings)
+
+        super().setUp()
+
+        self.redpanda.set_environment(
+            {'__REDPANDA_DISABLE_BUILTIN_TRIAL_LICENSE': True})
+        self.redpanda.restart_nodes(self.redpanda.nodes)
+        self.redpanda.wait_until(self.redpanda.healthy,
+                                 timeout_sec=60,
+                                 backoff_sec=1,
+                                 err_msg="The cluster hasn't stabilized")
+
+        try:
+            self.rpk.create_topic("test",
+                                  config={"redpanda.remote.write": "true"})
+            assert not enable_cloud_storage, "Should have failed to create topic with redpanda.remote.write set and cloud_storage_enabled set to True"
+        except RpkException as e:
+            assert enable_cloud_storage, f"Should not have failed to create topic with redpanda.remote.write set and cloud_storage_enabled set to False: {e}"
+
+    @cluster(num_nodes=3)
+    def test_upgrade_with_topic_configs(self):
+        """
+        Verifies that if a license isn't installed, `cloud_storage_enabled` is
+        set to `False`, and topics exist with tiered storage properties set,
+        the upgrade still succeeds.
+        """
+        installer = self.redpanda._installer
+        prev_version = installer.highest_from_prior_feature_version(
+            RedpandaInstaller.HEAD)
+        latest_version = installer.head_version()
+        self.logger.info(
+            f"Testing with versions: {prev_version=} {latest_version=}")
+
+        self.logger.info(f"Starting all nodes with version: {prev_version}")
+        installer.install(self.redpanda.nodes, prev_version)
+        self.redpanda.start(nodes=self.redpanda.nodes,
+                            omit_seeds_on_idx_one=False)
+        self.redpanda.wait_until(self.redpanda.healthy,
+                                 timeout_sec=60,
+                                 backoff_sec=1,
+                                 err_msg="The cluster hasn't stabilized")
+        self.logger.debug(
+            "Creating a topic with redpanda.remote.write set to true")
+        self.rpk.create_topic("test", config={"redpanda.remote.write": "true"})
+        self.logger.info(
+            "Disabling the trial license to simulate that the license expired")
+        self.redpanda.set_environment(
+            {'__REDPANDA_DISABLE_BUILTIN_TRIAL_LICENSE': True})
+        self.redpanda.restart_nodes(self.redpanda.nodes)
+        self.redpanda.wait_until(self.redpanda.healthy,
+                                 timeout_sec=60,
+                                 backoff_sec=1,
+                                 err_msg="The cluster hasn't stabilized")
+
+        installer.install(self.redpanda.nodes, latest_version)
+        self.redpanda.start(nodes=self.redpanda.nodes,
+                            auto_assign_node_id=True,
+                            omit_seeds_on_idx_one=False)
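
The gating pattern this patch introduces in topics_frontend.cc can be summarized with a short, self-contained sketch. This is illustrative only and not code from the patch; the simplified types and the enterprise_features() helper below are hypothetical stand-ins for config::enterprise<P>::is_restricted() and the checks in get_enterprise_features().

// Illustrative sketch, not part of the patch: an enterprise feature is only
// sanctioned when the cluster-level property that governs it currently holds
// a restricted (license-gated) value.
#include <string>
#include <vector>

// Hypothetical stand-in for config::enterprise<bool>: for a bool-typed
// enterprise property, "true" is the restricted value.
struct enterprise_bool {
    bool value = false;
    bool is_restricted() const { return value; }
};

// Hypothetical stand-ins for the cluster and topic configuration.
struct cluster_cfg {
    enterprise_bool cloud_storage_enabled;
};

struct topic_cfg {
    bool remote_read = false;
    bool remote_write = false;
};

// Mirrors the shape of get_enterprise_features(): tiered storage topic
// settings are only reported when cloud storage is enabled cluster-wide.
std::vector<std::string>
enterprise_features(const cluster_cfg& cluster, const topic_cfg& topic) {
    std::vector<std::string> features;
    if (cluster.cloud_storage_enabled.is_restricted()) {
        if (topic.remote_read || topic.remote_write) {
            features.emplace_back("tiered storage");
        }
    }
    return features;
}

With cloud_storage_enabled left at its default of false, the returned feature list stays empty, which is why unlicensed_permit_if_config_disabled and the new ducktape tests expect topic creation to succeed without a license.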