-
Notifications
You must be signed in to change notification settings - Fork 44
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix: Ensure local snapshot staging directory exists for restore #2449
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -405,6 +405,10 @@ impl SnapshotRepository { | |
return Ok(None); // perhaps this needs to be a configuration error | ||
} | ||
|
||
if !self.staging_dir.exists() { | ||
std::fs::create_dir_all(&self.staging_dir)?; | ||
} | ||
|
||
Comment on lines
+408
to
+411
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Would this make more sense if moved to There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I deliberately put it closer to the point where we need to use it; a couple of reasons:
|
||
// The snapshot ingest directory should be on the same filesystem as the partition store | ||
// to minimize IO and disk space usage during import. | ||
let snapshot_dir = TempDir::with_prefix_in( | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,169 @@ | ||
// Copyright (c) 2024 - 2025 Restate Software, Inc., Restate GmbH. | ||
// All rights reserved. | ||
// | ||
// Use of this software is governed by the Business Source License | ||
// included in the LICENSE file. | ||
// | ||
// As of the Change Date specified in that file, in accordance with | ||
// the Business Source License, use of this software will be governed | ||
// by the Apache License, Version 2.0. | ||
|
||
use std::time::Duration; | ||
|
||
use enumset::enum_set; | ||
use futures_util::StreamExt; | ||
use googletest::fail; | ||
use hyper_util::rt::TokioIo; | ||
use regex::Regex; | ||
use tempfile::TempDir; | ||
use test_log::test; | ||
use tokio::io; | ||
use tokio::net::UnixStream; | ||
use tonic::codec::CompressionEncoding; | ||
use tonic::transport::{Channel, Endpoint, Uri}; | ||
use tower::service_fn; | ||
use url::Url; | ||
|
||
use restate_admin::cluster_controller::protobuf::cluster_ctrl_svc_client::ClusterCtrlSvcClient; | ||
use restate_admin::cluster_controller::protobuf::{ | ||
ClusterStateRequest, CreatePartitionSnapshotRequest, | ||
}; | ||
use restate_local_cluster_runner::{ | ||
cluster::Cluster, | ||
node::{BinarySource, Node}, | ||
}; | ||
use restate_types::config::{LogFormat, MetadataStoreClient}; | ||
use restate_types::net::AdvertisedAddress; | ||
use restate_types::protobuf::cluster::node_state::State; | ||
use restate_types::{config::Configuration, nodes_config::Role}; | ||
|
||
mod common; | ||
|
||
#[test(restate_core::test)] | ||
async fn create_and_restore_snapshot() -> googletest::Result<()> { | ||
let mut base_config = Configuration::default(); | ||
base_config.common.bootstrap_num_partitions = 1.try_into()?; | ||
base_config.common.log_filter = "restate=debug,warn".to_owned(); | ||
base_config.common.log_format = LogFormat::Compact; | ||
|
||
let snapshots_dir = TempDir::new()?; | ||
base_config.worker.snapshots.destination = Some( | ||
Url::from_file_path(snapshots_dir.path()) | ||
.unwrap() | ||
.to_string(), | ||
); | ||
|
||
let nodes = Node::new_test_nodes_with_metadata( | ||
base_config.clone(), | ||
BinarySource::CargoTest, | ||
enum_set!(Role::Worker), | ||
1, | ||
); | ||
|
||
let mut partition_ready = nodes[1].lines(Regex::new("Won the leadership campaign")?); | ||
|
||
let cluster = Cluster::builder() | ||
.temp_base_dir() | ||
.nodes(nodes.clone()) | ||
.build() | ||
.start() | ||
.await?; | ||
|
||
cluster.wait_healthy(Duration::from_secs(30)).await?; | ||
assert!(partition_ready.next().await.is_some()); | ||
|
||
let mut client = | ||
ClusterCtrlSvcClient::new(grpc_connect(cluster.nodes[0].node_address().clone()).await?) | ||
.accept_compressed(CompressionEncoding::Gzip); | ||
|
||
any_partition_active(&mut client, Duration::from_secs(5)).await?; | ||
|
||
let snapshot_response = client | ||
.create_partition_snapshot(CreatePartitionSnapshotRequest { partition_id: 0 }) | ||
.await? | ||
.into_inner(); | ||
|
||
let mut node_2 = Node::new_test_node( | ||
"node-2", | ||
base_config, | ||
BinarySource::CargoTest, | ||
enum_set!(Role::Worker), | ||
); | ||
*node_2.metadata_store_client_mut() = MetadataStoreClient::Embedded { | ||
address: cluster.nodes[0].node_address().clone(), | ||
}; | ||
|
||
let mut snapshot_restored = node_2.lines( | ||
format!( | ||
"Importing partition store snapshot.*{}", | ||
snapshot_response.snapshot_id | ||
) | ||
.parse()?, | ||
); | ||
|
||
node_2 | ||
.start_clustered(cluster.base_dir(), cluster.cluster_name()) | ||
.await?; | ||
|
||
assert!(snapshot_restored.next().await.is_some()); | ||
Ok(()) | ||
} | ||
|
||
async fn any_partition_active( | ||
client: &mut ClusterCtrlSvcClient<Channel>, | ||
timeout: Duration, | ||
) -> googletest::Result<()> { | ||
let deadline = tokio::time::Instant::now() + timeout; | ||
loop { | ||
let cluster_state = client | ||
.get_cluster_state(ClusterStateRequest {}) | ||
.await? | ||
.into_inner() | ||
.cluster_state | ||
.unwrap(); | ||
|
||
if cluster_state.nodes.values().any(|n| { | ||
n.state.as_ref().is_some_and(|s| match s { | ||
State::Alive(s) => s | ||
.partitions | ||
.values() | ||
.any(|p| p.effective_mode.cmp(&1).is_eq()), | ||
_ => false, | ||
}) | ||
}) { | ||
break; // partition is ready; we can request snapshot | ||
} | ||
if tokio::time::Instant::now() > deadline { | ||
fail!( | ||
"Partition processor did not become ready within {:?}", | ||
timeout | ||
)?; | ||
} | ||
tokio::time::sleep(Duration::from_millis(250)).await; | ||
} | ||
Ok(()) | ||
} | ||
|
||
async fn grpc_connect(address: AdvertisedAddress) -> Result<Channel, tonic::transport::Error> { | ||
match address { | ||
AdvertisedAddress::Uds(uds_path) => { | ||
// dummy endpoint required to specify an uds connector, it is not used anywhere | ||
Endpoint::try_from("http://127.0.0.1") | ||
.expect("/ should be a valid Uri") | ||
.connect_with_connector(service_fn(move |_: Uri| { | ||
let uds_path = uds_path.clone(); | ||
async move { | ||
Ok::<_, io::Error>(TokioIo::new(UnixStream::connect(uds_path).await?)) | ||
} | ||
})).await | ||
} | ||
AdvertisedAddress::Http(uri) => { | ||
Channel::builder(uri) | ||
.connect_timeout(Duration::from_secs(2)) | ||
.timeout(Duration::from_secs(2)) | ||
.http2_adaptive_window(true) | ||
.connect() | ||
.await | ||
} | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I noticed that
super::data_dir
is feature gated by#[cfg(any(test, feature = "test-util"))]
won't this mean it should not be used outside of tests?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I hadn't noticed that, just copied what we already do in
data_dir(..)
just above. Doesn't seem to hurt - but I'm somewhat surprised that this works. And we already use that andnode_dir()
from production code elsewhere.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yeah, i am surprised that it's working to!