[MNT] Improve multithreading testing #2317

Merged
merged 11 commits
Nov 22, 2024
1 change: 1 addition & 0 deletions aeon/anomaly_detection/_copod.py
@@ -43,6 +43,7 @@ class COPOD(PyODAdapter):
"capability:multivariate": True,
"capability:univariate": True,
"capability:missing_values": False,
"capability:multithreading": True,
"fit_is_empty": False,
"python_dependencies": ["pyod"],
}
1 change: 1 addition & 0 deletions aeon/anomaly_detection/_iforest.py
@@ -88,6 +88,7 @@ class IsolationForest(PyODAdapter):
"capability:multivariate": True,
"capability:univariate": True,
"capability:missing_values": False,
"capability:multithreading": True,
"fit_is_empty": False,
"python_dependencies": ["pyod"],
}
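The ``capability:multithreading`` tags added above declare that an estimator exposes an ``n_jobs`` parameter. As a rough, illustrative sketch of the convention the new checks in this PR enforce (the estimator below is hypothetical, not from this diff): ``n_jobs`` defaults to 1 and is resolved into ``n_jobs_`` via ``check_n_jobs`` during ``fit``.

from aeon.utils.validation import check_n_jobs


class MyThreadedEstimator:
    """Hypothetical estimator following the multithreading conventions."""

    _tags = {"capability:multithreading": True}

    def __init__(self, n_jobs=1):
        # defaulting to 1 keeps multithreading disabled unless requested
        self.n_jobs = n_jobs

    def fit(self, X, y=None):
        # resolve -1/negative values into a concrete thread count and store it
        # in n_jobs_, which check_estimator_multithreading asserts on
        self.n_jobs_ = check_n_jobs(self.n_jobs)
        # ... fit using self.n_jobs_ threads ...
        return self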
12 changes: 0 additions & 12 deletions aeon/anomaly_detection/tests/test_left_stampi.py
@@ -309,15 +309,3 @@ def test_the_number_of_distances_k_defaults_to_1_and_can_be_changed(
],
any_order=True,
)

def test_it_checks_soft_dependencies(self, mocker):
"""Unit testing the dependency check."""
# given
deps_checker_stub = mocker.patch(
"aeon.base._base_series._check_estimator_deps", return_value=True
)
# deps_checker_stub.return_value = True
ad = LeftSTAMPi(window_size=5, n_init_train=10)

# then
deps_checker_stub.assert_called_once_with(ad)
9 changes: 5 additions & 4 deletions aeon/base/_base_collection.py
@@ -20,11 +20,12 @@


class BaseCollectionEstimator(BaseAeonEstimator):
"""Base class for estimators that use collections of time series for method fit.
"""Base class for estimators that use collections of time series for ``fit``.

Provides functions that are common to BaseClassifier, BaseRegressor,
BaseClusterer and BaseCollectionTransformer for the checking and
conversion of input to fit, predict and predict_proba, where relevant.
Provides functions that are common to estimators which use collections such as
``BaseClassifier``, ``BaseRegressor``, ``BaseClusterer``, ``BaseSimilaritySearch``
and ``BaseCollectionTransformer``. Functionality includes checking and
conversion of input to ``fit``, ``predict`` and ``predict_proba``, where relevant.

It also stores the common default tags used by all the subclasses and meta data
describing the characteristics of time series passed to ``fit``.
@@ -67,6 +67,7 @@ class KNeighborsTimeSeriesClassifier(BaseClassifier):
_tags = {
"capability:multivariate": True,
"capability:unequal_length": True,
"capability:multithreading": True,
"X_inner_type": ["np-list", "numpy3D"],
"algorithm_type": "distance",
}
2 changes: 1 addition & 1 deletion aeon/classification/shapelet_based/_rsast.py
@@ -70,7 +70,7 @@ def __init__(
nb_inst_per_class=10,
seed=None,
classifier=None,
n_jobs=-1,
n_jobs=1,
):
super().__init__()
self.n_random_points = n_random_points
2 changes: 1 addition & 1 deletion aeon/classification/shapelet_based/_sast.py
@@ -73,7 +73,7 @@ def __init__(
nb_inst_per_class: int = 1,
seed: Optional[int] = None,
classifier=None,
n_jobs: int = -1,
n_jobs: int = 1,
) -> None:
super().__init__()
self.length_list = length_list
7 changes: 2 additions & 5 deletions aeon/clustering/_kernel_k_means.py
@@ -85,6 +85,7 @@ class TimeSeriesKernelKMeans(BaseClusterer):

_tags = {
"capability:multivariate": True,
"capability:multithreading": True,
"python_dependencies": "tslearn",
}

@@ -97,7 +98,7 @@ def __init__(
tol: float = 1e-4,
kernel_params: Union[dict, None] = None,
verbose: bool = False,
n_jobs: Union[int, None] = None,
n_jobs: Union[int, None] = 1,
random_state: Optional[Union[int, RandomState]] = None,
):
self.kernel = kernel
@@ -199,10 +200,6 @@ def _get_test_params(cls, parameter_set="default") -> dict:
"n_init": 1,
"max_iter": 1,
"tol": 0.0001,
"kernel_params": None,
"verbose": False,
"n_jobs": 1,
"random_state": 1,
}

def _score(self, X, y=None) -> float:
1 change: 1 addition & 0 deletions aeon/regression/distance_based/_time_series_neighbors.py
@@ -67,6 +67,7 @@ class KNeighborsTimeSeriesRegressor(BaseRegressor):
_tags = {
"capability:multivariate": True,
"capability:unequal_length": True,
"capability:multithreading": True,
"X_inner_type": ["np-list", "numpy3D"],
"algorithm_type": "distance",
}
2 changes: 1 addition & 1 deletion aeon/segmentation/_clasp.py
@@ -206,7 +206,7 @@ class ClaSPSegmenter(BaseSegmenter):
>>> scores = clasp.scores
"""

_tags = {"fit_is_empty": True} # for unit test cases
_tags = {"capability:multithreading": True, "fit_is_empty": True}

def __init__(self, period_length=10, n_cps=1, exclusion_radius=0.05, n_jobs=1):
self.period_length = int(period_length)
@@ -74,7 +74,7 @@ def _yield_classification_checks(estimator_class, estimator_instances, datatypes
# test class instances
for i, estimator in enumerate(estimator_instances):
# data type irrelevant
if _get_tag(estimator_class, "capability:train_estimate", raise_error=True):
if _get_tag(estimator, "capability:train_estimate", raise_error=True):
yield partial(
check_classifier_train_estimate,
estimator=estimator,
44 changes: 7 additions & 37 deletions aeon/testing/estimator_checking/_yield_estimator_checks.py
@@ -42,6 +42,9 @@
from aeon.testing.estimator_checking._yield_early_classification_checks import (
_yield_early_classification_checks,
)
from aeon.testing.estimator_checking._yield_multithreading_checks import (
_yield_multithreading_checks,
)
from aeon.testing.estimator_checking._yield_regression_checks import (
_yield_regression_checks,
)
@@ -118,6 +121,10 @@ def _yield_all_aeon_checks(
estimator_class, estimator_instances, datatypes
)

yield from _yield_multithreading_checks(
estimator_class, estimator_instances, datatypes
)

if issubclass(estimator_class, BaseClassifier):
yield from _yield_classification_checks(
estimator_class, estimator_instances, datatypes
@@ -703,41 +710,4 @@ def check_fit_deterministic(estimator, datatype):
err_msg=f"Running {method} after fit twice with test "
f"parameters gives different results.",
)

i += 1


# def check_multiprocessing_idempotent(estimator):
# """Test that single and multi-process run results are identical.
#
# Check that running an estimator on a single process is no different to running
# it on multiple processes. We also check that we can set n_jobs=-1 to make use
# of all CPUs. The test is not really necessary though, as we rely on joblib for
# parallelization and can trust that it works as expected.
# """
# method_nsc = method_nsc_arraylike
# params = estimator_instance.get_params()
#
# if "n_jobs" in params:
# # run on a single process
# # -----------------------
# estimator = deepcopy(estimator_instance)
# estimator.set_params(n_jobs=1)
# set_random_state(estimator)
# result_single_process = scenario.run(
# estimator, method_sequence=["fit", method_nsc]
# )
#
# # run on multiple processes
# # -------------------------
# estimator = deepcopy(estimator_instance)
# estimator.set_params(n_jobs=-1)
# set_random_state(estimator)
# result_multiple_process = scenario.run(
# estimator, method_sequence=["fit", method_nsc]
# )
# _assert_array_equal(
# result_single_process,
# result_multiple_process,
# err_msg="Results are not equal for n_jobs=1 and n_jobs=-1",
# )
115 changes: 115 additions & 0 deletions aeon/testing/estimator_checking/_yield_multithreading_checks.py
@@ -0,0 +1,115 @@
import inspect
from functools import partial

from numpy.testing import assert_array_almost_equal

from aeon.base._base import _clone_estimator
from aeon.testing.testing_config import (
MULTITHREAD_TESTING,
NON_STATE_CHANGING_METHODS_ARRAYLIKE,
)
from aeon.testing.utils.estimator_checks import _get_tag, _run_estimator_method
from aeon.utils.validation import check_n_jobs


def _yield_multithreading_checks(estimator_class, estimator_instances, datatypes):
"""Yield all multithreading checks for an aeon estimator."""
can_thread = _get_tag(estimator_class, "capability:multithreading")

# only class required
if can_thread:
yield partial(check_multithreading_param, estimator_class=estimator_class)
else:
yield partial(check_no_multithreading_param, estimator_class=estimator_class)

if can_thread and MULTITHREAD_TESTING:
# test class instances
for i, estimator in enumerate(estimator_instances):
# test all data types
for datatype in datatypes[i]:
yield partial(
check_estimator_multithreading,
estimator=estimator,
datatype=datatype,
)


def check_multithreading_param(estimator_class):
"""Test that estimators that can multithread have a n_jobs parameter."""
default_params = inspect.signature(estimator_class.__init__).parameters
n_jobs = default_params.get("n_jobs", None)

# check that the estimator has a n_jobs parameter
if n_jobs is None:
raise ValueError(
f"{estimator_class} which sets "
"capability:multithreading=True must have a n_jobs parameter."
)

# check that the default value is to use 1 thread
if n_jobs.default != 1:
raise ValueError(
"n_jobs parameter must have a default value of 1, "
"disabling multithreading by default."
)

# test parameters should not change the default value
params = estimator_class._get_test_params()
if not isinstance(params, list):
params = [params]
for param_set in params:
assert "n_jobs" not in param_set


def check_no_multithreading_param(estimator_class):
"""Test that estimators that cant multithread have no n_jobs parameter."""
default_params = inspect.signature(estimator_class.__init__).parameters

# check that the estimator does not have a n_jobs parameter
if default_params.get("n_jobs", None) is not None:
raise ValueError(
f"{estimator_class} has a n_jobs parameter, but does not set "
"capability:multithreading=True in its tags."
)


def check_estimator_multithreading(estimator, datatype):
"""Test that multithreaded estimators store n_jobs_ and produce same results."""
st_estimator = _clone_estimator(estimator, random_state=42)
mt_estimator = _clone_estimator(estimator, random_state=42)
n_jobs = max(2, check_n_jobs(-2))
mt_estimator.set_params(n_jobs=n_jobs)

# fit and get results for single thread estimator
_run_estimator_method(st_estimator, "fit", datatype, "train")

results = []
for method in NON_STATE_CHANGING_METHODS_ARRAYLIKE:
if hasattr(st_estimator, method) and callable(getattr(st_estimator, method)):
output = _run_estimator_method(st_estimator, method, datatype, "test")
results.append(output)

# fit multithreaded estimator
_run_estimator_method(mt_estimator, "fit", datatype, "train")

# check n_jobs_ attribute is set
assert mt_estimator.n_jobs_ == n_jobs, (
f"Multithreaded estimator {mt_estimator} does not store n_jobs_ "
f"attribute correctly. Expected {n_jobs}, got {mt_estimator.n_jobs_}."
f"It is recommended to use the check_n_jobs function to set n_jobs_ and use"
f"this for any multithreading."
)

# compare results from single and multithreaded estimators
i = 0
for method in NON_STATE_CHANGING_METHODS_ARRAYLIKE:
if hasattr(mt_estimator, method) and callable(getattr(mt_estimator, method)):
output = _run_estimator_method(mt_estimator, method, datatype, "test")

assert_array_almost_equal(
output,
results[i],
err_msg=f"Running {method} after fit twice with test "
f"parameters gives different results.",
)
i += 1
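For context, these checks are yielded from ``_yield_all_aeon_checks`` (see the change above), so they run as part of the standard estimator checks; the instance-level single-thread vs multi-thread comparison only runs when ``MULTITHREAD_TESTING`` is enabled. A minimal usage sketch, assuming the public ``check_estimator`` entry point (the estimator choice is illustrative):

from aeon.classification.distance_based import KNeighborsTimeSeriesClassifier
from aeon.testing.estimator_checking import check_estimator

# runs the yielded checks, including check_multithreading_param for estimators
# that set capability:multithreading=True
check_estimator(KNeighborsTimeSeriesClassifier)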
3 changes: 3 additions & 0 deletions aeon/testing/testing_config.py
@@ -8,6 +8,9 @@
# whether to use smaller parameter matrices for test generation and subsample estimators
# per os/version default is False, can be set to True by pytest --prtesting True flag
PR_TESTING = False
# whether to use multithreading in tests, can be set to True by pytest
# --enablethreading True flag
MULTITHREAD_TESTING = False

# Exclude estimators here for short term fixes
EXCLUDE_ESTIMATORS = [
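The ``MULTITHREAD_TESTING`` flag above is switched on with the ``pytest --enablethreading True`` option. A sketch of how such a flag is typically wired up in ``conftest.py`` (assumed for illustration; the hook implementation is not part of this diff):

from aeon.testing import testing_config


def pytest_addoption(parser):
    # register the custom command line option
    parser.addoption("--enablethreading", default="False", help="Enable multithreaded testing.")


def pytest_configure(config):
    # flip the global testing switch before tests run
    if config.getoption("--enablethreading") in (True, "True", "true"):
        testing_config.MULTITHREAD_TESTING = True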
@@ -74,6 +74,7 @@ class MiniRocket(BaseCollectionTransformer):
"output_data_type": "Tabular",
"algorithm_type": "convolution",
"capability:multivariate": True,
"capability:multithreading": True,
}
# indices for the 84 kernels used by MiniRocket
_indices = np.array([_ for _ in combinations(np.arange(9), 3)], dtype=np.int32)
@@ -87,6 +87,7 @@ class MiniRocketMultivariateVariable(BaseCollectionTransformer):
"output_data_type": "Tabular",
"capability:multivariate": True,
"capability:unequal_length": True,
"capability:multithreading": True,
"X_inner_type": "np-list",
"algorithm_type": "convolution",
}
@@ -74,6 +74,7 @@ class MultiRocket(BaseCollectionTransformer):
"output_data_type": "Tabular",
"algorithm_type": "convolution",
"capability:multivariate": True,
"capability:multithreading": True,
}
# indices for the 84 kernels used by MiniRocket
_indices = np.array([_ for _ in combinations(np.arange(9), 3)], dtype=np.int32)
@@ -65,6 +65,7 @@ class Rocket(BaseCollectionTransformer):
_tags = {
"output_data_type": "Tabular",
"capability:multivariate": True,
"capability:multithreading": True,
"algorithm_type": "convolution",
}

1 change: 1 addition & 0 deletions aeon/transformations/collection/dictionary_based/_sfa.py
@@ -110,6 +110,7 @@ class SFA(BaseCollectionTransformer):

_tags = {
"requires_y": False, # SFA is unsupervised for equi-depth and equi-width bins
"capability:multithreading": True,
"algorithm_type": "dictionary",
}

@@ -130,6 +130,7 @@ class SFAFast(BaseCollectionTransformer):

_tags = {
"requires_y": False, # SFA is unsupervised for equi-depth and equi-width bins
"capability:multithreading": True,
"algorithm_type": "dictionary",
}

@@ -173,6 +173,7 @@ class Catch22(BaseCollectionTransformer):
"X_inner_type": ["np-list", "numpy3D"],
"capability:unequal_length": True,
"capability:multivariate": True,
"capability:multithreading": True,
"fit_is_empty": True,
}

@@ -35,6 +35,7 @@ class _TSFresh(BaseCollectionTransformer):
_tags = {
"output_data_type": "Tabular",
"capability:multivariate": True,
"capability:multithreading": True,
"fit_is_empty": True,
"python_dependencies": "tsfresh",
}
@@ -110,6 +110,7 @@ class RandomIntervals(BaseCollectionTransformer):
_tags = {
"output_data_type": "Tabular",
"capability:multivariate": True,
"capability:multithreading": True,
"fit_is_empty": False,
"algorithm_type": "interval",
}
@@ -136,6 +136,7 @@ class SupervisedIntervals(BaseCollectionTransformer):
_tags = {
"output_data_type": "Tabular",
"capability:multivariate": True,
"capability:multithreading": True,
"requires_y": True,
"algorithm_type": "interval",
}