diff --git a/responses_api_agents/swe_agents/app.py b/responses_api_agents/swe_agents/app.py
index a1e1557aa..0c48fdfda 100644
--- a/responses_api_agents/swe_agents/app.py
+++ b/responses_api_agents/swe_agents/app.py
@@ -19,7 +19,6 @@
 from asyncio import Semaphore
 from pathlib import Path
 from typing import Any, Callable, Dict, Optional
-
 import ray
 from pydantic import ConfigDict, Field
 
@@ -39,6 +38,8 @@
     NeMoGymResponseCreateParamsNonStreaming,
     NeMoGymResponseOutputMessage,
     NeMoGymResponseOutputText,
+    NeMoGymResponseFunctionToolCall,
+    NeMoGymResponseOutputMessageForTraining,
 )
 from responses_api_agents.swe_agents.utils import (
     convert_tools_to_function_format,
@@ -60,6 +61,8 @@
     },
 )
 def runner_ray_remote(runner: Callable, params: dict[str, Any]) -> Any:
+    ray_submit_time = time.time()
+    params["ray_submit_time"] = ray_submit_time
     return asyncio.run(runner(**params))
 
 
@@ -92,6 +95,12 @@ class SWEBenchWrapperConfig(BaseResponsesAPIAgentConfig):
     swebench_agent_timeout: int = Field(default=45 * 60, description="Timeout for running the agent (seconds)")
 
+    apptainer_memory_limit_mb: int = Field(
+        default=32 * 1024, description="Memory limit for the apptainer container (MB)"
+    )
+
+    command_exec_timeout: int = Field(default=5 * 60, description="Timeout for executing the command (seconds)")
+
     # Concurrency control
     concurrency: int = Field(default=256, description="Maximum number of concurrent SWE-bench runs")
 
@@ -149,6 +158,10 @@ class SWEBenchVerifyResponse(BaseVerifyResponse):
     resolved: Optional[float] = None  # 1.0 if resolved, 0.0 otherwise
     patch_exists: Optional[float] = None  # 1.0 if patch exists, 0.0 otherwise
     patch_successfully_applied: Optional[float] = None  # 1.0 if patch applied, 0.0 otherwise
+    is_nemo_gym_in_assistant_message: Optional[float] = (
+        None  # 1.0 if nemo-gym is in the assistant message, 0.0 otherwise
+    )
+    is_finish_tool_call: Optional[float] = None  # 1.0 if finish tool call is detected, 0.0 otherwise
 
 
 class SWEBenchWrapper(SimpleResponsesAPIAgent):
@@ -173,6 +186,7 @@ def model_post_init(self, __context: Any) -> None:
         print("Dependencies repositories set up complete", flush=True)
 
         self.config.run_session_id = f"{int(time.time() * 1000)}_{str(uuid.uuid4())[:8]}"
+        print(f"Run session ID: {self.config.run_session_id}", flush=True)
 
     async def responses(self, body: NeMoGymResponseCreateParamsNonStreaming = Body()) -> NeMoGymResponse:
         # Extract problem information from request
@@ -189,6 +203,7 @@ async def responses(self, body: NeMoGymResponseCreateParamsNonStreaming = Body()
             f"{problem_info.get('instance_id', 'unknown')}_{int(time.time() * 1000)}_{str(uuid.uuid4())[:8]}"
         )
         try:
+            ray_queue_time = time.time()
             params = {
                 "problem_info": problem_info,
                 "model_endpoint": model_endpoint,
@@ -207,6 +222,9 @@ async def responses(self, body: NeMoGymResponseCreateParamsNonStreaming = Body()
                 "r2e_gym_setup_dir": self.config.r2e_gym_setup_dir,
                 "dataset_path": self.config.dataset_path,
                 "instance_dir": instance_dir,
+                "ray_queue_time": ray_queue_time,
+                "apptainer_memory_limit_mb": self.config.apptainer_memory_limit_mb,
+                "command_exec_timeout": self.config.command_exec_timeout,
             }
 
             future = runner_ray_remote.remote(run_swebench_evaluation, params)
@@ -299,6 +317,32 @@ async def responses(self, body: NeMoGymResponseCreateParamsNonStreaming = Body()
                 metadata={"error": str(e)},
             )
 
+    def check_finish_tool_call(self, response: NeMoGymResponse) -> bool:
+        if not response.output:
+            return False
+
+        last_message = response.output[-1]
+        if isinstance(last_message, NeMoGymResponseFunctionToolCall) and last_message.name == "finish":
+            print(f"[REWARD] Finish tool call: {last_message.name} detected", flush=True)
+            return True
+
+        return False
+
+    def check_nemo_gym_in_assistant_message(self, response: NeMoGymResponse) -> bool:
+        if not response.output:
+            return False
+
+        for message in response.output:
+            if (
+                isinstance(message, NeMoGymResponseOutputMessageForTraining)
+                and message.role == "assistant"
+                and ("nemogym" in message.content[0].text.lower() or "litellm" in message.content[0].text.lower())
+            ):
+                print(f"[REWARD] Nemo-Gym in assistant message: {message.content[0].text}", flush=True)
+                return True
+
+        return False
+
     async def run(self, body: SWEBenchRunRequest) -> SWEBenchVerifyResponse:
         """Run and verify SWE-bench solution."""
         async with self.sem:
@@ -342,8 +386,18 @@ async def run(self, body: SWEBenchRunRequest) -> SWEBenchVerifyResponse:
             # Parse metrics from JSON string if present
             metrics = json.loads(metadata.get("swe-bench-metrics", "{}")) if "swe-bench-metrics" in metadata else {}
 
+            # Only consider the response resolved if the finish tool call is present and the resolved metric is True
+            is_finish_tool_call = self.check_finish_tool_call(response)
+            if is_finish_tool_call:
+                resolved = metrics.get("resolved") or (metadata.get("resolved") == "True")
+            else:
+                resolved = False
+
+            # TODO: remove this check after behavior fix
+            is_nemo_gym_in_assistant_message = self.check_nemo_gym_in_assistant_message(response)
+            resolved = resolved and not is_nemo_gym_in_assistant_message
+
             # Extract individual metrics with proper type conversion
-            resolved = metrics.get("resolved") or (metadata.get("resolved") == "True")
             patch_exists = metrics.get("patch_exists") or (metadata.get("patch_exists") == "True")
             patch_applied = metrics.get("patch_successfully_applied") or (
                 metadata.get("patch_successfully_applied") == "True"
@@ -359,6 +413,8 @@ async def run(self, body: SWEBenchRunRequest) -> SWEBenchVerifyResponse:
                 resolved=1.0 if resolved else 0.0,
                 patch_exists=1.0 if patch_exists else 0.0,
                 patch_successfully_applied=1.0 if patch_applied else 0.0,
+                is_nemo_gym_in_assistant_message=1.0 if is_nemo_gym_in_assistant_message else 0.0,
+                is_finish_tool_call=1.0 if is_finish_tool_call else 0.0,
                 swebench_metrics=metrics,
                 metadata={
                     "instance_id": metadata.get("instance_id", "unknown"),
@@ -366,6 +422,8 @@ async def run(self, body: SWEBenchRunRequest) -> SWEBenchVerifyResponse:
                     "patch_exists": patch_exists,
                     "patch_successfully_applied": patch_applied,
                     "resolved": resolved,
+                    "is_nemo_gym_in_assistant_message": is_nemo_gym_in_assistant_message,
+                    "is_finish_tool_call": is_finish_tool_call,
                 },
             )
 
diff --git a/responses_api_agents/swe_agents/configs/swebench_openhands.yaml b/responses_api_agents/swe_agents/configs/swebench_openhands.yaml
index 5e29b3968..fa0d950a0 100644
--- a/responses_api_agents/swe_agents/configs/swebench_openhands.yaml
+++ b/responses_api_agents/swe_agents/configs/swebench_openhands.yaml
@@ -9,14 +9,16 @@ swe_agents:
   agent_config: responses_api_agents/swe_agents/configs/oh_config.toml
   agent_max_turns: 100
   agent_framework_repo: https://github.com/sdevare-nv/nv-OpenHands.git
-  agent_framework_commit: 7af10584eb623e6d50a616d3c3c967d7d4fb3690 # pragma: allowlist secret
+  agent_framework_commit: d6bd12edd828a6af88f8036583a6a8e209315eeb
 
   # Container configuration
   container_formatter: ???
   container_folder_path: null
-  swebench_agent_timeout: 2700 # 45 minutes
-  swebench_tests_timeout: 1800
-
+  swebench_agent_timeout: 1800
+  swebench_tests_timeout: 900
+  apptainer_memory_limit_mb: 32768
+  command_exec_timeout: 300
+
   dataset_path: ???
 
   # Optional model server reference
diff --git a/responses_api_agents/swe_agents/configs/swebench_openhands_training.yaml b/responses_api_agents/swe_agents/configs/swebench_openhands_training.yaml
index 7e6eacda0..892ee33d1 100644
--- a/responses_api_agents/swe_agents/configs/swebench_openhands_training.yaml
+++ b/responses_api_agents/swe_agents/configs/swebench_openhands_training.yaml
@@ -8,12 +8,14 @@ swe_agents_train:
   agent_config: responses_api_agents/swe_agents/configs/oh_config.toml
   agent_max_turns: 100
   agent_framework_repo: https://github.com/sdevare-nv/nv-OpenHands.git
-  agent_framework_commit: 7af10584eb623e6d50a616d3c3c967d7d4fb3690 # pragma: allowlist secret
+  agent_framework_commit: d6bd12edd828a6af88f8036583a6a8e209315eeb
   # Container configuration
   container_formatter: ???
   container_folder_path: null
-  swebench_agent_timeout: 2700 # 45 minutes
-  swebench_tests_timeout: 900 # 15 minutes
+  swebench_agent_timeout: 1800
+  swebench_tests_timeout: 900
+  apptainer_memory_limit_mb: 32768
+  command_exec_timeout: 300
   dataset_path: ???
   model_server:
     name: policy_model # openai_model
@@ -39,12 +41,14 @@ swe_agents_val:
   agent_config: responses_api_agents/swe_agents/configs/oh_config.toml
   agent_max_turns: 100
   agent_framework_repo: https://github.com/sdevare-nv/nv-OpenHands.git
-  agent_framework_commit: 7af10584eb623e6d50a616d3c3c967d7d4fb3690 # pragma: allowlist secret
+  agent_framework_commit: d6bd12edd828a6af88f8036583a6a8e209315eeb
   # Container configuration
   container_formatter: ???
   container_folder_path: null
-  swebench_agent_timeout: 2700 # 45 minutes
-  swebench_tests_timeout: 1800 # 30 minutes
+  swebench_agent_timeout: 1800
+  swebench_tests_timeout: 900
+  apptainer_memory_limit_mb: 32768
+  command_exec_timeout: 300
   dataset_path: ???
   # Optional model server reference
   model_server:
diff --git a/responses_api_agents/swe_agents/run_openhands.py b/responses_api_agents/swe_agents/run_openhands.py
index 16e30ec4f..c61999018 100644
--- a/responses_api_agents/swe_agents/run_openhands.py
+++ b/responses_api_agents/swe_agents/run_openhands.py
@@ -17,6 +17,7 @@
 import os
 import re
 import shlex
+import shutil
 import time
 import uuid
 from dataclasses import dataclass, field
@@ -62,6 +63,8 @@ class SweBenchGenerationConfig:
     agent_max_turns: int = 100
     swebench_tests_timeout: int = 30 * 60
     swebench_agent_timeout: int = 45 * 60
+    apptainer_memory_limit_mb: int = 32 * 1024
+    command_exec_timeout: int = 5 * 60
     inference: SweBenchInferenceConfig = field(default_factory=SweBenchInferenceConfig)
     server: dict = field(default_factory=dict)
 
@@ -222,12 +225,6 @@ async def _run_openhands(
         assert self.openhands_setup_dir is not None, "OpenHands setup directory is not set"
         agent_script_name = f"agent_script_{agent_run_id}.sh"
 
-        cleanup_commands = (
-            f"cd /openhands_setup/OpenHands && "
-            f"mkdir -p /trajectories_mount/trajectories && "
-            f"cp -r {eval_dir_in_openhands}/*/*/* /trajectories_mount/trajectories/{data_point['instance_id']}/ &&"
-            f"rm -rf {eval_dir_in_openhands} && rm -rf {config_file_path}"
-        )
 
         agent_main_cmd = (
             "if [ -d /workspace ]; then "
@@ -264,6 +261,8 @@ async def _run_openhands(
             "export POETRY_VIRTUALENVS_IN_PROJECT=true && "
             "export POETRY_VIRTUALENVS_CREATE=false && "
             "export POETRY_VIRTUALENVS_PATH=/openhands_setup/OpenHands && "
+            f"export TMUX_MEMORY_LIMIT={self.cfg.apptainer_memory_limit_mb} && "
+            f"export COMMAND_EXEC_TIMEOUT={self.cfg.command_exec_timeout} && "
             # TODO (sugam): fix cryptography issue
             # "override_dir=$(mktemp -d /tmp/cryptography_override.XXXX) && "
             # # Reinstall cryptography inside the container (via poetry's venv) using a compatible wheel
@@ -312,22 +311,18 @@ async def _run_openhands(
         agent_timeout_seconds = self.cfg.swebench_agent_timeout
         openhands_cmd = (
             f"timeout --signal=TERM --kill-after=30 {agent_timeout_seconds} "
-            f"bash /trajectories_mount/{agent_script_name}; "
-            f"echo 'Cleaning up...'; "
-            f"{cleanup_commands}"
+            f"bash /trajectories_mount/{agent_script_name}"
         )
 
         search_path = os.path.join(
-            self.output_dir / "trajectories",
-            "**",
-            data_point["instance_id"],
+            self.openhands_setup_dir / "OpenHands" / eval_dir_in_openhands,
             "**",
             "output.jsonl",
         )
 
         try:
             # Execute OpenHands command
-            out_file = await self._execute_container_command(
+            out_file_in_eval = await self._execute_container_command(
                 data_point=data_point,
                 command=openhands_cmd,
                 expected_file_pattern=search_path,
@@ -336,6 +331,12 @@ async def _run_openhands(
                 timeout=self.cfg.swebench_agent_timeout + 60,
                 dataset_mount_path=dataset_mount_path,
             )
+            out_file = self._openhands_dir_copy_from_host(
+                data_point=data_point,
+                eval_dir_in_openhands=eval_dir_in_openhands,
+                config_file_path=config_file_path,
+                output_file_path=out_file_in_eval,
+            )
 
             with open(out_file, "r") as f:
                 out_dict = json.loads(f.read().strip())
@@ -353,14 +354,68 @@ async def _run_openhands(
                         "model_name_or_path": out_dict["metadata"]["llm_config"]["model"],
                         "instance_id": out_dict["instance_id"],
                         "model_patch": patch + "\n" if patch and not patch.endswith("\n") else patch,
+                        "oh_time_metrics": out_dict["metrics"],
                     }
                 )
             )
 
        except Exception as e:
-            print(f"oh run_infer.sh output parsing failed: {e}", flush=True)
+            self._openhands_dir_copy_from_host(
+                data_point=data_point,
+                eval_dir_in_openhands=eval_dir_in_openhands,
+                config_file_path=config_file_path,
+                output_file_path=None,
+            )
+            print(f"Running OpenHands failed: {e}", flush=True)
             return None
 
         return pred_file
 
+    def _openhands_dir_copy_from_host(
+        self,
+        data_point: dict[str, Any],
+        eval_dir_in_openhands: str,
+        config_file_path: str,
+        output_file_path: Optional[str],
+    ) -> Optional[str]:
+
+        eval_dir_on_host = Path(self.openhands_setup_dir) / "OpenHands" / eval_dir_in_openhands
+        trajectories_root = Path(self.output_dir) / "trajectories" / data_point["instance_id"]
+        llm_completions_dir = trajectories_root / "llm_completions" / data_point["instance_id"]
+        trajectories_root.mkdir(parents=True, exist_ok=True)
+        llm_completions_dir.mkdir(parents=True, exist_ok=True)
+
+        dest_output: Optional[str] = None
+        if output_file_path:
+            source_output = Path(output_file_path)
+            if not source_output.is_absolute():
+                source_output = eval_dir_on_host / source_output
+            if not source_output.exists():
+                output_candidates = sorted(eval_dir_on_host.glob("*/*/*/output.jsonl"), key=os.path.getmtime)
+                if not output_candidates:
+                    raise FileNotFoundError(
+                        f"No output.jsonl found under {eval_dir_on_host} for {data_point['instance_id']}."
+                    )
+                source_output = output_candidates[-1]
+
+            dest_output_path = trajectories_root / "output.jsonl"
+            shutil.copy2(source_output, dest_output_path)
+            dest_output = str(dest_output_path)
+
+        completion_candidates = glob.glob(str(eval_dir_on_host / "*/*/*/llm_completions/*/*.json"))
+        if completion_candidates:
+            latest_completion = max(completion_candidates, key=os.path.getmtime)
+            shutil.copy2(
+                latest_completion,
+                llm_completions_dir / Path(latest_completion).name,
+            )
+
+        shutil.rmtree(eval_dir_on_host, ignore_errors=True)
+        try:
+            Path(config_file_path).unlink()
+        except OSError:
+            pass
+
+        return dest_output
+
     def _write_instance_dataset(self, data_point: dict[str, Any], agent_run_id: str) -> Path:
         """
         To avoid making HF dataset API calls, we write the instance dictionary to a file and mount it in the container.
@@ -594,10 +649,14 @@ async def _execute_container_command(
 
         # Launch Apptainer container and execute the command
         apptainer_cmd = (
-            f"apptainer exec --writable-tmpfs --cleanenv --no-mount home,tmp,bind-paths "
+            f"apptainer exec --writable-tmpfs --cleanenv --pid --no-mount home,tmp,bind-paths "
             f"{mount_str} "
            f" {container_name} bash -c {shlex.quote(combined_command)}"
         )
+        memory_limit_mb = self.cfg.apptainer_memory_limit_mb
+        if memory_limit_mb is not None and memory_limit_mb > 0:
+            memory_limit_kb = int(memory_limit_mb) * 1024
+            apptainer_cmd = f"ulimit -v {memory_limit_kb} && {apptainer_cmd}"
 
         # Retry apptainer command up to max_retries times
         for attempt in range(max_retries):
@@ -632,6 +691,14 @@ async def _execute_container_command(
 
         if len(pred_files) == 1:
             return pred_files[0]
+        elif len(pred_files) > 1:
+            latest_file = max(pred_files, key=os.path.getmtime)
+            print(
+                f"Multiple outputs found for {data_point['instance_id']} "
+                f"({len(pred_files)}). Using latest: {latest_file}",
+                flush=True,
+            )
+            return latest_file
         else:
             raise ValueError(
                 f"Expected exactly one file matching {expected_file_pattern} for {data_point['instance_id']}, "
@@ -1018,7 +1085,7 @@ async def process_single_datapoint(self, data_point: dict[str, Any]):
 
         output_dict = {
             "swe-bench-metrics": report_json[data_point["instance_id"]],
-            "swe-bench-outputs": trajectory_dict,
+            "oh_time_metrics": trajectory_dict.get("oh_time_metrics", None) if trajectory_dict else {},
             "generation": "",  # required TODO: we should fix this
             "generation_time": generation_time,
             "evaluation_time": evaluation_time,
diff --git a/responses_api_agents/swe_agents/utils.py b/responses_api_agents/swe_agents/utils.py
index 376b4f153..23b362895 100644
--- a/responses_api_agents/swe_agents/utils.py
+++ b/responses_api_agents/swe_agents/utils.py
@@ -639,6 +639,10 @@ async def run_swebench_evaluation(
     r2e_gym_setup_dir: Optional[Path] = None,
     dataset_path: Optional[str] = None,
     instance_dir: Optional[str] = None,
+    ray_queue_time: Optional[float] = None,
+    ray_submit_time: Optional[float] = None,
+    apptainer_memory_limit_mb: Optional[int] = None,
+    command_exec_timeout: Optional[int] = None,
 ) -> Dict:
     # Create persistent directory for I/O and logs in local workspace
     workspace_root = Path(os.path.dirname(os.path.abspath(__file__)))
@@ -673,6 +677,8 @@ async def run_swebench_evaluation(
         agent_max_turns=agent_max_turns,
         swebench_tests_timeout=swebench_tests_timeout,
         swebench_agent_timeout=swebench_agent_timeout,
+        apptainer_memory_limit_mb=apptainer_memory_limit_mb,
+        command_exec_timeout=command_exec_timeout,
         inference=inference_config,
         server=server,
     )
@@ -687,6 +693,8 @@ async def run_swebench_evaluation(
     result = await run_oh.process_single_datapoint(problem_info)
     print(f"Process completed for {instance_id}", flush=True)
 
+    result["oh_time_metrics"]["ray_time_in_queue"] = ray_submit_time - ray_queue_time
+
     try:
         with open(output_file, "w") as f:
             json.dump(result, f)
@@ -707,8 +715,6 @@ async def run_swebench_evaluation(
         agent_tools_file if agent_framework == "swe_agent" else None,
     )
 
-    # tools = convert_tools_to_function_format(tools) if tools else []
-
     result["tools"] = tools
     result["trajectory"] = trajectory_data
 
@@ -921,7 +927,7 @@ def setup_r2e_gym_environment(
         RuntimeError: If setup fails
     """
     if eval_harness_repo is None:
-        eval_harness_repo = "https://github.com/ludwig-n/R2E-Gym.git"
+        eval_harness_repo = "https://github.com/sdevare-nv/nv-R2E-Gym.git"
 
     setup_dir = _resolve_setup_directory(setup_dir, "swe_r2e_gym_setup")
 
@@ -1099,6 +1105,9 @@ def setup_openhands_environment(
 
 echo "Installing conda packages (this may take 5-10 minutes)..."
 mamba install -y --override-channels conda-forge::python=3.12 conda-forge::nodejs conda-forge::poetry conda-forge::tmux
+# Upgrade packaging to ensure packaging.licenses is available (required by poetry)
+pip install -q 'packaging>=24.2'
+
 # Verify installations
 echo "Verifying package installations..."
 which python