Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
118 changes: 106 additions & 12 deletions resources_servers/ns_tools/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,13 @@ class NSToolsConfig(BaseResourcesServerConfig):
# python_tool HTTP server port (spawned automatically)
python_tool_port: int = 8765

# Verbose logging for tool execution timing (disabled by default)
verbose_tool_logging: bool = False

# When True, skip replaying session history after sandbox worker restarts.
# The model receives a warning in stderr instead of restored state.
disable_session_restore: bool = False


# ============================================================
# Run/Verify Request/Response Models
Expand Down Expand Up @@ -106,6 +113,13 @@ class NSToolsVerifyResponse(BaseVerifyResponse):

delegated_response: Optional[Dict[str, Any]] = None

# Timing metrics for tool execution
total_tool_execution_time_seconds: float = 0.0
num_tool_calls: int = 0
avg_tool_call_time_seconds: float = 0.0
tool_timeout_count: int = 0 # Internal sandbox timeouts (process_status == "timeout")
tool_request_timeout_count: int = 0 # HTTP/request-level timeouts


# ============================================================
# Resources Server Implementation
Expand All @@ -117,6 +131,7 @@ class NSToolsResourcesServer(SimpleResourcesServer):
tool_manager: Optional[Any] = None
_tool_name_map: Dict[str, str] = {} # Maps tool names to qualified names
_python_tool_process: Optional[subprocess.Popen] = None
_timing_by_session: Dict[str, list] = {} # session_id -> list of timing records

def setup_webserver(self) -> FastAPI:
app = super().setup_webserver()
Expand Down Expand Up @@ -151,6 +166,8 @@ def _start_python_tool_server(self):
"--sandbox-port",
str(self.config.sandbox_port),
]
if self.config.disable_session_restore:
cmd.append("--disable-session-restore")
logger.info(f"python_tool command: {' '.join(cmd)}")

# Don't pipe stdout/stderr so we can see output directly in logs
Expand Down Expand Up @@ -248,6 +265,7 @@ async def execute_tool(self, tool_name: str, request: Request) -> PlainTextRespo

Uses the nemo-gym session ID as the request_id for stateful tools.
Returns the result as plain text for simple_agent compatibility.
Tracks execution timing and timeout detection per session.
"""
if not self.tool_manager:
return PlainTextResponse(json.dumps({"error": "No tools configured"}))
Expand All @@ -257,43 +275,118 @@ async def execute_tool(self, tool_name: str, request: Request) -> PlainTextRespo
logger.error(f"Unknown tool requested: {tool_name}")
return PlainTextResponse(json.dumps({"error": f"Unknown tool: {tool_name}"}))

# Get session ID for stateful execution
session_id = request.session.get(SESSION_ID_KEY)
if not session_id:
session_id = str(uuid.uuid4())
logger.warning(f"No session ID found, using fallback: {session_id}")

if self.config.verbose_tool_logging:
if session_id not in self._timing_by_session:
self._timing_by_session[session_id] = []

start_time = time.perf_counter()
is_internal_timeout = False
is_request_timeout = False
result = None

try:
body = await request.json()

# Get session ID for stateful execution
session_id = request.session.get(SESSION_ID_KEY)
if not session_id:
session_id = str(uuid.uuid4())
logger.warning(f"No session ID found, using fallback: {session_id}")

# Execute the tool
result = await self.tool_manager.execute_tool(
raw_name=tool_name,
args=body,
extra_args={"request_id": session_id},
)

# Return result as plain text to avoid double JSON serialization
if isinstance(result, str):
return PlainTextResponse(result)
return PlainTextResponse(json.dumps(result))
# Check for internal sandbox timeout (process_status == "timeout")
try:
if isinstance(result, str):
result_dict = json.loads(result)
elif isinstance(result, dict):
result_dict = result
else:
result_dict = {}
is_internal_timeout = result_dict.get("process_status") == "timeout"
except (json.JSONDecodeError, TypeError, AttributeError):
pass

except (httpx.TimeoutException, TimeoutError) as e:
is_request_timeout = True
logger.warning(f"Request timeout executing tool {tool_name}: {e}")
result = {"error": "Request timeout", "process_status": "timeout"}

except Exception as e:
logger.exception(f"Error executing tool {tool_name}: {e}")
return PlainTextResponse(json.dumps({"error": str(e)}))
result = {"error": str(e)}

if self.config.verbose_tool_logging:
elapsed = time.perf_counter() - start_time
self._timing_by_session[session_id].append(
{
"tool_name": tool_name,
"execution_time_seconds": elapsed,
"is_internal_timeout": is_internal_timeout,
"is_request_timeout": is_request_timeout,
}
)
timeout_info = ""
if is_internal_timeout:
timeout_info = " [INTERNAL_TIMEOUT]"
elif is_request_timeout:
timeout_info = " [REQUEST_TIMEOUT]"
logger.info(f"Tool '{tool_name}' executed in {elapsed:.3f}s{timeout_info} (session={session_id[:8]}...)")

# Return result as plain text to avoid double JSON serialization
if isinstance(result, str):
return PlainTextResponse(result)
return PlainTextResponse(json.dumps(result))

# --------------------------------------------------------
# Verification
# --------------------------------------------------------

async def verify(self, body: NSToolsVerifyRequest) -> NSToolsVerifyResponse:
def _aggregate_timing_metrics(self, session_id: Optional[str]) -> Dict[str, Any]:
    """Summarize, and consume, the tool timing records of one session.

    The session's records are popped from ``self._timing_by_session`` while
    being read, so calling this again for the same session yields an
    all-zero summary.

    Args:
        session_id: Session whose timing records should be aggregated. A
            falsy value (``None`` or an empty string) produces an all-zero
            summary and leaves the store untouched.

    Returns:
        A dict with the keys ``total_tool_execution_time_seconds``,
        ``num_tool_calls``, ``avg_tool_call_time_seconds``,
        ``tool_timeout_count`` and ``tool_request_timeout_count``.
    """
    if session_id:
        # pop() both fetches the records and drops them from the store.
        records = self._timing_by_session.pop(session_id, [])
    else:
        records = []

    call_count = len(records)
    elapsed_total = sum(record["execution_time_seconds"] for record in records)

    # Guard the division: a session without tool calls averages to 0.0.
    if call_count > 0:
        elapsed_avg = elapsed_total / call_count
    else:
        elapsed_avg = 0.0

    # Timeout flags are optional on a record; a missing flag counts as "no timeout".
    internal_timeouts = len([r for r in records if r.get("is_internal_timeout")])
    request_timeouts = len([r for r in records if r.get("is_request_timeout")])

    summary = {}
    summary["total_tool_execution_time_seconds"] = elapsed_total
    summary["num_tool_calls"] = call_count
    summary["avg_tool_call_time_seconds"] = elapsed_avg
    summary["tool_timeout_count"] = internal_timeouts
    summary["tool_request_timeout_count"] = request_timeouts
    return summary

async def verify(self, request: Request, body: NSToolsVerifyRequest) -> NSToolsVerifyResponse:
"""
Verify the model's response by delegating to the configured verifier.

