Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog/1323.bugfix.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix hang with ``--dist=loadgroup`` if a crashed worker is replaced.
12 changes: 10 additions & 2 deletions src/xdist/scheduler/loadscope.py
Original file line number Diff line number Diff line change
Expand Up @@ -196,8 +196,10 @@ def remove_node(self, node: WorkerController) -> str | None:
"Unable to identify crashitem on a workload with pending items"
)

# Made uncompleted work unit available again
self.workqueue.update(workload)
# Make uncompleted work units available again
for scope, work_unit in workload.items():
if any(not completed for completed in work_unit.values()):
self.workqueue[scope] = work_unit

for node in self.assigned_work:
self._reschedule(node)
Expand Down Expand Up @@ -278,6 +280,12 @@ def _assign_work_unit(self, node: WorkerController) -> None:
for nodeid, completed in work_unit.items()
if not completed
]
if not nodeids_indexes:
# Raise since this is an internal error that may result in a hanging worker
# See #1323
raise RuntimeError(
"Trying to assign a work unit with no pending items to a node"
)

node.send_runtest_some(nodeids_indexes)

Expand Down
21 changes: 21 additions & 0 deletions testing/acceptance_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -974,6 +974,27 @@ def test_b(): pass
]
)

def test_loadgroup_does_not_hang_after_restart(
    self, pytester: pytest.Pytester
) -> None:
    """Check that a ``--dist=loadgroup`` run finishes after a worker restart.

    Regression test for #1323: the test suite never finished when a worker
    had to be restarted after it had already completed a test.
    """
    # Two tests only: ``test_a`` passes, ``test_b`` terminates the worker
    # process abruptly via ``os._exit`` to simulate a crash.
    f = pytester.makepyfile(
        """
        import os
        def test_a(): pass
        def test_b(): os._exit(1)
        """
    )
    res = pytester.runpytest(f, "-n1", "--dist=loadgroup")
    res.stdout.fnmatch_lines(
        [
            "replacing crashed worker gw*",
            "worker*crashed while running*",
            # Only two tests exist in the generated module, so the summary
            # can report at most one failure (the crash) and one pass.
            "*1 failed*1 passed*",
        ]
    )

def test_max_worker_restart(self, pytester: pytest.Pytester) -> None:
f = pytester.makepyfile(
"""
Expand Down
Loading