
Improve migrations API #23479


Merged
1 change: 1 addition & 0 deletions TODO
@@ -0,0 +1 @@
pass a boolean to table instead of configs; sort out test commits
6 changes: 5 additions & 1 deletion src/v/cluster/controller.cc
@@ -166,7 +166,11 @@ ss::future<> controller::wire_up() {
.then([this] {
_data_migration_table
= std::make_unique<data_migrations::migrations_table>(
_data_migrated_resources, std::ref(_tp_state));
_data_migrated_resources,
std::ref(_tp_state),
config::shard_local_cfg().cloud_storage_enabled()
&& config::shard_local_cfg()
.cloud_storage_disable_archiver_manager());
})
.then([this] {
return _authorizer.start(
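
Note: the table now receives a single pre-computed boolean rather than reading configuration itself, which is what the TODO above refers to. A rough Python illustration of the gating, with the config names taken from this diff (illustrative only, not a real API):

    def migrations_enabled(cfg: dict) -> bool:
        # Enabled only when cloud storage is on and the archiver manager
        # is disabled, mirroring the controller wiring above.
        return bool(cfg.get("cloud_storage_enabled")) and bool(
            cfg.get("cloud_storage_disable_archiver_manager"))
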
11 changes: 11 additions & 0 deletions src/v/cluster/data_migration_frontend.cc
@@ -289,6 +289,17 @@ ss::future<chunked_vector<migration_metadata>> frontend::list_migrations() {
});
}

ss::future<result<migration_metadata>>
frontend::get_migration(id migration_id) {
return container().invoke_on(
data_migrations_shard, [migration_id](frontend& local) {
auto maybe_migration = local._table.get_migration(migration_id);
return maybe_migration
? result<migration_metadata>(maybe_migration->get().copy())
: errc::data_migration_not_exists;
});
}

ss::future<std::error_code> frontend::insert_barrier() {
const auto barrier_deadline = _operation_timeout
+ model::timeout_clock::now();
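
Note: get_migration runs on data_migrations_shard via invoke_on and returns a copy of the metadata (hence the .copy()), since a reference into the table must not be used across shard boundaries.
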
2 changes: 2 additions & 0 deletions src/v/cluster/data_migration_frontend.h
@@ -46,6 +46,8 @@ class frontend : public ss::peering_sharded_service<frontend> {

ss::future<check_ntp_states_reply> check_ntp_states_on_foreign_node(
model::node_id node, check_ntp_states_request&& req);

ss::future<result<migration_metadata>> get_migration(id);
ss::future<chunked_vector<migration_metadata>> list_migrations();

private:
29 changes: 26 additions & 3 deletions src/v/cluster/data_migration_table.cc
@@ -25,9 +25,12 @@
namespace cluster::data_migrations {

migrations_table::migrations_table(
ss::sharded<migrated_resources>& resources, ss::sharded<topic_table>& topics)
ss::sharded<migrated_resources>& resources,
ss::sharded<topic_table>& topics,
bool enabled)
: _resources(resources)
, _topics(topics) {}
, _topics(topics)
, _enabled(enabled) {}

bool migrations_table::is_valid_state_transition(state current, state target) {
switch (current) {
@@ -201,6 +204,11 @@ migrations_table::apply(create_data_migration_cmd cmd) {
std::optional<migrations_table::validation_error>
migrations_table::validate_migrated_resources(
const data_migration& migration) const {
// cloud_storage_api is checked on startup
if (!_enabled) {
return validation_error{"cloud storage disabled"};
}

return ss::visit(migration, [this](const auto& migration) {
return validate_migrated_resources(migration);
});
@@ -237,11 +245,26 @@ std::optional<migrations_table::validation_error>
migrations_table::validate_migrated_resources(
const outbound_migration& odm) const {
for (const auto& t : odm.topics) {
if (!_topics.local().contains(t)) {
if (t.ns != model::kafka_namespace) {
return validation_error{ssx::sformat(
"topic with name {} is not in default namespace, so probably it "
"has archiver disabled",
t)};
}

auto maybe_topic_cfg = _topics.local().get_topic_cfg(t);
if (!maybe_topic_cfg) {
return validation_error{ssx::sformat(
"topic with name {} does not exists in current cluster", t)};
}

if (!model::is_archival_enabled(
maybe_topic_cfg->properties.shadow_indexing.value_or(
model::shadow_indexing_mode::disabled))) {
return validation_error{ssx::sformat(
"topic with name {} does not have archiving enabled", t)};
}

if (_resources.local().is_already_migrated(t)) {
return validation_error{ssx::sformat(
"topic with name {} is already part of active migration", t)};
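
For reference, a compact sketch of the new outbound-topic validation order (Python for brevity; the helper names are illustrative, not the real API):

    def validate_outbound_topic(topic, topics, migrated_resources):
        # Order mirrors the C++ above: namespace, existence, archival, activity.
        if topic.ns != "kafka":
            return "not in the default namespace, archiver likely disabled"
        cfg = topics.get_topic_cfg(topic)
        if cfg is None:
            return "does not exist in the current cluster"
        if not cfg.archival_enabled:
            return "archiving not enabled"
        if migrated_resources.is_already_migrated(topic):
            return "already part of an active migration"
        return None  # no validation error
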
5 changes: 4 additions & 1 deletion src/v/cluster/data_migration_table.h
@@ -38,7 +38,8 @@ class migrations_table {

explicit migrations_table(
ss::sharded<migrated_resources>& resources,
ss::sharded<topic_table>& topics);
ss::sharded<topic_table>& topics,
bool enabled);

using notification_id = named_type<uint64_t, struct notification_id_tag>;
using notification_callback = ss::noncopyable_function<void(id)>;
@@ -132,6 +133,8 @@ class migrations_table {
absl::node_hash_map<id, migration_metadata> _migrations;
ss::sharded<migrated_resources>& _resources;
ss::sharded<topic_table>& _topics;
bool _enabled;

notification_list<notification_callback, notification_id> _callbacks;
};
} // namespace cluster::data_migrations
2 changes: 1 addition & 1 deletion src/v/cluster/errc.h
@@ -274,7 +274,7 @@ struct errc_category final : public std::error_category {
case errc::invalid_data_migration_state:
return "Invalid data migration state transition requested";
case errc::data_migration_not_exists:
return "Requested data migration does not exists";
return "Requested data migration does not exist";
case errc::data_migration_already_exists:
return "Data migration with requested id already exists";
case errc::data_migration_invalid_resources:
2 changes: 1 addition & 1 deletion src/v/cluster/partition.cc
@@ -717,7 +717,7 @@ ss::future<std::optional<storage::timequery_result>> partition::local_timequery(
}

bool partition::should_construct_archiver() {
// NOTE: construct and archiver even if shadow indexing isn't enabled, e.g.
// NOTE: construct an archiver even if shadow indexing isn't enabled, e.g.
// in the case of read replicas -- we still need the archiver to drive
// manifest updates, etc.
const auto& ntp_config = _raft->log()->config();
8 changes: 7 additions & 1 deletion src/v/cluster/tests/data_migration_table_test.cc
@@ -11,6 +11,7 @@
#include "cluster/data_migration_table.h"
#include "cluster/data_migration_types.h"
#include "commands.h"
#include "config/configuration.h"
#include "container/fragmented_vector.h"
#include "data_migrated_resources.h"
#include "data_migration_types.h"
@@ -83,11 +84,15 @@ create_groups(std::vector<std::string_view> strings) {

struct data_migration_table_fixture : public seastar_test {
ss::future<> SetUpAsync() override {
// so that all new topics are created with remote write enabled
config::shard_local_cfg().cloud_storage_enable_remote_write.set_value(
true);

co_await resources.start();
co_await topics.start(ss::sharded_parameter(
[this] { return std::ref(resources.local()); }));
table = std::make_unique<cluster::data_migrations::migrations_table>(
resources, topics);
resources, topics, true);
table->register_notification([this](cluster::data_migrations::id id) {
notifications.push_back(id);
});
@@ -110,6 +115,7 @@ struct data_migration_table_fixture : public seastar_test {
auto p_cnt = random_generators::get_int(1, 64);

topic_configuration cfg(tp_ns.ns, tp_ns.tp, p_cnt, 3);
cfg.properties.shadow_indexing = model::shadow_indexing_mode::full;
ss::chunked_fifo<partition_assignment> assignments;
for (auto i = 0; i < p_cnt; ++i) {
assignments.push_back(
2 changes: 1 addition & 1 deletion src/v/config/property.h
@@ -87,7 +87,7 @@ class property : public base_property {
, _validator(std::move(validator)) {}

/**
* Properties aren't moved in normal used on the per-shard
* Properties aren't moved in normal use on the per-shard
* cluster configuration objects. This method exists for
* use in unit tests of things like kafka client that carry
* around a config_store as a member.
25 changes: 25 additions & 0 deletions src/v/redpanda/admin/migrations.cc
@@ -288,6 +288,13 @@ void admin_server::register_data_migration_routes() {
std::unique_ptr<ss::http::reply> reply) {
return list_data_migrations(std::move(req), std::move(reply));
});
register_route_raw_async<superuser>(
ss::httpd::migration_json::get_migration,
[this](
std::unique_ptr<ss::http::request> req,
std::unique_ptr<ss::http::reply> reply) {
return get_data_migration(std::move(req), std::move(reply));
});
register_route_raw_async<superuser>(
ss::httpd::migration_json::add_migration,
[this](
@@ -326,6 +333,24 @@ ss::future<std::unique_ptr<ss::http::reply>> admin_server::list_data_migrations(
co_return std::move(reply);
}

ss::future<std::unique_ptr<ss::http::reply>> admin_server::get_data_migration(
std::unique_ptr<ss::http::request> req,
std::unique_ptr<ss::http::reply> reply) {
auto id = parse_data_migration_id(*req);
auto& frontend = _controller->get_data_migration_frontend();
auto maybe_migration = co_await frontend.local().get_migration(id);
if (maybe_migration.has_value()) [[likely]] {
json::StringBuffer buf;
json::Writer<json::StringBuffer> writer(buf);
write_migration_as_json(maybe_migration.assume_value(), writer);
reply->set_status(ss::http::reply::status_type::ok, buf.GetString());
} else {
co_await throw_on_error(
*req, maybe_migration.error(), model::controller_ntp);
}
co_return std::move(reply);
}

ss::future<std::unique_ptr<ss::http::reply>> admin_server::add_data_migration(
std::unique_ptr<ss::http::request> req,
std::unique_ptr<ss::http::reply> reply) {
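
The new route makes it possible to fetch a single migration without listing all of them. A minimal usage sketch, assuming a cluster whose admin API listens at admin_url (the /v1/migrations path appears in the test allow-lists below):

    import requests

    r = requests.get(f"{admin_url}/v1/migrations/{migration_id}")
    r.raise_for_status()
    print(r.json())  # one migration object, same shape as a list entry
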
9 changes: 9 additions & 0 deletions src/v/redpanda/admin/server.cc
@@ -1096,6 +1096,15 @@ ss::future<> admin_server::throw_on_error(
config::shard_local_cfg()
.data_transforms_per_core_memory_reservation.name()));
}
case cluster::errc::invalid_data_migration_state:
case cluster::errc::data_migration_already_exists:
case cluster::errc::data_migration_invalid_resources:
throw ss::httpd::bad_request_exception(
fmt::format("{}", ec.message()));
case cluster::errc::data_migration_not_exists:
throw ss::httpd::base_exception(
fmt::format("Data migration does not exist: {}", ec.message()),
ss::http::reply::status_type::not_found);
default:
throw ss::httpd::server_error_exception(
fmt::format("Unexpected cluster error: {}", ec.message()));
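
With this mapping, migration validation failures surface as 400 Bad Request and lookups of unknown ids as 404 Not Found instead of falling through to the generic 500. An illustrative check, assuming admin_url as above:

    import requests

    # data_migration_not_exists now maps to 404 rather than a server error
    r = requests.get(f"{admin_url}/v1/migrations/999999")
    assert r.status_code == 404
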
2 changes: 2 additions & 0 deletions src/v/redpanda/admin/server.h
@@ -655,6 +655,8 @@ class admin_server {
// Data migration routes
ss::future<std::unique_ptr<ss::http::reply>> list_data_migrations(
std::unique_ptr<ss::http::request>, std::unique_ptr<ss::http::reply>);
ss::future<std::unique_ptr<ss::http::reply>> get_data_migration(
std::unique_ptr<ss::http::request>, std::unique_ptr<ss::http::reply>);
ss::future<std::unique_ptr<ss::http::reply>> add_data_migration(
std::unique_ptr<ss::http::request>, std::unique_ptr<ss::http::reply>);
ss::future<ss::json::json_return_type>
6 changes: 6 additions & 0 deletions tests/rptest/services/admin.py
@@ -1674,6 +1674,12 @@ def list_data_migrations(self, node: Optional[ClusterNode] = None):
path = "migrations"
return self._request("GET", path, node=node)

def get_data_migration(self,
migration_id: int,
node: Optional[ClusterNode] = None):
path = f"migrations/{migration_id}"
return self._request("GET", path, node=node)

def create_data_migration(self,
migration: InboundDataMigration
| OutboundDataMigration,
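
A usage sketch of the new wrapper, assuming an Admin instance wired to a running test cluster:

    migration = admin.get_data_migration(migration_id).json()
    assert migration["id"] == migration_id
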
90 changes: 85 additions & 5 deletions tests/rptest/tests/data_migrations_api_test.py
@@ -117,15 +117,33 @@ def get_migrations_map(self, node=None):
migrations = self.admin.list_data_migrations(node).json()
return {migration["id"]: migration for migration in migrations}

def get_migration(self, id, node=None):
try:
return self.admin.get_data_migration(id, node).json()
except requests.exceptions.HTTPError as e:
if e.response.status_code == 404:
return None
else:
raise

def assure_not_deletable(self, id, node=None):
    try:
        self.admin.delete_data_migration(id, node)
        assert False, f"migration {id} should not be deletable"
    except requests.exceptions.HTTPError:
        pass

def on_all_live_nodes(self, migration_id, predicate):
success_cnt = 0
exception_cnt = 0
for n in self.redpanda.nodes:
try:
map = self.get_migrations_map(n)
self.logger.debug(f"migrations on node {n.name}: {map}")
if predicate(map[migration_id] if migration_id in
map else None):
list_item = map.get(migration_id)
individual = self.get_migration(migration_id, n)

if predicate(list_item) and predicate(individual):
success_cnt += 1
else:
return False
Expand All @@ -146,6 +164,9 @@ def migration_in_one_of_states():
err_msg=
f"Failed waiting for migration {id} to reach one of {states} states"
)
if all(state not in ('planned', 'finished', 'cancelled')
for state in states):
self.assure_not_deletable(id)

def wait_migration_appear(self, migration_id):
def migration_is_present(id: int):
@@ -170,8 +191,11 @@ def migration_is_absent(id: int):
def wait_partitions_appear(self, topics: list[TopicSpec]):
# we may be unlucky to query a slow node
def topic_has_all_partitions(t: TopicSpec):
return t.partition_count == \
len(self.client().describe_topic(t.name).partitions)
cur_part_cnt = len(self.client().describe_topic(t.name).partitions)
self.logger.debug(
    f"topic {t.name} has {cur_part_cnt} partitions out of {t.partition_count} expected"
)
return t.partition_count == cur_part_cnt

wait_until(lambda: all(topic_has_all_partitions(t) for t in topics),
timeout_sec=90,
@@ -211,6 +235,62 @@ def migration_id_if_exists():
self.wait_migration_appear(migration_id)
return migration_id

def assure_not_migratable(self, topic: TopicSpec):
    out_migration = OutboundDataMigration(
        [make_namespaced_topic(topic.name)], consumer_groups=[])
    try:
        self.create_and_wait(out_migration)
        assert False, f"topic {topic.name} should not be migratable"
    except requests.exceptions.HTTPError:
        pass

@cluster(num_nodes=3, log_allow_list=MIGRATION_LOG_ALLOW_LIST)
def test_listing_inexistent_migration(self):
    # the get_migration helper maps a 404 response to None
    assert self.get_migration(42) is None

@cluster(num_nodes=3, log_allow_list=MIGRATION_LOG_ALLOW_LIST)
def test_creating_with_topic_no_remote_writes(self):
self.redpanda.set_cluster_config(
{"cloud_storage_enable_remote_write": False}, expect_restart=True)
topic = TopicSpec(partition_count=3)
self.client().create_topic(topic)
self.wait_partitions_appear([topic])
self.redpanda.set_cluster_config(
{"cloud_storage_enable_remote_write": True}, expect_restart=True)
self.assure_not_migratable(topic)

@cluster(
num_nodes=3,
log_allow_list=MIGRATION_LOG_ALLOW_LIST + [
r'/v1/migrations.*Requested feature is disabled', # cloud storage disabled
])
def test_creating_when_cluster_misconfigured1(self):
self.creating_when_cluster_misconfigured("cloud_storage_enabled")

@cluster(
num_nodes=3,
log_allow_list=MIGRATION_LOG_ALLOW_LIST + [
r'/v1/migrations.*Requested feature is disabled', # cloud storage disabled
'archival' # a variety of archival errors is observed
])
def test_creating_when_cluster_misconfigured2(self):
    self.creating_when_cluster_misconfigured(
        "cloud_storage_disable_archiver_manager")

def creating_when_cluster_misconfigured(self, param_to_disable):
self.redpanda.set_cluster_config({param_to_disable: False},
expect_restart=True)
topic = TopicSpec(partition_count=3)
self.client().create_topic(topic)
self.assure_not_migratable(topic)
# re-enable the parameter so that scrubbing can complete
self.redpanda.set_cluster_config({param_to_disable: True},
expect_restart=True)

@cluster(num_nodes=3, log_allow_list=MIGRATION_LOG_ALLOW_LIST)
def test_creating_and_listing_migrations(self):
topics = [TopicSpec(partition_count=3) for i in range(5)]
@@ -310,7 +390,7 @@ def test_creating_and_listing_migrations(self):
self.redpanda.si_settings.set_expected_damage(
{"ntr_no_topic_manifest", "missing_segments"})

@cluster(num_nodes=3)
@cluster(num_nodes=3, log_allow_list=MIGRATION_LOG_ALLOW_LIST)
def test_higher_level_migration_api(self):
topics = [TopicSpec(partition_count=3) for i in range(5)]

Expand Down