From 498ecd6404a2e8f7729daf13c4f19a1a787c75e3 Mon Sep 17 00:00:00 2001 From: Antonio Scandurra Date: Thu, 17 Oct 2024 09:47:25 +0200 Subject: [PATCH] Fetch more than one page when polling stripe events (#19343) This fixes a bug that was causing most users to be unable to use the LLMs via Zed. It was caused by not using pagination and, instead, always querying the very first page of stripe events. Note that we're also allowing processing events generated in the last 24 hours (before, this was only 1 hour). I did this so that we can process the backlog of events that the aforementioned bug was skipping. Release Notes: - N/A --- crates/collab/src/api/billing.rs | 63 +++++++++++++++++--------------- 1 file changed, 34 insertions(+), 29 deletions(-) diff --git a/crates/collab/src/api/billing.rs b/crates/collab/src/api/billing.rs index 02282b40c6d91..36745820a0a43 100644 --- a/crates/collab/src/api/billing.rs +++ b/crates/collab/src/api/billing.rs @@ -452,29 +452,28 @@ async fn poll_stripe_events( let mut pages_of_already_processed_events = 0; let mut unprocessed_events = Vec::new(); - loop { - if pages_of_already_processed_events >= NUMBER_OF_ALREADY_PROCESSED_PAGES_BEFORE_WE_STOP { - log::info!("saw {pages_of_already_processed_events} pages of already-processed events: stopping event retrieval"); - break; - } - - log::info!("retrieving events from Stripe: {}", event_types.join(", ")); - - let mut params = ListEvents::new(); - params.types = Some(event_types.clone()); - params.limit = Some(EVENTS_LIMIT_PER_PAGE); + log::info!( + "Stripe events: starting retrieval for {}", + event_types.join(", ") + ); + let mut params = ListEvents::new(); + params.types = Some(event_types.clone()); + params.limit = Some(EVENTS_LIMIT_PER_PAGE); - let events = stripe::Event::list(stripe_client, ¶ms).await?; + let mut event_pages = stripe::Event::list(&stripe_client, ¶ms) + .await? + .paginate(params); + loop { let processed_event_ids = { - let event_ids = &events + let event_ids = event_pages + .page .data .iter() .map(|event| event.id.as_str()) .collect::>(); - app.db - .get_processed_stripe_events_by_event_ids(event_ids) + .get_processed_stripe_events_by_event_ids(&event_ids) .await? .into_iter() .map(|event| event.stripe_event_id) @@ -482,13 +481,13 @@ async fn poll_stripe_events( }; let mut processed_events_in_page = 0; - let events_in_page = events.data.len(); - for event in events.data { + let events_in_page = event_pages.page.data.len(); + for event in &event_pages.page.data { if processed_event_ids.contains(&event.id.to_string()) { processed_events_in_page += 1; - log::debug!("Stripe event {} already processed: skipping", event.id); + log::debug!("Stripe events: already processed '{}', skipping", event.id); } else { - unprocessed_events.push(event); + unprocessed_events.push(event.clone()); } } @@ -496,15 +495,21 @@ async fn poll_stripe_events( pages_of_already_processed_events += 1; } - if !events.has_more { + if event_pages.page.has_more { + if pages_of_already_processed_events >= NUMBER_OF_ALREADY_PROCESSED_PAGES_BEFORE_WE_STOP + { + log::info!("Stripe events: stopping, saw {pages_of_already_processed_events} pages of already-processed events"); + break; + } else { + log::info!("Stripe events: retrieving next page"); + event_pages = event_pages.next(&stripe_client).await?; + } + } else { break; } } - log::info!( - "unprocessed events from Stripe: {}", - unprocessed_events.len() - ); + log::info!("Stripe events: unprocessed {}", unprocessed_events.len()); // Sort all of the unprocessed events in ascending order, so we can handle them in the order they occurred. unprocessed_events.sort_by(|a, b| a.created.cmp(&b.created).then_with(|| a.id.cmp(&b.id))); @@ -520,12 +525,12 @@ async fn poll_stripe_events( // If the event has happened too far in the past, we don't want to // process it and risk overwriting other more-recent updates. // - // 1 hour was chosen arbitrarily. This could be made longer or shorter. - let one_hour = Duration::from_secs(60 * 60); - let an_hour_ago = Utc::now() - one_hour; - if an_hour_ago.timestamp() > event.created { + // 1 day was chosen arbitrarily. This could be made longer or shorter. + let one_day = Duration::from_secs(24 * 60 * 60); + let a_day_ago = Utc::now() - one_day; + if a_day_ago.timestamp() > event.created { log::info!( - "Stripe event {} is more than {one_hour:?} old, marking as processed", + "Stripe events: event '{}' is more than {one_day:?} old, marking as processed", event_id ); app.db