Skip to content

Commit

Permalink
Merge pull request #9578 from Turbo87/async-monitor
Browse files Browse the repository at this point in the history
bin/monitor: Convert to async
  • Loading branch information
Turbo87 authored Oct 4, 2024
2 parents b692905 + 9888fd1 commit 5faabd6
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 20 deletions.
51 changes: 32 additions & 19 deletions src/bin/monitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,21 @@
//! cargo run --bin monitor

use anyhow::Result;
use crates_io::tasks::spawn_blocking;
use crates_io::worker::jobs;
use crates_io::{admin::on_call, db, schema::*};
use crates_io_env_vars::{var, var_parsed};
use crates_io_worker::BackgroundJob;
use diesel::prelude::*;
use diesel_async::{AsyncPgConnection, RunQueryDsl};

fn main() -> Result<()> {
let conn = &mut db::oneoff_connection()?;
#[tokio::main]
async fn main() -> Result<()> {
let conn = &mut db::oneoff_async_connection().await?;

check_failing_background_jobs(conn)?;
check_stalled_update_downloads(conn)?;
check_spam_attack(conn)?;
check_failing_background_jobs(conn).await?;
check_stalled_update_downloads(conn).await?;
check_spam_attack(conn).await?;
Ok(())
}

Expand All @@ -28,7 +31,7 @@ fn main() -> Result<()> {
///
/// Within the default 15 minute time, a job should have already had several
/// failed retry attempts.
fn check_failing_background_jobs(conn: &mut PgConnection) -> Result<()> {
async fn check_failing_background_jobs(conn: &mut AsyncPgConnection) -> Result<()> {
use diesel::dsl::*;
use diesel::sql_types::Integer;

Expand All @@ -45,7 +48,8 @@ fn check_failing_background_jobs(conn: &mut PgConnection) -> Result<()> {
.filter(background_jobs::priority.ge(0))
.for_update()
.skip_locked()
.load(conn)?;
.load(conn)
.await?;

let stalled_job_count = stalled_jobs.len();

Expand All @@ -63,12 +67,13 @@ fn check_failing_background_jobs(conn: &mut PgConnection) -> Result<()> {
}
};

log_and_trigger_event(event)?;
spawn_blocking(move || log_and_trigger_event(event)).await?;

Ok(())
}

/// Check for an `update_downloads` job that has run longer than expected
fn check_stalled_update_downloads(conn: &mut PgConnection) -> Result<()> {
async fn check_stalled_update_downloads(conn: &mut AsyncPgConnection) -> Result<()> {
use chrono::{DateTime, NaiveDateTime, Utc};

const EVENT_KEY: &str = "update_downloads_stalled";
Expand All @@ -81,28 +86,35 @@ fn check_stalled_update_downloads(conn: &mut PgConnection) -> Result<()> {
let start_time: Result<NaiveDateTime, _> = background_jobs::table
.filter(background_jobs::job_type.eq(jobs::UpdateDownloads::JOB_NAME))
.select(background_jobs::created_at)
.first(conn);
.first(conn)
.await;

if let Ok(start_time) = start_time {
let start_time = DateTime::<Utc>::from_naive_utc_and_offset(start_time, Utc);
let minutes = Utc::now().signed_duration_since(start_time).num_minutes();

if minutes > max_job_time {
return log_and_trigger_event(on_call::Event::Trigger {
incident_key: Some(EVENT_KEY.into()),
description: format!("update_downloads job running for {minutes} minutes"),
});
return spawn_blocking(move || {
log_and_trigger_event(on_call::Event::Trigger {
incident_key: Some(EVENT_KEY.into()),
description: format!("update_downloads job running for {minutes} minutes"),
})
})
.await;
}
};

log_and_trigger_event(on_call::Event::Resolve {
incident_key: EVENT_KEY.into(),
description: Some("No stalled update_downloads job".into()),
spawn_blocking(move || {
log_and_trigger_event(on_call::Event::Resolve {
incident_key: EVENT_KEY.into(),
description: Some("No stalled update_downloads job".into()),
})
})
.await
}

/// Check for known spam patterns
fn check_spam_attack(conn: &mut PgConnection) -> Result<()> {
async fn check_spam_attack(conn: &mut AsyncPgConnection) -> Result<()> {
use crates_io::sql::canon_crate_name;

const EVENT_KEY: &str = "spam_attack";
Expand All @@ -121,6 +133,7 @@ fn check_spam_attack(conn: &mut PgConnection) -> Result<()> {
.filter(canon_crate_name(crates::name).eq_any(bad_crate_names))
.select(crates::name)
.first(conn)
.await
.optional()?;

if let Some(bad_crate) = bad_crate {
Expand All @@ -139,7 +152,7 @@ fn check_spam_attack(conn: &mut PgConnection) -> Result<()> {
}
};

log_and_trigger_event(event)?;
spawn_blocking(move || log_and_trigger_event(event)).await?;
Ok(())
}

Expand Down
14 changes: 13 additions & 1 deletion src/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use crate::certs::CRUNCHY;
use diesel::{Connection, ConnectionResult, PgConnection, QueryResult};
use diesel_async::pooled_connection::deadpool::{Hook, HookError};
use diesel_async::pooled_connection::ManagerConfig;
use diesel_async::{AsyncPgConnection, RunQueryDsl};
use diesel_async::{AsyncConnection, AsyncPgConnection, RunQueryDsl};
use native_tls::{Certificate, TlsConnector};
use postgres_native_tls::MakeTlsConnector;
use secrecy::ExposeSecret;
Expand All @@ -23,6 +23,18 @@ pub fn oneoff_connection() -> anyhow::Result<PgConnection> {
oneoff_connection_with_config(&config).map_err(Into::into)
}

pub async fn oneoff_async_connection_with_config(
config: &config::DatabasePools,
) -> ConnectionResult<AsyncPgConnection> {
let url = connection_url(config, config.primary.url.expose_secret());
AsyncPgConnection::establish(&url).await
}

pub async fn oneoff_async_connection() -> anyhow::Result<AsyncPgConnection> {
let config = config::DatabasePools::full_from_environment(&config::Base::from_environment()?)?;
Ok(oneoff_async_connection_with_config(&config).await?)
}

pub fn connection_url(config: &config::DatabasePools, url: &str) -> String {
let mut url = Url::parse(url).expect("Invalid database URL");

Expand Down

0 comments on commit 5faabd6

Please sign in to comment.