204 changes: 174 additions & 30 deletions src/integration-tests/src/bigquery.rs
@@ -13,8 +13,12 @@
// limitations under the License.

use crate::Result;
use futures::stream::StreamExt;
use gax::paginator::ItemPaginator;
use rand::{Rng, distr::Alphanumeric};

const INSTANCE_LABEL: &str = "rust-sdk-integration-test";
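
Every resource the tests create carries this label, and the dataset list filters below are built from the same constant. A quick sanity sketch of the filter string (the assert is illustrative, not part of the PR):

```rust
// Illustrative only: set_filter receives "labels.rust-sdk-integration-test",
// which matches any dataset carrying the INSTANCE_LABEL key.
assert_eq!(
    format!("labels.{INSTANCE_LABEL}"),
    "labels.rust-sdk-integration-test"
);
```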

pub async fn dataset_admin(
builder: bigquery::builder::dataset_service::ClientBuilder,
) -> Result<()> {
@@ -33,7 +37,7 @@ pub async fn dataset_admin(
};

let project_id = crate::project_id()?;
let client = builder.build().await?;
let client: bigquery::client::DatasetService = builder.build().await?;
cleanup_stale_datasets(&client, &project_id).await?;

let dataset_id = random_dataset_id();
@@ -48,7 +52,7 @@
.set_dataset_reference(
bigquery::model::DatasetReference::new().set_dataset_id(&dataset_id),
)
.set_labels([("integration-test", "true")]),
.set_labels([(INSTANCE_LABEL, "true")]),
)
.send()
.await?;
@@ -59,11 +63,17 @@
let list = client
.list_datasets()
.set_project_id(&project_id)
.send()
.await?;
println!("LIST DATASET = {} entries", list.datasets.len());
.set_filter(format!("labels.{INSTANCE_LABEL}"))
.by_item()
.into_stream();
let items = list.collect::<Vec<gax::Result<_>>>().await;
println!("LIST DATASET = {} entries", items.len());

assert!(list.datasets.iter().any(|v| v.id.contains(&dataset_id)));
assert!(
items
.iter()
.any(|v| v.as_ref().unwrap().id.contains(&dataset_id))
);

client
.delete_dataset()
@@ -89,23 +99,27 @@ async fn cleanup_stale_datasets(
let list = client
.list_datasets()
.set_project_id(project_id)
.set_filter("labels.integration-test:true")
.send()
.await?;
let pending_all_datasets = list
.datasets
.into_iter()
.filter_map(|v| {
if let Some(dataset_id) = extract_dataset_id(project_id, v.id) {
return Some(
client
.get_dataset()
.set_project_id(project_id)
.set_dataset_id(dataset_id)
.send(),
);
.set_filter(format!("labels.{INSTANCE_LABEL}"))
.by_item()
.into_stream();
let datasets = list.collect::<Vec<gax::Result<_>>>().await;

let pending_all_datasets = datasets
.iter()
.filter_map(|v| match v {
Ok(v) => {
if let Some(dataset_id) = extract_dataset_id(project_id, &v.id) {
return Some(
client
.get_dataset()
.set_project_id(project_id)
.set_dataset_id(dataset_id)
.send(),
);
}
None
}
None
Err(_) => None,
})
.collect::<Vec<_>>();

@@ -116,7 +130,7 @@ async fn cleanup_stale_datasets(
let dataset = r.unwrap();
if dataset
.labels
.get("integration-test")
.get(INSTANCE_LABEL)
.is_some_and(|v| v == "true")
&& dataset.creation_time < stale_deadline
{
@@ -131,7 +145,7 @@ async fn cleanup_stale_datasets(
let pending_deletion: Vec<_> = stale_datasets
.into_iter()
.filter_map(|ds| {
if let Some(dataset_id) = extract_dataset_id(project_id, ds.id) {
if let Some(dataset_id) = extract_dataset_id(project_id, &ds.id) {
return Some(
client
.delete_dataset()
@@ -151,16 +165,146 @@ async fn cleanup_stale_datasets(
}
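
This cleanup helper (and `cleanup_stale_jobs` further down) uses the same fan-out shape: collect un-awaited `send()` futures into a `Vec` via `filter_map`, then run them all concurrently with `futures::future::join_all`. A minimal sketch of that shape with placeholder futures, usable inside any async context:

```rust
// Generic shape: N independent async ops, awaited concurrently.
let pending: Vec<_> = (0..4)
    .map(|i| async move { i * 2 })
    .collect();
let results = futures::future::join_all(pending).await;
assert_eq!(results, vec![0, 2, 4, 6]);
```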

fn random_dataset_id() -> String {
let rand_suffix: String = rand::rng()
let rand_suffix = random_id_suffix();
format!("rust_bq_test_dataset_{rand_suffix}")
}

fn random_job_id() -> String {
let rand_suffix = random_id_suffix();
format!("rust_bq_test_job_{rand_suffix}")
}

fn random_id_suffix() -> String {
rand::rng()
.sample_iter(&Alphanumeric)
.take(8)
.map(char::from)
.collect();

format!("rust_bq_test_dataset_{rand_suffix}")
.collect()
}

fn extract_dataset_id(project_id: &str, id: String) -> Option<String> {
id.strip_prefix(format!("projects/{project_id}").as_str())
fn extract_dataset_id(project_id: &str, id: &str) -> Option<String> {
id.strip_prefix(format!("{project_id}:").as_str())
.map(|v| v.to_string())
}
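
The change also switches `extract_dataset_id` from the `projects/{project_id}` prefix to the `{project_id}:` form that BigQuery uses in a dataset's `id` field. A hedged usage sketch (project and dataset names are made up):

```rust
// Hypothetical example of the new prefix handling: BigQuery reports
// dataset ids as "project:dataset", so stripping "{project_id}:" leaves
// the bare dataset id; a non-matching project yields None.
assert_eq!(
    extract_dataset_id("my-project", "my-project:rust_bq_test_dataset_ab12cd34"),
    Some("rust_bq_test_dataset_ab12cd34".to_string())
);
assert_eq!(
    extract_dataset_id("my-project", "other:rust_bq_test_dataset_ab12cd34"),
    None
);
```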

pub async fn job_service(builder: bigquery::builder::job_service::ClientBuilder) -> Result<()> {
// Enable a basic subscriber. Useful to troubleshoot problems and visually
// verify tracing is doing something.
#[cfg(feature = "log-integration-tests")]
let _guard = {
use tracing_subscriber::fmt::format::FmtSpan;
let subscriber = tracing_subscriber::fmt()
.with_level(true)
.with_thread_ids(true)
.with_span_events(FmtSpan::NEW | FmtSpan::CLOSE)
.finish();

tracing::subscriber::set_default(subscriber)
};

let project_id = crate::project_id()?;
let client: bigquery::client::JobService = builder.build().await?;
cleanup_stale_jobs(&client, &project_id).await?;

let job_id = random_job_id();
println!("CREATING JOB WITH ID: {job_id}");

let query = "SELECT 1 as one";
let job = client
.insert_job()
.set_project_id(&project_id)
.set_job(
bigquery::model::Job::new()
.set_job_reference(bigquery::model::JobReference::new().set_job_id(&job_id))
.set_configuration(
bigquery::model::JobConfiguration::new()
.set_labels([(INSTANCE_LABEL, "true")])
.set_query(bigquery::model::JobConfigurationQuery::new().set_query(query)),
),
)
.send()
.await?;
println!("CREATE JOB = {job:?}");

assert!(job.job_reference.is_some());

let list = client
.list_jobs()
.set_project_id(&project_id)
.by_item()
.into_stream();
let items = list.collect::<Vec<gax::Result<_>>>().await;
println!("LIST JOBS = {} entries", items.len());

assert!(
items
.iter()
.any(|v| v.as_ref().unwrap().id.contains(&job_id))
);

Ok(())
}
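
Note the test only asserts that the job was created and shows up in the listing; it never waits for the query to finish. If that were needed, a minimal polling sketch could be built from the same calls `cleanup_stale_jobs` uses below (`get_job`, `status.state == "DONE"`); the interval, retry cap, and the assumption that `send()` returns `gax::Result` are all illustrative:

```rust
// Sketch, not part of the PR: poll get_job until status.state == "DONE".
async fn wait_for_job_done(
    client: &bigquery::client::JobService,
    project_id: &str,
    job_id: &str,
) -> gax::Result<bool> {
    for _ in 0..30 {
        let job = client
            .get_job()
            .set_project_id(project_id)
            .set_job_id(job_id)
            .send()
            .await?;
        if job.status.as_ref().is_some_and(|s| s.state == "DONE") {
            return Ok(true);
        }
        tokio::time::sleep(std::time::Duration::from_secs(2)).await;
    }
    Ok(false) // still running after the retry budget
}
```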

async fn cleanup_stale_jobs(client: &bigquery::client::JobService, project_id: &str) -> Result<()> {
use std::time::{Duration, SystemTime, UNIX_EPOCH};
let stale_deadline = SystemTime::now().duration_since(UNIX_EPOCH)?;
let stale_deadline = stale_deadline - Duration::from_secs(48 * 60 * 60);
let stale_deadline = stale_deadline.as_millis() as u64;

let list = client
.list_jobs()
.set_project_id(project_id)
.set_max_creation_time(stale_deadline)
.by_item()
.into_stream();
let items = list.collect::<Vec<gax::Result<_>>>().await;
println!("LIST JOBS = {} entries", items.len());

let pending_all_stale_jobs = items
.iter()
.filter_map(|v| match v {
Ok(v) => {
if let Some(job_reference) = &v.job_reference {
return Some(
client
.get_job()
.set_project_id(project_id)
.set_job_id(&job_reference.job_id)
.send(),
);
}
None
}
Err(_) => None,
})
.collect::<Vec<_>>();

let pending_deletion = futures::future::join_all(pending_all_stale_jobs)
.await
.into_iter()
.filter_map(|r| match r {
Ok(r) => {
let job_reference = r.job_reference?;
if r.configuration
.is_some_and(|c| c.labels.get(INSTANCE_LABEL).is_some_and(|v| v == "true"))
&& r.status.is_some_and(|s| s.state == "DONE")
{
return Some(
client
.delete_job()
.set_project_id(project_id)
.set_job_id(&job_reference.job_id)
.send(),
);
}
None
}
Err(_) => None,
})
.collect::<Vec<_>>();

println!("found {} stale test jobs", pending_deletion.len());

futures::future::join_all(pending_deletion).await;
Ok(())
}
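
The recurring idiom in this file's changes replaces single-page `send()` calls with the item paginator: `by_item()` flattens response pages into individual items, `into_stream()` adapts the paginator into a `futures` stream, and `collect` drains it into `Vec<gax::Result<_>>`. Condensed from the code above (the list item type is left inferred, since the tests only touch its `id` field):

```rust
use futures::stream::StreamExt;
use gax::paginator::ItemPaginator;

// One item per dataset, across all pages; each item is its own Result,
// so per-page failures show up as Err entries in the collected Vec.
let items = client
    .list_datasets()
    .set_project_id(&project_id)
    .set_filter(format!("labels.{INSTANCE_LABEL}"))
    .by_item()
    .into_stream()
    .collect::<Vec<gax::Result<_>>>()
    .await;
```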
12 changes: 11 additions & 1 deletion src/integration-tests/tests/driver.rs
@@ -27,14 +27,24 @@ mod driver {

#[test_case(bigquery::client::DatasetService::builder().with_tracing().with_retry_policy(retry_policy()); "with [tracing, retry] enabled")]
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn run_bigquery(
async fn run_bigquery_dataset_service(
builder: bigquery::builder::dataset_service::ClientBuilder,
) -> integration_tests::Result<()> {
integration_tests::bigquery::dataset_admin(builder)
.await
.map_err(integration_tests::report_error)
}

#[test_case(bigquery::client::JobService::builder().with_tracing().with_retry_policy(retry_policy()); "with [tracing, retry] enabled")]
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn run_bigquery_job_service(
builder: bigquery::builder::job_service::ClientBuilder,
) -> integration_tests::Result<()> {
integration_tests::bigquery::job_service(builder)
.await
.map_err(integration_tests::report_error)
}

#[test_case(firestore::client::Firestore::builder(); "default")]
#[test_case(firestore::client::Firestore::builder().with_tracing(); "with tracing enabled")]
#[test_case(firestore::client::Firestore::builder().with_retry_policy(retry_policy()); "with retry enabled")]