diff --git a/Cargo.toml b/Cargo.toml
index 544dfca08..34b16994f 100755
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -65,7 +65,7 @@ serde = { version = "1.0.210", default-features = false, features = ["std", "der
 serde_json = { version = "1.0.128", default-features = false, features = ["std"] }
 log = { version = "0.4.22", default-features = false, features = ["std"]}
 
-vss-client = "0.3"
+vss-client = { package = "vss-client-ng", version = "0.4" }
 prost = { version = "0.11.6", default-features = false}
 
 [target.'cfg(windows)'.dependencies]
@@ -91,6 +91,9 @@ clightningrpc = { version = "0.3.0-beta.8", default-features = false }
 lnd_grpc_rust = { version = "2.10.0", default-features = false }
 tokio = { version = "1.37", features = ["fs"] }
 
+[target.'cfg(vss_test)'.dev-dependencies]
+ldk-node-062 = { package = "ldk-node", version = "=0.6.2" }
+
 [build-dependencies]
 uniffi = { version = "0.28.3", features = ["build"], optional = true }
 
@@ -151,3 +154,6 @@ harness = false
 #lightning-transaction-sync = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "21e9a9c0ef80021d0669f2a366f55d08ba8d9b03" }
 #lightning-liquidity = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "21e9a9c0ef80021d0669f2a366f55d08ba8d9b03" }
 #lightning-macros = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "21e9a9c0ef80021d0669f2a366f55d08ba8d9b03" }
+
+#vss-client-ng = { path = "../vss-client" }
+#vss-client-ng = { git = "https://github.com/lightningdevkit/vss-client", branch = "main" }
diff --git a/src/builder.rs b/src/builder.rs
index c0e39af7a..b45f03f6d 100644
--- a/src/builder.rs
+++ b/src/builder.rs
@@ -732,7 +732,11 @@ impl NodeBuilder {
 		let vss_seed_bytes: [u8; 32] = vss_xprv.private_key.secret_bytes();
 
 		let vss_store =
-			VssStore::new(vss_url, store_id, vss_seed_bytes, header_provider, Arc::clone(&runtime));
+			VssStore::new(vss_url, store_id, vss_seed_bytes, header_provider).map_err(|e| {
+				log_error!(logger, "Failed to setup VSS store: {}", e);
+				BuildError::KVStoreSetupFailed
+			})?;
+
 		build_with_store_internal(
 			config,
 			self.chain_data_source_config.as_ref(),
diff --git a/src/io/vss_store.rs b/src/io/vss_store.rs
index 0e7d0872a..491b36b54 100644
--- a/src/io/vss_store.rs
+++ b/src/io/vss_store.rs
@@ -16,8 +16,10 @@ use std::sync::{Arc, Mutex};
 use std::time::Duration;
 
 use bitcoin::hashes::{sha256, Hash, HashEngine, Hmac, HmacEngine};
+use lightning::impl_writeable_tlv_based_enum;
 use lightning::io::{self, Error, ErrorKind};
 use lightning::util::persist::{KVStore, KVStoreSync};
+use lightning::util::ser::{Readable, Writeable};
 use prost::Message;
 use rand::RngCore;
 use vss_client::client::VssClient;
@@ -35,7 +37,6 @@ use vss_client::util::retry::{
 use vss_client::util::storable_builder::{EntropySource, StorableBuilder};
 
 use crate::io::utils::check_namespace_key_validity;
-use crate::runtime::Runtime;
 
 type CustomRetryPolicy = FilteredRetryPolicy<
 	JitteredRetryPolicy<
@@ -44,10 +45,32 @@ type CustomRetryPolicy = FilteredRetryPolicy<
 	Box<dyn Fn(&VssError) -> bool + 'static + Send + Sync>,
 >;
 
+#[derive(Debug, PartialEq)]
+enum VssSchemaVersion {
+	// The initial schema version.
+	//
+	// This used an empty `aad` and unobfuscated `primary_namespace`/`secondary_namespace`s in the
+	// stored key.
+	V0,
+	// The second deployed schema version.
+	//
+	// Here we started to obfuscate the primary and secondary namespaces, and the obfuscated
+	// `store_key` (`obfuscate(primary_namespace#secondary_namespace)#obfuscate(key)`) is now used
+	// as `aad` for encryption, ensuring that the encrypted blobs commit to the key they're stored
+	// under.
+	V1,
+}
+
+impl_writeable_tlv_based_enum!(VssSchemaVersion,
+	(0, V0) => {},
+	(1, V1) => {},
+);
+
+const VSS_SCHEMA_VERSION_KEY: &str = "vss_schema_version";
+
 // We set this to a small number of threads that would still allow making some progress if one
 // would hit a blocking case.
 const INTERNAL_RUNTIME_WORKERS: usize = 2;
-const VSS_IO_TIMEOUT: Duration = Duration::from_secs(5);
 
 /// A [`KVStoreSync`] implementation that writes to and reads from a [VSS](https://github.com/lightningdevkit/vss-server/blob/main/README.md) backend.
 pub struct VssStore {
@@ -55,7 +78,6 @@ pub struct VssStore {
 	// Version counter to ensure that writes are applied in the correct order. It is assumed that read and list
 	// operations aren't sensitive to the order of execution.
 	next_version: AtomicU64,
-	runtime: Arc<Runtime>,
 	// A VSS-internal runtime we use to avoid any deadlocks we could hit when waiting on a spawned
 	// blocking task to finish while the blocked thread had acquired the reactor. In particular,
 	// this works around a previously-hit case where a concurrent call to
@@ -68,25 +90,59 @@ impl VssStore {
 	pub(crate) fn new(
 		base_url: String, store_id: String, vss_seed: [u8; 32],
-		header_provider: Arc<dyn VssHeaderProvider>, runtime: Arc<Runtime>,
-	) -> Self {
-		let inner = Arc::new(VssStoreInner::new(base_url, store_id, vss_seed, header_provider));
+		header_provider: Arc<dyn VssHeaderProvider>,
+	) -> io::Result<Self> {
 		let next_version = AtomicU64::new(1);
-		let internal_runtime = Some(
-			tokio::runtime::Builder::new_multi_thread()
-				.enable_all()
-				.thread_name_fn(|| {
-					static ATOMIC_ID: AtomicUsize = AtomicUsize::new(0);
-					let id = ATOMIC_ID.fetch_add(1, Ordering::SeqCst);
-					format!("ldk-node-vss-runtime-{}", id)
-				})
-				.worker_threads(INTERNAL_RUNTIME_WORKERS)
-				.max_blocking_threads(INTERNAL_RUNTIME_WORKERS)
-				.build()
-				.unwrap(),
+		let internal_runtime = tokio::runtime::Builder::new_multi_thread()
+			.enable_all()
+			.thread_name_fn(|| {
+				static ATOMIC_ID: AtomicUsize = AtomicUsize::new(0);
+				let id = ATOMIC_ID.fetch_add(1, Ordering::SeqCst);
+				format!("ldk-node-vss-runtime-{}", id)
+			})
+			.worker_threads(INTERNAL_RUNTIME_WORKERS)
+			.max_blocking_threads(INTERNAL_RUNTIME_WORKERS)
+			.build()
+			.unwrap();
+
+		let (data_encryption_key, obfuscation_master_key) =
+			derive_data_encryption_and_obfuscation_keys(&vss_seed);
+		let key_obfuscator = KeyObfuscator::new(obfuscation_master_key);
+
+		let sync_retry_policy = retry_policy();
+		let blocking_client = VssClient::new_with_headers(
+			base_url.clone(),
+			sync_retry_policy,
+			header_provider.clone(),
 		);
-		Self { inner, next_version, runtime, internal_runtime }
+		let runtime_handle = internal_runtime.handle();
+		let schema_version = tokio::task::block_in_place(|| {
+			runtime_handle.block_on(async {
+				determine_and_write_schema_version(
+					&blocking_client,
+					&store_id,
+					data_encryption_key,
+					&key_obfuscator,
+				)
+				.await
+			})
+		})?;
+
+		let async_retry_policy = retry_policy();
+		let async_client =
+			VssClient::new_with_headers(base_url, async_retry_policy, header_provider);
+
+		let inner = Arc::new(VssStoreInner::new(
+			schema_version,
+			blocking_client,
+			async_client,
+			store_id,
+			data_encryption_key,
+			key_obfuscator,
+		));
+
+		Ok(Self { inner, next_version, internal_runtime:
Some(internal_runtime) }) } // Same logic as for the obfuscated keys below, but just for locking, using the plaintext keys @@ -129,17 +185,12 @@ impl KVStoreSync for VssStore { let secondary_namespace = secondary_namespace.to_string(); let key = key.to_string(); let inner = Arc::clone(&self.inner); - let fut = - async move { inner.read_internal(primary_namespace, secondary_namespace, key).await }; - // TODO: We could drop the timeout here once we ensured vss-client's Retry logic always - // times out. - let spawned_fut = internal_runtime.spawn(async move { - tokio::time::timeout(VSS_IO_TIMEOUT, fut).await.map_err(|_| { - let msg = "VssStore::read timed out"; - Error::new(ErrorKind::Other, msg) - }) - }); - self.runtime.block_on(spawned_fut).expect("We should always finish")? + let fut = async move { + inner + .read_internal(&inner.blocking_client, primary_namespace, secondary_namespace, key) + .await + }; + tokio::task::block_in_place(move || internal_runtime.block_on(fut)) } fn write( @@ -159,6 +210,7 @@ impl KVStoreSync for VssStore { let fut = async move { inner .write_internal( + &inner.blocking_client, inner_lock_ref, locking_key, version, @@ -169,15 +221,7 @@ impl KVStoreSync for VssStore { ) .await }; - // TODO: We could drop the timeout here once we ensured vss-client's Retry logic always - // times out. - let spawned_fut = internal_runtime.spawn(async move { - tokio::time::timeout(VSS_IO_TIMEOUT, fut).await.map_err(|_| { - let msg = "VssStore::write timed out"; - Error::new(ErrorKind::Other, msg) - }) - }); - self.runtime.block_on(spawned_fut).expect("We should always finish")? + tokio::task::block_in_place(move || internal_runtime.block_on(fut)) } fn remove( @@ -197,6 +241,7 @@ impl KVStoreSync for VssStore { let fut = async move { inner .remove_internal( + &inner.blocking_client, inner_lock_ref, locking_key, version, @@ -206,15 +251,7 @@ impl KVStoreSync for VssStore { ) .await }; - // TODO: We could drop the timeout here once we ensured vss-client's Retry logic always - // times out. - let spawned_fut = internal_runtime.spawn(async move { - tokio::time::timeout(VSS_IO_TIMEOUT, fut).await.map_err(|_| { - let msg = "VssStore::remove timed out"; - Error::new(ErrorKind::Other, msg) - }) - }); - self.runtime.block_on(spawned_fut).expect("We should always finish")? + tokio::task::block_in_place(move || internal_runtime.block_on(fut)) } fn list(&self, primary_namespace: &str, secondary_namespace: &str) -> io::Result> { @@ -226,16 +263,12 @@ impl KVStoreSync for VssStore { let primary_namespace = primary_namespace.to_string(); let secondary_namespace = secondary_namespace.to_string(); let inner = Arc::clone(&self.inner); - let fut = async move { inner.list_internal(primary_namespace, secondary_namespace).await }; - // TODO: We could drop the timeout here once we ensured vss-client's Retry logic always - // times out. - let spawned_fut = internal_runtime.spawn(async move { - tokio::time::timeout(VSS_IO_TIMEOUT, fut).await.map_err(|_| { - let msg = "VssStore::list timed out"; - Error::new(ErrorKind::Other, msg) - }) - }); - self.runtime.block_on(spawned_fut).expect("We should always finish")? 
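Note on the pattern introduced below (an illustrative sketch, not part of the patch): the removed `spawn` + `VSS_IO_TIMEOUT` wrapper on the shared runtime is replaced by blocking directly on the store's dedicated runtime. All four `KVStoreSync` methods share this bridge; the `bridge` helper name here is hypothetical:

```rust
use std::future::Future;

// Hand the future to the dedicated VSS runtime and park the calling thread
// until it resolves. `block_in_place` marks the current worker as blocked so
// an enclosing multi-threaded tokio runtime can migrate its other tasks,
// avoiding the reactor deadlock described on the `internal_runtime` field.
// Caveat: `block_in_place` panics if called from a `current_thread` runtime.
fn bridge<F: Future>(internal_runtime: &tokio::runtime::Runtime, fut: F) -> F::Output {
	tokio::task::block_in_place(|| internal_runtime.block_on(fut))
}
```
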
+ let fut = async move { + inner + .list_internal(&inner.blocking_client, primary_namespace, secondary_namespace) + .await + }; + tokio::task::block_in_place(move || internal_runtime.block_on(fut)) } } @@ -247,9 +280,11 @@ impl KVStore for VssStore { let secondary_namespace = secondary_namespace.to_string(); let key = key.to_string(); let inner = Arc::clone(&self.inner); - Box::pin( - async move { inner.read_internal(primary_namespace, secondary_namespace, key).await }, - ) + Box::pin(async move { + inner + .read_internal(&inner.async_client, primary_namespace, secondary_namespace, key) + .await + }) } fn write( &self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec, @@ -263,6 +298,7 @@ impl KVStore for VssStore { Box::pin(async move { inner .write_internal( + &inner.async_client, inner_lock_ref, locking_key, version, @@ -286,6 +322,7 @@ impl KVStore for VssStore { Box::pin(async move { inner .remove_internal( + &inner.async_client, inner_lock_ref, locking_key, version, @@ -302,7 +339,9 @@ impl KVStore for VssStore { let primary_namespace = primary_namespace.to_string(); let secondary_namespace = secondary_namespace.to_string(); let inner = Arc::clone(&self.inner); - Box::pin(async move { inner.list_internal(primary_namespace, secondary_namespace).await }) + Box::pin(async move { + inner.list_internal(&inner.async_client, primary_namespace, secondary_namespace).await + }) } } @@ -314,9 +353,13 @@ impl Drop for VssStore { } struct VssStoreInner { - client: VssClient, + schema_version: VssSchemaVersion, + blocking_client: VssClient, + // A secondary client that will only be used for async persistence via `KVStore`, to ensure TCP + // connections aren't shared between our outer and the internal runtime. + async_client: VssClient, store_id: String, - storable_builder: StorableBuilder, + data_encryption_key: [u8; 32], key_obfuscator: KeyObfuscator, // Per-key locks that ensures that we don't have concurrent writes to the same namespace/key. // The lock also encapsulates the latest written version per key. @@ -325,29 +368,20 @@ struct VssStoreInner { impl VssStoreInner { pub(crate) fn new( - base_url: String, store_id: String, vss_seed: [u8; 32], - header_provider: Arc, + schema_version: VssSchemaVersion, blocking_client: VssClient, + async_client: VssClient, store_id: String, + data_encryption_key: [u8; 32], key_obfuscator: KeyObfuscator, ) -> Self { - let (data_encryption_key, obfuscation_master_key) = - derive_data_encryption_and_obfuscation_keys(&vss_seed); - let key_obfuscator = KeyObfuscator::new(obfuscation_master_key); - let storable_builder = StorableBuilder::new(data_encryption_key, RandEntropySource); - let retry_policy = ExponentialBackoffRetryPolicy::new(Duration::from_millis(10)) - .with_max_attempts(10) - .with_max_total_delay(Duration::from_secs(15)) - .with_max_jitter(Duration::from_millis(10)) - .skip_retry_on_error(Box::new(|e: &VssError| { - matches!( - e, - VssError::NoSuchKeyError(..) - | VssError::InvalidRequestError(..) - | VssError::ConflictError(..) 
- ) - }) as _); - - let client = VssClient::new_with_headers(base_url, retry_policy, header_provider); let locks = Mutex::new(HashMap::new()); - Self { client, store_id, storable_builder, key_obfuscator, locks } + Self { + schema_version, + blocking_client, + async_client, + store_id, + data_encryption_key, + key_obfuscator, + locks, + } } fn get_inner_lock_ref(&self, locking_key: String) -> Arc> { @@ -358,17 +392,45 @@ impl VssStoreInner { fn build_obfuscated_key( &self, primary_namespace: &str, secondary_namespace: &str, key: &str, ) -> String { - let obfuscated_key = self.key_obfuscator.obfuscate(key); - if primary_namespace.is_empty() { - obfuscated_key + if self.schema_version == VssSchemaVersion::V1 { + let obfuscated_prefix = + self.build_obfuscated_prefix(primary_namespace, secondary_namespace); + let obfuscated_key = self.key_obfuscator.obfuscate(key); + format!("{}#{}", obfuscated_prefix, obfuscated_key) } else { - format!("{}#{}#{}", primary_namespace, secondary_namespace, obfuscated_key) + // Default to V0 schema + let obfuscated_key = self.key_obfuscator.obfuscate(key); + if primary_namespace.is_empty() { + obfuscated_key + } else { + format!("{}#{}#{}", primary_namespace, secondary_namespace, obfuscated_key) + } + } + } + + fn build_obfuscated_prefix( + &self, primary_namespace: &str, secondary_namespace: &str, + ) -> String { + if self.schema_version == VssSchemaVersion::V1 { + let prefix = format!("{}#{}", primary_namespace, secondary_namespace); + self.key_obfuscator.obfuscate(&prefix) + } else { + // Default to V0 schema + format!("{}#{}", primary_namespace, secondary_namespace) } } fn extract_key(&self, unified_key: &str) -> io::Result { - let mut parts = unified_key.splitn(3, '#'); - let (_primary_namespace, _secondary_namespace) = (parts.next(), parts.next()); + let mut parts = if self.schema_version == VssSchemaVersion::V1 { + let mut parts = unified_key.splitn(2, '#'); + let _obfuscated_namespace = parts.next(); + parts + } else { + // Default to V0 schema + let mut parts = unified_key.splitn(3, '#'); + let (_primary_namespace, _secondary_namespace) = (parts.next(), parts.next()); + parts + }; match parts.next() { Some(obfuscated_key) => { let actual_key = self.key_obfuscator.deobfuscate(obfuscated_key)?; @@ -379,11 +441,12 @@ impl VssStoreInner { } async fn list_all_keys( - &self, primary_namespace: &str, secondary_namespace: &str, + &self, client: &VssClient, primary_namespace: &str, + secondary_namespace: &str, ) -> io::Result> { let mut page_token = None; let mut keys = vec![]; - let key_prefix = format!("{}#{}", primary_namespace, secondary_namespace); + let key_prefix = self.build_obfuscated_prefix(primary_namespace, secondary_namespace); while page_token != Some("".to_string()) { let request = ListKeyVersionsRequest { store_id: self.store_id.clone(), @@ -392,7 +455,7 @@ impl VssStoreInner { page_size: None, }; - let response = self.client.list_key_versions(&request).await.map_err(|e| { + let response = client.list_key_versions(&request).await.map_err(|e| { let msg = format!( "Failed to list keys in {}/{}: {}", primary_namespace, secondary_namespace, e @@ -409,14 +472,14 @@ impl VssStoreInner { } async fn read_internal( - &self, primary_namespace: String, secondary_namespace: String, key: String, + &self, client: &VssClient, primary_namespace: String, + secondary_namespace: String, key: String, ) -> io::Result> { check_namespace_key_validity(&primary_namespace, &secondary_namespace, Some(&key), "read")?; - let obfuscated_key = - 
self.build_obfuscated_key(&primary_namespace, &secondary_namespace, &key); - let request = GetObjectRequest { store_id: self.store_id.clone(), key: obfuscated_key }; - let resp = self.client.get_object(&request).await.map_err(|e| { + let store_key = self.build_obfuscated_key(&primary_namespace, &secondary_namespace, &key); + let request = GetObjectRequest { store_id: self.store_id.clone(), key: store_key.clone() }; + let resp = client.get_object(&request).await.map_err(|e| { let msg = format!( "Failed to read from key {}/{}/{}: {}", primary_namespace, secondary_namespace, key, e @@ -437,12 +500,17 @@ impl VssStoreInner { Error::new(ErrorKind::Other, msg) })?; - Ok(self.storable_builder.deconstruct(storable)?.0) + let storable_builder = StorableBuilder::new(RandEntropySource); + let aad = + if self.schema_version == VssSchemaVersion::V1 { store_key.as_bytes() } else { &[] }; + let decrypted = storable_builder.deconstruct(storable, &self.data_encryption_key, aad)?.0; + Ok(decrypted) } async fn write_internal( - &self, inner_lock_ref: Arc>, locking_key: String, version: u64, - primary_namespace: String, secondary_namespace: String, key: String, buf: Vec, + &self, client: &VssClient, inner_lock_ref: Arc>, + locking_key: String, version: u64, primary_namespace: String, secondary_namespace: String, + key: String, buf: Vec, ) -> io::Result<()> { check_namespace_key_validity( &primary_namespace, @@ -451,23 +519,26 @@ impl VssStoreInner { "write", )?; - self.execute_locked_write(inner_lock_ref, locking_key, version, async move || { - let obfuscated_key = - self.build_obfuscated_key(&primary_namespace, &secondary_namespace, &key); - let vss_version = -1; - let storable = self.storable_builder.build(buf, vss_version); - let request = PutObjectRequest { - store_id: self.store_id.clone(), - global_version: None, - transaction_items: vec![KeyValue { - key: obfuscated_key, - version: vss_version, - value: storable.encode_to_vec(), - }], - delete_items: vec![], - }; + let store_key = self.build_obfuscated_key(&primary_namespace, &secondary_namespace, &key); + let vss_version = -1; + let storable_builder = StorableBuilder::new(RandEntropySource); + let aad = + if self.schema_version == VssSchemaVersion::V1 { store_key.as_bytes() } else { &[] }; + let storable = + storable_builder.build(buf.to_vec(), vss_version, &self.data_encryption_key, aad); + let request = PutObjectRequest { + store_id: self.store_id.clone(), + global_version: None, + transaction_items: vec![KeyValue { + key: store_key, + version: vss_version, + value: storable.encode_to_vec(), + }], + delete_items: vec![], + }; - self.client.put_object(&request).await.map_err(|e| { + self.execute_locked_write(inner_lock_ref, locking_key, version, async move || { + client.put_object(&request).await.map_err(|e| { let msg = format!( "Failed to write to key {}/{}/{}: {}", primary_namespace, secondary_namespace, key, e @@ -481,8 +552,9 @@ impl VssStoreInner { } async fn remove_internal( - &self, inner_lock_ref: Arc>, locking_key: String, version: u64, - primary_namespace: String, secondary_namespace: String, key: String, + &self, client: &VssClient, inner_lock_ref: Arc>, + locking_key: String, version: u64, primary_namespace: String, secondary_namespace: String, + key: String, ) -> io::Result<()> { check_namespace_key_validity( &primary_namespace, @@ -499,7 +571,7 @@ impl VssStoreInner { key_value: Some(KeyValue { key: obfuscated_key, version: -1, value: vec![] }), }; - self.client.delete_object(&request).await.map_err(|e| { + 
client.delete_object(&request).await.map_err(|e| { let msg = format!( "Failed to delete key {}/{}/{}: {}", primary_namespace, secondary_namespace, key, e @@ -513,12 +585,15 @@ impl VssStoreInner { } async fn list_internal( - &self, primary_namespace: String, secondary_namespace: String, + &self, client: &VssClient, primary_namespace: String, + secondary_namespace: String, ) -> io::Result> { check_namespace_key_validity(&primary_namespace, &secondary_namespace, None, "list")?; - let keys = - self.list_all_keys(&primary_namespace, &secondary_namespace).await.map_err(|e| { + let keys = self + .list_all_keys(client, &primary_namespace, &secondary_namespace) + .await + .map_err(|e| { let msg = format!( "Failed to retrieve keys in namespace: {}/{} : {}", primary_namespace, secondary_namespace, e @@ -587,6 +662,126 @@ fn derive_data_encryption_and_obfuscation_keys(vss_seed: &[u8; 32]) -> ([u8; 32] (k1, k2) } +fn retry_policy() -> CustomRetryPolicy { + ExponentialBackoffRetryPolicy::new(Duration::from_millis(10)) + .with_max_attempts(100) + .with_max_total_delay(Duration::from_secs(180)) + .with_max_jitter(Duration::from_millis(100)) + .skip_retry_on_error(Box::new(|e: &VssError| { + matches!( + e, + VssError::NoSuchKeyError(..) + | VssError::InvalidRequestError(..) + | VssError::ConflictError(..) + ) + }) as _) +} + +async fn determine_and_write_schema_version( + client: &VssClient, store_id: &String, data_encryption_key: [u8; 32], + key_obfuscator: &KeyObfuscator, +) -> io::Result { + // Build the obfuscated `vss_schema_version` key. + let obfuscated_prefix = key_obfuscator.obfuscate(&format! {"{}#{}", "", ""}); + let obfuscated_key = key_obfuscator.obfuscate(VSS_SCHEMA_VERSION_KEY); + let store_key = format!("{}#{}", obfuscated_prefix, obfuscated_key); + + // Try to read the stored schema version. + let request = GetObjectRequest { store_id: store_id.clone(), key: store_key.clone() }; + let resp = match client.get_object(&request).await { + Ok(resp) => Some(resp), + Err(VssError::NoSuchKeyError(..)) => { + // The value is not set. + None + }, + Err(e) => { + let msg = format!("Failed to read schema version: {}", e); + return Err(Error::new(ErrorKind::Other, msg)); + }, + }; + + if let Some(resp) = resp { + // The schema version was present, so just decrypt the stored data. + + // unwrap safety: resp.value must be always present for a non-erroneous VSS response, otherwise + // it is an API-violation which is converted to [`VssError::InternalServerError`] in [`VssClient`] + let storable = Storable::decode(&resp.value.unwrap().value[..]).map_err(|e| { + let msg = format!("Failed to decode schema version: {}", e); + Error::new(ErrorKind::Other, msg) + })?; + + let storable_builder = StorableBuilder::new(RandEntropySource); + // Schema version was added starting with V1, so if set at all, we use the key as `aad` + let aad = store_key.as_bytes(); + let decrypted = storable_builder + .deconstruct(storable, &data_encryption_key, aad) + .map_err(|e| { + let msg = format!("Failed to decode schema version: {}", e); + Error::new(ErrorKind::Other, msg) + })? + .0; + + let schema_version: VssSchemaVersion = Readable::read(&mut io::Cursor::new(decrypted)) + .map_err(|e| { + let msg = format!("Failed to decode schema version: {}", e); + Error::new(ErrorKind::Other, msg) + })?; + Ok(schema_version) + } else { + // The schema version wasn't present, this either means we're running for the first time *or* it's V0 pre-migration (predating writing of the schema version). 
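+		//
+		// For illustration (hypothetical values, not literal stored keys): a V0 entry is
+		// stored under `bdk_wallet#<secondary_namespace>#<obfuscate(key)>`, i.e., with
+		// plaintext namespaces, while under V1 the same entry would live under
+		// `<obfuscate("bdk_wallet#<secondary_namespace>")>#<obfuscate(key)>`. Only V0 data
+		// is therefore discoverable via the plaintext `bdk_wallet#` prefix probed below.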
+ + // Check if any `bdk_wallet` data was written by listing keys under the respective + // (unobfuscated) prefix. + const V0_BDK_WALLET_PREFIX: &str = "bdk_wallet#"; + let request = ListKeyVersionsRequest { + store_id: store_id.clone(), + key_prefix: Some(V0_BDK_WALLET_PREFIX.to_string()), + page_token: None, + page_size: None, + }; + + let response = client.list_key_versions(&request).await.map_err(|e| { + let msg = format!("Failed to determine schema version: {}", e); + Error::new(ErrorKind::Other, msg) + })?; + + let wallet_data_present = !response.key_versions.is_empty(); + if wallet_data_present { + // If the wallet data is present, it means we're not running for the first time. + Ok(VssSchemaVersion::V0) + } else { + // We're running for the first time, write the schema version to save unnecessary IOps + // on future startup. + let schema_version = VssSchemaVersion::V1; + let encoded_version = schema_version.encode(); + + let storable_builder = StorableBuilder::new(RandEntropySource); + let vss_version = -1; + let aad = store_key.as_bytes(); + let storable = + storable_builder.build(encoded_version, vss_version, &data_encryption_key, aad); + + let request = PutObjectRequest { + store_id: store_id.clone(), + global_version: None, + transaction_items: vec![KeyValue { + key: store_key, + version: vss_version, + value: storable.encode_to_vec(), + }], + delete_items: vec![], + }; + + client.put_object(&request).await.map_err(|e| { + let msg = format!("Failed to write schema version: {}", e); + Error::new(ErrorKind::Other, msg) + })?; + + Ok(schema_version) + } + } +} + /// A source for generating entropy/randomness using [`rand`]. pub(crate) struct RandEntropySource; @@ -610,7 +805,6 @@ mod tests { use super::*; use crate::io::test_utils::do_read_write_remove_list_persist; - use crate::logger::Logger; #[test] fn vss_read_write_remove_list_persist() { @@ -620,11 +814,8 @@ mod tests { let mut vss_seed = [0u8; 32]; rng.fill_bytes(&mut vss_seed); let header_provider = Arc::new(FixedHeaders::new(HashMap::new())); - let logger = Arc::new(Logger::new_log_facade()); - let runtime = Arc::new(Runtime::new(logger).unwrap()); let vss_store = - VssStore::new(vss_base_url, rand_store_id, vss_seed, header_provider, runtime); - + VssStore::new(vss_base_url, rand_store_id, vss_seed, header_provider).unwrap(); do_read_write_remove_list_persist(&vss_store); } @@ -636,10 +827,8 @@ mod tests { let mut vss_seed = [0u8; 32]; rng.fill_bytes(&mut vss_seed); let header_provider = Arc::new(FixedHeaders::new(HashMap::new())); - let logger = Arc::new(Logger::new_log_facade()); - let runtime = Arc::new(Runtime::new(logger).unwrap()); let vss_store = - VssStore::new(vss_base_url, rand_store_id, vss_seed, header_provider, runtime); + VssStore::new(vss_base_url, rand_store_id, vss_seed, header_provider).unwrap(); do_read_write_remove_list_persist(&vss_store); drop(vss_store) diff --git a/tests/integration_tests_vss.rs b/tests/integration_tests_vss.rs index 93f167dae..3b384ec45 100644 --- a/tests/integration_tests_vss.rs +++ b/tests/integration_tests_vss.rs @@ -12,6 +12,7 @@ mod common; use std::collections::HashMap; use ldk_node::Builder; +use rand::{rng, Rng}; #[tokio::test(flavor = "multi_thread", worker_threads = 1)] async fn channel_full_cycle_with_vss_store() { @@ -55,3 +56,143 @@ async fn channel_full_cycle_with_vss_store() { ) .await; } + +#[tokio::test(flavor = "multi_thread", worker_threads = 1)] +async fn vss_v0_schema_backwards_compatibility() { + let (bitcoind, electrsd) = 
common::setup_bitcoind_and_electrsd(); + let esplora_url = format!("http://{}", electrsd.esplora_url.as_ref().unwrap()); + let vss_base_url = std::env::var("TEST_VSS_BASE_URL").unwrap(); + + let rand_suffix: String = + (0..7).map(|_| rng().sample(rand::distr::Alphanumeric) as char).collect(); + let store_id = format!("v0_compat_test_{}", rand_suffix); + let storage_path = common::random_storage_path().to_str().unwrap().to_owned(); + let seed_bytes = [42u8; 64]; + + // Setup a v0.6.2 `Node` persisted with the v0 scheme. + let (old_balance, old_node_id) = { + let mut builder_old = ldk_node_062::Builder::new(); + builder_old.set_network(bitcoin::Network::Regtest); + builder_old.set_storage_dir_path(storage_path.clone()); + builder_old.set_entropy_seed_bytes(seed_bytes); + builder_old.set_chain_source_esplora(esplora_url.clone(), None); + let node_old = builder_old + .build_with_vss_store_and_fixed_headers( + vss_base_url.clone(), + store_id.clone(), + HashMap::new(), + ) + .unwrap(); + + node_old.start().unwrap(); + let addr_old = node_old.onchain_payment().new_address().unwrap(); + common::premine_and_distribute_funds( + &bitcoind.client, + &electrsd.client, + vec![addr_old], + bitcoin::Amount::from_sat(100_000), + ) + .await; + node_old.sync_wallets().unwrap(); + + let balance = node_old.list_balances().spendable_onchain_balance_sats; + assert!(balance > 0); + let node_id = node_old.node_id(); + + // Workaround necessary as v0.6.2's VSS runtime wasn't dropsafe in a tokio context. + tokio::task::block_in_place(move || { + node_old.stop().unwrap(); + drop(node_old); + }); + + (balance, node_id) + }; + + // Now ensure we can still reinit from the same backend. + let mut builder_new = Builder::new(); + builder_new.set_network(bitcoin::Network::Regtest); + builder_new.set_storage_dir_path(storage_path); + builder_new.set_entropy_seed_bytes(seed_bytes); + builder_new.set_chain_source_esplora(esplora_url, None); + + let node_new = builder_new + .build_with_vss_store_and_fixed_headers(vss_base_url, store_id, HashMap::new()) + .unwrap(); + + node_new.start().unwrap(); + node_new.sync_wallets().unwrap(); + + let new_balance = node_new.list_balances().spendable_onchain_balance_sats; + let new_node_id = node_new.node_id(); + + assert_eq!(old_node_id, new_node_id); + assert_eq!(old_balance, new_balance); + + node_new.stop().unwrap(); +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 1)] +async fn vss_node_restart() { + let (bitcoind, electrsd) = common::setup_bitcoind_and_electrsd(); + let esplora_url = format!("http://{}", electrsd.esplora_url.as_ref().unwrap()); + let vss_base_url = std::env::var("TEST_VSS_BASE_URL").unwrap(); + + let rand_suffix: String = + (0..7).map(|_| rng().sample(rand::distr::Alphanumeric) as char).collect(); + let store_id = format!("restart_test_{}", rand_suffix); + let storage_path = common::random_storage_path().to_str().unwrap().to_owned(); + let seed_bytes = [42u8; 64]; + + // Setup initial node and fund it. 
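+	// Both this node and the restarted one below are derived from the same fixed
+	// `seed_bytes` and `store_id`, so the restart must recover the identical
+	// `node_id` and balance purely from the state persisted in VSS.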
+ let (expected_balance_sats, expected_node_id) = { + let mut builder = Builder::new(); + builder.set_network(bitcoin::Network::Regtest); + builder.set_storage_dir_path(storage_path.clone()); + builder.set_entropy_seed_bytes(seed_bytes); + builder.set_chain_source_esplora(esplora_url.clone(), None); + let node = builder + .build_with_vss_store_and_fixed_headers( + vss_base_url.clone(), + store_id.clone(), + HashMap::new(), + ) + .unwrap(); + + node.start().unwrap(); + let addr = node.onchain_payment().new_address().unwrap(); + common::premine_and_distribute_funds( + &bitcoind.client, + &electrsd.client, + vec![addr], + bitcoin::Amount::from_sat(100_000), + ) + .await; + node.sync_wallets().unwrap(); + + let balance = node.list_balances().spendable_onchain_balance_sats; + assert!(balance > 0); + let node_id = node.node_id(); + + node.stop().unwrap(); + (balance, node_id) + }; + + // Verify node can be restarted from VSS backend. + let mut builder = Builder::new(); + builder.set_network(bitcoin::Network::Regtest); + builder.set_storage_dir_path(storage_path); + builder.set_entropy_seed_bytes(seed_bytes); + builder.set_chain_source_esplora(esplora_url, None); + + let node = builder + .build_with_vss_store_and_fixed_headers(vss_base_url, store_id, HashMap::new()) + .unwrap(); + + node.start().unwrap(); + node.sync_wallets().unwrap(); + + assert_eq!(expected_node_id, node.node_id()); + assert_eq!(expected_balance_sats, node.list_balances().spendable_onchain_balance_sats); + + node.stop().unwrap(); +}
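
Aside on the V1 `aad` binding (an illustrative sketch, not part of the patch): the real encryption lives in vss-client's `StorableBuilder`; the snippet below instead uses the `chacha20poly1305` crate directly, with a fixed demo nonce, just to show why passing the store key as AAD makes encrypted blobs non-swappable between keys.

```rust
use chacha20poly1305::aead::{Aead, KeyInit, Payload};
use chacha20poly1305::{ChaCha20Poly1305, Nonce};

fn main() {
	let key = [7u8; 32];
	let cipher = ChaCha20Poly1305::new(&key.into());
	// The real StorableBuilder keeps a fresh random nonce alongside each blob;
	// a constant is used here only to keep the demo short.
	let nonce = Nonce::from_slice(&[0u8; 12]);

	let store_key_a = "obf(ns)#obf(key_a)";
	let ct = cipher
		.encrypt(nonce, Payload { msg: b"secret blob", aad: store_key_a.as_bytes() })
		.unwrap();

	// Moving the ciphertext under another store key changes the AAD, so
	// authentication fails: the blob commits to the key it was stored under.
	assert!(cipher.decrypt(nonce, Payload { msg: &ct, aad: b"obf(ns)#obf(key_b)" }).is_err());

	// With the matching AAD the payload round-trips.
	let pt = cipher.decrypt(nonce, Payload { msg: &ct, aad: store_key_a.as_bytes() }).unwrap();
	assert_eq!(pt, b"secret blob");
}
```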