Skip to content

Commit

Permalink
bin/monitor: Convert other fns to async
Browse files Browse the repository at this point in the history
  • Loading branch information
Turbo87 committed Oct 4, 2024
1 parent dc60164 commit 9888fd1
Showing 1 changed file with 30 additions and 22 deletions.
52 changes: 30 additions & 22 deletions src/bin/monitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,18 +11,16 @@ use crates_io::{admin::on_call, db, schema::*};
use crates_io_env_vars::{var, var_parsed};
use crates_io_worker::BackgroundJob;
use diesel::prelude::*;
use diesel_async::{AsyncPgConnection, RunQueryDsl};

#[tokio::main]
async fn main() -> Result<()> {
spawn_blocking(move || {
let conn = &mut db::oneoff_connection()?;
let conn = &mut db::oneoff_async_connection().await?;

Check warning on line 18 in src/bin/monitor.rs

View check run for this annotation

Codecov / codecov/patch

src/bin/monitor.rs#L17-L18

Added lines #L17 - L18 were not covered by tests

check_failing_background_jobs(conn)?;
check_stalled_update_downloads(conn)?;
check_spam_attack(conn)?;
Ok(())
})
.await
check_failing_background_jobs(conn).await?;
check_stalled_update_downloads(conn).await?;
check_spam_attack(conn).await?;

Check warning on line 22 in src/bin/monitor.rs

View check run for this annotation

Codecov / codecov/patch

src/bin/monitor.rs#L20-L22

Added lines #L20 - L22 were not covered by tests
Ok(())
}

/// Check for old background jobs that are not currently running.
Expand All @@ -33,7 +31,7 @@ async fn main() -> Result<()> {
///
/// Within the default 15 minute time, a job should have already had several
/// failed retry attempts.
fn check_failing_background_jobs(conn: &mut PgConnection) -> Result<()> {
async fn check_failing_background_jobs(conn: &mut AsyncPgConnection) -> Result<()> {

Check warning on line 34 in src/bin/monitor.rs

View check run for this annotation

Codecov / codecov/patch

src/bin/monitor.rs#L34

Added line #L34 was not covered by tests
use diesel::dsl::*;
use diesel::sql_types::Integer;

Expand All @@ -50,7 +48,8 @@ fn check_failing_background_jobs(conn: &mut PgConnection) -> Result<()> {
.filter(background_jobs::priority.ge(0))
.for_update()
.skip_locked()
.load(conn)?;
.load(conn)
.await?;

Check warning on line 52 in src/bin/monitor.rs

View check run for this annotation

Codecov / codecov/patch

src/bin/monitor.rs#L51-L52

Added lines #L51 - L52 were not covered by tests

let stalled_job_count = stalled_jobs.len();

Expand All @@ -68,12 +67,13 @@ fn check_failing_background_jobs(conn: &mut PgConnection) -> Result<()> {
}
};

log_and_trigger_event(event)?;
spawn_blocking(move || log_and_trigger_event(event)).await?;

Check warning on line 70 in src/bin/monitor.rs

View check run for this annotation

Codecov / codecov/patch

src/bin/monitor.rs#L70

Added line #L70 was not covered by tests

Ok(())
}

/// Check for an `update_downloads` job that has run longer than expected
fn check_stalled_update_downloads(conn: &mut PgConnection) -> Result<()> {
async fn check_stalled_update_downloads(conn: &mut AsyncPgConnection) -> Result<()> {

Check warning on line 76 in src/bin/monitor.rs

View check run for this annotation

Codecov / codecov/patch

src/bin/monitor.rs#L76

Added line #L76 was not covered by tests
use chrono::{DateTime, NaiveDateTime, Utc};

const EVENT_KEY: &str = "update_downloads_stalled";
Expand All @@ -86,28 +86,35 @@ fn check_stalled_update_downloads(conn: &mut PgConnection) -> Result<()> {
let start_time: Result<NaiveDateTime, _> = background_jobs::table
.filter(background_jobs::job_type.eq(jobs::UpdateDownloads::JOB_NAME))
.select(background_jobs::created_at)
.first(conn);
.first(conn)
.await;

Check warning on line 90 in src/bin/monitor.rs

View check run for this annotation

Codecov / codecov/patch

src/bin/monitor.rs#L89-L90

Added lines #L89 - L90 were not covered by tests

if let Ok(start_time) = start_time {
let start_time = DateTime::<Utc>::from_naive_utc_and_offset(start_time, Utc);
let minutes = Utc::now().signed_duration_since(start_time).num_minutes();

if minutes > max_job_time {
return log_and_trigger_event(on_call::Event::Trigger {
incident_key: Some(EVENT_KEY.into()),
description: format!("update_downloads job running for {minutes} minutes"),
});
return spawn_blocking(move || {
log_and_trigger_event(on_call::Event::Trigger {
incident_key: Some(EVENT_KEY.into()),
description: format!("update_downloads job running for {minutes} minutes"),
})
})
.await;

Check warning on line 103 in src/bin/monitor.rs

View check run for this annotation

Codecov / codecov/patch

src/bin/monitor.rs#L97-L103

Added lines #L97 - L103 were not covered by tests
}
};

log_and_trigger_event(on_call::Event::Resolve {
incident_key: EVENT_KEY.into(),
description: Some("No stalled update_downloads job".into()),
spawn_blocking(move || {
log_and_trigger_event(on_call::Event::Resolve {
incident_key: EVENT_KEY.into(),
description: Some("No stalled update_downloads job".into()),
})

Check warning on line 111 in src/bin/monitor.rs

View check run for this annotation

Codecov / codecov/patch

src/bin/monitor.rs#L107-L111

Added lines #L107 - L111 were not covered by tests
})
.await

Check warning on line 113 in src/bin/monitor.rs

View check run for this annotation

Codecov / codecov/patch

src/bin/monitor.rs#L113

Added line #L113 was not covered by tests
}

/// Check for known spam patterns
fn check_spam_attack(conn: &mut PgConnection) -> Result<()> {
async fn check_spam_attack(conn: &mut AsyncPgConnection) -> Result<()> {

Check warning on line 117 in src/bin/monitor.rs

View check run for this annotation

Codecov / codecov/patch

src/bin/monitor.rs#L117

Added line #L117 was not covered by tests
use crates_io::sql::canon_crate_name;

const EVENT_KEY: &str = "spam_attack";
Expand All @@ -126,6 +133,7 @@ fn check_spam_attack(conn: &mut PgConnection) -> Result<()> {
.filter(canon_crate_name(crates::name).eq_any(bad_crate_names))
.select(crates::name)
.first(conn)
.await

Check warning on line 136 in src/bin/monitor.rs

View check run for this annotation

Codecov / codecov/patch

src/bin/monitor.rs#L136

Added line #L136 was not covered by tests
.optional()?;

if let Some(bad_crate) = bad_crate {
Expand All @@ -144,7 +152,7 @@ fn check_spam_attack(conn: &mut PgConnection) -> Result<()> {
}
};

log_and_trigger_event(event)?;
spawn_blocking(move || log_and_trigger_event(event)).await?;

Check warning on line 155 in src/bin/monitor.rs

View check run for this annotation

Codecov / codecov/patch

src/bin/monitor.rs#L155

Added line #L155 was not covered by tests
Ok(())
}

Expand Down

0 comments on commit 9888fd1

Please sign in to comment.