BoxDynError in worker fn #470

Open
boingrach opened this issue Nov 29, 2024 · 7 comments

Comments

@boingrach

Hi,
I'm using a boxed error in a worker function.

example
use apalis::prelude::*;

use apalis_sql::sqlite::SqliteStorage;
use sqlx::SqlitePool;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    std::env::set_var("RUST_LOG", "debug,sqlx::query=info");
    tracing_subscriber::fmt::init();

    let pool = SqlitePool::connect("sqlite::memory:").await?;
    // Do migrations: Mainly for "sqlite::memory:"
    SqliteStorage::setup(&pool)
        .await
        .expect("unable to run migrations for sqlite");

    let storage: SqliteStorage<String> = SqliteStorage::new(pool.clone());

    Monitor::new()
        .register({
            WorkerBuilder::new("tasty-mango")
                .backend(storage)
                .build_fn(do_job)
        })
        .run()
        .await?;
    Ok(())
}

async fn do_job(job: String) -> Result<(), apalis::prelude::BoxDynError> {
    tracing::info!("job: {job}");
    Ok(())
}

#[derive(Debug)]
enum CustomError {}
impl std::error::Error for CustomError {}
impl std::fmt::Display for CustomError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        todo!()
    }
}

But I'm getting errors:

errors
error[E0277]: the trait bound `ServiceFn<fn(std::string::String) -> impl Future<Output = Result<(), Box<(dyn StdError + Send + Sync + 'static)>>> {try_notify}, std::string::String, SqlContext, _>: tower_service::Service<apalis::prelude::Request<std::string::String, SqlContext>>` is not satisfied
  --> examples/sqlite/src/main.rs:25:18
   |
25 |                 .build_fn(try_notify)
   |                  ^^^^^^^^ unsatisfied trait bound
   |
   = help: the trait `tower_service::Service<apalis::prelude::Request<std::string::String, SqlContext>>` is not implemented for `ServiceFn<fn(String) -> ... {try_notify}, ..., ..., ...>`
   = help: the following other types implement trait `tower_service::Service<Request>`:
             ServiceFn<T, Req, Ctx, ()>
             ServiceFn<T, Req, Ctx, (A1, A2)>
             ServiceFn<T, Req, Ctx, (A1, A2, A3)>
             ServiceFn<T, Req, Ctx, (A1, A2, A3, A4)>
             ServiceFn<T, Req, Ctx, (A1, A2, A3, A4, A5)>
             ServiceFn<T, Req, Ctx, (A1, A2, A3, A4, A5, A6)>
             ServiceFn<T, Req, Ctx, (A1, A2, A3, A4, A5, A6, A7)>
             ServiceFn<T, Req, Ctx, (A1, A2, A3, A4, A5, A6, A7, A8)>
           and 9 others
   = note: required for `WorkerBuilder<String, SqlContext, SqliteStorage<String>, Identity, ...>` to implement `apalis::prelude::WorkerFactory<std::string::String, SqlContext, ServiceFn<fn(std::string::String) -> impl Future<Output = Result<(), Box<(dyn StdError + Send + Sync + 'static)>>> {try_notify}, std::string::String, SqlContext, _>>`
   = note: the full name for the type has been written to '/tmp/apalis/target/debug/deps/sqlite_example-de103ae0c3a1493e.long-type-8084250147342458497.txt'
   = note: consider using `--verbose` to print the full type name to the console

Replacing the boxed error with a concrete enum fixes the errors:

async fn do_job(job: String) -> Result<(), CustomError> {
    tracing::info!("job: {job}");
    Ok(())
}

#[derive(Debug)]
enum CustomError {}
impl std::error::Error for CustomError {}
impl std::fmt::Display for CustomError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        todo!()
    }
}

apalis v0.6.0

@geofmureithi
Owner

geofmureithi commented Nov 29, 2024 via email

@boingrach
Author

boingrach commented Nov 29, 2024

A lot of my services have tower layers applied. Some of those layers convert the error into Box<dyn Error+...>

example
use apalis::prelude::*;

use apalis_sql::{context::SqlContext, sqlite::SqliteStorage};
use futures::future::BoxFuture;
use sqlx::SqlitePool;
use tower::{Layer, Service};

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    tracing_subscriber::fmt::init();

    let pool = SqlitePool::connect("sqlite::memory:").await?;
    // Do migrations: Mainly for "sqlite::memory:"
    SqliteStorage::setup(&pool)
        .await
        .expect("unable to run migrations for sqlite");

    let storage: SqliteStorage<String> = SqliteStorage::new(pool.clone());

    Monitor::new()
        .register({
            WorkerBuilder::new("tasty-mango")
                .layer(DynLayer)
                .backend(storage)
                .build_fn(do_job)
        })
        .run()
        .await?;
    Ok(())
}

async fn do_job(job: String) -> Result<(), CustomError> {
    tracing::info!("job: {job}");
    Ok(())
}


#[derive(Debug)]
enum CustomError {}
impl std::error::Error for CustomError {}
impl std::fmt::Display for CustomError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        todo!()
    }
}


#[derive(Debug)]
struct DynLayerService<S> {
    inner: S,
}
impl<R, S> Service<R> for DynLayerService<S>
where
    S: Service<R>,
{
    type Response = S::Response;

    type Error = Box<dyn std::error::Error + Send + Sync>;

    type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;

    fn poll_ready(
        &mut self,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Result<(), Self::Error>> {
        todo!()
    }

    fn call(&mut self, req: R) -> Self::Future {
        todo!()
    }
}

struct DynLayer;
impl<S> Layer<S> for DynLayer {
    type Service = DynLayerService<S>;

    fn layer(&self, inner: S) -> Self::Service {
        DynLayerService { inner }
    }
}

This gives a possible hint as to why the original build_fn is not allowed:

error
error[E0277]: the size for values of type `(dyn std::error::Error + std::marker::Send + Sync + 'static)` cannot be known at compilation time
  --> src/main.rs:21:10
   |
21 |         .register({
   |          ^^^^^^^^ doesn't have a size known at compile-time
   |
   = help: the trait `Sized` is not implemented for `(dyn std::error::Error + std::marker::Send + Sync + 'static)`
   = help: the trait `tower::Layer<S>` is implemented for `apalis_core::layers::AckLayer<A, Req, Ctx, Res>`
   = note: required for `Box<(dyn std::error::Error + std::marker::Send + Sync + 'static)>` to implement `std::error::Error`
   = note: required for `apalis_core::layers::AckLayer<SqliteStorage<std::string::String>, std::string::String, SqlContext, ()>` to implement `tower::Layer<DynLayerService<apalis::prelude::ServiceFn<fn(std::string::String) -> impl futures::Future<Output = Result<(), CustomError>> {do_job}, std::string::String, SqlContext, ()>>>`
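
The `Sized` note above reflects a standard-library fact: `Box<T>` implements `std::error::Error` only for sized `T`, so `Box<dyn Error + Send + Sync>` is not itself an `Error`. The std-only sketch below illustrates that with two hypothetical helper functions (`needs_error` and `needs_into_boxed` are made up for illustration and are not apalis or tower APIs). It is only meant to show the shape of the bound problem, not how apalis is implemented.

```rust
use std::error::Error;
use std::fmt;

// Same shape as the boxed error type that appears in the compiler output.
type BoxDynError = Box<dyn Error + Send + Sync + 'static>;

#[derive(Debug)]
struct CustomError;

impl fmt::Display for CustomError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str("custom error")
    }
}

impl Error for CustomError {}

// Hypothetical helper with a strict bound: the error type must itself
// implement `std::error::Error`.
fn needs_error<E: Error + Send + Sync + 'static>(err: E) -> String {
    err.to_string()
}

// Hypothetical helper with a looser bound: anything convertible into a
// boxed error is accepted, including an already boxed error.
fn needs_into_boxed<E: Into<BoxDynError>>(err: E) -> String {
    let boxed: BoxDynError = err.into();
    boxed.to_string()
}

// A concrete error satisfies both bounds.
fn concrete_works() -> (String, String) {
    (needs_error(CustomError), needs_into_boxed(CustomError))
}

// A boxed error satisfies only the looser bound.
fn boxed_works() -> String {
    let boxed: BoxDynError = Box::new(CustomError);
    // `needs_error(boxed)` would not compile here, because
    // `Box<dyn Error + Send + Sync>` does not implement `Error`.
    needs_into_boxed(boxed)
}
```

Calling `concrete_works()` and `boxed_works()` shows that the concrete `CustomError` passes either bound, while the boxed value is only accepted where the bound is `Into<BoxDynError>`.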

@boingrach
Author

Some services do not need to pass the real error; basic info is enough, so a boxed error suits them well.

@geofmureithi
Owner

geofmureithi commented Nov 29, 2024 via email

@boingrach
Author

boingrach commented Nov 29, 2024

Kind of.
ErrorHandlingLayer works only with a manual Service implementation, not with build_fn.

example
use apalis::{layers::ErrorHandlingLayer, prelude::*};

use apalis_sql::{context::SqlContext, sqlite::SqliteStorage};
use futures::future::BoxFuture;
use sqlx::SqlitePool;
use tower::Service;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    tracing_subscriber::fmt().init();

    let pool = SqlitePool::connect("sqlite::memory:").await?;
    SqliteStorage::setup(&pool)
        .await
        .expect("unable to run migrations for sqlite");

    let storage: SqliteStorage<String> = SqliteStorage::new(pool.clone());

    let service_worker = WorkerBuilder::new("do_job")
        .layer(ErrorHandlingLayer::new())
        .backend(storage.clone())
        // compiles: with the layer, a manual Service impl may return a boxed error
        .build(ServiceWithBoxedError);

    Monitor::new().register(service_worker).run().await?;

    let fn_worker = WorkerBuilder::new("do_job")
        .layer(ErrorHandlingLayer::new())
        .backend(storage.clone())
        // fails to compile: a fn returning a boxed error is rejected by build_fn
        .build_fn(do_job);

    Monitor::new().register(fn_worker).run().await?;

    Ok(())
}

async fn do_job(job: String) -> Result<(), Box<dyn std::error::Error + Sync + Send + 'static>> {
    tracing::info!("job: {job}");
    Ok(())
}

struct ServiceWithBoxedError;

impl Service<Request<String, SqlContext>> for ServiceWithBoxedError {
    type Response = ();
    type Error = Box<dyn std::error::Error + Send + Sync + 'static>;
    type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;

    fn poll_ready(
        &mut self,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Result<(), Self::Error>> {
        todo!()
    }

    fn call(&mut self, req: Request<String, SqlContext>) -> Self::Future {
        todo!()
    }
}

The fn case fails with errors:

errors
error[E0277]: the trait bound `apalis::prelude::ServiceFn<fn(std::string::String) -> impl futures::Future<Output = Result<(), Box<(dyn std::error::Error + std::marker::Send + Sync + 'static)>>> {do_job}, std::string::String, SqlContext, _>: Service<apalis::prelude::Request<std::string::String, SqlContext>>` is not satisfied

@geofmureithi
Owner

I have replicated your issue and acknowledge that it is caused by the constraints that ServiceFn has.
Currently, there are several workarounds. I will spend some time on this in the coming days.
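
The thread does not spell out which workarounds are meant. One possibility, in line with the concrete-error fix reported earlier in this issue, is to wrap the boxed error in a small newtype that does implement `std::error::Error`. The std-only sketch below assumes that approach. `JobError`, `fallible`, `wrapped`, and `report` are hypothetical names chosen for illustration, and whether such a type satisfies every bound `build_fn` imposes in apalis v0.6.0 has not been checked here.

```rust
use std::error::Error;
use std::fmt;

type BoxDynError = Box<dyn Error + Send + Sync + 'static>;

// Hypothetical newtype: a concrete, sized error that carries the boxed one.
#[derive(Debug)]
struct JobError(BoxDynError);

impl fmt::Display for JobError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "job failed: {}", self.0)
    }
}

impl Error for JobError {
    // Expose the wrapped error so callers can still walk the error chain.
    fn source(&self) -> Option<&(dyn Error + 'static)> {
        let inner: &(dyn Error + 'static) = &*self.0;
        Some(inner)
    }
}

impl From<BoxDynError> for JobError {
    fn from(err: BoxDynError) -> Self {
        JobError(err)
    }
}

// Stand-in for code that only offers a boxed error (for example, a layer
// or a library call); the parse step is just a convenient failure source.
fn fallible(input: &str) -> Result<i32, BoxDynError> {
    Ok(input.parse::<i32>()?)
}

// The shape a worker fn could take: the same logic, with a concrete error
// type in the signature instead of the boxed one.
fn wrapped(input: &str) -> Result<i32, JobError> {
    fallible(input).map_err(JobError::from)
}

// Accepts only types that implement `Error`; `JobError` qualifies, while
// the bare boxed error would not.
fn report<E: Error + Send + Sync + 'static>(err: E) -> String {
    match err.source() {
        Some(_) => format!("{err} (has source)"),
        None => err.to_string(),
    }
}
```

With this sketch, `wrapped("42")` yields `Ok(42)`, and the error from `wrapped("nope")` can be passed to `report`, which a bare `BoxDynError` could not. The basic message is preserved through `Display`, which matches the "basic info is enough" use case described earlier in the thread.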

@geofmureithi
Owner

A lot of my services have tower layers applied. Some layers convert error into Box<dyn Error+...>

For this, the ErrorHandlingLayer should work.
