diff --git a/src/go/rpk/pkg/cli/topic/config.go b/src/go/rpk/pkg/cli/topic/config.go index 81d6ee5e44c63..68d4a9f67bde3 100644 --- a/src/go/rpk/pkg/cli/topic/config.go +++ b/src/go/rpk/pkg/cli/topic/config.go @@ -76,6 +76,8 @@ valid, but does not apply it. // at the same time, so we issue first the set request for write, // then the rest of the requests. // See https://github.com/redpanda-data/redpanda/issues/9191 + // TODO: Remove this once v24.2 is EOL. + // See https://github.com/redpanda-data/redpanda/pull/23545. _, isRRR := setKVs["redpanda.remote.read"] wv, isRRW := setKVs["redpanda.remote.write"] rrwErrors := make(map[string]int16) diff --git a/src/v/cluster/tests/serialization_rt_test.cc b/src/v/cluster/tests/serialization_rt_test.cc index 6f43a7470553a..0d75fb08fc9b8 100644 --- a/src/v/cluster/tests/serialization_rt_test.cc +++ b/src/v/cluster/tests/serialization_rt_test.cc @@ -843,25 +843,27 @@ SEASTAR_THREAD_TEST_CASE(serde_reflection_roundtrip) { [] { return random_generators::get_int(0, 100000); }))); } { - cluster::incremental_topic_updates updates{ - .compression = random_property_update( - tests::random_optional([] { return model::random_compression(); })), - .cleanup_policy_bitflags = random_property_update( - tests::random_optional( - [] { return model::random_cleanup_policy(); })), - .compaction_strategy = random_property_update(tests::random_optional( - [] { return model::random_compaction_strategy(); })), - .timestamp_type = random_property_update(tests::random_optional( - [] { return model::random_timestamp_type(); })), - .segment_size = random_property_update(tests::random_optional( - [] { return random_generators::get_int(100_MiB, 1_GiB); })), - .retention_bytes = random_property_update(tests::random_tristate( - [] { return random_generators::get_int(100_MiB, 1_GiB); })), - .retention_duration = random_property_update( - tests::random_tristate([] { return tests::random_duration_ms(); })), - .shadow_indexing = random_property_update(tests::random_optional( - [] { return model::random_shadow_indexing_mode(); })), - .remote_delete = random_property_update(tests::random_bool())}; + cluster::incremental_topic_updates updates; + updates.compression = random_property_update( + tests::random_optional([] { return model::random_compression(); })); + updates.cleanup_policy_bitflags = random_property_update( + tests::random_optional( + [] { return model::random_cleanup_policy(); })); + updates.compaction_strategy = random_property_update( + tests::random_optional( + [] { return model::random_compaction_strategy(); })); + updates.timestamp_type = random_property_update(tests::random_optional( + [] { return model::random_timestamp_type(); })); + updates.segment_size = random_property_update(tests::random_optional( + [] { return random_generators::get_int(100_MiB, 1_GiB); })); + updates.retention_bytes = random_property_update(tests::random_tristate( + [] { return random_generators::get_int(100_MiB, 1_GiB); })); + updates.retention_duration = random_property_update( + tests::random_tristate([] { return tests::random_duration_ms(); })); + updates.remote_delete = random_property_update(tests::random_bool()); + updates.get_shadow_indexing() = random_property_update( + tests::random_optional( + [] { return model::random_shadow_indexing_mode(); })); roundtrip_test(updates); } { roundtrip_test(old_random_topic_configuration()); } @@ -2061,23 +2063,26 @@ SEASTAR_THREAD_TEST_CASE(serde_reflection_roundtrip) { auto val_validation = tests::random_bool(); auto val_strategy = 
tests::random_subject_name_strategy();
-        cluster::incremental_topic_updates updates{
-          .record_key_schema_id_validation = random_property_update(
-            tests::random_optional([=] { return key_validation; })),
-          .record_key_schema_id_validation_compat = random_property_update(
-            tests::random_optional([=] { return key_validation; })),
-          .record_key_subject_name_strategy = random_property_update(
-            tests::random_optional([=] { return key_strategy; })),
-          .record_key_subject_name_strategy_compat = random_property_update(
-            tests::random_optional([=] { return key_strategy; })),
-          .record_value_schema_id_validation = random_property_update(
-            tests::random_optional([=] { return val_validation; })),
-          .record_value_schema_id_validation_compat = random_property_update(
-            tests::random_optional([=] { return val_validation; })),
-          .record_value_subject_name_strategy = random_property_update(
-            tests::random_optional([=] { return val_strategy; })),
-          .record_value_subject_name_strategy_compat = random_property_update(
-            tests::random_optional([=] { return val_strategy; }))};
+        cluster::incremental_topic_updates updates;
+        updates.record_key_schema_id_validation = random_property_update(
+          tests::random_optional([=] { return key_validation; }));
+        updates.record_key_schema_id_validation_compat = random_property_update(
+          tests::random_optional([=] { return key_validation; }));
+        updates.record_key_subject_name_strategy = random_property_update(
+          tests::random_optional([=] { return key_strategy; }));
+        updates.record_key_subject_name_strategy_compat
+          = random_property_update(
+            tests::random_optional([=] { return key_strategy; }));
+        updates.record_value_schema_id_validation = random_property_update(
+          tests::random_optional([=] { return val_validation; }));
+        updates.record_value_schema_id_validation_compat
+          = random_property_update(
+            tests::random_optional([=] { return val_validation; }));
+        updates.record_value_subject_name_strategy = random_property_update(
+          tests::random_optional([=] { return val_strategy; }));
+        updates.record_value_subject_name_strategy_compat
+          = random_property_update(
+            tests::random_optional([=] { return val_strategy; }));
         roundtrip_test(updates);
     }
diff --git a/src/v/cluster/topic_table.cc b/src/v/cluster/topic_table.cc
index 5b50d224c42da..8ef62a8118362 100644
--- a/src/v/cluster/topic_table.cc
+++ b/src/v/cluster/topic_table.cc
@@ -800,10 +800,20 @@ void incremental_update(
     }
 }
 
+// This function is deprecated (as of `v24.3`) and kept only for legacy
+// purposes; it is bug prone. Use the `incremental_update` overload below for
+// `remote_read` and `remote_write` updates. See:
+// https://github.com/redpanda-data/redpanda/issues/9191
+// https://github.com/redpanda-data/redpanda/pull/23220
 template<>
 void incremental_update(
   std::optional<model::shadow_indexing_mode>& property,
   property_update<std::optional<model::shadow_indexing_mode>> override) {
+    if (override.op != incremental_update_operation::none) {
+        vlog(
+          clusterlog.trace,
+          "Performing deprecated incremental_update to shadow_indexing_mode");
+    }
     switch (override.op) {
     case incremental_update_operation::remove:
         if (!override.value || !property) {
@@ -831,6 +841,37 @@ void incremental_update(
     }
 }
 
+void incremental_update(
+  std::optional<model::shadow_indexing_mode>& property,
+  property_update<bool> overrides,
+  model::shadow_indexing_mode m) {
+    switch (overrides.op) {
+    case incremental_update_operation::remove: {
+        // This codepath is currently unused: a remove operation at the Kafka
+        // layer results in a set operation to the default cluster value.
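+        // (If this path were exercised, removing e.g. the fetch flag would
+        // clear only that bit from the stored mode, leaving the archival
+        // flag intact.)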
+ if (!property.has_value()) { + break; + } + auto simode = property.value(); + property = model::add_shadow_indexing_flag( + simode, model::negate_shadow_indexing_flag(m)); + return; + } + case incremental_update_operation::set: { + // set new value + auto simode = property.value_or(model::shadow_indexing_mode::disabled); + auto si_flag_update = overrides.value + ? m + : model::negate_shadow_indexing_flag(m); + property = model::add_shadow_indexing_flag(simode, si_flag_update); + return; + } + case incremental_update_operation::none: + // do nothing + return; + } +} + template void incremental_update( tristate& property, property_update> override) { @@ -898,8 +939,6 @@ topic_table::apply(update_topic_properties_cmd cmd, model::offset o) { incremental_update(updated_properties.segment_size, overrides.segment_size); incremental_update( updated_properties.timestamp_type, overrides.timestamp_type); - incremental_update( - updated_properties.shadow_indexing, overrides.shadow_indexing); incremental_update( updated_properties.batch_max_bytes, overrides.batch_max_bytes); incremental_update( @@ -908,6 +947,25 @@ topic_table::apply(update_topic_properties_cmd cmd, model::offset o) { incremental_update( updated_properties.retention_local_target_ms, overrides.retention_local_target_ms); + // These tiered storage properties shouldn't be set at the + // same time, due to feature gating from + // `shadow_indexing_split_topic_property_update`, but still set the + // deprecated update to `none` as a sanity check. + if ( + overrides.remote_read.op != incremental_update_operation::none + || overrides.remote_write.op != incremental_update_operation::none) { + overrides.get_shadow_indexing().op = incremental_update_operation::none; + } + incremental_update( + updated_properties.shadow_indexing, overrides.get_shadow_indexing()); + incremental_update( + updated_properties.shadow_indexing, + overrides.remote_read, + model::shadow_indexing_mode::fetch); + incremental_update( + updated_properties.shadow_indexing, + overrides.remote_write, + model::shadow_indexing_mode::archival); incremental_update( updated_properties.remote_delete, overrides.remote_delete, diff --git a/src/v/cluster/types.cc b/src/v/cluster/types.cc index 3957f75021724..c1ef976187f6c 100644 --- a/src/v/cluster/types.cc +++ b/src/v/cluster/types.cc @@ -348,7 +348,8 @@ std::ostream& operator<<(std::ostream& o, const incremental_topic_updates& i) { "record_value_subject_name_strategy_compat: {}, " "initial_retention_local_target_bytes: {}, " "initial_retention_local_target_ms: {}, write_caching: {}, flush_ms: {}, " - "flush_bytes: {}, iceberg_enabled: {}, leaders_preference: {}", + "flush_bytes: {}, iceberg_enabled: {}, leaders_preference: {}, " + "remote_read: {}, remote_write: {}", i.compression, i.cleanup_policy_bitflags, i.compaction_strategy, @@ -356,7 +357,7 @@ std::ostream& operator<<(std::ostream& o, const incremental_topic_updates& i) { i.segment_size, i.retention_bytes, i.retention_duration, - i.shadow_indexing, + i.get_shadow_indexing(), i.batch_max_bytes, i.retention_local_target_bytes, i.retention_local_target_ms, @@ -376,7 +377,9 @@ std::ostream& operator<<(std::ostream& o, const incremental_topic_updates& i) { i.flush_ms, i.flush_bytes, i.iceberg_enabled, - i.leaders_preference); + i.leaders_preference, + i.remote_read, + i.remote_write); return o; } @@ -1162,7 +1165,7 @@ void adl::to( t.segment_size, t.retention_bytes, t.retention_duration, - t.shadow_indexing, + t.get_shadow_indexing(), t.batch_max_bytes, 
t.retention_local_target_bytes, t.retention_local_target_ms, @@ -1239,9 +1242,9 @@ adl::from(iobuf_parser& in) { if ( version <= cluster::incremental_topic_updates::version_with_shadow_indexing) { - updates.shadow_indexing = adl>>{} - .from(in); + .from(in); } if ( diff --git a/src/v/cluster/types.h b/src/v/cluster/types.h index cf3bfbe13d1d7..f28b348815500 100644 --- a/src/v/cluster/types.h +++ b/src/v/cluster/types.h @@ -595,11 +595,14 @@ struct incremental_topic_updates property_update> segment_size; property_update> retention_bytes; property_update> retention_duration; - property_update> shadow_indexing; property_update> batch_max_bytes; property_update> retention_local_target_bytes; property_update> retention_local_target_ms; + property_update remote_read{ + false, incremental_update_operation::none}; + property_update remote_write{ + false, incremental_update_operation::none}; property_update remote_delete{ false, incremental_update_operation::none}; property_update> segment_ms; @@ -632,6 +635,11 @@ struct incremental_topic_updates property_update> leaders_preference; + // To allow us to better control use of the deprecated shadow_indexing + // field, use getters and setters instead. + const auto& get_shadow_indexing() const { return shadow_indexing; } + auto& get_shadow_indexing() { return shadow_indexing; } + auto serde_fields() { return std::tie( compression, @@ -661,7 +669,9 @@ struct incremental_topic_updates flush_ms, flush_bytes, iceberg_enabled, - leaders_preference); + leaders_preference, + remote_read, + remote_write); } friend std::ostream& @@ -670,6 +680,11 @@ struct incremental_topic_updates friend bool operator==( const incremental_topic_updates&, const incremental_topic_updates&) = default; + +private: + // This field is kept here for legacy purposes, but should be considered + // deprecated in favour of remote_read and remote_write. 
+ property_update> shadow_indexing; }; using replication_factor diff --git a/src/v/compat/cluster_compat.h b/src/v/compat/cluster_compat.h index 744c36a2ad356..648a2d261ef85 100644 --- a/src/v/compat/cluster_compat.h +++ b/src/v/compat/cluster_compat.h @@ -631,7 +631,7 @@ GEN_COMPAT_CHECK( json_write(segment_size); json_write(retention_bytes); json_write(retention_duration); - json_write(shadow_indexing); + json_write(get_shadow_indexing()); json_write(remote_delete); }, { @@ -642,7 +642,7 @@ GEN_COMPAT_CHECK( json_read(segment_size); json_read(retention_bytes); json_read(retention_duration); - json_read(shadow_indexing); + json_read(get_shadow_indexing()); json_read(remote_delete); }) diff --git a/src/v/compat/cluster_generator.h b/src/v/compat/cluster_generator.h index f64d1d8aa04d3..c7952e9f8baa0 100644 --- a/src/v/compat/cluster_generator.h +++ b/src/v/compat/cluster_generator.h @@ -759,52 +759,52 @@ struct instance_generator { template<> struct instance_generator { static cluster::incremental_topic_updates random() { - return { - .compression = random_property_update([] { - return tests::random_optional([] { - return instance_generator::random(); - }); - }), - .cleanup_policy_bitflags = random_property_update([] { - return tests::random_optional([] { - return instance_generator< - model::cleanup_policy_bitflags>::random(); - }); - }), - .compaction_strategy = random_property_update([] { - return tests::random_optional([] { - return instance_generator< - model::compaction_strategy>::random(); - }); - }), - .timestamp_type = random_property_update([] { - return tests::random_optional([] { - return instance_generator::random(); - }); - }), - .segment_size = random_property_update([] { - return tests::random_optional( - [] { return random_generators::get_int(); }); - }), - .retention_bytes = random_property_update([] { - return tests::random_tristate( - [] { return random_generators::get_int(); }); - }), - .retention_duration = random_property_update([] { - return tests::random_tristate( - [] { return tests::random_duration_ms(); }); - }), - .shadow_indexing = random_property_update([] { - return tests::random_optional([] { - return instance_generator< - model::shadow_indexing_mode>::random(); - }); - }), - .remote_delete = random_property_update([] { - // Enable ADL roundtrip, which always decodes as false - // for legacy topics - return false; - })}; + cluster::incremental_topic_updates updates; + updates.compression = random_property_update([] { + return tests::random_optional( + [] { return instance_generator::random(); }); + }); + updates.cleanup_policy_bitflags = random_property_update([] { + return tests::random_optional([] { + return instance_generator< + model::cleanup_policy_bitflags>::random(); + }); + }); + updates.compaction_strategy = random_property_update([] { + return tests::random_optional([] { + return instance_generator::random(); + }); + }); + updates.timestamp_type = random_property_update([] { + return tests::random_optional([] { + return instance_generator::random(); + }); + }); + updates.segment_size = random_property_update([] { + return tests::random_optional( + [] { return random_generators::get_int(); }); + }); + updates.retention_bytes = random_property_update([] { + return tests::random_tristate( + [] { return random_generators::get_int(); }); + }); + updates.retention_duration = random_property_update([] { + return tests::random_tristate( + [] { return tests::random_duration_ms(); }); + }); + updates.get_shadow_indexing() = random_property_update([] { + 
return tests::random_optional([] {
+            return instance_generator<
+              model::shadow_indexing_mode>::random();
+        });
+    });
+    updates.remote_delete = random_property_update([] {
+        // Enable ADL roundtrip, which always decodes as false
+        // for legacy topics
+        return false;
+    });
+
+    return updates;
 }
 
 static std::vector<cluster::incremental_topic_updates> limits() {
diff --git a/src/v/compat/cluster_json.h b/src/v/compat/cluster_json.h
index 31427c78f491d..69fda674a21b2 100644
--- a/src/v/compat/cluster_json.h
+++ b/src/v/compat/cluster_json.h
@@ -757,7 +757,7 @@ inline void rjson_serialize(
     write_member(w, "segment_size", itu.segment_size);
     write_member(w, "retention_bytes", itu.retention_bytes);
     write_member(w, "retention_duration", itu.retention_duration);
-    write_member(w, "shadow_indexing", itu.shadow_indexing);
+    write_member(w, "shadow_indexing", itu.get_shadow_indexing());
     write_member(w, "remote_delete", itu.remote_delete);
     write_member(w, "segment_ms", itu.segment_ms);
     w.EndObject();
@@ -772,7 +772,7 @@ read_value(const json::Value& rd, cluster::incremental_topic_updates& itu) {
     read_member(rd, "segment_size", itu.segment_size);
     read_member(rd, "retention_bytes", itu.retention_bytes);
     read_member(rd, "retention_duration", itu.retention_duration);
-    read_member(rd, "shadow_indexing", itu.shadow_indexing);
+    read_member(rd, "shadow_indexing", itu.get_shadow_indexing());
     read_member(rd, "remote_delete", itu.remote_delete);
     read_member(rd, "segment_ms", itu.segment_ms);
 }
diff --git a/src/v/features/feature_table.cc b/src/v/features/feature_table.cc
index bae0292283a08..658334022083b 100644
--- a/src/v/features/feature_table.cc
+++ b/src/v/features/feature_table.cc
@@ -120,6 +120,8 @@ std::string_view to_string_view(feature f) {
         return "remote_labels";
     case feature::partition_properties_stm:
         return "partition_properties_stm";
+    case feature::shadow_indexing_split_topic_property_update:
+        return "shadow_indexing_split_topic_property_update";
 
     /*
      * testing features
diff --git a/src/v/features/feature_table.h b/src/v/features/feature_table.h
index ceaed2d436353..27f37962d0e66 100644
--- a/src/v/features/feature_table.h
+++ b/src/v/features/feature_table.h
@@ -82,6 +82,7 @@ enum class feature : std::uint64_t {
     transforms_specify_offset = 1ULL << 50U,
     remote_labels = 1ULL << 51U,
     partition_properties_stm = 1ULL << 52U,
+    shadow_indexing_split_topic_property_update = 1ULL << 53U,
 
     // Dummy features for testing only
     test_alpha = 1ULL << 61U,
@@ -477,11 +478,17 @@ constexpr static std::array feature_schema{
     feature_spec::available_policy::always,
     feature_spec::prepare_policy::always},
   feature_spec{
-    cluster::cluster_version{14},
+    release_version::v24_3_1,
     "partition_properties_stm",
     feature::partition_properties_stm,
     feature_spec::available_policy::always,
     feature_spec::prepare_policy::always},
+  feature_spec{
+    release_version::v24_3_1,
+    "shadow_indexing_split_topic_property_update",
+    feature::shadow_indexing_split_topic_property_update,
+    feature_spec::available_policy::always,
+    feature_spec::prepare_policy::always},
 };
 
 std::string_view to_string_view(feature);
diff --git a/src/v/kafka/server/handlers/alter_configs.cc b/src/v/kafka/server/handlers/alter_configs.cc
index cd454cef89c2f..69f8a031c256b 100644
--- a/src/v/kafka/server/handlers/alter_configs.cc
+++ b/src/v/kafka/server/handlers/alter_configs.cc
@@ -37,6 +37,11 @@
 #include
 
 namespace kafka {
+// Legacy function; bug prone for multiple property updates, e.g.
+// alter-config --set redpanda.remote.read=true --set
+// redpanda.remote.write=false.
+// Used if feature flag shadow_indexing_split_topic_property_update (v24.3) is +// not active. static void parse_and_set_shadow_indexing_mode( cluster::property_update>& property_update, @@ -56,7 +61,8 @@ static void parse_and_set_shadow_indexing_mode( } checked -create_topic_properties_update(alter_configs_resource& resource) { +create_topic_properties_update( + const request_context& ctx, alter_configs_resource& resource) { using op_t = cluster::incremental_update_operation; model::topic_namespace tp_ns( @@ -77,7 +83,7 @@ create_topic_properties_update(alter_configs_resource& resource) { std::apply(apply_op(op_t::none), update.custom_properties.serde_fields()); static_assert( - std::tuple_size_v == 28, + std::tuple_size_v == 30, "If you added a property, please decide on it's default alter config " "policy, and handle the update in the loop below"); static_assert( @@ -85,6 +91,13 @@ create_topic_properties_update(alter_configs_resource& resource) { "If you added a property, please decide on it's default alter config " "policy, and handle the update in the loop below"); + /* + As of v24.3, a new update path for shadow indexing properties should be + used. + */ + const auto shadow_indexing_split_update + = ctx.feature_table().local().is_active( + features::feature::shadow_indexing_split_topic_property_update); /** * The shadow_indexing properties ('redpanda.remote.(read|write|delete)') * are special "sticky" topic properties that are always set as a @@ -93,9 +106,15 @@ create_topic_properties_update(alter_configs_resource& resource) { * * See: https://github.com/redpanda-data/redpanda/issues/7451 */ - update.properties.shadow_indexing.op = op_t::none; + update.properties.remote_read.op = op_t::none; + update.properties.remote_write.op = op_t::none; update.properties.remote_delete.op = op_t::none; + // Legacy + auto& update_properties_shadow_indexing + = update.properties.get_shadow_indexing(); + update_properties_shadow_indexing.op = op_t::none; + // Now that the defaults are set, continue to set properties from the // request @@ -147,6 +166,52 @@ create_topic_properties_update(alter_configs_resource& resource) { kafka::config_resource_operation::set); continue; } + if (cfg.name == topic_property_remote_read) { + if (shadow_indexing_split_update) { + parse_and_set_bool( + tp_ns, + update.properties.remote_read, + cfg.value, + kafka::config_resource_operation::set, + config::shard_local_cfg() + .cloud_storage_enable_remote_read()); + + } else { + // Legacy update for shadow indexing field + auto set_value + = update_properties_shadow_indexing.value + ? model::add_shadow_indexing_flag( + *update_properties_shadow_indexing.value, + model::shadow_indexing_mode::fetch) + : model::shadow_indexing_mode::fetch; + parse_and_set_shadow_indexing_mode( + update_properties_shadow_indexing, cfg.value, set_value); + } + continue; + } + if (cfg.name == topic_property_remote_write) { + if (shadow_indexing_split_update) { + parse_and_set_bool( + tp_ns, + update.properties.remote_write, + cfg.value, + kafka::config_resource_operation::set, + config::shard_local_cfg() + .cloud_storage_enable_remote_write()); + } else { + // Legacy update for shadow indexing field + auto set_value + = update_properties_shadow_indexing.value + ? 
model::add_shadow_indexing_flag(
+                    *update_properties_shadow_indexing.value,
+                    model::shadow_indexing_mode::archival)
+                : model::shadow_indexing_mode::archival;
+            parse_and_set_shadow_indexing_mode(
+              update_properties_shadow_indexing, cfg.value, set_value);
+        }
+
+        continue;
+    }
     if (cfg.name == topic_property_remote_delete) {
         parse_and_set_bool(
           tp_ns,
@@ -163,26 +228,6 @@
           kafka::config_resource_operation::set);
         continue;
     }
-    if (cfg.name == topic_property_remote_write) {
-        auto set_value = update.properties.shadow_indexing.value
-                           ? model::add_shadow_indexing_flag(
-                             *update.properties.shadow_indexing.value,
-                             model::shadow_indexing_mode::archival)
-                           : model::shadow_indexing_mode::archival;
-        parse_and_set_shadow_indexing_mode(
-          update.properties.shadow_indexing, cfg.value, set_value);
-        continue;
-    }
-    if (cfg.name == topic_property_remote_read) {
-        auto set_value = update.properties.shadow_indexing.value
-                           ? model::add_shadow_indexing_flag(
-                             *update.properties.shadow_indexing.value,
-                             model::shadow_indexing_mode::fetch)
-                           : model::shadow_indexing_mode::fetch;
-        parse_and_set_shadow_indexing_mode(
-          update.properties.shadow_indexing, cfg.value, set_value);
-        continue;
-    }
     if (cfg.name == topic_property_retention_duration) {
         parse_and_set_tristate(
           update.properties.retention_duration,
@@ -329,8 +374,11 @@ alter_topic_configuration(
     return do_alter_topics_configuration<
       alter_configs_resource,
       alter_configs_resource_response>(
-      ctx, std::move(resources), validate_only, [](alter_configs_resource& r) {
-          return create_topic_properties_update(r);
+      ctx,
+      std::move(resources),
+      validate_only,
+      [&ctx](alter_configs_resource& r) {
+          return create_topic_properties_update(ctx, r);
       });
 }
diff --git a/src/v/kafka/server/handlers/configs/config_utils.h b/src/v/kafka/server/handlers/configs/config_utils.h
index 0266179e0ae62..a8c64ef62ce6e 100644
--- a/src/v/kafka/server/handlers/configs/config_utils.h
+++ b/src/v/kafka/server/handlers/configs/config_utils.h
@@ -34,6 +34,7 @@
 #include
+#include <algorithm>
 
 namespace kafka {
@@ -635,10 +636,18 @@ inline void parse_and_set_bool(
 
     if (op == config_resource_operation::set && value) {
         try {
-            property.value = string_switch<bool>(*value)
+            // Ignore case.
+            auto str_value = std::move(*value);
+            std::transform(
+              str_value.begin(),
+              str_value.end(),
+              str_value.begin(),
+              [](unsigned char c) { return static_cast<char>(std::tolower(c)); });
+
+            property.value = string_switch<bool>(str_value)
                                .match("true", true)
                                .match("false", false);
-            auto v_error = validator(tn, *value, property.value);
+            auto v_error = validator(tn, str_value, property.value);
             if (v_error) {
                 throw validation_error{*v_error};
             }
diff --git a/src/v/kafka/server/handlers/incremental_alter_configs.cc b/src/v/kafka/server/handlers/incremental_alter_configs.cc
index ee76ce007739c..455e736a48580 100644
--- a/src/v/kafka/server/handlers/incremental_alter_configs.cc
+++ b/src/v/kafka/server/handlers/incremental_alter_configs.cc
@@ -37,10 +37,11 @@ namespace kafka {
 using req_resource_t = incremental_alter_configs_resource;
 using resp_resource_t = incremental_alter_configs_resource_response;
 
-/**
- * We pass returned value as a paramter to allow template to be automatically
- * resolved.
- */
+// Legacy function; bug prone for multiple property updates, e.g.
+// alter-config --set redpanda.remote.read=true --set
+// redpanda.remote.write=false.
+// Used if feature flag shadow_indexing_split_topic_property_update (v24.3) is
+// not active.
static void parse_and_set_shadow_indexing_mode( cluster::property_update>& simode, const std::optional& value, @@ -123,7 +124,7 @@ bool valid_config_resource_operation(uint8_t v) { checked create_topic_properties_update( - request_context&, incremental_alter_configs_resource& resource) { + const request_context& ctx, incremental_alter_configs_resource& resource) { model::topic_namespace tp_ns( model::kafka_namespace, model::topic(resource.resource_name)); cluster::topic_properties_update update(tp_ns); @@ -131,6 +132,14 @@ create_topic_properties_update( schema_id_validation_config_parser schema_id_validation_config_parser{ update.properties}; + /* + As of v24.3, a new update path for shadow indexing properties should be + used. + */ + const auto shadow_indexing_split_update + = ctx.feature_table().local().is_active( + features::feature::shadow_indexing_split_topic_property_update); + for (auto& cfg : resource.configs) { // Validate int8_t is within range of config_resource_operation // before casting (otherwise casting is undefined behaviour) @@ -189,14 +198,6 @@ create_topic_properties_update( update.properties.retention_duration, cfg.value, op); continue; } - if (cfg.name == topic_property_remote_write) { - parse_and_set_shadow_indexing_mode( - update.properties.shadow_indexing, - cfg.value, - op, - model::shadow_indexing_mode::archival); - continue; - } if (cfg.name == topic_property_retention_local_target_bytes) { parse_and_set_tristate( update.properties.retention_local_target_bytes, @@ -210,11 +211,43 @@ create_topic_properties_update( continue; } if (cfg.name == topic_property_remote_read) { - parse_and_set_shadow_indexing_mode( - update.properties.shadow_indexing, - cfg.value, - op, - model::shadow_indexing_mode::fetch); + if (shadow_indexing_split_update) { + parse_and_set_bool( + tp_ns, + update.properties.remote_read, + cfg.value, + op, + config::shard_local_cfg() + .cloud_storage_enable_remote_read()); + } else { + // Legacy update for shadow indexing field + parse_and_set_shadow_indexing_mode( + update.properties.get_shadow_indexing(), + cfg.value, + op, + model::shadow_indexing_mode::fetch); + } + + continue; + } + if (cfg.name == topic_property_remote_write) { + if (shadow_indexing_split_update) { + parse_and_set_bool( + tp_ns, + update.properties.remote_write, + cfg.value, + op, + config::shard_local_cfg() + .cloud_storage_enable_remote_write()); + } else { + // Legacy update for shadow indexing field + parse_and_set_shadow_indexing_mode( + update.properties.get_shadow_indexing(), + cfg.value, + op, + model::shadow_indexing_mode::archival); + } + continue; } if (cfg.name == topic_property_remote_delete) { diff --git a/src/v/kafka/server/tests/alter_config_test.cc b/src/v/kafka/server/tests/alter_config_test.cc index 953225ac7e092..1949a5c178165 100644 --- a/src/v/kafka/server/tests/alter_config_test.cc +++ b/src/v/kafka/server/tests/alter_config_test.cc @@ -248,7 +248,6 @@ class alter_config_test_fixture : public redpanda_thread_fixture { return res.name == key; }); BOOST_REQUIRE(cfg_it != it->configs.end()); - BOOST_REQUIRE_EQUAL(cfg_it->value, value); } }; @@ -888,3 +887,201 @@ FIXTURE_TEST(test_alter_config_internal_topic, alter_config_test_fixture) { incr_resp.data.responses[0].error_code, kafka::error_code::invalid_config); } + +FIXTURE_TEST(test_shadow_indexing_alter_configs, alter_config_test_fixture) { + wait_for_controller_leadership().get(); + model::topic test_tp{"topic-1"}; + create_topic(test_tp, 6); + using map_t = absl::flat_hash_map; + std::vector 
test_cases; + + { + map_t properties; + properties.emplace("redpanda.remote.write", "false"); + properties.emplace("redpanda.remote.read", "true"); + test_cases.push_back(std::move(properties)); + } + { + map_t properties; + properties.emplace("redpanda.remote.write", "true"); + properties.emplace("redpanda.remote.read", "false"); + test_cases.push_back(std::move(properties)); + } + { + map_t properties; + properties.emplace("redpanda.remote.write", "true"); + properties.emplace("redpanda.remote.read", "true"); + test_cases.push_back(std::move(properties)); + } + { + map_t properties; + properties.emplace("redpanda.remote.write", "false"); + properties.emplace("redpanda.remote.read", "false"); + test_cases.push_back(std::move(properties)); + } + + for (const auto& test_case : test_cases) { + auto resp = alter_configs( + make_alter_topic_config_resource_cv(test_tp, test_case)); + + BOOST_REQUIRE_EQUAL(resp.data.responses.size(), 1); + BOOST_REQUIRE_EQUAL( + resp.data.responses[0].error_code, kafka::error_code::none); + auto describe_resp = describe_configs(test_tp); + assert_property_value( + test_tp, + "redpanda.remote.write", + test_case.at("redpanda.remote.write"), + describe_resp); + assert_property_value( + test_tp, + "redpanda.remote.read", + test_case.at("redpanda.remote.read"), + describe_resp); + } +} + +FIXTURE_TEST( + test_shadow_indexing_incremental_alter_configs, alter_config_test_fixture) { + wait_for_controller_leadership().get(); + model::topic test_tp{"topic-1"}; + create_topic(test_tp, 6); + using map_t = absl::flat_hash_map< + ss::sstring, + std::pair, kafka::config_resource_operation>>; + std::vector test_cases; + { + map_t properties; + properties.emplace( + "redpanda.remote.write", + std::make_pair("false", kafka::config_resource_operation::set)); + properties.emplace( + "redpanda.remote.read", + std::make_pair("true", kafka::config_resource_operation::set)); + test_cases.push_back(std::move(properties)); + } + { + map_t properties; + properties.emplace( + "redpanda.remote.write", + std::make_pair("true", kafka::config_resource_operation::set)); + properties.emplace( + "redpanda.remote.read", + std::make_pair("false", kafka::config_resource_operation::set)); + test_cases.push_back(std::move(properties)); + } + { + map_t properties; + properties.emplace( + "redpanda.remote.write", + std::make_pair("true", kafka::config_resource_operation::set)); + properties.emplace( + "redpanda.remote.read", + std::make_pair("true", kafka::config_resource_operation::set)); + test_cases.push_back(std::move(properties)); + } + { + map_t properties; + properties.emplace( + "redpanda.remote.write", + std::make_pair("false", kafka::config_resource_operation::set)); + properties.emplace( + "redpanda.remote.read", + std::make_pair("false", kafka::config_resource_operation::set)); + test_cases.push_back(std::move(properties)); + } + { + map_t properties; + properties.emplace( + "redpanda.remote.write", + std::make_pair( + std::nullopt, kafka::config_resource_operation::remove)); + properties.emplace( + "redpanda.remote.read", + std::make_pair("true", kafka::config_resource_operation::set)); + test_cases.push_back(std::move(properties)); + } + { + map_t properties; + properties.emplace( + "redpanda.remote.write", + std::make_pair("true", kafka::config_resource_operation::set)); + properties.emplace( + "redpanda.remote.read", + std::make_pair( + std::nullopt, kafka::config_resource_operation::remove)); + test_cases.push_back(std::move(properties)); + } + { + map_t properties; + 
properties.emplace( + "redpanda.remote.write", + std::make_pair("true", kafka::config_resource_operation::set)); + properties.emplace( + "redpanda.remote.read", + std::make_pair("true", kafka::config_resource_operation::set)); + test_cases.push_back(std::move(properties)); + } + { + map_t properties; + properties.emplace( + "redpanda.remote.write", + std::make_pair( + std::nullopt, kafka::config_resource_operation::remove)); + properties.emplace( + "redpanda.remote.read", + std::make_pair( + std::nullopt, kafka::config_resource_operation::remove)); + test_cases.push_back(std::move(properties)); + } + + for (const auto& test_case : test_cases) { + auto resp = incremental_alter_configs( + make_incremental_alter_topic_config_resource_cv(test_tp, test_case)); + + BOOST_REQUIRE_EQUAL(resp.data.responses.size(), 1); + BOOST_REQUIRE_EQUAL( + resp.data.responses[0].error_code, kafka::error_code::none); + auto describe_resp = describe_configs(test_tp); + assert_property_value( + test_tp, + "redpanda.remote.write", + test_case.at("redpanda.remote.write").first.value_or("false"), + describe_resp); + assert_property_value( + test_tp, + "redpanda.remote.read", + test_case.at("redpanda.remote.read").first.value_or("false"), + describe_resp); + } +} + +FIXTURE_TEST( + test_shadow_indexing_uppercase_alter_config, alter_config_test_fixture) { + wait_for_controller_leadership().get(); + model::topic test_tp{"topic-1"}; + create_topic(test_tp, 6); + using map_t = absl::flat_hash_map< + ss::sstring, + std::pair, kafka::config_resource_operation>>; + map_t properties; + // Case is on purpose. + properties.emplace( + "redpanda.remote.write", + std::make_pair("True", kafka::config_resource_operation::set)); + properties.emplace( + "redpanda.remote.read", + std::make_pair("TRUE", kafka::config_resource_operation::set)); + + auto resp = incremental_alter_configs( + make_incremental_alter_topic_config_resource_cv(test_tp, properties)); + + BOOST_REQUIRE_EQUAL(resp.data.responses.size(), 1); + BOOST_REQUIRE_EQUAL( + resp.data.responses[0].error_code, kafka::error_code::none); + auto describe_resp = describe_configs(test_tp); + assert_property_value( + test_tp, "redpanda.remote.write", "true", describe_resp); + assert_property_value( + test_tp, "redpanda.remote.read", "true", describe_resp); +} diff --git a/src/v/migrations/cloud_storage_config.cc b/src/v/migrations/cloud_storage_config.cc index 1870b943a093e..3b100e6b7f3d5 100644 --- a/src/v/migrations/cloud_storage_config.cc +++ b/src/v/migrations/cloud_storage_config.cc @@ -135,8 +135,10 @@ ss::future<> cloud_storage_config::do_mutate() { : model::shadow_indexing_mode::fetch; } - update.properties.shadow_indexing = make_property_set( + update.properties.get_shadow_indexing() = make_property_set( std::make_optional(mode)); + update.properties.remote_read = make_property_set(remote_read); + update.properties.remote_write = make_property_set(remote_write); vlog( featureslog.info, "Updating tiered storage topic {}", i.first.tp); diff --git a/tests/rptest/clients/kcl.py b/tests/rptest/clients/kcl.py index 29d27a14f4886..63e177cf0e2e5 100644 --- a/tests/rptest/clients/kcl.py +++ b/tests/rptest/clients/kcl.py @@ -136,7 +136,12 @@ def consume(self, cmd.append(topic) return self._cmd(cmd) - def _alter_config(self, values, incremental, entity_type, entity): + def _alter_config(self, + values, + incremental, + entity_type, + entity, + node=None): """ :param broker: node id. 
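+        :param node: if set, direct the request to this node's broker only
+          (defaults to all brokers in the cluster)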
:param values: dict of property name to new value @@ -165,7 +170,7 @@ def _alter_config(self, values, incremental, entity_type, entity): # cmd needs to be string, so handle things like broker=1 cmd.append(str(entity)) - r = self._cmd(cmd, attempts=1) + r = self._cmd(cmd, attempts=1, node=node) if 'OK' not in r: raise RuntimeError(r) else: @@ -174,8 +179,12 @@ def _alter_config(self, values, incremental, entity_type, entity): def alter_broker_config(self, values, incremental, broker=None): return self._alter_config(values, incremental, "broker", broker) - def alter_topic_config(self, values, incremental, topic): - return self._alter_config(values, incremental, "topic", topic) + def alter_topic_config(self, values, incremental, topic, node=None): + return self._alter_config(values, + incremental, + "topic", + topic, + node=node) def delete_broker_config(self, keys, incremental): """ @@ -194,7 +203,8 @@ def delete_broker_config(self, keys, incremental): def describe_topic(self, topic: str, with_docs: bool = False, - with_types: bool = False): + with_types: bool = False, + node=None): """ :param topic: the name of the topic to describe :param with_docs: if true, include documention strings in the response @@ -207,7 +217,7 @@ def describe_topic(self, if with_types: cmd.append("--with-types") - return self._cmd(cmd, attempts=1) + return self._cmd(cmd, attempts=1, node=node) def offset_delete(self, group: str, topic_partitions: dict): """ @@ -374,16 +384,17 @@ def replicas_as_int(replicas: list[str]): return ret - def _cmd(self, cmd, input=None, attempts=5): + def _cmd(self, cmd, input=None, attempts=5, node=None): """ :param attempts: how many times to try before giving up (1 for no retries) :return: stdout string """ - brokers = self._redpanda.brokers() + brokers = node.name if node is not None else self._redpanda.brokers() cmd = ["kcl", "-X", f"seed_brokers={brokers}", "--no-config-file" ] + self.sasl_options() + cmd assert attempts > 0 + self._redpanda.logger.debug(f"Executing {cmd}") for retry in reversed(range(attempts)): try: res = subprocess.check_output(cmd, diff --git a/tests/rptest/tests/alter_topic_configuration_test.py b/tests/rptest/tests/alter_topic_configuration_test.py index e72fcd729dd72..308da1974ba1a 100644 --- a/tests/rptest/tests/alter_topic_configuration_test.py +++ b/tests/rptest/tests/alter_topic_configuration_test.py @@ -6,21 +6,25 @@ # As of the Change Date specified in that file, in accordance with # the Business Source License, use of this software will be governed # by the Apache License, Version 2.0 - import random import string +import time import subprocess -from rptest.clients.kcl import RawKCL +from rptest.services.admin import Admin +from rptest.clients.kcl import KCL, RawKCL from rptest.utils.si_utils import BucketView, NT from ducktape.utils.util import wait_until from rptest.util import wait_until_result from rptest.services.cluster import cluster -from ducktape.mark import parametrize +from ducktape.mark import parametrize, matrix from rptest.clients.kafka_cli_tools import KafkaCliTools from rptest.clients.rpk import RpkTool +from rptest.services.redpanda_installer import RedpandaVersionTriple from rptest.clients.types import TopicSpec +from rptest.tests.end_to_end import EndToEndTest +from rptest.services.redpanda_installer import InstallOptions, RedpandaInstaller from rptest.tests.redpanda_test import RedpandaTest from rptest.services.redpanda import SISettings @@ -290,14 +294,40 @@ def test_overrides_remove(self): assert 
altered_output["redpanda.remote.read"] == "false" assert altered_output["redpanda.remote.write"] == "false" - # delete topic configs (value from configuration should be used) + # Assert cluster values are both True + admin = Admin(self.redpanda) + cluster_conf = admin.get_cluster_config() + assert cluster_conf['cloud_storage_enable_remote_read'] == True + assert cluster_conf['cloud_storage_enable_remote_write'] == True + + # delete topic configs (value from cluster configuration should be used) self.client().delete_topic_config(topic, "redpanda.remote.read") self.client().delete_topic_config(topic, "redpanda.remote.write") + altered_output = self.client().describe_topic_configs(topic) self.logger.info(f"altered_output={altered_output}") assert altered_output["redpanda.remote.read"] == "true" assert altered_output["redpanda.remote.write"] == "true" + # Set cluster values to False + self.redpanda.set_cluster_config({ + 'cloud_storage_enable_remote_read': + False, + 'cloud_storage_enable_remote_write': + False + }) + cluster_conf = admin.get_cluster_config() + assert cluster_conf['cloud_storage_enable_remote_read'] == False + assert cluster_conf['cloud_storage_enable_remote_write'] == False + + # delete topic configs (value from cluster configuration should be used) + self.client().delete_topic_config(topic, "redpanda.remote.read") + self.client().delete_topic_config(topic, "redpanda.remote.write") + + altered_output = self.client().describe_topic_configs(topic) + assert altered_output["redpanda.remote.read"] == "false" + assert altered_output["redpanda.remote.write"] == "false" + @cluster(num_nodes=3) def test_topic_manifest_reupload(self): bucket_view = BucketView(self.redpanda) @@ -323,3 +353,172 @@ def check(): backoff_sec=1, err_msg="Topic manifest was not re-uploaded as expected", retry_on_exc=True) + + +class AlterConfigMixedNodeTest(EndToEndTest): + topics = (TopicSpec(partition_count=1, replication_factor=3), ) + + def __init__(self, ctx): + super(AlterConfigMixedNodeTest, self).__init__(test_context=ctx) + + @cluster(num_nodes=3) + @matrix(incremental_update=[True, False]) + def test_alter_config_shadow_indexing_mixed_node(self, incremental_update): + """Assert that the `AlterConfig` and `IncrementalAlterConfig` APIs still work as expected, + most notably with `redpanda.remote.read` and `redpanda.remote.write`, which have seen some + changed behavior in v24.3 and above versions of `redpanda`.""" + num_nodes = 3 + + install_opts = InstallOptions(version=RedpandaVersionTriple((24, 1, + 1))) + self.start_redpanda( + num_nodes=num_nodes, + si_settings=SISettings(test_context=self.test_context), + install_opts=install_opts, + license_required=True) + + rpk = RpkTool(self.redpanda) + # KCL is used to direct AlterConfig and DescribeConfigs requests to specific brokers. + kcl = KCL(self.redpanda) + topic = self.topics[0].name + + rpk.create_topic(topic, + partitions=1, + replicas=3, + config={ + 'redpanda.remote.read': 'false', + 'redpanda.remote.write': 'false' + }) + + # Sanity check defaults + desc = rpk.describe_topic_configs(topic) + assert desc['redpanda.remote.read'][0] == 'false' + assert desc['redpanda.remote.write'][0] == 'false' + + def check_consistent_properties_across_nodes(_): + # Cannot assert on topic configs here, as we + # know the deprecated code path is bug prone. + # Still, check that view of topic properties + # is consistent across the cluster. 
+ node_props = [] + for node in self.redpanda.nodes: + desc = kcl.describe_topic(topic, node=node) + props = set() + for line in desc.split('\n'): + line = line.rstrip() + if 'redpanda.remote.read' in line: + props.add(line) + elif 'redpanda.remote.write' in line: + props.add(line) + assert len(props) == 2 + node_props.append(props) + return all(p == node_props[0] for p in node_props) + + def alter_and_check(func): + # Make the changes to topic properties and assert they are + # propagated across all nodes. + props_list = [{ + 'redpanda.remote.read': 'true', + 'redpanda.remote.write': 'true' + }, { + 'redpanda.remote.read': 'false', + 'redpanda.remote.write': 'false' + }, { + 'redpanda.remote.read': 'true', + 'redpanda.remote.write': 'false' + }] + for props in props_list: + kcl.alter_topic_config(props, + incremental_update, + topic, + node=self.redpanda.controller()) + wait_until(lambda: func(props) == True, + timeout_sec=10, + backoff_sec=1, + err_msg=f'Failed check {func.__name__}') + + def restart_node_and_await_stable_leader(node): + self.redpanda.restart_nodes(node) + self.redpanda.wait_for_membership(first_start=False) + self.redpanda._admin.await_stable_leader(namespace='redpanda', + topic='controller', + partition=0) + + def wait_for_controller_id(n): + health_report = self.redpanda._admin.get_cluster_health_overview( + n) + return health_report['controller_id'] != -1 + + for node in self.redpanda.nodes: + wait_until(lambda: wait_for_controller_id(node) == True, + timeout_sec=15, + backoff_sec=1, + err_msg='Controller leadership did not stabilize.') + + def check_shadow_indexing_feature_state(nodes, state): + nodes = nodes if isinstance(nodes, list) else [nodes] + for node in nodes: + assert self.redpanda.get_feature_state( + 'shadow_indexing_split_topic_property_update', + node) == state + + # Perform alterations and consistency checks across cluster with all un-upgraded nodes + alter_and_check(check_consistent_properties_across_nodes) + + # Install updated version of redpanda across all nodes. + self.redpanda._installer.install(self.redpanda.nodes, + RedpandaInstaller.HEAD) + # Restart one node. + restart_node_and_await_stable_leader(self.redpanda.nodes[0]) + + # Assert that shadow indexing feature is unavailable for the upgraded node. + check_shadow_indexing_feature_state(self.redpanda.nodes[0], + 'unavailable') + + # Perform alterations and consistency checks across cluster with only one upgraded node + alter_and_check(check_consistent_properties_across_nodes) + + # Restart the second node. + restart_node_and_await_stable_leader(self.redpanda.nodes[1]) + + # Assert that shadow indexing feature is still unavailable for the upgraded nodes + check_shadow_indexing_feature_state( + [self.redpanda.nodes[0], self.redpanda.nodes[1]], 'unavailable') + + # Perform alterations and consistency checks across cluster with two upgraded nodes + alter_and_check(check_consistent_properties_across_nodes) + + # Restart the last node. + restart_node_and_await_stable_leader(self.redpanda.nodes[2]) + + # Await the shadow indexing feature. + self.redpanda.await_feature( + 'shadow_indexing_split_topic_property_update', + 'active', + timeout_sec=15) + + # Assert that shadow indexing feature is now active. + check_shadow_indexing_feature_state(self.redpanda.nodes, 'active') + + def check_remote_read_and_write_on_nodes(props): + # We can assert on topic properties on the upgraded nodes, thanks + # to the bug fixes in the updated code path. 
+ remote_read = props['redpanda.remote.read'] + remote_write = props['redpanda.remote.write'] + for node in self.redpanda.nodes: + remote_read_valid = False + remote_write_valid = False + desc = kcl.describe_topic(topic, node=node) + for line in desc.split('\n'): + line = line.rstrip() + if 'redpanda.remote.read' in line: + remote_read_valid = remote_read in line + if 'redpanda.remote.write' in line: + remote_write_valid = remote_write in line + valid = remote_read_valid and remote_write_valid + if not valid: + return False + return True + + # Perform alterations and strong topic property checks across cluster with all upgraded nodes + alter_and_check(check_remote_read_and_write_on_nodes) diff --git a/tests/rptest/tests/offset_for_leader_epoch_archival_test.py b/tests/rptest/tests/offset_for_leader_epoch_archival_test.py index f9ade16be8d9f..aacec55e9bbb6 100644 --- a/tests/rptest/tests/offset_for_leader_epoch_archival_test.py +++ b/tests/rptest/tests/offset_for_leader_epoch_archival_test.py @@ -11,7 +11,6 @@ from rptest.services.cluster import cluster from ducktape.mark import parametrize from ducktape.utils.util import wait_until - from rptest.services.kgo_verifier_services import KgoVerifierProducer from rptest.clients.kcl import KCL from rptest.services.redpanda import RESTART_LOG_ALLOW_LIST, SISettings, MetricsEndpoint @@ -86,9 +85,14 @@ def test_querying_remote_partitions(self, remote_reads): epoch_offsets = {} rpk = RpkTool(self.redpanda) self.client().create_topic(topic) + remote_reads_str = 'true' if remote_reads else 'false' rpk.alter_topic_config(topic.name, "redpanda.remote.read", - str(remote_reads)) + remote_reads_str) rpk.alter_topic_config(topic.name, "redpanda.remote.write", 'true') + desc = rpk.describe_topic_configs(topic.name) + + assert desc['redpanda.remote.read'][0] == remote_reads_str + assert desc['redpanda.remote.write'][0] == 'true' def wait_for_topic(): wait_until(lambda: len(list(rpk.describe_topic(topic.name))) > 0, diff --git a/tests/rptest/tests/timequery_test.py b/tests/rptest/tests/timequery_test.py index f7a1476c703cc..a5bdbfafce1da 100644 --- a/tests/rptest/tests/timequery_test.py +++ b/tests/rptest/tests/timequery_test.py @@ -56,11 +56,14 @@ def _create_and_produce(self, cluster, cloud_storage, local_retention, if cloud_storage: for k, v in { - 'redpanda.remote.read': True, - 'redpanda.remote.write': True, + 'redpanda.remote.read': 'true', + 'redpanda.remote.write': 'true', 'retention.local.target.bytes': local_retention }.items(): self.client().alter_topic_config(topic.name, k, v) + desc = self.client().describe_topic_configs(topic.name) + assert desc['redpanda.remote.read'] == 'true' + assert desc['redpanda.remote.write'] == 'true' # Configure topic to trust client-side timestamps, so that # we can generate fake ones for the test
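
Note on the semantics exercised above (illustrative sketch, not part of the patch): the new `incremental_update` overload in topic_table.cc applies `redpanda.remote.read` and `redpanda.remote.write` as independent bits of the combined shadow-indexing mode, which is what fixes the lost-update behavior from issue 9191. The self-contained model below mirrors the *assumed* semantics of `model::add_shadow_indexing_flag` and `model::negate_shadow_indexing_flag`; the enum values and the bodies of `add_flag`/`negate_flag` are stand-in assumptions, not the real model/ API.

#include <cassert>
#include <cstdint>
#include <optional>

enum class si_mode : uint8_t {
    disabled = 0,
    archival = 1, // redpanda.remote.write
    fetch = 2,    // redpanda.remote.read
    full = 3,     // archival | fetch
};

// Turn a "set this bit" flag into a "clear this bit" mask (assumed to mirror
// model::negate_shadow_indexing_flag).
si_mode negate_flag(si_mode flag) {
    return static_cast<si_mode>(
      static_cast<uint8_t>(~static_cast<uint8_t>(flag)));
}

// OR in a set-flag, AND in a clear-mask (assumed to mirror
// model::add_shadow_indexing_flag).
si_mode add_flag(si_mode mode, si_mode flag) {
    auto m = static_cast<uint8_t>(mode);
    auto f = static_cast<uint8_t>(flag);
    if (f > static_cast<uint8_t>(si_mode::full)) {
        return static_cast<si_mode>(m & f); // clear-mask: drop one bit
    }
    return static_cast<si_mode>(m | f); // set-flag: add one bit
}

int main() {
    std::optional<si_mode> prop; // the topic's stored shadow-indexing mode

    // remote.read=true and remote.write=true in one request: each property
    // is applied as an independent bit, so neither update clobbers the other
    // (the legacy single-field path could lose one of them).
    prop = add_flag(prop.value_or(si_mode::disabled), si_mode::fetch);
    prop = add_flag(*prop, si_mode::archival);
    assert(*prop == si_mode::full);

    // remote.read=false clears only the fetch bit.
    prop = add_flag(*prop, negate_flag(si_mode::fetch));
    assert(*prop == si_mode::archival);
    return 0;
}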