Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add stream method to object store #29

Open
wants to merge 9 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ from object_store import ObjectStore, ObjectMeta, Path
# data will not be persisted and is not shared across store instances
store = ObjectStore("memory://")

store.put(Path("data"), b"some data")
store.put("data", b"some data")

data = store.get("data")
assert data == b"some data"
Expand All @@ -67,13 +67,13 @@ assert copied == data
#### Async api

```py
from object_store import ObjectStore, ObjectMeta, Path
from object_store import ObjectStore, ObjectMeta

# we use an in-memory store for demonstration purposes.
# data will not be persisted and is not shared across store instances
store = ObjectStore("memory://")

path = Path("data")
path = "data"
await store.put_async(path, b"some data")

data = await store.get_async(path)
Expand All @@ -86,8 +86,8 @@ meta = await store.head_async(path)
range = await store.get_range_async(path, start=0, length=4)
assert range == b"some"

await store.copy_async(Path("data"), Path("copied"))
copied = await store.get_async(Path("copied"))
await store.copy_async("data", "copied")
copied = await store.get_async("copied")
assert copied == data
```

Expand Down
49 changes: 49 additions & 0 deletions object-store-internal/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,16 @@ mod builder;
mod file;
mod utils;

use bytes::Bytes;
use futures::stream::BoxStream;
use futures::StreamExt;
use pyo3_asyncio_0_21 as pyo3_asyncio;
use std::borrow::Cow;
use std::collections::HashMap;
use std::fmt;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::Mutex;

pub use crate::file::{ArrowFileSystemHandler, ObjectInputFile, ObjectOutputStream};
use crate::utils::{flatten_list_stream, get_bytes};
Expand All @@ -19,6 +23,7 @@ use object_store::{
};
use pyo3::exceptions::{
PyException, PyFileExistsError, PyFileNotFoundError, PyNotImplementedError,
PyStopAsyncIteration,
};
use pyo3::prelude::*;
use pyo3::PyErr;
Expand Down Expand Up @@ -470,6 +475,38 @@ impl PyClientOptions {
}
}

/// An asynchronous iterator over the chunks of a stored object's bytes.
///
/// Exposed to Python as `BytesStream`; intended to be consumed with
/// `async for chunk in stream` (see `__aiter__` / `__anext__` below).
#[pyclass(name = "BytesStream")]
pub struct PyBytesStream {
    // Shared, lockable handle to the underlying chunk stream. Arc + Mutex
    // because each `__anext__` call spawns a future that needs its own owned
    // handle and must hold the lock across an .await point (tokio's async
    // Mutex, so holding it across .await is sound).
    stream: Arc<Mutex<BoxStream<'static, object_store::Result<Bytes>>>>,
}

impl PyBytesStream {
    /// Wrap a raw byte-chunk stream for exposure to Python.
    fn new(stream: BoxStream<'static, object_store::Result<Bytes>>) -> Self {
        Self {
            stream: Arc::new(Mutex::new(stream)),
        }
    }
}

#[pymethods]
impl PyBytesStream {
    /// `__aiter__`: the stream object is its own async iterator.
    fn __aiter__(_self: Py<Self>) -> Py<Self> {
        _self
    }

    /// `__anext__`: return an awaitable that resolves to the next chunk as
    /// Python `bytes`, or raises `StopAsyncIteration` once the stream is
    /// exhausted (terminating the Python `async for` loop).
    fn __anext__(&self, py: Python) -> PyResult<Option<PyObject>> {
        // Clone the Arc so the future owns its own handle to the stream.
        let stream = self.stream.clone();
        let fut = pyo3_asyncio::tokio::future_into_py(py, async move {
            match stream.lock().await.next().await {
                // Cow<[u8]> converts into a Python `bytes` object.
                Some(Ok(bytes)) => Ok(Cow::<[u8]>::Owned(bytes.to_vec())),
                // Surface store errors through the crate's error mapping.
                Some(Err(e)) => Err(ObjectStoreError::from(e).into()),
                // End of stream: signal Python's async-iteration protocol.
                None => Err(PyStopAsyncIteration::new_err("stream exhausted")),
            }
        })?;
        Ok(Some(fut.into()))
    }
}

#[pyclass(name = "ObjectStore", subclass)]
#[derive(Debug, Clone)]
/// A generic object store interface for uniformly interacting with AWS S3, Google Cloud Storage,
Expand Down Expand Up @@ -608,6 +645,18 @@ impl PyObjectStore {
})
}

