diff --git a/ampd/src/asyncutil/future.rs b/ampd/src/asyncutil/future.rs new file mode 100644 index 000000000..9b693a740 --- /dev/null +++ b/ampd/src/asyncutil/future.rs @@ -0,0 +1,166 @@ +use futures::{Future, FutureExt}; +use std::pin::Pin; +use std::task::{Context, Poll}; +use std::time::Duration; + +use tokio::time; + +pub fn with_retry( + future: F, + policy: RetryPolicy, +) -> impl Future> +where + F: Fn() -> Fut, + Fut: Future>, +{ + RetriableFuture::new(future, policy) +} + +pub enum RetryPolicy { + RepeatConstant { sleep: Duration, max_attempts: u64 }, +} + +struct RetriableFuture +where + F: Fn() -> Fut, + Fut: Future>, +{ + future: F, + inner: Pin>, + policy: RetryPolicy, + err_count: u64, +} + +impl Unpin for RetriableFuture +where + F: Fn() -> Fut, + Fut: Future>, +{ +} + +impl RetriableFuture +where + F: Fn() -> Fut, + Fut: Future>, +{ + fn new(get_future: F, policy: RetryPolicy) -> Self { + let future = get_future(); + + Self { + future: get_future, + inner: Box::pin(future), + policy, + err_count: 0, + } + } + + fn handle_err( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + error: Err, + ) -> Poll> { + self.err_count += 1; + + match self.policy { + RetryPolicy::RepeatConstant { + sleep, + max_attempts, + } => { + if self.err_count >= max_attempts { + return Poll::Ready(Err(error)); + } + + self.inner = Box::pin((self.future)()); + + let waker = cx.waker().clone(); + tokio::spawn(time::sleep(sleep).then(|_| async { + waker.wake(); + })); + + Poll::Pending + } + } + } +} + +impl Future for RetriableFuture +where + F: Fn() -> Fut, + Fut: Future>, +{ + type Output = Result; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + match self.inner.as_mut().poll(cx) { + Poll::Pending => Poll::Pending, + Poll::Ready(Ok(result)) => Poll::Ready(Ok(result)), + Poll::Ready(Err(error)) => self.handle_err(cx, error), + } + } +} + +#[cfg(test)] +mod tests { + use std::{future, sync::Mutex}; + + use tokio::time::Instant; + + use super::*; + + #[tokio::test] + async fn should_return_ok_when_the_internal_future_returns_ok_immediately() { + let fut = with_retry( + || future::ready(Ok::<(), ()>(())), + RetryPolicy::RepeatConstant { + sleep: Duration::from_secs(1), + max_attempts: 3, + }, + ); + let start = Instant::now(); + + assert!(fut.await.is_ok()); + assert!(start.elapsed() < Duration::from_secs(1)); + } + + #[tokio::test(start_paused = true)] + async fn should_return_ok_when_the_internal_future_returns_ok_eventually() { + let max_attempts = 3; + let count = Mutex::new(0); + let fut = with_retry( + || async { + *count.lock().unwrap() += 1; + time::sleep(Duration::from_secs(1)).await; + + if *count.lock().unwrap() < max_attempts - 1 { + Err::<(), ()>(()) + } else { + Ok::<(), ()>(()) + } + }, + RetryPolicy::RepeatConstant { + sleep: Duration::from_secs(1), + max_attempts, + }, + ); + let start = Instant::now(); + + assert!(fut.await.is_ok()); + assert!(start.elapsed() >= Duration::from_secs(3)); + assert!(start.elapsed() < Duration::from_secs(4)); + } + + #[tokio::test(start_paused = true)] + async fn should_return_error_when_the_internal_future_returns_error_after_max_attempts() { + let fut = with_retry( + || future::ready(Err::<(), ()>(())), + RetryPolicy::RepeatConstant { + sleep: Duration::from_secs(1), + max_attempts: 3, + }, + ); + let start = Instant::now(); + + assert!(fut.await.is_err()); + assert!(start.elapsed() >= Duration::from_secs(2)); + } +} diff --git a/ampd/src/asyncutil/mod.rs b/ampd/src/asyncutil/mod.rs index cdafe4ad6..a6c842d6e 100644 --- a/ampd/src/asyncutil/mod.rs +++ b/ampd/src/asyncutil/mod.rs @@ -1 +1,2 @@ +pub mod future; pub mod task; diff --git a/ampd/src/event_processor.rs b/ampd/src/event_processor.rs index a03decb88..fc168b84b 100644 --- a/ampd/src/event_processor.rs +++ b/ampd/src/event_processor.rs @@ -2,16 +2,20 @@ use std::pin::Pin; use std::time::Duration; use async_trait::async_trait; +use axelar_wasm_std::utils::InspectorResult; use error_stack::{Context, Result, ResultExt}; use events::Event; use futures::StreamExt; +use report::LoggableError; use thiserror::Error; use tokio::time::timeout; use tokio_stream::Stream; use tokio_util::sync::CancellationToken; +use tracing::warn; +use valuable::Valuable; +use crate::asyncutil::future::{self, RetryPolicy}; use crate::asyncutil::task::TaskError; - use crate::handlers::chain; #[async_trait] @@ -31,8 +35,6 @@ pub trait EventHandler { #[derive(Error, Debug)] pub enum Error { - #[error("handler failed to process event")] - Handler, #[error("could not consume events from stream")] EventStream, #[error("handler stopped prematurely")] @@ -60,7 +62,22 @@ where .change_context(Error::EventStream)?; if let StreamStatus::Active(event) = &stream_status { - handler.handle(event).await.change_context(Error::Handler)?; + // if handlers run into errors we log them and then move on to the next event + let _ = future::with_retry( + || handler.handle(event), + // TODO: make timeout and max_attempts configurable + RetryPolicy::RepeatConstant { + sleep: Duration::from_secs(1), + max_attempts: 3, + }, + ) + .await + .tap_err(|err| { + warn!( + err = LoggableError::from(err).as_value(), + "handler failed to process event {}", event, + ) + }); } if should_task_stop(stream_status, &token) { @@ -171,21 +188,19 @@ mod tests { assert!(result_with_timeout.unwrap().is_err()); } - #[tokio::test] - async fn return_error_when_handler_fails() { - let events: Vec> = vec![ - Ok(Event::BlockEnd(0_u32.into())), - Ok(Event::BlockEnd(1_u32.into())), - ]; + #[tokio::test(start_paused = true)] + async fn return_ok_when_handler_fails() { + let events: Vec> = + vec![Ok(Event::BlockEnd(0_u32.into()))]; let mut handler = MockEventHandler::new(); handler .expect_handle() - .times(1) + .times(3) .returning(|_| Err(report!(EventHandlerError::Failed))); let result_with_timeout = timeout( - Duration::from_secs(1), + Duration::from_secs(3), consume_events( handler, stream::iter(events), @@ -196,7 +211,7 @@ mod tests { .await; assert!(result_with_timeout.is_ok()); - assert!(result_with_timeout.unwrap().is_err()); + assert!(result_with_timeout.unwrap().is_ok()); } #[tokio::test] diff --git a/packages/events/src/event.rs b/packages/events/src/event.rs index f3e590803..d39ce5925 100644 --- a/packages/events/src/event.rs +++ b/packages/events/src/event.rs @@ -1,3 +1,5 @@ +use std::fmt::Display; + use axelar_wasm_std::FnExt; use base64::engine::general_purpose::STANDARD; use base64::Engine; @@ -21,6 +23,24 @@ pub enum Event { }, } +impl Display for Event { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Event::BlockBegin(height) => write!(f, "BlockBegin({})", height), + Event::BlockEnd(height) => write!(f, "BlockEnd({})", height), + Event::Abci { + event_type, + attributes, + } => write!( + f, + "Abci {{ event_type: {}, attributes: {} }}", + event_type, + serde_json::to_string(attributes).expect("event attributes must be serializable") + ), + } + } +} + impl Event { pub fn block_begin(height: impl Into) -> Self { Event::BlockBegin(height.into())