Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions experiments/bindings/python-ws-client/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,4 +41,5 @@ dev = [
"puccinialin>=0.1.5",
"pytest>=8.4.1",
"pytest-benchmark>=5.1.0",
"pytest-order>=1.3.0",
]
Original file line number Diff line number Diff line change
Expand Up @@ -77,20 +77,18 @@ class DataStoreWsClient:
key is not present.
"""
...
#
#
# TODO: Integrate
# def delete(self, key: bytes) -> None:
# """
# Marks the key as deleted (logically removes it).

# This operation does not physically remove the data but appends a tombstone
# entry to mark the key as deleted.
def delete(self, key: bytes) -> None:
"""
Marks the key as deleted (logically removes it).

# Args:
# key (bytes): The key to mark as deleted.
# """
# ...
This operation does not physically remove the data but appends a tombstone
entry to mark the key as deleted.

Args:
key (bytes): The key to mark as deleted.
"""
...

# def exists(self, key: bytes) -> bool:
# """
Expand Down Expand Up @@ -120,6 +118,33 @@ class DataStoreWsClient:
# """
# return self.exists(key)

def __len__(self) -> int:
"""
Returns the total number of active entries in the store.

Returns:
int: The total number of active entries in the store.
"""
...

def is_empty(self) -> bool:
"""
Determines if the store is empty or has no active keys.

Returns:
bool: Whether or not the store has any active keys.
"""
...

def file_size(self) -> int:
"""
Returns the current file size on disk (including those of deleted entries).

Returns:
int: File size in bytes.
"""
...

@final
class NamespaceHasher:
"""
Expand Down
79 changes: 71 additions & 8 deletions experiments/bindings/python-ws-client/src/base_ws_client_py.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ use std::time::Duration;
use tokio::runtime::{Builder, Runtime};
use tokio::time::timeout;

// TODO: Move timeout handling into inner client

// TODO: Borrow configuration props from MySQL
// connection_timeout=10, # Timeout for the connection attempt (in seconds)
// read_timeout=30, # Timeout for waiting for response from server (in seconds)
Expand Down Expand Up @@ -64,8 +66,6 @@ impl BaseDataStoreWsClient {
Ok(())
}

// --- All public methods are now blocking again, but with timeouts ---

#[pyo3(name = "write")]
fn py_write(&self, key: Vec<u8>, payload: Vec<u8>) -> PyResult<()> {
self.check_connection()?;
Expand All @@ -76,7 +76,7 @@ impl BaseDataStoreWsClient {
match timeout(Duration::from_secs(30), client.write(&key, &payload)).await {
Ok(Ok(_)) => Ok(()),
Ok(Err(e)) => Err(PyIOError::new_err(e.to_string())),
Err(_) => Err(TimeoutError::new_err("Write operation timed out.")),
Err(_) => Err(TimeoutError::new_err("`write` operation timed out.")),
}
})
}
Expand All @@ -94,7 +94,7 @@ impl BaseDataStoreWsClient {
match timeout(Duration::from_secs(60), client.batch_write(&converted)).await {
Ok(Ok(_)) => Ok(()),
Ok(Err(e)) => Err(PyIOError::new_err(e.to_string())),
Err(_) => Err(TimeoutError::new_err("Batch write operation timed out.")),
Err(_) => Err(TimeoutError::new_err("`batch_write` operation timed out.")),
}
})
}
Expand All @@ -112,9 +112,9 @@ impl BaseDataStoreWsClient {
let maybe_bytes = self.runtime.block_on(async {
// TODO: Don't hardcode timeout
match timeout(Duration::from_secs(30), client.read(&key)).await {
Ok(Ok(data)) => Ok(data),
Ok(Ok(entry_payload)) => Ok(entry_payload),
Ok(Err(e)) => Err(PyIOError::new_err(e.to_string())),
Err(_) => Err(TimeoutError::new_err("Read operation timed out.")),
Err(_) => Err(TimeoutError::new_err("`read` operation timed out.")),
}
})?;

Expand All @@ -130,9 +130,9 @@ impl BaseDataStoreWsClient {
let results = self.runtime.block_on(async {
// TODO: Don't hardcode timeout
match timeout(Duration::from_secs(60), client.batch_read(&key_slices)).await {
Ok(Ok(data)) => Ok(data),
Ok(Ok(entries_payloads)) => Ok(entries_payloads),
Ok(Err(e)) => Err(PyIOError::new_err(e.to_string())),
Err(_) => Err(TimeoutError::new_err("Batch read operation timed out.")),
Err(_) => Err(TimeoutError::new_err("`batch_read` operation timed out.")),
}
})?;

Expand All @@ -143,4 +143,67 @@ impl BaseDataStoreWsClient {
.collect())
})
}

#[pyo3(name = "delete")]
fn py_delete(&self, key: Vec<u8>) -> PyResult<()> {
self.check_connection()?;
let client = self.ws_client.clone();

self.runtime.block_on(async {
// TODO: Don't hardcode timeout
match timeout(Duration::from_secs(30), client.delete(&key)).await {
Ok(Ok(_)) => Ok(()),
Ok(Err(e)) => Err(PyIOError::new_err(e.to_string())),
Err(_) => Err(TimeoutError::new_err("`delete` operation timed out.")),
}
})
}

/// Implements the `len()` built-in for Python.
///
/// This allows you to call `len(store)` to get the number of active entries.
/// It assumes the underlying Rust client has a `len()` method.
fn __len__(&self) -> PyResult<usize> {
self.check_connection()?;
let client = self.ws_client.clone();

self.runtime.block_on(async {
// TODO: Don't hardcode timeout
match timeout(Duration::from_secs(30), client.len()).await {
Ok(Ok(total_entries)) => Ok(total_entries),
Ok(Err(e)) => Err(PyIOError::new_err(e.to_string())),
Err(_) => Err(TimeoutError::new_err("`len` operation timed out.")),
}
})
}

#[pyo3(name = "is_empty")]
fn py_is_empty(&self) -> PyResult<(bool)> {
self.check_connection()?;
let client = self.ws_client.clone();

self.runtime.block_on(async {
// TODO: Don't hardcode timeout
match timeout(Duration::from_secs(30), client.is_empty()).await {
Ok(Ok(is_empty)) => Ok(is_empty),
Ok(Err(e)) => Err(PyIOError::new_err(e.to_string())),
Err(_) => Err(TimeoutError::new_err("`is_empty` operation timed out.")),
}
})
}

#[pyo3(name = "file_size")]
fn py_file_size(&self) -> PyResult<(u64)> {
self.check_connection()?;
let client = self.ws_client.clone();

self.runtime.block_on(async {
// TODO: Don't hardcode timeout
match timeout(Duration::from_secs(30), client.file_size()).await {
Ok(Ok(file_size)) => Ok(file_size),
Ok(Err(e)) => Err(PyIOError::new_err(e.to_string())),
Err(_) => Err(TimeoutError::new_err("`file_size` operation timed out.")),
}
})
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ def test_entry_accessors():
assert entry.size == len(value)
assert len(entry) == len(value) # Dunder test

assert entry.size_with_metadata > entry.size
assert entry.file_size > entry.size

# Check offset logic
start = entry.start_offset
Expand Down

This file was deleted.

Loading