Skip to content

Commit

Permalink
unit tests passing for v0.25.0
Browse files Browse the repository at this point in the history
  • Loading branch information
gschoeni committed Jan 8, 2025
1 parent 183d515 commit 6c729d3
Show file tree
Hide file tree
Showing 25 changed files with 34 additions and 2,086 deletions.
202 changes: 3 additions & 199 deletions src/lib/src/api/client/commits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,14 @@ use crate::opts::PaginateOpts;
use crate::util::fs::oxen_hidden_dir;
use crate::util::hasher::hash_buffer;
use crate::util::progress_bar::{oxify_bar, ProgressBarType};
use crate::view::commit::{CommitSyncStatusResponse, CommitTreeValidationResponse};
use crate::view::commit::CommitTreeValidationResponse;
use crate::view::tree::merkle_hashes::MerkleHashes;
use crate::{api, constants, repositories};
use crate::{current_function, util};
// use crate::util::ReadProgress;
use crate::view::{
CommitResponse, IsValidStatusMessage, ListCommitResponse, MerkleHashesResponse,
PaginatedCommits, RootCommitResponse, StatusMessage,
CommitResponse, ListCommitResponse, MerkleHashesResponse, PaginatedCommits, RootCommitResponse,
StatusMessage,
};

use std::collections::HashSet;
Expand Down Expand Up @@ -261,74 +261,6 @@ async fn list_all_commits_paginated(
}
}

pub async fn commit_is_synced(
remote_repo: &RemoteRepository,
commit_id: &str,
) -> Result<Option<IsValidStatusMessage>, OxenError> {
let uri = format!("/commits/{commit_id}/is_synced");
let url = api::endpoint::url_from_repo(remote_repo, &uri)?;
log::debug!("commit_is_synced checking URL: {}", url);

let client = client::new_for_url(&url)?;
if let Ok(res) = client.get(&url).send().await {
log::debug!("commit_is_synced Got response [{}]", res.status());
if res.status() == 404 {
return Ok(None);
}

let body = client::parse_json_body(&url, res).await?;
log::debug!("commit_is_synced got response body: {}", body);
let response: Result<IsValidStatusMessage, serde_json::Error> = serde_json::from_str(&body);
match response {
Ok(j_res) => Ok(Some(j_res)),
Err(err) => {
log::debug!("Error getting remote commit {}", err);
Err(OxenError::basic_str(
"commit_is_synced() unable to parse body",
))
}
}
} else {
Err(OxenError::basic_str("commit_is_synced() Request failed"))
}
}

pub async fn latest_commit_synced(
remote_repo: &RemoteRepository,
commit_id: &str,
) -> Result<CommitSyncStatusResponse, OxenError> {
let uri = format!("/commits/{commit_id}/latest_synced");
let url = api::endpoint::url_from_repo(remote_repo, &uri)?;
log::debug!("latest_commit_synced checking URL: {}", url);

let client = client::new_for_url(&url)?;
if let Ok(res) = client.get(&url).send().await {
log::debug!("latest_commit_synced Got response [{}]", res.status());
if res.status() == 404 {
return Err(OxenError::basic_str("No synced commits found"));
}

let body = client::parse_json_body(&url, res).await?;
log::debug!("latest_commit_synced got response body: {}", body);
// Sync status response
let response: Result<CommitSyncStatusResponse, serde_json::Error> =
serde_json::from_str(&body);
match response {
Ok(result) => Ok(result),
Err(err) => {
log::debug!("Error getting remote commit {}", err);
Err(OxenError::basic_str(
"latest_commit_synced() unable to parse body",
))
}
}
} else {
Err(OxenError::basic_str(
"latest_commit_synced() Request failed",
))
}
}

