Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 72 additions & 8 deletions tstore/archive/io.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import glob
import os
import shutil
from pathlib import Path


def check_tstore_structure(tstore_structure):
Expand Down Expand Up @@ -31,12 +32,43 @@ def define_attributes_filepath(base_dir):
return fpath


def define_tsarray_filepath(base_dir, tstore_id, ts_variable, tstore_structure):
"""Define filepath of a TStore TS."""
def define_tsarray_filepath(
base_dir: Path | str,
tstore_id: str,
ts_variable: str,
tstore_structure: str,
id_prefix: str,
var_prefix: str,
) -> str:
"""
Define filepath of a TStore TS.

Parameters
----------
base_dir : path-like
Base directory of the TStore.
tstore_id : str
Value of the time series ID.
ts_variable : str
Name of the time series variable.
ts_structure : ["id-var", "var-id"]
TStore structure, either "id-var" or "var-id".
id_prefix : str
Prefix for the ID directory in the TStore.
var_prefix : str
Prefix for the variable directory in the TStore.

Returns
-------
fpath : str
Filepath for the time series.
"""
id_dir_basename = f"{id_prefix}={tstore_id}"
var_dir_basename = f"{var_prefix}={ts_variable}"
if tstore_structure == "id-var":
fpath = os.path.join(base_dir, tstore_id, ts_variable)
fpath = os.path.join(base_dir, id_dir_basename, var_dir_basename)
elif tstore_structure == "var-id":
fpath = os.path.join(base_dir, ts_variable, tstore_id)
fpath = os.path.join(base_dir, var_dir_basename, id_dir_basename)
else:
raise ValueError("Valid tstore_structure are 'id-var' and 'var-id'.")
return fpath
Expand Down Expand Up @@ -82,16 +114,48 @@ def get_partitions(base_dir, ts_variable):
return partitions


def get_ts_info(
    base_dir: Path | str,
    ts_variable: str,
    var_prefix: str,
):
    """
    Retrieve filepaths and tstore_ids for a specific ts_variable.

    Parameters
    ----------
    base_dir : path-like
        Base directory of the TStore.
    ts_variable : str
        Name of the time series variable.
    var_prefix : str
        Prefix for the variable directory in the TStore.

    Returns
    -------
    fpaths : list of str
        List of filepaths for the time series.
    tstore_ids : list of str
        List of time series IDs, in the same order as ``fpaths``.
    partitions : list of str
        List of partitions.

    Raises
    ------
    ValueError
        If the TStore structure is neither "id-var" nor "var-id".
    """
    tstore_structure = get_tstore_structure(base_dir)

    # TODO: DRY with `define_tsarray_filepath`?
    # Escape the literal path components so that glob metacharacters
    # ("*", "?", "[") in the base directory or the variable name are not
    # interpreted as wildcards; only the ID level is a real wildcard.
    root = glob.escape(os.fspath(base_dir))
    var_dir_basename = glob.escape(f"{var_prefix}={ts_variable}")

    if tstore_structure == "id-var":
        fpaths = glob.glob(os.path.join(root, "*", var_dir_basename))
        id_dir_basenames = [os.path.basename(os.path.dirname(fpath)) for fpath in fpaths]
    elif tstore_structure == "var-id":
        fpaths = glob.glob(os.path.join(root, var_dir_basename, "*"))
        id_dir_basenames = [os.path.basename(fpath) for fpath in fpaths]
    else:
        raise ValueError("Valid tstore_structure are 'id-var' and 'var-id'.")

    # Keep only the id value (drop the prefix of the ``prefix=value`` notation).
    # Splitting once preserves IDs that themselves contain "=", and a
    # directory name without "=" is returned unchanged instead of raising
    # IndexError, which keeps ``tstore_ids`` aligned with ``fpaths``.
    tstore_ids = [basename.split("=", 1)[-1] for basename in id_dir_basenames]

    # NOTE(review): get_partitions receives the bare variable name and no
    # var_prefix; confirm it resolves the prefixed directory layout.
    partitions = get_partitions(base_dir, ts_variable)

    return fpaths, tstore_ids, partitions
