Skip to content

Commit

Permalink
fix(fortuna): watch blocks from start and infinite get queries (#1551)
Browse files Browse the repository at this point in the history
* fix: watch blocks from start and infinite get queries

* formatting

* fix

* undo small change
  • Loading branch information
Dev Kalra authored May 7, 2024
1 parent bf2c8b5 commit 77c68c5
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 17 deletions.
2 changes: 1 addition & 1 deletion apps/fortuna/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion apps/fortuna/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "fortuna"
version = "5.2.1"
version = "5.2.2"
edition = "2021"

[dependencies]
Expand Down
46 changes: 31 additions & 15 deletions apps/fortuna/src/keeper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,10 @@ use {
},
config::EthereumConfig,
},
anyhow::Result,
anyhow::{
anyhow,
Result,
},
ethers::{
contract::ContractError,
providers::{
Expand Down Expand Up @@ -64,10 +67,14 @@ async fn get_latest_safe_block(chain_state: &BlockchainState) -> BlockNumber {
.await
{
Ok(latest_confirmed_block) => {
return latest_confirmed_block - chain_state.reveal_delay_blocks
tracing::info!(
"Fetched latest safe block {}",
latest_confirmed_block - chain_state.reveal_delay_blocks
);
return latest_confirmed_block - chain_state.reveal_delay_blocks;
}
Err(e) => {
tracing::error!("error while getting block number. error: {:?}", e);
tracing::error!("Error while getting block number. error: {:?}", e);
time::sleep(RETRY_INTERVAL).await;
}
}
Expand Down Expand Up @@ -346,10 +353,11 @@ pub async fn watch_blocks_wrapper(
tx: mpsc::Sender<BlockRange>,
geth_rpc_wss: Option<String>,
) {
let mut last_safe_block_processed = latest_safe_block;
loop {
if let Err(e) = watch_blocks(
chain_state.clone(),
latest_safe_block,
&mut last_safe_block_processed,
tx.clone(),
geth_rpc_wss.clone(),
)
Expand All @@ -368,12 +376,11 @@ pub async fn watch_blocks_wrapper(
/// know about it.
pub async fn watch_blocks(
chain_state: BlockchainState,
latest_safe_block: BlockNumber,
last_safe_block_processed: &mut BlockNumber,
tx: mpsc::Sender<BlockRange>,
geth_rpc_wss: Option<String>,
) -> Result<()> {
tracing::info!("Watching blocks to handle new events");
let mut last_safe_block_processed = latest_safe_block;

let provider_option = match geth_rpc_wss {
Some(wss) => Some(match Provider::<Ws>::connect(wss.clone()).await {
Expand All @@ -390,42 +397,51 @@ pub async fn watch_blocks(
};

let mut stream_option = match provider_option {
Some(ref provider) => Some(provider.subscribe_blocks().await?),
Some(ref provider) => Some(match provider.subscribe_blocks().await {
Ok(client) => client,
Err(e) => {
tracing::error!("Error while subscribing to blocks. error {:?}", e);
return Err(e.into());
}
}),
None => None,
};

loop {
match stream_option {
Some(ref mut stream) => {
stream.next().await;
if let None = stream.next().await {
tracing::error!("Error blocks subscription stream ended");
return Err(anyhow!("Error blocks subscription stream ended"));
}
}
None => {
time::sleep(POLL_INTERVAL).await;
}
}

let latest_safe_block = get_latest_safe_block(&chain_state).in_current_span().await;
if latest_safe_block > last_safe_block_processed {
if latest_safe_block > *last_safe_block_processed {
match tx
.send(BlockRange {
from: last_safe_block_processed + 1,
from: *last_safe_block_processed + 1,
to: latest_safe_block,
})
.await
{
Ok(_) => {
tracing::info!(
from_block = &last_safe_block_processed + 1,
from_block = *last_safe_block_processed + 1,
to_block = &latest_safe_block,
"Block range sent to handle events",
);
last_safe_block_processed = latest_safe_block;
*last_safe_block_processed = latest_safe_block;
}
Err(e) => {
tracing::error!(
"Error while sending block range to handle events. These will be handled in next call. error: {:?}",
e
);
"Error while sending block range to handle events. These will be handled in next call. error: {:?}",
e
);
}
};
}
Expand Down

0 comments on commit 77c68c5

Please sign in to comment.