From b958843805f42fd79ab5acd16802a2d76fd03029 Mon Sep 17 00:00:00 2001 From: Brian May Date: Fri, 4 Aug 2023 12:28:32 +1000 Subject: [PATCH] fix: Refactor slack rate limit retrying --- src/output/slack.rs | 152 ++++++++++++++++++++++---------------------- 1 file changed, 75 insertions(+), 77 deletions(-) diff --git a/src/output/slack.rs b/src/output/slack.rs index b9bba12..e174a75 100644 --- a/src/output/slack.rs +++ b/src/output/slack.rs @@ -10,6 +10,7 @@ use crate::Task; use anyhow::Error; use anyhow::Result; use async_trait::async_trait; +use futures::Future; use slack_morphism::errors::SlackClientError; use slack_morphism::prelude::*; use std::collections::HashMap; @@ -200,6 +201,33 @@ struct SlackState { ts: Option, } +async fn retry_slack(f: F) -> Option +where + F: Fn() -> Fut + Send + Sync, + Fut: Future> + Send, + R: Send, +{ + let mut count = 0u32; + loop { + count += 1; + match f().await { + Err(SlackClientError::RateLimitError(rle)) => { + let retry_time = Instant::now() + rle.retry_after.unwrap_or(DEFAULT_RETRY); + sleep_until(retry_time).await; + if count >= MAX_TRIES { + println!("Too many retries posting finished to slack: {rle}"); + break None; + }; + } + Err(err) => { + println!("Slack error posting finished: {err}"); + break None; + } + Ok(result) => break Some(result), + } + } +} + impl SlackState { fn new() -> Result { let slack_channel: String = config_env_var("SLACK_CHANNEL")?; @@ -232,19 +260,7 @@ impl SlackState { let client = SlackClient::new(SlackClientHyperConnector::new()); let session = client.open_session(&self.token); - - let title = slack_title(state); - - let mut installation_blocks = get_installation_blocks(state, &title); - let mut outdated_blocks = get_outdated_blocks(state); - - let mut blocks = vec![]; - blocks.append(&mut installation_blocks); - blocks.append(&mut outdated_blocks); - - let content = SlackMessageContent::new() - .with_blocks(blocks) - .with_text(title); + let content = get_update_content(state); if let Some(ts) = &self.ts { let update_req = SlackApiChatUpdateRequest::new( @@ -264,6 +280,32 @@ impl SlackState { Ok(()) } + async fn update_slack_final(&mut self, state: &State) { + let client = SlackClient::new(SlackClientHyperConnector::new()); + let session = client.open_session(&self.token); + let content = get_update_content(state); + + if let Some(ts) = &self.ts { + let update_req = SlackApiChatUpdateRequest::new( + self.slack_channel.clone().into(), + content, + ts.clone(), + ); + let do_post = || async { session.chat_update(&update_req).await }; + if let Some(update_response) = retry_slack(do_post).await { + self.ts = Some(update_response.ts); + } + } else { + let post_chat_req = + SlackApiChatPostMessageRequest::new(self.slack_channel.clone().into(), content); + + let do_post = || async { session.chat_post_message(&post_chat_req).await }; + if let Some(post_chat_response) = retry_slack(do_post).await { + self.ts = Some(post_chat_response.ts); + } + } + } + async fn send_finished(&self, state: &State) { #[allow(clippy::match_same_arms)] let data = state @@ -325,27 +367,8 @@ impl SlackState { content, ); - let mut count = 0u32; - loop { - count += 1; - match session.chat_post_message(&post_chat_req).await { - Err(SlackClientError::RateLimitError(err)) => { - let retry_time = Instant::now() + err.retry_after.unwrap_or(DEFAULT_RETRY); - sleep_until(retry_time).await; - if count >= MAX_TRIES { - println!("Too many retries posting finished to slack: {err}"); - break; - }; - } - Err(err) => { - println!("Slack error posting finished: {err}"); - break; - } - Ok(_) => { - break; - } - } - } + let do_post = || async { session.chat_post_message(&post_chat_req).await }; + retry_slack(do_post).await; } } @@ -395,30 +418,26 @@ impl SlackState { let post_chat_req = SlackApiChatPostMessageRequest::new(self.slack_channel.clone().into(), content); - let mut count = 0u32; - loop { - count += 1; - match session.chat_post_message(&post_chat_req).await { - Err(SlackClientError::RateLimitError(err)) => { - let retry_time = Instant::now() + err.retry_after.unwrap_or(DEFAULT_RETRY); - sleep_until(retry_time).await; - if count >= MAX_TRIES { - println!("Too many retries posting helm result to slack: {err}"); - break; - }; - } - Err(err) => { - println!("Slack error posting helm result: {err}"); - break; - } - Ok(_) => { - break; - } - } - } + let do_post = || async { session.chat_post_message(&post_chat_req).await }; + retry_slack(do_post).await; } } +fn get_update_content(state: &State) -> SlackMessageContent { + let title = slack_title(state); + + let mut installation_blocks = get_installation_blocks(state, &title); + let mut outdated_blocks = get_outdated_blocks(state); + + let mut blocks = vec![]; + blocks.append(&mut installation_blocks); + blocks.append(&mut outdated_blocks); + + SlackMessageContent::new() + .with_blocks(blocks) + .with_text(title) +} + fn get_installation_blocks(state: &State, title: &str) -> Vec { let status = results_to_string(state); let status = ["```".to_string(), status, "```".to_string()]; @@ -474,28 +493,7 @@ async fn update_results(state: &State, slack: &mut SlackState) -> Instant { } async fn update_final_results(state: &State, slack: &mut SlackState) { - let mut count = 0u32; - loop { - count += 1; - match slack.update_slack(state).await { - Err(SlackClientError::RateLimitError(err)) => { - let retry_time = Instant::now() + err.retry_after.unwrap_or(DEFAULT_RETRY); - sleep_until(retry_time).await; - if count >= MAX_TRIES { - println!("Too many retries posting final result to slack: {err}"); - break; - }; - } - Err(err) => { - println!("Slack error posting final result: {err}"); - break; - } - Ok(_) => { - break; - } - } - } - + slack.update_slack_final(state).await; slack.send_finished(state).await; }