
Dynamic Job Restoration on Application Restart #448

Closed
anwar-al-jahwari opened this issue Nov 19, 2024 · 15 comments · Fixed by #453
Comments

@anwar-al-jahwari

Dynamic Job Restoration on Application Restart

First off, thank you for the excellent work on Apalis! We've been using it in production and are very happy with its capabilities so far. However, I wanted to inquire about a specific use case.

Context

In our production setup, we rely on Apalis for job scheduling and processing. While everything works smoothly during normal operations, we are exploring ways to ensure robust recovery in the event of an application restart (e.g., due to a deployment or crash). Specifically, we’d like to know if there is a dynamic mechanism to restore pending jobs into the queue automatically after a restart.

Request

  • Is there already a feature or pattern in Apalis to handle this scenario?
  • If not, would you consider adding functionality to enable dynamic restoration of pending or in-progress jobs on app startup?

This would greatly improve the resiliency of job processing in production environments. If this feature is not yet supported, I’d be happy to contribute ideas or discuss further.

Thanks again for the great library!

@geofmureithi
Owner

Hello, great to hear that the library has been so helpful. It has taken a lot of work and many contributors to get here.

Regarding the recovery issue, most storages have a recovery strategy. E.g. in Redis we have reenqueue_orphaned, which finds tasks that are incomplete and whose worker has not had a heartbeat for a specific period of time. I might need to add that period to the config, but this already exists; let me know if this works for you.

@anwar-al-jahwari
Author

I think it will work, but I wish you could provide a small example.

@geofmureithi
Owner

Aah, I see what you are talking about: the functionality is implemented but not tied to the worker heartbeat. This is a bug and should be fixed ASAP.

@geofmureithi
Owner

In essence this should happen in the background by default.

@anwar-al-jahwari
Author

anwar-al-jahwari commented Nov 20, 2024

For example, I have the following job. The issue now is that if the app crashes for any reason, the job will not be resumed when the app comes back up.

How do we handle such a scenario?

// Constants for configuration
const MAX_DURATION_SECS: u64 = 120; // 2 minutes
const CHECK_INTERVAL_SECS: u64 = 5; // 5 seconds
const MAX_CHECKS: u64 = MAX_DURATION_SECS / CHECK_INTERVAL_SECS;

#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq,)]
pub struct PaymentStatusByOrderIdInput {
    pub msisdn: String,
    pub auth_token: String,
    pub order_id: String,
    pub attempts: i32,
    pub max_attempts: i32,
    pub amount: String,
    pub success_message: String,
    pub failure_message: String,
}

impl Job for PaymentStatusByOrderIdInput {
    const NAME: &'static str = "whatsappbot:payment_status_check";
}

pub struct PaymentStatusCheckHandler;

impl PaymentStatusCheckHandler {
    // Helper function to handle payment success
    async fn handle_success(
        msisdn: &str,
        success_message: &str,
        user_context: &Arc<tokio::sync::Mutex<FlowContext,>,>,
        state: &Arc<AppState,>,
    ) -> Result<(), JobError,> {
        // Send success message
        state
            .messaging_service
            .send_text_message(msisdn, success_message,)
            .await
            .map_err(|e| {
                e.change_context(JobError::ProcessingError,)
                    .attach_printable("Failed to send success message",)
            },)?;

        // Initialize action selection flow
        let mut ctx = user_context.lock().await;
        let handler = ActionSelectionHandler {
            app_state: state.clone(),
        };
        handler
            .initialize_action_selection(&mut ctx,)
            .await
            .map_err(|e| {
                e.change_context(JobError::ProcessingError,)
                    .attach_printable("Failed to initialize action selection",)
            },)?;

        Ok((),)
    }

    // Helper function to handle payment failure
    async fn handle_failure(
        msisdn: &str,
        failure_message: &str,
        order_id: &str,
        amount: &str,
        user_context: &Arc<tokio::sync::Mutex<FlowContext,>,>,
        state: &Arc<AppState,>,
    ) -> Result<(), JobError,> {
        error!(
            msisdn = %msisdn,
            order_id = %order_id,
            amount = %amount,
            "Payment transaction failed"
        );

        // Send failure message
        state
            .messaging_service
            .send_text_message(msisdn, failure_message,)
            .await
            .map_err(|e| {
                e.change_context(JobError::ProcessingError,)
                    .attach_printable("Failed to send failure message",)
            },)?;

        // Initialize action selection flow
        let mut ctx = user_context.lock().await;
        let handler = ActionSelectionHandler {
            app_state: state.clone(),
        };
        handler
            .initialize_action_selection(&mut ctx,)
            .await
            .map_err(|e| {
                e.change_context(JobError::ProcessingError,)
                    .attach_printable("Failed to initialize action selection",)
            },)?;

        Ok((),)
    }

    // Helper function to handle timeout
    async fn handle_timeout(
        msisdn: &str,
        user_context: &Arc<tokio::sync::Mutex<FlowContext,>,>,
        state: &Arc<AppState,>,
    ) -> Result<(), JobError,> {
        const TIMEOUT_MESSAGE: &str = "We haven't received confirmation of your payment yet. If you've completed the payment, please contact support for assistance.";

        // Send timeout message
        state
            .messaging_service
            .send_text_message(msisdn, TIMEOUT_MESSAGE,)
            .await
            .map_err(|e| {
                e.change_context(JobError::ProcessingError,)
                    .attach_printable("Failed to send timeout message",)
            },)?;

        // Initialize action selection flow
        let mut ctx = user_context.lock().await;
        let handler = ActionSelectionHandler {
            app_state: state.clone(),
        };
        handler
            .initialize_action_selection(&mut ctx,)
            .await
            .map_err(|e| {
                e.change_context(JobError::ProcessingError,)
                    .attach_printable("Failed to initialize action selection",)
            },)?;

        Ok((),)
    }
}

#[async_trait]
impl BaseJobHandler for PaymentStatusCheckHandler {
    async fn handle_job(&self, job: JobType, state: Arc<AppState,>,) -> Result<(), JobError,> {
        match job {
            JobType::PaymentStatusByOrderId(payment_job,) => {
                let max_duration = Duration::from_secs(MAX_DURATION_SECS,);
                let interval = Duration::from_secs(CHECK_INTERVAL_SECS,);
                let start_time = std::time::Instant::now();

                info!(
                    msisdn = %payment_job.msisdn,
                    order_id = %payment_job.order_id,
                    attempt = %payment_job.attempts,
                    "Starting payment status check"
                );

                let mut interval_timer = time::interval(interval,);
                let user_context = match state
                    .flow_manager
                    .get_context(&payment_job.msisdn,)
                    .await
                    .map_err(|e| {
                        e.change_context(JobError::ProcessingError,)
                            .attach_printable("Failed to get context from Redis",)
                            .attach_printable(format!("User ID: {}", &payment_job.msisdn),)
                    },)?
                {
                    Some(context,) => Arc::new(tokio::sync::Mutex::new(context,),),
                    None => Arc::new(tokio::sync::Mutex::new(FlowContext::new(
                        payment_job.msisdn.clone(),
                    ),),),
                };

                let mut check_count = 0u32;

                while start_time.elapsed() < max_duration {
                    interval_timer.tick().await;
                    check_count += 1;

                    info!(
                        msisdn = %payment_job.msisdn,
                        order_id = %payment_job.order_id,
                        elapsed_secs = %start_time.elapsed().as_secs(),
                        "Checking payment status - Attempt {}/{}",
                        check_count,
                        MAX_CHECKS
                    );

                    match state
                        .internal_services
                        .payment
                        .get_change_plan_payment_status(
                            &payment_job.msisdn,
                            &payment_job.auth_token,
                            &payment_job.order_id,
                        )
                        .await
                    {
                        Ok(CreateChangePlanPaymentLinkResult::Success(response,),) => {
                            info!(
                                msisdn = %payment_job.msisdn,
                                order_id = %payment_job.order_id,
                                status = %response.status,
                                "Payment status response received"
                            );

                            match response.status.as_str() {
                                "PROCESSED" | "PROCESSED_PAID" => {
                                    return Self::handle_success(
                                        &payment_job.msisdn,
                                        &payment_job.success_message,
                                        &user_context,
                                        &state,
                                    )
                                    .await;
                                }
                                "FAILED" => {
                                    return Self::handle_failure(
                                        &payment_job.msisdn,
                                        &payment_job.failure_message,
                                        &payment_job.order_id,
                                        &payment_job.amount,
                                        &user_context,
                                        &state,
                                    )
                                    .await;
                                }
                                _ => {
                                    warn!(
                                        msisdn = %payment_job.msisdn,
                                        order_id = %payment_job.order_id,
                                        status = %response.status,
                                        "Payment status still pending"
                                    );
                                }
                            }
                        }
                        Ok(CreateChangePlanPaymentLinkResult::Error { error, },) => {
                            error!(
                                msisdn = %payment_job.msisdn,
                                order_id = %payment_job.order_id,
                                error = ?error,
                                "Error checking payment status"
                            );
                            return Self::handle_failure(
                                &payment_job.msisdn,
                                &payment_job.failure_message,
                                &payment_job.order_id,
                                &payment_job.amount,
                                &user_context,
                                &state,
                            )
                            .await;
                        }
                        Err(e,) => {
                            error!(
                                msisdn = %payment_job.msisdn,
                                order_id = %payment_job.order_id,
                                error = ?e,
                                "Failed to check payment status"
                            );
                            return Self::handle_failure(
                                &payment_job.msisdn,
                                &payment_job.failure_message,
                                &payment_job.order_id,
                                &payment_job.amount,
                                &user_context,
                                &state,
                            )
                            .await;
                        }
                    }
                }

                Self::handle_timeout(&payment_job.msisdn, &user_context, &state,).await
            }
            _ => Err(JobError::ProcessingError.into(),),
        }
    }
}

impl JobHandler<PaymentStatusByOrderIdInput,> for PaymentStatusCheckHandler {
    fn handle(
        job: PaymentStatusByOrderIdInput,
        app_state: Arc<AppState,>,
    ) -> Pin<Box<dyn Future<Output = Result<(), JobError,>,> + Send,>,> {
        Box::pin(async move {
            let handler = PaymentStatusCheckHandler;
            handler
                .handle_job(JobType::PaymentStatusByOrderId(job,), app_state,)
                .await
        },)
    }
}

pub struct PaymentStatusByOrderIdProcessor {
    processor: GenericJobProcessor<PaymentStatusByOrderIdInput,>,
}

impl PaymentStatusByOrderIdProcessor {
    pub async fn new(redis_url: &str, app_state: Arc<AppState,>,) -> Result<Self, JobError,> {
        let processor = GenericJobProcessor::new(redis_url, app_state,).await?;
        Ok(Self { processor, },)
    }

    pub async fn push(
        &mut self,
        status_check: PaymentStatusByOrderIdInput,
    ) -> Result<(), JobError,> {
        self.processor.push(status_check,).await
    }

    pub fn create_processor(
        self,
    ) -> impl Future<Output = std::result::Result<(), std::io::Error,>,> {
        self.processor
            .create_processor::<PaymentStatusCheckHandler>()
    }
}

pub async fn setup_payment_status_by_order_id_processor(
    redis_url: &str,
    app_state: Arc<AppState,>,
) -> Result<(), JobError,> {
    let processor = PaymentStatusByOrderIdProcessor::new(redis_url, app_state,).await?;
    tokio::spawn(processor.create_processor(),);
    Ok((),)
}

@geofmureithi
Owner

Currently there is a bug in the implementation: the resuming feature is implemented but is not being called.
The solution I will offer will look like this:

/// Config for a [RedisStorage]
#[derive(Clone, Debug)]
pub struct Config {
    poll_interval: Duration,
    buffer_size: usize,
    max_retries: usize,
    keep_alive: Duration,
    enqueue_scheduled: Duration,
    reenqueue_orphaned_after: Duration,
    namespace: String,
}

Now what you will be able to do is set the value when creating the storage:

let mut config = apalis::redis::Config::default();
config.reenqueue_orphaned_after = Duration::from_secs(30);
let storage = RedisStorage::new_with_config(conn, config);

This will mean that if a job has been abandoned for more than 30 secs (worker died, worker unreachable, or system restart), it will be added back to the queue.

This should solve your issue, and it will be applied to all backends (Redis, Postgres, SQLite, etc.).

Let me know if this fixes your issue.
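
For anyone following along, here is a rough end-to-end sketch of where conn and the configured storage might come from, pieced together from the snippets in this thread. The connect helper's path and the builder names are assumptions that vary between apalis versions, so treat them as illustrative rather than the definitive API; the same idea should carry over to the SQL backends once the fix is applied there.

use std::time::Duration;
use apalis::redis::RedisStorage;

// Rough sketch only: the connect helper's path differs between apalis
// versions (e.g. apalis::redis::connect vs apalis_redis::connect).
async fn make_storage(redis_url: &str) -> RedisStorage<PaymentStatusByOrderIdInput> {
    let conn = apalis::redis::connect(redis_url)
        .await
        .expect("could not connect to Redis");

    // The knob proposed above: re-enqueue jobs whose worker has been silent
    // for 30 seconds (worker died, unreachable, or the whole app restarted).
    let mut config = apalis::redis::Config::default();
    config.reenqueue_orphaned_after = Duration::from_secs(30);

    // The resulting storage is handed to the worker / GenericJobProcessor
    // exactly as in the code earlier in this thread.
    RedisStorage::new_with_config(conn, config)
}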

@geofmureithi geofmureithi added the bug Something isn't working label Nov 20, 2024
@geofmureithi geofmureithi self-assigned this Nov 20, 2024
@geofmureithi
Owner

For now, before the bug fix is implemented, here is an example of how to work around the issue:

#[tokio::main]
async fn main() {
    let config = apalis::redis::Config::default();
    let mut storage = RedisStorage::new_with_config(conn, config);

    // Spawn a future in the background to recover abandoned jobs
    tokio::spawn(async move {
        loop {
            tokio::time::sleep(Duration::from_secs(30)).await;
            // resume the oldest 100 jobs that have been abandoned for more than 60 secs
            storage.reenqueue_orphaned(100, 60).await;
        }
    });
}

@anwar-al-jahwari
Author

Thanks man, I really appreciate your effort. I would also like to contribute to the project, but I haven't worked on any open source project as a contributor before. How can I start?

@geofmureithi
Owner

Appreciated!
I think there is no standard way to contribute. For example, reporting this bug is actually contributing. You can also help with other issues and requested features, improve the documentation, write an article about apalis, or sponsor the project. I think the idea is to start small, and don't think anything small is not appreciated.

@anwar-al-jahwari
Author

anwar-al-jahwari commented Nov 23, 2024

Thank you for your kind words. I see that you have merged the changes; does this mean it will now be the default behavior?

@geofmureithi
Owner

Yes! This is now going to be in the next release (0.6.0-rc9).

@anwar-al-jahwari
Author

Another question that comes to mind: what if I want to only resume jobs that have been abandoned for a maximum of 5 minutes?

@geofmureithi
Owner

Do you mean after 5 min, i.e. jobs abandoned for 5 minutes or more? Because if you are asking about 5 minutes or less, it becomes hard to know which jobs are still pending and which are abandoned.

@anwar-al-jahwari
Author

I meant less than 5 minutes.
Also, when is a job considered unsuccessful?

I ask because I want to apply a retry policy so that it automatically retries for about 4 minutes with a 5-second pause between tries. Currently I am using a loop, which is not ideal, but it works.

For example:

#[async_trait]
impl BaseJobHandler for PaymentStatusByPaymentIHandler {
    async fn handle_job(&self, job: JobType) -> Result<(), JobError> {
        match job {
            JobType::PaymentStatusByPaymentId(payment_job) => {
                let max_duration = Duration::from_secs(MAX_DURATION_SECS);
                let interval = Duration::from_secs(CHECK_INTERVAL_SECS);
                let start_time = std::time::Instant::now();

                info!(
                    msisdn = %payment_job.msisdn,
                    payment_id = %payment_job.payment_id,
                    attempt = %payment_job.attempts,
                    "Starting payment status check"
                );

                let mut interval_timer = time::interval(interval);
                let mut user_context = global_flow_manager()
                    .get_context(&payment_job.msisdn)
                    .await
                    .change_context(JobError::ProcessingError)?
                    .ok_or_else(|| error_stack::report!(JobError::ProcessingError))?;

                let mut check_count = 0u32;
               // the ugly loop
                while start_time.elapsed() < max_duration {
                    interval_timer.tick().await;
                    check_count += 1;
               }
--- the rest of the logic

@geofmureithi
Owner

@anwar-al-jahwari You could use a BackoffRetryLayer; this would retry the task and then wait some time if it is still failing, meaning you would only care about:

match state
    .internal_services
    .payment
    .get_change_plan_payment_status(
        &payment_job.msisdn,
        &payment_job.auth_token,
        &payment_job.order_id,
    )
    .await
{
....

See more conversations here: #399
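
For what it's worth, here is a minimal sketch of that idea, assuming the functional worker API and a tower-style retry layer; the RetryLayer/RetryPolicy names stand in for the BackoffRetryLayer mentioned above, and the exact import paths and builder methods may differ by apalis version. The handler performs a single status check per execution and returns an error while the payment is still pending, so the layer re-runs it instead of the in-handler loop:

use std::sync::Arc;
use apalis::prelude::*;
// Assumed import path: a retry layer built on tower's retry middleware;
// the backoff-aware variant mentioned above may live elsewhere.
use apalis::layers::retry::{RetryLayer, RetryPolicy};

// Hypothetical single-shot handler: one status check per execution.
async fn check_payment_once(
    job: PaymentStatusByOrderIdInput,
    state: Data<Arc<AppState>>,
) -> Result<(), JobError> {
    match state
        .internal_services
        .payment
        .get_change_plan_payment_status(&job.msisdn, &job.auth_token, &job.order_id)
        .await
    {
        // Terminal states: finish the job (success/failure messaging elided).
        Ok(CreateChangePlanPaymentLinkResult::Success(r))
            if matches!(r.status.as_str(), "PROCESSED" | "PROCESSED_PAID" | "FAILED") =>
        {
            Ok(())
        }
        // Still pending or a transient error: fail this attempt so the retry
        // layer schedules another check instead of looping in the handler.
        _ => Err(JobError::ProcessingError.into()),
    }
}

// Wiring sketch; builder method names are illustrative and vary by version.
fn build_worker(storage: RedisStorage<PaymentStatusByOrderIdInput>, app_state: Arc<AppState>) {
    WorkerBuilder::new("payment-status-worker")
        // Roughly 48 attempts at a 5-second spacing covers the 4-minute window;
        // the pause between tries is what the backoff-aware layer would add.
        .layer(RetryLayer::new(RetryPolicy::retries(48)))
        .data(app_state)
        .backend(storage)
        .build_fn(check_payment_once);
}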
