Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
43 commits
Select commit Hold shift + click to select a range
a7fa35e
add profiling deps; add profiling to run infer
bxyu-nvidia Jan 23, 2026
73eef96
stop profiler
bxyu-nvidia Jan 23, 2026
a9ce567
update poetry lock
bxyu-nvidia Jan 23, 2026
b4a2858
try prints
bxyu-nvidia Jan 24, 2026
6d72797
use path
bxyu-nvidia Jan 24, 2026
65c8b2f
remove dot and things afterwards
bxyu-nvidia Jan 24, 2026
ff41925
try no logging
bxyu-nvidia Jan 24, 2026
c320c26
change to asyncio
bxyu-nvidia Jan 24, 2026
9ce7688
revert
bxyu-nvidia Jan 25, 2026
4984bfb
break on poll too
bxyu-nvidia Jan 25, 2026
77eb0fb
try no start
bxyu-nvidia Jan 25, 2026
0224097
try more log thread usages
bxyu-nvidia Jan 25, 2026
d088e7b
revert
bxyu-nvidia Jan 25, 2026
c05b511
try no info logging
bxyu-nvidia Jan 25, 2026
af5bea6
Merge branch 'sdd/profiling' of https://github.com/sdevare-nv/nv-Open…
bxyu-nvidia Jan 26, 2026
e9d24f5
try add gym dep
bxyu-nvidia Jan 26, 2026
03bc2b4
fork on openhands should log
bxyu-nvidia Jan 26, 2026
95ea000
gate more actions
bxyu-nvidia Jan 26, 2026
e2e7ceb
use gym client
bxyu-nvidia Jan 26, 2026
494c79b
idk
bxyu-nvidia Jan 26, 2026
a2f77a5
use logger error
bxyu-nvidia Jan 28, 2026
4fc1379
add tags
bxyu-nvidia Jan 28, 2026
7fb6f02
fix missing comma
bxyu-nvidia Jan 28, 2026
a3cd67a
fix await
bxyu-nvidia Jan 28, 2026
3e0ff27
fix print
bxyu-nvidia Jan 28, 2026
e855e9d
use server name
bxyu-nvidia Jan 28, 2026
6d51a9f
fix
bxyu-nvidia Jan 28, 2026
42f52bc
try model dump
bxyu-nvidia Jan 28, 2026
a9163eb
directly import model response
bxyu-nvidia Jan 28, 2026
d88bd6f
clean
bxyu-nvidia Jan 28, 2026
b6fd1f8
try save traj as well
bxyu-nvidia Jan 28, 2026
885280d
dump kwargs
bxyu-nvidia Jan 28, 2026
600ea6f
jlog log file
bxyu-nvidia Jan 28, 2026
68fe530
try fix dump
bxyu-nvidia Jan 28, 2026
f3ea896
dump the raw messages
bxyu-nvidia Jan 28, 2026
0eaf7d0
clean
bxyu-nvidia Jan 28, 2026
5358694
log message dicts
bxyu-nvidia Jan 28, 2026
a8b07a4
try save provider specific fields
bxyu-nvidia Jan 29, 2026
759c98e
clean
bxyu-nvidia Jan 29, 2026
aada3a8
Merge branch 'sdd/profiling' of https://github.com/sdevare-nv/nv-Open…
bxyu-nvidia Feb 4, 2026
e552632
log times to metrics
bxyu-nvidia Feb 8, 2026
8c79745
fix bug
bxyu-nvidia Feb 8, 2026
b710c43
exec time
bxyu-nvidia Feb 8, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
95 changes: 94 additions & 1 deletion evaluation/benchmarks/swe_bench/run_infer.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import json
import os
import tempfile
from typing import Any, Literal, Optional
from typing import Any, Dict, Literal, Optional
import time
import pandas as pd
import toml
Expand Down Expand Up @@ -78,6 +78,85 @@

MAX_RETRIES = 3

