Fix: Ensure local snapshot staging directory exists for restore #2449

Merged

merged 3 commits on Dec 23, 2024

Changes from 2 commits
4 changes: 4 additions & 0 deletions Cargo.lock

(Generated file; diff not rendered by default.)

4 changes: 4 additions & 0 deletions crates/types/src/config/worker.rs
@@ -335,6 +335,10 @@ impl StorageOptions {
    pub fn data_dir(&self) -> PathBuf {
        super::data_dir("db")
    }
+
+    pub fn snapshots_staging_dir(&self) -> PathBuf {
+        super::data_dir("pp-snapshots")
Contributor:

I noticed that super::data_dir is feature-gated by #[cfg(any(test, feature = "test-util"))]. Won't this mean it should not be used outside of tests?

Contributor Author:

I hadn't noticed that; I just copied what we already do in data_dir(..) just above. It doesn't seem to hurt, but I'm somewhat surprised that this works. We already use that and node_dir() from production code elsewhere.

Contributor:

Yeah, I am surprised that it's working too!

+    }
}

impl Default for StorageOptions {
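A note on the feature-gating question in the thread above: cargo unifies features across a build, so if any crate in the dependency graph enables test-util on the crate defining data_dir, the gated helper is compiled in for every dependent, which may be why these calls build from production code. A minimal sketch of the pattern under discussion; this is not the actual restate source, and the base path is made up for illustration:

use std::path::PathBuf;

// Compiled only under `cfg(test)` or when the `test-util` feature is enabled.
// Under cargo feature unification, one crate enabling `test-util` makes this
// function available to every crate that depends on the defining crate.
#[cfg(any(test, feature = "test-util"))]
pub fn data_dir(dir: &str) -> PathBuf {
    PathBuf::from("restate-data").join(dir) // hypothetical base directory
}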
2 changes: 1 addition & 1 deletion crates/worker/src/lib.rs
@@ -158,7 +158,7 @@ impl Worker {
            bifrost,
            SnapshotRepository::create_if_configured(
                snapshots_options,
-               config.common.base_dir().join("pp-snapshots"),
+               config.worker.storage.snapshots_staging_dir(),
                config.common.cluster_name().to_owned(),
            )
            .await
4 changes: 4 additions & 0 deletions crates/worker/src/partition/snapshots/repository.rs
@@ -405,6 +405,10 @@ impl SnapshotRepository {
            return Ok(None); // perhaps this needs to be a configuration error
        }

+       if !self.staging_dir.exists() {
+           std::fs::create_dir_all(&self.staging_dir)?;
+       }

Comment on lines +408 to +411
Contributor:

Would this make more sense if moved to create_if_configured?

Contributor Author:

I deliberately put it closer to the point where we need to use it, for a couple of reasons:

  • we won't create it if a snapshots destination is configured but the node only ever produces snapshots and never downloads any
  • we will (re-)create it just in time, in case someone or something helpfully deletes the empty staging directory created on startup

(A standalone sketch of this just-in-time pattern follows this file's diff.)

        // The snapshot ingest directory should be on the same filesystem as the partition store
        // to minimize IO and disk space usage during import.
        let snapshot_dir = TempDir::with_prefix_in(
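Worth noting in isolation: std::fs::create_dir_all already returns Ok(()) when the directory exists, so the exists() guard above mainly avoids a redundant syscall. A standalone sketch of the just-in-time pattern; the path is made up for illustration:

use std::fs;
use std::io;
use std::path::Path;

// Re-create the staging directory just in time, as in the diff above.
// create_dir_all is idempotent: it succeeds if the directory already exists.
fn ensure_staging_dir(staging_dir: &Path) -> io::Result<()> {
    if !staging_dir.exists() {
        fs::create_dir_all(staging_dir)?;
    }
    Ok(())
}

fn main() -> io::Result<()> {
    let dir = Path::new("/tmp/pp-snapshots-staging");
    ensure_staging_dir(dir)?; // created on first call
    ensure_staging_dir(dir)?; // no-op if still present, re-created if deleted
    Ok(())
}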
7 changes: 6 additions & 1 deletion server/Cargo.toml
@@ -63,9 +63,10 @@ toml = { version = "0.8.12" }
tracing = { workspace = true }
tracing-panic = { version = "0.1.2" }
regex = "1.10.4"
+url = { version = "2.5.4", features = [] }

[dev-dependencies]
-restate-admin = { workspace = true, features = ["memory-loglet"] }
+restate-admin = { workspace = true, features = ["memory-loglet", "clients"] }
restate-bifrost = { workspace = true, features = ["test-util"] }
restate-core = { workspace = true, features = ["test-util"] }
restate-local-cluster-runner = { workspace = true }
@@ -76,9 +77,13 @@ anyhow = { workspace = true }
async-trait = { workspace = true }
bytes = { workspace = true }
googletest = { workspace = true }
+hyper-util = { workspace = true }
tempfile = { workspace = true }
test-log = { workspace = true }
+tonic = { workspace = true, features = ["transport", "prost"] }
+tower = { workspace = true }
tracing-subscriber = { workspace = true }
+url = { workspace = true }

[target.'cfg(not(target_env = "msvc"))'.dependencies]
tikv-jemallocator = { workspace = true, features = ["unprefixed_malloc_on_supported_platforms", "profiling"] }
173 changes: 173 additions & 0 deletions server/tests/snapshots.rs
@@ -0,0 +1,173 @@
// Copyright (c) 2024 - 2025 Restate Software, Inc., Restate GmbH.
// All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use std::time::Duration;

use enumset::enum_set;
use futures_util::StreamExt;
use googletest::fail;
use hyper_util::rt::TokioIo;
use regex::Regex;
use tempfile::TempDir;
use test_log::test;
use tokio::io;
use tokio::net::UnixStream;
use tonic::codec::CompressionEncoding;
use tonic::transport::{Channel, Endpoint, Uri};
use tower::service_fn;
use url::Url;

use restate_admin::cluster_controller::protobuf::cluster_ctrl_svc_client::ClusterCtrlSvcClient;
use restate_admin::cluster_controller::protobuf::{
ClusterStateRequest, CreatePartitionSnapshotRequest,
};
use restate_local_cluster_runner::{
cluster::Cluster,
node::{BinarySource, Node},
};
use restate_types::config::{LogFormat, MetadataStoreClient};
use restate_types::net::AdvertisedAddress;
use restate_types::protobuf::cluster::node_state::State;
use restate_types::protobuf::cluster::{NodeState, RunMode};

Check failure on line 38 in server/tests/snapshots.rs (GitHub Actions / Build and test, warp-ubuntu-latest-x64-16x): unused imports: `NodeState` and `RunMode`
Contributor:

Running cargo clippy --all-targets --workspace -- -D warnings usually helps with this kind of CI error.

Contributor Author:

I normally do run just lint, but point taken; in this case, I did not :-)

use restate_types::{config::Configuration, nodes_config::Role};

mod common;

#[test(restate_core::test)]
async fn create_and_restore_snapshot() -> googletest::Result<()> {
    let mut base_config = Configuration::default();
    base_config.common.bootstrap_num_partitions = 1.try_into()?;
    base_config.common.log_filter = "restate=debug,warn".to_owned();
    base_config.common.log_format = LogFormat::Compact;

    let snapshots_dir = TempDir::new()?;
    base_config.worker.snapshots.destination = Some(
        Url::from_file_path(snapshots_dir.path())
            .unwrap()
            .to_string(),
    );

    let nodes = Node::new_test_nodes_with_metadata(
        base_config.clone(),
        BinarySource::CargoTest,
        enum_set!(Role::Worker),
        1,
    );

    let mut partition_ready = nodes[1].lines(Regex::new("Won the leadership campaign")?);

    let cluster = Cluster::builder()
        .temp_base_dir()
        .nodes(nodes.clone())
        .build()
        .start()
        .await?;

    cluster.wait_healthy(Duration::from_secs(30)).await?;
    assert!(partition_ready.next().await.is_some());

    let mut client =
        ClusterCtrlSvcClient::new(grpc_connect(cluster.nodes[0].node_address().clone()).await?)
            .accept_compressed(CompressionEncoding::Gzip);

    any_partition_active(&mut client, Duration::from_secs(5)).await?;

    let snapshot_response = client
        .create_partition_snapshot(CreatePartitionSnapshotRequest { partition_id: 0 })
        .await?
        .into_inner();

    let mut node_2 = Node::new_test_node(
        "node-2",
        base_config,
        BinarySource::CargoTest,
        enum_set!(Role::Worker),
    );
    *node_2.metadata_store_client_mut() = MetadataStoreClient::Embedded {
        address: cluster.nodes[0].node_address().clone(),
    };

    let mut snapshot_restored = node_2.lines(
        format!(
            "Importing partition store snapshot.*{}",
            snapshot_response.snapshot_id
        )
        .parse()?,
    );

    node_2
        .start_clustered(cluster.base_dir(), cluster.cluster_name())
        .await?;

    assert!(snapshot_restored.next().await.is_some());
    Ok(())
}

async fn any_partition_active(
    client: &mut ClusterCtrlSvcClient<Channel>,
    timeout: Duration,
) -> googletest::Result<()> {
    let deadline = tokio::time::Instant::now() + timeout;
    loop {
        let cluster_state = client
            .get_cluster_state(ClusterStateRequest {})
            .await?
            .into_inner()
            .cluster_state
            .unwrap();

        if cluster_state

Check failure on line 126 in server/tests/snapshots.rs (GitHub Actions / Build and test, warp-ubuntu-latest-x64-16x): called `is_some()` after searching an `Iterator` with `find`
            .nodes
            .values()
            .find(|n| {
                n.state.as_ref().is_some_and(|s| match s {
                    State::Alive(s) => s
                        .partitions
                        .values()
                        .find(|p| p.effective_mode.cmp(&1).is_eq())

Check failure on line 134 in server/tests/snapshots.rs (GitHub Actions / Build and test, warp-ubuntu-latest-x64-16x): called `is_some()` after searching an `Iterator` with `find`
                        .is_some(),
                    _ => false,
                })
            })
            .is_some()
        {
            break; // partition is ready; we can request snapshot
        }
        if tokio::time::Instant::now() > deadline {
            fail!("Partition processor did not become ready within the timeout");

Check failure on line 144 in server/tests/snapshots.rs (GitHub Actions / Build and test, warp-ubuntu-latest-x64-16x): unused `std::result::Result` that must be used

Check failure on line 144 in server/tests/snapshots.rs (GitHub Actions / Build and test, warp-ubuntu-latest-x64-16x): unused return value of `any_partition_active::{closure#0}::create_fail_result` that must be used
        }
        tokio::time::sleep(Duration::from_millis(250)).await;
    }
    Ok(())
}
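The two `find`-related check failures above are clippy's search_is_some lint: Iterator::find(..).is_some() can be written as Iterator::any(..). The unused-Result failures would likewise be addressed by propagating what fail! returns, for example with the ? operator. A self-contained illustration of the lint and its fix, using made-up data rather than the cluster-state types:

fn main() {
    let effective_modes = vec![0u32, 1, 2];

    // The pattern CI flagged (clippy::search_is_some):
    #[allow(clippy::search_is_some)]
    let leader_found = effective_modes.iter().find(|&&m| m == 1).is_some();

    // The idiomatic equivalent that clippy suggests:
    let leader_found_idiomatic = effective_modes.iter().any(|&m| m == 1);

    assert_eq!(leader_found, leader_found_idiomatic);
}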

async fn grpc_connect(address: AdvertisedAddress) -> Result<Channel, tonic::transport::Error> {
    match address {
        AdvertisedAddress::Uds(uds_path) => {
            // A dummy endpoint is required to construct the UDS connector; it is never dialed.
            Endpoint::try_from("http://127.0.0.1")
                .expect("/ should be a valid Uri")
                .connect_with_connector(service_fn(move |_: Uri| {
                    let uds_path = uds_path.clone();
                    async move {
                        Ok::<_, io::Error>(TokioIo::new(UnixStream::connect(uds_path).await?))
                    }
                }))
                .await
        }
        AdvertisedAddress::Http(uri) => {
            Channel::builder(uri)
                .connect_timeout(Duration::from_secs(2))
                .timeout(Duration::from_secs(2))
                .http2_adaptive_window(true)
                .connect()
                .await
        }
    }
}