The verifier is selected by:
1. Per-sample `verifier_type` field (if present)
2. Config `default_verifier` (fallback)

Also aggregates and returns tool execution timing metrics for this session
when verbose_tool_logging is enabled.
"""
metrics = {}
if self.config.verbose_tool_logging:
session_id = request.session.get(SESSION_ID_KEY)
metrics = self._aggregate_timing_metrics(session_id)
logger.info(
f"Session {session_id[:8] if session_id else 'unknown'}... metrics: "
f"{metrics['num_tool_calls']} tool calls, total={metrics['total_tool_execution_time_seconds']:.3f}s, "
f"avg={metrics['avg_tool_call_time_seconds']:.3f}s, "
f"internal_timeouts={metrics['tool_timeout_count']}, request_timeouts={metrics['tool_request_timeout_count']}"
)

# Select verifier
verifier_type = body.verifier_type or self.config.default_verifier

Expand Down Expand Up @@ -321,6 +414,7 @@ async def verify(self, body: NSToolsVerifyRequest) -> NSToolsVerifyResponse:
**body.model_dump(),
reward=result["reward"],
delegated_response=result,
**metrics,
)

# --------------------------------------------------------
Expand Down
7 changes: 5 additions & 2 deletions resources_servers/ns_tools/configs/ns_tools.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,15 @@ ns_tools:
# Per-tool configuration overrides
nemo_skills_tool_overrides:
PythonTool:
exec_timeout_s: 30
exec_timeout_s: 10

# Sandbox configuration
# Respects NEMO_SKILLS_SANDBOX_HOST/PORT env vars if set, else defaults
sandbox_host: ${oc.env:NEMO_SKILLS_SANDBOX_HOST,127.0.0.1}
sandbox_port: ${oc.env:NEMO_SKILLS_SANDBOX_PORT,6000}

# Disable session replay after sandbox worker restarts (improves stability)
disable_session_restore: true

domain: agent
verified: false
Expand Down
2 changes: 1 addition & 1 deletion resources_servers/ns_tools/requirements.txt
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
-e nemo-gym[dev] @ ../../
nemo-skills @ git+https://github.com/NVIDIA-NeMo/Skills.git@georgea/super-rl-01212026
nemo-skills @ git+https://github.com/NVIDIA-NeMo/Skills.git@georgea/super-rl-02062026
6 changes: 3 additions & 3 deletions resources_servers/ns_tools/tests/test_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ async def test_verify_delegates_to_math_with_judge(self) -> None:
expected_answer="4",
)

result = await server.verify(verify_request)
result = await server.verify(None, verify_request)

assert result.reward == 1.0
assert result.delegated_response is not None
Expand Down Expand Up @@ -179,7 +179,7 @@ async def test_verify_uses_default_verifier(self) -> None:
expected_answer="4",
)

result = await server.verify(verify_request)
result = await server.verify(None, verify_request)

assert result.reward == 0.0
call_args = server.server_client.post.call_args
Expand Down Expand Up @@ -232,7 +232,7 @@ async def test_verify_passes_through_fields(self) -> None:
expected_answer="4",
)

await server.verify(verify_request)
await server.verify(None, verify_request)

call_args = server.server_client.post.call_args
json_data = call_args.kwargs["json"]
Expand Down