responses_api_agents/swe_agents/app.py (62 changes: 60 additions & 2 deletions)
@@ -19,7 +19,6 @@
from asyncio import Semaphore
from pathlib import Path
from typing import Any, Callable, Dict, Optional

import ray
from pydantic import ConfigDict, Field

@@ -39,6 +38,8 @@
NeMoGymResponseCreateParamsNonStreaming,
NeMoGymResponseOutputMessage,
NeMoGymResponseOutputText,
NeMoGymResponseFunctionToolCall,
NeMoGymResponseOutputMessageForTraining,
)
from responses_api_agents.swe_agents.utils import (
convert_tools_to_function_format,
@@ -60,6 +61,8 @@
},
)
def runner_ray_remote(runner: Callable, params: dict[str, Any]) -> Any:
ray_submit_time = time.time()
params["ray_submit_time"] = ray_submit_time
return asyncio.run(runner(**params))


@@ -92,6 +95,12 @@ class SWEBenchWrapperConfig(BaseResponsesAPIAgentConfig):

swebench_agent_timeout: int = Field(default=45 * 60, description="Timeout for running the agent (seconds)")

apptainer_memory_limit_mb: int = Field(
default=32 * 1024, description="Memory limit for the apptainer container (MB)"
)

command_exec_timeout: int = Field(default=5 * 60, description="Timeout for executing the command (seconds)")

# Concurrency control
concurrency: int = Field(default=256, description="Maximum number of concurrent SWE-bench runs")

@@ -149,6 +158,10 @@ class SWEBenchVerifyResponse(BaseVerifyResponse):
resolved: Optional[float] = None # 1.0 if resolved, 0.0 otherwise
patch_exists: Optional[float] = None # 1.0 if patch exists, 0.0 otherwise
patch_successfully_applied: Optional[float] = None # 1.0 if patch applied, 0.0 otherwise
is_nemo_gym_in_assistant_message: Optional[float] = (
None # 1.0 if nemo-gym is in the assistant message, 0.0 otherwise
)
is_finish_tool_call: Optional[float] = None # 1.0 if finish tool call is detected, 0.0 otherwise


class SWEBenchWrapper(SimpleResponsesAPIAgent):
@@ -173,6 +186,7 @@ def model_post_init(self, __context: Any) -> None:
print("Dependencies repositories set up complete", flush=True)

self.config.run_session_id = f"{int(time.time() * 1000)}_{str(uuid.uuid4())[:8]}"
print(f"Run session ID: {self.config.run_session_id}", flush=True)

async def responses(self, body: NeMoGymResponseCreateParamsNonStreaming = Body()) -> NeMoGymResponse:
# Extract problem information from request
@@ -189,6 +203,7 @@ async def responses(self, body: NeMoGymResponseCreateParamsNonStreaming = Body()
f"{problem_info.get('instance_id', 'unknown')}_{int(time.time() * 1000)}_{str(uuid.uuid4())[:8]}"
)
try:
ray_queue_time = time.time()
params = {
"problem_info": problem_info,
"model_endpoint": model_endpoint,
@@ -207,6 +222,9 @@ async def responses(self, body: NeMoGymResponseCreateParamsNonStreaming = Body()
"r2e_gym_setup_dir": self.config.r2e_gym_setup_dir,
"dataset_path": self.config.dataset_path,
"instance_dir": instance_dir,
"ray_queue_time": ray_queue_time,
"apptainer_memory_limit_mb": self.config.apptainer_memory_limit_mb,
"command_exec_timeout": self.config.command_exec_timeout,
}

future = runner_ray_remote.remote(run_swebench_evaluation, params)
@@ -299,6 +317,32 @@ async def responses(self, body: NeMoGymResponseCreateParamsNonStreaming = Body()
metadata={"error": str(e)},
)

def check_finish_tool_call(self, response: NeMoGymResponse) -> bool:
if not response.output:
return False

last_message = response.output[-1]
if isinstance(last_message, NeMoGymResponseFunctionToolCall) and last_message.name == "finish":
print(f"[REWARD] Finish tool call: {last_message.name} detected", flush=True)
return True

return False

def check_nemo_gym_in_assistant_message(self, response: NeMoGymResponse) -> bool:
if not response.output:
return False

for message in response.output:
if (
isinstance(message, NeMoGymResponseOutputMessageForTraining)
and message.role == "assistant"
and ("nemogym" in message.content[0].text.lower() or "litellm" in message.content[0].text.lower())
):
print(f"[REWARD] Nemo-Gym in assistant message: {message.content[0].text}", flush=True)
return True

return False

async def run(self, body: SWEBenchRunRequest) -> SWEBenchVerifyResponse:
"""Run and verify SWE-bench solution."""
async with self.sem:
@@ -342,8 +386,18 @@ async def run(self, body: SWEBenchRunRequest) -> SWEBenchVerifyResponse:
# Parse metrics from JSON string if present
metrics = json.loads(metadata.get("swe-bench-metrics", "{}")) if "swe-bench-metrics" in metadata else {}

# Only consider the response resolved if the finish tool call is present and the resolved metric is True
is_finish_tool_call = self.check_finish_tool_call(response)
if is_finish_tool_call:
resolved = metrics.get("resolved") or (metadata.get("resolved") == "True")
else:
resolved = False

# TODO: remove this check after behavior fix
is_nemo_gym_in_assistant_message = self.check_nemo_gym_in_assistant_message(response)
resolved = resolved and not is_nemo_gym_in_assistant_message

# Extract individual metrics with proper type conversion
resolved = metrics.get("resolved") or (metadata.get("resolved") == "True")
patch_exists = metrics.get("patch_exists") or (metadata.get("patch_exists") == "True")
patch_applied = metrics.get("patch_successfully_applied") or (
metadata.get("patch_successfully_applied") == "True"
@@ -359,13 +413,17 @@
resolved=1.0 if resolved else 0.0,
patch_exists=1.0 if patch_exists else 0.0,
patch_successfully_applied=1.0 if patch_applied else 0.0,
is_nemo_gym_in_assistant_message=1.0 if is_nemo_gym_in_assistant_message else 0.0,
is_finish_tool_call=1.0 if is_finish_tool_call else 0.0,
swebench_metrics=metrics,
metadata={
"instance_id": metadata.get("instance_id", "unknown"),
"agent_framework": self.config.agent_framework,
"patch_exists": patch_exists,
"patch_successfully_applied": patch_applied,
"resolved": resolved,
"is_nemo_gym_in_assistant_message": is_nemo_gym_in_assistant_message,
"is_finish_tool_call": is_finish_tool_call,
},
)

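For illustration only (not part of this PR), a minimal sketch of the reward gating that app.py now applies, using simplified stand-in types instead of the actual NeMoGym response classes: resolved credit requires the trajectory to end with a `finish` tool call, and is revoked if a nemogym/litellm string leaks into any assistant message.

```python
# Sketch with stand-in types; the real code uses NeMoGymResponse and the SWE-bench metrics dict.
from dataclasses import dataclass
from typing import List


@dataclass
class ToolCall:
    name: str


@dataclass
class AssistantMessage:
    text: str


def gate_resolved(output: List[object], metrics: dict, metadata: dict) -> bool:
    """Mirror the gating order used in SWEBenchWrapper.run."""
    # 1. The trajectory must end with a `finish` tool call.
    ends_with_finish = bool(output) and isinstance(output[-1], ToolCall) and output[-1].name == "finish"
    if not ends_with_finish:
        return False

    # 2. Resolution comes from the SWE-bench metrics, falling back to string metadata.
    resolved = bool(metrics.get("resolved")) or metadata.get("resolved") == "True"

    # 3. Leaked framework strings in assistant messages invalidate the reward.
    leaked = any(
        isinstance(msg, AssistantMessage)
        and ("nemogym" in msg.text.lower() or "litellm" in msg.text.lower())
        for msg in output
    )
    return resolved and not leaked


# Example: a resolved run that ends with `finish` and has no leaked strings.
print(gate_resolved(
    [AssistantMessage("patch applied"), ToolCall("finish")],
    metrics={"resolved": True},
    metadata={},
))  # True
```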
responses_api_agents/swe_agents/configs/swebench_openhands.yaml (10 changes: 6 additions & 4 deletions)
@@ -9,14 +9,16 @@ swe_agents:
agent_config: responses_api_agents/swe_agents/configs/oh_config.toml
agent_max_turns: 100
agent_framework_repo: https://github.com/sdevare-nv/nv-OpenHands.git
agent_framework_commit: 7af10584eb623e6d50a616d3c3c967d7d4fb3690 # pragma: allowlist secret
agent_framework_commit: d6bd12edd828a6af88f8036583a6a8e209315eeb

# Container configuration
container_formatter: ???
container_folder_path: null
swebench_agent_timeout: 2700 # 45 minutes
swebench_tests_timeout: 1800

swebench_agent_timeout: 1800
swebench_tests_timeout: 900
apptainer_memory_limit_mb: 32768
command_exec_timeout: 300

dataset_path: ???

# Optional model server reference
@@ -8,12 +8,14 @@ swe_agents_train:
agent_config: responses_api_agents/swe_agents/configs/oh_config.toml
agent_max_turns: 100
agent_framework_repo: https://github.com/sdevare-nv/nv-OpenHands.git
agent_framework_commit: 7af10584eb623e6d50a616d3c3c967d7d4fb3690 # pragma: allowlist secret
agent_framework_commit: d6bd12edd828a6af88f8036583a6a8e209315eeb
# Container configuration
container_formatter: ???
container_folder_path: null
swebench_agent_timeout: 2700 # 45 minutes
swebench_tests_timeout: 900 # 15 minutes
swebench_agent_timeout: 1800
swebench_tests_timeout: 900
apptainer_memory_limit_mb: 32768
command_exec_timeout: 300
dataset_path: ???
model_server:
name: policy_model # openai_model
@@ -39,12 +41,14 @@ swe_agents_val:
agent_config: responses_api_agents/swe_agents/configs/oh_config.toml
agent_max_turns: 100
agent_framework_repo: https://github.com/sdevare-nv/nv-OpenHands.git
agent_framework_commit: 7af10584eb623e6d50a616d3c3c967d7d4fb3690 # pragma: allowlist secret
agent_framework_commit: d6bd12edd828a6af88f8036583a6a8e209315eeb
# Container configuration
container_formatter: ???
container_folder_path: null
swebench_agent_timeout: 2700 # 45 minutes
swebench_tests_timeout: 1800 # 30 minutes
swebench_agent_timeout: 1800
swebench_tests_timeout: 900
apptainer_memory_limit_mb: 32768
command_exec_timeout: 300
dataset_path: ???
# Optional model server reference
model_server:
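For context, a sketch under assumed values (not code from this PR; paths and image name are placeholders) of how the two new YAML keys above reach the agent container in run_openhands.py: they are forwarded as environment variables, and the memory limit is additionally enforced on the host with `ulimit -v`, which takes kilobytes.

```python
# Sketch only: illustrates how the new config values reach the container command.
apptainer_memory_limit_mb = 32768  # matches the new YAML default
command_exec_timeout = 300         # seconds, matches the new YAML default

# Exported inside the container so the agent runtime can pick them up.
env_exports = (
    f"export TMUX_MEMORY_LIMIT={apptainer_memory_limit_mb} && "
    f"export COMMAND_EXEC_TIMEOUT={command_exec_timeout} && "
)
inner_cmd = env_exports + "bash /trajectories_mount/agent_script_example.sh"  # hypothetical script name

apptainer_cmd = f"apptainer exec --writable-tmpfs --cleanenv --pid image.sif bash -c '{inner_cmd}'"

# `ulimit -v` expects KB, so the MB setting is converted before prefixing the command.
memory_limit_kb = apptainer_memory_limit_mb * 1024
full_cmd = f"ulimit -v {memory_limit_kb} && {apptainer_cmd}"
print(full_cmd)
```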
responses_api_agents/swe_agents/run_openhands.py (99 changes: 83 additions & 16 deletions)
@@ -17,6 +17,7 @@
import os
import re
import shlex
import shutil
import time
import uuid
from dataclasses import dataclass, field
@@ -62,6 +63,8 @@ class SweBenchGenerationConfig:
agent_max_turns: int = 100
swebench_tests_timeout: int = 30 * 60
swebench_agent_timeout: int = 45 * 60
apptainer_memory_limit_mb: int = 32 * 1024
command_exec_timeout: int = 5 * 60
inference: SweBenchInferenceConfig = field(default_factory=SweBenchInferenceConfig)
server: dict = field(default_factory=dict)

@@ -222,12 +225,6 @@ async def _run_openhands(
assert self.openhands_setup_dir is not None, "OpenHands setup directory is not set"

agent_script_name = f"agent_script_{agent_run_id}.sh"
cleanup_commands = (
f"cd /openhands_setup/OpenHands && "
f"mkdir -p /trajectories_mount/trajectories && "
f"cp -r {eval_dir_in_openhands}/*/*/* /trajectories_mount/trajectories/{data_point['instance_id']}/ &&"
f"rm -rf {eval_dir_in_openhands} && rm -rf {config_file_path}"
)

agent_main_cmd = (
"if [ -d /workspace ]; then "
@@ -264,6 +261,8 @@
"export POETRY_VIRTUALENVS_IN_PROJECT=true && "
"export POETRY_VIRTUALENVS_CREATE=false && "
"export POETRY_VIRTUALENVS_PATH=/openhands_setup/OpenHands && "
f"export TMUX_MEMORY_LIMIT={self.cfg.apptainer_memory_limit_mb} && "
f"export COMMAND_EXEC_TIMEOUT={self.cfg.command_exec_timeout} && "
# TODO (sugam): fix cryptography issue
# "override_dir=$(mktemp -d /tmp/cryptography_override.XXXX) && "
# # Reinstall cryptography inside the container (via poetry's venv) using a compatible wheel
@@ -312,22 +311,18 @@ async def _run_openhands(
agent_timeout_seconds = self.cfg.swebench_agent_timeout
openhands_cmd = (
f"timeout --signal=TERM --kill-after=30 {agent_timeout_seconds} "
f"bash /trajectories_mount/{agent_script_name}; "
f"echo 'Cleaning up...'; "
f"{cleanup_commands}"
f"bash /trajectories_mount/{agent_script_name}"
)

search_path = os.path.join(
self.output_dir / "trajectories",
"**",
data_point["instance_id"],
self.openhands_setup_dir / "OpenHands" / eval_dir_in_openhands,
"**",
"output.jsonl",
)

try:
# Execute OpenHands command
out_file = await self._execute_container_command(
out_file_in_eval = await self._execute_container_command(
data_point=data_point,
command=openhands_cmd,
expected_file_pattern=search_path,
@@ -336,6 +331,12 @@
timeout=self.cfg.swebench_agent_timeout + 60,
dataset_mount_path=dataset_mount_path,
)
out_file = self._openhands_dir_copy_from_host(
data_point=data_point,
eval_dir_in_openhands=eval_dir_in_openhands,
config_file_path=config_file_path,
output_file_path=out_file_in_eval,
)

with open(out_file, "r") as f:
out_dict = json.loads(f.read().strip())
@@ -353,14 +354,68 @@
"model_name_or_path": out_dict["metadata"]["llm_config"]["model"],
"instance_id": out_dict["instance_id"],
"model_patch": patch + "\n" if patch and not patch.endswith("\n") else patch,
"oh_time_metrics": out_dict["metrics"],
}
)
)
except Exception as e:
print(f"oh run_infer.sh output parsing failed: {e}", flush=True)
self._openhands_dir_copy_from_host(
data_point=data_point,
eval_dir_in_openhands=eval_dir_in_openhands,
config_file_path=config_file_path,
output_file_path=None,
)
print(f"Running OpenHands failed: {e}", flush=True)
return None
return pred_file

def _openhands_dir_copy_from_host(
self,
data_point: dict[str, Any],
eval_dir_in_openhands: str,
config_file_path: str,
output_file_path: Optional[str],
) -> Optional[str]:

eval_dir_on_host = Path(self.openhands_setup_dir) / "OpenHands" / eval_dir_in_openhands
trajectories_root = Path(self.output_dir) / "trajectories" / data_point["instance_id"]
llm_completions_dir = trajectories_root / "llm_completions" / data_point["instance_id"]
trajectories_root.mkdir(parents=True, exist_ok=True)
llm_completions_dir.mkdir(parents=True, exist_ok=True)

dest_output: Optional[str] = None
if output_file_path:
source_output = Path(output_file_path)
if not source_output.is_absolute():
source_output = eval_dir_on_host / source_output
if not source_output.exists():
output_candidates = sorted(eval_dir_on_host.glob("*/*/*/output.jsonl"), key=os.path.getmtime)
if not output_candidates:
raise FileNotFoundError(
f"No output.jsonl found under {eval_dir_on_host} for {data_point['instance_id']}."
)
source_output = output_candidates[-1]

dest_output_path = trajectories_root / "output.jsonl"
shutil.copy2(source_output, dest_output_path)
dest_output = str(dest_output_path)

completion_candidates = glob.glob(str(eval_dir_on_host / "*/*/*/llm_completions/*/*.json"))
if completion_candidates:
latest_completion = max(completion_candidates, key=os.path.getmtime)
shutil.copy2(
latest_completion,
llm_completions_dir / Path(latest_completion).name,
)

shutil.rmtree(eval_dir_on_host, ignore_errors=True)
try:
Path(config_file_path).unlink()
except OSError:
pass

return dest_output

def _write_instance_dataset(self, data_point: dict[str, Any], agent_run_id: str) -> Path:
"""
To avoid making HF dataset API calls, we write the instance dictionary to a file and mount it in the container.
@@ -594,10 +649,14 @@ async def _execute_container_command(

# Launch Apptainer container and execute the command
apptainer_cmd = (
f"apptainer exec --writable-tmpfs --cleanenv --no-mount home,tmp,bind-paths "
f"apptainer exec --writable-tmpfs --cleanenv --pid --no-mount home,tmp,bind-paths "
f"{mount_str} "
f" {container_name} bash -c {shlex.quote(combined_command)}"
)
memory_limit_mb = self.cfg.apptainer_memory_limit_mb
if memory_limit_mb is not None and memory_limit_mb > 0:
memory_limit_kb = int(memory_limit_mb) * 1024
apptainer_cmd = f"ulimit -v {memory_limit_kb} && {apptainer_cmd}"

# Retry apptainer command up to max_retries times
for attempt in range(max_retries):
@@ -632,6 +691,14 @@

if len(pred_files) == 1:
return pred_files[0]
elif len(pred_files) > 1:
latest_file = max(pred_files, key=os.path.getmtime)
print(
f"Multiple outputs found for {data_point['instance_id']} "
f"({len(pred_files)}). Using latest: {latest_file}",
flush=True,
)
return latest_file
else:
raise ValueError(
f"Expected exactly one file matching {expected_file_pattern} for {data_point['instance_id']}, "
@@ -1018,7 +1085,7 @@ async def process_single_datapoint(self, data_point: dict[str, Any]):

output_dict = {
"swe-bench-metrics": report_json[data_point["instance_id"]],
"swe-bench-outputs": trajectory_dict,
"oh_time_metrics": trajectory_dict.get("oh_time_metrics", None) if trajectory_dict else {},
"generation": "", # required TODO: we should fix this
"generation_time": generation_time,
"evaluation_time": evaluation_time,