diff --git a/CHANGELOG.md b/CHANGELOG.md index 6a760db5f..94edaa3dc 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,30 @@ ### Enhancements +- Added periodic cleanup of old account data from the database ([#1304](https://github.com/0xMiden/miden-node/issues/1304)). +- Added support for timeouts in the WASM remote prover clients ([#1383](https://github.com/0xMiden/miden-node/pull/1383)). +- Added block validation endpoint to validator and integrated with block producer ([#1382](https://github.com/0xMiden/miden-node/pull/1381)). +- Added support for caching mempool statistics in the block producer server ([#1388](https://github.com/0xMiden/miden-node/pull/1388)). +- Added mempool statistics to the block producer status in the `miden-network-monitor` binary ([#1392](https://github.com/0xMiden/miden-node/pull/1392)). +- Added success rate to the `miden-network-monitor` binary ([#1420](https://github.com/0xMiden/miden-node/pull/1420)). +- Added chain tip to the block producer status ([#1419](https://github.com/0xMiden/miden-node/pull/1419)). +- The mempool's transaction capacity is now configurable ([#1433](https://github.com/0xMiden/miden-node/pull/1433)). +- Renamed card names in the `miden-network-monitor` binary ([#1441](https://github.com/0xMiden/miden-node/pull/1441)). +- Integrated NTX Builder with validator via `SubmitProvenTransaction` RPC ([#1453](https://github.com/0xMiden/miden-node/pull/1453)). +- Added pagination to `GetNetworkAccountIds` endpoint ([#1452](https://github.com/0xMiden/miden-node/pull/1452)). +- Improved tracing in `miden-network-monitor` binary ([#1366](https://github.com/0xMiden/miden-node/pull/1366)). +- Integrated RPC stack with Validator component for transaction validation ([#1457](https://github.com/0xMiden/miden-node/pull/1457)). +- Add partial storage map queries to RPC ([#1428](https://github.com/0xMiden/miden-node/pull/1428)). 
+- Added validated transactions check to block validation logic in Validator ([#1460](https://github.com/0xMiden/miden-node/pull/1460)). +- Added explorer status to the `miden-network-monitor` binary ([#1450](https://github.com/0xMiden/miden-node/pull/1450)). +- Added `GetLimits` endpoint to the RPC server ([#1410](https://github.com/0xMiden/miden-node/pull/1410)). +- Added gRPC-Web probe support to the `miden-network-monitor` binary ([#1484](https://github.com/0xMiden/miden-node/pull/1484)). +- Add DB schema change check ([#1268](https://github.com/0xMiden/miden-node/pull/1485)). +- Improve DB query performance for account queries ([#1496](https://github.com/0xMiden/miden-node/pull/1496)). +- Limit number of storage map keys in `GetAccount` requests ([#1517](https://github.com/0xMiden/miden-node/pull/1517)). +- The network monitor now marks the chain as unhealthy if it fails to create new blocks ([#1512](https://github.com/0xMiden/miden-node/pull/1512)). +- Block producer now detects if it is desync'd from the store's chain tip and aborts ([#1520](https://github.com/0xMiden/miden-node/pull/1520)). +- Pin tool versions in CI ([#1523](https://github.com/0xMiden/miden-node/pull/1523)). - [BREAKING] Updated miden-base dependencies to use `next` branch; renamed `NoteInputs` to `NoteStorage`, `.inputs()` to `.storage()`, and database `inputs` column to `storage` ([#1595](https://github.com/0xMiden/miden-node/pull/1595)). - [BREAKING] Move block proving from Blocker Producer to the Store ([#1579](https://github.com/0xMiden/miden-node/pull/1579)). 
diff --git a/crates/store/src/db/mod.rs b/crates/store/src/db/mod.rs index 6b7ecec6a..7b8344232 100644 --- a/crates/store/src/db/mod.rs +++ b/crates/store/src/db/mod.rs @@ -23,7 +23,7 @@ use miden_protocol::note::{ use miden_protocol::transaction::TransactionId; use miden_protocol::utils::{Deserializable, Serializable}; use tokio::sync::oneshot; -use tracing::{Instrument, info, instrument}; +use tracing::{Instrument, info, info_span, instrument}; use crate::COMPONENT; use crate::db::manager::{ConnectionManager, configure_connection_on_creation}; @@ -54,8 +54,10 @@ pub(crate) mod schema; pub type Result = std::result::Result; +#[derive(Clone)] pub struct Db { pool: deadpool_diesel::Pool>, + notify_cleanup_task: tokio::sync::mpsc::Sender, } /// Describes the value of an asset for an account ID at `block_num` specifically. @@ -314,6 +316,8 @@ impl Db { } /// Open a connection to the DB and apply any pending migrations. + /// + /// This also spawns a background task that handles periodic cleanup of old account data. #[instrument(target = COMPONENT, skip_all)] pub async fn load(database_filepath: PathBuf) -> Result { let manager = ConnectionManager::new(database_filepath.to_str().unwrap()); @@ -325,8 +329,20 @@ impl Db { "Connected to the database" ); - let me = Db { pool }; + // Create channel for cleanup notifications + // Buffer size of 2 is sufficient since cleanup coalesces multiple notifications + let (notify_cleanup_task, rx) = tokio::sync::mpsc::channel(2); + + let me = Db { pool, notify_cleanup_task }; + let me2 = me.clone(); + + // Spawn background cleanup task + // TODO: retain the join handle to coordinate shutdown or surface task failures. 
+ let _cleanup_task_handle = + tokio::spawn(async move { Self::periodic_cleanup_task(me2, rx).await }); + me.query("migrations", apply_migrations).await?; + Ok(me) } @@ -586,28 +602,171 @@ impl Db { signed_block: SignedBlock, notes: Vec<(NoteRecord, Option)>, ) -> Result<()> { - self.transact("apply block", move |conn| -> Result<()> { - models::queries::apply_block( - conn, - signed_block.header(), - signed_block.signature(), - ¬es, - signed_block.body().created_nullifiers(), - signed_block.body().updated_accounts(), - signed_block.body().transactions(), - )?; - - // XXX FIXME TODO free floating mutex MUST NOT exist - // it doesn't bind it properly to the data locked! - if allow_acquire.send(()).is_err() { - tracing::warn!(target: COMPONENT, "failed to send notification for successful block application, potential deadlock"); + let block_num = signed_block.header().block_num(); + + let result = self + .transact("apply block", move |conn| -> Result<()> { + // TODO: This span is logged in a root span, we should connect it to the parent one. + let _span = info_span!(target: COMPONENT, "write_block_to_db").entered(); + + models::queries::apply_block( + conn, + signed_block.header(), + signed_block.signature(), + ¬es, + signed_block.body().created_nullifiers(), + signed_block.body().updated_accounts(), + signed_block.body().transactions(), + )?; + + // XXX FIXME TODO free floating mutex MUST NOT exist + // it doesn't bind it properly to the data locked! + if allow_acquire.send(()).is_err() { + tracing::warn!(target: COMPONENT, "failed to send notification for successful block application, potential deadlock"); + } + + acquire_done.blocking_recv()?; + + Ok(()) + }) + .await; + + // Notify the cleanup task of the latest applied block + // Ignore errors since cleanup is non-critical and shouldn't block block application + // TODO: track dropped cleanup notifications to surface backpressure. 
+ let _res = self.notify_cleanup_task.try_send(block_num); + + result + } + + /// Background task that handles periodic cleanup of old account data. + /// + /// This task runs indefinitely, receiving block numbers from the `apply_block` method + /// and triggering cleanup whenever new blocks are available. The cleanup process: + /// + /// 1. Batches incoming notifications using `recv_many` to avoid excessive cleanup operations + /// 2. Only processes the most recent block number from the batch (coalescing multiple updates) + /// 3. Runs cleanup with a 30-second timeout to prevent blocking + /// 4. Logs success or failure but continues running regardless of cleanup outcome + /// + /// # Batching Strategy + /// + /// The batching approach ensures that if multiple blocks are applied quickly (e.g., during + /// initial sync), only the latest block number triggers cleanup. This prevents redundant + /// cleanup operations while ensuring cleanup runs on the most recent state. + /// + /// # Error Handling + /// + /// This task never exits on cleanup errors. Cleanup failures are logged but the task + /// continues to process future blocks. This ensures that temporary issues (like database + /// locks or high load) don't permanently disable the cleanup mechanism. + /// + /// The task only exits if the channel is closed (i.e., all `Db` instances are dropped), + /// which typically happens during application shutdown. 
+ async fn periodic_cleanup_task(db: Self, mut notify: tokio::sync::mpsc::Receiver) { + let mut buf = Vec::with_capacity(128); + + loop { + // Receive many notifications at once to batch them + // If the channel is closed (returns 0), exit the task + let received = notify.recv_many(&mut buf, 128).await; + if received == 0 { + tracing::info!(target: COMPONENT, "Cleanup task shutting down: channel closed"); + break; } - acquire_done.blocking_recv()?; + // Only process the most recent block number from the batch + // This coalesces multiple cleanup requests during fast block processing + if let Some(block_num) = buf.pop() { + match db.run_periodic_cleanup(block_num).await { + Ok((vault_deleted, storage_deleted)) => { + tracing::info!( + target: COMPONENT, + block_num = block_num.as_u32(), + vault_assets_deleted = vault_deleted, + storage_map_values_deleted = storage_deleted, + "Periodic cleanup completed successfully" + ); + }, + Err(e) => { + tracing::warn!( + target: COMPONENT, + block_num = block_num.as_u32(), + error = %e, + "Periodic cleanup failed, will retry on next block" + ); + }, + } + } - Ok(()) - }) - .await + // Clear the buffer for the next batch + buf.clear(); + } + } + + /// Runs periodic cleanup of old account data with a timeout. + /// + /// This function cleans up old vault asset and storage map value entries for all accounts, + /// keeping only the latest entry and up to MAX_HISTORICAL_ENTRIES_PER_ACCOUNT historical + /// entries per key. + /// + /// The cleanup operation has a 30-second timeout to prevent it from blocking for too long. + /// If the timeout is reached, the cleanup is aborted and returns an error. + /// + /// # Parameters + /// * `block_num` - The block number at which cleanup was triggered (used for logging) + /// + /// # Returns + /// A tuple of (vault_assets_deleted, storage_map_values_deleted) on success, or an error + /// if the operation fails or times out. 
+ #[instrument(level = "debug", target = COMPONENT, skip(self), fields(block_num = %block_num.as_u32()))] + async fn run_periodic_cleanup(&self, block_num: BlockNumber) -> Result<(usize, usize)> { + use std::time::Duration; + + let cleanup_timeout = Duration::from_secs(30); + let start = std::time::Instant::now(); + + let cleanup_task = self.transact("periodic cleanup", move |conn| { + models::queries::cleanup_all_accounts(conn, block_num) + }); + + // Run cleanup with timeout + let result = tokio::time::timeout(cleanup_timeout, cleanup_task).await; + + let duration = start.elapsed(); + + match result { + Ok(Ok((vault_deleted, storage_deleted))) => { + tracing::info!( + target: COMPONENT, + block_num = block_num.as_u32(), + vault_assets_deleted = vault_deleted, + storage_map_values_deleted = storage_deleted, + duration_ms = duration.as_millis(), + "Cleanup completed within timeout" + ); + Ok((vault_deleted, storage_deleted)) + }, + Ok(Err(e)) => { + tracing::error!( + target: COMPONENT, + block_num = block_num.as_u32(), + duration_ms = duration.as_millis(), + error = %e, + "Cleanup failed" + ); + Err(e) + }, + Err(_timeout_err) => { + tracing::warn!( + target: COMPONENT, + block_num = block_num.as_u32(), + timeout_ms = cleanup_timeout.as_millis(), + "Cleanup timed out - operation was aborted" + ); + Err(DatabaseError::QueryTimeout("periodic cleanup".to_string())) + }, + } } /// Selects storage map values for syncing storage maps for a specific account ID. 
diff --git a/crates/store/src/db/models/queries/accounts.rs b/crates/store/src/db/models/queries/accounts.rs index 85bead244..7f0abb241 100644 --- a/crates/store/src/db/models/queries/accounts.rs +++ b/crates/store/src/db/models/queries/accounts.rs @@ -1252,3 +1252,97 @@ pub(crate) struct AccountStorageMapRowInsert { pub(crate) value: Vec, pub(crate) is_latest: bool, } + +// CLEANUP FUNCTIONS +// ================================================================================================ + +/// Number of historical blocks to retain for vault assets and storage map values. +/// Entries older than `chain_tip - HISTORICAL_BLOCK_RETENTION` will be deleted, +/// except for entries marked with `is_latest=true` which are always retained. +pub const HISTORICAL_BLOCK_RETENTION: u32 = 50; + +/// Clean up old vault asset entries for a specific account, deleting entries older than +/// the retention window. +/// +/// Keeps entries where `block_num >= cutoff_block` OR `is_latest = true`. +#[cfg(test)] +pub(crate) fn cleanup_old_account_vault_assets( + conn: &mut SqliteConnection, + account_id: AccountId, + chain_tip: BlockNumber, +) -> Result { + let account_id_bytes = account_id.to_bytes(); + let cutoff_block = i64::from(chain_tip.as_u32().saturating_sub(HISTORICAL_BLOCK_RETENTION)); + + diesel::sql_query( + r#" + DELETE FROM account_vault_assets + WHERE account_id = ?1 AND block_num < ?2 AND is_latest = 0 + "#, + ) + .bind::(&account_id_bytes) + .bind::(cutoff_block) + .execute(conn) + .map_err(DatabaseError::Diesel) +} + +/// Clean up old storage map value entries for a specific account, deleting entries older than +/// the retention window. +/// +/// Keeps entries where `block_num >= cutoff_block` OR `is_latest = true`. 
+#[cfg(test)] +pub(crate) fn cleanup_old_account_storage_map_values( + conn: &mut SqliteConnection, + account_id: AccountId, + chain_tip: BlockNumber, +) -> Result { + let account_id_bytes = account_id.to_bytes(); + let cutoff_block = i64::from(chain_tip.as_u32().saturating_sub(HISTORICAL_BLOCK_RETENTION)); + + diesel::sql_query( + r#" + DELETE FROM account_storage_map_values + WHERE account_id = ?1 AND block_num < ?2 AND is_latest = 0 + "#, + ) + .bind::(&account_id_bytes) + .bind::(cutoff_block) + .execute(conn) + .map_err(DatabaseError::Diesel) +} + +/// Clean up old entries for all accounts, deleting entries older than the retention window. +/// +/// Deletes rows where `block_num < chain_tip - HISTORICAL_BLOCK_RETENTION` and `is_latest = false`. +/// This is a simple and efficient approach that doesn't require window functions. +/// +/// # Returns +/// A tuple of (vault_assets_deleted, storage_map_values_deleted) +pub fn cleanup_all_accounts( + conn: &mut SqliteConnection, + chain_tip: BlockNumber, +) -> Result<(usize, usize), DatabaseError> { + let cutoff_block = i64::from(chain_tip.as_u32().saturating_sub(HISTORICAL_BLOCK_RETENTION)); + + let vault_deleted = diesel::sql_query( + r#" + DELETE FROM account_vault_assets + WHERE block_num < ?1 AND is_latest = 0 + "#, + ) + .bind::(cutoff_block) + .execute(conn) + .map_err(DatabaseError::Diesel)?; + + let storage_deleted = diesel::sql_query( + r#" + DELETE FROM account_storage_map_values + WHERE block_num < ?1 AND is_latest = 0 + "#, + ) + .bind::(cutoff_block) + .execute(conn) + .map_err(DatabaseError::Diesel)?; + + Ok((vault_deleted, storage_deleted)) +} diff --git a/crates/store/src/db/tests.rs b/crates/store/src/db/tests.rs index f6cb0c328..8de164e93 100644 --- a/crates/store/src/db/tests.rs +++ b/crates/store/src/db/tests.rs @@ -73,10 +73,10 @@ use pretty_assertions::assert_eq; use rand::Rng; use super::{AccountInfo, NoteRecord, NullifierInfo}; -use crate::db::TransactionSummary; use 
crate::db::migrations::apply_migrations; use crate::db::models::queries::{StorageMapValue, insert_account_storage_map_value}; use crate::db::models::{Page, queries, utils}; +use crate::db::{TransactionSummary, schema}; use crate::errors::DatabaseError; fn create_db() -> SqliteConnection { @@ -2232,6 +2232,359 @@ fn db_roundtrip_account_storage_with_maps() { ); } +// CLEANUP TESTS +// ================================================================================================ + +/// Chain length used in cleanup tests - must be > HISTORICAL_BLOCK_RETENTION for meaningful tests. +const TEST_CHAIN_LENGTH: u32 = 100; + +#[test] +#[miden_node_test_macro::enable_logging] +fn test_cleanup_old_account_vault_assets() { + use diesel::{ExpressionMethods, QueryDsl, RunQueryDsl}; + + use crate::db::models::queries::{ + HISTORICAL_BLOCK_RETENTION, + cleanup_old_account_vault_assets, + }; + + let mut conn = create_db(); + let account_id = AccountId::try_from(ACCOUNT_ID_PUBLIC_FUNGIBLE_FAUCET).unwrap(); + + // Create blocks 1-TEST_CHAIN_LENGTH + for i in 1..=TEST_CHAIN_LENGTH { + create_block(&mut conn, BlockNumber::from(i)); + } + + // Create account + queries::upsert_accounts( + &mut conn, + &[mock_block_account_update(account_id, 0)], + BlockNumber::from(1), + ) + .unwrap(); + + let vault_key = AssetVaultKey::new_unchecked(num_to_word(100)); + let asset = Asset::Fungible(FungibleAsset::new(account_id, 1000).unwrap()); + + // Insert vault asset entries for blocks 1-TEST_CHAIN_LENGTH + for block_num in 1..=TEST_CHAIN_LENGTH { + queries::insert_account_vault_asset( + &mut conn, + account_id, + BlockNumber::from(block_num), + vault_key, + Some(asset), + ) + .unwrap(); + } + + // Verify we have TEST_CHAIN_LENGTH entries + use schema::account_vault_assets::dsl; + let count = dsl::account_vault_assets + .filter(dsl::account_id.eq(account_id.to_bytes())) + .count() + .get_result::(&mut conn) + .unwrap(); + assert_eq!( + count, TEST_CHAIN_LENGTH as i64, + "Should have 
TEST_CHAIN_LENGTH entries before cleanup" + ); + + // Run cleanup with chain_tip at block TEST_CHAIN_LENGTH + // cutoff = TEST_CHAIN_LENGTH - 50, so blocks < cutoff should be deleted + let chain_tip = BlockNumber::from(TEST_CHAIN_LENGTH); + let deleted = cleanup_old_account_vault_assets(&mut conn, account_id, chain_tip).unwrap(); + + // Blocks 1 to cutoff-1 are older than cutoff, but block TEST_CHAIN_LENGTH is is_latest=true + // So we should delete (cutoff-1) entries (blocks 1 to cutoff-1, excluding is_latest) + let cutoff = TEST_CHAIN_LENGTH - HISTORICAL_BLOCK_RETENTION; + let expected_deleted = cutoff - 1; // blocks 1 to cutoff-1 + assert_eq!( + deleted, expected_deleted as usize, + "Should have deleted entries older than block {}", + cutoff + ); + + // Verify remaining count + let remaining = dsl::account_vault_assets + .filter(dsl::account_id.eq(account_id.to_bytes())) + .count() + .get_result::(&mut conn) + .unwrap(); + // Remaining: blocks cutoff to TEST_CHAIN_LENGTH + let expected_remaining = TEST_CHAIN_LENGTH - expected_deleted; + assert_eq!( + remaining as u32, expected_remaining, + "Should have {} entries remaining", + expected_remaining + ); + + // Verify the latest entry is still marked as latest + let latest_count = dsl::account_vault_assets + .filter(dsl::account_id.eq(account_id.to_bytes())) + .filter(dsl::is_latest.eq(true)) + .count() + .get_result::(&mut conn) + .unwrap(); + assert_eq!(latest_count, 1, "Should have exactly one latest entry"); + + // Verify the latest entry is from block TEST_CHAIN_LENGTH + let latest_block = dsl::account_vault_assets + .filter(dsl::account_id.eq(account_id.to_bytes())) + .filter(dsl::is_latest.eq(true)) + .select(dsl::block_num) + .first::(&mut conn) + .unwrap(); + assert_eq!( + latest_block, TEST_CHAIN_LENGTH as i64, + "Latest entry should be from block TEST_CHAIN_LENGTH" + ); +} + +#[test] +#[miden_node_test_macro::enable_logging] +fn test_cleanup_old_account_storage_map_values() { + use 
diesel::{ExpressionMethods, QueryDsl, RunQueryDsl}; + + use crate::db::models::queries::{ + HISTORICAL_BLOCK_RETENTION, + cleanup_old_account_storage_map_values, + }; + + let mut conn = create_db(); + let account_id = AccountId::try_from(ACCOUNT_ID_REGULAR_PUBLIC_ACCOUNT_IMMUTABLE_CODE).unwrap(); + + // Create blocks 1-TEST_CHAIN_LENGTH + for i in 1..=TEST_CHAIN_LENGTH { + create_block(&mut conn, BlockNumber::from(i)); + } + + let slot_name = StorageSlotName::mock(5); + let key = num_to_word(123); + let value_base = num_to_word(456); + + // Insert storage map value entries for blocks 1-TEST_CHAIN_LENGTH + for block_num in 1..=TEST_CHAIN_LENGTH { + queries::insert_account_storage_map_value( + &mut conn, + account_id, + BlockNumber::from(block_num), + slot_name.clone(), + key, + value_base, + ) + .unwrap(); + } + + // Verify we have TEST_CHAIN_LENGTH entries + use schema::account_storage_map_values::dsl; + let count = dsl::account_storage_map_values + .filter(dsl::account_id.eq(account_id.to_bytes())) + .count() + .get_result::(&mut conn) + .unwrap(); + assert_eq!( + count, TEST_CHAIN_LENGTH as i64, + "Should have TEST_CHAIN_LENGTH entries before cleanup" + ); + + // Run cleanup with chain_tip at block TEST_CHAIN_LENGTH + let chain_tip = BlockNumber::from(TEST_CHAIN_LENGTH); + let deleted = cleanup_old_account_storage_map_values(&mut conn, account_id, chain_tip).unwrap(); + + // cutoff = TEST_CHAIN_LENGTH - HISTORICAL_BLOCK_RETENTION, blocks 1 to cutoff-1 should be + // deleted + let cutoff = TEST_CHAIN_LENGTH - HISTORICAL_BLOCK_RETENTION; + let expected_deleted = cutoff - 1; + assert_eq!( + deleted, expected_deleted as usize, + "Should have deleted entries older than block {}", + cutoff + ); + + // Verify remaining count + let remaining = dsl::account_storage_map_values + .filter(dsl::account_id.eq(account_id.to_bytes())) + .count() + .get_result::(&mut conn) + .unwrap(); + let expected_remaining = TEST_CHAIN_LENGTH - expected_deleted; + assert_eq!( + remaining as 
u32, expected_remaining, + "Should have {} entries remaining", + expected_remaining + ); + + // Verify the latest entry is still marked as latest + let latest_count = dsl::account_storage_map_values + .filter(dsl::account_id.eq(account_id.to_bytes())) + .filter(dsl::is_latest.eq(true)) + .count() + .get_result::(&mut conn) + .unwrap(); + assert_eq!(latest_count, 1, "Should have exactly one latest entry"); +} + +#[test] +#[miden_node_test_macro::enable_logging] +fn test_cleanup_preserves_latest_state() { + use diesel::{ExpressionMethods, QueryDsl, RunQueryDsl}; + + use crate::db::models::queries::cleanup_old_account_vault_assets; + + let mut conn = create_db(); + let account_id = AccountId::try_from(ACCOUNT_ID_PUBLIC_FUNGIBLE_FAUCET).unwrap(); + + // Create blocks + for i in 1..=10 { + create_block(&mut conn, BlockNumber::from(i)); + } + + // Create account + queries::upsert_accounts( + &mut conn, + &[mock_block_account_update(account_id, 0)], + BlockNumber::from(1), + ) + .unwrap(); + + // Test with multiple vault keys to ensure all latest entries are preserved + let vault_key_1 = AssetVaultKey::new_unchecked(num_to_word(100)); + let vault_key_2 = AssetVaultKey::new_unchecked(num_to_word(200)); + let asset = Asset::Fungible(FungibleAsset::new(account_id, 1000).unwrap()); + + // Insert entries for both keys + for block_num in 1..=10 { + queries::insert_account_vault_asset( + &mut conn, + account_id, + BlockNumber::from(block_num), + vault_key_1, + Some(asset), + ) + .unwrap(); + + queries::insert_account_vault_asset( + &mut conn, + account_id, + BlockNumber::from(block_num), + vault_key_2, + Some(asset), + ) + .unwrap(); + } + + // Run cleanup with chain_tip at 10 (cutoff = 10 - 50 = 0, so nothing should be deleted) + let chain_tip = BlockNumber::from(10); + let deleted = cleanup_old_account_vault_assets(&mut conn, account_id, chain_tip).unwrap(); + assert_eq!(deleted, 0, "Should not delete anything when chain is young"); + + // Verify both latest entries exist + 
use schema::account_vault_assets::dsl; + let latest_entries: Vec<(Vec, i64)> = dsl::account_vault_assets + .filter(dsl::account_id.eq(account_id.to_bytes())) + .filter(dsl::is_latest.eq(true)) + .select((dsl::vault_key, dsl::block_num)) + .order(dsl::vault_key.asc()) + .load(&mut conn) + .unwrap(); + + assert_eq!(latest_entries.len(), 2, "Should have two latest entries"); + assert_eq!(latest_entries[0].1, 10, "Latest for key 1 should be block 10"); + assert_eq!(latest_entries[1].1, 10, "Latest for key 2 should be block 10"); +} + +#[test] +#[miden_node_test_macro::enable_logging] +fn test_cleanup_all_accounts() { + use diesel::{ExpressionMethods, QueryDsl, RunQueryDsl}; + + use crate::db::models::queries::{HISTORICAL_BLOCK_RETENTION, cleanup_all_accounts}; + + let mut conn = create_db(); + + // Create two accounts + let account1 = AccountId::try_from(ACCOUNT_ID_REGULAR_PUBLIC_ACCOUNT_IMMUTABLE_CODE).unwrap(); + let account2 = AccountId::try_from(ACCOUNT_ID_REGULAR_PUBLIC_ACCOUNT_IMMUTABLE_CODE_2).unwrap(); + let faucet_id = AccountId::try_from(ACCOUNT_ID_PUBLIC_FUNGIBLE_FAUCET).unwrap(); + + // Create blocks 1-TEST_CHAIN_LENGTH + for i in 1..=TEST_CHAIN_LENGTH { + create_block(&mut conn, BlockNumber::from(i)); + } + + // Create accounts + queries::upsert_accounts( + &mut conn, + &[mock_block_account_update(account1, 0)], + BlockNumber::from(1), + ) + .unwrap(); + queries::upsert_accounts( + &mut conn, + &[mock_block_account_update(account2, 0)], + BlockNumber::from(1), + ) + .unwrap(); + + // Insert many entries for both accounts + let vault_key = AssetVaultKey::new_unchecked(num_to_word(100)); + let asset = Asset::Fungible(FungibleAsset::new(faucet_id, 1000).unwrap()); + + for block_num in 1..=TEST_CHAIN_LENGTH { + queries::insert_account_vault_asset( + &mut conn, + account1, + BlockNumber::from(block_num), + vault_key, + Some(asset), + ) + .unwrap(); + + let asset2 = Asset::Fungible(FungibleAsset::new(faucet_id, 2000).unwrap()); + 
queries::insert_account_vault_asset( + &mut conn, + account2, + BlockNumber::from(block_num), + vault_key, + Some(asset2), + ) + .unwrap(); + } + + // Run cleanup for all accounts with chain_tip at TEST_CHAIN_LENGTH + let chain_tip = BlockNumber::from(TEST_CHAIN_LENGTH); + let (vault_deleted, _) = cleanup_all_accounts(&mut conn, chain_tip).unwrap(); + + // Each account should have deleted entries for blocks 1 to cutoff-1 + let cutoff = TEST_CHAIN_LENGTH - HISTORICAL_BLOCK_RETENTION; + let expected_deleted_per_account = cutoff - 1; + assert_eq!( + vault_deleted, + (expected_deleted_per_account * 2) as usize, + "Should have deleted entries from both accounts" + ); + + // Each account should have entries for blocks cutoff to TEST_CHAIN_LENGTH + use schema::account_vault_assets::dsl; + let count1 = dsl::account_vault_assets + .filter(dsl::account_id.eq(account1.to_bytes())) + .count() + .get_result::(&mut conn) + .unwrap(); + + let count2 = dsl::account_vault_assets + .filter(dsl::account_id.eq(account2.to_bytes())) + .count() + .get_result::(&mut conn) + .unwrap(); + + let expected_remaining = TEST_CHAIN_LENGTH - expected_deleted_per_account; + assert_eq!(count1 as u32, expected_remaining); + assert_eq!(count2 as u32, expected_remaining); +} + #[test] #[miden_node_test_macro::enable_logging] fn test_note_metadata_with_attachment_roundtrip() { diff --git a/crates/store/src/errors.rs b/crates/store/src/errors.rs index 0267a42e7..e0ee28f32 100644 --- a/crates/store/src/errors.rs +++ b/crates/store/src/errors.rs @@ -135,6 +135,8 @@ pub enum DatabaseError { SqlValueConversion(#[from] DatabaseTypeConversionError), #[error("Not implemented: {0}")] NotImplemented(String), + #[error("query timeout: {0}")] + QueryTimeout(String), #[error("storage root not found for account {account_id}, slot {slot_name}, block {block_num}")] StorageRootNotFound { account_id: AccountId, diff --git a/crates/store/src/inner_forest/mod.rs b/crates/store/src/inner_forest/mod.rs index 
2154cde70..60f952465 100644 --- a/crates/store/src/inner_forest/mod.rs +++ b/crates/store/src/inner_forest/mod.rs @@ -595,4 +595,6 @@ impl InnerForest { ); } } + + // TODO: tie in-memory forest retention to DB pruning policy once forest queries rely on it. }