Skip to content

Commit

Permalink
Use default scheduler port for LocalCluster
Browse files Browse the repository at this point in the history
1. Unify multiple occurrences of 8786 into a single constant
   `DEFAULT_SCHEDULER_PORT`.
2. Change the default for the local cluster to use that instead
   of `0` (random port).
3. Introduce a fallback address for scheduler start in case of no
   port having been given.
  • Loading branch information
tmi committed Jul 11, 2024
1 parent 8564dc7 commit fcb7223
Show file tree
Hide file tree
Showing 6 changed files with 48 additions and 15 deletions.
5 changes: 3 additions & 2 deletions distributed/cli/dask_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
enable_proctitle_on_children,
enable_proctitle_on_current,
)
from distributed.scheduler import DEFAULT_SCHEDULER_PORT

logger = logging.getLogger("distributed.scheduler")

Expand Down Expand Up @@ -165,9 +166,9 @@ def main(

if port is None and (not host or not re.search(r":\d", host)):
if isinstance(protocol, list):
port = [8786] + [0] * (len(protocol) - 1)
port = [DEFAULT_SCHEDULER_PORT] + [0] * (len(protocol) - 1)

Check warning on line 169 in distributed/cli/dask_scheduler.py

View check run for this annotation

Codecov / codecov/patch

distributed/cli/dask_scheduler.py#L169

Added line #L169 was not covered by tests
else:
port = 8786
port = DEFAULT_SCHEDULER_PORT

if isinstance(protocol, list) or isinstance(port, list):
if (not isinstance(protocol, list) or not isinstance(port, list)) or len(
Expand Down
3 changes: 2 additions & 1 deletion distributed/cli/dask_ssh.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import click

from distributed.deploy.old_ssh import SSHCluster
from distributed.scheduler import DEFAULT_SCHEDULER_PORT

logger = logging.getLogger("distributed.dask_ssh")

Expand All @@ -30,7 +31,7 @@
)
@click.option(
"--scheduler-port",
default=8786,
default=DEFAULT_SCHEDULER_PORT,
show_default=True,
type=int,
help="Specify scheduler port number.",
Expand Down
31 changes: 24 additions & 7 deletions distributed/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -859,13 +859,30 @@ async def listen(self, port_or_addr=None, allow_offload=True, **kwargs):
else:
addr = port_or_addr
assert isinstance(addr, str)
listener = await listen(
addr,
self.handle_comm,
deserialize=self.deserialize,
allow_offload=allow_offload,
**kwargs,
)
try:
listener = await listen(
addr,
self.handle_comm,
deserialize=self.deserialize,
allow_offload=allow_offload,
**kwargs,
)
except OSError:
fallback_port_or_addr = kwargs.get("fallback_port_or_addr", None)
if not fallback_port_or_addr:
raise
warnings.warn(
f"Address {addr} is already in use.\n"
f"Falling back to {fallback_port_or_addr} instead"
)
listener = await listen(

Check warning on line 878 in distributed/core.py

View check run for this annotation

Codecov / codecov/patch

distributed/core.py#L878

Added line #L878 was not covered by tests
fallback_port_or_addr,
self.handle_comm,
deserialize=self.deserialize,
allow_offload=allow_offload,
**kwargs,
)

self.listeners.append(listener)

def handle_comm(self, comm: Comm) -> NoOpAwaitable:
Expand Down
2 changes: 1 addition & 1 deletion distributed/deploy/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ def __init__(
start=None,
host=None,
ip=None,
scheduler_port=0,
scheduler_port=None,
silence_logs=logging.WARN,
dashboard_address=":8787",
worker_dashboard_address=None,
Expand Down
20 changes: 17 additions & 3 deletions distributed/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,8 @@
"stealing": WorkStealing,
}

DEFAULT_SCHEDULER_PORT = 8786


class ClientState:
"""A simple object holding information about a client."""
Expand Down Expand Up @@ -3574,7 +3576,6 @@ class Scheduler(SchedulerState, ServerNode):
Time we expect certain functions to take, e.g. ``{'sum': 0.25}``
"""

default_port = 8786
_instances: ClassVar[weakref.WeakSet[Scheduler]] = weakref.WeakSet()

worker_ttl: float | None
Expand Down Expand Up @@ -3686,8 +3687,18 @@ def __init__(
interface=interface,
protocol=protocol,
security=security,
default_port=self.default_port,
default_port=DEFAULT_SCHEDULER_PORT,
)
if port is None:
self._fallback_start_addresses = addresses_from_user_args(
host=host,
port=0,
interface=interface,
protocol=protocol,
security=security,
)
else:
self._fallback_start_addresses = []

http_server_modules = dask.config.get("distributed.scheduler.http.routes")
show_dashboard = dashboard or (dashboard is None and dashboard_address)
Expand Down Expand Up @@ -4094,11 +4105,14 @@ async def start_unsafe(self) -> Self:

self._clear_task_state()

for addr in self._start_address:
for addr, fallback_addr in itertools.zip_longest(
self._start_address, self._fallback_start_addresses
):
await self.listen(
addr,
allow_offload=False,
handshake_overrides={"pickle-protocol": 4, "compression": None},
fallback_port_or_addr=fallback_addr,
**self.security.get_listen_args("scheduler"),
)
self.ip = get_address_host(self.listen_address)
Expand Down
2 changes: 1 addition & 1 deletion distributed/utils_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -2530,7 +2530,7 @@ def _bind_port(port):
s.listen(1)
yield s

default_ports = [8786]
default_ports = [Scheduler.DEFAULT_SCHEDULER_PORT]

while time() - start < _TEST_TIMEOUT:
try:
Expand Down

0 comments on commit fcb7223

Please sign in to comment.