diff --git a/.gitignore b/.gitignore index f06bb07cd..1d0bc0bd1 100644 --- a/.gitignore +++ b/.gitignore @@ -34,6 +34,3 @@ CMakeFiles/ .vs/ src/scaler/protocol/capnp/*.c++ src/scaler/protocol/capnp/*.h - -orb/logs/ -orb/metrics/ diff --git a/docs/source/tutorials/worker_adapters/orb.rst b/docs/source/tutorials/worker_adapters/orb.rst index b6c64e4fe..602cf518b 100644 --- a/docs/source/tutorials/worker_adapters/orb.rst +++ b/docs/source/tutorials/worker_adapters/orb.rst @@ -112,7 +112,7 @@ Orb Template Configuration * ``--subnet-id``: AWS subnet ID where the instances will be launched. If not provided, it attempts to discover the default subnet in the default VPC. * ``--security-group-ids``: Comma-separated list of AWS security group IDs. * ``--allowed-ip``: IP address to allow in the security group (if created automatically). Defaults to the adapter's external IP. -* ``--orb-config-path``: Path to the ORB root directory (default: ``src/scaler/drivers/orb``). +* ``--orb-config-path``: Path to the directory containing the ORB configuration files in `config/` (default: ``src/scaler/worker_adapter/drivers/orb_common``). Common Parameters ~~~~~~~~~~~~~~~~~ diff --git a/src/scaler/cluster/cluster.py b/src/scaler/cluster/cluster.py index c9b31f9db..91428ce43 100644 --- a/src/scaler/cluster/cluster.py +++ b/src/scaler/cluster/cluster.py @@ -7,7 +7,7 @@ from scaler.config.section.cluster import ClusterConfig from scaler.config.section.fixed_native_worker_adapter import FixedNativeWorkerAdapterConfig from scaler.utility.logging.utility import setup_logger -from scaler.worker_adapter.fixed_native import FixedNativeWorkerAdapter +from scaler.worker_adapter.drivers.fixed_native import FixedNativeWorkerAdapter class Cluster(multiprocessing.get_context("spawn").Process): # type: ignore[misc] diff --git a/src/scaler/config/section/orb_worker_adapter.py b/src/scaler/config/section/orb_worker_adapter.py index fe806a545..bc46389eb 100644 --- a/src/scaler/config/section/orb_worker_adapter.py +++ b/src/scaler/config/section/orb_worker_adapter.py @@ -37,7 +37,7 @@ class ORBWorkerAdapterConfig(ConfigClass): ) orb_config_path: str = dataclasses.field( - default="src/scaler/drivers/orb", metadata=dict(help="Path to the ORB root directory") + default="src/scaler/worker_adapter/drivers/orb_common", metadata=dict(help="Path to the ORB root directory") ) instance_type: str = dataclasses.field(default="t2.micro", metadata=dict(help="EC2 instance type")) diff --git a/src/scaler/entry_points/worker_adapter_ecs.py b/src/scaler/entry_points/worker_adapter_ecs.py index 69609c719..c4999bd2a 100644 --- a/src/scaler/entry_points/worker_adapter_ecs.py +++ b/src/scaler/entry_points/worker_adapter_ecs.py @@ -1,6 +1,6 @@ from scaler.config.section.ecs_worker_adapter import ECSWorkerAdapterConfig from scaler.utility.logging.utility import setup_logger -from scaler.worker_adapter.ecs import ECSWorkerAdapter +from scaler.worker_adapter.drivers.ecs import ECSWorkerAdapter def main(): diff --git a/src/scaler/entry_points/worker_adapter_fixed_native.py b/src/scaler/entry_points/worker_adapter_fixed_native.py index d657e7bbd..3fd2bb782 100644 --- a/src/scaler/entry_points/worker_adapter_fixed_native.py +++ b/src/scaler/entry_points/worker_adapter_fixed_native.py @@ -1,7 +1,7 @@ from scaler.config.section.fixed_native_worker_adapter import FixedNativeWorkerAdapterConfig from scaler.utility.event_loop import register_event_loop from scaler.utility.logging.utility import setup_logger -from scaler.worker_adapter.fixed_native import FixedNativeWorkerAdapter +from scaler.worker_adapter.drivers.fixed_native import FixedNativeWorkerAdapter def main(): diff --git a/src/scaler/entry_points/worker_adapter_native.py b/src/scaler/entry_points/worker_adapter_native.py index ae1b6fab2..430adb643 100644 --- a/src/scaler/entry_points/worker_adapter_native.py +++ b/src/scaler/entry_points/worker_adapter_native.py @@ -1,6 +1,6 @@ from scaler.config.section.native_worker_adapter import NativeWorkerAdapterConfig from scaler.utility.logging.utility import setup_logger -from scaler.worker_adapter.native import NativeWorkerAdapter +from scaler.worker_adapter.drivers.native import NativeWorkerAdapter def main(): diff --git a/src/scaler/entry_points/worker_adapter_orb.py b/src/scaler/entry_points/worker_adapter_orb.py index ffbf3ed14..c08d49836 100644 --- a/src/scaler/entry_points/worker_adapter_orb.py +++ b/src/scaler/entry_points/worker_adapter_orb.py @@ -1,6 +1,6 @@ from scaler.config.section.orb_worker_adapter import ORBWorkerAdapterConfig from scaler.utility.logging.utility import setup_logger -from scaler.worker_adapter.orb.worker_adapter import ORBWorkerAdapter +from scaler.worker_adapter.drivers.orb_common.worker_adapter import ORBWorkerAdapter def main(): diff --git a/src/scaler/entry_points/worker_adapter_symphony.py b/src/scaler/entry_points/worker_adapter_symphony.py index cf6658668..216b4558a 100644 --- a/src/scaler/entry_points/worker_adapter_symphony.py +++ b/src/scaler/entry_points/worker_adapter_symphony.py @@ -1,6 +1,6 @@ from scaler.config.section.symphony_worker_adapter import SymphonyWorkerConfig from scaler.utility.logging.utility import setup_logger -from scaler.worker_adapter.symphony.worker_adapter import SymphonyWorkerAdapter +from scaler.worker_adapter.drivers.symphony.worker_adapter import SymphonyWorkerAdapter def main(): diff --git a/src/scaler/worker_adapter/orb/__init__.py b/src/scaler/worker_adapter/drivers/__init__.py similarity index 100% rename from src/scaler/worker_adapter/orb/__init__.py rename to src/scaler/worker_adapter/drivers/__init__.py diff --git a/src/scaler/worker_adapter/common.py b/src/scaler/worker_adapter/drivers/common.py similarity index 100% rename from src/scaler/worker_adapter/common.py rename to src/scaler/worker_adapter/drivers/common.py diff --git a/src/scaler/worker_adapter/ecs.py b/src/scaler/worker_adapter/drivers/ecs.py similarity index 99% rename from src/scaler/worker_adapter/ecs.py rename to src/scaler/worker_adapter/drivers/ecs.py index 55497fd14..776475649 100644 --- a/src/scaler/worker_adapter/ecs.py +++ b/src/scaler/worker_adapter/drivers/ecs.py @@ -22,7 +22,7 @@ from scaler.utility.event_loop import create_async_loop_routine, register_event_loop from scaler.utility.identifiers import WorkerID from scaler.utility.logging.utility import setup_logger -from scaler.worker_adapter.common import WorkerGroupID, format_capabilities +from scaler.worker_adapter.drivers.common import WorkerGroupID, format_capabilities Status = WorkerAdapterCommandResponse.Status diff --git a/src/scaler/worker_adapter/fixed_native.py b/src/scaler/worker_adapter/drivers/fixed_native.py similarity index 97% rename from src/scaler/worker_adapter/fixed_native.py rename to src/scaler/worker_adapter/drivers/fixed_native.py index 9f39b35dd..17af24dce 100644 --- a/src/scaler/worker_adapter/fixed_native.py +++ b/src/scaler/worker_adapter/drivers/fixed_native.py @@ -4,7 +4,7 @@ from scaler.config.section.fixed_native_worker_adapter import FixedNativeWorkerAdapterConfig from scaler.utility.identifiers import WorkerID from scaler.worker.worker import Worker -from scaler.worker_adapter.common import WorkerGroupNotFoundError +from scaler.worker_adapter.drivers.common import WorkerGroupNotFoundError class FixedNativeWorkerAdapter: diff --git a/src/scaler/worker_adapter/native.py b/src/scaler/worker_adapter/drivers/native.py similarity index 99% rename from src/scaler/worker_adapter/native.py rename to src/scaler/worker_adapter/drivers/native.py index 416c169e4..000c7e2b0 100644 --- a/src/scaler/worker_adapter/native.py +++ b/src/scaler/worker_adapter/drivers/native.py @@ -22,7 +22,7 @@ from scaler.utility.identifiers import WorkerID from scaler.utility.logging.utility import setup_logger from scaler.worker.worker import Worker -from scaler.worker_adapter.common import WorkerGroupID +from scaler.worker_adapter.drivers.common import WorkerGroupID Status = WorkerAdapterCommandResponse.Status diff --git a/src/scaler/worker_adapter/symphony/__init__.py b/src/scaler/worker_adapter/drivers/orb_common/__init__.py similarity index 100% rename from src/scaler/worker_adapter/symphony/__init__.py rename to src/scaler/worker_adapter/drivers/orb_common/__init__.py diff --git a/src/scaler/drivers/orb/config/config.json b/src/scaler/worker_adapter/drivers/orb_common/config/config.json similarity index 100% rename from src/scaler/drivers/orb/config/config.json rename to src/scaler/worker_adapter/drivers/orb_common/config/config.json diff --git a/src/scaler/drivers/orb/config/default_config.json b/src/scaler/worker_adapter/drivers/orb_common/config/default_config.json similarity index 100% rename from src/scaler/drivers/orb/config/default_config.json rename to src/scaler/worker_adapter/drivers/orb_common/config/default_config.json diff --git a/src/scaler/worker_adapter/orb/exception.py b/src/scaler/worker_adapter/drivers/orb_common/exception.py similarity index 100% rename from src/scaler/worker_adapter/orb/exception.py rename to src/scaler/worker_adapter/drivers/orb_common/exception.py diff --git a/src/scaler/worker_adapter/orb/helper.py b/src/scaler/worker_adapter/drivers/orb_common/helper.py similarity index 96% rename from src/scaler/worker_adapter/orb/helper.py rename to src/scaler/worker_adapter/drivers/orb_common/helper.py index 8ceb4a435..4e310031b 100644 --- a/src/scaler/worker_adapter/orb/helper.py +++ b/src/scaler/worker_adapter/drivers/orb_common/helper.py @@ -7,8 +7,8 @@ from typing import Any, Dict, List, Optional from scaler.utility.formatter import snakecase_dict -from scaler.worker_adapter.orb.exception import ORBException -from scaler.worker_adapter.orb.types import ORBMachine, ORBRequest, ORBTemplate +from scaler.worker_adapter.drivers.orb_common.exception import ORBException +from scaler.worker_adapter.drivers.orb_common.types import ORBMachine, ORBRequest, ORBTemplate class ORBHelper: @@ -178,14 +178,18 @@ def __init__(self, config_root_path: str): self._temp_dir = tempfile.TemporaryDirectory() self._cwd = self._temp_dir.name + config_src = path.join(config_root_path, "config") + config_dst = path.join(self._cwd, "config") + shutil.copytree( - config_root_path, - self._cwd, + config_src, + config_dst, dirs_exist_ok=True, ignore=shutil.ignore_patterns( ".git", ".venv", ".mypy_cache", "build*", "dist", "__pycache__", "metrics", "logs" ), ) + os.makedirs(path.join(self._cwd, "logs"), exist_ok=True) self.templates = self.Templates(self) diff --git a/src/scaler/worker_adapter/orb/types.py b/src/scaler/worker_adapter/drivers/orb_common/types.py similarity index 100% rename from src/scaler/worker_adapter/orb/types.py rename to src/scaler/worker_adapter/drivers/orb_common/types.py diff --git a/src/scaler/worker_adapter/orb/worker_adapter.py b/src/scaler/worker_adapter/drivers/orb_common/worker_adapter.py similarity index 85% rename from src/scaler/worker_adapter/orb/worker_adapter.py rename to src/scaler/worker_adapter/drivers/orb_common/worker_adapter.py index 2f30ab2e0..bd83414c2 100644 --- a/src/scaler/worker_adapter/orb/worker_adapter.py +++ b/src/scaler/worker_adapter/drivers/orb_common/worker_adapter.py @@ -25,9 +25,10 @@ from scaler.utility.event_loop import create_async_loop_routine, register_event_loop, run_task_forever from scaler.utility.identifiers import WorkerID from scaler.utility.logging.utility import setup_logger -from scaler.worker_adapter.common import WorkerGroupID, format_capabilities -from scaler.worker_adapter.orb.helper import ORBHelper -from scaler.worker_adapter.orb.types import ORBTemplate +from scaler.worker_adapter.drivers.common import WorkerGroupID +from scaler.worker_adapter.drivers.orb_common.helper import ORBHelper +from scaler.worker_adapter.drivers.orb_common.types import ORBTemplate +from scaler.worker_adapter.drivers.orb_ec2.worker_setup import create_user_data, get_orb_worker_name Status = WorkerAdapterCommandResponse.Status logger = logging.getLogger(__name__) @@ -38,17 +39,6 @@ ORB_MAX_POLLING_ATTEMPTS = 60 -def get_orb_worker_name(instance_id: str) -> str: - """ - Returns the deterministic worker name for an ORB instance. - If instance_id is the bash variable '${INSTANCE_ID}', it returns a bash-compatible string. - """ - if instance_id == "${INSTANCE_ID}": - return "Worker|ORB|${INSTANCE_ID}|${INSTANCE_ID//i-/}" - tag = instance_id.replace("i-", "") - return f"Worker|ORB|{instance_id}|{tag}" - - class ORBWorkerAdapter: _config: ORBWorkerAdapterConfig _orb: ORBHelper @@ -106,7 +96,12 @@ def __init__(self, config: ORBWorkerAdapterConfig): self._create_key_pair() key_name = self._created_key_name - user_data = self._create_user_data() + user_data = create_user_data( + worker_config=self._config.worker_config, + adapter_config=self._config.worker_adapter_config, + event_loop=self._config.event_loop, + worker_io_threads=self._config.worker_io_threads, + ) user_data_file_path = os.path.join(self._orb.cwd, "config", "user_data.sh") with open(user_data_file_path, "w") as f: f.write(user_data) @@ -237,53 +232,6 @@ async def __get_loops(self): else: logging.exception(f"{self._ident!r}: failed with unhandled exception:\n{e}") - def _create_user_data(self) -> str: - worker_config = self._config.worker_config - adapter_config = self._config.worker_adapter_config - - # We assume 1 worker per machine for ORB - # TODO: Add support for multiple workers per machine if needed - num_workers = 1 - - # Build the command - # We construct the full WorkerID here so it's deterministic and matches what the adapter calculates - # We fetch instance_id once and use it to construct the ID - script = f"""#!/bin/bash -INSTANCE_ID=$(ec2-metadata --instance-id --quiet) -WORKER_NAME="{get_orb_worker_name('${INSTANCE_ID}')}" - -nohup /usr/local/bin/scaler_cluster {adapter_config.scheduler_address.to_address()} \ - --num-of-workers {num_workers} \ - --worker-names "${{WORKER_NAME}}" \ - --per-worker-task-queue-size {worker_config.per_worker_task_queue_size} \ - --heartbeat-interval-seconds {worker_config.heartbeat_interval_seconds} \ - --task-timeout-seconds {worker_config.task_timeout_seconds} \ - --garbage-collect-interval-seconds {worker_config.garbage_collect_interval_seconds} \ - --death-timeout-seconds {worker_config.death_timeout_seconds} \ - --trim-memory-threshold-bytes {worker_config.trim_memory_threshold_bytes} \ - --event-loop {self._config.event_loop} \ - --worker-io-threads {self._config.worker_io_threads} \ - --deterministic-worker-ids""" - - if worker_config.hard_processor_suspend: - script += " \ - --hard-processor-suspend" - - if adapter_config.object_storage_address: - script += f" \ - --object-storage-address {adapter_config.object_storage_address.to_string()}" - - capabilities = worker_config.per_worker_capabilities.capabilities - if capabilities: - cap_str = format_capabilities(capabilities) - if cap_str.strip(): - script += f" \ - --per-worker-capabilities {cap_str}" - - script += " > /var/log/opengris-scaler.log 2>&1 &\n" - - return script - def _discover_default_subnet(self) -> str: vpcs = self._ec2.describe_vpcs(Filters=[{"Name": "isDefault", "Values": ["true"]}]) if not vpcs["Vpcs"]: diff --git a/src/scaler/worker_adapter/drivers/orb_ec2/__init__.py b/src/scaler/worker_adapter/drivers/orb_ec2/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/src/scaler/drivers/ami/build.sh b/src/scaler/worker_adapter/drivers/orb_ec2/ami/build.sh similarity index 100% rename from src/scaler/drivers/ami/build.sh rename to src/scaler/worker_adapter/drivers/orb_ec2/ami/build.sh diff --git a/src/scaler/drivers/ami/opengris-scaler.pkr.hcl b/src/scaler/worker_adapter/drivers/orb_ec2/ami/opengris-scaler.pkr.hcl similarity index 100% rename from src/scaler/drivers/ami/opengris-scaler.pkr.hcl rename to src/scaler/worker_adapter/drivers/orb_ec2/ami/opengris-scaler.pkr.hcl diff --git a/src/scaler/worker_adapter/drivers/orb_ec2/worker_setup.py b/src/scaler/worker_adapter/drivers/orb_ec2/worker_setup.py new file mode 100644 index 000000000..79d1a490e --- /dev/null +++ b/src/scaler/worker_adapter/drivers/orb_ec2/worker_setup.py @@ -0,0 +1,61 @@ +from scaler.config.common.worker import WorkerConfig +from scaler.config.common.worker_adapter import WorkerAdapterConfig +from scaler.worker_adapter.drivers.common import format_capabilities + + +def get_orb_worker_name(instance_id: str) -> str: + """ + Returns the deterministic worker name for an ORB instance. + If instance_id is the bash variable '${INSTANCE_ID}', it returns a bash-compatible string. + """ + if instance_id == "${INSTANCE_ID}": + return "Worker|ORB|${INSTANCE_ID}|${INSTANCE_ID//i-/}" + tag = instance_id.replace("i-", "") + return f"Worker|ORB|{instance_id}|{tag}" + + +def create_user_data( + worker_config: WorkerConfig, adapter_config: WorkerAdapterConfig, event_loop: str, worker_io_threads: int +) -> str: + # We assume 1 worker per machine for ORB + # TODO: Add support for multiple workers per machine if needed + num_workers = 1 + + # Build the command + # We construct the full WorkerID here so it's deterministic and matches what the adapter calculates + # We fetch instance_id once and use it to construct the ID + script = f"""#!/bin/bash +INSTANCE_ID=$(ec2-metadata --instance-id --quiet) +WORKER_NAME="{get_orb_worker_name('${INSTANCE_ID}')}" + +nohup /usr/local/bin/scaler_cluster {adapter_config.scheduler_address.to_address()} \ + --num-of-workers {num_workers} \ + --worker-names "${{WORKER_NAME}}" \ + --per-worker-task-queue-size {worker_config.per_worker_task_queue_size} \ + --heartbeat-interval-seconds {worker_config.heartbeat_interval_seconds} \ + --task-timeout-seconds {worker_config.task_timeout_seconds} \ + --garbage-collect-interval-seconds {worker_config.garbage_collect_interval_seconds} \ + --death-timeout-seconds {worker_config.death_timeout_seconds} \ + --trim-memory-threshold-bytes {worker_config.trim_memory_threshold_bytes} \ + --event-loop {event_loop} \ + --worker-io-threads {worker_io_threads} \ + --deterministic-worker-ids""" + + if worker_config.hard_processor_suspend: + script += " \ + --hard-processor-suspend" + + if adapter_config.object_storage_address: + script += f" \ + --object-storage-address {adapter_config.object_storage_address.to_string()}" + + capabilities = worker_config.per_worker_capabilities.capabilities + if capabilities: + cap_str = format_capabilities(capabilities) + if cap_str.strip(): + script += f" \ + --per-worker-capabilities {cap_str}" + + script += " > /var/log/opengris-scaler.log 2>&1 &" + + return script diff --git a/src/scaler/worker_adapter/drivers/symphony/__init__.py b/src/scaler/worker_adapter/drivers/symphony/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/src/scaler/worker_adapter/symphony/callback.py b/src/scaler/worker_adapter/drivers/symphony/callback.py similarity index 95% rename from src/scaler/worker_adapter/symphony/callback.py rename to src/scaler/worker_adapter/drivers/symphony/callback.py index 6bd63d766..5eec621d4 100644 --- a/src/scaler/worker_adapter/symphony/callback.py +++ b/src/scaler/worker_adapter/drivers/symphony/callback.py @@ -4,7 +4,7 @@ import cloudpickle -from scaler.worker_adapter.symphony.message import SoamMessage +from scaler.worker_adapter.drivers.symphony.message import SoamMessage try: import soamapi diff --git a/src/scaler/worker_adapter/symphony/heartbeat_manager.py b/src/scaler/worker_adapter/drivers/symphony/heartbeat_manager.py similarity index 97% rename from src/scaler/worker_adapter/symphony/heartbeat_manager.py rename to src/scaler/worker_adapter/drivers/symphony/heartbeat_manager.py index 9d8a8ddf7..3699e0eae 100644 --- a/src/scaler/worker_adapter/symphony/heartbeat_manager.py +++ b/src/scaler/worker_adapter/drivers/symphony/heartbeat_manager.py @@ -9,7 +9,7 @@ from scaler.protocol.python.status import Resource from scaler.utility.mixins import Looper from scaler.worker.agent.mixins import HeartbeatManager, TimeoutManager -from scaler.worker_adapter.symphony.task_manager import SymphonyTaskManager +from scaler.worker_adapter.drivers.symphony.task_manager import SymphonyTaskManager class SymphonyHeartbeatManager(Looper, HeartbeatManager): diff --git a/src/scaler/worker_adapter/symphony/message.py b/src/scaler/worker_adapter/drivers/symphony/message.py similarity index 100% rename from src/scaler/worker_adapter/symphony/message.py rename to src/scaler/worker_adapter/drivers/symphony/message.py diff --git a/src/scaler/worker_adapter/symphony/task_manager.py b/src/scaler/worker_adapter/drivers/symphony/task_manager.py similarity index 98% rename from src/scaler/worker_adapter/symphony/task_manager.py rename to src/scaler/worker_adapter/drivers/symphony/task_manager.py index 271a9ed35..1f0e3f6d5 100644 --- a/src/scaler/worker_adapter/symphony/task_manager.py +++ b/src/scaler/worker_adapter/drivers/symphony/task_manager.py @@ -16,8 +16,8 @@ from scaler.utility.queues.async_priority_queue import AsyncPriorityQueue from scaler.utility.serialization import serialize_failure from scaler.worker.agent.mixins import HeartbeatManager, TaskManager -from scaler.worker_adapter.symphony.callback import SessionCallback -from scaler.worker_adapter.symphony.message import SoamMessage +from scaler.worker_adapter.drivers.symphony.callback import SessionCallback +from scaler.worker_adapter.drivers.symphony.message import SoamMessage try: import soamapi diff --git a/src/scaler/worker_adapter/symphony/worker.py b/src/scaler/worker_adapter/drivers/symphony/worker.py similarity index 97% rename from src/scaler/worker_adapter/symphony/worker.py rename to src/scaler/worker_adapter/drivers/symphony/worker.py index 56b79f9fb..12b0629a5 100644 --- a/src/scaler/worker_adapter/symphony/worker.py +++ b/src/scaler/worker_adapter/drivers/symphony/worker.py @@ -26,8 +26,8 @@ from scaler.utility.identifiers import WorkerID from scaler.utility.logging.utility import setup_logger from scaler.worker.agent.timeout_manager import VanillaTimeoutManager -from scaler.worker_adapter.symphony.heartbeat_manager import SymphonyHeartbeatManager -from scaler.worker_adapter.symphony.task_manager import SymphonyTaskManager +from scaler.worker_adapter.drivers.symphony.heartbeat_manager import SymphonyHeartbeatManager +from scaler.worker_adapter.drivers.symphony.task_manager import SymphonyTaskManager class SymphonyWorker(multiprocessing.get_context("spawn").Process): # type: ignore diff --git a/src/scaler/worker_adapter/symphony/worker_adapter.py b/src/scaler/worker_adapter/drivers/symphony/worker_adapter.py similarity index 98% rename from src/scaler/worker_adapter/symphony/worker_adapter.py rename to src/scaler/worker_adapter/drivers/symphony/worker_adapter.py index c33af2bde..1a1e8640b 100644 --- a/src/scaler/worker_adapter/symphony/worker_adapter.py +++ b/src/scaler/worker_adapter/drivers/symphony/worker_adapter.py @@ -21,8 +21,8 @@ from scaler.utility.event_loop import create_async_loop_routine, register_event_loop from scaler.utility.identifiers import WorkerID from scaler.utility.logging.utility import setup_logger -from scaler.worker_adapter.common import WorkerGroupID -from scaler.worker_adapter.symphony.worker import SymphonyWorker +from scaler.worker_adapter.drivers.common import WorkerGroupID +from scaler.worker_adapter.drivers.symphony.worker import SymphonyWorker Status = WorkerAdapterCommandResponse.Status diff --git a/tests/scheduler/test_scaling.py b/tests/scheduler/test_scaling.py index 1b24b1496..358fd347c 100644 --- a/tests/scheduler/test_scaling.py +++ b/tests/scheduler/test_scaling.py @@ -46,7 +46,7 @@ from scaler.utility.identifiers import ClientID, ObjectID, TaskID, WorkerID from scaler.utility.logging.utility import setup_logger from scaler.utility.network_util import get_available_tcp_port -from scaler.worker_adapter.native import NativeWorkerAdapter +from scaler.worker_adapter.drivers.native import NativeWorkerAdapter from tests.utility.utility import logging_test_name