From 93c0029c75571ef1316867c212dcb00d792f7ff3 Mon Sep 17 00:00:00 2001 From: Pavlo Khrystenko Date: Wed, 29 May 2024 14:42:11 +0200 Subject: [PATCH 1/3] Ensures poll order wrt subscription ID's closes #1567 --- Cargo.lock | 30 +++++------ .../backend/unstable/follow_stream_driver.rs | 54 ++++++++++++++----- subxt/src/backend/unstable/mod.rs | 38 +++++++++++-- subxt/src/backend/utils.rs | 16 ------ 4 files changed, 89 insertions(+), 49 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index d7510c044e..aacc922a3d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -327,7 +327,7 @@ checksum = "96d30a06541fbafbc7f82ed10c06164cfbd2c401138f6addd8404629c4b16711" [[package]] name = "artifacts" -version = "0.36.1" +version = "0.37.0" dependencies = [ "substrate-runner", ] @@ -1812,7 +1812,7 @@ dependencies = [ [[package]] name = "generate-custom-metadata" -version = "0.36.1" +version = "0.37.0" dependencies = [ "frame-metadata 16.0.0", "parity-scale-codec", @@ -2309,7 +2309,7 @@ dependencies = [ [[package]] name = "integration-tests" -version = "0.36.1" +version = "0.37.0" dependencies = [ "assert_matches", "cfg_aliases", @@ -4745,7 +4745,7 @@ dependencies = [ [[package]] name = "substrate-runner" -version = "0.36.1" +version = "0.37.0" [[package]] name = "subtle" @@ -4755,7 +4755,7 @@ checksum = "81cdd64d312baedb58e21336b31bc043b77e01cc99033ce76ef539f78e965ebc" [[package]] name = "subxt" -version = "0.36.1" +version = "0.37.0" dependencies = [ "assert_matches", "async-trait", @@ -4798,7 +4798,7 @@ dependencies = [ [[package]] name = "subxt-cli" -version = "0.36.1" +version = "0.37.0" dependencies = [ "clap 4.5.4", "color-eyre", @@ -4827,7 +4827,7 @@ dependencies = [ [[package]] name = "subxt-codegen" -version = "0.36.1" +version = "0.37.0" dependencies = [ "frame-metadata 16.0.0", "getrandom", @@ -4847,7 +4847,7 @@ dependencies = [ [[package]] name = "subxt-core" -version = "0.36.1" +version = "0.37.0" dependencies = [ "assert_matches", "base58", @@ -4879,7 +4879,7 @@ dependencies = [ [[package]] name = "subxt-lightclient" -version = "0.36.1" +version = "0.37.0" dependencies = [ "futures", "futures-timer", @@ -4904,7 +4904,7 @@ dependencies = [ [[package]] name = "subxt-macro" -version = "0.36.1" +version = "0.37.0" dependencies = [ "darling 0.20.8", "parity-scale-codec", @@ -4917,7 +4917,7 @@ dependencies = [ [[package]] name = "subxt-metadata" -version = "0.36.1" +version = "0.37.0" dependencies = [ "assert_matches", "bitvec", @@ -4931,7 +4931,7 @@ dependencies = [ [[package]] name = "subxt-signer" -version = "0.36.1" +version = "0.37.0" dependencies = [ "bip32", "bip39", @@ -4958,7 +4958,7 @@ dependencies = [ [[package]] name = "subxt-test-macro" -version = "0.36.1" +version = "0.37.0" dependencies = [ "quote", "syn 2.0.60", @@ -5021,7 +5021,7 @@ dependencies = [ [[package]] name = "test-runtime" -version = "0.36.1" +version = "0.37.0" dependencies = [ "hex", "impl-serde", @@ -5446,7 +5446,7 @@ checksum = "42ff0bf0c66b8238c6f3b578df37d0b7848e55df8577b3f74f92a69acceeb825" [[package]] name = "ui-tests" -version = "0.36.1" +version = "0.37.0" dependencies = [ "frame-metadata 16.0.0", "generate-custom-metadata", diff --git a/subxt/src/backend/unstable/follow_stream_driver.rs b/subxt/src/backend/unstable/follow_stream_driver.rs index a9bd508990..aab13d521e 100644 --- a/subxt/src/backend/unstable/follow_stream_driver.rs +++ b/subxt/src/backend/unstable/follow_stream_driver.rs @@ -72,7 +72,12 @@ pub struct FollowStreamDriverHandle { impl FollowStreamDriverHandle { /// Subscribe to follow events. pub fn subscribe(&self) -> FollowStreamDriverSubscription { - self.shared.subscribe() + self.shared.subscribe(true) + } + + /// Returns if Followstream has reconnected + pub async fn reconnected(&self) { + self.shared.subscribe(false).reconnected().await; } } @@ -137,6 +142,24 @@ impl FollowStreamDriverSubscription { } } + /// Returns if the backend has reconnected + pub async fn reconnected(self) -> bool { + let ready_event = self + .skip_while(|ev| { + std::future::ready(!matches!( + ev, + FollowStreamMsg::Event(FollowEvent::Initialized(_)) + )) + }) + .next() + .await; + + matches!( + ready_event, + Some(FollowStreamMsg::Event(FollowEvent::Initialized(_))) + ) + } + /// Subscribe to the follow events, ignoring any other messages. pub fn events(self) -> impl Stream>> + Send + Sync { self.filter_map(|ev| std::future::ready(ev.into_event())) @@ -145,7 +168,7 @@ impl FollowStreamDriverSubscription { impl Clone for FollowStreamDriverSubscription { fn clone(&self) -> Self { - self.shared.subscribe() + self.shared.subscribe(true) } } @@ -330,7 +353,10 @@ impl Shared { } /// Create a new subscription. - pub fn subscribe(&self) -> FollowStreamDriverSubscription { + pub fn subscribe( + &self, + insert_subscription_data: bool, + ) -> FollowStreamDriverSubscription { let mut shared = self.0.lock().unwrap(); let id = shared.next_id; @@ -349,16 +375,18 @@ impl Shared { // it means the subscription is currently stopped, and we should expect new Ready/Init // messages anyway once it restarts. let mut local_items = VecDeque::new(); - if let Some(sub_id) = &shared.current_subscription_id { - local_items.push_back(FollowStreamMsg::Ready(sub_id.clone())); - } - if let Some(init_msg) = &shared.current_init_message { - local_items.push_back(FollowStreamMsg::Event(FollowEvent::Initialized( - init_msg.clone(), - ))); - } - for ev in &shared.block_events_for_new_subscriptions { - local_items.push_back(FollowStreamMsg::Event(ev.clone())); + if insert_subscription_data { + if let Some(sub_id) = &shared.current_subscription_id { + local_items.push_back(FollowStreamMsg::Ready(sub_id.clone())); + } + if let Some(init_msg) = &shared.current_init_message { + local_items.push_back(FollowStreamMsg::Event(FollowEvent::Initialized( + init_msg.clone(), + ))); + } + for ev in &shared.block_events_for_new_subscriptions { + local_items.push_back(FollowStreamMsg::Event(ev.clone())); + } } drop(shared); diff --git a/subxt/src/backend/unstable/mod.rs b/subxt/src/backend/unstable/mod.rs index ead0af1d9e..7a6b9ca765 100644 --- a/subxt/src/backend/unstable/mod.rs +++ b/subxt/src/backend/unstable/mod.rs @@ -32,7 +32,7 @@ use crate::Config; use async_trait::async_trait; use follow_stream_driver::{FollowStreamDriver, FollowStreamDriverHandle}; use futures::future::Either; -use futures::{Stream, StreamExt}; +use futures::{pin_mut, Future, FutureExt, Stream, StreamExt}; use std::collections::HashMap; use std::task::Poll; use storage_items::StorageItems; @@ -302,7 +302,7 @@ impl Backend for UnstableBackend { } async fn block_header(&self, at: T::Hash) -> Result, Error> { - retry(|| async { + retry_with_reset_on_reconnect(&self.follow_handle, || async { let sub_id = get_subscription_id(&self.follow_handle).await?; self.methods.chainhead_v1_header(&sub_id, at).await }) @@ -310,7 +310,7 @@ impl Backend for UnstableBackend { } async fn block_body(&self, at: T::Hash) -> Result>>, Error> { - retry(|| async { + retry_with_reset_on_reconnect(&self.follow_handle, || async { let sub_id = get_subscription_id(&self.follow_handle).await?; // Subscribe to the body response and get our operationId back. @@ -669,9 +669,8 @@ impl Backend for UnstableBackend { call_parameters: Option<&[u8]>, at: T::Hash, ) -> Result, Error> { - retry(|| async { + retry_with_reset_on_reconnect(&self.follow_handle, || async { let sub_id = get_subscription_id(&self.follow_handle).await?; - // Subscribe to the body response and get our operationId back. let follow_events = self.follow_handle.subscribe().events(); let call_parameters = call_parameters.unwrap_or(&[]); @@ -716,3 +715,32 @@ async fn get_subscription_id( Ok(sub_id) } + +/// A helper to restart calls on subscription reconnect. +async fn retry_with_reset_on_reconnect( + follow_handle: &FollowStreamDriverHandle, + mut fun: F, +) -> Result +where + Hash: BlockHash, + F: FnMut() -> T, + T: Future>, +{ + loop { + let reconnected = follow_handle.reconnected().fuse(); + let action = retry(|| fun()).fuse(); + + pin_mut!(reconnected, action); + + let result = futures::future::select(reconnected, action).await; + match result { + Either::Left((_, _)) => (), + Either::Right((result, reset)) => { + let is_reconnected = reset.now_or_never().is_some(); + if !is_reconnected { + break result; + } + } + } + } +} diff --git a/subxt/src/backend/utils.rs b/subxt/src/backend/utils.rs index e8587734ba..8a6fd72247 100644 --- a/subxt/src/backend/utils.rs +++ b/subxt/src/backend/utils.rs @@ -106,9 +106,6 @@ where F: FnMut() -> T, T: Future>, { - const REJECTED_MAX_RETRIES: usize = 10; - let mut rejected_retries = 0; - loop { match retry_future().await { Ok(v) => return Ok(v), @@ -117,19 +114,6 @@ where continue; } - // TODO: https://github.com/paritytech/subxt/issues/1567 - // This is a hack because if a reconnection occurs - // the order of pending calls is not guaranteed. - // - // Such that it's possible the a pending future completes - // before `chainHead_follow` is established with fresh - // subscription id. - // - if e.is_rejected() && rejected_retries < REJECTED_MAX_RETRIES { - rejected_retries += 1; - continue; - } - return Err(e); } } From ccf9146e848739144814070fb31d861cd82b1fc4 Mon Sep 17 00:00:00 2001 From: Pavlo Khrystenko Date: Wed, 29 May 2024 16:52:27 +0200 Subject: [PATCH 2/3] fix clippy --- subxt/src/backend/unstable/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/subxt/src/backend/unstable/mod.rs b/subxt/src/backend/unstable/mod.rs index 7a6b9ca765..f59c11a943 100644 --- a/subxt/src/backend/unstable/mod.rs +++ b/subxt/src/backend/unstable/mod.rs @@ -728,7 +728,7 @@ where { loop { let reconnected = follow_handle.reconnected().fuse(); - let action = retry(|| fun()).fuse(); + let action = retry(&mut fun).fuse(); pin_mut!(reconnected, action); From b1e55e0f4bf14da9cf8533c86756ca547442daec Mon Sep 17 00:00:00 2001 From: Pavlo Khrystenko Date: Wed, 29 May 2024 18:24:20 +0200 Subject: [PATCH 3/3] use stream of bools to track whether the connection is stopped/initialized --- .../backend/unstable/follow_stream_driver.rs | 33 ++++++------ subxt/src/backend/unstable/mod.rs | 52 ++++++++++++++++--- 2 files changed, 60 insertions(+), 25 deletions(-) diff --git a/subxt/src/backend/unstable/follow_stream_driver.rs b/subxt/src/backend/unstable/follow_stream_driver.rs index aab13d521e..5fc69c986d 100644 --- a/subxt/src/backend/unstable/follow_stream_driver.rs +++ b/subxt/src/backend/unstable/follow_stream_driver.rs @@ -76,8 +76,8 @@ impl FollowStreamDriverHandle { } /// Returns if Followstream has reconnected - pub async fn reconnected(&self) { - self.shared.subscribe(false).reconnected().await; + pub fn reconnected(&self) -> impl Stream { + self.shared.subscribe(false).reconnected() } } @@ -142,22 +142,19 @@ impl FollowStreamDriverSubscription { } } - /// Returns if the backend has reconnected - pub async fn reconnected(self) -> bool { - let ready_event = self - .skip_while(|ev| { - std::future::ready(!matches!( - ev, - FollowStreamMsg::Event(FollowEvent::Initialized(_)) - )) - }) - .next() - .await; - - matches!( - ready_event, - Some(FollowStreamMsg::Event(FollowEvent::Initialized(_))) - ) + /// Returns if the backend has reconnected/is reconnecting + pub fn reconnected(self) -> impl Stream { + self.filter_map(|ev| { + let result = match ev { + FollowStreamMsg::Ready(_) => None, + FollowStreamMsg::Event(ev) => match ev { + FollowEvent::Initialized(_) => Some(true), + FollowEvent::Stop => Some(false), + _ => None, + }, + }; + std::future::ready(result) + }) } /// Subscribe to the follow events, ignoring any other messages. diff --git a/subxt/src/backend/unstable/mod.rs b/subxt/src/backend/unstable/mod.rs index f59c11a943..623e6bd395 100644 --- a/subxt/src/backend/unstable/mod.rs +++ b/subxt/src/backend/unstable/mod.rs @@ -34,6 +34,7 @@ use follow_stream_driver::{FollowStreamDriver, FollowStreamDriverHandle}; use futures::future::Either; use futures::{pin_mut, Future, FutureExt, Stream, StreamExt}; use std::collections::HashMap; +use std::pin::Pin; use std::task::Poll; use storage_items::StorageItems; @@ -726,21 +727,58 @@ where F: FnMut() -> T, T: Future>, { + let reconnected = follow_handle.reconnected().fuse(); + pin_mut!(reconnected); + + async fn check_for_reconnect( + mut reconnected: Pin<&mut impl Stream>, + ) -> Result<(), Error> { + loop { + match reconnected.next().await { + Some(true) => { + break; + } + Some(false) => (), + None => { + return Err(RpcError::SubscriptionDropped.into()); + } + } + } + Ok(()) + } + loop { - let reconnected = follow_handle.reconnected().fuse(); let action = retry(&mut fun).fuse(); - pin_mut!(reconnected, action); + pin_mut!(action); - let result = futures::future::select(reconnected, action).await; + let result = futures::future::select(reconnected.next(), action).await; match result { - Either::Left((_, _)) => (), + // We reconnected and received FollowEvent::Initialized() + Either::Left((Some(has_reconnected), _)) if has_reconnected => {} + Either::Left((Some(_), _)) => { + // Wait until we see Initialized Event + check_for_reconnect(reconnected.as_mut()).await? + } Either::Right((result, reset)) => { - let is_reconnected = reset.now_or_never().is_some(); - if !is_reconnected { - break result; + let is_reconnected = reset.now_or_never(); + if is_reconnected.is_none() { + return result; } + let is_reconnected = is_reconnected.flatten(); + if let Some(has_reconnected) = is_reconnected { + // Wait until we see Initialized Event + if !has_reconnected { + check_for_reconnect(reconnected.as_mut()).await? + } + } else { + break; + } + } + Either::Left((None, _)) => { + break; } } } + Err(RpcError::SubscriptionDropped.into()) }