Skip to content

Commit

Permalink
Version: 0.6.0-alpha.1 (#342)
Browse files Browse the repository at this point in the history
* api: for redis and sqlite

* Version: 0.6.0-alpha.1

Changelog:
- Redis storage doesnt require pool to be clone. Allows use of deadpool-redis among others.
- Namespace is picked by default for `new` methods.

* fix: docs and tests

* lint: cargo clippy and fmt

* postgres: add a listener example
  • Loading branch information
geofmureithi authored Jul 2, 2024
1 parent 2fec565 commit bcbb015
Show file tree
Hide file tree
Showing 27 changed files with 610 additions and 427 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ members = [
"examples/tracing",
# "examples/rest-api",
"examples/async-std-runtime",
"examples/basics", "examples/redis-with-msg-pack",
"examples/basics", "examples/redis-with-msg-pack", "examples/redis-deadpool",
]


Expand Down
2 changes: 1 addition & 1 deletion benches/storages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ define_bench!("sqlite_in_memory", {

define_bench!("redis", {
let conn = apalis::redis::connect(env!("REDIS_URL")).await.unwrap();
let redis = RedisStorage::new(conn, Config::default());
let redis = RedisStorage::new(conn);
redis
});

Expand Down
3 changes: 1 addition & 2 deletions examples/actix-web/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ use actix_web::rt::signal;
use actix_web::{web, App, HttpResponse, HttpServer};
use anyhow::Result;
use apalis::prelude::*;
use apalis::redis::Config;
use apalis::utils::TokioExecutor;
use apalis::{layers::tracing::TraceLayer, redis::RedisStorage};
use futures::future;
Expand All @@ -28,7 +27,7 @@ async fn main() -> Result<()> {
env_logger::init();

let conn = apalis::redis::connect("redis://127.0.0.1/").await?;
let storage = RedisStorage::new(conn, Config::default());
let storage = RedisStorage::new(conn);
let data = web::Data::new(storage.clone());
let http = async {
HttpServer::new(move || {
Expand Down
3 changes: 1 addition & 2 deletions examples/axum/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
//! ```
use anyhow::Result;
use apalis::prelude::*;
use apalis::redis::Config;
use apalis::{layers::tracing::TraceLayer, redis::RedisStorage};
use axum::{
extract::Form,
Expand Down Expand Up @@ -57,7 +56,7 @@ async fn main() -> Result<()> {
.with(tracing_subscriber::fmt::layer())
.init();
let conn = apalis::redis::connect("redis://127.0.0.1/").await?;
let storage = RedisStorage::new(conn, Config::default());
let storage = RedisStorage::new(conn);
// build our application with some routes
let app = Router::new()
.route("/", get(show_form).post(add_new_job::<Email>))
Expand Down
20 changes: 13 additions & 7 deletions examples/postgres/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,13 @@
use anyhow::Result;
use apalis::layers::retry::RetryPolicy;
use apalis::postgres::PgPool;
use apalis::postgres::{PgListen, PgPool};
use apalis::prelude::*;
use apalis::{layers::tracing::TraceLayer, postgres::PostgresStorage};
use email_service::{send_email, Email};
use tower::retry::RetryLayer;
use tracing::{debug, info};

async fn produce_jobs(storage: &PostgresStorage<Email>) -> Result<()> {
// The programmatic way
let mut storage = storage.clone();
async fn produce_jobs(storage: &mut PostgresStorage<Email>) -> Result<()> {
for index in 0..10 {
storage
.push(Email {
Expand All @@ -35,15 +33,23 @@ async fn main() -> Result<()> {
.await
.expect("unable to run migrations for postgres");

let pg = PostgresStorage::new(pool);
produce_jobs(&pg).await?;
let mut pg = PostgresStorage::new(pool.clone());
produce_jobs(&mut pg).await?;

let mut listener = PgListen::new(pool).await?;

listener.subscribe_with(&mut pg);

tokio::spawn(async move {
listener.listen().await.unwrap();
});

Monitor::<TokioExecutor>::new()
.register_with_count(4, {
WorkerBuilder::new("tasty-orange")
.layer(TraceLayer::new())
.layer(RetryLayer::new(RetryPolicy::retries(5)))
.with_storage(pg.clone())
.with_storage(pg)
.build_fn(send_email)
})
.on_event(|e| debug!("{e:?}"))
Expand Down
3 changes: 1 addition & 2 deletions examples/prometheus/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
//! ```
use anyhow::Result;
use apalis::prelude::*;
use apalis::redis::Config;
use apalis::{layers::prometheus::PrometheusLayer, redis::RedisStorage};
use axum::{
extract::Form,
Expand All @@ -31,7 +30,7 @@ async fn main() -> Result<()> {
.with(tracing_subscriber::fmt::layer())
.init();
let conn = apalis::redis::connect("redis://127.0.0.1/").await?;
let storage = RedisStorage::new(conn, Config::default());
let storage = RedisStorage::new(conn);
// build our application with some routes
let recorder_handle = setup_metrics_recorder();
let app = Router::new()
Expand Down
21 changes: 21 additions & 0 deletions examples/redis-deadpool/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
[package]
name = "redis-deadpool"
version = "0.1.0"
edition = "2021"

[dependencies]
deadpool-redis = { version = "0.15.1" }
anyhow = "1"
tokio = { version = "1", features = ["full"] }
apalis = { path = "../../", features = ["redis", "timeout"] }
serde = "1"
env_logger = "0.10"
tracing-subscriber = "0.3.11"
chrono = { version = "0.4", default-features = false, features = ["clock"] }
email-service = { path = "../email-service" }
rmp-serde = "1.3"


[dependencies.tracing]
default-features = false
version = "0.1"
57 changes: 57 additions & 0 deletions examples/redis-deadpool/src/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
use std::time::Duration;

use anyhow::Result;
use apalis::prelude::*;
use apalis::redis::RedisStorage;

use deadpool_redis::{Config, Connection, Runtime};
use email_service::{send_email, Email};
use tracing::info;

#[tokio::main]
async fn main() -> Result<()> {
std::env::set_var("RUST_LOG", "debug");

tracing_subscriber::fmt::init();

let config = apalis::redis::Config::default()
.set_namespace("apalis::redis-dead-pool")
.set_max_retries(5);

let cfg = Config::from_url("redis://127.0.0.1/");
let pool = cfg.create_pool(Some(Runtime::Tokio1)).unwrap();
let conn = pool.get().await.unwrap();
let mut storage = RedisStorage::new_with_config(conn, config);
// This can be in another part of the program
produce_jobs(&mut storage).await?;

let worker = WorkerBuilder::new("rango-tango")
.with_storage(storage)
.data(pool)
.build_fn(send_email);

Monitor::<TokioExecutor>::new()
.register_with_count(2, worker)
.shutdown_timeout(Duration::from_millis(5000))
.run_with_signal(async {
tokio::signal::ctrl_c().await?;
info!("Monitor starting shutdown");
Ok(())
})
.await?;
info!("Monitor shutdown complete");
Ok(())
}

async fn produce_jobs(storage: &mut RedisStorage<Email, Connection>) -> Result<()> {
for index in 0..10 {
storage
.push(Email {
to: index.to_string(),
text: "Test background job from apalis".to_string(),
subject: "Background email job".to_string(),
})
.await?;
}
Ok(())
}
4 changes: 2 additions & 2 deletions examples/redis/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ use std::{
};

use anyhow::Result;
use apalis::prelude::*;
use apalis::redis::RedisStorage;
use apalis::{prelude::*, redis::Config};

use email_service::{send_email, Email};
use tracing::{error, info};
Expand Down Expand Up @@ -41,7 +41,7 @@ async fn main() -> Result<()> {
tracing_subscriber::fmt::init();

let conn = apalis::redis::connect("redis://127.0.0.1/").await?;
let storage = RedisStorage::new(conn, Config::default());
let storage = RedisStorage::new(conn);
// This can be in another part of the program
produce_jobs(storage.clone()).await?;

Expand Down
4 changes: 2 additions & 2 deletions examples/sentry/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use anyhow::Result;
use apalis::{
layers::{sentry::SentryLayer, tracing::TraceLayer},
prelude::*,
redis::{Config, RedisStorage},
redis::RedisStorage,
};
use email_service::Email;
use tokio::time::sleep;
Expand Down Expand Up @@ -130,7 +130,7 @@ async fn main() -> Result<()> {
.init();

let conn = apalis::redis::connect(redis_url).await?;
let storage = RedisStorage::new(conn, Config::default());
let storage = RedisStorage::new(conn);
//This can be in another part of the program
produce_jobs(storage.clone()).await?;

Expand Down
4 changes: 2 additions & 2 deletions examples/tracing/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use tracing_subscriber::prelude::*;
use apalis::{
layers::tracing::TraceLayer,
prelude::{Monitor, Storage, WorkerBuilder, WorkerFactoryFn},
redis::{Config, RedisStorage},
redis::RedisStorage,
utils::TokioExecutor,
};

Expand Down Expand Up @@ -66,7 +66,7 @@ async fn main() -> Result<()> {
let conn = apalis::redis::connect(redis_url)
.await
.expect("Could not connect to RedisStorage");
let storage = RedisStorage::new(conn, Config::default());
let storage = RedisStorage::new(conn);
//This can be in another part of the program
produce_jobs(storage.clone()).await?;

Expand Down
9 changes: 5 additions & 4 deletions packages/apalis-core/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -167,18 +167,19 @@ where
S::Future: Send,

S::Response: 'static,
P::Layer: Layer<S>,
M: Layer<<P::Layer as Layer<S>>::Service>,
M: Layer<S>,
// P::Layer: Layer<S>,
// M: Layer<<P::Layer as Layer<S>>::Service>,
{
type Source = P;

type Service = M::Service;
/// Build a worker, given a tower service
fn build(self, service: S) -> Worker<Ready<Self::Service, P>> {
let worker_id = self.id;
let common_layer = self.source.common_layer(worker_id.clone());
// let common_layer = self.source.common_layer(worker_id.clone());
let poller = self.source;
let middleware = self.layer.layer(common_layer);
let middleware = self.layer;
let service = middleware.service(service);

Worker::new(worker_id, Ready::new(service, poller))
Expand Down
29 changes: 24 additions & 5 deletions packages/apalis-core/src/layers.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
use futures::channel::mpsc::{SendError, Sender};
use futures::SinkExt;
use std::marker::PhantomData;
use std::{fmt, sync::Arc};
pub use tower::{layer::layer_fn, util::BoxCloneService, Layer, Service, ServiceBuilder};

use futures::{future::BoxFuture, Future, FutureExt};
pub use tower::{
layer::layer_fn, layer::util::Identity, util::BoxCloneService, Layer, Service, ServiceBuilder,
};

use crate::{request::Request, worker::WorkerId};
use futures::{future::BoxFuture, Future, FutureExt};

/// A generic layer that has been stripped off types.
/// This is returned by a [crate::Backend] and can be used to customize the middleware of the service consuming tasks
Expand Down Expand Up @@ -158,12 +161,28 @@ pub trait Ack<J> {
type Error: std::error::Error;
/// Acknowledges successful processing of the given request
fn ack(
&self,
&mut self,
worker_id: &WorkerId,
data: &Self::Acknowledger,
) -> impl Future<Output = Result<(), Self::Error>> + Send;
}

/// A generic stream that emits (worker_id, task_id)
#[derive(Debug)]
pub struct AckStream<A>(pub Sender<(WorkerId, A)>);

impl<J, A: Send + Clone + 'static> Ack<J> for AckStream<A> {
type Acknowledger = A;
type Error = SendError;
fn ack(
&mut self,
worker_id: &WorkerId,
data: &Self::Acknowledger,
) -> impl Future<Output = Result<(), Self::Error>> + Send {
self.0.send((worker_id.clone(), data.clone())).boxed()
}
}

/// A layer that acknowledges a job completed successfully
#[derive(Debug)]
pub struct AckLayer<A: Ack<J>, J> {
Expand Down Expand Up @@ -244,7 +263,7 @@ where
}

fn call(&mut self, request: Request<J>) -> Self::Future {
let ack = self.ack.clone();
let mut ack = self.ack.clone();
let worker_id = self.worker_id.clone();
let data = request.get::<<A as Ack<J>>::Acknowledger>().cloned();

Expand Down
Loading

0 comments on commit bcbb015

Please sign in to comment.