From e273bf5e8249b8aaba8a2c6fb9b745d40648fcc8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arne=20K=C3=BCderle?= Date: Tue, 26 Sep 2023 13:19:25 +0200 Subject: [PATCH 1/2] First attempt of adding parallel functionality to the Scorer --- tests/test_pipelines/test_scorer.py | 28 +++++++++++++- tpcp/validate/_scorer.py | 57 +++++++++++++++++++++-------- 2 files changed, 68 insertions(+), 17 deletions(-) diff --git a/tests/test_pipelines/test_scorer.py b/tests/test_pipelines/test_scorer.py index 0bf71027..19262033 100644 --- a/tests/test_pipelines/test_scorer.py +++ b/tests/test_pipelines/test_scorer.py @@ -293,11 +293,12 @@ def test_no_agg_single_raises(self): assert "Scorer returned a NoAgg object. " in str(e) - def test_score_return_val_multi_score_no_agg(self): + @pytest.mark.parametrize("n_jobs", (1, 2)) + def test_score_return_val_multi_score_no_agg(self, n_jobs): def multi_score_func(pipeline, data_point): return {"score_1": data_point.group_labels[0], "no_agg_score": NoAgg(str(data_point.group_labels))} - scorer = Scorer(multi_score_func) + scorer = Scorer(multi_score_func, n_jobs=n_jobs) pipe = DummyOptimizablePipeline() data = DummyDataset() agg, single = scorer(pipe, data) @@ -381,3 +382,26 @@ def score_func(x, y): data = DummyDataset() _ = scorer(pipe, data) assert mock_method.called_with(values=list(data), datapoints=list(data)) + + @pytest.mark.parametrize("n_jobs", (1, 2)) + def test_single_value_callback_called_correctly(self, n_jobs): + """This tests that the callback is called in the main thread and not in the parallel threads.""" + + def score_func(x, y): + return y.group_label.value + + thread_local_step = [] + thread_local_scores = [] + + def callback(step, scores, **_): + thread_local_step.append(step) + thread_local_scores.append(scores[-1]) + + scorer = Scorer(score_func, n_jobs=n_jobs, single_score_callback=callback) + pipe = DummyOptimizablePipeline() + data = DummyDataset() + + _ = scorer(pipe, data) + + assert thread_local_step == 
list(range(len(data))) + assert thread_local_scores == [d.group_label.value for d in data] diff --git a/tpcp/validate/_scorer.py b/tpcp/validate/_scorer.py index c772e7fc..ca89627b 100644 --- a/tpcp/validate/_scorer.py +++ b/tpcp/validate/_scorer.py @@ -20,6 +20,7 @@ ) import numpy as np +from joblib import Parallel from typing_extensions import Protocol from tpcp import NOTHING @@ -27,6 +28,7 @@ from tpcp._dataset import Dataset, DatasetT from tpcp._pipeline import Pipeline, PipelineT from tpcp.exceptions import ScorerFailedError, ValidationError +from tpcp.parallel import delayed T = TypeVar("T") AggReturnType = Union[float, Dict[str, float], _Nothing] @@ -154,6 +156,19 @@ class Scorer(Generic[PipelineT, DatasetT, T]): >>> def callback(*, step: int, pipeline: Pipeline, **_): ... ... + n_jobs + The number of parallel jobs to run. + Each job will run on a single data point. + Note that the single_score_callback will still be called in the main thread, after a job is finished. + However, it could be that multiple jobs are finished before the callback is called. + The callback is still guaranteed to be called in the order of the data points. + If None, no parallelization is used. + verbose + Controls the verbosity of the parallelization. + See :class:`joblib.Parallel` for more details. + pre_dispatch + Controls the number of jobs that get dispatched during parallelization. + See :class:`joblib.Parallel` for more details. kwargs Additional arguments that might be used by the scorer. These are ignored for the base scorer.
@@ -163,6 +178,7 @@ class Scorer(Generic[PipelineT, DatasetT, T]): kwargs: Dict[str, Any] _score_func: ScoreFunc[PipelineT, DatasetT, T] _single_score_func: Optional[ScoreCallback[PipelineT, DatasetT, T]] + _parallel_kwargs: Dict[str, Any] def __init__( self, @@ -170,12 +186,21 @@ def __init__( *, default_aggregator: Type[Aggregator[T]] = MeanAggregator, single_score_callback: Optional[ScoreCallback[PipelineT, DatasetT, T]] = None, + # Multiprocess_kwargs + n_jobs: Optional[int] = None, + verbose: int = 0, + pre_dispatch: Union[str, int] = "2*n_jobs", **kwargs: Any, ) -> None: self.kwargs = kwargs self._score_func = score_func self._default_aggregator = default_aggregator self._single_score_callback = single_score_callback + self._parallel_kwargs = { + "n_jobs": n_jobs, + "verbose": verbose, + "pre_dispatch": pre_dispatch, + } # The typing for IndividualScoreType here is not perfect, but not sure how to fix. # For the aggregated scores, we can easily parameterize the value based on the generic, but not for the single @@ -251,10 +276,7 @@ def _aggregate( # noqa: C901, PLR0912 return agg_scores, raw_scores def _score(self, pipeline: PipelineT, dataset: DatasetT): - # `float` because the return value in case of an exception will always be float - scores: List[ScoreTypeT[T]] = [] - datapoints: List[DatasetT] = [] - for i, d in enumerate(dataset): + def per_datapoint(i, d): try: # We need to clone here again, to make sure that the run for each data point is truly independent. 
score = self._score_func(pipeline.clone(), d) @@ -267,19 +289,24 @@ def _score(self, pipeline: PipelineT, dataset: DatasetT): "The original exception was:\n\n" f"{traceback.format_exc()}" ) from e + return i, score - scores.append(score) - if self._single_score_callback: - self._single_score_callback( - step=i, - scores=tuple(scores), - scorer=self, - pipeline=pipeline, - dataset=dataset, - ) - datapoints.append(d) + scores: List[ScoreTypeT[T]] = [] + + parallel = Parallel(**self._parallel_kwargs, return_as="generator") + with parallel: + for i, r in parallel(delayed(per_datapoint)(i, d) for i, d in enumerate(dataset)): + scores.append(r) + if self._single_score_callback: + self._single_score_callback( + step=i, + scores=tuple(scores), + scorer=self, + pipeline=pipeline, + dataset=dataset, + ) - return self._aggregate(_check_and_invert_score_dict(scores, self._default_aggregator), datapoints) + return self._aggregate(_check_and_invert_score_dict(scores, self._default_aggregator), list(dataset)) ScorerTypes = Union[ScoreFunc[PipelineT, DatasetT, ScoreTypeT[T]], Scorer[PipelineT, DatasetT, ScoreTypeT[T]], None] From 6cef4feeba2c0c4687b72c01bae3a1a9ddb8b2ad Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arne=20K=C3=BCderle?= Date: Tue, 26 Sep 2023 13:22:52 +0200 Subject: [PATCH 2/2] Added changelog --- CHANGELOG.md | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index a3799264..ee02a01c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,14 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/) (+ the Migration Guide), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## [0.25.0] - + +### Added + +- The Scorer class now has the ability to score datapoints in parallel. + This can be enabled by setting the `n_jobs` parameter of the `Scorer` class to something larger than 1. 
+ (https://github.com/mad-lab-fau/tpcp/pull/95) + ## [0.24.0] - 2023-09-08 For all changes in this release see: https://github.com/mad-lab-fau/tpcp/pull/85