diff --git a/CHANGELOG.md b/CHANGELOG.md index 7202738f58f..fd11100bc88 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -29,9 +29,12 @@ ### Added +- [#6082](https://github.com/ChainSafe/forest/issues/6082) Added `forest-cli snapshot export-status` and `forest-cli snapshot export-cancel` subcommands to monitor or cancel an export, respectively. + - [#6161](https://github.com/ChainSafe/forest/pull/6161) Added `forest-tool db import` subcommand. - [#6166](https://github.com/ChainSafe/forest/pull/6166) Gate `JWT` expiration validation behind environment variable `FOREST_JWT_DISABLE_EXP_VALIDATION`. + - [#6171](https://github.com/ChainSafe/forest/pull/6171) Enable V2 API support for basic Eth RPC methods: `EthChainId`, `EthProtocolVersion`, `EthSyncing`, `EthAccounts`. ### Changed diff --git a/scripts/tests/calibnet_export_check.sh b/scripts/tests/calibnet_export_check.sh index 44163f757bc..8309cd49739 100755 --- a/scripts/tests/calibnet_export_check.sh +++ b/scripts/tests/calibnet_export_check.sh @@ -11,9 +11,53 @@ source "$(dirname "$0")/harness.sh" forest_init "$@" +retries=10 +sleep_interval=0.1 + echo "Cleaning up the initial snapshot" rm --force --verbose ./*.{car,car.zst,sha256sum} +output=$($FOREST_CLI_PATH snapshot export-status --format json) +is_exporting=$(echo "$output" | jq -r '.exporting') +echo "Testing that no export is in progress" +if [ "$is_exporting" == "true" ]; then + exit 1 +fi + +echo "Exporting zstd compressed snapshot in $format format" +$FOREST_CLI_PATH snapshot export --format "$format" > snapshot_export.log 2>&1 & + +echo "Testing that export is in progress" +for ((i=1; i<=retries; i++)); do + output=$($FOREST_CLI_PATH snapshot export-status --format json) + is_exporting=$(echo "$output" | jq -r '.exporting') + if [ "$is_exporting" == "true" ]; then + break + fi + if [ $i -eq $retries ]; then + cat snapshot_export.log + exit 1 + fi + sleep $sleep_interval +done + +$FOREST_CLI_PATH snapshot export-cancel + +echo "Testing that export has been cancelled" +for ((i=1; i<=retries; i++)); do + output=$($FOREST_CLI_PATH snapshot export-status --format json) + is_exporting=$(echo "$output" | jq -r '.exporting') + is_cancelled=$(echo "$output" | jq -r '.cancelled') + if [ "$is_exporting" == "false" ] && [ "$is_cancelled" == "true" ]; then + break + fi + if [ $i -eq $retries ]; then + cat snapshot_export.log + exit 1 + fi + sleep $sleep_interval +done + echo "Exporting zstd compressed snapshot at genesis" $FOREST_CLI_PATH snapshot export --tipset 0 --format "$format" diff --git a/src/chain/mod.rs b/src/chain/mod.rs index 193baf4b02a..cbbf4ef718a 100644 --- a/src/chain/mod.rs +++ b/src/chain/mod.rs @@ -160,7 +160,8 @@ async fn export_to_forest_car( tipset.clone().chain_owned(Arc::clone(db)), stateroot_lookup_limit, ) - .with_seen(seen), + .with_seen(seen) + .track_progress(true), ); // Encode Ipld key-value pairs in zstd frames diff --git a/src/cli/subcommands/snapshot_cmd.rs b/src/cli/subcommands/snapshot_cmd.rs index 7394bfcfb70..3d98addbc92 100644 --- a/src/cli/subcommands/snapshot_cmd.rs +++ b/src/cli/subcommands/snapshot_cmd.rs @@ -7,6 +7,7 @@ use crate::cli_shared::snapshot::{self, TrustedVendor}; use crate::db::car::forest::new_forest_car_temp_path_in; use crate::networks::calibnet; use crate::rpc::chain::ForestChainExportDiffParams; +use crate::rpc::types::ApiExportResult; use crate::rpc::{self, chain::ForestChainExportParams, prelude::*}; use crate::shim::policy::policy_constants::CHAIN_FINALITY; use anyhow::Context as _; @@ -19,6 +20,12 @@ use std::{ }; use tokio::io::AsyncWriteExt; +#[derive(Debug, Clone, clap::ValueEnum)] +pub enum Format { + Json, + Text, +} + #[derive(Debug, Subcommand)] pub enum SnapshotCommands { /// Export a snapshot of the chain to `` @@ -42,6 +49,17 @@ pub enum SnapshotCommands { #[arg(long, value_enum, default_value_t = FilecoinSnapshotVersion::V2)] format: FilecoinSnapshotVersion, }, + /// Show status of the current export. + ExportStatus { + /// Wait until it completes and print progress. + #[arg(long)] + wait: bool, + /// Format of the output. `json` or `text`. + #[arg(long, value_enum, default_value_t = Format::Text)] + format: Format, + }, + /// Cancel the current export. + ExportCancel {}, /// Export a diff snapshot between `from` and `to` epochs to `` ExportDiff { /// `./forest_snapshot_diff_{chain}_{from}_{to}+{depth}.car.zst`. @@ -152,7 +170,7 @@ impl SnapshotCommands { // Manually construct RpcRequest because snapshot export could // take a few hours on mainnet - let hash_result = client + let export_result = client .call(ForestChainExport::request((params,))?.with_timeout(Duration::MAX)) .await?; @@ -161,13 +179,106 @@ impl SnapshotCommands { _ = handle.await; if !dry_run { - if let Some(hash) = hash_result { - save_checksum(&output_path, hash).await?; + match export_result.clone() { + ApiExportResult::Done(hash_opt) => { + // Move the file first; prevents orphaned checksum on persist error. + temp_path.persist(&output_path)?; + if let Some(hash) = hash_opt { + save_checksum(&output_path, hash).await?; + } + } + ApiExportResult::Cancelled => { /* no file to persist on cancel */ } } - temp_path.persist(output_path)?; } - println!("Export completed."); + match export_result { + ApiExportResult::Done(_) => { + println!("Export completed."); + } + ApiExportResult::Cancelled => { + println!("Export cancelled."); + } + } + Ok(()) + } + Self::ExportStatus { wait, format } => { + let result = client + .call( + ForestChainExportStatus::request(())?.with_timeout(Duration::from_secs(30)), + ) + .await?; + if !result.exporting + && let Format::Text = format + { + if result.cancelled { + println!("No export in progress (last export was cancelled)"); + } else { + println!("No export in progress"); + } + return Ok(()); + } + if wait { + let elapsed = chrono::Utc::now() + .signed_duration_since(result.start_time) + .to_std() + .unwrap_or(Duration::ZERO); + let pb = ProgressBar::new(10000) + .with_elapsed(elapsed) + .with_message("Exporting"); + pb.set_style( + ProgressStyle::with_template( + "[{elapsed_precise}] [{wide_bar}] {percent}% {msg} ", + ) + .expect("indicatif template must be valid") + .progress_chars("#>-"), + ); + loop { + let result = client + .call( + ForestChainExportStatus::request(())? + .with_timeout(Duration::from_secs(30)), + ) + .await?; + if result.cancelled { + pb.set_message("Export cancelled"); + pb.abandon(); + + return Ok(()); + } + let position = (result.progress.clamp(0.0, 1.0) * 10000.0).trunc() as u64; + pb.set_position(position); + + if position >= 10000 { + break; + } + tokio::time::sleep(Duration::from_millis(500)).await; + } + pb.finish_with_message("Export completed"); + + return Ok(()); + } + match format { + Format::Text => { + println!("Exporting: {:.1}%", result.progress.clamp(0.0, 1.0) * 100.0); + } + Format::Json => { + println!("{}", serde_json::to_string_pretty(&result)?); + } + } + + Ok(()) + } + Self::ExportCancel {} => { + let result = client + .call( + ForestChainExportCancel::request(())?.with_timeout(Duration::from_secs(30)), + ) + .await?; + if result { + println!("Export cancelled."); + } else { + println!("No export in progress to cancel."); + } Ok(()) } Self::ExportDiff { diff --git a/src/ipld/util.rs b/src/ipld/util.rs index b4ba68cc5ed..4a60bd59b35 100644 --- a/src/ipld/util.rs +++ b/src/ipld/util.rs @@ -8,15 +8,58 @@ use crate::shim::clock::ChainEpoch; use crate::utils::db::car_stream::CarBlock; use crate::utils::encoding::extract_cids; use crate::utils::multihash::prelude::*; +use chrono::{DateTime, Utc}; use cid::Cid; use futures::Stream; use fvm_ipld_blockstore::Blockstore; +use parking_lot::Mutex; use pin_project_lite::pin_project; use std::borrow::Borrow; use std::collections::VecDeque; use std::pin::Pin; +use std::sync::LazyLock; use std::task::{Context, Poll}; +#[derive(Default)] +pub struct ExportStatus { + pub epoch: i64, + pub initial_epoch: i64, + pub exporting: bool, + pub cancelled: bool, + pub start_time: DateTime, +} + +pub static CHAIN_EXPORT_STATUS: LazyLock> = + LazyLock::new(|| ExportStatus::default().into()); + +fn update_epoch(new_value: i64) { + let mut mutex = CHAIN_EXPORT_STATUS.lock(); + mutex.epoch = new_value; + if mutex.initial_epoch == 0 { + mutex.initial_epoch = new_value; + } +} + +pub fn start_export() { + let mut mutex = CHAIN_EXPORT_STATUS.lock(); + mutex.epoch = 0; + mutex.initial_epoch = 0; + mutex.exporting = true; + mutex.cancelled = false; + mutex.start_time = Utc::now(); +} + +pub fn end_export() { + let mut mutex = CHAIN_EXPORT_STATUS.lock(); + mutex.exporting = false; +} + +pub fn cancel_export() { + let mut mutex = CHAIN_EXPORT_STATUS.lock(); + mutex.exporting = false; + mutex.cancelled = true; +} + fn should_save_block_to_snapshot(cid: Cid) -> bool { // Don't include identity CIDs. // We only include raw and dagcbor, for now. @@ -112,6 +155,7 @@ pin_project! { seen: CidHashSet, stateroot_limit_exclusive: ChainEpoch, fail_on_dead_links: bool, + track_progress: bool, } } @@ -126,6 +170,11 @@ impl ChainStream { self } + pub fn track_progress(mut self, track_progress: bool) -> Self { + self.track_progress = track_progress; + self + } + #[allow(dead_code)] pub fn into_seen(self) -> CidHashSet { self.seen @@ -155,6 +204,7 @@ pub fn stream_chain, ITER: Iterator seen: CidHashSet::default(), stateroot_limit_exclusive, fail_on_dead_links: true, + track_progress: false, } } @@ -197,6 +247,9 @@ impl, ITER: Iterator + Unpin> Stream } } Iterate(epoch, block_cid, _type, cid_vec) => { + if *this.track_progress { + update_epoch(*epoch); + } while let Some(cid) = cid_vec.pop_front() { // The link traversal implementation assumes there are three types of encoding: // 1. DAG_CBOR: needs to be reachable, so we add it to the queue and load. @@ -242,6 +295,9 @@ impl, ITER: Iterator + Unpin> Stream for block in tipset.borrow().block_headers() { let (cid, data) = block.car_block()?; if this.seen.insert(cid) { + if *this.track_progress { + update_epoch(block.epoch); + } // Make sure we always yield a block otherwise. this.dfs.push_back(Emit(cid, Some(data))); diff --git a/src/rpc/methods/chain.rs b/src/rpc/methods/chain.rs index f052bf89adb..43c5c1be681 100644 --- a/src/rpc/methods/chain.rs +++ b/src/rpc/methods/chain.rs @@ -12,13 +12,14 @@ use crate::chain::index::ResolveNullTipset; use crate::chain::{ChainStore, ExportOptions, FilecoinSnapshotVersion, HeadChange}; use crate::cid_collections::CidHashSet; use crate::ipld::DfsIter; +use crate::ipld::{CHAIN_EXPORT_STATUS, cancel_export, end_export, start_export}; use crate::lotus_json::{HasLotusJson, LotusJson, lotus_json_with_self}; #[cfg(test)] use crate::lotus_json::{assert_all_snapshots, assert_unchanged_via_json}; use crate::message::{ChainMessage, SignedMessage}; use crate::rpc::eth::{EthLog, eth_logs_with_filter, types::ApiHeaders, types::EthFilterSpec}; use crate::rpc::f3::F3ExportLatestSnapshot; -use crate::rpc::types::{ApiTipsetKey, Event}; +use crate::rpc::types::{ApiExportResult, ApiExportStatus, ApiTipsetKey, Event}; use crate::rpc::{ApiPaths, Ctx, EthEventHandler, Permission, RpcMethod, ServerError}; use crate::shim::clock::ChainEpoch; use crate::shim::error::ExitCode; @@ -49,10 +50,12 @@ use tokio::sync::{ broadcast::{self, Receiver as Subscriber}, }; use tokio::task::JoinHandle; +use tokio_util::sync::CancellationToken; const HEAD_CHANNEL_CAPACITY: usize = 10; -static CHAIN_EXPORT_LOCK: LazyLock> = LazyLock::new(|| Mutex::new(())); +static CHAIN_EXPORT_LOCK: LazyLock>> = + LazyLock::new(|| Mutex::new(None)); /// Subscribes to head changes from the chain store and broadcasts new blocks. /// @@ -380,7 +383,7 @@ impl RpcMethod<1> for ForestChainExport { const PERMISSION: Permission = Permission::Read; type Params = (ForestChainExportParams,); - type Ok = Option; + type Ok = ApiExportResult; async fn handle( ctx: Ctx, @@ -396,10 +399,17 @@ impl RpcMethod<1> for ForestChainExport { dry_run, } = params; - let _locked = CHAIN_EXPORT_LOCK.try_lock(); - if _locked.is_err() { - return Err(anyhow::anyhow!("Another chain export job is still in progress").into()); + let token = CancellationToken::new(); + { + let mut guard = CHAIN_EXPORT_LOCK.lock().await; + if guard.is_some() { + return Err( + anyhow::anyhow!("A chain export is still in progress. Cancel it with the export-cancel subcommand if needed.").into(), + ); + } + *guard = Some(token.clone()); } + start_export(); let head = ctx.chain_store().load_required_tipset_or_heaviest(&tsk)?; let start_ts = @@ -415,18 +425,27 @@ impl RpcMethod<1> for ForestChainExport { } else { tokio_util::either::Either::Right(tokio::fs::File::create(&output_path).await?) }; - match match version { + let result = match version { FilecoinSnapshotVersion::V1 => { - crate::chain::export::( - &ctx.store_owned(), - &start_ts, - recent_roots, - writer, - options, - ) - .await + let db = ctx.store_owned(); + + let chain_export = + crate::chain::export::(&db, &start_ts, recent_roots, writer, options); + + tokio::select! { + result = chain_export => { + result.map(|checksum_opt| ApiExportResult::Done(checksum_opt.map(|hash| hash.encode_hex()))) + }, + _ = token.cancelled() => { + cancel_export(); + tracing::warn!("Snapshot export was cancelled"); + Ok(ApiExportResult::Cancelled) + }, + } } FilecoinSnapshotVersion::V2 => { + let db = ctx.store_owned(); + let f3_snap_tmp_path = { let mut f3_snap_dir = output_path.clone(); let mut builder = tempfile::Builder::new(); @@ -448,23 +467,94 @@ impl RpcMethod<1> for ForestChainExport { } } }; - crate::chain::export_v2::( - &ctx.store_owned(), + + let chain_export = crate::chain::export_v2::( + &db, f3_snap, &start_ts, recent_roots, writer, options, - ) - .await + ); + + tokio::select! { + result = chain_export => { + result.map(|checksum_opt| ApiExportResult::Done(checksum_opt.map(|hash| hash.encode_hex()))) + }, + _ = token.cancelled() => { + cancel_export(); + tracing::warn!("Snapshot export was cancelled"); + Ok(ApiExportResult::Cancelled) + }, + } } - } { - Ok(checksum_opt) => Ok(checksum_opt.map(|hash| hash.encode_hex())), + }; + end_export(); + // Clean up token + let mut guard = CHAIN_EXPORT_LOCK.lock().await; + *guard = None; + match result { + Ok(export_result) => Ok(export_result), Err(e) => Err(anyhow::anyhow!(e).into()), } } } +pub enum ForestChainExportStatus {} +impl RpcMethod<0> for ForestChainExportStatus { + const NAME: &'static str = "Forest.ChainExportStatus"; + const PARAM_NAMES: [&'static str; 0] = []; + const API_PATHS: BitFlags = ApiPaths::all(); + const PERMISSION: Permission = Permission::Read; + + type Params = (); + type Ok = ApiExportStatus; + + async fn handle(_ctx: Ctx, (): Self::Params) -> Result { + let mutex = CHAIN_EXPORT_STATUS.lock(); + + let progress = if mutex.initial_epoch == 0 { + 0.0 + } else { + let p = 1.0 - ((mutex.epoch as f64) / (mutex.initial_epoch as f64)); + if p.is_finite() { + p.clamp(0.0, 1.0) + } else { + 0.0 + } + }; + + let status = ApiExportStatus { + progress, + exporting: mutex.exporting, + cancelled: mutex.cancelled, + start_time: mutex.start_time, + }; + + Ok(status) + } +} + +pub enum ForestChainExportCancel {} +impl RpcMethod<0> for ForestChainExportCancel { + const NAME: &'static str = "Forest.ChainExportCancel"; + const PARAM_NAMES: [&'static str; 0] = []; + const API_PATHS: BitFlags = ApiPaths::all(); + const PERMISSION: Permission = Permission::Read; + + type Params = (); + type Ok = bool; + + async fn handle(_ctx: Ctx, (): Self::Params) -> Result { + if let Some(token) = CHAIN_EXPORT_LOCK.lock().await.as_ref() { + token.cancel(); + return Ok(true); + } + + Ok(false) + } +} + pub enum ForestChainExportDiff {} impl RpcMethod<1> for ForestChainExportDiff { const NAME: &'static str = "Forest.ChainExportDiff"; @@ -529,7 +619,7 @@ impl RpcMethod<1> for ChainExport { const PERMISSION: Permission = Permission::Read; type Params = (ChainExportParams,); - type Ok = Option; + type Ok = ApiExportResult; async fn handle( ctx: Ctx, diff --git a/src/rpc/mod.rs b/src/rpc/mod.rs index 460a0121b25..5a4330895c6 100644 --- a/src/rpc/mod.rs +++ b/src/rpc/mod.rs @@ -83,6 +83,8 @@ macro_rules! for_each_rpc_method { $callback!($crate::rpc::chain::ChainTipSetWeight); $callback!($crate::rpc::chain::ForestChainExport); $callback!($crate::rpc::chain::ForestChainExportDiff); + $callback!($crate::rpc::chain::ForestChainExportStatus); + $callback!($crate::rpc::chain::ForestChainExportCancel); // common vertical $callback!($crate::rpc::common::Session); diff --git a/src/rpc/types/mod.rs b/src/rpc/types/mod.rs index ec4380d7839..d69dd8d4284 100644 --- a/src/rpc/types/mod.rs +++ b/src/rpc/types/mod.rs @@ -30,6 +30,7 @@ use crate::shim::{ message::Message, sector::{ExtendedSectorInfo, RegisteredSealProof, SectorNumber, StoragePower}, }; +use chrono::Utc; use cid::Cid; use fil_actors_shared::fvm_ipld_bitfield::BitField; use fvm_ipld_encoding::RawBytes; @@ -561,3 +562,21 @@ impl From for Event { } } } + +#[derive(Debug, Serialize, Deserialize, JsonSchema, Clone, PartialEq)] +pub struct ApiExportStatus { + pub progress: f64, + pub exporting: bool, + pub cancelled: bool, + pub start_time: chrono::DateTime, +} + +lotus_json_with_self!(ApiExportStatus); + +#[derive(Debug, Serialize, Deserialize, JsonSchema, Clone, PartialEq, Eq, Hash)] +pub enum ApiExportResult { + Done(Option), + Cancelled, +} + +lotus_json_with_self!(ApiExportResult); diff --git a/src/tool/subcommands/api_cmd/test_snapshots_ignored.txt b/src/tool/subcommands/api_cmd/test_snapshots_ignored.txt index 7f8b7a4da9a..9e5938b7bc7 100644 --- a/src/tool/subcommands/api_cmd/test_snapshots_ignored.txt +++ b/src/tool/subcommands/api_cmd/test_snapshots_ignored.txt @@ -79,7 +79,9 @@ Filecoin.WalletSign Filecoin.WalletSignMessage Filecoin.Web3ClientVersion Forest.ChainExport +Forest.ChainExportCancel Forest.ChainExportDiff +Forest.ChainExportStatus Forest.ChainGetMinBaseFee Forest.NetInfo Forest.SnapshotGC