Skip to content

Commit

Permalink
archival: Add replica validator
Browse files Browse the repository at this point in the history
The validator is used to check that the replica state has not diverged
from that of the previous leader.

Signed-off-by: Evgeny Lazin <[email protected]>
  • Loading branch information
Lazin committed Dec 20, 2024
1 parent be58f47 commit 4126158
Show file tree
Hide file tree
Showing 5 changed files with 190 additions and 0 deletions.
2 changes: 2 additions & 0 deletions src/v/cluster/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,7 @@ redpanda_cc_library(
"archival/ntp_archiver_service.cc",
"archival/probe.cc",
"archival/purger.cc",
"archival/replica_state_validator.cc",
"archival/retention_calculator.cc",
"archival/scrubber.cc",
"archival/segment_reupload.cc",
Expand Down Expand Up @@ -428,6 +429,7 @@ redpanda_cc_library(
"archival/ntp_archiver_service.h",
"archival/probe.h",
"archival/purger.h",
"archival/replica_state_validator.h",
"archival/retention_calculator.h",
"archival/scrubber.h",
"archival/scrubber_scheduler.h",
Expand Down
1 change: 1 addition & 0 deletions src/v/cluster/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,7 @@ v_cc_library(
archival/archival_metadata_stm.cc
archival/archival_policy.cc
archival/ntp_archiver_service.cc
archival/replica_state_validator.cc
archival/probe.cc
archival/types.cc
archival/upload_controller.cc
Expand Down
22 changes: 22 additions & 0 deletions src/v/cluster/archival/ntp_archiver_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

#include "cluster/archival/ntp_archiver_service.h"

#include "archival/replica_state_validator.h"
#include "base/vlog.h"
#include "cloud_storage/async_manifest_view.h"
#include "cloud_storage/partition_manifest.h"
Expand Down Expand Up @@ -400,6 +401,27 @@ ss::future<> ntp_archiver::upload_until_abort(bool legacy_mode) {
continue;
}

replica_state_validator validator(_parent);
if (validator.has_anomalies()) {
// The validation and printing of anomalies is happening
// every time we start an archiver
validator.maybe_print_scarry_log_message();
// Disable uploads and housekeeping if consistency
// checks are not disabled
if (!config::shard_local_cfg()
.cloud_storage_disable_upload_consistency_checks()) {
// Consistency checks will not let us do anything anyway
vlog(
_rtclog.warn,
"upload loop stalled for {}ms in term {} because of "
"anomalies",
sync_timeout.count(),
_start_term);
co_await ss::sleep_abortable(sync_timeout, _as);
continue;
}
}

vlog(_rtclog.debug, "upload loop synced in term {}", _start_term);
if (!may_begin_uploads()) {
continue;
Expand Down
108 changes: 108 additions & 0 deletions src/v/cluster/archival/replica_state_validator.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
/*
* Copyright 2024 Redpanda Data, Inc.
*
* Licensed as a Redpanda Enterprise file under the Redpanda Community
* License (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* https://github.com/redpanda-data/redpanda/blob/master/licenses/rcl.md
*/

#include "cluster/archival/replica_state_validator.h"

#include "cluster/archival/logger.h"

namespace archival {

/// Construct the validator for partition `p` and run the validation
/// immediately; the detected anomalies are stored in `_anomalies`.
///
/// Only a reference to `p` is kept, so the partition has to outlive the
/// validator.
///
/// The initializer of `_anomalies` calls `validate()`, which reads
/// `_partition`; `_partition` therefore has to be initialized first.
replica_state_validator::replica_state_validator(const cluster::partition& p)
: _partition(p)
, _anomalies(validate()) {}

/// Tell whether the validation performed at construction time found at
/// least one anomaly.
bool replica_state_validator::has_anomalies() const noexcept {
    return !_anomalies.empty();
}

/// Return the anomalies that were collected when the validator was
/// constructed. The container is empty if nothing was detected.
///
/// NOTE(review): the result is returned by value, so every call copies the
/// whole deque, and the top-level `const` on the return type prevents the
/// caller from moving out of it. Returning a const reference would avoid the
/// copy, but the declaration in the header has to be changed together with
/// this definition.
const std::deque<replica_state_anomaly>
replica_state_validator::get_anomalies() const noexcept {
return _anomalies;
}

void replica_state_validator::maybe_print_scarry_log_message() const {
if (has_anomalies()) {
for (const auto& anomalie : _anomalies) {
vlog(
archival_log.error,
"[{}] anomaly detected: {}",
_partition.get_ntp_config().ntp(),
anomalie.message);
}
}
}

/// Check the local replica state against the previously uploaded metadata
/// (the partition manifest held by the archival metadata STM).
///
/// Two checks are performed:
///  1. there must be no gap between the offset following the last offset in
///     the manifest and the start offset of the local log;
///  2. the offset delta recorded in the last manifest segment must match the
///     delta computed by the local log for the same base offset.
///
/// Returns the list of detected anomalies. The list is empty when the
/// manifest is empty (nothing to validate) or when every check passed.
std::deque<replica_state_anomaly> replica_state_validator::validate() {
    std::deque<replica_state_anomaly> result;
    // Before we start uploading we need to make sure that some invariants
    // are met.
    // We're validating the replica state over previously uploaded metadata.
    // The idea is that we can gradually build confidence by making
    // incremental checks. This code also defines a manual intervention
    // point. Once the problem is detected the NTP archiver shouldn't do
    // anything other than reporting the problem periodically and asking for
    // intervention. After examining the anomaly the operator might decide to
    // disable the checks and allow the uploads to continue. This is an
    // improvement over the situation when the uploads are getting stuck
    // because we're not doing any extra work. It may also be easier to
    // diagnose the problem if the archiver is not running. Potentially, we
    // can do some automatic mitigations (transferring leadership or blocking
    // writes).
    const auto& manifest = _partition.archival_meta_stm()->manifest();
    if (manifest.empty()) {
        // Nothing to validate
        return result;
    }

    // 1. Check local log start offset
    const auto manifest_next = model::next_offset(manifest.get_last_offset());
    const auto local_so = _partition.log()->offsets().start_offset;
    if (manifest_next < local_so) {
        // There is a gap between last uploaded offset and first available
        // local offset. Progress is impossible if the metadata consistency
        // checks are on.
        result.push_back(replica_state_anomaly{
          .type = replica_state_anomaly_type::offsets_gap,
          .message = ssx::sformat(
            "There is a {} to {} gap between the manifest and local storage",
            manifest_next,
            local_so)});
    }

    // 2. Check offset translator state
    // Get the last uploaded segment and try to translate one of its offsets
    // and compare the results.
    auto last_segment = manifest.last_segment();
    if (last_segment.has_value() && local_so < last_segment->base_offset) {
        // Last segment exists in the manifest and can be translated using
        // local offset translation state.

        const auto expected_delta = last_segment->delta_offset;
        const auto actual_delta = _partition.log()->offset_delta(
          last_segment->base_offset);

        // Offset translation state diverged on two different replicas.
        // Previous leader translated offset differently
        if (expected_delta != actual_delta) {
            // The format string needs one placeholder per argument; without
            // the trailing "{}" the segment metadata is silently dropped
            // from the message.
            result.push_back(replica_state_anomaly{
              .type = replica_state_anomaly_type::ot_state,
              .message = ssx::sformat(
                "Offset translation anomaly detected for offset {}, expected "
                "delta {}, actual delta {}, segment_meta: {}",
                last_segment->base_offset,
                expected_delta,
                actual_delta,
                last_segment)});
        }
    }

    return result;
}

} // namespace archival
57 changes: 57 additions & 0 deletions src/v/cluster/archival/replica_state_validator.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Copyright 2024 Redpanda Data, Inc.
*
* Licensed as a Redpanda Enterprise file under the Redpanda Community
* License (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* https://github.com/redpanda-data/redpanda/blob/master/licenses/rcl.md
*/

#pragma once

#include "cluster/fwd.h"
#include "cluster/partition.h"

#include <seastar/core/shared_ptr.hh>

#include <deque>

namespace archival {

/// Kinds of anomalies that the replica state validation can report.
enum class replica_state_anomaly_type {
// Gap between the manifest and the local storage: the offset that follows
// the last offset in the manifest is below the start offset of the local log
offsets_gap,

// Inconsistent offset-translator state: the offset delta stored in the last
// manifest segment differs from the delta computed by the local log
ot_state,
};

/// A single anomaly found during the replica state validation.
struct replica_state_anomaly {
// Category of the detected problem
replica_state_anomaly_type type;
// Human readable description of the problem, used when the anomaly is logged
ss::sstring message;
};

/// This class is used to validate the state of a partition replica
/// against the previously uploaded metadata. If the archiver starts
/// on a replica with an inconsistent offset translator state it should
/// be able to detect this and report the problem.
///
/// The validation runs once, in the constructor; the other methods only
/// expose the stored result.
class replica_state_validator {
public:
/// Validate the given partition. Only a reference to the partition is
/// stored, so the partition has to outlive the validator.
explicit replica_state_validator(const cluster::partition&);

/// Return true if at least one anomaly was detected.
bool has_anomalies() const noexcept;

/// Return the detected anomalies. The container is returned by value,
/// i.e. every call makes a copy.
const std::deque<replica_state_anomaly> get_anomalies() const noexcept;

/// Log every detected anomaly at error level. Logs nothing if there
/// are no anomalies.
void maybe_print_scarry_log_message() const;

private:
/// Run validations.
/// Return set of detected anomalies.
/// Throw if we failed to perform validation process.
std::deque<replica_state_anomaly> validate();

// Declaration order matters: the constructor initializes `_anomalies` by
// calling validate(), which reads `_partition`, so `_partition` has to be
// declared (and therefore initialized) first.
const cluster::partition& _partition;
std::deque<replica_state_anomaly> _anomalies;
};

} // namespace archival

0 comments on commit 4126158

Please sign in to comment.