########################################
# START Custom profiling code
########################################

from io import StringIO
from pathlib import Path
from typing import Optional

import yappi


class Profiler:
    """Thin wrapper around yappi that profiles CPU time for one named run.

    On ``stop()`` two artifacts are written under ``base_profile_dir``:
    a CALLGRIND file (for tools such as kcachegrind) and a filtered
    plain-text log of yappi's per-function stats.
    """

    def __init__(self, name: str, base_profile_dir: Path) -> None:
        self.name = name
        self.base_profile_dir = base_profile_dir

        # Optional substring filter for stat lines below the header row;
        # when left as None only the header portion reaches the text log.
        self.required_str = None

    def start(self) -> None:
        # Profile CPU time rather than wall-clock time.
        yappi.set_clock_type("CPU")
        yappi.start()
        print(f"🔍 Enabled profiling for {self.name}")

    def stop(self) -> None:
        print(f"🛑 Stopping profiler for {self.name}. Check {self.base_profile_dir} for the metrics!")
        yappi.stop()
        self.dump()

    def dump(self) -> None:
        self.base_profile_dir.mkdir(parents=True, exist_ok=True)
        text_log_path = self.base_profile_dir / f"{self.name}.log"
        callgrind_out_path = self.base_profile_dir / f"{self.name}.callgrind"

        # Machine-readable dump first; it needs no filtering.
        yappi.get_func_stats().save(callgrind_out_path, type="CALLGRIND")

        sink = StringIO()
        yappi.get_func_stats().print_all(
            out=sink,
            columns={
                0: ("name", 200),
                1: ("ncall", 10),
                2: ("tsub", 8),
                3: ("ttot", 8),
                4: ("tavg", 8),
            },
        )

        sink.seek(0)
        kept_lines = []
        header_seen = False
        for stat_line in sink:
            # Keep everything up to and including the header row; after the
            # header, keep a line only when it matches the optional filter.
            if not header_seen or (self.required_str and self.required_str in stat_line):
                kept_lines.append(stat_line)

            if stat_line.startswith("name"):
                header_seen = True

        with open(text_log_path, "w") as f:
            f.write("".join(kept_lines))


def update_metrics(update_dict: Dict[str, Any]) -> None:
    """Merge ``update_dict`` into the NeMo Gym metrics JSON file.

    The metrics file path comes from the ``NEMO_GYM_METRICS_FPATH``
    environment variable. ``None`` values are dropped from both the
    existing metrics and the update before merging; on key collisions
    the update wins.
    """
    import os, json

    metrics_fpath = os.environ["NEMO_GYM_METRICS_FPATH"]
    with open(metrics_fpath) as f:
        current = json.load(f)

    merged = {key: val for key, val in current.items() if val is not None}
    merged.update({key: val for key, val in update_dict.items() if val is not None})

    with open(metrics_fpath, "w") as f:
        json.dump(merged, f)


########################################
# END Custom profiling code
########################################


