# Patch summary (enfobench 0.5.1 -> 0.6.0)
#
# - Bumps __version__ to "0.6.0".
# - Adds datasets/base.py with DatasetBase, which owns directory validation,
#   __repr__, _check_for_valid_subset and _get_subset_path.
# - Makes ElectricityDemandDataset inherit from DatasetBase and removes its own
#   copies of those four methods.
# - Adds datasets/pv_generation.py (MetadataSubset, WeatherSubset,
#   GenerationSubset, PVGenerationDataset).
#
# NOTE(review): enfobench.datasets no longer re-exports DemandSubset,
#   MetadataSubset or WeatherSubset; code importing them from the package root
#   has to import from enfobench.datasets.electricity_demand instead.
# NOTE(review): the DatasetBase error messages no longer name the
#   HUGGINGFACE_DATASET to download, and repr() of ElectricityDemandDataset
#   changes from "DemandDataset(...)" to "ElectricityDemandDataset(...)".
# NOTE(review): `not directory.is_dir() or not directory.exists()` is redundant;
#   Path.is_dir() is already False for a missing path.
# NOTE(review): _check_for_valid_subset is defined but not called anywhere in
#   this patch.
# NOTE(review): WeatherSubset.get_by_location_id joins caller-supplied column
#   names straight into the SQL text (S608 is suppressed) - confirm callers
#   never pass untrusted input. The duckdb connections opened per call are
#   never explicitly closed.
#
# Everything above the first "diff --git" line is commentary; patch tools skip it.
diff --git a/src/enfobench/__version__.py b/src/enfobench/__version__.py
index dd9b22c..906d362 100644
--- a/src/enfobench/__version__.py
+++ b/src/enfobench/__version__.py
@@ -1 +1 @@
-__version__ = "0.5.1"
+__version__ = "0.6.0"
diff --git a/src/enfobench/datasets/__init__.py b/src/enfobench/datasets/__init__.py
index e8b782b..a168ee3 100644
--- a/src/enfobench/datasets/__init__.py
+++ b/src/enfobench/datasets/__init__.py
@@ -1,8 +1,7 @@
-from enfobench.datasets.electricity_demand import DemandSubset, ElectricityDemandDataset, MetadataSubset, WeatherSubset
+from enfobench.datasets.electricity_demand import ElectricityDemandDataset
+from enfobench.datasets.pv_generation import PVGenerationDataset
 
 __all__ = [
     "ElectricityDemandDataset",
-    "DemandSubset",
-    "MetadataSubset",
-    "WeatherSubset",
+    "PVGenerationDataset",
 ]
diff --git a/src/enfobench/datasets/base.py b/src/enfobench/datasets/base.py
new file mode 100644
index 0000000..4182880
--- /dev/null
+++ b/src/enfobench/datasets/base.py
@@ -0,0 +1,27 @@
+from pathlib import Path
+
+
+class DatasetBase:
+    SUBSETS = ()
+
+    def __init__(self, directory: Path | str) -> None:
+        directory = Path(directory).resolve()
+        if not directory.is_dir() or not directory.exists():
+            msg = "Please provide an existing directory where the dataset is located."
+            raise ValueError(msg)
+        self.directory = directory
+
+    def __repr__(self) -> str:
+        return f"{self.__class__.__name__}(directory={self.directory})"
+
+    def _check_for_valid_subset(self, subset: str):
+        if subset not in self.SUBSETS:
+            msg = f"Please provide a valid subset. Available subsets: {self.SUBSETS}"
+            raise ValueError(msg)
+
+    def _get_subset_path(self, subset: str, extension: str = "parquet") -> Path:
+        filepath = self.directory / f"{subset}.{extension}"
+        if not filepath.exists():
+            msg = f"Subset: {subset} is missing from the directory."
+            raise ValueError(msg)
+        return filepath
diff --git a/src/enfobench/datasets/electricity_demand.py b/src/enfobench/datasets/electricity_demand.py
index 229027a..1e7cb12 100644
--- a/src/enfobench/datasets/electricity_demand.py
+++ b/src/enfobench/datasets/electricity_demand.py
@@ -1,10 +1,10 @@
-from pathlib import Path
 from typing import Any
 
 import duckdb
 import pandas as pd
 
 from enfobench.core import Subset
