Skip to content

Commit

Permalink
fix: stop download on action timeout, don't drop action on resend (#258)
Browse files Browse the repository at this point in the history
* fix: stop download on action timeout

* fix: don't clear action on unsuccessful re-send

* feat: log timeout of previous download

* style: clippy suggestions
  • Loading branch information
de-sh authored Jul 30, 2023
1 parent 8c12bb7 commit 24a1034
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 25 deletions.
17 changes: 10 additions & 7 deletions uplink/src/base/bridge/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ impl Bridge {
}

fn clear_current_action(&mut self) {
self.current_action = None;
self.current_action.take();
}

pub async fn start(&mut self) -> Result<(), Error> {
Expand Down Expand Up @@ -217,6 +217,9 @@ impl Bridge {

error!("Failed to route action to app. Error = {:?}", error);
self.forward_action_error(action, error).await;

// Remove action because it couldn't be routed
self.clear_current_action()
}
event = self.bridge_rx.recv_async() => {
let event = event?;
Expand All @@ -236,6 +239,9 @@ impl Bridge {
let action = self.current_action.take().unwrap();
error!("Timeout waiting for action response. Action ID = {}", action.id);
self.forward_action_error(action.action, Error::ActionTimeout).await;

// Remove action because it timedout
self.clear_current_action()
}
// Flush streams that timeout
Some(timedout_stream) = streams.stream_timeouts.next(), if streams.stream_timeouts.has_pending() => {
Expand Down Expand Up @@ -371,6 +377,9 @@ impl Bridge {
if let Err(e) = self.try_route_action(fwd_action.clone()) {
error!("Failed to route action to app. Error = {:?}", e);
self.forward_action_error(fwd_action, e).await;

// Remove action because it couldn't be forwarded
self.clear_current_action()
}
}
}
Expand All @@ -392,12 +401,6 @@ impl Bridge {
if let Err(e) = self.action_status.fill(status).await {
error!("Failed to send status. Error = {:?}", e);
}

// Clear current action only if the error being forwarded was triggered by it
match self.current_action.as_ref() {
Some(c) if c.id == action.action_id => self.clear_current_action(),
_ => {}
}
}
}

Expand Down
66 changes: 48 additions & 18 deletions uplink/src/collector/downloader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,12 @@
use bytes::BytesMut;
use flume::RecvError;
use futures_util::StreamExt;
use log::{error, info};
use log::{error, info, warn};
use reqwest::{Certificate, Client, ClientBuilder, Identity, Response};
use serde::{Deserialize, Serialize};
use tokio::time::timeout;

use std::collections::HashMap;
use std::fs::{create_dir_all, metadata, remove_dir_all, File};
use std::sync::Arc;
use std::time::Duration;
Expand Down Expand Up @@ -91,6 +93,7 @@ pub struct FileDownloader {
bridge_tx: BridgeTx,
client: Client,
sequence: u32,
timeouts: HashMap<String, Duration>,
}

impl FileDownloader {
Expand All @@ -111,8 +114,16 @@ impl FileDownloader {
}
.build()?;

let timeouts = config
.downloader
.actions
.iter()
.map(|s| (s.name.to_owned(), Duration::from_secs(s.timeout)))
.collect();

Ok(Self {
config: config.downloader.clone(),
timeouts,
client,
bridge_tx,
sequence: 0,
Expand All @@ -134,27 +145,46 @@ impl FileDownloader {
let action = download_rx.recv_async().await?;
self.action_id = action.action_id.clone();

let mut error = None;
// NOTE: retry mechanism tries atleast 3 times before returning an error
for _ in 0..3 {
match self.run(action.clone()).await {
Ok(_) => {
error = None;
break;
}
Err(e) => {
error!("Download failed: {e}\nretrying");
error = Some(e);
}
let duration = match self.timeouts.get(&action.name) {
Some(t) => *t,
_ => {
error!("Action: {} unconfigured", action.name);
continue;
}
tokio::time::sleep(Duration::from_secs(30)).await;
};

// NOTE: if download has timedout don't do anything, else ensure errors are forwarded after three retries
match timeout(duration, self.retry_thrice(action)).await {
Ok(Err(e)) => self.forward_error(e).await,
Err(_) => error!("Last download has timedout"),
_ => {}
}
if let Some(e) = error {
let status = ActionResponse::failure(&self.action_id, e.to_string());
let status = status.set_sequence(self.sequence());
self.bridge_tx.send_action_response(status).await;
}
}

// Forward errors as action response to bridge
async fn forward_error(&mut self, err: Error) {
let status =
ActionResponse::failure(&self.action_id, err.to_string()).set_sequence(self.sequence());
self.bridge_tx.send_action_response(status).await;
}

// Retry mechanism tries atleast 3 times before returning an error
async fn retry_thrice(&mut self, action: Action) -> Result<(), Error> {
let mut res = Ok(());
for _ in 0..3 {
match self.run(action.clone()).await {
Ok(_) => return Ok(()),
Err(e) => {
error!("Download failed: {e}");
res = Err(e);
}
}
tokio::time::sleep(Duration::from_secs(30)).await;
warn!("Retrying download");
}

res
}

// Accepts a download `Action` and performs necessary data extraction to actually download the file
Expand Down

0 comments on commit 24a1034

Please sign in to comment.