diff --git a/nemo_gym/profiling.py b/nemo_gym/profiling.py
new file mode 100644
index 000000000..c9eb37572
--- /dev/null
+++ b/nemo_gym/profiling.py
@@ -0,0 +1,64 @@
+from io import StringIO
+from pathlib import Path
+from typing import Optional
+
+import yappi
+from gprof2dot import main as gprof2dot_main
+from pydantic import BaseModel
+from pydot import graph_from_dot_file
+
+
+class Profiler(BaseModel):
+    name: str
+    base_profile_dir: Path
+
+    # Used to clean up and filter out unnecessary information in the yappi log
+    required_str: Optional[str] = None
+
+    def start(self) -> None:
+        yappi.set_clock_type("CPU")
+        yappi.start()
+        print(f"🔍 Enabled profiling for {self.name}")
+
+    def stop(self) -> None:
+        print(f"🛑 Stopping profiler for {self.name}. Check {self.base_profile_dir} for the metrics!")
+        yappi.stop()
+        self.dump()
+
+    def dump(self) -> None:
+        self.base_profile_dir.mkdir(parents=True, exist_ok=True)
+        log_path = self.base_profile_dir / f"{self.name}.log"
+        callgrind_path = self.base_profile_dir / f"{self.name}.callgrind"
+        callgrind_dotfile_path = self.base_profile_dir / f"{self.name}.dot"
+        callgrind_graph_path = self.base_profile_dir / f"{self.name}.png"
+
+        yappi.get_func_stats().save(callgrind_path, type="CALLGRIND")
+        gprof2dot_main(argv=f"--format=callgrind --output={callgrind_dotfile_path} -e 5 -n 5 {callgrind_path}".split())
+
+        (graph,) = graph_from_dot_file(callgrind_dotfile_path)
+        graph.write_png(callgrind_graph_path)
+
+        buffer = StringIO()
+        yappi.get_func_stats().print_all(
+            out=buffer,
+            columns={
+                0: ("name", 200),
+                1: ("ncall", 10),
+                2: ("tsub", 8),
+                3: ("ttot", 8),
+                4: ("tavg", 8),
+            },
+        )
+
+        buffer.seek(0)
+        res = ""
+        past_header = False
+        for line in buffer:
+            if not past_header or (self.required_str and self.required_str in line):
+                res += line
+
+            if line.startswith("name"):
+                past_header = True
+
+        with open(log_path, "w") as f:
+            f.write(res)
diff --git a/nemo_gym/server_utils.py b/nemo_gym/server_utils.py
index a7ca1f2f8..a687376dd 100644
--- a/nemo_gym/server_utils.py
+++ b/nemo_gym/server_utils.py
@@ -19,7 +19,6 @@
 import sys
 from abc import abstractmethod
 from contextlib import asynccontextmanager
-from io import StringIO
 from logging import Filter as LoggingFilter
 from logging import LogRecord, getLogger
 from os import environ, getenv
@@ -33,7 +32,6 @@
 import ray
 import requests
 import uvicorn
-import yappi
 from aiohttp import (
     ClientResponse,
     ClientResponseError,
@@ -67,6 +65,7 @@
     get_first_server_config_dict,
     get_global_config_dict,
 )
+from nemo_gym.profiling import Profiler
 
 _GLOBAL_AIOHTTP_CLIENT: Union[None, ClientSession] = None
@@ -433,6 +432,9 @@ async def exception_handling_middleware(request: Request, call_next):
                     )
                     response_content = f"Hit an exception in {self.get_session_middleware_key()} calling an inner server: {e.response_content}"
 
+                    if _GLOBAL_AIOHTTP_CLIENT_REQUEST_DEBUG:
+                        print(response_content)
+
                     return JSONResponse(content=response_content, status_code=500)
                 except Exception as e:
                     print(
@@ -449,58 +451,26 @@ async def exception_handling_middleware(request: Request, call_next):
             return JSONResponse(content="An unknown error occurred", status_code=500)
 
     def setup_profiling(self, app: FastAPI, profiling_config: ProfilingMiddlewareConfig) -> None:  # pragma: no cover
-        base_profile_dir = PARENT_DIR / profiling_config.profiling_results_dirpath
-        server_profile_path = (base_profile_dir / self.get_session_middleware_key()).with_suffix(".log")
-
-        base_profile_dir.mkdir(parents=True, exist_ok=True)
+        base_profile_dir = 
PARENT_DIR / profiling_config.profiling_results_dirpath / self.get_session_middleware_key() + profiler = Profiler(name=self.config.name, base_profile_dir=base_profile_dir) main_app_lifespan = app.router.lifespan_context - def _dump_yappi_stats() -> str: - buffer = StringIO() - yappi.get_func_stats().print_all( - out=buffer, - columns={ - 0: ("name", 200), - 1: ("ncall", 10), - 2: ("tsub", 8), - 3: ("ttot", 8), - 4: ("tavg", 8), - }, - ) - - buffer.seek(0) - res = "" - past_header = False - for line in buffer: - if not past_header or self.config.entrypoint in line: - res += line - - if line.startswith("name"): - past_header = True - - return res - @asynccontextmanager async def lifespan_wrapper(app): - yappi.set_clock_type("CPU") - yappi.start() - print(f"🔍 Enabled profiling for {self.config.name}") + profiler.start() async with main_app_lifespan(app) as maybe_state: yield maybe_state - print(f"🛑 Stopping profiler for {self.config.name}. Check {server_profile_path} for the metrics!") - yappi.stop() - - with open(server_profile_path, "w") as f: - f.write(_dump_yappi_stats()) + profiler.stop() app.router.lifespan_context = lifespan_wrapper @app.get("/stats") def stats(): - return Response(_dump_yappi_stats()) + profiler.dump() + return Response() def set_ulimit(self, target_soft_limit: int = 65535): # pragma: no cover # From https://github.com/vllm-project/vllm/blob/fed8a9b107df3e27d57728c6911c7d308b871477/vllm/utils/__init__.py#L2790 diff --git a/pyproject.toml b/pyproject.toml index 522d4645a..038c338e7 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -158,6 +158,9 @@ dependencies = [ # Updated: Thu Jan 08, 2026 with orjson==3.11.3 # License: Apache 2.0 https://github.com/ijl/orjson/blob/fb3eb1f729c7e7b019f780af5695722c99c7c695/LICENSE-APACHE "orjson", + + "gprof2dot", + "pydot", ] [dependency-groups] diff --git a/responses_api_agents/swe_agents/app.py b/responses_api_agents/swe_agents/app.py index a1e1557aa..bc27ad071 100644 --- a/responses_api_agents/swe_agents/app.py +++ b/responses_api_agents/swe_agents/app.py @@ -13,9 +13,12 @@ # limitations under the License. import asyncio import json +import os +import shlex import sys import time import uuid +import warnings from asyncio import Semaphore from pathlib import Path from typing import Any, Callable, Dict, Optional @@ -34,12 +37,14 @@ SimpleResponsesAPIAgent, ) from nemo_gym.config_types import ModelServerRef +from nemo_gym.global_config import OmegaConf, get_global_config_dict from nemo_gym.openai_utils import ( NeMoGymResponse, NeMoGymResponseCreateParamsNonStreaming, NeMoGymResponseOutputMessage, NeMoGymResponseOutputText, ) +from nemo_gym.profiling import Profiler from responses_api_agents.swe_agents.utils import ( convert_tools_to_function_format, convert_trajectory_to_output_items, @@ -53,14 +58,59 @@ ) +# There are some mysterious Pydantic serialization warnings related to FunctionTool that are not fatal that clutter up logs. +# At some point we can try continue chasing this one down. Example: +# (NemoGym pid=3160799) (swe_agents_val) PydanticSerializationUnexpectedValue(Expected `general-fields` - serialized value may not be as expected [field_name='tools', input_value=FunctionTool(name='str_re... 
a single call each.\n'), input_type=FunctionTool]) +warnings.filterwarnings("ignore", message="FunctionTool") + + +@ray.remote +class ConcurrentContainerCounter: + def __init__(self): + self.concurrent_containers = 0 + + def increment(self): + self.concurrent_containers += 1 + return self.concurrent_containers + + def decrement(self): + self.concurrent_containers -= 1 + return self.concurrent_containers + + @ray.remote( scheduling_strategy="SPREAD", runtime_env={ "py_executable": sys.executable, }, + num_cpus=1, ) -def runner_ray_remote(runner: Callable, params: dict[str, Any]) -> Any: - return asyncio.run(runner(**params)) +def runner_ray_remote( + concurrent_container_counter: ConcurrentContainerCounter, runner: Callable, params: dict[str, Any] +) -> Any: + ray_submit_time = time.time() + params["ray_submit_time"] = ray_submit_time + + # This is the first instance so we don't need to load anything + with params["metrics_fpath"].open("w") as f: + json.dump({"ray_queue_time": ray_submit_time - params["ray_queue_time"]}, f) + + if params["debug"]: + concurrent_containers = ray.get(concurrent_container_counter.increment.remote()) + print(f"Concurrent container #{concurrent_containers}", file=sys.stderr) + + instance_id = params["problem_info"].get("instance_id", "unknown") + profiler = Profiler(name=instance_id, base_profile_dir=params["persistent_dir"] / "profiling") + profiler.start() + + result = asyncio.run(runner(**params)) + + if params["debug"]: + profiler.stop() + + ray.get(concurrent_container_counter.decrement.remote()) + + return result class SWEBenchWrapperConfig(BaseResponsesAPIAgentConfig): @@ -92,6 +142,12 @@ class SWEBenchWrapperConfig(BaseResponsesAPIAgentConfig): swebench_agent_timeout: int = Field(default=45 * 60, description="Timeout for running the agent (seconds)") + apptainer_memory_limit_mb: int = Field( + default=32 * 1024, description="Memory limit for the apptainer container (MB)" + ) + + command_exec_timeout: int = Field(default=5 * 60, description="Timeout for executing the command (seconds)") + # Concurrency control concurrency: int = Field(default=256, description="Maximum number of concurrent SWE-bench runs") @@ -124,6 +180,9 @@ class SWEBenchWrapperConfig(BaseResponsesAPIAgentConfig): description="Session ID for the run", ) + openhands_should_log: bool = False + debug: bool = False + class SWEBenchRunRequest(BaseRunRequest): """Request format for SWE-bench runs.""" @@ -150,22 +209,47 @@ class SWEBenchVerifyResponse(BaseVerifyResponse): patch_exists: Optional[float] = None # 1.0 if patch exists, 0.0 otherwise patch_successfully_applied: Optional[float] = None # 1.0 if patch applied, 0.0 otherwise + # Profiling time metrics to report + ray_queue_time: float + # generation_apptainer_spinup_time: float + # create_runtime_time: float + # container_initialization_time: float + # connect_to_runtime_time: float + # runtime_initialization_fn_time: float + # total_command_exec_time: float + # total_model_call_time: float + # final_eval_apptainer_spinup_time: float + final_eval_time: float + + # Exit condition metrics to report + # TODO add more exit conditions + # hit_sample_timeout: bool + # hit_trajectory_command_exec_timeout: bool + # hit_eval_timeout: bool + hit_empty_trajectory: bool + hit_success: bool + hit_responses_exception: bool + class SWEBenchWrapper(SimpleResponsesAPIAgent): """Wrapper for NeMo-Skills SWE-bench evaluation in NeMo-Gym.""" config: SWEBenchWrapperConfig sem: Semaphore = None + _container_counter: ConcurrentContainerCounter = None + 
_global_config_dict_str: str = None model_config = ConfigDict(arbitrary_types_allowed=True) def model_post_init(self, __context: Any) -> None: self.sem = Semaphore(self.config.concurrency) + self._container_counter = ConcurrentContainerCounter.remote() # Pre-build OpenHands environment if using openhands framework if self.config.agent_framework == "openhands": self.config.openhands_setup_dir = setup_openhands_environment( agent_framework_repo=self.config.agent_framework_repo, agent_framework_commit=self.config.agent_framework_commit, + debug=self.config.debug, ) self.config.swebench_setup_dir = setup_swebench_environment() self.config.r2e_gym_setup_dir = setup_r2e_gym_environment() @@ -173,6 +257,9 @@ def model_post_init(self, __context: Any) -> None: print("Dependencies repositories set up complete", flush=True) self.config.run_session_id = f"{int(time.time() * 1000)}_{str(uuid.uuid4())[:8]}" + print(f"Run session ID: {self.config.run_session_id}", flush=True) + + self._global_config_dict_str = shlex.quote(OmegaConf.to_yaml(get_global_config_dict())) async def responses(self, body: NeMoGymResponseCreateParamsNonStreaming = Body()) -> NeMoGymResponse: # Extract problem information from request @@ -184,32 +271,45 @@ async def responses(self, body: NeMoGymResponseCreateParamsNonStreaming = Body() # Get model endpoint model_endpoint = get_model_endpoint(self.config.model_server.name) - # Run SWE-bench evaluation + # Create persistent directory for I/O and logs in local workspace instance_dir = ( f"{problem_info.get('instance_id', 'unknown')}_{int(time.time() * 1000)}_{str(uuid.uuid4())[:8]}" ) + workspace_root = Path(os.path.dirname(os.path.abspath(__file__))) + persistent_dir = workspace_root / f"swebench_results_{self.config.run_session_id}" / instance_dir + persistent_dir.mkdir(parents=True, exist_ok=True) + metrics_fpath = persistent_dir / "nemo_gym_metrics.json" try: + ray_queue_time = time.time() params = { "problem_info": problem_info, "model_endpoint": model_endpoint, "body": body, - "run_session_id": self.config.run_session_id, "agent_framework": self.config.agent_framework, "agent_config": self.config.agent_config, "agent_tools_file": self.config.agent_tools_file, "agent_max_turns": self.config.agent_max_turns, "swebench_tests_timeout": self.config.swebench_tests_timeout, "swebench_agent_timeout": self.config.swebench_agent_timeout, + "persistent_dir": persistent_dir, + "metrics_fpath": metrics_fpath, "agent_framework_repo": self.config.agent_framework_repo, "agent_framework_commit": self.config.agent_framework_commit, "openhands_setup_dir": self.config.openhands_setup_dir, "swebench_setup_dir": self.config.swebench_setup_dir, "r2e_gym_setup_dir": self.config.r2e_gym_setup_dir, "dataset_path": self.config.dataset_path, - "instance_dir": instance_dir, + "ray_queue_time": ray_queue_time, + "openhands_should_log": self.config.openhands_should_log, + "debug": self.config.debug, + "model_server_name": self.config.model_server.name, + "ng_global_config_dict_str": self._global_config_dict_str, + "apptainer_memory_limit_mb": self.config.apptainer_memory_limit_mb, + "command_exec_timeout": self.config.command_exec_timeout, } - future = runner_ray_remote.remote(run_swebench_evaluation, params) + # Run SWE-bench evaluation + future = runner_ray_remote.remote(self._container_counter, run_swebench_evaluation, params) result = await future # Extract trajectory and convert to proper NeMoGym format @@ -253,6 +353,10 @@ async def responses(self, body: NeMoGymResponseCreateParamsNonStreaming = Body() 
"agent_framework": self.config.agent_framework, "has_trajectory": str(trajectory is not None), "instance_id": result.get("instance_id", problem_info.get("instance_id", "unknown")), + "instance_dir": instance_dir, + "hit_success_str": json.dumps(bool(output_items)), + "hit_empty_trajectory_str": json.dumps(not trajectory), + "hit_responses_exception_str": json.dumps(False), } # Add evaluation results to metadata (convert to strings) @@ -264,6 +368,8 @@ async def responses(self, body: NeMoGymResponseCreateParamsNonStreaming = Body() if "swe-bench-metrics" in result: metadata["swe-bench-metrics"] = json.dumps(result["swe-bench-metrics"]) + metadata["timing_metrics"] = metrics_fpath.read_text() + return NeMoGymResponse( id=f"swebench-{problem_info.get('instance_id', 'unknown')}", created_at=int(time.time()), @@ -296,12 +402,25 @@ async def responses(self, body: NeMoGymResponseCreateParamsNonStreaming = Body() parallel_tool_calls=False, tool_choice="none", tools=[], - metadata={"error": str(e)}, + metadata={ + "error": str(e), + "hit_success_str": json.dumps(False), + "hit_empty_trajectory_str": json.dumps((not trajectory) if "trajectory" in dir() else False), + "hit_responses_exception_str": json.dumps(True), + }, ) async def run(self, body: SWEBenchRunRequest) -> SWEBenchVerifyResponse: """Run and verify SWE-bench solution.""" async with self.sem: + if self.config.debug: + print( + f"Semaphore: {self.config.concurrency - self.sem._value} / {self.config.concurrency}", flush=True + ) + body.responses_create_params.metadata["container_concurrency"] = str( + self.config.concurrency - self.sem._value + ) + # Fix None values in responses_create_params to use defaults # This is needed because the pydantic model has non-Optional fields with defaults @@ -331,7 +450,10 @@ async def run(self, body: SWEBenchRunRequest) -> SWEBenchVerifyResponse: # Add the extracted input messages and tools to the params # Note: tools should already be in the correct format from the response params_with_input = fixed_params.model_copy( - update={"input": input_messages, "tools": response.tools if response.tools else []} + update={ + "input": input_messages, + "tools": [t.model_dump() for t in response.tools] if response.tools else [], + } ) # Extract metrics from response metadata @@ -351,6 +473,8 @@ async def run(self, body: SWEBenchRunRequest) -> SWEBenchVerifyResponse: reward = 1.0 if resolved else 0.0 + hit_metrics = {k.removesuffix("_str"): json.loads(v) for k, v in metadata.items() if k.startswith("hit_")} + # Build verification response with top-level numeric fields for statistics return SWEBenchVerifyResponse( responses_create_params=params_with_input, @@ -367,6 +491,8 @@ async def run(self, body: SWEBenchRunRequest) -> SWEBenchVerifyResponse: "patch_successfully_applied": patch_applied, "resolved": resolved, }, + **json.loads(metadata["timing_metrics"]), + **hit_metrics, ) diff --git a/responses_api_agents/swe_agents/configs/swebench_openhands.yaml b/responses_api_agents/swe_agents/configs/swebench_openhands.yaml index 5e29b3968..137419780 100644 --- a/responses_api_agents/swe_agents/configs/swebench_openhands.yaml +++ b/responses_api_agents/swe_agents/configs/swebench_openhands.yaml @@ -9,14 +9,16 @@ swe_agents: agent_config: responses_api_agents/swe_agents/configs/oh_config.toml agent_max_turns: 100 agent_framework_repo: https://github.com/sdevare-nv/nv-OpenHands.git - agent_framework_commit: 7af10584eb623e6d50a616d3c3c967d7d4fb3690 # pragma: allowlist secret + agent_framework_commit: bxyu/profiling # Container 
configuration container_formatter: ??? container_folder_path: null - swebench_agent_timeout: 2700 # 45 minutes - swebench_tests_timeout: 1800 - + swebench_agent_timeout: 1800 + swebench_tests_timeout: 900 + apptainer_memory_limit_mb: 32768 + command_exec_timeout: 300 + dataset_path: ??? # Optional model server reference diff --git a/responses_api_agents/swe_agents/configs/swebench_openhands_training.yaml b/responses_api_agents/swe_agents/configs/swebench_openhands_training.yaml index 7e6eacda0..26e7638af 100644 --- a/responses_api_agents/swe_agents/configs/swebench_openhands_training.yaml +++ b/responses_api_agents/swe_agents/configs/swebench_openhands_training.yaml @@ -8,12 +8,14 @@ swe_agents_train: agent_config: responses_api_agents/swe_agents/configs/oh_config.toml agent_max_turns: 100 agent_framework_repo: https://github.com/sdevare-nv/nv-OpenHands.git - agent_framework_commit: 7af10584eb623e6d50a616d3c3c967d7d4fb3690 # pragma: allowlist secret + agent_framework_commit: bxyu/profiling # Container configuration container_formatter: ??? container_folder_path: null - swebench_agent_timeout: 2700 # 45 minutes - swebench_tests_timeout: 900 # 15 minutes + swebench_agent_timeout: 1800 + swebench_tests_timeout: 900 + apptainer_memory_limit_mb: 32768 + command_exec_timeout: 300 dataset_path: ??? model_server: name: policy_model # openai_model @@ -39,12 +41,14 @@ swe_agents_val: agent_config: responses_api_agents/swe_agents/configs/oh_config.toml agent_max_turns: 100 agent_framework_repo: https://github.com/sdevare-nv/nv-OpenHands.git - agent_framework_commit: 7af10584eb623e6d50a616d3c3c967d7d4fb3690 # pragma: allowlist secret + agent_framework_commit: bxyu/profiling # Container configuration container_formatter: ??? container_folder_path: null - swebench_agent_timeout: 2700 # 45 minutes - swebench_tests_timeout: 1800 # 30 minutes + swebench_agent_timeout: 1800 + swebench_tests_timeout: 900 + apptainer_memory_limit_mb: 32768 + command_exec_timeout: 300 dataset_path: ??? 
     # Optional model server reference
     model_server:
diff --git a/responses_api_agents/swe_agents/run_openhands.py b/responses_api_agents/swe_agents/run_openhands.py
index 16e30ec4f..8007cd492 100644
--- a/responses_api_agents/swe_agents/run_openhands.py
+++ b/responses_api_agents/swe_agents/run_openhands.py
@@ -17,6 +17,7 @@
 import os
 import re
 import shlex
+import shutil
 import time
 import uuid
 from dataclasses import dataclass, field
@@ -25,6 +26,8 @@
 from typing import Any, Optional
 
 import tomlkit
+from gprof2dot import main as gprof2dot_main
+from pydot import graph_from_dot_file
 
 
 class SupportedAgentFrameworks(str, Enum):
@@ -62,6 +65,8 @@ class SweBenchGenerationConfig:
     agent_max_turns: int = 100
     swebench_tests_timeout: int = 30 * 60
     swebench_agent_timeout: int = 45 * 60
+    apptainer_memory_limit_mb: int = 32 * 1024
+    command_exec_timeout: int = 5 * 60
     inference: SweBenchInferenceConfig = field(default_factory=SweBenchInferenceConfig)
     server: dict = field(default_factory=dict)
 
@@ -92,11 +97,16 @@ class SweBenchGenerationConfig:
 @dataclass
 class RunOpenHandsAgent:
     cfg: SweBenchGenerationConfig
+    ng_global_config_dict_str: str
+    model_server_name: str
+    metrics_fpath: Path
     output_dir: str = None
     openhands_setup_dir: Path | None = None
     swebench_setup_dir: Path | None = None
     r2e_gym_setup_dir: Path | None = None
     dataset_path: str | None = None
+    openhands_should_log: bool = False
+    debug: bool = False
 
     async def _run_swe_agent(self, data_point, api_base):
         """
@@ -222,12 +232,23 @@ async def _run_openhands(
         assert self.openhands_setup_dir is not None, "OpenHands setup directory is not set"
 
         agent_script_name = f"agent_script_{agent_run_id}.sh"
-        cleanup_commands = (
-            f"cd /openhands_setup/OpenHands && "
-            f"mkdir -p /trajectories_mount/trajectories && "
-            f"cp -r {eval_dir_in_openhands}/*/*/* /trajectories_mount/trajectories/{data_point['instance_id']}/ &&"
-            f"rm -rf {eval_dir_in_openhands} && rm -rf {config_file_path}"
-        )
+
+        if self.debug:
+            profiling_cmd = "export NG_PROFILING_DIR=/trajectories_mount/profiling && "
+        else:
+            profiling_cmd = ""
+
+        if self.openhands_should_log:
+            log_cmd = "export LOG_LEVEL=DEBUG && export LOG_TO_FILE=true && export NG_OPENHANDS_SHOULD_LOG=true && "
+        else:
+            log_cmd = (
+                "export LOG_LEVEL=CRITICAL && "
+                "export DEBUG=False && "
+                "export DEBUG_LLM=False && "
+                "export LOG_TO_FILE=False && "
+                "export LOG_ALL_EVENTS=False && "
+                "export DEBUG_RUNTIME=False && "
+            )
 
         agent_main_cmd = (
             "if [ -d /workspace ]; then "
@@ -250,20 +271,19 @@ async def _run_openhands(
             # Use pre-built OpenHands
             "cd /openhands_setup/OpenHands && "
             "export RUNTIME=local && "
-            # "export LOG_LEVEL=DEBUG && "
-            # "export LOG_TO_FILE=true && "
-            "export LOG_LEVEL=CRITICAL && "
-            "export DEBUG=False && "
-            "export DEBUG_LLM=False && "
-            "export LOG_TO_FILE=False && "
-            "export LOG_ALL_EVENTS=False && "
-            "export DEBUG_RUNTIME=False && "
+            f"{log_cmd}"
+            f"{profiling_cmd}"
+            f"export NEMO_GYM_METRICS_FPATH={self.metrics_fpath} && "
+            f"export NEMO_GYM_CONFIG_DICT={self.ng_global_config_dict_str} && "
+            f"export NEMO_GYM_MODEL_SERVER_NAME={self.model_server_name} && "
            "export VIRTUAL_ENV=/openhands_setup/OpenHands/.venv && "
             "export PATH=$PATH:/openhands_setup/OpenHands/.venv/bin && "
             # CRITICAL: Configure poetry to only use the OpenHands venv (ignore external venvs)
             "export POETRY_VIRTUALENVS_IN_PROJECT=true && "
             "export POETRY_VIRTUALENVS_CREATE=false && "
             "export POETRY_VIRTUALENVS_PATH=/openhands_setup/OpenHands && "
+            f"export TMUX_MEMORY_LIMIT={self.cfg.apptainer_memory_limit_mb} && "
+ f"export COMMAND_EXEC_TIMEOUT={self.cfg.command_exec_timeout} && " # TODO (sugam): fix cryptography issue # "override_dir=$(mktemp -d /tmp/cryptography_override.XXXX) && " # # Reinstall cryptography inside the container (via poetry's venv) using a compatible wheel @@ -312,22 +332,18 @@ async def _run_openhands( agent_timeout_seconds = self.cfg.swebench_agent_timeout openhands_cmd = ( f"timeout --signal=TERM --kill-after=30 {agent_timeout_seconds} " - f"bash /trajectories_mount/{agent_script_name}; " - f"echo 'Cleaning up...'; " - f"{cleanup_commands}" + f"bash /trajectories_mount/{agent_script_name}" ) search_path = os.path.join( - self.output_dir / "trajectories", - "**", - data_point["instance_id"], + self.openhands_setup_dir / "OpenHands" / eval_dir_in_openhands, "**", "output.jsonl", ) try: # Execute OpenHands command - out_file = await self._execute_container_command( + out_file_in_eval = await self._execute_container_command( data_point=data_point, command=openhands_cmd, expected_file_pattern=search_path, @@ -336,6 +352,12 @@ async def _run_openhands( timeout=self.cfg.swebench_agent_timeout + 60, dataset_mount_path=dataset_mount_path, ) + out_file = self._openhands_dir_copy_from_host( + data_point=data_point, + eval_dir_in_openhands=eval_dir_in_openhands, + config_file_path=config_file_path, + output_file_path=out_file_in_eval, + ) with open(out_file, "r") as f: out_dict = json.loads(f.read().strip()) @@ -353,14 +375,82 @@ async def _run_openhands( "model_name_or_path": out_dict["metadata"]["llm_config"]["model"], "instance_id": out_dict["instance_id"], "model_patch": patch + "\n" if patch and not patch.endswith("\n") else patch, + "oh_time_metrics": out_dict["metrics"], } ) ) + + # Dump out dot and png files from profiling on OpenHands level + if self.debug: + base_profile_dir = Path(self.output_dir) / "profiling" + profiling_name = "openhands" + callgrind_path = base_profile_dir / f"{profiling_name}.callgrind" + callgrind_dotfile_path = base_profile_dir / f"{profiling_name}.dot" + callgrind_graph_path = base_profile_dir / f"{profiling_name}.png" + + gprof2dot_main( + argv=f"--format=callgrind --output={callgrind_dotfile_path} -e 5 -n 5 {callgrind_path}".split() + ) + + (graph,) = graph_from_dot_file(callgrind_dotfile_path) + graph.write_png(callgrind_graph_path) except Exception as e: - print(f"oh run_infer.sh output parsing failed: {e}", flush=True) + self._openhands_dir_copy_from_host( + data_point=data_point, + eval_dir_in_openhands=eval_dir_in_openhands, + config_file_path=config_file_path, + output_file_path=None, + ) + print(f"Running OpenHands failed: {e}", flush=True) return None return pred_file + def _openhands_dir_copy_from_host( + self, + data_point: dict[str, Any], + eval_dir_in_openhands: str, + config_file_path: str, + output_file_path: Optional[str], + ) -> Optional[str]: + eval_dir_on_host = Path(self.openhands_setup_dir) / "OpenHands" / eval_dir_in_openhands + trajectories_root = Path(self.output_dir) / "trajectories" / data_point["instance_id"] + llm_completions_dir = trajectories_root / "llm_completions" / data_point["instance_id"] + trajectories_root.mkdir(parents=True, exist_ok=True) + llm_completions_dir.mkdir(parents=True, exist_ok=True) + + dest_output: Optional[str] = None + if output_file_path: + source_output = Path(output_file_path) + if not source_output.is_absolute(): + source_output = eval_dir_on_host / source_output + if not source_output.exists(): + output_candidates = sorted(eval_dir_on_host.glob("*/*/*/output.jsonl"), key=os.path.getmtime) + if 
not output_candidates: + raise FileNotFoundError( + f"No output.jsonl found under {eval_dir_on_host} for {data_point['instance_id']}." + ) + source_output = output_candidates[-1] + + dest_output_path = trajectories_root / "output.jsonl" + shutil.copy2(source_output, dest_output_path) + dest_output = str(dest_output_path) + + completion_candidates = glob.glob(str(eval_dir_on_host / "*/*/*/llm_completions/*/*.json")) + if completion_candidates: + latest_completion = max(completion_candidates, key=os.path.getmtime) + shutil.copy2( + latest_completion, + llm_completions_dir / Path(latest_completion).name, + ) + + shutil.rmtree(eval_dir_on_host, ignore_errors=True) + try: + Path(config_file_path).unlink() + except OSError: + pass + + return dest_output + def _write_instance_dataset(self, data_point: dict[str, Any], agent_run_id: str) -> Path: """ To avoid making HF dataset API calls, we write the instance dictionary to a file and mount it in the container. @@ -518,30 +608,20 @@ async def _execute_container_command( mount_args.append(f"--mount type=bind,src={venv_path},dst=/openhands_setup/OpenHands/.venv,ro") mount_args.append(f"--mount type=bind,src={venv_path},dst={venv_path},ro") - # make everything in OpenHands read-only - mount_args.append( - f"--mount type=bind,src={self.openhands_setup_dir}/OpenHands,dst=/openhands_setup/OpenHands,ro" - ) - mount_args.append( - f"--mount type=bind,src={self.openhands_setup_dir}/OpenHands/.eval_sessions,dst=/openhands_setup/OpenHands/.eval_sessions" - ) - mount_args.append( - f"--mount type=bind,src={self.openhands_setup_dir}/OpenHands/.eval_sessions,dst={self.openhands_setup_dir}/OpenHands/.eval_sessions" - ) - mount_args.append( - f"--mount type=bind,src={self.openhands_setup_dir}/OpenHands/logs,dst=/openhands_setup/OpenHands/logs" + mount_args.extend( + [ + # make everything in OpenHands read-only + f"--mount type=bind,src={self.openhands_setup_dir}/OpenHands,dst=/openhands_setup/OpenHands,ro", + f"--mount type=bind,src={self.openhands_setup_dir}/OpenHands/.eval_sessions,dst=/openhands_setup/OpenHands/.eval_sessions", + f"--mount type=bind,src={self.openhands_setup_dir}/OpenHands/.eval_sessions,dst={self.openhands_setup_dir}/OpenHands/.eval_sessions", + f"--mount type=bind,src={self.openhands_setup_dir}/OpenHands/logs,dst=/openhands_setup/OpenHands/logs", + f"--mount type=bind,src={self.openhands_setup_dir}/OpenHands/logs,dst={self.openhands_setup_dir}/OpenHands/logs", + f"--mount type=bind,src={self.openhands_setup_dir}/OpenHands/evaluation/oh,dst=/openhands_setup/OpenHands/evaluation/oh", + f"--mount type=bind,src={self.openhands_setup_dir}/OpenHands/evaluation/oh,dst={self.openhands_setup_dir}/OpenHands/evaluation/oh", + # Data + f"--mount type=bind,src={dataset_path_to_mount},dst=/root/dataset/data.jsonl", + ] ) - mount_args.append( - f"--mount type=bind,src={self.openhands_setup_dir}/OpenHands/logs,dst={self.openhands_setup_dir}/OpenHands/logs" - ) - mount_args.append( - f"--mount type=bind,src={self.openhands_setup_dir}/OpenHands/evaluation/oh,dst=/openhands_setup/OpenHands/evaluation/oh" - ) - mount_args.append( - f"--mount type=bind,src={self.openhands_setup_dir}/OpenHands/evaluation/oh,dst={self.openhands_setup_dir}/OpenHands/evaluation/oh" - ) - - mount_args.append(f"--mount type=bind,src={dataset_path_to_mount},dst=/root/dataset/data.jsonl") miniforge3_path = Path(self.openhands_setup_dir) / "miniforge3" mount_args.append(f"--mount type=bind,src={miniforge3_path},dst=/openhands_setup/miniforge3,ro") @@ -594,10 +674,14 @@ async def 
_execute_container_command( # Launch Apptainer container and execute the command apptainer_cmd = ( - f"apptainer exec --writable-tmpfs --cleanenv --no-mount home,tmp,bind-paths " + f"apptainer exec --writable-tmpfs --cleanenv --pid --no-mount home,tmp,bind-paths " f"{mount_str} " f" {container_name} bash -c {shlex.quote(combined_command)}" ) + memory_limit_mb = self.cfg.apptainer_memory_limit_mb + if memory_limit_mb is not None and memory_limit_mb > 0: + memory_limit_kb = int(memory_limit_mb) * 1024 + apptainer_cmd = f"ulimit -v {memory_limit_kb} && {apptainer_cmd}" # Retry apptainer command up to max_retries times for attempt in range(max_retries): @@ -632,6 +716,14 @@ async def _execute_container_command( if len(pred_files) == 1: return pred_files[0] + elif len(pred_files) > 1: + latest_file = max(pred_files, key=os.path.getmtime) + print( + f"Multiple outputs found for {data_point['instance_id']} " + f"({len(pred_files)}). Using latest: {latest_file}", + flush=True, + ) + return latest_file else: raise ValueError( f"Expected exactly one file matching {expected_file_pattern} for {data_point['instance_id']}, " @@ -908,7 +1000,7 @@ def check_tests_passed( return required_tests <= passed_tests - async def process_single_datapoint(self, data_point: dict[str, Any]): + async def process_single_datapoint(self, data_point: dict[str, Any], persistent_dir: Path): self.output_dir = Path(self.cfg.output_file).parent agent_run_id = f"{data_point['instance_id']}_{int(time.time())}_{str(uuid.uuid4())[:8]}" @@ -1018,12 +1110,16 @@ async def process_single_datapoint(self, data_point: dict[str, Any]): output_dict = { "swe-bench-metrics": report_json[data_point["instance_id"]], - "swe-bench-outputs": trajectory_dict, + "oh_time_metrics": trajectory_dict.get("oh_time_metrics", None) if trajectory_dict else {}, "generation": "", # required TODO: we should fix this "generation_time": generation_time, "evaluation_time": evaluation_time, } + nemo_gym_metrics = json.loads(self.metrics_fpath.read_text()) + with self.metrics_fpath.open("w") as f: + json.dump(nemo_gym_metrics | {"final_eval_time": evaluation_time}, f) + return output_dict finally: self._cleanup_instance_dataset(instance_dataset_path) diff --git a/responses_api_agents/swe_agents/utils.py b/responses_api_agents/swe_agents/utils.py index 376b4f153..5f08d3a7e 100644 --- a/responses_api_agents/swe_agents/utils.py +++ b/responses_api_agents/swe_agents/utils.py @@ -17,12 +17,14 @@ import os import shutil import subprocess +import sys from contextlib import contextmanager from pathlib import Path from typing import Any, Dict, List, Optional, Tuple from openai.types.responses.function_tool import FunctionTool +from nemo_gym.global_config import get_global_config_dict from nemo_gym.openai_utils import ( NeMoGymEasyInputMessage, NeMoGymFunctionCallOutput, @@ -33,7 +35,7 @@ NeMoGymResponseOutputMessageForTraining, NeMoGymResponseOutputText, ) -from nemo_gym.server_utils import ServerClient, get_first_server_config_dict +from nemo_gym.server_utils import get_first_server_config_dict from responses_api_agents.swe_agents.run_openhands import ( RunOpenHandsAgent, SupportedAgentFrameworks, @@ -610,7 +612,7 @@ def extract_problem_info( def get_model_endpoint(model_server_name: str) -> str: - global_config_dict = ServerClient.load_from_global_config().global_config_dict + global_config_dict = get_global_config_dict() model_server_config = get_first_server_config_dict( global_config_dict, @@ -625,26 +627,30 @@ async def run_swebench_evaluation( problem_info: Dict, 
     model_endpoint: str,
     body: NeMoGymResponseCreateParamsNonStreaming,
-    run_session_id: str,
     agent_framework: str,
     agent_config: Optional[str],
     agent_tools_file: Optional[str],
     agent_max_turns: int,
     swebench_tests_timeout: int,
     swebench_agent_timeout: int,
+    persistent_dir: Path,
+    metrics_fpath: Path,
+    ng_global_config_dict_str: str,
+    model_server_name: str,
     agent_framework_repo: Optional[str] = None,
     agent_framework_commit: str = "HEAD",
     openhands_setup_dir: Optional[Path] = None,
     swebench_setup_dir: Optional[Path] = None,
     r2e_gym_setup_dir: Optional[Path] = None,
     dataset_path: Optional[str] = None,
-    instance_dir: Optional[str] = None,
+    ray_queue_time: Optional[float] = None,
+    ray_submit_time: Optional[float] = None,
+    openhands_should_log: bool = False,
+    debug: bool = False,
+    apptainer_memory_limit_mb: Optional[int] = None,
+    command_exec_timeout: Optional[int] = None,
 ) -> Dict:
-    # Create persistent directory for I/O and logs in local workspace
-    workspace_root = Path(os.path.dirname(os.path.abspath(__file__)))
     instance_id = problem_info.get("instance_id", "unknown")
-    persistent_dir = workspace_root / f"swebench_results_{run_session_id}" / instance_dir
-    persistent_dir.mkdir(parents=True, exist_ok=True)
     output_file = persistent_dir / "output.jsonl"
 
     inference_params = {}
@@ -673,6 +679,8 @@ async def run_swebench_evaluation(
         agent_max_turns=agent_max_turns,
         swebench_tests_timeout=swebench_tests_timeout,
         swebench_agent_timeout=swebench_agent_timeout,
+        apptainer_memory_limit_mb=apptainer_memory_limit_mb,
+        command_exec_timeout=command_exec_timeout,
         inference=inference_config,
         server=server,
     )
@@ -683,10 +691,18 @@ async def run_swebench_evaluation(
         swebench_setup_dir=swebench_setup_dir,
         r2e_gym_setup_dir=r2e_gym_setup_dir,
         dataset_path=dataset_path,
+        ng_global_config_dict_str=ng_global_config_dict_str,
+        openhands_should_log=openhands_should_log,
+        debug=debug,
+        model_server_name=model_server_name,
+        metrics_fpath=metrics_fpath,
     )
-    result = await run_oh.process_single_datapoint(problem_info)
+
+    result = await run_oh.process_single_datapoint(problem_info, persistent_dir)
     print(f"Process completed for {instance_id}", flush=True)
 
+    result["oh_time_metrics"]["ray_time_in_queue"] = ray_submit_time - ray_queue_time
+
     try:
         with open(output_file, "w") as f:
             json.dump(result, f)
@@ -707,8 +723,6 @@ async def run_swebench_evaluation(
         agent_tools_file if agent_framework == "swe_agent" else None,
     )
 
-    # tools = convert_tools_to_function_format(tools) if tools else []
-
     result["tools"] = tools
     result["trajectory"] = trajectory_data
 
@@ -750,6 +764,7 @@ def _run_setup_shell_script(
     timeout_seconds: int,
     label: str,
     timeout_error_message: Optional[str] = None,
+    debug: bool = False,
 ) -> None:
     script_path = setup_dir / script_name
@@ -774,8 +789,9 @@
     if process.stdout is None:
         raise RuntimeError("Failed to capture script output")
 
+    target_file = sys.stderr if debug else sys.stdout
     for line in process.stdout:
-        print(line, end="", flush=True)
+        print(line, end="", file=target_file)
         output_lines.append(line)
 
     process.wait(timeout=timeout_seconds)
@@ -1039,6 +1055,7 @@
 def setup_openhands_environment(
     agent_framework_repo: Optional[str] = "https://github.com/sdevare-nv/nv-OpenHands.git",
     agent_framework_commit: str = "gym",
     setup_dir: Optional[Path] = None,
+    debug: bool = False,
 ) -> Path:
     setup_dir = _resolve_setup_directory(setup_dir, "swe_openhands_setup")
@@ -1207,6 +1224,7 @@ def setup_openhands_environment(
         timeout_seconds=1800,
         label="OpenHands",
         timeout_error_message="OpenHands setup timed out after 30 minutes",
+        debug=debug,
     )
 
     print(f"Setup directory: {setup_dir}", flush=True)
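
A minimal usage sketch of the Profiler helper added in nemo_gym/profiling.py, for reviewers; the profiler name, output directory, and sample_workload function below are illustrative only and are not part of the patch.

from pathlib import Path

from nemo_gym.profiling import Profiler


def sample_workload() -> int:
    # Placeholder for whatever code is being profiled.
    return sum(range(1_000_000))


# stop() calls dump(), which writes <name>.log, <name>.callgrind, <name>.dot,
# and <name>.png under base_profile_dir.
profiler = Profiler(name="example", base_profile_dir=Path("profiling_results"))
profiler.start()  # enables yappi with the CPU clock
sample_workload()
profiler.stop()   # stops yappi and dumps the log, callgrind, dot, and png artifacts

This mirrors how the patch wires the profiler into the FastAPI lifespan in server_utils.py and into runner_ray_remote in app.py (start before the work, stop/dump afterwards); required_str can optionally be set to keep only matching lines in the .log output.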