/// Download the database of all the commits in a repository
pub async fn download_commits_db_to_repo(
local_repo: &LocalRepository,
Expand Down Expand Up @@ -1337,140 +1269,12 @@ mod tests {
use crate::constants::DEFAULT_BRANCH_NAME;
use crate::error::OxenError;

use crate::model::entry::commit_entry::Entry;
use crate::model::entry::unsynced_commit_entry::UnsyncedCommitEntries;
use crate::model::MerkleHash;
use crate::repositories;
use crate::test;

use std::str::FromStr;

#[tokio::test]
async fn test_remote_commits_post_commits_to_server() -> Result<(), OxenError> {
test::run_training_data_sync_test_no_commits(|local_repo, remote_repo| async move {
// Track the annotations dir
// has format
// annotations/
// train/
// one_shot.csv
// annotations.txt
// test/
// annotations.txt
let train_dir = local_repo.path.join("annotations").join("train");
repositories::add(&local_repo, &train_dir)?;
// Commit the directory
let commit1 = repositories::commit(&local_repo, "Adding 1")?;

let test_dir = local_repo.path.join("annotations").join("test");
repositories::add(&local_repo, &test_dir)?;
// Commit the directory
let commit2 = repositories::commit(&local_repo, "Adding 2")?;

let branch = repositories::branches::current_branch(&local_repo)?.unwrap();

// Post commit

let unsynced_commits = vec![
UnsyncedCommitEntries {
commit: commit1,
entries: Vec::<Entry>::new(),
},
UnsyncedCommitEntries {
commit: commit2,
entries: Vec::<Entry>::new(),
},
];

api::client::commits::post_commits_to_server(
&local_repo,
&remote_repo,
&unsynced_commits,
branch.name.clone(),
)
.await?;

Ok(remote_repo)
})
.await
}

#[tokio::test]
async fn test_remote_commits_commit_is_valid() -> Result<(), OxenError> {
test::run_training_data_repo_test_fully_committed_async(|local_repo| async move {
let mut local_repo = local_repo;
let commit_history = repositories::commits::list(&local_repo)?;
let commit = commit_history.first().unwrap();

// Set the proper remote
let name = local_repo.dirname();
let remote = test::repo_remote_url_from(&name);
command::config::set_remote(&mut local_repo, constants::DEFAULT_REMOTE_NAME, &remote)?;

// Create Remote
let remote_repo = test::create_remote_repo(&local_repo).await?;

// Push it
repositories::push(&local_repo).await?;

let is_synced = api::client::commits::commit_is_synced(&remote_repo, &commit.id)
.await?
.unwrap();
assert!(is_synced.is_valid);

api::client::repositories::delete(&remote_repo).await?;

Ok(())
})
.await
}

#[tokio::test]
async fn test_remote_commits_is_not_valid() -> Result<(), OxenError> {
test::run_training_data_sync_test_no_commits(|local_repo, remote_repo| async move {
// Track the annotations dir
// has format
// annotations/
// train/
// one_shot.csv
// annotations.txt
// test/
// annotations.txt
let annotations_dir = local_repo.path.join("annotations");
repositories::add(&local_repo, &annotations_dir)?;
// Commit the directory
let commit = repositories::commit(
&local_repo,
"Adding annotations data dir, which has two levels",
)?;
let branch = repositories::branches::current_branch(&local_repo)?.unwrap();

// Dummy entries, not checking this
let entries = Vec::<Entry>::new();

let unsynced_commits = vec![UnsyncedCommitEntries {
commit: commit.clone(),
entries,
}];

api::client::commits::post_commits_to_server(
&local_repo,
&remote_repo,
&unsynced_commits,
branch.name.clone(),
)
.await?;

// Should not be synced because we didn't actually post the files
let is_synced =
api::client::commits::commit_is_synced(&remote_repo, &commit.id).await?;
// We never kicked off the background processes
assert!(is_synced.is_none());

Ok(remote_repo)
})
.await
}

#[tokio::test]
async fn test_list_remote_commits_all() -> Result<(), OxenError> {
test::run_training_data_repo_test_fully_committed_async(|local_repo| async move {
Expand Down
114 changes: 1 addition & 113 deletions src/lib/src/core/v0_10_0/index/pusher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use std::sync::Arc;

use tokio::time::Duration;

use crate::constants::{self, AVG_CHUNK_SIZE, NUM_HTTP_RETRIES};
use crate::constants::{self, AVG_CHUNK_SIZE};

use crate::core::v0_10_0::index::{CommitReader, Merger};
use crate::error::OxenError;
Expand Down Expand Up @@ -260,7 +260,6 @@ pub async fn try_push_remote_repo(
unsynced_entries_commits.len() as u64,
"Remote validating commits",
);
poll_until_synced(remote_repo, &head_commit, &bar).await?;
bar.finish_and_clear();

log::debug!("Just finished push.");
Expand Down Expand Up @@ -511,53 +510,6 @@ fn commits_to_push_are_synced(
Ok(true)
}

async fn poll_until_synced(
remote_repo: &RemoteRepository,
commit: &Commit,
bar: &Arc<ProgressBar>,
) -> Result<(), OxenError> {
let commits_to_sync = bar.length().unwrap();

let head_commit_id = &commit.id;

let mut retries = 0;

loop {
match api::client::commits::latest_commit_synced(remote_repo, head_commit_id).await {
Ok(sync_status) => {
retries = 0;
log::debug!("Got latest synced commit {:?}", sync_status.latest_synced);
log::debug!("Got n unsynced commits {:?}", sync_status.num_unsynced);
if commits_to_sync > sync_status.num_unsynced as u64 {
bar.set_position(commits_to_sync - sync_status.num_unsynced as u64);
}
if sync_status.num_unsynced == 0 {
bar.finish_and_clear();
println!("🎉 Push successful");
return Ok(());
}
}
Err(err) => {
retries += 1;
// Back off, but don't want to go all the way to 100s
let sleep_time = 2 * retries;
if retries >= NUM_HTTP_RETRIES {
bar.finish_and_clear();
return Err(err);
}
log::warn!(
"Server error encountered, retrying... ({}/{})",
retries,
NUM_HTTP_RETRIES
);
// Extra sleep time in error cases
std::thread::sleep(std::time::Duration::from_secs(sleep_time));
}
}
std::thread::sleep(std::time::Duration::from_millis(1000));
}
}

async fn push_missing_commit_dbs(
local_repo: &LocalRepository,
remote_repo: &RemoteRepository,
Expand Down Expand Up @@ -1219,67 +1171,3 @@ async fn bundle_and_send_small_entries(

Ok(())
}

#[cfg(test)]
mod tests {
use crate::api;
use crate::command;
use crate::constants;
use crate::core::v0_10_0::index::pusher;

use crate::core::versions::MinOxenVersion;
use crate::error::OxenError;

use crate::repositories;
use crate::test;

#[tokio::test]
async fn test_push_missing_commit_dbs() -> Result<(), OxenError> {
test::run_training_data_repo_test_fully_committed_async_min_version(
MinOxenVersion::V0_10_0,
|mut repo| async move {
// Set the proper remote
let name = repo.dirname();
let remote = test::repo_remote_url_from(&name);
command::config::set_remote(&mut repo, constants::DEFAULT_REMOTE_NAME, &remote)?;

// Create remote repo
let remote_repo = test::create_remote_repo(&repo).await?;

// Get commits to sync...
let head_commit = repositories::commits::head_commit(&repo)?;
let branch = repositories::branches::current_branch(&repo)?.unwrap();

// Create all commit objects
let unsynced_commits =
pusher::get_commit_objects_to_sync(&repo, &remote_repo, &head_commit, &branch)
.await?;
pusher::push_missing_commit_objects(
&repo,
&remote_repo,
&unsynced_commits,
&branch,
)
.await?;

// Should have one missing commit db - root created on repo creation
let unsynced_db_commits =
api::client::commits::get_commits_with_unsynced_dbs(&remote_repo, &branch)
.await?;
assert_eq!(unsynced_db_commits.len(), 0);

// Push to the remote
pusher::push_missing_commit_dbs(&repo, &remote_repo, unsynced_db_commits).await?;

// All commits should now have dbs
let unsynced_db_commits =
api::client::commits::get_commits_with_unsynced_dbs(&remote_repo, &branch)
.await?;
assert_eq!(unsynced_db_commits.len(), 0);

Ok(())
},
)
.await
}
}
Loading

0 comments on commit 6c729d3

Please sign in to comment.