Skip to content

Commit

Permalink
feat(ampd): retry fetching block height (#543)
Browse files Browse the repository at this point in the history
  • Loading branch information
cjcobb23 authored Jul 25, 2024
1 parent 852e68b commit 0ac9bde
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 19 deletions.
22 changes: 7 additions & 15 deletions ampd/src/event_sub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ use tokio_stream::Stream;
use tokio_util::sync::CancellationToken;
use tracing::info;

use crate::asyncutil::future::{self, RetryPolicy};
use crate::tm_client::TmClient;

#[automock]
Expand Down Expand Up @@ -125,20 +124,13 @@ impl<T: TmClient + Sync> EventPublisher<T> {
}

async fn events(&self, block_height: block::Height) -> Result<Vec<Event>, EventSubError> {
let block_results = future::with_retry(
|| {
self.tm_client.block_results(block_height).change_context(
EventSubError::EventQuery {
block: block_height,
},
)
},
RetryPolicy::RepeatConstant {
sleep: Duration::from_secs(1),
max_attempts: 15,
},
)
.await?;
let block_results = self
.tm_client
.block_results(block_height)
.change_context(EventSubError::EventQuery {
block: block_height,
})
.await?;

let begin_block_events = block_results.begin_block_events.into_iter().flatten();
let tx_events = block_results
Expand Down
26 changes: 22 additions & 4 deletions ampd/src/tm_client.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
use std::time::Duration;

use async_trait::async_trait;
use error_stack::{Report, Result};
use mockall::automock;
use tendermint::block::Height;
use tendermint_rpc::{Client, HttpClient};

use crate::asyncutil::future::{self, RetryPolicy};

pub type BlockResultsResponse = tendermint_rpc::endpoint::block_results::Response;
pub type BlockResponse = tendermint_rpc::endpoint::block::Response;
pub type Error = tendermint_rpc::Error;
Expand All @@ -18,12 +22,26 @@ pub trait TmClient {
#[async_trait]
impl TmClient for HttpClient {
async fn latest_block(&self) -> Result<BlockResponse, Error> {
Client::latest_block(self).await.map_err(Report::from)
future::with_retry(
|| Client::latest_block(self),
RetryPolicy::RepeatConstant {
sleep: Duration::from_secs(1),
max_attempts: 15,
},
)
.await
.map_err(Report::from)
}

async fn block_results(&self, height: Height) -> Result<BlockResultsResponse, Error> {
Client::block_results(self, height)
.await
.map_err(Report::from)
future::with_retry(
|| Client::block_results(self, height),
RetryPolicy::RepeatConstant {
sleep: Duration::from_secs(1),
max_attempts: 15,
},
)
.await
.map_err(Report::from)
}
}

0 comments on commit 0ac9bde

Please sign in to comment.