Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

test: bgworker parallel database #1204

Merged
merged 1 commit into from
Jan 22, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -210,4 +210,5 @@ sqlx = { version = "0.8.2", default-features = false, features = [
"postgres",
"chrono",
"sqlite",
"migrate",
] }
117 changes: 26 additions & 91 deletions src/bgworker/pg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -489,11 +489,8 @@ pub async fn create_provider(qcfg: &PostgresQueueConfig) -> Result<Queue> {
))
}

#[cfg(all(test, feature = "integration_test"))]
use serial_test::serial;
#[cfg(all(test, feature = "integration_test"))]
mod tests {

use chrono::{NaiveDate, NaiveTime, TimeZone};
use insta::{assert_debug_snapshot, with_settings};
use sqlx::{query_as, FromRow};
Expand Down Expand Up @@ -521,29 +518,6 @@ mod tests {
pub is_updatable: Option<String>,
}

async fn init() -> PgPool {
let qcfg = PostgresQueueConfig {
uri: std::env::var("DATABASE_URL")
.expect("environment variable should be exists 'DATABASE_URL'"),
dangerously_flush: false,
enable_logging: false,
max_connections: 1,
min_connections: 1,
connect_timeout: 500,
idle_timeout: 500,
poll_interval_sec: 1,
num_workers: 1,
};

let pool = connect(&qcfg).await.unwrap();
sqlx::raw_sql("DROP TABLE IF EXISTS pg_loco_queue;")
.execute(&pool)
.await
.expect("drop table if exists");

pool
}

async fn get_all_jobs(pool: &PgPool) -> Vec<Job> {
sqlx::query("select * from pg_loco_queue")
.fetch_all(pool)
Expand All @@ -564,11 +538,8 @@ mod tests {
.expect("job not found")
}

#[tokio::test]
#[serial]
async fn can_initialize_database() {
let pool = init().await;

#[sqlx::test]
async fn can_initialize_database(pool: PgPool) {
assert!(initialize_database(&pool).await.is_ok());

let table_info: Vec<TableInfo> = query_as::<_, TableInfo>(
Expand All @@ -582,11 +553,8 @@ mod tests {
assert_debug_snapshot!(table_info);
}

#[tokio::test]
#[serial]
async fn can_enqueue() {
let pool = init().await;

#[sqlx::test]
async fn can_enqueue(pool: PgPool) {
assert!(initialize_database(&pool).await.is_ok());

let jobs = get_all_jobs(&pool).await;
Expand Down Expand Up @@ -616,11 +584,8 @@ mod tests {
});
}

#[tokio::test]
#[serial]
async fn can_dequeue() {
let pool = init().await;

#[sqlx::test]
async fn can_dequeue(pool: PgPool) {
assert!(initialize_database(&pool).await.is_ok());

let run_at = Utc.from_utc_datetime(
Expand Down Expand Up @@ -662,11 +627,8 @@ mod tests {
});
}

#[tokio::test]
#[serial]
async fn can_complete_job_without_interval() {
let pool = init().await;

#[sqlx::test]
async fn can_complete_job_without_interval(pool: PgPool) {
assert!(initialize_database(&pool).await.is_ok());
tests_cfg::queue::postgres_seed_data(&pool).await;

Expand All @@ -680,11 +642,8 @@ mod tests {
assert_eq!(job.status, JobStatus::Completed);
}

#[tokio::test]
#[serial]
async fn can_complete_job_with_interval() {
let pool = init().await;

#[sqlx::test]
async fn can_complete_job_with_interval(pool: PgPool) {
assert!(initialize_database(&pool).await.is_ok());
tests_cfg::queue::postgres_seed_data(&pool).await;

Expand All @@ -711,11 +670,8 @@ mod tests {
});
}

#[tokio::test]
#[serial]
async fn can_fail_job() {
let pool = init().await;

#[sqlx::test]
async fn can_fail_job(pool: PgPool) {
assert!(initialize_database(&pool).await.is_ok());
tests_cfg::queue::postgres_seed_data(&pool).await;

Expand All @@ -741,11 +697,8 @@ mod tests {
});
}

#[tokio::test]
#[serial]
async fn can_cancel_job_by_name() {
let pool = init().await;

#[sqlx::test]
async fn can_cancel_job_by_name(pool: PgPool) {
assert!(initialize_database(&pool).await.is_ok());
tests_cfg::queue::postgres_seed_data(&pool).await;

Expand All @@ -770,11 +723,8 @@ mod tests {
assert_eq!(count_cancelled_jobs, 2);
}

#[tokio::test]
#[serial]
async fn can_clear() {
let pool = init().await;

#[sqlx::test]
async fn can_clear(pool: PgPool) {
assert!(initialize_database(&pool).await.is_ok());
tests_cfg::queue::postgres_seed_data(&pool).await;

Expand All @@ -794,11 +744,8 @@ mod tests {
assert_eq!(job_count, 0);
}

#[tokio::test]
#[serial]
async fn can_clear_by_status() {
let pool = init().await;

#[sqlx::test]
async fn can_clear_by_status(pool: PgPool) {
assert!(initialize_database(&pool).await.is_ok());
tests_cfg::queue::postgres_seed_data(&pool).await;

Expand Down Expand Up @@ -840,11 +787,8 @@ mod tests {
);
}

#[tokio::test]
#[serial]
async fn can_clear_jobs_older_than() {
let pool = init().await;

#[sqlx::test]
async fn can_clear_jobs_older_than(pool: PgPool) {
assert!(initialize_database(&pool).await.is_ok());

sqlx::query(
Expand All @@ -862,11 +806,8 @@ mod tests {
assert_eq!(get_all_jobs(&pool).await.len(), 2);
}

#[tokio::test]
#[serial]
async fn can_clear_jobs_older_than_with_status() {
let pool = init().await;

#[sqlx::test]
async fn can_clear_jobs_older_than_with_status(pool: PgPool) {
assert!(initialize_database(&pool).await.is_ok());

sqlx::query(
Expand All @@ -892,11 +833,8 @@ mod tests {
assert_eq!(get_all_jobs(&pool).await.len(), 3);
}

#[tokio::test]
#[serial]
async fn can_get_jobs() {
let pool = init().await;

#[sqlx::test]
async fn can_get_jobs(pool: PgPool) {
assert!(initialize_database(&pool).await.is_ok());
tests_cfg::queue::postgres_seed_data(&pool).await;

Expand Down Expand Up @@ -924,11 +862,8 @@ mod tests {
);
}

#[tokio::test]
#[serial]
async fn can_get_jobs_with_age() {
let pool = init().await;

#[sqlx::test]
async fn can_get_jobs_with_age(pool: PgPool) {
assert!(initialize_database(&pool).await.is_ok());

sqlx::query(
Expand Down
Loading