From 03c5016e3dfe51dea4e84850ec35826dfdbe7400 Mon Sep 17 00:00:00 2001 From: Tobias Bieniek Date: Fri, 4 Oct 2024 14:47:38 +0200 Subject: [PATCH 1/3] bin/monitor: Convert to async `main()` fn --- src/bin/monitor.rs | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/src/bin/monitor.rs b/src/bin/monitor.rs index 187b7aff90..74fe6bb69f 100644 --- a/src/bin/monitor.rs +++ b/src/bin/monitor.rs @@ -5,19 +5,24 @@ //! cargo run --bin monitor use anyhow::Result; +use crates_io::tasks::spawn_blocking; use crates_io::worker::jobs; use crates_io::{admin::on_call, db, schema::*}; use crates_io_env_vars::{var, var_parsed}; use crates_io_worker::BackgroundJob; use diesel::prelude::*; -fn main() -> Result<()> { - let conn = &mut db::oneoff_connection()?; +#[tokio::main] +async fn main() -> Result<()> { + spawn_blocking(move || { + let conn = &mut db::oneoff_connection()?; - check_failing_background_jobs(conn)?; - check_stalled_update_downloads(conn)?; - check_spam_attack(conn)?; - Ok(()) + check_failing_background_jobs(conn)?; + check_stalled_update_downloads(conn)?; + check_spam_attack(conn)?; + Ok(()) + }) + .await } /// Check for old background jobs that are not currently running. From dc60164a5271d8bc6027e23e592af3825f270f2e Mon Sep 17 00:00:00 2001 From: Tobias Bieniek Date: Fri, 4 Oct 2024 14:49:52 +0200 Subject: [PATCH 2/3] db: Add `oneoff_async_connection()` fns --- src/db.rs | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/src/db.rs b/src/db.rs index e14181a7e8..a64b8fba94 100644 --- a/src/db.rs +++ b/src/db.rs @@ -2,7 +2,7 @@ use crate::certs::CRUNCHY; use diesel::{Connection, ConnectionResult, PgConnection, QueryResult}; use diesel_async::pooled_connection::deadpool::{Hook, HookError}; use diesel_async::pooled_connection::ManagerConfig; -use diesel_async::{AsyncPgConnection, RunQueryDsl}; +use diesel_async::{AsyncConnection, AsyncPgConnection, RunQueryDsl}; use native_tls::{Certificate, TlsConnector}; use postgres_native_tls::MakeTlsConnector; use secrecy::ExposeSecret; @@ -23,6 +23,18 @@ pub fn oneoff_connection() -> anyhow::Result { oneoff_connection_with_config(&config).map_err(Into::into) } +pub async fn oneoff_async_connection_with_config( + config: &config::DatabasePools, +) -> ConnectionResult { + let url = connection_url(config, config.primary.url.expose_secret()); + AsyncPgConnection::establish(&url).await +} + +pub async fn oneoff_async_connection() -> anyhow::Result { + let config = config::DatabasePools::full_from_environment(&config::Base::from_environment()?)?; + Ok(oneoff_async_connection_with_config(&config).await?) +} + pub fn connection_url(config: &config::DatabasePools, url: &str) -> String { let mut url = Url::parse(url).expect("Invalid database URL"); From 9888fd13fc73a28f8b710eaea5849c0270d0f35b Mon Sep 17 00:00:00 2001 From: Tobias Bieniek Date: Fri, 4 Oct 2024 14:54:21 +0200 Subject: [PATCH 3/3] bin/monitor: Convert other fns to async --- src/bin/monitor.rs | 52 ++++++++++++++++++++++++++-------------------- 1 file changed, 30 insertions(+), 22 deletions(-) diff --git a/src/bin/monitor.rs b/src/bin/monitor.rs index 74fe6bb69f..0f45cb1f0c 100644 --- a/src/bin/monitor.rs +++ b/src/bin/monitor.rs @@ -11,18 +11,16 @@ use crates_io::{admin::on_call, db, schema::*}; use crates_io_env_vars::{var, var_parsed}; use crates_io_worker::BackgroundJob; use diesel::prelude::*; +use diesel_async::{AsyncPgConnection, RunQueryDsl}; #[tokio::main] async fn main() -> Result<()> { - spawn_blocking(move || { - let conn = &mut db::oneoff_connection()?; + let conn = &mut db::oneoff_async_connection().await?; - check_failing_background_jobs(conn)?; - check_stalled_update_downloads(conn)?; - check_spam_attack(conn)?; - Ok(()) - }) - .await + check_failing_background_jobs(conn).await?; + check_stalled_update_downloads(conn).await?; + check_spam_attack(conn).await?; + Ok(()) } /// Check for old background jobs that are not currently running. @@ -33,7 +31,7 @@ async fn main() -> Result<()> { /// /// Within the default 15 minute time, a job should have already had several /// failed retry attempts. -fn check_failing_background_jobs(conn: &mut PgConnection) -> Result<()> { +async fn check_failing_background_jobs(conn: &mut AsyncPgConnection) -> Result<()> { use diesel::dsl::*; use diesel::sql_types::Integer; @@ -50,7 +48,8 @@ fn check_failing_background_jobs(conn: &mut PgConnection) -> Result<()> { .filter(background_jobs::priority.ge(0)) .for_update() .skip_locked() - .load(conn)?; + .load(conn) + .await?; let stalled_job_count = stalled_jobs.len(); @@ -68,12 +67,13 @@ fn check_failing_background_jobs(conn: &mut PgConnection) -> Result<()> { } }; - log_and_trigger_event(event)?; + spawn_blocking(move || log_and_trigger_event(event)).await?; + Ok(()) } /// Check for an `update_downloads` job that has run longer than expected -fn check_stalled_update_downloads(conn: &mut PgConnection) -> Result<()> { +async fn check_stalled_update_downloads(conn: &mut AsyncPgConnection) -> Result<()> { use chrono::{DateTime, NaiveDateTime, Utc}; const EVENT_KEY: &str = "update_downloads_stalled"; @@ -86,28 +86,35 @@ fn check_stalled_update_downloads(conn: &mut PgConnection) -> Result<()> { let start_time: Result = background_jobs::table .filter(background_jobs::job_type.eq(jobs::UpdateDownloads::JOB_NAME)) .select(background_jobs::created_at) - .first(conn); + .first(conn) + .await; if let Ok(start_time) = start_time { let start_time = DateTime::::from_naive_utc_and_offset(start_time, Utc); let minutes = Utc::now().signed_duration_since(start_time).num_minutes(); if minutes > max_job_time { - return log_and_trigger_event(on_call::Event::Trigger { - incident_key: Some(EVENT_KEY.into()), - description: format!("update_downloads job running for {minutes} minutes"), - }); + return spawn_blocking(move || { + log_and_trigger_event(on_call::Event::Trigger { + incident_key: Some(EVENT_KEY.into()), + description: format!("update_downloads job running for {minutes} minutes"), + }) + }) + .await; } }; - log_and_trigger_event(on_call::Event::Resolve { - incident_key: EVENT_KEY.into(), - description: Some("No stalled update_downloads job".into()), + spawn_blocking(move || { + log_and_trigger_event(on_call::Event::Resolve { + incident_key: EVENT_KEY.into(), + description: Some("No stalled update_downloads job".into()), + }) }) + .await } /// Check for known spam patterns -fn check_spam_attack(conn: &mut PgConnection) -> Result<()> { +async fn check_spam_attack(conn: &mut AsyncPgConnection) -> Result<()> { use crates_io::sql::canon_crate_name; const EVENT_KEY: &str = "spam_attack"; @@ -126,6 +133,7 @@ fn check_spam_attack(conn: &mut PgConnection) -> Result<()> { .filter(canon_crate_name(crates::name).eq_any(bad_crate_names)) .select(crates::name) .first(conn) + .await .optional()?; if let Some(bad_crate) = bad_crate { @@ -144,7 +152,7 @@ fn check_spam_attack(conn: &mut PgConnection) -> Result<()> { } }; - log_and_trigger_event(event)?; + spawn_blocking(move || log_and_trigger_event(event)).await?; Ok(()) }