diff --git a/src/bin/cratesfyi.rs b/src/bin/cratesfyi.rs index 1e36e2dd1..1e22c29f8 100644 --- a/src/bin/cratesfyi.rs +++ b/src/bin/cratesfyi.rs @@ -16,9 +16,9 @@ use docs_rs::utils::{ remove_crate_priority, set_config, set_crate_priority, ConfigName, }; use docs_rs::{ - start_background_metrics_webserver, start_web_server, AsyncStorage, BuildQueue, Config, - Context, Index, InstanceMetrics, PackageKind, RegistryApi, RustwideBuilder, ServiceMetrics, - Storage, + start_background_metrics_webserver, start_web_server, AsyncBuildQueue, AsyncStorage, + BuildQueue, Config, Context, Index, InstanceMetrics, PackageKind, RegistryApi, RustwideBuilder, + ServiceMetrics, Storage, }; use futures_util::StreamExt; use humantime::Duration; @@ -202,7 +202,14 @@ impl CommandLine { start_background_metrics_webserver(Some(metric_server_socket_addr), &ctx)?; - docs_rs::utils::watch_registry(ctx.build_queue()?, ctx.config()?, ctx.index()?)?; + ctx.runtime()?.block_on(async { + docs_rs::utils::watch_registry( + ctx.async_build_queue().await?, + ctx.config()?, + ctx.index()?, + ) + .await + })?; } Self::StartBuildServer { metric_server_socket_addr, @@ -274,12 +281,13 @@ enum QueueSubcommand { impl QueueSubcommand { fn handle_args(self, ctx: BinContext) -> Result<()> { + let build_queue = ctx.build_queue()?; match self { Self::Add { crate_name, crate_version, build_priority, - } => ctx.build_queue()?.add_crate( + } => build_queue.add_crate( &crate_name, &crate_version, build_priority, @@ -287,7 +295,7 @@ impl QueueSubcommand { )?, Self::GetLastSeenReference => { - if let Some(reference) = ctx.build_queue()?.last_seen_reference()? { + if let Some(reference) = build_queue.last_seen_reference()? { println!("Last seen reference: {reference}"); } else { println!("No last seen reference available"); @@ -305,7 +313,7 @@ impl QueueSubcommand { (_, _) => unreachable!(), }; - ctx.build_queue()?.set_last_seen_reference(reference)?; + build_queue.set_last_seen_reference(reference)?; println!("Set last seen reference: {reference}"); } @@ -428,7 +436,6 @@ enum BuildSubcommand { impl BuildSubcommand { fn handle_args(self, ctx: BinContext) -> Result<()> { let build_queue = ctx.build_queue()?; - let rustwide_builder = || -> Result { RustwideBuilder::init(&ctx) }; match self { @@ -817,6 +824,7 @@ enum DeleteSubcommand { struct BinContext { build_queue: OnceCell>, + async_build_queue: tokio::sync::OnceCell>, storage: OnceCell>, cdn: tokio::sync::OnceCell>, config: OnceCell>, @@ -833,6 +841,7 @@ impl BinContext { fn new() -> Self { Self { build_queue: OnceCell::new(), + async_build_queue: tokio::sync::OnceCell::new(), storage: OnceCell::new(), cdn: tokio::sync::OnceCell::new(), config: OnceCell::new(), @@ -864,11 +873,8 @@ impl Context for BinContext { fn build_queue(self) -> BuildQueue = { let runtime = self.runtime()?; BuildQueue::new( - self.pool()?, - self.instance_metrics()?, - self.config()?, runtime.clone(), - runtime.block_on(self.async_storage())?, + runtime.block_on(self.async_build_queue())? ) }; fn storage(self) -> Storage = { @@ -880,8 +886,7 @@ impl Context for BinContext { }; fn config(self) -> Config = Config::from_env()?; fn service_metrics(self) -> ServiceMetrics = { - let runtime = self.runtime()?; - ServiceMetrics::new(runtime)? + ServiceMetrics::new()? 
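// Illustrative sketch, not part of this patch: the `build_queue` provider above
// now constructs the blocking `BuildQueue` as a thin facade over the new
// `AsyncBuildQueue`, holding a runtime handle plus a shared inner queue and
// delegating via `block_on`. A minimal, self-contained version of that wrapper
// shape, assuming tokio and anyhow; `AsyncThing` and `BlockingThing` are
// placeholder names, not docs.rs types:
use std::sync::Arc;
use tokio::runtime::Runtime;

struct AsyncThing;

impl AsyncThing {
    async fn count(&self) -> anyhow::Result<usize> {
        Ok(0)
    }
}

struct BlockingThing {
    runtime: Arc<Runtime>,
    inner: Arc<AsyncThing>,
}

impl BlockingThing {
    // synchronous wrapper: callers on plain (non-runtime) threads block on the async call
    fn count(&self) -> anyhow::Result<usize> {
        self.runtime.block_on(self.inner.count())
    }
}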
}; fn instance_metrics(self) -> InstanceMetrics = InstanceMetrics::new()?; fn runtime(self) -> Runtime = { @@ -928,6 +933,21 @@ impl Context for BinContext { )) } + async fn async_build_queue(&self) -> Result> { + Ok(self + .async_build_queue + .get_or_try_init(|| async { + Ok::<_, Error>(Arc::new(AsyncBuildQueue::new( + self.pool()?, + self.instance_metrics()?, + self.config()?, + self.async_storage().await?, + ))) + }) + .await? + .clone()) + } + async fn cdn(&self) -> Result> { let config = self.config()?; Ok(self diff --git a/src/build_queue.rs b/src/build_queue.rs index a0492304e..8a6d4b07a 100644 --- a/src/build_queue.rs +++ b/src/build_queue.rs @@ -26,73 +26,68 @@ pub(crate) struct QueuedCrate { } #[derive(Debug)] -pub struct BuildQueue { +pub struct AsyncBuildQueue { config: Arc, storage: Arc, pub(crate) db: Pool, metrics: Arc, - runtime: Arc, max_attempts: i32, } -impl BuildQueue { +impl AsyncBuildQueue { pub fn new( db: Pool, metrics: Arc, config: Arc, - runtime: Arc, storage: Arc, ) -> Self { - BuildQueue { + AsyncBuildQueue { max_attempts: config.build_attempts.into(), config, db, metrics, storage, - runtime, } } - pub fn last_seen_reference(&self) -> Result> { - self.runtime.block_on(async { - let mut conn = self.db.get_async().await?; - if let Some(value) = - get_config::(&mut conn, ConfigName::LastSeenIndexReference).await? - { - return Ok(Some(crates_index_diff::gix::ObjectId::from_hex( - value.as_bytes(), - )?)); - } - Ok(None) - }) + pub async fn last_seen_reference(&self) -> Result> { + let mut conn = self.db.get_async().await?; + if let Some(value) = + get_config::(&mut conn, ConfigName::LastSeenIndexReference).await? + { + return Ok(Some(crates_index_diff::gix::ObjectId::from_hex( + value.as_bytes(), + )?)); + } + Ok(None) } - pub fn set_last_seen_reference(&self, oid: crates_index_diff::gix::ObjectId) -> Result<()> { - self.runtime.block_on(async { - let mut conn = self.db.get_async().await?; - set_config( - &mut conn, - ConfigName::LastSeenIndexReference, - oid.to_string(), - ) - .await?; - Ok(()) - }) + pub async fn set_last_seen_reference( + &self, + oid: crates_index_diff::gix::ObjectId, + ) -> Result<()> { + let mut conn = self.db.get_async().await?; + set_config( + &mut conn, + ConfigName::LastSeenIndexReference, + oid.to_string(), + ) + .await?; + Ok(()) } #[context("error trying to add {name}-{version} to build queue")] - pub fn add_crate( + pub async fn add_crate( &self, name: &str, version: &str, priority: i32, registry: Option<&str>, ) -> Result<()> { - self.runtime.block_on(async { - let mut conn = self.db.get_async().await?; + let mut conn = self.db.get_async().await?; - sqlx::query!( - "INSERT INTO queue (name, version, priority, registry) + sqlx::query!( + "INSERT INTO queue (name, version, priority, registry) VALUES ($1, $2, $3, $4) ON CONFLICT (name, version) DO UPDATE SET priority = EXCLUDED.priority, @@ -100,80 +95,78 @@ impl BuildQueue { attempt = 0, last_attempt = NULL ;", - name, - version, - priority, - registry, - ) - .execute(&mut *conn) - .await?; + name, + version, + priority, + registry, + ) + .execute(&mut *conn) + .await?; - Ok(()) - }) + Ok(()) } - pub(crate) fn pending_count(&self) -> Result { - Ok(self.pending_count_by_priority()?.values().sum::()) + pub(crate) async fn pending_count(&self) -> Result { + Ok(self + .pending_count_by_priority() + .await? + .values() + .sum::()) } - pub(crate) fn prioritized_count(&self) -> Result { + pub(crate) async fn prioritized_count(&self) -> Result { Ok(self - .pending_count_by_priority()? 
+ .pending_count_by_priority() + .await? .iter() .filter(|(&priority, _)| priority <= 0) .map(|(_, count)| count) .sum::()) } - pub(crate) fn pending_count_by_priority(&self) -> Result> { - self.runtime.block_on(async { - let mut conn = self.db.get_async().await?; + pub(crate) async fn pending_count_by_priority(&self) -> Result> { + let mut conn = self.db.get_async().await?; - Ok(sqlx::query!( - r#" + Ok(sqlx::query!( + r#" SELECT priority, COUNT(*) as "count!" FROM queue WHERE attempt < $1 GROUP BY priority"#, - self.max_attempts, - ) - .fetch(&mut *conn) - .map_ok(|row| (row.priority, row.count as usize)) - .try_collect() - .await?) - }) + self.max_attempts, + ) + .fetch(&mut *conn) + .map_ok(|row| (row.priority, row.count as usize)) + .try_collect() + .await?) } - pub(crate) fn failed_count(&self) -> Result { - self.runtime.block_on(async { - let mut conn = self.db.get_async().await?; + pub(crate) async fn failed_count(&self) -> Result { + let mut conn = self.db.get_async().await?; - Ok(sqlx::query_scalar!( - r#"SELECT COUNT(*) as "count!" FROM queue WHERE attempt >= $1;"#, - self.max_attempts, - ) - .fetch_one(&mut *conn) - .await? as usize) - }) + Ok(sqlx::query_scalar!( + r#"SELECT COUNT(*) as "count!" FROM queue WHERE attempt >= $1;"#, + self.max_attempts, + ) + .fetch_one(&mut *conn) + .await? as usize) } - pub(crate) fn queued_crates(&self) -> Result> { - self.runtime.block_on(async { - let mut conn = self.db.get_async().await?; + pub(crate) async fn queued_crates(&self) -> Result> { + let mut conn = self.db.get_async().await?; - Ok(sqlx::query_as!( - QueuedCrate, - "SELECT id, name, version, priority, registry + Ok(sqlx::query_as!( + QueuedCrate, + "SELECT id, name, version, priority, registry FROM queue WHERE attempt < $1 ORDER BY priority ASC, attempt ASC, id ASC", - self.max_attempts - ) - .fetch_all(&mut *conn) - .await?) - }) + self.max_attempts + ) + .fetch_all(&mut *conn) + .await?) } pub(crate) async fn has_build_queued(&self, name: &str, version: &str) -> Result { @@ -194,159 +187,57 @@ impl BuildQueue { .await? .is_some()) } - - fn process_next_crate( - &self, - f: impl FnOnce(&QueuedCrate) -> Result, - ) -> Result<()> { - let mut conn = self.runtime.block_on(self.db.get_async())?; - let mut transaction = self.runtime.block_on(conn.begin())?; - - // fetch the next available crate from the queue table. - // We are using `SELECT FOR UPDATE` inside a transaction so - // the QueuedCrate is locked until we are finished with it. - // `SKIP LOCKED` here will enable another build-server to just - // skip over taken (=locked) rows and start building the first - // available one. - let to_process = match self.runtime.block_on( - sqlx::query_as!( - QueuedCrate, - "SELECT id, name, version, priority, registry - FROM queue - WHERE - attempt < $1 AND - (last_attempt IS NULL OR last_attempt < NOW() - make_interval(secs => $2)) - ORDER BY priority ASC, attempt ASC, id ASC - LIMIT 1 - FOR UPDATE SKIP LOCKED", - self.max_attempts, - self.config.delay_between_build_attempts.as_secs_f64(), - ) - .fetch_optional(&mut *transaction), - )? 
{ - Some(krate) => krate, - None => return Ok(()), - }; - - let res = self - .metrics - .build_time - .observe_closure_duration(|| f(&to_process)); - - self.metrics.total_builds.inc(); - if let Err(err) = self.runtime.block_on(cdn::queue_crate_invalidation( - &mut transaction, - &self.config, - &to_process.name, - )) { - report_error(&err); - } - - let mut increase_attempt_count = || -> Result<()> { - let attempt: i32 = self.runtime.block_on( - sqlx::query_scalar!( - "UPDATE queue - SET - attempt = attempt + 1, - last_attempt = NOW() - WHERE id = $1 - RETURNING attempt;", - to_process.id, - ) - .fetch_one(&mut *transaction), - )?; - - if attempt >= self.max_attempts { - self.metrics.failed_builds.inc(); - } - Ok(()) - }; - - match res { - Ok(BuildPackageSummary { - should_reattempt: false, - successful: _, - }) => { - self.runtime.block_on( - sqlx::query!("DELETE FROM queue WHERE id = $1;", to_process.id) - .execute(&mut *transaction), - )?; - } - Ok(BuildPackageSummary { - should_reattempt: true, - successful: _, - }) => { - increase_attempt_count()?; - } - Err(e) => { - increase_attempt_count()?; - report_error(&e.context(format!( - "Failed to build package {}-{} from queue", - to_process.name, to_process.version - ))) - } - } - - self.runtime.block_on(transaction.commit())?; - Ok(()) - } } /// Locking functions. -impl BuildQueue { +impl AsyncBuildQueue { /// Checks for the lock and returns whether it currently exists. - pub fn is_locked(&self) -> Result { - self.runtime.block_on(async { - let mut conn = self.db.get_async().await?; + pub async fn is_locked(&self) -> Result { + let mut conn = self.db.get_async().await?; - Ok(get_config::(&mut conn, ConfigName::QueueLocked) - .await? - .unwrap_or(false)) - }) + Ok(get_config::(&mut conn, ConfigName::QueueLocked) + .await? + .unwrap_or(false)) } /// lock the queue. Daemon will check this lock and stop operating if it exists. - pub fn lock(&self) -> Result<()> { - self.runtime.block_on(async { - let mut conn = self.db.get_async().await?; - set_config(&mut conn, ConfigName::QueueLocked, true).await - }) + pub async fn lock(&self) -> Result<()> { + let mut conn = self.db.get_async().await?; + set_config(&mut conn, ConfigName::QueueLocked, true).await } /// unlock the queue. - pub fn unlock(&self) -> Result<()> { - self.runtime.block_on(async { - let mut conn = self.db.get_async().await?; - set_config(&mut conn, ConfigName::QueueLocked, false).await - }) + pub async fn unlock(&self) -> Result<()> { + let mut conn = self.db.get_async().await?; + set_config(&mut conn, ConfigName::QueueLocked, false).await } } /// Index methods. -impl BuildQueue { +impl AsyncBuildQueue { /// Updates registry index repository and adds new crates into build queue. /// /// Returns the number of crates added - pub fn get_new_crates(&self, index: &Index) -> Result { + pub async fn get_new_crates(&self, index: &Index) -> Result { let diff = index.diff()?; let last_seen_reference = self - .last_seen_reference()? + .last_seen_reference() + .await? 
.context("no last_seen_reference set in database")?; diff.set_last_seen_reference(last_seen_reference)?; let (changes, new_reference) = diff.peek_changes_ordered()?; - let mut conn = self.runtime.block_on(self.db.get_async())?; + let mut conn = self.db.get_async().await?; let mut crates_added = 0; debug!("queueing changes from {last_seen_reference} to {new_reference}"); for change in &changes { if let Some((ref krate, ..)) = change.crate_deleted() { - match self - .runtime - .block_on(delete_crate(&mut conn, &self.storage, &self.config, krate)) + match delete_crate(&mut conn, &self.storage, &self.config, krate) + .await .with_context(|| format!("failed to delete crate {krate}")) { Ok(_) => info!( @@ -355,52 +246,45 @@ impl BuildQueue { ), Err(err) => report_error(&err), } - if let Err(err) = self.runtime.block_on(cdn::queue_crate_invalidation( - &mut conn, - &self.config, - krate, - )) { + if let Err(err) = + cdn::queue_crate_invalidation(&mut conn, &self.config, krate).await + { report_error(&err); } continue; } if let Some(release) = change.version_deleted() { - match self - .runtime - .block_on(delete_version( - &mut conn, - &self.storage, - &self.config, - &release.name, - &release.version, - )) - .with_context(|| { - format!( - "failed to delete version {}-{}", - release.name, release.version - ) - }) { + match delete_version( + &mut conn, + &self.storage, + &self.config, + &release.name, + &release.version, + ) + .await + .with_context(|| { + format!( + "failed to delete version {}-{}", + release.name, release.version + ) + }) { Ok(_) => info!( "release {}-{} was deleted from the index and the database", release.name, release.version ), Err(err) => report_error(&err), } - if let Err(err) = self.runtime.block_on(cdn::queue_crate_invalidation( - &mut conn, - &self.config, - &release.name, - )) { + if let Err(err) = + cdn::queue_crate_invalidation(&mut conn, &self.config, &release.name).await + { report_error(&err); } continue; } if let Some(release) = change.added() { - let priority = self - .runtime - .block_on(get_crate_priority(&mut conn, &release.name))?; + let priority = get_crate_priority(&mut conn, &release.name).await?; match self .add_crate( @@ -409,6 +293,7 @@ impl BuildQueue { priority, index.repository_url(), ) + .await .with_context(|| { format!( "failed adding {}-{} into build queue", @@ -432,20 +317,21 @@ impl BuildQueue { if let Some(release) = yanked.or(unyanked) { // FIXME: delay yanks of crates that have not yet finished building // https://github.com/rust-lang/docs.rs/issues/1934 - if let Err(err) = self.runtime.block_on(self.set_yanked_inner( - &mut conn, - release.name.as_str(), - release.version.as_str(), - yanked.is_some(), - )) { + if let Err(err) = self + .set_yanked_inner( + &mut conn, + release.name.as_str(), + release.version.as_str(), + yanked.is_some(), + ) + .await + { report_error(&err); } - if let Err(err) = self.runtime.block_on(cdn::queue_crate_invalidation( - &mut conn, - &self.config, - &release.name, - )) { + if let Err(err) = + cdn::queue_crate_invalidation(&mut conn, &self.config, &release.name).await + { report_error(&err); } } @@ -454,17 +340,15 @@ impl BuildQueue { // set the reference in the database // so this survives recreating the registry watcher // server. 
- self.set_last_seen_reference(new_reference)?; + self.set_last_seen_reference(new_reference).await?; Ok(crates_added) } - pub fn set_yanked(&self, name: &str, version: &str, yanked: bool) -> Result<()> { - self.runtime.block_on(async { - let mut conn = self.db.get_async().await?; - self.set_yanked_inner(&mut conn, name, version, yanked) - .await - }) + pub async fn set_yanked(&self, name: &str, version: &str, yanked: bool) -> Result<()> { + let mut conn = self.db.get_async().await?; + self.set_yanked_inner(&mut conn, name, version, yanked) + .await } #[context("error trying to set {name}-{version} to yanked: {yanked}")] @@ -520,6 +404,176 @@ impl BuildQueue { Ok(()) } +} + +#[derive(Debug)] +pub struct BuildQueue { + runtime: Arc, + inner: Arc, +} + +/// sync versions of async methods +impl BuildQueue { + pub fn add_crate( + &self, + name: &str, + version: &str, + priority: i32, + registry: Option<&str>, + ) -> Result<()> { + self.runtime + .block_on(self.inner.add_crate(name, version, priority, registry)) + } + + pub fn set_yanked(&self, name: &str, version: &str, yanked: bool) -> Result<()> { + self.runtime + .block_on(self.inner.set_yanked(name, version, yanked)) + } + pub fn is_locked(&self) -> Result { + self.runtime.block_on(self.inner.is_locked()) + } + pub fn lock(&self) -> Result<()> { + self.runtime.block_on(self.inner.lock()) + } + pub fn unlock(&self) -> Result<()> { + self.runtime.block_on(self.inner.unlock()) + } + pub fn last_seen_reference(&self) -> Result> { + self.runtime.block_on(self.inner.last_seen_reference()) + } + pub fn set_last_seen_reference(&self, oid: crates_index_diff::gix::ObjectId) -> Result<()> { + self.runtime + .block_on(self.inner.set_last_seen_reference(oid)) + } + #[cfg(test)] + pub(crate) fn pending_count(&self) -> Result { + self.runtime.block_on(self.inner.pending_count()) + } + #[cfg(test)] + pub(crate) fn prioritized_count(&self) -> Result { + self.runtime.block_on(self.inner.prioritized_count()) + } + #[cfg(test)] + pub(crate) fn pending_count_by_priority(&self) -> Result> { + self.runtime + .block_on(self.inner.pending_count_by_priority()) + } + #[cfg(test)] + pub(crate) fn failed_count(&self) -> Result { + self.runtime.block_on(self.inner.failed_count()) + } + #[cfg(test)] + pub(crate) fn queued_crates(&self) -> Result> { + self.runtime.block_on(self.inner.queued_crates()) + } + #[cfg(test)] + pub(crate) fn has_build_queued(&self, name: &str, version: &str) -> Result { + self.runtime + .block_on(self.inner.has_build_queued(name, version)) + } +} + +impl BuildQueue { + pub fn new(runtime: Arc, inner: Arc) -> Self { + Self { runtime, inner } + } + + fn process_next_crate( + &self, + f: impl FnOnce(&QueuedCrate) -> Result, + ) -> Result<()> { + let mut conn = self.runtime.block_on(self.inner.db.get_async())?; + let mut transaction = self.runtime.block_on(conn.begin())?; + + // fetch the next available crate from the queue table. + // We are using `SELECT FOR UPDATE` inside a transaction so + // the QueuedCrate is locked until we are finished with it. + // `SKIP LOCKED` here will enable another build-server to just + // skip over taken (=locked) rows and start building the first + // available one. 
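// Illustrative sketch, not part of this patch: `SELECT ... FOR UPDATE SKIP LOCKED`
// is what lets several build servers poll the same queue table concurrently:
// each worker locks the row it picked for the lifetime of its transaction, and
// the others skip locked rows instead of blocking on them. A minimal version of
// the same claim-and-process pattern with sqlx, assuming a hypothetical
// `jobs(id)` table rather than the docs.rs schema:
async fn claim_next_job(pool: &sqlx::PgPool) -> anyhow::Result<Option<i32>> {
    let mut tx = pool.begin().await?;
    // take one available row; concurrent workers skip it while we hold the lock
    let id: Option<i32> = sqlx::query_scalar(
        "SELECT id FROM jobs ORDER BY id LIMIT 1 FOR UPDATE SKIP LOCKED",
    )
    .fetch_optional(&mut *tx)
    .await?;
    if let Some(id) = id {
        // do the work inside the transaction, then drop the row so it is not
        // picked up again (process_next_crate instead bumps an attempt counter
        // when the build should be retried)
        sqlx::query("DELETE FROM jobs WHERE id = $1")
            .bind(id)
            .execute(&mut *tx)
            .await?;
    }
    tx.commit().await?;
    Ok(id)
}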
+ let to_process = match self.runtime.block_on( + sqlx::query_as!( + QueuedCrate, + "SELECT id, name, version, priority, registry + FROM queue + WHERE + attempt < $1 AND + (last_attempt IS NULL OR last_attempt < NOW() - make_interval(secs => $2)) + ORDER BY priority ASC, attempt ASC, id ASC + LIMIT 1 + FOR UPDATE SKIP LOCKED", + self.inner.max_attempts, + self.inner.config.delay_between_build_attempts.as_secs_f64(), + ) + .fetch_optional(&mut *transaction), + )? { + Some(krate) => krate, + None => return Ok(()), + }; + + let res = self + .inner + .metrics + .build_time + .observe_closure_duration(|| f(&to_process)); + + self.inner.metrics.total_builds.inc(); + if let Err(err) = self.runtime.block_on(cdn::queue_crate_invalidation( + &mut transaction, + &self.inner.config, + &to_process.name, + )) { + report_error(&err); + } + + let mut increase_attempt_count = || -> Result<()> { + let attempt: i32 = self.runtime.block_on( + sqlx::query_scalar!( + "UPDATE queue + SET + attempt = attempt + 1, + last_attempt = NOW() + WHERE id = $1 + RETURNING attempt;", + to_process.id, + ) + .fetch_one(&mut *transaction), + )?; + + if attempt >= self.inner.max_attempts { + self.inner.metrics.failed_builds.inc(); + } + Ok(()) + }; + + match res { + Ok(BuildPackageSummary { + should_reattempt: false, + successful: _, + }) => { + self.runtime.block_on( + sqlx::query!("DELETE FROM queue WHERE id = $1;", to_process.id) + .execute(&mut *transaction), + )?; + } + Ok(BuildPackageSummary { + should_reattempt: true, + successful: _, + }) => { + increase_attempt_count()?; + } + Err(e) => { + increase_attempt_count()?; + report_error(&e.context(format!( + "Failed to build package {}-{} from queue", + to_process.name, to_process.version + ))) + } + } + + self.runtime.block_on(transaction.commit())?; + Ok(()) + } fn update_toolchain(&self, builder: &mut RustwideBuilder) -> Result<()> { let updated = retry( @@ -559,6 +613,7 @@ impl BuildQueue { builder: &mut RustwideBuilder, ) -> Result { let mut processed = false; + self.process_next_crate(|krate| { processed = true; @@ -605,13 +660,13 @@ mod tests { #[test] fn test_add_duplicate_doesnt_fail_last_priority_wins() { - crate::test::wrapper(|env| { - let queue = env.build_queue(); + crate::test::async_wrapper(|env| async move { + let queue = env.async_build_queue().await; - queue.add_crate("some_crate", "0.1.1", 0, None)?; - queue.add_crate("some_crate", "0.1.1", 9, None)?; + queue.add_crate("some_crate", "0.1.1", 0, None).await?; + queue.add_crate("some_crate", "0.1.1", 9, None).await?; - let queued_crates = queue.queued_crates()?; + let queued_crates = queue.queued_crates().await?; assert_eq!(queued_crates.len(), 1); assert_eq!(queued_crates[0].priority, 9); @@ -621,43 +676,37 @@ mod tests { #[test] fn test_add_duplicate_resets_attempts_and_priority() { - crate::test::wrapper(|env| { + crate::test::async_wrapper(|env| async move { env.override_config(|config| { config.build_attempts = 5; }); - let queue = env.build_queue(); + let queue = env.async_build_queue().await; - env.runtime().block_on(async { - let mut conn = env.async_db().await.async_conn().await; - sqlx::query!( - " + let mut conn = env.async_db().await.async_conn().await; + sqlx::query!( + " INSERT INTO queue (name, version, priority, attempt, last_attempt ) VALUES ('failed_crate', '0.1.1', 0, 99, NOW())", - ) - .execute(&mut *conn) - .await - })?; + ) + .execute(&mut *conn) + .await?; - assert_eq!(queue.pending_count()?, 0); + assert_eq!(queue.pending_count().await?, 0); - queue.add_crate("failed_crate", 
"0.1.1", 9, None)?; + queue.add_crate("failed_crate", "0.1.1", 9, None).await?; - assert_eq!(queue.pending_count()?, 1); + assert_eq!(queue.pending_count().await?, 1); - let row = env.runtime().block_on(async { - let mut conn = env.async_db().await.async_conn().await; - sqlx::query!( - "SELECT priority, attempt, last_attempt + let row = sqlx::query!( + "SELECT priority, attempt, last_attempt FROM queue WHERE name = $1 AND version = $2", - "failed_crate", - "0.1.1", - ) - .fetch_one(&mut *conn) - .await - .unwrap() - }); + "failed_crate", + "0.1.1", + ) + .fetch_one(&mut *conn) + .await?; assert_eq!(row.priority, 9); assert_eq!(row.attempt, 0); @@ -668,21 +717,20 @@ mod tests { #[test] fn test_has_build_queued() { - crate::test::wrapper(|env| { - let queue = env.build_queue(); + crate::test::async_wrapper(|env| async move { + let queue = env.async_build_queue().await; - queue.add_crate("dummy", "0.1.1", 0, None)?; - env.runtime().block_on(async { - let mut conn = env.async_db().await.async_conn().await; - assert!(queue.has_build_queued("dummy", "0.1.1").await.unwrap()); + queue.add_crate("dummy", "0.1.1", 0, None).await?; - sqlx::query!("UPDATE queue SET attempt = 6") - .execute(&mut *conn) - .await - .unwrap(); + let mut conn = env.async_db().await.async_conn().await; + assert!(queue.has_build_queued("dummy", "0.1.1").await.unwrap()); - assert!(!queue.has_build_queued("dummy", "0.1.1").await.unwrap()); - }); + sqlx::query!("UPDATE queue SET attempt = 6") + .execute(&mut *conn) + .await + .unwrap(); + + assert!(!queue.has_build_queued("dummy", "0.1.1").await.unwrap()); Ok(()) }) @@ -819,12 +867,10 @@ mod tests { assert!(env .runtime() .block_on(async { - dbg!( - cdn::queued_or_active_crate_invalidations( - &mut *env.async_db().await.async_conn().await, - ) - .await + cdn::queued_or_active_crate_invalidations( + &mut *env.async_db().await.async_conn().await, ) + .await })? .is_empty()); diff --git a/src/context.rs b/src/context.rs index 4c8b07177..aa21c6f1a 100644 --- a/src/context.rs +++ b/src/context.rs @@ -3,7 +3,8 @@ use crate::db::Pool; use crate::error::Result; use crate::repositories::RepositoryStatsUpdater; use crate::{ - AsyncStorage, BuildQueue, Config, Index, InstanceMetrics, RegistryApi, ServiceMetrics, Storage, + AsyncBuildQueue, AsyncStorage, BuildQueue, Config, Index, InstanceMetrics, RegistryApi, + ServiceMetrics, Storage, }; use axum::async_trait; use std::sync::Arc; @@ -12,6 +13,7 @@ use tokio::runtime::Runtime; #[async_trait] pub trait Context { fn config(&self) -> Result>; + async fn async_build_queue(&self) -> Result>; fn build_queue(&self) -> Result>; fn storage(&self) -> Result>; async fn async_storage(&self) -> Result>; diff --git a/src/lib.rs b/src/lib.rs index 6036c74f7..a454af96d 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -2,7 +2,7 @@ //! documentation of crates for the Rust Programming Language. 
#![allow(clippy::cognitive_complexity)] -pub use self::build_queue::BuildQueue; +pub use self::build_queue::{AsyncBuildQueue, BuildQueue}; pub use self::config::Config; pub use self::context::Context; pub use self::docbuilder::PackageKind; diff --git a/src/metrics/mod.rs b/src/metrics/mod.rs index aa1d7056e..50e52f236 100644 --- a/src/metrics/mod.rs +++ b/src/metrics/mod.rs @@ -2,16 +2,14 @@ mod macros; use self::macros::MetricFromOpts; -use crate::{cdn, db::Pool, target::TargetAtom, BuildQueue, Config}; +use crate::{cdn, db::Pool, target::TargetAtom, AsyncBuildQueue, Config}; use anyhow::Error; use dashmap::DashMap; use prometheus::proto::MetricFamily; use std::{ collections::HashSet, - sync::Arc, time::{Duration, Instant}, }; -use tokio::runtime::Runtime; load_metric_type!(IntGauge as single); load_metric_type!(IntCounter as single); @@ -250,15 +248,13 @@ pub struct ServiceMetrics { pub queued_cdn_invalidations_by_distribution: IntGaugeVec, registry: prometheus::Registry, - runtime: Arc, } impl ServiceMetrics { - pub fn new(runtime: Arc) -> Result { + pub fn new() -> Result { let registry = prometheus::Registry::new(); Ok(Self { registry: registry.clone(), - runtime, queued_crates_count: metric_from_opts( ®istry, "queued_crates_count", @@ -298,18 +294,19 @@ impl ServiceMetrics { }) } - pub(crate) fn gather( + pub(crate) async fn gather( &self, pool: &Pool, - queue: &BuildQueue, + queue: &AsyncBuildQueue, config: &Config, ) -> Result, Error> { - self.queue_is_locked.set(queue.is_locked()? as i64); - self.queued_crates_count.set(queue.pending_count()? as i64); + self.queue_is_locked.set(queue.is_locked().await? as i64); + self.queued_crates_count + .set(queue.pending_count().await? as i64); self.prioritized_crates_count - .set(queue.prioritized_count()? as i64); + .set(queue.prioritized_count().await? as i64); - let queue_pending_count = queue.pending_count_by_priority()?; + let queue_pending_count = queue.pending_count_by_priority().await?; // gauges keep their old value per label when it's not removed, reset to zero or updated. // When a priority is used at least once, it would be kept in the metric and the last @@ -336,20 +333,18 @@ impl ServiceMetrics { .set(*count as i64); } - self.runtime.block_on(async { - let mut conn = pool.get_async().await?; - for (distribution_id, count) in - cdn::queued_or_active_crate_invalidation_count_by_distribution(&mut conn, config) - .await? - { - self.queued_cdn_invalidations_by_distribution - .with_label_values(&[&distribution_id]) - .set(count); - } - Ok::<_, Error>(()) - })?; - - self.failed_crates_count.set(queue.failed_count()? as i64); + let mut conn = pool.get_async().await?; + for (distribution_id, count) in + cdn::queued_or_active_crate_invalidation_count_by_distribution(&mut conn, config) + .await? + { + self.queued_cdn_invalidations_by_distribution + .with_label_values(&[&distribution_id]) + .set(count); + } + + self.failed_crates_count + .set(queue.failed_count().await? 
as i64); Ok(self.registry.gather()) } } diff --git a/src/test/mod.rs b/src/test/mod.rs index 4f06a8dd6..505f453da 100644 --- a/src/test/mod.rs +++ b/src/test/mod.rs @@ -7,7 +7,10 @@ use crate::error::Result; use crate::repositories::RepositoryStatsUpdater; use crate::storage::{AsyncStorage, Storage, StorageKind}; use crate::web::{build_axum_app, cache, page::TemplateData}; -use crate::{BuildQueue, Config, Context, Index, InstanceMetrics, RegistryApi, ServiceMetrics}; +use crate::{ + AsyncBuildQueue, BuildQueue, Config, Context, Index, InstanceMetrics, RegistryApi, + ServiceMetrics, +}; use anyhow::Context as _; use axum::async_trait; use fn_error_context::context; @@ -258,6 +261,7 @@ pub(crate) fn assert_redirect_cached( pub(crate) struct TestEnvironment { build_queue: OnceCell>, + async_build_queue: tokio::sync::OnceCell>, config: OnceCell>, db: tokio::sync::OnceCell, storage: OnceCell>, @@ -293,6 +297,7 @@ impl TestEnvironment { init_logger(); Self { build_queue: OnceCell::new(), + async_build_queue: tokio::sync::OnceCell::new(), config: OnceCell::new(), db: tokio::sync::OnceCell::new(), storage: OnceCell::new(), @@ -363,15 +368,27 @@ impl TestEnvironment { } } + pub(crate) async fn async_build_queue(&self) -> Arc { + self.async_build_queue + .get_or_init(|| async { + Arc::new(AsyncBuildQueue::new( + self.async_db().await.pool(), + self.instance_metrics(), + self.config(), + self.async_storage().await, + )) + }) + .await + .clone() + } + pub(crate) fn build_queue(&self) -> Arc { + let runtime = self.runtime(); self.build_queue .get_or_init(|| { Arc::new(BuildQueue::new( - self.db().pool(), - self.instance_metrics(), - self.config(), - self.runtime(), - self.runtime().block_on(self.async_storage()), + runtime.clone(), + runtime.block_on(self.async_build_queue()), )) }) .clone() @@ -427,10 +444,7 @@ impl TestEnvironment { pub(crate) fn service_metrics(&self) -> Arc { self.service_metrics .get_or_init(|| { - Arc::new( - ServiceMetrics::new(self.runtime()) - .expect("failed to initialize the service metrics"), - ) + Arc::new(ServiceMetrics::new().expect("failed to initialize the service metrics")) }) .clone() } @@ -535,6 +549,10 @@ impl Context for TestEnvironment { Ok(TestEnvironment::config(self)) } + async fn async_build_queue(&self) -> Result> { + Ok(TestEnvironment::async_build_queue(self).await) + } + fn build_queue(&self) -> Result> { Ok(TestEnvironment::build_queue(self)) } diff --git a/src/utils/daemon.rs b/src/utils/daemon.rs index 9b44ed5c5..bcd88e14b 100644 --- a/src/utils/daemon.rs +++ b/src/utils/daemon.rs @@ -6,33 +6,34 @@ use crate::{ cdn, utils::{queue_builder, report_error}, web::start_web_server, - BuildQueue, Config, Context, Index, RustwideBuilder, + AsyncBuildQueue, Config, Context, Index, RustwideBuilder, }; use anyhow::{anyhow, Context as _, Error}; use std::future::Future; use std::sync::Arc; use std::thread; -use std::time::{Duration, Instant}; -use tokio::runtime::Runtime; +use std::time::Duration; +use tokio::{runtime::Runtime, task::spawn_blocking, time::Instant}; use tracing::{debug, info}; /// Run the registry watcher /// NOTE: this should only be run once, otherwise crates would be added /// to the queue multiple times. -pub fn watch_registry( - build_queue: Arc, +pub async fn watch_registry( + build_queue: Arc, config: Arc, index: Arc, ) -> Result<(), Error> { let mut last_gc = Instant::now(); loop { - if build_queue.is_locked()? { + if build_queue.is_locked().await? 
{ debug!("Queue is locked, skipping checking new crates"); } else { debug!("Checking new crates"); match build_queue .get_new_crates(&index) + .await .context("Failed to get new crates") { Ok(n) => debug!("{} crates added to queue", n), @@ -41,26 +42,29 @@ pub fn watch_registry( } if last_gc.elapsed().as_secs() >= config.registry_gc_interval { - index.run_git_gc(); + spawn_blocking({ + let index = index.clone(); + move || index.run_git_gc() + }) + .await?; last_gc = Instant::now(); } - thread::sleep(Duration::from_secs(60)); + tokio::time::sleep(Duration::from_secs(60)).await; } } fn start_registry_watcher(context: &dyn Context) -> Result<(), Error> { - let build_queue = context.build_queue()?; + let build_queue = context.runtime()?.block_on(context.async_build_queue())?; let config = context.config()?; let index = context.index()?; + let runtime = context.runtime()?; - thread::Builder::new() - .name("registry index reader".to_string()) - .spawn(move || { - // space this out to prevent it from clashing against the queue-builder thread on launch - thread::sleep(Duration::from_secs(30)); + runtime.spawn(async { + // space this out to prevent it from clashing against the queue-builder thread on launch + tokio::time::sleep(Duration::from_secs(30)).await; - watch_registry(build_queue, config, index) - })?; + watch_registry(build_queue, config, index).await + }); Ok(()) } diff --git a/src/web/builds.rs b/src/web/builds.rs index fbe22b966..1cd46fbd4 100644 --- a/src/web/builds.rs +++ b/src/web/builds.rs @@ -7,13 +7,12 @@ use crate::{ db::types::BuildStatus, docbuilder::Limits, impl_axum_webpage, - utils::spawn_blocking, web::{ error::AxumResult, extractors::{DbConnection, Path}, filters, match_version, MetaData, ReqVersion, }, - BuildQueue, Config, + AsyncBuildQueue, Config, }; use anyhow::{anyhow, Result}; use axum::{ @@ -156,7 +155,7 @@ async fn build_trigger_check( conn: &mut sqlx::PgConnection, name: &String, version: &Version, - build_queue: &Arc, + build_queue: &Arc, ) -> AxumResult { if !crate_version_exists(&mut *conn, name, version).await? 
{ return Err(AxumNope::VersionNotFound); @@ -182,7 +181,7 @@ const TRIGGERED_REBUILD_PRIORITY: i32 = 5; pub(crate) async fn build_trigger_rebuild_handler( Path((name, version)): Path<(String, Version)>, mut conn: DbConnection, - Extension(build_queue): Extension>, + Extension(build_queue): Extension>, Extension(config): Extension>, opt_auth_header: Option>>, ) -> JsonAxumResult { @@ -208,20 +207,15 @@ pub(crate) async fn build_trigger_rebuild_handler( .await .map_err(JsonAxumNope)?; - spawn_blocking({ - let name = name.clone(); - let version_string = version.to_string(); - move || { - build_queue.add_crate( - &name, - &version_string, - TRIGGERED_REBUILD_PRIORITY, - None, /* because crates.io is the only service that calls this endpoint */ - ) - } - }) - .await - .map_err(|e| JsonAxumNope(e.into()))?; + build_queue + .add_crate( + &name, + &version.to_string(), + TRIGGERED_REBUILD_PRIORITY, + None, /* because crates.io is the only service that calls this endpoint */ + ) + .await + .map_err(|e| JsonAxumNope(e.into()))?; Ok((StatusCode::CREATED, Json(serde_json::json!({})))) } @@ -259,24 +253,13 @@ mod tests { use super::BuildStatus; use crate::{ db::Overrides, - test::{ - assert_cache_control, fake_release_that_failed_before_build, wrapper, FakeBuild, - TestEnvironment, - }, + test::{assert_cache_control, fake_release_that_failed_before_build, wrapper, FakeBuild}, web::cache::CachePolicy, }; use chrono::{DateTime, Utc}; use kuchikiki::traits::TendrilSink; use reqwest::StatusCode; - fn has_build_queued(env: &TestEnvironment, name: &str, version: &str) -> anyhow::Result { - env.runtime().block_on(async move { - let build_queue = env.build_queue(); - - build_queue.has_build_queued(name, version).await - }) - } - #[test] fn build_list_empty_build() { wrapper(|env| { @@ -508,7 +491,7 @@ mod tests { } assert_eq!(env.build_queue().pending_count()?, 0); - assert!(!has_build_queued(env, "foo", "0.1.0")?); + assert!(!env.build_queue().has_build_queued("foo", "0.1.0")?); { let response = env @@ -522,7 +505,7 @@ mod tests { } assert_eq!(env.build_queue().pending_count()?, 1); - assert!(has_build_queued(env, "foo", "0.1.0")?); + assert!(env.build_queue().has_build_queued("foo", "0.1.0")?); { let response = env @@ -542,7 +525,7 @@ mod tests { } assert_eq!(env.build_queue().pending_count()?, 1); - assert!(has_build_queued(env, "foo", "0.1.0")?); + assert!(env.build_queue().has_build_queued("foo", "0.1.0")?); Ok(()) }); diff --git a/src/web/metrics.rs b/src/web/metrics.rs index b5157b2d1..e371f89d3 100644 --- a/src/web/metrics.rs +++ b/src/web/metrics.rs @@ -1,6 +1,6 @@ use crate::{ - db::Pool, metrics::duration_to_seconds, utils::spawn_blocking, web::error::AxumResult, - BuildQueue, Config, InstanceMetrics, ServiceMetrics, + db::Pool, metrics::duration_to_seconds, web::error::AxumResult, AsyncBuildQueue, Config, + InstanceMetrics, ServiceMetrics, }; use anyhow::{Context as _, Result}; use axum::{ @@ -10,20 +10,18 @@ use axum::{ response::IntoResponse, }; use prometheus::{proto::MetricFamily, Encoder, TextEncoder}; -use std::{borrow::Cow, sync::Arc, time::Instant}; +use std::{borrow::Cow, future::Future, sync::Arc, time::Instant}; -async fn fetch_and_render_metrics( - fetch_metrics: impl Fn() -> Result> + Send + 'static, -) -> AxumResult { - let buffer = spawn_blocking(move || { - let metrics_families = fetch_metrics()?; - let mut buffer = Vec::new(); - TextEncoder::new() - .encode(&metrics_families, &mut buffer) - .context("error encoding metrics")?; - Ok(buffer) - }) - .await?; +async fn 
fetch_and_render_metrics(fetch_metrics: Fut) -> AxumResult +where + Fut: Future>> + Send + 'static, +{ + let metrics_families = fetch_metrics.await?; + + let mut buffer = Vec::new(); + TextEncoder::new() + .encode(&metrics_families, &mut buffer) + .context("error encoding metrics")?; Ok(( StatusCode::OK, @@ -37,12 +35,12 @@ pub(super) async fn metrics_handler( Extension(config): Extension>, Extension(instance_metrics): Extension>, Extension(service_metrics): Extension>, - Extension(queue): Extension>, + Extension(queue): Extension>, ) -> AxumResult { - fetch_and_render_metrics(move || { + fetch_and_render_metrics(async move { let mut families = Vec::new(); families.extend_from_slice(&instance_metrics.gather(&pool)?); - families.extend_from_slice(&service_metrics.gather(&pool, &queue, &config)?); + families.extend_from_slice(&service_metrics.gather(&pool, &queue, &config).await?); Ok(families) }) .await @@ -52,16 +50,16 @@ pub(super) async fn service_metrics_handler( Extension(pool): Extension, Extension(config): Extension>, Extension(metrics): Extension>, - Extension(queue): Extension>, + Extension(queue): Extension>, ) -> AxumResult { - fetch_and_render_metrics(move || metrics.gather(&pool, &queue, &config)).await + fetch_and_render_metrics(async move { metrics.gather(&pool, &queue, &config).await }).await } pub(super) async fn instance_metrics_handler( Extension(pool): Extension, Extension(metrics): Extension>, ) -> AxumResult { - fetch_and_render_metrics(move || metrics.gather(&pool)).await + fetch_and_render_metrics(async move { metrics.gather(&pool) }).await } /// Request recorder middleware diff --git a/src/web/mod.rs b/src/web/mod.rs index 1ae527757..a841558ca 100644 --- a/src/web/mod.rs +++ b/src/web/mod.rs @@ -402,7 +402,7 @@ fn apply_middleware( let has_templates = template_data.is_some(); let runtime = context.runtime()?; let async_storage = runtime.block_on(context.async_storage())?; - let build_queue = context.build_queue()?; + let build_queue = runtime.block_on(context.async_build_queue())?; Ok(router.layer( ServiceBuilder::new() diff --git a/src/web/releases.rs b/src/web/releases.rs index d0bfc0f3b..d26078c8b 100644 --- a/src/web/releases.rs +++ b/src/web/releases.rs @@ -3,7 +3,7 @@ use crate::{ build_queue::QueuedCrate, cdn, impl_axum_webpage, - utils::{report_error, retry_async, spawn_blocking}, + utils::{report_error, retry_async}, web::{ axum_parse_uri_with_params, axum_redirect, encode_url_path, error::{AxumNope, AxumResult}, @@ -12,7 +12,7 @@ use crate::{ page::templates::filters, ReqVersion, }, - BuildQueue, Config, InstanceMetrics, + AsyncBuildQueue, Config, InstanceMetrics, }; use anyhow::{anyhow, bail, Context as _, Result}; use axum::{ @@ -790,7 +790,7 @@ struct BuildQueuePage { impl_axum_webpage! { BuildQueuePage } pub(crate) async fn build_queue_handler( - Extension(build_queue): Extension>, + Extension(build_queue): Extension>, mut conn: DbConnection, ) -> AxumResult { let mut active_cdn_deployments: Vec<_> = cdn::queued_or_active_crate_invalidations(&mut conn) @@ -806,7 +806,7 @@ pub(crate) async fn build_queue_handler( // reverse the list, so the oldest comes first active_cdn_deployments.reverse(); - let mut queue = spawn_blocking(move || build_queue.queued_crates()).await?; + let mut queue = build_queue.queued_crates().await?; for krate in queue.iter_mut() { // The priority here is inverted: in the database if a crate has a higher priority it // will be built after everything else, which is counter-intuitive for people not