diff --git a/.github/workflows/push-pr_workflow.yml b/.github/workflows/push-pr_workflow.yml index 1d3c9d958..e03b939b1 100644 --- a/.github/workflows/push-pr_workflow.yml +++ b/.github/workflows/push-pr_workflow.yml @@ -10,7 +10,7 @@ jobs: steps: - name: Checkout code - uses: actions/checkout@v2 + uses: actions/checkout@v4 with: fetch-depth: 0 # Checkout the whole history, in case the target is way far behind @@ -40,14 +40,14 @@ jobs: MAX_COMPLEXITY: 15 steps: - - uses: actions/checkout@v2 + - uses: actions/checkout@v4 - name: Set up Python - uses: actions/setup-python@v2 + uses: actions/setup-python@v5 with: python-version: '3.x' - name: Check cache - uses: actions/cache@v2 + uses: actions/cache@v4 with: path: ~/.cache/pip key: ${{ hashFiles('requirements/release.txt') }}-${{ hashFiles('requirements/dev.txt') }} @@ -95,14 +95,14 @@ jobs: python-version: ['3.7', '3.8', '3.9', '3.10', '3.11'] steps: - - uses: actions/checkout@v2 + - uses: actions/checkout@v4 - name: Set up Python ${{ matrix.python-version }} - uses: actions/setup-python@v2 + uses: actions/setup-python@v5 with: python-version: ${{ matrix.python-version }} - name: Check cache - uses: actions/cache@v2 + uses: actions/cache@v4 with: path: ${{ env.pythonLocation }} key: ${{ env.pythonLocation }}-${{ hashFiles('requirements/release.txt') }}-${{ hashFiles('requirements/dev.txt') }} @@ -112,8 +112,7 @@ jobs: python3 -m pip install --upgrade pip if [ -f requirements.txt ]; then pip install -r requirements.txt; fi pip3 install -r requirements/dev.txt - pip freeze - + - name: Install singularity run: | sudo apt-get update && sudo apt-get install -y \ @@ -153,6 +152,73 @@ jobs: run: | python3 tests/integration/run_tests.py --verbose --local + Integration-tests: + runs-on: ubuntu-latest + env: + GO_VERSION: 1.18.1 + SINGULARITY_VERSION: 3.9.9 + OS: linux + ARCH: amd64 + + strategy: + matrix: + python-version: ['3.7', '3.8', '3.9', '3.10', '3.11'] + + steps: + - uses: actions/checkout@v4 + - name: Set up Python ${{ matrix.python-version }} + uses: actions/setup-python@v5 + with: + python-version: ${{ matrix.python-version }} + + - name: Check cache + uses: actions/cache@v4 + with: + path: ${{ env.pythonLocation }} + key: ${{ env.pythonLocation }}-${{ hashFiles('requirements/release.txt') }}-${{ hashFiles('requirements/dev.txt') }} + + - name: Install dependencies + run: | + python3 -m pip install --upgrade pip + if [ -f requirements.txt ]; then pip install -r requirements.txt; fi + pip3 install -r requirements/dev.txt + + - name: Install merlin + run: | + pip3 install -e . 
+ merlin config + + - name: Install singularity + run: | + sudo apt-get update && sudo apt-get install -y \ + build-essential \ + libssl-dev \ + uuid-dev \ + libgpgme11-dev \ + squashfs-tools \ + libseccomp-dev \ + pkg-config + wget https://go.dev/dl/go$GO_VERSION.$OS-$ARCH.tar.gz + sudo tar -C /usr/local -xzf go$GO_VERSION.$OS-$ARCH.tar.gz + rm go$GO_VERSION.$OS-$ARCH.tar.gz + export PATH=$PATH:/usr/local/go/bin + wget https://github.com/sylabs/singularity/releases/download/v$SINGULARITY_VERSION/singularity-ce-$SINGULARITY_VERSION.tar.gz + tar -xzf singularity-ce-$SINGULARITY_VERSION.tar.gz + cd singularity-ce-$SINGULARITY_VERSION + ./mconfig && \ + make -C ./builddir && \ + sudo make -C ./builddir install + + - name: Install CLI task dependencies generated from the 'feature demo' workflow + run: | + merlin example feature_demo + pip3 install -r feature_demo/requirements.txt + + # TODO remove the --ignore statement once those tests are fixed + - name: Run integration test suite for distributed tests + run: | + pytest --ignore tests/integration/test_celeryadapter.py tests/integration/ + Distributed-test-suite: runs-on: ubuntu-latest services: diff --git a/CHANGELOG.md b/CHANGELOG.md index efa43f947..e9799ee32 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] ### Added +- Merlin manager capability to monitor celery workers. - Several new unit tests for the following subdirectories: - `merlin/common/` - `merlin/config/` @@ -21,6 +22,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Split the `start_server` and `config_server` functions of `merlin/server/server_commands.py` into multiple functions to make testing easier - Split the `create_server_config` function of `merlin/server/server_config.py` into two functions to make testing easier - Combined `set_snapshot_seconds` and `set_snapshot_changes` methods of `RedisConfig` into one method `set_snapshot` +- Moved stop-workers and query-workers integration tests to pytest tests ## [1.12.2b1] ### Added diff --git a/merlin/celery.py b/merlin/celery.py index eb10f1a12..37b7f07e5 100644 --- a/merlin/celery.py +++ b/merlin/celery.py @@ -114,9 +114,11 @@ def route_for_task(name, args, kwargs, options, task=None, **kw): # pylint: dis BROKER_URI = None RESULTS_BACKEND_URI = None +app_name = "merlin_test_app" if os.getenv("CELERY_ENV") == "test" else "merlin" + # initialize app with essential properties app: Celery = patch_celery().Celery( - "merlin", + app_name, broker=BROKER_URI, backend=RESULTS_BACKEND_URI, broker_use_ssl=BROKER_SSL, diff --git a/merlin/examples/dev_workflows/multiple_workers.yaml b/merlin/examples/dev_workflows/multiple_workers.yaml index 8785d9e9a..967582a53 100644 --- a/merlin/examples/dev_workflows/multiple_workers.yaml +++ b/merlin/examples/dev_workflows/multiple_workers.yaml @@ -46,11 +46,11 @@ merlin: resources: workers: step_1_merlin_test_worker: - args: -l INFO + args: -l INFO --concurrency 1 steps: [step_1] step_2_merlin_test_worker: - args: -l INFO + args: -l INFO --concurrency 1 steps: [step_2] other_merlin_test_worker: - args: -l INFO + args: -l INFO --concurrency 1 steps: [step_3, step_4] diff --git a/merlin/main.py b/merlin/main.py index 318232131..9f2eea165 100644 --- a/merlin/main.py +++ b/merlin/main.py @@ -57,6 +57,7 @@ from merlin.server.server_commands import config_server, init_server, restart_server, start_server, status_server, stop_server from merlin.spec.expansion 
import RESERVED, get_spec_with_expansion from merlin.spec.specification import MerlinSpec +from merlin.study.celerymanageradapter import run_manager, start_manager, stop_manager from merlin.study.status import DetailedStatus, Status from merlin.study.status_constants import VALID_RETURN_CODES, VALID_STATUS_FILTERS from merlin.study.status_renderers import status_renderer_factory @@ -359,7 +360,7 @@ def stop_workers(args): LOG.warning(f"Worker '{worker_name}' is unexpanded. Target provenance spec instead?") # Send stop command to router - router.stop_workers(args.task_server, worker_names, args.queues, args.workers) + router.stop_workers(args.task_server, worker_names, args.queues, args.workers, args.level.upper()) def print_info(args): @@ -400,6 +401,35 @@ def process_example(args: Namespace) -> None: setup_example(args.workflow, args.path) +def process_manager(args: Namespace): + """ + Process the command for managing the workers. + + This function interprets the command provided in the `args` namespace and + executes the corresponding manager function. It supports three commands: + "run", "start", and "stop". + + :param args: parsed CLI arguments + """ + if args.command == "run": + run_manager(query_frequency=args.query_frequency, query_timeout=args.query_timeout, worker_timeout=args.worker_timeout) + elif args.command == "start": + try: + start_manager( + query_frequency=args.query_frequency, query_timeout=args.query_timeout, worker_timeout=args.worker_timeout + ) + LOG.info("Manager started successfully.") + except Exception as e: + LOG.error(f"Unable to start manager.\n{e}") + elif args.command == "stop": + if stop_manager(): + LOG.info("Manager stopped successfully.") + else: + LOG.error("Unable to stop manager.") + else: + print("Run manager with a command. Try 'merlin manager -h' for more details") + + def process_monitor(args): """ CLI command to monitor merlin workers and queues to keep @@ -898,6 +928,75 @@ def generate_worker_touching_parsers(subparsers: ArgumentParser) -> None: help="regex match for specific workers to stop", ) + # merlin manager + manager: ArgumentParser = subparsers.add_parser( + "manager", + help="Watchdog application to manage workers", + description="A daemon process that helps to restart and communicate with workers while running.", + formatter_class=ArgumentDefaultsHelpFormatter, + ) + manager.set_defaults(func=process_manager) + + def add_manager_options(manager_parser: ArgumentParser): + """ + Add shared options for manager subcommands. + + The `manager run` and `manager start` subcommands have the same options. + Rather than writing duplicate code for these we'll use this function + to add the arguments to these subcommands. 
+
+        :param manager_parser: The ArgumentParser object to add these options to
+        """
+        manager_parser.add_argument(
+            "-qf",
+            "--query_frequency",
+            action="store",
+            type=int,
+            default=60,
+            help="The frequency at which workers will be queried for a response.",
+        )
+        manager_parser.add_argument(
+            "-qt",
+            "--query_timeout",
+            action="store",
+            type=float,
+            default=0.5,
+            help="The timeout for the query responses that are sent to workers.",
+        )
+        manager_parser.add_argument(
+            "-wt",
+            "--worker_timeout",
+            action="store",
+            type=int,
+            default=180,
+            help="The total time (query_frequency * tries) before an attempt is made to restart the worker.",
+        )
+
+    manager_commands: ArgumentParser = manager.add_subparsers(dest="command")
+    manager_run = manager_commands.add_parser(
+        "run",
+        help="Run the daemon process",
+        description="Run manager",
+        formatter_class=ArgumentDefaultsHelpFormatter,
+    )
+    add_manager_options(manager_run)
+    manager_run.set_defaults(func=process_manager)
+    manager_start = manager_commands.add_parser(
+        "start",
+        help="Start the daemon process",
+        description="Start manager",
+        formatter_class=ArgumentDefaultsHelpFormatter,
+    )
+    add_manager_options(manager_start)
+    manager_start.set_defaults(func=process_manager)
+    manager_stop = manager_commands.add_parser(
+        "stop",
+        help="Stop the daemon process",
+        description="Stop manager",
+        formatter_class=ArgumentDefaultsHelpFormatter,
+    )
+    manager_stop.set_defaults(func=process_manager)
+
     # merlin monitor
     monitor: ArgumentParser = subparsers.add_parser(
         "monitor",
diff --git a/merlin/managers/__init__.py b/merlin/managers/__init__.py
new file mode 100644
index 000000000..e69de29bb
diff --git a/merlin/managers/celerymanager.py b/merlin/managers/celerymanager.py
new file mode 100644
index 000000000..fe136d1ec
--- /dev/null
+++ b/merlin/managers/celerymanager.py
@@ -0,0 +1,215 @@
+###############################################################################
+# Copyright (c) 2023, Lawrence Livermore National Security, LLC.
+# Produced at the Lawrence Livermore National Laboratory
+# Written by the Merlin dev team, listed in the CONTRIBUTORS file.
+#
+#
+# LLNL-CODE-797170
+# All rights reserved.
+# This file is part of Merlin, Version: 1.12.1.
+#
+# For details, see https://github.com/LLNL/merlin.
+#
+# Permission is hereby granted, free of charge, to any person obtaining a copy
+# of this software and associated documentation files (the "Software"), to deal
+# in the Software without restriction, including without limitation the rights
+# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+# copies of the Software, and to permit persons to whom the Software is
+# furnished to do so, subject to the following conditions:
+# The above copyright notice and this permission notice shall be included in
+# all copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+# SOFTWARE.
+###############################################################################
+import logging
+import os
+import subprocess
+import time
+
+import psutil
+
+from merlin.managers.redis_connection import RedisConnectionManager
+
+
+LOG = logging.getLogger(__name__)
+
+
+class WorkerStatus:
+    running = "Running"
+    stalled = "Stalled"
+    stopped = "Stopped"
+    rebooting = "Rebooting"
+
+
+WORKER_INFO = {
+    "status": WorkerStatus.running,
+    "pid": -1,
+    "monitored": 1,  # This setting is for debug mode
+    "num_unresponsive": 0,
+    "processing_work": 1,
+}
+
+
+class CeleryManager:
+    def __init__(self, query_frequency: int = 60, query_timeout: float = 0.5, worker_timeout: int = 180):
+        """
+        Initializer for the Celery Manager
+
+        :param query_frequency: The frequency at which workers will be queried with ping commands
+        :param query_timeout: The timeout for the query pings that are sent to workers
+        :param worker_timeout: The total time (query_frequency * tries) before an attempt is made to restart the worker.
+        """
+        self.query_frequency = query_frequency
+        self.query_timeout = query_timeout
+        self.worker_timeout = worker_timeout
+
+    @staticmethod
+    def get_worker_status_redis_connection() -> RedisConnectionManager:
+        """Get the redis connection for info regarding the worker and manager status."""
+        return RedisConnectionManager(1)
+
+    @staticmethod
+    def get_worker_args_redis_connection() -> RedisConnectionManager:
+        """Get the redis connection for info regarding the args used to generate each worker."""
+        return RedisConnectionManager(2)
+
+    def get_celery_workers_status(self, workers: list) -> dict:
+        """
+        Get the status of the workers currently being managed
+
+        :param workers: Workers that are checked.
+        :return: The result dictionary for each worker and the response.
+        """
+        from merlin.celery import app
+
+        celery_app = app.control
+        ping_result = celery_app.ping(workers, timeout=self.query_timeout)
+        worker_results = {worker: status for d in ping_result for worker, status in d.items()}
+        return worker_results
+
+    def stop_celery_worker(self, worker: str) -> bool:
+        """
+        Stop a celery worker by killing the worker process via its PID
+
+        :param worker: Worker that is being stopped.
+        :return: The result of whether a worker was stopped.
+        """
+
+        # Get the PID associated with the worker
+        with self.get_worker_status_redis_connection() as worker_status_connect:
+            worker_pid = int(worker_status_connect.hget(worker, "pid"))
+            worker_status = worker_status_connect.hget(worker, "status")
+
+        # TODO be wary of stalled state workers (should not happen since we use psutil.Process.kill())
+        # Check to see if the pid exists and worker is set as running
+        if worker_status == WorkerStatus.running and psutil.pid_exists(worker_pid):
+            # Check to see if the pid is associated with celery
+            worker_process = psutil.Process(worker_pid)
+            if "celery" in worker_process.name():
+                # Kill the pid if both conditions are right
+                worker_process.kill()
+                return True
+        return False
+
+    def restart_celery_worker(self, worker: str) -> bool:
+        """
+        Restart a celery worker with the same arguments and parameters used during its creation
+
+        :param worker: Worker that is being restarted.
+        :return: The result of whether a worker was restarted.
+ """ + + # Stop the worker that is currently running (if possible) + self.stop_celery_worker(worker) + + # Start the worker again with the args saved in redis db + with self.get_worker_args_redis_connection() as worker_args_connect, self.get_worker_status_redis_connection() as worker_status_connect: + + # Get the args and remove the worker_cmd from the hash set + args = worker_args_connect.hgetall(worker) + worker_cmd = args["worker_cmd"] + del args["worker_cmd"] + kwargs = args + for key in args: + if args[key].startswith("link:"): + kwargs[key] = worker_args_connect.hgetall(args[key].split(":", 1)[1]) + elif args[key] == "True": + kwargs[key] = True + elif args[key] == "False": + kwargs[key] = False + + # Run the subprocess for the worker and save the PID + process = subprocess.Popen(worker_cmd, **kwargs) + worker_status_connect.hset(worker, "pid", process.pid) + + return True + + def run(self): + """ + Main manager loop for monitoring and managing Celery workers. + + This method continuously monitors the status of Celery workers by + checking their health and attempting to restart any that are + unresponsive. It updates the Redis database with the current + status of the manager and the workers. + """ + manager_info = { + "status": "Running", + "pid": os.getpid(), + } + + with self.get_worker_status_redis_connection() as redis_connection: + LOG.debug(f"MANAGER: setting manager key in redis to hold the following info {manager_info}") + redis_connection.hset("manager", mapping=manager_info) + + # TODO figure out what to do with "processing_work" entry for the merlin monitor + while True: # TODO Make it so that it will stop after a list of workers is stopped + # Get the list of running workers + workers = redis_connection.keys() + LOG.debug(f"MANAGER: workers: {workers}") + workers.remove("manager") + workers = [worker for worker in workers if int(redis_connection.hget(worker, "monitored"))] + LOG.info(f"MANAGER: Monitoring {workers} workers") + + # Check/ Ping each worker to see if they are still running + if workers: + worker_results = self.get_celery_workers_status(workers) + + # If running set the status on redis that it is running + LOG.info(f"MANAGER: Responsive workers: {worker_results.keys()}") + for worker in list(worker_results.keys()): + redis_connection.hset(worker, "status", WorkerStatus.running) + + # If not running attempt to restart it + for worker in workers: + if worker not in worker_results: + LOG.info(f"MANAGER: Worker '{worker}' is unresponsive.") + # If time where the worker is unresponsive is less than the worker time out then just increment + num_unresponsive = int(redis_connection.hget(worker, "num_unresponsive")) + 1 + if num_unresponsive * self.query_frequency < self.worker_timeout: + # Attempt to restart worker + LOG.info(f"MANAGER: Attempting to restart worker '{worker}'...") + if self.restart_celery_worker(worker): + # If successful set the status to running and reset num_unresponsive + redis_connection.hset(worker, "status", WorkerStatus.running) + redis_connection.hset(worker, "num_unresponsive", 0) + LOG.info(f"MANAGER: Worker '{worker}' restarted.") + else: + # If failed set the status to stalled + redis_connection.hset(worker, "status", WorkerStatus.stalled) + LOG.error(f"MANAGER: Could not restart worker '{worker}'.") + else: + redis_connection.hset(worker, "num_unresponsive", num_unresponsive) + # Sleep for the query_frequency for the next iteration + time.sleep(self.query_frequency) + + +if __name__ == "__main__": + cm = CeleryManager() + cm.run() 
diff --git a/merlin/managers/redis_connection.py b/merlin/managers/redis_connection.py new file mode 100644 index 000000000..50a63492c --- /dev/null +++ b/merlin/managers/redis_connection.py @@ -0,0 +1,102 @@ +############################################################################### +# Copyright (c) 2023, Lawrence Livermore National Security, LLC. +# Produced at the Lawrence Livermore National Laboratory +# Written by the Merlin dev team, listed in the CONTRIBUTORS file. +# +# +# LLNL-CODE-797170 +# All rights reserved. +# This file is part of Merlin, Version: 1.12.2b1. +# +# For details, see https://github.com/LLNL/merlin. +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. +############################################################################### +""" +This module stores a manager for redis connections. +""" +import logging + +import redis + + +LOG = logging.getLogger(__name__) + + +class RedisConnectionManager: + """ + A context manager for handling redis connections. + This will ensure safe opening and closing of Redis connections. + """ + + def __init__(self, db_num: int): + self.db_num = db_num + self.connection = None + + def __enter__(self): + self.connection = self.get_redis_connection() + return self.connection + + def __exit__(self, exc_type, exc_val, exc_tb): + if self.connection: + LOG.debug(f"MANAGER: Closing connection at db_num: {self.db_num}") + self.connection.close() + + def get_redis_connection(self) -> redis.Redis: + """ + Generic redis connection function to get the results backend redis server with a given db number increment. + + :return: Redis connection object that can be used to access values for the manager. 
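+
+        Note: the db number given to this manager is an offset added to the
+        results backend's configured db_num, so the worker-status hashes live
+        at db_num + 1 and the saved worker args at db_num + 2 (see the
+        CeleryManager connection helpers).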
+ """ + from merlin.config.configfile import CONFIG # pylint: disable=import-outside-toplevel + from merlin.config.results_backend import get_backend_password # pylint: disable=import-outside-toplevel + + password_file = CONFIG.results_backend.password if hasattr(CONFIG.results_backend, "password") else None + server = CONFIG.results_backend.server if hasattr(CONFIG.results_backend, "server") else None + port = CONFIG.results_backend.port if hasattr(CONFIG.results_backend, "port") else None + results_db_num = CONFIG.results_backend.db_num if hasattr(CONFIG.results_backend, "db_num") else None + username = CONFIG.results_backend.username if hasattr(CONFIG.results_backend, "username") else None + + password = None + if password_file is not None: + try: + password = get_backend_password(password_file) + except IOError: + if hasattr(CONFIG.results_backend, "password"): + password = CONFIG.results_backend.password + + # Base configuration for Redis connection (this does not have ssl) + redis_config = { + "host": server, + "port": port, + "db": results_db_num + self.db_num, # Increment db_num to avoid conflicts + "username": username, + "password": password, + "decode_responses": True, + } + + # Add ssl settings if necessary + if CONFIG.results_backend.name == "rediss": + redis_config.update( + { + "ssl": True, + "ssl_cert_reqs": getattr(CONFIG.results_backend, "cert_reqs", "required"), + } + ) + + return redis.Redis(**redis_config) diff --git a/merlin/router.py b/merlin/router.py index d9114bbcd..9747b7c45 100644 --- a/merlin/router.py +++ b/merlin/router.py @@ -190,7 +190,7 @@ def get_workers(task_server): return [] -def stop_workers(task_server, spec_worker_names, queues, workers_regex): +def stop_workers(task_server, spec_worker_names, queues, workers_regex, debug_lvl): """ Stops workers. @@ -198,12 +198,13 @@ def stop_workers(task_server, spec_worker_names, queues, workers_regex): :param `spec_worker_names`: Worker names to stop, drawn from a spec. :param `queues` : The queues to stop :param `workers_regex` : Regex for workers to stop + :param debug_lvl: The debug level to use (INFO, DEBUG, ERROR, etc.) 
""" LOG.info("Stopping workers...") if task_server == "celery": # pylint: disable=R1705 # Stop workers - stop_celery_workers(queues, spec_worker_names, workers_regex) + stop_celery_workers(queues, spec_worker_names, workers_regex, debug_lvl) else: LOG.error("Celery is not specified as the task server!") diff --git a/merlin/study/celeryadapter.py b/merlin/study/celeryadapter.py index 5b5bdd419..0791c2a35 100644 --- a/merlin/study/celeryadapter.py +++ b/merlin/study/celeryadapter.py @@ -46,7 +46,9 @@ from merlin.common.dumper import dump_handler from merlin.config import Config +from merlin.managers.celerymanager import CeleryManager, WorkerStatus from merlin.study.batch import batch_check_parallel, batch_worker_launch +from merlin.study.celerymanageradapter import add_monitor_workers, remove_monitor_workers from merlin.utils import apply_list_of_regex, check_machines, get_procs, get_yaml_var, is_running @@ -500,15 +502,22 @@ def check_celery_workers_processing(queues_in_spec: List[str], app: Celery) -> b """ # Query celery for active tasks active_tasks = app.control.inspect().active() + result = False - # Search for the queues we provided if necessary - if active_tasks is not None: - for tasks in active_tasks.values(): - for task in tasks: - if task["delivery_info"]["routing_key"] in queues_in_spec: - return True + with CeleryManager.get_worker_status_redis_connection() as redis_connection: + # Search for the queues we provided if necessary + if active_tasks is not None: + for worker, tasks in active_tasks.items(): + for task in tasks: + if task["delivery_info"]["routing_key"] in queues_in_spec: + result = True - return False + # Set the entry in the Redis DB for the manager to signify if the worker + # is still doing work + worker_still_processing = 1 if result else 0 + redis_connection.hset(worker, "processing_work", worker_still_processing) + + return result def _get_workers_to_start(spec, steps): @@ -761,8 +770,36 @@ def launch_celery_worker(worker_cmd, worker_list, kwargs): :side effect: Launches a celery worker via a subprocess """ try: - _ = subprocess.Popen(worker_cmd, **kwargs) # pylint: disable=R1732 + process = subprocess.Popen(worker_cmd, **kwargs) # pylint: disable=R1732 + # Get the worker name from worker_cmd and add to be monitored by celery manager + worker_cmd_list = worker_cmd.split() + worker_name = worker_cmd_list[worker_cmd_list.index("-n") + 1].replace("%h", kwargs["env"]["HOSTNAME"]) + worker_name = "celery@" + worker_name worker_list.append(worker_cmd) + + # Adding the worker args to redis db + with CeleryManager.get_worker_args_redis_connection() as redis_connection: + args = kwargs.copy() + # Save worker command with the arguements + args["worker_cmd"] = worker_cmd + # Store the nested dictionaries into a separate key with a link. + # Note: This only support single nested dicts(for simplicity) and + # further nesting can be accomplished by making this recursive. 
+ for key in kwargs: + if type(kwargs[key]) is dict: + key_name = worker_name + "_" + key + redis_connection.hmset(name=key_name, mapping=kwargs[key]) + args[key] = "link:" + key_name + if type(kwargs[key]) is bool: + if kwargs[key]: + args[key] = "True" + else: + args[key] = "False" + redis_connection.hmset(name=worker_name, mapping=args) + + # Adding the worker to redis db to be monitored + add_monitor_workers(workers=((worker_name, process.pid),)) + LOG.info(f"Added {worker_name} to be monitored") except Exception as e: # pylint: disable=C0103 LOG.error(f"Cannot start celery workers, {e}") raise @@ -801,7 +838,7 @@ def purge_celery_tasks(queues, force): return subprocess.run(purge_command, shell=True).returncode -def stop_celery_workers(queues=None, spec_worker_names=None, worker_regex=None): # pylint: disable=R0912 +def stop_celery_workers(queues=None, spec_worker_names=None, worker_regex=None, debug_lvl="INFO"): # pylint: disable=R0912 """Send a stop command to celery workers. Default behavior is to stop all connected workers. @@ -866,6 +903,8 @@ def stop_celery_workers(queues=None, spec_worker_names=None, worker_regex=None): if workers_to_stop: LOG.info(f"Sending stop to these workers: {workers_to_stop}") app.control.broadcast("shutdown", destination=workers_to_stop) + remove_entry = False if debug_lvl == "DEBUG" else True + remove_monitor_workers(workers=workers_to_stop, worker_status=WorkerStatus.stopped, remove_entry=remove_entry) else: LOG.warning("No workers found to stop") diff --git a/merlin/study/celerymanageradapter.py b/merlin/study/celerymanageradapter.py new file mode 100644 index 000000000..6dc07bab2 --- /dev/null +++ b/merlin/study/celerymanageradapter.py @@ -0,0 +1,145 @@ +############################################################################### +# Copyright (c) 2023, Lawrence Livermore National Security, LLC. +# Produced at the Lawrence Livermore National Laboratory +# Written by the Merlin dev team, listed in the CONTRIBUTORS file. +# +# +# LLNL-CODE-797170 +# All rights reserved. +# This file is part of Merlin, Version: 1.12.1. +# +# For details, see https://github.com/LLNL/merlin. +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. 
+###############################################################################
+import logging
+import subprocess
+
+import psutil
+
+from merlin.managers.celerymanager import WORKER_INFO, CeleryManager, WorkerStatus
+
+
+LOG = logging.getLogger(__name__)
+
+
+def add_monitor_workers(workers: list):
+    """
+    Adds workers to be monitored by the celery manager.
+
+    :param list workers: A list of tuples which includes (worker_name, pid)
+    """
+    if workers is None or len(workers) <= 0:
+        return
+
+    LOG.info(
+        f"MANAGER: Attempting to have the manager monitor the following workers {[worker_name for worker_name, _ in workers]}."
+    )
+    monitored_workers = []
+
+    with CeleryManager.get_worker_status_redis_connection() as redis_connection:
+        for worker in workers:
+            LOG.debug(f"MANAGER: Checking if connection for worker '{worker}' exists...")
+            if redis_connection.exists(worker[0]):
+                LOG.debug(f"MANAGER: Connection for worker '{worker}' exists. Setting this worker to be monitored")
+                redis_connection.hset(worker[0], "monitored", 1)
+                redis_connection.hset(worker[0], "pid", worker[1])
+                monitored_workers.append(worker[0])
+            else:
+                LOG.debug(f"MANAGER: Connection for worker '{worker}' does not exist. Not monitoring this worker.")
+                worker_info = WORKER_INFO.copy()
+                worker_info["pid"] = worker[1]
+                redis_connection.hmset(name=worker[0], mapping=worker_info)
+    LOG.info(f"MANAGER: Manager is monitoring the following workers {monitored_workers}.")
+
+
+def remove_monitor_workers(workers: list, worker_status: WorkerStatus = None, remove_entry: bool = True):
+    """
+    Remove workers from being monitored by the celery manager.
+
+    :param list workers: A list of worker names
+    """
+    if workers is None or len(workers) <= 0:
+        return
+    with CeleryManager.get_worker_status_redis_connection() as redis_connection:
+        for worker in workers:
+            if redis_connection.exists(worker):
+                redis_connection.hset(worker, "monitored", 0)
+                if worker_status is not None:
+                    redis_connection.hset(worker, "status", worker_status)
+                if remove_entry:
+                    redis_connection.delete(worker)
+
+
+def is_manager_runnning() -> bool:
+    """
+    Check to see if the manager is running
+
+    :return: True if the manager is running and False if not.
+    """
+    with CeleryManager.get_worker_status_redis_connection() as redis_connection:
+        manager_status = redis_connection.hgetall("manager")
+        return manager_status["status"] == WorkerStatus.running and psutil.pid_exists(int(manager_status["pid"]))
+
+
+def run_manager(query_frequency: int = 60, query_timeout: float = 0.5, worker_timeout: int = 180):
+    """
+    A blocking function that runs the celery manager with the proper arguments.
+
+    :param query_frequency: The frequency at which workers will be queried with ping commands
+    :param query_timeout: The timeout for the query pings that are sent to workers
+    :param worker_timeout: The total time (query_frequency * tries) before an attempt is made to restart the worker.
+    """
+    celerymanager = CeleryManager(query_frequency=query_frequency, query_timeout=query_timeout, worker_timeout=worker_timeout)
+    celerymanager.run()
+
+
+def start_manager(query_frequency: int = 60, query_timeout: float = 0.5, worker_timeout: int = 180) -> bool:
+    """
+    A non-blocking function that starts the celery manager with the proper arguments.
+
+    :param query_frequency: The frequency at which workers will be queried with ping commands
+    :param query_timeout: The timeout for the query pings that are sent to workers
+    :param worker_timeout: The total time (query_frequency * tries) before an attempt is made to restart the worker.
+    :return bool: True if the manager was started successfully.
+    """
+    subprocess.Popen(
+        f"merlin manager run -qf {query_frequency} -qt {query_timeout} -wt {worker_timeout}",
+        shell=True,
+        close_fds=True,
+        stdout=subprocess.PIPE,
+    )
+    return True
+
+
+def stop_manager() -> bool:
+    """
+    Stop the manager process using its PID.
+
+    :return bool: True if the manager was stopped successfully and False otherwise.
+    """
+    with CeleryManager.get_worker_status_redis_connection() as redis_connection:
+        LOG.debug(f"MANAGER: manager keys: {redis_connection.hgetall('manager')}")
+        manager_pid = int(redis_connection.hget("manager", "pid"))
+        manager_status = redis_connection.hget("manager", "status")
+        LOG.debug(f"MANAGER: manager_status: {manager_status}")
+        LOG.debug(f"MANAGER: pid exists: {psutil.pid_exists(manager_pid)}")
+
+        # Check to make sure that the manager is running and the pid exists
+        if manager_status == WorkerStatus.running and psutil.pid_exists(manager_pid):
+            psutil.Process(manager_pid).terminate()
+            return True
+        return False
diff --git a/tests/conftest.py b/tests/conftest.py
index 11bd93134..374b8d836 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -99,6 +99,22 @@ def create_encryption_file(key_filepath: str, encryption_key: bytes, app_yaml_fi
 #######################################
+@pytest.fixture(scope="session")
+def path_to_test_specs() -> str:
+    """
+    Fixture to provide the path to the directory containing test specifications.
+
+    This fixture returns the absolute path to the 'test_specs' directory
+    within the 'integration' folder of the test directory. It expands
+    environment variables and the user home directory as necessary.
+
+    Returns:
+        The absolute path to the 'test_specs' directory.
+ """ + path_to_test_dir = os.path.abspath(os.path.expandvars(os.path.expanduser(os.path.dirname(__file__)))) + return os.path.join(path_to_test_dir, os.path.join("integration", "test_specs")) + + @pytest.fixture(scope="session") def temp_output_dir(tmp_path_factory: TempPathFactory) -> str: """ @@ -130,7 +146,7 @@ def merlin_server_dir(temp_output_dir: str) -> str: :param temp_output_dir: The path to the temporary output directory we'll be using for this test run :returns: The path to the merlin_server directory that will be created by the `redis_server` fixture """ - server_dir = f"{temp_output_dir}/merlin_server" + server_dir = os.path.join(temp_output_dir, "merlin_server") if not os.path.exists(server_dir): os.mkdir(server_dir) return server_dir @@ -146,11 +162,14 @@ def redis_server(merlin_server_dir: str, test_encryption_key: bytes) -> str: :param test_encryption_key: An encryption key to be used for testing :yields: The local redis server uri """ + os.environ["CELERY_ENV"] = "test" with RedisServerManager(merlin_server_dir, SERVER_PASS) as redis_server_manager: redis_server_manager.initialize_server() redis_server_manager.start_server() create_encryption_file( - f"{merlin_server_dir}/encrypt_data_key", test_encryption_key, app_yaml_filepath=f"{merlin_server_dir}/app.yaml" + os.path.join(merlin_server_dir, "encrypt_data_key"), + test_encryption_key, + app_yaml_filepath=os.path.join(merlin_server_dir, "app.yaml"), ) # Yield the redis_server uri to any fixtures/tests that may need it yield redis_server_manager.redis_server_uri @@ -165,6 +184,7 @@ def celery_app(redis_server: str) -> Celery: :param redis_server: The redis server uri we'll use to connect to redis :returns: The celery app object we'll use for testing """ + os.environ["CELERY_ENV"] = "test" return Celery("merlin_test_app", broker=redis_server, backend=redis_server) @@ -258,7 +278,7 @@ def config(merlin_server_dir: str, test_encryption_key: bytes): orig_config = copy(CONFIG) # Create an encryption key file (if it doesn't already exist) - key_file = f"{merlin_server_dir}/encrypt_data_key" + key_file = os.path.join(merlin_server_dir, "encrypt_data_key") create_encryption_file(key_file, test_encryption_key) # Set the broker configuration for testing @@ -305,7 +325,7 @@ def redis_broker_config( :param merlin_server_dir: The directory to the merlin test server configuration :param config: The fixture that sets up most of the CONFIG object for testing """ - pass_file = f"{merlin_server_dir}/redis.pass" + pass_file = os.path.join(merlin_server_dir, "redis.pass") create_pass_file(pass_file) CONFIG.broker.password = pass_file @@ -326,7 +346,7 @@ def redis_results_backend_config( :param merlin_server_dir: The directory to the merlin test server configuration :param config: The fixture that sets up most of the CONFIG object for testing """ - pass_file = f"{merlin_server_dir}/redis.pass" + pass_file = os.path.join(merlin_server_dir, "redis.pass") create_pass_file(pass_file) CONFIG.results_backend.password = pass_file @@ -347,7 +367,7 @@ def rabbit_broker_config( :param merlin_server_dir: The directory to the merlin test server configuration :param config: The fixture that sets up most of the CONFIG object for testing """ - pass_file = f"{merlin_server_dir}/rabbit.pass" + pass_file = os.path.join(merlin_server_dir, "rabbit.pass") create_pass_file(pass_file) CONFIG.broker.password = pass_file @@ -368,7 +388,7 @@ def mysql_results_backend_config( :param merlin_server_dir: The directory to the merlin test server configuration :param 
config: The fixture that sets up most of the CONFIG object for testing """ - pass_file = f"{merlin_server_dir}/mysql.pass" + pass_file = os.path.join(merlin_server_dir, "mysql.pass") create_pass_file(pass_file) create_cert_files(merlin_server_dir, CERT_FILES) diff --git a/tests/context_managers/celery_workers_manager.py b/tests/context_managers/celery_workers_manager.py index 118aa21a1..2daecb612 100644 --- a/tests/context_managers/celery_workers_manager.py +++ b/tests/context_managers/celery_workers_manager.py @@ -80,8 +80,9 @@ def __exit__(self, exc_type: Type[Exception], exc_value: Exception, traceback: T try: if str(pid) in ps_proc.stdout: os.kill(pid, signal.SIGKILL) - except ProcessLookupError as exc: - raise ProcessLookupError(f"PID {pid} not found. Output of 'ps ux':\n{ps_proc.stdout}") from exc + # If the process can't be found then it doesn't exist anymore + except ProcessLookupError: + pass def _is_worker_ready(self, worker_name: str, verbose: bool = False) -> bool: """ @@ -158,6 +159,8 @@ def launch_worker(self, worker_name: str, queues: List[str], concurrency: int = self.stop_all_workers() raise ValueError(f"The worker {worker_name} is already running. Choose a different name.") + queues = [f"[merlin]_{queue}" for queue in queues] + # Create the launch command for this worker worker_launch_cmd = [ "worker", @@ -174,7 +177,7 @@ def launch_worker(self, worker_name: str, queues: List[str], concurrency: int = # Create an echo command to simulate a running celery worker since our celery worker will be spun up in # a different process and we won't be able to see it with 'ps ux' like we normally would echo_process = subprocess.Popen( # pylint: disable=consider-using-with - f"echo 'celery merlin_test_app {' '.join(worker_launch_cmd)}'; sleep inf", + f"echo 'celery -A merlin_test_app {' '.join(worker_launch_cmd)}'; sleep inf", shell=True, preexec_fn=os.setpgrp, # Make this the parent of the group so we can kill the 'sleep inf' that's spun up ) diff --git a/tests/context_managers/server_manager.py b/tests/context_managers/server_manager.py index b99afb2c6..bb5d86036 100644 --- a/tests/context_managers/server_manager.py +++ b/tests/context_managers/server_manager.py @@ -91,7 +91,7 @@ def stop_server(self): if "Merlin server terminated." not in kill_process.stderr: # If it wasn't, try to kill the process by using the pid stored in a file created by `merlin server` try: - with open(f"{self.server_dir}/merlin_server.pf", "r") as process_file: + with open(os.path.join(self.server_dir, "merlin_server.pf"), "r") as process_file: server_process_info = yaml.load(process_file, yaml.Loader) os.kill(int(server_process_info["image_pid"]), signal.SIGKILL) # If the file can't be found then let's make sure there's even a redis-server process running diff --git a/tests/integration/commands/__init__.py b/tests/integration/commands/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/tests/integration/commands/test_stop_and_query_workers.py b/tests/integration/commands/test_stop_and_query_workers.py new file mode 100644 index 000000000..00215137d --- /dev/null +++ b/tests/integration/commands/test_stop_and_query_workers.py @@ -0,0 +1,443 @@ +""" +This module will contain the base class used for testing +the `stop-workers` and `query-workers` commands. 
+""" + +import os +import subprocess +from contextlib import contextmanager +from enum import Enum +from typing import List + +import pytest + +from tests.context_managers.celery_workers_manager import CeleryWorkersManager +from tests.integration.conditions import Condition, HasRegex +from tests.integration.helper_funcs import check_test_conditions, copy_app_yaml_to_cwd, load_workers_from_spec + + +# pylint: disable=unused-argument,import-outside-toplevel,too-many-arguments + + +class WorkerMessages(Enum): + """ + Enumerated strings to help keep track of the messages + that we're expecting (or not expecting) to see from the + tests in this module. + """ + + NO_WORKERS_MSG_STOP = "No workers found to stop" + NO_WORKERS_MSG_QUERY = "No workers found!" + STEP_1_WORKER = "step_1_merlin_test_worker" + STEP_2_WORKER = "step_2_merlin_test_worker" + OTHER_WORKER = "other_merlin_test_worker" + + +class TestStopAndQueryWorkersCommands: + """ + Tests for the `merlin stop-workers` and `merlin query-workers` commands. + Most of these tests will: + 1. Start workers from a spec file used for testing + - Use CeleryWorkerManager for this to ensure safe stoppage of workers + if something goes wrong + 2. Run the test command from a subprocess + """ + + @contextmanager + def run_test_with_workers( + self, + path_to_test_specs: str, + merlin_server_dir: str, + conditions: List[Condition], + command: str, + flag: str = None, + ): + """ + Helper method to run common testing logic for tests with workers started. + This method must also be a context manager so we can check the status of the + workers prior to the CeleryWorkersManager running it's exit code that shuts down + all active workers. + + This method will: + 0. Read in the necessary fixtures as parameters. These fixtures grab paths to + our test specs and the merlin server directory created from starting the + containerized redis server. + 1. Load in the worker specifications from the `multiple_workers.yaml` file. + 2. Use a context manager to start up the workers on the celery app connected to + the containerized redis server + 3. Copy the app.yaml file for the containerized redis server to the current working + directory so that merlin will connect to it when we run our test + 4. Run the test command that's provided and check that the conditions given are + passing. + 5. Yield control back to the calling method. + 6. Safely terminate workers that may have not been stopped once the calling method + completes. + + Parameters: + path_to_test_specs: + A fixture to provide the path to the directory containing test specifications. + merlin_server_dir: + A fixture to provide the path to the merlin_server directory that will be + created by the `redis_server` fixture. + conditions: + A list of `Condition` instances that need to pass in order for this test to + be successful. + command: + The command that we're testing. E.g. "merlin stop-workers" + flag: + An optional flag to add to the command that we're testing so we can test + different functionality for the command. 
+        """
+        from merlin.celery import app as celery_app
+
+        # Grab worker configurations from the spec file
+        multiple_worker_spec = os.path.join(path_to_test_specs, "multiple_workers.yaml")
+        workers_from_spec = load_workers_from_spec(multiple_worker_spec)
+
+        # We use a context manager to start workers so that they'll safely stop even if this test fails
+        with CeleryWorkersManager(celery_app) as workers_manager:
+            workers_manager.launch_workers(workers_from_spec)
+
+            # Copy the app.yaml to the cwd so merlin will connect to the testing server
+            copy_app_yaml_to_cwd(merlin_server_dir)
+
+            # Run the test
+            cmd_to_test = f"{command} {flag}" if flag else command
+            result = subprocess.run(cmd_to_test, capture_output=True, text=True, shell=True)
+
+            info = {
+                "stdout": result.stdout,
+                "stderr": result.stderr,
+                "return_code": result.returncode,
+            }
+
+            # Ensure all test conditions are satisfied
+            check_test_conditions(conditions, info)
+
+            yield
+
+    def get_no_workers_msg(self, command_to_test: str) -> str:
+        """
+        Retrieve the appropriate "no workers found" message.
+
+        This method checks the command to test and returns a corresponding
+        message based on whether the command is to stop workers or query for them.
+
+        Returns:
+            The message indicating that no workers are available, depending on the
+            command being tested.
+        """
+        no_workers_msg = None
+        if command_to_test == "merlin stop-workers":
+            no_workers_msg = WorkerMessages.NO_WORKERS_MSG_STOP.value
+        else:
+            no_workers_msg = WorkerMessages.NO_WORKERS_MSG_QUERY.value
+        return no_workers_msg
+
+    @pytest.mark.parametrize("command_to_test", ["merlin stop-workers", "merlin query-workers"])
+    def test_no_workers(
+        self,
+        redis_server: str,
+        redis_results_backend_config: "Fixture",  # noqa: F821
+        redis_broker_config: "Fixture",  # noqa: F821
+        merlin_server_dir: str,
+        command_to_test: str,
+    ):
+        """
+        Test the `merlin stop-workers` and `merlin query-workers` commands with no workers
+        started in the first place.
+
+        This test will:
+        0. Set up the pytest fixtures, which include:
+           - starting a containerized Redis server
+           - updating the CONFIG object to point to the containerized Redis server
+           - obtaining the path to the merlin server directory created from starting
+             the containerized Redis server
+        1. Copy the app.yaml file for the containerized redis server to the current working
+           directory so that merlin will connect to it when we run our test.
+        2. Run the test command that's provided and check that the conditions given are
+           passing.
+
+        Parameters:
+            redis_server:
+                A fixture that starts a containerized redis server instance that runs on
+                localhost:6379.
+            redis_results_backend_config:
+                A fixture that modifies the CONFIG object so that it points the results
+                backend configuration to the containerized redis server we start up with
+                the `redis_server` fixture. The CONFIG object is what merlin uses to connect
+                to a server.
+            redis_broker_config:
+                A fixture that modifies the CONFIG object so that it points the broker
+                configuration to the containerized redis server we start up with the
+                `redis_server` fixture. The CONFIG object is what merlin uses to connect
+                to a server.
+            merlin_server_dir:
+                A fixture to provide the path to the merlin_server directory that will be
+                created by the `redis_server` fixture.
+            command_to_test:
+                The command that we're testing, obtained from the parametrize call.
+ """ + conditions = [ + HasRegex(self.get_no_workers_msg(command_to_test)), + HasRegex(WorkerMessages.STEP_1_WORKER.value, negate=True), + HasRegex(WorkerMessages.STEP_2_WORKER.value, negate=True), + HasRegex(WorkerMessages.OTHER_WORKER.value, negate=True), + ] + + # Copy the app.yaml to the cwd so merlin will connect to the testing server + copy_app_yaml_to_cwd(merlin_server_dir) + + # Run the test + result = subprocess.run(command_to_test, capture_output=True, text=True, shell=True) + info = { + "stdout": result.stdout, + "stderr": result.stderr, + "return_code": result.returncode, + } + + # Ensure all test conditions are satisfied + check_test_conditions(conditions, info) + + @pytest.mark.parametrize("command_to_test", ["merlin stop-workers", "merlin query-workers"]) + def test_no_flags( + self, + redis_server: str, + redis_results_backend_config: "Fixture", # noqa: F821 + redis_broker_config: "Fixture", # noqa: F821 + path_to_test_specs: str, + merlin_server_dir: str, + command_to_test: str, + ): + """ + Test the `merlin stop-workers` and `merlin query-workers` commands with no flags. + + Run the commands referenced above and ensure the text output from Merlin is correct. + For the `stop-workers` command, we check if all workers are stopped as well. + To see more information on exactly what this test is doing, see the + `run_test_with_workers()` method. + + Parameters: + redis_server: + A fixture that starts a containerized redis server instance that runs on + localhost:6379. + redis_results_backend_config: + A fixture that modifies the CONFIG object so that it points the results + backend configuration to the containerized redis server we start up with + the `redis_server` fixture. The CONFIG object is what merlin uses to connect + to a server. + redis_broker_config: + A fixture that modifies the CONFIG object so that it points the broker + configuration to the containerized redis server we start up with the + `redis_server` fixture. The CONFIG object is what merlin uses to connect + to a server. + path_to_test_specs: + A fixture to provide the path to the directory containing test specifications. + merlin_server_dir: + A fixture to provide the path to the merlin_server directory that will be + created by the `redis_server` fixture. + command_to_test: + The command that we're testing, obtained from the parametrize call. + """ + conditions = [ + HasRegex(self.get_no_workers_msg(command_to_test), negate=True), + HasRegex(WorkerMessages.STEP_1_WORKER.value), + HasRegex(WorkerMessages.STEP_2_WORKER.value), + HasRegex(WorkerMessages.OTHER_WORKER.value), + ] + with self.run_test_with_workers(path_to_test_specs, merlin_server_dir, conditions, command_to_test): + if command_to_test == "merlin stop-workers": + # After the test runs and before the CeleryWorkersManager exits, ensure there are no workers on the app + from merlin.celery import app as celery_app + + active_queues = celery_app.control.inspect().active_queues() + assert active_queues is None + + @pytest.mark.parametrize("command_to_test", ["merlin stop-workers", "merlin query-workers"]) + def test_spec_flag( + self, + redis_server: str, + redis_results_backend_config: "Fixture", # noqa: F821 + redis_broker_config: "Fixture", # noqa: F821 + path_to_test_specs: str, + merlin_server_dir: str, + command_to_test: str, + ): + """ + Test the `merlin stop-workers` and `merlin query-workers` commands with the `--spec` + flag. + + Run the commands referenced above with the `--spec` flag and ensure the text output + from Merlin is correct. 
For the `stop-workers` command, we check if all workers defined + in the spec file are stopped as well. To see more information on exactly what this test + is doing, see the `run_test_with_workers()` method. + + Parameters: + redis_server: + A fixture that starts a containerized redis server instance that runs on + localhost:6379. + redis_results_backend_config: + A fixture that modifies the CONFIG object so that it points the results + backend configuration to the containerized redis server we start up with + the `redis_server` fixture. The CONFIG object is what merlin uses to connect + to a server. + redis_broker_config: + A fixture that modifies the CONFIG object so that it points the broker + configuration to the containerized redis server we start up with the + `redis_server` fixture. The CONFIG object is what merlin uses to connect + to a server. + path_to_test_specs: + A fixture to provide the path to the directory containing test specifications. + merlin_server_dir: + A fixture to provide the path to the merlin_server directory that will be + created by the `redis_server` fixture. + command_to_test: + The command that we're testing, obtained from the parametrize call. + """ + conditions = [ + HasRegex(self.get_no_workers_msg(command_to_test), negate=True), + HasRegex(WorkerMessages.STEP_1_WORKER.value), + HasRegex(WorkerMessages.STEP_2_WORKER.value), + HasRegex(WorkerMessages.OTHER_WORKER.value), + ] + with self.run_test_with_workers( + path_to_test_specs, + merlin_server_dir, + conditions, + command_to_test, + flag=f"--spec {os.path.join(path_to_test_specs, 'multiple_workers.yaml')}", + ): + if command_to_test == "merlin stop-workers": + from merlin.celery import app as celery_app + + active_queues = celery_app.control.inspect().active_queues() + assert active_queues is None + + @pytest.mark.parametrize("command_to_test", ["merlin stop-workers", "merlin query-workers"]) + def test_workers_flag( + self, + redis_server: str, + redis_results_backend_config: "Fixture", # noqa: F821 + redis_broker_config: "Fixture", # noqa: F821 + path_to_test_specs: str, + merlin_server_dir: str, + command_to_test: str, + ): + """ + Test the `merlin stop-workers` and `merlin query-workers` commands with the `--workers` + flag. + + Run the commands referenced above with the `--workers` flag and ensure the text output + from Merlin is correct. For the `stop-workers` command, we check to make sure that all + workers given with this flag are stopped. To see more information on exactly what this + test is doing, see the `run_test_with_workers()` method. + + Parameters: + redis_server: + A fixture that starts a containerized redis server instance that runs on + localhost:6379. + redis_results_backend_config: + A fixture that modifies the CONFIG object so that it points the results + backend configuration to the containerized redis server we start up with + the `redis_server` fixture. The CONFIG object is what merlin uses to connect + to a server. + redis_broker_config: + A fixture that modifies the CONFIG object so that it points the broker + configuration to the containerized redis server we start up with the + `redis_server` fixture. The CONFIG object is what merlin uses to connect + to a server. + path_to_test_specs: + A fixture to provide the path to the directory containing test specifications. + merlin_server_dir: + A fixture to provide the path to the merlin_server directory that will be + created by the `redis_server` fixture. 
+ command_to_test: + The command that we're testing, obtained from the parametrize call. + """ + conditions = [ + HasRegex(self.get_no_workers_msg(command_to_test), negate=True), + HasRegex(WorkerMessages.STEP_1_WORKER.value), + HasRegex(WorkerMessages.STEP_2_WORKER.value), + HasRegex(WorkerMessages.OTHER_WORKER.value, negate=True), + ] + with self.run_test_with_workers( + path_to_test_specs, + merlin_server_dir, + conditions, + command_to_test, + flag=f"--workers {WorkerMessages.STEP_1_WORKER.value} {WorkerMessages.STEP_2_WORKER.value}", + ): + if command_to_test == "merlin stop-workers": + from merlin.celery import app as celery_app + + active_queues = celery_app.control.inspect().active_queues() + worker_name = f"celery@{WorkerMessages.OTHER_WORKER.value}" + assert worker_name in active_queues + + @pytest.mark.parametrize("command_to_test", ["merlin stop-workers", "merlin query-workers"]) + def test_queues_flag( + self, + redis_server: str, + redis_results_backend_config: "Fixture", # noqa: F821 + redis_broker_config: "Fixture", # noqa: F821 + path_to_test_specs: str, + merlin_server_dir: str, + command_to_test: str, + ): + """ + Test the `merlin stop-workers` and `merlin query-workers` commands with the `--queues` + flag. + + Run the commands referenced above with the `--queues` flag and ensure the text output + from Merlin is correct. For the `stop-workers` command, we check that only the workers + attached to the given queues are stopped. To see more information on exactly what this + test is doing, see the `run_test_with_workers()` method. + + Parameters: + redis_server: + A fixture that starts a containerized redis server instance that runs on + localhost:6379. + redis_results_backend_config: + A fixture that modifies the CONFIG object so that it points the results + backend configuration to the containerized redis server we start up with + the `redis_server` fixture. The CONFIG object is what merlin uses to connect + to a server. + redis_broker_config: + A fixture that modifies the CONFIG object so that it points the broker + configuration to the containerized redis server we start up with the + `redis_server` fixture. The CONFIG object is what merlin uses to connect + to a server. + path_to_test_specs: + A fixture to provide the path to the directory containing test specifications. + merlin_server_dir: + A fixture to provide the path to the merlin_server directory that will be + created by the `redis_server` fixture. + command_to_test: + The command that we're testing, obtained from the parametrize call. 
+ """ + conditions = [ + HasRegex(self.get_no_workers_msg(command_to_test), negate=True), + HasRegex(WorkerMessages.STEP_1_WORKER.value), + HasRegex(WorkerMessages.STEP_2_WORKER.value, negate=True), + HasRegex(WorkerMessages.OTHER_WORKER.value, negate=True), + ] + with self.run_test_with_workers( + path_to_test_specs, + merlin_server_dir, + conditions, + command_to_test, + flag="--queues hello_queue", + ): + if command_to_test == "merlin stop-workers": + from merlin.celery import app as celery_app + + active_queues = celery_app.control.inspect().active_queues() + workers_that_should_be_alive = [ + f"celery@{WorkerMessages.OTHER_WORKER.value}", + f"celery@{WorkerMessages.STEP_2_WORKER.value}", + ] + for worker_name in workers_that_should_be_alive: + assert worker_name in active_queues + + +# pylint: enable=unused-argument,import-outside-toplevel,too-many-arguments diff --git a/tests/integration/definitions.py b/tests/integration/definitions.py index 59c1fa256..eeb23fc71 100644 --- a/tests/integration/definitions.py +++ b/tests/integration/definitions.py @@ -109,8 +109,6 @@ def define_tests(): # pylint: disable=R0914,R0915 run = f"merlin {err_lvl} run" restart = f"merlin {err_lvl} restart" purge = "merlin purge" - stop = "merlin stop-workers" - query = "merlin query-workers" # Shortcuts for example workflow paths examples = "merlin/examples/workflows" @@ -126,7 +124,6 @@ def define_tests(): # pylint: disable=R0914,R0915 flux_restart = f"{examples}/flux/flux_par_restart.yaml" flux_native = f"{test_specs}/flux_par_native_test.yaml" lsf = f"{examples}/lsf/lsf_par.yaml" - mul_workers_demo = f"{dev_examples}/multiple_workers.yaml" cli_substitution_wf = f"{test_specs}/cli_substitution_test.yaml" chord_err_wf = f"{test_specs}/chord_err.yaml" @@ -650,160 +647,6 @@ def define_tests(): # pylint: disable=R0914,R0915 "run type": "local", }, } - stop_workers_tests = { - "stop workers no workers": { - "cmds": f"{stop}", - "conditions": [ - HasReturnCode(), - HasRegex("No workers found to stop"), - HasRegex("step_1_merlin_test_worker", negate=True), - HasRegex("step_2_merlin_test_worker", negate=True), - HasRegex("other_merlin_test_worker", negate=True), - ], - "run type": "distributed", - }, - "stop workers no flags": { - "cmds": [ - f"{workers} {mul_workers_demo}", - f"{stop}", - ], - "conditions": [ - HasReturnCode(), - HasRegex("No workers found to stop", negate=True), - HasRegex("step_1_merlin_test_worker"), - HasRegex("step_2_merlin_test_worker"), - HasRegex("other_merlin_test_worker"), - ], - "run type": "distributed", - "cleanup": KILL_WORKERS, - "num procs": 2, - }, - "stop workers with spec flag": { - "cmds": [ - f"{workers} {mul_workers_demo}", - f"{stop} --spec {mul_workers_demo}", - ], - "conditions": [ - HasReturnCode(), - HasRegex("No workers found to stop", negate=True), - HasRegex("step_1_merlin_test_worker"), - HasRegex("step_2_merlin_test_worker"), - HasRegex("other_merlin_test_worker"), - ], - "run type": "distributed", - "cleanup": KILL_WORKERS, - "num procs": 2, - }, - "stop workers with workers flag": { - "cmds": [ - f"{workers} {mul_workers_demo}", - f"{stop} --workers step_1_merlin_test_worker step_2_merlin_test_worker", - ], - "conditions": [ - HasReturnCode(), - HasRegex("No workers found to stop", negate=True), - HasRegex("step_1_merlin_test_worker"), - HasRegex("step_2_merlin_test_worker"), - HasRegex("other_merlin_test_worker", negate=True), - ], - "run type": "distributed", - "cleanup": KILL_WORKERS, - "num procs": 2, - }, - "stop workers with queues flag": { - "cmds": [ - 
f"{workers} {mul_workers_demo}", - f"{stop} --queues hello_queue", - ], - "conditions": [ - HasReturnCode(), - HasRegex("No workers found to stop", negate=True), - HasRegex("step_1_merlin_test_worker"), - HasRegex("step_2_merlin_test_worker", negate=True), - HasRegex("other_merlin_test_worker", negate=True), - ], - "run type": "distributed", - "cleanup": KILL_WORKERS, - "num procs": 2, - }, - } - query_workers_tests = { - "query workers no workers": { - "cmds": f"{query}", - "conditions": [ - HasReturnCode(), - HasRegex("No workers found!"), - HasRegex("step_1_merlin_test_worker", negate=True), - HasRegex("step_2_merlin_test_worker", negate=True), - HasRegex("other_merlin_test_worker", negate=True), - ], - "run type": "distributed", - }, - "query workers no flags": { - "cmds": [ - f"{workers} {mul_workers_demo}", - f"{query}", - ], - "conditions": [ - HasReturnCode(), - HasRegex("No workers found!", negate=True), - HasRegex("step_1_merlin_test_worker"), - HasRegex("step_2_merlin_test_worker"), - HasRegex("other_merlin_test_worker"), - ], - "run type": "distributed", - "cleanup": KILL_WORKERS, - "num procs": 2, - }, - "query workers with spec flag": { - "cmds": [ - f"{workers} {mul_workers_demo}", - f"{query} --spec {mul_workers_demo}", - ], - "conditions": [ - HasReturnCode(), - HasRegex("No workers found!", negate=True), - HasRegex("step_1_merlin_test_worker"), - HasRegex("step_2_merlin_test_worker"), - HasRegex("other_merlin_test_worker"), - ], - "run type": "distributed", - "cleanup": KILL_WORKERS, - "num procs": 2, - }, - "query workers with workers flag": { - "cmds": [ - f"{workers} {mul_workers_demo}", - f"{query} --workers step_1_merlin_test_worker step_2_merlin_test_worker", - ], - "conditions": [ - HasReturnCode(), - HasRegex("No workers found!", negate=True), - HasRegex("step_1_merlin_test_worker"), - HasRegex("step_2_merlin_test_worker"), - HasRegex("other_merlin_test_worker", negate=True), - ], - "run type": "distributed", - "cleanup": KILL_WORKERS, - "num procs": 2, - }, - "query workers with queues flag": { - "cmds": [ - f"{workers} {mul_workers_demo}", - f"{query} --queues hello_queue", - ], - "conditions": [ - HasReturnCode(), - HasRegex("No workers found!", negate=True), - HasRegex("step_1_merlin_test_worker"), - HasRegex("step_2_merlin_test_worker", negate=True), - HasRegex("other_merlin_test_worker", negate=True), - ], - "run type": "distributed", - "cleanup": KILL_WORKERS, - "num procs": 2, - }, - } distributed_tests = { # noqa: F841 "run and purge feature_demo": { "cmds": f"{run} {demo}; {purge} {demo} -f", @@ -876,8 +719,6 @@ def define_tests(): # pylint: disable=R0914,R0915 # provenence_equality_checks, # omitting provenance equality check because it is broken # style_checks, # omitting style checks due to different results on different machines dependency_checks, - stop_workers_tests, - query_workers_tests, distributed_tests, distributed_error_checks, ]: diff --git a/tests/integration/helper_funcs.py b/tests/integration/helper_funcs.py new file mode 100644 index 000000000..fc976a68e --- /dev/null +++ b/tests/integration/helper_funcs.py @@ -0,0 +1,112 @@ +""" +This module contains helper functions for the integration +test suite. +""" + +import os +import re +import shutil +from typing import Dict, List + +import yaml + +from tests.integration.conditions import Condition + + +def load_workers_from_spec(spec_filepath: str) -> dict: + """ + Load worker specifications from a YAML file. 
+ + This function reads a YAML file containing study specifications and + extracts the worker information under the "merlin" section. It + constructs a dictionary in the form that CeleryWorkersManager.launch_workers + requires. + + Parameters: + spec_filepath: The file path to the YAML specification file. + + Returns: + A dictionary containing the worker specifications from the + "merlin" section of the YAML file. + """ + # Read in the contents of the spec file + with open(spec_filepath, "r") as spec_file: + spec_contents = yaml.load(spec_file, yaml.Loader) + + # Initialize an empty dictionary to hold worker_info + worker_info = {} + + # Access workers and steps from spec_contents + workers = spec_contents["merlin"]["resources"]["workers"] + study_steps = {step["name"]: step["run"]["task_queue"] for step in spec_contents["study"]} + + # Grab the concurrency and queues from each worker and add it to the worker_info dict + for worker_name, worker_settings in workers.items(): + match = re.search(r"--concurrency\s+(\d+)", worker_settings["args"]) + concurrency = int(match.group(1)) if match else 1 + queues = [study_steps[step] for step in worker_settings["steps"]] + worker_info[worker_name] = {"concurrency": concurrency, "queues": queues} + + return worker_info + + +def copy_app_yaml_to_cwd(merlin_server_dir: str): + """ + Copy the app.yaml file from the directory provided to the current working + directory. + + Grab the app.yaml file from `merlin_server_dir` and copy it to the current + working directory so that Merlin will read this in as the server configuration + for whatever test is calling this. + + Parameters: + merlin_server_dir: + The path to the `merlin_server` directory that should be created by the + `redis_server` fixture. + """ + copied_app_yaml = os.path.join(os.getcwd(), "app.yaml") + if not os.path.exists(copied_app_yaml): + server_app_yaml = os.path.join(merlin_server_dir, "app.yaml") + shutil.copy(server_app_yaml, copied_app_yaml) + + +def check_test_conditions(conditions: List[Condition], info: Dict[str, str]): + """ + Ensure all specified test conditions are satisfied based on the output + from a subprocess. + + This function iterates through a list of `Condition` instances, ingests + the provided information (stdout, stderr, and return code) for each + condition, and checks if each condition passes. If any condition fails, + an AssertionError is raised with a detailed message that includes the + condition that failed, along with the captured output and return code. + + Parameters: + conditions: + A list of Condition instances that define the expectations for the test. + info: + A dictionary containing the output from the subprocess, which should + include the following keys: + - 'stdout': The standard output captured from the subprocess. + - 'stderr': The standard error output captured from the subprocess. + - 'return_code': The return code of the subprocess, indicating success + or failure of the command executed. + + Raises: + AssertionError + If any of the conditions do not pass, an AssertionError is raised with + a detailed message including the failed condition and the subprocess + output. 
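+ + Example: + A minimal usage sketch (hypothetical values; assumes `HasRegex` from + `tests.integration.conditions`): + + >>> conditions = [HasRegex("No workers found", negate=True)] + >>> info = {"stdout": "worker output", "stderr": "", "return_code": 0} + >>> check_test_conditions(conditions, info) # raises AssertionError on failure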
+ """ + for condition in conditions: + condition.ingest_info(info) + try: + assert condition.passes + except AssertionError as exc: + error_message = ( + f"Condition failed: {condition}\n" + f"Captured stdout: {info['stdout']}\n" + f"Captured stderr: {info['stderr']}\n" + f"Return code: {info['return_code']}\n" + ) + raise AssertionError(error_message) from exc diff --git a/tests/integration/test_celeryadapter.py b/tests/integration/test_celeryadapter.py index 0572d6c66..60e24bb9a 100644 --- a/tests/integration/test_celeryadapter.py +++ b/tests/integration/test_celeryadapter.py @@ -34,12 +34,9 @@ import json import os from datetime import datetime -from time import sleep from typing import Dict -import pytest from celery import Celery -from celery.canvas import Signature from deepdiff import DeepDiff from merlin.config import Config @@ -49,150 +46,153 @@ from tests.unit.study.status_test_files.status_test_variables import SPEC_PATH -@pytest.mark.order(before="TestInactive") -class TestActive: - """ - This class will test functions in the celeryadapter.py module. - It will run tests where we need active queues/workers to interact with. - - NOTE: The tests in this class must be ran before the TestInactive class or else the - Celery workers needed for this class don't start - - TODO: fix the bug noted above and then check if we still need pytest-order - """ - - def test_query_celery_queues( - self, celery_app: Celery, launch_workers: "Fixture", worker_queue_map: Dict[str, str] # noqa: F821 - ): - """ - Test the query_celery_queues function by providing it with a list of active queues. - This should return a dict where keys are queue names and values are more dicts containing - the number of jobs and consumers in that queue. - - :param `celery_app`: A pytest fixture for the test Celery app - :param launch_workers: A pytest fixture that launches celery workers for us to interact with - :param worker_queue_map: A pytest fixture that returns a dict of workers and queues - """ - # Set up a dummy configuration to use in the test - dummy_config = Config({"broker": {"name": "redis"}}) - - # Get the actual output - queues_to_query = list(worker_queue_map.values()) - actual_queue_info = celeryadapter.query_celery_queues(queues_to_query, app=celery_app, config=dummy_config) - - # Ensure all 3 queues in worker_queue_map were queried before looping - assert len(actual_queue_info) == 3 - - # Ensure each queue has a worker attached - for queue_name, queue_info in actual_queue_info.items(): - assert queue_name in worker_queue_map.values() - assert queue_info == {"consumers": 1, "jobs": 0} - - def test_get_running_queues(self, launch_workers: "Fixture", worker_queue_map: Dict[str, str]): # noqa: F821 - """ - Test the get_running_queues function with queues active. - This should return a list of active queues. - - :param `launch_workers`: A pytest fixture that launches celery workers for us to interact with - :param `worker_queue_map`: A pytest fixture that returns a dict of workers and queues - """ - result = celeryadapter.get_running_queues("merlin_test_app", test_mode=True) - assert sorted(result) == sorted(list(worker_queue_map.values())) - - def test_get_active_celery_queues( - self, celery_app: Celery, launch_workers: "Fixture", worker_queue_map: Dict[str, str] # noqa: F821 - ): - """ - Test the get_active_celery_queues function with queues active. - This should return a tuple where the first entry is a dict of queue info - and the second entry is a list of worker names. 
- - :param `celery_app`: A pytest fixture for the test Celery app - :param `launch_workers`: A pytest fixture that launches celery workers for us to interact with - :param `worker_queue_map`: A pytest fixture that returns a dict of workers and queues - """ - # Start the queues and run the test - queue_result, worker_result = celeryadapter.get_active_celery_queues(celery_app) - - # Ensure we got output before looping - assert len(queue_result) == len(worker_result) == 3 - - for worker, queue in worker_queue_map.items(): - # Check that the entry in the queue_result dict for this queue is correct - assert queue in queue_result - assert len(queue_result[queue]) == 1 - assert worker in queue_result[queue][0] - - # Remove this entry from the queue_result dict - del queue_result[queue] - - # Check that this worker was added to the worker_result list - worker_found = False - for worker_name in worker_result[:]: - if worker in worker_name: - worker_found = True - worker_result.remove(worker_name) - break - assert worker_found - - # Ensure there was no extra output that we weren't expecting - assert queue_result == {} - assert worker_result == [] - - def test_build_set_of_queues( - self, celery_app: Celery, launch_workers: "Fixture", worker_queue_map: Dict[str, str] # noqa: F821 - ): - """ - Test the build_set_of_queues function with queues active. - This should return a set of queues (the queues defined in setUp). - """ - # Run the test - result = celeryadapter.build_set_of_queues( - steps=["all"], spec=None, specific_queues=None, verbose=False, app=celery_app - ) - assert result == set(worker_queue_map.values()) - - @pytest.mark.order(index=1) - def test_check_celery_workers_processing_tasks( - self, - celery_app: Celery, - sleep_sig: Signature, - launch_workers: "Fixture", # noqa: F821 - ): - """ - Test the check_celery_workers_processing function with workers active and a task in a queue. - This function will query workers for any tasks they're still processing. We'll send a - a task that sleeps for 3 seconds to our workers before we run this test so that there should be - a task for this function to find. - - NOTE: the celery app fixture shows strange behavior when using app.control.inspect() calls (which - check_celery_workers_processing uses) so we have to run this test first in this class in order to - have it run properly. 
- - :param celery_app: A pytest fixture for the test Celery app - :param sleep_sig: A pytest fixture for a celery signature of a task that sleeps for 3 sec - :param launch_workers: A pytest fixture that launches celery workers for us to interact with - """ - # Our active workers/queues are test_worker_[0-2]/test_queue_[0-2] so we're - # sending this to test_queue_0 for test_worker_0 to process - queue_for_signature = "test_queue_0" - sleep_sig.set(queue=queue_for_signature) - result = sleep_sig.delay() - - # We need to give the task we just sent to the server a second to get picked up by the worker - sleep(1) - - # Run the test now that the task should be getting processed - active_queue_test = celeryadapter.check_celery_workers_processing([queue_for_signature], celery_app) - assert active_queue_test is True - - # Now test that a queue without any tasks returns false - # We sent the signature to task_queue_0 so task_queue_1 shouldn't have any tasks to find - non_active_queue_test = celeryadapter.check_celery_workers_processing(["test_queue_1"], celery_app) - assert non_active_queue_test is False - - # Wait for the worker to finish running the task - result.get() +# from time import sleep +# import pytest +# from celery.canvas import Signature +# @pytest.mark.order(before="TestInactive") +# class TestActive: +# """ +# This class will test functions in the celeryadapter.py module. +# It will run tests where we need active queues/workers to interact with. + +# NOTE: The tests in this class must be run before the TestInactive class or else the +# Celery workers needed for this class don't start + +# TODO: fix the bug noted above and then check if we still need pytest-order +# """ + +# def test_query_celery_queues( +# self, celery_app: Celery, launch_workers: "Fixture", worker_queue_map: Dict[str, str] # noqa: F821 +# ): +# """ +# Test the query_celery_queues function by providing it with a list of active queues. +# This should return a dict where keys are queue names and values are more dicts containing +# the number of jobs and consumers in that queue. + +# :param `celery_app`: A pytest fixture for the test Celery app +# :param launch_workers: A pytest fixture that launches celery workers for us to interact with +# :param worker_queue_map: A pytest fixture that returns a dict of workers and queues +# """ +# # Set up a dummy configuration to use in the test +# dummy_config = Config({"broker": {"name": "redis"}}) + +# # Get the actual output +# queues_to_query = list(worker_queue_map.values()) +# actual_queue_info = celeryadapter.query_celery_queues(queues_to_query, app=celery_app, config=dummy_config) + +# # Ensure all 3 queues in worker_queue_map were queried before looping +# assert len(actual_queue_info) == 3 + +# # Ensure each queue has a worker attached +# for queue_name, queue_info in actual_queue_info.items(): +# assert queue_name in worker_queue_map.values() +# assert queue_info == {"consumers": 1, "jobs": 0} + +# def test_get_running_queues(self, launch_workers: "Fixture", worker_queue_map: Dict[str, str]): # noqa: F821 +# """ +# Test the get_running_queues function with queues active. +# This should return a list of active queues.
+ +# :param `launch_workers`: A pytest fixture that launches celery workers for us to interact with +# :param `worker_queue_map`: A pytest fixture that returns a dict of workers and queues +# """ +# result = celeryadapter.get_running_queues("merlin_test_app", test_mode=True) +# assert sorted(result) == sorted(list(worker_queue_map.values())) + +# def test_get_active_celery_queues( +# self, celery_app: Celery, launch_workers: "Fixture", worker_queue_map: Dict[str, str] # noqa: F821 +# ): +# """ +# Test the get_active_celery_queues function with queues active. +# This should return a tuple where the first entry is a dict of queue info +# and the second entry is a list of worker names. + +# :param `celery_app`: A pytest fixture for the test Celery app +# :param `launch_workers`: A pytest fixture that launches celery workers for us to interact with +# :param `worker_queue_map`: A pytest fixture that returns a dict of workers and queues +# """ +# # Start the queues and run the test +# queue_result, worker_result = celeryadapter.get_active_celery_queues(celery_app) + +# # Ensure we got output before looping +# assert len(queue_result) == len(worker_result) == 3 + +# for worker, queue in worker_queue_map.items(): +# # Check that the entry in the queue_result dict for this queue is correct +# assert queue in queue_result +# assert len(queue_result[queue]) == 1 +# assert worker in queue_result[queue][0] + +# # Remove this entry from the queue_result dict +# del queue_result[queue] + +# # Check that this worker was added to the worker_result list +# worker_found = False +# for worker_name in worker_result[:]: +# if worker in worker_name: +# worker_found = True +# worker_result.remove(worker_name) +# break +# assert worker_found + +# # Ensure there was no extra output that we weren't expecting +# assert queue_result == {} +# assert worker_result == [] + +# def test_build_set_of_queues( +# self, celery_app: Celery, launch_workers: "Fixture", worker_queue_map: Dict[str, str] # noqa: F821 +# ): +# """ +# Test the build_set_of_queues function with queues active. +# This should return a set of queues (the queues defined in setUp). +# """ +# # Run the test +# result = celeryadapter.build_set_of_queues( +# steps=["all"], spec=None, specific_queues=None, verbose=False, app=celery_app +# ) +# assert result == set(worker_queue_map.values()) + +# @pytest.mark.order(index=1) +# def test_check_celery_workers_processing_tasks( +# self, +# celery_app: Celery, +# sleep_sig: Signature, +# launch_workers: "Fixture", # noqa: F821 +# ): +# """ +# Test the check_celery_workers_processing function with workers active and a task in a queue. +# This function will query workers for any tasks they're still processing. We'll send +# a task that sleeps for 3 seconds to our workers before we run this test so that there should be +# a task for this function to find. + +# NOTE: the celery app fixture shows strange behavior when using app.control.inspect() calls (which +# check_celery_workers_processing uses) so we have to run this test first in this class in order to +# have it run properly.
+ +# :param celery_app: A pytest fixture for the test Celery app +# :param sleep_sig: A pytest fixture for a celery signature of a task that sleeps for 3 sec +# :param launch_workers: A pytest fixture that launches celery workers for us to interact with +# """ +# # Our active workers/queues are test_worker_[0-2]/test_queue_[0-2] so we're +# # sending this to test_queue_0 for test_worker_0 to process +# queue_for_signature = "test_queue_0" +# sleep_sig.set(queue=queue_for_signature) +# result = sleep_sig.delay() + +# # We need to give the task we just sent to the server a second to get picked up by the worker +# sleep(1) + +# # Run the test now that the task should be getting processed +# active_queue_test = celeryadapter.check_celery_workers_processing([queue_for_signature], celery_app) +# assert active_queue_test is True + +# # Now test that a queue without any tasks returns false +# # We sent the signature to test_queue_0 so test_queue_1 shouldn't have any tasks to find +# non_active_queue_test = celeryadapter.check_celery_workers_processing(["test_queue_1"], celery_app) +# assert non_active_queue_test is False + +# # Wait for the worker to finish running the task +# result.get() class TestInactive: diff --git a/tests/integration/test_specs/multiple_workers.yaml b/tests/integration/test_specs/multiple_workers.yaml new file mode 100644 index 000000000..967582a53 --- /dev/null +++ b/tests/integration/test_specs/multiple_workers.yaml @@ -0,0 +1,56 @@ +description: + name: multiple_workers + description: a very simple merlin workflow with multiple workers + +global.parameters: + GREET: + values : ["hello","hola"] + label : GREET.%% + WORLD: + values : ["world","mundo"] + label : WORLD.%% + +study: + - name: step_1 + description: say hello + run: + cmd: | + echo "$(GREET), $(WORLD)!" + task_queue: hello_queue + + - name: step_2 + description: step 2 + run: + cmd: | + echo "step_2" + depends: [step_1_*] + task_queue: echo_queue + + - name: step_3 + description: stop workers + run: + cmd: | + echo "stop workers" + depends: [step_2] + task_queue: other_queue + + - name: step_4 + description: another step + run: + cmd: | + echo "another step" + depends: [step_3] + task_queue: other_queue + +merlin: + resources: + workers: + step_1_merlin_test_worker: + args: -l INFO --concurrency 1 + steps: [step_1] + step_2_merlin_test_worker: + args: -l INFO --concurrency 1 + steps: [step_2] + other_merlin_test_worker: + args: -l INFO --concurrency 1 + steps: [step_3, step_4]
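For reference, a minimal sketch of what the new `load_workers_from_spec` helper in `tests/integration/helper_funcs.py` should return when pointed at this spec (a hypothetical REPL-style illustration, not part of the diff; note that step_3 and step_4 share other_queue, so the per-step list comprehension yields it twice):

from tests.integration.helper_funcs import load_workers_from_spec

# Load the worker layout defined in the spec shown above (path assumes a
# checkout of the repo root as the working directory).
worker_info = load_workers_from_spec("tests/integration/test_specs/multiple_workers.yaml")

# Expected contents of worker_info:
# {
#     "step_1_merlin_test_worker": {"concurrency": 1, "queues": ["hello_queue"]},
#     "step_2_merlin_test_worker": {"concurrency": 1, "queues": ["echo_queue"]},
#     "other_merlin_test_worker": {"concurrency": 1, "queues": ["other_queue", "other_queue"]},
# }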