def set_dataset_type(dataset_name: str) -> str:
"""Set dataset type based on dataset name."""
Expand Down Expand Up @@ -733,16 +812,19 @@ def process_instance(
start_time = time.perf_counter()
runtime = create_runtime(config)
end_time = time.perf_counter()
update_metrics({"create_runtime_time": end_time - start_time})
print(f"create runtime: {end_time - start_time} seconds", flush = True)
start_time = time.perf_counter()
call_async_from_sync(runtime.connect)
end_time = time.perf_counter()
update_metrics({"connect_to_runtime_time": end_time - start_time})
print(f"connect to runtime: {end_time - start_time} seconds", flush = True)

try:
start_time = time.perf_counter()
initialize_runtime(runtime, instance, metadata)
end_time = time.perf_counter()
update_metrics({"initialize_runtime_time": end_time - start_time})
print(f"init runtime: {end_time - start_time} seconds", flush = True)
message_action = get_instruction(instance, metadata)

Expand Down Expand Up @@ -902,6 +984,14 @@ def filter_dataset(

args, _ = parser.parse_known_args()

maybe_base_profile_dir = os.environ.get("NG_PROFILING_DIR")
should_profile = maybe_base_profile_dir is not None
if should_profile:
profiler = Profiler(
name="openhands", base_profile_dir=Path(maybe_base_profile_dir)
)
profiler.start()

# Validate nv-internal-1 requires instance_dict_path
if 'nv-internal-1' in args.dataset.lower():
if not args.instance_dict_path or not args.selected_id:
Expand Down Expand Up @@ -1161,3 +1251,6 @@ def get_cur_output_file_path(attempt: int) -> str:
)
# Check if any instances reached maximum retries
check_maximum_retries_exceeded(metadata.eval_output_dir)

if should_profile:
profiler.stop()
119 changes: 113 additions & 6 deletions openhands/agenthub/codeact_agent/codeact_agent.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,19 @@
import os
import sys
import time
import json
import tempfile
from collections import deque
from typing import TYPE_CHECKING
from typing import TYPE_CHECKING, Any

from openhands.llm.llm_registry import LLMRegistry

if TYPE_CHECKING:
from litellm import ChatCompletionToolParam

from openhands.events.action import Action
from openhands.llm.llm import ModelResponse

from openhands.llm.llm import ModelResponse
import openhands.agenthub.codeact_agent.function_calling as codeact_function_calling
from openhands.agenthub.codeact_agent.tools.bash import create_cmd_run_tool
from openhands.agenthub.codeact_agent.tools.browser import BrowserTool
Expand Down Expand Up @@ -90,6 +93,14 @@ def __init__(self, config: AgentConfig, llm_registry: LLMRegistry) -> None:
# Override with router if needed
self.llm = self.llm_registry.get_router(self.config)

from nemo_gym.server_utils import ServerClient
from nemo_gym.global_config import get_global_config_dict
self.ng_server_client = ServerClient(
head_server_config=ServerClient.load_head_server_config(),
global_config_dict=get_global_config_dict(),
)
self.model_server_cookies = None

@property
def prompt_manager(self) -> PromptManager:
if self._prompt_manager is None:
Expand Down Expand Up @@ -153,7 +164,7 @@ def reset(self) -> None:
# Only clear pending actions, not LLM metrics
self.pending_actions.clear()

def step(self, state: State) -> 'Action':
async def step(self, state: State) -> 'Action':
"""Performs one step using the CodeAct Agent.

This includes gathering info on previous steps and prompting the model to make a command to execute.
Expand Down Expand Up @@ -211,14 +222,110 @@ def step(self, state: State) -> 'Action':
model_name=self.llm.config.model, agent_name=self.name
)
}
response = self.llm.completion(**params)
logger.debug(f'Response from LLM: {response}')

# Original code:
# response = self.llm.completion(**params)

start_time = time.time()
response = await self._nemo_gym_model_call(messages, params['tools'])
self.update_model_call_time(start_time)

ng_openhands_should_log = os.environ.get("NG_OPENHANDS_SHOULD_LOG", "").lower() == "true"
if ng_openhands_should_log:
logger.debug(f'Response from LLM: {response}')

actions = self.response_to_actions(response)
logger.debug(f'Actions after response_to_actions: {actions}')

if ng_openhands_should_log:
logger.debug(f'Actions after response_to_actions: {actions}')

for action in actions:
self.pending_actions.append(action)
return self.pending_actions.popleft()

def update_model_call_time(self, start_time: float) -> None:
    """Accumulate elapsed model-call wall time into the metrics file.

    Adds ``time.time() - start_time`` onto the running
    ``total_model_call_time`` entry in the JSON file named by the
    ``NEMO_GYM_METRICS_FPATH`` environment variable.
    """
    import os, json

    metrics_fpath = os.environ["NEMO_GYM_METRICS_FPATH"]
    with open(metrics_fpath) as f:
        metrics = json.load(f)

    previous_total = metrics.get("total_model_call_time", 0.0)
    metrics["total_model_call_time"] = previous_total + time.time() - start_time

    with open(metrics_fpath, "w") as f:
        json.dump(metrics, f)

async def _nemo_gym_model_call(self, messages: list[Message], tools: list['ChatCompletionToolParam']) -> ModelResponse:
    """Call the NeMo Gym model server instead of litellm and log the completion.

    Posts the serialized conversation to the model server's
    ``/v1/chat/completions`` endpoint via ``self.ng_server_client``,
    validates the JSON reply into a ``ModelResponse``, attaches
    token-id/log-prob fields from the reply (when present), and writes
    the full request/response pair to a completion log file.
    """
    message_dicts = [m.model_dump() for m in messages]
    params ={
        "messages": message_dicts,
        "tools": tools,
        # Model/seed and other call kwargs captured at LLM init time.
        **self.llm._nemo_gym_llm_kwargs,
    }

    # Remove prompt_token_ids, generation_token_ids, and generation_log_probs from all messages except the last
    # (walk from the end; the first message seen carrying all three fields
    # keeps them, every earlier message is stripped in place).
    fields_to_remove = ["prompt_token_ids", "generation_token_ids", "generation_log_probs"]
    last_occurrence_idx_seen = False
    for message in reversed(message_dicts):
        if last_occurrence_idx_seen:
            for field in fields_to_remove:
                if field in message:
                    del message[field]
        elif all(field in message for field in fields_to_remove):
            last_occurrence_idx_seen = True

    from nemo_gym.server_utils import get_response_json, raise_for_status

    model_response = await self.ng_server_client.post(
        server_name=os.getenv("NEMO_GYM_MODEL_SERVER_NAME"),
        url_path="/v1/chat/completions",
        json=params,
        cookies=self.model_server_cookies,
    )
    # We raise for status here since we expect model calls to always work.
    await raise_for_status(model_response)
    model_response_json = await get_response_json(model_response)
    # Persist session cookies so subsequent calls hit the same server session.
    self.model_server_cookies = model_response.cookies

    response: ModelResponse = ModelResponse.model_validate(model_response_json)

    # Carry server-provided token ids / log probs along on the response;
    # only present when the first choice's message includes prompt_token_ids.
    response_message_dict = model_response_json["choices"][0]["message"]
    provider_specific_fields = dict()
    if response_message_dict.get("prompt_token_ids"):
        provider_specific_fields = {
            "prompt_token_ids": response_message_dict["prompt_token_ids"],
            "generation_token_ids": response_message_dict["generation_token_ids"],
            "generation_log_probs": response_message_dict["generation_log_probs"],
        }
    response._provider_specific_fields = provider_specific_fields

    # Save the llm completion. See the original code in openhands/llm/llm.py
    log_file = os.path.join(
        self.llm.config.log_completions_folder,
        f'{self.llm.config.model.replace("/", "__")}-{time.time()}.json',
    )
    _d = {
        'messages': [m.model_dump() for m in messages],
        'response': model_response_json,
        'provider_specific_fields': provider_specific_fields,
        # 'args': args,
        'kwargs': {
            k: v
            for k, v in params.items()
            if k not in ('messages', 'client')
        },
        'timestamp': time.time(),
        # 'cost': cost,
    }

    # Write-then-rename in the same directory so the log file appears
    # atomically and readers never see a partial JSON document.
    temp_fd, temp_path = tempfile.mkstemp(dir=os.path.dirname(log_file))
    with os.fdopen(temp_fd, 'w') as f:
        f.write(json.dumps(_d))
    os.replace(temp_path, log_file)

    return response

def _get_initial_user_message(self, history: list[Event]) -> MessageAction:
"""Finds the initial user message action from the full history."""
initial_user_message: MessageAction | None = None
Expand Down
2 changes: 1 addition & 1 deletion openhands/controller/agent_controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -928,7 +928,7 @@ async def _step(self) -> None:
action = self._replay_manager.step()
else:
try:
action = self.agent.step(self.state)
action = await self.agent.step(self.state)
if action is None:
raise LLMNoActionError('No action was returned')
action._source = EventSource.AGENT # type: ignore [attr-defined]
Expand Down
6 changes: 6 additions & 0 deletions openhands/llm/llm.py
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,12 @@ def __init__(
cookie_value = raw_cookie.split(";")[0].strip()
extra_headers["Cookie"] = cookie_value

# NeMo Gym: Grab params here for NeMo Gym client use downstream.
self._nemo_gym_llm_kwargs = kwargs | dict(
model=self.config.model,
seed=self.config.seed,
)

self._completion = partial(
litellm_completion,
model=self.config.model,
Expand Down
14 changes: 14 additions & 0 deletions openhands/runtime/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -325,8 +325,22 @@ def add_env_vars(self, env_vars: dict[str, str]) -> None:

def on_event(self, event: Event) -> None:
    """Handle an incoming event; Actions are executed and timed.

    Non-Action events are ignored. For Actions, ``_handle_action`` runs
    to completion on the current event loop and the elapsed wall time
    is accumulated into ``total_command_exec_time`` in the NeMo Gym
    metrics JSON file (path from ``NEMO_GYM_METRICS_FPATH``).
    """
    if not isinstance(event, Action):
        return

    started = time.time()
    asyncio.get_event_loop().run_until_complete(self._handle_action(event))

    import os, json

    metrics_fpath = os.environ["NEMO_GYM_METRICS_FPATH"]
    with open(metrics_fpath) as f:
        metrics = json.load(f)

    prior_total = metrics.get("total_command_exec_time", 0.0)
    metrics["total_command_exec_time"] = prior_total + time.time() - started

    with open(metrics_fpath, "w") as f:
        json.dump(metrics, f)

async def _export_latest_git_provider_tokens(self, event: Action) -> None:
"""Refresh runtime provider tokens when agent attemps to run action with provider token"""
providers_called = ProviderHandler.check_cmd_action_for_provider_token_ref(
Expand Down
13 changes: 11 additions & 2 deletions openhands/runtime/impl/local/local_runtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -683,6 +683,11 @@ def _create_server(
# Prepend the interpreter's bin directory to PATH for subprocesses
env['PATH'] = f'{_python_bin_path()}{os.pathsep}{env.get("PATH", "")}'

# NG Openhands should log propagation
ng_openhands_should_log = os.environ.get("NG_OPENHANDS_SHOULD_LOG", "").lower() == "true"
if ng_openhands_should_log:
env['NG_OPENHANDS_SHOULD_LOG'] = os.environ["NG_OPENHANDS_SHOULD_LOG"]

logger.debug(f'Updated PATH for subprocesses: {env["PATH"]}')

server_process = subprocess.Popen( # noqa: S603
Expand All @@ -697,6 +702,7 @@ def _create_server(

log_thread_exit_event = threading.Event()


# Start a thread to read and log server output
def log_output() -> None:
if not server_process or not server_process.stdout:
Expand All @@ -712,15 +718,18 @@ def log_output() -> None:
line = server_process.stdout.readline()
if not line:
break
logger.info(f'server: {line.strip()}')

if ng_openhands_should_log:
logger.info(f'server: {line.strip()}')

# Capture any remaining output
if not log_thread_exit_event.is_set():
logger.info('server process exited, reading remaining output.')
for line in server_process.stdout:
if log_thread_exit_event.is_set():
break
logger.info(f'server (remaining): {line.strip()}')
if ng_openhands_should_log:
logger.info(f'server (remaining): {line.strip()}')

except Exception as e:
logger.error(f'Error reading server output: {e}')
Expand Down
Loading
Loading