Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
141 changes: 141 additions & 0 deletions crates/iota-network/src/state_sync/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use iota_archival::{reader::ArchiveReaderBalancer, writer::ArchiveWriter};
use iota_config::{
node::ArchiveReaderConfig,
object_storage_config::{ObjectStoreConfig, ObjectStoreType},
p2p::StateSyncConfig,
};
use iota_storage::{FileCompression, StorageFormat};
use iota_swarm_config::test_utils::{
Expand Down Expand Up @@ -936,3 +937,143 @@ async fn sync_with_checkpoints_watermark() {
&last_checkpoint_seq
);
}

/// Regression test for https://github.com/iotaledger/iota/issues/11496.
///
/// When checkpoint content is unavailable
/// (`ContentSyncError::PrunedOnAllPeers`) during a content sync attempt, the
/// failing checkpoint must be retried first before trying to sync later
/// checkpoints.
///
/// Test outline:
/// 1. Two nodes are created from the same genesis state; node 1 is given
///    every checkpoint summary and its contents.
/// 2. Node 1's lowest-available checkpoint is set to 2, so checkpoint 1 is
///    reported as pruned.
/// 3. Node 2 connects to node 1; the test waits until node 2's highest
///    verified checkpoint reaches the last sequence number.
/// 4. After a fixed 2 s pause the test asserts that node 2's highest synced
///    checkpoint is still genesis.
/// 5. Node 1's lowest-available checkpoint is reset to 0 and the test waits
///    for node 2 to sync contents up to the last checkpoint.
#[tokio::test]
async fn sync_with_checkpoints_gap() -> anyhow::Result<()> {
telemetry_subscribers::init_for_testing();

// 6 checkpoints: genesis (seq 0) + sequences 1–5.
// Checkpoint 1 will be simulated as "pruned" on the peer; 2–5 are available.
// NOTE(review): the meaning of the individual arguments (0, 4, 6, None) is
// not visible in this chunk — confirm against `make_committee_and_checkpoints`.
let (committee, (ordered_checkpoints, contents, _, _)) =
make_committee_and_checkpoints(0, 4, 6, None, random_contents);

// The first summary/contents pair is used as the shared genesis for both
// stores.
let genesis_content = contents.first().cloned().unwrap();
let genesis_checkpoint = ordered_checkpoints.first().cloned().unwrap();

// Both stores start from identical genesis state and the same committee.
let store_1 = store_with_genesis_state(
genesis_checkpoint.clone(),
genesis_content.clone(),
committee.committee().to_owned(),
);
let store_2 = store_with_genesis_state(
genesis_checkpoint.clone(),
genesis_content.clone(),
committee.committee().to_owned(),
);

// Node 1 (the peer that serves checkpoints) uses the builder's default
// configuration.
// The handle is bound to `_handle_1` (not `_`) so it lives until the test
// returns. NOTE(review): presumably dropping it would stop the event loop —
// confirm.
let (builder_1, server_1) = Builder::new().store(store_1.clone()).build();
let network_1 = build_network(|router| router.add_rpc_service(server_1));
let (event_loop_1, _handle_1) = builder_1.build(network_1.clone());

// Shorten the retry back-off so checkpoint 1's failure loop cycles quickly
// and any watermark regression shows up within the 2 s assertion window.
let config_2 = StateSyncConfig {
wait_interval_when_no_peer_to_sync_content_ms: Some(50),
..Default::default()
};
// Node 2 (the node under test) is built with the shortened retry interval.
let (builder_2, server_2) = Builder::new()
.config(config_2)
.store(store_2.clone())
.build();
let network_2 = build_network(|router| router.add_rpc_service(server_2));
let (event_loop_2, _handle_2) = builder_2.build(network_2.clone());

// Node 1: insert all summaries and contents (sequences 0–5), synced
// watermark at 5.
// The inner block limits the lifetime of the value returned by
// `inner_mut()`, so it is dropped before `inner_mut()` is called again below.
{
let mut store_1 = store_1.inner_mut();
for (checkpoint, content) in ordered_checkpoints.iter().zip(contents.iter()) {
store_1.insert_checkpoint(checkpoint);
store_1.insert_checkpoint_contents(checkpoint, content.clone());
store_1.update_highest_synced_checkpoint(checkpoint);
}
}

// Simulate checkpoint 1 being pruned: set node 1's lowest-available
// watermark to 2. Node 2 will learn this via the handshake and see
// `is_pruned = true` for checkpoint 1 (seq 1 < lowest 2) while
// checkpoints 2–5 remain fetchable (seq >= 2).
store_1.inner_mut().set_lowest_available_checkpoint(2);

// Start both event loops in the background, then have node 2 dial node 1.
tokio::spawn(event_loop_1.start());
tokio::spawn(event_loop_2.start());
network_2.connect(network_1.local_addr()).await.unwrap();

// Sequence numbers used by the assertions: the genesis checkpoint and the
// last checkpoint in the generated list.
let genesis_seq = *genesis_checkpoint.sequence_number();
let last_seq = *ordered_checkpoints.last().unwrap().sequence_number();

// Wait for node 2 to verify all summaries (sequences 0–5).
// Polls every 100 ms; the whole wait is bounded by a 10 s timeout.
timeout(Duration::from_secs(10), async {
loop {
if *store_2
.try_get_highest_verified_checkpoint()
.unwrap()
.sequence_number()
== last_seq
{
break;
}
tokio::time::sleep(Duration::from_millis(100)).await;
}
})
.await
.expect("node 2 failed to sync all checkpoint summaries");

// Give the content-sync loop 2 s to run. With a 50 ms retry interval
// ~40 retry cycles fire for checkpoint 1. If push_back were used
// (pre-fix), checkpoints 2–5 would advance the watermark to 5 within
// this window.
// NOTE(review): the check that follows is a negative one (watermark must
// still equal genesis). If the content-sync loop has not run at all within
// this window, that check would pass vacuously rather than fail.
tokio::time::sleep(Duration::from_secs(2)).await;

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure a fixed 2 s sleep is a good idea here, especially on a heavy/slow CI box. It might be safer to add an assertion like this:

// e.g. before the watermark assert:
assert!(
    store_2.get_full_checkpoint_contents_by_sequence_number(2).is_some(),
    "content loop did not even fetch seq 2 within the window — test is not exercising the gap",
);

so we can prove the loop ran and still refused to advance.
What do you think?


// REGRESSION CHECK: the synced watermark must not have advanced past
// genesis (sequence 0) while checkpoint 1's contents are unavailable.
// Compares node 2's highest synced checkpoint sequence number with
// `genesis_seq`.
assert_eq!(
*store_2
.try_get_highest_synced_checkpoint()
.unwrap()
.sequence_number(),
genesis_seq,
"synced watermark advanced past genesis even though checkpoint 1 \
contents are unavailable — regression from PR #11485 (push_back bug)"
);

// Restore availability: lower the watermark to 0. Node 2 will refresh
// node 1's watermark on the next periodic tick (≤5 s) and unblock
// checkpoint 1's content sync.
store_1.inner_mut().set_lowest_available_checkpoint(0);

// Allow up to 12 s for the tick, the watermark refresh, and the full sync.
// Polls node 2's highest synced checkpoint every 200 ms until it equals
// `last_seq`; the `expect` below panics if the timeout elapses first.
timeout(Duration::from_secs(12), async {
loop {
if *store_2
.try_get_highest_synced_checkpoint()
.unwrap()
.sequence_number()
== last_seq
{
break;
}
tokio::time::sleep(Duration::from_millis(200)).await;
}
})
.await
.expect("node 2 failed to fully sync after checkpoint 1 became available");

// Verify that all checkpoint contents are present in node 2's store.
// `i` is the position within `ordered_checkpoints` and is used only in the
// failure message; the lookup itself uses the checkpoint's sequence number.
for (i, checkpoint) in ordered_checkpoints.iter().enumerate() {
assert!(
store_2
.get_full_checkpoint_contents_by_sequence_number(*checkpoint.sequence_number())
.is_some(),
"checkpoint {i} contents missing from synced store"
);
}

// Nothing in the visible body uses `?`; failures surface as panics from
// `unwrap`/`expect`/`assert!`, so reaching this point means the test passed.
Ok(())
}
Loading