Skip to content

Commit

Permalink
Merge pull request #1204 from NexVeridian/per-test-case-database
Browse files Browse the repository at this point in the history
test: bgworker parallel database
  • Loading branch information
jondot authored Jan 22, 2025
2 parents 6888130 + 26f2097 commit 0fb982c
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 91 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -210,4 +210,5 @@ sqlx = { version = "0.8.2", default-features = false, features = [
"postgres",
"chrono",
"sqlite",
"migrate",
] }
117 changes: 26 additions & 91 deletions src/bgworker/pg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -489,11 +489,8 @@ pub async fn create_provider(qcfg: &PostgresQueueConfig) -> Result<Queue> {
))
}

#[cfg(all(test, feature = "integration_test"))]
use serial_test::serial;
#[cfg(all(test, feature = "integration_test"))]
mod tests {

use chrono::{NaiveDate, NaiveTime, TimeZone};
use insta::{assert_debug_snapshot, with_settings};
use sqlx::{query_as, FromRow};
Expand Down Expand Up @@ -521,29 +518,6 @@ mod tests {
pub is_updatable: Option<String>,
}

/// Test helper: build a one-connection `PgPool` from the `DATABASE_URL`
/// environment variable and drop any leftover `pg_loco_queue` table so
/// each test starts from a clean schema.
///
/// # Panics
/// Panics if `DATABASE_URL` is unset, the connection fails, or the
/// cleanup `DROP TABLE` statement cannot be executed.
async fn init() -> PgPool {
    let qcfg = PostgresQueueConfig {
        uri: std::env::var("DATABASE_URL")
            .expect("environment variable 'DATABASE_URL' should be set"),
        dangerously_flush: false,
        enable_logging: false,
        // Single connection with short timeouts: tests run serially and
        // should fail fast rather than wait on a dead database.
        max_connections: 1,
        min_connections: 1,
        connect_timeout: 500,
        idle_timeout: 500,
        poll_interval_sec: 1,
        num_workers: 1,
    };

    let pool = connect(&qcfg).await.unwrap();

    // Remove state left behind by a previous test run before the test
    // under execution re-initializes the queue table.
    sqlx::raw_sql("DROP TABLE IF EXISTS pg_loco_queue;")
        .execute(&pool)
        .await
        .expect("drop table if exists");

    pool
}

async fn get_all_jobs(pool: &PgPool) -> Vec<Job> {
sqlx::query("select * from pg_loco_queue")
.fetch_all(pool)
Expand All @@ -564,11 +538,8 @@ mod tests {
.expect("job not found")
}

#[tokio::test]
#[serial]
async fn can_initialize_database() {
let pool = init().await;

#[sqlx::test]
async fn can_initialize_database(pool: PgPool) {
assert!(initialize_database(&pool).await.is_ok());

let table_info: Vec<TableInfo> = query_as::<_, TableInfo>(
Expand All @@ -582,11 +553,8 @@ mod tests {
assert_debug_snapshot!(table_info);
}

#[tokio::test]
#[serial]
async fn can_enqueue() {
let pool = init().await;

#[sqlx::test]
async fn can_enqueue(pool: PgPool) {
assert!(initialize_database(&pool).await.is_ok());

let jobs = get_all_jobs(&pool).await;
Expand Down Expand Up @@ -616,11 +584,8 @@ mod tests {
});
}

#[tokio::test]
#[serial]
async fn can_dequeue() {
let pool = init().await;

#[sqlx::test]
async fn can_dequeue(pool: PgPool) {
assert!(initialize_database(&pool).await.is_ok());

let run_at = Utc.from_utc_datetime(
Expand Down Expand Up @@ -662,11 +627,8 @@ mod tests {
});
}

#[tokio::test]
#[serial]
async fn can_complete_job_without_interval() {
let pool = init().await;

#[sqlx::test]
async fn can_complete_job_without_interval(pool: PgPool) {
assert!(initialize_database(&pool).await.is_ok());
tests_cfg::queue::postgres_seed_data(&pool).await;

Expand All @@ -680,11 +642,8 @@ mod tests {
assert_eq!(job.status, JobStatus::Completed);
}

#[tokio::test]
#[serial]
async fn can_complete_job_with_interval() {
let pool = init().await;

#[sqlx::test]
async fn can_complete_job_with_interval(pool: PgPool) {
assert!(initialize_database(&pool).await.is_ok());
tests_cfg::queue::postgres_seed_data(&pool).await;

Expand All @@ -711,11 +670,8 @@ mod tests {
});
}

#[tokio::test]
#[serial]
async fn can_fail_job() {
let pool = init().await;

#[sqlx::test]
async fn can_fail_job(pool: PgPool) {
assert!(initialize_database(&pool).await.is_ok());
tests_cfg::queue::postgres_seed_data(&pool).await;

Expand All @@ -741,11 +697,8 @@ mod tests {
});
}

#[tokio::test]
#[serial]
async fn can_cancel_job_by_name() {
let pool = init().await;

#[sqlx::test]
async fn can_cancel_job_by_name(pool: PgPool) {
assert!(initialize_database(&pool).await.is_ok());
tests_cfg::queue::postgres_seed_data(&pool).await;

Expand All @@ -770,11 +723,8 @@ mod tests {
assert_eq!(count_cancelled_jobs, 2);
}

#[tokio::test]
#[serial]
async fn can_clear() {
let pool = init().await;

#[sqlx::test]
async fn can_clear(pool: PgPool) {
assert!(initialize_database(&pool).await.is_ok());
tests_cfg::queue::postgres_seed_data(&pool).await;

Expand All @@ -794,11 +744,8 @@ mod tests {
assert_eq!(job_count, 0);
}

#[tokio::test]
#[serial]
async fn can_clear_by_status() {
let pool = init().await;

#[sqlx::test]
async fn can_clear_by_status(pool: PgPool) {
assert!(initialize_database(&pool).await.is_ok());
tests_cfg::queue::postgres_seed_data(&pool).await;

Expand Down Expand Up @@ -840,11 +787,8 @@ mod tests {
);
}

#[tokio::test]
#[serial]
async fn can_clear_jobs_older_than() {
let pool = init().await;

#[sqlx::test]
async fn can_clear_jobs_older_than(pool: PgPool) {
assert!(initialize_database(&pool).await.is_ok());

sqlx::query(
Expand All @@ -862,11 +806,8 @@ mod tests {
assert_eq!(get_all_jobs(&pool).await.len(), 2);
}

#[tokio::test]
#[serial]
async fn can_clear_jobs_older_than_with_status() {
let pool = init().await;

#[sqlx::test]
async fn can_clear_jobs_older_than_with_status(pool: PgPool) {
assert!(initialize_database(&pool).await.is_ok());

sqlx::query(
Expand All @@ -892,11 +833,8 @@ mod tests {
assert_eq!(get_all_jobs(&pool).await.len(), 3);
}

#[tokio::test]
#[serial]
async fn can_get_jobs() {
let pool = init().await;

#[sqlx::test]
async fn can_get_jobs(pool: PgPool) {
assert!(initialize_database(&pool).await.is_ok());
tests_cfg::queue::postgres_seed_data(&pool).await;

Expand Down Expand Up @@ -924,11 +862,8 @@ mod tests {
);
}

#[tokio::test]
#[serial]
async fn can_get_jobs_with_age() {
let pool = init().await;

#[sqlx::test]
async fn can_get_jobs_with_age(pool: PgPool) {
assert!(initialize_database(&pool).await.is_ok());

sqlx::query(
Expand Down

0 comments on commit 0fb982c

Please sign in to comment.