
Commit

fix push and pull progress bars and size computations
gschoeni committed Nov 1, 2023
1 parent 12e2504 commit 79813c0
Showing 3 changed files with 45 additions and 34 deletions.
4 changes: 2 additions & 2 deletions src/cli/src/dispatch.rs
@@ -145,8 +145,8 @@ pub async fn create_remote(
         repo_new.host = Some(String::from(host));
         let remote_repo = api::remote::repositories::create_empty(repo_new).await?;
         println!(
-            "Remote created for {}\n\nIf this is a brand new repository:\n\n  oxen clone {} {}\n\nTo push an existing repository to a new remote:\n\n  oxen config --set-remote origin {}\n",
-            name, name, remote_repo.remote.url, remote_repo.remote.url
+            "🎉 Remote successfully created for '{}/{}' if this is a brand new repository:\n\n  oxen clone {}\n\nTo push an existing repository to a new remote:\n\n  oxen config --set-remote origin {}\n",
+            namespace, name, remote_repo.remote.url, remote_repo.remote.url
         );
     } else {
         // Creating a remote with an initial commit and a README
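With hypothetical values filled in for namespace, name, and the remote URL (none of them come from this commit), the reworked println! renders as in this minimal sketch:

    fn main() {
        // Hypothetical example values, only to show how the four format
        // arguments map onto the new message.
        let namespace = "my-namespace";
        let name = "my-repo";
        let url = "https://hub.oxen.ai/my-namespace/my-repo";
        println!(
            "🎉 Remote successfully created for '{}/{}' if this is a brand new repository:\n\n  oxen clone {}\n\nTo push an existing repository to a new remote:\n\n  oxen config --set-remote origin {}\n",
            namespace, name, url, url
        );
    }

The change also tidies the argument list: the old message passed name twice (once standalone, once inside the clone command), while the new one prints namespace/name together and the clone URL once.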
8 changes: 4 additions & 4 deletions src/lib/src/api/remote/entries.rs
@@ -266,8 +266,8 @@ pub async fn download_large_entry(
     bodies
         .for_each(|b| async {
             match b {
-                Ok(_) => {
-                    bar.inc(chunk_size);
+                Ok(s) => {
+                    bar.inc(s);
                 }
                 Err(err) => {
                     log::error!("Error uploading chunk: {:?}", err)
@@ -335,7 +335,7 @@ async fn try_download_entry_chunk(
     revision: impl AsRef<str>,
     chunk_start: u64,
     chunk_size: u64,
-) -> Result<(), OxenError> {
+) -> Result<u64, OxenError> {
     let mut try_num = 0;
     while try_num < constants::NUM_HTTP_RETRIES {
         match download_entry_chunk(
@@ -350,7 +350,7 @@
         {
             Ok(_) => {
                 log::debug!("Downloaded chunk {:?}", local_path.as_ref());
-                return Ok(());
+                return Ok(chunk_size);
             }
             Err(err) => {
                 log::error!("Error trying to download chunk: {}", err);
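Net effect of these two hunks: try_download_entry_chunk reports how many bytes it actually fetched, and download_large_entry advances the bar by that amount rather than by a constant. A self-contained sketch of the pattern, assuming the indicatif crate; fetch_chunk, the retry count, and the sizes are made-up stand-ins, not the real Oxen API:

    use indicatif::ProgressBar;

    const NUM_RETRIES: u32 = 5; // stand-in for constants::NUM_HTTP_RETRIES

    // Hypothetical stand-in for download_entry_chunk; always succeeds here.
    fn fetch_chunk(_start: u64, _size: u64) -> Result<(), String> {
        Ok(())
    }

    // Mirrors the commit: on success, return the byte count so the caller
    // can advance the progress bar by the amount actually transferred.
    fn try_fetch_chunk(start: u64, size: u64) -> Result<u64, String> {
        for _ in 0..NUM_RETRIES {
            if fetch_chunk(start, size).is_ok() {
                return Ok(size);
            }
        }
        Err(format!("failed to fetch chunk at offset {start}"))
    }

    fn main() {
        let total: u64 = 10_000;
        let chunk: u64 = 4_096;
        let bar = ProgressBar::new(total);
        let mut start = 0;
        while start < total {
            // The final chunk is short; incrementing by the true size keeps
            // the bar from overshooting or stalling below 100%.
            let size = chunk.min(total - start);
            let fetched = try_fetch_chunk(start, size).expect("chunk failed");
            bar.inc(fetched);
            start += fetched;
        }
        bar.finish();
    }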
67 changes: 39 additions & 28 deletions src/lib/src/core/index/pusher.rs
@@ -800,9 +800,9 @@ async fn upload_large_file_chunks(
     let file_name = Some(String::from(path.to_str().unwrap()));
 
     // Calculate chunk sizes
-    let total_size = entry.num_bytes;
-    let total_chunks = ((total_size / chunk_size) + 1) as usize;
-    let mut total_read = 0;
+    let total_bytes = entry.num_bytes;
+    let total_chunks = ((total_bytes / chunk_size) + 1) as usize;
+    let mut total_bytes_read = 0;
     let mut chunk_size = chunk_size;
 
     // Create queues for sending data to workers
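Renaming total_size to total_bytes and total_read to total_bytes_read makes the units explicit; the arithmetic is unchanged. A worked example with made-up numbers shows how the chunk count and the short final chunk fall out:

    fn main() {
        // Hypothetical sizes; 10_000 does not divide evenly by 4_096.
        let total_bytes: u64 = 10_000;
        let chunk_size: u64 = 4_096;

        // Same formula as above: 10_000 / 4_096 = 2, plus 1 = 3 chunks.
        let total_chunks = ((total_bytes / chunk_size) + 1) as usize;
        assert_eq!(total_chunks, 3);

        // The last chunk is the remainder: 10_000 % 4_096 = 1_808 bytes.
        let last_chunk = total_bytes % chunk_size;
        assert_eq!(last_chunk, 1_808);
        assert_eq!(chunk_size * 2 + last_chunk, total_bytes);
    }

When the file size divides evenly, the remainder is zero and the +1 over-counts by one chunk; the early break on total_bytes_read >= total_bytes added in the next hunk is what stops the read loop in that case.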
@@ -827,44 +827,56 @@
     bar.enable_steady_tick(Duration::from_secs(1));
     bar.inc(0);
 
-    let mut read_chunk_idx = 0;
+    let mut total_chunk_idx = 0;
+    let mut processed_chunk_idx = 0;
     let num_sub_chunks = (total_chunks / sub_chunk_size) + 1;
     log::debug!(
-        "upload_large_file_chunks proccessing file in {} subchunks of size {} from total {}",
+        "upload_large_file_chunks {:?} proccessing file in {} subchunks of size {} from total {} chunk size {} file size {}",
+        entry.path,
         num_sub_chunks,
         sub_chunk_size,
-        total_chunks
+        total_chunks,
+        chunk_size,
+        total_bytes
     );
     for i in 0..num_sub_chunks {
         log::debug!(
-            "Reading subchunk {}/{} of size {}",
-            i,
-            num_sub_chunks,
-            sub_chunk_size
+            "upload_large_file_chunks Start reading subchunk {i}/{num_sub_chunks} of size {sub_chunk_size} from total {total_chunks} chunk size {chunk_size} file size {total_bytes_read}/{total_bytes}"
         );
         // Read and send the subset of buffers sequentially
         let mut sub_buffers: Vec<Vec<u8>> = Vec::new();
         for _ in 0..sub_chunk_size {
+            // If we have read all the bytes, break
+            if total_bytes_read >= total_bytes {
+                break;
+            }
+
             // Make sure we read the last size correctly
-            let mut should_break = false;
-            if (total_read + chunk_size) > total_size {
-                chunk_size = total_size % chunk_size;
-                should_break = true;
+            if (total_bytes_read + chunk_size) > total_bytes {
+                chunk_size = total_bytes % chunk_size;
             }
 
+            let percent_read = (total_bytes_read as f64 / total_bytes as f64) * 100.0;
+            log::debug!("upload_large_file_chunks has read {total_bytes_read}/{total_bytes} = {percent_read}% about to read {chunk_size}");
+
             // Only read as much as you need to send so we don't blow up memory on large files
             let mut buffer = vec![0u8; chunk_size as usize];
-            reader.read_exact(&mut buffer).unwrap();
-            total_read += chunk_size;
+            match reader.read_exact(&mut buffer) {
+                Ok(_) => {}
+                Err(err) => {
+                    log::error!("upload_large_file_chunks Error reading file {:?} chunk {total_chunk_idx}/{total_chunks} chunk size {chunk_size} total_bytes_read: {total_bytes_read} total_bytes: {total_bytes} {:?}", entry.path, err);
+                    return;
+                }
+            }
+            total_bytes_read += chunk_size;
+            total_chunk_idx += 1;
 
             sub_buffers.push(buffer);
-
-            if should_break {
-                break;
-            }
         }
         log::debug!(
-            "Done, have read subchunk {}/{} of size {}",
+            "upload_large_file_chunks Done, have read subchunk {}/{} subchunk {}/{} of size {}",
+            processed_chunk_idx,
+            total_chunks,
             i,
             num_sub_chunks,
             sub_chunk_size
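The hunk above distills to the following runnable sketch, with an in-memory Cursor standing in for the real file reader and toy sizes: the early break replaces the removed should_break flag, and read errors are reported instead of unwrapped.

    use std::io::{Cursor, Read};

    fn main() {
        // Stand-in for the file: 10 bytes, read in chunks of 4.
        let data = vec![7u8; 10];
        let total_bytes = data.len() as u64;
        let mut reader = Cursor::new(data);

        let mut chunk_size: u64 = 4;
        let mut total_bytes_read: u64 = 0;
        let mut buffers: Vec<Vec<u8>> = Vec::new();

        loop {
            // Stop once everything has been read (replaces should_break).
            if total_bytes_read >= total_bytes {
                break;
            }
            // Shrink the last read to the remainder so read_exact never
            // asks for more bytes than the file has left.
            if total_bytes_read + chunk_size > total_bytes {
                chunk_size = total_bytes % chunk_size;
            }
            let mut buffer = vec![0u8; chunk_size as usize];
            // Report read errors instead of unwrap(), as the commit does.
            if let Err(err) = reader.read_exact(&mut buffer) {
                eprintln!("read failed at {total_bytes_read}/{total_bytes} bytes: {err}");
                return;
            }
            total_bytes_read += chunk_size;
            buffers.push(buffer);
        }
        assert_eq!(total_bytes_read, total_bytes);
        assert_eq!(buffers.len(), 3); // 4 + 4 + 2 bytes
    }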
@@ -878,16 +890,16 @@
         tasks.push((
             buffer.to_owned(),
             chunk_size,
-            read_chunk_idx, // Needs to be the overall chunk num
+            processed_chunk_idx, // Needs to be the overall chunk num
             total_chunks,
-            total_size,
+            total_bytes,
             remote_repo.to_owned(),
             entry.hash.to_owned(),
             commit.to_owned(),
             file_name.to_owned(),
         ));
         // finished_queue.try_push(false).unwrap();
-        read_chunk_idx += 1;
+        processed_chunk_idx += 1;
     }
 
     // Setup the stream chunks in parallel
@@ -906,7 +918,7 @@
         ) = item;
         let size = buffer.len() as u64;
         log::debug!(
-            "Got entry buffer [{}/{}] of size {}",
+            "upload_large_file_chunks Streaming entry buffer {}/{} of size {}",
            chunk_num,
            total_chunks,
            size
@@ -931,9 +943,8 @@
         .await
         {
             Ok(_) => {
-                // bar.inc(chunk_size.clone());
                 log::debug!(
-                    "Successfully uploaded subchunk overall chunk {}/{}",
+                    "upload_large_file_chunks Successfully uploaded subchunk overall chunk {}/{}",
                     chunk_num,
                     total_chunks
                 );
@@ -961,7 +972,7 @@
         })
         .await;
 
-        log::debug!("Subchunk [{i}] tasks done. :-)");
+        log::debug!("upload_large_file_chunks Subchunk {i}/{num_sub_chunks} tasks done. :-)");
     }
 }
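Zooming out, upload_large_file_chunks follows a read-then-stream shape: read one sub-chunk of buffers sequentially (bounding memory), then upload those chunks concurrently while a single shared bar tracks bytes. A compact sketch of that shape, assuming the futures, indicatif, and tokio crates; upload_chunk is a hypothetical stand-in for the real chunk-upload call:

    use futures::{stream, StreamExt};
    use indicatif::ProgressBar;
    use std::sync::Arc;

    // Hypothetical stand-in for the per-chunk upload API call.
    async fn upload_chunk(_chunk: Vec<u8>) -> Result<(), String> {
        Ok(())
    }

    #[tokio::main]
    async fn main() {
        // One sub-chunk's worth of buffers: 4 + 4 + 2 bytes.
        let chunks: Vec<Vec<u8>> = vec![vec![0u8; 4], vec![0u8; 4], vec![0u8; 2]];
        let total: u64 = chunks.iter().map(|c| c.len() as u64).sum();
        let bar = Arc::new(ProgressBar::new(total));

        // Upload up to 4 chunks at a time; advance the bar by each chunk's
        // true byte count as it completes.
        stream::iter(chunks)
            .map(|chunk| {
                let bar = Arc::clone(&bar);
                async move {
                    let size = chunk.len() as u64;
                    if upload_chunk(chunk).await.is_ok() {
                        bar.inc(size);
                    }
                }
            })
            .buffer_unordered(4)
            .collect::<Vec<_>>()
            .await;

        bar.finish();
    }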