+from enfobench.datasets.base import DatasetBase
 
 Metadata = dict[str, Any]
 
@@ -122,7 +122,7 @@ def get_by_unique_id(self, unique_id: str):
         return df
 
 
-class ElectricityDemandDataset:
+class ElectricityDemandDataset(DatasetBase):
     """ElectricityDemandDataset class representing the HuggingFace dataset.
 
     This class is a collection of all subsets inside HuggingFace dataset.
@@ -136,21 +136,6 @@ class ElectricityDemandDataset:
     HUGGINGFACE_DATASET = "EDS-lab/electricity-demand"
     SUBSETS = ("demand", "metadata", "weather")
 
-    def __init__(self, directory: Path | str) -> None:
-        directory = Path(directory).resolve()
-        if not directory.is_dir() or not directory.exists():
-            msg = f"Please provide the existing directory where the '{self.HUGGINGFACE_DATASET}' dataset is located."
-            raise ValueError(msg)
-        self.directory = directory.resolve()
-
-    def __repr__(self) -> str:
-        return f"DemandDataset(directory={self.directory})"
-
-    def _check_for_valid_subset(self, subset: str):
-        if subset not in self.SUBSETS:
-            msg = f"Please provide a valid subset. Available subsets: {self.SUBSETS}"
-            raise ValueError(msg)
-
     @property
     def metadata_subset(self) -> MetadataSubset:
         """Returns the metadata subset."""
@@ -166,16 +151,6 @@ def demand_subset(self) -> DemandSubset:
         """Returns the demand subset."""
         return DemandSubset(self._get_subset_path("demand"))
 
-    def _get_subset_path(self, subset: str) -> Path:
-        filepath = self.directory / f"{subset}.parquet"
-        if not filepath.exists():
-            msg = (
-                f"There is no {subset} in the directory. "
-                f"Make sure to download all subsets from the HuggingFace dataset: {self.HUGGINGFACE_DATASET}."
-            )
-            raise ValueError(msg)
-        return self.directory / f"{subset}.parquet"
-
     def list_unique_ids(self) -> list[str]:
         return self.metadata_subset.list_unique_ids()
 
diff --git a/src/enfobench/datasets/pv_generation.py b/src/enfobench/datasets/pv_generation.py
new file mode 100644
index 0000000..b964d95
--- /dev/null
+++ b/src/enfobench/datasets/pv_generation.py
@@ -0,0 +1,166 @@
+from typing import Any
+
+import duckdb
+import pandas as pd
+
+from enfobench.core import Subset
+from enfobench.datasets.base import DatasetBase
+
+Metadata = dict[str, Any]
+
+
+class MetadataSubset(Subset):
+    """Metadata subset of the HuggingFace dataset containing all metadata about the meters.
+
+    Args:
+        file_path: The path to the subset file.
+    """
+
+    def list_unique_ids(self) -> list[str]:
+        """Lists all unique ids."""
+        query = """
+            SELECT DISTINCT unique_id
+            FROM read_parquet(?)
+        """
+        conn = duckdb.connect(":memory:")
+        return conn.execute(query, parameters=[str(self.file_path)]).fetch_df().unique_id.tolist()
+
+    def get_by_unique_id(self, unique_id: str) -> Metadata:
+        """Returns the metadata for the given unique id.
+
+        Args:
+            unique_id: The unique id of the meter.
+        """
+        query = """
+            SELECT *
+            FROM read_parquet(?)
+            WHERE unique_id = ?
+        """
+        conn = duckdb.connect(":memory:")
+        df = conn.execute(query, parameters=[str(self.file_path), unique_id]).fetch_df()
+        if df.empty:
+            msg = f"Unique id '{unique_id}' was not found."
+            raise KeyError(msg)
+        return df.iloc[0].to_dict()
+
+
+class WeatherSubset(Subset):
+    """Weather subset of the HuggingFace dataset containing all weather data.
+
+    Args:
+        file_path: The path to the subset file.
+    """
+
+    def list_location_ids(self) -> list[str]:
+        """Lists all location ids."""
+        query = """
+            SELECT DISTINCT location_id
+            FROM read_parquet(?)
+        """
+        conn = duckdb.connect(":memory:")
+        return conn.execute(query, parameters=[str(self.file_path)]).fetch_df().location_id.tolist()
+
+    def get_by_location_id(self, location_id: str, columns: list[str] | None = None) -> pd.DataFrame:
+        """Returns the weather data for the given location id.
+
+        Args:
+            location_id: The location id of the weather station.
+            columns: The columns to return. If None, all columns are returned.
+        """
+        conn = duckdb.connect(":memory:")
+
+        if columns:
+            query = f"""
+                SELECT timestamp, {", ".join(columns)}
+                FROM read_parquet(?)
+                WHERE location_id = ?
+            """  # noqa: S608
+        else:
+            query = """
+                SELECT *
+                FROM read_parquet(?)
+                WHERE location_id = ?
+            """
+        df = conn.execute(query, parameters=[str(self.file_path), location_id]).fetch_df()
+        if df.empty:
+            msg = f"Location id '{location_id}' was not found."
+            raise KeyError(msg)
+
+        # Remove location_id and set timestamp as index
+        df.drop(columns=["location_id"], inplace=True, errors="ignore")
+        df.set_index("timestamp", inplace=True)
+        return df
+
+
+class GenerationSubset(Subset):
+    """Data subset of the HuggingFace dataset containing all pv generation data.
+
+    Args:
+        file_path: The path to the subset file.
+    """
+
+    def get_by_unique_id(self, unique_id: str):
+        """Returns the generation data for the given unique id.
+
+        Args:
+            unique_id: The unique id of the meter.
+        """
+        query = """
+            SELECT *
+            FROM read_parquet(?)
+            WHERE unique_id = ?
+        """
+        conn = duckdb.connect(":memory:")
+        df = conn.execute(query, parameters=[str(self.file_path), unique_id]).fetch_df()
+        if df.empty:
+            msg = f"Unique id '{unique_id}' was not found."
+            raise KeyError(msg)
+
+        # Remove unique_id and set timestamp as index
+        df.drop(columns=["unique_id"], inplace=True, errors="ignore")
+        df.set_index("timestamp", inplace=True)
+        return df
+
+
+class PVGenerationDataset(DatasetBase):
+    """PVGenerationDataset class representing the HuggingFace dataset.
+
+    This class is a collection of all subsets inside HuggingFace dataset.
+    It provides an easy way to access the different subsets.
+
+    Args:
+        directory: The directory where the HuggingFace dataset is located.
+            This directory should contain all the subset files.
+    """
+
+    HUGGINGFACE_DATASET = "EDS-lab/pv-generation"
+    SUBSETS = ("generation", "metadata", "weather")
+
+    @property
+    def metadata_subset(self) -> MetadataSubset:
+        """Returns the metadata subset."""
+        return MetadataSubset(self._get_subset_path("metadata"))
+
+    @property
+    def weather_subset(self) -> WeatherSubset:
+        """Returns the weather subset."""
+        return WeatherSubset(self._get_subset_path("weather"))
+
+    @property
+    def generation_subset(self) -> GenerationSubset:
+        """Returns the generation subset."""
+        return GenerationSubset(self._get_subset_path("generation"))
+
+    def list_unique_ids(self) -> list[str]:
+        return self.metadata_subset.list_unique_ids()
+
+    def list_location_ids(self) -> list[str]:
+        return self.weather_subset.list_location_ids()
+
+    def get_data_by_unique_id(self, unique_id: str) -> tuple[pd.DataFrame, pd.DataFrame, Metadata]:
+        metadata = self.metadata_subset.get_by_unique_id(unique_id)
+        location_id = metadata["location_id"]
+
+        generation = self.generation_subset.get_by_unique_id(unique_id)
+        weather = self.weather_subset.get_by_location_id(location_id)
+        return generation, weather, metadata