Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
92079a6
init refactor
FangwenDave Jan 21, 2026
11e69be
fix test case: revert session related apis
FangwenDave Jan 21, 2026
4063cb8
fix test case: add rock auth to sandbox info
FangwenDave Jan 21, 2026
1b8e903
fix test case: add rock auth in get status
FangwenDave Jan 21, 2026
8a74b11
temproray remain apis for test case
FangwenDave Jan 21, 2026
fdef1a1
remove ray dependency from sandbox manager layer
FangwenDave Jan 22, 2026
4e152d7
fix test case: add fake redis for test env
FangwenDave Jan 22, 2026
3090dc9
fix test case: fix status code of run in session method
FangwenDave Jan 22, 2026
d5db9f4
opt: remove get_deployment method in sandbox manager
FangwenDave Jan 22, 2026
cc1a2e2
opt: move ray service inside deployment service
FangwenDave Jan 22, 2026
5ac5088
remove ray dependency in base manager
FangwenDave Jan 23, 2026
38c971e
fix comment
FangwenDave Jan 23, 2026
3469bac
split gem api from deployment service and sink ray operations to ray …
FangwenDave Jan 23, 2026
bd9c2ad
fix comment
FangwenDave Jan 26, 2026
e126c13
refactor: consolidate status methods by merging get_status_v2 into ge…
FangwenDave Jan 26, 2026
29e31ec
add test case
FangwenDave Jan 26, 2026
1485490
fix test case
FangwenDave Jan 26, 2026
0706157
fix test
FangwenDave Jan 26, 2026
56e6e35
fix get actor name in env service
FangwenDave Jan 26, 2026
07efee9
modify fakeredis dependency group
FangwenDave Jan 27, 2026
998a3d3
optimize get status judgement and param
FangwenDave Jan 27, 2026
ec035dd
fix test case: correct parameter name in sandbox test
FangwenDave Jan 27, 2026
440e6c2
optimize deployment service inherit
FangwenDave Jan 27, 2026
28beced
optimize abstract deployment service default return
FangwenDave Jan 27, 2026
8722ebd
refactor: reorganize deployment service imports and structure
FangwenDave Jan 27, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,8 @@ admin = [
"boto3",
"ray[default]==2.43.0",
"pip",
"cryptography==39.0.1"
"cryptography==39.0.1",
"fakeredis[json]",
]

