From 24a1034b96f02c65c9b435582b636380a193f21d Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Sun, 30 Jul 2023 19:19:23 +0530 Subject: [PATCH] fix: stop download on action timeout, don't drop action on resend (#258) * fix: stop download on action timeout * fix: don't clear action on unsuccessful re-send * feat: log timeout of previous download * style: clippy suggestions --- uplink/src/base/bridge/mod.rs | 17 ++++---- uplink/src/collector/downloader.rs | 66 ++++++++++++++++++++++-------- 2 files changed, 58 insertions(+), 25 deletions(-) diff --git a/uplink/src/base/bridge/mod.rs b/uplink/src/base/bridge/mod.rs index 247bd6656..487d7562f 100644 --- a/uplink/src/base/bridge/mod.rs +++ b/uplink/src/base/bridge/mod.rs @@ -167,7 +167,7 @@ impl Bridge { } fn clear_current_action(&mut self) { - self.current_action = None; + self.current_action.take(); } pub async fn start(&mut self) -> Result<(), Error> { @@ -217,6 +217,9 @@ impl Bridge { error!("Failed to route action to app. Error = {:?}", error); self.forward_action_error(action, error).await; + + // Remove action because it couldn't be routed + self.clear_current_action() } event = self.bridge_rx.recv_async() => { let event = event?; @@ -236,6 +239,9 @@ impl Bridge { let action = self.current_action.take().unwrap(); error!("Timeout waiting for action response. Action ID = {}", action.id); self.forward_action_error(action.action, Error::ActionTimeout).await; + + // Remove action because it timedout + self.clear_current_action() } // Flush streams that timeout Some(timedout_stream) = streams.stream_timeouts.next(), if streams.stream_timeouts.has_pending() => { @@ -371,6 +377,9 @@ impl Bridge { if let Err(e) = self.try_route_action(fwd_action.clone()) { error!("Failed to route action to app. 
Error = {:?}", e); self.forward_action_error(fwd_action, e).await; + + // Remove action because it couldn't be forwarded + self.clear_current_action() } } } @@ -392,12 +401,6 @@ impl Bridge { if let Err(e) = self.action_status.fill(status).await { error!("Failed to send status. Error = {:?}", e); } - - // Clear current action only if the error being forwarded was triggered by it - match self.current_action.as_ref() { - Some(c) if c.id == action.action_id => self.clear_current_action(), - _ => {} - } } } diff --git a/uplink/src/collector/downloader.rs b/uplink/src/collector/downloader.rs index d68d277d1..65da5a949 100644 --- a/uplink/src/collector/downloader.rs +++ b/uplink/src/collector/downloader.rs @@ -50,10 +50,12 @@ use bytes::BytesMut; use flume::RecvError; use futures_util::StreamExt; -use log::{error, info}; +use log::{error, info, warn}; use reqwest::{Certificate, Client, ClientBuilder, Identity, Response}; use serde::{Deserialize, Serialize}; +use tokio::time::timeout; +use std::collections::HashMap; use std::fs::{create_dir_all, metadata, remove_dir_all, File}; use std::sync::Arc; use std::time::Duration; @@ -91,6 +93,7 @@ pub struct FileDownloader { bridge_tx: BridgeTx, client: Client, sequence: u32, + timeouts: HashMap, } impl FileDownloader { @@ -111,8 +114,16 @@ impl FileDownloader { } .build()?; + let timeouts = config + .downloader + .actions + .iter() + .map(|s| (s.name.to_owned(), Duration::from_secs(s.timeout))) + .collect(); + Ok(Self { config: config.downloader.clone(), + timeouts, client, bridge_tx, sequence: 0, @@ -134,27 +145,46 @@ impl FileDownloader { let action = download_rx.recv_async().await?; self.action_id = action.action_id.clone(); - let mut error = None; - // NOTE: retry mechanism tries atleast 3 times before returning an error - for _ in 0..3 { - match self.run(action.clone()).await { - Ok(_) => { - error = None; - break; - } - Err(e) => { - error!("Download failed: {e}\nretrying"); - error = Some(e); - } + let duration = match 
self.timeouts.get(&action.name) { + Some(t) => *t, + _ => { + error!("Action: {} unconfigured", action.name); + continue; } - tokio::time::sleep(Duration::from_secs(30)).await; + }; + + // NOTE: if the download has timed out, only log it; otherwise ensure errors are forwarded after three retries + match timeout(duration, self.retry_thrice(action)).await { + Ok(Err(e)) => self.forward_error(e).await, + Err(_) => error!("Last download has timedout"), + _ => {} } - if let Some(e) = error { - let status = ActionResponse::failure(&self.action_id, e.to_string()); - let status = status.set_sequence(self.sequence()); - self.bridge_tx.send_action_response(status).await; + } + } + + // Forward errors as action response to bridge + async fn forward_error(&mut self, err: Error) { + let status = + ActionResponse::failure(&self.action_id, err.to_string()).set_sequence(self.sequence()); + self.bridge_tx.send_action_response(status).await; + } + + // Retry mechanism tries at most 3 times before returning an error + async fn retry_thrice(&mut self, action: Action) -> Result<(), Error> { + let mut res = Ok(()); + for _ in 0..3 { + match self.run(action.clone()).await { + Ok(_) => return Ok(()), + Err(e) => { + error!("Download failed: {e}"); + res = Err(e); + } } + tokio::time::sleep(Duration::from_secs(30)).await; + warn!("Retrying download"); } + + res } // Accepts a download `Action` and performs necessary data extraction to actually download the file