8 changes: 5 additions & 3 deletions tstore/tests/test_tsdf.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,13 +96,15 @@ def test_store(
assert dirpath.is_dir()

# Check directory content
assert sorted(os.listdir(dirpath / "1" / "ts_variable")) == [
assert sorted(os.listdir(dirpath / "tstore_id=1" / "variable=ts_variable")) == [
"_common_metadata",
"_metadata",
"part.0.parquet",
"part.1.parquet",
]
assert sorted(os.listdir(dirpath)) == ["1", "2", "3", "4", "_attributes.parquet", "tstore_metadata.yaml"]
assert sorted(os.listdir(dirpath)) == ["_attributes.parquet"] + [f"tstore_id={i}" for i in ["1", "2", "3", "4"]] + [
"tstore_metadata.yaml",
]


class TestLoad:
Expand All @@ -116,4 +118,4 @@ def test_pandas(
tsdf = tstore.open_tsdf(tstore_path, backend="pandas")
assert type(tsdf) is TSDFPandas
assert type(tsdf._df) is pd.DataFrame
assert tsdf.shape == (4, 3)
assert tsdf.shape == (4, 2)
6 changes: 4 additions & 2 deletions tstore/tests/test_tslong.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,10 @@ def test_store(
assert dirpath.is_dir()

# Check directory content
assert sorted(os.listdir(dirpath)) == ["1", "2", "3", "4", "_attributes.parquet", "tstore_metadata.yaml"]
assert os.listdir(dirpath / "1" / "ts_variable" / "year=2000" / "month=1") == ["part-0.parquet"]
assert sorted(os.listdir(dirpath)) == ["_attributes.parquet"] + [f"store_id={i}" for i in ["1", "2", "3", "4"]] + [
"tstore_metadata.yaml",
]
assert os.listdir(dirpath / "store_id=1" / "variable=ts_variable" / "year=2000" / "month=1") == ["part-0.parquet"]


class TestLoad:
Expand Down
68 changes: 56 additions & 12 deletions tstore/tsdf/pandas.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
"""TSDF class wrapping a Pandas dataframe of TSArray objects."""

from pathlib import Path

from tstore.archive.metadata.readers import read_tstore_metadata
from tstore.tsdf.reader import _read_tsarrays
from tstore.tsdf.tsdf import TSDF
Expand All @@ -11,28 +13,70 @@ class TSDFPandas(TSDF):

def to_tstore(
    self,
    base_dir: Path | str,
    id_var: str | None = None,
    time_var: str | None = None,  # TODO: likely not needed !
    partitioning: str | None = None,
    tstore_structure: str = "id-var",
    var_prefix: str = "variable",
    overwrite: bool = True,  # append functionality?
    # geometry
) -> None:
    """
    Write TSDF into a TStore.

    Parameters
    ----------
    base_dir : path-like
        Base directory of the TStore.
    id_var : str, optional
        Name of the id variable. If the provided value matches a column of the wrapped data frame, the corresponding
        column will be used as id. If the provided value is not None but does not match any column of the wrapped
        data frame, the id values will be taken from the index, which will be named using the provided value.
        Otherwise, the id values and name will be taken from the index.
    time_var : str, optional
        Name of the time variable.
    partitioning : str, optional
        Time partitioning string.
    tstore_structure : ["id-var", "var-id"], default "id-var"
        TStore structure, either "id-var" or "var-id".
    var_prefix : str, default "variable"
        Prefix for the variable directory in the TStore.
    overwrite : bool, default True
        Overwrite existing TStore.
    """
    # All options are forwarded unchanged; the wrapped data frame is the
    # only positional argument handed to the writer.
    write_tstore(
        self._df,
        base_dir=base_dir,
        id_var=id_var,
        time_var=time_var,
        partitioning=partitioning,
        tstore_structure=tstore_structure,
        var_prefix=var_prefix,
        overwrite=overwrite,
    )

@staticmethod
def from_tstore(base_dir: str) -> "TSDFPandas":
"""Read TStore into TSDF object."""
def from_tstore(base_dir: Path | str, var_prefix: str = "variable") -> "TSDFPandas":
"""
Read TStore into TSDF object.

Parameters
----------
base_dir : path-like
Base directory of the TStore.
var_prefix : str, default "variable"
Prefix for the variable directory in the TStore.

Returns
-------
TSDFPandas
TSDF object with pandas backend.
"""
# TODO: enable specify subset of TSArrays, attribute columns and rows to load
# TODO: read_attributes using geopandas --> geoparquet
# TODO: separate TSDF class if geoparquet (TSDF inherit from geopandas.GeoDataFrame ?)
Expand All @@ -45,7 +89,7 @@ def from_tstore(base_dir: str) -> "TSDFPandas":
df = read_attributes(base_dir).set_index(metadata["id_var"])

# Get list of TSArrays
list_ts_series = _read_tsarrays(base_dir, metadata)
list_ts_series = _read_tsarrays(base_dir, metadata, var_prefix)

# Join TSArrays to dataframe
for ts_series in list_ts_series:
Expand Down
48 changes: 42 additions & 6 deletions tstore/tsdf/reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,35 @@
@author: ghiggi
"""

from pathlib import Path

import pandas as pd

from tstore.archive.io import get_ts_info
from tstore.tsdf.ts_class import TS
from tstore.tsdf.tsarray import TSArray


def _read_tsarray(base_dir, ts_variable):
"""Read a TSArray into a pd.Series."""
def _read_tsarray(base_dir: Path | str, ts_variable: str, var_prefix: str) -> pd.Series:
"""
Read a TSArray into a pd.Series.

Parameters
----------
base_dir : path-like
Base directory of the TStore.
ts_variable : str
Name of the time series variable.
var_prefix : str
Prefix for the variable directory in the TStore.

Returns
-------
pd.Series
TSArray Series.
"""
# Retrieve TS fpaths and associated tstore_ids
ts_fpaths, tstore_ids, partitions = get_ts_info(base_dir=base_dir, ts_variable=ts_variable)
ts_fpaths, tstore_ids, partitions = get_ts_info(base_dir=base_dir, ts_variable=ts_variable, var_prefix=var_prefix)
# Read TS objects
# TODO: add option for TS format (dask, pandas, ...)
list_ts = [TS.from_file(fpath, partitions=partitions) for fpath in ts_fpaths]
Expand All @@ -26,8 +44,26 @@ def _read_tsarray(base_dir, ts_variable):
return ts_series


def _read_tsarrays(base_dir, metadata):
"""Read list of TSArrays."""
def _read_tsarrays(base_dir: Path | str, metadata: dict, var_prefix: str) -> list[pd.Series]:
"""
Read list of TSArrays.

Parameters
----------
base_dir : path-like
Base directory of the TStore.
metadata : dict-like
Metadata dictionary.
var_prefix : str
Prefix for the variable directory in the TStore.

Returns
-------
list of pd.Series
List of TSArray Series.
"""
ts_variables = metadata["ts_variables"]
list_ts_series = [_read_tsarray(base_dir=base_dir, ts_variable=ts_variable) for ts_variable in ts_variables]
list_ts_series = [
_read_tsarray(base_dir=base_dir, ts_variable=ts_variable, var_prefix=var_prefix) for ts_variable in ts_variables
]
return list_ts_series
Loading