diff --git a/Cargo.lock b/Cargo.lock index f246bd4d8..202a6b25d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1637,6 +1637,8 @@ dependencies = [ "bitflags", "cfg-if", "libc", + "memoffset", + "pin-utils", "static_assertions", ] diff --git a/Makefile b/Makefile index 997fd852a..b042e581a 100644 --- a/Makefile +++ b/Makefile @@ -40,6 +40,7 @@ test-asan: cargo +nightly test -Z build-std --target x86_64-unknown-linux-gnu --features $(RUST_FEATURES) $$packages -- \ --skip reftest_ \ --skip proptest_ \ + --skip fork_test \ --skip sequential_read_large .PHONY: fmt diff --git a/s3-file-connector/Cargo.toml b/s3-file-connector/Cargo.toml index 5a037c98f..0e4966219 100644 --- a/s3-file-connector/Cargo.toml +++ b/s3-file-connector/Cargo.toml @@ -24,7 +24,7 @@ supports-color = "1.3.0" thiserror = "1.0.34" tracing = { version = "0.1.35", default-features = false, features = ["std", "log"] } tracing-subscriber = { version = "0.3.14", features = ["fmt", "env-filter"] } -nix = { version = "0.26.1", default-features = false, features = ["user"] } +nix = "0.26.2" time = "0.3.17" const_format = "0.2.30" diff --git a/s3-file-connector/src/main.rs b/s3-file-connector/src/main.rs index 06ce5e2e7..0e65da27e 100644 --- a/s3-file-connector/src/main.rs +++ b/s3-file-connector/src/main.rs @@ -1,9 +1,16 @@ +use std::fs::File; +use std::io::{Read, Write}; +use std::os::unix::prelude::FromRawFd; use std::path::PathBuf; +use std::thread; +use std::time::Duration; use anyhow::{anyhow, Context as _}; use aws_crt_s3::common::rust_log_adapter::RustLogAdapter; use clap::{ArgGroup, Parser}; use fuser::{BackgroundSession, MountOption, Session}; +use nix::sys::signal::Signal; +use nix::unistd::ForkResult; use s3_client::{AddressingStyle, Endpoint, HeadBucketError, ObjectClientError, S3ClientConfig, S3CrtClient}; use s3_file_connector::fs::S3FilesystemConfig; use s3_file_connector::fuse::S3FuseFilesystem; @@ -90,6 +97,9 @@ struct CliArgs { #[clap(long, help = "File permissions [default: 0644]", value_parser = parse_perm_bits)] pub file_mode: Option, + + #[clap(short, long, help = "Run as foreground process")] + pub foreground: bool, } impl CliArgs { @@ -105,8 +115,6 @@ impl CliArgs { } fn main() -> anyhow::Result<()> { - init_tracing_subscriber(); - let args = CliArgs::parse(); // validate mount point @@ -117,36 +125,123 @@ fn main() -> anyhow::Result<()> { )); } - let mut options = vec![ - MountOption::RO, - MountOption::DefaultPermissions, - MountOption::FSName("fuse_sync".to_string()), - MountOption::NoAtime, - ]; - if args.auto_unmount { - options.push(MountOption::AutoUnmount); - } - if args.allow_root { - options.push(MountOption::AllowRoot); - } - if args.allow_other { - options.push(MountOption::AllowOther); - } + if args.foreground { + // mount file system as a foreground process + init_tracing_subscriber(); + let session = mount(args)?; - let mut filesystem_config = S3FilesystemConfig::default(); - if let Some(uid) = args.uid { - filesystem_config.uid = uid; - } - if let Some(gid) = args.gid { - filesystem_config.gid = gid; - } - if let Some(dir_mode) = args.dir_mode { - filesystem_config.dir_mode = dir_mode; - } - if let Some(file_mode) = args.file_mode { - filesystem_config.file_mode = file_mode; + let (sender, receiver) = std::sync::mpsc::sync_channel(0); + + ctrlc::set_handler(move || { + let _ = sender.send(()); + }) + .context("Failed to install signal handler") + .unwrap(); + + let _ = receiver.recv(); + + drop(session); + } else { + // mount file system as a background process + + // create a pipe for interprocess communication. + // child process will report its status via this pipe. + let (read_fd, write_fd) = nix::unistd::pipe().context("Failed to create a pipe")?; + + // SAFETY: Child process has full ownership of its resources. + // There is no shared data between parent and child processes. + let pid = unsafe { nix::unistd::fork() }; + match pid.expect("Failed to fork mount process") { + ForkResult::Child => { + init_tracing_subscriber(); + + let child_args = CliArgs::parse(); + let session = mount(child_args); + + // close unused file descriptor, we only write from this end. + nix::unistd::close(read_fd).context("Failed to close unused file descriptor")?; + + // SAFETY: `write_fd` is a valid file descriptor. + let mut pipe_file = unsafe { File::from_raw_fd(write_fd) }; + + let status_success = [b'0']; + let status_failure = [b'1']; + + match session { + Ok(_session) => { + pipe_file + .write(&status_success) + .context("Failed to write data to the pipe")?; + drop(pipe_file); + + // the session stays running because its lifetime is bound to the match statement. + // it won't be dropped until after the park. + // also `park()` does not guarantee to remain parked forever. so, we put it inside a loop. + loop { + thread::park(); + } + } + Err(e) => { + pipe_file + .write(&status_failure) + .context("Failed to write data to the pipe")?; + return Err(anyhow!(e)); + } + } + } + ForkResult::Parent { child } => { + init_tracing_subscriber(); + + // close unused file descriptor, we only read from this end. + nix::unistd::close(write_fd).context("Failed to close unused file descriptor")?; + + // SAFETY: `read_fd` is a valid file descriptor. + let mut pipe_file = unsafe { File::from_raw_fd(read_fd) }; + + let (sender, receiver) = std::sync::mpsc::channel(); + + // create a thread that read from the pipe so that we can enforce a time out. + std::thread::spawn(move || { + let mut buf = [0]; + match pipe_file + .read_exact(&mut buf) + .context("Failed to read data from the pipe") + { + Ok(_) => { + let status = buf[0] as char; + sender.send(status).unwrap(); + } + Err(_) => sender.send('1').unwrap(), + } + }); + + let timeout = Duration::from_secs(30); + let status = receiver.recv_timeout(timeout); + match status { + Ok('0') => tracing::debug!("success status flag received from child process"), + Ok(_) => { + nix::sys::wait::waitpid(child, None).context("Failed to wait for child process to exit")?; + return Err(anyhow!("Failed to create mount process")); + } + Err(_timeout_err) => { + // kill child process before returning error. + if let Err(e) = nix::sys::signal::kill(child, Signal::SIGTERM) { + tracing::error!("Unable to kill hanging child process with SIGTERM: {:?}", e); + } + return Err(anyhow!( + "Timeout after {} seconds while waiting for mount process to be ready", + timeout.as_secs() + )); + } + }; + } + } } + Ok(()) +} + +fn mount(args: CliArgs) -> anyhow::Result { let throughput_target_gbps = args.throughput_target_gbps.map(|t| t as f64); let addressing_style = args.addressing_style(); @@ -174,6 +269,20 @@ fn main() -> anyhow::Result<()> { .context("Failed to create S3 client")?; let runtime = client.event_loop_group(); + let mut filesystem_config = S3FilesystemConfig::default(); + if let Some(uid) = args.uid { + filesystem_config.uid = uid; + } + if let Some(gid) = args.gid { + filesystem_config.gid = gid; + } + if let Some(dir_mode) = args.dir_mode { + filesystem_config.dir_mode = dir_mode; + } + if let Some(file_mode) = args.file_mode { + filesystem_config.file_mode = file_mode; + } + let fs = S3FuseFilesystem::new( client, runtime, @@ -182,6 +291,23 @@ fn main() -> anyhow::Result<()> { filesystem_config, ); + let fs_name = String::from("s3-file-connector"); + let mut options = vec![ + MountOption::RO, + MountOption::DefaultPermissions, + MountOption::FSName(fs_name), + MountOption::NoAtime, + ]; + if args.auto_unmount { + options.push(MountOption::AutoUnmount); + } + if args.allow_root { + options.push(MountOption::AllowRoot); + } + if args.allow_other { + options.push(MountOption::AllowOther); + } + let session = Session::new(fs, &args.mount_point, &options).context("Failed to create FUSE session")?; let session = if let Some(thread_count) = args.thread_count { @@ -193,18 +319,7 @@ fn main() -> anyhow::Result<()> { tracing::info!("successfully mounted {:?}", args.mount_point); - let (sender, receiver) = std::sync::mpsc::sync_channel(0); - - ctrlc::set_handler(move || { - let _ = sender.send(()); - }) - .context("Failed to install signal handler")?; - - let _ = receiver.recv(); - - drop(session); - - Ok(()) + Ok(session) } /// Create a client for a bucket in the given region and send a HeadBucket request to validate it's diff --git a/s3-file-connector/tests/common/mod.rs b/s3-file-connector/tests/common/mod.rs index 1d68b3b31..6e76aa49d 100644 --- a/s3-file-connector/tests/common/mod.rs +++ b/s3-file-connector/tests/common/mod.rs @@ -1,6 +1,10 @@ use aws_crt_s3::common::rust_log_adapter::RustLogAdapter; +use aws_sdk_s3::types::ByteStream; +use aws_sdk_s3::Region; use fuser::{FileAttr, FileType}; use futures::executor::ThreadPool; +use rand::rngs::OsRng; +use rand::RngCore; use s3_client::mock_client::{MockClient, MockClientConfig}; use s3_file_connector::fs::{DirectoryReplier, ReadReplier}; use s3_file_connector::{S3Filesystem, S3FilesystemConfig}; @@ -27,6 +31,50 @@ pub fn make_test_filesystem( (client, fs) } +pub fn get_test_bucket_and_prefix(test_name: &str) -> (String, String) { + let bucket = std::env::var("S3_BUCKET_NAME").expect("Set S3_BUCKET_NAME to run integration tests"); + + // Generate a random nonce to make sure this prefix is truly unique + let nonce = OsRng.next_u64(); + + // Prefix always has a trailing "/" to keep meaning in sync with the S3 API. + let prefix = std::env::var("S3_BUCKET_TEST_PREFIX").expect("Set S3_BUCKET_TEST_PREFIX to run integration tests"); + assert!(prefix.ends_with('/'), "S3_BUCKET_TEST_PREFIX should end in '/'"); + + let prefix = format!("{prefix}{test_name}/{nonce}/"); + + (bucket, prefix) +} + +pub fn get_test_bucket_forbidden() -> String { + std::env::var("S3_FORBIDDEN_BUCKET_NAME").expect("Set S3_FORBIDDEN_BUCKET_NAME to run integration tests") +} + +pub fn get_test_region() -> String { + std::env::var("S3_REGION").expect("Set S3_REGION to run integration tests") +} + +pub fn create_objects(bucket: &str, prefix: &str, region: &str, key: &str, value: &[u8]) { + let runtime = tokio::runtime::Builder::new_current_thread() + .enable_io() + .enable_time() + .build() + .unwrap(); + + let config = runtime.block_on(aws_config::from_env().region(Region::new(region.to_string())).load()); + let sdk_client = aws_sdk_s3::Client::new(&config); + // runtime.block_on(client.list_buckets()); + let full_key = format!("{prefix}{key}"); + let _ = runtime.block_on( + sdk_client + .put_object() + .bucket(bucket) + .key(full_key) + .body(ByteStream::from(value.to_vec())) + .send(), + ); +} + #[track_caller] pub fn assert_attr(attr: FileAttr, ftype: FileType, size: u64, uid: u32, gid: u32, perm: u16) { assert_eq!(attr.kind, ftype); diff --git a/s3-file-connector/tests/fuse_tests/fork_test.rs b/s3-file-connector/tests/fuse_tests/fork_test.rs new file mode 100644 index 000000000..f81da22b2 --- /dev/null +++ b/s3-file-connector/tests/fuse_tests/fork_test.rs @@ -0,0 +1,276 @@ +use assert_cmd::prelude::*; // Add methods on commands +use std::fs; +use std::io::{BufRead, BufReader}; +use std::process::Stdio; +use std::{path::PathBuf, process::Command}; // Run programs + +use crate::common::{create_objects, get_test_bucket_and_prefix, get_test_bucket_forbidden, get_test_region}; + +const MAX_WAIT_DURATION: std::time::Duration = std::time::Duration::from_secs(10); + +#[cfg(feature = "fuse_tests")] +#[test] +fn run_in_background() -> Result<(), Box> { + let (bucket, prefix) = get_test_bucket_and_prefix("test_run_in_background"); + let region = get_test_region(); + let mount_point = assert_fs::TempDir::new()?; + + let mut cmd = Command::cargo_bin("s3-file-connector")?; + let mut child = cmd + .arg(&bucket) + .arg(mount_point.path()) + .arg(format!("--prefix={prefix}")) + .arg("--auto-unmount") + .spawn() + .expect("unable to spawn child"); + + let st = std::time::Instant::now(); + + let exit_status = loop { + if st.elapsed() > MAX_WAIT_DURATION { + panic!("wait for result timeout") + } + match child.try_wait().expect("unable to wait for result") { + Some(result) => break result, + None => std::thread::sleep(std::time::Duration::from_millis(100)), + } + }; + + // verify mount status and mount entry + assert!(exit_status.success()); + assert!(mount_exists("s3-file-connector", mount_point.path().to_str().unwrap())); + + test_read_files(&bucket, &prefix, ®ion, &mount_point.to_path_buf()); + + Ok(()) +} + +#[cfg(feature = "fuse_tests")] +#[test] +fn run_in_foreground() -> Result<(), Box> { + let (bucket, prefix) = get_test_bucket_and_prefix("test_run_in_foreground"); + let region = get_test_region(); + let mount_point = assert_fs::TempDir::new()?; + + let mut cmd = Command::cargo_bin("s3-file-connector")?; + let mut child = cmd + .arg(&bucket) + .arg(mount_point.path()) + .arg(format!("--prefix={prefix}")) + .arg("--auto-unmount") + .arg("--foreground") + .spawn() + .expect("unable to spawn child"); + + let st = std::time::Instant::now(); + + loop { + if st.elapsed() > MAX_WAIT_DURATION { + panic!("wait for result timeout") + } + if mount_exists("s3-file-connector", mount_point.path().to_str().unwrap()) { + break; + } + std::thread::sleep(std::time::Duration::from_millis(100)); + } + + // verify that process is still alive + let child_status = child.try_wait().unwrap(); + assert_eq!(None, child_status); + + assert!(mount_exists("s3-file-connector", mount_point.path().to_str().unwrap())); + + test_read_files(&bucket, &prefix, ®ion, &mount_point.to_path_buf()); + + Ok(()) +} + +#[cfg(feature = "fuse_tests")] +#[test] +fn run_in_background_fail_on_mount() -> Result<(), Box> { + // the mount would fail from error 403 on HeadBucket + let bucket = get_test_bucket_forbidden(); + let mount_point = assert_fs::TempDir::new()?; + + let mut cmd = Command::cargo_bin("s3-file-connector")?; + let mut child = cmd + .arg(&bucket) + .arg(mount_point.path()) + .arg("--auto-unmount") + .spawn() + .expect("unable to spawn child"); + + let st = std::time::Instant::now(); + + let exit_status = loop { + if st.elapsed() > MAX_WAIT_DURATION { + panic!("wait for result timeout") + } + match child.try_wait().expect("unable to wait for result") { + Some(result) => break result, + None => std::thread::sleep(std::time::Duration::from_millis(100)), + } + }; + + // verify mount status and mount entry + assert!(!exit_status.success()); + assert!(!mount_exists("s3-file-connector", mount_point.path().to_str().unwrap())); + + Ok(()) +} + +#[cfg(feature = "fuse_tests")] +#[test] +fn run_in_foreground_fail_on_mount() -> Result<(), Box> { + // the mount would fail from error 403 on HeadBucket + let bucket = get_test_bucket_forbidden(); + let mount_point = assert_fs::TempDir::new()?; + + let mut cmd = Command::cargo_bin("s3-file-connector")?; + let mut child = cmd + .arg(&bucket) + .arg(mount_point.path()) + .arg("--auto-unmount") + .arg("--foreground") + .spawn() + .expect("unable to spawn child"); + + let st = std::time::Instant::now(); + + let exit_status = loop { + if st.elapsed() > MAX_WAIT_DURATION { + panic!("wait for result timeout") + } + match child.try_wait().expect("unable to wait for result") { + Some(result) => break result, + None => std::thread::sleep(std::time::Duration::from_millis(100)), + } + }; + + // verify mount status and mount entry + assert!(!exit_status.success()); + assert!(!mount_exists("s3-file-connector", mount_point.path().to_str().unwrap())); + + Ok(()) +} + +#[cfg(feature = "fuse_tests")] +#[test] +fn run_fail_on_duplicate_mount() -> Result<(), Box> { + let (bucket, prefix) = get_test_bucket_and_prefix("run_fail_on_duplicate_mount"); + let mount_point = assert_fs::TempDir::new()?; + + let mut cmd = Command::cargo_bin("s3-file-connector")?; + let mut first_mount = cmd + .arg(&bucket) + .arg(mount_point.path()) + .arg(format!("--prefix={prefix}")) + .arg("--auto-unmount") + .spawn() + .expect("unable to spawn child"); + + let st = std::time::Instant::now(); + + let exit_status = loop { + if st.elapsed() > MAX_WAIT_DURATION { + panic!("wait for result timeout") + } + match first_mount.try_wait().expect("unable to wait for result") { + Some(result) => break result, + None => std::thread::sleep(std::time::Duration::from_millis(100)), + } + }; + + // verify mount status and mount entry + assert!(exit_status.success()); + assert!(mount_exists("s3-file-connector", mount_point.path().to_str().unwrap())); + + let mut cmd = Command::cargo_bin("s3-file-connector")?; + let mut second_mount = cmd + .arg(&bucket) + .arg(mount_point.path()) + .arg(format!("--prefix={prefix}")) + .arg("--auto-unmount") + .spawn() + .expect("unable to spawn child"); + + let st = std::time::Instant::now(); + + let exit_status = loop { + if st.elapsed() > MAX_WAIT_DURATION { + panic!("wait for result timeout") + } + match second_mount.try_wait().expect("unable to wait for result") { + Some(result) => break result, + None => std::thread::sleep(std::time::Duration::from_millis(100)), + } + }; + + // verify mount status + assert!(!exit_status.success()); + + Ok(()) +} + +fn test_read_files(bucket: &str, prefix: &str, region: &str, mount_point: &PathBuf) { + // create objects for test + create_objects(bucket, prefix, region, "file1.txt", b"hello world"); + create_objects(bucket, prefix, region, "dir/file2.txt", b"hello world"); + + // verify readdir works on mount point + let dir = fs::read_dir(mount_point).unwrap(); + let dirs: Vec<_> = dir.map(|f| f.unwrap()).collect(); + assert_eq!( + dirs.iter() + .map(|f| f.path().file_name().unwrap().to_str().unwrap().to_owned()) + .collect::>(), + vec!["dir", "file1.txt"] + ); + + // verify readdir works + let dir = fs::read_dir(mount_point.as_path().join("dir")).unwrap(); + let dirs: Vec<_> = dir.map(|f| f.unwrap()).collect(); + assert_eq!( + dirs.iter() + .map(|f| f.path().file_name().unwrap().to_str().unwrap().to_owned()) + .collect::>(), + vec!["file2.txt"] + ); + + // verify read file works + let file_content = fs::read_to_string(mount_point.as_path().join("file1.txt")).unwrap(); + assert_eq!(file_content, "hello world"); + + let file_content = fs::read_to_string(mount_point.as_path().join("dir/file2.txt")).unwrap(); + assert_eq!(file_content, "hello world"); +} + +/// Read all mount records in the system and find the one that matches given arguments. +/// # Arguments +/// +/// * `source` - name of the file system. +/// * `mount_point` - path to the mount point. +fn mount_exists(source: &str, mount_point: &str) -> bool { + // macOS wrap its temp directory under /private but it's not visible to users + #[cfg(target_os = "macos")] + let mount_point = format!("/private{}", mount_point); + + let mut cmd = Command::new("mount"); + #[cfg(target_os = "linux")] + cmd.arg("-l"); + let mut cmd = cmd.stdout(Stdio::piped()).spawn().expect("Unable to spawn mount tool"); + + let stdout = cmd.stdout.as_mut().unwrap(); + let stdout_reader = BufReader::new(stdout); + let stdout_lines = stdout_reader.lines(); + + for line in stdout_lines.flatten() { + let str: Vec<&str> = line.split_whitespace().collect(); + let source_rec = str[0]; + let mount_point_rec = str[2]; + if source_rec == source && mount_point_rec == mount_point { + return true; + } + } + false +} diff --git a/s3-file-connector/tests/fuse_tests/mod.rs b/s3-file-connector/tests/fuse_tests/mod.rs index 975fa2f29..568f2de8c 100644 --- a/s3-file-connector/tests/fuse_tests/mod.rs +++ b/s3-file-connector/tests/fuse_tests/mod.rs @@ -1,3 +1,4 @@ +mod fork_test; mod lookup_test; mod mount_test; mod perm_test; @@ -66,14 +67,14 @@ mod mock_session { #[cfg(feature = "s3_tests")] mod s3_session { + use crate::common::{get_test_bucket_and_prefix, get_test_region}; + use super::*; use std::future::Future; use aws_sdk_s3::types::ByteStream; use aws_sdk_s3::Region; - use rand::rngs::OsRng; - use rand::RngCore as _; use s3_client::{S3ClientConfig, S3CrtClient}; /// Create a FUSE mount backed by a real S3 client @@ -122,26 +123,6 @@ mod s3_session { (mount_dir, session, Box::new(put_object)) } - fn get_test_bucket_and_prefix(test_name: &str) -> (String, String) { - let bucket = std::env::var("S3_BUCKET_NAME").expect("Set S3_BUCKET_NAME to run integration tests"); - - // Generate a random nonce to make sure this prefix is truly unique - let nonce = OsRng.next_u64(); - - // Prefix always has a trailing "/" to keep meaning in sync with the S3 API. - let prefix = - std::env::var("S3_BUCKET_TEST_PREFIX").expect("Set S3_BUCKET_TEST_PREFIX to run integration tests"); - assert!(prefix.ends_with('/'), "S3_BUCKET_TEST_PREFIX should end in '/'"); - - let prefix = format!("{prefix}{test_name}/{nonce}/"); - - (bucket, prefix) - } - - fn get_test_region() -> String { - std::env::var("S3_REGION").expect("Set S3_REGION to run integration tests") - } - async fn get_test_sdk_client(region: &str) -> aws_sdk_s3::Client { let config = aws_config::from_env() .region(Region::new(region.to_string()))