Skip to content

Commit

Permalink
Add ability to transfer data from nodes to a controller
Browse files Browse the repository at this point in the history
  • Loading branch information
pehala committed Mar 25, 2022
1 parent 9c7b5f4 commit a2aaecb
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 7 deletions.
15 changes: 11 additions & 4 deletions src/xdist/dsession.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
from queue import Empty, Queue

import py
import pytest

from xdist.workermanage import NodeManager
from xdist.remote import shared_key
from xdist.scheduler import (
EachScheduling,
LoadScheduling,
LoadScopeScheduling,
LoadFileScheduling,
LoadGroupScheduling,
)


from queue import Empty, Queue
from xdist.workermanage import NodeManager


class Interrupted(KeyboardInterrupt):
Expand Down Expand Up @@ -174,6 +174,13 @@ def worker_workerfinished(self, node):
self.shouldstop = "{} received keyboard-interrupt".format(node)
self.worker_errordown(node, "keyboard-interrupt")
return
if "shared" in node.workeroutput:
shared = self.config.stash.setdefault(shared_key, {})
for key, value in node.workeroutput["shared"].items():
if key in shared:
shared[key].append(value)
else:
shared[key] = [value]
if node in self.sched.nodes:
crashitem = self.sched.remove_node(node)
assert not crashitem, (crashitem, node)
Expand Down
26 changes: 25 additions & 1 deletion src/xdist/plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,14 @@

import py
import pytest

from _pytest.stash import StashKey

PYTEST_GTE_7 = hasattr(pytest, "version_tuple") and pytest.version_tuple >= (7, 0) # type: ignore[attr-defined]

_sys_path = list(sys.path) # freeze a copy of sys.path at interpreter startup

shared_key = StashKey["Shared"]()


@pytest.hookimpl
def pytest_xdist_auto_num_workers(config):
Expand Down Expand Up @@ -302,3 +304,25 @@ def testrun_uid(request):
return request.config.workerinput["testrunuid"]
else:
return uuid.uuid4().hex


def get_shared_data(request_or_session):
"""Return shared data and True, if it is ran from xdist_controller"""
if is_xdist_controller(request_or_session):
return request_or_session.config.stash.setdefault(shared_key, {}), True
return request_or_session.config.stash.setdefault(shared_key, {}), False


@pytest.fixture(scope="session")
def add_shared_data(request, worker_id):
"""Adds data that will be collected from all workers and be accessible from master node in sessionfinish hook"""

def _add(key, value):
shared = request.config.stash.setdefault(shared_key, {})
if worker_id == "master":
# Worker shared_data are grouped together, master data aren't
shared[key] = [value]
else:
shared[key] = value

return _add
8 changes: 6 additions & 2 deletions src/xdist/remote.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,16 @@
needs not to be installed in remote environments.
"""

import sys
import os
import sys
import time

import py
import pytest
from _pytest.config import _prepareconfig, Config
from execnet.gateway_base import dumps, DumpError

from _pytest.config import _prepareconfig, Config
from xdist.plugin import shared_key

try:
from setproctitle import setproctitle
Expand Down Expand Up @@ -64,6 +65,9 @@ def pytest_sessionstart(self, session):
def pytest_sessionfinish(self, exitstatus):
# in pytest 5.0+, exitstatus is an IntEnum object
self.config.workeroutput["exitstatus"] = int(exitstatus)
shared = self.config.stash.get(shared_key, None)
if shared:
self.config.workeroutput["shared"] = shared
yield
self.sendevent("workerfinished", workeroutput=self.config.workeroutput)

Expand Down

0 comments on commit a2aaecb

Please sign in to comment.