diff --git a/consensus/src/consensus/mod.rs b/consensus/src/consensus/mod.rs
index deb0ebdacf..46eaa6e205 100644
--- a/consensus/src/consensus/mod.rs
+++ b/consensus/src/consensus/mod.rs
@@ -307,13 +307,39 @@ impl Consensus {
             is_consensus_exiting,
         };
 
-        // TODO (post HF): remove the upgrade
-        // Database upgrade to include pruning samples
-        this.pruning_samples_database_upgrade();
+        // Run database upgrades if any
+        this.run_database_upgrades();
 
         this
     }
 
+    /// A procedure for calling database upgrades which are self-contained (i.e., do not require knowing the DB version)
+    fn run_database_upgrades(&self) {
+        // Upgrade to initialize the new retention root field correctly
+        self.retention_root_database_upgrade();
+
+        // TODO (post HF): remove this upgrade
+        // Database upgrade to include pruning samples
+        self.pruning_samples_database_upgrade();
+    }
+
+    fn retention_root_database_upgrade(&self) {
+        let mut pruning_point_store = self.pruning_point_store.write();
+        if pruning_point_store.retention_period_root().unwrap_option().is_none() {
+            let mut batch = rocksdb::WriteBatch::default();
+            if self.config.is_archival {
+                // The retention checkpoint is what was previously known as history root
+                let retention_checkpoint = pruning_point_store.retention_checkpoint().unwrap();
+                pruning_point_store.set_retention_period_root(&mut batch, retention_checkpoint).unwrap();
+            } else {
+                // For non-archival nodes the retention root was the pruning point
+                let pruning_point = pruning_point_store.get().unwrap().pruning_point;
+                pruning_point_store.set_retention_period_root(&mut batch, pruning_point).unwrap();
+            }
+            self.db.write(batch).unwrap();
+        }
+    }
+
     fn pruning_samples_database_upgrade(&self) {
         //
         // For the first time this version runs, make sure we populate pruning samples
@@ -622,8 +648,7 @@ impl ConsensusApi for Consensus {
     }
 
     fn get_retention_period_root(&self) -> Hash {
-        let pruning_point_read = self.pruning_point_store.read();
-        pruning_point_read.retention_period_root().unwrap_or(pruning_point_read.pruning_point().unwrap())
+        self.pruning_point_store.read().retention_period_root().unwrap()
     }
 
     /// Estimates the number of blocks and headers stored in the node database.
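The constructor now funnels all self-contained migrations through `run_database_upgrades`. The new `retention_root_database_upgrade` is idempotent: it writes the `retention_period_root` key only when it is missing, seeding it from the retention checkpoint on archival nodes (where it plays the role of the old history root) and from the current pruning point otherwise. This is why `get_retention_period_root` in the second hunk can now `unwrap()` the field instead of falling back to the pruning point. A minimal sketch of the decision logic, using hypothetical stand-in types (`PruningState`, `Hash`) rather than the real store and write-batch APIs:

```rust
/// Hypothetical stand-ins for the store fields touched by the upgrade; the real code goes
/// through `PruningPointStore` and a RocksDB write batch.
#[derive(Debug, Clone, Copy, PartialEq)]
struct Hash(u64);

struct PruningState {
    pruning_point: Hash,
    retention_checkpoint: Hash,
    retention_period_root: Option<Hash>, // the new field introduced by this change
}

/// One-shot, idempotent upgrade: seed the new field only when it is missing.
fn retention_root_upgrade(state: &mut PruningState, is_archival: bool) {
    if state.retention_period_root.is_some() {
        return; // already upgraded on a previous startup
    }
    state.retention_period_root = Some(if is_archival {
        // Archival nodes: the retention checkpoint is what was previously known as history root
        state.retention_checkpoint
    } else {
        // Non-archival nodes: retained data previously began at the pruning point
        state.pruning_point
    });
}

fn main() {
    let mut state =
        PruningState { pruning_point: Hash(7), retention_checkpoint: Hash(3), retention_period_root: None };
    retention_root_upgrade(&mut state, /* is_archival */ false);
    assert_eq!(state.retention_period_root, Some(Hash(7)));
    // Re-running the upgrade is a no-op, so it is safe to call unconditionally at startup.
    retention_root_upgrade(&mut state, false);
    assert_eq!(state.retention_period_root, Some(Hash(7)));
}
```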
diff --git a/consensus/src/pipeline/pruning_processor/processor.rs b/consensus/src/pipeline/pruning_processor/processor.rs
index 2858032cbf..57c7776ddf 100644
--- a/consensus/src/pipeline/pruning_processor/processor.rs
+++ b/consensus/src/pipeline/pruning_processor/processor.rs
@@ -134,9 +134,9 @@ impl PruningProcessor {
     fn recover_pruning_workflows_if_needed(&self) {
         let pruning_point_read = self.pruning_point_store.read();
         let pruning_point = pruning_point_read.pruning_point().unwrap();
-        let retention_checkpoint = pruning_point_read.retention_checkpoint().unwrap_option();
-        let retention_period_root = pruning_point_read.retention_period_root().unwrap_or(pruning_point);
-        let pruning_utxoset_position = self.pruning_utxoset_stores.read().utxoset_position().unwrap_option();
+        let retention_checkpoint = pruning_point_read.retention_checkpoint().unwrap();
+        let retention_period_root = pruning_point_read.retention_period_root().unwrap();
+        let pruning_utxoset_position = self.pruning_utxoset_stores.read().utxoset_position().unwrap();
         drop(pruning_point_read);
 
         debug!(
@@ -144,14 +144,12 @@ impl PruningProcessor {
             pruning_point, retention_checkpoint, pruning_utxoset_position
         );
 
-        if let Some(pruning_utxoset_position) = pruning_utxoset_position {
-            // This indicates the node crashed during a former pruning point move and we need to recover
-            if pruning_utxoset_position != pruning_point {
-                info!("Recovering pruning utxo-set from {} to the pruning point {}", pruning_utxoset_position, pruning_point);
-                if !self.advance_pruning_utxoset(pruning_utxoset_position, pruning_point) {
-                    info!("Interrupted while advancing the pruning point UTXO set: Process is exiting");
-                    return;
-                }
+        // This indicates the node crashed during a former pruning point move and we need to recover
+        if pruning_utxoset_position != pruning_point {
+            info!("Recovering pruning utxo-set from {} to the pruning point {}", pruning_utxoset_position, pruning_point);
+            if !self.advance_pruning_utxoset(pruning_utxoset_position, pruning_point) {
+                info!("Interrupted while advancing the pruning point UTXO set: Process is exiting");
+                return;
             }
         }
 
@@ -161,15 +159,12 @@ impl PruningProcessor {
             retention_period_root, pruning_point
         );
 
-        if let Some(retention_checkpoint) = retention_checkpoint {
-            // This indicates the node crashed or was forced to stop during a former data prune operation hence
-            // we need to complete it
-            if retention_checkpoint != retention_period_root {
-                self.prune(pruning_point, retention_period_root);
-            }
-        }
-        // TODO: both `pruning_utxoset_position` and `retention_checkpoint` are new DB keys so for now we assume correct state if the keys are missing
+        // This indicates the node crashed or was forced to stop during a former data prune operation hence
+        // we need to complete it
+        if retention_checkpoint != retention_period_root {
+            self.prune(pruning_point, retention_period_root);
+        }
     }
 
     fn advance_pruning_point_and_candidate_if_possible(&self, sink_ghostdag_data: CompactGhostdagData) {
@@ -182,8 +177,7 @@ impl PruningProcessor {
         );
 
         if !new_pruning_points.is_empty() {
-            let retention_period_root =
-                pruning_point_read.retention_period_root().unwrap_or(pruning_point_read.pruning_point().unwrap());
+            let retention_period_root = pruning_point_read.retention_period_root().unwrap();
 
             // Update past pruning points and pruning point stores
            let mut batch = WriteBatch::default();
@@ -549,7 +543,7 @@ impl PruningProcessor {
     /// This function is expected to be called only when a new pruning point is determined and right before
     /// doing any pruning. Pruning point must be the new pruning point this node is advancing to.
     ///
-    /// retention_period_root is guaranteed to be in the past(pruning_point)
+    /// The returned retention_period_root is guaranteed to be in past(pruning_point) or the pruning point itself.
     fn advance_retention_period_root(&self, retention_period_root: Hash, pruning_point: Hash) -> Hash {
         match self.config.retention_period_days {
             // If the retention period wasn't set, immediately default to the pruning point.
@@ -561,7 +555,11 @@
                 // to this function serves as a clamp.
                 let retention_period_ms = (retention_period_days * 86400.0 * 1000.0).ceil() as u64;
 
+                // The target timestamp we would like to find a point below
                 let retention_period_root_ts_target = unix_now().saturating_sub(retention_period_ms);
+
+                // Iterate from the new pruning point to the prev retention root and search for the first point with enough days above it.
+                // Note that prev retention root is always a past pruning point, so we can iterate via pruning samples until we reach it.
                 let mut new_retention_period_root = pruning_point;
 
                 trace!(
@@ -569,7 +567,7 @@
                     retention_period_root_ts_target,
                 );
 
-                while self.reachability_service.is_dag_ancestor_of(retention_period_root, new_retention_period_root) {
+                while new_retention_period_root != retention_period_root {
                     let block = new_retention_period_root;
                     let timestamp = self.headers_store.get_timestamp(block).unwrap();
 
@@ -580,16 +578,7 @@
                         break;
                     }
 
-                    new_retention_period_root =
-                        self.pruning_samples_store.pruning_sample_from_pov(block).unwrap_or(retention_period_root);
-                }
-
-                // We may be at a pruning sample that's in the past or anticone of current retention_period_root. Clamp to retention_period_root here.
-                // Happens when the node is newly started and retention_period_root itself still doesn't cover the full retention period.
-                let is_new_root_in_future_of_old =
-                    self.reachability_service.is_dag_ancestor_of(retention_period_root, new_retention_period_root);
-                if !is_new_root_in_future_of_old {
-                    new_retention_period_root = retention_period_root;
+                    new_retention_period_root = self.pruning_samples_store.pruning_sample_from_pov(block).unwrap();
                 }
 
                 new_retention_period_root
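Two things change in the pruning processor. First, because the startup upgrade guarantees that `retention_period_root`, `retention_checkpoint` and the UTXO-set position always exist, the recovery path compares the stored values directly instead of treating missing keys as a correct state. Second, `advance_retention_period_root` no longer needs a reachability query plus a post-loop clamp: it walks back from the new pruning point through the pruning-sample chain and stops either at the first sample old enough to cover the retention period or at the previous retention root, which is itself a past pruning sample and therefore bounds the walk. A self-contained sketch of that walk, under the assumption that the elided loop body breaks once a sample's timestamp is at or below the target; the maps stand in for `pruning_samples_store.pruning_sample_from_pov` and `headers_store.get_timestamp`:

```rust
use std::collections::HashMap;

// Hypothetical hash type standing in for the consensus `Hash`.
type Hash = u64;

/// Walk back from the new pruning point toward the previous retention root via pruning samples,
/// returning the first sample whose timestamp already covers the retention period. Because the
/// previous root is itself a past pruning sample, equality with it terminates (and clamps) the walk.
fn advance_retention_period_root(
    prev_retention_root: Hash,
    pruning_point: Hash,
    retention_period_ms: u64,
    now_ms: u64,
    sample_parent: &HashMap<Hash, Hash>, // block -> its pruning sample (one step back)
    timestamps: &HashMap<Hash, u64>,     // block -> header timestamp in ms
) -> Hash {
    // Any block at or below this timestamp already has a full retention period above it.
    let target_ts = now_ms.saturating_sub(retention_period_ms);
    let mut new_root = pruning_point;
    while new_root != prev_retention_root {
        if timestamps[&new_root] <= target_ts {
            break; // old enough: the period above this sample covers the configured retention
        }
        new_root = sample_parent[&new_root];
    }
    new_root
}

fn main() {
    // Toy chain of pruning samples 30 <- 20 <- 10 with timestamps 30_000, 20_000, 10_000 ms.
    let sample_parent = HashMap::from([(30, 20), (20, 10)]);
    let timestamps = HashMap::from([(30, 30_000), (20, 20_000), (10, 10_000)]);
    // With a 15s retention period at "now" = 40s, the walk stops at sample 20 (ts 20_000 <= 25_000).
    assert_eq!(advance_retention_period_root(10, 30, 15_000, 40_000, &sample_parent, &timestamps), 20);
    // With a much longer retention period the walk clamps at the previous retention root (10).
    assert_eq!(advance_retention_period_root(10, 30, 100_000, 40_000, &sample_parent, &timestamps), 10);
}
```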
diff --git a/kaspad/src/daemon.rs b/kaspad/src/daemon.rs
index 1bb08239ea..5df7c4bf2e 100644
--- a/kaspad/src/daemon.rs
+++ b/kaspad/src/daemon.rs
@@ -3,6 +3,7 @@ use std::{fs, path::PathBuf, process::exit, sync::Arc, time::Duration};
 use async_channel::unbounded;
 use kaspa_consensus_core::{
     config::ConfigBuilder,
+    constants::TRANSIENT_BYTE_TO_MASS_FACTOR,
     errors::config::{ConfigError, ConfigResult},
 };
 use kaspa_consensus_notify::{root::ConsensusNotificationRoot, service::NotifyService};
@@ -50,9 +51,9 @@ pub const DESIRED_DAEMON_SOFT_FD_LIMIT: u64 = 8 * 1024;
 /// this value may impact the database performance).
 pub const MINIMUM_DAEMON_SOFT_FD_LIMIT: u64 = 4 * 1024;
 
-// If set, the retention period days must at least be this value.
-// This value is assumed to be greater than all pruning periods.
-const MINIMUM_RETENTION_PERIOD_DAYS: f64 = 3.0;
+/// If set, the retention period days must be at least this value
+/// (otherwise it is meaningless since pruning periods are typically at least 2 days long)
+const MINIMUM_RETENTION_PERIOD_DAYS: f64 = 2.0;
 const ONE_GIGABYTE: f64 = 1_000_000_000.0;
 
 use crate::args::Args;
@@ -238,8 +239,6 @@ pub fn create_core_with_runtime(runtime: &Runtime, args: &Args, fd_total_budget:
         .build(),
     );
 
-    // TODO: Validate `config` forms a valid set of properties
-
    let app_dir = get_app_dir_from_args(args);
    let db_dir = app_dir.join(network.to_prefixed()).join(DEFAULT_DATA_DIR);
 
@@ -282,7 +281,7 @@ do you confirm? (answer y/n or pass --yes to the Kaspad command line to confirm
    if !args.archival && args.retention_period_days.is_some() {
        let retention_period_days = args.retention_period_days.unwrap();
 
-        // Look at only post-fork values
+        // Look only at post-fork values (which are the worst-case)
        let finality_depth = config.finality_depth().after();
        let target_time_per_block = config.target_time_per_block().after(); // in ms
 
@@ -291,10 +290,11 @@
        let total_blocks = retention_period_milliseconds / target_time_per_block;
        // This worst case usage only considers block space. It does not account for usage of
        // other stores (reachability, block status, mempool, etc.)
-        let worst_case_usage = ((total_blocks + finality_depth) * (config.max_block_mass / 4)) as f64 / ONE_GIGABYTE;
+        let worst_case_usage =
+            ((total_blocks + finality_depth) * (config.max_block_mass / TRANSIENT_BYTE_TO_MASS_FACTOR)) as f64 / ONE_GIGABYTE;
 
        info!(
-            "Retention period is set to {} days. Disk usage may be up to {:.2} GB for block space per pruning period.",
+            "Retention period is set to {} days. Disk usage may be up to {:.2} GB for block space required for this period.",
            retention_period_days, worst_case_usage
        );
    } else {
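The disk-usage hint now derives the per-block byte bound from the mass limit via `TRANSIENT_BYTE_TO_MASS_FACTOR` (so `max_block_mass / TRANSIENT_BYTE_TO_MASS_FACTOR` bounds a block's serialized size) instead of a hard-coded 4, and the log message no longer speaks of "per pruning period". A self-contained sketch of the arithmetic; all numeric values in `main` are illustrative assumptions, not the actual network constants, which the daemon reads from `config`:

```rust
const ONE_GIGABYTE: f64 = 1_000_000_000.0;

/// Worst-case block-space estimate: assume every block in the retention window (plus a
/// finality-depth margin, mirroring the daemon) is filled to the maximum transient size.
fn worst_case_block_space_gb(
    retention_period_days: f64,
    target_time_per_block_ms: u64,
    finality_depth: u64,
    max_block_mass: u64,
    transient_byte_to_mass_factor: u64,
) -> f64 {
    let retention_period_ms = (retention_period_days * 86400.0 * 1000.0).ceil() as u64;
    // Expected number of blocks in the retention window at the target block rate.
    let total_blocks = retention_period_ms / target_time_per_block_ms;
    // Upper bound on a block's serialized bytes implied by the mass limit.
    let max_block_bytes = max_block_mass / transient_byte_to_mass_factor;
    ((total_blocks + finality_depth) * max_block_bytes) as f64 / ONE_GIGABYTE
}

fn main() {
    // Illustrative inputs only: a 2-day retention period at 10 blocks per second, with assumed
    // values for the finality depth and the block mass limit.
    let gb = worst_case_block_space_gb(2.0, 100, 432_000, 500_000, 4);
    println!("worst-case block space: {gb:.2} GB"); // ~270 GB with these assumed inputs
}
```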