Skip to content
This repository has been archived by the owner on Jan 2, 2025. It is now read-only.

Commit

Permalink
Sync-related consistency improvements (#739)
Browse files Browse the repository at this point in the history
* Make sure all handles correspond to an existing Repository

* Always wait for github credentials before polling repos

* No need for Arcs

* Only index & report remote repositories, never locals

* Check ee feature for consistency

* Fix correct remote handling, take 2

* Address review comments
  • Loading branch information
rsdy authored Jul 11, 2023
1 parent 35fe31a commit 8f6e4a8
Show file tree
Hide file tree
Showing 9 changed files with 103 additions and 82 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/server-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ jobs:
run: nix develop -c bash -c 'cargo --locked fmt -p bleep -- --check'

- name: Clippy
run: nix develop -c bash -c 'cargo --locked clippy -p bleep'
run: nix develop -c bash -c 'cargo --locked clippy -p bleep --features=ee'

- name: Tests
run: nix develop -c bash -c 'cargo --locked test -p bleep --release'
Expand Down
5 changes: 3 additions & 2 deletions server/bleep/src/background.rs
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,8 @@ impl BoundSyncQueue {
}

info!(%reporef, "queueing for sync");
let handle = SyncHandle::new(self.0.clone(), reporef, self.1.progress.clone(), None);
let handle =
SyncHandle::new(self.0.clone(), reporef, self.1.progress.clone(), None).await;
self.1.queue.push(handle).await;
num_queued += 1;
}
Expand All @@ -236,7 +237,7 @@ impl BoundSyncQueue {
///
/// Returns the new status.
pub(crate) async fn block_until_synced(self, reporef: RepoRef) -> anyhow::Result<SyncStatus> {
let handle = SyncHandle::new(self.0.clone(), reporef, self.1.progress.clone(), None);
let handle = SyncHandle::new(self.0.clone(), reporef, self.1.progress.clone(), None).await;
let finished = handle.notify_done();
self.1.queue.push(handle).await;
Ok(finished.recv_async().await?)
Expand Down
4 changes: 2 additions & 2 deletions server/bleep/src/background/control.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::sync::{Arc, RwLock};
use std::sync::RwLock;

use crate::repo::{RepoRef, SyncStatus};

Expand All @@ -15,7 +15,7 @@ enum ControlEvent {
pub struct SyncPipes {
reporef: RepoRef,
progress: super::ProgressStream,
event: Arc<RwLock<Option<ControlEvent>>>,
event: RwLock<Option<ControlEvent>>,
}

impl SyncPipes {
Expand Down
70 changes: 41 additions & 29 deletions server/bleep/src/background/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ pub(crate) struct SyncHandle {
pub(crate) reporef: RepoRef,
pub(crate) new_branch_filters: Option<crate::repo::BranchFilter>,
pub(crate) app: Application,
pub(super) pipes: Arc<SyncPipes>,
pub(super) pipes: SyncPipes,
exited: flume::Sender<SyncStatus>,
exit_signal: flume::Receiver<SyncStatus>,
}
Expand Down Expand Up @@ -87,23 +87,49 @@ impl Drop for SyncHandle {
}

impl SyncHandle {
pub(crate) fn new(
pub(crate) async fn new(
app: Application,
reporef: RepoRef,
status: super::ProgressStream,
new_branch_filters: Option<crate::repo::BranchFilter>,
) -> Arc<Self> {
let (exited, exit_signal) = flume::bounded(1);
let pipes = SyncPipes::new(reporef.clone(), status).into();
Self {
app,
let pipes = SyncPipes::new(reporef.clone(), status);
let current = app
.repo_pool
.entry_async(reporef.clone())
.await
.or_insert_with(|| {
let name = reporef.to_string();
let disk_path = app
.config
.source
.repo_path_for_name(&name.replace('/', "_"));

let remote = reporef.as_ref().into();

Repository {
disk_path,
remote,
sync_status: SyncStatus::Queued,
last_index_unix_secs: 0,
last_commit_unix_secs: 0,
most_common_lang: None,
branch_filter: None,
}
});

let sh = Self {
app: app.clone(),
reporef: reporef.clone(),
pipes,
reporef,
new_branch_filters,
exited,
exit_signal,
}
.into()
};

sh.pipes.status(&sh, current.get().sync_status.clone());
sh.into()
}

pub(super) fn notify_done(&self) -> flume::Receiver<SyncStatus> {
Expand Down Expand Up @@ -333,41 +359,27 @@ impl SyncHandle {
Some(new_status)
}

/// Will return the current Repository, inserting a new one if none
pub(crate) async fn create_new(&self, repo: impl FnOnce() -> Repository) -> Repository {
let current = self
.app
.repo_pool
.entry_async(self.reporef.clone())
.await
.or_insert_with(repo)
.get()
.clone();

self.pipes.status(self, current.sync_status.clone());
current
}

pub(crate) async fn sync_lock(&self) -> Option<std::result::Result<(), RemoteError>> {
let new = self
pub(crate) async fn sync_lock(&self) -> std::result::Result<Repository, RemoteError> {
let repo = self
.app
.repo_pool
.update_async(&self.reporef, |_k, repo| {
if repo.sync_status == SyncStatus::Syncing {
Err(RemoteError::SyncInProgress)
} else {
repo.sync_status = SyncStatus::Syncing;
Ok(repo.sync_status.clone())
Ok(repo.clone())
}
})
.await;

if let Some(Ok(new_status)) = new {
if let Some(Ok(repo)) = repo {
let new_status = repo.sync_status.clone();
debug!(?self.reporef, ?new_status, "new status");
self.pipes.status(self, new_status);
Some(Ok(()))
Ok(repo)
} else {
new.map(|inner| inner.map(|_| ()))
repo.expect("repo was already deleted")
}
}
}
3 changes: 2 additions & 1 deletion server/bleep/src/ee/background.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ impl BoundSyncQueue {
reporef,
self.1.progress.clone(),
Some(new_branches),
);
)
.await;
self.1.queue.push(handle).await;
}
}
4 changes: 4 additions & 0 deletions server/bleep/src/periodic/remotes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,10 @@ async fn update_credentials(app: &Application) {
}

pub(crate) async fn check_repo_updates(app: Application) {
while app.credentials.github().is_none() {
sleep(Duration::from_millis(100)).await
}

let handles: Arc<scc::HashMap<RepoRef, JoinHandle<_>>> = Arc::default();
loop {
app.repo_pool
Expand Down
61 changes: 23 additions & 38 deletions server/bleep/src/remotes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use anyhow::Context;
use gix::sec::identity::Account;
use ignore::WalkBuilder;
use serde::{Deserialize, Serialize};
use tracing::{error, warn};
use tracing::{debug, error, warn};

use crate::{
background::SyncHandle,
Expand Down Expand Up @@ -298,40 +298,47 @@ pub(crate) enum BackendCredential {
}

impl BackendCredential {
#[tracing::instrument(fields(repo=%sync_handle.reporef), skip_all)]
pub(crate) async fn sync(self, sync_handle: &SyncHandle) -> Result<()> {
let SyncHandle { app, .. } = sync_handle;

use BackendCredential::*;
let existing = sync_handle.sync_lock().await;

let Github(gh) = self;
let synced = match existing {
Some(Err(err)) => return Err(err),
Some(Ok(_)) => {
let repo = sync_handle
.repo()
.expect("repo exists & locked, this shouldn't happen");
gh.auth.pull_repo(repo).await
}
None => {
let repo = create_repository(app, sync_handle).await;
gh.auth.clone_repo(repo).await
let mut synced = match existing {
Err(err) => return Err(err),
Ok(repo) if repo.last_index_unix_secs == 0 && repo.disk_path.exists() => {
// it is possible syncing was killed, but the repo is
// intact. pull if the dir exists, then quietly revert
// to cloning if that fails
if let Ok(success) = gh.auth.pull_repo(&repo).await {
Ok(success)
} else {
gh.auth.clone_repo(&repo).await
}
}
Ok(repo) if repo.last_index_unix_secs == 0 => gh.auth.clone_repo(&repo).await,
Ok(repo) => gh.auth.pull_repo(&repo).await,
};

let new_status = match synced {
Ok(_) => SyncStatus::Queued,
Err(ref err) => {
Err(err) => {
warn!(?err, "sync failed; removing dir before retry");

let repo = sync_handle
.repo()
.expect("repo exists & locked, this shouldn't happen");

// try cloning again
_ = tokio::fs::remove_dir_all(&repo.disk_path).await;
let removed = tokio::fs::remove_dir_all(&repo.disk_path).await;
debug!(?removed, "removing recursively");

match gh.auth.clone_repo(repo).await {
synced = gh.auth.clone_repo(&repo).await;
match synced {
Ok(_) => SyncStatus::Queued,
Err(_) => SyncStatus::Error {
Err(ref err) => SyncStatus::Error {
message: err.to_string(),
},
}
Expand All @@ -346,25 +353,3 @@ impl BackendCredential {
synced
}
}

async fn create_repository<'a>(app: &'a Application, sync_handle: &SyncHandle) -> Repository {
let name = sync_handle.reporef.to_string();
let disk_path = app
.config
.source
.repo_path_for_name(&name.replace('/', "_"));

let remote = sync_handle.reporef.as_ref().into();

sync_handle
.create_new(|| Repository {
disk_path,
remote,
sync_status: SyncStatus::Syncing,
last_index_unix_secs: 0,
last_commit_unix_secs: 0,
most_common_lang: None,
branch_filter: None,
})
.await
}
10 changes: 5 additions & 5 deletions server/bleep/src/remotes/github.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,14 +128,14 @@ impl Auth {
}

impl Auth {
pub(crate) async fn clone_repo(&self, repo: Repository) -> Result<()> {
self.check_repo(&repo).await?;
pub(crate) async fn clone_repo(&self, repo: &Repository) -> Result<()> {
self.check_repo(repo).await?;
git_clone(self.git_cred(), &repo.remote.to_string(), &repo.disk_path).await
}

pub(crate) async fn pull_repo(&self, repo: Repository) -> Result<()> {
self.check_repo(&repo).await?;
git_pull(self.git_cred(), &repo).await
pub(crate) async fn pull_repo(&self, repo: &Repository) -> Result<()> {
self.check_repo(repo).await?;
git_pull(self.git_cred(), repo).await
}

pub async fn check_repo(&self, repo: &Repository) -> Result<()> {
Expand Down
26 changes: 22 additions & 4 deletions server/bleep/src/repo/iterator/git.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,19 @@ impl GitWalker {

let local_git = git.to_thread_local();
let mut head = local_git.head()?;

// HEAD name needs to be pinned to the remote pointer
//
// Otherwise the local branch will never advance to the
// remote's branch ref
//
// The easiest here is to check by name, and assume the
// default remote is `origin`, since we don't configure it
// otherwise.
let head_name = head
.clone()
.try_into_referent()
.map(|r| r.name().to_owned());
.map(|r| format!("origin/{}", human_readable_branch_name(&r)));

let refs = local_git.references()?;
let trees = if head_name.is_none() && matches!(branches, BranchFilter::Head) {
Expand All @@ -72,16 +81,25 @@ impl GitWalker {
} else {
refs.all()?
.filter_map(Result::ok)
// Check if it's HEAD
// Normalize the name of the branch for further steps
//
.map(|r| {
let name = human_readable_branch_name(&r);
(
head_name
.as_ref()
.map(|head| head.as_ref() == r.name())
.map(|head| head == &name)
.unwrap_or_default(),
human_readable_branch_name(&r),
name,
r,
)
})
// Only consider remote branches
//
.filter(|(_, name, _)| name.starts_with("origin/"))
// Apply branch filters, along whether it's HEAD
//
.filter(|(is_head, name, _)| branches.filter(*is_head, name))
.filter_map(|(is_head, branch, r)| -> Option<_> {
Some((
Expand Down Expand Up @@ -134,7 +152,7 @@ impl GitWalker {
}

// the HEAD branch will not have an origin prefix
branches.insert(format!("origin/{}", branch.trim_start_matches("origin/")));
branches.insert(branch);
acc
},
);
Expand Down

0 comments on commit 8f6e4a8

Please sign in to comment.