diff --git a/ampd/src/event_sub.rs b/ampd/src/event_sub.rs index d7393ab9e..512b0ee4c 100644 --- a/ampd/src/event_sub.rs +++ b/ampd/src/event_sub.rs @@ -15,7 +15,6 @@ use tokio_stream::Stream; use tokio_util::sync::CancellationToken; use tracing::info; -use crate::asyncutil::future::{self, RetryPolicy}; use crate::tm_client::TmClient; #[automock] @@ -125,20 +124,13 @@ impl EventPublisher { } async fn events(&self, block_height: block::Height) -> Result, EventSubError> { - let block_results = future::with_retry( - || { - self.tm_client.block_results(block_height).change_context( - EventSubError::EventQuery { - block: block_height, - }, - ) - }, - RetryPolicy::RepeatConstant { - sleep: Duration::from_secs(1), - max_attempts: 15, - }, - ) - .await?; + let block_results = self + .tm_client + .block_results(block_height) + .change_context(EventSubError::EventQuery { + block: block_height, + }) + .await?; let begin_block_events = block_results.begin_block_events.into_iter().flatten(); let tx_events = block_results diff --git a/ampd/src/tm_client.rs b/ampd/src/tm_client.rs index 44f91b475..fddf7feca 100644 --- a/ampd/src/tm_client.rs +++ b/ampd/src/tm_client.rs @@ -1,9 +1,13 @@ +use std::time::Duration; + use async_trait::async_trait; use error_stack::{Report, Result}; use mockall::automock; use tendermint::block::Height; use tendermint_rpc::{Client, HttpClient}; +use crate::asyncutil::future::{self, RetryPolicy}; + pub type BlockResultsResponse = tendermint_rpc::endpoint::block_results::Response; pub type BlockResponse = tendermint_rpc::endpoint::block::Response; pub type Error = tendermint_rpc::Error; @@ -18,12 +22,26 @@ pub trait TmClient { #[async_trait] impl TmClient for HttpClient { async fn latest_block(&self) -> Result { - Client::latest_block(self).await.map_err(Report::from) + future::with_retry( + || Client::latest_block(self), + RetryPolicy::RepeatConstant { + sleep: Duration::from_secs(1), + max_attempts: 15, + }, + ) + .await + .map_err(Report::from) } async fn block_results(&self, height: Height) -> Result { - Client::block_results(self, height) - .await - .map_err(Report::from) + future::with_retry( + || Client::block_results(self, height), + RetryPolicy::RepeatConstant { + sleep: Duration::from_secs(1), + max_attempts: 15, + }, + ) + .await + .map_err(Report::from) } }