diff --git a/resources_servers/ns_tools/app.py b/resources_servers/ns_tools/app.py index 6cbd035a4..41aa81fe0 100644 --- a/resources_servers/ns_tools/app.py +++ b/resources_servers/ns_tools/app.py @@ -78,6 +78,13 @@ class NSToolsConfig(BaseResourcesServerConfig): # python_tool HTTP server port (spawned automatically) python_tool_port: int = 8765 + # Verbose logging for tool execution timing (disabled by default) + verbose_tool_logging: bool = False + + # When True, skip replaying session history after sandbox worker restarts. + # The model receives a warning in stderr instead of restored state. + disable_session_restore: bool = False + # ============================================================ # Run/Verify Request/Response Models @@ -106,6 +113,13 @@ class NSToolsVerifyResponse(BaseVerifyResponse): delegated_response: Optional[Dict[str, Any]] = None + # Timing metrics for tool execution + total_tool_execution_time_seconds: float = 0.0 + num_tool_calls: int = 0 + avg_tool_call_time_seconds: float = 0.0 + tool_timeout_count: int = 0 # Internal sandbox timeouts (process_status == "timeout") + tool_request_timeout_count: int = 0 # HTTP/request-level timeouts + # ============================================================ # Resources Server Implementation @@ -117,6 +131,7 @@ class NSToolsResourcesServer(SimpleResourcesServer): tool_manager: Optional[Any] = None _tool_name_map: Dict[str, str] = {} # Maps tool names to qualified names _python_tool_process: Optional[subprocess.Popen] = None + _timing_by_session: Dict[str, list] = {} # session_id -> list of timing records def setup_webserver(self) -> FastAPI: app = super().setup_webserver() @@ -151,6 +166,8 @@ def _start_python_tool_server(self): "--sandbox-port", str(self.config.sandbox_port), ] + if self.config.disable_session_restore: + cmd.append("--disable-session-restore") logger.info(f"python_tool command: {' '.join(cmd)}") # Don't pipe stdout/stderr so we can see output directly in logs @@ -248,6 +265,7 @@ async def execute_tool(self, tool_name: str, request: Request) -> PlainTextRespo Uses the nemo-gym session ID as the request_id for stateful tools. Returns the result as plain text for simple_agent compatibility. + Tracks execution timing and timeout detection per session. """ if not self.tool_manager: return PlainTextResponse(json.dumps({"error": "No tools configured"})) @@ -257,15 +275,24 @@ async def execute_tool(self, tool_name: str, request: Request) -> PlainTextRespo logger.error(f"Unknown tool requested: {tool_name}") return PlainTextResponse(json.dumps({"error": f"Unknown tool: {tool_name}"})) + # Get session ID for stateful execution + session_id = request.session.get(SESSION_ID_KEY) + if not session_id: + session_id = str(uuid.uuid4()) + logger.warning(f"No session ID found, using fallback: {session_id}") + + if self.config.verbose_tool_logging: + if session_id not in self._timing_by_session: + self._timing_by_session[session_id] = [] + + start_time = time.perf_counter() + is_internal_timeout = False + is_request_timeout = False + result = None + try: body = await request.json() - # Get session ID for stateful execution - session_id = request.session.get(SESSION_ID_KEY) - if not session_id: - session_id = str(uuid.uuid4()) - logger.warning(f"No session ID found, using fallback: {session_id}") - # Execute the tool result = await self.tool_manager.execute_tool( raw_name=tool_name, @@ -273,27 +300,93 @@ async def execute_tool(self, tool_name: str, request: Request) -> PlainTextRespo extra_args={"request_id": session_id}, ) - # Return result as plain text to avoid double JSON serialization - if isinstance(result, str): - return PlainTextResponse(result) - return PlainTextResponse(json.dumps(result)) + # Check for internal sandbox timeout (process_status == "timeout") + try: + if isinstance(result, str): + result_dict = json.loads(result) + elif isinstance(result, dict): + result_dict = result + else: + result_dict = {} + is_internal_timeout = result_dict.get("process_status") == "timeout" + except (json.JSONDecodeError, TypeError, AttributeError): + pass + + except (httpx.TimeoutException, TimeoutError) as e: + is_request_timeout = True + logger.warning(f"Request timeout executing tool {tool_name}: {e}") + result = {"error": "Request timeout", "process_status": "timeout"} except Exception as e: logger.exception(f"Error executing tool {tool_name}: {e}") - return PlainTextResponse(json.dumps({"error": str(e)})) + result = {"error": str(e)} + + if self.config.verbose_tool_logging: + elapsed = time.perf_counter() - start_time + self._timing_by_session[session_id].append( + { + "tool_name": tool_name, + "execution_time_seconds": elapsed, + "is_internal_timeout": is_internal_timeout, + "is_request_timeout": is_request_timeout, + } + ) + timeout_info = "" + if is_internal_timeout: + timeout_info = " [INTERNAL_TIMEOUT]" + elif is_request_timeout: + timeout_info = " [REQUEST_TIMEOUT]" + logger.info(f"Tool '{tool_name}' executed in {elapsed:.3f}s{timeout_info} (session={session_id[:8]}...)") + + # Return result as plain text to avoid double JSON serialization + if isinstance(result, str): + return PlainTextResponse(result) + return PlainTextResponse(json.dumps(result)) # -------------------------------------------------------- # Verification # -------------------------------------------------------- - async def verify(self, body: NSToolsVerifyRequest) -> NSToolsVerifyResponse: + def _aggregate_timing_metrics(self, session_id: Optional[str]) -> Dict[str, Any]: + """Aggregate tool execution timing metrics for a session.""" + tool_timings = self._timing_by_session.pop(session_id, []) if session_id else [] + + total_tool_time = sum(t["execution_time_seconds"] for t in tool_timings) + num_tool_calls = len(tool_timings) + avg_tool_time = total_tool_time / num_tool_calls if num_tool_calls > 0 else 0.0 + tool_timeout_count = sum(1 for t in tool_timings if t.get("is_internal_timeout")) + tool_request_timeout_count = sum(1 for t in tool_timings if t.get("is_request_timeout")) + + return { + "total_tool_execution_time_seconds": total_tool_time, + "num_tool_calls": num_tool_calls, + "avg_tool_call_time_seconds": avg_tool_time, + "tool_timeout_count": tool_timeout_count, + "tool_request_timeout_count": tool_request_timeout_count, + } + + async def verify(self, request: Request, body: NSToolsVerifyRequest) -> NSToolsVerifyResponse: """ Verify the model's response by delegating to the configured verifier. The verifier is selected by: 1. Per-sample `verifier_type` field (if present) 2. Config `default_verifier` (fallback) + + Also aggregates and returns tool execution timing metrics for this session + when verbose_tool_logging is enabled. """ + metrics = {} + if self.config.verbose_tool_logging: + session_id = request.session.get(SESSION_ID_KEY) + metrics = self._aggregate_timing_metrics(session_id) + logger.info( + f"Session {session_id[:8] if session_id else 'unknown'}... metrics: " + f"{metrics['num_tool_calls']} tool calls, total={metrics['total_tool_execution_time_seconds']:.3f}s, " + f"avg={metrics['avg_tool_call_time_seconds']:.3f}s, " + f"internal_timeouts={metrics['tool_timeout_count']}, request_timeouts={metrics['tool_request_timeout_count']}" + ) + # Select verifier verifier_type = body.verifier_type or self.config.default_verifier @@ -321,6 +414,7 @@ async def verify(self, body: NSToolsVerifyRequest) -> NSToolsVerifyResponse: **body.model_dump(), reward=result["reward"], delegated_response=result, + **metrics, ) # -------------------------------------------------------- diff --git a/resources_servers/ns_tools/configs/ns_tools.yaml b/resources_servers/ns_tools/configs/ns_tools.yaml index 7b7761f2f..b68d31b73 100644 --- a/resources_servers/ns_tools/configs/ns_tools.yaml +++ b/resources_servers/ns_tools/configs/ns_tools.yaml @@ -30,12 +30,15 @@ ns_tools: # Per-tool configuration overrides nemo_skills_tool_overrides: PythonTool: - exec_timeout_s: 30 - + exec_timeout_s: 10 + # Sandbox configuration # Respects NEMO_SKILLS_SANDBOX_HOST/PORT env vars if set, else defaults sandbox_host: ${oc.env:NEMO_SKILLS_SANDBOX_HOST,127.0.0.1} sandbox_port: ${oc.env:NEMO_SKILLS_SANDBOX_PORT,6000} + + # Disable session replay after sandbox worker restarts (improves stability) + disable_session_restore: true domain: agent verified: false diff --git a/resources_servers/ns_tools/requirements.txt b/resources_servers/ns_tools/requirements.txt index c5e3a1f21..77be9e6f1 100644 --- a/resources_servers/ns_tools/requirements.txt +++ b/resources_servers/ns_tools/requirements.txt @@ -1,2 +1,2 @@ -e nemo-gym[dev] @ ../../ -nemo-skills @ git+https://github.com/NVIDIA-NeMo/Skills.git@georgea/super-rl-01212026 +nemo-skills @ git+https://github.com/NVIDIA-NeMo/Skills.git@georgea/super-rl-02062026 diff --git a/resources_servers/ns_tools/tests/test_app.py b/resources_servers/ns_tools/tests/test_app.py index ab82ef723..86cb334f9 100644 --- a/resources_servers/ns_tools/tests/test_app.py +++ b/resources_servers/ns_tools/tests/test_app.py @@ -113,7 +113,7 @@ async def test_verify_delegates_to_math_with_judge(self) -> None: expected_answer="4", ) - result = await server.verify(verify_request) + result = await server.verify(None, verify_request) assert result.reward == 1.0 assert result.delegated_response is not None @@ -179,7 +179,7 @@ async def test_verify_uses_default_verifier(self) -> None: expected_answer="4", ) - result = await server.verify(verify_request) + result = await server.verify(None, verify_request) assert result.reward == 0.0 call_args = server.server_client.post.call_args @@ -232,7 +232,7 @@ async def test_verify_passes_through_fields(self) -> None: expected_answer="4", ) - await server.verify(verify_request) + await server.verify(None, verify_request) call_args = server.server_client.post.call_args json_data = call_args.kwargs["json"]