From 3a9418556a81715e67df19a48ad898dc548e325e Mon Sep 17 00:00:00 2001 From: Vojtech Tuma Date: Thu, 11 Jul 2024 16:47:45 +0200 Subject: [PATCH] Use default scheduler port for LocalCluster 1. Unify multiple occurrences of 8786 into a single constant `DEFAULT_SCHEDULER_PORT`. 2. Change the default for the local cluster to use that instead of `0` (random port). 3. Introduce a fallback address for scheduler start in case of no port having been given. --- distributed/cli/dask_scheduler.py | 5 +++-- distributed/cli/dask_ssh.py | 3 ++- distributed/core.py | 31 ++++++++++++++++++++++++------- distributed/deploy/local.py | 2 +- distributed/scheduler.py | 20 +++++++++++++++++--- distributed/utils_test.py | 3 ++- 6 files changed, 49 insertions(+), 15 deletions(-) diff --git a/distributed/cli/dask_scheduler.py b/distributed/cli/dask_scheduler.py index b2b846ee46f..5a3daa0a282 100755 --- a/distributed/cli/dask_scheduler.py +++ b/distributed/cli/dask_scheduler.py @@ -20,6 +20,7 @@ enable_proctitle_on_children, enable_proctitle_on_current, ) +from distributed.scheduler import DEFAULT_SCHEDULER_PORT logger = logging.getLogger("distributed.scheduler") @@ -165,9 +166,9 @@ def main( if port is None and (not host or not re.search(r":\d", host)): if isinstance(protocol, list): - port = [8786] + [0] * (len(protocol) - 1) + port = [DEFAULT_SCHEDULER_PORT] + [0] * (len(protocol) - 1) else: - port = 8786 + port = DEFAULT_SCHEDULER_PORT if isinstance(protocol, list) or isinstance(port, list): if (not isinstance(protocol, list) or not isinstance(port, list)) or len( diff --git a/distributed/cli/dask_ssh.py b/distributed/cli/dask_ssh.py index f4497745377..353b43d0a5b 100755 --- a/distributed/cli/dask_ssh.py +++ b/distributed/cli/dask_ssh.py @@ -8,6 +8,7 @@ import click from distributed.deploy.old_ssh import SSHCluster +from distributed.scheduler import DEFAULT_SCHEDULER_PORT logger = logging.getLogger("distributed.dask_ssh") @@ -30,7 +31,7 @@ ) @click.option( "--scheduler-port", - default=8786, + default=DEFAULT_SCHEDULER_PORT, show_default=True, type=int, help="Specify scheduler port number.", diff --git a/distributed/core.py b/distributed/core.py index 90705e80515..28f41150b6a 100644 --- a/distributed/core.py +++ b/distributed/core.py @@ -859,13 +859,30 @@ async def listen(self, port_or_addr=None, allow_offload=True, **kwargs): else: addr = port_or_addr assert isinstance(addr, str) - listener = await listen( - addr, - self.handle_comm, - deserialize=self.deserialize, - allow_offload=allow_offload, - **kwargs, - ) + try: + listener = await listen( + addr, + self.handle_comm, + deserialize=self.deserialize, + allow_offload=allow_offload, + **kwargs, + ) + except OSError: + fallback_port_or_addr = kwargs.get("fallback_port_or_addr", None) + if not fallback_port_or_addr: + raise + warnings.warn( + f"Address {addr} is already in use.\n" + f"Falling back to {fallback_port_or_addr} instead" + ) + listener = await listen( + fallback_port_or_addr, + self.handle_comm, + deserialize=self.deserialize, + allow_offload=allow_offload, + **kwargs, + ) + self.listeners.append(listener) def handle_comm(self, comm: Comm) -> NoOpAwaitable: diff --git a/distributed/deploy/local.py b/distributed/deploy/local.py index 69f5d8af35d..5031754bee1 100644 --- a/distributed/deploy/local.py +++ b/distributed/deploy/local.py @@ -120,7 +120,7 @@ def __init__( start=None, host=None, ip=None, - scheduler_port=0, + scheduler_port=None, silence_logs=logging.WARN, dashboard_address=":8787", worker_dashboard_address=None, diff --git a/distributed/scheduler.py b/distributed/scheduler.py index 0273d333da3..eab0e6a9086 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -195,6 +195,8 @@ "stealing": WorkStealing, } +DEFAULT_SCHEDULER_PORT = 8786 + class ClientState: """A simple object holding information about a client.""" @@ -3574,7 +3576,6 @@ class Scheduler(SchedulerState, ServerNode): Time we expect certain functions to take, e.g. ``{'sum': 0.25}`` """ - default_port = 8786 _instances: ClassVar[weakref.WeakSet[Scheduler]] = weakref.WeakSet() worker_ttl: float | None @@ -3686,8 +3687,18 @@ def __init__( interface=interface, protocol=protocol, security=security, - default_port=self.default_port, + default_port=DEFAULT_SCHEDULER_PORT, ) + if port is None: + self._fallback_start_addresses = addresses_from_user_args( + host=host, + port=0, + interface=interface, + protocol=protocol, + security=security, + ) + else: + self._fallback_start_addresses = [] http_server_modules = dask.config.get("distributed.scheduler.http.routes") show_dashboard = dashboard or (dashboard is None and dashboard_address) @@ -4094,11 +4105,14 @@ async def start_unsafe(self) -> Self: self._clear_task_state() - for addr in self._start_address: + for addr, fallback_addr in itertools.zip_longest( + self._start_address, self._fallback_start_addresses + ): await self.listen( addr, allow_offload=False, handshake_overrides={"pickle-protocol": 4, "compression": None}, + fallback_port_or_addr=fallback_addr, **self.security.get_listen_args("scheduler"), ) self.ip = get_address_host(self.listen_address) diff --git a/distributed/utils_test.py b/distributed/utils_test.py index 1fd59b5525a..0d82f1fa45c 100644 --- a/distributed/utils_test.py +++ b/distributed/utils_test.py @@ -60,6 +60,7 @@ from distributed.node import ServerNode from distributed.proctitle import enable_proctitle_on_children from distributed.protocol import deserialize +from distributed.scheduler import DEFAULT_SCHEDULER_PORT from distributed.scheduler import TaskState as SchedulerTaskState from distributed.security import Security from distributed.utils import ( @@ -2530,7 +2531,7 @@ def _bind_port(port): s.listen(1) yield s - default_ports = [8786] + default_ports = [DEFAULT_SCHEDULER_PORT] while time() - start < _TEST_TIMEOUT: try: