[CORE-7743] cluster: tiered storage topic property update fixes #23545

Merged 7 commits on Oct 7, 2024
2 changes: 2 additions & 0 deletions src/go/rpk/pkg/cli/topic/config.go
@@ -76,6 +76,8 @@ valid, but does not apply it.
// at the same time, so we issue first the set request for write,
// then the rest of the requests.
// See https://github.com/redpanda-data/redpanda/issues/9191
+// TODO: Remove this once v24.2 is EOL.
+// See https://github.com/redpanda-data/redpanda/pull/23545.
_, isRRR := setKVs["redpanda.remote.read"]
Contributor:

We can't backport this PR because of the feature gate, but if we wanted to fix these bugs in older versions we could implement this request-splitting logic that rpk is doing inside redpanda in 24.1/24.2/23.3. It might be worth asking product if we want to do that.

To be clear, I think we should get this PR in because this seems like the right long-term fix, but since these bugs are important we might want to release a backportable fix for them as well.

Member:

> We can't backport this PR because of the feature gate, but if we wanted to fix these bugs in older versions we could implement this request-splitting logic that rpk is doing inside redpanda in 24.1/24.2/23.3. It might be worth asking product if we want to do that.

I think it's a good idea, so that it can work with other tools that support setting arbitrary config.

Contributor Author:

Agreed, these are pretty important bugs; it would be good to fix them in previous versions.

This should be done in a future PR, but for now my focus is on v24.3.

wv, isRRW := setKVs["redpanda.remote.write"]
rrwErrors := make(map[string]int16)
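
The request-splitting workaround discussed in the review thread above can be sketched in a few lines. This is a minimal C++ illustration, not rpk's Go code and not a Redpanda API: `config_map` and `split_requests` are hypothetical names, and the only behavior taken from the source is that the `redpanda.remote.write` set request is issued first, followed by the rest.

```cpp
#include <cassert>
#include <map>
#include <string>
#include <vector>

// Hypothetical alias: topic config keys mapped to their requested values.
using config_map = std::map<std::string, std::string>;

// Split one update into ordered requests. When both tiered storage keys are
// present, redpanda.remote.write goes out alone first, then everything else.
inline std::vector<config_map> split_requests(const config_map& set_kvs) {
    const std::string read_key = "redpanda.remote.read";
    const std::string write_key = "redpanda.remote.write";

    const auto write_it = set_kvs.find(write_key);
    const bool has_read = set_kvs.count(read_key) > 0;
    if (!has_read || write_it == set_kvs.end()) {
        return {set_kvs}; // nothing to split
    }

    config_map first;
    first.emplace(write_it->first, write_it->second);
    config_map rest = set_kvs;
    rest.erase(write_key);
    return {first, rest};
}

// Usage: three keys in, two ordered requests out.
inline void example() {
    const config_map kvs{
      {"redpanda.remote.read", "true"},
      {"redpanda.remote.write", "true"},
      {"retention.ms", "1000"}};
    const auto reqs = split_requests(kvs);
    assert(reqs.size() == 2);
    assert(reqs[0].count("redpanda.remote.write") == 1);
    assert(reqs[1].count("redpanda.remote.write") == 0);
}
```

Doing the split on the broker side, as the reviewers suggest for older versions, would cover any client that sets arbitrary configs, not only rpk.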
77 changes: 41 additions & 36 deletions src/v/cluster/tests/serialization_rt_test.cc
@@ -843,25 +843,27 @@ SEASTAR_THREAD_TEST_CASE(serde_reflection_roundtrip) {
[] { return random_generators::get_int<size_t>(0, 100000); })));
}
{
-cluster::incremental_topic_updates updates{
-.compression = random_property_update(
-tests::random_optional([] { return model::random_compression(); })),
-.cleanup_policy_bitflags = random_property_update(
-tests::random_optional(
-[] { return model::random_cleanup_policy(); })),
-.compaction_strategy = random_property_update(tests::random_optional(
-[] { return model::random_compaction_strategy(); })),
-.timestamp_type = random_property_update(tests::random_optional(
-[] { return model::random_timestamp_type(); })),
-.segment_size = random_property_update(tests::random_optional(
-[] { return random_generators::get_int(100_MiB, 1_GiB); })),
-.retention_bytes = random_property_update(tests::random_tristate(
-[] { return random_generators::get_int(100_MiB, 1_GiB); })),
-.retention_duration = random_property_update(
-tests::random_tristate([] { return tests::random_duration_ms(); })),
-.shadow_indexing = random_property_update(tests::random_optional(
-[] { return model::random_shadow_indexing_mode(); })),
-.remote_delete = random_property_update(tests::random_bool())};
+cluster::incremental_topic_updates updates;
+updates.compression = random_property_update(
+tests::random_optional([] { return model::random_compression(); }));
+updates.cleanup_policy_bitflags = random_property_update(
+tests::random_optional(
+[] { return model::random_cleanup_policy(); }));
+updates.compaction_strategy = random_property_update(
+tests::random_optional(
+[] { return model::random_compaction_strategy(); }));
+updates.timestamp_type = random_property_update(tests::random_optional(
+[] { return model::random_timestamp_type(); }));
+updates.segment_size = random_property_update(tests::random_optional(
+[] { return random_generators::get_int(100_MiB, 1_GiB); }));
+updates.retention_bytes = random_property_update(tests::random_tristate(
+[] { return random_generators::get_int(100_MiB, 1_GiB); }));
+updates.retention_duration = random_property_update(
+tests::random_tristate([] { return tests::random_duration_ms(); }));
+updates.remote_delete = random_property_update(tests::random_bool());
+updates.get_shadow_indexing() = random_property_update(
+tests::random_optional(
+[] { return model::random_shadow_indexing_mode(); }));
roundtrip_test(updates);
}
{ roundtrip_test(old_random_topic_configuration()); }
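
The test hunk above swaps designated initializers for member-by-member assignment. A plausible reason, inferred from the `types.h` change in this PR (which moves `shadow_indexing` under `private:`) and not stated by the author, is that a struct with a private data member is no longer an aggregate, so designated initializers stop compiling. A minimal sketch with hypothetical stand-in types:

```cpp
#include <cassert>
#include <optional>
#include <type_traits>

// Hypothetical stand-ins; not the Redpanda types.
struct all_public {
    std::optional<int> segment_size;
    bool remote_delete{false};
};

struct with_private_field {
    std::optional<int> segment_size;
    bool remote_delete{false};

    // Accessor pair in the style of get_shadow_indexing().
    const std::optional<int>& get_legacy() const { return legacy; }
    std::optional<int>& get_legacy() { return legacy; }

private:
    std::optional<int> legacy;
};

// Designated initializers require an aggregate; a private non-static data
// member disqualifies the type.
static_assert(std::is_aggregate_v<all_public>);
static_assert(!std::is_aggregate_v<with_private_field>);

// What the updated test does instead: default-construct, then assign.
inline with_private_field make_updates() {
    with_private_field updates;
    updates.segment_size = 128;
    updates.remote_delete = true;
    updates.get_legacy() = 7;
    return updates;
}

inline void example() {
    const auto updates = make_updates();
    assert(updates.segment_size == 128);
    assert(updates.get_legacy() == 7);
}
```

The sketch compiles as C++17; the designated-initializer form it replaces (`all_public{.segment_size = 1}`) needs C++20.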
@@ -2061,23 +2063,26 @@ SEASTAR_THREAD_TEST_CASE(serde_reflection_roundtrip) {
auto val_validation = tests::random_bool();
auto val_strategy = tests::random_subject_name_strategy();

-cluster::incremental_topic_updates updates{
-.record_key_schema_id_validation = random_property_update(
-tests::random_optional([=] { return key_validation; })),
-.record_key_schema_id_validation_compat = random_property_update(
-tests::random_optional([=] { return key_validation; })),
-.record_key_subject_name_strategy = random_property_update(
-tests::random_optional([=] { return key_strategy; })),
-.record_key_subject_name_strategy_compat = random_property_update(
-tests::random_optional([=] { return key_strategy; })),
-.record_value_schema_id_validation = random_property_update(
-tests::random_optional([=] { return val_validation; })),
-.record_value_schema_id_validation_compat = random_property_update(
-tests::random_optional([=] { return val_validation; })),
-.record_value_subject_name_strategy = random_property_update(
-tests::random_optional([=] { return val_strategy; })),
-.record_value_subject_name_strategy_compat = random_property_update(
-tests::random_optional([=] { return val_strategy; }))};
+cluster::incremental_topic_updates updates;
+updates.record_key_schema_id_validation = random_property_update(
+tests::random_optional([=] { return key_validation; }));
+updates.record_key_schema_id_validation_compat = random_property_update(
+tests::random_optional([=] { return key_validation; }));
+updates.record_key_subject_name_strategy = random_property_update(
+tests::random_optional([=] { return key_strategy; }));
+updates.record_key_subject_name_strategy_compat
+= random_property_update(
+tests::random_optional([=] { return key_strategy; }));
+updates.record_value_schema_id_validation = random_property_update(
+tests::random_optional([=] { return val_validation; }));
+updates.record_value_schema_id_validation_compat
+= random_property_update(
+tests::random_optional([=] { return val_validation; }));
+updates.record_value_subject_name_strategy = random_property_update(
+tests::random_optional([=] { return val_strategy; }));
+updates.record_value_subject_name_strategy_compat
+= random_property_update(
+tests::random_optional([=] { return val_strategy; }));

roundtrip_test(updates);
}
62 changes: 60 additions & 2 deletions src/v/cluster/topic_table.cc
@@ -800,10 +800,20 @@ void incremental_update(
}
}

+// This is a deprecated (as of `v24.3`) function here for legacy purposes. Bug
+// prone. Utilize `incremental_update` overload below for `remote_read` and
+// `remote_write` updates. See:
+// https://github.com/redpanda-data/redpanda/issues/9191
+// https://github.com/redpanda-data/redpanda/pull/23220
template<>
void incremental_update(
std::optional<model::shadow_indexing_mode>& property,
property_update<std::optional<model::shadow_indexing_mode>> override) {
+if (override.op != incremental_update_operation::none) {
+vlog(
+clusterlog.trace,
+"Performing deprecated incremental_update to shadow_indexing_mode");
+}
switch (override.op) {
case incremental_update_operation::remove:
if (!override.value || !property) {
@@ -831,6 +841,37 @@ void incremental_update(
}
}

+void incremental_update(
+std::optional<model::shadow_indexing_mode>& property,
+property_update<bool> overrides,
+model::shadow_indexing_mode m) {
+switch (overrides.op) {
+case incremental_update_operation::remove: {
+// This codepath is currently unused, as remove operation at Kafka layer
+// causes a set operation to the default cluster value.
+if (!property.has_value()) {
+break;
+}
+auto simode = property.value();
+property = model::add_shadow_indexing_flag(
+simode, model::negate_shadow_indexing_flag(m));
+return;
+}
+case incremental_update_operation::set: {
+// set new value
+auto simode = property.value_or(model::shadow_indexing_mode::disabled);
+auto si_flag_update = overrides.value
+? m
+: model::negate_shadow_indexing_flag(m);
+property = model::add_shadow_indexing_flag(simode, si_flag_update);
+return;
+}
+case incremental_update_operation::none:
+// do nothing
+return;
+}
+}

template<typename T>
void incremental_update(
tristate<T>& property, property_update<tristate<T>> override) {
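
The new `property_update<bool>` overload above updates a single flag of the mode and leaves the other flag alone. The sketch below illustrates that shape under loud assumptions: `si_mode`, its bit layout, `bool_update`, `with_flag` and `apply_update` are all hypothetical, and `with_flag` is a simplification that stands in for the `add_shadow_indexing_flag` / `negate_shadow_indexing_flag` pair instead of reproducing it.

```cpp
#include <cassert>
#include <cstdint>
#include <optional>

// Hypothetical bit layout for illustration; not copied from Redpanda.
enum class si_mode : std::uint8_t {
    disabled = 0,
    archival = 1, // remote write
    fetch = 2,    // remote read
    full = 3,
};

enum class update_op { none, set, remove };

struct bool_update {
    bool value{false};
    update_op op{update_op::none};
};

// Turn one flag on or off without disturbing the other bit.
constexpr si_mode with_flag(si_mode mode, si_mode flag, bool enabled) {
    const auto m = static_cast<std::uint8_t>(mode);
    const auto f = static_cast<std::uint8_t>(flag);
    return static_cast<si_mode>(enabled ? (m | f) : (m & ~f));
}

// Same three cases as the overload: set applies the requested value, remove
// clears the flag if a mode exists, none leaves the property untouched.
inline void
apply_update(std::optional<si_mode>& property, bool_update u, si_mode flag) {
    switch (u.op) {
    case update_op::remove:
        if (property.has_value()) {
            property = with_flag(*property, flag, false);
        }
        return;
    case update_op::set:
        property = with_flag(
          property.value_or(si_mode::disabled), flag, u.value);
        return;
    case update_op::none:
        return;
    }
}

inline void example() {
    std::optional<si_mode> mode; // topic starts with no explicit mode
    apply_update(mode, {true, update_op::set}, si_mode::fetch);
    apply_update(mode, {true, update_op::set}, si_mode::archival);
    assert(mode == si_mode::full); // both flags survive
    apply_update(mode, {false, update_op::set}, si_mode::fetch);
    assert(mode == si_mode::archival); // only the read flag was cleared
}
```

Because each update names the flag it targets, a read update and a write update carried in the same command can be applied one after the other without either overwriting the whole mode.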
@@ -898,8 +939,6 @@ topic_table::apply(update_topic_properties_cmd cmd, model::offset o) {
incremental_update(updated_properties.segment_size, overrides.segment_size);
incremental_update(
updated_properties.timestamp_type, overrides.timestamp_type);
-incremental_update(
-updated_properties.shadow_indexing, overrides.shadow_indexing);
incremental_update(
updated_properties.batch_max_bytes, overrides.batch_max_bytes);
incremental_update(
@@ -908,6 +947,25 @@ topic_table::apply(update_topic_properties_cmd cmd, model::offset o) {
incremental_update(
updated_properties.retention_local_target_ms,
overrides.retention_local_target_ms);
+// These tiered storage properties shouldn't be set at the
+// same time, due to feature gating from
+// `shadow_indexing_split_topic_property_update`, but still set the
+// deprecated update to `none` as a sanity check.
+if (
+overrides.remote_read.op != incremental_update_operation::none
+|| overrides.remote_write.op != incremental_update_operation::none) {
+overrides.get_shadow_indexing().op = incremental_update_operation::none;
+}
+incremental_update(
+updated_properties.shadow_indexing, overrides.get_shadow_indexing());
+incremental_update(
+updated_properties.shadow_indexing,
+overrides.remote_read,
+model::shadow_indexing_mode::fetch);
+incremental_update(
+updated_properties.shadow_indexing,
+overrides.remote_write,
+model::shadow_indexing_mode::archival);
incremental_update(
updated_properties.remote_delete,
overrides.remote_delete,
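
The sanity check in the hunk above can be shown in isolation. This is a sketch under stated assumptions: `overrides` here is a hypothetical, trimmed-down struct that tracks only the operation of each update, and `prefer_split_updates` is an illustrative name, not a function in the PR.

```cpp
#include <cassert>

enum class update_op { none, set, remove };

// Hypothetical, trimmed-down view of the overrides carried by one command.
struct overrides {
    update_op legacy_shadow_indexing{update_op::none};
    update_op remote_read{update_op::none};
    update_op remote_write{update_op::none};
};

// If either split property carries an update, drop the legacy one so the
// two mechanisms never both modify the mode in a single apply.
inline void prefer_split_updates(overrides& o) {
    if (
      o.remote_read != update_op::none || o.remote_write != update_op::none) {
        o.legacy_shadow_indexing = update_op::none;
    }
}

inline void example() {
    overrides mixed{update_op::set, update_op::set, update_op::none};
    prefer_split_updates(mixed);
    assert(mixed.legacy_shadow_indexing == update_op::none);

    overrides legacy_only{update_op::set, update_op::none, update_op::none};
    prefer_split_updates(legacy_only);
    assert(legacy_only.legacy_shadow_indexing == update_op::set);
}
```

With the legacy operation forced to `none`, the three `incremental_update` calls that follow can run unconditionally: the deprecated one becomes a no-op whenever a split update is present.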
15 changes: 9 additions & 6 deletions src/v/cluster/types.cc
@@ -348,15 +348,16 @@ std::ostream& operator<<(std::ostream& o, const incremental_topic_updates& i) {
"record_value_subject_name_strategy_compat: {}, "
"initial_retention_local_target_bytes: {}, "
"initial_retention_local_target_ms: {}, write_caching: {}, flush_ms: {}, "
-"flush_bytes: {}, iceberg_enabled: {}, leaders_preference: {}",
+"flush_bytes: {}, iceberg_enabled: {}, leaders_preference: {}, "
+"remote_read: {}, remote_write: {}",
i.compression,
i.cleanup_policy_bitflags,
i.compaction_strategy,
i.timestamp_type,
i.segment_size,
i.retention_bytes,
i.retention_duration,
-i.shadow_indexing,
+i.get_shadow_indexing(),
i.batch_max_bytes,
i.retention_local_target_bytes,
i.retention_local_target_ms,
@@ -376,7 +377,9 @@ std::ostream& operator<<(std::ostream& o, const incremental_topic_updates& i) {
i.flush_ms,
i.flush_bytes,
i.iceberg_enabled,
-i.leaders_preference);
+i.leaders_preference,
+i.remote_read,
+i.remote_write);
return o;
}

@@ -1162,7 +1165,7 @@ void adl<cluster::incremental_topic_updates>::to(
t.segment_size,
t.retention_bytes,
t.retention_duration,
-t.shadow_indexing,
+t.get_shadow_indexing(),
t.batch_max_bytes,
t.retention_local_target_bytes,
t.retention_local_target_ms,
@@ -1239,9 +1242,9 @@ adl<cluster::incremental_topic_updates>::from(iobuf_parser& in) {
if (
version
<= cluster::incremental_topic_updates::version_with_shadow_indexing) {
-updates.shadow_indexing = adl<cluster::property_update<
+updates.get_shadow_indexing() = adl<cluster::property_update<
std::optional<model::shadow_indexing_mode>>>{}
-.from(in);
+.from(in);
}

if (
19 changes: 17 additions & 2 deletions src/v/cluster/types.h
@@ -595,11 +595,14 @@ struct incremental_topic_updates
property_update<std::optional<size_t>> segment_size;
property_update<tristate<size_t>> retention_bytes;
property_update<tristate<std::chrono::milliseconds>> retention_duration;
-property_update<std::optional<model::shadow_indexing_mode>> shadow_indexing;
property_update<std::optional<uint32_t>> batch_max_bytes;
property_update<tristate<size_t>> retention_local_target_bytes;
property_update<tristate<std::chrono::milliseconds>>
retention_local_target_ms;
+property_update<bool> remote_read{
+false, incremental_update_operation::none};
+property_update<bool> remote_write{
+false, incremental_update_operation::none};
property_update<bool> remote_delete{
false, incremental_update_operation::none};
property_update<tristate<std::chrono::milliseconds>> segment_ms;
@@ -632,6 +635,11 @@ struct incremental_topic_updates
property_update<std::optional<config::leaders_preference>>
leaders_preference;

+// To allow us to better control use of the deprecated shadow_indexing
+// field, use getters and setters instead.
+const auto& get_shadow_indexing() const { return shadow_indexing; }
+auto& get_shadow_indexing() { return shadow_indexing; }

auto serde_fields() {
return std::tie(
compression,
@@ -661,7 +669,9 @@ struct incremental_topic_updates
flush_ms,
flush_bytes,
iceberg_enabled,
-leaders_preference);
+leaders_preference,
+remote_read,
+remote_write);
}

friend std::ostream&
@@ -670,6 +680,11 @@ struct incremental_topic_updates
friend bool operator==(
const incremental_topic_updates&, const incremental_topic_updates&)
= default;

+private:
+// This field is kept here for legacy purposes, but should be considered
+// deprecated in favour of remote_read and remote_write.
+property_update<std::optional<model::shadow_indexing_mode>> shadow_indexing;
};

using replication_factor
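
In the `types.h` hunks above, `serde_fields()` returns the struct's members as a `std::tie` tuple, with `remote_read` and `remote_write` appended after `leaders_preference`. The sketch below shows, with a hypothetical three-field struct, how a tuple of references like that can be walked generically. It does not reproduce Redpanda's serde machinery and makes no claim about how that machinery consumes the tuple.

```cpp
#include <cassert>
#include <cstddef>
#include <tuple>

// Hypothetical struct; field names borrowed from the diff for readability.
struct updates {
    int flush_bytes{0};
    bool remote_read{false};
    bool remote_write{false};

    // New fields are appended to the end of the list, as in the diff.
    auto fields() { return std::tie(flush_bytes, remote_read, remote_write); }
};

// Generic code can walk the fields in listed order through the tuple.
template<typename Visitor>
void for_each_field(updates& u, Visitor&& visit) {
    std::apply([&](auto&... f) { (visit(f), ...); }, u.fields());
}

inline void example() {
    updates u;
    // The tuple holds references, so writes go through to the struct.
    std::get<1>(u.fields()) = true;
    assert(u.remote_read);

    std::size_t count = 0;
    for_each_field(u, [&](auto&) { ++count; });
    assert(count == 3);
}
```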
4 changes: 2 additions & 2 deletions src/v/compat/cluster_compat.h
@@ -631,7 +631,7 @@ GEN_COMPAT_CHECK(
json_write(segment_size);
json_write(retention_bytes);
json_write(retention_duration);
-json_write(shadow_indexing);
+json_write(get_shadow_indexing());
json_write(remote_delete);
},
{
@@ -642,7 +642,7 @@ GEN_COMPAT_CHECK(
json_read(segment_size);
json_read(retention_bytes);
json_read(retention_duration);
-json_read(shadow_indexing);
+json_read(get_shadow_indexing());
json_read(remote_delete);
})
