diff --git a/CHANGELOG.md b/CHANGELOG.md new file mode 100644 index 00000000..b7a4f75f --- /dev/null +++ b/CHANGELOG.md @@ -0,0 +1,44 @@ +# Changelog +All notable changes to this project will be documented in this file. + +The format is based on Keep a Changelog and this project adheres to +(or is loosely based on) Semantic Versioning. + +## [Unreleased] + +### Added +### Changed +### Deprecated +### Removed +### Fixed +### Security +### Migration +- If there are breaking changes, put a short, actionable checklist here. + +## [0.14.0-alpha] - 2024-09-08 +### Breaking +- Files written by 0.14.0-alpha use padded payload starts for fixed alignment. + Older readers (<= 0.13.x-alpha) may misinterpret pre-pad bytes as part of the + payload. Upgrade all readers/writers before mixing file versions. + +### Added +- Fixed payload alignment for zero-copy typed views. Payloads now begin + at an address that is a multiple of `PAYLOAD_ALIGNMENT`, configured in + `src/storage_engine/constants.rs` via: + - `PAYLOAD_ALIGN_LOG2` + - `PAYLOAD_ALIGNMENT = 1 << PAYLOAD_ALIGN_LOG2` +- Experimental `arrow` feature which exposes `as_arrow_buffer` and `into_arrow_buffer` + methods in `EntryHandle`. + +### Changed +- Internal on-disk layout: each non-tombstone payload may be preceded by + a small zero pre-pad (0..A-1 bytes) to satisfy alignment (A is the + configured alignment). Public API is unchanged. + +### Migration +- Regenerate stores with the new version: + 1) Open the old store with the matching old binary and read entries. + 2) Write each entry into a new 0.14.0-alpha store. + 3) Replace the old file after verification. +- If you maintain separate services, deploy reader upgrades before + writer upgrades to avoid mixed-version reads. diff --git a/Cargo.lock b/Cargo.lock index 05c60b01..c7d68453 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -17,6 +17,20 @@ version = "2.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "512761e0bb2578dd7380c6baaa0f4ce03e84f95e960231d1dec8bf4d7d6e2627" +[[package]] +name = "ahash" +version = "0.8.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5a15f179cd60c4584b8a8c596927aadc462e27f2ca70c04e0071964a73ba7a75" +dependencies = [ + "cfg-if", + "const-random", + "getrandom 0.3.1", + "once_cell", + "version_check", + "zerocopy 0.8.25", +] + [[package]] name = "aho-corasick" version = "1.1.3" @@ -103,6 +117,160 @@ version = "0.7.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7c02d123df017efcdfbd739ef81735b36c5ba83ec3c59c80a9d7ecc718f92e50" +[[package]] +name = "arrow" +version = "56.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c26b57282a08ae92f727497805122fec964c6245cfa0e13f0e75452eaf3bc41f" +dependencies = [ + "arrow-arith", + "arrow-array", + "arrow-buffer", + "arrow-cast", + "arrow-data", + "arrow-ord", + "arrow-row", + "arrow-schema", + "arrow-select", + "arrow-string", +] + +[[package]] +name = "arrow-arith" +version = "56.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cebf38ca279120ff522f4954b81a39527425b6e9f615e6b72842f4de1ffe02b8" +dependencies = [ + "arrow-array", + "arrow-buffer", + "arrow-data", + "arrow-schema", + "chrono", + "num", +] + +[[package]] +name = "arrow-array" +version = "56.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "744109142cdf8e7b02795e240e20756c2a782ac9180d4992802954a8f871c0de" +dependencies = [ + "ahash", + "arrow-buffer", + "arrow-data", + 
"arrow-schema", + "chrono", + "half", + "hashbrown 0.15.4", + "num", +] + +[[package]] +name = "arrow-buffer" +version = "56.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "601bb103c4c374bcd1f62c66bcea67b42a2ee91a690486c37d4c180236f11ccc" +dependencies = [ + "bytes", + "half", + "num", +] + +[[package]] +name = "arrow-cast" +version = "56.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eed61d9d73eda8df9e3014843def37af3050b5080a9acbe108f045a316d5a0be" +dependencies = [ + "arrow-array", + "arrow-buffer", + "arrow-data", + "arrow-schema", + "arrow-select", + "atoi", + "base64", + "chrono", + "half", + "lexical-core", + "num", + "ryu", +] + +[[package]] +name = "arrow-data" +version = "56.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "43407f2c6ba2367f64d85d4603d6fb9c4b92ed79d2ffd21021b37efa96523e12" +dependencies = [ + "arrow-buffer", + "arrow-schema", + "half", + "num", +] + +[[package]] +name = "arrow-ord" +version = "56.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c142a147dceb59d057bad82400f1693847c80dca870d008bf7b91caf902810ae" +dependencies = [ + "arrow-array", + "arrow-buffer", + "arrow-data", + "arrow-schema", + "arrow-select", +] + +[[package]] +name = "arrow-row" +version = "56.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dac6620667fccdab4204689ca173bd84a15de6bb6b756c3a8764d4d7d0c2fc04" +dependencies = [ + "arrow-array", + "arrow-buffer", + "arrow-data", + "arrow-schema", + "half", +] + +[[package]] +name = "arrow-schema" +version = "56.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dfa93af9ff2bb80de539e6eb2c1c8764abd0f4b73ffb0d7c82bf1f9868785e66" + +[[package]] +name = "arrow-select" +version = "56.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "be8b2e0052cd20d36d64f32640b68a5ab54d805d24a473baee5d52017c85536c" +dependencies = [ + "ahash", + "arrow-array", + "arrow-buffer", + "arrow-data", + "arrow-schema", + "num", +] + +[[package]] +name = "arrow-string" +version = "56.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c2155e26e17f053c8975c546fc70cf19c00542f9abf43c23a88a46ef7204204f" +dependencies = [ + "arrow-array", + "arrow-buffer", + "arrow-data", + "arrow-schema", + "arrow-select", + "memchr", + "num", + "regex", + "regex-syntax", +] + [[package]] name = "async-trait" version = "0.1.88" @@ -114,6 +282,15 @@ dependencies = [ "syn", ] +[[package]] +name = "atoi" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f28d99ec8bfea296261ca1af174f24225171fea9664ba9003cbebee704810528" +dependencies = [ + "num-traits", +] + [[package]] name = "autocfg" version = "1.4.0" @@ -254,9 +431,9 @@ checksum = "793db76d6187cd04dff33004d8e6c9cc4e05cd330500379d2394209271b4aeee" [[package]] name = "bytemuck" -version = "1.23.1" +version = "1.23.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5c76a5792e44e4abe34d3abf15636779261d45a7450612059293d1d2cfc63422" +checksum = "3995eaeebcdf32f91f980d360f78732ddc061097ab4e39991ae7a6ace9194677" [[package]] name = "byteorder" @@ -378,6 +555,26 @@ version = "1.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5b63caa9aa9397e2d9480a9b13673856c78d8ac123288526c37d7839f2a86990" +[[package]] +name = "const-random" +version = "0.1.18" +source = 
"registry+https://github.com/rust-lang/crates.io-index" +checksum = "87e00182fe74b066627d63b85fd550ac2998d4b0bd86bfed477a0ae4c7c71359" +dependencies = [ + "const-random-macro", +] + +[[package]] +name = "const-random-macro" +version = "0.1.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f9d839f2a20b0aee515dc581a6172f2321f96cab76c1a38a4c584a194955390e" +dependencies = [ + "getrandom 0.2.16", + "once_cell", + "tiny-keccak", +] + [[package]] name = "core-foundation-sys" version = "0.8.7" @@ -654,6 +851,17 @@ dependencies = [ "version_check", ] +[[package]] +name = "getrandom" +version = "0.2.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "335ff9f135e4384c8150d6f27c6daed433577f86b4750418338c01a1a2528592" +dependencies = [ + "cfg-if", + "libc", + "wasi 0.11.1+wasi-snapshot-preview1", +] + [[package]] name = "getrandom" version = "0.3.1" @@ -686,6 +894,7 @@ checksum = "459196ed295495a68f7d7fe1d84f6c4b7ff0e21fe3017b2f283c6fac3ad803c9" dependencies = [ "cfg-if", "crunchy", + "num-traits", ] [[package]] @@ -873,12 +1082,82 @@ version = "1.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bbd2bcb4c963f2ddae06a2efc7e9f3591312473c50c6685e1f298068316e66fe" +[[package]] +name = "lexical-core" +version = "1.0.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b765c31809609075565a70b4b71402281283aeda7ecaf4818ac14a7b2ade8958" +dependencies = [ + "lexical-parse-float", + "lexical-parse-integer", + "lexical-util", + "lexical-write-float", + "lexical-write-integer", +] + +[[package]] +name = "lexical-parse-float" +version = "1.0.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "de6f9cb01fb0b08060209a057c048fcbab8717b4c1ecd2eac66ebfe39a65b0f2" +dependencies = [ + "lexical-parse-integer", + "lexical-util", + "static_assertions", +] + +[[package]] +name = "lexical-parse-integer" +version = "1.0.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "72207aae22fc0a121ba7b6d479e42cbfea549af1479c3f3a4f12c70dd66df12e" +dependencies = [ + "lexical-util", + "static_assertions", +] + +[[package]] +name = "lexical-util" +version = "1.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5a82e24bf537fd24c177ffbbdc6ebcc8d54732c35b50a3f28cc3f4e4c949a0b3" +dependencies = [ + "static_assertions", +] + +[[package]] +name = "lexical-write-float" +version = "1.0.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c5afc668a27f460fb45a81a757b6bf2f43c2d7e30cb5a2dcd3abf294c78d62bd" +dependencies = [ + "lexical-util", + "lexical-write-integer", + "static_assertions", +] + +[[package]] +name = "lexical-write-integer" +version = "1.0.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "629ddff1a914a836fb245616a7888b62903aae58fa771e1d83943035efa0f978" +dependencies = [ + "lexical-util", + "static_assertions", +] + [[package]] name = "libc" version = "0.2.174" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1171693293099992e19cddea4e8b849964e9846f4acee11b3948bcc337be8776" +[[package]] +name = "libm" +version = "0.2.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f9fbbcab51052fe104eb5e5d351cf728d30a5be1fe14d9be8a3b097481fb97de" + [[package]] name = "linux-raw-sys" version = "0.9.2" @@ -1054,6 +1333,70 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "num" +version = "0.4.3" +source = 
"registry+https://github.com/rust-lang/crates.io-index" +checksum = "35bd024e8b2ff75562e5f34e7f4905839deb4b22955ef5e73d2fea1b9813cb23" +dependencies = [ + "num-bigint", + "num-complex", + "num-integer", + "num-iter", + "num-rational", + "num-traits", +] + +[[package]] +name = "num-bigint" +version = "0.4.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a5e44f723f1133c9deac646763579fdb3ac745e418f2a7af9cd0c431da1f20b9" +dependencies = [ + "num-integer", + "num-traits", +] + +[[package]] +name = "num-complex" +version = "0.4.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "73f88a1307638156682bada9d7604135552957b7818057dcef22705b4d509495" +dependencies = [ + "num-traits", +] + +[[package]] +name = "num-integer" +version = "0.1.46" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7969661fd2958a5cb096e56c8e1ad0444ac2bbcd0061bd28660485a44879858f" +dependencies = [ + "num-traits", +] + +[[package]] +name = "num-iter" +version = "0.1.45" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1429034a0490724d0075ebb2bc9e875d6503c3cf69e235a8941aa757d83ef5bf" +dependencies = [ + "autocfg", + "num-integer", + "num-traits", +] + +[[package]] +name = "num-rational" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f83d14da390562dca69fc84082e73e548e1ad308d24accdedd2720017cb37824" +dependencies = [ + "num-bigint", + "num-integer", + "num-traits", +] + [[package]] name = "num-traits" version = "0.2.19" @@ -1061,6 +1404,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "071dfc062690e90b734c0b2273ce72ad0ffa95f0c74596bc250dcfd960262841" dependencies = [ "autocfg", + "libm", ] [[package]] @@ -1237,7 +1581,7 @@ version = "0.9.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "99d9a13982dcf210057a8a78572b2217b667c3beacbf3a0d8b454f6f82837d38" dependencies = [ - "getrandom", + "getrandom 0.3.1", ] [[package]] @@ -1475,10 +1819,11 @@ dependencies = [ [[package]] name = "simd-r-drive" -version = "0.12.0-alpha" +version = "0.14.0-alpha" dependencies = [ "async-trait", "bincode", + "bytemuck", "clap", "crc32fast", "criterion", @@ -1502,15 +1847,16 @@ dependencies = [ [[package]] name = "simd-r-drive-entry-handle" -version = "0.12.0-alpha" +version = "0.14.0-alpha" dependencies = [ + "arrow", "crc32fast", "memmap2", ] [[package]] name = "simd-r-drive-extensions" -version = "0.12.0-alpha" +version = "0.14.0-alpha" dependencies = [ "bincode", "doc-comment", @@ -1522,7 +1868,7 @@ dependencies = [ [[package]] name = "simd-r-drive-muxio-service-definition" -version = "0.12.0-alpha" +version = "0.14.0-alpha" dependencies = [ "bitcode", "muxio-rpc-service", @@ -1530,7 +1876,7 @@ dependencies = [ [[package]] name = "simd-r-drive-ws-client" -version = "0.12.0-alpha" +version = "0.14.0-alpha" dependencies = [ "async-trait", "muxio-rpc-service", @@ -1544,7 +1890,7 @@ dependencies = [ [[package]] name = "simd-r-drive-ws-server" -version = "0.12.0-alpha" +version = "0.14.0-alpha" dependencies = [ "clap", "indoc", @@ -1582,6 +1928,12 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "static_assertions" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f" + [[package]] name = "strsim" version = "0.11.1" @@ -1612,7 +1964,7 @@ source = 
"registry+https://github.com/rust-lang/crates.io-index" checksum = "488960f40a3fd53d72c2a29a58722561dee8afdd175bd88e3db4677d7b2ba600" dependencies = [ "fastrand", - "getrandom", + "getrandom 0.3.1", "once_cell", "rustix", "windows-sys 0.59.0", @@ -1653,6 +2005,15 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "tiny-keccak" +version = "2.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2c9d3793400a45f954c52e73d068316d76b6f4e36977e3fcebb13a2721e80237" +dependencies = [ + "crunchy", +] + [[package]] name = "tinytemplate" version = "1.2.1" @@ -2171,19 +2532,3 @@ dependencies = [ "quote", "syn", ] - -[[patch.unused]] -name = "muxio-rpc-service" -version = "0.10.0-alpha" - -[[patch.unused]] -name = "muxio-rpc-service-caller" -version = "0.10.0-alpha" - -[[patch.unused]] -name = "muxio-tokio-rpc-client" -version = "0.10.0-alpha" - -[[patch.unused]] -name = "muxio-tokio-rpc-server" -version = "0.10.0-alpha" diff --git a/Cargo.toml b/Cargo.toml index 5be1a9df..5229a819 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [workspace.package] authors = ["Jeremy Harris "] -version = "0.12.0-alpha" +version = "0.14.0-alpha" edition = "2024" repository = "https://github.com/jzombie/rust-simd-r-drive" license = "Apache-2.0" @@ -35,6 +35,7 @@ xxhash-rust = { version = "0.8.15", features = ["xxh3", "const_xxh3"] } [dev-dependencies] bincode = { workspace = true } +bytemuck = "1.23.2" criterion = { workspace = true } futures = { workspace = true } rand = { workspace = true } @@ -50,6 +51,9 @@ default = [] expose-internal-api = [] parallel = ["rayon"] +# Proxy: when users enable simd-r-drive/arrow, it enables the dep's arrow. +arrow = ["simd-r-drive-entry-handle/arrow"] + [[bench]] name = "storage_benchmark" harness = false @@ -75,16 +79,17 @@ resolver = "2" [workspace.dependencies] # Intra-workspace crates -simd-r-drive = { path = ".", version = "0.12.0-alpha" } -simd-r-drive-entry-handle = { path = "./simd-r-drive-entry-handle", version = "0.12.0-alpha" } -simd-r-drive-ws-client = { path = "./experiments/simd-r-drive-ws-client", version = "0.12.0-alpha" } -simd-r-drive-muxio-service-definition = { path = "./experiments/simd-r-drive-muxio-service-definition", version = "0.12.0-alpha" } +simd-r-drive = { path = ".", version = "0.14.0-alpha" } +simd-r-drive-entry-handle = { path = "./simd-r-drive-entry-handle", version = "0.14.0-alpha" } +simd-r-drive-ws-client = { path = "./experiments/simd-r-drive-ws-client", version = "0.14.0-alpha" } +simd-r-drive-muxio-service-definition = { path = "./experiments/simd-r-drive-muxio-service-definition", version = "0.14.0-alpha" } muxio-tokio-rpc-client = "0.9.0-alpha" muxio-tokio-rpc-server = "0.9.0-alpha" muxio-rpc-service = "0.9.0-alpha" muxio-rpc-service-caller = "0.9.0-alpha" # Third-party crates (note, not all dependencies are used in the base drive) +arrow = { version = "56.1.0", default-features = false } async-trait = "0.1.88" bincode = "1.3.3" # TODO: Replace with `bitcode` bitcode = "0.6.6" diff --git a/README.md b/README.md index cc4bbcf9..e3d8caf6 100644 --- a/README.md +++ b/README.md @@ -11,6 +11,7 @@ Can be used as a command line interface (CLI) app, or as a library in another ap ## Table of Contents - [Zero-Copy Memory-Mapped Access](#zero-copy-memory-mapped-access) +- [Fixed Payload Alignment (Zero-Copy Typed Slices)](#fixed-payload-alignment-zero-copy-typed-slices) - [Single-File Storage Container for Binary Data](#single-file-storage-container-for-binary-data) - [Nestable Storage (Recursive 
Embedding)](#nestable-storage-recursive-embedding)
 - [No Assumptions About Your Data or Resource-Wasting Serialization Overhead](#no-assumptions-about-your-data-or-resource-wasting-serialization-overhead)
@@ -45,6 +46,14 @@ Unlike `FlatBuffers`, which also supports zero-copy reads but requires predefined schemas,
 
 Additionally, `SIMD R Drive` is designed to handle datasets larger than available RAM by leveraging memory mapping. The system transparently accesses only the necessary portions of the file, reducing memory pressure and enabling efficient storage operations on large-scale datasets.
 
+## Fixed Payload Alignment (Zero-Copy Typed Slices)
+
+Every non-tombstone payload now starts at a fixed, power-of-two boundary (16 bytes by default, configurable). This guarantees that, when your payload length is a multiple of the element size, you can reinterpret the bytes as typed slices (e.g., `&[u16]`, `&[u32]`, `&[u64]`, `&[u128]`) without copying; see the sketch after the layout tables below.
+
+This change is transparent to the public API and works with all write modes, including streaming. The on-disk layout may include a few padding bytes per entry to maintain alignment. Tombstones are unaffected.
+
+Practical benefits include faster vectorized reads, simpler use of zero-copy casting helpers (e.g., `bytemuck`), and fewer fallback copies. If you need a stricter boundary for a target platform, adjust the [alignment constant](./src/storage_engine/constants.rs) and rebuild.
+
 ## Single-File Storage Container for Binary Data
 
 - Stores any binary format without interpretation or modification.
@@ -94,16 +103,32 @@ Think of it as a self-contained binary filesystem—capable of storing and retri
 
-| Offset Range      | Field           | Size (Bytes) | Description                                        |
-|-------------------|-----------------|--------------|---------------------------------------------------|
-| `0 → N`           | **Payload**     | `N`          | Variable-length data                               |
-| `N → N + 8`       | **Key Hash**    | `8`          | 64-bit XXH3 hash of the key (fast lookups)         |
-| `N + 8 → N + 16`  | **Prev Offset** | `8`          | Absolute offset pointing to the previous version   |
-| `N + 16 → N + 20` | **Checksum**    | `4`          | 32-bit CRC32C checksum for integrity verification  |
+Aligned entry (non-tombstone):
+
+| Offset Range   | Field              | Size (Bytes) | Description                       |
+|----------------|--------------------|--------------|-----------------------------------|
+| `P .. P+pad`   | Pre-Pad (optional) | `pad`        | Zero bytes to align payload start |
+| `P+pad .. N`   | Payload            | `N-(P+pad)`  | Variable-length data              |
+| `N .. N+8`     | Key Hash           | `8`          | 64-bit XXH3 key hash              |
+| `N+8 .. N+16`  | Prev Offset        | `8`          | Absolute offset of previous tail  |
+| `N+16 .. N+20` | Checksum           | `4`          | CRC32C of payload                 |
+
+Where:
+- `pad = (A - (prev_tail % A)) & (A - 1)`, `A = PAYLOAD_ALIGNMENT`.
+- The next entry starts at `N + 20`.
+
+Tombstone (deletion marker):
 
-**Per-Payload Size**
+| Offset Range  | Field    | Size (Bytes) | Description            |
+|---------------|----------|--------------|------------------------|
+| `T .. T+1`    | Payload  | `1`          | Single byte `0x00`     |
+| `T+1 .. T+21` | Metadata | `20`         | Key hash, prev, crc32c |
 
-`N + 20` bytes, where `N` is the length of the payload.
+Notes:
+- Using the previous tail in `Prev Offset` lets us insert pre-pad while
+  keeping chain traversal unambiguous.
+- Readers compute `payload_start = prev_offset + prepad_len(prev_offset)`
+  and use the current metadata position as `payload_end`.
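+For a concrete picture of the zero-copy path, here is a minimal sketch. The
+file name and key are illustrative; it assumes the crate's `DataStore::open`
+and `DataStoreReader::read` signatures shown in this PR, and uses `bytemuck`
+(already a dev-dependency of this workspace) for the checked cast:
+
+```rust
+use simd_r_drive::{DataStore, DataStoreReader};
+use std::path::Path;
+
+fn typed_read() -> std::io::Result<()> {
+    let store = DataStore::open(Path::new("data.bin"))?;
+    if let Some(entry) = store.read(b"samples")? {
+        // The payload start is 16-byte aligned, so this cast succeeds
+        // whenever the payload length is a multiple of 4 bytes.
+        let values: &[u32] = bytemuck::cast_slice(entry.as_slice());
+        println!("read {} u32 values without copying", values.len());
+    }
+    Ok(())
+}
+```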
#### Storage Validation Chain diff --git a/experiments/bindings/python-ws-client/Cargo.lock b/experiments/bindings/python-ws-client/Cargo.lock index 0afc2677..2ff051d4 100644 --- a/experiments/bindings/python-ws-client/Cargo.lock +++ b/experiments/bindings/python-ws-client/Cargo.lock @@ -1048,7 +1048,7 @@ dependencies = [ [[package]] name = "simd-r-drive" -version = "0.12.0-alpha" +version = "0.14.0-alpha" dependencies = [ "async-trait", "clap", @@ -1064,7 +1064,7 @@ dependencies = [ [[package]] name = "simd-r-drive-entry-handle" -version = "0.12.0-alpha" +version = "0.14.0-alpha" dependencies = [ "crc32fast", "memmap2", @@ -1072,7 +1072,7 @@ dependencies = [ [[package]] name = "simd-r-drive-muxio-service-definition" -version = "0.12.0-alpha" +version = "0.14.0-alpha" dependencies = [ "bitcode", "muxio-rpc-service", @@ -1080,7 +1080,7 @@ dependencies = [ [[package]] name = "simd-r-drive-ws-client" -version = "0.12.0-alpha" +version = "0.14.0-alpha" dependencies = [ "async-trait", "muxio-rpc-service", diff --git a/experiments/bindings/python_(old_client)/README.md b/experiments/bindings/python_(old_client)/README.md index 4b17f3a8..0c932aad 100644 --- a/experiments/bindings/python_(old_client)/README.md +++ b/experiments/bindings/python_(old_client)/README.md @@ -50,12 +50,32 @@ Older versions (≤3.9) are explicitly skipped during wheel builds. -| Offset Range | Field | Size (Bytes) | Description | -|-------------------|-----------------|--------------|---------------------------------------------------| -| `0 → N` | **Payload** | `N` | Variable-length data | -| `N → N + 8` | **Key Hash** | `8` | 64-bit XXH3 hash of the key (fast lookups) | -| `N + 8 → N + 16` | **Prev Offset** | `8` | Absolute offset pointing to the previous version | -| `N + 16 → N + 20` | **Checksum** | `4` | 32-bit CRC32C checksum for integrity verification | +Aligned entry (non-tombstone): + +| Offset Range | Field | Size (Bytes) | Description | +|----------------|--------------------|--------------|-----------------------------------| +| `P .. P+pad` | Pre-Pad (optional) | `pad` | Zero bytes to align payload start | +| `P+pad .. N` | Payload | `N-(P+pad)` | Variable-length data | +| `N .. N+8` | Key Hash | `8` | 64-bit XXH3 key hash | +| `N+8 .. N+16` | Prev Offset | `8` | Absolute offset of previous tail | +| `N+16 .. N+20` | Checksum | `4` | CRC32C of payload | + +Where: +- `pad = (A - (prev_tail % A)) & (A - 1)`, `A = PAYLOAD_ALIGNMENT`. +- The next entry starts at `N + 20`. + +Tombstone (deletion marker): + +| Offset Range | Field | Size (Bytes) | Description | +|---------------|----------|--------------|------------------------| +| `T .. T+1` | Payload | `1` | Single byte `0x00` | +| `T+1 .. T+21` | Metadata | `20` | Key hash, prev, crc32c | + +Notes: +- Using the previous tail in `Prev Offset` lets us insert pre-pad while + keeping chain traversal unambiguous. +- Readers compute `payload_start = prev_offset + prepad_len(prev_offset)` + and use the current metadata position as `payload_end`. ## Installation diff --git a/experiments/bindings/python_(old_client)/pyproject.toml b/experiments/bindings/python_(old_client)/pyproject.toml index a0d97026..2ea15565 100644 --- a/experiments/bindings/python_(old_client)/pyproject.toml +++ b/experiments/bindings/python_(old_client)/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "simd-r-drive-py" -version = "0.12.0-alpha" +version = "0.14.0-alpha" description = "SIMD-optimized append-only schema-less storage engine. 
Key-based binary storage in a single-file storage container."
 repository = "https://github.com/jzombie/rust-simd-r-drive"
 license = "Apache-2.0"
diff --git a/simd-r-drive-entry-handle/Cargo.toml b/simd-r-drive-entry-handle/Cargo.toml
index 53dc18b7..1ed41726 100644
--- a/simd-r-drive-entry-handle/Cargo.toml
+++ b/simd-r-drive-entry-handle/Cargo.toml
@@ -11,8 +11,10 @@ keywords.workspace = true
 publish.workspace = true
 
 [dependencies]
+arrow = { workspace = true, optional = true }
 crc32fast = { workspace = true }
 memmap2 = { workspace = true }
 
 [features]
 expose-internal-api = []
+arrow = ["dep:arrow"]
diff --git a/simd-r-drive-entry-handle/src/entry_handle.rs b/simd-r-drive-entry-handle/src/entry_handle.rs
index 54370094..905faf49 100644
--- a/simd-r-drive-entry-handle/src/entry_handle.rs
+++ b/simd-r-drive-entry-handle/src/entry_handle.rs
@@ -350,3 +350,84 @@ impl EntryHandle {
         &self.mmap_arc
     }
 }
+
+/// Zero-copy Arrow `Buffer` views over this entry.
+///
+/// Safety: the pointer comes from an `Arc<Mmap>`, and the owner handed to
+/// the `Buffer` keeps that `Arc<Mmap>` alive.
+#[cfg(feature = "arrow")]
+impl EntryHandle {
+    /// View the payload as an Arrow `Buffer` without copying.
+    ///
+    /// Feature: `arrow`
+    ///
+    /// Returns a zero-copy `arrow::buffer::Buffer` whose contents point at
+    /// the same bytes as `self.as_slice()`. The returned `Buffer` captures
+    /// an `Arc<EntryHandle>` internally, which keeps the `Arc<Mmap>` alive
+    /// for as long as the `Buffer` exists.
+    ///
+    /// Safety
+    /// ------
+    /// - The pointer is valid for `self.size()` bytes and remains immutable
+    ///   while the `Buffer` lives.
+    /// - Alignment is suitable for `u8`.
+    ///
+    /// Panics
+    /// ------
+    /// Panics only if the payload slice pointer is null. The check is
+    /// defensive and should never panic.
+    pub fn as_arrow_buffer(&self) -> arrow::buffer::Buffer {
+        use arrow::buffer::Buffer;
+        use std::ptr::NonNull;
+        use std::sync::Arc;
+
+        // Pointer to the start of the payload.
+        let ptr = NonNull::new(self.as_slice().as_ptr() as *mut u8).expect("non-null slice ptr");
+
+        // Owner keeps the mmap alive for the Buffer's lifetime.
+        unsafe { Buffer::from_custom_allocation(ptr, self.size(), Arc::new(self.clone())) }
+    }
+
+    /// Convert this handle into an Arrow `Buffer` without copying.
+    ///
+    /// Feature: `arrow`
+    ///
+    /// Like [`as_arrow_buffer`](Self::as_arrow_buffer) but consumes `self`
+    /// to avoid one extra `Arc` clone. This is otherwise identical to the
+    /// borrowing variant and still performs zero copies of the payload.
+    ///
+    /// Safety
+    /// ------
+    /// Same assumptions as [`as_arrow_buffer`](Self::as_arrow_buffer):
+    /// - Pointer is valid for `len` bytes and remains immutable while the
+    ///   `Buffer` lives.
+    /// - Alignment is suitable for `u8`.
+    ///
+    /// Panics
+    /// ------
+    /// See [`as_arrow_buffer`](Self::as_arrow_buffer). The check is
+    /// defensive and should never panic.
+    pub fn into_arrow_buffer(self) -> arrow::buffer::Buffer {
+        use arrow::buffer::Buffer;
+        use std::ptr::NonNull;
+        use std::sync::Arc;
+
+        let len: usize = self.size();
+        let ptr = NonNull::new(self.as_slice().as_ptr() as *mut u8).expect("non-null slice ptr");
+
+        // Move self into the owner to avoid an extra Arc bump later.
+        unsafe { Buffer::from_custom_allocation(ptr, len, Arc::new(self)) }
+    }
+}
+
+impl AsRef<[u8]> for EntryHandle {
+    fn as_ref(&self) -> &[u8] {
+        self.as_slice()
+    }
+}
diff --git a/simd-r-drive-entry-handle/src/entry_metadata.rs b/simd-r-drive-entry-handle/src/entry_metadata.rs
index 1a9509b9..a011a485 100644
--- a/simd-r-drive-entry-handle/src/entry_metadata.rs
+++ b/simd-r-drive-entry-handle/src/entry_metadata.rs
@@ -8,16 +8,32 @@ use crate::constants::*;
 ///
 /// ## Entry Storage Layout
 ///
-/// Each entry consists of a **variable-sized payload** followed by a **fixed-size metadata block**.
-/// The metadata is stored **at the end** of the entry to simplify sequential writes and enable
-/// efficient recovery.
+/// Aligned entry (non-tombstone): /// -/// - **Offset `0` → `N`**: **Payload** (variable-length data) -/// - **Offset `N` → `N + 8`**: **Key Hash** (64-bit XXH3 hash of the key, used for fast lookups) -/// - **Offset `N + 8` → `N + 16`**: **Prev Offset** (absolute file offset pointing to the previous version) -/// - **Offset `N + 16` → `N + 20`**: **Checksum** (full 32-bit CRC32C checksum for integrity verification) +/// | Offset Range | Field | Size (Bytes) | Description | +/// |----------------|--------------------|--------------|-----------------------------------| +/// | `P .. P+pad` | Pre-Pad (optional) | `pad` | Zero bytes to align payload start | +/// | `P+pad .. N` | Payload | `N-(P+pad)` | Variable-length data | +/// | `N .. N+8` | Key Hash | `8` | 64-bit XXH3 key hash | +/// | `N+8 .. N+16` | Prev Offset | `8` | Absolute offset of previous tail | +/// | `N+16 .. N+20` | Checksum | `4` | CRC32C of payload | /// -/// **Total Size**: `N + 20` bytes, where `N` is the length of the payload. +/// Where: +/// - `pad = (A - (prev_tail % A)) & (A - 1)`, `A = PAYLOAD_ALIGNMENT`. +/// - The next entry starts at `N + 20`. +/// +/// Tombstone (deletion marker): +/// +/// | Offset Range | Field | Size (Bytes) | Description | +/// |---------------|----------|--------------|------------------------| +/// | `T .. T+1` | Payload | `1` | Single byte `0x00` | +/// | `T+1 .. T+21` | Metadata | `20` | Key hash, prev, crc32c | +/// +/// Notes: +/// - Using the previous tail in `Prev Offset` lets us insert pre-pad while +/// keeping chain traversal unambiguous. +/// - Readers compute `payload_start = prev_offset + prepad_len(prev_offset)` +/// and use the current metadata position as `payload_end`. /// /// Storage Layout /// diff --git a/src/lib.rs b/src/lib.rs index 7cabbd4b..19f68867 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -116,7 +116,7 @@ //! //! ## Performance Considerations //! - **Use memory-mapped reads** for best performance. -//! - **Batch writes** to reduce file I/O overhead. +//! - **Batch reads and writes** to reduce file I/O overhead. //! - **Avoid unnecessary locks** to maximize concurrency. //! //! ## Safety Notes diff --git a/src/storage_engine.rs b/src/storage_engine.rs index d820fc5c..037e96fb 100644 --- a/src/storage_engine.rs +++ b/src/storage_engine.rs @@ -1,5 +1,5 @@ -mod constants; -pub use constants::NULL_BYTE; +pub mod constants; +pub use constants::*; mod data_store; pub use data_store::*; diff --git a/src/storage_engine/constants.rs b/src/storage_engine/constants.rs index a325595a..b6af64f2 100644 --- a/src/storage_engine/constants.rs +++ b/src/storage_engine/constants.rs @@ -1,6 +1,12 @@ pub use simd_r_drive_entry_handle::constants::*; -// Marker indicating a logically deleted entry in the storage +/// Marker indicating a logically deleted entry in the storage pub const NULL_BYTE: [u8; 1] = [0]; -pub const WRITE_STREAM_BUFFER_SIZE: usize = 64 * 1024; // 64 KB chunks +/// Stream copy chunk size. +pub const WRITE_STREAM_BUFFER_SIZE: usize = 64 * 1024; // 64 KB + +/// Fixed alignment (power of two) for the start of every payload. +/// 16 bytes covers u8/u16/u32/u64/u128 on mainstream targets. 
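+/// Raise `PAYLOAD_ALIGN_LOG2` if a target benefits from a stricter boundary
+/// (e.g., `5` yields 32-byte alignment, matching AVX2-width loads).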
+pub const PAYLOAD_ALIGN_LOG2: u8 = 4;
+pub const PAYLOAD_ALIGNMENT: u64 = 1 << PAYLOAD_ALIGN_LOG2;
diff --git a/src/storage_engine/data_store.rs b/src/storage_engine/data_store.rs
index cb7150ac..05000afc 100644
--- a/src/storage_engine/data_store.rs
+++ b/src/storage_engine/data_store.rs
@@ -31,11 +31,13 @@ pub struct DataStore {
 
 /// Provides a **consuming sequential** iterator over the valid entries.
 ///
-/// This allows a `DataStore` to be consumed to produce a sequential iterator.
-/// For non-consuming iteration, iterate over a reference (`&storage`).
+/// This allows a `DataStore` to be consumed to produce a sequential
+/// iterator. For non-consuming iteration, iterate over a reference
+/// (`&storage`).
 ///
 /// The iterator produced is **sequential**. For parallel processing,
-/// enable the `parallel` feature and use the `.par_iter_entries()` method instead.
+/// enable the `parallel` feature and use the `.par_iter_entries()`
+/// method instead.
 impl IntoIterator for DataStore {
     type Item = EntryHandle;
     type IntoIter = EntryIterator;
@@ -48,7 +50,8 @@ impl IntoIterator for DataStore {
 impl From<PathBuf> for DataStore {
     /// Creates a `DataStore` instance from a `PathBuf`.
     ///
-    /// This allows creating a storage instance **directly from a file path**.
+    /// This allows creating a storage instance **directly from a file
+    /// path**.
     ///
     /// # Panics:
     /// - If the file cannot be opened or mapped into memory.
@@ -61,10 +64,12 @@ impl DataStore {
     /// Opens an **existing** or **new** append-only storage file.
     ///
     /// This function:
-    /// 1. **Opens the file** in read/write mode (creating it if necessary).
+    /// 1. **Opens the file** in read/write mode (creating it if
+    ///    necessary).
     /// 2. **Maps the file** into memory using `mmap` for fast access.
     /// 3. **Recovers the valid chain**, ensuring **data integrity**.
-    /// 4. **Re-maps** the file after recovery to reflect the correct state.
+    /// 4. **Re-maps** the file after recovery to reflect the correct
+    ///    state.
     /// 5. **Builds an in-memory index** for **fast key lookups**.
     ///
     /// # Parameters:
@@ -110,32 +115,39 @@ impl DataStore {
     /// Opens an **existing** append-only storage file.
     ///
-    /// This function verifies that the file exists before attempting to open it.
-    /// If the file does not exist or is not a valid file, an error is returned.
+    /// This function verifies that the file exists before attempting to
+    /// open it. If the file does not exist or is not a valid file, an
+    /// error is returned.
     ///
     /// # Parameters:
     /// - `path`: The **file path** of the storage file.
     ///
     /// # Returns:
-    /// - `Ok(DataStore)`: A **new storage instance** if the file exists and can be opened.
-    /// - `Err(std::io::Error)`: If the file does not exist or is invalid.
+    /// - `Ok(DataStore)`: A **new storage instance** if the file exists
+    ///   and can be opened.
+    /// - `Err(std::io::Error)`: If the file does not exist or is
+    ///   invalid.
     ///
     /// # Notes:
-    /// - Unlike `open()`, this function **does not create** a new storage file if the
-    ///   specified file does not exist.
-    /// - If the file is **missing** or is not a regular file, an error is returned.
-    /// - This is useful in scenarios where the caller needs to **ensure** that they are
-    ///   working with an already existing storage file.
+    /// - Unlike `open()`, this function **does not create** a new
+    ///   storage file if the specified file does not exist.
+    /// - If the file is **missing** or is not a regular file, an error
+    ///   is returned.
+    /// - This is useful in scenarios where the caller needs to **ensure**
+    ///   that they are working with an already existing storage file.
     pub fn open_existing(path: &Path) -> Result<Self> {
         verify_file_existence(path)?;
 
         Self::open(path)
     }
 
-    /// Workaround for directly opening in **append mode** causing permissions issues on Windows
+    /// Workaround for directly opening in **append mode** causing
+    /// permissions issues on Windows.
     ///
-    /// The file is opened normally and the **cursor is moved to the end.
+    /// The file is opened normally and the **cursor is moved to the
+    /// end**.
     ///
-    /// Unix family unaffected by this issue, but this standardizes their handling.
+    /// The Unix family is unaffected by this issue, but this
+    /// standardizes their handling.
     ///
     /// # Parameters:
     /// - `path`: The **file path** of the storage file.
@@ -158,43 +170,54 @@ impl DataStore {
         unsafe { memmap2::MmapOptions::new().map(file.get_ref()) }
     }
 
-    /// Re-maps the storage file and updates the key index after a write operation.
+    /// Re-maps the storage file and updates the key index after a write
+    /// operation.
     ///
     /// This function performs two key tasks:
-    /// 1. **Re-maps the file (`mmap`)**: Ensures that newly written data is visible
-    ///    to readers by creating a fresh memory-mapped view of the storage file.
-    /// 2. **Updates the key index**: Inserts new key hash-to-offset mappings into
-    ///    the in-memory key index, ensuring efficient key lookups for future reads.
+    /// 1. **Re-maps the file (`mmap`)**: Ensures that newly written data
+    ///    is visible to readers by creating a fresh memory-mapped view of
+    ///    the storage file.
+    /// 2. **Updates the key index**: Inserts new key hash-to-offset
+    ///    mappings into the in-memory key index, ensuring efficient key
+    ///    lookups for future reads.
     ///
     /// # Parameters:
-    /// - `write_guard`: A locked reference to the `BufWriter<File>`, ensuring that
-    ///   writes are completed before remapping and indexing.
-    /// - `key_hash_offsets`: A slice of `(key_hash, tail_offset)` tuples containing
-    ///   the latest key mappings to be added to the index.
-    /// - `tail_offset`: The **new absolute file offset** after the most recent write.
-    ///   This represents the byte position where the next write operation should begin.
-    ///   It is updated to reflect the latest valid data in the storage.
+    /// - `write_guard`: A locked reference to the `BufWriter<File>`,
+    ///   ensuring that writes are completed before remapping and
+    ///   indexing.
+    /// - `key_hash_offsets`: A slice of `(key_hash, tail_offset)`
+    ///   tuples containing the latest key mappings to be added to the
+    ///   index.
+    /// - `tail_offset`: The **new absolute file offset** after the most
+    ///   recent write. This represents the byte position where the next
+    ///   write operation should begin. It is updated to reflect the
+    ///   latest valid data in the storage.
     ///
     /// # Returns:
     /// - `Ok(())` if the reindexing process completes successfully.
-    /// - `Err(std::io::Error)` if file metadata retrieval, memory mapping, or
-    ///   key index updates fail.
+    /// - `Err(std::io::Error)` if file metadata retrieval, memory
+    ///   mapping, or key index updates fail.
     ///
     /// # Important:
-    /// - **The write operation must be flushed before calling `reindex`** to ensure
-    ///   all pending writes are persisted and visible in the new memory-mapped file.
-    ///   This prevents potential inconsistencies where written data is not reflected
-    ///   in the remapped view.
+    /// - **The write operation must be flushed before calling
+    ///   `reindex`** to ensure all pending writes are persisted and
+    ///   visible in the new memory-mapped file. This prevents potential
+    ///   inconsistencies where written data is not reflected in the
+    ///   remapped view.
     ///
     /// # Safety:
-    /// - This function should be called **immediately after a write operation**
-    ///   to ensure the file is in a consistent state before remapping.
-    /// - The function acquires locks on both the `mmap` and `key_indexer`
-    ///   to prevent race conditions while updating shared structures.
+    /// - This function should be called **immediately after a write
+    ///   operation** to ensure the file is in a consistent state before
+    ///   remapping.
+    /// - The function acquires locks on both the `mmap` and
+    ///   `key_indexer` to prevent race conditions while updating shared
+    ///   structures.
     ///
     /// # Locks Acquired:
-    /// - `mmap` (`Mutex<Arc<Mmap>>`) is locked to update the memory-mapped file.
-    /// - `key_indexer` (`RwLock<KeyIndexer>`) is locked to modify key mappings.
+    /// - `mmap` (`Mutex<Arc<Mmap>>`) is locked to update the
+    ///   memory-mapped file.
+    /// - `key_indexer` (`RwLock<KeyIndexer>`) is locked to
+    ///   modify key mappings.
     fn reindex(
         &self,
         write_guard: &std::sync::RwLockWriteGuard<'_, BufWriter<File>>,
@@ -218,8 +241,8 @@ impl DataStore {
         } else {
             // Handle the Result from the new insert method
             if let Err(e) = key_indexer_guard.insert(*key_hash, *offset) {
-                // A collision was detected on write. The entire batch operation
-                // should fail to prevent an inconsistent state.
+                // A collision was detected on write. The entire batch
+                // operation should fail to prevent an inconsistent state.
                 warn!("Write operation aborted due to hash collision: {}", e);
                 return Err(std::io::Error::other(e));
             }
@@ -242,8 +265,8 @@ impl DataStore {
 
     /// Retrieves an iterator over all valid entries in the storage.
     ///
-    /// This iterator allows scanning the storage file and retrieving **only the most recent**
-    /// versions of each key.
+    /// This iterator allows scanning the storage file and retrieving
+    /// **only the most recent** versions of each key.
     ///
     /// # Returns:
     /// - An `EntryIterator` instance for iterating over valid entries.
         EntryIterator::new(mmap_clone, tail_offset)
     }
 
-    /// Provides a parallel iterator over all valid entries in the storage.
+    /// Provides a parallel iterator over all valid entries in the
+    /// storage.
     ///
-    /// This method is only available when the `parallel` feature is enabled.
-    /// It leverages the Rayon crate to process entries across multiple threads,
-    /// which can be significantly faster for bulk operations on multi-core machines.
+    /// This method is only available when the `parallel` feature is
+    /// enabled. It leverages the Rayon crate to process entries across
+    /// multiple threads, which can be significantly faster for bulk
+    /// operations on multi-core machines.
     ///
-    /// The iterator is efficient, collecting only the necessary entry offsets first
-    /// and then constructing the `EntryHandle` objects in parallel.
+    /// The iterator is efficient, collecting only the necessary entry
+    /// offsets first and then constructing the `EntryHandle` objects in
+    /// parallel.
     ///
     /// # Returns
     /// - A Rayon `ParallelIterator` that yields `EntryHandle` items.
     #[cfg(feature = "parallel")]
     pub fn par_iter_entries(&self) -> impl ParallelIterator<Item = EntryHandle> {
-        // First, acquire a read lock and collect all the packed offset values.
-        // This is a short, fast operation.
+        // First, acquire a read lock and collect all the packed offset
+        // values. This is a short, fast operation.
         let key_indexer_guard = self.key_indexer.read().unwrap();
         let packed_values: Vec<u64> = key_indexer_guard.values().cloned().collect();
         drop(key_indexer_guard); // Release the lock as soon as possible.
@@ -275,47 +301,64 @@ impl DataStore {
         // Clone the mmap Arc once to be moved into the parallel iterator.
         let mmap_arc = self.get_mmap_arc();
 
-        // Create a parallel iterator over the collected offsets. The `filter_map`
-        // operation is the part that will run in parallel across threads.
+        // Create a parallel iterator over the collected offsets. The
+        // `filter_map` operation is the part that will run in parallel
+        // across threads.
         packed_values.into_par_iter().filter_map(move |packed| {
             let (_tag, offset) = KeyIndexer::unpack(packed);
             let offset = offset as usize;
 
-            // This logic is a simplified, read-only version of `read_entry_with_context`.
-            // We perform basic bounds checks to ensure safety.
+            // This logic is a simplified, read-only version of
+            // `read_entry_with_context`. We perform basic bounds checks
+            // to ensure safety.
             if offset + METADATA_SIZE > mmap_arc.len() {
                 return None;
             }
 
             let metadata_bytes = &mmap_arc[offset..offset + METADATA_SIZE];
             let metadata = EntryMetadata::deserialize(metadata_bytes);
-            let entry_start = metadata.prev_offset as usize;
+
+            // Derive aligned start from previous tail. For tombstones
+            // (single NULL byte without pre-pad), also support legacy
+            // no-prepad case.
+            let prev_tail = metadata.prev_offset;
+            let derived = prev_tail + Self::prepad_len(prev_tail) as u64;
             let entry_end = offset;
+            let mut entry_start = derived as usize;
+
+            // Tombstone detection (no pre-pad case).
+            if entry_end > prev_tail as usize
+                && entry_end - prev_tail as usize == 1
+                && mmap_arc[prev_tail as usize..entry_end] == NULL_BYTE
+            {
+                entry_start = prev_tail as usize;
+            }
+
             if entry_start >= entry_end || entry_end > mmap_arc.len() {
                 return None;
             }
 
-            // Important: We must filter out tombstone entries, which are marked
-            // by a single null byte payload.
+            // Filter out tombstones (single NULL byte region).
            if entry_end - entry_start == 1 && mmap_arc[entry_start..entry_end] == NULL_BYTE {
                 return None;
             }
 
-            // If all checks pass, construct and return the EntryHandle.
             Some(EntryHandle {
-                mmap_arc: mmap_arc.clone(), // Each handle gets a clone of the Arc
+                mmap_arc: mmap_arc.clone(),
                 range: entry_start..entry_end,
                 metadata,
             })
         })
     }
 
-    /// Recovers the **latest valid chain** of entries from the storage file.
+    /// Recovers the **latest valid chain** of entries from the storage
+    /// file.
     ///
-    /// This function **scans backward** through the file, verifying that each entry
-    /// correctly references the previous offset. It determines the **last valid
-    /// storage position** to ensure data integrity.
+    /// This function **scans backward** through the file, verifying that
+    /// each entry correctly references the previous offset. It
+    /// determines the **last valid storage position** to ensure data
+    /// integrity.
     ///
     /// # How It Works:
     /// - Scans from the last written offset **backward**.
@@ -342,7 +385,22 @@ impl DataStore {
             &mmap[metadata_offset as usize..(metadata_offset as usize + METADATA_SIZE)];
         let metadata = EntryMetadata::deserialize(metadata_bytes);
 
-        let entry_start = metadata.prev_offset;
+        // Stored `prev_offset` is the **previous tail**.
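+        // The payload start itself is never stored on disk; readers
+        // re-derive it by applying the same `prepad_len` formula the
+        // writer used.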
+ let prev_tail = metadata.prev_offset; + + // Derive start; handle tombstone (no pre-pad) as a special + // case so chain length math stays correct. + let derived_start = prev_tail + Self::prepad_len(prev_tail) as u64; + let entry_end = metadata_offset; + + let entry_start = if entry_end > prev_tail + && entry_end - prev_tail == 1 + && mmap[prev_tail as usize..entry_end as usize] == NULL_BYTE + { + prev_tail + } else { + derived_start + }; if entry_start >= metadata_offset { cursor -= 1; @@ -350,7 +408,8 @@ impl DataStore { } let mut chain_valid = true; - let mut back_cursor = entry_start; + let mut back_cursor = prev_tail; // walk by tails + // size of current entry data region let mut total_size = (metadata_offset - entry_start) + METADATA_SIZE as u64; while back_cursor != 0 { @@ -360,18 +419,42 @@ impl DataStore { } let prev_metadata_offset = back_cursor - METADATA_SIZE as u64; + if prev_metadata_offset as usize + METADATA_SIZE > mmap.len() { + chain_valid = false; + break; + } + let prev_metadata_bytes = &mmap[prev_metadata_offset as usize ..(prev_metadata_offset as usize + METADATA_SIZE)]; let prev_metadata = EntryMetadata::deserialize(prev_metadata_bytes); - let entry_size = prev_metadata_offset.saturating_sub(prev_metadata.prev_offset); + let prev_prev_tail = prev_metadata.prev_offset; + + // Size of the previous entry’s data region + let prev_entry_start = if prev_metadata_offset > prev_prev_tail + && prev_metadata_offset - prev_prev_tail == 1 + && mmap[prev_prev_tail as usize..prev_metadata_offset as usize] == NULL_BYTE + { + prev_prev_tail + } else { + prev_prev_tail + Self::prepad_len(prev_prev_tail) as u64 + }; + + if prev_entry_start >= prev_metadata_offset { + chain_valid = false; + break; + } + + let entry_size = prev_metadata_offset.saturating_sub(prev_entry_start); + total_size += entry_size + METADATA_SIZE as u64; - if prev_metadata.prev_offset >= prev_metadata_offset { + + if prev_prev_tail >= prev_metadata_offset { chain_valid = false; break; } - back_cursor = prev_metadata.prev_offset; + back_cursor = prev_prev_tail; } if chain_valid && back_cursor == 0 && total_size <= file_len { @@ -387,9 +470,10 @@ impl DataStore { /// Performs the core logic of reading an entry from the store. /// - /// This private helper centralizes the logic for both `read` and `batch_read`. - /// It takes all necessary context to perform a safe lookup, including the key, - /// its hash, the memory map, and a read guard for the key indexer. + /// This private helper centralizes the logic for both `read` and + /// `batch_read`. It takes all necessary context to perform a safe + /// lookup, including the key, its hash, the memory map, and a read + /// guard for the key indexer. /// /// # Parameters /// - `key`: The original key bytes used for tag verification. @@ -399,8 +483,8 @@ impl DataStore { /// /// # Returns /// - `Some(EntryHandle)` if the key is found and all checks pass. - /// - `None` if the key is not found, a tag mismatch occurs (collision/corruption), - /// or the entry is a tombstone. + /// - `None` if the key is not found, a tag mismatch occurs + /// (collision/corruption), or the entry is a tombstone. #[inline] fn read_entry_with_context<'a>( &self, @@ -417,7 +501,8 @@ impl DataStore { && tag != KeyIndexer::tag_from_key(non_hashed_key) { warn!( - "Tag mismatch detected for `non_hashed_key`, likely a hash collision or index corruption." + "Tag mismatch detected for `non_hashed_key`, likely a \ + hash collision or index corruption." 
            );
            return None;
        }
@@ -429,14 +514,27 @@ impl DataStore {
 
         let metadata_bytes = &mmap_arc[offset..offset + METADATA_SIZE];
         let metadata = EntryMetadata::deserialize(metadata_bytes);
-        let entry_start = metadata.prev_offset as usize;
+
+        // Derive aligned payload start from stored previous tail. Support
+        // tombstones (single NULL byte without pre-pad).
+        let prev_tail = metadata.prev_offset;
+        let derived = prev_tail + Self::prepad_len(prev_tail) as u64;
         let entry_end = offset;
+        let mut entry_start = derived as usize;
+
+        if entry_end > prev_tail as usize
+            && entry_end - prev_tail as usize == 1
+            && mmap_arc[prev_tail as usize..entry_end] == NULL_BYTE
+        {
+            entry_start = prev_tail as usize;
+        }
+
         if entry_start >= entry_end || entry_end > mmap_arc.len() {
             return None;
         }
 
-        // Check for tombstone (deleted entry)
+        // Tombstone (single null byte)
         if entry_end - entry_start == 1 && mmap_arc[entry_start..entry_end] == NULL_BYTE {
             return None;
         }
@@ -456,38 +554,133 @@ impl DataStore {
     ///
     /// # Parameters:
     /// - `entry`: The **entry handle** to be copied.
-    /// - `target`: The **destination storage** where the entry should be copied.
+    /// - `target`: The **destination storage** where the entry should be
+    ///   copied.
     ///
     /// # Returns:
-    /// - `Ok(target_offset)`: The file offset where the copied entry was written.
+    /// - `Ok(target_offset)`: The file offset where the copied entry was
+    ///   written.
     /// - `Err(std::io::Error)`: If a write operation fails.
     ///
     /// # Notes:
-    /// - This is a **low-level function** used by `copy` and related operations.
+    /// - This is a **low-level function** used by `copy` and related
+    ///   operations.
     /// - The `entry` remains **unchanged** in the original storage.
     fn copy_handle(&self, entry: &EntryHandle, target: &DataStore) -> Result<u64> {
         let mut entry_stream = EntryStream::from(entry.clone_arc());
         target.write_stream_with_key_hash(entry.key_hash(), &mut entry_stream)
     }
 
-    // TODO: Determine thread count *before* running this OR [somehow] make it thread safe.
-    /// Compacts the storage by keeping only the latest version of each key.
+    /// Estimates the potential space savings from compaction.
+    ///
+    /// This method scans the storage file and calculates the difference
+    /// between the total file size and the size required to keep only
+    /// the latest versions of all keys.
+    ///
+    /// # How It Works:
+    /// - Iterates through the entries, tracking the **latest version** of
+    ///   each key.
+    /// - Ignores older versions of keys to estimate the **optimized**
+    ///   storage footprint.
+    /// - Returns the **difference** between the total file size and the
+    ///   estimated compacted size.
+    pub fn estimate_compaction_savings(&self) -> u64 {
+        let total_size = self.file_size().unwrap_or(0);
+        let mut unique_entry_size: u64 = 0;
+        let mut seen_keys = HashSet::with_hasher(Xxh3BuildHasher);
+
+        for entry in self.iter_entries() {
+            if seen_keys.insert(entry.key_hash()) {
+                unique_entry_size += entry.file_size() as u64;
+            }
+        }
+        total_size.saturating_sub(unique_entry_size)
+    }
+
+    /// Provides access to the shared memory-mapped file (`Arc<Mmap>`)
+    /// for testing.
+    ///
+    /// This method returns a cloned `Arc<Mmap>`, allowing test cases to
+    /// inspect the memory-mapped region while ensuring reference
+    /// counting remains intact.
+    ///
+    /// # Notes:
+    /// - The returned `Arc<Mmap>` ensures safe access without
+    ///   invalidating the mmap.
+    /// - This function is only available in **test** and **debug**
+    ///   builds.
+    #[cfg(any(test, debug_assertions))]
+    pub fn get_mmap_arc_for_testing(&self) -> Arc<Mmap> {
+        self.get_mmap_arc()
+    }
+
+    /// Provides direct access to the raw pointer of the underlying
+    /// memory map for testing.
+    ///
+    /// This method retrieves a raw pointer (`*const u8`) to the start of
+    /// the memory-mapped file. It is useful for validating zero-copy
+    /// behavior and memory alignment in test cases.
+    ///
+    /// # Safety Considerations:
+    /// - The pointer remains valid **as long as** the mmap is not
+    ///   remapped or dropped.
+    /// - Dereferencing this pointer outside of controlled test
+    ///   environments **is unsafe** and may result in undefined
+    ///   behavior.
+    ///
+    /// # Notes:
+    /// - This function is only available in **test** and **debug**
+    ///   builds.
+    #[cfg(any(test, debug_assertions))]
+    pub fn arc_ptr(&self) -> *const u8 {
+        self.get_mmap_arc().as_ptr()
+    }
+
+    #[inline]
+    fn get_mmap_arc(&self) -> Arc<Mmap> {
+        let guard = self.mmap.lock().unwrap();
+        let mmap_clone = guard.clone();
+        drop(guard);
+        mmap_clone
+    }
+
+    // --- Alignment helper -------------------------------------------------
+
+    /// Compute the number of pre-pad bytes required to align `offset` to
+    /// `PAYLOAD_ALIGNMENT`. `PAYLOAD_ALIGNMENT` is a power of two.
+    #[inline]
+    fn prepad_len(offset: u64) -> usize {
+        let a = PAYLOAD_ALIGNMENT;
+        ((a - (offset % a)) & (a - 1)) as usize
+    }
+
+    // ---------------------------------------------------------------------
+
+    // TODO: Determine thread count *before* running this OR [somehow]
+    // make it thread safe.
+    /// Compacts the storage by keeping only the latest version of each
+    /// key.
     ///
     /// # ⚠️ WARNING:
-    /// - **This function should only be used when a single thread is accessing the storage.**
-    /// - While `&mut self` prevents concurrent **mutations**, it **does not** prevent
-    ///   other threads from holding shared references (`&DataStore`) and performing reads.
-    /// - If the `DataStore` instance is wrapped in `Arc<DataStore>`, multiple threads
-    ///   may still hold **read** references while compaction is running, potentially
-    ///   leading to inconsistent reads.
-    /// - If stricter concurrency control is required, **manual synchronization should
-    ///   be enforced externally.**
+    /// - **This function should only be used when a single thread is
+    ///   accessing the storage.**
+    /// - While `&mut self` prevents concurrent **mutations**, it **does
+    ///   not** prevent other threads from holding shared references
+    ///   (`&DataStore`) and performing reads.
+    /// - If the `DataStore` instance is wrapped in `Arc<DataStore>`,
+    ///   multiple threads may still hold **read** references while
+    ///   compaction is running, potentially leading to inconsistent
+    ///   reads.
+    /// - If stricter concurrency control is required, **manual
+    ///   synchronization should be enforced externally.**
     ///
     /// # Behavior:
-    /// - Creates a **temporary compacted file** containing only the latest versions
-    ///   of stored keys.
-    /// - Swaps the original file with the compacted version upon success.
-    /// - Does **not** remove tombstone (deleted) entries due to the append-only model.
+    /// - Creates a **temporary compacted file** containing only the
+    ///   latest versions of stored keys.
+    /// - Swaps the original file with the compacted version upon
+    ///   success.
+    /// - Does **not** remove tombstone (deleted) entries due to the
+    ///   append-only model.
     ///
     /// # Returns:
     /// - `Ok(())` if compaction completes successfully.
@@ -509,13 +702,12 @@ impl DataStore {
 
         let size_before = self.file_size()?;
 
-        // Note: The current implementation should never increase space, but if an additional indexer
-        // is ever used, this may change.
+        // Note: The current implementation should never increase space,
+        // but if an additional indexer is ever used, this may change.
         //
         // Only write the static index if it actually saves space
         if size_before > compacted_data_size {
             info!("Compaction will save space. Writing static index.");
-            // let indexed_up_to = compacted_storage.tail_offset.load(Ordering::Acquire);
 
             let mut file_guard = compacted_storage
                 .file
@@ -524,7 +716,9 @@ impl DataStore {
             file_guard.flush()?;
         } else {
             info!(
-                "Compaction would increase file size (data w/ indexing: {compacted_data_size}). Skipping static index generation.",
+                "Compaction would increase file size \
+                 (data w/ indexing: {compacted_data_size}). \
+                 Skipping static index generation.",
             );
         }
 
@@ -535,67 +729,6 @@ impl DataStore {
         info!("Compaction file swap complete.");
         Ok(())
     }
-
-    /// Estimates the potential space savings from compaction.
-    ///
-    /// This method scans the storage file and calculates the difference
-    /// between the total file size and the size required to keep only
-    /// the latest versions of all keys.
-    ///
-    /// # How It Works:
-    /// - Iterates through the entries, tracking the **latest version** of each key.
-    /// - Ignores older versions of keys to estimate the **optimized** storage footprint.
-    /// - Returns the **difference** between the total file size and the estimated compacted size.
-    pub fn estimate_compaction_savings(&self) -> u64 {
-        let total_size = self.file_size().unwrap_or(0);
-        let mut unique_entry_size: u64 = 0;
-        let mut seen_keys = HashSet::with_hasher(Xxh3BuildHasher);
-
-        for entry in self.iter_entries() {
-            if seen_keys.insert(entry.key_hash()) {
-                unique_entry_size += entry.file_size() as u64;
-            }
-        }
-        total_size.saturating_sub(unique_entry_size)
-    }
-
-    /// Provides access to the shared memory-mapped file (`Arc<Mmap>`) for testing.
-    ///
-    /// This method returns a cloned `Arc<Mmap>`, allowing test cases to inspect
-    /// the memory-mapped region while ensuring reference counting remains intact.
-    ///
-    /// # Notes:
-    /// - The returned `Arc<Mmap>` ensures safe access without invalidating the mmap.
-    /// - This function is only available in **test** and **debug** builds.
-    #[cfg(any(test, debug_assertions))]
-    pub fn get_mmap_arc_for_testing(&self) -> Arc<Mmap> {
-        self.get_mmap_arc()
-    }
-
-    /// Provides direct access to the raw pointer of the underlying memory map for testing.
-    ///
-    /// This method retrieves a raw pointer (`*const u8`) to the start of the memory-mapped file.
-    /// It is useful for validating zero-copy behavior and memory alignment in test cases.
-    ///
-    /// # Safety Considerations:
-    /// - The pointer remains valid **as long as** the mmap is not remapped or dropped.
-    /// - Dereferencing this pointer outside of controlled test environments **is unsafe**
-    ///   and may result in undefined behavior.
-    ///
-    /// # Notes:
-    /// - This function is only available in **test** and **debug** builds.
-    #[cfg(any(test, debug_assertions))]
-    pub fn arc_ptr(&self) -> *const u8 {
-        self.get_mmap_arc().as_ptr()
-    }
-
-    #[inline]
-    fn get_mmap_arc(&self) -> Arc<Mmap> {
-        let guard = self.mmap.lock().unwrap();
-        let mmap_clone = guard.clone();
-        drop(guard);
-        mmap_clone
-    }
 }
 
 impl DataStoreWriter for DataStore {
@@ -609,7 +742,15 @@
             .file
             .write()
             .map_err(|_| std::io::Error::other("Failed to acquire file lock"))?;
-        let prev_offset = self.tail_offset.load(Ordering::Acquire);
+        let prev_tail = self.tail_offset.load(Ordering::Acquire);
+
+        // Pre-align payload start so typed views can be zero-copy.
+        let prepad = Self::prepad_len(prev_tail);
+        if prepad > 0 {
+            // Pad with zeros; not part of logical payload (the stack
+            // buffer assumes PAYLOAD_ALIGNMENT <= 64).
+            let pad = [0u8; 64];
+            file.write_all(&pad[..prepad])?;
+        }
 
         let mut buffer = vec![0; WRITE_STREAM_BUFFER_SIZE];
         let mut total_written = 0;
@@ -637,16 +778,25 @@
             ));
         }
 
+        if total_written == 0 {
+            return Err(std::io::Error::new(
+                std::io::ErrorKind::InvalidInput,
+                "Payload cannot be empty.",
+            ));
+        }
+
         let checksum = checksum_state.finalize().to_le_bytes();
+        // Link to previous tail; readers derive aligned start.
         let metadata = EntryMetadata {
             key_hash,
-            prev_offset,
+            prev_offset: prev_tail,
             checksum,
         };
         file.write_all(&metadata.serialize())?;
         file.flush()?;
 
-        let tail_offset = prev_offset + total_written as u64 + METADATA_SIZE as u64;
+        let tail_offset = prev_tail + prepad as u64 + total_written as u64 + METADATA_SIZE as u64;
+
         self.reindex(
             &file,
             &[(key_hash, tail_offset - METADATA_SIZE as u64)],
@@ -665,7 +815,8 @@
         self.batch_write_with_key_hashes(vec![(key_hash, payload)], false)
     }
 
-    // TODO: Consider change signature to: fn batch_write(&self, entries: Vec<(Vec<u8>, Vec<u8>)>) -> Result<u64> {
+    // TODO: Consider changing the signature to:
+    //   fn batch_write(&self, entries: Vec<(Vec<u8>, Vec<u8>)>) -> Result<u64>
     fn batch_write(&self, entries: &[(&[u8], &[u8])]) -> Result<u64> {
         let (keys, payloads): (Vec<_>, Vec<_>) = entries.iter().cloned().unzip();
         let hashes = compute_hash_batch(&keys);
@@ -673,7 +824,8 @@
         self.batch_write_with_key_hashes(hashed_entries, false)
     }
 
-    // TODO: Consider change `prehashed_keys: Vec<(u64, &[u8])>` to `prehashed_keys: Vec<(u64, Vec<u8>)>`
+    // TODO: Consider changing `prehashed_keys: Vec<(u64, &[u8])>` to
+    //   `prehashed_keys: Vec<(u64, Vec<u8>)>`
     fn batch_write_with_key_hashes(
         &self,
         prehashed_keys: Vec<(u64, &[u8])>,
@@ -698,8 +850,33 @@
                     "NULL-byte payloads cannot be written directly.",
                 ));
             }
+                // Tombstone: write 1 byte, no pre-pad.
+                let link_to_prev_tail = tail_offset;
+
+                if payload.is_empty() {
+                    return Err(std::io::Error::new(
+                        std::io::ErrorKind::InvalidInput,
+                        "Payload cannot be empty.",
+                    ));
+                }
+
+                let checksum = compute_checksum(payload);
+                let metadata = EntryMetadata {
+                    key_hash,
+                    prev_offset: link_to_prev_tail,
+                    checksum,
+                };
+
+                let mut entry: Vec<u8> = vec![0u8; 1 + METADATA_SIZE];
+                entry[0] = NULL_BYTE[0];
+                entry[1..].copy_from_slice(&metadata.serialize());
+                buffer.extend_from_slice(&entry);
+
+                tail_offset += entry.len() as u64;
+                key_hash_offsets.push((key_hash, tail_offset - METADATA_SIZE as u64));
                 deleted_keys.insert(key_hash);
+                continue;
             }
 
@@ -709,11 +886,19 @@
             if payload.is_empty() {
                 return Err(std::io::Error::new(
                     std::io::ErrorKind::InvalidInput,
                     "Payload cannot be empty.",
                 ));
             }
 
-            let prev_offset = tail_offset;
+            // Non-tombstone: pre-align current payload.
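+            // Zero pre-pad bytes are appended to the batch buffer until
+            // the running tail is a multiple of PAYLOAD_ALIGNMENT; the
+            // metadata below still records the unpadded previous tail,
+            // and readers re-derive the padded start via `prepad_len`.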
+            let link_to_prev_tail = tail_offset;
+            let prepad = Self::prepad_len(tail_offset);
+            if prepad > 0 {
+                let old_len = buffer.len();
+                buffer.resize(old_len + prepad, 0u8);
+                tail_offset += prepad as u64;
+            }
+
             let checksum = compute_checksum(payload);
             let metadata = EntryMetadata {
                 key_hash,
-                prev_offset,
+                prev_offset: link_to_prev_tail,
                 checksum,
             };
             let payload_len = payload.len();
@@ -759,7 +944,8 @@ impl DataStoreWriter for DataStore {
             return Err(std::io::Error::new(
                 std::io::ErrorKind::InvalidInput,
                 format!(
-                    "Cannot copy entry to the same storage ({:?}). Use `rename` instead.",
+                    "Cannot copy entry to the same storage ({:?}). \
+                     Use `rename` instead.",
                    self.path
                ),
            ));
@@ -789,7 +975,8 @@ impl DataStoreWriter for DataStore {
     }
 
     fn batch_delete_key_hashes(&self, prehashed_keys: &[u64]) -> Result<u64> {
-        // First, check which keys actually exist to avoid writing useless tombstones.
+        // First, check which keys actually exist to avoid writing
+        // useless tombstones.
         let keys_to_delete: Vec<u64> = {
             let key_indexer_guard = self
                 .key_indexer
@@ -803,18 +990,18 @@ impl DataStoreWriter for DataStore {
                 .collect()
         };
 
-        // If no keys were found to delete, we can exit early without any file I/O.
+        // If no keys were found to delete, exit early without any I/O.
         if keys_to_delete.is_empty() {
             return Ok(self.tail_offset.load(Ordering::Acquire));
         }
 
-        // Prepare the delete operations (a key hash + a null byte payload).
+        // Prepare the delete operations (a key hash + a null byte).
         let delete_ops: Vec<(u64, &[u8])> = keys_to_delete
             .iter()
             .map(|&key_hash| (key_hash, &NULL_BYTE as &[u8]))
             .collect();
 
-        // Use the underlying batch write method, allowing null bytes for tombstones.
+        // Use the underlying batch write method, allowing null bytes.
         self.batch_write_with_key_hashes(delete_ops, true)
     }
 }
@@ -827,7 +1014,8 @@ impl DataStoreReader for DataStore {
     }
 
     fn exists_with_key_hash(&self, prehashed_key: u64) -> Result<bool> {
-        // This is a lightweight wrapper around the read method, just like exists().
+        // This is a lightweight wrapper around the read method, just
+        // like exists().
         Ok(self.read_with_key_hash(prehashed_key)?.is_some())
     }
 
@@ -849,8 +1037,6 @@ impl DataStoreReader for DataStore {
             .map_err(|_| Error::other("key-index lock poisoned"))?;
         let mmap_arc = self.get_mmap_arc();
 
-        // Call the core logic with `None` for the key, as we are only using the hash
-        // and want to skip the tag verification check.
         Ok(self.read_entry_with_context(None, prehashed_key, &mmap_arc, &key_indexer_guard))
     }
 
@@ -869,8 +1055,19 @@ impl DataStoreReader for DataStore {
         let metadata_bytes = &mmap_arc[metadata_offset..metadata_offset + METADATA_SIZE];
         let metadata = EntryMetadata::deserialize(metadata_bytes);
-        let entry_start = metadata.prev_offset as usize;
+        // Derive aligned start; support tombstone no-prepad case.
+        let prev_tail = metadata.prev_offset;
+        let derived = prev_tail + Self::prepad_len(prev_tail) as u64;
         let entry_end = metadata_offset;
+
+        let mut entry_start = derived as usize;
+        if entry_end > prev_tail as usize
+            && entry_end - prev_tail as usize == 1
+            && mmap_arc[prev_tail as usize..entry_end] == NULL_BYTE
+        {
+            entry_start = prev_tail as usize;
+        }
+
         if entry_start >= entry_end || entry_end > mmap_arc.len() {
             return Ok(None);
         }
@@ -925,16 +1122,13 @@ impl DataStoreReader for DataStore {
                     })
                     .collect()
             }
-            // Case 2: We only have the hashes and must skip tag verification.
- None => { - prehashed_keys - .iter() - .map(|key_hash| { - // Correctly pass `None` as we don't have the original key - self.read_entry_with_context(None, *key_hash, &mmap_arc, &key_indexer_guard) - }) - .collect() - } + // Case 2: We only have the hashes and must skip tag check. + None => prehashed_keys + .iter() + .map(|key_hash| { + self.read_entry_with_context(None, *key_hash, &mmap_arc, &key_indexer_guard) + }) + .collect(), }; Ok(results) diff --git a/src/storage_engine/entry_iterator.rs b/src/storage_engine/entry_iterator.rs index acd0f384..5c57010a 100644 --- a/src/storage_engine/entry_iterator.rs +++ b/src/storage_engine/entry_iterator.rs @@ -13,7 +13,8 @@ use std::sync::Arc; /// offsets stored in each entry. /// /// ## Behavior: -/// - **Starts at `tail_offset`** and moves backward using the `prev_offset` field. +/// - **Starts at `tail_offset`** and moves backward using the +/// `prev_offset` field. /// - **Ensures unique keys** by tracking seen hashes in a `HashSet`. /// - **Skips deleted entries**, which are represented by empty data. /// - **Stops when reaching an invalid or out-of-bounds offset.** @@ -26,9 +27,10 @@ pub struct EntryIterator { impl EntryIterator { /// Creates a new iterator for scanning storage entries. /// - /// Initializes an iterator starting at the provided `tail_offset` and - /// moving backward through the storage file. The iterator ensures that - /// only the most recent version of each key is returned. + /// Initializes an iterator starting at the provided `tail_offset` + /// and moving backward through the storage file. The iterator + /// ensures that only the most recent version of each key is + /// returned. /// /// # Parameters: /// - `mmap`: A reference to the memory-mapped file. @@ -43,6 +45,12 @@ impl EntryIterator { seen_keys: HashSet::with_hasher(Xxh3BuildHasher), } } + + #[inline] + fn prepad_len(offset: u64) -> usize { + let a = PAYLOAD_ALIGNMENT; + ((a - (offset % a)) & (a - 1)) as usize + } } impl Iterator for EntryIterator { @@ -50,9 +58,10 @@ impl Iterator for EntryIterator { /// Advances the iterator to the next valid entry. /// - /// Reads and parses the metadata for the current entry, determines its - /// boundaries, and extracts its data. If the key has already been seen, - /// the iterator skips it to ensure that only the latest version is returned. + /// Reads and parses the metadata for the current entry, determines + /// its boundaries, and extracts its data. If the key has already + /// been seen, the iterator skips it to ensure that only the latest + /// version is returned. /// /// # Returns: /// - `Some(&[u8])` containing the entry data if valid. @@ -65,29 +74,47 @@ impl Iterator for EntryIterator { // Locate metadata at the current cursor position let metadata_offset = (self.cursor - METADATA_SIZE as u64) as usize; + if metadata_offset + METADATA_SIZE > self.mmap.len() { + return None; + } let metadata_bytes = &self.mmap[metadata_offset..metadata_offset + METADATA_SIZE]; let metadata = EntryMetadata::deserialize(metadata_bytes); - let entry_start = metadata.prev_offset as usize; + // Stored `prev_offset` is the **previous tail**. Derive the + // aligned payload start for regular values. For tombstones + // (single NULL byte), also support the no-prepad case. + let prev_tail = metadata.prev_offset; + let derived = prev_tail + Self::prepad_len(prev_tail) as u64; + let entry_end = metadata_offset; + let mut entry_start = derived as usize; + + // Tombstone (legacy, no-prepad). 
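+        // A tombstone occupies exactly one byte at the previous tail
+        // itself, so an `entry_end - prev_tail` gap of 1 plus a NULL
+        // byte match identifies it without applying the alignment rule.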
+        if entry_end > prev_tail as usize
+            && entry_end - prev_tail as usize == 1
+            && self.mmap[prev_tail as usize..entry_end] == NULL_BYTE
+        {
+            entry_start = prev_tail as usize;
+        }
 
         // Ensure valid entry bounds before reading
         if entry_start >= entry_end || entry_end > self.mmap.len() {
             return None;
         }
 
-        // Move cursor backward to follow the chain
-        self.cursor = metadata.prev_offset; // Move cursor backward
+        // Move cursor backward to follow the chain (by tails)
+        self.cursor = metadata.prev_offset;
 
-        // Skip duplicate keys (ensuring only the latest value is returned)
+        // Skip duplicate keys (ensuring only the latest value is
+        // returned)
         if !self.seen_keys.insert(metadata.key_hash) {
             return self.next(); // Skip if already seen
         }
 
         let entry_data = &self.mmap[entry_start..entry_end];
 
-        // Skip deleted entries (denoted by empty data)
-        if entry_data == NULL_BYTE {
+        // Skip deleted entries (denoted by single null byte)
+        if entry_end - entry_start == 1 && entry_data == NULL_BYTE {
             return self.next();
         }
diff --git a/tests/alignment_tests.rs b/tests/alignment_tests.rs
new file mode 100644
index 00000000..0b6cf353
--- /dev/null
+++ b/tests/alignment_tests.rs
@@ -0,0 +1,224 @@
+//! Verify fixed payload alignment across writes, deletes, overwrites.
+//! Start with unaligned strings, then overwrite with aligned payloads.
+//! Prove zero-copy typed views via bytemuck and do SIMD loads on
+//! x86_64/aarch64.
+
+use std::mem::{align_of, size_of};
+
+use tempfile::tempdir;
+
+use simd_r_drive::{
+    DataStore,
+    storage_engine::constants::PAYLOAD_ALIGNMENT,
+    traits::{DataStoreReader, DataStoreWriter},
+};
+
+use bytemuck::try_cast_slice;
+
+#[cfg(target_arch = "x86_64")]
+use core::arch::x86_64::{__m128i, _mm_load_si128};
+
+#[cfg(target_arch = "aarch64")]
+use core::arch::aarch64::{uint8x16_t, vld1q_u8};
+
+fn assert_payload_addr_aligned(bytes: &[u8]) {
+    let ptr = bytes.as_ptr() as usize;
+    let a = PAYLOAD_ALIGNMENT as usize;
+    assert!(
+        ptr % a == 0,
+        "payload start address is not {}-byte aligned",
+        a
+    );
+}
+
+/// Purely safe pointer math: prove a &[T] view would be legal.
+fn assert_can_view_as<T>(bytes: &[u8]) {
+    let a_t = align_of::<T>();
+    assert!(
+        a_t <= PAYLOAD_ALIGNMENT as usize,
+        "type align {} exceeds PAYLOAD_ALIGNMENT {}",
+        a_t,
+        PAYLOAD_ALIGNMENT
+    );
+    let ptr = bytes.as_ptr() as usize;
+    assert!(
+        ptr % a_t == 0,
+        "payload addr {} is not aligned to T (align {})",
+        ptr,
+        a_t
+    );
+    assert!(
+        bytes.len() % size_of::<T>() == 0,
+        "payload length {} is not a multiple of {}",
+        bytes.len(),
+        size_of::<T>()
+    );
+}
+
+/// bytemuck zero-copy proof: get a typed view or fail.
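+/// `try_cast_slice` only returns `Ok` when the slice address satisfies
+/// the target type's alignment and the length is an exact multiple of
+/// its size, so a successful cast is itself the zero-copy evidence.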
+fn assert_bytemuck_view_u32(bytes: &[u8]) {
+    let _: &[u32] = try_cast_slice(bytes).expect("cast &[u8]->&[u32] failed");
+}
+fn assert_bytemuck_view_u64(bytes: &[u8]) {
+    let _: &[u64] = try_cast_slice(bytes).expect("cast &[u8]->&[u64] failed");
+}
+fn assert_bytemuck_view_u128(bytes: &[u8]) {
+    let _: &[u128] = try_cast_slice(bytes).expect("cast &[u8]->&[u128] failed");
+}
+
+#[cfg(target_arch = "x86_64")]
+fn assert_simd_16_byte_loadable(bytes: &[u8]) {
+    assert!(
+        (bytes.as_ptr() as usize) % 16 == 0,
+        "SIMD pointer must be 16-byte aligned"
+    );
+    let lanes = bytes.len() / 16;
+    unsafe {
+        for i in 0..lanes {
+            let p = bytes.as_ptr().add(i * 16) as *const __m128i;
+            let v = _mm_load_si128(p);
+            core::hint::black_box(v);
+        }
+    }
+}
+
+#[cfg(target_arch = "aarch64")]
+fn assert_simd_16_byte_loadable(bytes: &[u8]) {
+    assert!(
+        (bytes.as_ptr() as usize) % 16 == 0,
+        "SIMD pointer must be 16-byte aligned"
+    );
+    let lanes = bytes.len() / 16;
+    unsafe {
+        for i in 0..lanes {
+            let p = bytes.as_ptr().add(i * 16);
+            let v0 = vld1q_u8(p);
+            core::hint::black_box(v0);
+            let p_vec = p as *const uint8x16_t;
+            let v1: uint8x16_t = core::ptr::read(p_vec);
+            core::hint::black_box(v1);
+        }
+    }
+}
+
+#[cfg(not(any(target_arch = "x86_64", target_arch = "aarch64")))]
+fn assert_simd_16_byte_loadable(bytes: &[u8]) {
+    // Portable fallback: re-assert address and u128 view conditions.
+    assert_payload_addr_aligned(bytes);
+    if bytes.len() >= 16 && bytes.len() % 16 == 0 {
+        assert_can_view_as::<u128>(bytes);
+        assert_bytemuck_view_u128(bytes);
+    }
+}
+
+#[test]
+fn byte_alignment_unaligned_then_overwrite_and_simd() {
+    let dir = tempdir().expect("create tempdir");
+    let path = dir.path().join("store.bin");
+    let store = DataStore::open(&path).expect("open datastore");
+
+    // Phase 1: write unaligned strings first (3,5,7,9 bytes).
+    let s1 = b"abc"; // 3
+    let s2 = b"abcde"; // 5
+    let s3 = b"abcdefg"; // 7
+    let s4 = b"abcdefghi"; // 9
+
+    store.write(b"k_s1", s1).unwrap();
+    store.write(b"k_s2", s2).unwrap();
+    store.write(b"k_s3", s3).unwrap();
+    store.write(b"k_s4", s4).unwrap();
+
+    // Mix in numeric payloads to stress alignment interactions.
+    let v_u32 = vec![0xEEu8; 5 * 4]; // 20 bytes
+    let v_u64 = vec![0x33u8; 9 * 8]; // 72 bytes
+    let v_u128 = vec![0x77u8; 4 * 16]; // 64 bytes
+    store.write(b"k_u32", &v_u32).unwrap();
+    store.write(b"k_u64", &v_u64).unwrap();
+    store.write(b"k_u128", &v_u128).unwrap();
+
+    // Assert alignment after initial writes.
+    let e_s1 = store.read(b"k_s1").unwrap().expect("k_s1 missing");
+    let e_s2 = store.read(b"k_s2").unwrap().expect("k_s2 missing");
+    let e_s3 = store.read(b"k_s3").unwrap().expect("k_s3 missing");
+    let e_s4 = store.read(b"k_s4").unwrap().expect("k_s4 missing");
+    let e_u32 = store.read(b"k_u32").unwrap().expect("k_u32 missing");
+    let e_u64 = store.read(b"k_u64").unwrap().expect("k_u64 missing");
+    let e_u128 = store.read(b"k_u128").unwrap().expect("k_u128 missing");
+
+    for bytes in [
+        e_s1.as_slice(),
+        e_s2.as_slice(),
+        e_s3.as_slice(),
+        e_s4.as_slice(),
+        e_u32.as_slice(),
+        e_u64.as_slice(),
+        e_u128.as_slice(),
+    ] {
+        assert_payload_addr_aligned(bytes);
+    }
+
+    // Phase 2: delete one string (tombstone, no pre-pad).
+    store.delete(b"k_s2").unwrap();
+
+    // Phase 3: overwrite with 16B-multiple payloads.
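+    // A length that is a multiple of 16 keeps the whole payload
+    // coverable by u128/SIMD lanes; the start address is already
+    // guaranteed by PAYLOAD_ALIGNMENT, so only the lengths vary here.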
+    let s1_aligned = vec![0xA5u8; 2 * 16]; // 32 bytes
+    let s3_aligned = vec![0xB6u8; 3 * 16]; // 48 bytes
+    let u32_aligned = vec![0xCCu8; 16 * 4]; // 64 bytes
+
+    store.write(b"k_s1", &s1_aligned).unwrap();
+    store.write(b"k_s3", &s3_aligned).unwrap();
+    store.write(b"k_u32", &u32_aligned).unwrap();
+
+    // Fetch survivors and assert alignment again.
+    let e_s1_new = store.read(b"k_s1").unwrap().expect("k_s1 missing");
+    let e_s3_new = store.read(b"k_s3").unwrap().expect("k_s3 missing");
+    let e_s4_new = store.read(b"k_s4").unwrap().expect("k_s4 missing");
+    let e_u32_new = store.read(b"k_u32").unwrap().expect("k_u32 missing");
+    let e_u64_new = store.read(b"k_u64").unwrap().expect("k_u64 missing");
+    let e_u128_new = store.read(b"k_u128").unwrap().expect("k_u128 missing");
+    let e_s2_gone = store.read(b"k_s2").unwrap();
+    assert!(e_s2_gone.is_none(), "deleted key k_s2 should be absent");
+
+    for bytes in [
+        e_s1_new.as_slice(),
+        e_s3_new.as_slice(),
+        e_s4_new.as_slice(),
+        e_u32_new.as_slice(),
+        e_u64_new.as_slice(),
+        e_u128_new.as_slice(),
+    ] {
+        assert_payload_addr_aligned(bytes);
+    }
+
+    // Prove typed views would be zero-copy by math and bytemuck.
+    assert_can_view_as::<u32>(e_u32_new.as_slice());
+    assert_can_view_as::<u64>(e_u64_new.as_slice());
+    assert_can_view_as::<u128>(e_u128_new.as_slice());
+    assert_bytemuck_view_u32(e_u32_new.as_slice());
+    assert_bytemuck_view_u64(e_u64_new.as_slice());
+    assert_bytemuck_view_u128(e_u128_new.as_slice());
+
+    // SIMD loads or portable fallback.
+    for bytes in [
+        e_s1_new.as_slice(),
+        e_s3_new.as_slice(),
+        e_u32_new.as_slice(),
+        e_u64_new.as_slice(),
+        e_u128_new.as_slice(),
+    ] {
+        if bytes.len() >= 16 {
+            assert_simd_16_byte_loadable(bytes);
+        }
+    }
+
+    // Iterator must yield aligned, non-tombstone payloads.
+    for entry in store.iter_entries() {
+        let bytes = entry.as_slice();
+        assert_payload_addr_aligned(bytes);
+        if bytes.len() >= 16 {
+            assert_simd_16_byte_loadable(bytes);
+        }
+    }
+
+    // tempdir cleans up automatically.
+}
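Taken together, the alignment guarantee and the `bytemuck` casts exercised above give downstream readers a copy-free path from a stored payload to a typed slice. A minimal consumer sketch under the same assumptions as the test; the `example.bin` path and the `samples` key are illustrative only:

```rust
use simd_r_drive::{
    DataStore,
    traits::{DataStoreReader, DataStoreWriter},
};

fn main() -> std::io::Result<()> {
    let store = DataStore::open(&std::path::PathBuf::from("example.bin"))?;

    // Store eight u32 values as their raw native-endian bytes.
    let samples: Vec<u32> = (0..8).collect();
    store.write(b"samples", bytemuck::cast_slice(&samples))?;

    // Read back and reinterpret in place: with the payload start on a
    // PAYLOAD_ALIGNMENT boundary and a length divisible by 4, the cast
    // borrows the mmap-backed bytes without copying.
    let entry = store.read(b"samples")?.expect("samples missing");
    let view: &[u32] = bytemuck::try_cast_slice(entry.as_slice())
        .expect("aligned payload should cast zero-copy");
    assert_eq!(view, samples.as_slice());
    Ok(())
}
```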