Skip to content

Commit

Permalink
perf: spawn write_cas_file in a tokio thread
Browse files Browse the repository at this point in the history
  • Loading branch information
KSXGitHub committed Nov 6, 2023
1 parent ac9dbbd commit 0952130
Show file tree
Hide file tree
Showing 4 changed files with 89 additions and 91 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/store-dir/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ serde = { workspace = true }
serde_json = { workspace = true }
sha2 = { workspace = true }
ssri = { workspace = true }
tokio = { workspace = true }

[dev-dependencies]
pretty_assertions = { workspace = true }
Expand Down
19 changes: 13 additions & 6 deletions crates/store-dir/src/cas_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,23 @@ pub enum WriteCasFileError {

impl StoreDir {
/// Write a file from an npm package to the store directory.
pub fn write_cas_file(
pub async fn write_cas_file<Buffer>(
&self,
buffer: &[u8],
buffer: Buffer,
executable: bool,
) -> Result<(PathBuf, FileHash), WriteCasFileError> {
let file_hash = Sha512::digest(buffer);
) -> Result<(PathBuf, FileHash), WriteCasFileError>
where
Buffer: AsRef<[u8]> + Send + 'static,
{
let file_hash = Sha512::digest(buffer.as_ref());
let file_path = self.cas_file_path(file_hash, executable);
let mode = executable.then_some(EXEC_MODE);
ensure_file(&file_path, buffer, mode).map_err(WriteCasFileError::WriteFile)?;
Ok((file_path, file_hash))
tokio::task::spawn_blocking(move || {
ensure_file(&file_path, buffer.as_ref(), mode).map_err(WriteCasFileError::WriteFile)?;
Ok((file_path, file_hash))
})
.await
.expect("no join error")
}
}

Expand Down
159 changes: 74 additions & 85 deletions crates/tarball/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use pacquet_store_dir::{
};
use pipe_trait::Pipe;
use reqwest::Client;
use ssri::{Integrity, IntegrityChecker};
use ssri::Integrity;
use tar::Archive;
use tokio::sync::{Notify, RwLock};
use tracing::instrument;
Expand Down Expand Up @@ -103,11 +103,6 @@ fn decompress_gzip(gz_data: &[u8], unpacked_size: Option<usize>) -> Result<Vec<u
.map_err(TarballError::DecodeGzip)
}

#[instrument(skip(data), fields(data_len = data.len()))]
fn verify_checksum(data: &[u8], integrity: Integrity) -> Result<ssri::Algorithm, ssri::Error> {
integrity.pipe(IntegrityChecker::new).chain(data).result()
}

/// This subroutine downloads and extracts a tarball to the store directory.
///
/// It returns a CAS map of files in the tarball.
Expand Down Expand Up @@ -186,97 +181,91 @@ impl<'a> DownloadTarballToStore<'a> {
.map_err(network_error)?
.bytes()
.await
.map_err(network_error)?;
.map_err(network_error)?
.pipe(Arc::new);

tracing::info!(target: "pacquet::download", ?package_url, "Download completed");

// TODO: Cloning here is less than desirable, there are 2 possible solutions for this problem:
// 1. Use an Arc and convert this line to Arc::clone.
// 2. Replace ssri with base64 and serde magic (which supports Copy).
let package_integrity = package_integrity.clone();

#[derive(Debug, From)]
enum TaskError {
Checksum(ssri::Error),
Other(TarballError),
let package_integrity = package_integrity.clone().pipe(Arc::new);

{
let response = Arc::clone(&response);
let package_integrity = Arc::clone(&package_integrity);
tokio::task::spawn(async move { package_integrity.check(&*response) })
.await

This comment has been minimized.

Copy link
@TmLev

TmLev Nov 16, 2023

tokio::spawn on line 197 seems unnecessary to me, unless I'm missing something obvious. Spawning a task and then immediately awaiting it gives no benefit and is usually slower than just awaiting it.

package_integrity.check(&response)
    .await

This comment has been minimized.

Copy link
@KSXGitHub

KSXGitHub Nov 16, 2023

Author Contributor

.await doesn't actually wait in the current thread. It will spawn another thread then yield current thread to another task. That was the logic, as the tasks in the scope can be expensive. But now that we have a convenient tool to benchmark it, might as well try it out.

This comment has been minimized.

Copy link
@TmLev

TmLev Nov 16, 2023

tokio::task::spawn doesn't spawn a new thread. It also doesn't guarantee to run on a different thread, as per docs:

Spawning a task enables the task to execute concurrently to other tasks. The spawned task may execute on the current thread, or it may be sent to a different thread to be executed. The specifics depend on the current Runtime configuration.

Perhaps you were talking about spawn_blocking?

This comment has been minimized.

Copy link
@KSXGitHub

KSXGitHub Nov 19, 2023

Author Contributor

@TmLev Removing tokio::task::spawn actually makes pacquet slower. The bigger the lockfile, the slower it becomes.

Commit: fd2d14e

Command:

just integrated-benchmark -s frozen-lockfile main HEAD -V

Result:

result screenshot

This comment has been minimized.

Copy link
@KSXGitHub

KSXGitHub Nov 19, 2023

Author Contributor

Changing to spawn_blocking does make it slightly faster:

benchmark result screenshot

diff --git a/crates/tarball/src/lib.rs b/crates/tarball/src/lib.rs
index c182ff4..7ab4802 100644
--- a/crates/tarball/src/lib.rs
+++ b/crates/tarball/src/lib.rs
@@ -200,7 +200,7 @@ impl<'a> DownloadTarballToStore<'a> {
             Checksum(ssri::Error),
             Other(TarballError),
         }
-        let cas_paths = tokio::task::spawn(async move {
+        let cas_paths = tokio::task::spawn_blocking(move || {
             verify_checksum(&response, package_integrity.clone()).map_err(TaskError::Checksum)?;
 
             // TODO: move tarball extraction to its own function
.expect("no join error")
.map_err(|error| {
TarballError::Checksum(VerifyChecksumError {
url: package_url.to_string(),
error,
})
})?;
}
let cas_paths = tokio::task::spawn(async move {
verify_checksum(&response, package_integrity.clone()).map_err(TaskError::Checksum)?;

// TODO: move tarball extraction to its own function
// TODO: test it
// TODO: test the duplication of entries

let mut archive = decompress_gzip(&response, package_unpacked_size)
.map_err(TaskError::Other)?
.pipe(Cursor::new)
.pipe(Archive::new);

let entries = archive
.entries()
.map_err(TarballError::ReadTarballEntries)
.map_err(TaskError::Other)?
.filter(|entry| !entry.as_ref().unwrap().header().entry_type().is_dir());

let ((_, Some(capacity)) | (capacity, None)) = entries.size_hint();
let mut cas_paths = HashMap::<OsString, PathBuf>::with_capacity(capacity);
let mut pkg_files_idx = PackageFilesIndex { files: HashMap::with_capacity(capacity) };

for entry in entries {
let mut entry = entry.unwrap();

let file_mode = entry.header().mode().expect("get mode"); // TODO: properly propagate this error
let file_is_executable = file_mode::is_all_exec(file_mode);

// Read the contents of the entry
let mut buffer = Vec::with_capacity(entry.size() as usize);
entry.read_to_end(&mut buffer).unwrap();

let entry_path = entry.path().unwrap();
let cleaned_entry_path =
entry_path.components().skip(1).collect::<PathBuf>().into_os_string();
let (file_path, file_hash) = store_dir
.write_cas_file(&buffer, file_is_executable)
.map_err(TarballError::WriteCasFile)?;

let tarball_index_key = cleaned_entry_path
.to_str()
.expect("entry path must be valid UTF-8") // TODO: propagate this error, provide more information
.to_string(); // TODO: convert cleaned_entry_path to String too.

if let Some(previous) = cas_paths.insert(cleaned_entry_path, file_path) {
tracing::warn!(?previous, "Duplication detected. Old entry has been ejected");
}

let checked_at = UNIX_EPOCH.elapsed().ok().map(|x| x.as_millis());
let file_size = entry.header().size().ok();
let file_integrity = format!("sha512-{}", BASE64_STD.encode(file_hash));
let file_attrs = PackageFileInfo {
checked_at,
integrity: file_integrity,
mode: file_mode,
size: file_size,
};

if let Some(previous) = pkg_files_idx.files.insert(tarball_index_key, file_attrs) {
tracing::warn!(?previous, "Duplication detected. Old entry has been ejected");
}
// TODO: move tarball extraction to its own function
// TODO: test it
// TODO: test the duplication of entries

let mut archive =
decompress_gzip(&response, package_unpacked_size)?.pipe(Cursor::new).pipe(Archive::new);

let entries = archive
.entries()
.map_err(TarballError::ReadTarballEntries)?
.filter(|entry| !entry.as_ref().unwrap().header().entry_type().is_dir());

let ((_, Some(capacity)) | (capacity, None)) = entries.size_hint();
let mut cas_paths = HashMap::<OsString, PathBuf>::with_capacity(capacity);
let mut pkg_files_idx = PackageFilesIndex { files: HashMap::with_capacity(capacity) };

for entry in entries {
let mut entry = entry.unwrap();

let file_mode = entry.header().mode().expect("get mode"); // TODO: properly propagate this error
let file_is_executable = file_mode::is_all_exec(file_mode);

// Read the contents of the entry
let mut buffer = Vec::with_capacity(entry.size() as usize);
entry.read_to_end(&mut buffer).unwrap();

let entry_path = entry.path().unwrap();
let cleaned_entry_path =
entry_path.components().skip(1).collect::<PathBuf>().into_os_string();
let (file_path, file_hash) = store_dir
.write_cas_file(buffer, file_is_executable)
.await
.map_err(TarballError::WriteCasFile)?;

let tarball_index_key = cleaned_entry_path
.to_str()
.expect("entry path must be valid UTF-8") // TODO: propagate this error, provide more information
.to_string(); // TODO: convert cleaned_entry_path to String too.

if let Some(previous) = cas_paths.insert(cleaned_entry_path, file_path) {
tracing::warn!(?previous, "Duplication detected. Old entry has been ejected");
}

store_dir
.write_index_file(&package_integrity, &pkg_files_idx)
.map_err(TarballError::WriteTarballIndexFile)?;
let checked_at = UNIX_EPOCH.elapsed().ok().map(|x| x.as_millis());
let file_size = entry.header().size().ok();
let file_integrity = format!("sha512-{}", BASE64_STD.encode(file_hash));
let file_attrs = PackageFileInfo {
checked_at,
integrity: file_integrity,
mode: file_mode,
size: file_size,
};

Ok(cas_paths)
})
.await
.expect("no join error")
.map_err(|error| match error {
TaskError::Checksum(error) => {
TarballError::Checksum(VerifyChecksumError { url: package_url.to_string(), error })
if let Some(previous) = pkg_files_idx.files.insert(tarball_index_key, file_attrs) {
tracing::warn!(?previous, "Duplication detected. Old entry has been ejected");
}
TaskError::Other(error) => error,
})?;
}

store_dir
.write_index_file(&package_integrity, &pkg_files_idx)
.map_err(TarballError::WriteTarballIndexFile)?;

tracing::info!(target: "pacquet::download", ?package_url, "Checksum verified");

Expand Down

0 comments on commit 0952130

Please sign in to comment.