diff --git a/pyproject.toml b/pyproject.toml index a6399d464..9c89a2633 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -45,7 +45,8 @@ admin = [ "boto3", "ray[default]==2.43.0", "pip", - "cryptography==39.0.1" + "cryptography==39.0.1", + "fakeredis[json]", ] rocklet = [ @@ -110,7 +111,6 @@ test = [ "pytest-trio", "pytest-twisted", "pytest-env", - "fakeredis[json]", ] [tool.setuptools.packages.find] diff --git a/rock/actions/sandbox/response.py b/rock/actions/sandbox/response.py index 54e405f78..6df085993 100644 --- a/rock/actions/sandbox/response.py +++ b/rock/actions/sandbox/response.py @@ -126,3 +126,43 @@ class ChownResponse(BaseModel): class ChmodResponse(BaseModel): success: bool = False message: str = "" + + +class SystemResourceMetrics(BaseModel): + """System resource metrics""" + + total_cpu: float = 0.0 + """Total CPU cores""" + + total_memory: float = 0.0 + """Total memory in GB""" + + available_cpu: float = 0.0 + """Available CPU cores""" + + available_memory: float = 0.0 + """Available memory in GB""" + + gpu_count: int = 0 + """Total GPU count""" + + available_gpu: int = 0 + """Available GPU count""" + + def get_cpu_utilization(self) -> float: + """Get CPU utilization rate (0.0 - 1.0)""" + if self.total_cpu == 0: + return 0.0 + return (self.total_cpu - self.available_cpu) / self.total_cpu + + def get_memory_utilization(self) -> float: + """Get memory utilization rate (0.0 - 1.0)""" + if self.total_memory == 0: + return 0.0 + return (self.total_memory - self.available_memory) / self.total_memory + + def get_gpu_utilization(self) -> float: + """Get GPU utilization rate (0.0 - 1.0)""" + if self.gpu_count == 0: + return 0.0 + return (self.gpu_count - self.available_gpu) / self.gpu_count diff --git a/rock/admin/core/ray_service.py b/rock/admin/core/ray_service.py index 2dee05a36..dfe76f25c 100644 --- a/rock/admin/core/ray_service.py +++ b/rock/admin/core/ray_service.py @@ -1,3 +1,4 @@ +import asyncio import ray import time @@ -34,6 +35,31 @@ def increment_ray_request_count(self): def get_ray_rwlock(self): return self._ray_rwlock + + async def async_ray_get_actor(self, sandbox_id: str): + """Async wrapper for ray.get_actor() using asyncio.to_thread for non-blocking execution.""" + self.increment_ray_request_count() + try: + result = await asyncio.to_thread(ray.get_actor, sandbox_id, namespace=self._config.namespace) + except ValueError as e: + logger.error(f"ray get actor, actor {sandbox_id} not exist", exc_info=e) + raise e + except Exception as e: + logger.error("ray get actor failed", exc_info=e) + error_msg = str(e.args[0]) if len(e.args) > 0 else f"ray get actor failed, {str(e)}" + raise Exception(error_msg) + return result + + async def async_ray_get(self, ray_future: ray.ObjectRef): + """Async wrapper for ray.get() using asyncio.to_thread for non-blocking execution.""" + self.increment_ray_request_count() + try: + result = await asyncio.to_thread(ray.get, ray_future, timeout=60) + except Exception as e: + logger.error("ray get failed", exc_info=e) + error_msg = str(e.args[0]) if len(e.args) > 0 else f"ray get failed, {str(e)}" + raise Exception(error_msg) + return result def _setup_ray_reconnect_scheduler(self): diff --git a/rock/admin/entrypoints/sandbox_api.py b/rock/admin/entrypoints/sandbox_api.py index e64fb7d22..59cbb9e68 100644 --- a/rock/admin/entrypoints/sandbox_api.py +++ b/rock/admin/entrypoints/sandbox_api.py @@ -13,6 +13,7 @@ UploadResponse, WriteFileResponse, ) +from rock.actions.response import ResponseStatus from rock.admin.proto.request import ( 
SandboxBashAction, SandboxCloseBashSessionRequest, @@ -52,7 +53,7 @@ async def start_async( x_experiment_id: str | None = Header(default="default", alias="X-Experiment-Id"), rock_authorization: str | None = Header(default="default", alias="X-Key"), ) -> RockResponse[SandboxStartResponse]: - sandbox_start_response = await sandbox_manager.start_async( + sandbox_start_response = await sandbox_manager.submit( DockerDeploymentConfig.from_request(request), user_info={ "user_id": x_user_id, @@ -106,14 +107,10 @@ async def create_session(request: SandboxCreateBashSessionRequest) -> RockRespon @sandbox_router.post("/run_in_session") @handle_exceptions(error_message="run in session failed") async def run(action: SandboxBashAction) -> RockResponse[BashObservation]: - return RockResponse(result=await sandbox_manager.run_in_session(action)) - - -@sandbox_router.post("/close_session") -@handle_exceptions(error_message="close session failed") -async def close_session(request: SandboxCloseBashSessionRequest) -> RockResponse[CloseBashSessionResponse]: - return RockResponse(result=await sandbox_manager.close_session(request)) - + result = await sandbox_manager.run_in_session(action) + if result.exit_code is not None and result.exit_code == -1: + return RockResponse(status=ResponseStatus.FAILED, error=result.failure_reason) + return RockResponse(result=result) @sandbox_router.post("/read_file") @handle_exceptions(error_message="read file failed") @@ -136,7 +133,6 @@ async def upload( ) -> RockResponse[UploadResponse]: return RockResponse(result=await sandbox_manager.upload(file, target_path, sandbox_id)) - @sandbox_router.post("/stop") @handle_exceptions(error_message="stop sandbox failed") async def close(sandbox_id: str = Body(..., embed=True)) -> RockResponse[str]: diff --git a/rock/admin/main.py b/rock/admin/main.py index 72f24646d..d8f3d3394 100644 --- a/rock/admin/main.py +++ b/rock/admin/main.py @@ -49,7 +49,11 @@ async def lifespan(app: FastAPI): env_vars.ROCK_ADMIN_ROLE = args.role # init redis provider - if args.env == "local": + if args.env == "test": + from fakeredis import aioredis + redis_provider = RedisProvider(host=None, port=None, password="") + redis_provider.client = aioredis.FakeRedis(decode_responses=True) + elif args.env == "local": redis_provider = None else: redis_provider = RedisProvider( diff --git a/rock/sandbox/base_manager.py b/rock/sandbox/base_manager.py index 45ce81e76..9dd80dbd3 100644 --- a/rock/sandbox/base_manager.py +++ b/rock/sandbox/base_manager.py @@ -109,23 +109,17 @@ async def _collect_and_report_metrics_internal(self): logger.debug(f"Metrics overall report rt:{overall_duration:.4f}s") async def _report_system_resource_metrics(self): - """汇报系统资源指标""" - total_cpu, total_mem, available_cpu, available_mem = await self._collect_system_resource_metrics() - self.metrics_monitor.record_gauge_by_name(MetricsConstants.TOTAL_CPU_RESOURCE, total_cpu) - self.metrics_monitor.record_gauge_by_name(MetricsConstants.TOTAL_MEM_RESOURCE, total_mem) - self.metrics_monitor.record_gauge_by_name(MetricsConstants.AVAILABLE_CPU_RESOURCE, available_cpu) - self.metrics_monitor.record_gauge_by_name(MetricsConstants.AVAILABLE_MEM_RESOURCE, available_mem) + """Report system resource metrics""" + metrics = await self._collect_system_resource_metrics() + self.metrics_monitor.record_gauge_by_name(MetricsConstants.TOTAL_CPU_RESOURCE, metrics.total_cpu) + self.metrics_monitor.record_gauge_by_name(MetricsConstants.TOTAL_MEM_RESOURCE, metrics.total_memory) + 
self.metrics_monitor.record_gauge_by_name(MetricsConstants.AVAILABLE_CPU_RESOURCE, metrics.available_cpu) + self.metrics_monitor.record_gauge_by_name(MetricsConstants.AVAILABLE_MEM_RESOURCE, metrics.available_memory) async def _collect_system_resource_metrics(self): - """收集系统资源指标""" - cluster_resources = ray.cluster_resources() - available_resources = ray.available_resources() - total_cpu = cluster_resources.get("CPU", 0) - total_mem = cluster_resources.get("memory", 0) / 1024**3 - available_cpu = available_resources.get("CPU", 0) - available_mem = available_resources.get("memory", 0) / 1024**3 - return total_cpu, total_mem, available_cpu, available_mem - + """Collect system resource metrics""" + raise NotImplementedError("This method should be implemented by subclasses") + async def _collect_sandbox_meta(self) -> tuple[int, dict[str, dict[str, str]]]: meta: dict = {} cnt = 0 diff --git a/rock/sandbox/gem_manager.py b/rock/sandbox/gem_manager.py index 742d0972a..e060be78e 100644 --- a/rock/sandbox/gem_manager.py +++ b/rock/sandbox/gem_manager.py @@ -16,13 +16,14 @@ from rock.admin.proto.response import SandboxStartResponse, SandboxStatusResponse from rock.config import RockConfig from rock.deployments.config import DockerDeploymentConfig -from rock.sandbox.sandbox_actor import SandboxActor from rock.sandbox.sandbox_manager import SandboxManager +from rock.sandbox.service.env_service import RayEnvService from rock.utils.providers import RedisProvider from rock.admin.core.ray_service import RayService class GemManager(SandboxManager): + _env_service: RayEnvService def __init__( self, rock_config: RockConfig, @@ -32,10 +33,11 @@ def __init__( enable_runtime_auto_clear: bool = False, ): super().__init__(rock_config, redis_provider, ray_namespace, ray_service, enable_runtime_auto_clear) + self._env_service = RayEnvService(ray_namespace=ray_namespace, ray_service=ray_service) async def env_make(self, env_id: str) -> EnvMakeResponse: config = DockerDeploymentConfig(image=env_vars.ROCK_ENVHUB_DEFAULT_DOCKER_IMAGE) - sandbox_start_response: SandboxStartResponse = await self.start_async(config=config) + sandbox_start_response: SandboxStartResponse = await self.submit(config=config) async def wait_until_alive(sandbox_id: str, interval: float = 1.0): """Internal polling method""" @@ -53,44 +55,22 @@ async def wait_until_alive(sandbox_id: str, interval: float = 1.0): except asyncio.TimeoutError: raise Exception("Sandbox startup timeout after 300s") - sandbox_actor: SandboxActor = await self.async_ray_get_actor(sandbox_start_response.sandbox_id) - if sandbox_actor is None: - raise Exception(f"sandbox {sandbox_start_response.sandbox_id} not found to stop") - response = await self.async_ray_get( - sandbox_actor.env_make.remote( - EnvMakeRequest( - env_id=env_id, - sandbox_id=sandbox_start_response.sandbox_id, - ) + make_response = await self._env_service.env_make( + EnvMakeRequest( + env_id=env_id, + sandbox_id=sandbox_start_response.sandbox_id, ) ) - return response + return make_response async def env_step(self, request: EnvStepRequest) -> EnvStepResponse: - sandbox_id = request.sandbox_id - sandbox_actor: SandboxActor = await self.async_ray_get_actor(sandbox_id) - if sandbox_actor is None: - raise Exception(f"sandbox {sandbox_id} not found to stop") - return await self.async_ray_get(sandbox_actor.env_step.remote(request)) + return await self._env_service.env_step(request) async def env_reset(self, request: EnvResetRequest) -> EnvResetResponse: - sandbox_id = request.sandbox_id - sandbox_actor: 
SandboxActor = await self.async_ray_get_actor(sandbox_id) - if sandbox_actor is None: - raise Exception(f"sandbox {sandbox_id} not found to stop") - return await self.async_ray_get(sandbox_actor.env_reset.remote(request)) + return await self._env_service.env_reset(request) async def env_close(self, request: EnvCloseRequest) -> EnvCloseResponse: - sandbox_id = request.sandbox_id - sandbox_actor: SandboxActor = await self.async_ray_get_actor(sandbox_id) - if sandbox_actor is None: - raise Exception(f"sandbox {sandbox_id} not found to stop") - response = await self.async_ray_get(sandbox_actor.env_close.remote(request)) - await self.stop(sandbox_id=sandbox_id) - return response + return await self._env_service.env_close(request) async def env_list(self, sandbox_id: str) -> EnvListResponse: - sandbox_actor = await self.async_ray_get_actor(sandbox_id) - if sandbox_actor is None: - raise Exception(f"sandbox {sandbox_id} not found to stop") - return await self.async_ray_get(sandbox_actor.env_list.remote()) + return await self._env_service.env_list(sandbox_id) diff --git a/rock/sandbox/sandbox_manager.py b/rock/sandbox/sandbox_manager.py index 52bed6a47..d84688a00 100644 --- a/rock/sandbox/sandbox_manager.py +++ b/rock/sandbox/sandbox_manager.py @@ -1,14 +1,10 @@ import asyncio -import json import time - -import ray from fastapi import UploadFile from rock import env_vars from rock.actions import ( BashObservation, - CloseBashSessionResponse, CommandResponse, CreateBashSessionResponse, ReadFileResponse, @@ -16,26 +12,29 @@ WriteFileResponse, ) from rock.actions.sandbox.response import IsAliveResponse, State +from rock.admin.proto.request import SandboxCreateSessionRequest as CreateSessionRequest +from rock.admin.proto.request import SandboxCommand as Command +from rock.admin.proto.request import SandboxAction as Action + from rock.actions.sandbox.sandbox_info import SandboxInfo from rock.admin.core.ray_service import RayService from rock.admin.core.redis_key import ALIVE_PREFIX, alive_sandbox_key, timeout_sandbox_key from rock.admin.metrics.decorator import monitor_sandbox_operation -from rock.admin.proto.request import SandboxAction as Action -from rock.admin.proto.request import SandboxCloseBashSessionRequest as CloseBashSessionRequest -from rock.admin.proto.request import SandboxCommand as Command -from rock.admin.proto.request import SandboxCreateSessionRequest as CreateSessionRequest +from rock.admin.proto.response import SandboxStartResponse, SandboxStatusResponse from rock.admin.proto.request import SandboxReadFileRequest as ReadFileRequest from rock.admin.proto.request import SandboxWriteFileRequest as WriteFileRequest -from rock.admin.proto.response import SandboxStartResponse, SandboxStatusResponse from rock.config import RockConfig, RuntimeConfig from rock.deployments.config import DeploymentConfig, DockerDeploymentConfig from rock.deployments.constants import Port from rock.deployments.status import PersistedServiceStatus, ServiceStatus +from rock.deployments.abstract import AbstractDeployment +from rock.deployments.config import DeploymentConfig + from rock.logger import init_logger -from rock.rocklet import __version__ as swe_version -from rock.sandbox import __version__ as gateway_version from rock.sandbox.base_manager import BaseManager -from rock.sandbox.sandbox_actor import SandboxActor +from rock.sandbox.service.deployment.abstract import AbstractDeploymentService +from rock.sandbox.service.deployment.ray import RayDeploymentService +from 
rock.sandbox.service.sandbox_proxy_service import SandboxProxyService from rock.sdk.common.exceptions import BadRequestRockError from rock.utils import ( EAGLE_EYE_TRACE_ID, @@ -45,12 +44,18 @@ from rock.utils.format import parse_memory_size from rock.utils.providers.redis_provider import RedisProvider from rock.utils.service import build_sandbox_from_redis +from rock.utils.providers import RedisProvider +from rock.admin.core.ray_service import RayService +from rock.rocklet import __version__ as swe_version +from rock.sandbox import __version__ as gateway_version logger = init_logger(__name__) class SandboxManager(BaseManager): _ray_namespace: str = None + _deployment_service: AbstractDeploymentService = None + _proxy_service: SandboxProxyService = None def __init__( self, @@ -63,156 +68,64 @@ def __init__( super().__init__( rock_config, redis_provider=redis_provider, enable_runtime_auto_clear=enable_runtime_auto_clear ) - self._ray_service = ray_service self._ray_namespace = ray_namespace + self._deployment_service = RayDeploymentService(ray_namespace=ray_namespace, ray_service=ray_service) + self._proxy_service = SandboxProxyService(rock_config, redis_provider) logger.info("sandbox service init success") - async def async_ray_get(self, ray_future: ray.ObjectRef): - self._ray_service.increment_ray_request_count() - loop = asyncio.get_running_loop() - try: - result = await loop.run_in_executor(self._executor, lambda r: ray.get(r, timeout=60), ray_future) - except Exception as e: - logger.error("ray get failed", exc_info=e) - error_msg = str(e.args[0]) if len(e.args) > 0 else f"ray get failed, {str(e)}" - raise Exception(error_msg) - return result - - async def async_ray_get_actor(self, sandbox_id: str): - self._ray_service.increment_ray_request_count() - loop = asyncio.get_running_loop() - try: - result = await loop.run_in_executor( - self._executor, ray.get_actor, self.deployment_manager.get_actor_name(sandbox_id), self._ray_namespace - ) - except ValueError as e: - logger.error(f"ray get actor, actor {sandbox_id} not exist", exc_info=e) - raise e - except Exception as e: - logger.error("ray get actor failed", exc_info=e) - error_msg = str(e.args[0]) if len(e.args) > 0 else f"ray get actor failed, {str(e)}" - raise Exception(error_msg) - return result - - async def _check_sandbox_exists_in_redis(self, config: DeploymentConfig): - if isinstance(config, DockerDeploymentConfig) and config.container_name: - sandbox_id = config.container_name - if self._redis_provider and await self._redis_provider.json_get(alive_sandbox_key(sandbox_id), "$"): - raise BadRequestRockError(f"Sandbox {sandbox_id} already exists") - + @monitor_sandbox_operation() async def start_async(self, config: DeploymentConfig, user_info: dict = {}) -> SandboxStartResponse: - async with self._ray_service.get_ray_rwlock().read_lock(): - await self._check_sandbox_exists_in_redis(config) - docker_deployment_config: DockerDeploymentConfig = await self.deployment_manager.init_config(config) - sandbox_id = docker_deployment_config.container_name - logger.info(f"[{sandbox_id}] start_async params:{json.dumps(docker_deployment_config.model_dump(), indent=2)}") - actor_name = self.deployment_manager.get_actor_name(sandbox_id) - - deployment = docker_deployment_config.get_deployment() - - self.validate_sandbox_spec(self.rock_config.runtime, config) - sandbox_actor: SandboxActor = await deployment.creator_actor(actor_name) - user_id = user_info.get("user_id", "default") - experiment_id = user_info.get("experiment_id", "default") - 
namespace = user_info.get("namespace", "default") - rock_authorization = user_info.get("rock_authorization", "default") - sandbox_actor.start.remote() - sandbox_actor.set_user_id.remote(user_id) - sandbox_actor.set_experiment_id.remote(experiment_id) - sandbox_actor.set_namespace.remote(namespace) - - self._sandbox_meta[sandbox_id] = {"image": docker_deployment_config.image} - logger.info(f"sandbox {sandbox_id} is submitted") - stop_time = str(int(time.time()) + docker_deployment_config.auto_clear_time * 60) - auto_clear_time_dict = { - env_vars.ROCK_SANDBOX_AUTO_CLEAR_TIME_KEY: str(docker_deployment_config.auto_clear_time), - env_vars.ROCK_SANDBOX_EXPIRE_TIME_KEY: stop_time, - } - sandbox_info: SandboxInfo = await self.async_ray_get(sandbox_actor.sandbox_info.remote()) - sandbox_info["user_id"] = user_id - sandbox_info["experiment_id"] = experiment_id - sandbox_info["namespace"] = namespace - sandbox_info["state"] = State.PENDING - sandbox_info["rock_authorization"] = rock_authorization - if self._redis_provider: - await self._redis_provider.json_set(alive_sandbox_key(sandbox_id), "$", sandbox_info) - await self._redis_provider.json_set(timeout_sandbox_key(sandbox_id), "$", auto_clear_time_dict) - return SandboxStartResponse( - sandbox_id=sandbox_id, - host_name=sandbox_info.get("host_name"), - host_ip=sandbox_info.get("host_ip"), - ) + return await self.submit(config, user_info) @monitor_sandbox_operation() - async def start(self, config: DeploymentConfig) -> SandboxStartResponse: - docker_deployment_config: DockerDeploymentConfig = await self.deployment_manager.init_config(config) - - sandbox_id = docker_deployment_config.container_name - actor_name = self.deployment_manager.get_actor_name(sandbox_id) - deployment = docker_deployment_config.get_deployment() - - sandbox_actor: SandboxActor = await deployment.creator_actor(actor_name) - - await self.async_ray_get(sandbox_actor.start.remote()) - logger.info(f"sandbox {sandbox_id} is started") - - while not await self._is_actor_alive(sandbox_id): - logger.debug(f"wait actor for sandbox alive, sandbox_id: {sandbox_id}") - # TODO: timeout check - await asyncio.sleep(1) - await self.get_status(sandbox_id) - - self._sandbox_meta[sandbox_id] = {"image": docker_deployment_config.image} + async def submit(self, config: DeploymentConfig, user_info: dict = {}): + deployment_config: DeploymentConfig = await self.deployment_manager.init_config(config) + sandbox_id = deployment_config.container_name + self.validate_sandbox_spec(self.rock_config.runtime, config) + self._sandbox_meta[sandbox_id] = {"image": deployment_config.image} + sandbox_info: SandboxInfo = await self._deployment_service.submit(deployment_config, user_info) + logger.info(f"sandbox {sandbox_id} is submitted") + + stop_time = str(int(time.time()) + deployment_config.auto_clear_time * 60) + auto_clear_time_dict = { + env_vars.ROCK_SANDBOX_AUTO_CLEAR_TIME_KEY: str(deployment_config.auto_clear_time), + env_vars.ROCK_SANDBOX_EXPIRE_TIME_KEY: stop_time, + } + if self._redis_provider: + await self._redis_provider.json_set(alive_sandbox_key(sandbox_id), "$", sandbox_info) + await self._redis_provider.json_set(timeout_sandbox_key(sandbox_id), "$", auto_clear_time_dict) return SandboxStartResponse( sandbox_id=sandbox_id, - host_name=await self.async_ray_get(sandbox_actor.host_name.remote()), - host_ip=await self.async_ray_get(sandbox_actor.host_ip.remote()), + host_name=sandbox_info.get("host_name"), + host_ip=sandbox_info.get("host_ip"), ) @monitor_sandbox_operation() async def stop(self, 
sandbox_id):
-        async with self._ray_service.get_ray_rwlock().read_lock():
-            logger.info(f"stop sandbox {sandbox_id}")
-            try:
-                sandbox_actor = await self.async_ray_get_actor(sandbox_id)
-            except ValueError as e:
-                await self._clear_redis_keys(sandbox_id)
-                raise Exception(f"sandbox {sandbox_id} not found to stop, {str(e)}")
-            logger.info(f"start to stop run time {sandbox_id}")
-            await self.async_ray_get(sandbox_actor.stop.remote())
-            logger.info(f"run time stop over {sandbox_id}")
-            ray.kill(sandbox_actor)
-            try:
-                self._sandbox_meta.pop(sandbox_id)
-            except KeyError:
-                logger.debug(f"{sandbox_id} key not found")
-            logger.info(f"sandbox {sandbox_id} stopped")
+        logger.info(f"stop sandbox {sandbox_id}")
+        try:
+            await self._deployment_service.stop(sandbox_id)
+        except ValueError as e:
+            logger.error(f"ray get actor, actor {sandbox_id} not exist", exc_info=e)
+            await self._clear_redis_keys(sandbox_id)
+        try:
+            self._sandbox_meta.pop(sandbox_id)
+        except KeyError:
+            logger.debug(f"{sandbox_id} key not found")
+        logger.info(f"sandbox {sandbox_id} stopped")
+        await self._clear_redis_keys(sandbox_id)

     async def get_mount(self, sandbox_id):
-        async with self._ray_service.get_ray_rwlock().read_lock():
-            sandbox_actor = await self.async_ray_get_actor(sandbox_id)
-            if sandbox_actor is None:
-                await self._clear_redis_keys(sandbox_id)
-                raise Exception(f"sandbox {sandbox_id} not found to get mount")
-            result = await self.async_ray_get(sandbox_actor.get_mount.remote())
-            logger.info(f"get_mount: {result}")
-            return result
+        return await self._deployment_service.get_mount(sandbox_id)

     @monitor_sandbox_operation()
     async def commit(self, sandbox_id, image_tag: str, username: str, password: str) -> CommandResponse:
-        async with self._ray_service.get_ray_rwlock().read_lock():
-            logger.info(f"commit sandbox {sandbox_id}")
-            sandbox_actor = await self.async_ray_get_actor(sandbox_id)
-            if sandbox_actor is None:
-                await self._clear_redis_keys(sandbox_id)
-                raise Exception(f"sandbox {sandbox_id} not found to commit")
-            logger.info(f"begin to commit {sandbox_id} to {image_tag}")
-            result = await self.async_ray_get(sandbox_actor.commit.remote(image_tag, username, password))
-            logger.info(f"commit {sandbox_id} to {image_tag} finished, result {result}")
-            return result
+        logger.info(f"commit sandbox {sandbox_id}")
+        result = await self._deployment_service.commit(sandbox_id, image_tag, username, password)
+        logger.info(f"commit {sandbox_id} to {image_tag} finished, result {result}")
+        return result

     async def _clear_redis_keys(self, sandbox_id):
         if self._redis_provider:
@@ -221,56 +134,89 @@ async def _clear_redis_keys(self, sandbox_id):
             logger.info(f"sandbox {sandbox_id} deleted from redis")

     @monitor_sandbox_operation()
-    async def get_status(self, sandbox_id) -> SandboxStatusResponse:
-        async with self._ray_service.get_ray_rwlock().read_lock():
-            sandbox_actor = await self.async_ray_get_actor(sandbox_id)
-            if sandbox_actor is None:
-                raise Exception(f"sandbox {sandbox_id} not found to get status")
+    async def get_status(self, sandbox_id, use_rocklet: bool = False) -> SandboxStatusResponse:
+        """
+        Get sandbox status with optional remote health check.
+
+        Note: The use_rocklet parameter is deprecated and will be removed in a future version.
+
+        Args:
+            sandbox_id: The sandbox identifier
+            use_rocklet: If True, performs remote status check and alive verification (default: False)
+
+        Returns:
+            SandboxStatusResponse with complete status information
+        """
+        # 1. 
Get sandbox_info (unified exception handling)ß + sandbox_info = await self._get_sandbox_info(sandbox_id) + host_ip = sandbox_info.get("host_ip") + + # 2. Determine status retrieval strategy + if use_rocklet and self._redis_provider: + # Use remote status check with parallel operations + _, remote_status = await asyncio.gather( + self._update_expire_time(sandbox_id), + self.get_remote_status(sandbox_id, host_ip), + ) + + # Update sandbox_info with remote status + sandbox_info.update(remote_status.to_dict()) + + # Check alive status + is_alive = await self._check_alive_status(sandbox_id, host_ip, remote_status) + if is_alive: + sandbox_info["state"] = State.RUNNING + + status = remote_status.phases + port_mapping = remote_status.get_port_mapping() + else: + # Fallback to deployment service status + deployment_info = await self._deployment_service.get_status(sandbox_id) + + # Merge deployment info into sandbox_info + if self._redis_provider: + sandbox_info = await self.build_sandbox_info_from_redis(sandbox_id, deployment_info) else: - remote_status: ServiceStatus = await self.async_ray_get(sandbox_actor.get_status.remote()) - alive = await self.async_ray_get(sandbox_actor.is_alive.remote()) - sandbox_info: SandboxInfo = None - if self._redis_provider: - sandbox_info = await build_sandbox_from_redis(self._redis_provider, sandbox_id) - if sandbox_info is None: - # The start() method will write to redis on the first call to get_status() - sandbox_info = await self.async_ray_get(sandbox_actor.sandbox_info.remote()) - sandbox_info.update(remote_status.to_dict()) - if alive.is_alive: - sandbox_info["state"] = State.RUNNING - await self._redis_provider.json_set(alive_sandbox_key(sandbox_id), "$", sandbox_info) - await self._update_expire_time(sandbox_id) - logger.info(f"sandbox {sandbox_id} status is {sandbox_info}, write to redis") - else: - sandbox_info = await self.async_ray_get(sandbox_actor.sandbox_info.remote()) - - return SandboxStatusResponse( - sandbox_id=sandbox_id, - status=remote_status.phases, - state=sandbox_info.get("state"), - port_mapping=remote_status.get_port_mapping(), - host_name=sandbox_info.get("host_name"), - host_ip=sandbox_info.get("host_ip"), - is_alive=alive.is_alive, - image=sandbox_info.get("image"), - swe_rex_version=swe_version, - gateway_version=gateway_version, - user_id=sandbox_info.get("user_id"), - experiment_id=sandbox_info.get("experiment_id"), - namespace=sandbox_info.get("namespace"), - cpus=sandbox_info.get("cpus"), - memory=sandbox_info.get("memory"), - ) + sandbox_info.update(deployment_info) + + # Update expire time + await self._update_expire_time(sandbox_id) + + status = sandbox_info.get("phases") + port_mapping = sandbox_info.get("port_mapping") + is_alive = sandbox_info.get("state") == State.RUNNING + + # 3. Persist to Redis if available + if self._redis_provider: + await self._redis_provider.json_set(alive_sandbox_key(sandbox_id), "$", sandbox_info) + logger.info(f"sandbox {sandbox_id} status updated, write to redis") + + # 4. 
Build and return unified response + return SandboxStatusResponse( + sandbox_id=sandbox_id, + status=status, + port_mapping=port_mapping, + state=sandbox_info.get("state"), + host_name=sandbox_info.get("host_name"), + host_ip=host_ip, + is_alive=is_alive, + image=sandbox_info.get("image"), + swe_rex_version=swe_version, + gateway_version=gateway_version, + user_id=sandbox_info.get("user_id"), + experiment_id=sandbox_info.get("experiment_id"), + namespace=sandbox_info.get("namespace"), + cpus=sandbox_info.get("cpus"), + memory=sandbox_info.get("memory"), + ) async def _get_sandbox_info(self, sandbox_id: str) -> SandboxInfo: - """Get sandbox info, prioritize Redis, fallback to Ray Actor""" + """Get sandbox info, prioritize Redis, fallback to deployment service""" if self._redis_provider: sandbox_info = await build_sandbox_from_redis(self._redis_provider, sandbox_id) else: - sandbox_actor = await self.async_ray_get_actor(sandbox_id) - if sandbox_actor is None: - raise Exception(f"sandbox {sandbox_id} not found to get status") - sandbox_info = await self.async_ray_get(sandbox_actor.sandbox_info.remote()) + # Fallback to deployment service (Ray calls are encapsulated in deployment_service) + sandbox_info = await self._deployment_service.get_status(sandbox_id) if sandbox_info is None: raise Exception(f"sandbox {sandbox_id} not found to get status") @@ -293,47 +239,12 @@ async def _check_alive_status( except Exception: return False - @monitor_sandbox_operation() async def get_status_v2(self, sandbox_id) -> SandboxStatusResponse: - # 1. Get sandbox_info (unified exception handling) - sandbox_info = await self._get_sandbox_info(sandbox_id) - - # 2. Parallel execution: update expire time & get remote status - host_ip = sandbox_info.get("host_ip") - _, remote_status = await asyncio.gather( - self._update_expire_time(sandbox_id), - self.get_remote_status(sandbox_id, host_ip), - ) - - # 3. Update sandbox_info and check alive status - sandbox_info.update(remote_status.to_dict()) - is_alive = await self._check_alive_status(sandbox_id, host_ip, remote_status) - if is_alive: - sandbox_info["state"] = State.RUNNING - - # 4. Persist to Redis if Redis exists - if self._redis_provider: - await self._redis_provider.json_set(alive_sandbox_key(sandbox_id), "$", sandbox_info) - logger.info(f"sandbox {sandbox_id} status is {remote_status}, write to redis") - - # 5. Build and return response - return SandboxStatusResponse( - sandbox_id=sandbox_id, - status=remote_status.phases, - port_mapping=remote_status.get_port_mapping(), - state=sandbox_info.get("state"), - host_name=sandbox_info.get("host_name"), - host_ip=sandbox_info.get("host_ip"), - is_alive=is_alive, - image=sandbox_info.get("image"), - swe_rex_version=swe_version, - gateway_version=gateway_version, - user_id=sandbox_info.get("user_id"), - experiment_id=sandbox_info.get("experiment_id"), - namespace=sandbox_info.get("namespace"), - cpus=sandbox_info.get("cpus"), - memory=sandbox_info.get("memory"), - ) + """ + Deprecated: Use get_status(sandbox_id, use_rocklet=True) instead. + This method is kept for backward compatibility. + """ + return await self.get_status(sandbox_id, use_rocklet=True) async def get_remote_status(self, sandbox_id: str, host_ip: str) -> ServiceStatus: service_status_path = PersistedServiceStatus.gen_service_status_path(sandbox_id) @@ -363,59 +274,56 @@ async def get_remote_status(self, sandbox_id: str, host_ip: str) -> ServiceStatu error_msg = ( f"get_remote_status failed! 
{response.get('failure_reason') if response.get('failure_reason') else ''}" ) - raise Exception(error_msg) + raise Exception(error_msg) + + def get_info_from_response(self, response: SandboxStatusResponse) -> SandboxInfo: + return SandboxInfo( + host_name=response.host_name, + host_ip=response.host_ip, + user_id=response.user_id, + experiment_id=response.experiment_id, + namespace=response.namespace, + sandbox_id=response.sandbox_id, + cpus=response.cpus, + memory=response.memory, + port_mapping=response.port_mapping, + ) + async def build_sandbox_info_from_redis(self, sandbox_id: str, deployment_info: SandboxInfo) -> SandboxInfo | None: + sandbox_status = await self._redis_provider.json_get(alive_sandbox_key(sandbox_id), "$") + if sandbox_status and len(sandbox_status) > 0: + sandbox_info = sandbox_status[0] + remote_info = {k: v for k, v in deployment_info.items() if k in ['phases', 'port_mapping', 'alive', 'state']} + if 'phases' in remote_info and remote_info['phases']: + remote_info['phases'] = {name: phase.to_dict() for name, phase in remote_info['phases'].items()} + sandbox_info.update(remote_info) + else: + sandbox_info = deployment_info + return sandbox_info + async def create_session(self, request: CreateSessionRequest) -> CreateBashSessionResponse: - sandbox_actor = await self.async_ray_get_actor(request.sandbox_id) - if sandbox_actor is None: - raise Exception(f"sandbox {request.sandbox_id} not found to create session") - await self._update_expire_time(request.sandbox_id) - return await self.async_ray_get(sandbox_actor.create_session.remote(request)) + return await self._proxy_service.create_session(request) @monitor_sandbox_operation() async def run_in_session(self, action: Action) -> BashObservation: - sandbox_actor = await self.async_ray_get_actor(action.sandbox_id) - if sandbox_actor is None: - raise Exception(f"sandbox {action.sandbox_id} not found to run in session") - await self._update_expire_time(action.sandbox_id) - return await self.async_ray_get(sandbox_actor.run_in_session.remote(action)) - - async def close_session(self, request: CloseBashSessionRequest) -> CloseBashSessionResponse: - sandbox_actor = await self.async_ray_get_actor(request.sandbox_id) - if sandbox_actor is None: - raise Exception(f"sandbox {request.sandbox_id} not found to close session") - await self._update_expire_time(request.sandbox_id) - return await self.async_ray_get(sandbox_actor.close_session.remote(request)) + return await self._proxy_service.run_in_session(action) async def execute(self, command: Command) -> CommandResponse: - sandbox_actor = await self.async_ray_get_actor(command.sandbox_id) - if sandbox_actor is None: - raise Exception(f"sandbox {command.sandbox_id} not found to execute") - await self._update_expire_time(command.sandbox_id) - return await self.async_ray_get(sandbox_actor.execute.remote(command)) - + return await self._proxy_service.execute(command) + + # TODO:remain for test, delete it after test refactor async def read_file(self, request: ReadFileRequest) -> ReadFileResponse: - sandbox_actor = await self.async_ray_get_actor(request.sandbox_id) - if sandbox_actor is None: - raise Exception(f"sandbox {request.sandbox_id} not found to read file") - await self._update_expire_time(request.sandbox_id) - return await self.async_ray_get(sandbox_actor.read_file.remote(request)) + return await self._proxy_service.read_file(request) + # TODO:remain for test, delete it after test refactor @monitor_sandbox_operation() async def write_file(self, request: WriteFileRequest) -> 
WriteFileResponse: - sandbox_actor = await self.async_ray_get_actor(request.sandbox_id) - if sandbox_actor is None: - raise Exception(f"sandbox {request.sandbox_id} not found to write file") - await self._update_expire_time(request.sandbox_id) - return await self.async_ray_get(sandbox_actor.write_file.remote(request)) + return await self._proxy_service.write_file(request) + # TODO:remain for test, delete it after test refactor @monitor_sandbox_operation() async def upload(self, file: UploadFile, target_path: str, sandbox_id: str) -> UploadResponse: - sandbox_actor = await self.async_ray_get_actor(sandbox_id) - if sandbox_actor is None: - raise Exception(f"sandbox {sandbox_id} not found to upload file") - await self._update_expire_time(sandbox_id) - return await self.async_ray_get(sandbox_actor.upload.remote(file, target_path)) + return await self._proxy_service.upload(file, target_path, sandbox_id) async def _is_expired(self, sandbox_id): timeout_dict = await self._redis_provider.json_get(timeout_sandbox_key(sandbox_id), "$") @@ -429,14 +337,6 @@ async def _is_expired(self, sandbox_id): logger.info(f"sandbox_id:[{sandbox_id}] is already cleared") return True - async def _is_actor_alive(self, sandbox_id): - try: - actor = await self.async_ray_get_actor(sandbox_id) - return actor is not None - except Exception as e: - logger.error("get actor failed", exc_info=e) - return False - async def _check_job_background(self): if not self._redis_provider: return @@ -456,8 +356,7 @@ async def _check_job_background(self): continue async def get_sandbox_statistics(self, sandbox_id): - sandbox_actor = await self.async_ray_get_actor(sandbox_id) - resource_metrics = await self.async_ray_get(sandbox_actor.get_sandbox_statistics.remote()) + resource_metrics = await self._deployment_service.get_sandbox_statistics(sandbox_id) return resource_metrics async def _update_expire_time(self, sandbox_id): @@ -494,4 +393,8 @@ def validate_sandbox_spec(self, runtime_config: RuntimeConfig, deployment_config ) except ValueError as e: logger.warning(f"Invalid memory size: {deployment_config.memory}", exc_info=e) - raise BadRequestRockError(f"Invalid memory size: {self._config.memory}") + raise BadRequestRockError(f"Invalid memory size: {deployment_config.memory}") + + async def _collect_system_resource_metrics(self): + """Collect system resource metrics, delegate to deployment service""" + return await self._deployment_service.collect_system_resource_metrics() diff --git a/rock/sandbox/service/deployment/abstract.py b/rock/sandbox/service/deployment/abstract.py new file mode 100644 index 000000000..7f7f441e8 --- /dev/null +++ b/rock/sandbox/service/deployment/abstract.py @@ -0,0 +1,45 @@ +from abc import ABC, abstractmethod + +from rock.actions.sandbox.response import CommandResponse, SystemResourceMetrics +from rock.actions.sandbox.sandbox_info import SandboxInfo +from rock.deployments.config import DeploymentConfig +from rock.logger import init_logger + +logger = init_logger(__name__) + +class AbstractDeploymentService(ABC): + """Abstract base class for deployment services implementing IDeploymentService.""" + + @abstractmethod + async def is_alive(self, sandbox_id: str) -> bool: + ... + + @abstractmethod + async def submit(self, config: DeploymentConfig, user_info: dict) -> SandboxInfo: + """Get status of sandbox.""" + ... + + @abstractmethod + async def get_status(self, sandbox_id: str) -> SandboxInfo: + """Get status of sandbox.""" + ... 
+ + @abstractmethod + async def stop(self, sandbox_id: str): + """Stop sandbox.""" + + async def get_mount(self, sandbox_id: str): + """Get mount of sandbox.""" + raise NotImplementedError + + async def get_sandbox_statistics(self, sandbox_id: str): + """Get sandbox statistics.""" + raise NotImplementedError + + async def commit(self, sandbox_id: str, image_tag: str, username: str, password: str) -> CommandResponse: + """Commit sandbox to image.""" + raise NotImplementedError + + async def collect_system_resource_metrics(self) -> SystemResourceMetrics: + """Collect system resource metrics.""" + raise NotImplementedError \ No newline at end of file diff --git a/rock/sandbox/service/deployment/ray.py b/rock/sandbox/service/deployment/ray.py new file mode 100644 index 000000000..0245bfed1 --- /dev/null +++ b/rock/sandbox/service/deployment/ray.py @@ -0,0 +1,127 @@ + +from rock.actions.sandbox.response import CommandResponse, State, SystemResourceMetrics +from rock.actions.sandbox.sandbox_info import SandboxInfo +from rock.admin.core.ray_service import RayService +import ray +from rock.deployments.config import DockerDeploymentConfig +from rock.deployments.docker import DockerDeployment +from rock.deployments.status import ServiceStatus +from rock.logger import init_logger +from rock.sandbox.sandbox_actor import SandboxActor +from rock.sandbox.service.deployment.abstract import AbstractDeploymentService +from rock.sdk.common.exceptions import BadRequestRockError +from rock.utils.format import parse_memory_size + +logger = init_logger(__name__) + +class RayDeploymentService(AbstractDeploymentService): + def __init__(self, ray_namespace: str, ray_service: RayService): + self._ray_namespace = ray_namespace + self._ray_service = ray_service + + def _get_actor_name(self, sandbox_id): + return f"sandbox-{sandbox_id}" + + async def is_alive(self, sandbox_id) -> bool: + try: + actor: SandboxActor = await self._ray_service.async_ray_get_actor(self._get_actor_name(sandbox_id)) + except ValueError: + return False + return await self._ray_service.async_ray_get(actor.is_alive.remote()) + + async def submit(self, config: DockerDeploymentConfig, user_info: dict) -> SandboxInfo: + async with self._ray_service.get_ray_rwlock().read_lock(): + sandbox_actor: SandboxActor = await self.creator_actor(config) + user_id = user_info.get("user_id", "default") + experiment_id = user_info.get("experiment_id", "default") + namespace = user_info.get("namespace", "default") + rock_authorization = user_info.get("rock_authorization", "default") + sandbox_actor.start.remote() + sandbox_actor.set_user_id.remote(user_id) + sandbox_actor.set_experiment_id.remote(experiment_id) + sandbox_actor.set_namespace.remote(namespace) + sandbox_info: SandboxInfo = await self._ray_service.async_ray_get(sandbox_actor.sandbox_info.remote()) + sandbox_info["user_id"] = user_id + sandbox_info["experiment_id"] = experiment_id + sandbox_info["namespace"] = namespace + sandbox_info["state"] = State.PENDING + sandbox_info["rock_authorization"] = rock_authorization + return sandbox_info + + async def creator_actor(self, config: DockerDeploymentConfig): + actor_options = self._generate_actor_options(config) + deployment: DockerDeployment = config.get_deployment() + sandbox_actor = SandboxActor.options(**actor_options).remote(config, deployment) + return sandbox_actor + + def _generate_actor_options(self, config: DockerDeploymentConfig) -> dict: + actor_name = self._get_actor_name(config.container_name) + actor_options = {"name": actor_name, 
"lifetime": "detached"} + try: + memory = parse_memory_size(config.memory) + actor_options["num_cpus"] = config.cpus + actor_options["memory"] = memory + return actor_options + except ValueError as e: + logger.warning(f"Invalid memory size: {config.memory}", exc_info=e) + raise BadRequestRockError(f"Invalid memory size: {config.memory}") + + async def stop(self, sandbox_id: str): + async with self._ray_service.get_ray_rwlock().read_lock(): + actor: SandboxActor = await self._ray_service.async_ray_get_actor(self._get_actor_name(sandbox_id)) + await self._ray_service.async_ray_get(actor.stop.remote()) + logger.info(f"run time stop over {sandbox_id}") + ray.kill(actor) + + async def get_status(self, sandbox_id: str) -> SandboxInfo: + async with self._ray_service.get_ray_rwlock().read_lock(): + actor: SandboxActor = await self._ray_service.async_ray_get_actor(self._get_actor_name(sandbox_id)) + sandbox_info: SandboxInfo = await self._ray_service.async_ray_get(actor.sandbox_info.remote()) + remote_status: ServiceStatus = await self._ray_service.async_ray_get(actor.get_status.remote()) + sandbox_info["phases"] = remote_status.phases + sandbox_info["port_mapping"] = remote_status.get_port_mapping() + alive = await self._ray_service.async_ray_get(actor.is_alive.remote()) + if alive.is_alive: + sandbox_info["state"] = State.RUNNING + return sandbox_info + + async def get_mount(self, sandbox_id: str): + with self._ray_service.get_ray_rwlock().read_lock(): + actor = await self._ray_service.async_ray_get_actor(self._get_actor_name(sandbox_id)) + result = await self._ray_service.async_ray_get(actor.get_mount.remote()) + logger.info(f"get_mount: {result}") + return result + + async def get_sandbox_statistics(self, sandbox_id: str): + async with self._ray_service.get_ray_rwlock().read_lock(): + actor = await self._ray_service.async_ray_get_actor(self._get_actor_name(sandbox_id)) + result = await self._ray_service.async_ray_get(actor.get_sandbox_statistics.remote()) + logger.info(f"get_sandbox_statistics: {result}") + return result + + async def commit(self, sandbox_id) -> CommandResponse: + with self._ray_service.get_ray_rwlock().read_lock(): + actor = await self._ray_service.async_ray_get_actor(self._get_actor_name(sandbox_id)) + result = await self._ray_service.async_ray_get(actor.commit.remote()) + logger.info(f"commit: {result}") + return result + + async def collect_system_resource_metrics(self) -> SystemResourceMetrics: + """Collect system resource metrics""" + cluster_resources = ray.cluster_resources() + available_resources = ray.available_resources() + total_cpu = cluster_resources.get("CPU", 0) + total_mem = cluster_resources.get("memory", 0) / 1024**3 + available_cpu = available_resources.get("CPU", 0) + available_mem = available_resources.get("memory", 0) / 1024**3 + gpu_count = cluster_resources.get("GPU", 0) + available_gpu = available_resources.get("GPU", 0) + + return SystemResourceMetrics( + total_cpu=total_cpu, + total_memory=total_mem, + available_cpu=available_cpu, + available_memory=available_mem, + gpu_count=int(gpu_count), + available_gpu=int(available_gpu), + ) diff --git a/rock/sandbox/service/env_service.py b/rock/sandbox/service/env_service.py new file mode 100644 index 000000000..f881ac903 --- /dev/null +++ b/rock/sandbox/service/env_service.py @@ -0,0 +1,74 @@ +from abc import ABC, abstractmethod + +from rock.actions.envs.request import EnvCloseRequest, EnvMakeRequest, EnvResetRequest, EnvStepRequest +from rock.actions.envs.response import EnvCloseResponse, EnvListResponse, 
EnvMakeResponse, EnvResetResponse, EnvStepResponse +from rock.admin.core.ray_service import RayService +from rock.logger import init_logger +from rock.sandbox.sandbox_actor import SandboxActor + +logger = init_logger(__name__) + + +class AbstractEnvService(ABC): + @abstractmethod + async def env_step(self, request: EnvStepRequest) -> EnvStepResponse: + ... + + @abstractmethod + async def env_make(self, request: EnvMakeRequest) -> EnvMakeResponse: + ... + + @abstractmethod + async def env_reset(self, request: EnvResetRequest) -> EnvResetResponse: + ... + + @abstractmethod + async def env_close(self, request: EnvCloseRequest) -> EnvCloseResponse: + ... + + @abstractmethod + async def env_list(self, sandbox_id) -> EnvListResponse: + ... + + +class RayEnvService(AbstractEnvService): + def __init__(self, ray_namespace: str, ray_service: RayService): + self._ray_namespace = ray_namespace + self._ray_service = ray_service + + def _get_actor_name(self, sandbox_id): + return f"sandbox-{sandbox_id}" + + async def env_step(self, request: EnvStepRequest) -> EnvStepResponse: + sandbox_id = request.sandbox_id + actor: SandboxActor = await self._ray_service.async_ray_get_actor(self._get_actor_name(sandbox_id)) + result = await self._ray_service.async_ray_get(actor.env_step.remote(request)) + logger.info(f"env_step: {result}") + return result + + async def env_make(self, request: EnvMakeRequest) -> EnvMakeResponse: + sandbox_id = request.sandbox_id + actor: SandboxActor = await self._ray_service.async_ray_get_actor(self._get_actor_name(sandbox_id)) + result = await self._ray_service.async_ray_get(actor.env_make.remote(request)) + logger.info(f"env_make: {result}") + return result + + async def env_reset(self, request: EnvResetRequest) -> EnvResetResponse: + sandbox_id = request.sandbox_id + actor: SandboxActor = await self._ray_service.async_ray_get_actor(self._get_actor_name(sandbox_id)) + result = await self._ray_service.async_ray_get(actor.env_reset.remote(request)) + logger.info(f"env_reset: {result}") + return result + + async def env_close(self, request: EnvCloseRequest) -> EnvCloseResponse: + sandbox_id = request.sandbox_id + actor: SandboxActor = await self._ray_service.async_ray_get_actor(self._get_actor_name(sandbox_id)) + result = await self._ray_service.async_ray_get(actor.env_close.remote(request)) + logger.info(f"env_close: {result}") + return result + + async def env_list(self, sandbox_id) -> EnvListResponse: + actor: SandboxActor = await self._ray_service.async_ray_get_actor(self._get_actor_name(sandbox_id)) + result = await self._ray_service.async_ray_get(actor.env_list.remote()) + logger.info(f"env_list: {result}") + return result \ No newline at end of file diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py index e65fd79bb..0f2a42a0a 100644 --- a/tests/integration/conftest.py +++ b/tests/integration/conftest.py @@ -106,7 +106,7 @@ def admin_remote_server(): [ "admin", "--env", - "local", + "test", "--role", "admin", "--port", diff --git a/tests/unit/conftest.py b/tests/unit/conftest.py index b43e75283..1cb571624 100644 --- a/tests/unit/conftest.py +++ b/tests/unit/conftest.py @@ -11,6 +11,7 @@ from rock.deployments.config import DockerDeploymentConfig from rock.logger import init_logger from rock.sandbox.sandbox_manager import SandboxManager +from rock.sandbox.service.deployment.ray import RayDeploymentService from rock.sandbox.service.sandbox_proxy_service import SandboxProxyService from rock.utils.providers.redis_provider import RedisProvider from 
rock.admin.core.ray_service import RayService
@@ -53,6 +54,10 @@ async def sandbox_manager(rock_config: RockConfig, redis_provider: RedisProvider
     )
     return sandbox_manager

+@pytest.fixture
+async def ray_deployment_service(rock_config: RockConfig, ray_init_shutdown, ray_service):
+    ray_deployment_service = RayDeploymentService(ray_namespace=rock_config.ray.namespace, ray_service=ray_service)
+    return ray_deployment_service

 @pytest.fixture
 async def sandbox_proxy_service(rock_config: RockConfig, redis_provider: RedisProvider):
diff --git a/tests/unit/sandbox/service/test_deployment_service.py b/tests/unit/sandbox/service/test_deployment_service.py
new file mode 100644
index 000000000..690e9000d
--- /dev/null
+++ b/tests/unit/sandbox/service/test_deployment_service.py
@@ -0,0 +1,28 @@
+import pytest
+from rock.actions.sandbox.response import SystemResourceMetrics
+
+
+@pytest.mark.need_ray
+@pytest.mark.asyncio
+async def test_get_actor_not_exist_raises_value_error(ray_deployment_service):
+    sandbox_id = "unknown"
+    with pytest.raises(Exception) as exc_info:
+        await ray_deployment_service._ray_service.async_ray_get_actor(sandbox_id)
+    assert exc_info.type == ValueError
+
+
+@pytest.mark.need_ray
+@pytest.mark.asyncio
+async def test_collect_system_resource_metrics(ray_deployment_service):
+    metrics: SystemResourceMetrics = await ray_deployment_service.collect_system_resource_metrics()
+    assert metrics.total_cpu > 0
+    assert metrics.total_memory > 0
+    assert metrics.available_cpu >= 0
+    assert metrics.available_memory >= 0
+    assert metrics.available_cpu <= metrics.total_cpu
+    assert metrics.available_memory <= metrics.total_memory
+    # Test the utilization calculations
+    cpu_utilization = metrics.get_cpu_utilization()
+    assert 0.0 <= cpu_utilization <= 1.0
+    memory_utilization = metrics.get_memory_utilization()
+    assert 0.0 <= memory_utilization <= 1.0
\ No newline at end of file
diff --git a/tests/unit/sandbox/test_sandbox_manager.py b/tests/unit/sandbox/test_sandbox_manager.py
index 000241fff..6a4c5a5ca 100644
--- a/tests/unit/sandbox/test_sandbox_manager.py
+++ b/tests/unit/sandbox/test_sandbox_manager.py
@@ -8,6 +9,7 @@

 from rock.actions import SandboxStatusResponse
 from rock.actions.sandbox.response import State
+from rock.config import RockConfig
 from rock.deployments.config import DockerDeploymentConfig, RayDeploymentConfig
 from rock.deployments.constants import Port
 from rock.deployments.status import ServiceStatus
@@ -21,22 +22,16 @@
 @pytest.mark.need_ray
 @pytest.mark.asyncio
 async def test_async_sandbox_start(sandbox_manager: SandboxManager):
-    response = await sandbox_manager.start_async(DockerDeploymentConfig())
+    response = await sandbox_manager.submit(DockerDeploymentConfig())
     sandbox_id = response.sandbox_id
     assert sandbox_id is not None

-    search_start_time = time.time()
-    while time.time() - search_start_time < 60:
-        is_alive_response = await sandbox_manager._is_actor_alive(sandbox_id)
-        if is_alive_response:
-            break
+    assert await wait_sandbox_instance_alive(sandbox_manager, sandbox_id)

-    is_alive_response = await sandbox_manager._is_actor_alive(sandbox_id)
-    assert is_alive_response
+    assert await sandbox_manager._deployment_service.is_alive(sandbox_id)

-    sandbox_actor = await sandbox_manager.async_ray_get_actor(sandbox_id)
-    assert sandbox_actor is not None
-    assert await sandbox_actor.user_id.remote() == "default"
-    assert await sandbox_actor.experiment_id.remote() == "default"
+    sandbox_status = await sandbox_manager.get_status(sandbox_id)
+    assert sandbox_status.user_id == "default"
+    assert 
sandbox_status.experiment_id == "default" await sandbox_manager.stop(sandbox_id) @@ -44,7 +39,7 @@ async def test_async_sandbox_start(sandbox_manager: SandboxManager): @pytest.mark.need_ray @pytest.mark.asyncio async def test_get_status(sandbox_manager): - response = await sandbox_manager.start_async(DockerDeploymentConfig(image="python:3.11")) + response = await sandbox_manager.submit(DockerDeploymentConfig(image="python:3.11")) await asyncio.sleep(5) docker_status: SandboxStatusResponse = await sandbox_manager.get_status(response.sandbox_id) assert docker_status.status["docker_run"] @@ -68,41 +63,33 @@ async def test_get_status(sandbox_manager): async def test_ray_actor_is_alive(sandbox_manager): docker_deploy_config = DockerDeploymentConfig() - response = await sandbox_manager.start_async(docker_deploy_config) + response = await sandbox_manager.submit(docker_deploy_config) assert response.sandbox_id is not None - assert await sandbox_manager._is_actor_alive(response.sandbox_id) + assert await wait_sandbox_instance_alive(sandbox_manager, response.sandbox_id) - sandbox_actor = await sandbox_manager.async_ray_get_actor(response.sandbox_id) + actor_name = sandbox_manager._deployment_service._get_actor_name(response.sandbox_id) + sandbox_actor = await sandbox_manager._deployment_service._ray_service.async_ray_get_actor(actor_name) ray.kill(sandbox_actor) - assert not await sandbox_manager._is_actor_alive(response.sandbox_id) + assert not await sandbox_manager._deployment_service.is_alive(response.sandbox_id) @pytest.mark.need_ray @pytest.mark.asyncio async def test_user_info_set_success(sandbox_manager): user_info = {"user_id": "test_user_id", "experiment_id": "test_experiment_id"} - response = await sandbox_manager.start_async(RayDeploymentConfig(), user_info=user_info) + response = await sandbox_manager.submit(RayDeploymentConfig(), user_info=user_info) sandbox_id = response.sandbox_id - cnt = 0 - while True: - is_alive_response = await sandbox_manager._is_actor_alive(sandbox_id) - if is_alive_response: - break - time.sleep(1) - cnt += 1 - if cnt > 60: - raise Exception("sandbox not alive") + assert await wait_sandbox_instance_alive(sandbox_manager, sandbox_id) - is_alive_response = await sandbox_manager._is_actor_alive(sandbox_id) + is_alive_response = await sandbox_manager._deployment_service.is_alive(sandbox_id) assert is_alive_response - sandbox_actor = await sandbox_manager.async_ray_get_actor(sandbox_id) - assert sandbox_actor is not None - assert await sandbox_actor.user_id.remote() == "test_user_id" - assert await sandbox_actor.experiment_id.remote() == "test_experiment_id" + sandbox_status = await sandbox_manager.get_status(sandbox_id) + assert sandbox_status.user_id == "test_user_id" + assert sandbox_status.experiment_id == "test_experiment_id" await sandbox_manager.stop(sandbox_id) @@ -118,7 +105,7 @@ def test_set_sandbox_status_response(): async def test_resource_limit_exception(sandbox_manager, docker_deployment_config): docker_deployment_config.cpus = 20 with pytest.raises(BadRequestRockError) as e: - await sandbox_manager.start_async(docker_deployment_config) + await sandbox_manager.submit(docker_deployment_config) logger.warning(f"Resource limit exception: {str(e)}", exc_info=True) @@ -127,24 +114,27 @@ async def test_resource_limit_exception(sandbox_manager, docker_deployment_confi async def test_resource_limit_exception_memory(sandbox_manager, docker_deployment_config): docker_deployment_config.memory = "65g" with pytest.raises(BadRequestRockError) as e: - await 
sandbox_manager.start_async(docker_deployment_config) + await sandbox_manager.submit(docker_deployment_config) logger.warning(f"Resource limit exception: {str(e)}", exc_info=True) @pytest.mark.need_ray @pytest.mark.asyncio async def test_get_system_resource_info(sandbox_manager): - total_cpu, total_mem, ava_cpu, ava_mem = await sandbox_manager._collect_system_resource_metrics() - assert total_cpu > 0 - assert total_mem > 0 - assert ava_cpu > 0 - assert ava_mem > 0 + from rock.actions.sandbox.response import SystemResourceMetrics + metrics: SystemResourceMetrics = await sandbox_manager._collect_system_resource_metrics() + assert metrics.total_cpu > 0 + assert metrics.total_memory > 0 + assert metrics.available_cpu >= 0 + assert metrics.available_memory >= 0 + assert metrics.available_cpu <= metrics.total_cpu + assert metrics.available_memory <= metrics.total_memory @pytest.mark.need_ray @pytest.mark.asyncio async def test_get_status_state(sandbox_manager): - response = await sandbox_manager.start_async( + response = await sandbox_manager.submit( DockerDeploymentConfig(), ) sandbox_id = response.sandbox_id @@ -159,11 +149,11 @@ async def test_get_status_state(sandbox_manager): async def test_sandbox_start_with_sandbox_id(sandbox_manager): try: sandbox_id = uuid.uuid4().hex - response = await sandbox_manager.start_async(DockerDeploymentConfig(container_name=sandbox_id)) + response = await sandbox_manager.submit(DockerDeploymentConfig(container_name=sandbox_id)) assert response.sandbox_id == sandbox_id await check_sandbox_status_until_alive(sandbox_manager, sandbox_id) with pytest.raises(BadRequestRockError) as e: - await sandbox_manager.start_async( + await sandbox_manager.submit( DockerDeploymentConfig(container_name=sandbox_id), sandbox_id=sandbox_id, ) @@ -172,11 +162,182 @@ async def test_sandbox_start_with_sandbox_id(sandbox_manager): finally: await sandbox_manager.stop(sandbox_id) +async def wait_sandbox_instance_alive(sandbox_manager: SandboxManager, sandbox_id: str) -> bool: + cnt = 0 + while True: + is_alive_response = await sandbox_manager._deployment_service.is_alive(sandbox_id) + if is_alive_response: + return True + time.sleep(1) + cnt += 1 + if cnt > 60: + raise Exception("sandbox not alive") + + +async def wait_for_rocklet_service_ready(sandbox_manager: SandboxManager, sandbox_id: str, timeout: int = 120): + """Wait for rocklet HTTP service to be ready in container + + Args: + sandbox_manager: SandboxManager instance + sandbox_id: Sandbox ID + timeout: Maximum wait time in seconds + + Raises: + Exception: If service is not ready within timeout + """ + from rock.deployments.constants import Port + from rock.utils import HttpUtils, EAGLE_EYE_TRACE_ID, trace_id_ctx_var + + start_time = time.time() + while time.time() - start_time < timeout: + try: + # Get sandbox info to get host_ip and port + status = await sandbox_manager.get_status(sandbox_id, use_rocklet=False) + if not status.is_alive or not status.host_ip: + await asyncio.sleep(2) + continue + + # Try to connect to rocklet service + port = status.port_mapping.get(Port.PROXY) + if not port: + await asyncio.sleep(2) + continue + + # Test if rocklet service is responding + try: + await HttpUtils.get( + url=f"http://{status.host_ip}:{port}/", + headers={ + "sandbox_id": sandbox_id, + EAGLE_EYE_TRACE_ID: trace_id_ctx_var.get(), + }, + read_timeout=5, + ) + logger.info(f"Rocklet service is ready for sandbox {sandbox_id}") + return + except Exception: + # Service not ready yet, continue waiting + await asyncio.sleep(2) + continue + 
+        except Exception as e:
+            logger.debug(f"Waiting for rocklet service: {e}")
+            await asyncio.sleep(2)
+
+    raise Exception(f"Rocklet service not ready within {timeout}s for sandbox {sandbox_id}")
+
+
+async def _test_get_status_with_redis(sandbox_manager: SandboxManager, use_rocklet: bool):
+    """Helper to exercise get_status when a Redis provider is configured."""
+    from rock.admin.core.redis_key import alive_sandbox_key
+
+    # Submit a sandbox
+    response = await sandbox_manager.submit(DockerDeploymentConfig(image="python:3.11"))
+    sandbox_id = response.sandbox_id
+
+    try:
+        # Wait for the sandbox to be alive
+        await check_sandbox_status_until_alive(sandbox_manager, sandbox_id)
+
+        # If using rocklet, wait for the rocklet HTTP service to be ready
+        # if use_rocklet:
+        #     await wait_for_rocklet_service_ready(sandbox_manager, sandbox_id)
+
+        # Test: get_status with Redis
+        status_response = await sandbox_manager.get_status(sandbox_id, use_rocklet=use_rocklet)
+
+        # Common assertions
+        assert status_response.sandbox_id == sandbox_id
+        assert status_response.host_ip is not None
+        assert status_response.host_name is not None
+        assert status_response.is_alive is True
+        assert status_response.state == State.RUNNING
+        assert len(status_response.port_mapping) > 0
+        assert status_response.image == "python:3.11"
+
+        # Verify Redis was used/updated
+        redis_data = await sandbox_manager._redis_provider.json_get(alive_sandbox_key(sandbox_id), "$")
+        assert redis_data is not None
+        assert len(redis_data) > 0
+
+        # Additional assertions for rocklet mode
+        if use_rocklet:
+            # Verify remote status was fetched (phases should be populated)
+            assert status_response.status is not None
+            assert "docker_run" in status_response.status
+    finally:
+        # Cleanup
+        await sandbox_manager.stop(sandbox_id)
+
+
+@pytest.mark.need_ray
+@pytest.mark.asyncio
+async def test_get_status_with_redis_without_rocklet(sandbox_manager: SandboxManager):
+    """Test get_status: with Redis, without rocklet (use_rocklet=False)"""
+    await _test_get_status_with_redis(sandbox_manager, use_rocklet=False)
+
+
+@pytest.mark.skip(reason="Skipped until the rocklet port issue is fixed")
+@pytest.mark.need_ray
+@pytest.mark.asyncio
+async def test_get_status_with_redis_with_rocklet(sandbox_manager: SandboxManager):
+    """Test get_status: with Redis, with rocklet (use_rocklet=True)"""
+    await _test_get_status_with_redis(sandbox_manager, use_rocklet=True)
+
+
+async def _test_get_status_without_redis(rock_config: RockConfig, ray_service, use_rocklet: bool):
+    """Helper to exercise get_status when no Redis provider is configured."""
+    # Create a SandboxManager without Redis
+    sandbox_manager_no_redis = SandboxManager(
+        rock_config,
+        redis_provider=None,  # No Redis
+        ray_namespace=rock_config.ray.namespace,
+        ray_service=ray_service,
+        enable_runtime_auto_clear=False,
+    )
+
+    # Submit a sandbox
+    response = await sandbox_manager_no_redis.submit(DockerDeploymentConfig(image="python:3.11"))
+    sandbox_id = response.sandbox_id
+
+    try:
+        # Wait for the sandbox to be alive
+        await check_sandbox_status_until_alive(sandbox_manager_no_redis, sandbox_id)
+
+        # If using rocklet, wait for the rocklet HTTP service to be ready
+        if use_rocklet:
+            await wait_for_rocklet_service_ready(sandbox_manager_no_redis, sandbox_id)
+
+        # Test: get_status without Redis
+        status_response = await sandbox_manager_no_redis.get_status(sandbox_id, use_rocklet=use_rocklet)
+
+        # Common assertions
+        assert status_response.sandbox_id == sandbox_id
+        assert status_response.host_ip is not None
+        assert status_response.host_name is not None
+        assert status_response.is_alive is True
+        assert status_response.state == State.RUNNING
+        assert len(status_response.port_mapping) > 0
+        assert status_response.image == "python:3.11"
+        assert status_response.status is not None
+
+        # Additional assertions for rocklet mode
+        if use_rocklet:
+            # Verify remote status was fetched (phases should be populated)
+            assert "docker_run" in status_response.status
+    finally:
+        # Cleanup
+        await sandbox_manager_no_redis.stop(sandbox_id)
+
+
+@pytest.mark.need_ray
+@pytest.mark.asyncio
+async def test_get_status_without_redis_without_rocklet(rock_config: RockConfig, ray_init_shutdown, ray_service):
+    """Test get_status: without Redis, without rocklet (use_rocklet=False)"""
+    await _test_get_status_without_redis(rock_config, ray_service, use_rocklet=False)
+
+
+@pytest.mark.skip(reason="Skipped until the rocklet port issue is fixed")
 @pytest.mark.need_ray
 @pytest.mark.asyncio
-async def test_get_actor_not_exist_raises_value_error(sandbox_manager):
-    sandbox_id = "unknown"
-    with pytest.raises(Exception) as exc_info:
-        await sandbox_manager.async_ray_get_actor(sandbox_id)
-    assert exc_info.type == ValueError
\ No newline at end of file
+async def test_get_status_without_redis_with_rocklet(rock_config: RockConfig, ray_init_shutdown, ray_service):
+    """Test get_status: without Redis, with rocklet (use_rocklet=True)"""
+    await _test_get_status_without_redis(rock_config, ray_service, use_rocklet=True)