Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add periodic_callable #209

Open
wants to merge 11 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 18 additions & 1 deletion adaptive_scheduler/_server_support/job_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@
from .common import MaxRestartsReachedError, log

if TYPE_CHECKING:
from typing import Callable

from adaptive.learner.base_learner import LearnerType

from adaptive_scheduler.scheduler import BaseScheduler
from adaptive_scheduler.utils import (
_DATAFRAME_FORMATS,
Expand All @@ -31,6 +35,7 @@ def command_line_options(
save_interval: float = 300,
save_dataframe: bool = True,
dataframe_format: _DATAFRAME_FORMATS = "pickle",
periodic_callable: tuple[Callable[[str, LearnerType], None], int] | None = None,
loky_start_method: LOKY_START_METHODS = "loky",
) -> dict[str, Any]:
"""Return the command line options for the job_script.
Expand Down Expand Up @@ -58,6 +63,9 @@ def command_line_options(
Whether to periodically save the learner's data as a `pandas.DataFrame`.
dataframe_format
The format in which to save the `pandas.DataFrame`. See the type hint for the options.
periodic_callable
A tuple of a callable and an interval in seconds. The callable will be called
every `interval` seconds and takes the learner name and learner as arguments.
loky_start_method
Loky start method, by default "loky".

Expand All @@ -71,12 +79,14 @@ def command_line_options(
runner_kwargs = {}
runner_kwargs["goal"] = goal
base64_runner_kwargs = _serialize_to_b64(runner_kwargs)
base64_periodic_callable = _serialize_to_b64(periodic_callable)

opts = {
"--url": database_manager.url,
"--log-interval": log_interval,
"--save-interval": save_interval,
"--serialized-runner-kwargs": base64_runner_kwargs,
"--serialized-periodic-callable": base64_periodic_callable,
}
if scheduler.single_job_script:
# if `cores` or `executor_type` is a tuple then we set it
Expand Down Expand Up @@ -125,6 +135,9 @@ class JobManager(BaseManager):
Whether to periodically save the learner's data as a `pandas.DataFrame`.
dataframe_format
The format in which to save the `pandas.DataFrame`. See the type hint for the options.
periodic_callable
A tuple of a callable and an interval in seconds. The callable will be called
every `interval` seconds and takes the learner name and learner as arguments.
loky_start_method
Loky start method, by default "loky".
log_interval
Expand Down Expand Up @@ -160,6 +173,7 @@ def __init__(
# Command line launcher options
save_dataframe: bool = True,
dataframe_format: _DATAFRAME_FORMATS = "pickle",
periodic_callable: tuple[Callable[[str, LearnerType], None], int] | None = None,
loky_start_method: LOKY_START_METHODS = "loky",
log_interval: float = 60,
save_interval: float = 300,
Expand All @@ -185,6 +199,7 @@ def __init__(
self.save_interval = save_interval
self.runner_kwargs = runner_kwargs
self.goal = goal
self.periodic_callable = periodic_callable

@property
def max_job_starts(self) -> int:
Expand All @@ -208,6 +223,7 @@ def _setup(self) -> None:
save_interval=self.save_interval,
save_dataframe=self.save_dataframe,
dataframe_format=self.dataframe_format,
periodic_callable=self.periodic_callable,
goal=self.goal,
loky_start_method=self.loky_start_method,
)
Expand Down Expand Up @@ -245,7 +261,8 @@ async def _start_new_jobs(
queued.add(job_name)
index, fname = self.database_manager._choose_fname(job_name)
log.debug(
f"Starting `job_name={job_name}` with `index={index}` and `fname={fname}`",
f"Starting `job_name={job_name}` with "
f"`index={index}` and `fname={fname}`",
)
await loop.run_in_executor(
ex,
Expand Down
14 changes: 14 additions & 0 deletions adaptive_scheduler/_server_support/launcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import argparse
import os
from contextlib import suppress
from functools import partial
from typing import TYPE_CHECKING, Any, get_args

import adaptive
Expand Down Expand Up @@ -107,6 +108,7 @@
default=120,
)
parser.add_argument("--serialized-runner-kwargs", action="store", type=str)
parser.add_argument("--serialized-periodic-callable", action="store", type=str)
return parser.parse_args()


Expand Down Expand Up @@ -143,6 +145,7 @@
)

runner_kwargs = _deserialize_from_b64(args.serialized_runner_kwargs)
periodic_callable = _deserialize_from_b64(args.serialized_periodic_callable)

Check warning on line 148 in adaptive_scheduler/_server_support/launcher.py

View check run for this annotation

Codecov / codecov/patch

adaptive_scheduler/_server_support/launcher.py#L148

Added line #L148 was not covered by tests

runner_kwargs.setdefault("shutdown_executor", True)
runner = adaptive.Runner(learner, executor=executor, **runner_kwargs)
Expand All @@ -156,6 +159,13 @@
save_method = save_dataframe(fname, format=args.dataframe_format)
runner.start_periodic_saving(interval=args.save_interval, method=save_method)

if periodic_callable:
_callable, save_interval = periodic_callable
runner.start_periodic_saving(

Check warning on line 164 in adaptive_scheduler/_server_support/launcher.py

View check run for this annotation

Codecov / codecov/patch

adaptive_scheduler/_server_support/launcher.py#L162-L164

Added lines #L162 - L164 were not covered by tests
interval=save_interval,
method=partial(_callable, fname),
)

# log progress info in the job output script, optional
_log_task = client_support.log_info(runner, interval=args.log_interval)

Expand All @@ -168,6 +178,10 @@
if args.save_dataframe:
save_method(learner)

if periodic_callable:
_callable, _ = periodic_callable
_callable(fname, learner)

Check warning on line 183 in adaptive_scheduler/_server_support/launcher.py

View check run for this annotation

Codecov / codecov/patch

adaptive_scheduler/_server_support/launcher.py#L181-L183

Added lines #L181 - L183 were not covered by tests

# log once more after the runner is done
client_support.log_now(runner, npoints_start)

Expand Down
32 changes: 24 additions & 8 deletions adaptive_scheduler/_server_support/run_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@

if TYPE_CHECKING:
import adaptive
from adaptive.learner.base_learner import LearnerType

from adaptive_scheduler.scheduler import BaseScheduler
from adaptive_scheduler.utils import _DATAFRAME_FORMATS
Expand Down Expand Up @@ -105,8 +106,15 @@
Whether to periodically save the learner's data as a `pandas.DataFrame`.
dataframe_format
The format in which to save the `pandas.DataFrame`. See the type hint for the options.
periodic_callable
A tuple of a callable and an interval in seconds. The callable will be called
every `interval` seconds and takes the learner name and learner as arguments.
max_log_lines
The maximum number of lines to display in the log viewer widget.
max_fails_per_job
The maximum number of times a job can fail before it is not restarted.
max_simultaneous_jobs
The maximum number of jobs that can run simultaneously.

Attributes
----------
Expand Down Expand Up @@ -152,7 +160,7 @@

"""

def __init__(
def __init__( # noqa: PLR0915
self,
scheduler: BaseScheduler,
learners: list[adaptive.BaseLearner],
Expand All @@ -177,6 +185,7 @@
cleanup_first: bool = False,
save_dataframe: bool = False,
dataframe_format: _DATAFRAME_FORMATS = "pickle",
periodic_callable: tuple[Callable[[str, LearnerType], None], int] | None = None,
max_log_lines: int = 500,
max_fails_per_job: int = 50,
max_simultaneous_jobs: int = 100,
Expand All @@ -203,6 +212,7 @@
self.loky_start_method = loky_start_method
self.save_dataframe = save_dataframe
self.dataframe_format = dataframe_format
self.periodic_callable = periodic_callable
self.max_log_lines = max_log_lines
self.max_fails_per_job = max_fails_per_job
self.max_simultaneous_jobs = max_simultaneous_jobs
Expand All @@ -224,10 +234,13 @@
# Set in methods
self.start_time: float | None = None
self.end_time: float | None = None
self._start_one_by_one_task: tuple[
asyncio.Future,
list[asyncio.Task],
] | None = None
self._start_one_by_one_task: (
tuple[
asyncio.Future,
list[asyncio.Task],
]
| None
) = None

# Set on init
self.learners = learners
Expand Down Expand Up @@ -265,6 +278,7 @@
# Launcher command line options
save_dataframe=self.save_dataframe,
dataframe_format=self.dataframe_format,
periodic_callable=self.periodic_callable,
loky_start_method=self.loky_start_method,
log_interval=self.log_interval,
save_interval=self.save_interval,
Expand Down Expand Up @@ -301,7 +315,8 @@
self.kill_manager.start()
self.start_time = time.time()

def start(self, wait_for: RunManager | None = None) -> RunManager: # type: ignore[override]
# type: ignore[override]
def start(self, wait_for: RunManager | None = None) -> RunManager:
"""Start the RunManager and optionally wait for another RunManager to finish."""
if wait_for is not None:
self._start_one_by_one_task = start_one_by_one(wait_for, self)
Expand Down Expand Up @@ -467,12 +482,13 @@
def info(self) -> None:
return info(self)

def load_dataframes(self) -> pd.DataFrame:
def load_dataframes(self) -> pd.DataFrame | list[pd.DataFrame]:
"""Load the `pandas.DataFrame`s with the most recently saved learners data."""
if not self.save_dataframe:
msg = "The `save_dataframe` option was not set to True."
raise ValueError(msg)
return load_dataframes(self.fnames, format=self.dataframe_format) # type: ignore[return-value]
# type: ignore[return-value]
return load_dataframes(self.fnames, format=self.dataframe_format)

Check warning on line 491 in adaptive_scheduler/_server_support/run_manager.py

View check run for this annotation

Codecov / codecov/patch

adaptive_scheduler/_server_support/run_manager.py#L491

Added line #L491 was not covered by tests


async def _wait_for_finished(
Expand Down
6 changes: 6 additions & 0 deletions adaptive_scheduler/_server_support/slurm_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

if TYPE_CHECKING:
import adaptive
from adaptive.learner.base_learner import LearnerType

from adaptive_scheduler.utils import _DATAFRAME_FORMATS, EXECUTOR_TYPES, GoalTypes

Expand All @@ -31,6 +32,7 @@ def slurm_run(
cleanup_first: bool = True,
save_dataframe: bool = True,
dataframe_format: _DATAFRAME_FORMATS = "pickle",
periodic_callable: tuple[Callable[[str, LearnerType], None], float] | None = None,
max_fails_per_job: int = 50,
max_simultaneous_jobs: int = 100,
exclusive: bool | tuple[bool, ...] = True,
Expand Down Expand Up @@ -85,6 +87,9 @@ def slurm_run(
dataframe_format
The format to save the `pandas.DataFrame`s in. See
`adaptive_scheduler.utils.save_dataframes` for more information.
periodic_callable
A tuple of a callable and an interval in seconds. The callable will be called
every `interval` seconds and takes the learner name and learner as arguments.
max_fails_per_job
The maximum number of times a job can fail before it is cancelled.
max_simultaneous_jobs
Expand Down Expand Up @@ -177,6 +182,7 @@ def slurm_run(
cleanup_first=cleanup_first,
save_dataframe=save_dataframe,
dataframe_format=dataframe_format,
periodic_callable=periodic_callable,
max_fails_per_job=max_fails_per_job,
max_simultaneous_jobs=max_simultaneous_jobs,
initializers=initializers,
Expand Down
2 changes: 1 addition & 1 deletion adaptive_scheduler/client_support.py
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@
"""Convert parsed arguments to environment variables."""
env_vars = {}
for arg, value in vars(args).items():
if value is not None:
if value is not None and not arg.lower().startswith("serialized"):

Check warning on line 211 in adaptive_scheduler/client_support.py

View check run for this annotation

Codecov / codecov/patch

adaptive_scheduler/client_support.py#L211

Added line #L211 was not covered by tests
env_vars[f"{prefix}{arg.upper()}"] = str(value)
os.environ.update(env_vars)
log.info("set environment variables", **env_vars)
Loading