Skip to content

Commit

Permalink
implemented repartitioning methods
Browse files Browse the repository at this point in the history
tnixon committed Jun 4, 2024
1 parent feefeca commit 2c510d0
Showing 2 changed files with 47 additions and 65 deletions.
2 changes: 2 additions & 0 deletions python/tempo/interpol.py
Original file line number Diff line number Diff line change
@@ -60,6 +60,8 @@ def interpolate(
interpolation function. The exact sizes of the leading and trailing margins are
configurable. The function should not attempt to modify these margin values.
**Note**: This function may cause the re-ordering of the rows in the resulting TSDF.
:param tsdf: the :class:`TSDF` timeseries dataframe
:param col: the name of the column to interpolate
:param fn: the interpolation function
110 changes: 45 additions & 65 deletions python/tempo/tsdf.py
Original file line number Diff line number Diff line change
@@ -1104,6 +1104,51 @@ def rangeBetweenWindow(
) -> WindowSpec:
return self.ts_schema.rangeBetweenWindow(start, end, reverse=reverse)

#
# Re-Partitioning
#

def repartitionBySeries(self, numPartitions: Optional[int] = None) -> "TSDF":
    """
    Repartition the data frame by series id(s) into a given number of partitions,
    then sort the rows inside each partition by series id(s) followed by the
    ordering expression(s) of the time-series index.

    :param numPartitions: number of partitions to repartition the data frame into;
        when ``None`` the current number of partitions of the data frame is kept
    :return: a new :class:`~tsdf.TSDF` object with the data frame repartitioned
    :raises AssertionError: if this TSDF has no series ids to partition by
    """

    # only makes sense if we have series ids
    assert self.series_ids and len(self.series_ids) > 0, \
        "No series ids to repartition by"

    # keep same number of partitions if not specified
    if numPartitions is None:
        numPartitions = self.df.rdd.getNumPartitions()

    # Compute the ordering expression once. It is normalised to a list so that
    # an index returning several ordering columns does not end up as a nested
    # list inside the sort columns.
    # NOTE(review): assumes orderByExpr() may return either a single column
    # or a list of columns -- confirm against the index implementation.
    order_exprs = self.ts_index.orderByExpr()
    if not isinstance(order_exprs, (list, tuple)):
        order_exprs = [order_exprs]
    sort_cols = list(self.series_ids) + list(order_exprs)

    # repartition by series ids, ordering by series and then by time
    repartitioned_df = (
        self.df.repartition(numPartitions, *self.series_ids)
        .sortWithinPartitions(*sort_cols)
    )
    return self.__withTransformedDF(repartitioned_df)

def repartitionByTime(self, numPartitions: Optional[int] = None) -> "TSDF":
    """
    Range-repartition the data frame on the ordering expression of the
    time-series index.

    :param numPartitions: target number of partitions; when ``None`` the
        current number of partitions of the data frame is reused
    :return: a new :class:`~tsdf.TSDF` object wrapping the repartitioned data frame
    """
    # fall back to the current partition count when none was requested
    partition_count = (
        self.df.rdd.getNumPartitions() if numPartitions is None else numPartitions
    )

    # range partitioning is driven by the index's ordering expression
    time_ordering = self.ts_index.orderByExpr()
    ranged_df = self.df.repartitionByRange(partition_count, time_ordering)

    return self.__withTransformedDF(ranged_df)

#
# Core Transformations
#
@@ -1512,71 +1557,6 @@ def resample(
func=func,
)

def interpolate(
    self,
    method: str,
    freq: Optional[str] = None,
    func: Optional[Union[Callable, str]] = None,
    target_cols: Optional[List[str]] = None,
    ts_col: Optional[str] = None,
    series_ids: Optional[List[str]] = None,
    show_interpolated: bool = False,
    perform_checks: bool = True,
) -> "TSDF":
    """
    Function to interpolate based on frequency, aggregation, and fill similar to pandas.
    Data will first be aggregated using resample, then missing values
    will be filled based on the fill calculation. The work itself is delegated to
    ``t_interpolation.Interpolation.interpolate``.

    :param method: function used to fill missing values e.g. linear, null, zero, bfill, ffill
    :param freq: frequency for upsample - valid inputs are "hr", "min", "sec" corresponding to hour, minute, or second
    :param func: function used to aggregate input
    :param target_cols [optional]: columns that should be interpolated, by default interpolates
        all columns of type int, bigint, float or double that are neither the timestamp column
        nor a series id
    :param ts_col [optional]: specify other ts_col, by default this uses the ts_col within the TSDF object
    :param series_ids [optional]: specify other series ids, by default this uses the series_ids within the TSDF object
    :param show_interpolated [optional]: if true will include an additional column to show which rows have been fully interpolated.
    :param perform_checks: calculate time horizon and warnings if True (default is True)
    :return: new TSDF object containing interpolated data
    :raises ValueError: if ``freq`` or ``func`` is not provided
    """

    # freq and func are declared optional in the signature but are mandatory
    if freq is None:
        raise ValueError("freq must be provided")
    if func is None:
        raise ValueError("func must be provided")

    # Set defaults for timestamp column, series ids and target columns when not provided
    if ts_col is None:
        ts_col = self.ts_col
    if series_ids is None:
        series_ids = self.series_ids
    if target_cols is None:
        # Compare names case-insensitively on BOTH sides: the column name is
        # lowercased below, so the excluded names must be lowercased as well,
        # otherwise mixed-case key columns would slip through as targets.
        prohibited_cols = {name.lower() for name in [*series_ids, ts_col]}
        summarizable_types = ["int", "bigint", "float", "double"]

        # find the summarizable (numeric) columns that are not key columns
        target_cols = [
            col_name
            for col_name, col_type in self.df.dtypes
            if col_type in summarizable_types
            and col_name.lower() not in prohibited_cols
        ]

    interpolate_service = t_interpolation.Interpolation(is_resampled=False)
    tsdf_input = TSDF(self.df, ts_col=ts_col, series_ids=series_ids)
    interpolated_df: DataFrame = interpolate_service.interpolate(
        tsdf_input,
        ts_col,
        series_ids,
        target_cols,
        freq,
        func,
        method,
        show_interpolated,
        perform_checks,
    )

    return TSDF(interpolated_df, ts_col=ts_col, series_ids=series_ids)

def extractStateIntervals(
self,
*metric_cols: str,

0 comments on commit 2c510d0

Please sign in to comment.