Skip to content

Commit

Permalink
perf(tarball): write_cas_file in parallel
Browse files Browse the repository at this point in the history
  • Loading branch information
KSXGitHub committed Nov 4, 2023
1 parent 690d5e1 commit 9c38751
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 47 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/tarball/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ dashmap = { workspace = true }
derive_more = { workspace = true }
miette = { workspace = true }
pipe-trait = { workspace = true }
rayon = { workspace = true }
reqwest = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
Expand Down
99 changes: 52 additions & 47 deletions crates/tarball/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use pacquet_store_dir::{
PackageFileInfo, PackageFilesIndex, StoreDir, WriteCasFileError, WriteTarballIndexFileError,
};
use pipe_trait::Pipe;
use rayon::prelude::*;
use reqwest::Client;
use ssri::Integrity;
use tar::Archive;
Expand Down Expand Up @@ -211,55 +212,59 @@ impl<'a> DownloadTarballToStore<'a> {
.pipe(Cursor::new)
.pipe(Archive::new);

let entries = archive
/// Data captured from a single non-directory tarball entry.
///
/// Each entry is first read into one of these values; the collected values
/// are then handed to a parallel iterator that writes the content to the store.
struct FileInfo {
    /// Complete content of the entry.
    buffer: Vec<u8>,
    /// Path of the entry with its leading component skipped.
    cleaned_entry_path: OsString,
    /// Mode taken from the entry header.
    file_mode: u32,
    /// Size taken from the entry header, or `None` if it could not be read.
    file_size: Option<u64>,
}
let (cas_paths, index_entries) = archive
.entries()
.map_err(TarballError::ReadTarballEntries)?
.filter(|entry| !entry.as_ref().unwrap().header().entry_type().is_dir());

let ((_, Some(capacity)) | (capacity, None)) = entries.size_hint();
let mut cas_paths = HashMap::<OsString, PathBuf>::with_capacity(capacity);
let mut pkg_files_idx = PackageFilesIndex { files: HashMap::with_capacity(capacity) };

for entry in entries {
let mut entry = entry.unwrap();

let file_mode = entry.header().mode().expect("get mode"); // TODO: properly propagate this error
let file_is_executable = file_mode::is_all_exec(file_mode);

// Read the contents of the entry
let mut buffer = Vec::with_capacity(entry.size() as usize);
entry.read_to_end(&mut buffer).unwrap();

let entry_path = entry.path().unwrap();
let cleaned_entry_path =
entry_path.components().skip(1).collect::<PathBuf>().into_os_string();
let (file_path, file_hash) = store_dir
.write_cas_file(&buffer, file_is_executable)
.map_err(TarballError::WriteCasFile)?;

let tarball_index_key = cleaned_entry_path
.to_str()
.expect("entry path must be valid UTF-8") // TODO: propagate this error, provide more information
.to_string(); // TODO: convert cleaned_entry_path to String too.

if let Some(previous) = cas_paths.insert(cleaned_entry_path, file_path) {
tracing::warn!(?previous, "Duplication detected. Old entry has been ejected");
}

let checked_at = UNIX_EPOCH.elapsed().ok().map(|x| x.as_millis());
let file_size = entry.header().size().ok();
let file_integrity = format!("sha512-{}", BASE64_STD.encode(file_hash));
let file_attrs = PackageFileInfo {
checked_at,
integrity: file_integrity,
mode: file_mode,
size: file_size,
};

if let Some(previous) = pkg_files_idx.files.insert(tarball_index_key, file_attrs) {
tracing::warn!(?previous, "Duplication detected. Old entry has been ejected");
}
}
.filter(|entry| !entry.as_ref().unwrap().header().entry_type().is_dir())
.map(|entry| entry.expect("get entry"))
.map(|entry| {
let cleaned_entry_path = entry
.path()
.expect("get path") // TODO: properly propagate this error
.components()
.skip(1)
.collect::<PathBuf>()
.into_os_string();
let file_mode = entry.header().mode().expect("get mode"); // TODO: properly propagate this error
let file_size = entry.header().size().ok();
let buffer = entry.bytes().collect::<Result<_, _>>().expect("read content"); // TODO: properly propagate this error
FileInfo { cleaned_entry_path, file_mode, file_size, buffer }
})
.collect::<Vec<FileInfo>>()
.into_par_iter()
.map(|file_info| -> Result<_, TarballError> {
let FileInfo { cleaned_entry_path, file_mode, file_size, buffer } = file_info;
let file_is_executable = file_mode::is_all_exec(file_mode);

let (file_path, file_hash) = store_dir
.write_cas_file(&buffer, file_is_executable)
.map_err(TarballError::WriteCasFile)?;

let index_key = cleaned_entry_path
.to_str()
.expect("entry path must be valid UTF-8") // TODO: propagate this error, provide more information
.to_string(); // TODO: convert cleaned_entry_path to String too.

let checked_at = UNIX_EPOCH.elapsed().ok().map(|x| x.as_millis());
let file_integrity = format!("sha512-{}", BASE64_STD.encode(file_hash));
let index_value = PackageFileInfo {
checked_at,
integrity: file_integrity,
mode: file_mode,
size: file_size,
};

Ok(((cleaned_entry_path, file_path), (index_key, index_value)))
})
.collect::<Result<(HashMap<_, _>, HashMap<_, _>), TarballError>>()?;

let pkg_files_idx = PackageFilesIndex { files: index_entries };

store_dir
.write_index_file(&package_integrity, &pkg_files_idx)
Expand Down

0 comments on commit 9c38751

Please sign in to comment.