Skip to content

Commit

Permalink
Merge pull request #23563 from Lazin/fix/torture-test
Browse files Browse the repository at this point in the history
archival: Fix torture test
  • Loading branch information
dotnwat authored Oct 1, 2024
2 parents a810f88 + 5972e4c commit 540d9ce
Show file tree
Hide file tree
Showing 4 changed files with 98 additions and 39 deletions.
21 changes: 13 additions & 8 deletions src/v/cluster/archival/archiver_operations_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -565,10 +565,13 @@ class archiver_operations_impl : public archiver_operations_api {
auto upload = co_await _upl_builder->prepare_segment_upload(
part, range, _read_buffer_size(), sg, deadline);
if (upload.has_error()) {
vlog(
_rtclog.warn,
"prepare_segment_upload failed {}",
upload.error());
if (
upload.error() != archival::error_outcome::not_enough_data) {
vlog(
_rtclog.warn,
"prepare_segment_upload failed {}",
upload.error().message());
}
co_return upload.error();
}
auto res = std::move(upload.value());
Expand Down Expand Up @@ -1193,10 +1196,12 @@ class upload_builder : public detail::segment_upload_builder_api {
auto upl = co_await segment_upload::make_segment_upload(
cp, range, read_buffer_size, sg, deadline);
if (upl.has_error()) {
vlog(
archival_log.error,
"Can't find upload candidate: {}",
upl.error());
if (upl.error() != archival::error_outcome::not_enough_data) {
vlog(
archival_log.error,
"Can't find upload candidate: {}",
upl.error().message());
}
co_return upl.error();
}

Expand Down
13 changes: 13 additions & 0 deletions src/v/cluster/archival/ntp_archiver_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1014,6 +1014,19 @@ ss::future<> ntp_archiver::upload_until_term_change() {
_last_manifest_upload_time = ss::lowres_clock::now();
}

const auto& upl_results = upload_list.value().results;
const auto all_failed = std::all_of(
upl_results.begin(),
upl_results.end(),
[](cloud_storage::upload_result r) {
return r != cloud_storage::upload_result::success;
});

if (all_failed) {
vlog(_rtclog.debug, "All uploads has failed");
continue;
}

auto admit_result = co_await _ops->admit_uploads(
_rtcnode, std::move(upload_list.value()));
if (admit_result.has_error()) {
Expand Down
85 changes: 56 additions & 29 deletions src/v/cluster/archival/tests/archival_service_fixture.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include "config/configuration.h"
#include "config/property.h"
#include "container/fragmented_vector.h"
#include "errc.h"
#include "http/tests/http_imposter.h"
#include "model/fundamental.h"
#include "model/metadata.h"
Expand Down Expand Up @@ -237,42 +238,68 @@ class archiver_cluster_fixture
/// Move partition from source node to the target node
///
/// Asks the topics frontend of the current controller leader to move the
/// replicas of \p ntp to \p replicas (using the min_local_retention
/// reconfiguration policy and no timeout) and requires the final result to
/// be cluster::errc::success.
///
/// \param ntp partition to move
/// \param replicas node ids passed to move_partition_replicas
// NOLINTNEXTLINE
void move_partition(model::ntp ntp, std::vector<model::node_id> replicas) {
// NOTE(review): this listing is a rendered diff without +/- markers. The
// statements from here down to the first vlog call look like the removed
// single-attempt version (they move from the arguments, and `errc` is
// declared again right after them) -- confirm against the repository.
auto node = controller_leader();
BOOST_REQUIRE(node != nullptr);
auto errc = node->controller->get_topics_frontend()
.local()
.move_partition_replicas(
std::move(ntp),
std::move(replicas),
cluster::reconfiguration_policy::min_local_retention,
model::no_timeout)
.get();
vlog(
arch_fixture_log.info,
"move_partition_replicas result {}",
errc.message());
// Bounded retry: the request is issued at most num_retries times.
static constexpr int num_retries = 4;
std::error_code errc;
for (int i = 0; i < num_retries; i++) {
// The controller leader is looked up again on every attempt.
auto node = controller_leader();
BOOST_REQUIRE(node != nullptr);
// ntp and replicas are passed without std::move here, so they are still
// intact if another attempt is needed.
errc = node->controller->get_topics_frontend()
.local()
.move_partition_replicas(
ntp,
replicas,
cluster::reconfiguration_policy::min_local_retention,
model::no_timeout)
.get();
vlog(
arch_fixture_log.info,
"move_partition_replicas result {}",
errc.message());

// Only raft::errc::not_leader triggers another attempt; any other result
// (success or a different error) leaves the loop via the break below.
if (errc == raft::errc::not_leader) {
vlog(
arch_fixture_log.info,
"move_partition_replicas controller leadership has moved, "
"retrying");
continue;
}
break;
}
// Fails the test unless the last attempt returned success; this includes
// the case where every attempt ended with not_leader.
BOOST_REQUIRE(errc == cluster::errc::success);
}

/// Move partition from source node to the target node
///
/// Overload taking model::broker_shard entries instead of node ids. Asks the
/// topics frontend of the current controller leader to move the replicas of
/// \p ntp to \p replicas (using the min_local_retention reconfiguration
/// policy and no timeout) and requires the final result to be
/// cluster::errc::success.
///
/// \param ntp partition to move
/// \param replicas broker_shard entries passed to move_partition_replicas
// NOLINTNEXTLINE
void
move_partition(model::ntp ntp, std::vector<model::broker_shard> replicas) {
// NOTE(review): this listing is a rendered diff without +/- markers. The
// statements from here down to BOOST_REQUIRE_EQUAL look like the removed
// single-attempt version (they move from the arguments, and `errc` is
// declared again right after them) -- confirm against the repository.
auto node = controller_leader();
BOOST_REQUIRE(node != nullptr);
auto errc = node->controller->get_topics_frontend()
.local()
.move_partition_replicas(
std::move(ntp),
std::move(replicas),
cluster::reconfiguration_policy::min_local_retention,
model::no_timeout)
.get();
vlog(
arch_fixture_log.info,
"move_partition_replicas (x-shard) result {}",
errc.message());
BOOST_REQUIRE_EQUAL(errc, cluster::errc::success);
// Bounded retry: the request is issued at most num_retries times.
static constexpr int num_retries = 4;
std::error_code errc;
for (int i = 0; i < num_retries; i++) {
// The controller leader is looked up again on every attempt.
auto node = controller_leader();
BOOST_REQUIRE(node != nullptr);
// ntp and replicas are passed without std::move here, so they are still
// intact if another attempt is needed.
errc = node->controller->get_topics_frontend()
.local()
.move_partition_replicas(
ntp,
replicas,
cluster::reconfiguration_policy::min_local_retention,
model::no_timeout)
.get();
vlog(
arch_fixture_log.info,
"move_partition_replicas (x-shard) result {}",
errc.message());

// Only raft::errc::not_leader triggers another attempt; any other result
// (success or a different error) leaves the loop via the break below.
if (errc == raft::errc::not_leader) {
vlog(
arch_fixture_log.info,
"move_partition_replicas (x-shard) controller leadership has "
"moved, retrying");
continue;
}
break;
}
// Fails the test unless the last attempt returned success; this includes
// the case where every attempt ended with not_leader.
BOOST_REQUIRE(errc == cluster::errc::success);
}

/// Return list of nodes that have a replica of the partition
Expand Down
18 changes: 16 additions & 2 deletions src/v/cluster/archival/tests/archiver_manager_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,10 @@
#include "random/generators.h"
#include "test_utils/async.h"
#include "test_utils/fixture.h"
#include "test_utils/scoped_config.h"

#include <seastar/core/loop.hh>
#include <seastar/core/lowres_clock.hh>
#include <seastar/core/shared_ptr.hh>
#include <seastar/core/timed_out_error.hh>

Expand Down Expand Up @@ -173,6 +175,11 @@ FIXTURE_TEST(test_archiver_service_torture_test, archiver_cluster_fixture) {
node_id0, node_id1, node_id2, node_id3};
std::vector<model::node_id> replica_set = {node_id0, node_id1, node_id2};

// The test uses only shard 0 and all 'nodes' share the same config values
// stored in thread-local storage (TLS).
scoped_config cfg;
cfg.get("cloud_storage_disable_upload_loop_for_tests").set_value(true);

wait_for_controller_leadership(node_id0).get();
wait_for_all_members(3s).get();

Expand All @@ -190,8 +197,14 @@ FIXTURE_TEST(test_archiver_service_torture_test, archiver_cluster_fixture) {
#else
const int num_iterations = 20;
#endif
const std::chrono::seconds max_time = 300s;

auto test_start = ss::lowres_clock::now();
for (int i = 0; i < num_iterations; i++) {
if (ss::lowres_clock::now() - test_start > max_time) {
vlog(arch_test_log.info, "Time limit reached");
break;
}
for (auto ntp : panda_topic) {
// On every iteration we are either shuffling the leadership, moving
// one of the replicas between the nodes, or performing the
Expand All @@ -214,7 +227,8 @@ FIXTURE_TEST(test_archiver_service_torture_test, archiver_cluster_fixture) {
n_s.shard = n_s.shard == 0 ? 1 : 0;
}
move_partition(panda_topic.back(), loc_list);
wait_partition_movement_complete(panda_topic.back(), loc_list);
wait_partition_movement_complete(
panda_topic.back(), loc_list, 60s);
} else {
// Move partition to another node
auto ntp = random_generators::random_choice(all_ntp);
Expand Down Expand Up @@ -251,7 +265,7 @@ FIXTURE_TEST(test_archiver_service_torture_test, archiver_cluster_fixture) {
}
}
move_partition(panda_topic.back(), replica_set);
wait_partition_movement_complete(panda_topic.back());
wait_partition_movement_complete(panda_topic.back(), 60s);
}
}
wait_all_partition_leaders(panda_topic);
Expand Down

0 comments on commit 540d9ce

Please sign in to comment.