diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml index 2222486..c9a1cd3 100644 --- a/.github/workflows/rust.yml +++ b/.github/workflows/rust.yml @@ -41,10 +41,10 @@ jobs: run: cargo test --no-default-features --verbose - name: Run Tests (ssl cross) - run: | - cargo test --no-default-features --features ureq,native-tls - cargo test --no-default-features --features ureq,rustls-tls - cargo test --no-default-features --features tokio,native-tls + run: > + cargo test --no-default-features --features ureq,native-tls && + cargo test --no-default-features --features ureq,rustls-tls && + cargo test --no-default-features --features tokio,native-tls && cargo test --no-default-features --features tokio,rustls-tls - name: Run Audit diff --git a/Cargo.toml b/Cargo.toml index bd7ae95..acf6224 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -38,6 +38,15 @@ ureq = { version = "2.8.0", optional = true, features = [ ] } native-tls = { version = "0.2.12", optional = true } +[target.'cfg(windows)'.dependencies.windows-sys] +version = "0.59" +features = ["Win32_Foundation", "Win32_Storage_FileSystem", "Win32_System_IO"] +optional = true + +[target.'cfg(unix)'.dependencies.libc] +version = "0.2" +optional = true + [features] default = ["default-tls", "tokio", "ureq"] # These features are only relevant when used with the `tokio` feature, but this might change in the future. @@ -58,6 +67,8 @@ tokio = [ "dep:thiserror", "dep:tokio", "tokio/rt-multi-thread", + "dep:libc", + "dep:windows-sys", ] ureq = [ "dep:http", @@ -67,6 +78,8 @@ ureq = [ "dep:serde_json", "dep:thiserror", "dep:ureq", + "dep:libc", + "dep:windows-sys", ] [dev-dependencies] diff --git a/src/api/sync.rs b/src/api/sync.rs index cd2c4b4..1819f13 100644 --- a/src/api/sync.rs +++ b/src/api/sync.rs @@ -11,7 +11,6 @@ use std::io::Seek; use std::num::ParseIntError; use std::path::{Component, Path, PathBuf}; use std::str::FromStr; -use std::time::Duration; use thiserror::Error; use ureq::{Agent, AgentBuilder, Request}; @@ -70,38 +69,90 @@ impl HeaderAgent { } } -#[derive(Debug)] struct Handle { - _file: std::fs::File, - path: PathBuf, + file: std::fs::File, } + impl Drop for Handle { fn drop(&mut self) { - std::fs::remove_file(&self.path).expect("Removing lockfile") + unlock(&self.file); } } fn lock_file(mut path: PathBuf) -> Result { path.set_extension("lock"); - let mut lock_handle = None; - for i in 0..30 { - match std::fs::File::create_new(path.clone()) { - Ok(handle) => { - lock_handle = Some(handle); - break; - } - _ => { - if i == 0 { - log::warn!("Waiting for lock {path:?}"); - } - std::thread::sleep(Duration::from_secs(1)); - } + let file = std::fs::File::create(path.clone())?; + let mut res = lock(&file); + for _ in 0..5 { + if res == 0 { + break; + } + std::thread::sleep(std::time::Duration::from_secs(1)); + res = lock(&file); + } + if res != 0 { + Err(ApiError::LockAcquisition(path)) + } else { + Ok(Handle { file }) + } +} + +#[cfg(target_family = "unix")] +mod unix { + use std::os::fd::AsRawFd; + + pub(crate) fn lock(file: &std::fs::File) -> i32 { + unsafe { libc::flock(file.as_raw_fd(), libc::LOCK_EX | libc::LOCK_NB) } + } + pub(crate) fn unlock(file: &std::fs::File) -> i32 { + unsafe { libc::flock(file.as_raw_fd(), libc::LOCK_UN) } + } +} +#[cfg(target_family = "unix")] +use unix::{lock, unlock}; + +#[cfg(target_family = "windows")] +mod windows { + use std::os::windows::io::AsRawHandle; + use windows_sys::Win32::Foundation::HANDLE; + use windows_sys::Win32::Storage::FileSystem::{ + LockFileEx, UnlockFile, LOCKFILE_EXCLUSIVE_LOCK, LOCKFILE_FAIL_IMMEDIATELY, + }; + + pub(crate) fn lock(file: &std::fs::File) -> i32 { + unsafe { + let mut overlapped = std::mem::zeroed(); + let flags = LOCKFILE_EXCLUSIVE_LOCK | LOCKFILE_FAIL_IMMEDIATELY; + let res = LockFileEx( + file.as_raw_handle() as HANDLE, + flags, + 0, + !0, + !0, + &mut overlapped, + ); + 1 - res } } - let _file = lock_handle.ok_or_else(|| ApiError::LockAcquisition(path.clone()))?; - Ok(Handle { path, _file }) + pub(crate) fn unlock(file: &std::fs::File) -> i32 { + unsafe { UnlockFile(file.as_raw_handle() as HANDLE, 0, 0, !0, !0) } + } +} +#[cfg(target_family = "windows")] +use windows::{lock, unlock}; + +#[cfg(not(any(target_family = "unix", target_family = "windows")))] +mod other { + pub(crate) fn lock(file: &std::fs::File) -> i32 { + 0 + } + pub(crate) fn unlock(file: &std::fs::File) -> i32 { + 0 + } } +#[cfg(not(any(target_family = "unix", target_family = "windows")))] +use other::{lock, unlock}; #[derive(Debug, Error)] /// All errors the API can throw @@ -688,7 +739,7 @@ impl ApiRepo { .blob_path(&metadata.etag); std::fs::create_dir_all(blob_path.parent().unwrap())?; - let lock = lock_file(blob_path.clone())?; + let lock = lock_file(blob_path.clone()).unwrap(); let mut tmp_path = blob_path.clone(); tmp_path.set_extension(EXTENSION); let tmp_filename = @@ -769,6 +820,7 @@ mod tests { use serde_json::{json, Value}; use sha2::{Digest, Sha256}; use std::io::{Seek, SeekFrom, Write}; + use std::time::Duration; struct TempDir { path: PathBuf, diff --git a/src/api/tokio.rs b/src/api/tokio.rs index 0677f2f..69070a2 100644 --- a/src/api/tokio.rs +++ b/src/api/tokio.rs @@ -18,7 +18,6 @@ use std::collections::BinaryHeap; use std::num::ParseIntError; use std::path::{Component, Path, PathBuf}; use std::sync::Arc; -use std::time::Duration; use thiserror::Error; use tokio::io::AsyncReadExt; use tokio::io::{AsyncSeekExt, AsyncWriteExt, SeekFrom}; @@ -65,36 +64,89 @@ impl Progress for () { } struct Handle { - _file: tokio::fs::File, - path: PathBuf, + file: tokio::fs::File, } + impl Drop for Handle { fn drop(&mut self) { - std::fs::remove_file(&self.path).expect("Removing lockfile") + unlock(&self.file); } } async fn lock_file(mut path: PathBuf) -> Result { path.set_extension("lock"); - let mut lock_handle = None; - for i in 0..30 { - match tokio::fs::File::create_new(path.clone()).await { - Ok(handle) => { - lock_handle = Some(handle); - break; - } - Err(_err) => { - if i == 0 { - log::warn!("Waiting for lock {path:?}"); - } - tokio::time::sleep(Duration::from_secs(1)).await; - } + let file = tokio::fs::File::create(path.clone()).await?; + let mut res = lock(&file); + for _ in 0..5 { + if res == 0 { + break; + } + tokio::time::sleep(std::time::Duration::from_secs(1)).await; + res = lock(&file); + } + if res != 0 { + Err(ApiError::LockAcquisition(path)) + } else { + Ok(Handle { file }) + } +} + +#[cfg(target_family = "unix")] +mod unix { + use std::os::fd::AsRawFd; + + pub(crate) fn lock(file: &tokio::fs::File) -> i32 { + unsafe { libc::flock(file.as_raw_fd(), libc::LOCK_EX | libc::LOCK_NB) } + } + pub(crate) fn unlock(file: &tokio::fs::File) -> i32 { + unsafe { libc::flock(file.as_raw_fd(), libc::LOCK_UN) } + } +} +#[cfg(target_family = "unix")] +use unix::{lock, unlock}; + +#[cfg(target_family = "windows")] +mod windows { + use std::os::windows::io::AsRawHandle; + use windows_sys::Win32::Foundation::HANDLE; + use windows_sys::Win32::Storage::FileSystem::{ + LockFileEx, UnlockFile, LOCKFILE_EXCLUSIVE_LOCK, LOCKFILE_FAIL_IMMEDIATELY, + }; + + pub(crate) fn lock(file: &tokio::fs::File) -> i32 { + unsafe { + let mut overlapped = std::mem::zeroed(); + let flags = LOCKFILE_EXCLUSIVE_LOCK | LOCKFILE_FAIL_IMMEDIATELY; + let res = LockFileEx( + file.as_raw_handle() as HANDLE, + flags, + 0, + !0, + !0, + &mut overlapped, + ); + 1 - res } } - let _file = lock_handle.ok_or_else(|| ApiError::LockAcquisition(path.clone()))?; - Ok(Handle { path, _file }) + pub(crate) fn unlock(file: &tokio::fs::File) -> i32 { + unsafe { UnlockFile(file.as_raw_handle() as HANDLE, 0, 0, !0, !0) } + } } +#[cfg(target_family = "windows")] +use windows::{lock, unlock}; + +#[cfg(not(any(target_family = "unix", target_family = "windows")))] +mod other { + pub(crate) fn lock(file: &tokio::fs::File) -> i32 { + 0 + } + pub(crate) fn unlock(file: &tokio::fs::File) -> i32 { + 0 + } +} +#[cfg(not(any(target_family = "unix", target_family = "windows")))] +use other::{lock, unlock}; #[derive(Debug, Error)] /// All errors the API can throw @@ -670,14 +722,21 @@ impl ApiRepo { .await?; file.seek(SeekFrom::Start(length as u64)).await?; file.write_all(&committed.to_le_bytes()).await?; + file.flush().await?; } } - tokio::fs::OpenOptions::new() + let mut f = tokio::fs::OpenOptions::new() .write(true) .open(&filename) - .await? - .set_len(length as u64) .await?; + f.set_len(length as u64).await?; + // XXX Extremely important and not obvious. + // Tokio::fs doesn't guarantee data is written at the end of `.await` + // boundaries. Even though we await the `set_len` it may not have been + // committed to disk, leading to invalid rename. + // Forcing a flush forces the data (here the truncation) to be committed to disk + f.flush().await?; + progressbar.finish().await; Ok(filename) } @@ -714,6 +773,7 @@ impl ApiRepo { .await?; file.seek(SeekFrom::Start(start as u64)).await?; file.write_all(&buf).await?; + file.flush().await?; Ok((start, stop)) } @@ -798,7 +858,7 @@ impl ApiRepo { let blob_path = cache.blob_path(&metadata.etag); std::fs::create_dir_all(blob_path.parent().unwrap())?; - let lock = lock_file(blob_path.clone()).await; + let lock = lock_file(blob_path.clone()).await?; progress.init(metadata.size, filename).await; let mut tmp_path = blob_path.clone(); tmp_path.set_extension(EXTENSION); @@ -859,6 +919,7 @@ mod tests { use serde_json::{json, Value}; use sha2::{Digest, Sha256}; use std::io::{Seek, Write}; + use std::time::Duration; struct TempDir { path: PathBuf,