/// Start a GET for `location` and return a `BytesStream` over the response
/// body, so Python callers can iterate the object's bytes chunk by chunk.
#[pyo3(text_signature = "($self, location)")]
fn stream(&self, py: Python, location: PyPath) -> PyResult<PyBytesStream> {
    // Release the GIL while blocking on the initial GET request; only the
    // request setup blocks here — chunks are pulled lazily via __anext__.
    py.allow_threads(|| {
        let stream = self
            .rt
            .block_on(self.inner.get(&location.into()))
            .map_err(ObjectStoreError::from)?
            .into_stream();
        Ok(PyBytesStream::new(stream))
    })
}

/// Return the metadata for the specified location
#[pyo3(text_signature = "($self, location)")]
fn head(&self, py: Python, location: PyPath) -> PyResult<PyObjectMeta> {
Expand Down
169 changes: 168 additions & 1 deletion object-store/python/object_store/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from io import BytesIO
from typing import List, Optional, Union
from typing import List, Optional, Union, override

# NOTE aliasing the imports with 'as' makes them public in the eyes
# of static code checkers. Thus we avoid listing them with __all__ = ...
Expand All @@ -8,6 +8,7 @@
from ._internal import ObjectMeta as ObjectMeta
from ._internal import ObjectStore as _ObjectStore
from ._internal import Path as Path
from ._internal import BytesStream

try:
import importlib.metadata as importlib_metadata
Expand Down Expand Up @@ -45,6 +46,7 @@ class ObjectStore(_ObjectStore):

backed by the Rust object_store crate."""

@override
def head(self, location: PathLike) -> ObjectMeta:
"""Return the metadata for the specified location.

Expand All @@ -56,6 +58,19 @@ def head(self, location: PathLike) -> ObjectMeta:
"""
return super().head(_as_path(location))

@override
async def head_async(self, location: PathLike) -> ObjectMeta:
    """Fetch the metadata of the object stored at the given location.

    Args:
        location (PathLike): path / key to storage location

    Returns:
        ObjectMeta: metadata for object at location
    """
    path = _as_path(location)
    return await super().head_async(path)

@override
def get(self, location: PathLike) -> bytes:
"""Return the bytes that are stored at the specified location.

Expand All @@ -67,6 +82,19 @@ def get(self, location: PathLike) -> bytes:
"""
return super().get(_as_path(location))

@override
async def get_async(self, location: PathLike) -> bytes:
    """Return the bytes that are stored at the specified location.

    Args:
        location (PathLike): path / key to storage location

    Returns:
        bytes: raw data stored in location
    """
    # Fix: previously delegated to the *sync* `get` without awaiting, which
    # blocked the event loop and bypassed the async implementation entirely.
    return await super().get_async(_as_path(location))

@override
def get_range(self, location: PathLike, start: int, length: int) -> bytes:
"""Return the bytes that are stored at the specified location in the given byte range.

Expand All @@ -80,6 +108,34 @@ def get_range(self, location: PathLike, start: int, length: int) -> bytes:
"""
return super().get_range(_as_path(location), start, length)

@override
async def get_range_async(self, location: PathLike, start: int, length: int) -> bytes:
    """Read the byte range ``[start, start + length)`` of the object at a location.

    Args:
        location (PathLike): path / key to storage location
        start (int): zero-based start index
        length (int): length of the byte range

    Returns:
        bytes: raw data range stored in location
    """
    path = _as_path(location)
    return await super().get_range_async(path, start, length)

@override
def stream(self, location: PathLike) -> BytesStream:
    """Open a chunked stream over the bytes stored at the specified location.

    Args:
        location (PathLike): path / key to storage location

    Returns:
        BytesStream: an async iterator that returns the next chunk in the stream with each
        iteration
    """
    path = _as_path(location)
    return super().stream(path)

@override
def put(self, location: PathLike, bytes: BytesLike) -> None:
"""Save the provided bytes to the specified location.

Expand All @@ -89,6 +145,17 @@ def put(self, location: PathLike, bytes: BytesLike) -> None:
"""
return super().put(_as_path(location), _as_bytes(bytes))

@override
async def put_async(self, location: PathLike, bytes: BytesLike) -> None:
    """Write the provided payload to the specified location.

    Args:
        location (PathLike): path / key to storage location
        bytes (BytesLike): data to be written to location
    """
    path = _as_path(location)
    payload = _as_bytes(bytes)
    return await super().put_async(path, payload)

@override
def delete(self, location: PathLike) -> None:
"""Delete the object at the specified location.

Expand All @@ -97,6 +164,16 @@ def delete(self, location: PathLike) -> None:
"""
return super().delete(_as_path(location))

@override
async def delete_async(self, location: PathLike) -> None:
    """Remove the object stored at the specified location.

    Args:
        location (PathLike): path / key to storage location
    """
    path = _as_path(location)
    return await super().delete_async(path)

@override
def list(self, prefix: Optional[PathLike] = None) -> List[ObjectMeta]:
"""List all the objects with the given prefix.

Expand All @@ -112,6 +189,23 @@ def list(self, prefix: Optional[PathLike] = None) -> List[ObjectMeta]:
prefix_ = _as_path(prefix) if prefix else None
return super().list(prefix_)

@override
async def list_async(self, prefix: Optional[PathLike] = None) -> List[ObjectMeta]:
    """List every object whose key starts with the given prefix.

    Prefixes are evaluated on a path segment basis, i.e. `foo/bar/` is a prefix
    of `foo/bar/x` but not of `foo/bar_baz/x`.

    Args:
        prefix (PathLike | None, optional): path prefix to filter limit list results. Defaults to None.

    Returns:
        list[ObjectMeta]: ObjectMeta for all objects under the listed path
    """
    normalized = _as_path(prefix) if prefix else None
    return await super().list_async(normalized)

@override
def list_with_delimiter(self, prefix: Optional[PathLike] = None) -> ListResult:
"""List objects with the given prefix and an implementation specific
delimiter. Returns common prefixes (directories) in addition to object
Expand All @@ -129,6 +223,25 @@ def list_with_delimiter(self, prefix: Optional[PathLike] = None) -> ListResult:
prefix_ = _as_path(prefix) if prefix else None
return super().list_with_delimiter(prefix_)

@override
async def list_with_delimiter_async(self, prefix: Optional[PathLike] = None) -> ListResult:
    """List objects with the given prefix and an implementation specific
    delimiter. Returns common prefixes (directories) in addition to object
    metadata.

    Prefixes are evaluated on a path segment basis, i.e. `foo/bar/` is a prefix
    of `foo/bar/x` but not of `foo/bar_baz/x`.

    Args:
        prefix (PathLike | None, optional): path prefix to filter limit list results. Defaults to None.

    Returns:
        ListResult: common prefixes and ObjectMeta for the objects under the listed path
    """
    normalized = _as_path(prefix) if prefix else None
    return await super().list_with_delimiter_async(normalized)

@override
def copy(self, src: PathLike, dst: PathLike) -> None:
"""Copy an object from one path to another in the same object store.

Expand All @@ -140,6 +253,19 @@ def copy(self, src: PathLike, dst: PathLike) -> None:
"""
return super().copy(_as_path(src), _as_path(dst))

@override
async def copy_async(self, src: PathLike, dst: PathLike) -> None:
    """Copy an object between two paths in the same object store.

    If there exists an object at the destination, it will be overwritten.

    Args:
        src (PathLike): source path
        dst (PathLike): destination path
    """
    source, destination = _as_path(src), _as_path(dst)
    return await super().copy_async(source, destination)

@override
def copy_if_not_exists(self, src: PathLike, dst: PathLike) -> None:
"""Copy an object from one path to another, only if destination is empty.

Expand All @@ -151,6 +277,19 @@ def copy_if_not_exists(self, src: PathLike, dst: PathLike) -> None:
"""
return super().copy_if_not_exists(_as_path(src), _as_path(dst))

@override
async def copy_if_not_exists_async(self, src: PathLike, dst: PathLike) -> None:
    """Copy an object between two paths, failing if the destination exists.

    Will return an error if the destination already has an object.

    Args:
        src (PathLike): source path
        dst (PathLike): destination path
    """
    source, destination = _as_path(src), _as_path(dst)
    return await super().copy_if_not_exists_async(source, destination)

@override
def rename(self, src: PathLike, dst: PathLike) -> None:
"""Move an object from one path to another in the same object store.

Expand All @@ -165,6 +304,22 @@ def rename(self, src: PathLike, dst: PathLike) -> None:
"""
return super().rename(_as_path(src), _as_path(dst))

@override
async def rename_async(self, src: PathLike, dst: PathLike) -> None:
    """Move an object between two paths in the same object store.

    By default, this is implemented as a copy and then delete source. It may not
    check when deleting source that it was the same object that was originally copied.

    If there exists an object at the destination, it will be overwritten.

    Args:
        src (PathLike): source path
        dst (PathLike): destination path
    """
    source, destination = _as_path(src), _as_path(dst)
    return await super().rename_async(source, destination)

@override
def rename_if_not_exists(self, src: PathLike, dst: PathLike) -> None:
"""Move an object from one path to another in the same object store.

Expand All @@ -175,3 +330,15 @@ def rename_if_not_exists(self, src: PathLike, dst: PathLike) -> None:
dst (PathLike): destination path
"""
return super().rename_if_not_exists(_as_path(src), _as_path(dst))

@override
async def rename_if_not_exists_async(self, src: PathLike, dst: PathLike) -> None:
    """Move an object between two paths, failing if the destination exists.

    Will return an error if the destination already has an object.

    Args:
        src (PathLike): source path
        dst (PathLike): destination path
    """
    source, destination = _as_path(src), _as_path(dst)
    return await super().rename_if_not_exists_async(source, destination)
Loading