diff --git a/docs/tuning.md b/docs/tuning.md
index 455c21da4..6f73d3a16 100644
--- a/docs/tuning.md
+++ b/docs/tuning.md
@@ -6,7 +6,27 @@ First, my general tips would be to avoid using redundant operators, like how `po
 
 When running PySR, I usually do the following:
 
-I run from IPython (Jupyter Notebooks don't work as well[^1]) on the head node of a slurm cluster. Passing `cluster_manager="slurm"` will make PySR set up a run over the entire allocation. I set `procs` equal to the total number of cores over my entire allocation.
+I run from IPython (Jupyter Notebooks don't work as well[^1]) on the head node of a slurm cluster.
+Passing `cluster_manager="slurm"` will make PySR set up a run over the entire allocation (another common option is `cluster_manager="mpi"`, which will use MPI).
+I set `procs` equal to the total number of cores over my entire allocation.
+
+> [!NOTE]
+> When running on a cluster, you should only launch the search from a single task on the head node, rather than starting PySR on every node simultaneously.
+> ClusterManagers.jl (or MPIClusterManagers.jl, for MPI) will automatically call the correct command, such as `srun` for slurm, to spread the processing over the topology of nodes.
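+
+For example, a launch from the head node might look something like the following (a minimal sketch; the operators, the `procs` count, and the dataset `X`, `y` are all placeholders to replace with your own):
+
+```python
+from pysr import PySRRegressor
+
+model = PySRRegressor(
+    cluster_manager="slurm",  # or "mpi", "pbs", etc.
+    procs=256,  # placeholder: total number of cores over the allocation
+    binary_operators=["+", "-", "*", "/"],
+    unary_operators=["cos", "exp"],
+)
+model.fit(X, y)
+```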
 
 [^1]: Jupyter Notebooks are supported by PySR, but miss out on some useful features available in IPython and Python: the progress bar, and early stopping with "q". In Jupyter you cannot interrupt a search once it has started; you have to restart the kernel. See [this issue](https://github.com/MilesCranmer/PySR/issues/260) for updates.
 
diff --git a/pysr/julia_extensions.py b/pysr/julia_extensions.py
index ac4714d48..0fac35436 100644
--- a/pysr/julia_extensions.py
+++ b/pysr/julia_extensions.py
@@ -2,8 +2,17 @@
 
 from typing import Optional
 
+from .julia_helpers import jl_array
 from .julia_import import Pkg, jl
 
+UUIDs = {
+    "LoopVectorization": "bdcacae8-1622-11e9-2a5c-532679323890",
+    "Bumper": "8ce10254-0962-460f-a3d8-1f77fea1446e",
+    "Zygote": "e88e6eb3-aa80-5325-afca-941959d7151f",
+    "MPIClusterManagers": "e7922434-ae4b-11e9-05c5-9780451d2c66",
+    "ClusterManagers": "34f1f09b-3a8b-5176-ab39-66d58a4d544e",
+}
+
 
 def load_required_packages(
     *,
@@ -13,36 +22,40 @@ def load_required_packages(
     cluster_manager: Optional[str] = None,
 ):
     if turbo:
-        load_package("LoopVectorization", "bdcacae8-1622-11e9-2a5c-532679323890")
+        load_package("LoopVectorization")
     if bumper:
-        load_package("Bumper", "8ce10254-0962-460f-a3d8-1f77fea1446e")
+        load_package("Bumper")
     if enable_autodiff:
-        load_package("Zygote", "e88e6eb3-aa80-5325-afca-941959d7151f")
+        load_package("Zygote")
     if cluster_manager is not None:
-        load_package("ClusterManagers", "34f1f09b-3a8b-5176-ab39-66d58a4d544e")
+        if cluster_manager == "mpi":
+            load_package("MPIClusterManagers")
+        else:
+            load_package("ClusterManagers")
 
 
 def load_all_packages():
     """Install and load all Julia extensions available to PySR."""
-    load_required_packages(
-        turbo=True, bumper=True, enable_autodiff=True, cluster_manager="slurm"
-    )
+    specs = [Pkg.PackageSpec(name=key, uuid=value) for key, value in UUIDs.items()]
+    Pkg.add(jl_array(specs))
+    Pkg.resolve()
+    jl.seval("import " + ", ".join(UUIDs.keys()))
 
 
 # TODO: Refactor this file so we can install all packages at once using `juliapkg`,
 # ideally parameterizable via the regular Python extras API
 
 
-def isinstalled(uuid_s: str):
-    return jl.haskey(Pkg.dependencies(), jl.Base.UUID(uuid_s))
+def isinstalled(package_name: str):
+    return jl.haskey(Pkg.dependencies(), jl.Base.UUID(UUIDs[package_name]))
 
 
-def load_package(package_name: str, uuid_s: str) -> None:
-    if not isinstalled(uuid_s):
-        Pkg.add(name=package_name, uuid=uuid_s)
+def load_package(package_name: str) -> None:
+    if not isinstalled(package_name):
+        Pkg.add(name=package_name, uuid=UUIDs[package_name])
         Pkg.resolve()
 
     # TODO: Protect against loading the same symbol from two packages,
     # maybe with a @gensym here.
-    jl.seval(f"using {package_name}: {package_name}")
+    jl.seval(f"import {package_name}")
     return None
diff --git a/pysr/julia_helpers.py b/pysr/julia_helpers.py
index 18d4a6cf3..b9bf6c5a5 100644
--- a/pysr/julia_helpers.py
+++ b/pysr/julia_helpers.py
@@ -27,9 +27,34 @@ def _escape_filename(filename):
     return str_repr
 
 
-def _load_cluster_manager(cluster_manager: str):
-    jl.seval(f"using ClusterManagers: addprocs_{cluster_manager}")
-    return jl.seval(f"addprocs_{cluster_manager}")
+def _load_cluster_manager(cluster_manager: str, mpi_flags: str):
+    if cluster_manager == "mpi":
+        jl.seval("using Distributed: addprocs")
+        jl.seval("using MPIClusterManagers: MPIWorkerManager")
+
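+        # The string assembled below evaluates to a Julia closure roughly
+        # equivalent to this sketch (with, e.g., mpi_flags="-host worker1,worker2"):
+        #
+        #   (np; exeflags=``, kws...) -> addprocs(
+        #       MPIWorkerManager(np);
+        #       exeflags=`$exeflags --project=$(Base.active_project())`,
+        #       mpiflags=`-host worker1,worker2`,
+        #       kws...)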
+        return jl.seval(
+            "(np; exeflags=``, kws...) -> "
+            + "addprocs(MPIWorkerManager(np);"
+            + ",".join(
+                [
+                    "exeflags=`$exeflags --project=$(Base.active_project())`",
+                    f"mpiflags=`{mpi_flags}`",
+                    "kws...",
+                ]
+            )
+            + ")"
+        )
+    else:
+        jl.seval(f"using ClusterManagers: addprocs_{cluster_manager}")
+        return jl.seval(f"addprocs_{cluster_manager}")
 
 
 def jl_array(x, dtype=None):
@@ -42,7 +67,9 @@ def jl_array(x, dtype=None):
 
 
 def jl_is_function(f) -> bool:
-    return cast(bool, jl.seval("op -> op isa Function")(f))
+    # Name the function so that it is only compiled once
+    is_function = jl.seval("__pysr_jl_is_function(op) = op isa Function")
+    return cast(bool, is_function(f))
 
 
 def jl_serialize(obj: Any) -> NDArray[np.uint8]:
diff --git a/pysr/param_groupings.yml b/pysr/param_groupings.yml
index 0ff9d63da..fcec5a6fc 100644
--- a/pysr/param_groupings.yml
+++ b/pysr/param_groupings.yml
@@ -70,6 +70,7 @@
     - multithreading
     - cluster_manager
     - heap_size_hint_in_bytes
+    - mpi_flags
     - batching
     - batch_size
     - precision
diff --git a/pysr/sr.py b/pysr/sr.py
index 0054ce502..63d4c9553 100644
--- a/pysr/sr.py
+++ b/pysr/sr.py
@@ -495,10 +495,14 @@ class PySRRegressor(MultiOutputMixin, RegressorMixin, BaseEstimator):
         Using procs=0 will turn off both. Default is `True`.
     cluster_manager : str
         For distributed computing, this sets the job queue system. Set
-        to one of "slurm", "pbs", "lsf", "sge", "qrsh", "scyld", or
-        "htc". If set to one of these, PySR will run in distributed
+        to one of "slurm", "pbs", "lsf", "sge", "qrsh", "scyld",
+        "htc", or "mpi". If set to one of these, PySR will run in distributed
         mode, and use `procs` to figure out how many processes to launch.
         Default is `None`.
+    mpi_flags : str
+        (Experimental API) String of options to pass to `mpiexec`.
+        For example, `"-host worker1,worker2"`. Only used when
+        `cluster_manager` is set to `"mpi"`. Default is `""`.
     heap_size_hint_in_bytes : int
         For multiprocessing, this sets the `--heap-size-hint` parameter
         for new Julia processes. This can be configured when using
@@ -773,8 +777,9 @@ def __init__(
         procs: int = cpu_count(),
         multithreading: Optional[bool] = None,
         cluster_manager: Optional[
-            Literal["slurm", "pbs", "lsf", "sge", "qrsh", "scyld", "htc"]
+            Literal["slurm", "pbs", "lsf", "sge", "qrsh", "scyld", "htc", "mpi"]
         ] = None,
+        mpi_flags: str = "",
         heap_size_hint_in_bytes: Optional[int] = None,
         batching: bool = False,
         batch_size: int = 50,
@@ -872,6 +877,7 @@ def __init__(
         self.procs = procs
         self.multithreading = multithreading
         self.cluster_manager = cluster_manager
+        self.mpi_flags = mpi_flags
         self.heap_size_hint_in_bytes = heap_size_hint_in_bytes
         self.batching = batching
         self.batch_size = batch_size
@@ -1690,9 +1696,6 @@ def _run(
         if not ALREADY_RAN and update_verbosity != 0:
             print("Compiling Julia backend...")
 
-        if cluster_manager is not None:
-            cluster_manager = _load_cluster_manager(cluster_manager)
-
         # TODO(mcranmer): These functions should be part of this class.
         binary_operators, unary_operators = _maybe_create_inline_operators(
             binary_operators=binary_operators,
@@ -1753,6 +1756,9 @@ def _run(
             cluster_manager=cluster_manager,
         )
 
+        if cluster_manager is not None:
+            cluster_manager = _load_cluster_manager(cluster_manager, self.mpi_flags)
+
         mutation_weights = SymbolicRegression.MutationWeights(
             mutate_constant=self.weight_mutate_constant,
             mutate_operator=self.weight_mutate_operator,
diff --git a/pysr/test/test.py b/pysr/test/test.py
index c641e9f66..51b962d5a 100644
--- a/pysr/test/test.py
+++ b/pysr/test/test.py
@@ -85,7 +85,7 @@ def test_linear_relation_weighted_bumper(self):
             jl.seval("((::Val{x}) where x) -> x")(model.julia_options_.bumper), True
         )
 
-    def test_multiprocessing_turbo_custom_objective(self):
+    def _multiprocessing_turbo_custom_objective(self, cluster_manager):
         rstate = np.random.RandomState(0)
         y = self.X[:, 0]
         y += rstate.randn(*y.shape) * 1e-4
@@ -95,6 +95,7 @@ def test_multiprocessing_turbo_custom_objective(self):
             unary_operators=["sqrt"],
             procs=2,
             multithreading=False,
+            cluster_manager=cluster_manager,
             turbo=True,
             early_stop_condition="stop_if(loss, complexity) = loss < 1e-10 && complexity == 1",
             loss_function="""
@@ -117,6 +118,12 @@ def test_multiprocessing_turbo_custom_objective(self):
             jl.seval("((::Val{x}) where x) -> x")(model.julia_options_.turbo), True
         )
 
+    def test_multiprocessing_turbo_custom_objective(self):
+        self._multiprocessing_turbo_custom_objective(None)
+
+    def test_multiprocessing_turbo_custom_objective_mpi(self):
+        self._multiprocessing_turbo_custom_objective("mpi")
+
     def test_multiline_seval(self):
         # The user should be able to run multiple things in a single seval call:
         num = jl.seval(