From 79813c03c2ce2a4df7ef60eaeebccbb29614711c Mon Sep 17 00:00:00 2001
From: Greg Schoeninger
Date: Wed, 1 Nov 2023 16:32:39 -0700
Subject: [PATCH] fix push and pull progress bars and size computations

---
 src/cli/src/dispatch.rs           |  4 +-
 src/lib/src/api/remote/entries.rs |  8 ++--
 src/lib/src/core/index/pusher.rs  | 67 ++++++++++++++++++-------------
 3 files changed, 45 insertions(+), 34 deletions(-)

diff --git a/src/cli/src/dispatch.rs b/src/cli/src/dispatch.rs
index 2ca38f1ec..9c78f278d 100644
--- a/src/cli/src/dispatch.rs
+++ b/src/cli/src/dispatch.rs
@@ -145,8 +145,8 @@ pub async fn create_remote(
         repo_new.host = Some(String::from(host));
         let remote_repo = api::remote::repositories::create_empty(repo_new).await?;
         println!(
-            "Remote created for {}\n\nIf this is a brand new repository:\n\n oxen clone {} {}\n\nTo push an existing repository to a new remote:\n\n oxen config --set-remote origin {}\n",
-            name, name, remote_repo.remote.url, remote_repo.remote.url
+            "🎉 Remote successfully created for '{}/{}'\n\nIf this is a brand new repository:\n\n oxen clone {}\n\nTo push an existing repository to a new remote:\n\n oxen config --set-remote origin {}\n",
+            namespace, name, remote_repo.remote.url, remote_repo.remote.url
         );
     } else {
         // Creating a remote with an initial commit and a README
diff --git a/src/lib/src/api/remote/entries.rs b/src/lib/src/api/remote/entries.rs
index b659d46e7..1d32f912a 100644
--- a/src/lib/src/api/remote/entries.rs
+++ b/src/lib/src/api/remote/entries.rs
@@ -266,8 +266,8 @@ pub async fn download_large_entry(
     bodies
         .for_each(|b| async {
             match b {
-                Ok(_) => {
-                    bar.inc(chunk_size);
+                Ok(s) => {
+                    bar.inc(s);
                 }
                 Err(err) => {
                     log::error!("Error uploading chunk: {:?}", err)
@@ -335,7 +335,7 @@ async fn try_download_entry_chunk(
     revision: impl AsRef<str>,
     chunk_start: u64,
     chunk_size: u64,
-) -> Result<(), OxenError> {
+) -> Result<u64, OxenError> {
     let mut try_num = 0;
     while try_num < constants::NUM_HTTP_RETRIES {
         match download_entry_chunk(
@@ -350,7 +350,7 @@ async fn try_download_entry_chunk(
         {
             Ok(_) => {
                 log::debug!("Downloaded chunk {:?}", local_path.as_ref());
-                return Ok(());
+                return Ok(chunk_size);
             }
             Err(err) => {
                 log::error!("Error trying to download chunk: {}", err);
diff --git a/src/lib/src/core/index/pusher.rs b/src/lib/src/core/index/pusher.rs
index d222fe720..450de2da4 100644
--- a/src/lib/src/core/index/pusher.rs
+++ b/src/lib/src/core/index/pusher.rs
@@ -800,9 +800,9 @@ async fn upload_large_file_chunks(
     let file_name = Some(String::from(path.to_str().unwrap()));

     // Calculate chunk sizes
-    let total_size = entry.num_bytes;
-    let total_chunks = ((total_size / chunk_size) + 1) as usize;
-    let mut total_read = 0;
+    let total_bytes = entry.num_bytes;
+    let total_chunks = ((total_bytes / chunk_size) + 1) as usize;
+    let mut total_bytes_read = 0;
     let mut chunk_size = chunk_size;

     // Create queues for sending data to workers
@@ -827,44 +827,56 @@ async fn upload_large_file_chunks(
     bar.enable_steady_tick(Duration::from_secs(1));
     bar.inc(0);

-    let mut read_chunk_idx = 0;
+    let mut total_chunk_idx = 0;
+    let mut processed_chunk_idx = 0;
     let num_sub_chunks = (total_chunks / sub_chunk_size) + 1;
     log::debug!(
-        "upload_large_file_chunks proccessing file in {} subchunks of size {} from total {}",
+        "upload_large_file_chunks {:?} processing file in {} subchunks of size {} from total {} chunk size {} file size {}",
+        entry.path,
         num_sub_chunks,
         sub_chunk_size,
-        total_chunks
+        total_chunks,
+        chunk_size,
+        total_bytes
     );
     for i in 0..num_sub_chunks {
         log::debug!(
-            "Reading subchunk {}/{} of size {}",
-            i,
-            num_sub_chunks,
-            sub_chunk_size
+            "upload_large_file_chunks Start reading subchunk {i}/{num_sub_chunks} of size {sub_chunk_size} from total {total_chunks} chunk size {chunk_size} file size {total_bytes_read}/{total_bytes}"
         );
         // Read and send the subset of buffers sequentially
         let mut sub_buffers: Vec<Vec<u8>> = Vec::new();
         for _ in 0..sub_chunk_size {
+            // If we have read all the bytes, break
+            if total_bytes_read >= total_bytes {
+                break;
+            }
+
             // Make sure we read the last size correctly
-            let mut should_break = false;
-            if (total_read + chunk_size) > total_size {
-                chunk_size = total_size % chunk_size;
-                should_break = true;
+            if (total_bytes_read + chunk_size) > total_bytes {
+                chunk_size = total_bytes % chunk_size;
             }

+            let percent_read = (total_bytes_read as f64 / total_bytes as f64) * 100.0;
+            log::debug!("upload_large_file_chunks has read {total_bytes_read}/{total_bytes} = {percent_read}% about to read {chunk_size}");
+
             // Only read as much as you need to send so we don't blow up memory on large files
             let mut buffer = vec![0u8; chunk_size as usize];
-            reader.read_exact(&mut buffer).unwrap();
-            total_read += chunk_size;
+            match reader.read_exact(&mut buffer) {
+                Ok(_) => {}
+                Err(err) => {
+                    log::error!("upload_large_file_chunks Error reading file {:?} chunk {total_chunk_idx}/{total_chunks} chunk size {chunk_size} total_bytes_read: {total_bytes_read} total_bytes: {total_bytes} {:?}", entry.path, err);
+                    return;
+                }
+            }
+            total_bytes_read += chunk_size;
+            total_chunk_idx += 1;
             sub_buffers.push(buffer);
-
-            if should_break {
-                break;
-            }
         }
         log::debug!(
-            "Done, have read subchunk {}/{} of size {}",
+            "upload_large_file_chunks Done, have read chunk {}/{} subchunk {}/{} of size {}",
+            processed_chunk_idx,
+            total_chunks,
             i,
             num_sub_chunks,
             sub_chunk_size
@@ -878,16 +890,16 @@ async fn upload_large_file_chunks(
             tasks.push((
                 buffer.to_owned(),
                 chunk_size,
-                read_chunk_idx, // Needs to be the overall chunk num
+                processed_chunk_idx, // Needs to be the overall chunk num
                 total_chunks,
-                total_size,
+                total_bytes,
                 remote_repo.to_owned(),
                 entry.hash.to_owned(),
                 commit.to_owned(),
                 file_name.to_owned(),
             ));
             // finished_queue.try_push(false).unwrap();
-            read_chunk_idx += 1;
+            processed_chunk_idx += 1;
         }

         // Setup the stream chunks in parallel
@@ -906,7 +918,7 @@ async fn upload_large_file_chunks(
             ) = item;
             let size = buffer.len() as u64;
             log::debug!(
-                "Got entry buffer [{}/{}] of size {}",
+                "upload_large_file_chunks Streaming entry buffer {}/{} of size {}",
                 chunk_num,
                 total_chunks,
                 size
@@ -931,9 +943,8 @@ async fn upload_large_file_chunks(
                 .await
                 {
                     Ok(_) => {
-                        // bar.inc(chunk_size.clone());
                         log::debug!(
-                            "Successfully uploaded subchunk overall chunk {}/{}",
+                            "upload_large_file_chunks Successfully uploaded subchunk overall chunk {}/{}",
                             chunk_num,
                             total_chunks
                         );
@@ -961,7 +972,7 @@ async fn upload_large_file_chunks(
             })
             .await;

-        log::debug!("Subchunk [{i}] tasks done. :-)");
+        log::debug!("upload_large_file_chunks Subchunk {i}/{num_sub_chunks} tasks done. :-)");
    }
}
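
The entries.rs hunks work by having try_download_entry_chunk report how many bytes a chunk covered (Result<u64, OxenError> instead of Result<(), OxenError>), so download_large_entry can advance the bar by actual bytes rather than a fixed chunk_size. Below is a minimal, self-contained sketch of that pattern; fetch_chunk and try_fetch_chunk are hypothetical stand-ins for the Oxen internals, and only indicatif's ProgressBar is a real API:

use indicatif::ProgressBar;

const NUM_RETRIES: usize = 3;

// Hypothetical stand-in for download_entry_chunk: pretend we fetched
// `size` bytes starting at byte offset `start`.
fn fetch_chunk(start: u64, size: u64) -> Result<u64, String> {
    let _ = start;
    Ok(size)
}

// Retry wrapper in the spirit of try_download_entry_chunk: on success it
// reports the number of bytes the chunk covered, not just ().
fn try_fetch_chunk(start: u64, size: u64) -> Result<u64, String> {
    for _ in 0..NUM_RETRIES {
        match fetch_chunk(start, size) {
            Ok(_) => return Ok(size),
            Err(err) => eprintln!("retrying chunk at {start}: {err}"),
        }
    }
    Err(format!("chunk at {start} failed after {NUM_RETRIES} tries"))
}

fn main() -> Result<(), String> {
    let total: u64 = 10;
    let chunk: u64 = 4;
    let bar = ProgressBar::new(total);
    let mut start = 0;
    while start < total {
        let size = chunk.min(total - start); // final chunk is short
        let n = try_fetch_chunk(start, size)?;
        bar.inc(n); // advance by actual bytes so the bar ends exactly at `total`
        start += n;
    }
    bar.finish();
    Ok(())
}

Returning the size from the retry wrapper keeps the bar's total equal to the sum of its increments, which is what lets it finish at exactly 100% even when the last chunk is shorter than the nominal chunk size.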
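
The pusher.rs hunks hinge on one piece of arithmetic: every chunk is chunk_size bytes except the last, which shrinks to total_bytes % chunk_size, and the new total_bytes_read >= total_bytes guard stops the loop before it would compute a zero-byte chunk for files that are an exact multiple of the chunk size. A standalone sketch of that math (names are illustrative, not taken from the patch):

// Sizes of the chunks a `total_bytes` file splits into, mirroring the
// patch's loop: full chunks of `chunk_size`, then one short final chunk
// of `total_bytes % chunk_size` when the file is not an exact multiple.
fn chunk_sizes(total_bytes: u64, chunk_size: u64) -> Vec<u64> {
    let mut sizes = Vec::new();
    let mut total_read = 0u64;
    let mut size = chunk_size;
    while total_read < total_bytes {
        // Same adjustment as the patch makes before the final read.
        if total_read + size > total_bytes {
            size = total_bytes % size;
        }
        sizes.push(size);
        total_read += size;
    }
    sizes
}

fn main() {
    // 10 bytes in chunks of 4 -> two full chunks and a 2-byte tail.
    assert_eq!(chunk_sizes(10, 4), vec![4, 4, 2]);
    // Exact multiple: the `total_read < total_bytes` check stops the loop
    // before the modulo could ever yield a 0-byte chunk.
    assert_eq!(chunk_sizes(8, 4), vec![4, 4]);
    // File smaller than a single chunk.
    assert_eq!(chunk_sizes(3, 4), vec![3]);
    println!("size computations check out");
}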