rocklet = [
Expand Down Expand Up @@ -110,7 +111,6 @@ test = [
"pytest-trio",
"pytest-twisted",
"pytest-env",
"fakeredis[json]",
]

[tool.setuptools.packages.find]
Expand Down
40 changes: 40 additions & 0 deletions rock/actions/sandbox/response.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,3 +126,43 @@ class ChownResponse(BaseModel):
class ChmodResponse(BaseModel):
success: bool = False
message: str = ""


class SystemResourceMetrics(BaseModel):
"""System resource metrics"""

total_cpu: float = 0.0
"""Total CPU cores"""

total_memory: float = 0.0
"""Total memory in GB"""

available_cpu: float = 0.0
"""Available CPU cores"""

available_memory: float = 0.0
"""Available memory in GB"""

gpu_count: int = 0
"""Total GPU count"""

available_gpu: int = 0
"""Available GPU count"""

def get_cpu_utilization(self) -> float:
"""Get CPU utilization rate (0.0 - 1.0)"""
if self.total_cpu == 0:
return 0.0
return (self.total_cpu - self.available_cpu) / self.total_cpu

def get_memory_utilization(self) -> float:
"""Get memory utilization rate (0.0 - 1.0)"""
if self.total_memory == 0:
return 0.0
return (self.total_memory - self.available_memory) / self.total_memory

def get_gpu_utilization(self) -> float:
"""Get GPU utilization rate (0.0 - 1.0)"""
if self.gpu_count == 0:
return 0.0
return (self.gpu_count - self.available_gpu) / self.gpu_count
26 changes: 26 additions & 0 deletions rock/admin/core/ray_service.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import asyncio
import ray
import time

Expand Down Expand Up @@ -34,6 +35,31 @@ def increment_ray_request_count(self):

def get_ray_rwlock(self):
return self._ray_rwlock

async def async_ray_get_actor(self, sandbox_id: str):
"""Async wrapper for ray.get_actor() using asyncio.to_thread for non-blocking execution."""
self.increment_ray_request_count()
try:
result = await asyncio.to_thread(ray.get_actor, sandbox_id, namespace=self._config.namespace)
except ValueError as e:
logger.error(f"ray get actor, actor {sandbox_id} not exist", exc_info=e)
raise e
except Exception as e:
logger.error("ray get actor failed", exc_info=e)
error_msg = str(e.args[0]) if len(e.args) > 0 else f"ray get actor failed, {str(e)}"
raise Exception(error_msg)
return result

async def async_ray_get(self, ray_future: ray.ObjectRef):
"""Async wrapper for ray.get() using asyncio.to_thread for non-blocking execution."""
self.increment_ray_request_count()
try:
result = await asyncio.to_thread(ray.get, ray_future, timeout=60)
except Exception as e:
logger.error("ray get failed", exc_info=e)
error_msg = str(e.args[0]) if len(e.args) > 0 else f"ray get failed, {str(e)}"
raise Exception(error_msg)
return result


def _setup_ray_reconnect_scheduler(self):
Expand Down
16 changes: 6 additions & 10 deletions rock/admin/entrypoints/sandbox_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
UploadResponse,
WriteFileResponse,
)
from rock.actions.response import ResponseStatus
from rock.admin.proto.request import (
SandboxBashAction,
SandboxCloseBashSessionRequest,
Expand Down Expand Up @@ -52,7 +53,7 @@ async def start_async(
x_experiment_id: str | None = Header(default="default", alias="X-Experiment-Id"),
rock_authorization: str | None = Header(default="default", alias="X-Key"),
) -> RockResponse[SandboxStartResponse]:
sandbox_start_response = await sandbox_manager.start_async(
sandbox_start_response = await sandbox_manager.submit(
DockerDeploymentConfig.from_request(request),
user_info={
"user_id": x_user_id,
Expand Down Expand Up @@ -106,14 +107,10 @@ async def create_session(request: SandboxCreateBashSessionRequest) -> RockRespon
@sandbox_router.post("/run_in_session")
@handle_exceptions(error_message="run in session failed")
async def run(action: SandboxBashAction) -> RockResponse[BashObservation]:
return RockResponse(result=await sandbox_manager.run_in_session(action))


@sandbox_router.post("/close_session")
@handle_exceptions(error_message="close session failed")
async def close_session(request: SandboxCloseBashSessionRequest) -> RockResponse[CloseBashSessionResponse]:
return RockResponse(result=await sandbox_manager.close_session(request))

result = await sandbox_manager.run_in_session(action)
if result.exit_code is not None and result.exit_code == -1:
return RockResponse(status=ResponseStatus.FAILED, error=result.failure_reason)
return RockResponse(result=result)

@sandbox_router.post("/read_file")
@handle_exceptions(error_message="read file failed")
Expand All @@ -136,7 +133,6 @@ async def upload(
) -> RockResponse[UploadResponse]:
return RockResponse(result=await sandbox_manager.upload(file, target_path, sandbox_id))


@sandbox_router.post("/stop")
@handle_exceptions(error_message="stop sandbox failed")
async def close(sandbox_id: str = Body(..., embed=True)) -> RockResponse[str]:
Expand Down
6 changes: 5 additions & 1 deletion rock/admin/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,11 @@ async def lifespan(app: FastAPI):
env_vars.ROCK_ADMIN_ROLE = args.role

# init redis provider
if args.env == "local":
if args.env == "test":
from fakeredis import aioredis
redis_provider = RedisProvider(host=None, port=None, password="")
redis_provider.client = aioredis.FakeRedis(decode_responses=True)
elif args.env == "local":
redis_provider = None
else:
redis_provider = RedisProvider(
Expand Down
24 changes: 9 additions & 15 deletions rock/sandbox/base_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,23 +109,17 @@ async def _collect_and_report_metrics_internal(self):
logger.debug(f"Metrics overall report rt:{overall_duration:.4f}s")

async def _report_system_resource_metrics(self):
"""汇报系统资源指标"""
total_cpu, total_mem, available_cpu, available_mem = await self._collect_system_resource_metrics()
self.metrics_monitor.record_gauge_by_name(MetricsConstants.TOTAL_CPU_RESOURCE, total_cpu)
self.metrics_monitor.record_gauge_by_name(MetricsConstants.TOTAL_MEM_RESOURCE, total_mem)
self.metrics_monitor.record_gauge_by_name(MetricsConstants.AVAILABLE_CPU_RESOURCE, available_cpu)
self.metrics_monitor.record_gauge_by_name(MetricsConstants.AVAILABLE_MEM_RESOURCE, available_mem)
"""Report system resource metrics"""
metrics = await self._collect_system_resource_metrics()
self.metrics_monitor.record_gauge_by_name(MetricsConstants.TOTAL_CPU_RESOURCE, metrics.total_cpu)
self.metrics_monitor.record_gauge_by_name(MetricsConstants.TOTAL_MEM_RESOURCE, metrics.total_memory)
self.metrics_monitor.record_gauge_by_name(MetricsConstants.AVAILABLE_CPU_RESOURCE, metrics.available_cpu)
self.metrics_monitor.record_gauge_by_name(MetricsConstants.AVAILABLE_MEM_RESOURCE, metrics.available_memory)

async def _collect_system_resource_metrics(self):
"""收集系统资源指标"""
cluster_resources = ray.cluster_resources()
available_resources = ray.available_resources()
total_cpu = cluster_resources.get("CPU", 0)
total_mem = cluster_resources.get("memory", 0) / 1024**3
available_cpu = available_resources.get("CPU", 0)
available_mem = available_resources.get("memory", 0) / 1024**3
return total_cpu, total_mem, available_cpu, available_mem

"""Collect system resource metrics"""
raise NotImplementedError("This method should be implemented by subclasses")

async def _collect_sandbox_meta(self) -> tuple[int, dict[str, dict[str, str]]]:
meta: dict = {}
cnt = 0
Expand Down
46 changes: 13 additions & 33 deletions rock/sandbox/gem_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,14 @@
from rock.admin.proto.response import SandboxStartResponse, SandboxStatusResponse
from rock.config import RockConfig
from rock.deployments.config import DockerDeploymentConfig
from rock.sandbox.sandbox_actor import SandboxActor
from rock.sandbox.sandbox_manager import SandboxManager
from rock.sandbox.service.env_service import RayEnvService
from rock.utils.providers import RedisProvider
from rock.admin.core.ray_service import RayService


class GemManager(SandboxManager):
_env_service: RayEnvService
def __init__(
self,
rock_config: RockConfig,
Expand All @@ -32,10 +33,11 @@ def __init__(
enable_runtime_auto_clear: bool = False,
):
super().__init__(rock_config, redis_provider, ray_namespace, ray_service, enable_runtime_auto_clear)
self._env_service = RayEnvService(ray_namespace=ray_namespace, ray_service=ray_service)

async def env_make(self, env_id: str) -> EnvMakeResponse:
config = DockerDeploymentConfig(image=env_vars.ROCK_ENVHUB_DEFAULT_DOCKER_IMAGE)
sandbox_start_response: SandboxStartResponse = await self.start_async(config=config)
sandbox_start_response: SandboxStartResponse = await self.submit(config=config)

async def wait_until_alive(sandbox_id: str, interval: float = 1.0):
"""Internal polling method"""
Expand All @@ -53,44 +55,22 @@ async def wait_until_alive(sandbox_id: str, interval: float = 1.0):
except asyncio.TimeoutError:
raise Exception("Sandbox startup timeout after 300s")

sandbox_actor: SandboxActor = await self.async_ray_get_actor(sandbox_start_response.sandbox_id)
if sandbox_actor is None:
raise Exception(f"sandbox {sandbox_start_response.sandbox_id} not found to stop")
response = await self.async_ray_get(
sandbox_actor.env_make.remote(
EnvMakeRequest(
env_id=env_id,
sandbox_id=sandbox_start_response.sandbox_id,
)
make_response = await self._env_service.env_make(
EnvMakeRequest(
env_id=env_id,
sandbox_id=sandbox_start_response.sandbox_id,
)
)
return response
return make_response

async def env_step(self, request: EnvStepRequest) -> EnvStepResponse:
sandbox_id = request.sandbox_id
sandbox_actor: SandboxActor = await self.async_ray_get_actor(sandbox_id)
if sandbox_actor is None:
raise Exception(f"sandbox {sandbox_id} not found to stop")
return await self.async_ray_get(sandbox_actor.env_step.remote(request))
return await self._env_service.env_step(request)

async def env_reset(self, request: EnvResetRequest) -> EnvResetResponse:
sandbox_id = request.sandbox_id
sandbox_actor: SandboxActor = await self.async_ray_get_actor(sandbox_id)
if sandbox_actor is None:
raise Exception(f"sandbox {sandbox_id} not found to stop")
return await self.async_ray_get(sandbox_actor.env_reset.remote(request))
return await self._env_service.env_reset(request)

async def env_close(self, request: EnvCloseRequest) -> EnvCloseResponse:
sandbox_id = request.sandbox_id
sandbox_actor: SandboxActor = await self.async_ray_get_actor(sandbox_id)
if sandbox_actor is None:
raise Exception(f"sandbox {sandbox_id} not found to stop")
response = await self.async_ray_get(sandbox_actor.env_close.remote(request))
await self.stop(sandbox_id=sandbox_id)
return response
return await self._env_service.env_close(request)

async def env_list(self, sandbox_id: str) -> EnvListResponse:
sandbox_actor = await self.async_ray_get_actor(sandbox_id)
if sandbox_actor is None:
raise Exception(f"sandbox {sandbox_id} not found to stop")
return await self.async_ray_get(sandbox_actor.env_list.remote())
return await self._env_service.env_list(sandbox_id)
Loading