First attempt at adding parallel functionality to the Scorer #95

Merged · 2 commits · Sep 26, 2023
8 changes: 8 additions & 0 deletions CHANGELOG.md
@@ -4,6 +4,14 @@ All notable changes to this project will be documented in this file.
 The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/) (+ the Migration Guide),
 and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
 
+## [0.25.0] -
+
+### Added
+
+- The Scorer class now has the ability to score datapoints in parallel.
+  This can be enabled by setting the `n_jobs` parameter of the `Scorer` class to something larger than 1.
+  (https://github.com/mad-lab-fau/tpcp/pull/95)
+
 ## [0.24.0] - 2023-09-08
 
 For all changes in this release see: https://github.com/mad-lab-fau/tpcp/pull/85
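To put the changelog entry in context, the sketch below shows how parallel scoring could be enabled from user code after this change. `MyPipeline`, `MyDataset`, and the `perf_` result attribute are hypothetical placeholders for a project's own tpcp pipeline and dataset; only `Scorer` and its new `n_jobs` parameter come from this PR.

```python
from tpcp.validate import Scorer

# Hypothetical user-defined pipeline and dataset classes (not part of tpcp).
from my_project import MyDataset, MyPipeline


def score_func(pipeline, datapoint):
    # Score a single data point; `perf_` is a hypothetical result attribute.
    return pipeline.safe_run(datapoint).perf_


# n_jobs=2 scores two data points at a time via joblib;
# n_jobs=None (the default) keeps the previous sequential behavior.
scorer = Scorer(score_func, n_jobs=2)
agg_scores, single_scores = scorer(MyPipeline(), MyDataset())
```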
28 changes: 26 additions & 2 deletions tests/test_pipelines/test_scorer.py
@@ -293,11 +293,12 @@ def test_no_agg_single_raises(self):

assert "Scorer returned a NoAgg object. " in str(e)

def test_score_return_val_multi_score_no_agg(self):
@pytest.mark.parametrize("n_jobs", (1, 2))
def test_score_return_val_multi_score_no_agg(self, n_jobs):
def multi_score_func(pipeline, data_point):
return {"score_1": data_point.group_labels[0], "no_agg_score": NoAgg(str(data_point.group_labels))}

scorer = Scorer(multi_score_func)
scorer = Scorer(multi_score_func, n_jobs=n_jobs)
pipe = DummyOptimizablePipeline()
data = DummyDataset()
agg, single = scorer(pipe, data)
@@ -381,3 +382,26 @@ def score_func(x, y):
         data = DummyDataset()
         _ = scorer(pipe, data)
         assert mock_method.called_with(values=list(data), datapoints=list(data))
+
+    @pytest.mark.parametrize("n_jobs", (1, 2))
+    def test_single_value_callback_called_correctly(self, n_jobs):
+        """This tests that the callback is called in the main thread and not in the parallel threads."""
+
+        def score_func(x, y):
+            return y.group_label.value
+
+        thread_local_step = []
+        thread_local_scores = []
+
+        def callback(step, scores, **_):
+            thread_local_step.append(step)
+            thread_local_scores.append(scores[-1])
+
+        scorer = Scorer(score_func, n_jobs=n_jobs, single_score_callback=callback)
+        pipe = DummyOptimizablePipeline()
+        data = DummyDataset()
+
+        _ = scorer(pipe, data)
+
+        assert thread_local_step == list(range(len(data)))
+        assert thread_local_scores == [d.group_label.value for d in data]
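The new test above pins down the callback contract: with `n_jobs > 1`, `single_score_callback` still fires in the main process and in data-point order. Assuming that contract, a progress-reporting callback could look like the sketch below (the score function and its `perf_` attribute are again hypothetical stand-ins).

```python
from tpcp.validate import Scorer


def score_func(pipeline, datapoint):
    return pipeline.safe_run(datapoint).perf_  # hypothetical result attribute


def progress_callback(*, step, scores, dataset, **_):
    # Invoked after each data point, in dataset order, even though the
    # scoring itself may run in parallel worker processes.
    print(f"Scored datapoint {step + 1}/{len(dataset)}; last score: {scores[-1]}")


scorer = Scorer(score_func, n_jobs=2, single_score_callback=progress_callback)
```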
57 changes: 42 additions & 15 deletions tpcp/validate/_scorer.py
@@ -20,13 +20,15 @@
 )
 
 import numpy as np
+from joblib import Parallel
 from typing_extensions import Protocol
 
 from tpcp import NOTHING
 from tpcp._base import _Nothing
 from tpcp._dataset import Dataset, DatasetT
 from tpcp._pipeline import Pipeline, PipelineT
 from tpcp.exceptions import ScorerFailedError, ValidationError
+from tpcp.parallel import delayed
 
 T = TypeVar("T")
 AggReturnType = Union[float, Dict[str, float], _Nothing]
@@ -154,6 +156,19 @@ class Scorer(Generic[PipelineT, DatasetT, T]):
     >>> def callback(*, step: int, pipeline: Pipeline, **_):
     ...     ...
 
+    n_jobs
+        The number of parallel jobs to run.
+        Each job will run on a single data point.
+        Note that the single_score_callback will still be called in the main thread, after a job is finished.
+        However, multiple jobs might finish before the callback is called.
+        The callback is still guaranteed to be called in the order of the data points.
+        If None, no parallelization is used.
+    verbose
+        Controls the verbosity of the parallelization.
+        See :class:`joblib.Parallel` for more details.
+    pre_dispatch
+        Controls the number of jobs that get dispatched during parallelization.
+        See :class:`joblib.Parallel` for more details.
     kwargs
         Additional arguments that might be used by the scorer.
        These are ignored for the base scorer.
@@ -163,19 +178,29 @@ class Scorer(Generic[PipelineT, DatasetT, T]):
     kwargs: Dict[str, Any]
     _score_func: ScoreFunc[PipelineT, DatasetT, T]
     _single_score_func: Optional[ScoreCallback[PipelineT, DatasetT, T]]
+    _parallel_kwargs: Dict[str, Any]
 
     def __init__(
         self,
         score_func: ScoreFunc[PipelineT, DatasetT, ScoreTypeT[T]],
         *,
         default_aggregator: Type[Aggregator[T]] = MeanAggregator,
         single_score_callback: Optional[ScoreCallback[PipelineT, DatasetT, T]] = None,
+        # Multiprocess_kwargs
+        n_jobs: Optional[int] = None,
+        verbose: int = 0,
+        pre_dispatch: Union[str, int] = "2*n_jobs",
         **kwargs: Any,
     ) -> None:
         self.kwargs = kwargs
         self._score_func = score_func
         self._default_aggregator = default_aggregator
         self._single_score_callback = single_score_callback
+        self._parallel_kwargs = {
+            "n_jobs": n_jobs,
+            "verbose": verbose,
+            "pre_dispatch": pre_dispatch,
+        }
 
         # The typing for IndividualScoreType here is not perfect, but not sure how to fix.
         # For the aggregated scores, we can easily parameterize the value based on the generic, but not for the single
@@ -251,10 +276,7 @@ def _aggregate(  # noqa: C901, PLR0912
         return agg_scores, raw_scores
 
     def _score(self, pipeline: PipelineT, dataset: DatasetT):
-        # `float` because the return value in case of an exception will always be float
-        scores: List[ScoreTypeT[T]] = []
-        datapoints: List[DatasetT] = []
-        for i, d in enumerate(dataset):
+        def per_datapoint(i, d):
             try:
                 # We need to clone here again, to make sure that the run for each data point is truly independent.
                 score = self._score_func(pipeline.clone(), d)
@@ -267,19 +289,24 @@ def _score(self, pipeline: PipelineT, dataset: DatasetT):
"The original exception was:\n\n"
f"{traceback.format_exc()}"
) from e
return i, score

scores.append(score)
if self._single_score_callback:
self._single_score_callback(
step=i,
scores=tuple(scores),
scorer=self,
pipeline=pipeline,
dataset=dataset,
)
datapoints.append(d)
scores: List[ScoreTypeT[T]] = []

parallel = Parallel(**self._parallel_kwargs, return_as="generator")
with parallel:
for i, r in parallel(delayed(per_datapoint)(i, d) for i, d in enumerate(dataset)):
scores.append(r)
if self._single_score_callback:
self._single_score_callback(
step=i,
scores=tuple(scores),
scorer=self,
pipeline=pipeline,
dataset=dataset,
)

return self._aggregate(_check_and_invert_score_dict(scores, self._default_aggregator), datapoints)
return self._aggregate(_check_and_invert_score_dict(scores, self._default_aggregator), list(dataset))


ScorerTypes = Union[ScoreFunc[PipelineT, DatasetT, ScoreTypeT[T]], Scorer[PipelineT, DatasetT, ScoreTypeT[T]], None]
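The ordering guarantee in the docstring and test rests on joblib's behavior: `Parallel(..., return_as="generator")` (available from joblib 1.3) yields results in submission order, not completion order, so the main process can fire the callback for data point 0, then 1, and so on. A standalone sketch of that pattern, using plain `joblib.delayed` rather than tpcp's wrapper:

```python
import time

from joblib import Parallel, delayed


def work(i):
    # Later tasks sleep less, so they finish first in the workers.
    time.sleep(0.1 * (5 - i))
    return i


# return_as="generator" yields lazily but preserves submission order,
# which is exactly what the Scorer's callback loop relies on.
with Parallel(n_jobs=2, return_as="generator") as parallel:
    for result in parallel(delayed(work)(i) for i in range(5)):
        print(result)  # prints 0, 1, 2, 3, 4 in order
```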