From 725317fcbc4cb20600ebde88c73a3ed0a35d39ab Mon Sep 17 00:00:00 2001
From: hanabi1224
Date: Mon, 13 Oct 2025 21:18:40 +0800
Subject: [PATCH 1/7] feat: `forest-tool db import`

---
 Cargo.lock                          |  2 +
 Cargo.toml                          |  2 +-
 docs/docs/users/reference/cli.sh    |  1 +
 src/daemon/bundle.rs                |  5 +--
 src/daemon/db_util.rs               |  5 +--
 src/db/car/mod.rs                   |  2 +-
 src/tool/subcommands/archive_cmd.rs |  6 +--
 src/tool/subcommands/db_cmd.rs      | 62 +++++++++++++++++++++++++++--
 src/utils/db/car_stream.rs          | 46 +++++++++++++++++++--
 src/utils/io/mod.rs                 |  8 ++++
 10 files changed, 118 insertions(+), 21 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 2209d7520638..7badfd6efaa2 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -9834,6 +9834,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "eb066959b24b5196ae73cb057f45598450d2c5f71460e98c49b738086eff9c06"
 dependencies = [
  "bytes",
+ "futures-io",
+ "futures-util",
  "tokio-util",
 ]
 
diff --git a/Cargo.toml b/Cargo.toml
index 19d6e50ddc9e..09e06aed7729 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -214,7 +214,7 @@ tower-http = { version = "0.6", features = ["compression-full", "sensitive-heade
 tracing = "0.1"
 tracing-appender = "0.2"
 tracing-subscriber = { version = "0.3", features = ["env-filter", "json"] }
-unsigned-varint = { version = "0.8", features = ["codec"] }
+unsigned-varint = { version = "0.8", features = ["codec", "futures"] }
 url = { version = "2", features = ["serde"] }
 uuid = { version = "1", features = ["v4", "serde"] }
 walkdir = "2"
diff --git a/docs/docs/users/reference/cli.sh b/docs/docs/users/reference/cli.sh
index 1cc235b056b1..0dd1176b2a12 100755
--- a/docs/docs/users/reference/cli.sh
+++ b/docs/docs/users/reference/cli.sh
@@ -130,6 +130,7 @@ generate_markdown_section "forest-tool" "archive sync-bucket"
 generate_markdown_section "forest-tool" "db"
 generate_markdown_section "forest-tool" "db stats"
 generate_markdown_section "forest-tool" "db destroy"
+generate_markdown_section "forest-tool" "db import"
 
 generate_markdown_section "forest-tool" "car"
 generate_markdown_section "forest-tool" "car concat"
diff --git a/src/daemon/bundle.rs b/src/daemon/bundle.rs
index f90adda70636..d41dd963fd38 100644
--- a/src/daemon/bundle.rs
+++ b/src/daemon/bundle.rs
@@ -48,10 +48,7 @@ pub async fn load_actor_bundles_from_path(
         "Bundle file not found at {}",
         bundle_path.as_ref().display()
     );
-    let mut car_stream = CarStream::new(tokio::io::BufReader::new(
-        tokio::fs::File::open(bundle_path.as_ref()).await?,
-    ))
-    .await?;
+    let mut car_stream = CarStream::new_from_path(bundle_path.as_ref()).await?;
 
     // Validate the bundle
     let roots = HashSet::from_iter(car_stream.header_v1.roots.iter());
diff --git a/src/daemon/db_util.rs b/src/daemon/db_util.rs
index c314c0adac65..3fb7cf90fb23 100644
--- a/src/daemon/db_util.rs
+++ b/src/daemon/db_util.rs
@@ -305,10 +305,7 @@ async fn transcode_into_forest_car(from: &Path, to: &Path) -> anyhow::Result<()>
         to = %to.display(),
         "transcoding into forest car"
     );
-    let car_stream = CarStream::new(tokio::io::BufReader::new(
-        tokio::fs::File::open(from).await?,
-    ))
-    .await?;
+    let car_stream = CarStream::new_from_path(from).await?;
     let roots = car_stream.header_v1.roots.clone();
 
     let mut writer = tokio::io::BufWriter::new(tokio::fs::File::create(to).await?);
diff --git a/src/db/car/mod.rs b/src/db/car/mod.rs
index efb97fc576ee..6b74bab81edd 100644
--- a/src/db/car/mod.rs
+++ b/src/db/car/mod.rs
@@ -33,7 +33,7 @@ pub type CacheKey = u64;
 type FrameOffset = u64;
 
 /// According to FRC-0108, v2 snapshots have exactly one root pointing to metadata
-const V2_SNAPSHOT_ROOT_COUNT: usize = 1;
+pub const V2_SNAPSHOT_ROOT_COUNT: usize = 1;
 
 pub static ZSTD_FRAME_CACHE_DEFAULT_MAX_SIZE: LazyLock<usize> = LazyLock::new(|| {
     const ENV_KEY: &str = "FOREST_ZSTD_FRAME_CACHE_DEFAULT_MAX_SIZE";
diff --git a/src/tool/subcommands/archive_cmd.rs b/src/tool/subcommands/archive_cmd.rs
index fb338c89c27e..27de4df3008e 100644
--- a/src/tool/subcommands/archive_cmd.rs
+++ b/src/tool/subcommands/archive_cmd.rs
@@ -678,11 +678,7 @@ async fn merge_f3_snapshot(filecoin: PathBuf, f3: PathBuf, output: PathBuf) -> a
     let mut f3_data = File::open(f3)?;
     let f3_cid = crate::f3::snapshot::get_f3_snapshot_cid(&mut f3_data)?;
 
-    let car_stream = CarStream::new(tokio::io::BufReader::new(
-        tokio::fs::File::open(&filecoin).await?,
-    ))
-    .await?;
-
+    let car_stream = CarStream::new_from_path(&filecoin).await?;
     let chain_head = car_stream.header_v1.roots.clone();
 
     println!("f3 snapshot cid: {f3_cid}");
diff --git a/src/tool/subcommands/db_cmd.rs b/src/tool/subcommands/db_cmd.rs
index 60ab7959c38b..4c6711011fa7 100644
--- a/src/tool/subcommands/db_cmd.rs
+++ b/src/tool/subcommands/db_cmd.rs
@@ -5,9 +5,14 @@ use std::path::PathBuf;
 
 use crate::cli::subcommands::prompt_confirm;
 use crate::cli_shared::{chain_path, read_config};
-use crate::db::db_engine::db_root;
+use crate::db::BlockstoreWithWriteBuffer;
+use crate::db::db_engine::{db_root, open_db};
 use crate::networks::NetworkChain;
+use crate::utils::db::car_stream::CarStream;
 use clap::Subcommand;
+use fvm_ipld_blockstore::Blockstore;
+use indicatif::{ProgressBar, ProgressStyle};
+use tokio_stream::StreamExt;
 use tracing::error;
 
 #[derive(Debug, Subcommand)]
@@ -33,13 +38,25 @@ pub enum DBCommands {
         #[arg(long)]
         chain: Option<NetworkChain>,
     },
+    /// Import CAR files into the key-value store
+    Import {
+        /// Snapshot input paths. Supports `.car`, `.car.zst`, and `.forest.car.zst`.
+        #[arg(num_args = 1.., required = true)]
+        snapshot_files: Vec<PathBuf>,
+        /// Filecoin network chain
+        #[arg(long, required = true)]
+        chain: NetworkChain,
+        /// Optional path to the database folder that powers a Forest node
+        #[arg(long)]
+        db: Option<PathBuf>,
+    },
 }
 
 impl DBCommands {
-    pub async fn run(&self) -> anyhow::Result<()> {
+    pub async fn run(self) -> anyhow::Result<()> {
         match self {
             Self::Stats { config, chain } => {
-                use human_repr::HumanCount;
+                use human_repr::HumanCount as _;
 
                 let (_, config) = read_config(config.as_ref(), chain.clone())?;
 
@@ -80,6 +97,45 @@ impl DBCommands {
                     }
                 }
             }
+            Self::Import {
+                snapshot_files,
+                chain,
+                db,
+            } => {
+                const DB_WRITE_BUFFER_CAPACITY: usize = 10000;
+
+                let db_root = if let Some(db) = db {
+                    db
+                } else {
+                    let (_, config) = read_config(None, Some(chain.clone()))?;
+                    db_root(&chain_path(&config))?
+                };
+                println!("Opening parity-db at {}", db_root.display());
+                let db_writer = BlockstoreWithWriteBuffer::new_with_capacity(
+                    open_db(db_root, &Default::default())?,
+                    DB_WRITE_BUFFER_CAPACITY,
+                );
+
+                let pb = ProgressBar::new_spinner().with_style(
+                    ProgressStyle::with_template("{spinner} {msg}")
+                        .expect("indicatif template must be valid"),
+                );
+                pb.enable_steady_tick(std::time::Duration::from_millis(100));
+
+                let mut total = 0;
+                for snap in snapshot_files {
+                    let mut car = CarStream::new_from_path(&snap).await?;
+                    while let Some(b) = car.try_next().await? {
+                        db_writer.put_keyed(&b.cid, &b.data)?;
+                        total += 1;
+                        let text = format!("{total} blocks imported");
+                        pb.set_message(text);
+                    }
+                }
+                drop(db_writer);
+                pb.finish();
+                Ok(())
+            }
         }
     }
 }
diff --git a/src/utils/db/car_stream.rs b/src/utils/db/car_stream.rs
index c38af112ecda..b42fe8fd6ffd 100644
--- a/src/utils/db/car_stream.rs
+++ b/src/utils/db/car_stream.rs
@@ -1,7 +1,9 @@
 // Copyright 2019-2025 ChainSafe Systems
 // SPDX-License-Identifier: Apache-2.0, MIT
 
+use crate::chain::FilecoinSnapshotMetadata;
 use crate::db::car::plain::read_v2_header;
+use crate::utils::io::skip_bytes;
 use crate::utils::multihash::prelude::*;
 use async_compression::tokio::bufread::ZstdDecoder;
 use bytes::{Buf, BufMut, Bytes, BytesMut};
@@ -14,6 +16,7 @@ use nunny::Vec as NonEmpty;
 use pin_project_lite::pin_project;
 use serde::{Deserialize, Serialize};
 use std::io::{self, Cursor, Read, SeekFrom, Write};
+use std::path::Path;
 use std::pin::Pin;
 use std::task::{Context, Poll};
 use tokio::io::{
@@ -22,6 +25,7 @@ use tokio::io::{
 };
 use tokio_util::codec::Encoder;
 use tokio_util::codec::FramedRead;
+use tokio_util::compat::TokioAsyncReadCompatExt as _;
 use tokio_util::either::Either;
 use unsigned_varint::codec::UviBytes;
 
@@ -164,8 +168,7 @@ impl<ReaderT: AsyncSeek + AsyncBufRead + Unpin> CarStream<ReaderT> {
 
         // Skip v2 header bytes
         if let Some(header_v2) = &header_v2 {
-            let mut to_skip = vec![0; header_v2.data_offset as usize];
-            reader.read_exact(&mut to_skip).await?;
+            reader = skip_bytes(reader, header_v2.data_offset as _).await?;
         }
 
         let max_car_v1_bytes = header_v2
@@ -187,11 +190,39 @@
                     "invalid first block",
                 ));
             }
+
+            let first_block = if header_v1.roots.len() == crate::db::car::V2_SNAPSHOT_ROOT_COUNT {
+                let maybe_metadata_cid = header_v1.roots.first();
+                if maybe_metadata_cid == &block.cid
+                    && let Ok(metadata) =
+                        fvm_ipld_encoding::from_slice::<FilecoinSnapshotMetadata>(&block.data)
+                {
+                    // Skip the F3 block in the block stream
+                    if metadata.f3_data.is_some() {
+                        // manipulate the inner reader directly because `reader.next()` is slow for skipping the large F3 block
+                        let mut inner_reader_compat = reader.into_inner().compat();
+                        let len = unsigned_varint::aio::read_usize(&mut inner_reader_compat)
+                            .await
+                            .map_err(io::Error::other)?;
+                        let inner_reader =
+                            skip_bytes(inner_reader_compat.into_inner(), len as _).await?;
+                        reader = FramedRead::new(inner_reader, uvi_bytes());
+                    }
+
+                    // Skip the metadata block in the block stream
+                    None
+                } else {
+                    Some(block)
+                }
+            } else {
+                Some(block)
+            };
+
             Ok(CarStream {
                 reader,
                 header_v1,
                 header_v2,
-                first_block: Some(block),
+                first_block,
             })
         } else {
             Ok(CarStream {
@@ -257,6 +288,15 @@
     }
 }
 
+impl CarStream<tokio::io::BufReader<tokio::fs::File>> {
+    pub async fn new_from_path(path: impl AsRef<Path>) -> io::Result<Self> {
+        Self::new(tokio::io::BufReader::new(
+            tokio::fs::File::open(path.as_ref()).await?,
+        ))
+        .await
+    }
+}
+
 impl<ReaderT: AsyncBufRead + Unpin> Stream for CarStream<ReaderT> {
     type Item = io::Result<CarBlock>;
 
diff --git a/src/utils/io/mod.rs b/src/utils/io/mod.rs
index 1cb3c04429b0..0fc45836f3c2 100644
--- a/src/utils/io/mod.rs
+++ b/src/utils/io/mod.rs
@@ -44,6 +44,14 @@ pub fn create_new_sensitive_file(path: &Path) -> Result<File> {
     Ok(file)
 }
 
+/// Skips `n` bytes from the reader.
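+/// The skipped bytes are streamed through `AsyncReadExt::take` into
+/// `tokio::io::sink`, so no intermediate buffer of size `n` is allocated.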
+pub async fn skip_bytes<T: tokio::io::AsyncRead + Unpin>(reader: T, n: u64) -> std::io::Result<T> {
+    use tokio::io::AsyncReadExt as _;
+    let mut take = reader.take(n);
+    tokio::io::copy(&mut take, &mut tokio::io::sink()).await?;
+    Ok(take.into_inner())
+}
+
 /// Converts a TOML file represented as a string to `S`
 ///
 /// # Example

From d4c8ce46c2141194d495c753d6506593d3ab4dd0 Mon Sep 17 00:00:00 2001
From: hanabi1224
Date: Tue, 14 Oct 2025 22:39:01 +0800
Subject: [PATCH 2/7] resolve AI comments

---
 src/db/car/plain.rs        | 29 +++++++++++++++--------------
 src/utils/db/car_stream.rs | 32 +++++++++++++++++++++++++-------
 src/utils/io/mod.rs        | 11 +++++++++--
 3 files changed, 49 insertions(+), 23 deletions(-)

diff --git a/src/db/car/plain.rs b/src/db/car/plain.rs
index e22275347b87..bc5bec4225db 100644
--- a/src/db/car/plain.rs
+++ b/src/db/car/plain.rs
@@ -127,20 +127,21 @@ impl<ReaderT: ReadAt> PlainCar<ReaderT> {
         let mut cursor = positioned_io::Cursor::new(&reader);
         let position = cursor.position();
         let header_v2 = read_v2_header(&mut cursor)?;
-        let (limit_position, version) = if let Some(header_v2) = &header_v2 {
-            cursor.set_position(position.saturating_add(header_v2.data_offset as u64));
-            (
-                Some(
-                    cursor
-                        .stream_position()?
-                        .saturating_add(header_v2.data_size as u64),
-                ),
-                2,
-            )
-        } else {
-            cursor.set_position(position);
-            (None, 1)
-        };
+        let (limit_position, version) =
+            if let Some(header_v2) = &header_v2 {
+                cursor.set_position(position.saturating_add(
+                    u64::try_from(header_v2.data_offset).map_err(io::Error::other)?,
+                ));
+                (
+                    Some(cursor.stream_position()?.saturating_add(
+                        u64::try_from(header_v2.data_size).map_err(io::Error::other)?,
+                    )),
+                    2,
+                )
+            } else {
+                cursor.set_position(position);
+                (None, 1)
+            };
 
         let header_v1 = read_v1_header(&mut cursor)?;
 
         // When indexing, we perform small reads of the length and CID before seeking
diff --git a/src/utils/db/car_stream.rs b/src/utils/db/car_stream.rs
index ff47d2d1f465..972d7b7669c2 100644
--- a/src/utils/db/car_stream.rs
+++ b/src/utils/db/car_stream.rs
@@ -29,6 +29,9 @@ use unsigned_varint::codec::UviBytes;
 
 use crate::utils::encoding::from_slice_with_fallback;
 
+// 8GiB
+const MAX_FRAME_LEN: usize = 8 * 1024 * 1024 * 1024;
+
 #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
 pub struct CarV1Header {
     // The roots array must contain one or more CIDs,
@@ -168,12 +171,17 @@ impl<ReaderT: AsyncSeek + AsyncBufRead + Unpin> CarStream<ReaderT> {
 
         // Skip v2 header bytes
         if let Some(header_v2) = &header_v2 {
-            reader = skip_bytes(reader, header_v2.data_offset as _).await?;
+            reader = skip_bytes(
+                reader,
+                u64::try_from(header_v2.data_offset).map_err(std::io::Error::other)?,
+            )
+            .await?;
         }
 
         let max_car_v1_bytes = header_v2
             .as_ref()
-            .map(|h| h.data_size as u64)
+            .map(|h| u64::try_from(h.data_size).map_err(std::io::Error::other))
+            .transpose()?
             .unwrap_or(u64::MAX);
         let mut reader = reader.take(max_car_v1_bytes);
         let header_v1 = read_v1_header(&mut reader).await?;
@@ -196,8 +204,14 @@ impl<ReaderT: AsyncSeek + AsyncBufRead + Unpin> CarStream<ReaderT> {
                 {
                     // Skip the F3 block in the block stream
                     if metadata.f3_data.is_some() {
-                        let len: usize = reader.read_varint_async().await?;
-                        reader = skip_bytes(reader, len as _).await?;
+                        let len: u64 = reader.read_varint_async().await?;
+                        if len > MAX_FRAME_LEN as u64 {
+                            return Err(io::Error::new(
+                                io::ErrorKind::InvalidData,
+                                format!("f3 block frame length too large: {len} > {MAX_FRAME_LEN}"),
+                            ));
+                        }
+                        reader = skip_bytes(reader, len).await?;
                     }
 
                     // Skip the metadata block in the block stream
@@ -372,6 +386,12 @@ async fn read_frame<ReaderT: AsyncRead + Unpin>(
     reader: &mut ReaderT,
 ) -> std::io::Result<Option<Vec<u8>>> {
     let len: usize = match reader.read_varint_async().await {
+        Ok(len) if len > MAX_FRAME_LEN => {
+            return Err(std::io::Error::new(
+                std::io::ErrorKind::InvalidData,
+                format!("frame too large: {len} > {MAX_FRAME_LEN}"),
+            ));
+        }
         Ok(len) => len,
         Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => return Ok(None),
         Err(e) => return Err(e),
@@ -398,10 +418,8 @@ async fn read_car_block(
 }
 
 pub fn uvi_bytes() -> UviBytes {
-    // 8GiB
-    const MAX_LEN: usize = 8 * 1024 * 1024 * 1024;
     let mut decoder = UviBytes::default();
-    decoder.set_max_len(MAX_LEN);
+    decoder.set_max_len(MAX_FRAME_LEN);
     decoder
 }
diff --git a/src/utils/io/mod.rs b/src/utils/io/mod.rs
index 0fc45836f3c2..8ff9f6a67289 100644
--- a/src/utils/io/mod.rs
+++ b/src/utils/io/mod.rs
@@ -48,8 +48,15 @@ pub fn create_new_sensitive_file(path: &Path) -> Result<File> {
 pub async fn skip_bytes<T: tokio::io::AsyncRead + Unpin>(reader: T, n: u64) -> std::io::Result<T> {
     use tokio::io::AsyncReadExt as _;
     let mut take = reader.take(n);
-    tokio::io::copy(&mut take, &mut tokio::io::sink()).await?;
-    Ok(take.into_inner())
+    let n_skipped = tokio::io::copy(&mut take, &mut tokio::io::sink()).await?;
+    if n == n_skipped {
+        Ok(take.into_inner())
+    } else {
+        Err(std::io::Error::new(
+            std::io::ErrorKind::UnexpectedEof,
+            format!("{n_skipped} bytes skipped, {n} expected"),
+        ))
+    }
 }
 
 /// Converts a TOML file represented as a string to `S`

From 0c8c24c89d4a8c432c97822cbbcb7719ed7eb414 Mon Sep 17 00:00:00 2001
From: hanabi1224
Date: Thu, 16 Oct 2025 18:31:47 +0800
Subject: [PATCH 3/7] rename --no-validation to --skip-validation

---
 src/tool/subcommands/db_cmd.rs | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/src/tool/subcommands/db_cmd.rs b/src/tool/subcommands/db_cmd.rs
index 1ac406e4e4e5..95e96038ac76 100644
--- a/src/tool/subcommands/db_cmd.rs
+++ b/src/tool/subcommands/db_cmd.rs
@@ -49,9 +49,9 @@ pub enum DBCommands {
         /// Optional path to the database folder that powers a Forest node
         #[arg(long)]
         db: Option<PathBuf>,
-        /// No block validation
+        /// Skip block validation
         #[arg(long)]
-        no_validation: bool,
+        skip_validation: bool,
     },
 }

From 46cf6a07a5e8528e68a123f290611b4033b0b452 Mon Sep 17 00:00:00 2001
From: hanabi1224
Date: Fri, 17 Oct 2025 14:27:07 +0800
Subject: [PATCH 4/7] resolve comments

---
 src/tool/subcommands/db_cmd.rs |  6 +++---
 src/utils/db/car_stream.rs     | 11 ++---------
 2 files changed, 5 insertions(+), 12 deletions(-)

diff --git a/src/tool/subcommands/db_cmd.rs b/src/tool/subcommands/db_cmd.rs
index 95e96038ac76..d77b1452340b 100644
--- a/src/tool/subcommands/db_cmd.rs
+++ b/src/tool/subcommands/db_cmd.rs
@@ -108,15 +108,15 @@ impl DBCommands {
             } => {
                 const DB_WRITE_BUFFER_CAPACITY: usize = 10000;
 
-                let db_root = if let Some(db) = db {
+                let db_root_path = if let Some(db) = db {
                     db
                 } else {
                     let (_, config) = read_config(None, Some(chain.clone()))?;
                     db_root(&chain_path(&config))?
                 };
-                println!("Opening parity-db at {}", db_root.display());
+                println!("Opening parity-db at {}", db_root_path.display());
                 let db_writer = BlockstoreWithWriteBuffer::new_with_capacity(
-                    open_db(db_root, &Default::default())?,
+                    open_db(db_root_path, &Default::default())?,
                     DB_WRITE_BUFFER_CAPACITY,
                 );
 
diff --git a/src/utils/db/car_stream.rs b/src/utils/db/car_stream.rs
index 972d7b7669c2..4308587d0cdf 100644
--- a/src/utils/db/car_stream.rs
+++ b/src/utils/db/car_stream.rs
@@ -397,15 +397,8 @@ async fn read_frame<ReaderT: AsyncRead + Unpin>(
         Err(e) => return Err(e),
     };
     let mut bytes = vec![0; len];
-    let n = reader.read_exact(&mut bytes[..]).await?;
-    if n == len {
-        Ok(Some(bytes))
-    } else {
-        Err(std::io::Error::new(
-            std::io::ErrorKind::UnexpectedEof,
-            format!("{len} expected, {n} read"),
-        ))
-    }
+    reader.read_exact(&mut bytes[..]).await?;
+    Ok(Some(bytes))
 }
 
 async fn read_car_block(

From d46d5574b94311d10449a71ced26e534d8c17828 Mon Sep 17 00:00:00 2001
From: hanabi1224
Date: Fri, 17 Oct 2025 14:34:51 +0800
Subject: [PATCH 5/7] separate MAX_FRAME_LEN and MAX_F3_FRAME_LEN

---
 src/utils/db/car_stream.rs | 8 +++++---
 1 file changed, 5 insertions(+), 3 deletions(-)

diff --git a/src/utils/db/car_stream.rs b/src/utils/db/car_stream.rs
index 4308587d0cdf..999b9332028f 100644
--- a/src/utils/db/car_stream.rs
+++ b/src/utils/db/car_stream.rs
@@ -29,8 +29,8 @@ use unsigned_varint::codec::UviBytes;
 
 use crate::utils::encoding::from_slice_with_fallback;
 
-// 8GiB
-const MAX_FRAME_LEN: usize = 8 * 1024 * 1024 * 1024;
+// 512MiB
+const MAX_FRAME_LEN: usize = 512 * 1024 * 1024;
 
 #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
 pub struct CarV1Header {
@@ -204,8 +204,10 @@ impl<ReaderT: AsyncSeek + AsyncBufRead + Unpin> CarStream<ReaderT> {
                 {
                     // Skip the F3 block in the block stream
                     if metadata.f3_data.is_some() {
+                        // 16GiB
+                        const MAX_F3_FRAME_LEN: u64 = 16 * 1024 * 1024 * 1024;
                         let len: u64 = reader.read_varint_async().await?;
-                        if len > MAX_FRAME_LEN as u64 {
+                        if len > MAX_F3_FRAME_LEN {
                             return Err(io::Error::new(
                                 io::ErrorKind::InvalidData,
                                 format!("f3 block frame length too large: {len} > {MAX_FRAME_LEN}"),

From 0b1e490c1f7d51ada97f0142a935d2d4a031d27c Mon Sep 17 00:00:00 2001
From: hanabi1224
Date: Fri, 17 Oct 2025 14:41:16 +0800
Subject: [PATCH 6/7] avoid casting f3_data_len from usize to u64

---
 src/chain/mod.rs                    |  2 +-
 src/tool/subcommands/archive_cmd.rs |  2 +-
 src/utils/db/car_stream.rs          | 22 ++++++++--------------
 3 files changed, 10 insertions(+), 16 deletions(-)

diff --git a/src/chain/mod.rs b/src/chain/mod.rs
index a1f16e031c00..193baf4b02a7 100644
--- a/src/chain/mod.rs
+++ b/src/chain/mod.rs
@@ -108,7 +108,7 @@ pub async fn export_v2(
     f3_data.seek(SeekFrom::Start(0))?;
     prefix_data_frames.push({
         let mut encoder = forest::new_encoder(forest::DEFAULT_FOREST_CAR_COMPRESSION_LEVEL)?;
-        encoder.write_car_block(f3_cid, f3_data_len as _, &mut f3_data)?;
+        encoder.write_car_block(f3_cid, f3_data_len, &mut f3_data)?;
         anyhow::Ok((
             vec![f3_cid],
             finalize_frame(forest::DEFAULT_FOREST_CAR_COMPRESSION_LEVEL, &mut encoder)?,
diff --git a/src/tool/subcommands/archive_cmd.rs b/src/tool/subcommands/archive_cmd.rs
index 27de4df3008e..789e16b1f44d 100644
--- a/src/tool/subcommands/archive_cmd.rs
+++ b/src/tool/subcommands/archive_cmd.rs
@@ -715,7 +715,7 @@ async fn merge_f3_snapshot(filecoin: PathBuf, f3: PathBuf, output: PathBuf) -> a
             crate::db::car::forest::new_encoder(DEFAULT_FOREST_CAR_COMPRESSION_LEVEL)?;
         let f3_data_len = f3_data.seek(SeekFrom::End(0))?;
         f3_data.seek(SeekFrom::Start(0))?;
-        encoder.write_car_block(f3_cid, f3_data_len as _, &mut f3_data)?;
+        encoder.write_car_block(f3_cid, f3_data_len, &mut f3_data)?;
         anyhow::Ok((
             vec![f3_cid],
             crate::db::car::forest::finalize_frame(
diff --git a/src/utils/db/car_stream.rs b/src/utils/db/car_stream.rs
index 999b9332028f..abf4af020bef 100644
--- a/src/utils/db/car_stream.rs
+++ b/src/utils/db/car_stream.rs
@@ -59,7 +59,11 @@ pub struct CarBlock {
 impl CarBlock {
     // Write a varint frame containing the cid and the data
     pub fn write(&self, writer: &mut impl Write) -> io::Result<()> {
-        writer.write_car_block(self.cid, self.data.len(), &mut Cursor::new(&self.data))
+        writer.write_car_block(
+            self.cid,
+            self.data.len() as u64,
+            &mut Cursor::new(&self.data),
+        )
     }
 
     pub fn from_bytes(bytes: impl Into<Bytes>) -> io::Result<Self> {
@@ -93,22 +97,12 @@ impl CarBlock {
 }
 
 pub trait CarBlockWrite {
-    fn write_car_block(
-        &mut self,
-        cid: Cid,
-        data_len: usize,
-        data: &mut impl Read,
-    ) -> io::Result<()>;
+    fn write_car_block(&mut self, cid: Cid, data_len: u64, data: &mut impl Read) -> io::Result<()>;
 }
 
 impl<T: Write> CarBlockWrite for T {
-    fn write_car_block(
-        &mut self,
-        cid: Cid,
-        data_len: usize,
-        data: &mut impl Read,
-    ) -> io::Result<()> {
-        let frame_length = cid.encoded_len() + data_len;
+    fn write_car_block(&mut self, cid: Cid, data_len: u64, data: &mut impl Read) -> io::Result<()> {
+        let frame_length = cid.encoded_len() as u64 + data_len;
         self.write_all(&frame_length.encode_var_vec())?;
         cid.write_bytes(&mut *self)
             .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;

From 35dd565f0a31198570f5a3f3563aeb26f773994f Mon Sep 17 00:00:00 2001
From: hanabi1224
Date: Fri, 17 Oct 2025 15:47:10 +0800
Subject: [PATCH 7/7] resolve AI comments

---
 src/utils/db/car_stream.rs | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/src/utils/db/car_stream.rs b/src/utils/db/car_stream.rs
index abf4af020bef..165f3dbe00be 100644
--- a/src/utils/db/car_stream.rs
+++ b/src/utils/db/car_stream.rs
@@ -204,7 +204,9 @@ impl<ReaderT: AsyncSeek + AsyncBufRead + Unpin> CarStream<ReaderT> {
                         if len > MAX_F3_FRAME_LEN {
                             return Err(io::Error::new(
                                 io::ErrorKind::InvalidData,
-                                format!("f3 block frame length too large: {len} > {MAX_FRAME_LEN}"),
+                                format!(
+                                    "f3 block frame length too large: {len} > {MAX_F3_FRAME_LEN}"
+                                ),
                             ));
                         }
                         reader = skip_bytes(reader, len).await?;
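Usage sketch for the new subcommand (illustrative only: `calibnet` and the paths
are placeholder values; the flags are those declared in PATCH 1 and renamed in
PATCH 3):

    # Import one or more snapshots into the chain's default database location
    forest-tool db import --chain calibnet snapshot.forest.car.zst

    # Import into an explicit database folder, skipping block validation
    forest-tool db import --chain calibnet --db /path/to/forest/db \
        --skip-validation one.car two.car.zst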