1 change: 0 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -248,7 +248,6 @@ The following table maps each Scaler command to its corresponding section name i
| `scaler_ui` | `[webui]` |
| `scaler_top` | `[top]` |
| `scaler_worker_manager_baremetal_native` | `[native_worker_manager]` |
| `scaler_worker_manager_baremetal_fixed_native` | `[fixed_native_worker_manager]` |
| `scaler_worker_manager_symphony` | `[symphony_worker_manager]` |

### Practical Scenarios & Examples
1 change: 0 additions & 1 deletion docs/source/index.rst
@@ -29,7 +29,6 @@ Content
tutorials/scaling
tutorials/worker_manager_adapter/index
tutorials/worker_manager_adapter/native
tutorials/worker_manager_adapter/fixed_native
tutorials/worker_manager_adapter/aws_hpc/index
tutorials/worker_manager_adapter/common_parameters
tutorials/compatibility/ray
2 changes: 0 additions & 2 deletions docs/source/tutorials/configuration.rst
@@ -195,8 +195,6 @@ The following table maps each Scaler command to its corresponding section name i
- ``[top]``
* - ``scaler_worker_manager_baremetal_native``
- ``[native_worker_manager]``
* - ``scaler_worker_manager_baremetal_fixed_native``
- ``[fixed_native_worker_manager]``
* - ``scaler_worker_manager_symphony``
- ``[symphony_worker_manager]``
* - ``scaler_worker_manager_aws_raw_ecs``
63 changes: 0 additions & 63 deletions docs/source/tutorials/worker_manager_adapter/fixed_native.rst

This file was deleted.

10 changes: 3 additions & 7 deletions docs/source/tutorials/worker_manager_adapter/index.rst
@@ -34,12 +34,9 @@ Scaler provides several worker managers to support different execution environme
Native
~~~~~~

The :doc:`Native <native>` worker manager allows Scaler to dynamically provision workers as local subprocesses on the same machine. It is the simplest way to scale workloads across multiple CPU cores locally and supports dynamic auto-scaling.

Fixed Native
~~~~~~~~~~~~

The :doc:`Fixed Native <fixed_native>` worker manager spawns a static number of worker subprocesses at startup and does not support dynamic scaling. It is the underlying component used by the high-level ``SchedulerClusterCombo`` class.
The :doc:`Native <native>` worker manager provisions workers as local subprocesses on the same machine.
It supports both dynamic auto-scaling (default) and fixed-pool mode, where a set number of workers
are pre-spawned at startup.

AWS HPC
~~~~~~~
@@ -55,6 +52,5 @@ All worker managers share a set of :doc:`common configuration parameters <common_parameters>`.
:hidden:

native
fixed_native
aws_hpc/index
common_parameters
28 changes: 21 additions & 7 deletions docs/source/tutorials/worker_manager_adapter/native.rst
@@ -1,7 +1,7 @@
Native Worker Manager
=====================

The Native worker manager allows Scaler to dynamically provision workers as local subprocesses on the same machine where the manager is running. This is the simplest way to scale Scaler workloads across multiple CPU cores on a single machine or a group of machines.
The Native worker manager provisions workers as local subprocesses on the same machine where the manager is running. It supports both dynamic auto-scaling (default) and fixed-pool mode, where a set number of workers are pre-spawned at startup.

Getting Started
---------------
@@ -33,16 +33,24 @@ Equivalent configuration using a TOML file:
task_timeout_seconds = 60

* ``tcp://<SCHEDULER_IP>:8516`` is the address workers will use to connect to the scheduler.
* The manager can spawn up to 4 worker subprocesses.
* The manager can spawn up to 4 worker subprocesses in dynamic mode.

To use fixed-pool mode, set the mode to ``fixed`` (``--mode fixed`` on the command line, or ``mode = "fixed"`` in the configuration file) and specify the exact number of workers:

.. code-block:: toml

# config.toml

[native_worker_manager]
mode = "fixed"
max_workers = 8

How it Works
------------

When the scheduler determines that more capacity is needed, it sends a request to the Native worker manager. The manager then spawns a new worker process using the same Python interpreter and environment that started the manager.

Each worker group managed by the Native manager contains exactly one worker process.
**Dynamic mode** (default): when the scheduler determines that more capacity is needed, it sends a request to the Native worker manager. The manager then spawns a new worker process using the same Python interpreter and environment that started the manager. Each worker group managed by the Native manager contains exactly one worker process.

Unlike the Fixed Native worker manager, which spawns a static number of workers at startup, the Native worker manager is designed to be used with Scaler's auto-scaling features to dynamically grow and shrink the local worker pool based on demand.
**Fixed mode**: all workers are pre-spawned at startup. The manager runs a simple synchronous loop with no event loop or scheduler connector — it spawns the workers and waits for them to finish. Workers connect directly to the scheduler themselves. When all pre-spawned workers have exited, the manager itself exits.
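The spawn-and-wait behaviour of fixed mode can be illustrated with a short standalone sketch. This is a schematic analogy built on plain ``subprocess`` calls, not Scaler's actual implementation, and the worker command is a placeholder:

```python
import subprocess
import sys


def run_fixed_pool(num_workers: int, worker_cmd: list) -> list:
    # Pre-spawn the whole pool up front: no event loop, no scaling requests.
    procs = [subprocess.Popen(worker_cmd) for _ in range(num_workers)]
    # Block until every worker exits, then return their exit codes;
    # at that point the manager itself would exit.
    return [p.wait() for p in procs]


# Placeholder worker command: a no-op Python process.
exit_codes = run_fixed_pool(3, [sys.executable, "-c", "pass"])
```

In the real manager each subprocess would be a Scaler worker that connects to the scheduler on its own; the manager's only remaining job is to spawn and reap them.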

Supported Parameters
--------------------
@@ -55,7 +63,13 @@ The Native worker manager supports the following specific configuration paramete
Native Configuration
~~~~~~~~~~~~~~~~~~~~

* ``--max-workers`` (``-mw``): Maximum number of worker subprocesses that can be started. Set to ``-1`` for no limit (default: ``-1``).
* ``--mode``: Operating mode. ``dynamic`` (default) enables auto-scaling driven by the scheduler.
``fixed`` pre-spawns ``--max-workers`` workers at startup and does not support dynamic scaling.
In fixed mode ``--max-workers`` must be a positive integer.
* ``--worker-type``: Optional string prefix used in worker IDs. Overrides the default prefix (``NAT``
for dynamic mode, ``FIX`` for fixed mode). Useful when multiple adapters of the same mode are
running concurrently and their workers need to be distinguishable by type in logs and monitoring.
* ``--max-workers`` (``-mw``): In dynamic mode, the maximum number of worker subprocesses that can be started (``-1`` = unlimited, default: ``-1``). In fixed mode, the exact number of workers spawned at startup (must be ≥ 1).
* ``--preload``: Python module or script to preload in each worker process before it starts accepting tasks.
* ``--worker-io-threads`` (``-wit``): Number of IO threads for the IO backend per worker (default: ``1``).
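Putting these parameters together, a combined configuration might look like the following TOML sketch. The ``mode`` and ``max_workers`` keys appear in this page's own example; ``worker_type`` and ``worker_io_threads`` are inferred from the CLI flags above, and ``"GPU"`` is purely an illustrative value:

```toml
# config.toml -- illustrative sketch only

[native_worker_manager]
mode = "fixed"          # pre-spawn workers instead of auto-scaling
max_workers = 8         # exact pool size in fixed mode
worker_type = "GPU"     # optional worker-ID prefix (default FIX/NAT by mode)
worker_io_threads = 2   # IO threads per worker
```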

24 changes: 15 additions & 9 deletions examples/task_capabilities.py
@@ -5,16 +5,17 @@
"""

import math
import multiprocessing

from scaler import Client
from scaler.cluster.combo import SchedulerClusterCombo
from scaler.config.common.logging import LoggingConfig
from scaler.config.common.worker import WorkerConfig
from scaler.config.common.worker_manager import WorkerManagerConfig
from scaler.config.section.fixed_native_worker_manager import FixedNativeWorkerManagerConfig
from scaler.config.section.native_worker_manager import NativeWorkerManagerConfig, NativeWorkerManagerMode
from scaler.config.section.scheduler import PolicyConfig
from scaler.config.types.worker import WorkerCapabilities
from scaler.worker_manager_adapter.baremetal.fixed_native import FixedNativeWorkerManager
from scaler.worker_manager_adapter.baremetal.native import NativeWorkerManager


def gpu_task(x: float) -> float:
@@ -39,14 +40,16 @@ def main():

# Adds an additional worker with GPU support
base_manager = cluster._worker_manager
gpu_manager = FixedNativeWorkerManager(
FixedNativeWorkerManagerConfig(
gpu_manager = NativeWorkerManager(
NativeWorkerManagerConfig(
worker_manager_config=WorkerManagerConfig(
scheduler_address=base_manager._address, object_storage_address=None, max_workers=1
scheduler_address=base_manager._address,
object_storage_address=base_manager._object_storage_address,
max_workers=1,
),
preload=None,
mode=NativeWorkerManagerMode.FIXED,
event_loop=base_manager._event_loop,
worker_io_threads=1,
worker_io_threads=base_manager._io_threads,
worker_config=WorkerConfig(
per_worker_capabilities=WorkerCapabilities({"gpu": -1}),
per_worker_task_queue_size=base_manager._task_queue_size,
@@ -64,7 +67,8 @@
),
)
)
gpu_manager.start()
gpu_manager_process = multiprocessing.Process(target=gpu_manager.run)
gpu_manager_process.start()

with Client(address=cluster.get_address()) as client:
print("Submitting tasks...")
@@ -83,7 +87,9 @@
gpu_future.result()
cpu_future.result()

gpu_manager.shutdown()
if gpu_manager_process.is_alive():
gpu_manager_process.terminate()
gpu_manager_process.join()
cluster.shutdown()


1 change: 0 additions & 1 deletion pyproject.toml
@@ -89,7 +89,6 @@ scaler_top = "scaler.entry_points.top:main"
scaler_ui = "scaler.entry_points.webui:main"
scaler_object_storage_server = "scaler.entry_points.object_storage_server:main"
scaler_worker_manager_baremetal_native = "scaler.entry_points.worker_manager_baremetal_native:main"
scaler_worker_manager_baremetal_fixed_native = "scaler.entry_points.worker_manager_baremetal_fixed_native:main"
scaler_worker_manager_symphony = "scaler.entry_points.worker_manager_symphony:main"
scaler_worker_manager_aws_raw_ecs = "scaler.entry_points.worker_manager_aws_raw_ecs:main"
scaler_worker_manager_aws_hpc_batch = "scaler.entry_points.worker_manager_aws_hpc_batch:main"
5 changes: 0 additions & 5 deletions src/run_worker_manager_baremetal_fixed_native.py

This file was deleted.

19 changes: 13 additions & 6 deletions src/scaler/cluster/combo.py
@@ -1,4 +1,5 @@
import logging
import multiprocessing
from typing import Dict, Optional, Tuple

from scaler.cluster.object_storage_server import ObjectStorageServerProcess
@@ -24,13 +25,13 @@
DEFAULT_WORKER_DEATH_TIMEOUT,
DEFAULT_WORKER_TIMEOUT_SECONDS,
)
from scaler.config.section.fixed_native_worker_manager import FixedNativeWorkerManagerConfig
from scaler.config.section.native_worker_manager import NativeWorkerManagerConfig, NativeWorkerManagerMode
from scaler.config.section.scheduler import PolicyConfig
from scaler.config.types.object_storage_server import ObjectStorageAddressConfig
from scaler.config.types.worker import WorkerCapabilities
from scaler.config.types.zmq import ZMQConfig
from scaler.utility.network_util import get_available_tcp_port
from scaler.worker_manager_adapter.baremetal.fixed_native import FixedNativeWorkerManager
from scaler.worker_manager_adapter.baremetal.native import NativeWorkerManager


class SchedulerClusterCombo:
@@ -89,8 +90,8 @@ def __init__(
self._object_storage.start()
self._object_storage.wait_until_ready() # object storage should be ready before starting the cluster

self._worker_manager = FixedNativeWorkerManager(
FixedNativeWorkerManagerConfig(
self._worker_manager = NativeWorkerManager(
NativeWorkerManagerConfig(
worker_manager_config=WorkerManagerConfig(
scheduler_address=self._address,
object_storage_address=self._object_storage_address,
@@ -99,6 +100,7 @@ def __init__(
preload=None,
event_loop=event_loop,
worker_io_threads=worker_io_threads,
mode=NativeWorkerManagerMode.FIXED,
worker_config=WorkerConfig(
per_worker_capabilities=WorkerCapabilities(per_worker_capabilities or {}),
per_worker_task_queue_size=per_worker_task_queue_size,
@@ -113,6 +115,8 @@
)
)

self._worker_manager_process = multiprocessing.Process(target=self._worker_manager.run)

self._scheduler = SchedulerProcess(
address=self._address,
object_storage_address=self._object_storage_address,
@@ -132,8 +136,8 @@ def __init__(
policy=scaler_policy,
)

self._worker_manager.start()
self._scheduler.start()
self._worker_manager_process.start()
logging.info(f"{self.__get_prefix()} started")

def __del__(self):
@@ -144,7 +148,10 @@ def shutdown(self):
self._shutdown_called = True

logging.info(f"{self.__get_prefix()} shutdown")
self._worker_manager.shutdown()
if self._worker_manager_process.is_alive():
self._worker_manager_process.terminate()
self._worker_manager_process.join()

self._scheduler.terminate()
self._scheduler.join()

38 changes: 0 additions & 38 deletions src/scaler/config/section/fixed_native_worker_manager.py

This file was deleted.

27 changes: 27 additions & 0 deletions src/scaler/config/section/native_worker_manager.py
@@ -1,4 +1,6 @@
import argparse
import dataclasses
import enum
from typing import Optional

from scaler.config import defaults
@@ -9,6 +11,11 @@
from scaler.utility.event_loop import EventLoopType


class NativeWorkerManagerMode(enum.Enum):
DYNAMIC = "dynamic"
FIXED = "fixed"


@dataclasses.dataclass
class NativeWorkerManagerConfig(ConfigClass):
worker_manager_config: WorkerManagerConfig
@@ -25,6 +32,26 @@ class NativeWorkerManagerConfig(ConfigClass):
metadata=dict(short="-wit", help="set the number of io threads for io backend per worker"),
)

mode: NativeWorkerManagerMode = dataclasses.field(
default=NativeWorkerManagerMode.DYNAMIC,
metadata=dict(
type=NativeWorkerManagerMode,
help="operating mode: 'dynamic' for auto-scaling driven by scheduler, 'fixed' for pre-spawned workers",
),
)

worker_type: Optional[str] = dataclasses.field(
default=None,
metadata=dict(help="worker type prefix used in worker IDs; defaults to 'FIX' or 'NAT' based on mode"),
)

@classmethod
def configure_parser(cls, parser: argparse.ArgumentParser) -> None:
super().configure_parser(parser)
parser.add_argument("-n", "--num-of-workers", dest="max_workers", type=int, help=argparse.SUPPRESS)

def __post_init__(self) -> None:
if self.worker_io_threads <= 0:
raise ValueError("worker_io_threads must be a positive integer.")
if self.mode == NativeWorkerManagerMode.FIXED and self.worker_manager_config.max_workers < 0:
raise ValueError("max_workers must be >= 0 for fixed mode")
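The mode enum plus ``__post_init__`` validation pattern above can be exercised in isolation. A minimal standalone sketch with generic names, not the actual Scaler classes:

```python
import dataclasses
import enum


class Mode(enum.Enum):
    DYNAMIC = "dynamic"
    FIXED = "fixed"


@dataclasses.dataclass
class PoolConfig:
    mode: Mode = Mode.DYNAMIC
    max_workers: int = -1  # -1 means "unlimited" in dynamic mode

    def __post_init__(self) -> None:
        # A fixed pool needs a concrete size; "unlimited" makes no sense there.
        if self.mode is Mode.FIXED and self.max_workers < 0:
            raise ValueError("max_workers must be >= 0 for fixed mode")


fixed_ok = PoolConfig(mode=Mode.FIXED, max_workers=8)
try:
    PoolConfig(mode=Mode.FIXED)  # inherits max_workers=-1, so it is rejected
    validation_raised = False
except ValueError:
    validation_raised = True
```

Because ``dataclasses`` calls ``__post_init__`` automatically after field assignment, invalid mode/size combinations fail at construction time rather than later, when the manager tries to spawn the pool.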