Skip to content

Commit

Permalink
feat(ampd): retry event handling with timeout and max attemmpts (#251)
Browse files Browse the repository at this point in the history
* feat(ampd): retry event handling with timeout and max attemmpts

* add tests

* address comments

* improve tests

* address comments

* improve comments

---------

Co-authored-by: Sammy Liu <[email protected]>
  • Loading branch information
fish-sammy and Sammy Liu authored Feb 16, 2024
1 parent e7a2301 commit 96eadd5
Show file tree
Hide file tree
Showing 4 changed files with 215 additions and 13 deletions.
166 changes: 166 additions & 0 deletions ampd/src/asyncutil/future.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
use futures::{Future, FutureExt};
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::Duration;

use tokio::time;

pub fn with_retry<F, Fut, R, Err>(
future: F,
policy: RetryPolicy,
) -> impl Future<Output = Result<R, Err>>
where
F: Fn() -> Fut,
Fut: Future<Output = Result<R, Err>>,
{
RetriableFuture::new(future, policy)
}

pub enum RetryPolicy {
RepeatConstant { sleep: Duration, max_attempts: u64 },
}

struct RetriableFuture<F, Fut, R, Err>
where
F: Fn() -> Fut,
Fut: Future<Output = Result<R, Err>>,
{
future: F,
inner: Pin<Box<Fut>>,
policy: RetryPolicy,
err_count: u64,
}

impl<F, Fut, R, Err> Unpin for RetriableFuture<F, Fut, R, Err>
where
F: Fn() -> Fut,
Fut: Future<Output = Result<R, Err>>,
{
}

impl<F, Fut, R, Err> RetriableFuture<F, Fut, R, Err>
where
F: Fn() -> Fut,
Fut: Future<Output = Result<R, Err>>,
{
fn new(get_future: F, policy: RetryPolicy) -> Self {
let future = get_future();

Self {
future: get_future,
inner: Box::pin(future),
policy,
err_count: 0,
}
}

fn handle_err(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
error: Err,
) -> Poll<Result<R, Err>> {
self.err_count += 1;

match self.policy {
RetryPolicy::RepeatConstant {
sleep,
max_attempts,
} => {
if self.err_count >= max_attempts {
return Poll::Ready(Err(error));
}

self.inner = Box::pin((self.future)());

let waker = cx.waker().clone();
tokio::spawn(time::sleep(sleep).then(|_| async {
waker.wake();
}));

Poll::Pending
}
}
}
}

impl<F, Fut, R, Err> Future for RetriableFuture<F, Fut, R, Err>
where
F: Fn() -> Fut,
Fut: Future<Output = Result<R, Err>>,
{
type Output = Result<R, Err>;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
match self.inner.as_mut().poll(cx) {
Poll::Pending => Poll::Pending,
Poll::Ready(Ok(result)) => Poll::Ready(Ok(result)),
Poll::Ready(Err(error)) => self.handle_err(cx, error),
}
}
}

#[cfg(test)]
mod tests {
use std::{future, sync::Mutex};

use tokio::time::Instant;

use super::*;

#[tokio::test]
async fn should_return_ok_when_the_internal_future_returns_ok_immediately() {
let fut = with_retry(
|| future::ready(Ok::<(), ()>(())),
RetryPolicy::RepeatConstant {
sleep: Duration::from_secs(1),
max_attempts: 3,
},
);
let start = Instant::now();

assert!(fut.await.is_ok());
assert!(start.elapsed() < Duration::from_secs(1));
}

#[tokio::test(start_paused = true)]
async fn should_return_ok_when_the_internal_future_returns_ok_eventually() {
let max_attempts = 3;
let count = Mutex::new(0);
let fut = with_retry(
|| async {
*count.lock().unwrap() += 1;
time::sleep(Duration::from_secs(1)).await;

if *count.lock().unwrap() < max_attempts - 1 {
Err::<(), ()>(())
} else {
Ok::<(), ()>(())
}
},
RetryPolicy::RepeatConstant {
sleep: Duration::from_secs(1),
max_attempts,
},
);
let start = Instant::now();

assert!(fut.await.is_ok());
assert!(start.elapsed() >= Duration::from_secs(3));
assert!(start.elapsed() < Duration::from_secs(4));
}

#[tokio::test(start_paused = true)]
async fn should_return_error_when_the_internal_future_returns_error_after_max_attempts() {
let fut = with_retry(
|| future::ready(Err::<(), ()>(())),
RetryPolicy::RepeatConstant {
sleep: Duration::from_secs(1),
max_attempts: 3,
},
);
let start = Instant::now();

assert!(fut.await.is_err());
assert!(start.elapsed() >= Duration::from_secs(2));
}
}
1 change: 1 addition & 0 deletions ampd/src/asyncutil/mod.rs
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
pub mod future;
pub mod task;
41 changes: 28 additions & 13 deletions ampd/src/event_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,20 @@ use std::pin::Pin;
use std::time::Duration;

