diff --git a/crates/collab/src/api/billing.rs b/crates/collab/src/api/billing.rs index 02282b40c6d91..36745820a0a43 100644 --- a/crates/collab/src/api/billing.rs +++ b/crates/collab/src/api/billing.rs @@ -452,29 +452,28 @@ async fn poll_stripe_events( let mut pages_of_already_processed_events = 0; let mut unprocessed_events = Vec::new(); - loop { - if pages_of_already_processed_events >= NUMBER_OF_ALREADY_PROCESSED_PAGES_BEFORE_WE_STOP { - log::info!("saw {pages_of_already_processed_events} pages of already-processed events: stopping event retrieval"); - break; - } - - log::info!("retrieving events from Stripe: {}", event_types.join(", ")); - - let mut params = ListEvents::new(); - params.types = Some(event_types.clone()); - params.limit = Some(EVENTS_LIMIT_PER_PAGE); + log::info!( + "Stripe events: starting retrieval for {}", + event_types.join(", ") + ); + let mut params = ListEvents::new(); + params.types = Some(event_types.clone()); + params.limit = Some(EVENTS_LIMIT_PER_PAGE); - let events = stripe::Event::list(stripe_client, ¶ms).await?; + let mut event_pages = stripe::Event::list(&stripe_client, ¶ms) + .await? + .paginate(params); + loop { let processed_event_ids = { - let event_ids = &events + let event_ids = event_pages + .page .data .iter() .map(|event| event.id.as_str()) .collect::>(); - app.db - .get_processed_stripe_events_by_event_ids(event_ids) + .get_processed_stripe_events_by_event_ids(&event_ids) .await? .into_iter() .map(|event| event.stripe_event_id) @@ -482,13 +481,13 @@ async fn poll_stripe_events( }; let mut processed_events_in_page = 0; - let events_in_page = events.data.len(); - for event in events.data { + let events_in_page = event_pages.page.data.len(); + for event in &event_pages.page.data { if processed_event_ids.contains(&event.id.to_string()) { processed_events_in_page += 1; - log::debug!("Stripe event {} already processed: skipping", event.id); + log::debug!("Stripe events: already processed '{}', skipping", event.id); } else { - unprocessed_events.push(event); + unprocessed_events.push(event.clone()); } } @@ -496,15 +495,21 @@ async fn poll_stripe_events( pages_of_already_processed_events += 1; } - if !events.has_more { + if event_pages.page.has_more { + if pages_of_already_processed_events >= NUMBER_OF_ALREADY_PROCESSED_PAGES_BEFORE_WE_STOP + { + log::info!("Stripe events: stopping, saw {pages_of_already_processed_events} pages of already-processed events"); + break; + } else { + log::info!("Stripe events: retrieving next page"); + event_pages = event_pages.next(&stripe_client).await?; + } + } else { break; } } - log::info!( - "unprocessed events from Stripe: {}", - unprocessed_events.len() - ); + log::info!("Stripe events: unprocessed {}", unprocessed_events.len()); // Sort all of the unprocessed events in ascending order, so we can handle them in the order they occurred. unprocessed_events.sort_by(|a, b| a.created.cmp(&b.created).then_with(|| a.id.cmp(&b.id))); @@ -520,12 +525,12 @@ async fn poll_stripe_events( // If the event has happened too far in the past, we don't want to // process it and risk overwriting other more-recent updates. // - // 1 hour was chosen arbitrarily. This could be made longer or shorter. - let one_hour = Duration::from_secs(60 * 60); - let an_hour_ago = Utc::now() - one_hour; - if an_hour_ago.timestamp() > event.created { + // 1 day was chosen arbitrarily. This could be made longer or shorter. + let one_day = Duration::from_secs(24 * 60 * 60); + let a_day_ago = Utc::now() - one_day; + if a_day_ago.timestamp() > event.created { log::info!( - "Stripe event {} is more than {one_hour:?} old, marking as processed", + "Stripe events: event '{}' is more than {one_day:?} old, marking as processed", event_id ); app.db