Skip to content

Commit

Permalink
Merge pull request #35 from Oxen-AI/feat/data-streamer
Browse files Browse the repository at this point in the history
Feat/data streamer
  • Loading branch information
gschoeni authored Dec 27, 2023
2 parents 8078583 + ec5537e commit ffa388c
Show file tree
Hide file tree
Showing 18 changed files with 1,091 additions and 293 deletions.
574 changes: 348 additions & 226 deletions oxen/Cargo.lock

Large diffs are not rendered by default.

15 changes: 8 additions & 7 deletions oxen/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "oxen"
version = "0.3.2"
version = "0.3.4"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
Expand All @@ -9,17 +9,18 @@ name = "oxen"
crate-type = ["cdylib"]

[dependencies]
pyo3 = "0.19"
pyo3-asyncio = { git = "https://github.com/awestlake87/pyo3-asyncio.git", rev = "2cdf4d7ed932b6d0a3dc2756189bdd17a050e7f7", features = ["attributes", "tokio-runtime"] }
pyo3 = "0.20.0"
pyo3-asyncio = { version = "0.20.0", features = ["attributes", "tokio-runtime"] }
log = "0.4.17"
pyo3-log = "0.8.1"
pyo3-log = "0.9.0"
tokio = { version = "1", features = ["full"] }
pyo3-polars = "0.6.0"
liboxen = "0.9.8"
pyo3-polars = "0.9.0"
serde_json = "1.0.106"
liboxen = "0.9.17"
# liboxen = { path = "../../rust/Oxen/src/lib" }

[build-dependencies]
cc = { version = "1.0", features = ["parallel"] }
bindgen = { version = "0.66.0", default-features = false, features = ["runtime"] }
bindgen = { version = "0.69.1", default-features = false, features = ["runtime"] }
glob = "0.3"
pkg-config = { version = "0.3", optional = true }
2 changes: 1 addition & 1 deletion oxen/pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[build-system]
requires = ["maturin>=0.12,<0.13"]
requires = ["maturin>=0.12,<0.14"]
build-backend = "maturin"

[project]
Expand Down
1 change: 1 addition & 0 deletions oxen/python/oxen/auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from typing import Optional
import os


def config_auth(token: str, host: str = "hub.oxen.ai", path: Optional[str] = None):
"""
Configures authentication for a host.
Expand Down
2 changes: 1 addition & 1 deletion oxen/python/oxen/clone.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ def clone(
A LocalRepo object that can be used to interact with the cloned repo.
"""
# Verify repo_id format
if not "/" in repo_id:
if "/" not in repo_id:
raise ValueError(f"Invalid repo_id format: {repo_id}")
# Get repo name from repo_id
repo_name = repo_id.split("/")[1]
Expand Down
126 changes: 77 additions & 49 deletions oxen/python/oxen/dataset.py
Original file line number Diff line number Diff line change
@@ -1,40 +1,3 @@
# Read up on data, dataset, and dataloader APIs in
# * pytorch
# * huggingface
# * tensorflow
#
# Others to consider:
# * keras
# * fastai
# * pytorch-lightning
# * jax
# * sklearn
# * pandas
# * numpy
# * scipy

# Start going through tutorials for each of the above,
# and write example code for each of the above
# fit into a boilerplate similar to the ImageClassificationBoilerplate and
# be sure that the Oxen data APIs can easily slot in,
# with swapping out the model, training loop, eval loop, etc.

# Design APIs to be easy to fetch random examples from remote data frames to serve up
# to our "gradio chatbot labeling tool" (or any other labeling tool)

# Design underlying Oxen APIs to easily conform to the above APIs
#
# Keith: How impossible would building a Python library that exposes a read-only
# dataframe on top of an oxen repo be?

# Read-only dataframe API:
# 0) download a file to disk from repo (easy)
# 1) load a dataframe into python, from downloaded file (easy)
# 2) slice and stream a dataframe in current format (easy, already have apis,
# potentially slow to slice big CSVs)
# 3) slice and stream a dataframe after converting for apache arrow for you on disk
# (way more efficient, can convert on disk, do we do out of the box?)

