diff --git a/src/integration-tests/src/bigquery.rs b/src/integration-tests/src/bigquery.rs index fb2c453a3b..7752edea59 100644 --- a/src/integration-tests/src/bigquery.rs +++ b/src/integration-tests/src/bigquery.rs @@ -13,8 +13,12 @@ // limitations under the License. use crate::Result; +use futures::stream::StreamExt; +use gax::paginator::ItemPaginator; use rand::{Rng, distr::Alphanumeric}; +const INSTANCE_LABEL: &str = "rust-sdk-integration-test"; + pub async fn dataset_admin( builder: bigquery::builder::dataset_service::ClientBuilder, ) -> Result<()> { @@ -33,7 +37,7 @@ pub async fn dataset_admin( }; let project_id = crate::project_id()?; - let client = builder.build().await?; + let client: bigquery::client::DatasetService = builder.build().await?; cleanup_stale_datasets(&client, &project_id).await?; let dataset_id = random_dataset_id(); @@ -48,7 +52,7 @@ pub async fn dataset_admin( .set_dataset_reference( bigquery::model::DatasetReference::new().set_dataset_id(&dataset_id), ) - .set_labels([("integration-test", "true")]), + .set_labels([(INSTANCE_LABEL, "true")]), ) .send() .await?; @@ -59,11 +63,17 @@ pub async fn dataset_admin( let list = client .list_datasets() .set_project_id(&project_id) - .send() - .await?; - println!("LIST DATASET = {} entries", list.datasets.len()); + .set_filter(format!("labels.{INSTANCE_LABEL}")) + .by_item() + .into_stream(); + let items = list.collect::>>().await; + println!("LIST DATASET = {} entries", items.len()); - assert!(list.datasets.iter().any(|v| v.id.contains(&dataset_id))); + assert!( + items + .iter() + .any(|v| v.as_ref().unwrap().id.contains(&dataset_id)) + ); client .delete_dataset() @@ -89,23 +99,27 @@ async fn cleanup_stale_datasets( let list = client .list_datasets() .set_project_id(project_id) - .set_filter("labels.integration-test:true") - .send() - .await?; - let pending_all_datasets = list - .datasets - .into_iter() - .filter_map(|v| { - if let Some(dataset_id) = extract_dataset_id(project_id, v.id) { - return Some( - client - .get_dataset() - .set_project_id(project_id) - .set_dataset_id(dataset_id) - .send(), - ); + .set_filter(format!("labels.{INSTANCE_LABEL}")) + .by_item() + .into_stream(); + let datasets = list.collect::>>().await; + + let pending_all_datasets = datasets + .iter() + .filter_map(|v| match v { + Ok(v) => { + if let Some(dataset_id) = extract_dataset_id(project_id, &v.id) { + return Some( + client + .get_dataset() + .set_project_id(project_id) + .set_dataset_id(dataset_id) + .send(), + ); + } + None } - None + Err(_) => None, }) .collect::>(); @@ -116,7 +130,7 @@ async fn cleanup_stale_datasets( let dataset = r.unwrap(); if dataset .labels - .get("integration-test") + .get(INSTANCE_LABEL) .is_some_and(|v| v == "true") && dataset.creation_time < stale_deadline { @@ -131,7 +145,7 @@ async fn cleanup_stale_datasets( let pending_deletion: Vec<_> = stale_datasets .into_iter() .filter_map(|ds| { - if let Some(dataset_id) = extract_dataset_id(project_id, ds.id) { + if let Some(dataset_id) = extract_dataset_id(project_id, &ds.id) { return Some( client .delete_dataset() @@ -151,16 +165,146 @@ async fn cleanup_stale_datasets( } fn random_dataset_id() -> String { - let rand_suffix: String = rand::rng() + let rand_suffix = random_id_suffix(); + format!("rust_bq_test_dataset_{rand_suffix}") +} + +fn random_job_id() -> String { + let rand_suffix = random_id_suffix(); + format!("rust_bq_test_job_{rand_suffix}") +} + +fn random_id_suffix() -> String { + rand::rng() .sample_iter(&Alphanumeric) .take(8) .map(char::from) - .collect(); - - format!("rust_bq_test_dataset_{rand_suffix}") + .collect() } -fn extract_dataset_id(project_id: &str, id: String) -> Option { - id.strip_prefix(format!("projects/{project_id}").as_str()) +fn extract_dataset_id(project_id: &str, id: &str) -> Option { + id.strip_prefix(format!("{project_id}:").as_str()) .map(|v| v.to_string()) } + +pub async fn job_service(builder: bigquery::builder::job_service::ClientBuilder) -> Result<()> { + // Enable a basic subscriber. Useful to troubleshoot problems and visually + // verify tracing is doing something. + #[cfg(feature = "log-integration-tests")] + let _guard = { + use tracing_subscriber::fmt::format::FmtSpan; + let subscriber = tracing_subscriber::fmt() + .with_level(true) + .with_thread_ids(true) + .with_span_events(FmtSpan::NEW | FmtSpan::CLOSE) + .finish(); + + tracing::subscriber::set_default(subscriber) + }; + + let project_id = crate::project_id()?; + let client: bigquery::client::JobService = builder.build().await?; + cleanup_stale_jobs(&client, &project_id).await?; + + let job_id = random_job_id(); + println!("CREATING JOB WITH ID: {job_id}"); + + let query = "SELECT 1 as one"; + let job = client + .insert_job() + .set_project_id(&project_id) + .set_job( + bigquery::model::Job::new() + .set_job_reference(bigquery::model::JobReference::new().set_job_id(&job_id)) + .set_configuration( + bigquery::model::JobConfiguration::new() + .set_labels([(INSTANCE_LABEL, "true")]) + .set_query(bigquery::model::JobConfigurationQuery::new().set_query(query)), + ), + ) + .send() + .await?; + println!("CREATE JOB = {job:?}"); + + assert!(job.job_reference.is_some()); + + let list = client + .list_jobs() + .set_project_id(&project_id) + .by_item() + .into_stream(); + let items = list.collect::>>().await; + println!("LIST JOBS = {} entries", items.len()); + + assert!( + items + .iter() + .any(|v| v.as_ref().unwrap().id.contains(&job_id)) + ); + + Ok(()) +} + +async fn cleanup_stale_jobs(client: &bigquery::client::JobService, project_id: &str) -> Result<()> { + use std::time::{Duration, SystemTime, UNIX_EPOCH}; + let stale_deadline = SystemTime::now().duration_since(UNIX_EPOCH)?; + let stale_deadline = stale_deadline - Duration::from_secs(48 * 60 * 60); + let stale_deadline = stale_deadline.as_millis() as u64; + + let list = client + .list_jobs() + .set_project_id(project_id) + .set_max_creation_time(stale_deadline) + .by_item() + .into_stream(); + let items = list.collect::>>().await; + println!("LIST JOBS = {} entries", items.len()); + + let pending_all_stale_jobs = items + .iter() + .filter_map(|v| match v { + Ok(v) => { + if let Some(job_reference) = &v.job_reference { + return Some( + client + .get_job() + .set_project_id(project_id) + .set_job_id(&job_reference.job_id) + .send(), + ); + } + None + } + Err(_) => None, + }) + .collect::>(); + + let pending_deletion = futures::future::join_all(pending_all_stale_jobs) + .await + .into_iter() + .filter_map(|r| match r { + Ok(r) => { + let job_reference = r.job_reference?; + if r.configuration + .is_some_and(|c| c.labels.get(INSTANCE_LABEL).is_some_and(|v| v == "true")) + && r.status.is_some_and(|s| s.state == "DONE") + { + return Some( + client + .delete_job() + .set_project_id(project_id) + .set_job_id(&job_reference.job_id) + .send(), + ); + } + None + } + Err(_) => None, + }) + .collect::>(); + + println!("found {} stale test jobs", pending_deletion.len()); + + futures::future::join_all(pending_deletion).await; + Ok(()) +} diff --git a/src/integration-tests/tests/driver.rs b/src/integration-tests/tests/driver.rs index 4919fba67d..a2037da4b2 100644 --- a/src/integration-tests/tests/driver.rs +++ b/src/integration-tests/tests/driver.rs @@ -27,7 +27,7 @@ mod driver { #[test_case(bigquery::client::DatasetService::builder().with_tracing().with_retry_policy(retry_policy()); "with [tracing, retry] enabled")] #[tokio::test(flavor = "multi_thread", worker_threads = 1)] - async fn run_bigquery( + async fn run_bigquery_dataset_service( builder: bigquery::builder::dataset_service::ClientBuilder, ) -> integration_tests::Result<()> { integration_tests::bigquery::dataset_admin(builder) @@ -35,6 +35,16 @@ mod driver { .map_err(integration_tests::report_error) } + #[test_case(bigquery::client::JobService::builder().with_tracing().with_retry_policy(retry_policy()); "with [tracing, retry] enabled")] + #[tokio::test(flavor = "multi_thread", worker_threads = 1)] + async fn run_bigquery_job_service( + builder: bigquery::builder::job_service::ClientBuilder, + ) -> integration_tests::Result<()> { + integration_tests::bigquery::job_service(builder) + .await + .map_err(integration_tests::report_error) + } + #[test_case(firestore::client::Firestore::builder(); "default")] #[test_case(firestore::client::Firestore::builder().with_tracing(); "with tracing enabled")] #[test_case(firestore::client::Firestore::builder().with_retry_policy(retry_policy()); "with retry enabled")]