From c77dc67b55b9fe88abd4dbd38e309e545c5428e3 Mon Sep 17 00:00:00 2001 From: Phong Chuong Date: Mon, 15 Sep 2025 20:43:24 +0000 Subject: [PATCH 01/16] reduce flake for BQ IT that is caused by listDatasets pagination --- src/integration-tests/src/bigquery.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/integration-tests/src/bigquery.rs b/src/integration-tests/src/bigquery.rs index fb2c453a3b..583dcc47c1 100644 --- a/src/integration-tests/src/bigquery.rs +++ b/src/integration-tests/src/bigquery.rs @@ -15,6 +15,8 @@ use crate::Result; use rand::{Rng, distr::Alphanumeric}; +const INSTANCE_LABEL: &str = "rust-sdk-integration-test"; + pub async fn dataset_admin( builder: bigquery::builder::dataset_service::ClientBuilder, ) -> Result<()> { @@ -48,7 +50,7 @@ pub async fn dataset_admin( .set_dataset_reference( bigquery::model::DatasetReference::new().set_dataset_id(&dataset_id), ) - .set_labels([("integration-test", "true")]), + .set_labels([(INSTANCE_LABEL, "true")]), ) .send() .await?; @@ -59,6 +61,7 @@ pub async fn dataset_admin( let list = client .list_datasets() .set_project_id(&project_id) + .set_filter(format!("labels.{INSTANCE_LABEL}")) .send() .await?; println!("LIST DATASET = {} entries", list.datasets.len()); From f662c620c0cf0703b8af1141729ce8dfb40ad322 Mon Sep 17 00:00:00 2001 From: Phong Chuong Date: Tue, 16 Sep 2025 22:26:19 +0000 Subject: [PATCH 02/16] BQ pagination prototype & IT --- .../cloud/bigquery/v2/src/builder.rs | 23 ++++++++ src/generated/cloud/bigquery/v2/src/model.rs | 14 +++++ src/generated/cloud/sql/v1/src/builder.rs | 1 + src/generated/cloud/sql/v1/src/model.rs | 1 + src/integration-tests/src/bigquery.rs | 57 +++++++++++-------- src/integration-tests/src/sql.rs | 1 + 6 files changed, 73 insertions(+), 24 deletions(-) diff --git a/src/generated/cloud/bigquery/v2/src/builder.rs b/src/generated/cloud/bigquery/v2/src/builder.rs index 94dabef1f0..c0e35a6955 100644 --- a/src/generated/cloud/bigquery/v2/src/builder.rs +++ b/src/generated/cloud/bigquery/v2/src/builder.rs @@ -641,6 +641,29 @@ pub mod dataset_service { self.0.request.filter = v.into(); self } + + /// Streams each page in the collection. + pub fn by_page( + self, + ) -> impl gax::paginator::Paginator { + use std::clone::Clone; + let token = self.0.request.page_token.clone(); + let execute = move |token: String| { + let mut builder = self.clone(); + builder.0.request = builder.0.request.set_page_token(token); + builder.send() + }; + gax::paginator::internal::new_paginator(token, execute) + } + + /// Streams each item in the collection. 
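+        ///
+        /// A convenience wrapper over [`by_page`][Self::by_page] that
+        /// flattens each `DatasetList` page into its individual
+        /// `ListFormatDataset` entries, as the body below shows.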
+ pub fn by_item( + self, + ) -> impl gax::paginator::ItemPaginator + { + use gax::paginator::Paginator; + self.by_page().items() + } } #[doc(hidden)] diff --git a/src/generated/cloud/bigquery/v2/src/model.rs b/src/generated/cloud/bigquery/v2/src/model.rs index 640f61f38a..83e844596e 100644 --- a/src/generated/cloud/bigquery/v2/src/model.rs +++ b/src/generated/cloud/bigquery/v2/src/model.rs @@ -5930,6 +5930,20 @@ impl wkt::message::Message for DatasetList { } } +#[doc(hidden)] +impl gax::paginator::internal::PageableResponse for DatasetList { + type PageItem = crate::model::ListFormatDataset; + + fn items(self) -> std::vec::Vec { + self.datasets + } + + fn next_page_token(&self) -> std::string::String { + use std::clone::Clone; + self.next_page_token.clone() + } +} + #[doc(hidden)] impl<'de> serde::de::Deserialize<'de> for DatasetList { fn deserialize(deserializer: D) -> std::result::Result diff --git a/src/generated/cloud/sql/v1/src/builder.rs b/src/generated/cloud/sql/v1/src/builder.rs index 057a8a018d..ac2e8e7aa5 100644 --- a/src/generated/cloud/sql/v1/src/builder.rs +++ b/src/generated/cloud/sql/v1/src/builder.rs @@ -365,6 +365,7 @@ pub mod sql_backup_runs_service { .map(gax::response::Response::into_body) } + // NOWNOW /// Streams each page in the collection. pub fn by_page( self, diff --git a/src/generated/cloud/sql/v1/src/model.rs b/src/generated/cloud/sql/v1/src/model.rs index f7af637303..662b2427aa 100644 --- a/src/generated/cloud/sql/v1/src/model.rs +++ b/src/generated/cloud/sql/v1/src/model.rs @@ -1625,6 +1625,7 @@ impl std::fmt::Debug for BackupRun { } } +// NOWNOW /// Backup run list results. #[derive(Clone, Default, PartialEq)] #[non_exhaustive] diff --git a/src/integration-tests/src/bigquery.rs b/src/integration-tests/src/bigquery.rs index 583dcc47c1..a140471c21 100644 --- a/src/integration-tests/src/bigquery.rs +++ b/src/integration-tests/src/bigquery.rs @@ -13,6 +13,8 @@ // limitations under the License. 
use crate::Result; +use futures::stream::StreamExt; +use gax::paginator::ItemPaginator; use rand::{Rng, distr::Alphanumeric}; const INSTANCE_LABEL: &str = "rust-sdk-integration-test"; @@ -35,7 +37,7 @@ pub async fn dataset_admin( }; let project_id = crate::project_id()?; - let client = builder.build().await?; + let client: bigquery::client::DatasetService = builder.build().await?; cleanup_stale_datasets(&client, &project_id).await?; let dataset_id = random_dataset_id(); @@ -62,11 +64,12 @@ pub async fn dataset_admin( .list_datasets() .set_project_id(&project_id) .set_filter(format!("labels.{INSTANCE_LABEL}")) - .send() - .await?; - println!("LIST DATASET = {} entries", list.datasets.len()); + .by_item() + .into_stream(); + let items = list.collect::>>().await; + println!("LIST DATASET = {} entries", items.len()); - assert!(list.datasets.iter().any(|v| v.id.contains(&dataset_id))); + assert!(items.iter().any(|v| v.as_ref().unwrap().id.contains(&dataset_id))); client .delete_dataset() @@ -92,23 +95,29 @@ async fn cleanup_stale_datasets( let list = client .list_datasets() .set_project_id(project_id) - .set_filter("labels.integration-test:true") - .send() - .await?; - let pending_all_datasets = list - .datasets - .into_iter() + .set_filter(format!("labels.{INSTANCE_LABEL}")) + .by_item() + .into_stream(); + let datasets = list.collect::>>().await; + + let pending_all_datasets = datasets + .iter() .filter_map(|v| { - if let Some(dataset_id) = extract_dataset_id(project_id, v.id) { - return Some( - client - .get_dataset() - .set_project_id(project_id) - .set_dataset_id(dataset_id) - .send(), - ); + match v { + Ok(v) => { + if let Some(dataset_id) = extract_dataset_id(project_id, &v.id) { + return Some( + client + .get_dataset() + .set_project_id(project_id) + .set_dataset_id(dataset_id) + .send(), + ); + } + None + } + Err(_) => None, } - None }) .collect::>(); @@ -119,7 +128,7 @@ async fn cleanup_stale_datasets( let dataset = r.unwrap(); if dataset .labels - .get("integration-test") + .get(INSTANCE_LABEL) .is_some_and(|v| v == "true") && dataset.creation_time < stale_deadline { @@ -134,7 +143,7 @@ async fn cleanup_stale_datasets( let pending_deletion: Vec<_> = stale_datasets .into_iter() .filter_map(|ds| { - if let Some(dataset_id) = extract_dataset_id(project_id, ds.id) { + if let Some(dataset_id) = extract_dataset_id(project_id, &ds.id) { return Some( client .delete_dataset() @@ -163,7 +172,7 @@ fn random_dataset_id() -> String { format!("rust_bq_test_dataset_{rand_suffix}") } -fn extract_dataset_id(project_id: &str, id: String) -> Option { - id.strip_prefix(format!("projects/{project_id}").as_str()) +fn extract_dataset_id(project_id: &str, id: &String) -> Option { + id.strip_prefix(format!("{project_id}:").as_str()) .map(|v| v.to_string()) } diff --git a/src/integration-tests/src/sql.rs b/src/integration-tests/src/sql.rs index 66af49ff74..02223cac2a 100644 --- a/src/integration-tests/src/sql.rs +++ b/src/integration-tests/src/sql.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. 
+// NOWNOW use crate::Result; use futures::stream::StreamExt; use gax::paginator::ItemPaginator; From 39d210af2e7b3790cf27eb289f6a9c235afcbeb6 Mon Sep 17 00:00:00 2001 From: Phong Chuong Date: Tue, 16 Sep 2025 22:42:11 +0000 Subject: [PATCH 03/16] cargo fmt --- src/integration-tests/src/bigquery.rs | 32 ++++++++++++++------------- 1 file changed, 17 insertions(+), 15 deletions(-) diff --git a/src/integration-tests/src/bigquery.rs b/src/integration-tests/src/bigquery.rs index a140471c21..b3c4b00ce1 100644 --- a/src/integration-tests/src/bigquery.rs +++ b/src/integration-tests/src/bigquery.rs @@ -69,7 +69,11 @@ pub async fn dataset_admin( let items = list.collect::>>().await; println!("LIST DATASET = {} entries", items.len()); - assert!(items.iter().any(|v| v.as_ref().unwrap().id.contains(&dataset_id))); + assert!( + items + .iter() + .any(|v| v.as_ref().unwrap().id.contains(&dataset_id)) + ); client .delete_dataset() @@ -102,22 +106,20 @@ async fn cleanup_stale_datasets( let pending_all_datasets = datasets .iter() - .filter_map(|v| { - match v { - Ok(v) => { - if let Some(dataset_id) = extract_dataset_id(project_id, &v.id) { - return Some( - client - .get_dataset() - .set_project_id(project_id) - .set_dataset_id(dataset_id) - .send(), - ); - } - None + .filter_map(|v| match v { + Ok(v) => { + if let Some(dataset_id) = extract_dataset_id(project_id, &v.id) { + return Some( + client + .get_dataset() + .set_project_id(project_id) + .set_dataset_id(dataset_id) + .send(), + ); } - Err(_) => None, + None } + Err(_) => None, }) .collect::>(); From 4056ef5b40ef910edd4b619ebdf1692e1241a066 Mon Sep 17 00:00:00 2001 From: Phong Chuong Date: Tue, 16 Sep 2025 22:59:21 +0000 Subject: [PATCH 04/16] use &str instead of &String --- src/integration-tests/src/bigquery.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/integration-tests/src/bigquery.rs b/src/integration-tests/src/bigquery.rs index b3c4b00ce1..0a7d57caa2 100644 --- a/src/integration-tests/src/bigquery.rs +++ b/src/integration-tests/src/bigquery.rs @@ -174,7 +174,7 @@ fn random_dataset_id() -> String { format!("rust_bq_test_dataset_{rand_suffix}") } -fn extract_dataset_id(project_id: &str, id: &String) -> Option { +fn extract_dataset_id(project_id: &str, id: &str) -> Option { id.strip_prefix(format!("{project_id}:").as_str()) .map(|v| v.to_string()) } From 7fd50ce0e03b1aeadab80c1793ac7b706b5b12cc Mon Sep 17 00:00:00 2001 From: Phong Chuong Date: Wed, 17 Sep 2025 14:09:20 +0000 Subject: [PATCH 05/16] sidekick generated bigquery --- .../cloud/bigquery/v2/src/builder.rs | 169 +++++++++++++++--- src/generated/cloud/bigquery/v2/src/model.rs | 56 ++++++ 2 files changed, 197 insertions(+), 28 deletions(-) diff --git a/src/generated/cloud/bigquery/v2/src/builder.rs b/src/generated/cloud/bigquery/v2/src/builder.rs index c0e35a6955..361a6db7eb 100644 --- a/src/generated/cloud/bigquery/v2/src/builder.rs +++ b/src/generated/cloud/bigquery/v2/src/builder.rs @@ -558,9 +558,13 @@ pub mod dataset_service { /// # use google_cloud_bigquery_v2::builder; /// use builder::dataset_service::ListDatasets; /// # tokio_test::block_on(async { + /// use gax::paginator::ItemPaginator; /// /// let builder = prepare_request_builder(); - /// let response = builder.send().await?; + /// let mut items = builder.by_item(); + /// while let Some(result) = items.next().await { + /// let item = result?; + /// } /// # gax::Result::<()>::Ok(()) }); /// /// fn prepare_request_builder() -> ListDatasets { @@ -598,6 +602,29 @@ pub mod dataset_service { 
.map(gax::response::Response::into_body) } + /// Streams each page in the collection. + pub fn by_page( + self, + ) -> impl gax::paginator::Paginator { + use std::clone::Clone; + let token = self.0.request.page_token.clone(); + let execute = move |token: String| { + let mut builder = self.clone(); + builder.0.request = builder.0.request.set_page_token(token); + builder.send() + }; + gax::paginator::internal::new_paginator(token, execute) + } + + /// Streams each item in the collection. + pub fn by_item( + self, + ) -> impl gax::paginator::ItemPaginator + { + use gax::paginator::Paginator; + self.by_page().items() + } + /// Sets the value of [project_id][crate::model::ListDatasetsRequest::project_id]. /// /// This is a **required** field for requests. @@ -641,29 +668,6 @@ pub mod dataset_service { self.0.request.filter = v.into(); self } - - /// Streams each page in the collection. - pub fn by_page( - self, - ) -> impl gax::paginator::Paginator { - use std::clone::Clone; - let token = self.0.request.page_token.clone(); - let execute = move |token: String| { - let mut builder = self.clone(); - builder.0.request = builder.0.request.set_page_token(token); - builder.send() - }; - gax::paginator::internal::new_paginator(token, execute) - } - - /// Streams each item in the collection. - pub fn by_item( - self, - ) -> impl gax::paginator::ItemPaginator - { - use gax::paginator::Paginator; - self.by_page().items() - } } #[doc(hidden)] @@ -1137,9 +1141,13 @@ pub mod job_service { /// # use google_cloud_bigquery_v2::builder; /// use builder::job_service::ListJobs; /// # tokio_test::block_on(async { + /// use gax::paginator::ItemPaginator; /// /// let builder = prepare_request_builder(); - /// let response = builder.send().await?; + /// let mut items = builder.by_item(); + /// while let Some(result) = items.next().await { + /// let item = result?; + /// } /// # gax::Result::<()>::Ok(()) }); /// /// fn prepare_request_builder() -> ListJobs { @@ -1177,6 +1185,28 @@ pub mod job_service { .map(gax::response::Response::into_body) } + /// Streams each page in the collection. + pub fn by_page( + self, + ) -> impl gax::paginator::Paginator { + use std::clone::Clone; + let token = self.0.request.page_token.clone(); + let execute = move |token: String| { + let mut builder = self.clone(); + builder.0.request = builder.0.request.set_page_token(token); + builder.send() + }; + gax::paginator::internal::new_paginator(token, execute) + } + + /// Streams each item in the collection. + pub fn by_item( + self, + ) -> impl gax::paginator::ItemPaginator { + use gax::paginator::Paginator; + self.by_page().items() + } + /// Sets the value of [project_id][crate::model::ListJobsRequest::project_id]. pub fn set_project_id>(mut self, v: T) -> Self { self.0.request.project_id = v.into(); @@ -1651,9 +1681,13 @@ pub mod model_service { /// # use google_cloud_bigquery_v2::builder; /// use builder::model_service::ListModels; /// # tokio_test::block_on(async { + /// use gax::paginator::ItemPaginator; /// /// let builder = prepare_request_builder(); - /// let response = builder.send().await?; + /// let mut items = builder.by_item(); + /// while let Some(result) = items.next().await { + /// let item = result?; + /// } /// # gax::Result::<()>::Ok(()) }); /// /// fn prepare_request_builder() -> ListModels { @@ -1691,6 +1725,30 @@ pub mod model_service { .map(gax::response::Response::into_body) } + /// Streams each page in the collection. 
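+        ///
+        /// Each page is a complete `ListModelsResponse`; use
+        /// [`by_item`][Self::by_item] below to iterate over individual
+        /// `Model` entries instead.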
+ pub fn by_page( + self, + ) -> impl gax::paginator::Paginator + { + use std::clone::Clone; + let token = self.0.request.page_token.clone(); + let execute = move |token: String| { + let mut builder = self.clone(); + builder.0.request = builder.0.request.set_page_token(token); + builder.send() + }; + gax::paginator::internal::new_paginator(token, execute) + } + + /// Streams each item in the collection. + pub fn by_item( + self, + ) -> impl gax::paginator::ItemPaginator + { + use gax::paginator::Paginator; + self.by_page().items() + } + /// Sets the value of [project_id][crate::model::ListModelsRequest::project_id]. /// /// This is a **required** field for requests. @@ -2460,9 +2518,13 @@ pub mod routine_service { /// # use google_cloud_bigquery_v2::builder; /// use builder::routine_service::ListRoutines; /// # tokio_test::block_on(async { + /// use gax::paginator::ItemPaginator; /// /// let builder = prepare_request_builder(); - /// let response = builder.send().await?; + /// let mut items = builder.by_item(); + /// while let Some(result) = items.next().await { + /// let item = result?; + /// } /// # gax::Result::<()>::Ok(()) }); /// /// fn prepare_request_builder() -> ListRoutines { @@ -2500,6 +2562,30 @@ pub mod routine_service { .map(gax::response::Response::into_body) } + /// Streams each page in the collection. + pub fn by_page( + self, + ) -> impl gax::paginator::Paginator + { + use std::clone::Clone; + let token = self.0.request.page_token.clone(); + let execute = move |token: String| { + let mut builder = self.clone(); + builder.0.request = builder.0.request.set_page_token(token); + builder.send() + }; + gax::paginator::internal::new_paginator(token, execute) + } + + /// Streams each item in the collection. + pub fn by_item( + self, + ) -> impl gax::paginator::ItemPaginator + { + use gax::paginator::Paginator; + self.by_page().items() + } + /// Sets the value of [project_id][crate::model::ListRoutinesRequest::project_id]. /// /// This is a **required** field for requests. @@ -3816,9 +3902,13 @@ pub mod table_service { /// # use google_cloud_bigquery_v2::builder; /// use builder::table_service::ListTables; /// # tokio_test::block_on(async { + /// use gax::paginator::ItemPaginator; /// /// let builder = prepare_request_builder(); - /// let response = builder.send().await?; + /// let mut items = builder.by_item(); + /// while let Some(result) = items.next().await { + /// let item = result?; + /// } /// # gax::Result::<()>::Ok(()) }); /// /// fn prepare_request_builder() -> ListTables { @@ -3856,6 +3946,29 @@ pub mod table_service { .map(gax::response::Response::into_body) } + /// Streams each page in the collection. + pub fn by_page( + self, + ) -> impl gax::paginator::Paginator { + use std::clone::Clone; + let token = self.0.request.page_token.clone(); + let execute = move |token: String| { + let mut builder = self.clone(); + builder.0.request = builder.0.request.set_page_token(token); + builder.send() + }; + gax::paginator::internal::new_paginator(token, execute) + } + + /// Streams each item in the collection. + pub fn by_item( + self, + ) -> impl gax::paginator::ItemPaginator + { + use gax::paginator::Paginator; + self.by_page().items() + } + /// Sets the value of [project_id][crate::model::ListTablesRequest::project_id]. /// /// This is a **required** field for requests. 
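The `by_page`/`by_item` builders above compile only because each list response type implements gax's internal pageable-response contract; the model.rs hunks below add that for JobList, ListModelsResponse, ListRoutinesResponse, and TableList, mirroring the DatasetList impl from the earlier prototype commit. For reference, this is the trait shape those impls satisfy, inferred from the impl blocks in this patch rather than copied from the gax crate:

// Inferred sketch of gax::paginator::internal::PageableResponse.
// The paginator built by new_paginator re-sends the request with each
// next_page_token() and drains items() from every fetched page,
// stopping once the token comes back empty.
pub trait PageableResponse {
    type PageItem;

    /// Consumes the page and yields its items for streaming.
    fn items(self) -> std::vec::Vec<Self::PageItem>;

    /// Returns the token for the next page; empty means the collection
    /// is exhausted.
    fn next_page_token(&self) -> std::string::String;
}
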
diff --git a/src/generated/cloud/bigquery/v2/src/model.rs b/src/generated/cloud/bigquery/v2/src/model.rs index 83e844596e..cbecff1256 100644 --- a/src/generated/cloud/bigquery/v2/src/model.rs +++ b/src/generated/cloud/bigquery/v2/src/model.rs @@ -14479,6 +14479,20 @@ impl wkt::message::Message for JobList { } } +#[doc(hidden)] +impl gax::paginator::internal::PageableResponse for JobList { + type PageItem = crate::model::ListFormatJob; + + fn items(self) -> std::vec::Vec { + self.jobs + } + + fn next_page_token(&self) -> std::string::String { + use std::clone::Clone; + self.next_page_token.clone() + } +} + #[doc(hidden)] impl<'de> serde::de::Deserialize<'de> for JobList { fn deserialize(deserializer: D) -> std::result::Result @@ -65673,6 +65687,20 @@ impl wkt::message::Message for ListModelsResponse { } } +#[doc(hidden)] +impl gax::paginator::internal::PageableResponse for ListModelsResponse { + type PageItem = crate::model::Model; + + fn items(self) -> std::vec::Vec { + self.models + } + + fn next_page_token(&self) -> std::string::String { + use std::clone::Clone; + self.next_page_token.clone() + } +} + #[doc(hidden)] impl<'de> serde::de::Deserialize<'de> for ListModelsResponse { fn deserialize(deserializer: D) -> std::result::Result @@ -74667,6 +74695,20 @@ impl wkt::message::Message for ListRoutinesResponse { } } +#[doc(hidden)] +impl gax::paginator::internal::PageableResponse for ListRoutinesResponse { + type PageItem = crate::model::Routine; + + fn items(self) -> std::vec::Vec { + self.routines + } + + fn next_page_token(&self) -> std::string::String { + use std::clone::Clone; + self.next_page_token.clone() + } +} + #[doc(hidden)] impl<'de> serde::de::Deserialize<'de> for ListRoutinesResponse { fn deserialize(deserializer: D) -> std::result::Result @@ -85412,6 +85454,20 @@ impl wkt::message::Message for TableList { } } +#[doc(hidden)] +impl gax::paginator::internal::PageableResponse for TableList { + type PageItem = crate::model::ListFormatTable; + + fn items(self) -> std::vec::Vec { + self.tables + } + + fn next_page_token(&self) -> std::string::String { + use std::clone::Clone; + self.next_page_token.clone() + } +} + #[doc(hidden)] impl<'de> serde::de::Deserialize<'de> for TableList { fn deserialize(deserializer: D) -> std::result::Result From 05bf8829c81d1dc296ced71274b90446ee26f73a Mon Sep 17 00:00:00 2001 From: Phong Chuong Date: Wed, 17 Sep 2025 17:50:40 +0000 Subject: [PATCH 06/16] bigquery job service IT --- src/integration-tests/src/bigquery.rs | 153 +++++++++++++++++++++++++- src/integration-tests/tests/driver.rs | 12 +- 2 files changed, 160 insertions(+), 5 deletions(-) diff --git a/src/integration-tests/src/bigquery.rs b/src/integration-tests/src/bigquery.rs index 0a7d57caa2..312d487040 100644 --- a/src/integration-tests/src/bigquery.rs +++ b/src/integration-tests/src/bigquery.rs @@ -165,16 +165,161 @@ async fn cleanup_stale_datasets( } fn random_dataset_id() -> String { - let rand_suffix: String = rand::rng() + let rand_suffix = random_id_suffix(); + format!("rust_bq_test_dataset_{rand_suffix}") +} + +fn random_job_id() -> String { + let rand_suffix = random_id_suffix(); + format!("rust_bq_test_job_{rand_suffix}") +} + +fn random_id_suffix() -> String { + rand::rng() .sample_iter(&Alphanumeric) .take(8) .map(char::from) - .collect(); - - format!("rust_bq_test_dataset_{rand_suffix}") + .collect() } fn extract_dataset_id(project_id: &str, id: &str) -> Option { id.strip_prefix(format!("{project_id}:").as_str()) .map(|v| v.to_string()) } + +pub async fn job_service(builder: 
bigquery::builder::job_service::ClientBuilder) -> Result<()> { + // Enable a basic subscriber. Useful to troubleshoot problems and visually + // verify tracing is doing something. + #[cfg(feature = "log-integration-tests")] + let _guard = { + use tracing_subscriber::fmt::format::FmtSpan; + let subscriber = tracing_subscriber::fmt() + .with_level(true) + .with_thread_ids(true) + .with_span_events(FmtSpan::NEW | FmtSpan::CLOSE) + .finish(); + + tracing::subscriber::set_default(subscriber) + }; + + let project_id = crate::project_id()?; + let client: bigquery::client::JobService = builder.build().await?; + cleanup_stale_jobs(&client, &project_id).await?; + + let job_id = random_job_id(); + println!("CREATING JOB WITH ID: {job_id}"); + + let query = "SELECT 1 as one"; + let job = client + .insert_job() + .set_project_id(&project_id) + .set_job( + bigquery::model::Job::new() + .set_job_reference(bigquery::model::JobReference::new().set_job_id(&job_id)) + .set_configuration( + bigquery::model::JobConfiguration::new() + .set_labels([(INSTANCE_LABEL, "true")]) + .set_query(bigquery::model::JobConfigurationQuery::new().set_query(query)), + ), + ) + .send() + .await?; + println!("CREATE JOB = {job:?}"); + + assert!(job.job_reference.is_some()); + + let list = client + .list_jobs() + .set_project_id(&project_id) + .by_item() + .into_stream(); + let items = list.collect::>>().await; + println!("LIST JOBS = {} entries", items.len()); + + assert!( + items + .iter() + .any(|v| v.as_ref().unwrap().id.contains(&job_id)) + ); + + client + .delete_job() + .set_project_id(&project_id) + .set_job_id(&job_id) + .send() + .await?; + println!("DELETE JOB"); + + Ok(()) +} + +async fn cleanup_stale_jobs(client: &bigquery::client::JobService, project_id: &str) -> Result<()> { + use std::time::{Duration, SystemTime, UNIX_EPOCH}; + let stale_deadline = SystemTime::now().duration_since(UNIX_EPOCH)?; + let stale_deadline = stale_deadline - Duration::from_secs(48 * 60 * 60); + let stale_deadline = stale_deadline.as_millis() as u64; + + let list = client + .list_jobs() + .set_project_id(project_id) + .set_max_creation_time(stale_deadline) + .by_item() + .into_stream(); + let items = list.collect::>>().await; + println!("LIST JOBS = {} entries", items.len()); + + let pending_all_stale_jobs = items + .iter() + .filter_map(|v| match v { + Ok(v) => { + if let Some(job_reference) = &v.job_reference { + return Some( + client + .get_job() + .set_project_id(project_id) + .set_job_id(&job_reference.job_id) + .send(), + ); + } + None + } + Err(_) => None, + }) + .collect::>(); + + let pending_deletion = futures::future::join_all(pending_all_stale_jobs) + .await + .into_iter() + .filter_map(|r| { + match r { + Ok(r) => { + // Only delete jobs with testing label. 
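+                    // NOTE: the `if let ... && let ...` chain below needs
+                    // let-chain support; Rust 1.85 rejects it (see
+                    // rust-lang/rust#53667).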
+ if let Some(job_reference) = &r.job_reference + && let Some(configuration) = &r.configuration + { + if configuration + .labels + .get(INSTANCE_LABEL) + .is_some_and(|v| v == "true") + { + return Some( + client + .delete_job() + .set_project_id(project_id) + .set_job_id(&job_reference.job_id) + .send(), + ); + } + } + None + } + Err(_) => None, + } + }) + .collect::>(); + + println!("found {} stale test jobs", pending_deletion.len()); + + futures::future::join_all(pending_deletion).await; + Ok(()) +} diff --git a/src/integration-tests/tests/driver.rs b/src/integration-tests/tests/driver.rs index 4919fba67d..a2037da4b2 100644 --- a/src/integration-tests/tests/driver.rs +++ b/src/integration-tests/tests/driver.rs @@ -27,7 +27,7 @@ mod driver { #[test_case(bigquery::client::DatasetService::builder().with_tracing().with_retry_policy(retry_policy()); "with [tracing, retry] enabled")] #[tokio::test(flavor = "multi_thread", worker_threads = 1)] - async fn run_bigquery( + async fn run_bigquery_dataset_service( builder: bigquery::builder::dataset_service::ClientBuilder, ) -> integration_tests::Result<()> { integration_tests::bigquery::dataset_admin(builder) @@ -35,6 +35,16 @@ mod driver { .map_err(integration_tests::report_error) } + #[test_case(bigquery::client::JobService::builder().with_tracing().with_retry_policy(retry_policy()); "with [tracing, retry] enabled")] + #[tokio::test(flavor = "multi_thread", worker_threads = 1)] + async fn run_bigquery_job_service( + builder: bigquery::builder::job_service::ClientBuilder, + ) -> integration_tests::Result<()> { + integration_tests::bigquery::job_service(builder) + .await + .map_err(integration_tests::report_error) + } + #[test_case(firestore::client::Firestore::builder(); "default")] #[test_case(firestore::client::Firestore::builder().with_tracing(); "with tracing enabled")] #[test_case(firestore::client::Firestore::builder().with_retry_policy(retry_policy()); "with retry enabled")] From 2d6b6e69b394c3051758a9f1dd9f985e18c1a2c6 Mon Sep 17 00:00:00 2001 From: Phong Chuong Date: Wed, 17 Sep 2025 17:59:24 +0000 Subject: [PATCH 07/16] sidekick refreshall --- src/generated/cloud/sql/v1/src/builder.rs | 1 - src/generated/cloud/sql/v1/src/model.rs | 1 - 2 files changed, 2 deletions(-) diff --git a/src/generated/cloud/sql/v1/src/builder.rs b/src/generated/cloud/sql/v1/src/builder.rs index ac2e8e7aa5..057a8a018d 100644 --- a/src/generated/cloud/sql/v1/src/builder.rs +++ b/src/generated/cloud/sql/v1/src/builder.rs @@ -365,7 +365,6 @@ pub mod sql_backup_runs_service { .map(gax::response::Response::into_body) } - // NOWNOW /// Streams each page in the collection. pub fn by_page( self, diff --git a/src/generated/cloud/sql/v1/src/model.rs b/src/generated/cloud/sql/v1/src/model.rs index 662b2427aa..f7af637303 100644 --- a/src/generated/cloud/sql/v1/src/model.rs +++ b/src/generated/cloud/sql/v1/src/model.rs @@ -1625,7 +1625,6 @@ impl std::fmt::Debug for BackupRun { } } -// NOWNOW /// Backup run list results. 
#[derive(Clone, Default, PartialEq)] #[non_exhaustive] From 6889c17e8faf3ee7d4c49c4156c9f43b749ef10b Mon Sep 17 00:00:00 2001 From: Phong Chuong Date: Wed, 17 Sep 2025 18:13:37 +0000 Subject: [PATCH 08/16] fix unstable compile for integration test --- src/integration-tests/src/bigquery.rs | 30 +++++++++++++-------------- src/integration-tests/src/sql.rs | 1 - 2 files changed, 15 insertions(+), 16 deletions(-) diff --git a/src/integration-tests/src/bigquery.rs b/src/integration-tests/src/bigquery.rs index 312d487040..b41bad408b 100644 --- a/src/integration-tests/src/bigquery.rs +++ b/src/integration-tests/src/bigquery.rs @@ -294,21 +294,21 @@ async fn cleanup_stale_jobs(client: &bigquery::client::JobService, project_id: & match r { Ok(r) => { // Only delete jobs with testing label. - if let Some(job_reference) = &r.job_reference - && let Some(configuration) = &r.configuration - { - if configuration - .labels - .get(INSTANCE_LABEL) - .is_some_and(|v| v == "true") - { - return Some( - client - .delete_job() - .set_project_id(project_id) - .set_job_id(&job_reference.job_id) - .send(), - ); + if let Some(job_reference) = &r.job_reference { + if let Some(configuration) = &r.configuration { + if configuration + .labels + .get(INSTANCE_LABEL) + .is_some_and(|v| v == "true") + { + return Some( + client + .delete_job() + .set_project_id(project_id) + .set_job_id(&job_reference.job_id) + .send(), + ); + } } } None diff --git a/src/integration-tests/src/sql.rs b/src/integration-tests/src/sql.rs index 02223cac2a..66af49ff74 100644 --- a/src/integration-tests/src/sql.rs +++ b/src/integration-tests/src/sql.rs @@ -12,7 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -// NOWNOW use crate::Result; use futures::stream::StreamExt; use gax::paginator::ItemPaginator; From 564901587162157e9414657e914b2dfed2aaa905 Mon Sep 17 00:00:00 2001 From: Phong Chuong Date: Wed, 17 Sep 2025 18:48:06 +0000 Subject: [PATCH 09/16] collapse nested if block --- src/integration-tests/src/bigquery.rs | 30 +++++++++++++-------------- 1 file changed, 14 insertions(+), 16 deletions(-) diff --git a/src/integration-tests/src/bigquery.rs b/src/integration-tests/src/bigquery.rs index b41bad408b..49b65ed00d 100644 --- a/src/integration-tests/src/bigquery.rs +++ b/src/integration-tests/src/bigquery.rs @@ -294,22 +294,20 @@ async fn cleanup_stale_jobs(client: &bigquery::client::JobService, project_id: & match r { Ok(r) => { // Only delete jobs with testing label. 
- if let Some(job_reference) = &r.job_reference { - if let Some(configuration) = &r.configuration { - if configuration - .labels - .get(INSTANCE_LABEL) - .is_some_and(|v| v == "true") - { - return Some( - client - .delete_job() - .set_project_id(project_id) - .set_job_id(&job_reference.job_id) - .send(), - ); - } - } + if let Some(job_reference) = &r.job_reference + && let Some(configuration) = &r.configuration + && configuration + .labels + .get(INSTANCE_LABEL) + .is_some_and(|v| v == "true") + { + return Some( + client + .delete_job() + .set_project_id(project_id) + .set_job_id(&job_reference.job_id) + .send(), + ); } None } From 527001196876c4b350a16c662b364ca482f44f82 Mon Sep 17 00:00:00 2001 From: Phong Chuong Date: Wed, 17 Sep 2025 18:57:19 +0000 Subject: [PATCH 10/16] Change first if let to match --- src/integration-tests/src/bigquery.rs | 38 ++++++++++++++++----------- 1 file changed, 22 insertions(+), 16 deletions(-) diff --git a/src/integration-tests/src/bigquery.rs b/src/integration-tests/src/bigquery.rs index 49b65ed00d..1cac812e96 100644 --- a/src/integration-tests/src/bigquery.rs +++ b/src/integration-tests/src/bigquery.rs @@ -293,23 +293,29 @@ async fn cleanup_stale_jobs(client: &bigquery::client::JobService, project_id: & .filter_map(|r| { match r { Ok(r) => { - // Only delete jobs with testing label. - if let Some(job_reference) = &r.job_reference - && let Some(configuration) = &r.configuration - && configuration - .labels - .get(INSTANCE_LABEL) - .is_some_and(|v| v == "true") - { - return Some( - client - .delete_job() - .set_project_id(project_id) - .set_job_id(&job_reference.job_id) - .send(), - ); + // Rust 1.85 does not like if let chains. + // See: https://github.com/rust-lang/rust/issues/53667 + match &r.job_reference { + Some(job_reference) => { + // Only delete jobs with testing label. + if let Some(configuration) = &r.configuration + && configuration + .labels + .get(INSTANCE_LABEL) + .is_some_and(|v| v == "true") + { + return Some( + client + .delete_job() + .set_project_id(project_id) + .set_job_id(&job_reference.job_id) + .send(), + ); + } + None + } + None => None, } - None } Err(_) => None, } From 89ba59fcc141b7859edfbb41520dc174c7a6bc21 Mon Sep 17 00:00:00 2001 From: Phong Chuong Date: Wed, 17 Sep 2025 19:46:11 +0000 Subject: [PATCH 11/16] if match if test --- src/integration-tests/src/bigquery.rs | 34 +++++++++++++-------------- 1 file changed, 17 insertions(+), 17 deletions(-) diff --git a/src/integration-tests/src/bigquery.rs b/src/integration-tests/src/bigquery.rs index 1cac812e96..139e03fa49 100644 --- a/src/integration-tests/src/bigquery.rs +++ b/src/integration-tests/src/bigquery.rs @@ -293,29 +293,29 @@ async fn cleanup_stale_jobs(client: &bigquery::client::JobService, project_id: & .filter_map(|r| { match r { Ok(r) => { - // Rust 1.85 does not like if let chains. - // See: https://github.com/rust-lang/rust/issues/53667 - match &r.job_reference { - Some(job_reference) => { - // Only delete jobs with testing label. - if let Some(configuration) = &r.configuration - && configuration + if let Some(job_reference) = &r.job_reference { + // Use match here as Rust 1.85 does not like if let chains. 
+ // See: https://github.com/rust-lang/rust/issues/53667 + match &r.configuration { + Some(configuration) => { + if configuration .labels .get(INSTANCE_LABEL) .is_some_and(|v| v == "true") - { - return Some( - client - .delete_job() - .set_project_id(project_id) - .set_job_id(&job_reference.job_id) - .send(), - ); + { + return Some( + client + .delete_job() + .set_project_id(project_id) + .set_job_id(&job_reference.job_id) + .send(), + ); + } } - None + None => (), } - None => None, } + None } Err(_) => None, } From 5aacd1753346fcababca5006a24dbc25ee6f88f4 Mon Sep 17 00:00:00 2001 From: Phong Chuong Date: Wed, 17 Sep 2025 20:03:30 +0000 Subject: [PATCH 12/16] if if if test --- src/integration-tests/src/bigquery.rs | 43 +++++++++++---------------- 1 file changed, 18 insertions(+), 25 deletions(-) diff --git a/src/integration-tests/src/bigquery.rs b/src/integration-tests/src/bigquery.rs index 139e03fa49..d140113c20 100644 --- a/src/integration-tests/src/bigquery.rs +++ b/src/integration-tests/src/bigquery.rs @@ -290,35 +290,28 @@ async fn cleanup_stale_jobs(client: &bigquery::client::JobService, project_id: & let pending_deletion = futures::future::join_all(pending_all_stale_jobs) .await .into_iter() - .filter_map(|r| { - match r { - Ok(r) => { - if let Some(job_reference) = &r.job_reference { - // Use match here as Rust 1.85 does not like if let chains. - // See: https://github.com/rust-lang/rust/issues/53667 - match &r.configuration { - Some(configuration) => { - if configuration - .labels - .get(INSTANCE_LABEL) - .is_some_and(|v| v == "true") - { - return Some( - client - .delete_job() - .set_project_id(project_id) - .set_job_id(&job_reference.job_id) - .send(), - ); - } - } - None => (), + .filter_map(|r| match r { + Ok(r) => { + if let Some(job_reference) = &r.job_reference { + if let Some(configuration) = &r.configuration { + if configuration + .labels + .get(INSTANCE_LABEL) + .is_some_and(|v| v == "true") + { + return Some( + client + .delete_job() + .set_project_id(project_id) + .set_job_id(&job_reference.job_id) + .send(), + ); } } - None } - Err(_) => None, + None } + Err(_) => None, }) .collect::>(); From b0f4f5d874f5d9f922a1ac92affff3608d639665 Mon Sep 17 00:00:00 2001 From: Phong Chuong Date: Wed, 17 Sep 2025 20:56:29 +0000 Subject: [PATCH 13/16] if if-config --- src/integration-tests/src/bigquery.rs | 21 ++++++++++----------- 1 file changed, 10 insertions(+), 11 deletions(-) diff --git a/src/integration-tests/src/bigquery.rs b/src/integration-tests/src/bigquery.rs index d140113c20..d34233796d 100644 --- a/src/integration-tests/src/bigquery.rs +++ b/src/integration-tests/src/bigquery.rs @@ -293,20 +293,19 @@ async fn cleanup_stale_jobs(client: &bigquery::client::JobService, project_id: & .filter_map(|r| match r { Ok(r) => { if let Some(job_reference) = &r.job_reference { - if let Some(configuration) = &r.configuration { - if configuration + if let Some(configuration) = &r.configuration + && configuration .labels .get(INSTANCE_LABEL) .is_some_and(|v| v == "true") - { - return Some( - client - .delete_job() - .set_project_id(project_id) - .set_job_id(&job_reference.job_id) - .send(), - ); - } + { + return Some( + client + .delete_job() + .set_project_id(project_id) + .set_job_id(&job_reference.job_id) + .send(), + ); } } None From ad6f77f090df571bd7afc33fc8567ff92bc0bec7 Mon Sep 17 00:00:00 2001 From: Phong Chuong Date: Wed, 17 Sep 2025 21:11:32 +0000 Subject: [PATCH 14/16] allow clippy::collapsible_if --- src/integration-tests/src/bigquery.rs | 23 +++++++++++++---------- 1 file 
changed, 13 insertions(+), 10 deletions(-) diff --git a/src/integration-tests/src/bigquery.rs b/src/integration-tests/src/bigquery.rs index d34233796d..7123a0dccf 100644 --- a/src/integration-tests/src/bigquery.rs +++ b/src/integration-tests/src/bigquery.rs @@ -253,6 +253,8 @@ pub async fn job_service(builder: bigquery::builder::job_service::ClientBuilder) Ok(()) } +// For nested if let chain. +#[allow(clippy::collapsible_if)] async fn cleanup_stale_jobs(client: &bigquery::client::JobService, project_id: &str) -> Result<()> { use std::time::{Duration, SystemTime, UNIX_EPOCH}; let stale_deadline = SystemTime::now().duration_since(UNIX_EPOCH)?; @@ -293,19 +295,20 @@ async fn cleanup_stale_jobs(client: &bigquery::client::JobService, project_id: & .filter_map(|r| match r { Ok(r) => { if let Some(job_reference) = &r.job_reference { - if let Some(configuration) = &r.configuration - && configuration + if let Some(configuration) = &r.configuration { + if configuration .labels .get(INSTANCE_LABEL) .is_some_and(|v| v == "true") - { - return Some( - client - .delete_job() - .set_project_id(project_id) - .set_job_id(&job_reference.job_id) - .send(), - ); + { + return Some( + client + .delete_job() + .set_project_id(project_id) + .set_job_id(&job_reference.job_id) + .send(), + ); + } } } None From 5e0b92b52dd4cdcdcac6ff0e4df7792f8d16425b Mon Sep 17 00:00:00 2001 From: Phong Chuong Date: Thu, 18 Sep 2025 13:19:12 +0000 Subject: [PATCH 15/16] use is_some_and --- src/integration-tests/src/bigquery.rs | 29 ++++++++++----------------- 1 file changed, 11 insertions(+), 18 deletions(-) diff --git a/src/integration-tests/src/bigquery.rs b/src/integration-tests/src/bigquery.rs index 7123a0dccf..206fb83e9d 100644 --- a/src/integration-tests/src/bigquery.rs +++ b/src/integration-tests/src/bigquery.rs @@ -253,8 +253,6 @@ pub async fn job_service(builder: bigquery::builder::job_service::ClientBuilder) Ok(()) } -// For nested if let chain. 
-#[allow(clippy::collapsible_if)] async fn cleanup_stale_jobs(client: &bigquery::client::JobService, project_id: &str) -> Result<()> { use std::time::{Duration, SystemTime, UNIX_EPOCH}; let stale_deadline = SystemTime::now().duration_since(UNIX_EPOCH)?; @@ -294,22 +292,17 @@ async fn cleanup_stale_jobs(client: &bigquery::client::JobService, project_id: & .into_iter() .filter_map(|r| match r { Ok(r) => { - if let Some(job_reference) = &r.job_reference { - if let Some(configuration) = &r.configuration { - if configuration - .labels - .get(INSTANCE_LABEL) - .is_some_and(|v| v == "true") - { - return Some( - client - .delete_job() - .set_project_id(project_id) - .set_job_id(&job_reference.job_id) - .send(), - ); - } - } + let job_reference = r.job_reference?; + if r.configuration + .is_some_and(|c| c.labels.get(INSTANCE_LABEL).is_some_and(|v| v == "true")) + { + return Some( + client + .delete_job() + .set_project_id(project_id) + .set_job_id(&job_reference.job_id) + .send(), + ); } None } From 779f4b869007434f933c86dc04b89f96e7845c2c Mon Sep 17 00:00:00 2001 From: Phong Chuong Date: Thu, 18 Sep 2025 17:19:04 +0000 Subject: [PATCH 16/16] clean up only DONE jobs --- src/integration-tests/src/bigquery.rs | 9 +-------- 1 file changed, 1 insertion(+), 8 deletions(-) diff --git a/src/integration-tests/src/bigquery.rs b/src/integration-tests/src/bigquery.rs index 206fb83e9d..7752edea59 100644 --- a/src/integration-tests/src/bigquery.rs +++ b/src/integration-tests/src/bigquery.rs @@ -242,14 +242,6 @@ pub async fn job_service(builder: bigquery::builder::job_service::ClientBuilder) .any(|v| v.as_ref().unwrap().id.contains(&job_id)) ); - client - .delete_job() - .set_project_id(&project_id) - .set_job_id(&job_id) - .send() - .await?; - println!("DELETE JOB"); - Ok(()) } @@ -295,6 +287,7 @@ async fn cleanup_stale_jobs(client: &bigquery::client::JobService, project_id: & let job_reference = r.job_reference?; if r.configuration .is_some_and(|c| c.labels.get(INSTANCE_LABEL).is_some_and(|v| v == "true")) + && r.status.is_some_and(|s| s.state == "DONE") { return Some( client