from oxen import PyDataset

import os
Expand All @@ -44,6 +7,24 @@
from typing import Optional


def load_dataset(repo, paths: Union[str, Sequence[str]], features=None):
    """
    Construct a :class:`Dataset` backed by files in an oxen repository.

    Parameters
    ----------
    repo : Repo
        The oxen repository (local or remote) the data lives in.
    paths : str | Sequence[str]
        Path or paths to the data files that make up the dataset.
    features : Features | None
        Optional feature description (columns, dtypes, etc.).

    Returns
    -------
    Dataset
        The dataset wrapping the given repo and paths.
    """
    return Dataset(repo, paths, features)


class Dataset:
"""
Dataset object constructs a dataset from a remote or local repo.
Expand Down Expand Up @@ -89,35 +70,80 @@ def __init__(

self._data_frames = []
self._features = features
self.downloaded = False

if download:
self.download_all()
else:
self.sizes = [self._repo.get_df_size(path) for path in self._paths]
width = sum([size[0] for size in self.sizes])
height = sum([size[1] for size in self.sizes])
self.size = width, height

# For iterating over the dataset
def __len__(self):
return sum([df.height for df in self._data_frames])
if self.downloaded:
return sum([df.height for df in self._data_frames])
else:
return self.size[1]

# For iterating over the dataset
def __getitem__(self, idx):
# iterate over data frames to find the right one to index
df_offset = 0 # which row we are at
df_idx = 0 # which dataframe we are on
while df_offset < idx:
df_offset += self._data_frames[df_idx].height
df_idx += 1
        # Find which dataframes we are in
df_idx, df_offset = self._get_df_offsets(idx)

# Offset is the row we are at in the data frame
remainder = idx - df_offset

# extract the features from the data frame
ret_features = []
for feature in self.features:
df = self._data_frames[df_idx]
val = df[feature.name][remainder]
ret_features.append(val)
if self.downloaded:
# extract the features from the data frame
for feature in self.features:
df = self._data_frames[df_idx]
val = df[feature.name][remainder]
ret_features.append(val)
else:
# Grab from the API
row = self._get_df_row(idx)
ret_features.append(row)

return ret_features

def __iter__(self):
for i in range(len(self)):
yield self[i]

def __repr__(self):
return f"Dataset({self._repo}, {self._paths})"

def __str__(self):
return f"Dataset({self._repo}, {self._paths})"

def _get_df_row(self, idx):
df_idx, df_offset = self._get_df_offsets(idx)
path = self._paths[df_offset]
return self._repo.get_df_row(path, df_idx)

def _get_df_offsets(self, idx):
# iterate over data frames to find the right one to index
df_offset = 0 # which row we are at
df_idx = 0 # which dataframe we are on

# create a running sum of the heights of the data frames
heights = [size[1] for size in self.sizes]
summed_heights = [sum(heights[: i + 1]) for i in range(len(heights))]

# find which data frame we are in
for i, height in enumerate(summed_heights):
if idx > height:
df_offset = i

df_idx = idx
if df_idx > summed_heights[df_offset]:
df_idx = idx - summed_heights[df_offset]

return df_idx, df_offset

@staticmethod
def default_cache_dir(repo) -> str:
# ~/.oxen/.cache/<namespace>/<repo>/<revision>
Expand All @@ -140,6 +166,7 @@ def _cache_download_path(self, base_dir: Optional[str]) -> str:
def df(self, path: str, base_dir: str = None) -> pl.DataFrame:
"""
Returns a dataframe of the data from the repo, fully loaded into memory.
Only works if dataset has been downloaded to disk.
Parameters
----------
Expand Down Expand Up @@ -176,6 +203,7 @@ def download(self, remote_path: str, base_dir: str = None) -> str:
os.makedirs(parent, exist_ok=True)
if not os.path.exists(output_path):
self._repo.download(remote_path, parent)
self.downloaded = True
return output_path

def download_all(self):
Expand Down
9 changes: 6 additions & 3 deletions oxen/python/oxen/local_repo.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,17 +110,20 @@ def add(self, path: str):

