diff --git a/adaptive_scheduler/_server_support/job_manager.py b/adaptive_scheduler/_server_support/job_manager.py index 1b796770..9b79b6e3 100644 --- a/adaptive_scheduler/_server_support/job_manager.py +++ b/adaptive_scheduler/_server_support/job_manager.py @@ -11,6 +11,10 @@ from .common import MaxRestartsReachedError, log if TYPE_CHECKING: + from typing import Callable + + from adaptive.learner.base_learner import LearnerType + from adaptive_scheduler.scheduler import BaseScheduler from adaptive_scheduler.utils import ( _DATAFRAME_FORMATS, @@ -31,6 +35,7 @@ def command_line_options( save_interval: float = 300, save_dataframe: bool = True, dataframe_format: _DATAFRAME_FORMATS = "pickle", + periodic_callable: tuple[Callable[[str, LearnerType], None], float] | None = None, loky_start_method: LOKY_START_METHODS = "loky", ) -> dict[str, Any]: """Return the command line options for the job_script. @@ -58,6 +63,9 @@ def command_line_options( Whether to periodically save the learner's data as a `pandas.DataFame`. dataframe_format The format in which to save the `pandas.DataFame`. See the type hint for the options. + periodic_callable + A tuple of a callable and an interval in seconds. The callable will be called + every `interval` seconds and takes the learner name and learner as arguments. loky_start_method Loky start method, by default "loky". 
@@ -71,12 +79,14 @@ def command_line_options( runner_kwargs = {} runner_kwargs["goal"] = goal base64_runner_kwargs = _serialize_to_b64(runner_kwargs) + base64_periodic_callable = _serialize_to_b64(periodic_callable) opts = { "--url": database_manager.url, "--log-interval": log_interval, "--save-interval": save_interval, "--serialized-runner-kwargs": base64_runner_kwargs, + "--serialized-periodic-callable": base64_periodic_callable, } if scheduler.single_job_script: # if `cores` or `executor_type` is a tuple then we set it @@ -125,6 +135,9 @@ class JobManager(BaseManager): Whether to periodically save the learner's data as a `pandas.DataFame`. dataframe_format The format in which to save the `pandas.DataFame`. See the type hint for the options. + periodic_callable + A tuple of a callable and an interval in seconds. The callable will be called + every `interval` seconds and takes the learner name and learner as arguments. loky_start_method Loky start method, by default "loky". log_interval @@ -160,6 +173,7 @@ def __init__( # Command line launcher options save_dataframe: bool = True, dataframe_format: _DATAFRAME_FORMATS = "pickle", + periodic_callable: tuple[Callable[[str, LearnerType], None], int] | None = None, loky_start_method: LOKY_START_METHODS = "loky", log_interval: float = 60, save_interval: float = 300, @@ -185,6 +199,7 @@ def __init__( self.save_interval = save_interval self.runner_kwargs = runner_kwargs self.goal = goal + self.periodic_callable = periodic_callable @property def max_job_starts(self) -> int: @@ -208,6 +223,7 @@ def _setup(self) -> None: save_interval=self.save_interval, save_dataframe=self.save_dataframe, dataframe_format=self.dataframe_format, + periodic_callable=self.periodic_callable, goal=self.goal, loky_start_method=self.loky_start_method, ) @@ -245,7 +261,8 @@ async def _start_new_jobs( queued.add(job_name) index, fname = self.database_manager._choose_fname(job_name) log.debug( - f"Starting `job_name={job_name}` with `index={index}` 
and `fname={fname}`", + f"Starting `job_name={job_name}` with " + f"`index={index}` and `fname={fname}`", ) await loop.run_in_executor( ex, diff --git a/adaptive_scheduler/_server_support/launcher.py b/adaptive_scheduler/_server_support/launcher.py index d398de13..54053355 100644 --- a/adaptive_scheduler/_server_support/launcher.py +++ b/adaptive_scheduler/_server_support/launcher.py @@ -10,6 +10,7 @@ import argparse import os from contextlib import suppress +from functools import partial from typing import TYPE_CHECKING, Any, get_args import adaptive @@ -107,6 +108,7 @@ def _parse_args() -> argparse.Namespace: default=120, ) parser.add_argument("--serialized-runner-kwargs", action="store", type=str) + parser.add_argument("--serialized-periodic-callable", action="store", type=str) return parser.parse_args() @@ -143,6 +145,7 @@ def main() -> None: ) runner_kwargs = _deserialize_from_b64(args.serialized_runner_kwargs) + periodic_callable = _deserialize_from_b64(args.serialized_periodic_callable) runner_kwargs.setdefault("shutdown_executor", True) runner = adaptive.Runner(learner, executor=executor, **runner_kwargs) @@ -156,6 +159,13 @@ def main() -> None: save_method = save_dataframe(fname, format=args.dataframe_format) runner.start_periodic_saving(interval=args.save_interval, method=save_method) + if periodic_callable: + _callable, save_interval = periodic_callable + runner.start_periodic_saving( + interval=save_interval, + method=partial(_callable, fname), + ) + # log progress info in the job output script, optional _log_task = client_support.log_info(runner, interval=args.log_interval) @@ -168,6 +178,10 @@ def main() -> None: if args.save_dataframe: save_method(learner) + if periodic_callable: + _callable, _ = periodic_callable + _callable(fname, learner) + # log once more after the runner is done client_support.log_now(runner, npoints_start) diff --git a/adaptive_scheduler/_server_support/run_manager.py b/adaptive_scheduler/_server_support/run_manager.py index 
7f827e2b..ea002d26 100644 --- a/adaptive_scheduler/_server_support/run_manager.py +++ b/adaptive_scheduler/_server_support/run_manager.py @@ -37,6 +37,7 @@ if TYPE_CHECKING: import adaptive + from adaptive.learner.base_learner import LearnerType from adaptive_scheduler.scheduler import BaseScheduler from adaptive_scheduler.utils import _DATAFRAME_FORMATS @@ -105,8 +106,15 @@ class RunManager(BaseManager): Whether to periodically save the learner's data as a `pandas.DataFame`. dataframe_format The format in which to save the `pandas.DataFame`. See the type hint for the options. + periodic_callable + A tuple of a callable and an interval in seconds. The callable will be called + every `interval` seconds and takes the learner name and learner as arguments. max_log_lines The maximum number of lines to display in the log viewer widget. + max_fails_per_job + The maximum number of times a job can fail before it is not restarted. + max_simultaneous_jobs + The maximum number of jobs that can run simultaneously. 
Attributes ---------- @@ -152,7 +160,7 @@ class RunManager(BaseManager): """ - def __init__( + def __init__(  # noqa: PLR0915 self, scheduler: BaseScheduler, learners: list[adaptive.BaseLearner], @@ -177,6 +185,7 @@ def __init__( cleanup_first: bool = False, save_dataframe: bool = False, dataframe_format: _DATAFRAME_FORMATS = "pickle", + periodic_callable: tuple[Callable[[str, LearnerType], None], float] | None = None, max_log_lines: int = 500, max_fails_per_job: int = 50, max_simultaneous_jobs: int = 100, @@ -203,6 +212,7 @@ def __init__( self.loky_start_method = loky_start_method self.save_dataframe = save_dataframe self.dataframe_format = dataframe_format + self.periodic_callable = periodic_callable self.max_log_lines = max_log_lines self.max_fails_per_job = max_fails_per_job self.max_simultaneous_jobs = max_simultaneous_jobs @@ -224,10 +234,13 @@ def __init__( # Set in methods self.start_time: float | None = None self.end_time: float | None = None - self._start_one_by_one_task: tuple[ - asyncio.Future, - list[asyncio.Task], - ] | None = None + self._start_one_by_one_task: ( + tuple[ + asyncio.Future, + list[asyncio.Task], + ] + | None + ) = None # Set on init self.learners = learners @@ -265,6 +278,7 @@ def __init__( # Launcher command line options save_dataframe=self.save_dataframe, dataframe_format=self.dataframe_format, + periodic_callable=self.periodic_callable, loky_start_method=self.loky_start_method, log_interval=self.log_interval, save_interval=self.save_interval, @@ -301,7 +315,7 @@ def _setup(self) -> None: self.kill_manager.start() self.start_time = time.time() def start(self, wait_for: RunManager | None = None) -> RunManager:  # type: ignore[override] """Start the RunManager and optionally wait for another RunManager to finish.""" if wait_for is not None: self._start_one_by_one_task = start_one_by_one(wait_for, self) @@ -467,12 +481,12 @@ def 
_repr_html_(self) -> None: def info(self) -> None: return info(self) - def load_dataframes(self) -> pd.DataFrame: + def load_dataframes(self) -> pd.DataFrame | list[pd.DataFrame]: """Load the `pandas.DataFrame`s with the most recently saved learners data.""" if not self.save_dataframe: msg = "The `save_dataframe` option was not set to True." raise ValueError(msg) return load_dataframes(self.fnames, format=self.dataframe_format)  # type: ignore[return-value] async def _wait_for_finished( diff --git a/adaptive_scheduler/_server_support/slurm_run.py b/adaptive_scheduler/_server_support/slurm_run.py index 72929850..d0c5e5c4 100644 --- a/adaptive_scheduler/_server_support/slurm_run.py +++ b/adaptive_scheduler/_server_support/slurm_run.py @@ -11,6 +11,7 @@ if TYPE_CHECKING: import adaptive + from adaptive.learner.base_learner import LearnerType from adaptive_scheduler.utils import _DATAFRAME_FORMATS, EXECUTOR_TYPES, GoalTypes @@ -31,6 +32,7 @@ def slurm_run( cleanup_first: bool = True, save_dataframe: bool = True, dataframe_format: _DATAFRAME_FORMATS = "pickle", + periodic_callable: tuple[Callable[[str, LearnerType], None], float] | None = None, max_fails_per_job: int = 50, max_simultaneous_jobs: int = 100, exclusive: bool | tuple[bool, ...] = True, @@ -85,6 +87,9 @@ def slurm_run( dataframe_format The format to save the `pandas.DataFrame`s in. See `adaptive_scheduler.utils.save_dataframes` for more information. + periodic_callable + A tuple of a callable and an interval in seconds. The callable will be called + every `interval` seconds and takes the learner name and learner as arguments. max_fails_per_job The maximum number of times a job can fail before it is cancelled. 
max_simultaneous_jobs @@ -177,6 +182,7 @@ def slurm_run( cleanup_first=cleanup_first, save_dataframe=save_dataframe, dataframe_format=dataframe_format, + periodic_callable=periodic_callable, max_fails_per_job=max_fails_per_job, max_simultaneous_jobs=max_simultaneous_jobs, initializers=initializers, diff --git a/adaptive_scheduler/client_support.py b/adaptive_scheduler/client_support.py index 93c20f3d..81c16f44 100644 --- a/adaptive_scheduler/client_support.py +++ b/adaptive_scheduler/client_support.py @@ -208,7 +208,7 @@ def args_to_env(args: argparse.Namespace, prefix: str = "ADAPTIVE_SCHEDULER_") - """Convert parsed arguments to environment variables.""" env_vars = {} for arg, value in vars(args).items(): - if value is not None: + if value is not None and not arg.lower().startswith("serialized"): env_vars[f"{prefix}{arg.upper()}"] = str(value) os.environ.update(env_vars) log.info("set environment variables", **env_vars)