From 09521309d668dc5533e55d9137e50a8388bde379 Mon Sep 17 00:00:00 2001
From: khai96_
Date: Mon, 6 Nov 2023 15:37:51 +0700
Subject: [PATCH] perf: spawn write_cas_file in a tokio thread

---
 Cargo.lock                       |   1 +
 crates/store-dir/Cargo.toml      |   1 +
 crates/store-dir/src/cas_file.rs |  19 ++--
 crates/tarball/src/lib.rs        | 159 ++++++++++++++-----------------
 4 files changed, 89 insertions(+), 91 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 05d751cb1..d46fb5145 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1493,6 +1493,7 @@ dependencies = [
  "serde_json",
  "sha2",
  "ssri",
+ "tokio",
 ]
 
 [[package]]
diff --git a/crates/store-dir/Cargo.toml b/crates/store-dir/Cargo.toml
index fef784cb0..f2edd9c54 100644
--- a/crates/store-dir/Cargo.toml
+++ b/crates/store-dir/Cargo.toml
@@ -19,6 +19,7 @@ serde = { workspace = true }
 serde_json = { workspace = true }
 sha2 = { workspace = true }
 ssri = { workspace = true }
+tokio = { workspace = true }
 
 [dev-dependencies]
 pretty_assertions = { workspace = true }