def rm(self, path: str, recursive=False, staged=False, remote=False):
"""
Remove a file or directory from being tracked. This will not delete the file or directory.
Remove a file or directory from being tracked.
This will not delete the file or directory.
Args:
path: `str`
The path to the file or directory to remove.
recursive: `bool`
Whether to remove the file or directory recursively. Default: False
staged: `bool`
Whether to remove the file or directory from the staging area. Default: False
Whether to remove the file or directory from the staging area.
Default: False
remote: `bool`
Whether to remove the file or directory from a remote workspace. Default: False
Whether to remove the file or directory from a remote workspace.
Default: False
"""
self._repo.rm(path, recursive, staged, remote)

Expand Down
Empty file.
26 changes: 26 additions & 0 deletions oxen/python/oxen/providers/dataset_path_provider.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
class DatasetPathProvider:
    """An interface for providing dataframe data by path and row range.

    Concrete providers implement path listing, size lookup, and slicing;
    every method here raises NotImplementedError.
    """

    @property
    def paths(self):
        """Get the paths to the data files."""
        raise NotImplementedError

    def size(self, path) -> "tuple[int, int]":
        """Get the (width, height) of the dataframe at the given path.

        NOTE(review): was annotated ``-> int``, but implementations (e.g. the
        mock provider) return a (columns, rows) tuple — annotation corrected.
        """
        raise NotImplementedError

    def slice(self, path, start, end):
        """
        Get a slice of the dataframe at the given path

        Parameters
        ----------
        path : str
            The path to the dataframe
        start : int
            The start index
        end : int
            The end index
        """
        raise NotImplementedError
73 changes: 73 additions & 0 deletions oxen/python/oxen/providers/mock_provider.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
from oxen.providers.dataset_path_provider import DatasetPathProvider
import time


class MockPathProvider(DatasetPathProvider):
    """
    A mock implementation for providing data by path and index.

    Generates rows of the form ``"<col>_<global_row_idx>"`` for each
    configured path, and simulates network latency on `slice`.
    """

    def __init__(
        self,
        paths=None,
        num_rows=1024,
        columns=None,
        download_time=0.1,  # seconds slept per slice, to mock a slow download
    ):
        # Use None sentinels instead of mutable default arguments; fall back
        # to the original default values when nothing is supplied.
        self._paths = ["path_1.csv", "path_2.csv"] if paths is None else paths
        self._num_rows = num_rows
        self._columns = ["path", "x", "y"] if columns is None else columns
        self._download_time = download_time
        self._setup()

    def _setup(self):
        # Pre-build one mock dataframe (a list of row dicts) per path.
        self._data_frame_paths = {}
        for i, path in enumerate(self._paths):
            self._data_frame_paths[path] = self._make_data_frame(i)

    def _make_data_frame(self, i):
        """Build the mock rows for the i-th path; cell value = "<col>_<global_idx>"."""
        df = []
        for j in range(self._num_rows):
            # Global row index across all paths, so values are unique per path.
            idx = i * self._num_rows + j
            df.append({col: f"{col}_{idx}" for col in self._columns})
        return df

    @property
    def paths(self):
        return self._paths

    def size(self, path) -> "tuple[int, int]":
        """Get the (width, height) of the dataframe at the given path.

        Returns (0, 0) for unknown or empty paths.
        NOTE(review): was annotated ``-> int`` but always returns a 2-tuple —
        annotation corrected.
        """
        frame = self._data_frame_paths.get(path)
        if not frame:
            # Unknown path, or a path with no rows.
            return 0, 0

        # width (columns in a row) x height (number of rows)
        return len(frame[0]), len(frame)

    def slice(self, path, start, end):
        """
        Get a slice of the dataframe at the given path

        Parameters
        ----------
        path : str
            The path to the dataframe
        start : int
            The start index
        end : int
            The end index
        """
        # mock a slow download
        time.sleep(self._download_time)
        return self._data_frame_paths[path][start:end]
Loading

0 comments on commit ffa388c

Please sign in to comment.