diff --git a/README.md b/README.md
index 2673b4f59..425ad4be7 100644
--- a/README.md
+++ b/README.md
@@ -248,7 +248,6 @@ The following table maps each Scaler command to its corresponding section name i
 | `scaler_ui` | `[webui]` |
 | `scaler_top` | `[top]` |
 | `scaler_worker_manager_baremetal_native` | `[native_worker_manager]` |
-| `scaler_worker_manager_baremetal_fixed_native` | `[fixed_native_worker_manager]` |
 | `scaler_worker_manager_symphony` | `[symphony_worker_manager]` |
 
 ### Practical Scenarios & Examples
diff --git a/docs/source/index.rst b/docs/source/index.rst
index 9034bfadc..318e67504 100644
--- a/docs/source/index.rst
+++ b/docs/source/index.rst
@@ -29,7 +29,6 @@ Content
    tutorials/scaling
    tutorials/worker_manager_adapter/index
    tutorials/worker_manager_adapter/native
-   tutorials/worker_manager_adapter/fixed_native
    tutorials/worker_manager_adapter/aws_hpc/index
    tutorials/worker_manager_adapter/common_parameters
    tutorials/compatibility/ray
diff --git a/docs/source/tutorials/configuration.rst b/docs/source/tutorials/configuration.rst
index 562dbe37e..7e4614fb4 100644
--- a/docs/source/tutorials/configuration.rst
+++ b/docs/source/tutorials/configuration.rst
@@ -195,8 +195,6 @@ The following table maps each Scaler command to its corresponding section name i
      - ``[top]``
    * - ``scaler_worker_manager_baremetal_native``
      - ``[native_worker_manager]``
-   * - ``scaler_worker_manager_baremetal_fixed_native``
-     - ``[fixed_native_worker_manager]``
    * - ``scaler_worker_manager_symphony``
      - ``[symphony_worker_manager]``
    * - ``scaler_worker_manager_aws_raw_ecs``
diff --git a/docs/source/tutorials/worker_manager_adapter/fixed_native.rst b/docs/source/tutorials/worker_manager_adapter/fixed_native.rst
deleted file mode 100644
index faf15d53e..000000000
--- a/docs/source/tutorials/worker_manager_adapter/fixed_native.rst
+++ /dev/null
@@ -1,63 +0,0 @@
-Fixed Native Worker Manager
-===========================
-
-The Fixed Native worker manager spawns a fixed number of worker subprocesses at startup. Unlike other worker managers, it does **not** support dynamic scaling. It is useful for environments where you want a static pool of workers to be available immediately and do not want the system to dynamically adjust the number of processes.
-
-Getting Started
----------------
-
-To start the Fixed Native worker manager, use the ``scaler_worker_manager_baremetal_fixed_native`` command.
-
-Example command:
-
-.. code-block:: bash
-
-    scaler_worker_manager_baremetal_fixed_native tcp://:8516 \
-        --max-workers 8 \
-        --logging-level INFO \
-        --task-timeout-seconds 60
-
-Equivalent configuration using a TOML file:
-
-.. code-block:: bash
-
-    scaler_worker_manager_baremetal_fixed_native tcp://:8516 --config config.toml
-
-.. code-block:: toml
-
-    # config.toml
-
-    [fixed_native_worker_manager]
-    max_workers = 8
-    logging_level = "INFO"
-    task_timeout_seconds = 60
-
-* ``tcp://:8516`` is the address workers will use to connect to the scheduler.
-* The manager will immediately spawn 8 worker subprocesses at startup and maintain them.
-
-How it Works
-------------
-
-Upon startup, the Fixed Native worker manager spawns the number of workers specified by ``--max-workers``. It reports its capacity to the scheduler as 0 to prevent the scheduler from attempting to scale it up or down dynamically.
-
-If a worker process terminates, the manager does not automatically restart it (in the current implementation).
-
-Supported Parameters
---------------------
-
-.. note::
-    For more details on how to configure Scaler, see the :doc:`../configuration` section.
-
-The Fixed Native worker manager supports the following specific configuration parameters in addition to the common worker manager parameters.
-
-Fixed Native Configuration
-~~~~~~~~~~~~~~~~~~~~~~~~~~
-
-* ``--max-workers`` (``-mw``): The exact number of worker subprocesses to spawn at startup. Must be a non-negative integer.
-* ``--preload``: Python module or script to preload in each worker process before it starts accepting tasks.
-* ``--worker-io-threads`` (``-wit``): Number of IO threads for the IO backend per worker (default: ``1``).
-
-Common Parameters
-~~~~~~~~~~~~~~~~~
-
-For a full list of common parameters including networking, worker configuration, and logging, see :doc:`common_parameters`.
diff --git a/docs/source/tutorials/worker_manager_adapter/index.rst b/docs/source/tutorials/worker_manager_adapter/index.rst
index 644c479da..45d095a6e 100644
--- a/docs/source/tutorials/worker_manager_adapter/index.rst
+++ b/docs/source/tutorials/worker_manager_adapter/index.rst
@@ -34,12 +34,9 @@ Scaler provides several worker managers to support different execution environme
 Native
 ~~~~~~
 
-The :doc:`Native <native>` worker manager allows Scaler to dynamically provision workers as local subprocesses on the same machine. It is the simplest way to scale workloads across multiple CPU cores locally and supports dynamic auto-scaling.
-
-Fixed Native
-~~~~~~~~~~~~
-
-The :doc:`Fixed Native <fixed_native>` worker manager spawns a static number of worker subprocesses at startup and does not support dynamic scaling. It is the underlying component used by the high-level ``SchedulerClusterCombo`` class.
+The :doc:`Native <native>` worker manager provisions workers as local subprocesses on the same machine.
+It supports both dynamic auto-scaling (default) and fixed-pool mode, where a set number of workers
+are pre-spawned at startup.
 
 AWS HPC
 ~~~~~~~
@@ -55,6 +52,5 @@ All worker managers share a set of :doc:`common configuration parameters :8516`` is the address workers will use to connect to the scheduler.
-* The manager can spawn up to 4 worker subprocesses.
+* The manager can spawn up to 4 worker subprocesses in dynamic mode.
+
+To use fixed-pool mode, set ``--mode fixed`` and specify the exact number of workers:
+
+.. code-block:: toml
+
+    # config.toml
+
+    [native_worker_manager]
+    mode = "fixed"
+    max_workers = 8
 
 How it Works
 ------------
 
-When the scheduler determines that more capacity is needed, it sends a request to the Native worker manager. The manager then spawns a new worker process using the same Python interpreter and environment that started the manager.
-
-Each worker group managed by the Native manager contains exactly one worker process.
+**Dynamic mode** (default): when the scheduler determines that more capacity is needed, it sends a request to the Native worker manager. The manager then spawns a new worker process using the same Python interpreter and environment that started the manager. Each worker group managed by the Native manager contains exactly one worker process.
 
-Unlike the Fixed Native worker manager, which spawns a static number of workers at startup, the Native worker manager is designed to be used with Scaler's auto-scaling features to dynamically grow and shrink the local worker pool based on demand.
+**Fixed mode**: all workers are pre-spawned at startup. The manager runs a simple synchronous loop with no event loop or scheduler connector — it spawns the workers and waits for them to finish.
Workers connect directly to the scheduler themselves. When all pre-spawned workers have exited, the manager itself exits. Supported Parameters -------------------- @@ -55,7 +63,13 @@ The Native worker manager supports the following specific configuration paramete Native Configuration ~~~~~~~~~~~~~~~~~~~~ -* ``--max-workers`` (``-mw``): Maximum number of worker subprocesses that can be started. Set to ``-1`` for no limit (default: ``-1``). +* ``--mode``: Operating mode. ``dynamic`` (default) enables auto-scaling driven by the scheduler. + ``fixed`` pre-spawns ``--max-workers`` workers at startup and does not support dynamic scaling. + In fixed mode ``--max-workers`` must be a positive integer. +* ``--worker-type``: Optional string prefix used in worker IDs. Overrides the default prefix (``NAT`` + for dynamic mode, ``FIX`` for fixed mode). Useful when multiple adapters of the same mode are + running concurrently and their workers need to be distinguishable by type in logs and monitoring. +* ``--max-workers`` (``-mw``): In dynamic mode, the maximum number of worker subprocesses that can be started (``-1`` = unlimited, default: ``-1``). In fixed mode, the exact number of workers spawned at startup (must be ≥ 1). * ``--preload``: Python module or script to preload in each worker process before it starts accepting tasks. * ``--worker-io-threads`` (``-wit``): Number of IO threads for the IO backend per worker (default: ``1``). diff --git a/examples/task_capabilities.py b/examples/task_capabilities.py index 9a21193ad..05bb69f98 100644 --- a/examples/task_capabilities.py +++ b/examples/task_capabilities.py @@ -5,16 +5,17 @@ """ import math +import multiprocessing from scaler import Client from scaler.cluster.combo import SchedulerClusterCombo from scaler.config.common.logging import LoggingConfig from scaler.config.common.worker import WorkerConfig from scaler.config.common.worker_manager import WorkerManagerConfig -from scaler.config.section.fixed_native_worker_manager import FixedNativeWorkerManagerConfig +from scaler.config.section.native_worker_manager import NativeWorkerManagerConfig, NativeWorkerManagerMode from scaler.config.section.scheduler import PolicyConfig from scaler.config.types.worker import WorkerCapabilities -from scaler.worker_manager_adapter.baremetal.fixed_native import FixedNativeWorkerManager +from scaler.worker_manager_adapter.baremetal.native import NativeWorkerManager def gpu_task(x: float) -> float: @@ -39,14 +40,16 @@ def main(): # Adds an additional worker with GPU support base_manager = cluster._worker_manager - gpu_manager = FixedNativeWorkerManager( - FixedNativeWorkerManagerConfig( + gpu_manager = NativeWorkerManager( + NativeWorkerManagerConfig( worker_manager_config=WorkerManagerConfig( - scheduler_address=base_manager._address, object_storage_address=None, max_workers=1 + scheduler_address=base_manager._address, + object_storage_address=base_manager._object_storage_address, + max_workers=1, ), - preload=None, + mode=NativeWorkerManagerMode.FIXED, event_loop=base_manager._event_loop, - worker_io_threads=1, + worker_io_threads=base_manager._io_threads, worker_config=WorkerConfig( per_worker_capabilities=WorkerCapabilities({"gpu": -1}), per_worker_task_queue_size=base_manager._task_queue_size, @@ -64,7 +67,8 @@ def main(): ), ) ) - gpu_manager.start() + gpu_manager_process = multiprocessing.Process(target=gpu_manager.run) + gpu_manager_process.start() with Client(address=cluster.get_address()) as client: print("Submitting tasks...") @@ -83,7 +87,9 @@ def main(): 
gpu_future.result() cpu_future.result() - gpu_manager.shutdown() + if gpu_manager_process.is_alive(): + gpu_manager_process.terminate() + gpu_manager_process.join() cluster.shutdown() diff --git a/pyproject.toml b/pyproject.toml index d90c44ac4..1296dd3f6 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -89,7 +89,6 @@ scaler_top = "scaler.entry_points.top:main" scaler_ui = "scaler.entry_points.webui:main" scaler_object_storage_server = "scaler.entry_points.object_storage_server:main" scaler_worker_manager_baremetal_native = "scaler.entry_points.worker_manager_baremetal_native:main" -scaler_worker_manager_baremetal_fixed_native = "scaler.entry_points.worker_manager_baremetal_fixed_native:main" scaler_worker_manager_symphony = "scaler.entry_points.worker_manager_symphony:main" scaler_worker_manager_aws_raw_ecs = "scaler.entry_points.worker_manager_aws_raw_ecs:main" scaler_worker_manager_aws_hpc_batch = "scaler.entry_points.worker_manager_aws_hpc_batch:main" diff --git a/src/run_worker_manager_baremetal_fixed_native.py b/src/run_worker_manager_baremetal_fixed_native.py deleted file mode 100644 index 34c7caa8a..000000000 --- a/src/run_worker_manager_baremetal_fixed_native.py +++ /dev/null @@ -1,5 +0,0 @@ -from scaler.entry_points.worker_manager_baremetal_fixed_native import main -from scaler.utility.debug import pdb_wrapped - -if __name__ == "__main__": - pdb_wrapped(main)() diff --git a/src/scaler/cluster/combo.py b/src/scaler/cluster/combo.py index 183cd7f03..142000525 100644 --- a/src/scaler/cluster/combo.py +++ b/src/scaler/cluster/combo.py @@ -1,4 +1,5 @@ import logging +import multiprocessing from typing import Dict, Optional, Tuple from scaler.cluster.object_storage_server import ObjectStorageServerProcess @@ -24,13 +25,13 @@ DEFAULT_WORKER_DEATH_TIMEOUT, DEFAULT_WORKER_TIMEOUT_SECONDS, ) -from scaler.config.section.fixed_native_worker_manager import FixedNativeWorkerManagerConfig +from scaler.config.section.native_worker_manager import NativeWorkerManagerConfig, NativeWorkerManagerMode from scaler.config.section.scheduler import PolicyConfig from scaler.config.types.object_storage_server import ObjectStorageAddressConfig from scaler.config.types.worker import WorkerCapabilities from scaler.config.types.zmq import ZMQConfig from scaler.utility.network_util import get_available_tcp_port -from scaler.worker_manager_adapter.baremetal.fixed_native import FixedNativeWorkerManager +from scaler.worker_manager_adapter.baremetal.native import NativeWorkerManager class SchedulerClusterCombo: @@ -89,8 +90,8 @@ def __init__( self._object_storage.start() self._object_storage.wait_until_ready() # object storage should be ready before starting the cluster - self._worker_manager = FixedNativeWorkerManager( - FixedNativeWorkerManagerConfig( + self._worker_manager = NativeWorkerManager( + NativeWorkerManagerConfig( worker_manager_config=WorkerManagerConfig( scheduler_address=self._address, object_storage_address=self._object_storage_address, @@ -99,6 +100,7 @@ def __init__( preload=None, event_loop=event_loop, worker_io_threads=worker_io_threads, + mode=NativeWorkerManagerMode.FIXED, worker_config=WorkerConfig( per_worker_capabilities=WorkerCapabilities(per_worker_capabilities or {}), per_worker_task_queue_size=per_worker_task_queue_size, @@ -113,6 +115,8 @@ def __init__( ) ) + self._worker_manager_process = multiprocessing.Process(target=self._worker_manager.run) + self._scheduler = SchedulerProcess( address=self._address, object_storage_address=self._object_storage_address, @@ -132,8 +136,8 @@ def 
__init__( policy=scaler_policy, ) - self._worker_manager.start() self._scheduler.start() + self._worker_manager_process.start() logging.info(f"{self.__get_prefix()} started") def __del__(self): @@ -144,7 +148,10 @@ def shutdown(self): self._shutdown_called = True logging.info(f"{self.__get_prefix()} shutdown") - self._worker_manager.shutdown() + if self._worker_manager_process.is_alive(): + self._worker_manager_process.terminate() + self._worker_manager_process.join() + self._scheduler.terminate() self._scheduler.join() diff --git a/src/scaler/config/section/fixed_native_worker_manager.py b/src/scaler/config/section/fixed_native_worker_manager.py deleted file mode 100644 index 93ea2ab8a..000000000 --- a/src/scaler/config/section/fixed_native_worker_manager.py +++ /dev/null @@ -1,38 +0,0 @@ -import argparse -import dataclasses -from typing import Optional - -from scaler.config import defaults -from scaler.config.common.logging import LoggingConfig -from scaler.config.common.worker import WorkerConfig -from scaler.config.common.worker_manager import WorkerManagerConfig -from scaler.config.config_class import ConfigClass -from scaler.utility.event_loop import EventLoopType - - -@dataclasses.dataclass -class FixedNativeWorkerManagerConfig(ConfigClass): - worker_manager_config: WorkerManagerConfig - preload: Optional[str] = None - worker_config: WorkerConfig = dataclasses.field(default_factory=WorkerConfig) - logging_config: LoggingConfig = dataclasses.field(default_factory=LoggingConfig) - event_loop: str = dataclasses.field( - default="builtin", - metadata=dict(short="-el", choices=EventLoopType.allowed_types(), help="select the event loop type"), - ) - - worker_io_threads: int = dataclasses.field( - default=defaults.DEFAULT_IO_THREADS, - metadata=dict(short="-wit", help="set the number of io threads for io backend per worker"), - ) - - @classmethod - def configure_parser(cls, parser) -> None: - super().configure_parser(parser) - parser.add_argument("-n", "--num-of-workers", dest="max_workers", type=int, help=argparse.SUPPRESS) - - def __post_init__(self) -> None: - if self.worker_io_threads <= 0: - raise ValueError("worker_io_threads must be a positive integer.") - if self.worker_manager_config.max_workers < 0: - raise ValueError("max_workers must be >=0 for fixed native worker manager") diff --git a/src/scaler/config/section/native_worker_manager.py b/src/scaler/config/section/native_worker_manager.py index 2aeca2daa..883f12e8a 100644 --- a/src/scaler/config/section/native_worker_manager.py +++ b/src/scaler/config/section/native_worker_manager.py @@ -1,4 +1,6 @@ +import argparse import dataclasses +import enum from typing import Optional from scaler.config import defaults @@ -9,6 +11,11 @@ from scaler.utility.event_loop import EventLoopType +class NativeWorkerManagerMode(enum.Enum): + DYNAMIC = "dynamic" + FIXED = "fixed" + + @dataclasses.dataclass class NativeWorkerManagerConfig(ConfigClass): worker_manager_config: WorkerManagerConfig @@ -25,6 +32,26 @@ class NativeWorkerManagerConfig(ConfigClass): metadata=dict(short="-wit", help="set the number of io threads for io backend per worker"), ) + mode: NativeWorkerManagerMode = dataclasses.field( + default=NativeWorkerManagerMode.DYNAMIC, + metadata=dict( + type=NativeWorkerManagerMode, + help="operating mode: 'dynamic' for auto-scaling driven by scheduler, 'fixed' for pre-spawned workers", + ), + ) + + worker_type: Optional[str] = dataclasses.field( + default=None, + metadata=dict(help="worker type prefix used in worker IDs; defaults to 'FIX' 
or 'NAT' based on mode"), + ) + + @classmethod + def configure_parser(cls, parser: argparse.ArgumentParser) -> None: + super().configure_parser(parser) + parser.add_argument("-n", "--num-of-workers", dest="max_workers", type=int, help=argparse.SUPPRESS) + def __post_init__(self) -> None: if self.worker_io_threads <= 0: raise ValueError("worker_io_threads must be a positive integer.") + if self.mode == NativeWorkerManagerMode.FIXED and self.worker_manager_config.max_workers < 0: + raise ValueError("max_workers must be >= 0 for fixed mode") diff --git a/src/scaler/entry_points/cluster.py b/src/scaler/entry_points/cluster.py index 38a6a83f7..849d135e7 100644 --- a/src/scaler/entry_points/cluster.py +++ b/src/scaler/entry_points/cluster.py @@ -1,8 +1,13 @@ -from scaler.entry_points.worker_manager_baremetal_fixed_native import main as _main +import dataclasses +from scaler.config.section.native_worker_manager import NativeWorkerManagerConfig, NativeWorkerManagerMode +from scaler.worker_manager_adapter.baremetal.native import NativeWorkerManager -def main(): - _main(section="cluster") + +def main() -> None: + config = NativeWorkerManagerConfig.parse("Scaler Cluster", "cluster") + config = dataclasses.replace(config, mode=NativeWorkerManagerMode.FIXED) + NativeWorkerManager(config).run() __all__ = ["main"] diff --git a/src/scaler/entry_points/worker_manager_baremetal_fixed_native.py b/src/scaler/entry_points/worker_manager_baremetal_fixed_native.py deleted file mode 100644 index 02f6e0c3a..000000000 --- a/src/scaler/entry_points/worker_manager_baremetal_fixed_native.py +++ /dev/null @@ -1,33 +0,0 @@ -import signal - -from scaler.config.section.fixed_native_worker_manager import FixedNativeWorkerManagerConfig -from scaler.utility.event_loop import register_event_loop -from scaler.utility.logging.utility import setup_logger -from scaler.worker_manager_adapter.baremetal.fixed_native import FixedNativeWorkerManager - - -def main(section: str = "fixed_native_worker_manager"): - fixed_native_manager_config = FixedNativeWorkerManagerConfig.parse("Scaler Fixed Native Worker Manager", section) - - register_event_loop(fixed_native_manager_config.event_loop) - - setup_logger( - fixed_native_manager_config.logging_config.paths, - fixed_native_manager_config.logging_config.config_file, - fixed_native_manager_config.logging_config.level, - ) - - fixed_native_worker_manager = FixedNativeWorkerManager(fixed_native_manager_config) - - def handle_signal(signum, frame): - fixed_native_worker_manager.shutdown() - - signal.signal(signal.SIGINT, handle_signal) - signal.signal(signal.SIGTERM, handle_signal) - - fixed_native_worker_manager.start() - fixed_native_worker_manager.join() - - -if __name__ == "__main__": - main() diff --git a/src/scaler/version.txt b/src/scaler/version.txt index 815d5ca06..398935591 100644 --- a/src/scaler/version.txt +++ b/src/scaler/version.txt @@ -1 +1 @@ -1.19.0 +1.20.0 diff --git a/src/scaler/worker_manager_adapter/aws_raw/ecs.py b/src/scaler/worker_manager_adapter/aws_raw/ecs.py index 351ba4ef3..4f58edd81 100644 --- a/src/scaler/worker_manager_adapter/aws_raw/ecs.py +++ b/src/scaler/worker_manager_adapter/aws_raw/ecs.py @@ -30,7 +30,7 @@ @dataclass class WorkerGroupInfo: - task_arn: str + task_arn: str # sufficient to identify the group for stop_task(); no worker ID tracking needed class ECSWorkerManager: @@ -225,6 +225,7 @@ async def start_worker_group(self) -> Tuple[WorkerGroupID, Status]: command = ( f"scaler_cluster {self._address.to_address()} " + f"--worker-type ECS " 
f"--max-workers {self._ecs_task_cpu} " f"--per-worker-task-queue-size {self._per_worker_task_queue_size} " f"--heartbeat-interval-seconds {self._heartbeat_interval_seconds} " diff --git a/src/scaler/worker_manager_adapter/baremetal/fixed_native.py b/src/scaler/worker_manager_adapter/baremetal/fixed_native.py deleted file mode 100644 index de8ba4519..000000000 --- a/src/scaler/worker_manager_adapter/baremetal/fixed_native.py +++ /dev/null @@ -1,77 +0,0 @@ -import uuid -from typing import Dict - -from scaler.config.section.fixed_native_worker_manager import FixedNativeWorkerManagerConfig -from scaler.utility.identifiers import WorkerID -from scaler.worker.worker import Worker -from scaler.worker_manager_adapter.common import WorkerGroupNotFoundError - - -class FixedNativeWorkerManager: - def __init__(self, config: FixedNativeWorkerManagerConfig): - self._address = config.worker_manager_config.scheduler_address - self._object_storage_address = config.worker_manager_config.object_storage_address - self._capabilities = config.worker_config.per_worker_capabilities.capabilities - self._io_threads = config.worker_io_threads - self._task_queue_size = config.worker_config.per_worker_task_queue_size - self._max_workers = config.worker_manager_config.max_workers - self._heartbeat_interval_seconds = config.worker_config.heartbeat_interval_seconds - self._task_timeout_seconds = config.worker_config.task_timeout_seconds - self._death_timeout_seconds = config.worker_config.death_timeout_seconds - self._garbage_collect_interval_seconds = config.worker_config.garbage_collect_interval_seconds - self._trim_memory_threshold_bytes = config.worker_config.trim_memory_threshold_bytes - self._hard_processor_suspend = config.worker_config.hard_processor_suspend - self._event_loop = config.event_loop - self._logging_paths = config.logging_config.paths - self._logging_level = config.logging_config.level - self._logging_config_file = config.logging_config.config_file - self._preload = config.preload - - self._workers: Dict[WorkerID, Worker] = {} - - def _spawn_worker(self): - worker = Worker( - name=f"FIX|{uuid.uuid4().hex}", - address=self._address, - object_storage_address=self._object_storage_address, - preload=self._preload, - capabilities=self._capabilities, - io_threads=self._io_threads, - task_queue_size=self._task_queue_size, - heartbeat_interval_seconds=self._heartbeat_interval_seconds, - task_timeout_seconds=self._task_timeout_seconds, - death_timeout_seconds=self._death_timeout_seconds, - garbage_collect_interval_seconds=self._garbage_collect_interval_seconds, - trim_memory_threshold_bytes=self._trim_memory_threshold_bytes, - hard_processor_suspend=self._hard_processor_suspend, - event_loop=self._event_loop, - logging_paths=self._logging_paths, - logging_level=self._logging_level, - ) - worker.start() - self._workers[worker.identity] = worker - - def _shutdown_worker(self, worker_id: WorkerID): - if worker_id not in self._workers: - raise WorkerGroupNotFoundError(f"Worker with ID {worker_id!r} not found.") - - worker = self._workers[worker_id] - worker.terminate() - worker.join() - del self._workers[worker_id] - - def start(self): - for _ in range(self._max_workers): - self._spawn_worker() - - def shutdown(self): - for worker_id in list(self._workers.keys()): - self._shutdown_worker(worker_id) - - def join(self): - """Wait for all workers to finish.""" - - # this specific adapter cannot dynamically spawn workers - # therefore we just wait for all existing workers to finish - for worker in 
list(self._workers.values()): - worker.join() diff --git a/src/scaler/worker_manager_adapter/baremetal/native.py b/src/scaler/worker_manager_adapter/baremetal/native.py index e45feedf7..d27c590af 100644 --- a/src/scaler/worker_manager_adapter/baremetal/native.py +++ b/src/scaler/worker_manager_adapter/baremetal/native.py @@ -3,12 +3,13 @@ import os import signal import uuid -from typing import Dict, Tuple +from typing import Any, Dict, Optional, Tuple import zmq -from scaler.config.section.native_worker_manager import NativeWorkerManagerConfig +from scaler.config.section.native_worker_manager import NativeWorkerManagerConfig, NativeWorkerManagerMode from scaler.io import uv_ymq +from scaler.io.mixins import AsyncConnector from scaler.io.utility import create_async_connector, create_async_simple_context from scaler.io.ymq import ymq from scaler.protocol.python.message import ( @@ -49,11 +50,34 @@ def __init__(self, config: NativeWorkerManagerConfig): self._logging_config_file = config.logging_config.config_file self._preload = config.preload self._workers_per_group = 1 + self._mode = config.mode + + if config.worker_type is not None: + self._worker_prefix = config.worker_type + elif self._mode == NativeWorkerManagerMode.FIXED: + self._worker_prefix = "FIX" + elif self._mode == NativeWorkerManagerMode.DYNAMIC: + self._worker_prefix = "NAT" + else: + raise ValueError(f"worker_type is not set and mode is unrecognised: {self._mode!r}") - self._context = create_async_simple_context() + """ + Although a worker group can contain multiple workers, in this native adapter implementation, + each worker group will only contain one worker. + """ + self._worker_groups: Dict[WorkerGroupID, Dict[WorkerID, Worker]] = {} + + # ZMQ setup is deferred to _setup_zmq(), called at the start of run(). + # This keeps the object picklable so callers can do Process(target=adapter.run).start(). + self._context: Optional[Any] = None + self._connector_external: Optional[AsyncConnector] = None + self._ident: Optional[bytes] = None + + def _setup_zmq(self) -> None: self._name = "worker_manager_native" self._ident = f"{self._name}|{uuid.uuid4().bytes.hex()}".encode() + self._context = create_async_simple_context() self._connector_external = create_async_connector( self._context, name="worker_manager_native", @@ -64,11 +88,32 @@ def __init__(self, config: NativeWorkerManagerConfig): identity=self._ident, ) - """ - Although a worker group can contain multiple workers, in this native adapter implementation, - each worker group will only contain one worker. 
- """ - self._worker_groups: Dict[WorkerGroupID, Dict[WorkerID, Worker]] = {} + def _create_worker(self) -> Worker: + return Worker( + name=f"{self._worker_prefix}|{uuid.uuid4().hex}", + address=self._address, + object_storage_address=self._object_storage_address, + preload=self._preload, + capabilities=self._capabilities, + io_threads=self._io_threads, + task_queue_size=self._task_queue_size, + heartbeat_interval_seconds=self._heartbeat_interval_seconds, + task_timeout_seconds=self._task_timeout_seconds, + death_timeout_seconds=self._death_timeout_seconds, + garbage_collect_interval_seconds=self._garbage_collect_interval_seconds, + trim_memory_threshold_bytes=self._trim_memory_threshold_bytes, + hard_processor_suspend=self._hard_processor_suspend, + event_loop=self._event_loop, + logging_paths=self._logging_paths, + logging_level=self._logging_level, + ) + + def _spawn_initial_workers(self) -> None: + for _ in range(self._max_workers): + worker = self._create_worker() + worker.start() + group_id = f"fixed-{uuid.uuid4().hex}".encode() + self._worker_groups[group_id] = {worker.identity: worker} async def __on_receive_external(self, message: Message): if isinstance(message, WorkerManagerCommand): @@ -107,25 +152,7 @@ async def start_worker_group(self) -> Tuple[WorkerGroupID, Status]: if num_of_workers >= self._max_workers != -1: return b"", Status.WorkerGroupTooMuch - worker = Worker( - name=f"NAT|{uuid.uuid4().hex}", - address=self._address, - object_storage_address=self._object_storage_address, - preload=self._preload, - capabilities=self._capabilities, - io_threads=self._io_threads, - task_queue_size=self._task_queue_size, - heartbeat_interval_seconds=self._heartbeat_interval_seconds, - task_timeout_seconds=self._task_timeout_seconds, - death_timeout_seconds=self._death_timeout_seconds, - garbage_collect_interval_seconds=self._garbage_collect_interval_seconds, - trim_memory_threshold_bytes=self._trim_memory_threshold_bytes, - hard_processor_suspend=self._hard_processor_suspend, - event_loop=self._event_loop, - logging_paths=self._logging_paths, - logging_level=self._logging_level, - ) - + worker = self._create_worker() worker.start() worker_group_id = f"native-{uuid.uuid4().hex}".encode() self._worker_groups[worker_group_id] = {worker.identity: worker} @@ -148,10 +175,35 @@ async def shutdown_worker_group(self, worker_group_id: WorkerGroupID) -> Status: return Status.Success def run(self) -> None: + if self._mode == NativeWorkerManagerMode.FIXED: + self._run_fixed() + return + + # DYNAMIC mode + self._setup_zmq() self._loop = asyncio.new_event_loop() run_task_forever(self._loop, self._run(), cleanup_callback=self._cleanup) - def _cleanup(self): + def _run_fixed(self) -> None: + setup_logger(self._logging_paths, self._logging_config_file, self._logging_level) + register_event_loop(self._event_loop) + self._spawn_initial_workers() + + def _on_signal(sig: int, frame: object) -> None: + logging.info("NativeWorkerManager (FIXED): received signal %d, terminating workers", sig) + for group in self._worker_groups.values(): + for worker in group.values(): + if worker.is_alive(): + worker.terminate() + + signal.signal(signal.SIGTERM, _on_signal) + signal.signal(signal.SIGINT, _on_signal) + + for group in self._worker_groups.values(): + for worker in group.values(): + worker.join() + + def _cleanup(self) -> None: if self._connector_external is not None: self._connector_external.destroy() @@ -171,7 +223,7 @@ async def _run(self) -> None: self.__register_signal() await self._task - async def 
__send_heartbeat(self): + async def __send_heartbeat(self) -> None: await self._connector_external.send( WorkerManagerHeartbeat.new_msg( max_worker_groups=self._max_workers, @@ -181,7 +233,7 @@ async def __send_heartbeat(self): ) ) - async def __get_loops(self): + async def __get_loops(self) -> None: loops = [ create_async_loop_routine(self._connector_external.routine, 0), create_async_loop_routine(self.__send_heartbeat, self._heartbeat_interval_seconds), diff --git a/tests/client/test_client.py b/tests/client/test_client.py index 60c78b0d4..b76b8c10b 100644 --- a/tests/client/test_client.py +++ b/tests/client/test_client.py @@ -1,4 +1,5 @@ import functools +import multiprocessing import os import random import tempfile @@ -10,13 +11,13 @@ from scaler.config.common.logging import LoggingConfig from scaler.config.common.worker import WorkerConfig from scaler.config.common.worker_manager import WorkerManagerConfig -from scaler.config.section.fixed_native_worker_manager import FixedNativeWorkerManagerConfig +from scaler.config.section.native_worker_manager import NativeWorkerManagerConfig, NativeWorkerManagerMode from scaler.config.types.worker import WorkerCapabilities from scaler.utility.exceptions import MissingObjects, ProcessorDiedError from scaler.utility.logging.scoped_logger import ScopedLogger from scaler.utility.logging.utility import setup_logger from scaler.worker.preload import PreloadSpecError, _parse_preload_spec, execute_preload -from scaler.worker_manager_adapter.baremetal.fixed_native import FixedNativeWorkerManager +from scaler.worker_manager_adapter.baremetal.native import NativeWorkerManager from tests.utility.utility import logging_test_name @@ -361,8 +362,8 @@ def tearDown(self) -> None: def _create_preload_cluster(self, preload: str, logging_paths: tuple = ("/dev/stdout",)): base_manager = self.combo._worker_manager - preload_manager = FixedNativeWorkerManager( - FixedNativeWorkerManagerConfig( + preload_manager = NativeWorkerManager( + NativeWorkerManagerConfig( worker_manager_config=WorkerManagerConfig( scheduler_address=self.combo._address, object_storage_address=self.combo._object_storage_address, @@ -371,6 +372,7 @@ def _create_preload_cluster(self, preload: str, logging_paths: tuple = ("/dev/st preload=preload, event_loop=base_manager._event_loop, worker_io_threads=base_manager._io_threads, + mode=NativeWorkerManagerMode.FIXED, worker_config=WorkerConfig( per_worker_capabilities=WorkerCapabilities({}), per_worker_task_queue_size=base_manager._task_queue_size, @@ -388,15 +390,15 @@ def _create_preload_cluster(self, preload: str, logging_paths: tuple = ("/dev/st ), ) ) - return preload_manager + return multiprocessing.Process(target=preload_manager.run) def test_preload_success(self): - preload_cluster = self._create_preload_cluster( + preload_process = self._create_preload_cluster( preload="tests.utility.utility:setup_global_value('test_preload_value')" ) try: - preload_cluster.start() + preload_process.start() time.sleep(2) with Client(self.combo.get_address()) as client: @@ -407,7 +409,8 @@ def test_preload_success(self): # Verify the preloaded value is accessible self.assertEqual(result, "test_preload_value") finally: - preload_cluster.shutdown() + preload_process.terminate() + preload_process.join() def test_preload_failure(self): # For checking if the failure was logged, Processor will create log_path-{pid} @@ -417,12 +420,12 @@ def test_preload_failure(self): log_basename = os.path.basename(log_path) try: - preload_cluster = self._create_preload_cluster( + 
preload_process = self._create_preload_cluster( preload="tests.utility.utility:failing_preload()", logging_paths=(log_path,) ) try: - preload_cluster.start() + preload_process.start() time.sleep(10) # Find processor log files by looking for files with PID suffixes @@ -438,7 +441,8 @@ def test_preload_failure(self): # If we reach here without any other exceptions, the test is successful finally: - preload_cluster.shutdown() + preload_process.terminate() + preload_process.join() finally: # Clean up log files try: diff --git a/tests/cluster/test_cluster_disconnect.py b/tests/cluster/test_cluster_disconnect.py index 700edaa5b..73635f27f 100644 --- a/tests/cluster/test_cluster_disconnect.py +++ b/tests/cluster/test_cluster_disconnect.py @@ -1,3 +1,4 @@ +import multiprocessing import time import unittest from concurrent.futures import CancelledError @@ -7,10 +8,10 @@ from scaler.config.common.worker import WorkerConfig from scaler.config.common.worker_manager import WorkerManagerConfig from scaler.config.defaults import DEFAULT_LOGGING_PATHS -from scaler.config.section.fixed_native_worker_manager import FixedNativeWorkerManagerConfig +from scaler.config.section.native_worker_manager import NativeWorkerManagerConfig, NativeWorkerManagerMode from scaler.config.types.worker import WorkerCapabilities from scaler.utility.logging.utility import setup_logger -from scaler.worker_manager_adapter.baremetal.fixed_native import FixedNativeWorkerManager +from scaler.worker_manager_adapter.baremetal.native import NativeWorkerManager from tests.utility.utility import logging_test_name @@ -32,8 +33,8 @@ def tearDown(self) -> None: def test_cluster_disconnect(self): base_manager = self.combo._worker_manager - dying_manager = FixedNativeWorkerManager( - FixedNativeWorkerManagerConfig( + dying_manager = NativeWorkerManager( + NativeWorkerManagerConfig( worker_manager_config=WorkerManagerConfig( scheduler_address=self.combo._address, object_storage_address=self.combo._object_storage_address, @@ -42,6 +43,7 @@ def test_cluster_disconnect(self): preload=None, event_loop=base_manager._event_loop, worker_io_threads=base_manager._io_threads, + mode=NativeWorkerManagerMode.FIXED, worker_config=WorkerConfig( per_worker_capabilities=WorkerCapabilities({}), per_worker_task_queue_size=base_manager._task_queue_size, @@ -59,12 +61,14 @@ def test_cluster_disconnect(self): ), ) ) - dying_manager.start() + dying_process = multiprocessing.Process(target=dying_manager.run) + dying_process.start() client = Client(self.address) future_result = client.submit(noop_sleep, 5) time.sleep(2) - dying_manager.shutdown() + dying_process.terminate() + dying_process.join() with self.assertRaises(CancelledError): client.clear() diff --git a/tests/core/test_death_timeout.py b/tests/core/test_death_timeout.py index ccee18973..6ef779b4d 100644 --- a/tests/core/test_death_timeout.py +++ b/tests/core/test_death_timeout.py @@ -1,4 +1,5 @@ import logging +import multiprocessing import time import unittest @@ -16,12 +17,12 @@ DEFAULT_TASK_TIMEOUT_SECONDS, DEFAULT_TRIM_MEMORY_THRESHOLD_BYTES, ) -from scaler.config.section.fixed_native_worker_manager import FixedNativeWorkerManagerConfig +from scaler.config.section.native_worker_manager import NativeWorkerManagerConfig, NativeWorkerManagerMode from scaler.config.types.worker import WorkerCapabilities from scaler.config.types.zmq import ZMQConfig from scaler.utility.logging.utility import setup_logger from scaler.utility.network_util import get_available_tcp_port -from 
scaler.worker_manager_adapter.baremetal.fixed_native import FixedNativeWorkerManager +from scaler.worker_manager_adapter.baremetal.native import NativeWorkerManager from tests.utility.utility import logging_test_name # This is a manual test because it can loop infinitely if it fails @@ -35,8 +36,8 @@ def setUp(self) -> None: def test_no_scheduler(self): logging.info("test with no scheduler") # Test 1: Spinning up a cluster with no scheduler. Death timeout should apply - manager = FixedNativeWorkerManager( - FixedNativeWorkerManagerConfig( + manager = NativeWorkerManager( + NativeWorkerManagerConfig( worker_manager_config=WorkerManagerConfig( scheduler_address=ZMQConfig.from_string(f"tcp://127.0.0.1:{get_available_tcp_port()}"), object_storage_address=None, @@ -45,6 +46,7 @@ def test_no_scheduler(self): preload=None, event_loop="builtin", worker_io_threads=DEFAULT_IO_THREADS, + mode=NativeWorkerManagerMode.FIXED, worker_config=WorkerConfig( per_worker_capabilities=WorkerCapabilities({}), per_worker_task_queue_size=DEFAULT_PER_WORKER_QUEUE_SIZE, @@ -60,8 +62,9 @@ def test_no_scheduler(self): ), ) ) - manager.start() - manager.join() + process = multiprocessing.Process(target=manager.run) + process.start() + process.join() def test_shutdown(self): logging.info("test with explicitly shutdown") diff --git a/tests/scheduler/test_balance.py b/tests/scheduler/test_balance.py index f54b69922..d8861d609 100644 --- a/tests/scheduler/test_balance.py +++ b/tests/scheduler/test_balance.py @@ -1,3 +1,4 @@ +import multiprocessing import os import time import unittest @@ -7,11 +8,11 @@ from scaler.config.common.worker import WorkerConfig from scaler.config.common.worker_manager import WorkerManagerConfig from scaler.config.defaults import DEFAULT_LOAD_BALANCE_SECONDS -from scaler.config.section.fixed_native_worker_manager import FixedNativeWorkerManagerConfig +from scaler.config.section.native_worker_manager import NativeWorkerManagerConfig, NativeWorkerManagerMode from scaler.config.types.worker import WorkerCapabilities from scaler.utility.logging.utility import setup_logger from scaler.utility.network_util import get_available_tcp_port -from scaler.worker_manager_adapter.baremetal.fixed_native import FixedNativeWorkerManager +from scaler.worker_manager_adapter.baremetal.native import NativeWorkerManager from tests.utility.utility import logging_test_name @@ -49,14 +50,15 @@ def test_balance(self): time.sleep(3) base_manager = combo._worker_manager - new_manager = FixedNativeWorkerManager( - FixedNativeWorkerManagerConfig( + new_manager = NativeWorkerManager( + NativeWorkerManagerConfig( worker_manager_config=WorkerManagerConfig( scheduler_address=base_manager._address, object_storage_address=None, max_workers=N_WORKERS - 1 ), preload=None, event_loop=base_manager._event_loop, worker_io_threads=1, + mode=NativeWorkerManagerMode.FIXED, worker_config=WorkerConfig( per_worker_capabilities=WorkerCapabilities({}), per_worker_task_queue_size=base_manager._task_queue_size, @@ -74,7 +76,8 @@ def test_balance(self): ), ) ) - new_manager.start() + process = multiprocessing.Process(target=new_manager.run) + process.start() pids = {f.result() for f in futures} @@ -82,5 +85,6 @@ def test_balance(self): client.disconnect() - new_manager.shutdown() + process.terminate() + process.join() combo.shutdown() diff --git a/tests/scheduler/test_capabilities.py b/tests/scheduler/test_capabilities.py index 74d4d3c47..826e98e9a 100644 --- a/tests/scheduler/test_capabilities.py +++ b/tests/scheduler/test_capabilities.py @@ -1,3 
+1,4 @@ +import multiprocessing import unittest from concurrent.futures import TimeoutError @@ -5,11 +6,11 @@ from scaler.config.common.logging import LoggingConfig from scaler.config.common.worker import WorkerConfig from scaler.config.common.worker_manager import WorkerManagerConfig -from scaler.config.section.fixed_native_worker_manager import FixedNativeWorkerManagerConfig +from scaler.config.section.native_worker_manager import NativeWorkerManagerConfig, NativeWorkerManagerMode from scaler.config.section.scheduler import PolicyConfig from scaler.config.types.worker import WorkerCapabilities from scaler.utility.logging.utility import setup_logger -from scaler.worker_manager_adapter.baremetal.fixed_native import FixedNativeWorkerManager +from scaler.worker_manager_adapter.baremetal.native import NativeWorkerManager from tests.utility.utility import logging_test_name @@ -41,14 +42,15 @@ def test_capabilities(self): future.result(timeout=1) # Connects a worker that can handle the task - gpu_manager = FixedNativeWorkerManager( - FixedNativeWorkerManagerConfig( + gpu_manager = NativeWorkerManager( + NativeWorkerManagerConfig( worker_manager_config=WorkerManagerConfig( scheduler_address=base_manager._address, object_storage_address=None, max_workers=1 ), preload=None, event_loop=base_manager._event_loop, worker_io_threads=1, + mode=NativeWorkerManagerMode.FIXED, worker_config=WorkerConfig( per_worker_capabilities=WorkerCapabilities({"gpu": -1}), per_worker_task_queue_size=base_manager._task_queue_size, @@ -66,11 +68,13 @@ def test_capabilities(self): ), ) ) - gpu_manager.start() + gpu_process = multiprocessing.Process(target=gpu_manager.run) + gpu_process.start() self.assertEqual(future.result(), 3.0) - gpu_manager.shutdown() + gpu_process.terminate() + gpu_process.join() def test_graph_capabilities(self): base_manager = self.combo._worker_manager @@ -86,14 +90,15 @@ def test_graph_capabilities(self): future.result(timeout=1) # Connect a worker that can handle the task - gpu_manager = FixedNativeWorkerManager( - FixedNativeWorkerManagerConfig( + gpu_manager = NativeWorkerManager( + NativeWorkerManagerConfig( worker_manager_config=WorkerManagerConfig( scheduler_address=base_manager._address, object_storage_address=None, max_workers=1 ), preload=None, event_loop=base_manager._event_loop, worker_io_threads=1, + mode=NativeWorkerManagerMode.FIXED, worker_config=WorkerConfig( per_worker_capabilities=WorkerCapabilities({"gpu": -1}), per_worker_task_queue_size=base_manager._task_queue_size, @@ -111,8 +116,10 @@ def test_graph_capabilities(self): ), ) ) - gpu_manager.start() + gpu_process = multiprocessing.Process(target=gpu_manager.run) + gpu_process.start() self.assertEqual(future.result(), 8) - gpu_manager.shutdown() + gpu_process.terminate() + gpu_process.join()
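
Migration note: the sketch below shows the replacement pattern this changeset applies in combo.py, the example, and the tests for callers that previously used FixedNativeWorkerManager.start()/shutdown(). The helper name, port, pool size, and object_storage_address=None are illustrative placeholders, not part of the changeset; all omitted config fields fall back to their defaults.

import multiprocessing

from scaler.config.common.worker_manager import WorkerManagerConfig
from scaler.config.section.native_worker_manager import NativeWorkerManagerConfig, NativeWorkerManagerMode
from scaler.config.types.zmq import ZMQConfig
from scaler.worker_manager_adapter.baremetal.native import NativeWorkerManager


def start_fixed_pool(scheduler_url: str = "tcp://127.0.0.1:8516") -> multiprocessing.Process:
    manager = NativeWorkerManager(
        NativeWorkerManagerConfig(
            worker_manager_config=WorkerManagerConfig(
                scheduler_address=ZMQConfig.from_string(scheduler_url),
                object_storage_address=None,  # the tests pass None; real deployments supply their object storage address
                max_workers=4,  # in fixed mode this is the exact pool size spawned at startup
            ),
            mode=NativeWorkerManagerMode.FIXED,
        )
    )
    # In fixed mode run() blocks: it pre-spawns the workers and then joins them,
    # so callers drive it in a separate process, as SchedulerClusterCombo now does.
    process = multiprocessing.Process(target=manager.run)
    process.start()
    return process


# Shutdown mirrors SchedulerClusterCombo.shutdown():
#     if process.is_alive():
#         process.terminate()
#     process.join()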