Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

archival: Add replica validator #24626

Open
wants to merge 1 commit into
base: dev
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/v/cluster/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,7 @@ redpanda_cc_library(
"archival/ntp_archiver_service.cc",
"archival/probe.cc",
"archival/purger.cc",
"archival/replica_state_validator.cc",
"archival/retention_calculator.cc",
"archival/scrubber.cc",
"archival/segment_reupload.cc",
Expand Down Expand Up @@ -428,6 +429,7 @@ redpanda_cc_library(
"archival/ntp_archiver_service.h",
"archival/probe.h",
"archival/purger.h",
"archival/replica_state_validator.h",
"archival/retention_calculator.h",
"archival/scrubber.h",
"archival/scrubber_scheduler.h",
Expand Down
1 change: 1 addition & 0 deletions src/v/cluster/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,7 @@ v_cc_library(
archival/archival_metadata_stm.cc
archival/archival_policy.cc
archival/ntp_archiver_service.cc
archival/replica_state_validator.cc
archival/probe.cc
archival/types.cc
archival/upload_controller.cc
Expand Down
22 changes: 22 additions & 0 deletions src/v/cluster/archival/ntp_archiver_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

#include "cluster/archival/ntp_archiver_service.h"

#include "archival/replica_state_validator.h"
#include "base/vlog.h"
#include "cloud_storage/async_manifest_view.h"
#include "cloud_storage/partition_manifest.h"
Expand Down Expand Up @@ -400,6 +401,27 @@ ss::future<> ntp_archiver::upload_until_abort(bool legacy_mode) {
continue;
}

replica_state_validator validator(_parent);
if (validator.has_anomalies()) {
// The validation and printing of anomalies is happening
// every time we start an archiver
validator.maybe_print_scarry_log_message();
// Disable uploads and housekeeping if consistency
// checks are not disabled
if (!config::shard_local_cfg()
.cloud_storage_disable_upload_consistency_checks()) {
// Consistency checks will not let us do anything anyway
vlog(
_rtclog.warn,
"upload loop stalled for {}ms in term {} because of "
"anomalies",
sync_timeout.count(),
_start_term);
co_await ss::sleep_abortable(sync_timeout, _as);
continue;
}
}

vlog(_rtclog.debug, "upload loop synced in term {}", _start_term);
if (!may_begin_uploads()) {
continue;
Expand Down
108 changes: 108 additions & 0 deletions src/v/cluster/archival/replica_state_validator.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
/*
* Copyright 2024 Redpanda Data, Inc.
*
* Licensed as a Redpanda Enterprise file under the Redpanda Community
* License (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* https://github.com/redpanda-data/redpanda/blob/master/licenses/rcl.md
*/

#include "cluster/archival/replica_state_validator.h"

#include "cluster/archival/logger.h"

namespace archival {

replica_state_validator::replica_state_validator(const cluster::partition& p)
: _partition(p)
, _anomalies(validate()) {}

bool replica_state_validator::has_anomalies() const noexcept {
return _anomalies.size() > 0;
}

const std::deque<replica_state_anomaly>
replica_state_validator::get_anomalies() const noexcept {
return _anomalies;
}

void replica_state_validator::maybe_print_scarry_log_message() const {
if (has_anomalies()) {
for (const auto& anomalie : _anomalies) {
vlog(
archival_log.error,
"[{}] anomaly detected: {}",
_partition.get_ntp_config().ntp(),
anomalie.message);
}
}
}

std::deque<replica_state_anomaly> replica_state_validator::validate() {
std::deque<replica_state_anomaly> result;
// Before we start uploading we need to make sure that some invariants
// are met.
// We're validating the replica state over previously uploaded metadata.
// The idea is that we can gradually build confidence by making incremental
// checks. This code also defines manual intervention point. Once the
// problem is detected the NTP archiver shouldn't do anything other than
// reporting the problem periodically and asking for intervention. After
// examining the anomaly the operator might decide to disable the checks and
// allow the uploads to continue. This is an improvement over the situation
// when the uploads are getting stuck because we're not doing any extra
// work. It may also be easier to diagnose the problem if the archiver is
// not running. Potentially, we can do some automatic mitigations
// (transferring leadership or blocking writes).
const auto& manifest = _partition.archival_meta_stm()->manifest();
if (manifest.empty()) {
// Nothing to validate
return result;
}

// 1. Check local log start offset
auto manifest_next = model::next_offset(manifest.get_last_offset());
auto local_so = _partition.log()->offsets().start_offset;
if (manifest_next < local_so) {
// There is a gap between last uploaded offset and first available
// local offset. Progress is impossible if the metadata consistency
// checks are on.
result.push_back(replica_state_anomaly{
.type = replica_state_anomaly_type::offsets_gap,
.message = ssx::sformat(
"There is a {} to {} gap between the manifest and local storage",
manifest_next,
local_so)});
}

// 2. Check offset translator state
// Get the last uploaded segment and try to translate one of its offsets
// and compare the results.
auto last_segment = manifest.last_segment();
if (last_segment.has_value() && local_so < last_segment->base_offset) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

<=?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Missing test too.

// Last segment exists in the manifest and can be translated using
// local offset translation state.

auto expected_delta = last_segment->delta_offset;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

would it make sense to also validate delta_offset_end?

auto actual_delta = _partition.log()->offset_delta(
last_segment->base_offset);

// Offset translation state diverged on two different replicas.
// Previous leader translated offset differently
if (expected_delta != actual_delta) {
result.push_back(replica_state_anomaly{
.type = replica_state_anomaly_type::ot_state,
.message = ssx::sformat(
"Offset translation anomaly detected for offset {}, expected "
"delta {}, actual delta {}, segment_meta",
last_segment->base_offset,
expected_delta,
actual_delta,
last_segment)});
}
}

return result;
}

} // namespace archival
57 changes: 57 additions & 0 deletions src/v/cluster/archival/replica_state_validator.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Copyright 2024 Redpanda Data, Inc.
*
* Licensed as a Redpanda Enterprise file under the Redpanda Community
* License (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* https://github.com/redpanda-data/redpanda/blob/master/licenses/rcl.md
*/

#pragma once

#include "cluster/fwd.h"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

unused?

#include "cluster/partition.h"

#include <seastar/core/shared_ptr.hh>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

unused include?


namespace archival {

enum class replica_state_anomaly_type {
// Gap between the manifest and the local storage
offsets_gap,

// Inconsistent offset-translator state
ot_state,
};

struct replica_state_anomaly {
replica_state_anomaly_type type;
ss::sstring message;
};

/// This class is used to validate partition replica state
/// over cluster replicas. If the archiver starts on a replica
/// with inconsistent offset translator state it should be able
/// to detect this and report the problem.
class replica_state_validator {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does this need to be a class? i think it would be better as a free function with a struct result

a bit unusual for a validator class to do work in constructor and then have no other use

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the method could accept a log object and a manifest object only, no need for a full blown partition

will make testing easier, less includes too

public:
explicit replica_state_validator(const cluster::partition&);

bool has_anomalies() const noexcept;

const std::deque<replica_state_anomaly> get_anomalies() const noexcept;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

dead code?


void maybe_print_scarry_log_message() const;

private:
/// Run validations.
/// Return set of detected anomalies.
/// Throw if we failed to perform validation process.
std::deque<replica_state_anomaly> validate();

const cluster::partition& _partition;
std::deque<replica_state_anomaly> _anomalies;
};

} // namespace archival
Loading