Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,3 @@ CMakeFiles/
.vs/
src/scaler/protocol/capnp/*.c++
src/scaler/protocol/capnp/*.h

orb/logs/
orb/metrics/
2 changes: 1 addition & 1 deletion docs/source/tutorials/worker_adapters/orb.rst
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ Orb Template Configuration
* ``--subnet-id``: AWS subnet ID where the instances will be launched. If not provided, it attempts to discover the default subnet in the default VPC.
* ``--security-group-ids``: Comma-separated list of AWS security group IDs.
* ``--allowed-ip``: IP address to allow in the security group (if created automatically). Defaults to the adapter's external IP.
* ``--orb-config-path``: Path to the ORB root directory (default: ``src/scaler/drivers/orb``).
* ``--orb-config-path``: Path to the directory that contains the ORB ``config/`` directory with the configuration files (default: ``src/scaler/worker_adapter/drivers/orb_common``).

Common Parameters
~~~~~~~~~~~~~~~~~
Expand Down
2 changes: 1 addition & 1 deletion src/scaler/cluster/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from scaler.config.section.cluster import ClusterConfig
from scaler.config.section.fixed_native_worker_adapter import FixedNativeWorkerAdapterConfig
from scaler.utility.logging.utility import setup_logger
from scaler.worker_adapter.fixed_native import FixedNativeWorkerAdapter
from scaler.worker_adapter.drivers.fixed_native import FixedNativeWorkerAdapter


class Cluster(multiprocessing.get_context("spawn").Process): # type: ignore[misc]
Expand Down
2 changes: 1 addition & 1 deletion src/scaler/config/section/orb_worker_adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ class ORBWorkerAdapterConfig(ConfigClass):
)

orb_config_path: str = dataclasses.field(
default="src/scaler/drivers/orb", metadata=dict(help="Path to the ORB root directory")
default="src/scaler/worker_adapter/drivers/orb_common", metadata=dict(help="Path to the ORB root directory")
)

instance_type: str = dataclasses.field(default="t2.micro", metadata=dict(help="EC2 instance type"))
Expand Down
2 changes: 1 addition & 1 deletion src/scaler/entry_points/worker_adapter_ecs.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from scaler.config.section.ecs_worker_adapter import ECSWorkerAdapterConfig
from scaler.utility.logging.utility import setup_logger
from scaler.worker_adapter.ecs import ECSWorkerAdapter
from scaler.worker_adapter.drivers.ecs import ECSWorkerAdapter


def main():
Expand Down
2 changes: 1 addition & 1 deletion src/scaler/entry_points/worker_adapter_fixed_native.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from scaler.config.section.fixed_native_worker_adapter import FixedNativeWorkerAdapterConfig
from scaler.utility.event_loop import register_event_loop
from scaler.utility.logging.utility import setup_logger
from scaler.worker_adapter.fixed_native import FixedNativeWorkerAdapter
from scaler.worker_adapter.drivers.fixed_native import FixedNativeWorkerAdapter


def main():
Expand Down
2 changes: 1 addition & 1 deletion src/scaler/entry_points/worker_adapter_native.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from scaler.config.section.native_worker_adapter import NativeWorkerAdapterConfig
from scaler.utility.logging.utility import setup_logger
from scaler.worker_adapter.native import NativeWorkerAdapter
from scaler.worker_adapter.drivers.native import NativeWorkerAdapter


def main():
Expand Down
2 changes: 1 addition & 1 deletion src/scaler/entry_points/worker_adapter_orb.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from scaler.config.section.orb_worker_adapter import ORBWorkerAdapterConfig
from scaler.utility.logging.utility import setup_logger
from scaler.worker_adapter.orb.worker_adapter import ORBWorkerAdapter
from scaler.worker_adapter.drivers.orb_common.worker_adapter import ORBWorkerAdapter


def main():
Expand Down
2 changes: 1 addition & 1 deletion src/scaler/entry_points/worker_adapter_symphony.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from scaler.config.section.symphony_worker_adapter import SymphonyWorkerConfig
from scaler.utility.logging.utility import setup_logger
from scaler.worker_adapter.symphony.worker_adapter import SymphonyWorkerAdapter
from scaler.worker_adapter.drivers.symphony.worker_adapter import SymphonyWorkerAdapter


def main():
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
from scaler.utility.event_loop import create_async_loop_routine, register_event_loop
from scaler.utility.identifiers import WorkerID
from scaler.utility.logging.utility import setup_logger
from scaler.worker_adapter.common import WorkerGroupID, format_capabilities
from scaler.worker_adapter.drivers.common import WorkerGroupID, format_capabilities

Status = WorkerAdapterCommandResponse.Status

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from scaler.config.section.fixed_native_worker_adapter import FixedNativeWorkerAdapterConfig
from scaler.utility.identifiers import WorkerID
from scaler.worker.worker import Worker
from scaler.worker_adapter.common import WorkerGroupNotFoundError
from scaler.worker_adapter.drivers.common import WorkerGroupNotFoundError


class FixedNativeWorkerAdapter:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
from scaler.utility.identifiers import WorkerID
from scaler.utility.logging.utility import setup_logger
from scaler.worker.worker import Worker
from scaler.worker_adapter.common import WorkerGroupID
from scaler.worker_adapter.drivers.common import WorkerGroupID

Status = WorkerAdapterCommandResponse.Status

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@
from typing import Any, Dict, List, Optional

from scaler.utility.formatter import snakecase_dict
from scaler.worker_adapter.orb.exception import ORBException
from scaler.worker_adapter.orb.types import ORBMachine, ORBRequest, ORBTemplate
from scaler.worker_adapter.drivers.orb_common.exception import ORBException
from scaler.worker_adapter.drivers.orb_common.types import ORBMachine, ORBRequest, ORBTemplate


class ORBHelper:
Expand Down Expand Up @@ -178,14 +178,18 @@ def __init__(self, config_root_path: str):
self._temp_dir = tempfile.TemporaryDirectory()
self._cwd = self._temp_dir.name

config_src = path.join(config_root_path, "config")
config_dst = path.join(self._cwd, "config")

shutil.copytree(
config_root_path,
self._cwd,
config_src,
config_dst,
dirs_exist_ok=True,
ignore=shutil.ignore_patterns(
".git", ".venv", ".mypy_cache", "build*", "dist", "__pycache__", "metrics", "logs"
),
)

os.makedirs(path.join(self._cwd, "logs"), exist_ok=True)

self.templates = self.Templates(self)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,10 @@
from scaler.utility.event_loop import create_async_loop_routine, register_event_loop, run_task_forever
from scaler.utility.identifiers import WorkerID
from scaler.utility.logging.utility import setup_logger
from scaler.worker_adapter.common import WorkerGroupID, format_capabilities
from scaler.worker_adapter.orb.helper import ORBHelper
from scaler.worker_adapter.orb.types import ORBTemplate
from scaler.worker_adapter.drivers.common import WorkerGroupID
from scaler.worker_adapter.drivers.orb_common.helper import ORBHelper
from scaler.worker_adapter.drivers.orb_common.types import ORBTemplate
from scaler.worker_adapter.drivers.orb_ec2.worker_setup import create_user_data, get_orb_worker_name

Status = WorkerAdapterCommandResponse.Status
logger = logging.getLogger(__name__)
Expand All @@ -38,17 +39,6 @@
ORB_MAX_POLLING_ATTEMPTS = 60


def get_orb_worker_name(instance_id: str) -> str:
    """
    Return the deterministic name of the worker running on an ORB instance.

    When called with the literal bash variable reference '${INSTANCE_ID}', the returned name keeps
    the bash expansions so the shell resolves them at run time.
    """
    is_bash_variable = instance_id == "${INSTANCE_ID}"
    if not is_bash_variable:
        stripped_id = instance_id.replace("i-", "")
        return f"Worker|ORB|{instance_id}|{stripped_id}"

    # `${INSTANCE_ID//i-/}` is the bash counterpart of str.replace("i-", "").
    return "Worker|ORB|${INSTANCE_ID}|${INSTANCE_ID//i-/}"


class ORBWorkerAdapter:
_config: ORBWorkerAdapterConfig
_orb: ORBHelper
Expand Down Expand Up @@ -106,7 +96,12 @@ def __init__(self, config: ORBWorkerAdapterConfig):
self._create_key_pair()
key_name = self._created_key_name

user_data = self._create_user_data()
user_data = create_user_data(
worker_config=self._config.worker_config,
adapter_config=self._config.worker_adapter_config,
event_loop=self._config.event_loop,
worker_io_threads=self._config.worker_io_threads,
)
user_data_file_path = os.path.join(self._orb.cwd, "config", "user_data.sh")
with open(user_data_file_path, "w") as f:
f.write(user_data)
Expand Down Expand Up @@ -237,53 +232,6 @@ async def __get_loops(self):
else:
logging.exception(f"{self._ident!r}: failed with unhandled exception:\n{e}")

def _create_user_data(self) -> str:
    """
    Build the bash script that starts a single scaler worker on an ORB instance.

    The script reads the instance ID with ``ec2-metadata``, derives ``WORKER_NAME`` from it via
    ``get_orb_worker_name``, and launches ``/usr/local/bin/scaler_cluster`` in the background with
    ``nohup``, redirecting stdout and stderr to ``/var/log/opengris-scaler.log``.

    All values are read from ``self._config`` (its ``worker_config``, ``worker_adapter_config``,
    ``event_loop`` and ``worker_io_threads`` attributes).

    Returns:
        The script text, terminated by a newline.
    """
    worker_config = self._config.worker_config
    adapter_config = self._config.worker_adapter_config

    # We assume 1 worker per machine for ORB
    # TODO: Add support for multiple workers per machine if needed
    num_workers = 1

    # Build the command
    # We construct the full WorkerID here so it's deterministic and matches what the adapter calculates
    # We fetch instance_id once and use it to construct the ID
    #
    # Notes on the literal below:
    # - a backslash directly before a newline inside the (non-raw) string is consumed by Python, so
    #   the options end up on the same shell command line as `nohup ...`;
    # - `${{WORKER_NAME}}` uses doubled braces so the f-string emits the bash expansion `${WORKER_NAME}`.
    script = f"""#!/bin/bash
INSTANCE_ID=$(ec2-metadata --instance-id --quiet)
WORKER_NAME="{get_orb_worker_name('${INSTANCE_ID}')}"

nohup /usr/local/bin/scaler_cluster {adapter_config.scheduler_address.to_address()} \
--num-of-workers {num_workers} \
--worker-names "${{WORKER_NAME}}" \
--per-worker-task-queue-size {worker_config.per_worker_task_queue_size} \
--heartbeat-interval-seconds {worker_config.heartbeat_interval_seconds} \
--task-timeout-seconds {worker_config.task_timeout_seconds} \
--garbage-collect-interval-seconds {worker_config.garbage_collect_interval_seconds} \
--death-timeout-seconds {worker_config.death_timeout_seconds} \
--trim-memory-threshold-bytes {worker_config.trim_memory_threshold_bytes} \
--event-loop {self._config.event_loop} \
--worker-io-threads {self._config.worker_io_threads} \
--deterministic-worker-ids"""

    # Optional flags are appended to the same command line; each appended literal starts with a
    # space, and its backslash-newline is again removed by Python.
    if worker_config.hard_processor_suspend:
        script += " \
--hard-processor-suspend"

    if adapter_config.object_storage_address:
        script += f" \
--object-storage-address {adapter_config.object_storage_address.to_string()}"

    capabilities = worker_config.per_worker_capabilities.capabilities
    if capabilities:
        cap_str = format_capabilities(capabilities)
        # Skip the option entirely when the formatted capabilities are blank.
        if cap_str.strip():
            script += f" \
--per-worker-capabilities {cap_str}"

    # Redirect stdout and stderr to the log file and put the process in the background.
    script += " > /var/log/opengris-scaler.log 2>&1 &\n"

    return script

def _discover_default_subnet(self) -> str:
vpcs = self._ec2.describe_vpcs(Filters=[{"Name": "isDefault", "Values": ["true"]}])
if not vpcs["Vpcs"]:
Expand Down
Empty file.
61 changes: 61 additions & 0 deletions src/scaler/worker_adapter/drivers/orb_ec2/worker_setup.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
from scaler.config.common.worker import WorkerConfig
from scaler.config.common.worker_adapter import WorkerAdapterConfig
from scaler.worker_adapter.drivers.common import format_capabilities


def get_orb_worker_name(instance_id: str) -> str:
    """
    Build the deterministic worker name for an ORB instance.

    The name has the form ``Worker|ORB|<instance id>|<instance id without "i-">``. If instance_id is
    the bash variable '${INSTANCE_ID}', the last component is the equivalent bash expansion so the
    whole name can be embedded in a shell script.
    """
    bash_variable = "${INSTANCE_ID}"
    if instance_id == bash_variable:
        # Let bash strip every "i-" at run time, mirroring the Python branch below.
        short_id = "${INSTANCE_ID//i-/}"
    else:
        short_id = instance_id.replace("i-", "")
    return f"Worker|ORB|{instance_id}|{short_id}"


def create_user_data(
    worker_config: WorkerConfig, adapter_config: WorkerAdapterConfig, event_loop: str, worker_io_threads: int
) -> str:
    """
    Build the bash script that starts a single scaler worker on an ORB instance.

    The script reads the instance ID with ``ec2-metadata``, derives ``WORKER_NAME`` from it via
    ``get_orb_worker_name``, and launches ``/usr/local/bin/scaler_cluster`` in the background with
    ``nohup``, redirecting stdout and stderr to ``/var/log/opengris-scaler.log``.

    Args:
        worker_config: Supplies the per-worker options (queue size, heartbeat/task/death timeouts,
            garbage-collect interval, trim-memory threshold, hard processor suspend, capabilities).
        adapter_config: Supplies the scheduler address and the optional object storage address.
        event_loop: Value passed to ``--event-loop``.
        worker_io_threads: Value passed to ``--worker-io-threads``.

    Returns:
        The script text (no trailing newline).
    """
    # We assume 1 worker per machine for ORB
    # TODO: Add support for multiple workers per machine if needed
    num_workers = 1

    # Build the command
    # We construct the full WorkerID here so it's deterministic and matches what the adapter calculates
    # We fetch instance_id once and use it to construct the ID
    #
    # Notes on the literal below:
    # - a backslash directly before a newline inside the (non-raw) string is consumed by Python, so
    #   the options end up on the same shell command line as `nohup ...`;
    # - `${{WORKER_NAME}}` uses doubled braces so the f-string emits the bash expansion `${WORKER_NAME}`.
    script = f"""#!/bin/bash
INSTANCE_ID=$(ec2-metadata --instance-id --quiet)
WORKER_NAME="{get_orb_worker_name('${INSTANCE_ID}')}"

nohup /usr/local/bin/scaler_cluster {adapter_config.scheduler_address.to_address()} \
--num-of-workers {num_workers} \
--worker-names "${{WORKER_NAME}}" \
--per-worker-task-queue-size {worker_config.per_worker_task_queue_size} \
--heartbeat-interval-seconds {worker_config.heartbeat_interval_seconds} \
--task-timeout-seconds {worker_config.task_timeout_seconds} \
--garbage-collect-interval-seconds {worker_config.garbage_collect_interval_seconds} \
--death-timeout-seconds {worker_config.death_timeout_seconds} \
--trim-memory-threshold-bytes {worker_config.trim_memory_threshold_bytes} \
--event-loop {event_loop} \
--worker-io-threads {worker_io_threads} \
--deterministic-worker-ids"""

    # Optional flags are appended to the same command line; each appended literal starts with a
    # space, and its backslash-newline is again removed by Python.
    if worker_config.hard_processor_suspend:
        script += " \
--hard-processor-suspend"

    if adapter_config.object_storage_address:
        script += f" \
--object-storage-address {adapter_config.object_storage_address.to_string()}"

    capabilities = worker_config.per_worker_capabilities.capabilities
    if capabilities:
        cap_str = format_capabilities(capabilities)
        # Skip the option entirely when the formatted capabilities are blank.
        if cap_str.strip():
            script += f" \
--per-worker-capabilities {cap_str}"

    # Redirect stdout and stderr to the log file and put the process in the background.
    script += " > /var/log/opengris-scaler.log 2>&1 &"

    return script
Empty file.
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

import cloudpickle

from scaler.worker_adapter.symphony.message import SoamMessage
from scaler.worker_adapter.drivers.symphony.message import SoamMessage

try:
import soamapi
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from scaler.protocol.python.status import Resource
from scaler.utility.mixins import Looper
from scaler.worker.agent.mixins import HeartbeatManager, TimeoutManager
from scaler.worker_adapter.symphony.task_manager import SymphonyTaskManager
from scaler.worker_adapter.drivers.symphony.task_manager import SymphonyTaskManager


class SymphonyHeartbeatManager(Looper, HeartbeatManager):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@
from scaler.utility.queues.async_priority_queue import AsyncPriorityQueue
from scaler.utility.serialization import serialize_failure
from scaler.worker.agent.mixins import HeartbeatManager, TaskManager
from scaler.worker_adapter.symphony.callback import SessionCallback
from scaler.worker_adapter.symphony.message import SoamMessage
from scaler.worker_adapter.drivers.symphony.callback import SessionCallback
from scaler.worker_adapter.drivers.symphony.message import SoamMessage

try:
import soamapi
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@
from scaler.utility.identifiers import WorkerID
from scaler.utility.logging.utility import setup_logger
from scaler.worker.agent.timeout_manager import VanillaTimeoutManager
from scaler.worker_adapter.symphony.heartbeat_manager import SymphonyHeartbeatManager
from scaler.worker_adapter.symphony.task_manager import SymphonyTaskManager
from scaler.worker_adapter.drivers.symphony.heartbeat_manager import SymphonyHeartbeatManager
from scaler.worker_adapter.drivers.symphony.task_manager import SymphonyTaskManager


class SymphonyWorker(multiprocessing.get_context("spawn").Process): # type: ignore
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@
from scaler.utility.event_loop import create_async_loop_routine, register_event_loop
from scaler.utility.identifiers import WorkerID
from scaler.utility.logging.utility import setup_logger
from scaler.worker_adapter.common import WorkerGroupID
from scaler.worker_adapter.symphony.worker import SymphonyWorker
from scaler.worker_adapter.drivers.common import WorkerGroupID
from scaler.worker_adapter.drivers.symphony.worker import SymphonyWorker

Status = WorkerAdapterCommandResponse.Status

Expand Down
2 changes: 1 addition & 1 deletion tests/scheduler/test_scaling.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@
from scaler.utility.identifiers import ClientID, ObjectID, TaskID, WorkerID
from scaler.utility.logging.utility import setup_logger
from scaler.utility.network_util import get_available_tcp_port
from scaler.worker_adapter.native import NativeWorkerAdapter
from scaler.worker_adapter.drivers.native import NativeWorkerAdapter
from tests.utility.utility import logging_test_name


Expand Down
Loading