diff --git a/distributed/distributed.yaml b/distributed/distributed.yaml index 1e7505a116..7227aea918 100644 --- a/distributed/distributed.yaml +++ b/distributed/distributed.yaml @@ -10,6 +10,8 @@ distributed: # tornado.application: error scheduler: + rootish-tg: 100 + rootish-tg-dependencies: 500 allowed-failures: 3 # number of retries before a task is considered bad bandwidth: 100000000 # 100 MB/s estimated worker-worker bandwidth blocked-handlers: [] diff --git a/distributed/scheduler.py b/distributed/scheduler.py index d1f18d7c2f..93521709d5 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -1840,6 +1840,11 @@ def __init__( + repr(self.WORKER_SATURATION) ) + self.rootish_tg_threshold = dask.config.get("distributed.scheduler.rootish-tg") + self.rootish_tg_dependencies_threshold = dask.config.get( + "distributed.scheduler.rootish-tg-dependencies" + ) + @abstractmethod def log_event(self, topic: str | Collection[str], msg: Any) -> None: ... @@ -3090,8 +3095,8 @@ def is_rootish(self, ts: TaskState) -> bool: # TODO short-circuit to True if `not ts.dependencies`? return ( len(tg) > self.total_nthreads * 2 - and len(tg.dependencies) < 5 - and sum(map(len, tg.dependencies)) < 5 + and len(tg.dependencies) < self.rootish_tg_threshold + and sum(map(len, tg.dependencies)) < self.rootish_tg_dependencies_threshold ) def check_idle_saturated(self, ws: WorkerState, occ: float = -1.0) -> None: