Skip to content

Commit

Permalink
Merge pull request #13597 from VladLazar/scrub-metadata
Browse files Browse the repository at this point in the history
cloud_storage: scrub segment metadata
  • Loading branch information
Vlad Lazar authored Sep 26, 2023
2 parents 6e70513 + 6b0f20e commit 5176601
Show file tree
Hide file tree
Showing 11 changed files with 1,048 additions and 66 deletions.
7 changes: 4 additions & 3 deletions src/v/archival/scrubber.cc
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ scrubber::run(retry_chain_node& rtc_node, run_quota_t quota) {
.remaining = quota};
}

vlog(_logger.debug, "Starting scrub ...");
vlog(_logger.info, "Starting scrub ...");

// TODO: make the timeout dynamic
retry_chain_node anomaly_detection_rtc(1min, 100ms, &rtc_node);
Expand Down Expand Up @@ -95,7 +95,7 @@ scrubber::run(retry_chain_node& rtc_node, run_quota_t quota) {

if (detect_result.status == cloud_storage::scrub_status::failed) {
vlog(
_logger.debug,
_logger.info,
"Scrub failed after {} operations. Will retry ...",
detect_result.ops);
co_return run_result{
Expand All @@ -112,10 +112,11 @@ scrubber::run(retry_chain_node& rtc_node, run_quota_t quota) {
}

vlog(
_logger.debug,
_logger.info,
"Scrub finished with status {} and detected {}",
detect_result.status,
detect_result.detected);

auto replicate_result = co_await _archiver.process_anomalies(
model::timestamp::now(),
detect_result.status,
Expand Down
68 changes: 48 additions & 20 deletions src/v/cloud_storage/anomalies_detector.cc
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
#include "cloud_storage/base_manifest.h"
#include "cloud_storage/partition_manifest.h"
#include "cloud_storage/remote.h"
#include "cloud_storage/spillover_manifest.h"

namespace cloud_storage {

Expand Down Expand Up @@ -96,27 +95,53 @@ anomalies_detector::run(retry_chain_node& rtc_node) {
auto stm_manifest_check_res = co_await check_manifest(manifest, rtc_node);
final_res += std::move(stm_manifest_check_res);

for (auto iter = spill_manifest_paths.begin();
iter != spill_manifest_paths.end();
++iter) {
std::optional<segment_meta> first_seg_previous_manifest;
if (!manifest.empty()) {
first_seg_previous_manifest = *manifest.begin();
}

for (const auto& spill_manifest_path : spill_manifest_paths) {
if (_as.abort_requested()) {
final_res.status = scrub_status::partial;
co_return final_res;
}

auto manifest_res = co_await download_and_check_spill_manifest(
*iter, rtc_node);
final_res += std::move(manifest_res);
++final_res.ops;
const auto spill = co_await download_spill_manifest(
spill_manifest_path, rtc_node);
if (spill) {
// Check adjacent segments which have a manifest
// boundary between them.
if (auto last_in_spill = spill->last_segment();
last_in_spill && first_seg_previous_manifest) {
scrub_segment_meta(
*first_seg_previous_manifest,
last_in_spill,
final_res.detected.segment_metadata_anomalies);
}

final_res += co_await check_manifest(*spill, rtc_node);

if (!spill->empty()) {
first_seg_previous_manifest = *spill->begin();
} else {
vlog(
_logger.warn,
"Empty spillover manifest at {}",
spill_manifest_path);
}
} else {
final_res.status = scrub_status::partial;
first_seg_previous_manifest = std::nullopt;
}
}

co_return final_res;
}

ss::future<anomalies_detector::result>
anomalies_detector::download_and_check_spill_manifest(
ss::future<std::optional<spillover_manifest>>
anomalies_detector::download_spill_manifest(
const ss::sstring& path, retry_chain_node& rtc_node) {
result res{};

vlog(_logger.debug, "Downloading spillover manifest {}", path);

spillover_manifest spill{_ntp, _initial_rev};
Expand All @@ -125,18 +150,14 @@ anomalies_detector::download_and_check_spill_manifest(
{manifest_format::serde, remote_manifest_path{path}},
spill,
rtc_node);
res.ops += 1;

if (manifest_get_result != download_result::success) {
vlog(_logger.debug, "Failed downloading spillover manifest {}", path);

res.status = scrub_status::partial;
co_return res;
co_return std::nullopt;
}

res += co_await check_manifest(spill, rtc_node);

co_return res;
co_return spill;
}

ss::future<anomalies_detector::result> anomalies_detector::check_manifest(
Expand All @@ -145,20 +166,23 @@ ss::future<anomalies_detector::result> anomalies_detector::check_manifest(

vlog(_logger.debug, "Checking manifest {}", manifest.get_manifest_path());

std::optional<segment_meta> previous_seg_meta;
for (auto seg_iter = manifest.begin(); seg_iter != manifest.end();
++seg_iter) {
if (_as.abort_requested()) {
res.status = scrub_status::partial;
co_return res;
}

auto segment_path = manifest.generate_segment_path(*seg_iter);
auto exists_result = co_await _remote.segment_exists(
const auto seg_meta = *seg_iter;

const auto segment_path = manifest.generate_segment_path(seg_meta);
const auto exists_result = co_await _remote.segment_exists(
_bucket, segment_path, rtc_node);
res.ops += 1;

if (exists_result == download_result::notfound) {
res.detected.missing_segments.emplace(*seg_iter);
res.detected.missing_segments.emplace(seg_meta);
} else if (exists_result != download_result::success) {
vlog(
_logger.debug,
Expand All @@ -167,6 +191,10 @@ ss::future<anomalies_detector::result> anomalies_detector::check_manifest(

res.status = scrub_status::partial;
}

scrub_segment_meta(
seg_meta, previous_seg_meta, res.detected.segment_metadata_anomalies);
previous_seg_meta = seg_meta;
}

vlog(
Expand Down
5 changes: 3 additions & 2 deletions src/v/cloud_storage/anomalies_detector.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
#pragma once

#include "cloud_storage/fwd.h"
#include "cloud_storage/spillover_manifest.h"
#include "cloud_storage/types.h"
#include "model/fundamental.h"
#include "model/metadata.h"
Expand Down Expand Up @@ -52,13 +53,13 @@ class anomalies_detector {

ss::future<result> run(retry_chain_node&);

ss::future<anomalies_detector::result> download_and_check_spill_manifest(
private:
ss::future<std::optional<spillover_manifest>> download_spill_manifest(
const ss::sstring& path, retry_chain_node& rtc_node);

ss::future<anomalies_detector::result> check_manifest(
const partition_manifest& manifest, retry_chain_node& rtc_node);

private:
cloud_storage_clients::bucket_name _bucket;
model::ntp _ntp;
model::initial_revision_id _initial_rev;
Expand Down
167 changes: 141 additions & 26 deletions src/v/cloud_storage/partition_manifest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -591,6 +591,20 @@ bool partition_manifest::contains(const segment_name& name) const {
return _segments.contains(maybe_key->base_offset);
}

bool partition_manifest::segment_with_offset_range_exists(
model::offset base, model::offset committed) const {
if (auto iter = find(base); iter != end()) {
const auto expected_committed
= _segments.get_committed_offset_column().at_index(iter.index());

// false when committed offset doesn't match
return committed == *expected_committed;
} else {
// base offset doesn't match any segment
return false;
}
}

void partition_manifest::delete_replaced_segments() { _replaced.clear(); }

model::offset partition_manifest::get_archive_start_offset() const {
Expand Down Expand Up @@ -923,38 +937,116 @@ bool partition_manifest::safe_segment_meta_to_add(const segment_meta& m) {
// The empty manifest can be started from any offset. If we deleted all
// segments due to retention we should start from last uploaded offset.
// The reuploads are not possible if the manifest is empty.
return _last_offset == model::offset{0}
|| model::next_offset(_last_offset) == m.base_offset;
}
auto last = _segments.last_segment().value();
auto next = model::next_offset(last.committed_offset);
if (m.base_offset == next) {
return last.delta_offset_end == m.delta_offset;
}
if (m.base_offset > next) {
// Base offset of the uploaded segment overshoots the expected value
return false;
const bool is_safe = _last_offset == model::offset{0}
|| model::next_offset(_last_offset)
== m.base_offset;

if (!is_safe) {
vlog(
cst_log.error,
"[{}] New segment does not line up with last offset of empty "
"log: "
"last_offset: {}, new_segment: {}",
_ntp,
_last_offset,
m);
}

return is_safe;
}
// Check reupload correctness
// The segment should be aligned with existing segments in the manifest but
// there should be more than one segment covered by 'm'.

auto format_seg_meta_anomalies = [](const segment_meta_anomalies& smas) {
if (smas.empty()) {
return ss::sstring{};
}

std::vector<anomaly_type> types;
for (const auto& a : smas) {
types.push_back(a.type);
}

return ssx::sformat(
"{{anomaly_types: {}, new_segment: {}, previous_segment: {}}}",
types,
smas.begin()->at,
smas.begin()->previous);
};

auto it = _segments.find(m.base_offset);
if (it == _segments.end()) {
return false;
}
if (it->committed_offset == m.committed_offset) {
// 'm' is a reupload of an individual segment, the segment should have
// different size
return it->size_bytes != m.size_bytes;
}
++it;
while (it != _segments.end()) {
// Segment added to tip of the log
const auto last_seg = last_segment();
vassert(last_seg.has_value(), "Empty manifest");

segment_meta_anomalies anomalies;
scrub_segment_meta(m, last_seg, anomalies);
if (!anomalies.empty()) {
vlog(
cst_log.error,
"[{}] New segment does not line up with previous segment: {}",
_ntp,
format_seg_meta_anomalies(anomalies));
return false;
}
return true;
} else {
// Segment reupload case:
// The segment should be aligned with existing segments in the manifest
// but there should be at least one segment covered by 'm'.
if (it != _segments.begin()) {
// Firstly, check that the replacement lines up with the segment
// preceeding it if one exists.
const auto prev_segment_it = _segments.prev(it);
segment_meta_anomalies anomalies;
scrub_segment_meta(m, *prev_segment_it, anomalies);
if (!anomalies.empty()) {
vlog(
cst_log.error,
"[{}] New replacement segment does not line up with "
"previous "
"segment: {}",
_ntp,
format_seg_meta_anomalies(anomalies));
return false;
}
}

if (it->committed_offset == m.committed_offset) {
// 'm' is a reupload of an individual segment, the segment should
// have different size
if (it->size_bytes == m.size_bytes) {
vlog(
cst_log.error,
"[{}] New replacement segment has the same size as replaced "
"segment: new_segment: {}, replaced_segment: {}",
_ntp,
m,
*it);
return false;
}

return true;
}

// 'm' is a reupload which merges multiple segments. The committed
// offset of 'm' should match an existing segment. Otherwise, the
// re-upload changes segment boundaries and is *not valid*.
++it;
while (it != _segments.end()) {
if (it->committed_offset == m.committed_offset) {
return true;
}
++it;
}

vlog(
cst_log.error,
"[{}] New replacement segment does not match the committed offset of "
"any previous segment: new_segment: {}",
_ntp,
m);
return false;
}
return false;
}

partition_manifest
Expand Down Expand Up @@ -2576,10 +2668,33 @@ void partition_manifest::process_anomalies(

auto first_kafka_offset = full_log_start_kafka_offset();
auto& missing_segs = _detected_anomalies.missing_segments;
erase_if(missing_segs, [&first_kafka_offset](const auto& meta) {
return meta.next_kafka_offset() <= first_kafka_offset;
erase_if(missing_segs, [this, &first_kafka_offset](const auto& meta) {
if (meta.next_kafka_offset() <= first_kafka_offset) {
return true;
}

// The segment might have been missing because it was merged with
// something else. If the offset range doesn't match a segment exactly,
// discard the anomaly.
return !segment_with_offset_range_exists(
meta.base_offset, meta.committed_offset);
});

auto& segment_meta_anomalies
= _detected_anomalies.segment_metadata_anomalies;
erase_if(
segment_meta_anomalies,
[this, &first_kafka_offset](const auto& anomaly_meta) {
if (anomaly_meta.at.next_kafka_offset() <= first_kafka_offset) {
return true;
}

// Similarly to the missing segment case, if the boundaries of the
// segment where the anomaly was detected changed, drop it.
return !segment_with_offset_range_exists(
anomaly_meta.at.base_offset, anomaly_meta.at.committed_offset);
});

_last_partition_scrub = scrub_timestamp;
}

Expand Down
5 changes: 5 additions & 0 deletions src/v/cloud_storage/partition_manifest.h
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,11 @@ class partition_manifest : public base_manifest {
bool contains(const key& key) const;
bool contains(const segment_name& name) const;

/// Check if the provided offset range matches any segment in the manifest
/// exactly.
bool segment_with_offset_range_exists(
model::offset base, model::offset committed) const;

/// Add new segment to the manifest
bool add(segment_meta meta);
bool add(const segment_name& name, const segment_meta& meta);
Expand Down
Loading

0 comments on commit 5176601

Please sign in to comment.