Skip to content

Commit e042836

Browse files
authored
Merge pull request #23479 from bashtanov/migrations-implement-get-single
Improve migrations API
2 parents a7190eb + 68889db commit e042836

15 files changed

+186
-14
lines changed

TODO

+1
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
pass a boolean to table instead of configs; sort out test commits

src/v/cluster/controller.cc

+5-1
Original file line numberDiff line numberDiff line change
@@ -166,7 +166,11 @@ ss::future<> controller::wire_up() {
166166
.then([this] {
167167
_data_migration_table
168168
= std::make_unique<data_migrations::migrations_table>(
169-
_data_migrated_resources, std::ref(_tp_state));
169+
_data_migrated_resources,
170+
std::ref(_tp_state),
171+
config::shard_local_cfg().cloud_storage_enabled()
172+
&& config::shard_local_cfg()
173+
.cloud_storage_disable_archiver_manager());
170174
})
171175
.then([this] {
172176
return _authorizer.start(

src/v/cluster/data_migration_frontend.cc

+11
Original file line numberDiff line numberDiff line change
@@ -289,6 +289,17 @@ ss::future<chunked_vector<migration_metadata>> frontend::list_migrations() {
289289
});
290290
}
291291

292+
ss::future<result<migration_metadata>>
293+
frontend::get_migration(id migration_id) {
294+
return container().invoke_on(
295+
data_migrations_shard, [migration_id](frontend& local) {
296+
auto maybe_migration = local._table.get_migration(migration_id);
297+
return maybe_migration
298+
? result<migration_metadata>(maybe_migration->get().copy())
299+
: errc::data_migration_not_exists;
300+
});
301+
}
302+
292303
ss::future<std::error_code> frontend::insert_barrier() {
293304
const auto barrier_deadline = _operation_timeout
294305
+ model::timeout_clock::now();

src/v/cluster/data_migration_frontend.h

+2
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,8 @@ class frontend : public ss::peering_sharded_service<frontend> {
4646

4747
ss::future<check_ntp_states_reply> check_ntp_states_on_foreign_node(
4848
model::node_id node, check_ntp_states_request&& req);
49+
50+
ss::future<result<migration_metadata>> get_migration(id);
4951
ss::future<chunked_vector<migration_metadata>> list_migrations();
5052

5153
private:

src/v/cluster/data_migration_table.cc

+26-3
Original file line numberDiff line numberDiff line change
@@ -25,9 +25,12 @@
2525
namespace cluster::data_migrations {
2626

2727
migrations_table::migrations_table(
28-
ss::sharded<migrated_resources>& resources, ss::sharded<topic_table>& topics)
28+
ss::sharded<migrated_resources>& resources,
29+
ss::sharded<topic_table>& topics,
30+
bool enabled)
2931
: _resources(resources)
30-
, _topics(topics) {}
32+
, _topics(topics)
33+
, _enabled(enabled) {}
3134

3235
bool migrations_table::is_valid_state_transition(state current, state target) {
3336
switch (current) {
@@ -201,6 +204,11 @@ migrations_table::apply(create_data_migration_cmd cmd) {
201204
std::optional<migrations_table::validation_error>
202205
migrations_table::validate_migrated_resources(
203206
const data_migration& migration) const {
207+
// cloud_storage_api is checked on startup
208+
if (!_enabled) {
209+
return validation_error{"cloud storage disabled"};
210+
}
211+
204212
return ss::visit(migration, [this](const auto& migration) {
205213
return validate_migrated_resources(migration);
206214
});
@@ -237,11 +245,26 @@ std::optional<migrations_table::validation_error>
237245
migrations_table::validate_migrated_resources(
238246
const outbound_migration& odm) const {
239247
for (const auto& t : odm.topics) {
240-
if (!_topics.local().contains(t)) {
248+
if (t.ns != model::kafka_namespace) {
249+
return validation_error{ssx::sformat(
250+
"topic with name {} is not in default namespace, so probably it "
251+
"has archiver disabled",
252+
t)};
253+
}
254+
255+
auto maybe_topic_cfg = _topics.local().get_topic_cfg(t);
256+
if (!maybe_topic_cfg) {
241257
return validation_error{ssx::sformat(
242258
"topic with name {} does not exists in current cluster", t)};
243259
}
244260

261+
if (!model::is_archival_enabled(
262+
maybe_topic_cfg->properties.shadow_indexing.value_or(
263+
model::shadow_indexing_mode::disabled))) {
264+
return validation_error{ssx::sformat(
265+
"topic with name {} does not have archiving enabled", t)};
266+
}
267+
245268
if (_resources.local().is_already_migrated(t)) {
246269
return validation_error{ssx::sformat(
247270
"topic with name {} is already part of active migration", t)};

src/v/cluster/data_migration_table.h

+4-1
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,8 @@ class migrations_table {
3838

3939
explicit migrations_table(
4040
ss::sharded<migrated_resources>& resources,
41-
ss::sharded<topic_table>& topics);
41+
ss::sharded<topic_table>& topics,
42+
bool enabled);
4243

4344
using notification_id = named_type<uint64_t, struct notification_id_tag>;
4445
using notification_callback = ss::noncopyable_function<void(id)>;
@@ -132,6 +133,8 @@ class migrations_table {
132133
absl::node_hash_map<id, migration_metadata> _migrations;
133134
ss::sharded<migrated_resources>& _resources;
134135
ss::sharded<topic_table>& _topics;
136+
bool _enabled;
137+
135138
notification_list<notification_callback, notification_id> _callbacks;
136139
};
137140
} // namespace cluster::data_migrations

src/v/cluster/errc.h

+1-1
Original file line numberDiff line numberDiff line change
@@ -274,7 +274,7 @@ struct errc_category final : public std::error_category {
274274
case errc::invalid_data_migration_state:
275275
return "Invalid data migration state transition requested";
276276
case errc::data_migration_not_exists:
277-
return "Requested data migration does not exists";
277+
return "Requested data migration does not exist";
278278
case errc::data_migration_already_exists:
279279
return "Data migration with requested id already exists";
280280
case errc::data_migration_invalid_resources:

src/v/cluster/partition.cc

+1-1
Original file line numberDiff line numberDiff line change
@@ -717,7 +717,7 @@ ss::future<std::optional<storage::timequery_result>> partition::local_timequery(
717717
}
718718

719719
bool partition::should_construct_archiver() {
720-
// NOTE: construct and archiver even if shadow indexing isn't enabled, e.g.
720+
// NOTE: construct an archiver even if shadow indexing isn't enabled, e.g.
721721
// in the case of read replicas -- we still need the archiver to drive
722722
// manifest updates, etc.
723723
const auto& ntp_config = _raft->log()->config();

src/v/cluster/tests/data_migration_table_test.cc

+7-1
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
#include "cluster/data_migration_table.h"
1212
#include "cluster/data_migration_types.h"
1313
#include "commands.h"
14+
#include "config/configuration.h"
1415
#include "container/fragmented_vector.h"
1516
#include "data_migrated_resources.h"
1617
#include "data_migration_types.h"
@@ -83,11 +84,15 @@ create_groups(std::vector<std::string_view> strings) {
8384

8485
struct data_migration_table_fixture : public seastar_test {
8586
ss::future<> SetUpAsync() override {
87+
// for all new topics to be created with it
88+
config::shard_local_cfg().cloud_storage_enable_remote_write.set_value(
89+
true);
90+
8691
co_await resources.start();
8792
co_await topics.start(ss::sharded_parameter(
8893
[this] { return std::ref(resources.local()); }));
8994
table = std::make_unique<cluster::data_migrations::migrations_table>(
90-
resources, topics);
95+
resources, topics, true);
9196
table->register_notification([this](cluster::data_migrations::id id) {
9297
notifications.push_back(id);
9398
});
@@ -110,6 +115,7 @@ struct data_migration_table_fixture : public seastar_test {
110115
auto p_cnt = random_generators::get_int(1, 64);
111116

112117
topic_configuration cfg(tp_ns.ns, tp_ns.tp, p_cnt, 3);
118+
cfg.properties.shadow_indexing = model::shadow_indexing_mode::full;
113119
ss::chunked_fifo<partition_assignment> assignments;
114120
for (auto i = 0; i < p_cnt; ++i) {
115121
assignments.push_back(

src/v/config/property.h

+1-1
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ class property : public base_property {
8787
, _validator(std::move(validator)) {}
8888

8989
/**
90-
* Properties aren't moved in normal used on the per-shard
90+
* Properties aren't moved in normal use on the per-shard
9191
* cluster configuration objects. This method exists for
9292
* use in unit tests of things like kafka client that carry
9393
* around a config_store as a member.

src/v/redpanda/admin/migrations.cc

+25
Original file line numberDiff line numberDiff line change
@@ -288,6 +288,13 @@ void admin_server::register_data_migration_routes() {
288288
std::unique_ptr<ss::http::reply> reply) {
289289
return list_data_migrations(std::move(req), std::move(reply));
290290
});
291+
register_route_raw_async<superuser>(
292+
ss::httpd::migration_json::get_migration,
293+
[this](
294+
std::unique_ptr<ss::http::request> req,
295+
std::unique_ptr<ss::http::reply> reply) {
296+
return get_data_migration(std::move(req), std::move(reply));
297+
});
291298
register_route_raw_async<superuser>(
292299
ss::httpd::migration_json::add_migration,
293300
[this](
@@ -326,6 +333,24 @@ ss::future<std::unique_ptr<ss::http::reply>> admin_server::list_data_migrations(
326333
co_return std::move(reply);
327334
}
328335

336+
ss::future<std::unique_ptr<ss::http::reply>> admin_server::get_data_migration(
337+
std::unique_ptr<ss::http::request> req,
338+
std::unique_ptr<ss::http::reply> reply) {
339+
auto id = parse_data_migration_id(*req);
340+
auto& frontend = _controller->get_data_migration_frontend();
341+
auto maybe_migration = co_await frontend.local().get_migration(id);
342+
if (maybe_migration.has_value()) [[likely]] {
343+
json::StringBuffer buf;
344+
json::Writer<json::StringBuffer> writer(buf);
345+
write_migration_as_json(maybe_migration.assume_value(), writer);
346+
reply->set_status(ss::http::reply::status_type::ok, buf.GetString());
347+
} else {
348+
co_await throw_on_error(
349+
*req, maybe_migration.error(), model::controller_ntp);
350+
}
351+
co_return std::move(reply);
352+
}
353+
329354
ss::future<std::unique_ptr<ss::http::reply>> admin_server::add_data_migration(
330355
std::unique_ptr<ss::http::request> req,
331356
std::unique_ptr<ss::http::reply> reply) {

src/v/redpanda/admin/server.cc

+9
Original file line numberDiff line numberDiff line change
@@ -1099,6 +1099,15 @@ ss::future<> admin_server::throw_on_error(
10991099
config::shard_local_cfg()
11001100
.data_transforms_per_core_memory_reservation.name()));
11011101
}
1102+
case cluster::errc::invalid_data_migration_state:
1103+
case cluster::errc::data_migration_already_exists:
1104+
case cluster::errc::data_migration_invalid_resources:
1105+
throw ss::httpd::bad_request_exception(
1106+
fmt::format("{}", ec.message()));
1107+
case cluster::errc::data_migration_not_exists:
1108+
throw ss::httpd::base_exception(
1109+
fmt::format("Data migration does not exist: {}", ec.message()),
1110+
ss::http::reply::status_type::not_found);
11021111
default:
11031112
throw ss::httpd::server_error_exception(
11041113
fmt::format("Unexpected cluster error: {}", ec.message()));

src/v/redpanda/admin/server.h

+2
Original file line numberDiff line numberDiff line change
@@ -655,6 +655,8 @@ class admin_server {
655655
// Data migration routes
656656
ss::future<std::unique_ptr<ss::http::reply>> list_data_migrations(
657657
std::unique_ptr<ss::http::request>, std::unique_ptr<ss::http::reply>);
658+
ss::future<std::unique_ptr<ss::http::reply>> get_data_migration(
659+
std::unique_ptr<ss::http::request>, std::unique_ptr<ss::http::reply>);
658660
ss::future<std::unique_ptr<ss::http::reply>> add_data_migration(
659661
std::unique_ptr<ss::http::request>, std::unique_ptr<ss::http::reply>);
660662
ss::future<ss::json::json_return_type>

tests/rptest/services/admin.py

+6
Original file line numberDiff line numberDiff line change
@@ -1737,6 +1737,12 @@ def list_data_migrations(self, node: Optional[ClusterNode] = None):
17371737
path = "migrations"
17381738
return self._request("GET", path, node=node)
17391739

1740+
def get_data_migration(self,
1741+
migration_id: int,
1742+
node: Optional[ClusterNode] = None):
1743+
path = f"migrations/{migration_id}"
1744+
return self._request("GET", path, node=node)
1745+
17401746
def create_data_migration(self,
17411747
migration: InboundDataMigration
17421748
| OutboundDataMigration,

tests/rptest/tests/data_migrations_api_test.py

+85-5
Original file line numberDiff line numberDiff line change
@@ -117,15 +117,33 @@ def get_migrations_map(self, node=None):
117117
migrations = self.admin.list_data_migrations(node).json()
118118
return {migration["id"]: migration for migration in migrations}
119119

120+
def get_migration(self, id, node=None):
121+
try:
122+
return self.admin.get_data_migration(id, node).json()
123+
except requests.exceptions.HTTPError as e:
124+
if e.response.status_code == 404:
125+
return None
126+
else:
127+
raise
128+
129+
def assure_not_deletable(self, id, node=None):
130+
try:
131+
self.admin.delete_data_migration(id, node)
132+
assert False
133+
except:
134+
pass
135+
120136
def on_all_live_nodes(self, migration_id, predicate):
121137
success_cnt = 0
122138
exception_cnt = 0
123139
for n in self.redpanda.nodes:
124140
try:
125141
map = self.get_migrations_map(n)
126142
self.logger.debug(f"migrations on node {n.name}: {map}")
127-
if predicate(map[migration_id] if migration_id in
128-
map else None):
143+
list_item = map[migration_id] if migration_id in map else None
144+
individual = self.get_migration(migration_id, n)
145+
146+
if predicate(list_item) and predicate(individual):
129147
success_cnt += 1
130148
else:
131149
return False
@@ -146,6 +164,9 @@ def migration_in_one_of_states():
146164
err_msg=
147165
f"Failed waiting for migration {id} to reach one of {states} states"
148166
)
167+
if all(state not in ('planned', 'finished', 'cancelled')
168+
for state in states):
169+
self.assure_not_deletable(id)
149170

150171
def wait_migration_appear(self, migration_id):
151172
def migration_is_present(id: int):
@@ -170,8 +191,11 @@ def migration_is_absent(id: int):
170191
def wait_partitions_appear(self, topics: list[TopicSpec]):
171192
# we may be unlucky to query a slow node
172193
def topic_has_all_partitions(t: TopicSpec):
173-
return t.partition_count == \
174-
len(self.client().describe_topic(t.name).partitions)
194+
exp_part_cnt = len(self.client().describe_topic(t.name).partitions)
195+
self.logger.debug(
196+
f"topic {t.name} has {t.partition_count} partitions out of {exp_part_cnt} expected"
197+
)
198+
return t.partition_count == exp_part_cnt
175199

176200
wait_until(lambda: all(topic_has_all_partitions(t) for t in topics),
177201
timeout_sec=90,
@@ -211,6 +235,62 @@ def migration_id_if_exists():
211235
self.wait_migration_appear(migration_id)
212236
return migration_id
213237

238+
def assure_not_migratable(self, topic: TopicSpec):
239+
out_migration = OutboundDataMigration(
240+
[make_namespaced_topic(topic.name)], consumer_groups=[])
241+
try:
242+
self.create_and_wait(out_migration)
243+
assert False
244+
except requests.exceptions.HTTPError as e:
245+
pass
246+
247+
@cluster(num_nodes=3, log_allow_list=MIGRATION_LOG_ALLOW_LIST)
248+
def test_listing_inexistent_migration(self):
249+
try:
250+
self.get_migration(42)
251+
except Exception as e:
252+
# check 404
253+
self.logger.info("f{e}")
254+
raise
255+
256+
@cluster(num_nodes=3, log_allow_list=MIGRATION_LOG_ALLOW_LIST)
257+
def test_creating_with_topic_no_remote_writes(self):
258+
self.redpanda.set_cluster_config(
259+
{"cloud_storage_enable_remote_write": False}, expect_restart=True)
260+
topic = TopicSpec(partition_count=3)
261+
self.client().create_topic(topic)
262+
self.wait_partitions_appear([topic])
263+
self.redpanda.set_cluster_config(
264+
{"cloud_storage_enable_remote_write": True}, expect_restart=True)
265+
self.assure_not_migratable(topic)
266+
267+
@cluster(
268+
num_nodes=3,
269+
log_allow_list=MIGRATION_LOG_ALLOW_LIST + [
270+
r'/v1/migrations.*Requested feature is disabled', # cloud storage disabled
271+
])
272+
def test_creating_when_cluster_misconfigured1(self):
273+
self.creating_when_cluster_misconfigured("cloud_storage_enabled")
274+
275+
@cluster(
276+
num_nodes=3,
277+
log_allow_list=MIGRATION_LOG_ALLOW_LIST + [
278+
r'/v1/migrations.*Requested feature is disabled', # cloud storage disabled
279+
'archival' # a variety of archival errors is observed
280+
])
281+
def test_creating_when_cluster_misconfigured2(self):
282+
self.creating_when_cluster_misconfigured("cloud_storage_enabled")
283+
284+
def creating_when_cluster_misconfigured(self, param_to_disable):
285+
self.redpanda.set_cluster_config({param_to_disable: False},
286+
expect_restart=True)
287+
topic = TopicSpec(partition_count=3)
288+
self.client().create_topic(topic)
289+
self.assure_not_migratable(topic)
290+
# for scrubbing to complete
291+
self.redpanda.set_cluster_config({param_to_disable: True},
292+
expect_restart=True)
293+
214294
@cluster(num_nodes=3, log_allow_list=MIGRATION_LOG_ALLOW_LIST)
215295
def test_creating_and_listing_migrations(self):
216296
topics = [TopicSpec(partition_count=3) for i in range(5)]
@@ -310,7 +390,7 @@ def test_creating_and_listing_migrations(self):
310390
self.redpanda.si_settings.set_expected_damage(
311391
{"ntr_no_topic_manifest", "missing_segments"})
312392

313-
@cluster(num_nodes=3)
393+
@cluster(num_nodes=3, log_allow_list=MIGRATION_LOG_ALLOW_LIST)
314394
def test_higher_level_migration_api(self):
315395
topics = [TopicSpec(partition_count=3) for i in range(5)]
316396

0 commit comments

Comments
 (0)