Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ENH] Implement Proximity Forest 2.0 classifier using aeon distances #1978

Open
wants to merge 16 commits into
base: main
Choose a base branch
from
Open
6 changes: 6 additions & 0 deletions aeon/classification/distance_based/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,16 @@
"KNeighborsTimeSeriesClassifier",
"ProximityTree",
"ProximityForest",
"ProximityTree2",
"ProximityForest2",
]

from aeon.classification.distance_based._elastic_ensemble import ElasticEnsemble
from aeon.classification.distance_based._proximity_forest import ProximityForest
from aeon.classification.distance_based._proximity_forest_2 import (
ProximityForest2,
ProximityTree2,
)
from aeon.classification.distance_based._proximity_tree import ProximityTree
from aeon.classification.distance_based._time_series_neighbors import (
KNeighborsTimeSeriesClassifier,
Expand Down
316 changes: 316 additions & 0 deletions aeon/classification/distance_based/_distances.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,316 @@
"""Distances for Proximity Forest 2.0 Classifier."""

from typing import Any, Callable, Optional, TypedDict, Union

import numpy as np
from numba import njit
from typing_extensions import Unpack

from aeon.distances._bounding_matrix import create_bounding_matrix
from aeon.distances._lcss import lcss_distance
from aeon.distances._minkowski import _univariate_minkowski_distance, minkowski_distance


class DistanceKwargs(TypedDict, total=False):
window: Optional[float]
itakura_max_slope: Optional[float]
p: float
w: np.ndarray
epsilon: float
warp_penalty: float


DistanceFunction = Callable[[np.ndarray, np.ndarray, Any], float]


def distance(
x: np.ndarray,
y: np.ndarray,
metric: Union[str, DistanceFunction],
**kwargs: Unpack[DistanceKwargs],
) -> float:
r"""Compute the distance between two time series.

Sourced from the distance module to use in the PF 2.0 algorithm.

Parameters
----------
x : np.ndarray
First time series, either univariate, shape ``(n_timepoints,)``, or
multivariate, shape ``(n_channels, n_timepoints)``.
y : np.ndarray
Second time series, either univariate, shape ``(n_timepoints,)``, or
multivariate, shape ``(n_channels, n_timepoints)``.
metric : str or Callable
The distance metric to use.
A list of valid distance metrics can be found in the documentation for
:func:`aeon.distances.get_distance_function` or by calling the function
:func:`aeon.distances.get_distance_function_names`.
kwargs : Any
Arguments for metric. Refer to each metrics documentation for a list of
possible arguments.

Returns
-------
float
Distance between the x and y.

Raises
------
ValueError
If x and y are not 1D, or 2D arrays.
If metric is not a valid string or callable.
"""
if metric == "minkowski":
return minkowski_distance(x, y, kwargs.get("p", 2.0), kwargs.get("w", None))
elif metric == "dtw":
return _dtw_distance(
x,
y,
p=kwargs.get("p"),
window=kwargs.get("window"),
itakura_max_slope=kwargs.get("itakura_max_slope"),
)
elif metric == "lcss":
return lcss_distance(
x,
y,
kwargs.get("window"),
kwargs.get("epsilon", 1.0),
kwargs.get("itakura_max_slope"),
)
elif metric == "adtw":
return _adtw_distance(
x,
y,
p=kwargs.get("p"),
itakura_max_slope=kwargs.get("itakura_max_slope"),
window=kwargs.get("window"),
warp_penalty=kwargs.get("warp_penalty", 1.0),
)
else:
if isinstance(metric, Callable):
return metric(x, y, **kwargs)
raise ValueError("Metric must be one of the supported strings or a callable")


@njit(cache=True, fastmath=True)
def _dtw_distance(
x: np.ndarray,
y: np.ndarray,
p: float = 2.0,
window: Optional[float] = None,
itakura_max_slope: Optional[float] = None,
) -> float:
r"""Return parameterised DTW distance for PF 2.0.

DTW is the most widely researched and used elastic distance measure. It mitigates
distortions in the time axis by realligning (warping) the series to best match
each other. A good background into DTW can be found in [1]_. For two series,
possibly of unequal length,
:math:`\\mathbf{x}=\\{x_1,x_2,\\ldots,x_n\\}` and
:math:`\\mathbf{y}=\\{y_1,y_2, \\ldots,y_m\\}` DTW first calculates
:math:`M(\\mathbf{x},\\mathbf{y})`, the :math:`n \times m`
pointwise distance matrix between series :math:`\\mathbf{x}` and :math:`\\mathbf{y}`
, where :math:`M_{i,j}= (x_i-y_j)^p`.
A warping path
.. math::
P = <(e_1, f_1), (e_2, f_2), \\ldots, (e_s, f_s)>
is a set of pairs of indices that define a traversal of matrix :math:`M`. A
valid warping path must start at location :math:`(1,1)` and end at point :math:`(
n,m)` and not backtrack, i.e. :math:`0 \\leq e_{i+1}-e_{i} \\leq 1` and :math:`0
\\leq f_{i+1}- f_i \\leq 1` for all :math:`1< i < m`.
The DTW distance between series is the path through :math:`M` that minimizes the
total distance. The distance for any path :math:`P` of length :math:`s` is
.. math::
D_P(\\mathbf{x},\\mathbf{y}, M) =\\sum_{i=1}^s M_{e_i,f_i}
If :math:`\\mathcal{P}` is the space of all possible paths, the DTW path :math:`P^*`
is the path that has the minimum distance, hence the DTW distance between series is
.. math::
d_{dtw}(\\mathbf{x}, \\mathbf{x}) =D_{P*}(\\mathbf{x},\\mathbf{x}, M).
The optimal warping path :math:`P^*` can be found exactly through a dynamic
programming formulation. This can be a time consuming operation, and it is common to
put a restriction on the amount of warping allowed. This is implemented through
the bounding_matrix structure, that supplies a mask for allowable warpings.
The most common bounding strategies include the Sakoe-Chiba band [2]_. The width
of the allowed warping is controlled through the ``window`` parameter
which sets the maximum proportion of warping allowed.

Parameters
----------
x : np.ndarray
First time series, either univariate, shape ``(n_timepoints,)``, or
multivariate, shape ``(n_channels, n_timepoints)``.
y : np.ndarray
Second time series, either univariate, shape ``(n_timepoints,)``, or
multivariate, shape ``(n_channels, n_timepoints)``.
p : float, default=2.0
The order of the norm of the difference
(default is 2.0, which represents the Euclidean distance).
window : float or None, default=None
The window to use for the bounding matrix. If None, no bounding matrix
is used. window is a percentage deviation, so if ``window = 0.1`` then
10% of the series length is the max warping allowed.
is used.
itakura_max_slope : float, default=None
Maximum slope as a proportion of the number of time points used to create
Itakura parallelogram on the bounding matrix. Must be between 0. and 1.

Returns
-------
float
DTW distance between x and y, minimum value 0.

Raises
------
ValueError
If x and y are not 1D or 2D arrays.

References
----------
.. [1] Ratanamahatana C and Keogh E.: Three myths about dynamic time warping data
mining, Proceedings of 5th SIAM International Conference on Data Mining, 2005.
.. [2] Sakoe H. and Chiba S.: Dynamic programming algorithm optimization for
spoken word recognition. IEEE Transactions on Acoustics, Speech, and Signal
Processing 26(1):43–49, 1978.
"""
if x.ndim == 1 and y.ndim == 1:
_x = x.reshape((1, x.shape[0]))
_y = y.reshape((1, y.shape[0]))
bounding_matrix = create_bounding_matrix(
_x.shape[1], _y.shape[1], window, itakura_max_slope
)
return _dtw_cost_matrix(_x, _y, p, bounding_matrix)[
_x.shape[1] - 1, _y.shape[1] - 1
]
if x.ndim == 2 and y.ndim == 2:
bounding_matrix = create_bounding_matrix(
x.shape[1], y.shape[1], window, itakura_max_slope
)
return _dtw_cost_matrix(x, y, p, bounding_matrix)[
x.shape[1] - 1, y.shape[1] - 1
]
raise ValueError("x and y must be 1D or 2D")


@njit(cache=True, fastmath=True)
def _dtw_cost_matrix(
x: np.ndarray, y: np.ndarray, p: float, bounding_matrix: np.ndarray
) -> np.ndarray:
x_size = x.shape[1]
y_size = y.shape[1]
cost_matrix = np.full((x_size + 1, y_size + 1), np.inf)
cost_matrix[0, 0] = 0.0
_w = np.ones_like(x)
for i in range(x_size):
for j in range(y_size):
if bounding_matrix[i, j]:
cost_matrix[i + 1, j + 1] = _univariate_minkowski_distance(
x[:, i], y[:, j], p, _w[:, i]
) + min(
cost_matrix[i, j + 1],
cost_matrix[i + 1, j],
cost_matrix[i, j],
)

return cost_matrix[1:, 1:]


@njit(cache=True, fastmath=True)
def _adtw_distance(
x: np.ndarray,
y: np.ndarray,
p: float = 2.0,
window: Optional[float] = None,
itakura_max_slope: Optional[float] = None,
warp_penalty: float = 1.0,
) -> float:
"""Parameterised version of ADTW distance for PF 2.0.

Amercing Dynamic Time Warping (ADTW) [1]_ is a variant of DTW that uses a
explicit warping penalty to encourage or discourage warping. The warping
penalty is a constant value that is added to the cost of warping. A high
value will encourage the algorithm to warp less and if the value is low warping
is more likely.

Parameters
----------
x : np.ndarray
First time series, either univariate, shape ``(n_timepoints,)``, or
multivariate, shape ``(n_channels, n_timepoints)``.
y : np.ndarray
Second time series, either univariate, shape ``(n_timepoints,)``, or
multivariate, shape ``(n_channels, n_timepoints)``.
p : float, default=2.0
The order of the norm of the difference
(default is 2.0, which represents the Euclidean distance).
window : float or None, default=None
The window to use for the bounding matrix. If None, no bounding matrix
is used. window is a percentage deviation, so if ``window = 0.1`` then
10% of the series length is the max warping allowed.
itakura_max_slope : float, default=None
Maximum slope as a proportion of the number of time points used to create
Itakura parallelogram on the bounding matrix. Must be between 0.0 and 1.0
warp_penalty: float, default=1.0
Penalty for warping. A high value will mean less warping.

Returns
-------
float
ADTW distance between x and y, minimum value 0.

Raises
------
ValueError
If x and y are not 1D or 2D arrays.

References
----------
.. [1] Matthieu Herrmann, Geoffrey I. Webb: Amercing: An intuitive and effective
constraint for dynamic time warping, Pattern Recognition, Volume 137, 2023.
"""
if x.ndim == 1 and y.ndim == 1:
_x = x.reshape((1, x.shape[0]))
_y = y.reshape((1, y.shape[0]))
bounding_matrix = create_bounding_matrix(
_x.shape[1], _y.shape[1], window, itakura_max_slope
)
return _adtw_cost_matrix(_x, _y, p, bounding_matrix, warp_penalty)[
_x.shape[1] - 1, _y.shape[1] - 1
]
if x.ndim == 2 and y.ndim == 2:
bounding_matrix = create_bounding_matrix(
x.shape[1], y.shape[1], window, itakura_max_slope
)
return _adtw_cost_matrix(x, y, p, bounding_matrix, warp_penalty)[
x.shape[1] - 1, y.shape[1] - 1
]
raise ValueError("x and y must be 1D or 2D")


@njit(cache=True, fastmath=True)
def _adtw_cost_matrix(
x: np.ndarray,
y: np.ndarray,
p: float,
bounding_matrix: np.ndarray,
warp_penalty: float,
) -> np.ndarray:
x_size = x.shape[1]
y_size = y.shape[1]
cost_matrix = np.full((x_size + 1, y_size + 1), np.inf)
cost_matrix[0, 0] = 0.0

_w = np.ones_like(x)
for i in range(x_size):
for j in range(y_size):
if bounding_matrix[i, j]:
cost_matrix[i + 1, j + 1] = _univariate_minkowski_distance(
x[:, i], y[:, j], p, _w[:, i]
) + min(
cost_matrix[i, j + 1] + warp_penalty,
cost_matrix[i + 1, j] + warp_penalty,
cost_matrix[i, j],
)

return cost_matrix[1:, 1:]
Loading