diff --git a/Cargo.lock b/Cargo.lock index c5541df21f..a95123c9d6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1518,7 +1518,7 @@ dependencies = [ "commonware-runtime", "commonware-storage", "commonware-utils", - "crc32fast", + "crc-fast", "criterion", "futures", "futures-util", @@ -1765,6 +1765,16 @@ dependencies = [ "libc", ] +[[package]] +name = "crc-fast" +version = "1.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e75b2483e97a5a7da73ac68a05b629f9c53cff58d8ed1c77866079e18b00dba5" +dependencies = [ + "digest 0.10.7", + "spin", +] + [[package]] name = "crc32fast" version = "1.5.0" @@ -4626,6 +4636,12 @@ dependencies = [ "windows-sys 0.60.2", ] +[[package]] +name = "spin" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d5fe4ccb98d9c292d56fec89a5e07da7fc4cf0dc11e156b41793132775d3e591" + [[package]] name = "spinning_top" version = "0.3.0" diff --git a/Cargo.toml b/Cargo.toml index 5f4d0b35f9..ef4ddf5c5b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -94,7 +94,7 @@ commonware-storage = { version = "0.0.64", path = "storage", default-features = commonware-stream = { version = "0.0.64", path = "stream" } commonware-utils = { version = "0.0.64", path = "utils", default-features = false } console-subscriber = "0.5.0" -crc32fast = "1.5.0" +crc-fast = "1.10.0" criterion = "0.7.0" crossterm = "0.29.0" ecdsa = { version = "0.16.9", default-features = false } diff --git a/storage/Cargo.toml b/storage/Cargo.toml index 8fea605001..e27b692bcd 100644 --- a/storage/Cargo.toml +++ b/storage/Cargo.toml @@ -23,7 +23,7 @@ commonware-cryptography = { workspace = true, default-features = false } commonware-macros.workspace = true commonware-runtime = { workspace = true, optional = true } commonware-utils = { workspace = true, default-features = false } -crc32fast = { workspace = true, optional = true } +crc-fast = { workspace = true, optional = true } futures = { workspace = true, optional = true } 
futures-util = { workspace = true, optional = true } prometheus-client = { workspace = true, optional = true } @@ -50,7 +50,7 @@ std = [ "commonware-cryptography/std", "commonware-runtime", "commonware-utils/std", - "crc32fast/std", + "crc-fast/std", "futures", "futures-util", "prometheus-client", diff --git a/storage/conformance.toml b/storage/conformance.toml index 9e4d9e6926..d442112b2d 100644 --- a/storage/conformance.toml +++ b/storage/conformance.toml @@ -40,11 +40,11 @@ hash = "b4f2eb51a158d964317fb1714dbe708ffbe6673453dc648eabdd72409fb30440" ["commonware_storage::journal::conformance::FixedJournal"] n_cases = 512 -hash = "9cd764e31b5dbc0bd78cd0908851ba1d645f083884beacd2c8a63f66de0fb9db" +hash = "cd54a3c986fd26f2692e344b80b2e83a13d55b1569536493c74863f09e3bc0b7" ["commonware_storage::journal::conformance::VariableJournal"] n_cases = 512 -hash = "c0af6899248693a3262f31b9a8554cd64c014d9b59f2514840c8828ad74ddf85" +hash = "923a5a5b32ffc608fa625e0176c2bea9955e212bf823640df7860e47b1a63ecd" ["commonware_storage::mmr::proof::tests::conformance::CodecConformance>"] n_cases = 65536 @@ -52,7 +52,7 @@ hash = "e3e6735a810f1002164333013fbff442c91a690483e75fe7a78618a96d5afd62" ["commonware_storage::ordinal::storage::conformance::CodecConformance>"] n_cases = 65536 -hash = "07a88b442e9f86b5395a73584211cb6abbb58e51c6f3954b29095c56d77d370c" +hash = "f93b1dda40f6d9f1ccb3f05994be56189205dcb558551e056c2f3db03a79182d" ["commonware_storage::qmdb::any::operation::tests::conformance::CodecConformance>>"] n_cases = 65536 diff --git a/storage/src/crc32.rs b/storage/src/crc32.rs new file mode 100644 index 0000000000..61a43ba9f9 --- /dev/null +++ b/storage/src/crc32.rs @@ -0,0 +1,110 @@ +//! CRC32 checksum utilities. +//! +//! This module provides CRC32C checksum computation using the iSCSI polynomial +//! (0x1EDC6F41) as specified in RFC 3720. + +/// Size of a CRC32 checksum in bytes. +pub const SIZE: usize = 4; + +/// The CRC32 algorithm used (CRC32C/iSCSI/Castagnoli). 
+const ALGORITHM: crc_fast::CrcAlgorithm = crc_fast::CrcAlgorithm::Crc32Iscsi; + +/// Incremental CRC32 hasher for computing checksums over multiple data chunks. +pub struct Crc32 { + inner: crc_fast::Digest, +} + +impl Default for Crc32 { + fn default() -> Self { + Self::new() + } +} + +impl Crc32 { + /// Create a new incremental hasher. + #[inline] + pub fn new() -> Self { + Self { + inner: crc_fast::Digest::new(ALGORITHM), + } + } + + /// Add data to the checksum computation. + #[inline] + pub fn update(&mut self, data: &[u8]) { + self.inner.update(data); + } + + /// Finalize and return the checksum. + #[inline] + pub fn finalize(self) -> u32 { + self.inner.finalize() as u32 + } + + /// Compute a CRC32 checksum of the given data. + #[inline] + pub fn checksum(data: &[u8]) -> u32 { + crc_fast::checksum(ALGORITHM, data) as u32 + } +} + +#[cfg(test)] +mod tests { + use super::*; + + /// Test vectors from RFC 3720 Appendix B.4 "CRC Examples". + /// https://datatracker.ietf.org/doc/html/rfc3720#appendix-B.4 + #[test] + fn rfc3720_test_vectors() { + // 32 bytes of zeros -> CRC = aa 36 91 8a + assert_eq!(Crc32::checksum(&[0x00; 32]), 0x8A9136AA); + + // 32 bytes of 0xFF -> CRC = 43 ab a8 62 + assert_eq!(Crc32::checksum(&[0xFF; 32]), 0x62A8AB43); + + // 32 bytes ascending (0x00..0x1F) -> CRC = 4e 79 dd 46 + let ascending: Vec<u8> = (0x00..0x20).collect(); + assert_eq!(Crc32::checksum(&ascending), 0x46DD794E); + + // 32 bytes descending (0x1F..0x00) -> CRC = 5c db 3f 11 + let descending: Vec<u8> = (0x00..0x20).rev().collect(); + assert_eq!(Crc32::checksum(&descending), 0x113FDB5C); + + // iSCSI SCSI Read (10) Command PDU -> CRC = 56 3a 96 d9 + let iscsi_read_pdu: [u8; 48] = [ + 0x01, 0xc0, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x14, 0x00, 0x00, 0x00, 0x00, 0x00, 0x04, 0x00, 0x00, 0x00, 0x00, 0x14, + 0x00, 0x00, 0x00, 0x18, 0x28, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + ]; + 
assert_eq!(Crc32::checksum(&iscsi_read_pdu), 0xD9963A56); + } + + /// Check value from the CRC catalogue. + /// https://reveng.sourceforge.io/crc-catalogue/17plus.htm#crc.cat.crc-32c + #[test] + fn crc_catalogue_check_value() { + assert_eq!(Crc32::checksum(b"123456789"), 0xE3069283); + } + + #[test] + fn incremental_matches_oneshot() { + let data = b"The quick brown fox jumps over the lazy dog"; + + let oneshot = Crc32::checksum(data); + + // Chunked + let mut hasher = Crc32::new(); + hasher.update(&data[..10]); + hasher.update(&data[10..25]); + hasher.update(&data[25..]); + assert_eq!(hasher.finalize(), oneshot); + + // Byte-by-byte + let mut hasher = Crc32::new(); + for byte in data { + hasher.update(&[*byte]); + } + assert_eq!(hasher.finalize(), oneshot); + } +} diff --git a/storage/src/freezer/mod.rs b/storage/src/freezer/mod.rs index 7dda5392ae..1899b9b0bf 100644 --- a/storage/src/freezer/mod.rs +++ b/storage/src/freezer/mod.rs @@ -961,19 +961,22 @@ mod tests { .unwrap(); // Insert keys to trigger resize + // key0 -> entry 0, key2 -> entry 1 freezer.put(test_key("key0"), 0).await.unwrap(); - freezer.put(test_key("key1"), 1).await.unwrap(); + freezer.put(test_key("key2"), 1).await.unwrap(); freezer.sync().await.unwrap(); // should start resize // Verify resize started assert!(freezer.resizing().is_some()); // Insert during resize (to first entry) - freezer.put(test_key("key2"), 2).await.unwrap(); + // key6 -> entry 0 + freezer.put(test_key("key6"), 2).await.unwrap(); assert!(context.encode().contains("unnecessary_writes_total 1")); assert_eq!(freezer.resizable(), 3); // Insert another key (to unmodified entry) + // key3 -> entry 1 freezer.put(test_key("key3"), 3).await.unwrap(); assert!(context.encode().contains("unnecessary_writes_total 1")); assert_eq!(freezer.resizable(), 3); @@ -984,17 +987,21 @@ mod tests { assert_eq!(freezer.resizable(), 2); // More inserts + // key4 -> entry 1, key7 -> entry 0 freezer.put(test_key("key4"), 4).await.unwrap(); - 
freezer.put(test_key("key5"), 5).await.unwrap(); + freezer.put(test_key("key7"), 5).await.unwrap(); freezer.sync().await.unwrap(); // Another resize should've started assert!(freezer.resizing().is_some()); // Verify all can be retrieved during resize - for i in 0..6 { - let key = test_key(&format!("key{i}")); - assert_eq!(freezer.get(Identifier::Key(&key)).await.unwrap(), Some(i)); + let keys = ["key0", "key2", "key6", "key3", "key4", "key7"]; + for (i, k) in keys.iter().enumerate() { + assert_eq!( + freezer.get(Identifier::Key(&test_key(k))).await.unwrap(), + Some(i as i32) + ); } // Sync until resize completes @@ -1033,8 +1040,9 @@ mod tests { .unwrap(); // Insert keys to trigger resize + // key0 -> entry 0, key2 -> entry 1 freezer.put(test_key("key0"), 0).await.unwrap(); - freezer.put(test_key("key1"), 1).await.unwrap(); + freezer.put(test_key("key2"), 1).await.unwrap(); let checkpoint = freezer.sync().await.unwrap(); // Verify resize started diff --git a/storage/src/freezer/storage.rs b/storage/src/freezer/storage.rs index 61a47e451b..8f56393dcf 100644 --- a/storage/src/freezer/storage.rs +++ b/storage/src/freezer/storage.rs @@ -1,7 +1,8 @@ use super::{Config, Error, Identifier}; use crate::{ + crc32, journal::segmented::variable::{Config as JournalConfig, Journal}, - kv, Persistable, + kv, Crc32, Persistable, }; use bytes::{Buf, BufMut}; use commonware_codec::{Codec, Encode, EncodeSize, FixedSize, Read, ReadExt, Write as CodecWrite}; @@ -187,7 +188,7 @@ impl Entry { /// Compute a checksum for [Entry]. 
fn compute_crc(epoch: u64, section: u64, offset: u32, added: u8) -> u32 { - let mut hasher = crc32fast::Hasher::new(); + let mut hasher = Crc32::new(); hasher.update(&epoch.to_be_bytes()); hasher.update(&section.to_be_bytes()); hasher.update(&offset.to_be_bytes()); @@ -218,7 +219,7 @@ impl Entry { } impl FixedSize for Entry { - const SIZE: usize = u64::SIZE + u64::SIZE + u32::SIZE + u8::SIZE + u32::SIZE; + const SIZE: usize = u64::SIZE + u64::SIZE + u32::SIZE + u8::SIZE + crc32::SIZE; } impl CodecWrite for Entry { @@ -721,7 +722,7 @@ impl Freezer { /// /// To determine the appropriate entry, we AND the key's hash with the current table size. fn table_index(&self, key: &K) -> u32 { - let hash = crc32fast::hash(key.as_ref()); + let hash = Crc32::checksum(key.as_ref()); hash & (self.table_size - 1) } diff --git a/storage/src/journal/contiguous/fixed.rs b/storage/src/journal/contiguous/fixed.rs index f256756b7d..03625af5af 100644 --- a/storage/src/journal/contiguous/fixed.rs +++ b/storage/src/journal/contiguous/fixed.rs @@ -56,11 +56,12 @@ //! The `replay` method supports fast reading of all unpruned items into memory. use crate::{ + crc32, journal::{contiguous::MutableContiguous, Error}, - Persistable, + Crc32, Persistable, }; use bytes::BufMut; -use commonware_codec::{CodecFixed, DecodeExt as _, FixedSize}; +use commonware_codec::{CodecFixed, DecodeExt as _}; use commonware_runtime::{ buffer::{Append, PoolRef, Read}, telemetry::metrics::status::GaugeExt, @@ -137,7 +138,7 @@ pub struct Journal { } impl> Journal { - pub(crate) const CHUNK_SIZE: usize = u32::SIZE + A::SIZE; + pub(crate) const CHUNK_SIZE: usize = crc32::SIZE + A::SIZE; pub(crate) const CHUNK_SIZE_U64: u64 = Self::CHUNK_SIZE as u64; /// Initialize a new `Journal` instance. 
@@ -334,7 +335,7 @@ impl> Journal { assert_eq!(size % Self::CHUNK_SIZE_U64, 0); let mut buf: Vec<u8> = Vec::with_capacity(Self::CHUNK_SIZE); let item = item.encode(); - let checksum = crc32fast::hash(&item); + let checksum = Crc32::checksum(&item); buf.extend_from_slice(&item); buf.put_u32(checksum); @@ -469,7 +470,7 @@ impl> Journal { /// Error::Codec likely indicates a logic error rather than a corruption issue. fn verify_integrity(buf: &[u8]) -> Result { let stored_checksum = u32::from_be_bytes(buf[A::SIZE..].try_into().unwrap()); - let checksum = crc32fast::hash(&buf[..A::SIZE]); + let checksum = Crc32::checksum(&buf[..A::SIZE]); if checksum != stored_checksum { return Err(Error::ChecksumMismatch(stored_checksum, checksum)); } @@ -695,6 +696,7 @@ impl> Persistable for Journal { pub(crate) const ITEM_ALIGNMENT: u64 = 16; -/// Minimum size of any item: 1 byte varint (size=0) + 0 bytes data + 4 bytes checksum. +/// Minimum size of any item: 1 byte varint (size=0) + 0 bytes data + crc32::SIZE bytes checksum. /// This is also the max varint size for u32, so we can always read this many bytes /// at the start of an item to get the complete varint. -const MIN_ITEM_SIZE: usize = 5; +const MIN_ITEM_SIZE: usize = 1 + crc32::SIZE; /// Computes the next offset for an item using the underlying `u64` /// offset of `Blob`. 
@@ -239,7 +239,7 @@ impl Journal { offset: u32, ) -> Result<(u32, u32, V), Error> { // Read varint size (max 5 bytes for u32) - let mut hasher = crc32fast::Hasher::new(); + let mut hasher = Crc32::new(); let offset = offset as u64 * ITEM_ALIGNMENT; let varint_buf = blob.read_at(vec![0; MIN_ITEM_SIZE], offset).await?; let mut varint = varint_buf.as_ref(); @@ -251,7 +251,7 @@ impl Journal { .ok_or(Error::OffsetOverflow)?; // Read remaining - let buf_size = size.checked_add(4).ok_or(Error::OffsetOverflow)?; + let buf_size = size.checked_add(crc32::SIZE).ok_or(Error::OffsetOverflow)?; let buf = blob.read_at(vec![0u8; buf_size], offset).await?; let buf = buf.as_ref(); let offset = offset @@ -301,7 +301,7 @@ impl Journal { } // Read varint size (max 5 bytes for u32, and min item size is 5 bytes) - let mut hasher = crc32fast::Hasher::new(); + let mut hasher = Crc32::new(); let mut varint_buf = [0u8; MIN_ITEM_SIZE]; reader .read_exact(&mut varint_buf, MIN_ITEM_SIZE) @@ -526,7 +526,7 @@ impl Journal { Err(_) => return Err(Error::ItemTooLarge(item_len)), }; let size_len = UInt(item_len).encode_size(); - let entry_len = size_len + item_len as usize + 4; + let entry_len = size_len + item_len as usize + crc32::SIZE; // Get existing blob or create new one let blob = match self.blobs.entry(section) { @@ -566,7 +566,7 @@ impl Journal { buf.put_slice(&encoded); // Calculate checksum only for the entry data (without padding) - let checksum = crc32fast::hash(&buf[entry_start..]); + let checksum = Crc32::checksum(&buf[entry_start..]); buf.put_u32(checksum); assert_eq!(buf[entry_start..].len(), entry_len); @@ -2135,7 +2135,7 @@ mod tests { let digest = Sha256::hash(buf.as_ref()); assert_eq!( hex(&digest), - "f55bf27a59118603466fcf6a507ab012eea4cb2d6bdd06ce8f515513729af847", + "dd945d1b3d7058730f79c7eabecad6dac42ce737ed1c367ea0f4cf71e74970e7", ); }); } diff --git a/storage/src/lib.rs b/storage/src/lib.rs index 08c6a27a5a..cafcc3ff23 100644 --- a/storage/src/lib.rs +++ 
b/storage/src/lib.rs @@ -16,6 +16,8 @@ pub mod mmr; cfg_if::cfg_if! { if #[cfg(feature = "std")] { + pub mod crc32; + pub use crc32::Crc32; pub mod qmdb; pub mod archive; mod bitmap; diff --git a/storage/src/metadata/storage.rs b/storage/src/metadata/storage.rs index 08f7f64ed9..7a12233e31 100644 --- a/storage/src/metadata/storage.rs +++ b/storage/src/metadata/storage.rs @@ -1,4 +1,5 @@ use super::{Config, Error}; +use crate::{crc32, Crc32}; use bytes::BufMut; use commonware_codec::{Codec, FixedSize, ReadExt}; use commonware_runtime::{ @@ -151,8 +152,8 @@ impl Metadata { // Verify integrity. // - // 8 bytes for version + 4 bytes for checksum. - if buf.len() < 12 { + // 8 bytes for version + crc32::SIZE bytes for checksum. + if buf.len() < 8 + crc32::SIZE { // Truncate and return none warn!( blob = index, @@ -165,10 +166,10 @@ impl Metadata { } // Extract checksum - let checksum_index = buf.len() - 4; + let checksum_index = buf.len() - crc32::SIZE; let stored_checksum = u32::from_be_bytes(buf.as_ref()[checksum_index..].try_into().unwrap()); - let computed_checksum = crc32fast::hash(&buf.as_ref()[..checksum_index]); + let computed_checksum = Crc32::checksum(&buf.as_ref()[..checksum_index]); if stored_checksum != computed_checksum { // Truncate and return none warn!( @@ -371,8 +372,8 @@ impl Metadata { writes.push(target.blob.write_at(version.as_slice().into(), 0)); // Update checksum - let checksum_index = target.data.len() - 4; - let checksum = crc32fast::hash(&target.data[..checksum_index]).to_be_bytes(); + let checksum_index = target.data.len() - crc32::SIZE; + let checksum = Crc32::checksum(&target.data[..checksum_index]).to_be_bytes(); target.data[checksum_index..].copy_from_slice(&checksum); writes.push( target @@ -402,7 +403,7 @@ impl Metadata { value.write(&mut next_data); lengths.insert(key.clone(), Info::new(start, value.encode_size())); } - next_data.put_u32(crc32fast::hash(&next_data[..])); + next_data.put_u32(Crc32::checksum(&next_data[..])); // Persist 
changes target.blob.write_at(next_data.clone(), 0).await?; diff --git a/storage/src/ordinal/mod.rs b/storage/src/ordinal/mod.rs index 193dc609c5..eb3bcf58d8 100644 --- a/storage/src/ordinal/mod.rs +++ b/storage/src/ordinal/mod.rs @@ -133,6 +133,7 @@ pub struct Config { #[cfg(test)] mod tests { use super::*; + use crate::Crc32; use bytes::{Buf, BufMut}; use commonware_codec::{FixedSize, Read, ReadExt, Write}; use commonware_macros::{test_group, test_traced}; @@ -911,7 +912,7 @@ mod tests { // Write a valid record after the zeros let mut valid_record = vec![44u8; 32]; - let crc = crc32fast::hash(&valid_record); + let crc = Crc32::checksum(&valid_record); valid_record.extend_from_slice(&crc.to_be_bytes()); blob.write_at(valid_record, 36 * 5).await.unwrap(); diff --git a/storage/src/ordinal/storage.rs b/storage/src/ordinal/storage.rs index e58d3c621e..cb6bff927c 100644 --- a/storage/src/ordinal/storage.rs +++ b/storage/src/ordinal/storage.rs @@ -1,5 +1,5 @@ use super::{Config, Error}; -use crate::{kv, rmap::RMap, Persistable}; +use crate::{crc32, kv, rmap::RMap, Crc32, Persistable}; use bytes::{Buf, BufMut}; use commonware_codec::{CodecFixed, Encode, FixedSize, Read, ReadExt, Write as CodecWrite}; use commonware_runtime::{ @@ -24,17 +24,17 @@ struct Record> { impl> Record { fn new(value: V) -> Self { - let crc = crc32fast::hash(&value.encode()); + let crc = Crc32::checksum(&value.encode()); Self { value, crc } } fn is_valid(&self) -> bool { - self.crc == crc32fast::hash(&self.value.encode()) + self.crc == Crc32::checksum(&self.value.encode()) } } impl> FixedSize for Record { - const SIZE: usize = V::SIZE + u32::SIZE; + const SIZE: usize = V::SIZE + crc32::SIZE; } impl> CodecWrite for Record {