diff --git a/crates/store-dir/src/cas_file.rs b/crates/store-dir/src/cas_file.rs
index f53d4baea..7ce768148 100644
--- a/crates/store-dir/src/cas_file.rs
+++ b/crates/store-dir/src/cas_file.rs
@@ -22,16 +22,23 @@ pub enum WriteCasFileError {
 
 impl StoreDir {
     /// Write a file from an npm package to the store directory.
-    pub fn write_cas_file(
+    pub async fn write_cas_file<Buffer>(
         &self,
-        buffer: &[u8],
+        buffer: Buffer,
         executable: bool,
-    ) -> Result<(PathBuf, FileHash), WriteCasFileError> {
-        let file_hash = Sha512::digest(buffer);
+    ) -> Result<(PathBuf, FileHash), WriteCasFileError>
+    where
+        Buffer: AsRef<[u8]> + Send + 'static,
+    {
+        let file_hash = Sha512::digest(buffer.as_ref());
         let file_path = self.cas_file_path(file_hash, executable);
         let mode = executable.then_some(EXEC_MODE);
-        ensure_file(&file_path, buffer, mode).map_err(WriteCasFileError::WriteFile)?;
-        Ok((file_path, file_hash))
+        tokio::task::spawn_blocking(move || {
+            ensure_file(&file_path, buffer.as_ref(), mode).map_err(WriteCasFileError::WriteFile)?;
+            Ok((file_path, file_hash))
+        })
+        .await
+        .expect("no join error")
     }
 }
 
diff --git a/crates/tarball/src/lib.rs b/crates/tarball/src/lib.rs
index add46b412..9bbb05a8d 100644
--- a/crates/tarball/src/lib.rs
+++ b/crates/tarball/src/lib.rs
@@ -17,7 +17,7 @@ use pacquet_store_dir::{
 };
 use pipe_trait::Pipe;
 use reqwest::Client;
-use ssri::{Integrity, IntegrityChecker};
+use ssri::Integrity;
 use tar::Archive;
 use tokio::sync::{Notify, RwLock};
 use tracing::instrument;
@@ -103,11 +103,6 @@ fn decompress_gzip(gz_data: &[u8], unpacked_size: Option<usize>) -> Result<Vec<u8>, TarballError> {
     Ok(data)
 }
 
-#[instrument(skip(data), fields(data_len = data.len()))]
-fn verify_checksum(data: &[u8], integrity: Integrity) -> Result<ssri::Algorithm, ssri::Error> {
-    integrity.pipe(IntegrityChecker::new).chain(data).result()
-}
-
 /// This subroutine downloads and extracts a tarball to the store directory.
 ///
 /// It returns a CAS map of files in the tarball.
@@ -186,97 +181,91 @@ impl<'a> DownloadTarballToStore<'a> {
             .map_err(network_error)?
             .bytes()
             .await
-            .map_err(network_error)?;
+            .map_err(network_error)?
+            .pipe(Arc::new);
 
         tracing::info!(target: "pacquet::download", ?package_url, "Download completed");
 
         // TODO: Cloning here is less than desirable, there are 2 possible solutions for this problem:
         // 1. Use an Arc and convert this line to Arc::clone.
         // 2. Replace ssri with base64 and serde magic (which supports Copy).
-        let package_integrity = package_integrity.clone();
-
-        #[derive(Debug, From)]
-        enum TaskError {
-            Checksum(ssri::Error),
-            Other(TarballError),
-        }
-        let cas_paths = tokio::task::spawn(async move {
-            verify_checksum(&response, package_integrity.clone()).map_err(TaskError::Checksum)?;
-
-            // TODO: move tarball extraction to its own function
-            // TODO: test it
-            // TODO: test the duplication of entries
-
-            let mut archive = decompress_gzip(&response, package_unpacked_size)
-                .map_err(TaskError::Other)?
-                .pipe(Cursor::new)
-                .pipe(Archive::new);
-
-            let entries = archive
-                .entries()
-                .map_err(TarballError::ReadTarballEntries)
-                .map_err(TaskError::Other)?
-                .filter(|entry| !entry.as_ref().unwrap().header().entry_type().is_dir());
-
-            let ((_, Some(capacity)) | (capacity, None)) = entries.size_hint();
-            let mut cas_paths = HashMap::<OsString, PathBuf>::with_capacity(capacity);
-            let mut pkg_files_idx = PackageFilesIndex { files: HashMap::with_capacity(capacity) };
-
-            for entry in entries {
-                let mut entry = entry.unwrap();
-
-                let file_mode = entry.header().mode().expect("get mode"); // TODO: properly propagate this error
-                let file_is_executable = file_mode::is_all_exec(file_mode);
-
-                // Read the contents of the entry
-                let mut buffer = Vec::with_capacity(entry.size() as usize);
-                entry.read_to_end(&mut buffer).unwrap();
-
-                let entry_path = entry.path().unwrap();
-                let cleaned_entry_path =
-                    entry_path.components().skip(1).collect::<PathBuf>().into_os_string();
-                let (file_path, file_hash) = store_dir
-                    .write_cas_file(&buffer, file_is_executable)
-                    .map_err(TarballError::WriteCasFile)?;
-
-                let tarball_index_key = cleaned_entry_path
-                    .to_str()
-                    .expect("entry path must be valid UTF-8") // TODO: propagate this error, provide more information
-                    .to_string(); // TODO: convert cleaned_entry_path to String too.
-
-                if let Some(previous) = cas_paths.insert(cleaned_entry_path, file_path) {
-                    tracing::warn!(?previous, "Duplication detected. Old entry has been ejected");
-                }
-                let checked_at = UNIX_EPOCH.elapsed().ok().map(|x| x.as_millis());
-                let file_size = entry.header().size().ok();
-                let file_integrity = format!("sha512-{}", BASE64_STD.encode(file_hash));
-                let file_attrs = PackageFileInfo {
-                    checked_at,
-                    integrity: file_integrity,
-                    mode: file_mode,
-                    size: file_size,
-                };
-
-                if let Some(previous) = pkg_files_idx.files.insert(tarball_index_key, file_attrs) {
-                    tracing::warn!(?previous, "Duplication detected. Old entry has been ejected");
-                }
-            }
-
-            store_dir
-                .write_index_file(&package_integrity, &pkg_files_idx)
-                .map_err(TarballError::WriteTarballIndexFile)?;
-
-            Ok(cas_paths)
-        })
-        .await
-        .expect("no join error")
-        .map_err(|error| match error {
-            TaskError::Checksum(error) => {
-                TarballError::Checksum(VerifyChecksumError { url: package_url.to_string(), error })
-            }
-            TaskError::Other(error) => error,
-        })?;
+        let package_integrity = package_integrity.clone().pipe(Arc::new);
+
+        {
+            let response = Arc::clone(&response);
+            let package_integrity = Arc::clone(&package_integrity);
+            tokio::task::spawn(async move { package_integrity.check(&*response) })
+                .await
+                .expect("no join error")
+                .map_err(|error| {
+                    TarballError::Checksum(VerifyChecksumError {
+                        url: package_url.to_string(),
+                        error,
+                    })
+                })?;
+        }
+
+        // TODO: move tarball extraction to its own function
+        // TODO: test it
+        // TODO: test the duplication of entries
+
+        let mut archive =
+            decompress_gzip(&response, package_unpacked_size)?.pipe(Cursor::new).pipe(Archive::new);
+
+        let entries = archive
+            .entries()
+            .map_err(TarballError::ReadTarballEntries)?
+            .filter(|entry| !entry.as_ref().unwrap().header().entry_type().is_dir());
+
+        let ((_, Some(capacity)) | (capacity, None)) = entries.size_hint();
+        let mut cas_paths = HashMap::<OsString, PathBuf>::with_capacity(capacity);
+        let mut pkg_files_idx = PackageFilesIndex { files: HashMap::with_capacity(capacity) };
+
+        for entry in entries {
+            let mut entry = entry.unwrap();
+
+            let file_mode = entry.header().mode().expect("get mode"); // TODO: properly propagate this error
+            let file_is_executable = file_mode::is_all_exec(file_mode);
+
+            // Read the contents of the entry
+            let mut buffer = Vec::with_capacity(entry.size() as usize);
+            entry.read_to_end(&mut buffer).unwrap();
+
+            let entry_path = entry.path().unwrap();
+            let cleaned_entry_path =
+                entry_path.components().skip(1).collect::<PathBuf>().into_os_string();
+            let (file_path, file_hash) = store_dir
+                .write_cas_file(buffer, file_is_executable)
+                .await
+                .map_err(TarballError::WriteCasFile)?;
+
+            let tarball_index_key = cleaned_entry_path
+                .to_str()
+                .expect("entry path must be valid UTF-8") // TODO: propagate this error, provide more information
+                .to_string(); // TODO: convert cleaned_entry_path to String too.
+
+            if let Some(previous) = cas_paths.insert(cleaned_entry_path, file_path) {
+                tracing::warn!(?previous, "Duplication detected. Old entry has been ejected");
+            }
+            let checked_at = UNIX_EPOCH.elapsed().ok().map(|x| x.as_millis());
+            let file_size = entry.header().size().ok();
+            let file_integrity = format!("sha512-{}", BASE64_STD.encode(file_hash));
+            let file_attrs = PackageFileInfo {
+                checked_at,
+                integrity: file_integrity,
+                mode: file_mode,
+                size: file_size,
+            };
+
+            if let Some(previous) = pkg_files_idx.files.insert(tarball_index_key, file_attrs) {
+                tracing::warn!(?previous, "Duplication detected. Old entry has been ejected");
+            }
+        }
+
+        store_dir
+            .write_index_file(&package_integrity, &pkg_files_idx)
+            .map_err(TarballError::WriteTarballIndexFile)?;
 
         tracing::info!(target: "pacquet::download", ?package_url, "Checksum verified");