diff --git a/experiments/bindings/python-ws-client/pyproject.toml b/experiments/bindings/python-ws-client/pyproject.toml index 21986765..9740d62e 100644 --- a/experiments/bindings/python-ws-client/pyproject.toml +++ b/experiments/bindings/python-ws-client/pyproject.toml @@ -41,4 +41,5 @@ dev = [ "puccinialin>=0.1.5", "pytest>=8.4.1", "pytest-benchmark>=5.1.0", + "pytest-order>=1.3.0", ] diff --git a/experiments/bindings/python-ws-client/simd_r_drive_ws_client/simd_r_drive_ws_client.pyi b/experiments/bindings/python-ws-client/simd_r_drive_ws_client/simd_r_drive_ws_client.pyi index 00dc4da0..d84dcf5f 100644 --- a/experiments/bindings/python-ws-client/simd_r_drive_ws_client/simd_r_drive_ws_client.pyi +++ b/experiments/bindings/python-ws-client/simd_r_drive_ws_client/simd_r_drive_ws_client.pyi @@ -77,20 +77,18 @@ class DataStoreWsClient: key is not present. """ ... - # - # - # TODO: Integrate - # def delete(self, key: bytes) -> None: - # """ - # Marks the key as deleted (logically removes it). - # This operation does not physically remove the data but appends a tombstone - # entry to mark the key as deleted. + def delete(self, key: bytes) -> None: + """ + Marks the key as deleted (logically removes it). - # Args: - # key (bytes): The key to mark as deleted. - # """ - # ... + This operation does not physically remove the data but appends a tombstone + entry to mark the key as deleted. + + Args: + key (bytes): The key to mark as deleted. + """ + ... # def exists(self, key: bytes) -> bool: # """ @@ -120,6 +118,33 @@ class DataStoreWsClient: # """ # return self.exists(key) + def __len__(self) -> int: + """ + Returns the total number of active entries in the store. + + Returns: + int: The total number of active entries in the store. + """ + ... + + def is_empty(self) -> bool: + """ + Determines if the store is empty or has no active keys. + + Returns: + bool: Whether or not the store has any active keys. + """ + ... 
+ + def file_size(self) -> int: + """ + Returns the current file size on disk (including those of deleted entries). + + Returns: + int: File size in bytes. + """ + ... + @final class NamespaceHasher: """ diff --git a/experiments/bindings/python-ws-client/src/base_ws_client_py.rs b/experiments/bindings/python-ws-client/src/base_ws_client_py.rs index bf69db6a..907280b6 100644 --- a/experiments/bindings/python-ws-client/src/base_ws_client_py.rs +++ b/experiments/bindings/python-ws-client/src/base_ws_client_py.rs @@ -11,6 +11,8 @@ use std::time::Duration; use tokio::runtime::{Builder, Runtime}; use tokio::time::timeout; +// TODO: Move timeout handling into inner client + // TODO: Borrow configuration props from MySQL // connection_timeout=10, # Timeout for the connection attempt (in seconds) // read_timeout=30, # Timeout for waiting for response from server (in seconds) @@ -64,8 +66,6 @@ impl BaseDataStoreWsClient { Ok(()) } - // --- All public methods are now blocking again, but with timeouts --- - #[pyo3(name = "write")] fn py_write(&self, key: Vec, payload: Vec) -> PyResult<()> { self.check_connection()?; @@ -76,7 +76,7 @@ impl BaseDataStoreWsClient { match timeout(Duration::from_secs(30), client.write(&key, &payload)).await { Ok(Ok(_)) => Ok(()), Ok(Err(e)) => Err(PyIOError::new_err(e.to_string())), - Err(_) => Err(TimeoutError::new_err("Write operation timed out.")), + Err(_) => Err(TimeoutError::new_err("`write` operation timed out.")), } }) } @@ -94,7 +94,7 @@ impl BaseDataStoreWsClient { match timeout(Duration::from_secs(60), client.batch_write(&converted)).await { Ok(Ok(_)) => Ok(()), Ok(Err(e)) => Err(PyIOError::new_err(e.to_string())), - Err(_) => Err(TimeoutError::new_err("Batch write operation timed out.")), + Err(_) => Err(TimeoutError::new_err("`batch_write` operation timed out.")), } }) } @@ -112,9 +112,9 @@ impl BaseDataStoreWsClient { let maybe_bytes = self.runtime.block_on(async { // TODO: Don't hardcode timeout match timeout(Duration::from_secs(30), 
client.read(&key)).await { - Ok(Ok(data)) => Ok(data), + Ok(Ok(entry_payload)) => Ok(entry_payload), Ok(Err(e)) => Err(PyIOError::new_err(e.to_string())), - Err(_) => Err(TimeoutError::new_err("Read operation timed out.")), + Err(_) => Err(TimeoutError::new_err("`read` operation timed out.")), } })?; @@ -130,9 +130,9 @@ impl BaseDataStoreWsClient { let results = self.runtime.block_on(async { // TODO: Don't hardcode timeout match timeout(Duration::from_secs(60), client.batch_read(&key_slices)).await { - Ok(Ok(data)) => Ok(data), + Ok(Ok(entries_payloads)) => Ok(entries_payloads), Ok(Err(e)) => Err(PyIOError::new_err(e.to_string())), - Err(_) => Err(TimeoutError::new_err("Batch read operation timed out.")), + Err(_) => Err(TimeoutError::new_err("`batch_read` operation timed out.")), } })?; @@ -143,4 +143,67 @@ impl BaseDataStoreWsClient { .collect()) }) } + + #[pyo3(name = "delete")] + fn py_delete(&self, key: Vec) -> PyResult<()> { + self.check_connection()?; + let client = self.ws_client.clone(); + + self.runtime.block_on(async { + // TODO: Don't hardcode timeout + match timeout(Duration::from_secs(30), client.delete(&key)).await { + Ok(Ok(_)) => Ok(()), + Ok(Err(e)) => Err(PyIOError::new_err(e.to_string())), + Err(_) => Err(TimeoutError::new_err("`delete` operation timed out.")), + } + }) + } + + /// Implements the `len()` built-in for Python. + /// + /// This allows you to call `len(store)` to get the number of active entries. + /// It assumes the underlying Rust client has a `len()` method. 
+ fn __len__(&self) -> PyResult { + self.check_connection()?; + let client = self.ws_client.clone(); + + self.runtime.block_on(async { + // TODO: Don't hardcode timeout + match timeout(Duration::from_secs(30), client.len()).await { + Ok(Ok(total_entries)) => Ok(total_entries), + Ok(Err(e)) => Err(PyIOError::new_err(e.to_string())), + Err(_) => Err(TimeoutError::new_err("`len` operation timed out.")), + } + }) + } + + #[pyo3(name = "is_empty")] + fn py_is_empty(&self) -> PyResult<(bool)> { + self.check_connection()?; + let client = self.ws_client.clone(); + + self.runtime.block_on(async { + // TODO: Don't hardcode timeout + match timeout(Duration::from_secs(30), client.is_empty()).await { + Ok(Ok(is_empty)) => Ok(is_empty), + Ok(Err(e)) => Err(PyIOError::new_err(e.to_string())), + Err(_) => Err(TimeoutError::new_err("`is_empty` operation timed out.")), + } + }) + } + + #[pyo3(name = "file_size")] + fn py_file_size(&self) -> PyResult<(u64)> { + self.check_connection()?; + let client = self.ws_client.clone(); + + self.runtime.block_on(async { + // TODO: Don't hardcode timeout + match timeout(Duration::from_secs(30), client.file_size()).await { + Ok(Ok(file_size)) => Ok(file_size), + Ok(Err(e)) => Err(PyIOError::new_err(e.to_string())), + Err(_) => Err(TimeoutError::new_err("`file_size` operation timed out.")), + } + }) + } } diff --git a/experiments/bindings/python-ws-client/tests.TODO/test_engine.py--TODO.IMPLEMENT_FOR_NET_BASED b/experiments/bindings/python-ws-client/tests.TODO/test_engine.py--TODO.IMPLEMENT_FOR_NET_BASED index 40214240..8288da8c 100644 --- a/experiments/bindings/python-ws-client/tests.TODO/test_engine.py--TODO.IMPLEMENT_FOR_NET_BASED +++ b/experiments/bindings/python-ws-client/tests.TODO/test_engine.py--TODO.IMPLEMENT_FOR_NET_BASED @@ -237,7 +237,7 @@ def test_entry_accessors(): assert entry.size == len(value) assert len(entry) == len(value) # Dunder test - assert entry.size_with_metadata > entry.size + assert entry.file_size > entry.size # Check 
offset logic start = entry.start_offset diff --git a/experiments/bindings/python-ws-client/tests.TODO/test_namespace_hasher.py--TODO.IMPLEMENT_FOR_NET_BASED b/experiments/bindings/python-ws-client/tests.TODO/test_namespace_hasher.py--TODO.IMPLEMENT_FOR_NET_BASED deleted file mode 100644 index cb5db2c5..00000000 --- a/experiments/bindings/python-ws-client/tests.TODO/test_namespace_hasher.py--TODO.IMPLEMENT_FOR_NET_BASED +++ /dev/null @@ -1,68 +0,0 @@ -import tempfile -import os -from simd_r_drive import DataStore, NamespaceHasher - - -def test_namespace_hashing_differentiates_namespaces(): - with tempfile.TemporaryDirectory() as tmpdir: - filepath = os.path.join(tmpdir, "store.bin") - engine = DataStore(filepath) - - key = b"user:1" - payload = b"example_payload" - - # Create two different namespace hashers - hasher1 = NamespaceHasher(b"namespaceA") - hasher2 = NamespaceHasher(b"namespaceB") - - key1 = hasher1.namespace(key) - key2 = hasher2.namespace(key) - - assert key1 != key2, "Keys with different namespaces should not match" - - # Write both entries - engine.write(key1, payload) - engine.write(key2, payload[::-1]) # reverse for distinction - - result1 = engine.read(key1) - result2 = engine.read(key2) - - assert result1 == payload - assert result2 == payload[::-1] - - del engine - - -def test_namespace_hashing_consistency(): - hasher = NamespaceHasher(b"shared") - - key = b"important_key" - namespaced1 = hasher.namespace(key) - namespaced2 = hasher.namespace(key) - - assert ( - namespaced1 == namespaced2 - ), "Same key and namespace must produce consistent result" - - -def test_namespace_hashing_allows_collision_avoidance(): - with tempfile.TemporaryDirectory() as tmpdir: - filepath = os.path.join(tmpdir, "store.bin") - engine = DataStore(filepath) - - keys = [b"user:alpha", b"user:beta", b"user:gamma"] - namespaces = [b"ns1", b"ns2", b"ns3"] - - seen = set() - - for ns in namespaces: - hasher = NamespaceHasher(ns) - for key in keys: - namespaced = 
bytes(hasher.namespace(key)) - assert namespaced not in seen, "Namespaced key should be unique" - seen.add(namespaced) - engine.write(namespaced, b"data") - - assert len(seen) == len(keys) * len(namespaces) - - del engine diff --git a/experiments/bindings/python-ws-client/tests/integraton/test_client.py b/experiments/bindings/python-ws-client/tests/integraton/test_client.py index 2345d8f7..8d344c73 100644 --- a/experiments/bindings/python-ws-client/tests/integraton/test_client.py +++ b/experiments/bindings/python-ws-client/tests/integraton/test_client.py @@ -25,6 +25,25 @@ def client(): f"Failed to connect to the WebSocket server at {SERVER_HOST}. Is it running? Error: {e}" ) +@pytest.mark.order(1) +def test_is_empty(client): + # TODO: Alternatively, implement a `clear` method? + + assert client.is_empty() + assert len(client) == 0 + assert client.file_size() == 0 + + client.write(b"testing", b"123") + + assert client.is_empty() == False + assert len(client) == 1 + assert client.file_size() > 0 + + client.delete(b"testing") + + assert client.is_empty() + assert len(client) == 0 + assert (client.file_size() > 0), f"File size expected to still be above 0 after key deletion" def test_simple_read_write(client): """Tests a simple write operation followed by a read.""" @@ -54,18 +73,29 @@ def test_simple_read_write(client): def test_batch_write_and_read(client): - """Tests a batch write operation followed by individual reads.""" + """Tests a batch write operation followed by individual reads and count verification.""" entries = [ - (b"batch-key-1", b"value-alpha"), - (b"batch-key-2", b"value-beta"), - (b"batch-key-3", b"value-gamma"), + (f"batch-key-{secrets.token_hex(4)}".encode(), b"value-alpha"), + (f"batch-key-{secrets.token_hex(4)}".encode(), b"value-beta"), + (f"batch-key-{secrets.token_hex(4)}".encode(), b"value-gamma"), ] try: + print("\n--- Starting batch write and count test ---") + initial_count = len(client) + print(f"Initial count: {initial_count}") + 
print("Attempting to perform a batch write...") client.batch_write(entries) print("Batch write operation completed.") + # Verify count increased correctly + new_count = len(client) + print(f"New count: {new_count}") + assert new_count == initial_count + len( + entries + ), f"FAIL: Count should be {initial_count + len(entries)}, but is {new_count}." + print("Verifying batch write by reading each key...") for key, value in entries: read_value = client.read(key) @@ -76,7 +106,7 @@ def test_batch_write_and_read(client): read_value == value ), f"FAIL: Value mismatch for key '{key.decode()}'." - print("SUCCESS: Batch write test passed.") + print("SUCCESS: Batch write and count test passed.") except Exception as e: pytest.fail(f"An exception occurred during the batch write test: {e}") @@ -226,3 +256,120 @@ def test_batch_read_structured_list_of_dicts(client): result == expected_result ), "The hydrated list of dictionaries does not match the expected result" print("SUCCESS: batch_read_structured with a list of dictionaries passed.") + + +def test_count_simple(client): + """Tests the basic increment/decrement behavior of the entry count detection.""" + print("\n--- Starting simple count test ---") + key1 = f"count-key-{secrets.token_hex(4)}".encode() + key2 = f"count-key-{secrets.token_hex(4)}".encode() + + # 1. Initial state + initial_count = len(client) + print(f"Initial count: {initial_count}") + + # 2. Add a new key + client.write(key1, b"count-data-1") + assert len(client) == initial_count + 1, "Count should increment after first write" + print(f"Count after one write: {len(client)}") + + # 3. Update an existing key + client.write(key1, b"count-data-1-updated") + assert len(client) == initial_count + 1, "Count should not change after an update" + print(f"Count after update: {len(client)}") + + # 4. 
Add a second key + client.write(key2, b"count-data-2") + assert len(client) == initial_count + 2, "Count should increment after second write" + print(f"Count after second write: {len(client)}") + + # 5. Delete a key + client.delete(key1) + assert len(client) == initial_count + 1, "Count should decrement after delete" + print(f"Count after one delete: {len(client)}") + + # 6. Delete a non-existent key + client.delete(key1) # Already deleted + assert len(client) == initial_count + 1, "Count should not change when deleting a non-existent key" + print(f"Count after deleting a non-existent key: {len(client)}") + + # 7. Delete the final key + client.delete(key2) + assert len(client) == initial_count, "Count should return to initial after all deletes" + print(f"Final count: {len(client)}") + + print("SUCCESS: Simple count test passed.") + + +def test_delete_key(client): + """Tests that deleting a key makes it non-existent and decrements the count.""" + key = b"key-to-be-deleted" + value = b"some-data-to-remove" + print("\n--- Starting delete handling and count test ---") + + # Arrange: Write a key, and verify it exists and count is correct. + initial_count = len(client) + print(f"Writing key '{key.decode()}' for deletion test. Initial count: {initial_count}") + client.write(key, value) + + assert len(client) == initial_count + 1, "FAIL: Count did not increment after write" + initial_read = client.read(key) + assert initial_read == value, "Pre-condition failed: Key was not written correctly before delete." + print(f"Key confirmed to exist. Count is now {len(client)}") + + # Act: Delete the key. + print(f"Deleting key '{key.decode()}'.") + client.delete(key) + + # Assert: The key should no longer exist and the count should be restored. + final_read = client.read(key) + print(f"Read after delete returned: {final_read}") + assert final_read is None, "FAIL: Reading a deleted key should return None." 
+ + final_count = len(client) + print(f"Final count: {final_count}") + assert final_count == initial_count, "FAIL: Count did not decrement after delete" + + print("SUCCESS: Delete handling and count test passed.") + + +def test_delete_with_batch_read(client): + """ + Tests that a deleted key is correctly handled as `None` in a batch_read and count is updated. + """ + print("\n--- Starting delete with batch_read and count test ---") + # Arrange: Write a set of keys. + entries = [ + (f"dbr-{secrets.token_hex(4)}".encode(), b"value-one"), + (f"dbr-{secrets.token_hex(4)}".encode(), b"this-should-vanish"), + (f"dbr-{secrets.token_hex(4)}".encode(), b"value-three"), + ] + keys_to_fetch = [key for key, _ in entries] + key_to_delete = keys_to_fetch[1] + + initial_count = len(client) + print(f"Writing initial batch for delete test. Initial count: {initial_count}") + client.batch_write(entries) + + count_after_write = len(client) + assert count_after_write == initial_count + len(entries), "FAIL: Count did not increment correctly after batch write" + print(f"Count after batch write: {count_after_write}") + + # Act: Delete one of the keys from the batch. + print(f"Deleting key '{key_to_delete.decode()}'.") + client.delete(key_to_delete) + + # Assert count + count_after_delete = len(client) + assert count_after_delete == count_after_write - 1, "FAIL: Count did not decrement after delete" + print(f"Count after delete: {count_after_delete}") + + # Assert batch_read correctness + print(f"Performing batch_read on keys: {[k.decode() for k in keys_to_fetch]}") + results = client.batch_read(keys_to_fetch) + + expected_results = [entries[0][1], None, entries[2][1]] + + assert results == expected_results, \ + f"FAIL: batch_read did not correctly handle the deleted key. Expected {expected_results}, but got {results}." 
+ print("SUCCESS: Delete with batch_read and count test passed.") diff --git a/experiments/bindings/python-ws-client/tests/integraton/test_namespace_hasher.py b/experiments/bindings/python-ws-client/tests/integraton/test_namespace_hasher.py new file mode 100644 index 00000000..63b9fab3 --- /dev/null +++ b/experiments/bindings/python-ws-client/tests/integraton/test_namespace_hasher.py @@ -0,0 +1,79 @@ +import pytest +from simd_r_drive_ws_client import DataStoreWsClient, NamespaceHasher +import time +import os +import secrets + +# Server address, configurable via environment variable +SERVER_HOST = os.environ.get("TEST_SERVER_HOST", "127.0.0.1") +SERVER_PORT = int(os.environ.get("TEST_SERVER_PORT", 34129)) + + +@pytest.fixture(scope="module") +def client(): + """ + Fixture to create and connect the WsClient. + The scope is 'module' so it connects only once for all tests. + """ + # Allow some time for the server to start up. + time.sleep(2) + try: + ws_client = DataStoreWsClient(SERVER_HOST, SERVER_PORT) + yield ws_client + except Exception as e: + pytest.fail( + f"Failed to connect to the WebSocket server at {SERVER_HOST}. Is it running? 
Error: {e}" + ) + + +def test_namespace_hashing_differentiates_namespaces(client): + key = b"user:1" + payload = b"example_payload" + + # Create two different namespace hashers + hasher1 = NamespaceHasher(b"namespaceA") + hasher2 = NamespaceHasher(b"namespaceB") + + key1 = hasher1.namespace(key) + key2 = hasher2.namespace(key) + + assert key1 != key2, "Keys with different namespaces should not match" + + # Write both entries + client.write(key1, payload) + client.write(key2, payload[::-1]) # reverse for distinction + + result1 = client.read(key1) + result2 = client.read(key2) + + assert result1 == payload + assert result2 == payload[::-1] + + +def test_namespace_hashing_consistency(): + hasher = NamespaceHasher(b"shared") + + key = b"important_key" + namespaced1 = hasher.namespace(key) + namespaced2 = hasher.namespace(key) + + assert ( + namespaced1 == namespaced2 + ), "Same key and namespace must produce consistent result" + + +def test_namespace_hashing_allows_collision_avoidance(client): + keys = [b"user:alpha", b"user:beta", b"user:gamma"] + namespaces = [b"ns1", b"ns2", b"ns3"] + + seen = set() + + for ns in namespaces: + hasher = NamespaceHasher(ns) + for key in keys: + namespaced = bytes(hasher.namespace(key)) + assert namespaced not in seen, "Namespaced key should be unique" + seen.add(namespaced) + client.write(namespaced, b"data") + + assert len(seen) == len(keys) * len(namespaces) diff --git a/experiments/bindings/python-ws-client/uv.lock b/experiments/bindings/python-ws-client/uv.lock index 2b040513..d8d0e03a 100644 --- a/experiments/bindings/python-ws-client/uv.lock +++ b/experiments/bindings/python-ws-client/uv.lock @@ -282,9 +282,21 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/9e/d6/b41653199ea09d5969d4e385df9bbfd9a100f28ca7e824ce7c0a016e3053/pytest_benchmark-5.1.0-py3-none-any.whl", hash = "sha256:922de2dfa3033c227c96da942d1878191afa135a29485fb942e85dff1c592c89", size = 44259, upload-time = "2024-10-30T11:51:45.94Z" }, ] 
+[[package]] +name = "pytest-order" +version = "1.3.0" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "pytest" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/1d/66/02ae17461b14a52ce5a29ae2900156b9110d1de34721ccc16ccd79419876/pytest_order-1.3.0.tar.gz", hash = "sha256:51608fec3d3ee9c0adaea94daa124a5c4c1d2bb99b00269f098f414307f23dde", size = 47544, upload-time = "2024-08-22T12:29:54.512Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/1b/73/59b038d1aafca89f8e9936eaa8ffa6bb6138d00459d13a32ce070be4f280/pytest_order-1.3.0-py3-none-any.whl", hash = "sha256:2cd562a21380345dd8d5774aa5fd38b7849b6ee7397ca5f6999bbe6e89f07f6e", size = 14609, upload-time = "2024-08-22T12:29:53.156Z" }, +] + [[package]] name = "simd-r-drive-ws-client" -version = "0.8.0a0" +version = "0.9.0a0" source = { editable = "." } [package.dev-dependencies] @@ -295,6 +307,7 @@ dev = [ { name = "puccinialin" }, { name = "pytest" }, { name = "pytest-benchmark" }, + { name = "pytest-order" }, ] [package.metadata] @@ -307,6 +320,7 @@ dev = [ { name = "puccinialin", specifier = ">=0.1.5" }, { name = "pytest", specifier = ">=8.4.1" }, { name = "pytest-benchmark", specifier = ">=5.1.0" }, + { name = "pytest-order", specifier = ">=1.3.0" }, ] [[package]] diff --git a/experiments/bindings/python/simd_r_drive/simd_r_drive.pyi b/experiments/bindings/python/simd_r_drive/simd_r_drive.pyi index c5da35c0..d5cfdeef 100644 --- a/experiments/bindings/python/simd_r_drive/simd_r_drive.pyi +++ b/experiments/bindings/python/simd_r_drive/simd_r_drive.pyi @@ -130,14 +130,14 @@ class EntryHandle: ... @property - def size_with_metadata(self) -> int: + def file_size(self) -> int: """ Property: Returns the total size of the entry, including metadata. This method includes the metadata overhead (e.g., checksum, key hash) in the total size, providing the complete size of the entry including both data and associated metadata. 
- Both `size_with_metadata` and `size` access the value directly from the memory-mapped file, so they do not + Both `file_size` and `size` access the value directly from the memory-mapped file, so they do not require the data to be loaded into RAM. Returns: diff --git a/experiments/bindings/python/src/data_store.rs b/experiments/bindings/python/src/data_store.rs index 4f5b321b..08fb4a4f 100644 --- a/experiments/bindings/python/src/data_store.rs +++ b/experiments/bindings/python/src/data_store.rs @@ -125,7 +125,7 @@ impl DataStore { self.inner .lock() .unwrap() - .delete_entry(key) + .delete(key) .map(|_| ()) .map_err(|e| pyo3::exceptions::PyIOError::new_err(e.to_string())) } diff --git a/experiments/bindings/python/src/entry_handle.rs b/experiments/bindings/python/src/entry_handle.rs index d62cf688..8eec0480 100644 --- a/experiments/bindings/python/src/entry_handle.rs +++ b/experiments/bindings/python/src/entry_handle.rs @@ -19,8 +19,8 @@ impl EntryHandle { } #[getter] - fn size_with_metadata(&self) -> usize { - self.inner.size_with_metadata() + fn file_size(&self) -> usize { + self.inner.file_size() } #[getter] diff --git a/experiments/bindings/python/tests/test_engine.py b/experiments/bindings/python/tests/test_engine.py index 40214240..8288da8c 100644 --- a/experiments/bindings/python/tests/test_engine.py +++ b/experiments/bindings/python/tests/test_engine.py @@ -237,7 +237,7 @@ def test_entry_accessors(): assert entry.size == len(value) assert len(entry) == len(value) # Dunder test - assert entry.size_with_metadata > entry.size + assert entry.file_size > entry.size # Check offset logic start = entry.start_offset diff --git a/experiments/simd-r-drive-muxio-service-definition/README.md b/experiments/simd-r-drive-muxio-service-definition/README.md index 94cde08d..fe0f4152 100644 --- a/experiments/simd-r-drive-muxio-service-definition/README.md +++ b/experiments/simd-r-drive-muxio-service-definition/README.md @@ -2,7 +2,7 @@ Experimental Muxio service definitions 
[SIMD R Drive](https://crates.io/crates/simd-r-drive), a high-performance, zero-copy, append-only single-file-container storage engine for Rust. -**-- Work in progress --** +**Work in progress.** - https://crates.io/crates/simd-r-drive - https://crates.io/crates/muxio diff --git a/experiments/simd-r-drive-muxio-service-definition/src/prebuffered.rs b/experiments/simd-r-drive-muxio-service-definition/src/prebuffered.rs index 6178be9a..91bf6231 100644 --- a/experiments/simd-r-drive-muxio-service-definition/src/prebuffered.rs +++ b/experiments/simd-r-drive-muxio-service-definition/src/prebuffered.rs @@ -9,3 +9,15 @@ pub use batch_write::*; mod write; pub use write::*; + +mod delete; +pub use delete::*; + +mod len; +pub use len::*; + +mod is_empty; +pub use is_empty::*; + +mod file_size; +pub use file_size::*; diff --git a/experiments/simd-r-drive-muxio-service-definition/src/prebuffered/batch_read.rs b/experiments/simd-r-drive-muxio-service-definition/src/prebuffered/batch_read.rs index 889c3d6c..1f6fbc17 100644 --- a/experiments/simd-r-drive-muxio-service-definition/src/prebuffered/batch_read.rs +++ b/experiments/simd-r-drive-muxio-service-definition/src/prebuffered/batch_read.rs @@ -9,7 +9,7 @@ pub struct BatchReadRequestParams { #[derive(Encode, Decode, Debug, PartialEq)] pub struct BatchReadResponseParams { - pub entries: Vec>>, + pub entries_payloads: Vec>>, } pub struct BatchRead; @@ -20,25 +20,25 @@ impl RpcMethodPrebuffered for BatchRead { type Input = BatchReadRequestParams; type Output = BatchReadResponseParams; - fn encode_request(read_request_params: BatchReadRequestParams) -> Result, io::Error> { - Ok(bitcode::encode(&read_request_params)) + fn encode_request(request_params: Self::Input) -> Result, io::Error> { + Ok(bitcode::encode(&request_params)) } fn decode_request(bytes: &[u8]) -> Result { - let req_params = bitcode::decode::(bytes) + let request_params = bitcode::decode::(bytes) .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?; - 
Ok(req_params) + Ok(request_params) } - fn encode_response(results: Self::Output) -> Result, io::Error> { - Ok(bitcode::encode(&results)) + fn encode_response(response_params: Self::Output) -> Result, io::Error> { + Ok(bitcode::encode(&response_params)) } - fn decode_response(bytes: &[u8]) -> Result { - let resp_params = bitcode::decode::(bytes) + fn decode_response(response_bytes: &[u8]) -> Result { + let response_params = bitcode::decode::(response_bytes) .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?; - Ok(resp_params) + Ok(response_params) } } diff --git a/experiments/simd-r-drive-muxio-service-definition/src/prebuffered/batch_write.rs b/experiments/simd-r-drive-muxio-service-definition/src/prebuffered/batch_write.rs index f49f09c1..e14b796e 100644 --- a/experiments/simd-r-drive-muxio-service-definition/src/prebuffered/batch_write.rs +++ b/experiments/simd-r-drive-muxio-service-definition/src/prebuffered/batch_write.rs @@ -20,25 +20,25 @@ impl RpcMethodPrebuffered for BatchWrite { type Input = BatchWriteRequestParams; type Output = BatchWriteResponseParams; - fn encode_request(write_request_params: BatchWriteRequestParams) -> Result, io::Error> { - Ok(bitcode::encode(&write_request_params)) + fn encode_request(request_params: Self::Input) -> Result, io::Error> { + Ok(bitcode::encode(&request_params)) } fn decode_request(bytes: &[u8]) -> Result { - let req_params = bitcode::decode::(bytes) + let request_params = bitcode::decode::(bytes) .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?; - Ok(req_params) + Ok(request_params) } - fn encode_response(result: Self::Output) -> Result, io::Error> { - Ok(bitcode::encode(&result)) + fn encode_response(response_params: Self::Output) -> Result, io::Error> { + Ok(bitcode::encode(&response_params)) } - fn decode_response(bytes: &[u8]) -> Result { - let resp_params = bitcode::decode::(bytes) + fn decode_response(response_bytes: &[u8]) -> Result { + let response_params = 
bitcode::decode::(response_bytes) .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?; - Ok(resp_params) + Ok(response_params) } } diff --git a/experiments/simd-r-drive-muxio-service-definition/src/prebuffered/delete.rs b/experiments/simd-r-drive-muxio-service-definition/src/prebuffered/delete.rs new file mode 100644 index 00000000..69c5bce8 --- /dev/null +++ b/experiments/simd-r-drive-muxio-service-definition/src/prebuffered/delete.rs @@ -0,0 +1,44 @@ +use bitcode::{Decode, Encode}; +use muxio_rpc_service::{prebuffered::RpcMethodPrebuffered, rpc_method_id}; +use std::io; + +#[derive(Encode, Decode, PartialEq, Debug)] +pub struct DeleteRequestParams { + pub key: Vec, +} + +#[derive(Encode, Decode, PartialEq, Debug)] +pub struct DeleteResponseParams { + pub tail_offset: u64, +} + +pub struct Delete; + +impl RpcMethodPrebuffered for Delete { + const METHOD_ID: u64 = rpc_method_id!("delete"); + + type Input = DeleteRequestParams; + type Output = DeleteResponseParams; + + fn encode_request(request_params: Self::Input) -> Result, io::Error> { + Ok(bitcode::encode(&request_params)) + } + + fn decode_request(bytes: &[u8]) -> Result { + let request_params = bitcode::decode::(bytes) + .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?; + + Ok(request_params) + } + + fn encode_response(response_params: Self::Output) -> Result, io::Error> { + Ok(bitcode::encode(&response_params)) + } + + fn decode_response(response_bytes: &[u8]) -> Result { + let response_params = bitcode::decode::(response_bytes) + .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?; + + Ok(response_params) + } +} diff --git a/experiments/simd-r-drive-muxio-service-definition/src/prebuffered/file_size.rs b/experiments/simd-r-drive-muxio-service-definition/src/prebuffered/file_size.rs new file mode 100644 index 00000000..43b3cbf0 --- /dev/null +++ b/experiments/simd-r-drive-muxio-service-definition/src/prebuffered/file_size.rs @@ -0,0 +1,42 @@ +use bitcode::{Decode, Encode}; +use 
muxio_rpc_service::{prebuffered::RpcMethodPrebuffered, rpc_method_id}; +use std::io; + +#[derive(Encode, Decode, PartialEq, Debug)] +pub struct FileSizeRequestParams {} + +#[derive(Encode, Decode, PartialEq, Debug)] +pub struct FileSizeResponseParams { + pub file_size: u64, +} + +pub struct FileSize; + +impl RpcMethodPrebuffered for FileSize { + const METHOD_ID: u64 = rpc_method_id!("file_size"); + + type Input = FileSizeRequestParams; + type Output = FileSizeResponseParams; + + fn encode_request(request_params: Self::Input) -> Result, io::Error> { + Ok(bitcode::encode(&request_params)) + } + + fn decode_request(bytes: &[u8]) -> Result { + let request_params = bitcode::decode::(bytes) + .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?; + + Ok(request_params) + } + + fn encode_response(response_params: Self::Output) -> Result, io::Error> { + Ok(bitcode::encode(&response_params)) + } + + fn decode_response(response_bytes: &[u8]) -> Result { + let response_params = bitcode::decode::(response_bytes) + .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?; + + Ok(response_params) + } +} diff --git a/experiments/simd-r-drive-muxio-service-definition/src/prebuffered/is_empty.rs b/experiments/simd-r-drive-muxio-service-definition/src/prebuffered/is_empty.rs new file mode 100644 index 00000000..94597960 --- /dev/null +++ b/experiments/simd-r-drive-muxio-service-definition/src/prebuffered/is_empty.rs @@ -0,0 +1,42 @@ +use bitcode::{Decode, Encode}; +use muxio_rpc_service::{prebuffered::RpcMethodPrebuffered, rpc_method_id}; +use std::io; + +#[derive(Encode, Decode, PartialEq, Debug)] +pub struct IsEmptyRequestParams {} + +#[derive(Encode, Decode, PartialEq, Debug)] +pub struct IsEmptyResponseParams { + pub is_empty: bool, +} + +pub struct IsEmpty; + +impl RpcMethodPrebuffered for IsEmpty { + const METHOD_ID: u64 = rpc_method_id!("is_empty"); + + type Input = IsEmptyRequestParams; + type Output = IsEmptyResponseParams; + + fn encode_request(request_params: 
Self::Input) -> Result, io::Error> { + Ok(bitcode::encode(&request_params)) + } + + fn decode_request(bytes: &[u8]) -> Result { + let request_params = bitcode::decode::(bytes) + .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?; + + Ok(request_params) + } + + fn encode_response(response_params: Self::Output) -> Result, io::Error> { + Ok(bitcode::encode(&response_params)) + } + + fn decode_response(response_bytes: &[u8]) -> Result { + let response_params = bitcode::decode::(response_bytes) + .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?; + + Ok(response_params) + } +} diff --git a/experiments/simd-r-drive-muxio-service-definition/src/prebuffered/len.rs b/experiments/simd-r-drive-muxio-service-definition/src/prebuffered/len.rs new file mode 100644 index 00000000..12ef8ab0 --- /dev/null +++ b/experiments/simd-r-drive-muxio-service-definition/src/prebuffered/len.rs @@ -0,0 +1,42 @@ +use bitcode::{Decode, Encode}; +use muxio_rpc_service::{prebuffered::RpcMethodPrebuffered, rpc_method_id}; +use std::io; + +#[derive(Encode, Decode, PartialEq, Debug)] +pub struct LenRequestParams {} + +#[derive(Encode, Decode, PartialEq, Debug)] +pub struct LenResponseParams { + pub total_entries: usize, +} + +pub struct Len; + +impl RpcMethodPrebuffered for Len { + const METHOD_ID: u64 = rpc_method_id!("len"); + + type Input = LenRequestParams; + type Output = LenResponseParams; + + fn encode_request(request_params: Self::Input) -> Result, io::Error> { + Ok(bitcode::encode(&request_params)) + } + + fn decode_request(bytes: &[u8]) -> Result { + let request_params = bitcode::decode::(bytes) + .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?; + + Ok(request_params) + } + + fn encode_response(response_params: Self::Output) -> Result, io::Error> { + Ok(bitcode::encode(&response_params)) + } + + fn decode_response(response_bytes: &[u8]) -> Result { + let response_params = bitcode::decode::(response_bytes) + .map_err(|e| 
io::Error::new(io::ErrorKind::InvalidData, e))?; + + Ok(response_params) + } +} diff --git a/experiments/simd-r-drive-muxio-service-definition/src/prebuffered/read.rs b/experiments/simd-r-drive-muxio-service-definition/src/prebuffered/read.rs index feb03bd5..7d8b4655 100644 --- a/experiments/simd-r-drive-muxio-service-definition/src/prebuffered/read.rs +++ b/experiments/simd-r-drive-muxio-service-definition/src/prebuffered/read.rs @@ -9,7 +9,7 @@ pub struct ReadRequestParams { #[derive(Encode, Decode, PartialEq, Debug)] pub struct ReadResponseParams { - pub result: Option>, // TODO: Rename `result` + pub entry_payload: Option>, } pub struct Read; @@ -20,25 +20,25 @@ impl RpcMethodPrebuffered for Read { type Input = ReadRequestParams; type Output = ReadResponseParams; - fn encode_request(read_request_params: ReadRequestParams) -> Result, io::Error> { - Ok(bitcode::encode(&read_request_params)) + fn encode_request(request_params: Self::Input) -> Result, io::Error> { + Ok(bitcode::encode(&request_params)) } fn decode_request(bytes: &[u8]) -> Result { - let req_params = bitcode::decode::(bytes) + let request_params = bitcode::decode::(bytes) .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?; - Ok(req_params) + Ok(request_params) } - fn encode_response(result: Self::Output) -> Result, io::Error> { - Ok(bitcode::encode(&result)) + fn encode_response(response_params: Self::Output) -> Result, io::Error> { + Ok(bitcode::encode(&response_params)) } - fn decode_response(bytes: &[u8]) -> Result { - let resp_params = bitcode::decode::(bytes) + fn decode_response(response_bytes: &[u8]) -> Result { + let response_params = bitcode::decode::(response_bytes) .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?; - Ok(resp_params) + Ok(response_params) } } diff --git a/experiments/simd-r-drive-muxio-service-definition/src/prebuffered/write.rs b/experiments/simd-r-drive-muxio-service-definition/src/prebuffered/write.rs index 2ce073c0..b0b3273b 100644 --- 
a/experiments/simd-r-drive-muxio-service-definition/src/prebuffered/write.rs +++ b/experiments/simd-r-drive-muxio-service-definition/src/prebuffered/write.rs @@ -21,25 +21,25 @@ impl RpcMethodPrebuffered for Write { type Input = WriteRequestParams; type Output = WriteResponseParams; - fn encode_request(write_request_params: WriteRequestParams) -> Result, io::Error> { - Ok(bitcode::encode(&write_request_params)) + fn encode_request(request_params: Self::Input) -> Result, io::Error> { + Ok(bitcode::encode(&request_params)) } fn decode_request(bytes: &[u8]) -> Result { - let req_params = bitcode::decode::(bytes) + let request_params = bitcode::decode::(bytes) .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?; - Ok(req_params) + Ok(request_params) } - fn encode_response(result: Self::Output) -> Result, io::Error> { - Ok(bitcode::encode(&result)) + fn encode_response(response_params: Self::Output) -> Result, io::Error> { + Ok(bitcode::encode(&response_params)) } - fn decode_response(bytes: &[u8]) -> Result { - let resp_params = bitcode::decode::(bytes) + fn decode_response(response_bytes: &[u8]) -> Result { + let response_params = bitcode::decode::(response_bytes) .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?; - Ok(resp_params) + Ok(response_params) } } diff --git a/experiments/simd-r-drive-ws-client/README.md b/experiments/simd-r-drive-ws-client/README.md index 066e8941..106464b2 100644 --- a/experiments/simd-r-drive-ws-client/README.md +++ b/experiments/simd-r-drive-ws-client/README.md @@ -2,7 +2,7 @@ Experimental WebSocket client for [SIMD R Drive](https://crates.io/crates/simd-r-drive), a high-performance, zero-copy, append-only single-file-container storage engine for Rust. 
-**-- Work in progress --** +**Work in progress.** - https://crates.io/crates/simd-r-drive - https://crates.io/crates/muxio diff --git a/experiments/simd-r-drive-ws-client/src/ws_client.rs b/experiments/simd-r-drive-ws-client/src/ws_client.rs index d4ab45ec..4ed62525 100644 --- a/experiments/simd-r-drive-ws-client/src/ws_client.rs +++ b/experiments/simd-r-drive-ws-client/src/ws_client.rs @@ -5,8 +5,9 @@ use simd_r_drive::{ traits::{AsyncDataStoreReader, AsyncDataStoreWriter}, }; use simd_r_drive_muxio_service_definition::prebuffered::{ - BatchRead, BatchReadRequestParams, BatchWrite, BatchWriteRequestParams, Read, - ReadRequestParams, Write, WriteRequestParams, + BatchRead, BatchReadRequestParams, BatchWrite, BatchWriteRequestParams, Delete, + DeleteRequestParams, FileSize, FileSizeRequestParams, IsEmpty, IsEmptyRequestParams, Len, + LenRequestParams, Read, ReadRequestParams, Write, WriteRequestParams, }; use std::io::Result; @@ -38,7 +39,7 @@ impl AsyncDataStoreWriter for WsClient { } async fn write(&self, key: &[u8], payload: &[u8]) -> Result { - let resp = Write::call( + let response_params = Write::call( &self.rpc_client, WriteRequestParams { key: key.to_vec(), @@ -47,11 +48,11 @@ impl AsyncDataStoreWriter for WsClient { ) .await?; - Ok(resp.tail_offset) + Ok(response_params.tail_offset) } async fn batch_write(&self, entries: &[(&[u8], &[u8])]) -> Result { - let resp = BatchWrite::call( + let response_params = BatchWrite::call( &self.rpc_client, BatchWriteRequestParams { entries: entries @@ -62,35 +63,39 @@ impl AsyncDataStoreWriter for WsClient { ) .await?; - Ok(resp.tail_offset) + Ok(response_params.tail_offset) } - async fn rename_entry(&self, _old_key: &[u8], _new_key: &[u8]) -> Result { - unimplemented!("`rename_entry` is not currently implemented"); + async fn rename(&self, _old_key: &[u8], _new_key: &[u8]) -> Result { + unimplemented!("`rename` is not currently implemented"); } - async fn copy_entry(&self, _key: &[u8], _target: &DataStore) -> Result { - 
unimplemented!("`copy_entry` is not currently implemented"); + async fn copy(&self, _key: &[u8], _target: &DataStore) -> Result { + unimplemented!("`copy` is not currently implemented"); } - async fn move_entry(&self, _key: &[u8], _target: &DataStore) -> Result { - unimplemented!("`move_entry` is not currently implemented"); + async fn transfer(&self, _key: &[u8], _target: &DataStore) -> Result { + unimplemented!("`transfer` is not currently implemented"); } - async fn delete_entry(&self, _key: &[u8]) -> Result { - unimplemented!("`delete_entry` is not currently implemented"); + async fn delete(&self, key: &[u8]) -> Result { + let resp = + Delete::call(&self.rpc_client, DeleteRequestParams { key: key.to_vec() }).await?; + + Ok(resp.tail_offset) } } #[async_trait::async_trait] impl AsyncDataStoreReader for WsClient { - // TODO: This is a workaround until properly implementing a stream-able handle + // FIXME: This is a workaround until properly implementing a stream-able handle type EntryHandleType = Vec; async fn read(&self, key: &[u8]) -> Result> { - let resp = Read::call(&self.rpc_client, ReadRequestParams { key: key.to_vec() }).await?; + let response_params = + Read::call(&self.rpc_client, ReadRequestParams { key: key.to_vec() }).await?; - Ok(resp.result) + Ok(response_params.entry_payload) } async fn read_last_entry(&self) -> Result> { @@ -106,18 +111,28 @@ impl AsyncDataStoreReader for WsClient { ) .await?; - Ok(batch_read_result.entries) + Ok(batch_read_result.entries_payloads) } async fn read_metadata(&self, _key: &[u8]) -> Result> { unimplemented!("`read_metadata` is not currently implemented"); } - async fn count(&self) -> Result { - unimplemented!("`count` is not currently implemented"); + async fn len(&self) -> Result { + let response_params = Len::call(&self.rpc_client, LenRequestParams {}).await?; + + Ok(response_params.total_entries) } - async fn get_storage_size(&self) -> Result { - unimplemented!("`get_storage_size` is not currently implemented"); + 
async fn is_empty(&self) -> Result { + let response_params = IsEmpty::call(&self.rpc_client, IsEmptyRequestParams {}).await?; + + Ok(response_params.is_empty) + } + + async fn file_size(&self) -> Result { + let response_params = FileSize::call(&self.rpc_client, FileSizeRequestParams {}).await?; + + Ok(response_params.file_size) } } diff --git a/experiments/simd-r-drive-ws-server/README.md b/experiments/simd-r-drive-ws-server/README.md index ddcb275b..3b0b9fea 100644 --- a/experiments/simd-r-drive-ws-server/README.md +++ b/experiments/simd-r-drive-ws-server/README.md @@ -2,7 +2,7 @@ Experimental WebSocket server for [SIMD R Drive](https://crates.io/crates/simd-r-drive), a high-performance, zero-copy, append-only single-file-container storage engine for Rust. -**-- Work in progress --** +**Work in progress.** - https://crates.io/crates/simd-r-drive - https://crates.io/crates/muxio diff --git a/experiments/simd-r-drive-ws-server/src/main.rs b/experiments/simd-r-drive-ws-server/src/main.rs index 63aecdf8..87b71fb0 100644 --- a/experiments/simd-r-drive-ws-server/src/main.rs +++ b/experiments/simd-r-drive-ws-server/src/main.rs @@ -12,8 +12,9 @@ use simd_r_drive::{ }; use simd_r_drive_muxio_service_definition::prebuffered::{ - BatchRead, BatchReadResponseParams, BatchWrite, BatchWriteResponseParams, Read, - ReadResponseParams, Write, WriteResponseParams, + BatchRead, BatchReadResponseParams, BatchWrite, BatchWriteResponseParams, Delete, + DeleteResponseParams, FileSize, FileSizeResponseParams, IsEmpty, IsEmptyResponseParams, Len, + LenResponseParams, Read, ReadResponseParams, Write, WriteResponseParams, }; mod cli; use crate::cli::Cli; @@ -40,10 +41,15 @@ async fn main() -> std::io::Result<()> { let rpc_server = RpcServer::new(); let endpoint = rpc_server.endpoint(); + // TODO: Rename with consistency; `delete_store` sounds like it deletes the entire store let write_store = Arc::clone(&store); let batch_write_store = Arc::clone(&store); let read_store = 
Arc::clone(&store); let batch_read_store = Arc::clone(&store); + let delete_store = Arc::clone(&store); + let len_store = Arc::clone(&store); + let is_empty_store = Arc::clone(&store); + let file_size_store = Arc::clone(&store); let _ = join!( endpoint.register_prebuffered(Write::METHOD_ID, { @@ -52,17 +58,11 @@ async fn main() -> std::io::Result<()> { async move { let resp = task::spawn_blocking(move || { let params = Write::decode_request(&bytes)?; - - // Acquire exclusive write lock. - // - // This blocks all concurrent readers and writers - // until the mutation is complete. - // - // Tokio's blocking_write ensures the thread isn't stalled. let store = store_mutex.blocking_write(); let tail_offset = store.write(¶ms.key, ¶ms.payload)?; - let resp = Write::encode_response(WriteResponseParams { tail_offset })?; - Ok::<_, Box>(resp) + let response_bytes = + Write::encode_response(WriteResponseParams { tail_offset })?; + Ok::<_, Box>(response_bytes) }) .await .map_err(|e| std::io::Error::other(format!("write task: {e}")))??; @@ -75,22 +75,17 @@ async fn main() -> std::io::Result<()> { let store_mutex = Arc::clone(&batch_write_store); async move { let resp = task::spawn_blocking(move || { - let req = BatchWrite::decode_request(&bytes)?; - - // Acquire exclusive lock for batch write. - // - // Like Write, this prevents all concurrent access - // while the batch mutation occurs. 
+ let params = BatchWrite::decode_request(&bytes)?; let store = store_mutex.blocking_write(); - let borrowed_entries: Vec<(&[u8], &[u8])> = req + let borrowed_entries: Vec<(&[u8], &[u8])> = params .entries .iter() .map(|(k, v)| (k.as_slice(), v.as_slice())) .collect(); let tail_offset = store.batch_write(&borrowed_entries)?; - let resp = + let response_bytes = BatchWrite::encode_response(BatchWriteResponseParams { tail_offset })?; - Ok::<_, Box>(resp) + Ok::<_, Box>(response_bytes) }) .await .map_err(|e| std::io::Error::other(format!("batch task: {e}")))??; @@ -103,23 +98,14 @@ async fn main() -> std::io::Result<()> { let store_mutex = Arc::clone(&read_store); async move { let resp = task::spawn_blocking(move || { - let req = Read::decode_request(&bytes)?; - - // Acquire shared read lock. - // - // This allows multiple concurrent readers to access - // the store at the same time *as long as no writer holds the lock*. - // - // We extract the data into memory immediately, - // and then drop the read lock to maximize concurrency. + let params = Read::decode_request(&bytes)?; let store = store_mutex.blocking_read(); - let result_data = store - .read(&req.key)? + let entry_payload = store + .read(¶ms.key)? .map(|handle| handle.as_slice().to_vec()); - let resp = Read::encode_response(ReadResponseParams { - result: result_data, - })?; - Ok::<_, Box>(resp) + let response_bytes = + Read::encode_response(ReadResponseParams { entry_payload })?; + Ok::<_, Box>(response_bytes) }) .await .map_err(|e| std::io::Error::other(format!("read task: {e}")))??; @@ -132,39 +118,23 @@ async fn main() -> std::io::Result<()> { let store_mutex = Arc::clone(&batch_read_store); async move { let resp = task::spawn_blocking(move || { - // ── 1. Decode the RPC frame ────────────────────────────────────── - let req = BatchRead::decode_request(&bytes)?; - - // ── 2. 
Pull a shared read-lock on the store ───────────────────── - // - // • Many BatchReads can run in parallel because this is a - // `blocking_read()`. - // • The lock is held only long enough to obtain the - // zero-copy EntryHandles; we drop it immediately after. - // + let params = BatchRead::decode_request(&bytes)?; let store_guard = store_mutex.blocking_read(); - - // Convert Vec> → Vec<&[u8]> (what `batch_read` wants) - let key_refs: Vec<&[u8]> = req.keys.iter().map(|k| k.as_slice()).collect(); - + let key_refs: Vec<&[u8]> = + params.keys.iter().map(|k| k.as_slice()).collect(); let handles = store_guard.batch_read(&key_refs)?; drop(store_guard); // free the lock ASAP - // ── 3. Copy the payloads out (still zero-copy *inside* the guard) ─ - // - // Each handle is turned into Option> so the resulting - // response owns its bytes and is independent of the mmap. - // - let entries: Vec>> = handles + let entries_payloads: Vec>> = handles .into_iter() .map(|opt| opt.map(|h| h.as_slice().to_vec())) .collect(); + let response_bytes = BatchRead::encode_response(BatchReadResponseParams { + entries_payloads, + })?; - // ── 4. 
Marshal the response frame ─────────────────────────────── - let resp = BatchRead::encode_response(BatchReadResponseParams { entries })?; - - Ok::<_, Box>(resp) + Ok::<_, Box>(response_bytes) }) .await .map_err(|e| std::io::Error::other(format!("batch read task: {e}")))??; @@ -173,6 +143,75 @@ async fn main() -> std::io::Result<()> { } } }), + endpoint.register_prebuffered(Delete::METHOD_ID, { + move |_, bytes: Vec| { + let store_mutex = Arc::clone(&delete_store); + async move { + let resp = task::spawn_blocking(move || { + let params = Delete::decode_request(&bytes)?; + let store = store_mutex.blocking_write(); + let tail_offset = store.delete(¶ms.key)?; + let response_bytes = + Delete::encode_response(DeleteResponseParams { tail_offset })?; + Ok::<_, Box>(response_bytes) + }) + .await + .map_err(|e| std::io::Error::other(format!("write task: {e}")))??; + Ok(resp) + } + } + }), + endpoint.register_prebuffered(Len::METHOD_ID, { + move |_, _bytes: Vec| { + let store_mutex = Arc::clone(&len_store); + async move { + let resp = task::spawn_blocking(move || { + let store = store_mutex.blocking_read(); + let total_entries = store.len()?; + let response_bytes = + Len::encode_response(LenResponseParams { total_entries })?; + Ok::<_, Box>(response_bytes) + }) + .await + .map_err(|e| std::io::Error::other(format!("write task: {e}")))??; + Ok(resp) + } + } + }), + endpoint.register_prebuffered(IsEmpty::METHOD_ID, { + move |_, _bytes: Vec| { + let store_mutex = Arc::clone(&is_empty_store); + async move { + let resp = task::spawn_blocking(move || { + let store = store_mutex.blocking_read(); + let is_empty = store.is_empty()?; + let response_bytes = + IsEmpty::encode_response(IsEmptyResponseParams { is_empty })?; + Ok::<_, Box>(response_bytes) + }) + .await + .map_err(|e| std::io::Error::other(format!("write task: {e}")))??; + Ok(resp) + } + } + }), + endpoint.register_prebuffered(FileSize::METHOD_ID, { + move |_, _bytes: Vec| { + let store_mutex = Arc::clone(&file_size_store); 
+ async move { + let resp = task::spawn_blocking(move || { + let store = store_mutex.blocking_read(); + let file_size = store.file_size()?; + let response_bytes = + FileSize::encode_response(FileSizeResponseParams { file_size })?; + Ok::<_, Box>(response_bytes) + }) + .await + .map_err(|e| std::io::Error::other(format!("write task: {e}")))??; + Ok(resp) + } + } + }), ); rpc_server diff --git a/extensions/src/storage_cache_ext.rs b/extensions/src/storage_cache_ext.rs index 2a01a043..8815fae9 100644 --- a/extensions/src/storage_cache_ext.rs +++ b/extensions/src/storage_cache_ext.rs @@ -100,7 +100,7 @@ impl StorageCacheExt for DataStore { .as_secs(); if now >= expiration_timestamp { - self.delete_entry(&namespaced_key).ok(); // Remove expired entry + self.delete(&namespaced_key).ok(); // Remove expired entry return Ok(None); } diff --git a/src/cli/execute_command.rs b/src/cli/execute_command.rs index c8617abd..1cd17c4a 100644 --- a/src/cli/execute_command.rs +++ b/src/cli/execute_command.rs @@ -117,7 +117,7 @@ pub fn execute_command(cli: &Cli) { let target_storage = DataStore::open(target).expect("Failed to open target storage"); source_storage - .copy_entry(key.as_bytes(), &target_storage) + .copy(key.as_bytes(), &target_storage) .map_err(|err| { eprintln!("Could not copy entry. Received error: {err}"); std::process::exit(1); @@ -134,7 +134,7 @@ pub fn execute_command(cli: &Cli) { let target_storage = DataStore::open(target).expect("Failed to open target storage"); source_storage - .move_entry(key.as_bytes(), &target_storage) + .transfer(key.as_bytes(), &target_storage) .map_err(|err| { eprintln!("Could not copy entry. Received error: {err}"); std::process::exit(1); @@ -149,7 +149,7 @@ pub fn execute_command(cli: &Cli) { DataStore::open_existing(&cli.storage).expect("Failed to open source storage"); storage - .rename_entry(old_key.as_bytes(), new_key.as_bytes()) + .rename(old_key.as_bytes(), new_key.as_bytes()) .map_err(|err| { eprintln!("Could not rename entry. 
Received error: {err}"); std::process::exit(1); @@ -163,7 +163,7 @@ pub fn execute_command(cli: &Cli) { let storage = DataStore::open_existing(&cli.storage).expect("Failed to open storage"); storage - .delete_entry(key.as_bytes()) + .delete(key.as_bytes()) .expect("Failed to delete entry"); eprintln!("Deleted key '{key}'"); } @@ -192,7 +192,7 @@ pub fn execute_command(cli: &Cli) { println!( "{:<25} {} bytes", "TOTAL SIZE (W/ METADATA):", - entry.size_with_metadata() + entry.file_size() ); println!("{:<25} {:?}", "OFFSET RANGE:", entry.offset_range()); println!("{:<25} {:?}", "MEMORY ADDRESS:", entry.address_range()); @@ -228,13 +228,13 @@ pub fn execute_command(cli: &Cli) { let storage = DataStore::open_existing(&cli.storage).expect("Failed to open storage"); // Retrieve storage file size - let storage_size = storage.get_storage_size().unwrap_or(0); + let storage_size = storage.file_size().unwrap_or(0); // Get compaction savings estimate let savings_estimate = storage.estimate_compaction_savings(); // Count active entries - let entry_count = storage.count(); + let entry_count = storage.len(); println!("\n{:=^50}", " STORAGE INFO "); println!("{:<25} {:?}", "STORAGE FILE:", cli.storage); diff --git a/src/lib.rs b/src/lib.rs index a5fb325f..070516f5 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -57,7 +57,7 @@ //! assert_eq!(entry_handle.as_slice(), b"A new value"); //! //! // Delete an entry -//! storage.delete_entry(b"key3").unwrap(); +//! storage.delete(b"key3").unwrap(); //! let entry_handle = storage.read(b"key3").unwrap(); //! assert!(entry_handle.is_none()); //! 
``` diff --git a/src/storage_engine/data_store.rs b/src/storage_engine/data_store.rs index 78d599d8..cba6ce6f 100644 --- a/src/storage_engine/data_store.rs +++ b/src/storage_engine/data_store.rs @@ -189,6 +189,7 @@ impl DataStore { write_guard: &std::sync::RwLockWriteGuard<'_, BufWriter>, key_hash_offsets: &[(u64, u64)], tail_offset: u64, + deleted_keys: Option<&HashSet>, ) -> std::io::Result<()> { let new_mmap = Self::init_mmap(write_guard)?; let mut mmap_guard = self.mmap.lock().unwrap(); @@ -198,7 +199,14 @@ impl DataStore { .map_err(|_| std::io::Error::other("Failed to acquire index lock"))?; for (key_hash, offset) in key_hash_offsets.iter() { - key_indexer_guard.insert(*key_hash, *offset); + if deleted_keys + .as_ref() + .is_some_and(|set| set.contains(key_hash)) + { + key_indexer_guard.remove(key_hash); + } else { + key_indexer_guard.insert(*key_hash, *offset); + } } *mmap_guard = Arc::new(new_mmap); @@ -366,6 +374,7 @@ impl DataStore { &file, &[(key_hash, tail_offset - METADATA_SIZE as u64)], tail_offset, + None, )?; Ok(tail_offset) } @@ -432,12 +441,18 @@ impl DataStore { let mut tail_offset = self.tail_offset.load(Ordering::Acquire); let mut key_hash_offsets: Vec<(u64, u64)> = Vec::with_capacity(hashed_payloads.len()); + let mut deleted_keys: HashSet = HashSet::new(); + for (key_hash, payload) in hashed_payloads { - if !allow_null_bytes && payload == NULL_BYTE { - return Err(std::io::Error::new( - std::io::ErrorKind::InvalidInput, - "NULL-byte payloads cannot be written directly.", - )); + if payload == NULL_BYTE { + if !allow_null_bytes { + return Err(std::io::Error::new( + std::io::ErrorKind::InvalidInput, + "NULL-byte payloads cannot be written directly.", + )); + } + + deleted_keys.insert(key_hash); } if payload.is_empty() { @@ -468,7 +483,7 @@ impl DataStore { file.write_all(&buffer)?; file.flush()?; - self.reindex(&file, &key_hash_offsets, tail_offset)?; + self.reindex(&file, &key_hash_offsets, tail_offset, Some(&deleted_keys))?; 
Ok(self.tail_offset.load(Ordering::Acquire)) } @@ -540,9 +555,9 @@ impl DataStore { /// - `Err(std::io::Error)`: If a write operation fails. /// /// # Notes: - /// - This is a **low-level function** used by `copy_entry` and related operations. + /// - This is a **low-level function** used by `copy` and related operations. /// - The `entry` remains **unchanged** in the original storage. - fn copy_entry_handle(&self, entry: &EntryHandle, target: &DataStore) -> Result { + fn copy_handle(&self, entry: &EntryHandle, target: &DataStore) -> Result { let mut entry_stream = EntryStream::from(entry.clone_arc()); target.write_stream_with_key_hash(entry.key_hash(), &mut entry_stream) } @@ -578,13 +593,13 @@ impl DataStore { let mut compacted_data_size: u64 = 0; for entry in self.iter_entries() { - let new_tail_offset = self.copy_entry_handle(&entry, &compacted_storage)?; + let new_tail_offset = self.copy_handle(&entry, &compacted_storage)?; let stored_metadata_offset = new_tail_offset - METADATA_SIZE as u64; index_pairs.push((entry.key_hash(), stored_metadata_offset)); - compacted_data_size += entry.size_with_metadata() as u64; + compacted_data_size += entry.file_size() as u64; } - let size_before = self.get_storage_size()?; + let size_before = self.file_size()?; // Note: The current implementation should never increase space, but if an additional indexer // is ever used, this may change. @@ -624,13 +639,13 @@ impl DataStore { /// - Ignores older versions of keys to estimate the **optimized** storage footprint. /// - Returns the **difference** between the total file size and the estimated compacted size. 
pub fn estimate_compaction_savings(&self) -> u64 { - let total_size = self.get_storage_size().unwrap_or(0); + let total_size = self.file_size().unwrap_or(0); let mut unique_entry_size: u64 = 0; let mut seen_keys = HashSet::with_hasher(Xxh3BuildHasher); for entry in self.iter_entries() { if seen_keys.insert(entry.key_hash()) { - unique_entry_size += entry.size_with_metadata() as u64; + unique_entry_size += entry.file_size() as u64; } } total_size.saturating_sub(unique_entry_size) @@ -693,7 +708,7 @@ impl DataStoreWriter for DataStore { self.batch_write_hashed_payloads(hashed_entries, false) } - fn rename_entry(&self, old_key: &[u8], new_key: &[u8]) -> Result { + fn rename(&self, old_key: &[u8], new_key: &[u8]) -> Result { if old_key == new_key { return Err(std::io::Error::new( std::io::ErrorKind::InvalidInput, @@ -708,16 +723,16 @@ impl DataStoreWriter for DataStore { self.write_stream(new_key, &mut old_entry_stream)?; - let new_offset = self.delete_entry(old_key)?; + let new_offset = self.delete(old_key)?; Ok(new_offset) } - fn copy_entry(&self, key: &[u8], target: &DataStore) -> Result { + fn copy(&self, key: &[u8], target: &DataStore) -> Result { if self.path == target.path { return Err(std::io::Error::new( std::io::ErrorKind::InvalidInput, format!( - "Cannot copy entry to the same storage ({:?}). Use `rename_entry` instead.", + "Cannot copy entry to the same storage ({:?}). 
Use `rename` instead.", self.path ), )); @@ -729,15 +744,15 @@ impl DataStoreWriter for DataStore { format!("Key not found: {:?}", String::from_utf8_lossy(key)), ) })?; - self.copy_entry_handle(&entry_handle, target) + self.copy_handle(&entry_handle, target) } - fn move_entry(&self, key: &[u8], target: &DataStore) -> Result { - self.copy_entry(key, target)?; - self.delete_entry(key) + fn transfer(&self, key: &[u8], target: &DataStore) -> Result { + self.copy(key, target)?; + self.delete(key) } - fn delete_entry(&self, key: &[u8]) -> Result { + fn delete(&self, key: &[u8]) -> Result { let key_hash = compute_hash(key); self.batch_write_hashed_payloads(vec![(key_hash, &NULL_BYTE)], true) } @@ -816,7 +831,7 @@ impl DataStoreReader for DataStore { let key_indexer_guard = self .key_indexer .read() - .map_err(|_| Error::other("Key-index lock poisoned during batch_read"))?; + .map_err(|_| Error::other("Key-index lock poisoned during `batch_read`"))?; let hashes = compute_hash_batch(keys); let results = hashes @@ -853,11 +868,22 @@ impl DataStoreReader for DataStore { Ok(self.read(key)?.map(|entry| entry.metadata().clone())) } - fn count(&self) -> Result { - Ok(self.iter_entries().count()) + fn len(&self) -> Result { + let read_guard = self + .key_indexer + .read() + .map_err(|_| Error::other("Key-index lock poisoned during `len`"))?; + + Ok(read_guard.len()) + } + + fn is_empty(&self) -> Result { + let len = self.len()?; + + Ok(len == 0) } - fn get_storage_size(&self) -> Result { + fn file_size(&self) -> Result { std::fs::metadata(&self.path).map(|meta| meta.len()) } } diff --git a/src/storage_engine/entry_handle.rs b/src/storage_engine/entry_handle.rs index ba8f6669..78144198 100644 --- a/src/storage_engine/entry_handle.rs +++ b/src/storage_engine/entry_handle.rs @@ -133,7 +133,7 @@ impl EntryHandle { /// /// # Returns /// - The size of the payload plus metadata in bytes. 
- pub fn size_with_metadata(&self) -> usize { + pub fn file_size(&self) -> usize { self.range.len() + METADATA_SIZE } diff --git a/src/storage_engine/key_indexer.rs b/src/storage_engine/key_indexer.rs index 87a6f39e..c67865a7 100644 --- a/src/storage_engine/key_indexer.rs +++ b/src/storage_engine/key_indexer.rs @@ -65,4 +65,19 @@ impl KeyIndexer { pub fn get(&self, key_hash: &u64) -> Option<&u64> { self.index.get(key_hash) } + + #[inline] + pub fn remove(&mut self, key_hash: &u64) -> Option { + self.index.remove(key_hash) + } + + #[inline] + pub fn len(&self) -> usize { + self.index.len() + } + + #[inline] + pub fn is_empty(&self) -> bool { + self.index.is_empty() + } } diff --git a/src/storage_engine/traits/reader.rs b/src/storage_engine/traits/reader.rs index 4b3003a4..bfea52a0 100644 --- a/src/storage_engine/traits/reader.rs +++ b/src/storage_engine/traits/reader.rs @@ -67,14 +67,21 @@ pub trait DataStoreReader { /// # Returns: /// - `Ok(active_count)`: Total active entries. /// - `Err(std::io::Error)`: On I/O failure. - fn count(&self) -> Result; + fn len(&self) -> Result; + + /// Determines if the store is empty or has no active keys. + /// + /// # Returns: + /// - `Ok(bool)`: Whether or not the store has any active keys. + /// - `Err(std::io::Error)`: On I/O failure. + fn is_empty(&self) -> Result; /// Returns the current file size on disk (including those of deleted entries). /// /// # Returns: /// - `Ok(bytes)`: File size in bytes. /// - `Err(std::io::Error)`: On I/O failure. - fn get_storage_size(&self) -> Result; + fn file_size(&self) -> Result; } #[async_trait::async_trait] @@ -144,12 +151,19 @@ pub trait AsyncDataStoreReader { /// # Returns: /// - `Ok(active_count)`: Total active entries. /// - `Err(std::io::Error)`: On I/O failure. - async fn count(&self) -> Result; + async fn len(&self) -> Result; + + /// Determines if the store is empty or has no active keys. + /// + /// # Returns: + /// - `Ok(bool)`: Whether or not the store has any active keys. 
+ /// - `Err(std::io::Error)`: On I/O failure. + async fn is_empty(&self) -> Result; /// Returns the current file size on disk (including those of deleted entries). /// /// # Returns: /// - `Ok(bytes)`: File size in bytes. /// - `Err(std::io::Error)`: On I/O failure. - async fn get_storage_size(&self) -> Result; + async fn file_size(&self) -> Result; } diff --git a/src/storage_engine/traits/writer.rs b/src/storage_engine/traits/writer.rs index c6a396a5..59e4cb0a 100644 --- a/src/storage_engine/traits/writer.rs +++ b/src/storage_engine/traits/writer.rs @@ -70,7 +70,6 @@ pub trait DataStoreWriter { /// - If the key hashes are already computed, use `batch_write_hashed_payloads()`. fn batch_write(&self, entries: &[(&[u8], &[u8])]) -> Result; - // TODO: Rename to `rename` /// Renames an existing entry by copying it under a new key and marking the old key as deleted. /// /// This function: @@ -90,9 +89,8 @@ pub trait DataStoreWriter { /// - This operation **does not modify** the original entry but instead appends a new copy. /// - The old key is **logically deleted** via an append-only tombstone. /// - Attempting to rename a key to itself will return an error. - fn rename_entry(&self, old_key: &[u8], new_key: &[u8]) -> Result; + fn rename(&self, old_key: &[u8], new_key: &[u8]) -> Result; - // TODO: Rename to `copy` /// Copies an entry to a **different storage container**. /// /// This function: @@ -109,11 +107,10 @@ pub trait DataStoreWriter { /// or if attempting to copy to the same storage. /// /// # Notes: - /// - Copying within the **same** storage is unnecessary; use `rename_entry` instead. + /// - Copying within the **same** storage is unnecessary; use `rename` instead. /// - This operation does **not** delete the original entry. - fn copy_entry(&self, key: &[u8], target: &DataStore) -> Result; + fn copy(&self, key: &[u8], target: &DataStore) -> Result; - // TODO: Rename to `move` /// Moves an entry from the current storage to a **different storage container**. 
/// /// This function: @@ -129,12 +126,11 @@ /// - `Err(std::io::Error)`: If the key is not found, or if the copy/delete operation fails. /// /// # Notes: - /// - Moving an entry within the **same** storage is unnecessary; use `rename_entry` instead. + /// - Moving an entry within the **same** storage is unnecessary; use `rename` instead. /// - The original entry is **logically deleted** by appending a tombstone, maintaining /// the append-only structure. - fn move_entry(&self, key: &[u8], target: &DataStore) -> Result<u64, std::io::Error>; + fn transfer(&self, key: &[u8], target: &DataStore) -> Result<u64, std::io::Error>; - // TODO: Rename to `delete` /// Deletes a key by appending a **null byte marker**. /// /// The storage engine is **append-only**, so keys cannot be removed directly. @@ -145,7 +141,7 @@ /// /// # Returns: /// - The **new file offset** where the delete marker was appended. - fn delete_entry(&self, key: &[u8]) -> Result<u64, std::io::Error>; + fn delete(&self, key: &[u8]) -> Result<u64, std::io::Error>; } #[async_trait::async_trait] @@ -218,7 +214,6 @@ pub trait AsyncDataStoreWriter { /// - If the key hashes are already computed, use `batch_write_hashed_payloads()`. async fn batch_write(&self, entries: &[(&[u8], &[u8])]) -> Result<u64, std::io::Error>; - // TODO: Rename to `rename` /// Renames an existing entry by copying it under a new key and marking the old key as deleted. /// /// This function: @@ -238,9 +233,8 @@ pub trait AsyncDataStoreWriter { /// - This operation **does not modify** the original entry but instead appends a new copy. /// - The old key is **logically deleted** via an append-only tombstone. /// - Attempting to rename a key to itself will return an error. - async fn rename_entry(&self, old_key: &[u8], new_key: &[u8]) -> Result<u64, std::io::Error>; + async fn rename(&self, old_key: &[u8], new_key: &[u8]) -> Result<u64, std::io::Error>; - // TODO: Rename to `copy` /// Copies an entry to a **different storage container**. 
/// /// This function: @@ -257,11 +251,10 @@ pub trait AsyncDataStoreWriter { /// or if attempting to copy to the same storage. /// /// # Notes: - /// - Copying within the **same** storage is unnecessary; use `rename_entry` instead. + /// - Copying within the **same** storage is unnecessary; use `rename` instead. /// - This operation does **not** delete the original entry. - async fn copy_entry(&self, key: &[u8], target: &DataStore) -> Result<u64, std::io::Error>; + async fn copy(&self, key: &[u8], target: &DataStore) -> Result<u64, std::io::Error>; - // TODO: Rename to `move` /// Moves an entry from the current storage to a **different storage container**. /// /// This function: @@ -277,12 +270,11 @@ /// - `Err(std::io::Error)`: If the key is not found, or if the copy/delete operation fails. /// /// # Notes: - /// - Moving an entry within the **same** storage is unnecessary; use `rename_entry` instead. + /// - Moving an entry within the **same** storage is unnecessary; use `rename` instead. /// - The original entry is **logically deleted** by appending a tombstone, maintaining /// the append-only structure. - async fn move_entry(&self, key: &[u8], target: &DataStore) -> Result<u64, std::io::Error>; + async fn transfer(&self, key: &[u8], target: &DataStore) -> Result<u64, std::io::Error>; - // TODO: Rename to `delete` /// Deletes a key by appending a **null byte marker**. /// /// The storage engine is **append-only**, so keys cannot be removed directly. @@ -293,5 +285,5 @@ /// /// # Returns: /// - The **new file offset** where the delete marker was appended. 
- async fn delete_entry(&self, key: &[u8]) -> Result<u64, std::io::Error>; + async fn delete(&self, key: &[u8]) -> Result<u64, std::io::Error>; } diff --git a/tests/basic_operations_tests.rs b/tests/basic_operations_tests.rs index e2bc271c..9a733ed0 100644 --- a/tests/basic_operations_tests.rs +++ b/tests/basic_operations_tests.rs @@ -16,6 +16,19 @@ mod tests { (dir, storage) } + #[test] + fn test_emptiness_check() { + let (_dir, storage) = create_temp_storage(); + + assert!(storage.is_empty().unwrap()); + + let key = b"test_key".as_slice(); + let payload = b"Hello, world!".as_slice(); + storage.write(key, payload).expect("Failed to append entry"); + + assert!(!storage.is_empty().unwrap()); + } + #[test] fn test_append_and_read_last_entry() { let (_dir, storage) = create_temp_storage(); diff --git a/tests/batch_ops_tests.rs b/tests/batch_ops_tests.rs index 62e81649..425d5b22 100644 --- a/tests/batch_ops_tests.rs +++ b/tests/batch_ops_tests.rs @@ -120,7 +120,7 @@ fn test_batch_write_rejects_empty_payload_among_many() { assert!(storage.read(b"k1").unwrap().is_none()); assert!(storage.read(b"k2").unwrap().is_none()); assert_eq!( - storage.count().unwrap(), + storage.len().unwrap(), 0, "no entries should have been written" ); diff --git a/tests/compaction_tests.rs b/tests/compaction_tests.rs index d97d372d..6f020224 100644 --- a/tests/compaction_tests.rs +++ b/tests/compaction_tests.rs @@ -115,7 +115,7 @@ mod tests { Some(&temp_payload2[..]) ); - storage.delete_entry(key7).unwrap(); + storage.delete(key7).unwrap(); assert_eq!(storage.read(key7).unwrap().as_deref(), None); diff --git a/tests/storage_operation_tests.rs b/tests/storage_operation_tests.rs index 6c1c28ba..3ddc1ae0 100644 --- a/tests/storage_operation_tests.rs +++ b/tests/storage_operation_tests.rs @@ -31,7 +31,7 @@ mod tests { // Step 2: Copy the entry to the target storage source_storage - .copy_entry(key, &target_storage) + .copy(key, &target_storage) .expect("Failed to copy entry"); // Step 3: Ensure the original entry still exists in the source 
@@ -74,7 +74,7 @@ mod tests { } #[test] - fn test_copy_entry_to_self_fails() { + fn test_copy_to_self_fails() { let (_dir, storage) = create_temp_storage(); let key = b"self_copy_key"; @@ -84,7 +84,7 @@ mod tests { storage.write(key, payload).expect("Failed to append entry"); // Step 2: Attempt to copy the entry to the same storage - let result = storage.copy_entry(key, &storage); + let result = storage.copy(key, &storage); // Step 3: Ensure the operation fails with the expected error assert!( @@ -100,7 +100,7 @@ mod tests { } #[test] - fn test_move_entry_between_storages() { + fn test_transfer_entry_between_storages() { let (_dir1, source_storage) = create_temp_storage(); let (_dir2, target_storage) = create_temp_storage(); @@ -114,7 +114,7 @@ mod tests { // Step 2: Move the entry to the target storage source_storage - .move_entry(key, &target_storage) + .transfer(key, &target_storage) .expect("Failed to move entry"); // Step 3: Ensure the original entry no longer exists in the source @@ -200,15 +200,15 @@ mod tests { Some(updated_payload2) ); - let count_before_delete = storage.count().unwrap(); + let count_before_delete = storage.len().unwrap(); assert_eq!(count_before_delete, 2); // Delete entry for key1 - storage.delete_entry(key1).expect("Failed to delete entry"); + storage.delete(key1).expect("Failed to delete entry"); // Verify count is reduced - let count_after_delete = storage.count().unwrap(); + let count_after_delete = storage.len().unwrap(); assert_eq!( count_after_delete, count_before_delete - 1, @@ -244,7 +244,7 @@ mod tests { } #[test] - fn test_rename_entry() { + fn test_rename() { let (_dir, storage) = create_temp_storage(); let old_key = b"old_key"; @@ -258,7 +258,7 @@ mod tests { // Step 2: Rename the entry storage - .rename_entry(old_key, new_key) + .rename(old_key, new_key) .expect("Failed to rename entry"); // Step 3: Ensure the new key exists and has the same data @@ -290,7 +290,7 @@ mod tests { storage.write(key, payload).expect("Failed to 
append entry"); // Step 2: Attempt to rename the key to itself - let result = storage.rename_entry(key, key); + let result = storage.rename(key, key); // Step 3: Ensure the operation fails with the expected error assert!(