use async_trait::async_trait;
use axelar_wasm_std::utils::InspectorResult;
use error_stack::{Context, Result, ResultExt};
use events::Event;
use futures::StreamExt;
use report::LoggableError;
use thiserror::Error;
use tokio::time::timeout;
use tokio_stream::Stream;
use tokio_util::sync::CancellationToken;
use tracing::warn;
use valuable::Valuable;

use crate::asyncutil::future::{self, RetryPolicy};
use crate::asyncutil::task::TaskError;

use crate::handlers::chain;

#[async_trait]
Expand All @@ -31,8 +35,6 @@ pub trait EventHandler {

#[derive(Error, Debug)]
pub enum Error {
#[error("handler failed to process event")]
Handler,
#[error("could not consume events from stream")]
EventStream,
#[error("handler stopped prematurely")]
Expand Down Expand Up @@ -60,7 +62,22 @@ where
.change_context(Error::EventStream)?;

if let StreamStatus::Active(event) = &stream_status {
handler.handle(event).await.change_context(Error::Handler)?;
// if handlers run into errors we log them and then move on to the next event
let _ = future::with_retry(
|| handler.handle(event),
// TODO: make timeout and max_attempts configurable
RetryPolicy::RepeatConstant {
sleep: Duration::from_secs(1),
max_attempts: 3,
},
)
.await
.tap_err(|err| {
warn!(
err = LoggableError::from(err).as_value(),
"handler failed to process event {}", event,
)
});
}

if should_task_stop(stream_status, &token) {
Expand Down Expand Up @@ -171,21 +188,19 @@ mod tests {
assert!(result_with_timeout.unwrap().is_err());
}

#[tokio::test]
async fn return_error_when_handler_fails() {
let events: Vec<Result<Event, event_processor::Error>> = vec![
Ok(Event::BlockEnd(0_u32.into())),
Ok(Event::BlockEnd(1_u32.into())),
];
#[tokio::test(start_paused = true)]
async fn return_ok_when_handler_fails() {
let events: Vec<Result<Event, event_processor::Error>> =
vec![Ok(Event::BlockEnd(0_u32.into()))];

let mut handler = MockEventHandler::new();
handler
.expect_handle()
.times(1)
.times(3)
.returning(|_| Err(report!(EventHandlerError::Failed)));

let result_with_timeout = timeout(
Duration::from_secs(1),
Duration::from_secs(3),
consume_events(
handler,
stream::iter(events),
Expand All @@ -196,7 +211,7 @@ mod tests {
.await;

assert!(result_with_timeout.is_ok());
assert!(result_with_timeout.unwrap().is_err());
assert!(result_with_timeout.unwrap().is_ok());
}

#[tokio::test]
Expand Down
20 changes: 20 additions & 0 deletions packages/events/src/event.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::fmt::Display;

use axelar_wasm_std::FnExt;
use base64::engine::general_purpose::STANDARD;
use base64::Engine;
Expand All @@ -21,6 +23,24 @@ pub enum Event {
},
}

impl Display for Event {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Event::BlockBegin(height) => write!(f, "BlockBegin({})", height),
Event::BlockEnd(height) => write!(f, "BlockEnd({})", height),
Event::Abci {
event_type,
attributes,
} => write!(
f,
"Abci {{ event_type: {}, attributes: {} }}",
event_type,
serde_json::to_string(attributes).expect("event attributes must be serializable")
),
}
}
}

impl Event {
pub fn block_begin(height: impl Into<block::Height>) -> Self {
Event::BlockBegin(height.into())
Expand Down

0 comments on commit 96eadd5

Please sign in to comment.