diff --git a/Cargo.lock b/Cargo.lock index d7510c044e..aacc922a3d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -327,7 +327,7 @@ checksum = "96d30a06541fbafbc7f82ed10c06164cfbd2c401138f6addd8404629c4b16711" [[package]] name = "artifacts" -version = "0.36.1" +version = "0.37.0" dependencies = [ "substrate-runner", ] @@ -1812,7 +1812,7 @@ dependencies = [ [[package]] name = "generate-custom-metadata" -version = "0.36.1" +version = "0.37.0" dependencies = [ "frame-metadata 16.0.0", "parity-scale-codec", @@ -2309,7 +2309,7 @@ dependencies = [ [[package]] name = "integration-tests" -version = "0.36.1" +version = "0.37.0" dependencies = [ "assert_matches", "cfg_aliases", @@ -4745,7 +4745,7 @@ dependencies = [ [[package]] name = "substrate-runner" -version = "0.36.1" +version = "0.37.0" [[package]] name = "subtle" @@ -4755,7 +4755,7 @@ checksum = "81cdd64d312baedb58e21336b31bc043b77e01cc99033ce76ef539f78e965ebc" [[package]] name = "subxt" -version = "0.36.1" +version = "0.37.0" dependencies = [ "assert_matches", "async-trait", @@ -4798,7 +4798,7 @@ dependencies = [ [[package]] name = "subxt-cli" -version = "0.36.1" +version = "0.37.0" dependencies = [ "clap 4.5.4", "color-eyre", @@ -4827,7 +4827,7 @@ dependencies = [ [[package]] name = "subxt-codegen" -version = "0.36.1" +version = "0.37.0" dependencies = [ "frame-metadata 16.0.0", "getrandom", @@ -4847,7 +4847,7 @@ dependencies = [ [[package]] name = "subxt-core" -version = "0.36.1" +version = "0.37.0" dependencies = [ "assert_matches", "base58", @@ -4879,7 +4879,7 @@ dependencies = [ [[package]] name = "subxt-lightclient" -version = "0.36.1" +version = "0.37.0" dependencies = [ "futures", "futures-timer", @@ -4904,7 +4904,7 @@ dependencies = [ [[package]] name = "subxt-macro" -version = "0.36.1" +version = "0.37.0" dependencies = [ "darling 0.20.8", "parity-scale-codec", @@ -4917,7 +4917,7 @@ dependencies = [ [[package]] name = "subxt-metadata" -version = "0.36.1" +version = "0.37.0" dependencies = [ "assert_matches", "bitvec", @@ -4931,7 +4931,7 @@ dependencies = [ [[package]] name = "subxt-signer" -version = "0.36.1" +version = "0.37.0" dependencies = [ "bip32", "bip39", @@ -4958,7 +4958,7 @@ dependencies = [ [[package]] name = "subxt-test-macro" -version = "0.36.1" +version = "0.37.0" dependencies = [ "quote", "syn 2.0.60", @@ -5021,7 +5021,7 @@ dependencies = [ [[package]] name = "test-runtime" -version = "0.36.1" +version = "0.37.0" dependencies = [ "hex", "impl-serde", @@ -5446,7 +5446,7 @@ checksum = "42ff0bf0c66b8238c6f3b578df37d0b7848e55df8577b3f74f92a69acceeb825" [[package]] name = "ui-tests" -version = "0.36.1" +version = "0.37.0" dependencies = [ "frame-metadata 16.0.0", "generate-custom-metadata", diff --git a/subxt/src/backend/unstable/follow_stream_driver.rs b/subxt/src/backend/unstable/follow_stream_driver.rs index a9bd508990..5fc69c986d 100644 --- a/subxt/src/backend/unstable/follow_stream_driver.rs +++ b/subxt/src/backend/unstable/follow_stream_driver.rs @@ -72,7 +72,12 @@ pub struct FollowStreamDriverHandle { impl FollowStreamDriverHandle { /// Subscribe to follow events. pub fn subscribe(&self) -> FollowStreamDriverSubscription { - self.shared.subscribe() + self.shared.subscribe(true) + } + + /// Returns if Followstream has reconnected + pub fn reconnected(&self) -> impl Stream { + self.shared.subscribe(false).reconnected() } } @@ -137,6 +142,21 @@ impl FollowStreamDriverSubscription { } } + /// Returns if the backend has reconnected/is reconnecting + pub fn reconnected(self) -> impl Stream { + self.filter_map(|ev| { + let result = match ev { + FollowStreamMsg::Ready(_) => None, + FollowStreamMsg::Event(ev) => match ev { + FollowEvent::Initialized(_) => Some(true), + FollowEvent::Stop => Some(false), + _ => None, + }, + }; + std::future::ready(result) + }) + } + /// Subscribe to the follow events, ignoring any other messages. pub fn events(self) -> impl Stream>> + Send + Sync { self.filter_map(|ev| std::future::ready(ev.into_event())) @@ -145,7 +165,7 @@ impl FollowStreamDriverSubscription { impl Clone for FollowStreamDriverSubscription { fn clone(&self) -> Self { - self.shared.subscribe() + self.shared.subscribe(true) } } @@ -330,7 +350,10 @@ impl Shared { } /// Create a new subscription. - pub fn subscribe(&self) -> FollowStreamDriverSubscription { + pub fn subscribe( + &self, + insert_subscription_data: bool, + ) -> FollowStreamDriverSubscription { let mut shared = self.0.lock().unwrap(); let id = shared.next_id; @@ -349,16 +372,18 @@ impl Shared { // it means the subscription is currently stopped, and we should expect new Ready/Init // messages anyway once it restarts. let mut local_items = VecDeque::new(); - if let Some(sub_id) = &shared.current_subscription_id { - local_items.push_back(FollowStreamMsg::Ready(sub_id.clone())); - } - if let Some(init_msg) = &shared.current_init_message { - local_items.push_back(FollowStreamMsg::Event(FollowEvent::Initialized( - init_msg.clone(), - ))); - } - for ev in &shared.block_events_for_new_subscriptions { - local_items.push_back(FollowStreamMsg::Event(ev.clone())); + if insert_subscription_data { + if let Some(sub_id) = &shared.current_subscription_id { + local_items.push_back(FollowStreamMsg::Ready(sub_id.clone())); + } + if let Some(init_msg) = &shared.current_init_message { + local_items.push_back(FollowStreamMsg::Event(FollowEvent::Initialized( + init_msg.clone(), + ))); + } + for ev in &shared.block_events_for_new_subscriptions { + local_items.push_back(FollowStreamMsg::Event(ev.clone())); + } } drop(shared); diff --git a/subxt/src/backend/unstable/mod.rs b/subxt/src/backend/unstable/mod.rs index ead0af1d9e..623e6bd395 100644 --- a/subxt/src/backend/unstable/mod.rs +++ b/subxt/src/backend/unstable/mod.rs @@ -32,8 +32,9 @@ use crate::Config; use async_trait::async_trait; use follow_stream_driver::{FollowStreamDriver, FollowStreamDriverHandle}; use futures::future::Either; -use futures::{Stream, StreamExt}; +use futures::{pin_mut, Future, FutureExt, Stream, StreamExt}; use std::collections::HashMap; +use std::pin::Pin; use std::task::Poll; use storage_items::StorageItems; @@ -302,7 +303,7 @@ impl Backend for UnstableBackend { } async fn block_header(&self, at: T::Hash) -> Result, Error> { - retry(|| async { + retry_with_reset_on_reconnect(&self.follow_handle, || async { let sub_id = get_subscription_id(&self.follow_handle).await?; self.methods.chainhead_v1_header(&sub_id, at).await }) @@ -310,7 +311,7 @@ impl Backend for UnstableBackend { } async fn block_body(&self, at: T::Hash) -> Result>>, Error> { - retry(|| async { + retry_with_reset_on_reconnect(&self.follow_handle, || async { let sub_id = get_subscription_id(&self.follow_handle).await?; // Subscribe to the body response and get our operationId back. @@ -669,9 +670,8 @@ impl Backend for UnstableBackend { call_parameters: Option<&[u8]>, at: T::Hash, ) -> Result, Error> { - retry(|| async { + retry_with_reset_on_reconnect(&self.follow_handle, || async { let sub_id = get_subscription_id(&self.follow_handle).await?; - // Subscribe to the body response and get our operationId back. let follow_events = self.follow_handle.subscribe().events(); let call_parameters = call_parameters.unwrap_or(&[]); @@ -716,3 +716,69 @@ async fn get_subscription_id( Ok(sub_id) } + +/// A helper to restart calls on subscription reconnect. +async fn retry_with_reset_on_reconnect( + follow_handle: &FollowStreamDriverHandle, + mut fun: F, +) -> Result +where + Hash: BlockHash, + F: FnMut() -> T, + T: Future>, +{ + let reconnected = follow_handle.reconnected().fuse(); + pin_mut!(reconnected); + + async fn check_for_reconnect( + mut reconnected: Pin<&mut impl Stream>, + ) -> Result<(), Error> { + loop { + match reconnected.next().await { + Some(true) => { + break; + } + Some(false) => (), + None => { + return Err(RpcError::SubscriptionDropped.into()); + } + } + } + Ok(()) + } + + loop { + let action = retry(&mut fun).fuse(); + + pin_mut!(action); + + let result = futures::future::select(reconnected.next(), action).await; + match result { + // We reconnected and received FollowEvent::Initialized() + Either::Left((Some(has_reconnected), _)) if has_reconnected => {} + Either::Left((Some(_), _)) => { + // Wait until we see Initialized Event + check_for_reconnect(reconnected.as_mut()).await? + } + Either::Right((result, reset)) => { + let is_reconnected = reset.now_or_never(); + if is_reconnected.is_none() { + return result; + } + let is_reconnected = is_reconnected.flatten(); + if let Some(has_reconnected) = is_reconnected { + // Wait until we see Initialized Event + if !has_reconnected { + check_for_reconnect(reconnected.as_mut()).await? + } + } else { + break; + } + } + Either::Left((None, _)) => { + break; + } + } + } + Err(RpcError::SubscriptionDropped.into()) +} diff --git a/subxt/src/backend/utils.rs b/subxt/src/backend/utils.rs index e8587734ba..8a6fd72247 100644 --- a/subxt/src/backend/utils.rs +++ b/subxt/src/backend/utils.rs @@ -106,9 +106,6 @@ where F: FnMut() -> T, T: Future>, { - const REJECTED_MAX_RETRIES: usize = 10; - let mut rejected_retries = 0; - loop { match retry_future().await { Ok(v) => return Ok(v), @@ -117,19 +114,6 @@ where continue; } - // TODO: https://github.com/paritytech/subxt/issues/1567 - // This is a hack because if a reconnection occurs - // the order of pending calls is not guaranteed. - // - // Such that it's possible the a pending future completes - // before `chainHead_follow` is established with fresh - // subscription id. - // - if e.is_rejected() && rejected_retries < REJECTED_MAX_RETRIES { - rejected_retries += 1; - continue; - } - return Err(e); } }