diff --git a/.github/workflows/cont_integration.yml b/.github/workflows/cont_integration.yml index f793f7ad6..ef6fc8da1 100644 --- a/.github/workflows/cont_integration.yml +++ b/.github/workflows/cont_integration.yml @@ -130,3 +130,32 @@ jobs: with: token: ${{ secrets.GITHUB_TOKEN }} args: --all-features --all-targets -- -D warnings + + build-examples: + name: Build Examples + runs-on: ubuntu-latest + strategy: + matrix: + example-dir: + - example_cli + - example_bitcoind_rpc_polling + - example_electrum + - example_esplora + - wallet_electrum + - wallet_esplora_async + - wallet_esplora_blocking + - wallet_rpc + steps: + - name: checkout + uses: actions/checkout@v2 + - name: Install Rust toolchain + uses: actions-rs/toolchain@v1 + with: + toolchain: stable + override: true + profile: minimal + - name: Rust Cache + uses: Swatinem/rust-cache@v2.2.1 + - name: Build + working-directory: example-crates/${{ matrix.example-dir }} + run: cargo build diff --git a/crates/chain/src/lib.rs b/crates/chain/src/lib.rs index a756ab11c..3fb8c0eda 100644 --- a/crates/chain/src/lib.rs +++ b/crates/chain/src/lib.rs @@ -37,8 +37,6 @@ pub use tx_data_traits::*; pub use tx_graph::TxGraph; mod chain_oracle; pub use chain_oracle::*; -mod persist; -pub use persist::*; #[doc(hidden)] pub mod example_utils; diff --git a/crates/chain/src/persist.rs b/crates/chain/src/persist.rs deleted file mode 100644 index 2ec88f636..000000000 --- a/crates/chain/src/persist.rs +++ /dev/null @@ -1,169 +0,0 @@ -use core::{ - future::Future, - ops::{Deref, DerefMut}, - pin::Pin, -}; - -use alloc::boxed::Box; - -use crate::Merge; - -/// Represents a type that contains staged changes. -pub trait Staged { - /// Type for staged changes. - type ChangeSet: Merge; - - /// Get mutable reference of staged changes. - fn staged(&mut self) -> &mut Self::ChangeSet; -} - -/// Trait that persists the type with `Db`. -/// -/// Methods of this trait should not be called directly. 
-pub trait PersistWith: Staged + Sized { - /// Parameters for [`PersistWith::create`]. - type CreateParams; - /// Parameters for [`PersistWith::load`]. - type LoadParams; - /// Error type of [`PersistWith::create`]. - type CreateError; - /// Error type of [`PersistWith::load`]. - type LoadError; - /// Error type of [`PersistWith::persist`]. - type PersistError; - - /// Initialize the `Db` and create `Self`. - fn create(db: &mut Db, params: Self::CreateParams) -> Result; - - /// Initialize the `Db` and load a previously-persisted `Self`. - fn load(db: &mut Db, params: Self::LoadParams) -> Result, Self::LoadError>; - - /// Persist changes to the `Db`. - fn persist( - db: &mut Db, - changeset: &::ChangeSet, - ) -> Result<(), Self::PersistError>; -} - -type FutureResult<'a, T, E> = Pin> + Send + 'a>>; - -/// Trait that persists the type with an async `Db`. -pub trait PersistAsyncWith: Staged + Sized { - /// Parameters for [`PersistAsyncWith::create`]. - type CreateParams; - /// Parameters for [`PersistAsyncWith::load`]. - type LoadParams; - /// Error type of [`PersistAsyncWith::create`]. - type CreateError; - /// Error type of [`PersistAsyncWith::load`]. - type LoadError; - /// Error type of [`PersistAsyncWith::persist`]. - type PersistError; - - /// Initialize the `Db` and create `Self`. - fn create(db: &mut Db, params: Self::CreateParams) -> FutureResult; - - /// Initialize the `Db` and load a previously-persisted `Self`. - fn load(db: &mut Db, params: Self::LoadParams) -> FutureResult, Self::LoadError>; - - /// Persist changes to the `Db`. - fn persist<'a>( - db: &'a mut Db, - changeset: &'a ::ChangeSet, - ) -> FutureResult<'a, (), Self::PersistError>; -} - -/// Represents a persisted `T`. -#[derive(Debug, PartialEq, Eq, PartialOrd, Ord)] -pub struct Persisted { - inner: T, -} - -impl Persisted { - /// Create a new persisted `T`. 
- pub fn create(db: &mut Db, params: T::CreateParams) -> Result - where - T: PersistWith, - { - T::create(db, params).map(|inner| Self { inner }) - } - - /// Create a new persisted `T` with async `Db`. - pub async fn create_async( - db: &mut Db, - params: T::CreateParams, - ) -> Result - where - T: PersistAsyncWith, - { - T::create(db, params).await.map(|inner| Self { inner }) - } - - /// Construct a persisted `T` from `Db`. - pub fn load(db: &mut Db, params: T::LoadParams) -> Result, T::LoadError> - where - T: PersistWith, - { - Ok(T::load(db, params)?.map(|inner| Self { inner })) - } - - /// Construct a persisted `T` from an async `Db`. - pub async fn load_async( - db: &mut Db, - params: T::LoadParams, - ) -> Result, T::LoadError> - where - T: PersistAsyncWith, - { - Ok(T::load(db, params).await?.map(|inner| Self { inner })) - } - - /// Persist staged changes of `T` into `Db`. - /// - /// If the database errors, the staged changes will not be cleared. - pub fn persist(&mut self, db: &mut Db) -> Result - where - T: PersistWith, - { - let stage = T::staged(&mut self.inner); - if stage.is_empty() { - return Ok(false); - } - T::persist(db, &*stage)?; - stage.take(); - Ok(true) - } - - /// Persist staged changes of `T` into an async `Db`. - /// - /// If the database errors, the staged changes will not be cleared. 
- pub async fn persist_async<'a, Db>( - &'a mut self, - db: &'a mut Db, - ) -> Result - where - T: PersistAsyncWith, - { - let stage = T::staged(&mut self.inner); - if stage.is_empty() { - return Ok(false); - } - T::persist(db, &*stage).await?; - stage.take(); - Ok(true) - } -} - -impl Deref for Persisted { - type Target = T; - - fn deref(&self) -> &Self::Target { - &self.inner - } -} - -impl DerefMut for Persisted { - fn deref_mut(&mut self) -> &mut Self::Target { - &mut self.inner - } -} diff --git a/crates/chain/src/rusqlite_impl.rs b/crates/chain/src/rusqlite_impl.rs index a52c491c6..d8ef65c42 100644 --- a/crates/chain/src/rusqlite_impl.rs +++ b/crates/chain/src/rusqlite_impl.rs @@ -225,7 +225,7 @@ where pub const ANCHORS_TABLE_NAME: &'static str = "bdk_anchors"; /// Initialize sqlite tables. - fn init_sqlite_tables(db_tx: &rusqlite::Transaction) -> rusqlite::Result<()> { + pub fn init_sqlite_tables(db_tx: &rusqlite::Transaction) -> rusqlite::Result<()> { let schema_v0: &[&str] = &[ // full transactions &format!( @@ -264,9 +264,9 @@ where } /// Construct a [`TxGraph`] from an sqlite database. + /// + /// Remember to call [`Self::init_sqlite_tables`] beforehand. pub fn from_sqlite(db_tx: &rusqlite::Transaction) -> rusqlite::Result { - Self::init_sqlite_tables(db_tx)?; - let mut changeset = Self::default(); let mut statement = db_tx.prepare(&format!( @@ -332,9 +332,9 @@ where } /// Persist `changeset` to the sqlite database. + /// + /// Remember to call [`Self::init_sqlite_tables`] beforehand. 
pub fn persist_to_sqlite(&self, db_tx: &rusqlite::Transaction) -> rusqlite::Result<()> { - Self::init_sqlite_tables(db_tx)?; - let mut statement = db_tx.prepare_cached(&format!( "INSERT INTO {}(txid, raw_tx) VALUES(:txid, :raw_tx) ON CONFLICT(txid) DO UPDATE SET raw_tx=:raw_tx", Self::TXS_TABLE_NAME, @@ -396,7 +396,7 @@ impl local_chain::ChangeSet { pub const BLOCKS_TABLE_NAME: &'static str = "bdk_blocks"; /// Initialize sqlite tables for persisting [`local_chain::LocalChain`]. - fn init_sqlite_tables(db_tx: &rusqlite::Transaction) -> rusqlite::Result<()> { + pub fn init_sqlite_tables(db_tx: &rusqlite::Transaction) -> rusqlite::Result<()> { let schema_v0: &[&str] = &[ // blocks &format!( @@ -411,9 +411,9 @@ impl local_chain::ChangeSet { } /// Construct a [`LocalChain`](local_chain::LocalChain) from sqlite database. + /// + /// Remember to call [`Self::init_sqlite_tables`] beforehand. pub fn from_sqlite(db_tx: &rusqlite::Transaction) -> rusqlite::Result { - Self::init_sqlite_tables(db_tx)?; - let mut changeset = Self::default(); let mut statement = db_tx.prepare(&format!( @@ -435,9 +435,9 @@ impl local_chain::ChangeSet { } /// Persist `changeset` to the sqlite database. + /// + /// Remember to call [`Self::init_sqlite_tables`] beforehand. pub fn persist_to_sqlite(&self, db_tx: &rusqlite::Transaction) -> rusqlite::Result<()> { - Self::init_sqlite_tables(db_tx)?; - let mut replace_statement = db_tx.prepare_cached(&format!( "REPLACE INTO {}(block_height, block_hash) VALUES(:block_height, :block_hash)", Self::BLOCKS_TABLE_NAME, @@ -471,7 +471,7 @@ impl keychain_txout::ChangeSet { /// Initialize sqlite tables for persisting /// [`KeychainTxOutIndex`](keychain_txout::KeychainTxOutIndex). 
- fn init_sqlite_tables(db_tx: &rusqlite::Transaction) -> rusqlite::Result<()> { + pub fn init_sqlite_tables(db_tx: &rusqlite::Transaction) -> rusqlite::Result<()> { let schema_v0: &[&str] = &[ // last revealed &format!( @@ -487,9 +487,9 @@ impl keychain_txout::ChangeSet { /// Construct [`KeychainTxOutIndex`](keychain_txout::KeychainTxOutIndex) from sqlite database /// and given parameters. + /// + /// Remember to call [`Self::init_sqlite_tables`] beforehand. pub fn from_sqlite(db_tx: &rusqlite::Transaction) -> rusqlite::Result { - Self::init_sqlite_tables(db_tx)?; - let mut changeset = Self::default(); let mut statement = db_tx.prepare(&format!( @@ -511,9 +511,9 @@ impl keychain_txout::ChangeSet { } /// Persist `changeset` to the sqlite database. + /// + /// Remember to call [`Self::init_sqlite_tables`] beforehand. pub fn persist_to_sqlite(&self, db_tx: &rusqlite::Transaction) -> rusqlite::Result<()> { - Self::init_sqlite_tables(db_tx)?; - let mut statement = db_tx.prepare_cached(&format!( "REPLACE INTO {}(descriptor_id, last_revealed) VALUES(:descriptor_id, :last_revealed)", Self::LAST_REVEALED_TABLE_NAME, diff --git a/crates/chain/src/spk_client.rs b/crates/chain/src/spk_client.rs index 3457dfef7..567a8f0a9 100644 --- a/crates/chain/src/spk_client.rs +++ b/crates/chain/src/spk_client.rs @@ -1,388 +1,586 @@ //! Helper types for spk-based blockchain clients. - use crate::{ - collections::BTreeMap, local_chain::CheckPoint, ConfirmationBlockTime, Indexed, TxGraph, + alloc::{boxed::Box, collections::VecDeque, vec::Vec}, + collections::BTreeMap, + local_chain::CheckPoint, + ConfirmationBlockTime, Indexed, TxGraph, }; -use alloc::boxed::Box; use bitcoin::{OutPoint, Script, ScriptBuf, Txid}; -use core::marker::PhantomData; -/// Data required to perform a spk-based blockchain client sync. -/// -/// A client sync fetches relevant chain data for a known list of scripts, transaction ids and -/// outpoints. 
The sync process also updates the chain from the given [`CheckPoint`]. -pub struct SyncRequest { - /// A checkpoint for the current chain [`LocalChain::tip`]. - /// The sync process will return a new chain update that extends this tip. - /// - /// [`LocalChain::tip`]: crate::local_chain::LocalChain::tip - pub chain_tip: CheckPoint, - /// Transactions that spend from or to these indexed script pubkeys. - pub spks: Box + Send>, - /// Transactions with these txids. - pub txids: Box + Send>, - /// Transactions with these outpoints or spent from these outpoints. - pub outpoints: Box + Send>, -} - -impl SyncRequest { - /// Construct a new [`SyncRequest`] from a given `cp` tip. - pub fn from_chain_tip(cp: CheckPoint) -> Self { +type InspectSync = dyn FnMut(SyncItem, SyncProgress) + Send + 'static; + +type InspectFullScan = dyn FnMut(K, u32, &Script) + Send + 'static; + +/// An item reported to the [`inspect`](SyncRequestBuilder::inspect) closure of [`SyncRequest`]. +#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)] +pub enum SyncItem<'i, I> { + /// Script pubkey sync item. + Spk(I, &'i Script), + /// Txid sync item. + Txid(Txid), + /// Outpoint sync item. + OutPoint(OutPoint), +} + +impl<'i, I: core::fmt::Debug + core::any::Any> core::fmt::Display for SyncItem<'i, I> { + fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { + match self { + SyncItem::Spk(i, spk) => { + if (i as &dyn core::any::Any).is::<()>() { + write!(f, "script '{}'", spk) + } else { + write!(f, "script {:?} '{}'", i, spk) + } + } + SyncItem::Txid(txid) => write!(f, "txid '{}'", txid), + SyncItem::OutPoint(op) => write!(f, "outpoint '{}'", op), + } + } +} + +/// The progress of [`SyncRequest`]. +#[derive(Debug, Clone)] +pub struct SyncProgress { + /// Script pubkeys consumed by the request. + pub spks_consumed: usize, + /// Script pubkeys remaining in the request. + pub spks_remaining: usize, + /// Txids consumed by the request. 
+ pub txids_consumed: usize, + /// Txids remaining in the request. + pub txids_remaining: usize, + /// Outpoints consumed by the request. + pub outpoints_consumed: usize, + /// Outpoints remaining in the request. + pub outpoints_remaining: usize, +} + +impl SyncProgress { + /// Total items, consumed and remaining, of the request. + pub fn total(&self) -> usize { + self.total_spks() + self.total_txids() + self.total_outpoints() + } + + /// Total script pubkeys, consumed and remaining, of the request. + pub fn total_spks(&self) -> usize { + self.spks_consumed + self.spks_remaining + } + + /// Total txids, consumed and remaining, of the request. + pub fn total_txids(&self) -> usize { + self.txids_consumed + self.txids_remaining + } + + /// Total outpoints, consumed and remaining, of the request. + pub fn total_outpoints(&self) -> usize { + self.outpoints_consumed + self.outpoints_remaining + } + + /// Total consumed items of the request. + pub fn consumed(&self) -> usize { + self.spks_consumed + self.txids_consumed + self.outpoints_consumed + } + + /// Total remaining items of the request. + pub fn remaining(&self) -> usize { + self.spks_remaining + self.txids_remaining + self.outpoints_remaining + } +} + +/// Builds a [`SyncRequest`]. +#[must_use] +pub struct SyncRequestBuilder { + inner: SyncRequest, +} + +impl Default for SyncRequestBuilder { + fn default() -> Self { Self { - chain_tip: cp, - spks: Box::new(core::iter::empty()), - txids: Box::new(core::iter::empty()), - outpoints: Box::new(core::iter::empty()), + inner: Default::default(), } } +} - /// Set the [`Script`]s that will be synced against. - /// - /// This consumes the [`SyncRequest`] and returns the updated one. - #[must_use] - pub fn set_spks( - mut self, - spks: impl IntoIterator + Send + 'static>, +#[cfg(feature = "miniscript")] +impl SyncRequestBuilder<(K, u32)> { + /// Add [`Script`]s that are revealed by the `indexer` of the given `spk_range` that will be + /// synced against. 
+ pub fn revealed_spks_from_indexer( + self, + indexer: &crate::indexer::keychain_txout::KeychainTxOutIndex, + spk_range: impl core::ops::RangeBounds, ) -> Self { - self.spks = Box::new(spks.into_iter()); - self + self.spks_with_indexes(indexer.revealed_spks(spk_range)) } - /// Set the [`Txid`]s that will be synced against. - /// - /// This consumes the [`SyncRequest`] and returns the updated one. - #[must_use] - pub fn set_txids( - mut self, - txids: impl IntoIterator + Send + 'static>, + /// Add [`Script`]s that are revealed by the `indexer` but currently unused. + pub fn unused_spks_from_indexer( + self, + indexer: &crate::indexer::keychain_txout::KeychainTxOutIndex, ) -> Self { - self.txids = Box::new(txids.into_iter()); - self + self.spks_with_indexes(indexer.unused_spks()) + } +} + +impl SyncRequestBuilder<()> { + /// Add [`Script`]s that will be synced against. + pub fn spks(self, spks: impl IntoIterator) -> Self { + self.spks_with_indexes(spks.into_iter().map(|spk| ((), spk))) } +} - /// Set the [`OutPoint`]s that will be synced against. +impl SyncRequestBuilder { + /// Set the initial chain tip for the sync request. /// - /// This consumes the [`SyncRequest`] and returns the updated one. - #[must_use] - pub fn set_outpoints( - mut self, - outpoints: impl IntoIterator< - IntoIter = impl ExactSizeIterator + Send + 'static, - >, - ) -> Self { - self.outpoints = Box::new(outpoints.into_iter()); + /// This is used to update [`LocalChain`](crate::local_chain::LocalChain). + pub fn chain_tip(mut self, cp: CheckPoint) -> Self { + self.inner.chain_tip = Some(cp); self } - /// Chain on additional [`Script`]s that will be synced against. + /// Add [`Script`]s coupled with associated indexes that will be synced against. /// - /// This consumes the [`SyncRequest`] and returns the updated one. 
- #[must_use] - pub fn chain_spks( - mut self, - spks: impl IntoIterator< - IntoIter = impl ExactSizeIterator + Send + 'static, - Item = ScriptBuf, - >, - ) -> Self { - self.spks = Box::new(ExactSizeChain::new(self.spks, spks.into_iter())); + /// # Example + /// + /// Sync revealed script pubkeys obtained from a + /// [`KeychainTxOutIndex`](crate::keychain_txout::KeychainTxOutIndex). + /// + /// ```rust + /// # use bdk_chain::spk_client::SyncRequest; + /// # use bdk_chain::indexer::keychain_txout::KeychainTxOutIndex; + /// # use bdk_chain::miniscript::{Descriptor, DescriptorPublicKey}; + /// # let secp = bdk_chain::bitcoin::secp256k1::Secp256k1::signing_only(); + /// # let (descriptor_a,_) = Descriptor::::parse_descriptor(&secp, "tr([73c5da0a/86'/0'/0']xprv9xgqHN7yz9MwCkxsBPN5qetuNdQSUttZNKw1dcYTV4mkaAFiBVGQziHs3NRSWMkCzvgjEe3n9xV8oYywvM8at9yRqyaZVz6TYYhX98VjsUk/0/*)").unwrap(); + /// # let (descriptor_b,_) = Descriptor::::parse_descriptor(&secp, "tr([73c5da0a/86'/0'/0']xprv9xgqHN7yz9MwCkxsBPN5qetuNdQSUttZNKw1dcYTV4mkaAFiBVGQziHs3NRSWMkCzvgjEe3n9xV8oYywvM8at9yRqyaZVz6TYYhX98VjsUk/1/*)").unwrap(); + /// let mut indexer = KeychainTxOutIndex::<&'static str>::default(); + /// indexer.insert_descriptor("descriptor_a", descriptor_a)?; + /// indexer.insert_descriptor("descriptor_b", descriptor_b)?; + /// + /// /* Assume that the caller does more mutations to the `indexer` here... */ + /// + /// // Reveal spks for "descriptor_a", then build a sync request. Each spk will be indexed with + /// // `u32`, which represents the derivation index of the associated spk from "descriptor_a". + /// let (newly_revealed_spks, _changeset) = indexer + /// .reveal_to_target("descriptor_a", 21) + /// .expect("keychain must exist"); + /// let _request = SyncRequest::builder() + /// .spks_with_indexes(newly_revealed_spks) + /// .build(); + /// + /// // Sync all revealed spks in the indexer. This time, spks may be derived from different + /// // keychains. 
Each spk will be indexed with `(&'static str, u32)` where `&'static str` is + /// // the keychain identifier and `u32` is the derivation index. + /// let all_revealed_spks = indexer.revealed_spks(..); + /// let _request = SyncRequest::builder() + /// .spks_with_indexes(all_revealed_spks) + /// .build(); + /// # Ok::<_, bdk_chain::keychain_txout::InsertDescriptorError<_>>(()) + /// ``` + pub fn spks_with_indexes(mut self, spks: impl IntoIterator) -> Self { + self.inner.spks.extend(spks); self } - /// Chain on additional [`Txid`]s that will be synced against. - /// - /// This consumes the [`SyncRequest`] and returns the updated one. - #[must_use] - pub fn chain_txids( - mut self, - txids: impl IntoIterator< - IntoIter = impl ExactSizeIterator + Send + 'static, - Item = Txid, - >, - ) -> Self { - self.txids = Box::new(ExactSizeChain::new(self.txids, txids.into_iter())); + /// Add [`Txid`]s that will be synced against. + pub fn txids(mut self, txids: impl IntoIterator) -> Self { + self.inner.txids.extend(txids); self } - /// Chain on additional [`OutPoint`]s that will be synced against. - /// - /// This consumes the [`SyncRequest`] and returns the updated one. - #[must_use] - pub fn chain_outpoints( - mut self, - outpoints: impl IntoIterator< - IntoIter = impl ExactSizeIterator + Send + 'static, - Item = OutPoint, - >, - ) -> Self { - self.outpoints = Box::new(ExactSizeChain::new(self.outpoints, outpoints.into_iter())); + /// Add [`OutPoint`]s that will be synced against. + pub fn outpoints(mut self, outpoints: impl IntoIterator) -> Self { + self.inner.outpoints.extend(outpoints); self } - /// Add a closure that will be called for [`Script`]s previously added to this request. - /// - /// This consumes the [`SyncRequest`] and returns the updated one. 
- #[must_use] - pub fn inspect_spks( - mut self, - mut inspect: impl FnMut(&Script) + Send + Sync + 'static, - ) -> Self { - self.spks = Box::new(self.spks.inspect(move |spk| inspect(spk))); + /// Set the closure that will inspect every sync item visited. + pub fn inspect(mut self, inspect: F) -> Self + where + F: FnMut(SyncItem, SyncProgress) + Send + 'static, + { + self.inner.inspect = Box::new(inspect); self } - /// Add a closure that will be called for [`Txid`]s previously added to this request. + /// Build the [`SyncRequest`]. + pub fn build(self) -> SyncRequest { + self.inner + } +} + +/// Data required to perform a spk-based blockchain client sync. +/// +/// A client sync fetches relevant chain data for a known list of scripts, transaction ids and +/// outpoints. The sync process also updates the chain from the given +/// [`chain_tip`](SyncRequestBuilder::chain_tip) (if provided). +/// +/// ```rust +/// # use bdk_chain::{bitcoin::{hashes::Hash, ScriptBuf}, local_chain::LocalChain}; +/// # let (local_chain, _) = LocalChain::from_genesis_hash(Hash::all_zeros()); +/// # let scripts = [ScriptBuf::default(), ScriptBuf::default()]; +/// # use bdk_chain::spk_client::SyncRequest; +/// // Construct a sync request. +/// let sync_request = SyncRequest::builder() +/// // Provide chain tip of the local wallet. +/// .chain_tip(local_chain.tip()) +/// // Provide list of scripts to scan for transactions against. +/// .spks(scripts) +/// // This is called for every synced item. +/// .inspect(|item, progress| println!("{} (remaining: {})", item, progress.remaining())) +/// // Finish constructing the sync request. 
+/// .build(); +/// ``` +#[must_use] +pub struct SyncRequest { + chain_tip: Option, + spks: VecDeque<(I, ScriptBuf)>, + spks_consumed: usize, + txids: VecDeque, + txids_consumed: usize, + outpoints: VecDeque, + outpoints_consumed: usize, + inspect: Box>, +} + +impl Default for SyncRequest { + fn default() -> Self { + Self { + chain_tip: None, + spks: VecDeque::new(), + spks_consumed: 0, + txids: VecDeque::new(), + txids_consumed: 0, + outpoints: VecDeque::new(), + outpoints_consumed: 0, + inspect: Box::new(|_, _| {}), + } + } +} + +impl From> for SyncRequest { + fn from(builder: SyncRequestBuilder) -> Self { + builder.inner + } +} + +impl SyncRequest { + /// Start building a [`SyncRequest`]. + pub fn builder() -> SyncRequestBuilder { + SyncRequestBuilder { + inner: Default::default(), + } + } + + /// Get the [`SyncProgress`] of this request. + pub fn progress(&self) -> SyncProgress { + SyncProgress { + spks_consumed: self.spks_consumed, + spks_remaining: self.spks.len(), + txids_consumed: self.txids_consumed, + txids_remaining: self.txids.len(), + outpoints_consumed: self.outpoints_consumed, + outpoints_remaining: self.outpoints.len(), + } + } + + /// Get the chain tip [`CheckPoint`] of this request (if any). + pub fn chain_tip(&self) -> Option { + self.chain_tip.clone() + } + + /// Advances the sync request and returns the next [`ScriptBuf`]. /// - /// This consumes the [`SyncRequest`] and returns the updated one. - #[must_use] - pub fn inspect_txids(mut self, mut inspect: impl FnMut(&Txid) + Send + Sync + 'static) -> Self { - self.txids = Box::new(self.txids.inspect(move |txid| inspect(txid))); - self + /// Returns [`None`] when there are no more scripts remaining in the request. + pub fn next_spk(&mut self) -> Option { + let (i, spk) = self.spks.pop_front()?; + self.spks_consumed += 1; + self._call_inspect(SyncItem::Spk(i, spk.as_script())); + Some(spk) } - /// Add a closure that will be called for [`OutPoint`]s previously added to this request. 
+ /// Advances the sync request and returns the next [`Txid`]. /// - /// This consumes the [`SyncRequest`] and returns the updated one. - #[must_use] - pub fn inspect_outpoints( - mut self, - mut inspect: impl FnMut(&OutPoint) + Send + Sync + 'static, - ) -> Self { - self.outpoints = Box::new(self.outpoints.inspect(move |op| inspect(op))); - self + /// Returns [`None`] when there are no more txids remaining in the request. + pub fn next_txid(&mut self) -> Option { + let txid = self.txids.pop_front()?; + self.txids_consumed += 1; + self._call_inspect(SyncItem::Txid(txid)); + Some(txid) } - /// Populate the request with revealed script pubkeys from `index` with the given `spk_range`. + /// Advances the sync request and returns the next [`OutPoint`]. /// - /// This consumes the [`SyncRequest`] and returns the updated one. - #[cfg(feature = "miniscript")] - #[must_use] - pub fn populate_with_revealed_spks( - self, - index: &crate::indexer::keychain_txout::KeychainTxOutIndex, - spk_range: impl core::ops::RangeBounds, - ) -> Self { - use alloc::borrow::ToOwned; - use alloc::vec::Vec; - self.chain_spks( - index - .revealed_spks(spk_range) - .map(|(_, spk)| spk.to_owned()) - .collect::>(), - ) + /// Returns [`None`] when there are no more outpoints in the request. + pub fn next_outpoint(&mut self) -> Option { + let outpoint = self.outpoints.pop_front()?; + self.outpoints_consumed += 1; + self._call_inspect(SyncItem::OutPoint(outpoint)); + Some(outpoint) + } + + /// Iterate over [`ScriptBuf`]s contained in this request. + pub fn iter_spks(&mut self) -> impl ExactSizeIterator + '_ { + SyncIter::::new(self) + } + + /// Iterate over [`Txid`]s contained in this request. + pub fn iter_txids(&mut self) -> impl ExactSizeIterator + '_ { + SyncIter::::new(self) + } + + /// Iterate over [`OutPoint`]s contained in this request. 
+ pub fn iter_outpoints(&mut self) -> impl ExactSizeIterator + '_ { + SyncIter::::new(self) + } + + fn _call_inspect(&mut self, item: SyncItem) { + let progress = self.progress(); + (*self.inspect)(item, progress); } } /// Data returned from a spk-based blockchain client sync. /// /// See also [`SyncRequest`]. +#[must_use] +#[derive(Debug)] pub struct SyncResult { /// The update to apply to the receiving [`TxGraph`]. pub graph_update: TxGraph, /// The update to apply to the receiving [`LocalChain`](crate::local_chain::LocalChain). - pub chain_update: CheckPoint, + pub chain_update: Option, } -/// Data required to perform a spk-based blockchain client full scan. -/// -/// A client full scan iterates through all the scripts for the given keychains, fetching relevant -/// data until some stop gap number of scripts is found that have no data. This operation is -/// generally only used when importing or restoring previously used keychains in which the list of -/// used scripts is not known. The full scan process also updates the chain from the given [`CheckPoint`]. -pub struct FullScanRequest { - /// A checkpoint for the current [`LocalChain::tip`]. - /// The full scan process will return a new chain update that extends this tip. - /// - /// [`LocalChain::tip`]: crate::local_chain::LocalChain::tip - pub chain_tip: CheckPoint, - /// Iterators of script pubkeys indexed by the keychain index. - pub spks_by_keychain: BTreeMap> + Send>>, -} - -impl FullScanRequest { - /// Construct a new [`FullScanRequest`] from a given `chain_tip`. - #[must_use] - pub fn from_chain_tip(chain_tip: CheckPoint) -> Self { +impl Default for SyncResult { + fn default() -> Self { Self { - chain_tip, - spks_by_keychain: BTreeMap::new(), + graph_update: Default::default(), + chain_update: Default::default(), } } +} - /// Construct a new [`FullScanRequest`] from a given `chain_tip` and `index`. 
- /// - /// Unbounded script pubkey iterators for each keychain (`K`) are extracted using - /// [`KeychainTxOutIndex::all_unbounded_spk_iters`] and is used to populate the - /// [`FullScanRequest`]. - /// - /// [`KeychainTxOutIndex::all_unbounded_spk_iters`]: crate::indexer::keychain_txout::KeychainTxOutIndex::all_unbounded_spk_iters - #[cfg(feature = "miniscript")] - #[must_use] - pub fn from_keychain_txout_index( - chain_tip: CheckPoint, - index: &crate::indexer::keychain_txout::KeychainTxOutIndex, - ) -> Self - where - K: core::fmt::Debug, - { - let mut req = Self::from_chain_tip(chain_tip); - for (keychain, spks) in index.all_unbounded_spk_iters() { - req = req.set_spks_for_keychain(keychain, spks); +/// Builds a [`FullScanRequest`]. +#[must_use] +pub struct FullScanRequestBuilder { + inner: FullScanRequest, +} + +impl Default for FullScanRequestBuilder { + fn default() -> Self { + Self { + inner: Default::default(), } - req } +} - /// Set the [`Script`]s for a given `keychain`. - /// - /// This consumes the [`FullScanRequest`] and returns the updated one. - #[must_use] - pub fn set_spks_for_keychain( +#[cfg(feature = "miniscript")] +impl FullScanRequestBuilder { + /// Add spk iterators for each keychain tracked in `indexer`. + pub fn spks_from_indexer( mut self, - keychain: K, - spks: impl IntoIterator> + Send + 'static>, + indexer: &crate::indexer::keychain_txout::KeychainTxOutIndex, ) -> Self { - self.spks_by_keychain - .insert(keychain, Box::new(spks.into_iter())); + for (keychain, spks) in indexer.all_unbounded_spk_iters() { + self = self.spks_for_keychain(keychain, spks); + } self } +} - /// Chain on additional [`Script`]s that will be synced against. +impl FullScanRequestBuilder { + /// Set the initial chain tip for the full scan request. /// - /// This consumes the [`FullScanRequest`] and returns the updated one. - #[must_use] - pub fn chain_spks_for_keychain( + /// This is used to update [`LocalChain`](crate::local_chain::LocalChain). 
+ pub fn chain_tip(mut self, tip: CheckPoint) -> Self { + self.inner.chain_tip = Some(tip); + self + } + + /// Set the spk iterator for a given `keychain`. + pub fn spks_for_keychain( mut self, keychain: K, spks: impl IntoIterator> + Send + 'static>, ) -> Self { - match self.spks_by_keychain.remove(&keychain) { - // clippy here suggests to remove `into_iter` from `spks.into_iter()`, but doing so - // results in a compilation error - #[allow(clippy::useless_conversion)] - Some(keychain_spks) => self - .spks_by_keychain - .insert(keychain, Box::new(keychain_spks.chain(spks.into_iter()))), - None => self - .spks_by_keychain - .insert(keychain, Box::new(spks.into_iter())), - }; + self.inner + .spks_by_keychain + .insert(keychain, Box::new(spks.into_iter())); self } - /// Add a closure that will be called for every [`Script`] previously added to any keychain in - /// this request. - /// - /// This consumes the [`SyncRequest`] and returns the updated one. - #[must_use] - pub fn inspect_spks_for_all_keychains( - mut self, - inspect: impl FnMut(K, u32, &Script) + Send + Sync + Clone + 'static, - ) -> Self + /// Set the closure that will inspect every sync item visited. + pub fn inspect(mut self, inspect: F) -> Self where - K: Send + 'static, + F: FnMut(K, u32, &Script) + Send + 'static, { - for (keychain, spks) in core::mem::take(&mut self.spks_by_keychain) { - let mut inspect = inspect.clone(); - self.spks_by_keychain.insert( - keychain.clone(), - Box::new(spks.inspect(move |(i, spk)| inspect(keychain.clone(), *i, spk))), - ); - } + self.inner.inspect = Box::new(inspect); self } - /// Add a closure that will be called for every [`Script`] previously added to a given - /// `keychain` in this request. - /// - /// This consumes the [`SyncRequest`] and returns the updated one. 
- #[must_use] - pub fn inspect_spks_for_keychain( - mut self, - keychain: K, - mut inspect: impl FnMut(u32, &Script) + Send + Sync + 'static, - ) -> Self - where - K: Send + 'static, - { - if let Some(spks) = self.spks_by_keychain.remove(&keychain) { - self.spks_by_keychain.insert( - keychain, - Box::new(spks.inspect(move |(i, spk)| inspect(*i, spk))), - ); + /// Build the [`FullScanRequest`]. + pub fn build(self) -> FullScanRequest { + self.inner + } +} + +/// Data required to perform a spk-based blockchain client full scan. +/// +/// A client full scan iterates through all the scripts for the given keychains, fetching relevant +/// data until some stop gap number of scripts is found that have no data. This operation is +/// generally only used when importing or restoring previously used keychains in which the list of +/// used scripts is not known. The full scan process also updates the chain from the given +/// [`chain_tip`](FullScanRequestBuilder::chain_tip) (if provided). +#[must_use] +pub struct FullScanRequest { + chain_tip: Option, + spks_by_keychain: BTreeMap> + Send>>, + inspect: Box>, +} + +impl From> for FullScanRequest { + fn from(builder: FullScanRequestBuilder) -> Self { + builder.inner + } +} + +impl Default for FullScanRequest { + fn default() -> Self { + Self { + chain_tip: None, + spks_by_keychain: Default::default(), + inspect: Box::new(|_, _, _| {}), + } + } +} + +impl FullScanRequest { + /// Start building a [`FullScanRequest`]. + pub fn builder() -> FullScanRequestBuilder { + FullScanRequestBuilder { + inner: Self::default(), + } + } + + /// Get the chain tip [`CheckPoint`] of this request (if any). + pub fn chain_tip(&self) -> Option { + self.chain_tip.clone() + } + + /// List all keychains contained in this request. + pub fn keychains(&self) -> Vec { + self.spks_by_keychain.keys().cloned().collect() + } + + /// Advances the full scan request and returns the next indexed [`ScriptBuf`] of the given + /// `keychain`. 
+ pub fn next_spk(&mut self, keychain: K) -> Option> { + self.iter_spks(keychain).next() + } + + /// Iterate over indexed [`ScriptBuf`]s contained in this request of the given `keychain`. + pub fn iter_spks(&mut self, keychain: K) -> impl Iterator> + '_ { + let spks = self.spks_by_keychain.get_mut(&keychain); + let inspect = &mut self.inspect; + KeychainSpkIter { + keychain, + spks, + inspect, } - self } } /// Data returned from a spk-based blockchain client full scan. /// /// See also [`FullScanRequest`]. +#[must_use] +#[derive(Debug)] pub struct FullScanResult { /// The update to apply to the receiving [`LocalChain`](crate::local_chain::LocalChain). pub graph_update: TxGraph, /// The update to apply to the receiving [`TxGraph`]. - pub chain_update: CheckPoint, + pub chain_update: Option, /// Last active indices for the corresponding keychains (`K`). pub last_active_indices: BTreeMap, } -/// A version of [`core::iter::Chain`] which can combine two [`ExactSizeIterator`]s to form a new -/// [`ExactSizeIterator`]. -/// -/// The danger of this is explained in [the `ExactSizeIterator` docs] -/// (https://doc.rust-lang.org/core/iter/trait.ExactSizeIterator.html#when-shouldnt-an-adapter-be-exactsizeiterator). -/// This does not apply here since it would be impossible to scan an item count that overflows -/// `usize` anyway. 
-struct ExactSizeChain { - a: Option, - b: Option, - i: PhantomData, -} - -impl ExactSizeChain { - fn new(a: A, b: B) -> Self { - ExactSizeChain { - a: Some(a), - b: Some(b), - i: PhantomData, +impl Default for FullScanResult { + fn default() -> Self { + Self { + graph_update: Default::default(), + chain_update: Default::default(), + last_active_indices: Default::default(), } } } -impl Iterator for ExactSizeChain -where - A: Iterator, - B: Iterator, -{ - type Item = I; +struct KeychainSpkIter<'r, K> { + keychain: K, + spks: Option<&'r mut Box> + Send>>, + inspect: &'r mut Box>, +} + +impl<'r, K: Ord + Clone> Iterator for KeychainSpkIter<'r, K> { + type Item = Indexed; fn next(&mut self) -> Option { - if let Some(a) = &mut self.a { - let item = a.next(); - if item.is_some() { - return item; - } - self.a = None; - } - if let Some(b) = &mut self.b { - let item = b.next(); - if item.is_some() { - return item; - } - self.b = None; + let (i, spk) = self.spks.as_mut()?.next()?; + (*self.inspect)(self.keychain.clone(), i, &spk); + Some((i, spk)) + } +} + +struct SyncIter<'r, I, Item> { + request: &'r mut SyncRequest, + marker: core::marker::PhantomData, +} + +impl<'r, I, Item> SyncIter<'r, I, Item> { + fn new(request: &'r mut SyncRequest) -> Self { + Self { + request, + marker: core::marker::PhantomData, } - None } } -impl ExactSizeIterator for ExactSizeChain -where - A: ExactSizeIterator, - B: ExactSizeIterator, -{ - fn len(&self) -> usize { - let a_len = self.a.as_ref().map(|a| a.len()).unwrap_or(0); - let b_len = self.b.as_ref().map(|a| a.len()).unwrap_or(0); - a_len + b_len +impl<'r, I, Item> ExactSizeIterator for SyncIter<'r, I, Item> where SyncIter<'r, I, Item>: Iterator {} + +impl<'r, I> Iterator for SyncIter<'r, I, ScriptBuf> { + type Item = ScriptBuf; + + fn next(&mut self) -> Option { + self.request.next_spk() + } + + fn size_hint(&self) -> (usize, Option) { + let remaining = self.request.spks.len(); + (remaining, Some(remaining)) + } +} + +impl<'r, I> Iterator 
for SyncIter<'r, I, Txid> { + type Item = Txid; + + fn next(&mut self) -> Option { + self.request.next_txid() + } + + fn size_hint(&self) -> (usize, Option) { + let remaining = self.request.txids.len(); + (remaining, Some(remaining)) + } +} + +impl<'r, I> Iterator for SyncIter<'r, I, OutPoint> { + type Item = OutPoint; + + fn next(&mut self) -> Option { + self.request.next_outpoint() + } + + fn size_hint(&self) -> (usize, Option) { + let remaining = self.request.outpoints.len(); + (remaining, Some(remaining)) } } diff --git a/crates/electrum/src/bdk_electrum_client.rs b/crates/electrum/src/bdk_electrum_client.rs index 9dfbdab73..1458e2bd9 100644 --- a/crates/electrum/src/bdk_electrum_client.rs +++ b/crates/electrum/src/bdk_electrum_client.rs @@ -126,17 +126,22 @@ impl BdkElectrumClient { /// [`Wallet.calculate_fee_rate`]: https://docs.rs/bdk_wallet/latest/bdk_wallet/struct.Wallet.html#method.calculate_fee_rate pub fn full_scan( &self, - request: FullScanRequest, + request: impl Into>, stop_gap: usize, batch_size: usize, fetch_prev_txouts: bool, ) -> Result, Error> { - let (tip, latest_blocks) = - fetch_tip_and_latest_blocks(&self.inner, request.chain_tip.clone())?; - let mut graph_update = TxGraph::::default(); - let mut last_active_indices = BTreeMap::::new(); + let mut request: FullScanRequest = request.into(); + + let tip_and_latest_blocks = match request.chain_tip() { + Some(chain_tip) => Some(fetch_tip_and_latest_blocks(&self.inner, chain_tip)?), + None => None, + }; - for (keychain, spks) in request.spks_by_keychain { + let mut graph_update = TxGraph::::default(); + let mut last_active_indices = BTreeMap::::default(); + for keychain in request.keychains() { + let spks = request.iter_spks(keychain.clone()); if let Some(last_active_index) = self.populate_with_spks(&mut graph_update, spks, stop_gap, batch_size)? 
{ @@ -144,13 +149,20 @@ impl BdkElectrumClient { } } - let chain_update = chain_update(tip, &latest_blocks, graph_update.all_anchors())?; - // Fetch previous `TxOut`s for fee calculation if flag is enabled. if fetch_prev_txouts { self.fetch_prev_txout(&mut graph_update)?; } + let chain_update = match tip_and_latest_blocks { + Some((chain_tip, latest_blocks)) => Some(chain_update( + chain_tip, + &latest_blocks, + graph_update.all_anchors(), + )?), + _ => None, + }; + Ok(FullScanResult { graph_update, chain_update, @@ -180,35 +192,49 @@ impl BdkElectrumClient { /// [`CalculateFeeError::MissingTxOut`]: bdk_chain::tx_graph::CalculateFeeError::MissingTxOut /// [`Wallet.calculate_fee`]: https://docs.rs/bdk_wallet/latest/bdk_wallet/struct.Wallet.html#method.calculate_fee /// [`Wallet.calculate_fee_rate`]: https://docs.rs/bdk_wallet/latest/bdk_wallet/struct.Wallet.html#method.calculate_fee_rate - pub fn sync( + pub fn sync( &self, - request: SyncRequest, + request: impl Into>, batch_size: usize, fetch_prev_txouts: bool, ) -> Result { - let full_scan_req = FullScanRequest::from_chain_tip(request.chain_tip.clone()) - .set_spks_for_keychain((), request.spks.enumerate().map(|(i, spk)| (i as u32, spk))); - let mut full_scan_res = self.full_scan(full_scan_req, usize::MAX, batch_size, false)?; - let (tip, latest_blocks) = - fetch_tip_and_latest_blocks(&self.inner, request.chain_tip.clone())?; - - self.populate_with_txids(&mut full_scan_res.graph_update, request.txids)?; - self.populate_with_outpoints(&mut full_scan_res.graph_update, request.outpoints)?; - - let chain_update = chain_update( - tip, - &latest_blocks, - full_scan_res.graph_update.all_anchors(), + let mut request: SyncRequest = request.into(); + + let tip_and_latest_blocks = match request.chain_tip() { + Some(chain_tip) => Some(fetch_tip_and_latest_blocks(&self.inner, chain_tip)?), + None => None, + }; + + let mut graph_update = TxGraph::::default(); + self.populate_with_spks( + &mut graph_update, + request + 
.iter_spks() + .enumerate() + .map(|(i, spk)| (i as u32, spk)), + usize::MAX, + batch_size, )?; + self.populate_with_txids(&mut graph_update, request.iter_txids())?; + self.populate_with_outpoints(&mut graph_update, request.iter_outpoints())?; // Fetch previous `TxOut`s for fee calculation if flag is enabled. if fetch_prev_txouts { - self.fetch_prev_txout(&mut full_scan_res.graph_update)?; + self.fetch_prev_txout(&mut graph_update)?; } + let chain_update = match tip_and_latest_blocks { + Some((chain_tip, latest_blocks)) => Some(chain_update( + chain_tip, + &latest_blocks, + graph_update.all_anchors(), + )?), + None => None, + }; + Ok(SyncResult { + graph_update, chain_update, - graph_update: full_scan_res.graph_update, }) } diff --git a/crates/electrum/tests/test_electrum.rs b/crates/electrum/tests/test_electrum.rs index afe50be0a..63e91081b 100644 --- a/crates/electrum/tests/test_electrum.rs +++ b/crates/electrum/tests/test_electrum.rs @@ -39,7 +39,7 @@ where Spks::IntoIter: ExactSizeIterator + Send + 'static, { let mut update = client.sync( - SyncRequest::from_chain_tip(chain.tip()).chain_spks(spks), + SyncRequest::builder().chain_tip(chain.tip()).spks(spks), BATCH_SIZE, true, )?; @@ -51,9 +51,11 @@ where .as_secs(); let _ = update.graph_update.update_last_seen_unconfirmed(now); - let _ = chain - .apply_update(update.chain_update.clone()) - .map_err(|err| anyhow::anyhow!("LocalChain update error: {:?}", err))?; + if let Some(chain_update) = update.chain_update.clone() { + let _ = chain + .apply_update(chain_update) + .map_err(|err| anyhow::anyhow!("LocalChain update error: {:?}", err))?; + } let _ = graph.apply_update(update.graph_update.clone()); Ok(update) @@ -103,7 +105,9 @@ pub fn test_update_tx_graph_without_keychain() -> anyhow::Result<()> { let cp_tip = env.make_checkpoint_tip(); let sync_update = { - let request = SyncRequest::from_chain_tip(cp_tip.clone()).set_spks(misc_spks); + let request = SyncRequest::builder() + .chain_tip(cp_tip.clone()) + 
.spks(misc_spks); client.sync(request, 1, true)? }; @@ -207,15 +211,17 @@ pub fn test_update_tx_graph_stop_gap() -> anyhow::Result<()> { // A scan with a stop_gap of 3 won't find the transaction, but a scan with a gap limit of 4 // will. let full_scan_update = { - let request = - FullScanRequest::from_chain_tip(cp_tip.clone()).set_spks_for_keychain(0, spks.clone()); + let request = FullScanRequest::builder() + .chain_tip(cp_tip.clone()) + .spks_for_keychain(0, spks.clone()); client.full_scan(request, 3, 1, false)? }; assert!(full_scan_update.graph_update.full_txs().next().is_none()); assert!(full_scan_update.last_active_indices.is_empty()); let full_scan_update = { - let request = - FullScanRequest::from_chain_tip(cp_tip.clone()).set_spks_for_keychain(0, spks.clone()); + let request = FullScanRequest::builder() + .chain_tip(cp_tip.clone()) + .spks_for_keychain(0, spks.clone()); client.full_scan(request, 4, 1, false)? }; assert_eq!( @@ -246,8 +252,9 @@ pub fn test_update_tx_graph_stop_gap() -> anyhow::Result<()> { // A scan with gap limit 5 won't find the second transaction, but a scan with gap limit 6 will. // The last active indice won't be updated in the first case but will in the second one. let full_scan_update = { - let request = - FullScanRequest::from_chain_tip(cp_tip.clone()).set_spks_for_keychain(0, spks.clone()); + let request = FullScanRequest::builder() + .chain_tip(cp_tip.clone()) + .spks_for_keychain(0, spks.clone()); client.full_scan(request, 5, 1, false)? }; let txs: HashSet<_> = full_scan_update @@ -259,8 +266,9 @@ pub fn test_update_tx_graph_stop_gap() -> anyhow::Result<()> { assert!(txs.contains(&txid_4th_addr)); assert_eq!(full_scan_update.last_active_indices[&0], 3); let full_scan_update = { - let request = - FullScanRequest::from_chain_tip(cp_tip.clone()).set_spks_for_keychain(0, spks.clone()); + let request = FullScanRequest::builder() + .chain_tip(cp_tip.clone()) + .spks_for_keychain(0, spks.clone()); client.full_scan(request, 6, 1, false)? 
}; let txs: HashSet<_> = full_scan_update @@ -311,7 +319,7 @@ fn test_sync() -> anyhow::Result<()> { let txid = env.send(&addr_to_track, SEND_AMOUNT)?; env.wait_until_electrum_sees_txid(txid, Duration::from_secs(6))?; - sync_with_electrum( + let _ = sync_with_electrum( &client, [spk_to_track.clone()], &mut recv_chain, @@ -332,7 +340,7 @@ fn test_sync() -> anyhow::Result<()> { env.mine_blocks(1, None)?; env.wait_until_electrum_sees_block(Duration::from_secs(6))?; - sync_with_electrum( + let _ = sync_with_electrum( &client, [spk_to_track.clone()], &mut recv_chain, @@ -353,7 +361,7 @@ fn test_sync() -> anyhow::Result<()> { env.reorg_empty_blocks(1)?; env.wait_until_electrum_sees_block(Duration::from_secs(6))?; - sync_with_electrum( + let _ = sync_with_electrum( &client, [spk_to_track.clone()], &mut recv_chain, @@ -373,7 +381,7 @@ fn test_sync() -> anyhow::Result<()> { env.mine_blocks(1, None)?; env.wait_until_electrum_sees_block(Duration::from_secs(6))?; - sync_with_electrum(&client, [spk_to_track], &mut recv_chain, &mut recv_graph)?; + let _ = sync_with_electrum(&client, [spk_to_track], &mut recv_chain, &mut recv_graph)?; // Check if balance is correct once transaction is confirmed again. 
assert_eq!( diff --git a/crates/esplora/Cargo.toml b/crates/esplora/Cargo.toml index f0e82eb11..9148a0f86 100644 --- a/crates/esplora/Cargo.toml +++ b/crates/esplora/Cargo.toml @@ -13,11 +13,9 @@ readme = "README.md" [dependencies] bdk_chain = { path = "../chain", version = "0.17.0", default-features = false } -esplora-client = { version = "0.8.0", default-features = false } +esplora-client = { version = "0.9.0", default-features = false } async-trait = { version = "0.1.66", optional = true } futures = { version = "0.3.26", optional = true } - -bitcoin = { version = "0.32.0", optional = true, default-features = false } miniscript = { version = "12.0.0", optional = true, default-features = false } [dev-dependencies] diff --git a/crates/esplora/README.md b/crates/esplora/README.md index 96d92c76c..ef2e65176 100644 --- a/crates/esplora/README.md +++ b/crates/esplora/README.md @@ -1,11 +1,12 @@ # BDK Esplora -BDK Esplora extends [`esplora-client`] to update [`bdk_chain`] structures -from an Esplora server. +BDK Esplora extends [`esplora-client`] (with extension traits: [`EsploraExt`] and +[`EsploraAsyncExt`]) to update [`bdk_chain`] structures from an Esplora server. -## Usage +The extension traits are primarily intended to satisfy [`SyncRequest`]s with [`sync`] and +[`FullScanRequest`]s with [`full_scan`]. -There are two versions of the extension trait (blocking and async). +## Usage For blocking-only: ```toml @@ -27,10 +28,16 @@ To use the extension traits: // for blocking use bdk_esplora::EsploraExt; // for async -// use bdk_esplora::EsploraAsyncExt; +use bdk_esplora::EsploraAsyncExt; ``` For full examples, refer to [`example-crates/wallet_esplora_blocking`](https://github.com/bitcoindevkit/bdk/tree/master/example-crates/wallet_esplora_blocking) and [`example-crates/wallet_esplora_async`](https://github.com/bitcoindevkit/bdk/tree/master/example-crates/wallet_esplora_async). 
[`esplora-client`]: https://docs.rs/esplora-client/ [`bdk_chain`]: https://docs.rs/bdk-chain/ +[`EsploraExt`]: crate::EsploraExt +[`EsploraAsyncExt`]: crate::EsploraAsyncExt +[`SyncRequest`]: bdk_chain::spk_client::SyncRequest +[`FullScanRequest`]: bdk_chain::spk_client::FullScanRequest +[`sync`]: crate::EsploraExt::sync +[`full_scan`]: crate::EsploraExt::full_scan diff --git a/crates/esplora/src/async_ext.rs b/crates/esplora/src/async_ext.rs index 70895a43a..066b91e17 100644 --- a/crates/esplora/src/async_ext.rs +++ b/crates/esplora/src/async_ext.rs @@ -3,70 +3,52 @@ use std::collections::BTreeSet; use async_trait::async_trait; use bdk_chain::spk_client::{FullScanRequest, FullScanResult, SyncRequest, SyncResult}; use bdk_chain::{ - bitcoin::{BlockHash, OutPoint, ScriptBuf, TxOut, Txid}, + bitcoin::{BlockHash, OutPoint, ScriptBuf, Txid}, collections::BTreeMap, local_chain::CheckPoint, BlockId, ConfirmationBlockTime, TxGraph, }; use bdk_chain::{Anchor, Indexed}; -use esplora_client::{Amount, TxStatus}; +use esplora_client::{Tx, TxStatus}; use futures::{stream::FuturesOrdered, TryStreamExt}; -use crate::anchor_from_status; +use crate::{insert_anchor_from_status, insert_prevouts}; /// [`esplora_client::Error`] type Error = Box; /// Trait to extend the functionality of [`esplora_client::AsyncClient`]. /// -/// Refer to [crate-level documentation] for more. -/// -/// [crate-level documentation]: crate +/// Refer to [crate-level documentation](crate) for more. #[cfg_attr(target_arch = "wasm32", async_trait(?Send))] #[cfg_attr(not(target_arch = "wasm32"), async_trait)] pub trait EsploraAsyncExt { /// Scan keychain scripts for transactions against Esplora, returning an update that can be /// applied to the receiving structures. 
/// - /// - `request`: struct with data required to perform a spk-based blockchain client full scan, - /// see [`FullScanRequest`] - /// - /// The full scan for each keychain stops after a gap of `stop_gap` script pubkeys with no - /// associated transactions. `parallel_requests` specifies the max number of HTTP requests to - /// make in parallel. - /// - /// ## Note + /// `request` provides the data required to perform a script-pubkey-based full scan + /// (see [`FullScanRequest`]). The full scan for each keychain (`K`) stops after a gap of + /// `stop_gap` script pubkeys with no associated transactions. `parallel_requests` specifies + /// the maximum number of HTTP requests to make in parallel. /// - /// `stop_gap` is defined as "the maximum number of consecutive unused addresses". - /// For example, with a `stop_gap` of 3, `full_scan` will keep scanning - /// until it encounters 3 consecutive script pubkeys with no associated transactions. - /// - /// This follows the same approach as other Bitcoin-related software, - /// such as [Electrum](https://electrum.readthedocs.io/en/latest/faq.html#what-is-the-gap-limit), - /// [BTCPay Server](https://docs.btcpayserver.org/FAQ/Wallet/#the-gap-limit-problem), - /// and [Sparrow](https://www.sparrowwallet.com/docs/faq.html#ive-restored-my-wallet-but-some-of-my-funds-are-missing). - /// - /// A `stop_gap` of 0 will be treated as a `stop_gap` of 1. - async fn full_scan( + /// Refer to [crate-level docs](crate) for more. + async fn full_scan> + Send>( &self, - request: FullScanRequest, + request: R, stop_gap: usize, parallel_requests: usize, ) -> Result, Error>; - /// Sync a set of scripts with the blockchain (via an Esplora client) for the data - /// specified and return a [`TxGraph`]. - /// - /// - `request`: struct with data required to perform a spk-based blockchain client sync, see - /// [`SyncRequest`] + /// Sync a set of scripts, txids, and/or outpoints against Esplora. 
/// - /// If the scripts to sync are unknown, such as when restoring or importing a keychain that - /// may include scripts that have been used, use [`full_scan`] with the keychain. + /// `request` provides the data required to perform a script-pubkey-based sync (see + /// [`SyncRequest`]). `parallel_requests` specifies the maximum number of HTTP requests to make + /// in parallel. /// - /// [`full_scan`]: EsploraAsyncExt::full_scan - async fn sync( + /// Refer to [crate-level docs](crate) for more. + async fn sync> + Send>( &self, - request: SyncRequest, + request: R, parallel_requests: usize, ) -> Result; } @@ -74,27 +56,42 @@ pub trait EsploraAsyncExt { #[cfg_attr(target_arch = "wasm32", async_trait(?Send))] #[cfg_attr(not(target_arch = "wasm32"), async_trait)] impl EsploraAsyncExt for esplora_client::AsyncClient { - async fn full_scan( + async fn full_scan> + Send>( &self, - request: FullScanRequest, + request: R, stop_gap: usize, parallel_requests: usize, ) -> Result, Error> { - let latest_blocks = fetch_latest_blocks(self).await?; - let (graph_update, last_active_indices) = full_scan_for_index_and_graph( - self, - request.spks_by_keychain, - stop_gap, - parallel_requests, - ) - .await?; - let chain_update = chain_update( - self, - &latest_blocks, - &request.chain_tip, - graph_update.all_anchors(), - ) - .await?; + let mut request = request.into(); + let keychains = request.keychains(); + + let chain_tip = request.chain_tip(); + let latest_blocks = if chain_tip.is_some() { + Some(fetch_latest_blocks(self).await?) 
+ } else { + None + }; + + let mut graph_update = TxGraph::default(); + let mut last_active_indices = BTreeMap::::new(); + for keychain in keychains { + let keychain_spks = request.iter_spks(keychain.clone()); + let (tx_graph, last_active_index) = + fetch_txs_with_keychain_spks(self, keychain_spks, stop_gap, parallel_requests) + .await?; + let _ = graph_update.apply_update(tx_graph); + if let Some(last_active_index) = last_active_index { + last_active_indices.insert(keychain, last_active_index); + } + } + + let chain_update = match (chain_tip, latest_blocks) { + (Some(chain_tip), Some(latest_blocks)) => Some( + chain_update(self, &latest_blocks, &chain_tip, graph_update.all_anchors()).await?, + ), + _ => None, + }; + Ok(FullScanResult { chain_update, graph_update, @@ -102,27 +99,37 @@ impl EsploraAsyncExt for esplora_client::AsyncClient { }) } - async fn sync( + async fn sync> + Send>( &self, - request: SyncRequest, + request: R, parallel_requests: usize, ) -> Result { - let latest_blocks = fetch_latest_blocks(self).await?; - let graph_update = sync_for_index_and_graph( - self, - request.spks, - request.txids, - request.outpoints, - parallel_requests, - ) - .await?; - let chain_update = chain_update( - self, - &latest_blocks, - &request.chain_tip, - graph_update.all_anchors(), - ) - .await?; + let mut request = request.into(); + + let chain_tip = request.chain_tip(); + let latest_blocks = if chain_tip.is_some() { + Some(fetch_latest_blocks(self).await?) 
+ } else { + None + }; + + let mut graph_update = TxGraph::::default(); + let _ = graph_update + .apply_update(fetch_txs_with_spks(self, request.iter_spks(), parallel_requests).await?); + let _ = graph_update.apply_update( + fetch_txs_with_txids(self, request.iter_txids(), parallel_requests).await?, + ); + let _ = graph_update.apply_update( + fetch_txs_with_outpoints(self, request.iter_outpoints(), parallel_requests).await?, + ); + + let chain_update = match (chain_tip, latest_blocks) { + (Some(chain_tip), Some(latest_blocks)) => Some( + chain_update(self, &latest_blocks, &chain_tip, graph_update.all_anchors()).await?, + ), + _ => None, + }; + Ok(SyncResult { chain_update, graph_update, @@ -230,135 +237,150 @@ async fn chain_update( Ok(tip) } -/// This performs a full scan to get an update for the [`TxGraph`] and -/// [`KeychainTxOutIndex`](bdk_chain::indexer::keychain_txout::KeychainTxOutIndex). -async fn full_scan_for_index_and_graph( +/// Fetch transactions and associated [`ConfirmationBlockTime`]s by scanning +/// `keychain_spks` against Esplora. +/// +/// `keychain_spks` is an *unbounded* indexed-[`ScriptBuf`] iterator that represents scripts +/// derived from a keychain. The scanning logic stops after a `stop_gap` number of consecutive +/// scripts with no transaction history is reached. `parallel_requests` specifies the maximum +/// number of HTTP requests to make in parallel. +/// +/// A [`TxGraph`] (containing the fetched transactions and anchors) and the last active +/// keychain index (if any) is returned. The last active keychain index is the keychain's last +/// script pubkey that contains a non-empty transaction history. +/// +/// Refer to [crate-level docs](crate) for more. 
+async fn fetch_txs_with_keychain_spks> + Send>( client: &esplora_client::AsyncClient, - keychain_spks: BTreeMap< - K, - impl IntoIterator> + Send> + Send, - >, + mut keychain_spks: I, stop_gap: usize, parallel_requests: usize, -) -> Result<(TxGraph, BTreeMap), Error> { +) -> Result<(TxGraph, Option), Error> { type TxsOfSpkIndex = (u32, Vec); - let parallel_requests = Ord::max(parallel_requests, 1); - let mut graph = TxGraph::::default(); - let mut last_active_indexes = BTreeMap::::new(); - - for (keychain, spks) in keychain_spks { - let mut spks = spks.into_iter(); - let mut last_index = Option::::None; - let mut last_active_index = Option::::None; - - loop { - let handles = spks - .by_ref() - .take(parallel_requests) - .map(|(spk_index, spk)| { - let client = client.clone(); - async move { - let mut last_seen = None; - let mut spk_txs = Vec::new(); - loop { - let txs = client.scripthash_txs(&spk, last_seen).await?; - let tx_count = txs.len(); - last_seen = txs.last().map(|tx| tx.txid); - spk_txs.extend(txs); - if tx_count < 25 { - break Result::<_, Error>::Ok((spk_index, spk_txs)); - } - } - } - }) - .collect::>(); - if handles.is_empty() { - break; - } + let mut tx_graph = TxGraph::default(); + let mut last_index = Option::::None; + let mut last_active_index = Option::::None; - for (index, txs) in handles.try_collect::>().await? 
{ - last_index = Some(index); - if !txs.is_empty() { - last_active_index = Some(index); - } - for tx in txs { - let _ = graph.insert_tx(tx.to_tx()); - if let Some(anchor) = anchor_from_status(&tx.status) { - let _ = graph.insert_anchor(tx.txid, anchor); + loop { + let handles = keychain_spks + .by_ref() + .take(parallel_requests) + .map(|(spk_index, spk)| { + let client = client.clone(); + async move { + let mut last_seen = None; + let mut spk_txs = Vec::new(); + loop { + let txs = client.scripthash_txs(&spk, last_seen).await?; + let tx_count = txs.len(); + last_seen = txs.last().map(|tx| tx.txid); + spk_txs.extend(txs); + if tx_count < 25 { + break Result::<_, Error>::Ok((spk_index, spk_txs)); + } } + } + }) + .collect::>(); - let previous_outputs = tx.vin.iter().filter_map(|vin| { - let prevout = vin.prevout.as_ref()?; - Some(( - OutPoint { - txid: vin.txid, - vout: vin.vout, - }, - TxOut { - script_pubkey: prevout.scriptpubkey.clone(), - value: Amount::from_sat(prevout.value), - }, - )) - }); + if handles.is_empty() { + break; + } - for (outpoint, txout) in previous_outputs { - let _ = graph.insert_txout(outpoint, txout); - } - } + for (index, txs) in handles.try_collect::>().await? 
{ + last_index = Some(index); + if !txs.is_empty() { + last_active_index = Some(index); } - - let last_index = last_index.expect("Must be set since handles wasn't empty."); - let gap_limit_reached = if let Some(i) = last_active_index { - last_index >= i.saturating_add(stop_gap as u32) - } else { - last_index + 1 >= stop_gap as u32 - }; - if gap_limit_reached { - break; + for tx in txs { + let _ = tx_graph.insert_tx(tx.to_tx()); + insert_anchor_from_status(&mut tx_graph, tx.txid, tx.status); + insert_prevouts(&mut tx_graph, tx.vin); } } - if let Some(last_active_index) = last_active_index { - last_active_indexes.insert(keychain, last_active_index); + let last_index = last_index.expect("Must be set since handles wasn't empty."); + let gap_limit_reached = if let Some(i) = last_active_index { + last_index >= i.saturating_add(stop_gap as u32) + } else { + last_index + 1 >= stop_gap as u32 + }; + if gap_limit_reached { + break; } } - Ok((graph, last_active_indexes)) + Ok((tx_graph, last_active_index)) } -async fn sync_for_index_and_graph( +/// Fetch transactions and associated [`ConfirmationBlockTime`]s by scanning `spks` +/// against Esplora. +/// +/// Unlike with [`EsploraAsyncExt::fetch_txs_with_keychain_spks`], `spks` must be *bounded* as +/// all contained scripts will be scanned. `parallel_requests` specifies the maximum number of +/// HTTP requests to make in parallel. +/// +/// Refer to [crate-level docs](crate) for more. 
+async fn fetch_txs_with_spks + Send>( client: &esplora_client::AsyncClient, - misc_spks: impl IntoIterator + Send> + Send, - txids: impl IntoIterator + Send> + Send, - outpoints: impl IntoIterator + Send> + Send, + spks: I, parallel_requests: usize, -) -> Result, Error> { - let mut graph = full_scan_for_index_and_graph( +) -> Result, Error> +where + I::IntoIter: Send, +{ + fetch_txs_with_keychain_spks( client, - [( - (), - misc_spks - .into_iter() - .enumerate() - .map(|(i, spk)| (i as u32, spk)), - )] - .into(), + spks.into_iter().enumerate().map(|(i, spk)| (i as u32, spk)), usize::MAX, parallel_requests, ) .await - .map(|(g, _)| g)?; + .map(|(tx_graph, _)| tx_graph) +} + +/// Fetch transactions and associated [`ConfirmationBlockTime`]s by scanning `txids` +/// against Esplora. +/// +/// `parallel_requests` specifies the maximum number of HTTP requests to make in parallel. +/// +/// Refer to [crate-level docs](crate) for more. +async fn fetch_txs_with_txids + Send>( + client: &esplora_client::AsyncClient, + txids: I, + parallel_requests: usize, +) -> Result, Error> +where + I::IntoIter: Send, +{ + enum EsploraResp { + TxStatus(TxStatus), + Tx(Option), + } + let mut tx_graph = TxGraph::default(); let mut txids = txids.into_iter(); loop { let handles = txids .by_ref() .take(parallel_requests) - .filter(|&txid| graph.get_tx(txid).is_none()) .map(|txid| { let client = client.clone(); - async move { client.get_tx_status(&txid).await.map(|s| (txid, s)) } + let tx_already_exists = tx_graph.get_tx(txid).is_some(); + async move { + if tx_already_exists { + client + .get_tx_status(&txid) + .await + .map(|s| (txid, EsploraResp::TxStatus(s))) + } else { + client + .get_tx_info(&txid) + .await + .map(|t| (txid, EsploraResp::Tx(t))) + } + } }) .collect::>(); @@ -366,40 +388,82 @@ async fn sync_for_index_and_graph( break; } - for (txid, status) in handles.try_collect::>().await? 
{ - if let Some(anchor) = anchor_from_status(&status) { - let _ = graph.insert_anchor(txid, anchor); + for (txid, resp) in handles.try_collect::>().await? { + match resp { + EsploraResp::TxStatus(status) => { + insert_anchor_from_status(&mut tx_graph, txid, status); + } + EsploraResp::Tx(Some(tx_info)) => { + let _ = tx_graph.insert_tx(tx_info.to_tx()); + insert_anchor_from_status(&mut tx_graph, txid, tx_info.status); + insert_prevouts(&mut tx_graph, tx_info.vin); + } + _ => continue, } } } + Ok(tx_graph) +} - for op in outpoints.into_iter() { - if graph.get_tx(op.txid).is_none() { - if let Some(tx) = client.get_tx(&op.txid).await? { - let _ = graph.insert_tx(tx); - } - let status = client.get_tx_status(&op.txid).await?; - if let Some(anchor) = anchor_from_status(&status) { - let _ = graph.insert_anchor(op.txid, anchor); - } +/// Fetch transactions and [`ConfirmationBlockTime`]s that contain and spend the provided +/// `outpoints`. +/// +/// `parallel_requests` specifies the maximum number of HTTP requests to make in parallel. +/// +/// Refer to [crate-level docs](crate) for more. +async fn fetch_txs_with_outpoints + Send>( + client: &esplora_client::AsyncClient, + outpoints: I, + parallel_requests: usize, +) -> Result, Error> +where + I::IntoIter: Send, +{ + let outpoints = outpoints.into_iter().collect::>(); + + // make sure txs exists in graph and tx statuses are updated + // TODO: We should maintain a tx cache (like we do with Electrum). 
+ let mut tx_graph = fetch_txs_with_txids( + client, + outpoints.iter().copied().map(|op| op.txid), + parallel_requests, + ) + .await?; + + // get outpoint spend-statuses + let mut outpoints = outpoints.into_iter(); + let mut missing_txs = Vec::::with_capacity(outpoints.len()); + loop { + let handles = outpoints + .by_ref() + .take(parallel_requests) + .map(|op| { + let client = client.clone(); + async move { client.get_output_status(&op.txid, op.vout as _).await } + }) + .collect::>(); + + if handles.is_empty() { + break; } - if let Some(op_status) = client.get_output_status(&op.txid, op.vout as _).await? { - if let Some(txid) = op_status.txid { - if graph.get_tx(txid).is_none() { - if let Some(tx) = client.get_tx(&txid).await? { - let _ = graph.insert_tx(tx); - } - let status = client.get_tx_status(&txid).await?; - if let Some(anchor) = anchor_from_status(&status) { - let _ = graph.insert_anchor(txid, anchor); - } - } + for op_status in handles.try_collect::>().await?.into_iter().flatten() { + let spend_txid = match op_status.txid { + Some(txid) => txid, + None => continue, + }; + if tx_graph.get_tx(spend_txid).is_none() { + missing_txs.push(spend_txid); + } + if let Some(spend_status) = op_status.status { + insert_anchor_from_status(&mut tx_graph, spend_txid, spend_status); } } } - Ok(graph) + let _ = + tx_graph.apply_update(fetch_txs_with_txids(client, missing_txs, parallel_requests).await?); + Ok(tx_graph) } #[cfg(test)] diff --git a/crates/esplora/src/blocking_ext.rs b/crates/esplora/src/blocking_ext.rs index dc95a350b..6e3e25afe 100644 --- a/crates/esplora/src/blocking_ext.rs +++ b/crates/esplora/src/blocking_ext.rs @@ -4,86 +4,90 @@ use std::thread::JoinHandle; use bdk_chain::collections::BTreeMap; use bdk_chain::spk_client::{FullScanRequest, FullScanResult, SyncRequest, SyncResult}; use bdk_chain::{ - bitcoin::{Amount, BlockHash, OutPoint, ScriptBuf, TxOut, Txid}, + bitcoin::{BlockHash, OutPoint, ScriptBuf, Txid}, local_chain::CheckPoint, BlockId, 
ConfirmationBlockTime, TxGraph, }; use bdk_chain::{Anchor, Indexed}; -use esplora_client::TxStatus; +use esplora_client::{OutputStatus, Tx, TxStatus}; -use crate::anchor_from_status; +use crate::{insert_anchor_from_status, insert_prevouts}; /// [`esplora_client::Error`] pub type Error = Box; /// Trait to extend the functionality of [`esplora_client::BlockingClient`]. /// -/// Refer to [crate-level documentation] for more. -/// -/// [crate-level documentation]: crate +/// Refer to [crate-level documentation](crate) for more. pub trait EsploraExt { /// Scan keychain scripts for transactions against Esplora, returning an update that can be /// applied to the receiving structures. /// - /// - `request`: struct with data required to perform a spk-based blockchain client full scan, - /// see [`FullScanRequest`] - /// - /// The full scan for each keychain stops after a gap of `stop_gap` script pubkeys with no - /// associated transactions. `parallel_requests` specifies the max number of HTTP requests to - /// make in parallel. - /// - /// ## Note - /// - /// `stop_gap` is defined as "the maximum number of consecutive unused addresses". - /// For example, with a `stop_gap` of 3, `full_scan` will keep scanning - /// until it encounters 3 consecutive script pubkeys with no associated transactions. + /// `request` provides the data required to perform a script-pubkey-based full scan + /// (see [`FullScanRequest`]). The full scan for each keychain (`K`) stops after a gap of + /// `stop_gap` script pubkeys with no associated transactions. `parallel_requests` specifies + /// the maximum number of HTTP requests to make in parallel. 
/// - /// This follows the same approach as other Bitcoin-related software, - /// such as [Electrum](https://electrum.readthedocs.io/en/latest/faq.html#what-is-the-gap-limit), - /// [BTCPay Server](https://docs.btcpayserver.org/FAQ/Wallet/#the-gap-limit-problem), - /// and [Sparrow](https://www.sparrowwallet.com/docs/faq.html#ive-restored-my-wallet-but-some-of-my-funds-are-missing). - /// - /// A `stop_gap` of 0 will be treated as a `stop_gap` of 1. - fn full_scan( + /// Refer to [crate-level docs](crate) for more. + fn full_scan>>( &self, - request: FullScanRequest, + request: R, stop_gap: usize, parallel_requests: usize, ) -> Result, Error>; - /// Sync a set of scripts with the blockchain (via an Esplora client) for the data - /// specified and return a [`TxGraph`]. - /// - /// - `request`: struct with data required to perform a spk-based blockchain client sync, see - /// [`SyncRequest`] + /// Sync a set of scripts, txids, and/or outpoints against Esplora. /// - /// If the scripts to sync are unknown, such as when restoring or importing a keychain that - /// may include scripts that have been used, use [`full_scan`] with the keychain. + /// `request` provides the data required to perform a script-pubkey-based sync (see + /// [`SyncRequest`]). `parallel_requests` specifies the maximum number of HTTP requests to make + /// in parallel. /// - /// [`full_scan`]: EsploraExt::full_scan - fn sync(&self, request: SyncRequest, parallel_requests: usize) -> Result; + /// Refer to [crate-level docs](crate) for more. 
+ fn sync>>( + &self, + request: R, + parallel_requests: usize, + ) -> Result; } impl EsploraExt for esplora_client::BlockingClient { - fn full_scan( + fn full_scan>>( &self, - request: FullScanRequest, + request: R, stop_gap: usize, parallel_requests: usize, ) -> Result, Error> { - let latest_blocks = fetch_latest_blocks(self)?; - let (graph_update, last_active_indices) = full_scan_for_index_and_graph_blocking( - self, - request.spks_by_keychain, - stop_gap, - parallel_requests, - )?; - let chain_update = chain_update( - self, - &latest_blocks, - &request.chain_tip, - graph_update.all_anchors(), - )?; + let mut request = request.into(); + + let chain_tip = request.chain_tip(); + let latest_blocks = if chain_tip.is_some() { + Some(fetch_latest_blocks(self)?) + } else { + None + }; + + let mut graph_update = TxGraph::default(); + let mut last_active_indices = BTreeMap::::new(); + for keychain in request.keychains() { + let keychain_spks = request.iter_spks(keychain.clone()); + let (tx_graph, last_active_index) = + fetch_txs_with_keychain_spks(self, keychain_spks, stop_gap, parallel_requests)?; + let _ = graph_update.apply_update(tx_graph); + if let Some(last_active_index) = last_active_index { + last_active_indices.insert(keychain, last_active_index); + } + } + + let chain_update = match (chain_tip, latest_blocks) { + (Some(chain_tip), Some(latest_blocks)) => Some(chain_update( + self, + &latest_blocks, + &chain_tip, + graph_update.all_anchors(), + )?), + _ => None, + }; + Ok(FullScanResult { chain_update, graph_update, @@ -91,21 +95,47 @@ impl EsploraExt for esplora_client::BlockingClient { }) } - fn sync(&self, request: SyncRequest, parallel_requests: usize) -> Result { - let latest_blocks = fetch_latest_blocks(self)?; - let graph_update = sync_for_index_and_graph_blocking( + fn sync>>( + &self, + request: R, + parallel_requests: usize, + ) -> Result { + let mut request: SyncRequest = request.into(); + + let chain_tip = request.chain_tip(); + let latest_blocks = 
if chain_tip.is_some() { + Some(fetch_latest_blocks(self)?) + } else { + None + }; + + let mut graph_update = TxGraph::default(); + let _ = graph_update.apply_update(fetch_txs_with_spks( self, - request.spks, - request.txids, - request.outpoints, + request.iter_spks(), parallel_requests, - )?; - let chain_update = chain_update( + )?); + let _ = graph_update.apply_update(fetch_txs_with_txids( self, - &latest_blocks, - &request.chain_tip, - graph_update.all_anchors(), - )?; + request.iter_txids(), + parallel_requests, + )?); + let _ = graph_update.apply_update(fetch_txs_with_outpoints( + self, + request.iter_outpoints(), + parallel_requests, + )?); + + let chain_update = match (chain_tip, latest_blocks) { + (Some(chain_tip), Some(latest_blocks)) => Some(chain_update( + self, + &latest_blocks, + &chain_tip, + graph_update.all_anchors(), + )?), + _ => None, + }; + Ok(SyncResult { chain_update, graph_update, @@ -212,184 +242,221 @@ fn chain_update( Ok(tip) } -/// This performs a full scan to get an update for the [`TxGraph`] and -/// [`KeychainTxOutIndex`](bdk_chain::indexer::keychain_txout::KeychainTxOutIndex). 
-fn full_scan_for_index_and_graph_blocking( +fn fetch_txs_with_keychain_spks>>( client: &esplora_client::BlockingClient, - keychain_spks: BTreeMap>>, + mut keychain_spks: I, stop_gap: usize, parallel_requests: usize, -) -> Result<(TxGraph, BTreeMap), Error> { +) -> Result<(TxGraph, Option), Error> { type TxsOfSpkIndex = (u32, Vec); - let parallel_requests = Ord::max(parallel_requests, 1); - let mut tx_graph = TxGraph::::default(); - let mut last_active_indices = BTreeMap::::new(); - - for (keychain, spks) in keychain_spks { - let mut spks = spks.into_iter(); - let mut last_index = Option::::None; - let mut last_active_index = Option::::None; - - loop { - let handles = spks - .by_ref() - .take(parallel_requests) - .map(|(spk_index, spk)| { - std::thread::spawn({ - let client = client.clone(); - move || -> Result { - let mut last_seen = None; - let mut spk_txs = Vec::new(); - loop { - let txs = client.scripthash_txs(&spk, last_seen)?; - let tx_count = txs.len(); - last_seen = txs.last().map(|tx| tx.txid); - spk_txs.extend(txs); - if tx_count < 25 { - break Ok((spk_index, spk_txs)); - } - } - } - }) - }) - .collect::>>>(); - if handles.is_empty() { - break; - } + let mut tx_graph = TxGraph::default(); + let mut last_index = Option::::None; + let mut last_active_index = Option::::None; - for handle in handles { - let (index, txs) = handle.join().expect("thread must not panic")?; - last_index = Some(index); - if !txs.is_empty() { - last_active_index = Some(index); - } - for tx in txs { - let _ = tx_graph.insert_tx(tx.to_tx()); - if let Some(anchor) = anchor_from_status(&tx.status) { - let _ = tx_graph.insert_anchor(tx.txid, anchor); + loop { + let handles = keychain_spks + .by_ref() + .take(parallel_requests) + .map(|(spk_index, spk)| { + std::thread::spawn({ + let client = client.clone(); + move || -> Result { + let mut last_seen = None; + let mut spk_txs = Vec::new(); + loop { + let txs = client.scripthash_txs(&spk, last_seen)?; + let tx_count = txs.len(); + last_seen 
= txs.last().map(|tx| tx.txid); + spk_txs.extend(txs); + if tx_count < 25 { + break Ok((spk_index, spk_txs)); + } + } } + }) + }) + .collect::>>>(); - let previous_outputs = tx.vin.iter().filter_map(|vin| { - let prevout = vin.prevout.as_ref()?; - Some(( - OutPoint { - txid: vin.txid, - vout: vin.vout, - }, - TxOut { - script_pubkey: prevout.scriptpubkey.clone(), - value: Amount::from_sat(prevout.value), - }, - )) - }); + if handles.is_empty() { + break; + } - for (outpoint, txout) in previous_outputs { - let _ = tx_graph.insert_txout(outpoint, txout); - } - } + for handle in handles { + let (index, txs) = handle.join().expect("thread must not panic")?; + last_index = Some(index); + if !txs.is_empty() { + last_active_index = Some(index); } - - let last_index = last_index.expect("Must be set since handles wasn't empty."); - let gap_limit_reached = if let Some(i) = last_active_index { - last_index >= i.saturating_add(stop_gap as u32) - } else { - last_index + 1 >= stop_gap as u32 - }; - if gap_limit_reached { - break; + for tx in txs { + let _ = tx_graph.insert_tx(tx.to_tx()); + insert_anchor_from_status(&mut tx_graph, tx.txid, tx.status); + insert_prevouts(&mut tx_graph, tx.vin); } } - if let Some(last_active_index) = last_active_index { - last_active_indices.insert(keychain, last_active_index); + let last_index = last_index.expect("Must be set since handles wasn't empty."); + let gap_limit_reached = if let Some(i) = last_active_index { + last_index >= i.saturating_add(stop_gap as u32) + } else { + last_index + 1 >= stop_gap as u32 + }; + if gap_limit_reached { + break; } } - Ok((tx_graph, last_active_indices)) + Ok((tx_graph, last_active_index)) } -fn sync_for_index_and_graph_blocking( +/// Fetch transactions and associated [`ConfirmationBlockTime`]s by scanning `spks` +/// against Esplora. +/// +/// Unlike with [`EsploraExt::fetch_txs_with_keychain_spks`], `spks` must be *bounded* as all +/// contained scripts will be scanned. 
`parallel_requests` specifies the maximum number of HTTP +/// requests to make in parallel. +/// +/// Refer to [crate-level docs](crate) for more. +fn fetch_txs_with_spks>( client: &esplora_client::BlockingClient, - misc_spks: impl IntoIterator, - txids: impl IntoIterator, - outpoints: impl IntoIterator, + spks: I, parallel_requests: usize, ) -> Result, Error> { - let (mut tx_graph, _) = full_scan_for_index_and_graph_blocking( + fetch_txs_with_keychain_spks( client, - { - let mut keychains = BTreeMap::new(); - keychains.insert( - (), - misc_spks - .into_iter() - .enumerate() - .map(|(i, spk)| (i as u32, spk)), - ); - keychains - }, + spks.into_iter().enumerate().map(|(i, spk)| (i as u32, spk)), usize::MAX, parallel_requests, - )?; + ) + .map(|(tx_graph, _)| tx_graph) +} +/// Fetch transactions and associated [`ConfirmationBlockTime`]s by scanning `txids` +/// against Esplora. +/// +/// `parallel_requests` specifies the maximum number of HTTP requests to make in parallel. +/// +/// Refer to [crate-level docs](crate) for more. 
+fn fetch_txs_with_txids>( + client: &esplora_client::BlockingClient, + txids: I, + parallel_requests: usize, +) -> Result, Error> { + enum EsploraResp { + TxStatus(TxStatus), + Tx(Option), + } + + let mut tx_graph = TxGraph::default(); let mut txids = txids.into_iter(); loop { let handles = txids .by_ref() .take(parallel_requests) - .filter(|&txid| tx_graph.get_tx(txid).is_none()) .map(|txid| { - std::thread::spawn({ - let client = client.clone(); - move || { + let client = client.clone(); + let tx_already_exists = tx_graph.get_tx(txid).is_some(); + std::thread::spawn(move || { + if tx_already_exists { client .get_tx_status(&txid) .map_err(Box::new) - .map(|s| (txid, s)) + .map(|s| (txid, EsploraResp::TxStatus(s))) + } else { + client + .get_tx_info(&txid) + .map_err(Box::new) + .map(|t| (txid, EsploraResp::Tx(t))) } }) }) - .collect::>>>(); + .collect::>>>(); if handles.is_empty() { break; } for handle in handles { - let (txid, status) = handle.join().expect("thread must not panic")?; - if let Some(anchor) = anchor_from_status(&status) { - let _ = tx_graph.insert_anchor(txid, anchor); + let (txid, resp) = handle.join().expect("thread must not panic")?; + match resp { + EsploraResp::TxStatus(status) => { + insert_anchor_from_status(&mut tx_graph, txid, status); + } + EsploraResp::Tx(Some(tx_info)) => { + let _ = tx_graph.insert_tx(tx_info.to_tx()); + insert_anchor_from_status(&mut tx_graph, txid, tx_info.status); + insert_prevouts(&mut tx_graph, tx_info.vin); + } + _ => continue, } } } + Ok(tx_graph) +} - for op in outpoints { - if tx_graph.get_tx(op.txid).is_none() { - if let Some(tx) = client.get_tx(&op.txid)? { - let _ = tx_graph.insert_tx(tx); - } - let status = client.get_tx_status(&op.txid)?; - if let Some(anchor) = anchor_from_status(&status) { - let _ = tx_graph.insert_anchor(op.txid, anchor); - } +/// Fetch transactions and [`ConfirmationBlockTime`]s that contain and spend the provided +/// `outpoints`. 
+/// +/// `parallel_requests` specifies the maximum number of HTTP requests to make in parallel. +/// +/// Refer to [crate-level docs](crate) for more. +fn fetch_txs_with_outpoints>( + client: &esplora_client::BlockingClient, + outpoints: I, + parallel_requests: usize, +) -> Result, Error> { + let outpoints = outpoints.into_iter().collect::>(); + + // make sure txs exists in graph and tx statuses are updated + // TODO: We should maintain a tx cache (like we do with Electrum). + let mut tx_graph = fetch_txs_with_txids( + client, + outpoints.iter().map(|op| op.txid), + parallel_requests, + )?; + + // get outpoint spend-statuses + let mut outpoints = outpoints.into_iter(); + let mut missing_txs = Vec::::with_capacity(outpoints.len()); + loop { + let handles = outpoints + .by_ref() + .take(parallel_requests) + .map(|op| { + let client = client.clone(); + std::thread::spawn(move || { + client + .get_output_status(&op.txid, op.vout as _) + .map_err(Box::new) + }) + }) + .collect::, Error>>>>(); + + if handles.is_empty() { + break; } - if let Some(op_status) = client.get_output_status(&op.txid, op.vout as _)? { - if let Some(txid) = op_status.txid { - if tx_graph.get_tx(txid).is_none() { - if let Some(tx) = client.get_tx(&txid)? { - let _ = tx_graph.insert_tx(tx); - } - let status = client.get_tx_status(&txid)?; - if let Some(anchor) = anchor_from_status(&status) { - let _ = tx_graph.insert_anchor(txid, anchor); - } + for handle in handles { + if let Some(op_status) = handle.join().expect("thread must not panic")? 
{ + let spend_txid = match op_status.txid { + Some(txid) => txid, + None => continue, + }; + if tx_graph.get_tx(spend_txid).is_none() { + missing_txs.push(spend_txid); + } + if let Some(spend_status) = op_status.status { + insert_anchor_from_status(&mut tx_graph, spend_txid, spend_status); } } } } + let _ = tx_graph.apply_update(fetch_txs_with_txids( + client, + missing_txs, + parallel_requests, + )?); Ok(tx_graph) } diff --git a/crates/esplora/src/lib.rs b/crates/esplora/src/lib.rs index 718d3cf9c..7db6967b6 100644 --- a/crates/esplora/src/lib.rs +++ b/crates/esplora/src/lib.rs @@ -1,22 +1,32 @@ #![doc = include_str!("../README.md")] - -//! This crate is used for updating structures of [`bdk_chain`] with data from an Esplora server. +//! # Stop Gap +//! +//! [`EsploraExt::full_scan`] takes in a `stop_gap` input which is defined as the maximum number of +//! consecutive unused script pubkeys to scan transactions for before stopping. +//! +//! For example, with a `stop_gap` of 3, `full_scan` will keep scanning until it encounters 3 +//! consecutive script pubkeys with no associated transactions. +//! +//! This follows the same approach as other Bitcoin-related software, +//! such as [Electrum](https://electrum.readthedocs.io/en/latest/faq.html#what-is-the-gap-limit), +//! [BTCPay Server](https://docs.btcpayserver.org/FAQ/Wallet/#the-gap-limit-problem), +//! and [Sparrow](https://www.sparrowwallet.com/docs/faq.html#ive-restored-my-wallet-but-some-of-my-funds-are-missing). //! -//! The two primary methods are [`EsploraExt::sync`] and [`EsploraExt::full_scan`]. In most cases -//! [`EsploraExt::sync`] is used to sync the transaction histories of scripts that the application -//! cares about, for example the scripts for all the receive addresses of a Wallet's keychain that it -//! has shown a user. [`EsploraExt::full_scan`] is meant to be used when importing or restoring a -//! keychain where the range of possibly used scripts is not known. 
In this case it is necessary to -//! scan all keychain scripts until a number (the "stop gap") of unused scripts is discovered. For a -//! sync or full scan the user receives relevant blockchain data and output updates for [`bdk_chain`] -//! via a new [`TxGraph`] to be appended to any existing [`TxGraph`] data. +//! A `stop_gap` of 0 will be treated as a `stop_gap` of 1. //! -//! Refer to [`example_esplora`] for a complete example. +//! # Async +//! +//! Just like how [`EsploraExt`] extends the functionality of an +//! [`esplora_client::BlockingClient`], [`EsploraAsyncExt`] is the async version which extends +//! [`esplora_client::AsyncClient`]. //! //! [`TxGraph`]: bdk_chain::tx_graph::TxGraph +//! [`LocalChain`]: bdk_chain::local_chain::LocalChain +//! [`ChainOracle`]: bdk_chain::ChainOracle //! [`example_esplora`]: https://github.com/bitcoindevkit/bdk/tree/master/example-crates/example_esplora -use bdk_chain::{BlockId, ConfirmationBlockTime}; +use bdk_chain::bitcoin::{Amount, OutPoint, TxOut, Txid}; +use bdk_chain::{BlockId, ConfirmationBlockTime, TxGraph}; use esplora_client::TxStatus; pub use esplora_client; @@ -31,19 +41,42 @@ mod async_ext; #[cfg(feature = "async")] pub use async_ext::*; -fn anchor_from_status(status: &TxStatus) -> Option { +fn insert_anchor_from_status( + tx_graph: &mut TxGraph, + txid: Txid, + status: TxStatus, +) { if let TxStatus { block_height: Some(height), block_hash: Some(hash), block_time: Some(time), .. - } = status.clone() + } = status { - Some(ConfirmationBlockTime { + let anchor = ConfirmationBlockTime { block_id: BlockId { height, hash }, confirmation_time: time, - }) - } else { - None + }; + let _ = tx_graph.insert_anchor(txid, anchor); + } +} + +/// Inserts floating txouts into `tx_graph` using [`Vin`](esplora_client::api::Vin)s returned by +/// Esplora. 
+fn insert_prevouts( + tx_graph: &mut TxGraph, + esplora_inputs: impl IntoIterator, +) { + let prevouts = esplora_inputs + .into_iter() + .filter_map(|vin| Some((vin.txid, vin.vout, vin.prevout?))); + for (prev_txid, prev_vout, prev_txout) in prevouts { + let _ = tx_graph.insert_txout( + OutPoint::new(prev_txid, prev_vout), + TxOut { + script_pubkey: prev_txout.scriptpubkey, + value: Amount::from_sat(prev_txout.value), + }, + ); } } diff --git a/crates/esplora/tests/async_ext.rs b/crates/esplora/tests/async_ext.rs index 2258c9d60..70d464194 100644 --- a/crates/esplora/tests/async_ext.rs +++ b/crates/esplora/tests/async_ext.rs @@ -55,7 +55,9 @@ pub async fn test_update_tx_graph_without_keychain() -> anyhow::Result<()> { let cp_tip = env.make_checkpoint_tip(); let sync_update = { - let request = SyncRequest::from_chain_tip(cp_tip.clone()).set_spks(misc_spks); + let request = SyncRequest::builder() + .chain_tip(cp_tip.clone()) + .spks(misc_spks); client.sync(request, 1).await? }; @@ -160,15 +162,17 @@ pub async fn test_async_update_tx_graph_stop_gap() -> anyhow::Result<()> { // A scan with a gap limit of 3 won't find the transaction, but a scan with a gap limit of 4 // will. let full_scan_update = { - let request = - FullScanRequest::from_chain_tip(cp_tip.clone()).set_spks_for_keychain(0, spks.clone()); + let request = FullScanRequest::builder() + .chain_tip(cp_tip.clone()) + .spks_for_keychain(0, spks.clone()); client.full_scan(request, 3, 1).await? }; assert!(full_scan_update.graph_update.full_txs().next().is_none()); assert!(full_scan_update.last_active_indices.is_empty()); let full_scan_update = { - let request = - FullScanRequest::from_chain_tip(cp_tip.clone()).set_spks_for_keychain(0, spks.clone()); + let request = FullScanRequest::builder() + .chain_tip(cp_tip.clone()) + .spks_for_keychain(0, spks.clone()); client.full_scan(request, 4, 1).await? 
}; assert_eq!( @@ -201,8 +205,9 @@ pub async fn test_async_update_tx_graph_stop_gap() -> anyhow::Result<()> { // A scan with gap limit 5 won't find the second transaction, but a scan with gap limit 6 will. // The last active indice won't be updated in the first case but will in the second one. let full_scan_update = { - let request = - FullScanRequest::from_chain_tip(cp_tip.clone()).set_spks_for_keychain(0, spks.clone()); + let request = FullScanRequest::builder() + .chain_tip(cp_tip.clone()) + .spks_for_keychain(0, spks.clone()); client.full_scan(request, 5, 1).await? }; let txs: HashSet<_> = full_scan_update @@ -214,8 +219,9 @@ pub async fn test_async_update_tx_graph_stop_gap() -> anyhow::Result<()> { assert!(txs.contains(&txid_4th_addr)); assert_eq!(full_scan_update.last_active_indices[&0], 3); let full_scan_update = { - let request = - FullScanRequest::from_chain_tip(cp_tip.clone()).set_spks_for_keychain(0, spks.clone()); + let request = FullScanRequest::builder() + .chain_tip(cp_tip.clone()) + .spks_for_keychain(0, spks.clone()); client.full_scan(request, 6, 1).await? }; let txs: HashSet<_> = full_scan_update diff --git a/crates/esplora/tests/blocking_ext.rs b/crates/esplora/tests/blocking_ext.rs index 2e363f4e6..818f1f5fb 100644 --- a/crates/esplora/tests/blocking_ext.rs +++ b/crates/esplora/tests/blocking_ext.rs @@ -55,7 +55,9 @@ pub fn test_update_tx_graph_without_keychain() -> anyhow::Result<()> { let cp_tip = env.make_checkpoint_tip(); let sync_update = { - let request = SyncRequest::from_chain_tip(cp_tip.clone()).set_spks(misc_spks); + let request = SyncRequest::builder() + .chain_tip(cp_tip.clone()) + .spks(misc_spks); client.sync(request, 1)? }; @@ -161,15 +163,17 @@ pub fn test_update_tx_graph_stop_gap() -> anyhow::Result<()> { // A scan with a stop_gap of 3 won't find the transaction, but a scan with a gap limit of 4 // will. 
let full_scan_update = { - let request = - FullScanRequest::from_chain_tip(cp_tip.clone()).set_spks_for_keychain(0, spks.clone()); + let request = FullScanRequest::builder() + .chain_tip(cp_tip.clone()) + .spks_for_keychain(0, spks.clone()); client.full_scan(request, 3, 1)? }; assert!(full_scan_update.graph_update.full_txs().next().is_none()); assert!(full_scan_update.last_active_indices.is_empty()); let full_scan_update = { - let request = - FullScanRequest::from_chain_tip(cp_tip.clone()).set_spks_for_keychain(0, spks.clone()); + let request = FullScanRequest::builder() + .chain_tip(cp_tip.clone()) + .spks_for_keychain(0, spks.clone()); client.full_scan(request, 4, 1)? }; assert_eq!( @@ -202,8 +206,9 @@ pub fn test_update_tx_graph_stop_gap() -> anyhow::Result<()> { // A scan with gap limit 5 won't find the second transaction, but a scan with gap limit 6 will. // The last active indice won't be updated in the first case but will in the second one. let full_scan_update = { - let request = - FullScanRequest::from_chain_tip(cp_tip.clone()).set_spks_for_keychain(0, spks.clone()); + let request = FullScanRequest::builder() + .chain_tip(cp_tip.clone()) + .spks_for_keychain(0, spks.clone()); client.full_scan(request, 5, 1)? }; let txs: HashSet<_> = full_scan_update @@ -215,8 +220,9 @@ pub fn test_update_tx_graph_stop_gap() -> anyhow::Result<()> { assert!(txs.contains(&txid_4th_addr)); assert_eq!(full_scan_update.last_active_indices[&0], 3); let full_scan_update = { - let request = - FullScanRequest::from_chain_tip(cp_tip.clone()).set_spks_for_keychain(0, spks.clone()); + let request = FullScanRequest::builder() + .chain_tip(cp_tip.clone()) + .spks_for_keychain(0, spks.clone()); client.full_scan(request, 6, 1)? 
}; let txs: HashSet<_> = full_scan_update diff --git a/crates/wallet/README.md b/crates/wallet/README.md index 4bcd7e8e6..3b5422b63 100644 --- a/crates/wallet/README.md +++ b/crates/wallet/README.md @@ -79,10 +79,10 @@ let network = Network::Testnet; let descriptor = "wpkh(tprv8ZgxMBicQKsPdcAqYBpzAFwU5yxBUo88ggoBqu1qPcHUfSbKK1sKMLmC7EAk438btHQrSdu3jGGQa6PA71nvH5nkDexhLteJqkM4dQmWF9g/84'/1'/0'/0/*)"; let change_descriptor = "wpkh(tprv8ZgxMBicQKsPdcAqYBpzAFwU5yxBUo88ggoBqu1qPcHUfSbKK1sKMLmC7EAk438btHQrSdu3jGGQa6PA71nvH5nkDexhLteJqkM4dQmWF9g/84'/1'/0'/1/*)"; let wallet_opt = Wallet::load() - .network(network) .descriptor(KeychainKind::External, Some(descriptor)) .descriptor(KeychainKind::Internal, Some(change_descriptor)) .extract_keys() + .check_network(network) .load_wallet(&mut db) .expect("wallet"); let mut wallet = match wallet_opt { diff --git a/crates/wallet/src/wallet/changeset.rs b/crates/wallet/src/wallet/changeset.rs index 5f3b9b3dc..2d4b700ed 100644 --- a/crates/wallet/src/wallet/changeset.rs +++ b/crates/wallet/src/wallet/changeset.rs @@ -72,10 +72,8 @@ impl ChangeSet { /// Name of table to store wallet descriptors and network. pub const WALLET_TABLE_NAME: &'static str = "bdk_wallet"; - /// Initialize sqlite tables for wallet schema & table. - fn init_wallet_sqlite_tables( - db_tx: &chain::rusqlite::Transaction, - ) -> chain::rusqlite::Result<()> { + /// Initialize sqlite tables for wallet tables. 
+ pub fn init_sqlite_tables(db_tx: &chain::rusqlite::Transaction) -> chain::rusqlite::Result<()> { let schema_v0: &[&str] = &[&format!( "CREATE TABLE {} ( \ id INTEGER PRIMARY KEY NOT NULL CHECK (id = 0), \ @@ -85,12 +83,17 @@ impl ChangeSet { ) STRICT;", Self::WALLET_TABLE_NAME, )]; - crate::rusqlite_impl::migrate_schema(db_tx, Self::WALLET_SCHEMA_NAME, &[schema_v0]) + crate::rusqlite_impl::migrate_schema(db_tx, Self::WALLET_SCHEMA_NAME, &[schema_v0])?; + + bdk_chain::local_chain::ChangeSet::init_sqlite_tables(db_tx)?; + bdk_chain::tx_graph::ChangeSet::::init_sqlite_tables(db_tx)?; + bdk_chain::keychain_txout::ChangeSet::init_sqlite_tables(db_tx)?; + + Ok(()) } /// Recover a [`ChangeSet`] from sqlite database. pub fn from_sqlite(db_tx: &chain::rusqlite::Transaction) -> chain::rusqlite::Result { - Self::init_wallet_sqlite_tables(db_tx)?; use chain::rusqlite::OptionalExtension; use chain::Impl; @@ -129,7 +132,6 @@ impl ChangeSet { &self, db_tx: &chain::rusqlite::Transaction, ) -> chain::rusqlite::Result<()> { - Self::init_wallet_sqlite_tables(db_tx)?; use chain::rusqlite::named_params; use chain::Impl; diff --git a/crates/wallet/src/wallet/mod.rs b/crates/wallet/src/wallet/mod.rs index 4303ad298..4cd721ba1 100644 --- a/crates/wallet/src/wallet/mod.rs +++ b/crates/wallet/src/wallet/mod.rs @@ -29,7 +29,10 @@ use bdk_chain::{ local_chain::{ self, ApplyHeaderError, CannotConnectError, CheckPoint, CheckPointIter, LocalChain, }, - spk_client::{FullScanRequest, FullScanResult, SyncRequest, SyncResult}, + spk_client::{ + FullScanRequest, FullScanRequestBuilder, FullScanResult, SyncRequest, SyncRequestBuilder, + SyncResult, + }, tx_graph::{CanonicalTx, TxGraph, TxNode}, BlockId, ChainPosition, ConfirmationBlockTime, ConfirmationTime, DescriptorExt, FullTxOut, Indexed, IndexedTxGraph, Merge, @@ -42,7 +45,6 @@ use bitcoin::{ use bitcoin::{consensus::encode::serialize, transaction, BlockHash, Psbt}; use bitcoin::{constants::genesis_block, Amount}; use 
bitcoin::{secp256k1::Secp256k1, Weight}; -use chain::Staged; use core::fmt; use core::mem; use core::ops::Deref; @@ -120,14 +122,6 @@ pub struct Wallet { secp: SecpCtx, } -impl Staged for Wallet { - type ChangeSet = ChangeSet; - - fn staged(&mut self) -> &mut Self::ChangeSet { - &mut self.stage - } -} - /// An update to [`Wallet`]. /// /// It updates [`KeychainTxOutIndex`], [`bdk_chain::TxGraph`] and [`local_chain::LocalChain`] atomically. @@ -151,7 +145,7 @@ impl From> for Update { Self { last_active_indices: value.last_active_indices, graph: value.graph_update, - chain: Some(value.chain_update), + chain: value.chain_update, } } } @@ -161,7 +155,7 @@ impl From for Update { Self { last_active_indices: BTreeMap::new(), graph: value.graph_update, - chain: Some(value.chain_update), + chain: value.chain_update, } } } @@ -479,7 +473,7 @@ impl Wallet { /// .keymap(KeychainKind::External, external_keymap) /// .keymap(KeychainKind::Internal, internal_keymap) /// // ensure loaded wallet's genesis hash matches this value - /// .genesis_hash(genesis_hash) + /// .check_genesis_hash(genesis_hash) /// // set a lookahead for our indexer /// .lookahead(101) /// .load_wallet(&mut conn)? @@ -2300,7 +2294,7 @@ impl Wallet { Ok(()) } - /// Get a reference of the staged [`ChangeSet`] that are yet to be committed (if any). + /// Get a reference of the staged [`ChangeSet`] that is yet to be committed (if any). pub fn staged(&self) -> Option<&ChangeSet> { if self.stage.is_empty() { None @@ -2309,6 +2303,15 @@ impl Wallet { } } + /// Get a mutable reference of the staged [`ChangeSet`] that is yet to be committed (if any). + pub fn staged_mut(&mut self) -> Option<&mut ChangeSet> { + if self.stage.is_empty() { + None + } else { + Some(&mut self.stage) + } + } + /// Take the staged [`ChangeSet`] to be persisted now (if any). 
pub fn take_staged(&mut self) -> Option { self.stage.take() @@ -2437,9 +2440,10 @@ impl Wallet { /// This is the first step when performing a spk-based wallet partial sync, the returned /// [`SyncRequest`] collects all revealed script pubkeys from the wallet keychain needed to /// start a blockchain sync with a spk based blockchain client. - pub fn start_sync_with_revealed_spks(&self) -> SyncRequest { - SyncRequest::from_chain_tip(self.chain.tip()) - .populate_with_revealed_spks(&self.indexed_graph.index, ..) + pub fn start_sync_with_revealed_spks(&self) -> SyncRequestBuilder<(KeychainKind, u32)> { + SyncRequest::builder() + .chain_tip(self.chain.tip()) + .revealed_spks_from_indexer(&self.indexed_graph.index, ..) } /// Create a [`FullScanRequest] for this wallet. @@ -2450,8 +2454,10 @@ impl Wallet { /// /// This operation is generally only used when importing or restoring a previously used wallet /// in which the list of used scripts is not known. - pub fn start_full_scan(&self) -> FullScanRequest { - FullScanRequest::from_keychain_txout_index(self.chain.tip(), &self.indexed_graph.index) + pub fn start_full_scan(&self) -> FullScanRequestBuilder { + FullScanRequest::builder() + .chain_tip(self.chain.tip()) + .spks_from_indexer(&self.indexed_graph.index) } } diff --git a/crates/wallet/src/wallet/params.rs b/crates/wallet/src/wallet/params.rs index 134218589..46ae67245 100644 --- a/crates/wallet/src/wallet/params.rs +++ b/crates/wallet/src/wallet/params.rs @@ -1,12 +1,14 @@ use alloc::boxed::Box; -use bdk_chain::{keychain_txout::DEFAULT_LOOKAHEAD, PersistAsyncWith, PersistWith}; +use std::prelude::rust_2021::String; +use bdk_chain::keychain_txout::DEFAULT_LOOKAHEAD; use bitcoin::{BlockHash, Network}; use miniscript::descriptor::KeyMap; use crate::{ descriptor::{DescriptorError, ExtendedDescriptor, IntoWalletDescriptor}, utils::SecpCtx, - KeychainKind, Wallet, + AsyncWalletPersister, CreateWithPersistError, KeychainKind, LoadWithPersistError, Wallet, + WalletPersister, 
}; use super::{ChangeSet, LoadError, PersistedWallet}; @@ -108,26 +110,27 @@ impl CreateParams { self } - /// Create [`PersistedWallet`] with the given `Db`. - pub fn create_wallet( + /// Create [`PersistedWallet`] with the given [`WalletPersister`]. + pub fn create_wallet
<P>
( self, - db: &mut Db, - ) -> Result>::CreateError> + persister: &mut P, + ) -> Result, CreateWithPersistError> where - Wallet: PersistWith, + P: WalletPersister, { - PersistedWallet::create(db, self) + PersistedWallet::create(persister, self) } - /// Create [`PersistedWallet`] with the given async `Db`. - pub async fn create_wallet_async( + /// Create [`PersistedWallet`] with the given [`AsyncWalletPersister`]. + pub async fn create_wallet_async
<P>
( self, - db: &mut Db, - ) -> Result>::CreateError> + persister: &mut P, + wallet_name: String, + ) -> Result, CreateWithPersistError> where - Wallet: PersistAsyncWith, + P: AsyncWalletPersister, { - PersistedWallet::create_async(db, self).await + PersistedWallet::create_async(persister, self, wallet_name).await } /// Create [`Wallet`] without persistence. @@ -194,14 +197,14 @@ impl LoadParams { self } - /// Check for `network`. - pub fn network(mut self, network: Network) -> Self { + /// Checks that the given network matches the one loaded from persistence. + pub fn check_network(mut self, network: Network) -> Self { self.check_network = Some(network); self } - /// Check for a `genesis_hash`. - pub fn genesis_hash(mut self, genesis_hash: BlockHash) -> Self { + /// Checks that the given `genesis_hash` matches the one loaded from persistence. + pub fn check_genesis_hash(mut self, genesis_hash: BlockHash) -> Self { self.check_genesis_hash = Some(genesis_hash); self } @@ -219,26 +222,27 @@ impl LoadParams { self } - /// Load [`PersistedWallet`] with the given `Db`. - pub fn load_wallet( + /// Load [`PersistedWallet`] with the given [`WalletPersister`]. + pub fn load_wallet
<P>
( self, - db: &mut Db, - ) -> Result, >::LoadError> + persister: &mut P, + ) -> Result>, LoadWithPersistError> where - Wallet: PersistWith, + P: WalletPersister, { - PersistedWallet::load(db, self) + PersistedWallet::load(persister, self) } - /// Load [`PersistedWallet`] with the given async `Db`. - pub async fn load_wallet_async( + /// Load [`PersistedWallet`] with the given [`AsyncWalletPersister`]. + pub async fn load_wallet_async

( self, - db: &mut Db, - ) -> Result, >::LoadError> + persister: &mut P, + wallet_name:String, + ) -> Result>, LoadWithPersistError> where - Wallet: PersistAsyncWith, + P: AsyncWalletPersister, { - PersistedWallet::load_async(db, self).await + PersistedWallet::load_async(persister, self, wallet_name).await } /// Load [`Wallet`] without persistence. diff --git a/crates/wallet/src/wallet/persisted.rs b/crates/wallet/src/wallet/persisted.rs index cc9f267f4..5fc2a4d76 100644 --- a/crates/wallet/src/wallet/persisted.rs +++ b/crates/wallet/src/wallet/persisted.rs @@ -1,130 +1,334 @@ -use core::fmt; +use core::{ + fmt, + future::Future, + marker::PhantomData, + ops::{Deref, DerefMut}, + pin::Pin, +}; -use crate::{descriptor::DescriptorError, Wallet}; +use alloc::boxed::Box; +use std::prelude::rust_2021::String; +use chain::Merge; -/// Represents a persisted wallet. -pub type PersistedWallet = bdk_chain::Persisted; +use crate::{descriptor::DescriptorError, ChangeSet, CreateParams, LoadParams, Wallet}; -#[cfg(feature = "rusqlite")] -impl<'c> chain::PersistWith> for Wallet { - type CreateParams = crate::CreateParams; - type LoadParams = crate::LoadParams; - - type CreateError = CreateWithPersistError; - type LoadError = LoadWithPersistError; - type PersistError = bdk_chain::rusqlite::Error; - - fn create( - db: &mut bdk_chain::rusqlite::Transaction<'c>, - params: Self::CreateParams, - ) -> Result { - let mut wallet = - Self::create_with_params(params).map_err(CreateWithPersistError::Descriptor)?; - if let Some(changeset) = wallet.take_staged() { - changeset - .persist_to_sqlite(db) +/// Trait that persists [`PersistedWallet`]. +/// +/// For an async version, use [`AsyncWalletPersister`]. +/// +/// Associated functions of this trait should not be called directly, and the trait is designed so +/// that associated functions are hard to find (since they are not methods!). 
[`WalletPersister`] is +/// used by [`PersistedWallet`] (a light wrapper around [`Wallet`]) which enforces some level of +/// safety. Refer to [`PersistedWallet`] for more about the safety checks. +pub trait WalletPersister { + /// Error type of the persister. + type Error; + + /// Initialize the `persister` and load all data. + /// + /// This is called by [`PersistedWallet::create`] and [`PersistedWallet::load`] to ensure + /// the [`WalletPersister`] is initialized and returns all data in the `persister`. + /// + /// # Implementation Details + /// + /// The database schema of the `persister` (if any), should be initialized and migrated here. + /// + /// The implementation must return all data currently stored in the `persister`. If there is no + /// data, return an empty changeset (using [`ChangeSet::default()`]). + /// + /// Error should only occur on database failure. Multiple calls to `initialize` should not + /// error. Calling `initialize` inbetween calls to `persist` should not error. + /// + /// Calling [`persist`] before the `persister` is `initialize`d may error. However, some + /// persister implementations may NOT require initialization at all (and not error). + /// + /// [`persist`]: WalletPersister::persist + fn initialize(persister: &mut Self) -> Result; + + /// Persist the given `changeset` to the `persister`. + /// + /// This method can fail if the `persister` is not [`initialize`]d. + /// + /// [`initialize`]: WalletPersister::initialize + fn persist(persister: &mut Self, changeset: &ChangeSet) -> Result<(), Self::Error>; +} + +type FutureResult<'a, T, E> = Pin> + Send + 'a>>; + +/// Async trait that persists [`PersistedWallet`]. +/// +/// For a blocking version, use [`WalletPersister`]. +/// +/// Associated functions of this trait should not be called directly, and the trait is designed so +/// that associated functions are hard to find (since they are not methods!). 
[`WalletPersister`] is +/// used by [`PersistedWallet`] (a light wrapper around [`Wallet`]) which enforces some level of +/// safety. Refer to [`PersistedWallet`] for more about the safety checks. +pub trait AsyncWalletPersister { + /// Error type of the persister. + type Error; + + /// Initialize the `persister` and load all data. + /// + /// This is called by [`PersistedWallet::create_async`] and [`PersistedWallet::load_async`] to + /// ensure the [`WalletPersister`] is initialized and returns all data in the `persister`. + /// + /// # Implementation Details + /// + /// The database schema of the `persister` (if any), should be initialized and migrated here. + /// + /// The implementation must return all data currently stored in the `persister`. If there is no + /// data, return an empty changeset (using [`ChangeSet::default()`]). + /// + /// Error should only occur on database failure. Multiple calls to `initialize` should not + /// error. Calling `initialize` inbetween calls to `persist` should not error. + /// + /// Calling [`persist`] before the `persister` is `initialize`d may error. However, some + /// persister implementations may NOT require initialization at all (and not error). + /// + /// [`persist`]: AsyncWalletPersister::persist + fn initialize<'a>(persister: &'a mut Self, wallet_name: String) -> FutureResult<'a, ChangeSet, Self::Error> + where + Self: 'a; + + /// Persist the given `changeset` to the `persister`. + /// + /// This method can fail if the `persister` is not [`initialize`]d. + /// + /// [`initialize`]: AsyncWalletPersister::initialize + fn persist<'a>( + persister: &'a mut Self, + changeset: &'a ChangeSet, + wallet_name:String, + ) -> FutureResult<'a, (), Self::Error> + where + Self: 'a; +} + +/// Represents a persisted wallet which persists into type `P`. +/// +/// This is a light wrapper around [`Wallet`] that enforces some level of safety-checking when used +/// with a [`WalletPersister`] or [`AsyncWalletPersister`] implementation. 
Safety checks assume that +/// [`WalletPersister`] and/or [`AsyncWalletPersister`] are implemented correctly. +/// +/// Checks include: +/// +/// * Ensure the persister is initialized before data is persisted. +/// * Ensure there were no previously persisted wallet data before creating a fresh wallet and +/// persisting it. +/// * Only clear the staged changes of [`Wallet`] after persisting succeeds. +/// * Ensure the wallet is persisted to the same `P` type as when created/loaded. Note that this is +/// not completely fool-proof as you can have multiple instances of the same `P` type that are +/// connected to different databases. +#[derive(Debug)] +pub struct PersistedWallet

{ + inner: Wallet, + marker: PhantomData

, +} + +impl

Deref for PersistedWallet

{ + type Target = Wallet; + + fn deref(&self) -> &Self::Target { + &self.inner + } +} + +impl

DerefMut for PersistedWallet

{ + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.inner + } +} + +/// Methods when `P` is a [`WalletPersister`]. +impl PersistedWallet

{ + /// Create a new [`PersistedWallet`] with the given `persister` and `params`. + pub fn create( + persister: &mut P, + params: CreateParams, + ) -> Result> { + let existing = P::initialize(persister).map_err(CreateWithPersistError::Persist)?; + if !existing.is_empty() { + return Err(CreateWithPersistError::DataAlreadyExists(existing)); + } + let mut inner = + Wallet::create_with_params(params).map_err(CreateWithPersistError::Descriptor)?; + if let Some(changeset) = inner.take_staged() { + P::persist(persister, &changeset).map_err(CreateWithPersistError::Persist)?; + } + Ok(Self { + inner, + marker: PhantomData, + }) + } + + /// Load a previously [`PersistedWallet`] from the given `persister` and `params`. + pub fn load( + persister: &mut P, + params: LoadParams, + ) -> Result, LoadWithPersistError> { + let changeset = P::initialize(persister).map_err(LoadWithPersistError::Persist)?; + Wallet::load_with_params(changeset, params) + .map(|opt| { + opt.map(|inner| PersistedWallet { + inner, + marker: PhantomData, + }) + }) + .map_err(LoadWithPersistError::InvalidChangeSet) + } + + /// Persist staged changes of wallet into `persister`. + /// + /// Returns whether any new changes were persisted. + /// + /// If the `persister` errors, the staged changes will not be cleared. + pub fn persist(&mut self, persister: &mut P) -> Result { + match self.inner.staged_mut() { + Some(stage) => { + P::persist(persister, &*stage)?; + let _ = stage.take(); + Ok(true) + } + None => Ok(false), + } + } +} + +/// Methods when `P` is an [`AsyncWalletPersister`]. +impl PersistedWallet

{ + /// Create a new [`PersistedWallet`] with the given async `persister` and `params`. + pub async fn create_async( + persister: &mut P, + params: CreateParams, + wallet_name:String, + ) -> Result> { + let existing = P::initialize(persister, wallet_name.clone()) + .await + .map_err(CreateWithPersistError::Persist)?; + if !existing.is_empty() { + return Err(CreateWithPersistError::DataAlreadyExists(existing)); + } + let mut inner = + Wallet::create_with_params(params).map_err(CreateWithPersistError::Descriptor)?; + if let Some(changeset) = inner.take_staged() { + P::persist(persister, &changeset, wallet_name) + .await .map_err(CreateWithPersistError::Persist)?; } - Ok(wallet) + Ok(Self { + inner, + marker: PhantomData, + }) } - fn load( - conn: &mut bdk_chain::rusqlite::Transaction<'c>, - params: Self::LoadParams, - ) -> Result, Self::LoadError> { - let changeset = - crate::ChangeSet::from_sqlite(conn).map_err(LoadWithPersistError::Persist)?; - if chain::Merge::is_empty(&changeset) { - return Ok(None); + /// Load a previously [`PersistedWallet`] from the given async `persister` and `params`. + pub async fn load_async( + persister: &mut P, + params: LoadParams, + wallet_name: String, + ) -> Result, LoadWithPersistError> { + let changeset = P::initialize(persister, wallet_name) + .await + .map_err(LoadWithPersistError::Persist)?; + Wallet::load_with_params(changeset, params) + .map(|opt| { + opt.map(|inner| PersistedWallet { + inner, + marker: PhantomData, + }) + }) + .map_err(LoadWithPersistError::InvalidChangeSet) + } + + /// Persist staged changes of wallet into an async `persister`. + /// + /// Returns whether any new changes were persisted. + /// + /// If the `persister` errors, the staged changes will not be cleared. 
+ pub async fn persist_async<'a>(&'a mut self, persister: &mut P, wallet_name:String) -> Result { + match self.inner.staged_mut() { + Some(stage) => { + P::persist(persister, &*stage, wallet_name).await?; + let _ = stage.take(); + Ok(true) + } + None => Ok(false), } - Self::load_with_params(changeset, params).map_err(LoadWithPersistError::InvalidChangeSet) } +} - fn persist( - db: &mut bdk_chain::rusqlite::Transaction<'c>, - changeset: &::ChangeSet, - ) -> Result<(), Self::PersistError> { - changeset.persist_to_sqlite(db) +#[cfg(feature = "rusqlite")] +impl<'c> WalletPersister for bdk_chain::rusqlite::Transaction<'c> { + type Error = bdk_chain::rusqlite::Error; + + fn initialize(persister: &mut Self) -> Result { + ChangeSet::init_sqlite_tables(&*persister)?; + ChangeSet::from_sqlite(persister) + } + + fn persist(persister: &mut Self, changeset: &ChangeSet) -> Result<(), Self::Error> { + changeset.persist_to_sqlite(persister) } } #[cfg(feature = "rusqlite")] -impl chain::PersistWith for Wallet { - type CreateParams = crate::CreateParams; - type LoadParams = crate::LoadParams; - - type CreateError = CreateWithPersistError; - type LoadError = LoadWithPersistError; - type PersistError = bdk_chain::rusqlite::Error; - - fn create( - db: &mut bdk_chain::rusqlite::Connection, - params: Self::CreateParams, - ) -> Result { - let mut db_tx = db.transaction().map_err(CreateWithPersistError::Persist)?; - let wallet = chain::PersistWith::create(&mut db_tx, params)?; - db_tx.commit().map_err(CreateWithPersistError::Persist)?; - Ok(wallet) - } - - fn load( - db: &mut bdk_chain::rusqlite::Connection, - params: Self::LoadParams, - ) -> Result, Self::LoadError> { - let mut db_tx = db.transaction().map_err(LoadWithPersistError::Persist)?; - let wallet_opt = chain::PersistWith::load(&mut db_tx, params)?; - db_tx.commit().map_err(LoadWithPersistError::Persist)?; - Ok(wallet_opt) - } - - fn persist( - db: &mut bdk_chain::rusqlite::Connection, - changeset: &::ChangeSet, - ) -> Result<(), 
Self::PersistError> { - let db_tx = db.transaction()?; +impl WalletPersister for bdk_chain::rusqlite::Connection { + type Error = bdk_chain::rusqlite::Error; + + fn initialize(persister: &mut Self) -> Result { + let db_tx = persister.transaction()?; + ChangeSet::init_sqlite_tables(&db_tx)?; + let changeset = ChangeSet::from_sqlite(&db_tx)?; + db_tx.commit()?; + Ok(changeset) + } + + fn persist(persister: &mut Self, changeset: &ChangeSet) -> Result<(), Self::Error> { + let db_tx = persister.transaction()?; changeset.persist_to_sqlite(&db_tx)?; db_tx.commit() } } +/// Error for [`bdk_file_store`]'s implementation of [`WalletPersister`]. #[cfg(feature = "file_store")] -impl chain::PersistWith> for Wallet { - type CreateParams = crate::CreateParams; - type LoadParams = crate::LoadParams; - type CreateError = CreateWithPersistError; - type LoadError = - LoadWithPersistError>; - type PersistError = std::io::Error; - - fn create( - db: &mut bdk_file_store::Store, - params: Self::CreateParams, - ) -> Result { - let mut wallet = - Self::create_with_params(params).map_err(CreateWithPersistError::Descriptor)?; - if let Some(changeset) = wallet.take_staged() { - db.append_changeset(&changeset) - .map_err(CreateWithPersistError::Persist)?; +#[derive(Debug)] +pub enum FileStoreError { + /// Error when loading from the store. + Load(bdk_file_store::AggregateChangesetsError), + /// Error when writing to the store. 
+ Write(std::io::Error), +} + +#[cfg(feature = "file_store")] +impl core::fmt::Display for FileStoreError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + use core::fmt::Display; + match self { + FileStoreError::Load(e) => Display::fmt(e, f), + FileStoreError::Write(e) => Display::fmt(e, f), } - Ok(wallet) } +} + +#[cfg(feature = "file_store")] +impl std::error::Error for FileStoreError {} + +#[cfg(feature = "file_store")] +impl WalletPersister for bdk_file_store::Store { + type Error = FileStoreError; - fn load( - db: &mut bdk_file_store::Store, - params: Self::LoadParams, - ) -> Result, Self::LoadError> { - let changeset = db + fn initialize(persister: &mut Self) -> Result { + persister .aggregate_changesets() - .map_err(LoadWithPersistError::Persist)? - .unwrap_or_default(); - Self::load_with_params(changeset, params).map_err(LoadWithPersistError::InvalidChangeSet) + .map(Option::unwrap_or_default) + .map_err(FileStoreError::Load) } - fn persist( - db: &mut bdk_file_store::Store, - changeset: &::ChangeSet, - ) -> Result<(), Self::PersistError> { - db.append_changeset(changeset) + fn persist(persister: &mut Self, changeset: &ChangeSet) -> Result<(), Self::Error> { + persister + .append_changeset(changeset) + .map_err(FileStoreError::Write) } } @@ -154,6 +358,8 @@ impl std::error::Error for LoadWithPersistError pub enum CreateWithPersistError { /// Error from persistence. Persist(E), + /// Persister already has wallet data. + DataAlreadyExists(ChangeSet), /// Occurs when the loaded changeset cannot construct [`Wallet`]. 
Descriptor(DescriptorError), } @@ -162,6 +368,11 @@ impl fmt::Display for CreateWithPersistError { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self { Self::Persist(err) => fmt::Display::fmt(err, f), + Self::DataAlreadyExists(changeset) => write!( + f, + "Cannot create wallet in persister which already contains wallet data: {:?}", + changeset + ), Self::Descriptor(err) => fmt::Display::fmt(&err, f), } } diff --git a/crates/wallet/tests/wallet.rs b/crates/wallet/tests/wallet.rs index ee60ab972..c530e779c 100644 --- a/crates/wallet/tests/wallet.rs +++ b/crates/wallet/tests/wallet.rs @@ -5,15 +5,15 @@ use std::str::FromStr; use anyhow::Context; use assert_matches::assert_matches; +use bdk_chain::COINBASE_MATURITY; use bdk_chain::{BlockId, ConfirmationTime}; -use bdk_chain::{PersistWith, COINBASE_MATURITY}; use bdk_wallet::coin_selection::{self, LargestFirstCoinSelection}; use bdk_wallet::descriptor::{calc_checksum, DescriptorError, IntoWalletDescriptor}; use bdk_wallet::error::CreateTxError; use bdk_wallet::psbt::PsbtUtils; use bdk_wallet::signer::{SignOptions, SignerError}; use bdk_wallet::tx_builder::AddForeignUtxoError; -use bdk_wallet::{AddressInfo, Balance, CreateParams, LoadParams, Wallet}; +use bdk_wallet::{AddressInfo, Balance, ChangeSet, Wallet, WalletPersister}; use bdk_wallet::{KeychainKind, LoadError, LoadMismatch, LoadWithPersistError}; use bitcoin::constants::ChainHash; use bitcoin::hashes::Hash; @@ -111,10 +111,8 @@ fn wallet_is_persisted() -> anyhow::Result<()> { where CreateDb: Fn(&Path) -> anyhow::Result, OpenDb: Fn(&Path) -> anyhow::Result, - Wallet: PersistWith, - >::CreateError: std::error::Error + Send + Sync + 'static, - >::LoadError: std::error::Error + Send + Sync + 'static, - >::PersistError: std::error::Error + Send + Sync + 'static, + Db: WalletPersister, + Db::Error: std::error::Error + Send + Sync + 'static, { let temp_dir = tempfile::tempdir().expect("must create tempdir"); let file_path = 
temp_dir.path().join(filename); @@ -137,7 +135,7 @@ fn wallet_is_persisted() -> anyhow::Result<()> { { let mut db = open_db(&file_path).context("failed to recover db")?; let _ = Wallet::load() - .network(Network::Testnet) + .check_network(Network::Testnet) .load_wallet(&mut db)? .expect("wallet must exist"); } @@ -146,7 +144,7 @@ fn wallet_is_persisted() -> anyhow::Result<()> { let wallet = Wallet::load() .descriptor(KeychainKind::External, Some(external_desc)) .descriptor(KeychainKind::Internal, Some(internal_desc)) - .network(Network::Testnet) + .check_network(Network::Testnet) .load_wallet(&mut db)? .expect("wallet must exist"); @@ -188,7 +186,7 @@ fn wallet_is_persisted() -> anyhow::Result<()> { #[test] fn wallet_load_checks() -> anyhow::Result<()> { - fn run( + fn run( filename: &str, create_db: CreateDb, open_db: OpenDb, @@ -196,15 +194,8 @@ fn wallet_load_checks() -> anyhow::Result<()> { where CreateDb: Fn(&Path) -> anyhow::Result, OpenDb: Fn(&Path) -> anyhow::Result, - Wallet: PersistWith< - Db, - CreateParams = CreateParams, - LoadParams = LoadParams, - LoadError = LoadWithPersistError, - >, - >::CreateError: std::error::Error + Send + Sync + 'static, - >::LoadError: std::error::Error + Send + Sync + 'static, - >::PersistError: std::error::Error + Send + Sync + 'static, + Db: WalletPersister + std::fmt::Debug, + Db::Error: std::error::Error + Send + Sync + 'static, { let temp_dir = tempfile::tempdir().expect("must create tempdir"); let file_path = temp_dir.path().join(filename); @@ -218,7 +209,7 @@ fn wallet_load_checks() -> anyhow::Result<()> { assert_matches!( Wallet::load() - .network(Network::Regtest) + .check_network(Network::Regtest) .load_wallet(&mut open_db(&file_path)?), Err(LoadWithPersistError::InvalidChangeSet(LoadError::Mismatch( LoadMismatch::Network { @@ -228,21 +219,9 @@ fn wallet_load_checks() -> anyhow::Result<()> { ))), "unexpected network check result: Regtest (check) is not Testnet (loaded)", ); - assert_matches!( - Wallet::load() - 
.network(Network::Bitcoin) - .load_wallet(&mut open_db(&file_path)?), - Err(LoadWithPersistError::InvalidChangeSet(LoadError::Mismatch( - LoadMismatch::Network { - loaded: Network::Testnet, - expected: Network::Bitcoin, - } - ))), - "unexpected network check result: Bitcoin (check) is not Testnet (loaded)", - ); let mainnet_hash = BlockHash::from_byte_array(ChainHash::BITCOIN.to_bytes()); assert_matches!( - Wallet::load().genesis_hash(mainnet_hash).load_wallet(&mut open_db(&file_path)?), + Wallet::load().check_genesis_hash(mainnet_hash).load_wallet(&mut open_db(&file_path)?), Err(LoadWithPersistError::InvalidChangeSet(LoadError::Mismatch(LoadMismatch::Genesis { .. }))), "unexpected genesis hash check result: mainnet hash (check) is not testnet hash (loaded)", ); @@ -270,8 +249,12 @@ fn wallet_load_checks() -> anyhow::Result<()> { run( "store.db", - |path| Ok(bdk_file_store::Store::create_new(DB_MAGIC, path)?), - |path| Ok(bdk_file_store::Store::open(DB_MAGIC, path)?), + |path| { + Ok(bdk_file_store::Store::::create_new( + DB_MAGIC, path, + )?) 
+ }, + |path| Ok(bdk_file_store::Store::::open(DB_MAGIC, path)?), )?; run( "store.sqlite", @@ -292,7 +275,7 @@ fn single_descriptor_wallet_persist_and_recover() { let mut db = rusqlite::Connection::open(db_path).unwrap(); let desc = get_test_tr_single_sig_xprv(); - let mut wallet = CreateParams::new_single(desc) + let mut wallet = Wallet::create_single(desc) .network(Network::Testnet) .create_wallet(&mut db) .unwrap(); @@ -4186,7 +4169,7 @@ fn test_insert_tx_balance_and_utxos() { #[test] fn single_descriptor_wallet_can_create_tx_and_receive_change() { // create single descriptor wallet and fund it - let mut wallet = CreateParams::new_single(get_test_tr_single_sig_xprv()) + let mut wallet = Wallet::create_single(get_test_tr_single_sig_xprv()) .network(Network::Testnet) .create_wallet_no_persist() .unwrap(); diff --git a/example-crates/example_cli/Cargo.toml b/example-crates/example_cli/Cargo.toml index 9b8c4debb..09f093ebf 100644 --- a/example-crates/example_cli/Cargo.toml +++ b/example-crates/example_cli/Cargo.toml @@ -9,8 +9,10 @@ edition = "2021" bdk_chain = { path = "../../crates/chain", features = ["serde", "miniscript"]} bdk_coin_select = "0.3.0" bdk_file_store = { path = "../../crates/file_store" } +bitcoin = { version = "0.32.0", features = ["base64"], default-features = false } anyhow = "1" clap = { version = "3.2.23", features = ["derive", "env"] } +rand = "0.8" serde = { version = "1", features = ["derive"] } serde_json = "1.0" diff --git a/example-crates/example_cli/src/lib.rs b/example-crates/example_cli/src/lib.rs index ee0c9b376..393f9d3fb 100644 --- a/example-crates/example_cli/src/lib.rs +++ b/example-crates/example_cli/src/lib.rs @@ -10,14 +10,9 @@ use std::sync::Mutex; use anyhow::bail; use anyhow::Context; use bdk_chain::bitcoin::{ - absolute, - address::NetworkUnchecked, - bip32, consensus, constants, - hex::DisplayHex, - relative, - secp256k1::{rand::prelude::*, Secp256k1}, - transaction, Address, Amount, Network, NetworkKind, PrivateKey, Psbt, 
PublicKey, Sequence, - Transaction, TxIn, TxOut, + absolute, address::NetworkUnchecked, bip32, consensus, constants, hex::DisplayHex, relative, + secp256k1::Secp256k1, transaction, Address, Amount, Network, NetworkKind, PrivateKey, Psbt, + PublicKey, Sequence, Transaction, TxIn, TxOut, }; use bdk_chain::miniscript::{ descriptor::{DescriptorSecretKey, SinglePubKey}, @@ -37,6 +32,7 @@ use bdk_coin_select::{ }; use bdk_file_store::Store; use clap::{Parser, Subcommand}; +use rand::prelude::*; pub use anyhow; pub use clap; @@ -675,7 +671,7 @@ pub fn handle_commands( Ok(()) } PsbtCmd::Sign { psbt, descriptor } => { - let mut psbt = Psbt::from_str(&psbt.unwrap_or_default())?; + let mut psbt = Psbt::from_str(psbt.unwrap_or_default().as_str())?; let desc_str = match descriptor { Some(s) => s, @@ -717,7 +713,7 @@ pub fn handle_commands( chain_specific, psbt, } => { - let mut psbt = Psbt::from_str(&psbt)?; + let mut psbt = Psbt::from_str(psbt.as_str())?; psbt.finalize_mut(&Secp256k1::new()) .map_err(|errors| anyhow::anyhow!("failed to finalize PSBT {errors:?}"))?; diff --git a/example-crates/example_electrum/src/main.rs b/example-crates/example_electrum/src/main.rs index cda8c5526..49608fbf1 100644 --- a/example-crates/example_electrum/src/main.rs +++ b/example-crates/example_electrum/src/main.rs @@ -1,7 +1,7 @@ use std::io::{self, Write}; use bdk_chain::{ - bitcoin::{Address, Network, Txid}, + bitcoin::Network, collections::BTreeSet, indexed_tx_graph, spk_client::{FullScanRequest, SyncRequest}, @@ -139,8 +139,9 @@ fn main() -> anyhow::Result<()> { let graph = &*graph.lock().unwrap(); let chain = &*chain.lock().unwrap(); - FullScanRequest::from_chain_tip(chain.tip()) - .set_spks_for_keychain( + FullScanRequest::builder() + .chain_tip(chain.tip()) + .spks_for_keychain( Keychain::External, graph .index @@ -148,7 +149,7 @@ fn main() -> anyhow::Result<()> { .into_iter() .flatten(), ) - .set_spks_for_keychain( + .spks_for_keychain( Keychain::Internal, graph .index @@ -156,7 +157,7 
@@ fn main() -> anyhow::Result<()> { .into_iter() .flatten(), ) - .inspect_spks_for_all_keychains({ + .inspect({ let mut once = BTreeSet::new(); move |k, spk_i, _| { if once.insert(k) { @@ -199,99 +200,43 @@ fn main() -> anyhow::Result<()> { } let chain_tip = chain.tip(); - let mut request = SyncRequest::from_chain_tip(chain_tip.clone()); + let mut request = + SyncRequest::builder() + .chain_tip(chain_tip.clone()) + .inspect(|item, progress| { + let pc = (100 * progress.consumed()) as f32 / progress.total() as f32; + eprintln!("[ SCANNING {:03.0}% ] {}", pc, item); + }); if all_spks { - let all_spks = graph - .index - .revealed_spks(..) - .map(|(index, spk)| (index, spk.to_owned())) - .collect::>(); - request = request.chain_spks(all_spks.into_iter().map(|((k, spk_i), spk)| { - eprint!("Scanning {}: {}", k, spk_i); - spk - })); + request = request.spks_with_indexes(graph.index.revealed_spks(..)); } if unused_spks { - let unused_spks = graph - .index - .unused_spks() - .map(|(index, spk)| (index, spk.to_owned())) - .collect::>(); - request = - request.chain_spks(unused_spks.into_iter().map(move |((k, spk_i), spk)| { - eprint!( - "Checking if address {} {}:{} has been used", - Address::from_script(&spk, network).unwrap(), - k, - spk_i, - ); - spk - })); + request = request.spks_with_indexes(graph.index.unused_spks()); } - if utxos { let init_outpoints = graph.index.outpoints(); - - let utxos = graph - .graph() - .filter_chain_unspents( - &*chain, - chain_tip.block_id(), - init_outpoints.iter().cloned(), - ) - .map(|(_, utxo)| utxo) - .collect::>(); - request = request.chain_outpoints(utxos.into_iter().map(|utxo| { - eprint!( - "Checking if outpoint {} (value: {}) has been spent", - utxo.outpoint, utxo.txout.value - ); - utxo.outpoint - })); + request = request.outpoints( + graph + .graph() + .filter_chain_unspents( + &*chain, + chain_tip.block_id(), + init_outpoints.iter().cloned(), + ) + .map(|(_, utxo)| utxo.outpoint), + ); }; - if unconfirmed { - let 
unconfirmed_txids = graph - .graph() - .list_canonical_txs(&*chain, chain_tip.block_id()) - .filter(|canonical_tx| !canonical_tx.chain_position.is_confirmed()) - .map(|canonical_tx| canonical_tx.tx_node.txid) - .collect::>(); - - request = request.chain_txids( - unconfirmed_txids - .into_iter() - .inspect(|txid| eprint!("Checking if {} is confirmed yet", txid)), + request = request.txids( + graph + .graph() + .list_canonical_txs(&*chain, chain_tip.block_id()) + .filter(|canonical_tx| !canonical_tx.chain_position.is_confirmed()) + .map(|canonical_tx| canonical_tx.tx_node.txid), ); } - let total_spks = request.spks.len(); - let total_txids = request.txids.len(); - let total_ops = request.outpoints.len(); - request = request - .inspect_spks({ - let mut visited = 0; - move |_| { - visited += 1; - eprintln!(" [ {:>6.2}% ]", (visited * 100) as f32 / total_spks as f32) - } - }) - .inspect_txids({ - let mut visited = 0; - move |_| { - visited += 1; - eprintln!(" [ {:>6.2}% ]", (visited * 100) as f32 / total_txids as f32) - } - }) - .inspect_outpoints({ - let mut visited = 0; - move |_| { - visited += 1; - eprintln!(" [ {:>6.2}% ]", (visited * 100) as f32 / total_ops as f32) - } - }); - let res = client .sync(request, scan_options.batch_size, false) .context("scanning the blockchain")?; @@ -313,7 +258,7 @@ fn main() -> anyhow::Result<()> { let mut chain = chain.lock().unwrap(); let mut graph = graph.lock().unwrap(); - let chain_changeset = chain.apply_update(chain_update)?; + let chain_changeset = chain.apply_update(chain_update.expect("request has chain tip"))?; let mut indexed_tx_graph_changeset = indexed_tx_graph::ChangeSet::::default(); diff --git a/example-crates/example_esplora/src/main.rs b/example-crates/example_esplora/src/main.rs index 608e58d11..b07a6697d 100644 --- a/example-crates/example_esplora/src/main.rs +++ b/example-crates/example_esplora/src/main.rs @@ -1,10 +1,11 @@ +use core::f32; use std::{ collections::BTreeSet, io::{self, Write}, }; use bdk_chain::{ 
- bitcoin::{Address, Network, Txid}, + bitcoin::Network, spk_client::{FullScanRequest, SyncRequest}, Merge, }; @@ -144,8 +145,10 @@ fn main() -> anyhow::Result<()> { let request = { let chain_tip = chain.lock().expect("mutex must not be poisoned").tip(); let indexed_graph = &*graph.lock().expect("mutex must not be poisoned"); - FullScanRequest::from_keychain_txout_index(chain_tip, &indexed_graph.index) - .inspect_spks_for_all_keychains({ + FullScanRequest::builder() + .chain_tip(chain_tip) + .spks_from_indexer(&indexed_graph.index) + .inspect({ let mut once = BTreeSet::::new(); move |keychain, spk_i, _| { if once.insert(keychain) { @@ -156,6 +159,7 @@ fn main() -> anyhow::Result<()> { let _ = io::stderr().flush(); } }) + .build() }; // The client scans keychain spks for transaction histories, stopping after `stop_gap` @@ -176,14 +180,17 @@ fn main() -> anyhow::Result<()> { // deriviation indices. Usually before a scan you are on a fresh wallet with no // addresses derived so we need to derive up to last active addresses the scan found // before adding the transactions. 
- (chain.apply_update(update.chain_update)?, { - let index_changeset = graph - .index - .reveal_to_target_multi(&update.last_active_indices); - let mut indexed_tx_graph_changeset = graph.apply_update(update.graph_update); - indexed_tx_graph_changeset.merge(index_changeset.into()); - indexed_tx_graph_changeset - }) + ( + chain.apply_update(update.chain_update.expect("request included chain tip"))?, + { + let index_changeset = graph + .index + .reveal_to_target_multi(&update.last_active_indices); + let mut indexed_tx_graph_changeset = graph.apply_update(update.graph_update); + indexed_tx_graph_changeset.merge(index_changeset.into()); + indexed_tx_graph_changeset + }, + ) } EsploraCommands::Sync { mut unused_spks, @@ -206,7 +213,15 @@ fn main() -> anyhow::Result<()> { let local_tip = chain.lock().expect("mutex must not be poisoned").tip(); // Spks, outpoints and txids we want updates on will be accumulated here. - let mut request = SyncRequest::from_chain_tip(local_tip.clone()); + let mut request = + SyncRequest::builder() + .chain_tip(local_tip.clone()) + .inspect(|item, progress| { + let pc = (100 * progress.consumed()) as f32 / progress.total() as f32; + eprintln!("[ SCANNING {:03.0}% ] {}", pc, item); + // Flush early to ensure we print at every iteration. + let _ = io::stderr().flush(); + }); // Get a short lock on the structures to get spks, utxos, and txs that we are interested // in. @@ -215,108 +230,41 @@ fn main() -> anyhow::Result<()> { let chain = chain.lock().unwrap(); if *all_spks { - let all_spks = graph - .index - .revealed_spks(..) - .map(|((k, i), spk)| (k, i, spk.to_owned())) - .collect::>(); - request = request.chain_spks(all_spks.into_iter().map(|(k, i, spk)| { - eprint!("scanning {}:{}", k, i); - // Flush early to ensure we print at every iteration. 
- let _ = io::stderr().flush(); - spk - })); + request = request.spks_with_indexes(graph.index.revealed_spks(..)); } if unused_spks { - let unused_spks = graph - .index - .unused_spks() - .map(|(index, spk)| (index, spk.to_owned())) - .collect::>(); - request = - request.chain_spks(unused_spks.into_iter().map(move |((k, i), spk)| { - eprint!( - "Checking if address {} {}:{} has been used", - Address::from_script(&spk, network).unwrap(), - k, - i, - ); - // Flush early to ensure we print at every iteration. - let _ = io::stderr().flush(); - spk - })); + request = request.spks_with_indexes(graph.index.unused_spks()); } if utxos { // We want to search for whether the UTXO is spent, and spent by which // transaction. We provide the outpoint of the UTXO to // `EsploraExt::update_tx_graph_without_keychain`. let init_outpoints = graph.index.outpoints(); - let utxos = graph - .graph() - .filter_chain_unspents( - &*chain, - local_tip.block_id(), - init_outpoints.iter().cloned(), - ) - .map(|(_, utxo)| utxo) - .collect::>(); - request = request.chain_outpoints( - utxos - .into_iter() - .inspect(|utxo| { - eprint!( - "Checking if outpoint {} (value: {}) has been spent", - utxo.outpoint, utxo.txout.value - ); - // Flush early to ensure we print at every iteration. - let _ = io::stderr().flush(); - }) - .map(|utxo| utxo.outpoint), + request = request.outpoints( + graph + .graph() + .filter_chain_unspents( + &*chain, + local_tip.block_id(), + init_outpoints.iter().cloned(), + ) + .map(|(_, utxo)| utxo.outpoint), ); }; if unconfirmed { // We want to search for whether the unconfirmed transaction is now confirmed. // We provide the unconfirmed txids to // `EsploraExt::update_tx_graph_without_keychain`. 
- let unconfirmed_txids = graph - .graph() - .list_canonical_txs(&*chain, local_tip.block_id()) - .filter(|canonical_tx| !canonical_tx.chain_position.is_confirmed()) - .map(|canonical_tx| canonical_tx.tx_node.txid) - .collect::>(); - request = request.chain_txids(unconfirmed_txids.into_iter().inspect(|txid| { - eprint!("Checking if {} is confirmed yet", txid); - // Flush early to ensure we print at every iteration. - let _ = io::stderr().flush(); - })); + request = request.txids( + graph + .graph() + .list_canonical_txs(&*chain, local_tip.block_id()) + .filter(|canonical_tx| !canonical_tx.chain_position.is_confirmed()) + .map(|canonical_tx| canonical_tx.tx_node.txid), + ); } } - let total_spks = request.spks.len(); - let total_txids = request.txids.len(); - let total_ops = request.outpoints.len(); - request = request - .inspect_spks({ - let mut visited = 0; - move |_| { - visited += 1; - eprintln!(" [ {:>6.2}% ]", (visited * 100) as f32 / total_spks as f32) - } - }) - .inspect_txids({ - let mut visited = 0; - move |_| { - visited += 1; - eprintln!(" [ {:>6.2}% ]", (visited * 100) as f32 / total_txids as f32) - } - }) - .inspect_outpoints({ - let mut visited = 0; - move |_| { - visited += 1; - eprintln!(" [ {:>6.2}% ]", (visited * 100) as f32 / total_ops as f32) - } - }); let mut update = client.sync(request, scan_options.parallel_requests)?; // Update last seen unconfirmed @@ -324,7 +272,10 @@ fn main() -> anyhow::Result<()> { let _ = update.graph_update.update_last_seen_unconfirmed(now); ( - chain.lock().unwrap().apply_update(update.chain_update)?, + chain + .lock() + .unwrap() + .apply_update(update.chain_update.expect("request has chain tip"))?, graph.lock().unwrap().apply_update(update.graph_update), ) } diff --git a/example-crates/wallet_electrum/src/main.rs b/example-crates/wallet_electrum/src/main.rs index b1e7655de..f4596ce18 100644 --- a/example-crates/wallet_electrum/src/main.rs +++ b/example-crates/wallet_electrum/src/main.rs @@ -26,10 +26,10 @@ fn 
main() -> Result<(), anyhow::Error> { let mut db = Store::::open_or_create_new(DB_MAGIC.as_bytes(), db_path)?; let wallet_opt = Wallet::load() - .network(NETWORK) .descriptor(KeychainKind::External, Some(EXTERNAL_DESC)) .descriptor(KeychainKind::Internal, Some(INTERNAL_DESC)) .extract_keys() + .check_network(NETWORK) .load_wallet(&mut db)?; let mut wallet = match wallet_opt { Some(wallet) => wallet, @@ -52,19 +52,17 @@ fn main() -> Result<(), anyhow::Error> { // already have. client.populate_tx_cache(wallet.tx_graph()); - let request = wallet - .start_full_scan() - .inspect_spks_for_all_keychains({ - let mut once = HashSet::::new(); - move |k, spk_i, _| { - if once.insert(k) { - print!("\nScanning keychain [{:?}]", k) - } else { - print!(" {:<3}", spk_i) - } + let request = wallet.start_full_scan().inspect({ + let mut stdout = std::io::stdout(); + let mut once = HashSet::::new(); + move |k, spk_i, _| { + if once.insert(k) { + print!("\nScanning keychain [{:?}]", k); } - }) - .inspect_spks_for_all_keychains(|_, _, _| std::io::stdout().flush().expect("must flush")); + print!(" {:<3}", spk_i); + stdout.flush().expect("must flush"); + } + }); let mut update = client.full_scan(request, STOP_GAP, BATCH_SIZE, false)?; diff --git a/example-crates/wallet_esplora_async/src/main.rs b/example-crates/wallet_esplora_async/src/main.rs index 535abc6af..f81f8101c 100644 --- a/example-crates/wallet_esplora_async/src/main.rs +++ b/example-crates/wallet_esplora_async/src/main.rs @@ -23,10 +23,10 @@ async fn main() -> Result<(), anyhow::Error> { let mut conn = Connection::open(DB_PATH)?; let wallet_opt = Wallet::load() - .network(NETWORK) .descriptor(KeychainKind::External, Some(EXTERNAL_DESC)) .descriptor(KeychainKind::Internal, Some(INTERNAL_DESC)) .extract_keys() + .check_network(NETWORK) .load_wallet(&mut conn)?; let mut wallet = match wallet_opt { Some(wallet) => wallet, @@ -45,14 +45,15 @@ async fn main() -> Result<(), anyhow::Error> { print!("Syncing..."); let client = 
esplora_client::Builder::new(ESPLORA_URL).build_async()?; - let request = wallet.start_full_scan().inspect_spks_for_all_keychains({ + let request = wallet.start_full_scan().inspect({ + let mut stdout = std::io::stdout(); let mut once = BTreeSet::::new(); move |keychain, spk_i, _| { if once.insert(keychain) { - print!("\nScanning keychain [{:?}] ", keychain); + print!("\nScanning keychain [{:?}]", keychain); } print!(" {:<3}", spk_i); - std::io::stdout().flush().expect("must flush") + stdout.flush().expect("must flush") } }); diff --git a/example-crates/wallet_esplora_blocking/src/main.rs b/example-crates/wallet_esplora_blocking/src/main.rs index 7e825150d..bec395611 100644 --- a/example-crates/wallet_esplora_blocking/src/main.rs +++ b/example-crates/wallet_esplora_blocking/src/main.rs @@ -22,10 +22,10 @@ fn main() -> Result<(), anyhow::Error> { let mut db = Store::::open_or_create_new(DB_MAGIC.as_bytes(), DB_PATH)?; let wallet_opt = Wallet::load() - .network(NETWORK) .descriptor(KeychainKind::External, Some(EXTERNAL_DESC)) .descriptor(KeychainKind::Internal, Some(INTERNAL_DESC)) .extract_keys() + .check_network(NETWORK) .load_wallet(&mut db)?; let mut wallet = match wallet_opt { Some(wallet) => wallet, @@ -47,14 +47,15 @@ fn main() -> Result<(), anyhow::Error> { print!("Syncing..."); let client = esplora_client::Builder::new(ESPLORA_URL).build_blocking(); - let request = wallet.start_full_scan().inspect_spks_for_all_keychains({ + let request = wallet.start_full_scan().inspect({ + let mut stdout = std::io::stdout(); let mut once = BTreeSet::::new(); move |keychain, spk_i, _| { if once.insert(keychain) { print!("\nScanning keychain [{:?}] ", keychain); } print!(" {:<3}", spk_i); - std::io::stdout().flush().expect("must flush") + stdout.flush().expect("must flush") } }); diff --git a/example-crates/wallet_rpc/src/main.rs b/example-crates/wallet_rpc/src/main.rs index 324cf0445..388ccaf67 100644 --- a/example-crates/wallet_rpc/src/main.rs +++ 
b/example-crates/wallet_rpc/src/main.rs @@ -89,10 +89,10 @@ fn main() -> anyhow::Result<()> { let mut db = Store::::open_or_create_new(DB_MAGIC.as_bytes(), args.db_path)?; let wallet_opt = Wallet::load() - .network(args.network) .descriptor(KeychainKind::External, Some(args.descriptor.clone())) .descriptor(KeychainKind::Internal, Some(args.change_descriptor.clone())) .extract_keys() + .check_network(args.network) .load_wallet(&mut db)?; let mut wallet = match wallet_opt { Some(wallet) => wallet,