Skip to content

Commit

Permalink
fix(tests): fix state sync test errors (#12150)
Browse files Browse the repository at this point in the history
These tests create node home dirs in tempfiles, but these are dropped
too soon, and the directories are removed while the nodes are still
running. This is visible in the logs as snapshot creation failure
messages. After #12147, these no
longer cause test failures, but the underlying failures to create
snapshots are still there.

Fix it by keeping them around in an Arc in the enclosing scopes

Note that every now and then these tests still fail with many messages
showing `Received an invalid block during state sync` followed by a test
timeout, but it seems to be for an unrelated reason.
  • Loading branch information
marcelo-gonzalez authored Oct 4, 2024
1 parent a9bf310 commit 0aa1343
Showing 1 changed file with 67 additions and 28 deletions.
95 changes: 67 additions & 28 deletions integration-tests/src/tests/client/sync_state_nodes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,16 @@ fn sync_state_nodes() {
let mut near1 = load_test_config("test1", port1, genesis.clone());
near1.network_config.peer_store.boot_nodes = convert_boot_nodes(vec![]);
near1.client_config.min_num_peers = 0;

// In this test and the ones below, we have an Arc<TempDir>, that we make sure to keep alive by cloning it
// and keeping the original one around after we pass the clone to run_actix(). Otherwise it will be dropped early
// and the directories will actually be removed while the nodes are running.
let _dir1 = Arc::new(tempfile::Builder::new().prefix("sync_nodes_1").tempdir().unwrap());
let dir1 = _dir1.clone();
let _dir2 = Arc::new(tempfile::Builder::new().prefix("sync_nodes_2").tempdir().unwrap());
let dir2 = _dir2.clone();

run_actix(async move {
let dir1 = tempfile::Builder::new().prefix("sync_nodes_1").tempdir().unwrap();
let nearcore::NearNode { view_client: view_client1, .. } =
start_with_config(dir1.path(), near1).expect("start_with_config");

Expand All @@ -61,6 +69,7 @@ fn sync_state_nodes() {
let view_client2_holder2 = view_client2_holder.clone();
let arbiters_holder2 = arbiters_holder2.clone();
let genesis2 = genesis.clone();
let dir2 = dir2.clone();

let actor = view_client1.send(GetBlock::latest().with_span_context());
let actor = actor.then(move |res| {
Expand All @@ -78,10 +87,6 @@ fn sync_state_nodes() {
near2.network_config.peer_store.boot_nodes =
convert_boot_nodes(vec![("test1", *port1)]);

let dir2 = tempfile::Builder::new()
.prefix("sync_nodes_2")
.tempdir()
.unwrap();
let nearcore::NearNode {
view_client: view_client2,
arbiters,
Expand Down Expand Up @@ -125,6 +130,8 @@ fn sync_state_nodes() {
)
.start();
});
drop(_dir1);
drop(_dir2);
});
}

Expand All @@ -148,6 +155,15 @@ fn sync_state_nodes_multishard() {
);
genesis.config.epoch_length = 150; // so that by the time test2 joins it is not kicked out yet

let _dir1 = Arc::new(tempfile::Builder::new().prefix("sync_nodes_1").tempdir().unwrap());
let dir1 = _dir1.clone();
let _dir2 = Arc::new(tempfile::Builder::new().prefix("sync_nodes_2").tempdir().unwrap());
let dir2 = _dir2.clone();
let _dir3 = Arc::new(tempfile::Builder::new().prefix("sync_nodes_3").tempdir().unwrap());
let dir3 = _dir3.clone();
let _dir4 = Arc::new(tempfile::Builder::new().prefix("sync_nodes_4").tempdir().unwrap());
let dir4 = _dir4.clone();

run_actix(async move {
let (port1, port2, port3, port4) = (
tcp::ListenerAddr::reserve_for_test(),
Expand Down Expand Up @@ -181,14 +197,10 @@ fn sync_state_nodes_multishard() {
near4.client_config.max_block_production_delay =
near1.client_config.max_block_production_delay;

let dir1 = tempfile::Builder::new().prefix("sync_nodes_1").tempdir().unwrap();
let nearcore::NearNode { view_client: view_client1, .. } =
start_with_config(dir1.path(), near1).expect("start_with_config");

let dir3 = tempfile::Builder::new().prefix("sync_nodes_3").tempdir().unwrap();
start_with_config(dir3.path(), near3).expect("start_with_config");

let dir4 = tempfile::Builder::new().prefix("sync_nodes_4").tempdir().unwrap();
start_with_config(dir4.path(), near4).expect("start_with_config");

let view_client2_holder = Arc::new(RwLock::new(None));
Expand All @@ -201,6 +213,7 @@ fn sync_state_nodes_multishard() {
let view_client2_holder2 = view_client2_holder.clone();
let arbiter_holder2 = arbiter_holder2.clone();
let genesis2 = genesis.clone();
let dir2 = dir2.clone();

let actor = view_client1.send(GetBlock::latest().with_span_context());
let actor = actor.then(move |res| {
Expand All @@ -225,10 +238,6 @@ fn sync_state_nodes_multishard() {
("test4", *port4),
]);

let dir2 = tempfile::Builder::new()
.prefix("sync_nodes_2")
.tempdir()
.unwrap();
let nearcore::NearNode {
view_client: view_client2,
arbiters,
Expand Down Expand Up @@ -280,6 +289,10 @@ fn sync_state_nodes_multishard() {
)
.start();
});
drop(_dir1);
drop(_dir2);
drop(_dir3);
drop(_dir4);
});
}

Expand All @@ -298,9 +311,15 @@ fn sync_empty_state() {
);
genesis.config.epoch_length = 20;

let _dir1 = Arc::new(tempfile::Builder::new().prefix("sync_nodes_1").tempdir().unwrap());
let dir1 = _dir1.clone();
let _dir2 = Arc::new(tempfile::Builder::new().prefix("sync_nodes_2").tempdir().unwrap());
let dir2 = _dir2.clone();

run_actix(async move {
let (port1, port2) =
(tcp::ListenerAddr::reserve_for_test(), tcp::ListenerAddr::reserve_for_test());

// State sync triggers when header head is two epochs in the future.
// Produce more blocks to make sure that state sync gets triggered when the second node starts.
let state_sync_horizon = 10;
Expand All @@ -312,10 +331,8 @@ fn sync_empty_state() {
near1.client_config.min_block_production_delay = Duration::milliseconds(200);
near1.client_config.max_block_production_delay = Duration::milliseconds(400);

let dir1 = tempfile::Builder::new().prefix("sync_nodes_1").tempdir().unwrap();
let nearcore::NearNode { view_client: view_client1, .. } =
start_with_config(dir1.path(), near1).expect("start_with_config");
let dir2 = Arc::new(tempfile::Builder::new().prefix("sync_nodes_2").tempdir().unwrap());

let view_client2_holder = Arc::new(RwLock::new(None));
let arbiters_holder = Arc::new(RwLock::new(vec![]));
Expand Down Expand Up @@ -403,6 +420,8 @@ fn sync_empty_state() {
)
.start();
});
drop(_dir1);
drop(_dir2);
});
}

Expand All @@ -426,6 +445,14 @@ fn sync_state_dump() {
// start, sync headers and find a dump of state.
genesis.config.epoch_length = 30;

let _dump_dir =
Arc::new(tempfile::Builder::new().prefix("state_dump_1").tempdir().unwrap());
let dump_dir = _dump_dir.clone();
let _dir1 = Arc::new(tempfile::Builder::new().prefix("sync_nodes_1").tempdir().unwrap());
let dir1 = _dir1.clone();
let _dir2 = Arc::new(tempfile::Builder::new().prefix("sync_nodes_2").tempdir().unwrap());
let dir2 = _dir2.clone();

run_actix(async move {
let (port1, port2) =
(tcp::ListenerAddr::reserve_for_test(), tcp::ListenerAddr::reserve_for_test());
Expand All @@ -440,7 +467,7 @@ fn sync_state_dump() {
near1.client_config.min_block_production_delay = Duration::milliseconds(300);
near1.client_config.max_block_production_delay = Duration::milliseconds(600);
near1.client_config.tracked_shards = vec![0]; // Track all shards.
let dump_dir = tempfile::Builder::new().prefix("state_dump_1").tempdir().unwrap();

near1.client_config.state_sync.dump = Some(DumpConfig {
location: Filesystem { root_dir: dump_dir.path().to_path_buf() },
restart_dump_for_shards: None,
Expand All @@ -449,14 +476,12 @@ fn sync_state_dump() {
});
near1.config.store.state_snapshot_enabled = true;

let dir1 = tempfile::Builder::new().prefix("sync_nodes_1").tempdir().unwrap();
let nearcore::NearNode {
view_client: view_client1,
// State sync dumper should be kept in the scope to avoid dropping it, which stops the state dumper loop.
state_sync_dumper: _dumper,
..
} = start_with_config(dir1.path(), near1).expect("start_with_config");
let dir2 = tempfile::Builder::new().prefix("sync_nodes_2").tempdir().unwrap();

let view_client2_holder = Arc::new(RwLock::new(None));
let arbiters_holder = Arc::new(RwLock::new(vec![]));
Expand Down Expand Up @@ -542,6 +567,9 @@ fn sync_state_dump() {
.unwrap();
System::current().stop();
});
drop(_dump_dir);
drop(_dir1);
drop(_dir2);
});
}

Expand Down Expand Up @@ -741,6 +769,10 @@ fn test_state_sync_headers() {
heavy_test(|| {
init_test_logger();

let _dir1 =
Arc::new(tempfile::Builder::new().prefix("test_state_sync_headers").tempdir().unwrap());
let dir1 = _dir1.clone();

run_actix(async {
let mut genesis = Genesis::test(vec!["test1".parse().unwrap()], 1);
// Increase epoch_length if the test is flaky.
Expand All @@ -750,8 +782,6 @@ fn test_state_sync_headers() {
load_test_config("test1", tcp::ListenerAddr::reserve_for_test(), genesis.clone());
near1.client_config.min_num_peers = 0;
near1.client_config.tracked_shards = vec![0]; // Track all shards.
let dir1 =
tempfile::Builder::new().prefix("test_state_sync_headers").tempdir().unwrap();
near1.config.store.state_snapshot_enabled = true;

let nearcore::NearNode { view_client: view_client1, .. } =
Expand Down Expand Up @@ -924,6 +954,7 @@ fn test_state_sync_headers() {
.unwrap();
System::current().stop();
});
drop(_dir1);
});
}

Expand All @@ -933,6 +964,20 @@ fn test_state_sync_headers_no_tracked_shards() {
heavy_test(|| {
init_test_logger();

let _dir1 = Arc::new(
tempfile::Builder::new()
.prefix("test_state_sync_headers_no_tracked_shards_1")
.tempdir()
.unwrap(),
);
let dir1 = _dir1.clone();
let _dir2 = Arc::new(
tempfile::Builder::new()
.prefix("test_state_sync_headers_no_tracked_shards_2")
.tempdir()
.unwrap(),
);
let dir2 = _dir2.clone();
run_actix(async {
let mut genesis = Genesis::test(vec!["test1".parse().unwrap()], 1);
// Increase epoch_length if the test is flaky.
Expand All @@ -943,10 +988,6 @@ fn test_state_sync_headers_no_tracked_shards() {
let mut near1 = load_test_config("test1", port1, genesis.clone());
near1.client_config.min_num_peers = 0;
near1.client_config.tracked_shards = vec![0]; // Track all shards, it is a validator.
let dir1 = tempfile::Builder::new()
.prefix("test_state_sync_headers_no_tracked_shards_1")
.tempdir()
.unwrap();
near1.config.store.state_snapshot_enabled = false;
near1.config.state_sync_enabled = false;
near1.client_config.state_sync_enabled = false;
Expand All @@ -959,10 +1000,6 @@ fn test_state_sync_headers_no_tracked_shards() {
convert_boot_nodes(vec![("test1", *port1)]);
near2.client_config.min_num_peers = 0;
near2.client_config.tracked_shards = vec![]; // Track no shards.
let dir2 = tempfile::Builder::new()
.prefix("test_state_sync_headers_no_tracked_shards_2")
.tempdir()
.unwrap();
near2.config.store.state_snapshot_enabled = true;
near2.config.state_sync_enabled = false;
near2.client_config.state_sync_enabled = false;
Expand Down Expand Up @@ -1082,5 +1119,7 @@ fn test_state_sync_headers_no_tracked_shards() {
.unwrap();
System::current().stop();
});
drop(_dir1);
drop(_dir2);
});
}

0 comments on commit 0aa1343

Please sign in to comment.