diff --git a/distributed/scheduler.py b/distributed/scheduler.py index 452c23bdd9..ab5b28fffc 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -1425,14 +1425,8 @@ class TaskState: #: be rejected. run_id: int | None - #: Whether to consider this task rootish in the context of task queueing - #: True - #: Always consider this task rootish - #: False - #: Never consider this task rootish - #: None - #: Use a heuristic to determine whether this task should be considered rootish - _rootish: bool | None + #: Whether to allow queueing this task if it is rootish + _queueable: bool #: Cached hash of :attr:`~TaskState.client_key` _hash: int @@ -1489,7 +1483,7 @@ def __init__( self.metadata = None self.annotations = None self.erred_on = None - self._rootish = None + self._queueable = True self.run_id = None self.group = group group.add(self) @@ -2286,7 +2280,7 @@ def decide_worker_rootish_queuing_disabled( """ if self.validate: # See root-ish-ness note below in `decide_worker_rootish_queuing_enabled` - assert math.isinf(self.WORKER_SATURATION) or not ts._rootish + assert math.isinf(self.WORKER_SATURATION) or not ts._queueable pool = self.idle.values() if self.idle else self.running if not pool: @@ -2452,7 +2446,7 @@ def _transition_waiting_processing(self, key: Key, stimulus_id: str) -> RecsMsgs # removed, there should only be one, which combines co-assignment and # queuing. Eventually, special-casing root tasks might be removed entirely, # with better heuristics. - if math.isinf(self.WORKER_SATURATION) or not ts._rootish: + if math.isinf(self.WORKER_SATURATION) or not ts._queueable: if not (ws := self.decide_worker_rootish_queuing_disabled(ts)): return {ts.key: "no-worker"}, {}, {} else: diff --git a/distributed/shuffle/_scheduler_plugin.py b/distributed/shuffle/_scheduler_plugin.py index 5f474c0cfd..0cf1c3338e 100644 --- a/distributed/shuffle/_scheduler_plugin.py +++ b/distributed/shuffle/_scheduler_plugin.py @@ -300,7 +300,7 @@ def _ensure_output_tasks_are_non_rootish(self, spec: ShuffleSpec) -> None: """ barrier = self.scheduler.tasks[barrier_key(spec.id)] for dependent in barrier.dependents: - dependent._rootish = False + dependent._queueable = False @log_errors() def _set_restriction(self, ts: TaskState, worker: str) -> None: