Skip to content

Commit d50c06f

Browse files
committed
feat: hive partitioning for id/var (TODO: make it optional?)
1 parent 8bc1745 commit d50c06f

File tree

5 files changed

+90
-11
lines changed

5 files changed

+90
-11
lines changed

tstore/archive/io.py

Lines changed: 73 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@
33
import glob
44
import os
55
import shutil
6+
from pathlib import Path
7+
from typing import Union
68

79

810
def check_tstore_structure(tstore_structure):
@@ -31,12 +33,43 @@ def define_attributes_filepath(base_dir):
3133
return fpath
3234

3335

34-
def define_tsarray_filepath(base_dir, tstore_id, ts_variable, tstore_structure):
35-
"""Define filepath of a TStore TS."""
36+
def define_tsarray_filepath(
37+
base_dir: Union[Path, str],
38+
tstore_id: str,
39+
ts_variable: str,
40+
tstore_structure: str,
41+
id_prefix: str,
42+
var_prefix: str,
43+
) -> str:
44+
"""
45+
Define filepath of a TStore TS.
46+
47+
Parameters
48+
----------
49+
base_dir : path-like
50+
Base directory of the TStore.
51+
tstore_id : str
52+
Value of the time series ID.
53+
ts_variable : str
54+
Name of the time series variable.
55+
ts_structure : ["id-var", "var-id"]
56+
TStore structure, either "id-var" or "var-id".
57+
id_prefix : str
58+
Prefix for the ID directory.
59+
var_prefix : str
60+
Prefix for the variable directory.
61+
62+
Returns
63+
-------
64+
fpath : str
65+
Filepath for the time series.
66+
"""
67+
id_dir_basename = f"{id_prefix}={tstore_id}"
68+
var_dir_basename = f"{var_prefix}={ts_variable}"
3669
if tstore_structure == "id-var":
37-
fpath = os.path.join(base_dir, tstore_id, ts_variable)
70+
fpath = os.path.join(base_dir, id_dir_basename, var_dir_basename)
3871
elif tstore_structure == "var-id":
39-
fpath = os.path.join(base_dir, ts_variable, tstore_id)
72+
fpath = os.path.join(base_dir, var_dir_basename, id_dir_basename)
4073
else:
4174
raise ValueError("Valid tstore_structure are 'id-var' and 'var-id'.")
4275
return fpath
@@ -82,16 +115,48 @@ def get_partitions(base_dir, ts_variable):
82115
return partitions
83116

84117

85-
def get_ts_info(base_dir, ts_variable):
86-
"""Retrieve filepaths and tstore_ids for a specific ts_variable."""
118+
def get_ts_info(
119+
base_dir: Union[Path, str],
120+
ts_variable: str,
121+
var_prefix: str,
122+
):
123+
"""
124+
Retrieve filepaths and tstore_ids for a specific ts_variable.
125+
126+
Parameters
127+
----------
128+
base_dir : path-like
129+
Base directory of the TStore.
130+
ts_variable : str
131+
Name of the time series variable.
132+
var_prefix : str
133+
Prefix for the variable directory.
134+
135+
Returns
136+
-------
137+
fpaths : list of str
138+
List of filepaths for the time series.
139+
tstore_ids : list of str
140+
List of time series IDs.
141+
partitions : list of str
142+
List of partitions.
143+
"""
87144
tstore_structure = get_tstore_structure(base_dir)
145+
146+
# TODO: DRY with `define_tsarray_filepath`?
147+
var_dir_basename = f"{var_prefix}={ts_variable}"
148+
88149
if tstore_structure == "id-var":
89-
fpaths = glob.glob(os.path.join(base_dir, "*", ts_variable))
150+
fpaths = glob.glob(os.path.join(base_dir, "*", var_dir_basename))
90151
tstore_ids = [os.path.basename(os.path.dirname(fpath)) for fpath in fpaths]
91152
elif tstore_structure == "var-id":
92-
fpaths = glob.glob(os.path.join(base_dir, ts_variable, "*"))
153+
fpaths = glob.glob(os.path.join(base_dir, var_dir_basename, "*"))
93154
tstore_ids = [os.path.basename(fpath) for fpath in fpaths]
94155
else:
95156
raise ValueError("Valid tstore_structure are 'id-var' and 'var-id'.")
157+
# get only id values (remove prefix from hive prefix=value notation)
158+
tstore_ids = [tstore_id.split("=")[1] for tstore_id in tstore_ids]
159+
96160
partitions = get_partitions(base_dir, ts_variable)
161+
97162
return fpaths, tstore_ids, partitions

tstore/tests/test_tslong.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -101,8 +101,10 @@ def test_store(
101101
assert dirpath.is_dir()
102102

103103
# Check directory content
104-
assert sorted(os.listdir(dirpath)) == ["1", "2", "3", "4", "_attributes.parquet", "tstore_metadata.yaml"]
105-
assert os.listdir(dirpath / "1" / "ts_variable" / "year=2000" / "month=1") == ["part-0.parquet"]
104+
assert sorted(os.listdir(dirpath)) == ["_attributes.parquet"] + [f"store_id={i}" for i in ["1", "2", "3", "4"]] + [
105+
"tstore_metadata.yaml",
106+
]
107+
assert os.listdir(dirpath / "store_id=1" / "variable=ts_variable" / "year=2000" / "month=1") == ["part-0.parquet"]
106108

107109

108110
class TestLoad:

tstore/tslong/pandas.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ def to_tstore(
3131
# TSTORE options
3232
partitioning=None,
3333
tstore_structure="id-var",
34+
var_prefix="variable",
3435
overwrite=True,
3536
):
3637
"""Write the wrapped dataframe as a TStore structure."""
@@ -117,6 +118,8 @@ def to_tstore(
117118
tstore_id=tstore_id,
118119
ts_variable=ts_variable,
119120
tstore_structure=tstore_structure,
121+
id_prefix=id_var,
122+
var_prefix=var_prefix,
120123
)
121124

122125
# -----------------------------------------------

tstore/tslong/polars.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ def to_tstore(
3131
# TSTORE options
3232
partitioning=None,
3333
tstore_structure="id-var",
34+
var_prefix="variable",
3435
overwrite=True,
3536
) -> None:
3637
"""Write the wrapped dataframe as a TStore structure."""
@@ -110,6 +111,8 @@ def to_tstore(
110111
tstore_id=tstore_id,
111112
ts_variable=ts_variable,
112113
tstore_structure=tstore_structure,
114+
id_prefix=id_var,
115+
var_prefix=var_prefix,
113116
)
114117

115118
# TODO; Maybe create TS object and use TS.to_parquet() once implemented

tstore/tslong/pyarrow.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ def to_tstore(self) -> None:
2828
def from_tstore(
2929
base_dir,
3030
ts_variables=None,
31+
var_prefix="variable",
3132
start_time=None,
3233
end_time=None,
3334
tstore_ids=None,
@@ -50,6 +51,7 @@ def from_tstore(
5051
id_var=id_var,
5152
time_var=time_var,
5253
ts_variables=ts_variables,
54+
var_prefix=var_prefix,
5355
start_time=start_time,
5456
end_time=end_time,
5557
columns=columns,
@@ -104,6 +106,7 @@ def _read_ts_variable(
104106
base_dir,
105107
id_var,
106108
ts_variable,
109+
var_prefix,
107110
start_time=None,
108111
end_time=None,
109112
columns=None,
@@ -112,7 +115,8 @@ def _read_ts_variable(
112115
):
113116
"""Read a TStore ts_variable into pyarrow long-format."""
114117
# Find TS and associated TStore IDs
115-
fpaths, tstore_ids, partitions = get_ts_info(base_dir=base_dir, ts_variable=ts_variable)
118+
fpaths, tstore_ids, partitions = get_ts_info(base_dir=base_dir, ts_variable=ts_variable, var_prefix=var_prefix)
119+
116120
# Read each TS
117121
list_tables = [
118122
_read_ts(
@@ -147,6 +151,7 @@ def _read_ts_variables(
147151
id_var,
148152
time_var,
149153
ts_variables,
154+
var_prefix,
150155
start_time=None,
151156
end_time=None,
152157
columns=None,
@@ -160,6 +165,7 @@ def _read_ts_variables(
160165
base_dir=base_dir,
161166
id_var=id_var,
162167
ts_variable=ts_variable,
168+
var_prefix=var_prefix,
163169
start_time=start_time,
164170
end_time=end_time,
165171
columns=columns, # columns[ts_variable] in future

0 commit comments

Comments
 (0)