Improvements to the Retry Layer #399

Open
itsyaasir opened this issue Aug 24, 2024 · 10 comments

@itsyaasir

Hi, I have seen that tower v0.5 finally added the Backoff/ExponentialBackoff policy, which can be used with the retry layer.

It would be great if this library could integrate it.

Thanks
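For context, here is a minimal, std-only sketch of what an exponential backoff with jitter computes. It is a generic illustration, not tower's ExponentialBackoff implementation. The function jittered_backoff and its rand01 parameter are hypothetical, and the random fraction is passed in by the caller so the result stays deterministic.

```rust
use std::time::Duration;

/// Hypothetical helper: doubles `base` on every attempt, caps the result at `max`,
/// then adds up to `jitter * delay` of extra wait. `rand01` is assumed to be a
/// random value in [0, 1) supplied by any RNG; passing it in keeps this deterministic.
fn jittered_backoff(base: Duration, max: Duration, attempt: u32, jitter: f64, rand01: f64) -> Duration {
    // 2^attempt, saturating instead of overflowing for large attempt counts.
    let factor = 2u32.checked_pow(attempt).unwrap_or(u32::MAX);
    let delay = base.saturating_mul(factor).min(max);
    // Jitter spreads retries out so many failing jobs don't wake up at the same instant.
    // Note that the jittered delay may exceed `max` by up to `jitter * max`.
    delay + delay.mul_f64(jitter * rand01)
}

fn main() {
    let base = Duration::from_millis(500);
    let max = Duration::from_secs(30);
    for attempt in 0..4 {
        // With rand01 = 0.0 (no jitter) the delays are 500ms, 1s, 2s, 4s.
        println!("{:?}", jittered_backoff(base, max, attempt, 0.5, 0.0));
    }
}
```

Jitter is optional, but without it every job that failed at the same moment would retry at the same moment too.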

@geofmureithi
Owner

geofmureithi commented Aug 25, 2024

Technically, we have first-class support for anything from tower.
That said, we need a robust solution that integrates well with different backends, as the current approach handles retries in memory.
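The difference between in-memory and backend-driven retries can be sketched as follows. Everything here is hypothetical (AttemptStore, MemoryStore, Outcome and on_failure are not apalis APIs). The point is that the attempt count is written to storage before the retry decision is made, so a restarted worker sees the same count instead of starting over.

```rust
use std::collections::HashMap;

/// Hypothetical storage interface. In a real system this would be backed by
/// the job queue's database, not an in-process map.
trait AttemptStore {
    /// Records one more failed attempt for `job_id` and returns the new total.
    fn record_failure(&mut self, job_id: &str) -> u32;
}

/// In-memory stand-in so the sketch runs on its own.
#[derive(Default)]
struct MemoryStore {
    attempts: HashMap<String, u32>,
}

impl AttemptStore for MemoryStore {
    fn record_failure(&mut self, job_id: &str) -> u32 {
        let count = self.attempts.entry(job_id.to_string()).or_insert(0);
        *count += 1;
        *count
    }
}

/// What the worker should do after a failure.
#[derive(Debug, PartialEq)]
enum Outcome {
    /// Hand the job back to the backend to be run again later.
    Reschedule { attempt: u32 },
    /// Give up after `max_attempts` failures and mark the job as dead.
    Kill,
}

/// Decides from the persisted count, so the decision survives worker restarts
/// instead of living inside an in-memory retry loop.
fn on_failure(store: &mut impl AttemptStore, job_id: &str, max_attempts: u32) -> Outcome {
    let attempt = store.record_failure(job_id);
    if attempt < max_attempts {
        Outcome::Reschedule { attempt }
    } else {
        Outcome::Kill
    }
}

fn main() {
    let mut store = MemoryStore::default();
    for _ in 0..3 {
        println!("{:?}", on_failure(&mut store, "job-1", 3));
    }
}
```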

@reneklacan

reneklacan commented Dec 5, 2024

In case somebody is looking for a working solution for apalis v0.5 that they can just copy-paste and easily modify:

use anyhow::Result;
use apalis::prelude::*;
use std::time::Duration;
use tokio::time::{sleep, Sleep};
use tower::retry::Policy;

type Req<T> = Request<T>;
type Err = Error;

#[derive(Clone, Debug)]
pub struct BackoffRetryPolicy {
    pub retries: usize,
    pub initial_backoff: Duration,
    pub multiplier: f64,
    pub max_backoff: Duration,
}

impl Default for BackoffRetryPolicy {
    fn default() -> Self {
        Self {
            retries: 25,
            initial_backoff: Duration::from_millis(1000),
            multiplier: 1.5,
            max_backoff: Duration::from_secs(60),
        }
    }
}

impl BackoffRetryPolicy {
    fn backoff_duration(&self, attempt: usize) -> Duration {
        let backoff = self.initial_backoff.as_millis() as f64 * self.multiplier.powi(attempt as i32);
        Duration::from_millis(backoff.min(self.max_backoff.as_millis() as f64) as u64)
    }
}

impl<T, Res> Policy<Req<T>, Res, Err> for BackoffRetryPolicy
where
    T: Clone,
{
    type Future = Sleep;

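    fn retry(&mut self, req: &mut Req<T>, result: &mut Result<Res, Err>) -> Option<Self::Future> {
        // Attempt counter stored in the request data; falls back to Attempt::default() if missing.
        let ctx = req.get::<Attempt>().cloned().unwrap_or_default();

        match result {
            // Success: stop retrying.
            Ok(_) => None,
            // Failure with retries left: wait out the backoff, then tower retries.
            // Compare instead of subtracting: usize subtraction underflows.
            Err(_) if ctx.current() < self.retries => {
                let backoff_duration = self.backoff_duration(ctx.current());
                Some(sleep(backoff_duration))
            }
            // Failure with no retries left: return the error.
            Err(_) => None,
        }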
    }

    fn clone_request(&mut self, req: &Req<T>) -> Option<Req<T>> {
        let mut req = req.clone();
        let value = req
            .get::<Attempt>()
            .cloned()
            .map(|attempt| {
                attempt.increment();
                attempt
            })
            .unwrap_or_default();
        req.insert(value);
        Some(req)
    }
}

and then:

// ...

.layer(RetryLayer::new(BackoffRetryPolicy::default()))

// or 

.layer(RetryLayer::new(BackoffRetryPolicy {
    retries: 10,
    initial_backoff: std::time::Duration::from_millis(1000),
    multiplier: 4.0,
    max_backoff: std::time::Duration::from_secs(60),
}))
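To get a feel for the default settings: assuming the attempt counter passed to backoff_duration starts at 0, the delay before each retry is min(initial_backoff * multiplier^attempt, max_backoff), truncated to whole milliseconds. The std-only copy below keeps only the three fields the formula needs (no apalis, tokio or tower). With the defaults, the delays grow 1s, 1.5s, 2.25s, 3.375s, ... and reach the 60s cap at attempt 11.

```rust
use std::time::Duration;

// Same fields and formula as BackoffRetryPolicy above, copied so the schedule
// can be printed without apalis, tokio or tower.
struct BackoffRetryPolicy {
    initial_backoff: Duration,
    multiplier: f64,
    max_backoff: Duration,
}

impl BackoffRetryPolicy {
    fn backoff_duration(&self, attempt: usize) -> Duration {
        let backoff = self.initial_backoff.as_millis() as f64 * self.multiplier.powi(attempt as i32);
        // `as u64` truncates the fractional milliseconds.
        Duration::from_millis(backoff.min(self.max_backoff.as_millis() as f64) as u64)
    }
}

fn main() {
    // The Default values from the example: 1s initial, x1.5, 60s cap.
    let policy = BackoffRetryPolicy {
        initial_backoff: Duration::from_millis(1000),
        multiplier: 1.5,
        max_backoff: Duration::from_secs(60),
    };
    for attempt in 0..12 {
        println!("attempt {attempt}: {:?}", policy.backoff_duration(attempt));
    }
}
```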

@geofmureithi
Owner

@reneklacan The example provided is outdated; it would only work for v0.5.
The other problem that needs to be resolved is that this happens in memory, meaning it cannot be gracefully shut down and it does not update the backend.
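To illustrate the shutdown half of the problem: a plain sleep inside the retry policy cannot be interrupted, so a worker asked to stop must either wait out the whole backoff or abandon the job mid-retry. Below is a hedged, std-only sketch of an interruptible wait, using threads and a channel rather than apalis or tokio (backoff_or_shutdown and Wake are hypothetical names). In async code the same idea is typically expressed with tokio::select! over the sleep and a shutdown signal.

```rust
use std::sync::mpsc::{channel, Receiver, RecvTimeoutError};
use std::thread;
use std::time::Duration;

/// Result of waiting out a backoff delay.
#[derive(Debug, PartialEq)]
enum Wake {
    /// The full delay elapsed; the job may be retried.
    Retry,
    /// A shutdown signal arrived first; the job should go back to the backend.
    Shutdown,
}

/// Waits for `delay`, but returns early if a shutdown message arrives
/// or every sender has been dropped.
fn backoff_or_shutdown(delay: Duration, shutdown: &Receiver<()>) -> Wake {
    match shutdown.recv_timeout(delay) {
        Err(RecvTimeoutError::Timeout) => Wake::Retry,
        Ok(()) | Err(RecvTimeoutError::Disconnected) => Wake::Shutdown,
    }
}

fn main() {
    let (tx, rx) = channel();
    // Simulate a shutdown request arriving 50 ms into a 10 s backoff.
    thread::spawn(move || {
        thread::sleep(Duration::from_millis(50));
        let _ = tx.send(());
    });
    println!("{:?}", backoff_or_shutdown(Duration::from_secs(10), &rx));
}
```

Interrupting the wait only solves half the problem: the job still has to be handed back to the backend with its attempt count, which is the synchronization issue raised above.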

@reneklacan

@geofmureithi I updated my comment to mention v0.5 (when I get to upgrading Apalis, I will make sure to add a version for v0.6 as well).

> @reneklacan The example provided is outdated, it would only work for v0.5.
> The other problem that needs to be resolved is the fact that this happens in memory, meaning that it cannot be gracefully shutdown and it does not update the backend.

I realized that from your first comment in this thread, but this one is still better than nothing.

Either way, thanks a lot for all the effort you are putting into Apalis.

@geofmureithi
Owner

Yeah it's better than nothing. From your approach I got some ideas. Let me try something and see if it works.

@reneklacan

reneklacan commented Dec 30, 2024

Working version of the previously shared retry policy for Apalis v0.6:

use anyhow::Result;
use apalis::prelude::*;
use std::time::Duration;
use tokio::time::{sleep, Sleep};
use tower::retry::Policy;

type Req<T, Ctx> = Request<T, Ctx>;
type Err = Error;

#[derive(Clone, Debug)]
pub struct BackoffRetryPolicy {
    pub retries: usize,
    pub initial_backoff: Duration,
    pub multiplier: f64,
    pub max_backoff: Duration,
}

impl Default for BackoffRetryPolicy {
    fn default() -> Self {
        Self {
            retries: 25,
            initial_backoff: Duration::from_millis(1000),
            multiplier: 1.5,
            max_backoff: Duration::from_secs(60),
        }
    }
}

impl BackoffRetryPolicy {
    fn backoff_duration(&self, attempt: usize) -> Duration {
        let backoff = self.initial_backoff.as_millis() as f64 * self.multiplier.powi(attempt as i32);
        Duration::from_millis(backoff.min(self.max_backoff.as_millis() as f64) as u64)
    }
}

impl<T, Res, Ctx> Policy<Req<T, Ctx>, Res, Err> for BackoffRetryPolicy
where
    T: Clone,
    Ctx: Clone,
{
    type Future = Sleep;

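    fn retry(&mut self, req: &mut Req<T, Ctx>, result: &mut Result<Res, Err>) -> Option<Self::Future> {
        // Current attempt number, read from the request's parts.
        let attempt = req.parts.attempt.current();

        match result {
            // Success: stop retrying.
            Ok(_) => None,
            // Failure with retries left: sleep for the backoff, then tower retries.
            // Compare rather than subtract to avoid usize underflow.
            Err(_) if attempt < self.retries => Some(sleep(self.backoff_duration(attempt))),
            // Out of retries: return the error.
            Err(_) => None,
        }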
    }

    fn clone_request(&mut self, req: &Req<T, Ctx>) -> Option<Req<T, Ctx>> {
        let req = req.clone();
        req.parts.attempt.increment();
        Some(req)
    }
}
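A note on the retry guard: retries and the attempt count are both usize, so a guard written as retries - attempt > 0 panics in debug builds (and wraps to a huge value in release builds, which would keep retrying) if the attempt count ever exceeds retries, for example if something else also increments the same counter. Comparing with attempt < retries avoids that. The std-only sketch below uses checked_sub to show where the subtraction form would overflow; the two forms agree whenever attempt <= retries. Both function names are hypothetical.

```rust
/// Subtraction-based guard, written with `checked_sub` so the overflow case is
/// visible as `None` instead of panicking (debug) or wrapping (release).
fn guard_by_subtraction(retries: usize, attempt: usize) -> Option<bool> {
    retries.checked_sub(attempt).map(|left| left > 0)
}

/// Comparison-based guard: same answer whenever `attempt <= retries`,
/// and still well defined when the counter overshoots.
fn guard_by_comparison(retries: usize, attempt: usize) -> bool {
    attempt < retries
}

fn main() {
    for attempt in [3, 25, 26] {
        println!(
            "attempt {attempt}: subtraction = {:?}, comparison = {}",
            guard_by_subtraction(25, attempt),
            guard_by_comparison(25, attempt)
        );
    }
}
```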

Usage is the same:

// ...

.layer(RetryLayer::new(BackoffRetryPolicy::default()))

// or 

.layer(RetryLayer::new(BackoffRetryPolicy {
    retries: 10,
    initial_backoff: std::time::Duration::from_millis(1000),
    multiplier: 4.0,
    max_backoff: std::time::Duration::from_secs(60),
}))
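Because every delay is capped, you can bound the total time a job may spend in backoff. Here is a small std-only sketch. worst_case_wait is a hypothetical helper, and it assumes the attempt counter seen by retry runs 0, 1, ..., retries - 1. For the retries: 10, multiplier: 4.0 configuration it comes to 441 s (about 7.4 minutes). For the defaults (25 retries, multiplier 1.5) it comes to roughly 1011 s (about 17 minutes).

```rust
use std::time::Duration;

// Same formula as BackoffRetryPolicy::backoff_duration, as a free function.
fn backoff_duration(initial: Duration, multiplier: f64, max: Duration, attempt: usize) -> Duration {
    let backoff = initial.as_millis() as f64 * multiplier.powi(attempt as i32);
    Duration::from_millis(backoff.min(max.as_millis() as f64) as u64)
}

/// Total sleep time if every one of `retries` retries is used
/// (assumes the attempt counter goes 0, 1, ..., retries - 1).
fn worst_case_wait(initial: Duration, multiplier: f64, max: Duration, retries: usize) -> Duration {
    (0..retries)
        .map(|attempt| backoff_duration(initial, multiplier, max, attempt))
        .sum()
}

fn main() {
    let one_sec = Duration::from_millis(1000);
    let cap = Duration::from_secs(60);
    println!("custom:  {:?}", worst_case_wait(one_sec, 4.0, cap, 10));
    println!("default: {:?}", worst_case_wait(one_sec, 1.5, cap, 25));
}
```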

@geofmureithi
Owner

@reneklacan I would recommend using req.parts.attempt rather than .get(), as the latter is not supported from v0.6 onward. It may still work for backward compatibility.
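Why a typed field is easier to work with: req.parts.attempt is always present, whereas a type-keyed lookup like .get::<Attempt>() returns an Option that has to be defaulted. The v0.6 example also calls increment() on a clone that is not declared mut. The sketch below assumes the counter is a shared atomic behind an Arc, which is consistent with that usage. The Attempt, Parts and Request types here are hypothetical stand-ins, not apalis's definitions.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

/// Hypothetical stand-in: an attempt counter shared between request clones.
#[derive(Clone, Debug, Default)]
struct Attempt(Arc<AtomicUsize>);

impl Attempt {
    fn current(&self) -> usize {
        self.0.load(Ordering::Relaxed)
    }

    /// Works through `&self` thanks to the atomic; returns the new value.
    fn increment(&self) -> usize {
        // `fetch_add` returns the previous value, so add 1 to report the new one.
        self.0.fetch_add(1, Ordering::Relaxed) + 1
    }
}

/// Hypothetical stand-in for request metadata with a typed attempt field.
#[derive(Clone, Debug, Default)]
struct Parts {
    attempt: Attempt,
}

#[derive(Clone, Debug)]
struct Request<T> {
    args: T,
    parts: Parts,
}

fn main() {
    let original = Request { args: "send-email", parts: Parts::default() };
    let retry = original.clone();
    retry.parts.attempt.increment();
    // Both clones share one counter, so the original sees the increment too.
    println!("{}: {} {}", retry.args, original.parts.attempt.current(), retry.parts.attempt.current());
}
```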

@reneklacan

@geofmureithi Thanks, I updated and refactored my example, and it feels much cleaner now.

@geofmureithi
Owner

Awesome!

You can also replace .layer(RetryLayer::new(...)) with just .retry(...).

@geofmureithi
Owner

Just another reminder that this does not synchronize the number of attempts with the backend.

Labels
None yet
Projects
None yet
Development

No branches or pull requests

3 participants