async def initialize_sandbox_tools(self) -> None:
    """Create the configured sandbox provider and register its tools.

    The provider returned by ``create_sandbox_provider()`` is initialized and
    stored on ``self.sandbox_provider``.  Links reported by the provider
    metadata are recorded in ``self.sandbox_link`` and logged.  Shell and
    file tools are always registered; browser, computer, mobile and vision
    tools are registered only when the provider returns a service for them.

    Any exception raised along the way is logged and not re-raised here.
    """
    try:
        provider = create_sandbox_provider()
        await provider.initialize()
        self.sandbox_provider = provider

        metadata = provider.metadata()
        if metadata.links:
            # Prefer the provider-reported sandbox id; fall back to the
            # provider name when no id is available.
            link_key = metadata.extra.get("sandbox_id") or metadata.provider
            # ``sandbox_link`` is declared Optional, so it may be None.
            if self.sandbox_link is None:
                self.sandbox_link = {}
            # Keep a copy: the provider owns (and may later mutate or clear)
            # its metadata dict.
            self.sandbox_link[link_key] = dict(metadata.links)
            for name, url in metadata.links.items():
                logger.info(f"Sandbox {name} link: {url}")

        tools = [
            SandboxShellTool(provider.shell_service()),
            SandboxFilesTool(provider.file_service()),
        ]

        # Optional capabilities: a falsy service means "not supported".
        optional_tools = (
            (provider.browser_service, SandboxBrowserTool),
            (provider.computer_service, SandboxComputerTool),
            (provider.mobile_service, SandboxMobileTool),
            (provider.vision_service, SandboxVisionTool),
        )
        for get_service, tool_cls in optional_tools:
            service = get_service()
            if service:
                tools.append(tool_cls(service))

        self.available_tools.add_tools(*tools)
    except Exception as e:
        logger.error(f"Error initializing sandbox tools: {e}")
disconnect_mcp_server(self, server_id: str = "") -> None: self.available_tools = ToolCollection(*base_tools) self.available_tools.add_tools(*self.mcp_clients.tools) - async def delete_sandbox(self, sandbox_id: str) -> None: - """Delete a sandbox by ID.""" - try: - await delete_sandbox(sandbox_id) - logger.info(f"Sandbox {sandbox_id} deleted successfully") - if sandbox_id in self.sandbox_link: - del self.sandbox_link[sandbox_id] - except Exception as e: - logger.error(f"Error deleting sandbox {sandbox_id}: {e}") - raise e - async def cleanup(self): """Clean up Manus agent resources.""" if self.browser_context_helper: @@ -192,7 +189,12 @@ async def cleanup(self): # Disconnect from all MCP servers only if we were initialized if self._initialized: await self.disconnect_mcp_server() - await self.delete_sandbox(self.sandbox.id if self.sandbox else "unknown") + if self.sandbox_provider: + try: + await self.sandbox_provider.cleanup() + except Exception: + logger.warning("Failed to cleanup sandbox provider", exc_info=True) + self.sandbox_provider = None self._initialized = False async def think(self) -> bool: @@ -203,8 +205,16 @@ async def think(self) -> bool: original_prompt = self.next_step_prompt recent_messages = self.memory.messages[-3:] if self.memory.messages else [] + browser_tool_names = { + tool.name + for tool in self.available_tools.tools + if isinstance(tool, SandboxBrowserTool) + } + if not browser_tool_names: + browser_tool_names = {SANDBOX_BROWSER_TOOL_NAME} + browser_in_use = any( - tc.function.name == SandboxBrowserTool().name + tc.function.name in browser_tool_names for msg in recent_messages if msg.tool_calls for tc in msg.tool_calls diff --git a/app/config.py b/app/config.py index a881e2a5e..f47f4ef2e 100644 --- a/app/config.py +++ b/app/config.py @@ -91,9 +91,56 @@ class BrowserSettings(BaseModel): ) +class AgentBaySessionDefaults(BaseModel): + """Default parameters when creating AgentBay sessions.""" + + image_id: Optional[str] = Field( + None, 
class AgentBaySettings(BaseModel):
    """Configuration for AgentBay based sandbox provider."""

    # --- Connection ---------------------------------------------------------
    api_key: Optional[str] = Field(
        default=None,
        description="AgentBay API key (falls back to environment if missing)",
    )
    endpoint: str = Field(
        default="wuyingai.cn-shanghai.aliyuncs.com",
        description="AgentBay API endpoint",
    )
    timeout_ms: int = Field(
        default=60000,
        description="AgentBay API timeout in milliseconds (connect/read)",
    )
    env_file: Optional[str] = Field(
        default=None,
        description="Path to .env file for AgentBay credentials",
    )

    # --- Per-capability image overrides -------------------------------------
    desktop_image_id: Optional[str] = Field(
        default=None,
        description="Override image ID for desktop-capable sessions (e.g., linux_latest)",
    )
    browser_image_id: Optional[str] = Field(
        default=None,
        description="Override image ID for browser-focused sessions (e.g., browser_latest)",
    )
    mobile_image_id: Optional[str] = Field(
        default=None,
        description="Override image ID for mobile automation sessions (e.g., android_latest)",
    )

    # --- Session creation defaults ------------------------------------------
    session_defaults: AgentBaySessionDefaults = Field(
        default_factory=AgentBaySessionDefaults,
        description="Default session creation parameters",
    )
sandbox settings" + ) class DaytonaSettings(BaseModel): @@ -189,6 +239,9 @@ class AppConfig(BaseModel): daytona_config: Optional[DaytonaSettings] = Field( None, description="Daytona configuration" ) + agentbay_config: Optional[AgentBaySettings] = Field( + None, description="AgentBay configuration" + ) class Config: arbitrary_types_allowed = True @@ -287,7 +340,13 @@ def _load_initial_config(self): search_settings = SearchSettings(**search_config) sandbox_config = raw_config.get("sandbox", {}) if sandbox_config: - sandbox_settings = SandboxSettings(**sandbox_config) + sandbox_agentbay = sandbox_config.get("agentbay") or {} + sandbox_settings = SandboxSettings( + **{k: v for k, v in sandbox_config.items() if k != "agentbay"}, + agentbay=AgentBaySettings(**sandbox_agentbay) + if sandbox_agentbay + else None, + ) else: sandbox_settings = SandboxSettings() daytona_config = raw_config.get("daytona", {}) @@ -310,6 +369,17 @@ def _load_initial_config(self): run_flow_settings = RunflowSettings(**run_flow_config) else: run_flow_settings = RunflowSettings() + agentbay_config = raw_config.get("agentbay", {}) + if agentbay_config: + agentbay_settings = AgentBaySettings(**agentbay_config) + else: + # fall back to sandbox nested config if present + agentbay_settings = ( + sandbox_settings.agentbay + if sandbox_settings and sandbox_settings.agentbay + else AgentBaySettings() + ) + config_dict = { "llm": { "default": default_settings, @@ -324,6 +394,7 @@ def _load_initial_config(self): "mcp_config": mcp_settings, "run_flow_config": run_flow_settings, "daytona_config": daytona_settings, + "agentbay_config": agentbay_settings, } self._config = AppConfig(**config_dict) @@ -340,6 +411,10 @@ def sandbox(self) -> SandboxSettings: def daytona(self) -> DaytonaSettings: return self._config.daytona_config + @property + def agentbay(self) -> AgentBaySettings: + return self._config.agentbay_config + @property def browser_config(self) -> Optional[BrowserSettings]: return 
self._config.browser_config diff --git a/app/sandbox/README.md b/app/sandbox/README.md new file mode 100644 index 000000000..78d18e808 --- /dev/null +++ b/app/sandbox/README.md @@ -0,0 +1,48 @@ +# Sandbox Providers Overview + +This directory hosts the sandbox integration layer for OpenManus. The sandbox is responsible for executing high‑risk operations (shell, browser automation, desktop control, mobile actions, etc.) in isolated environments. Two providers are currently supported: + +| Provider | Capabilities | Key files | Notes | +|----------|--------------|-----------|-------| +| **Daytona** | Shell, File, Browser, Vision | `app/daytona/` and `app/sandbox/providers/daytona_provider.py` | Requires a Daytona account and API key. | +| **AgentBay** | Shell, File, Browser, Computer (desktop), Mobile | `app/sandbox/providers/agentbay_provider.py` | Requires access to AgentBay cloud resources. | + +## How it works + +1. `app/sandbox/providers/base.py` defines common service interfaces (`ShellService`, `BrowserService`, `ComputerService`, etc.) and the `SandboxProvider` base class. +2. `app/sandbox/providers/factory.py` reads `config/config.toml` to instantiate the correct provider. +3. `SandboxManus` (`app/agent/sandbox_agent.py`) requests the provider and injects the provider-specific tools (e.g., `sandbox_shell`, `sandbox_browser`, `sandbox_mobile`) into the agent. +4. Cleanup is unified through `SandboxProvider.cleanup()`, ensuring remote sessions are released when the agent stops. + +## Choosing a provider + +Set the provider in `config/config.toml`: + +```toml +[sandbox] +provider = "agentbay" # or "daytona" +use_sandbox = true +``` + +### AgentBay setup + +1. Install dependencies (already declared in `requirements.txt`, including `wuying-agentbay-sdk`). +2. Create an AgentBay API key by following the official guide: https://help.aliyun.com/zh/agentbay/user-guide/service-management. 
The service provides a limited trial quota after the key is created—make sure you finish the console steps before running the agent. +3. Copy `config/config.example-agentbay.toml` to your working config and fill in the `[sandbox.agentbay]` section with your API key and image IDs. +4. Run `python sandbox_main.py`. The agent will register shell, file, browser, desktop, and mobile tools backed by AgentBay. +5. Watch the logs for session links to inspect the remote desktop or device. + +### Daytona setup + +1. Follow the instructions in `app/daytona/README.md` to configure your Daytona API key and sandbox image. +2. Ensure `provider = "daytona"` in `config/config.toml`. +3. Launch `python sandbox_main.py` to use the Daytona-backed tools (shell, file, browser, vision). + +## Adding new providers + +1. Implement a new provider class in `app/sandbox/providers/` that inherits from `SandboxProvider`. +2. Provide concrete service implementations for any capabilities you support. +3. Register the provider name in `app/sandbox/providers/factory.py`. +4. Update this README and the configuration examples to document the new option. + +Keeping the provider abstraction consistent allows the agents and tools to remain agnostic about the underlying execution environment. diff --git a/app/sandbox/__init__.py b/app/sandbox/__init__.py index ccf0df6d8..82bcfd8c3 100644 --- a/app/sandbox/__init__.py +++ b/app/sandbox/__init__.py @@ -1,30 +1,8 @@ """ -Docker Sandbox Module +Sandbox package exports. -Provides secure containerized execution environment with resource limits -and isolation for running untrusted code. +The default imports were trimmed to avoid pulling heavy dependencies when +only provider abstractions are required. Import modules directly as needed. 
""" -from app.sandbox.client import ( - BaseSandboxClient, - LocalSandboxClient, - create_sandbox_client, -) -from app.sandbox.core.exceptions import ( - SandboxError, - SandboxResourceError, - SandboxTimeoutError, -) -from app.sandbox.core.manager import SandboxManager -from app.sandbox.core.sandbox import DockerSandbox - -__all__ = [ - "DockerSandbox", - "SandboxManager", - "BaseSandboxClient", - "LocalSandboxClient", - "create_sandbox_client", - "SandboxError", - "SandboxTimeoutError", - "SandboxResourceError", -] +__all__ = [] diff --git a/app/sandbox/providers/__init__.py b/app/sandbox/providers/__init__.py new file mode 100644 index 000000000..e9069a240 --- /dev/null +++ b/app/sandbox/providers/__init__.py @@ -0,0 +1,30 @@ +""" +Sandbox provider abstractions and factory for switching between different backends. +""" + +from .base import ( + BrowserService, + ComputerService, + FileService, + MobileService, + SandboxMetadata, + SandboxProvider, + ShellCommandResult, + ShellService, + VisionService, +) +from .factory import create_sandbox_provider + + +__all__ = [ + "SandboxMetadata", + "SandboxProvider", + "ShellCommandResult", + "ShellService", + "FileService", + "BrowserService", + "VisionService", + "ComputerService", + "MobileService", + "create_sandbox_provider", +] diff --git a/app/sandbox/providers/agentbay_provider.py b/app/sandbox/providers/agentbay_provider.py new file mode 100644 index 000000000..4093543f6 --- /dev/null +++ b/app/sandbox/providers/agentbay_provider.py @@ -0,0 +1,1030 @@ +""" +AgentBay-backed sandbox provider implementation. 
async def execute(
    self,
    command: str,
    *,
    cwd: Optional[str] = None,
    timeout: Optional[int] = None,
    blocking: bool = False,
    session: Optional[str] = None,
) -> ShellCommandResult:
    """Run *command* in the AgentBay session and report the outcome.

    Args:
        command: Shell command line to execute.
        cwd: Optional directory to ``cd`` into before running the command.
        timeout: Timeout in seconds; ``None`` or ``0`` falls back to 60.
        blocking: Accepted but not used by this implementation.
        session: Accepted but not used by this implementation.

    Returns:
        A ``ShellCommandResult`` with ``success=True``, the command output
        and ``completed=True`` on success; otherwise ``success=False`` with
        the SDK error message (or a generic fallback message).
    """
    if cwd:
        command = f"cd {cwd} && {command}"
    timeout_ms = (timeout or 60) * 1000

    # ``execute_command`` is invoked as a plain (non-awaited) call, so run it
    # in a worker thread: a long-running command must not stall the event
    # loop.  This matches how the computer/mobile services call the SDK.
    result: CommandResult = await asyncio.to_thread(
        self._session.command.execute_command,
        command,
        timeout_ms=timeout_ms,
    )

    if not result.success:
        return ShellCommandResult(
            success=False,
            error=result.error_message or "AgentBay command execution failed",
        )
    return ShellCommandResult(
        success=True,
        output=result.output,
        completed=True,
    )
async def delete(self, path: str) -> None:
    """Recursively remove *path* inside the sandbox via ``rm -rf``.

    Args:
        path: File or directory to remove.

    Raises:
        IOError: If the remote command reports failure.
    """
    # Local import keeps this block self-contained; shlex is stdlib.
    import shlex

    # Quote the path properly: hand-written '...' quoting breaks (and allows
    # command injection) as soon as the path itself contains a single quote.
    # ``--`` stops option parsing so a path starting with '-' is not treated
    # as a flag.
    command = f"rm -rf -- {shlex.quote(path)}"

    # The SDK call is synchronous; keep it off the event loop.
    exec_result = await asyncio.to_thread(
        self._session.command.execute_command,
        command,
        timeout_ms=10000,
    )
    if not exec_result.success:
        raise IOError(exec_result.error_message or f"Failed to delete {path}")
"").strip() + if not key: + raise RuntimeError("Empty key provided") + lower = key.lower() + special_map = { + "enter": "Enter", + "esc": "Esc", + "escape": "Esc", + "backspace": "Backspace", + "tab": "Tab", + "space": "Space", + "delete": "Delete", + "ctrl": "Ctrl", + "control": "Ctrl", + "alt": "Alt", + "shift": "Shift", + "win": "Win", + "cmd": "Meta", + "command": "Meta", + "meta": "Meta", + "up": "Up", + "down": "Down", + "left": "Left", + "right": "Right", + "pageup": "PageUp", + "pagedown": "PageDown", + "home": "Home", + "end": "End", + "insert": "Insert", + } + if lower in special_map: + return special_map[lower] + if lower.startswith("f") and lower[1:].isdigit(): + return lower.upper() + if len(lower) == 1 and lower.isalpha(): + return lower + if lower.isdigit(): + return lower + return key + + async def move_mouse(self, x: int, y: int) -> None: + result = await self._call(self._session.computer.move_mouse, x, y) + self._ensure_success(result, f"Failed to move mouse to ({x}, {y})") + + async def click_mouse(self, x: int, y: int, *, button: str, count: int = 1) -> None: + normalized_button = self._normalize_button(button) + clicks = max(1, min(3, count)) + + # AgentBay provides a dedicated double-left click option; prefer it when possible. 
async def press_keys(self, keys: Sequence[str], *, hold: bool = False) -> None:
    """Press the given keys on the remote desktop.

    Falsy entries in *keys* are skipped; every remaining entry is normalized
    with ``self._format_key`` before being sent.

    Args:
        keys: Key names to press.
        hold: Forwarded unchanged to the underlying ``press_keys`` call.

    Raises:
        RuntimeError: If no usable key remains, or the call reports failure.
    """
    normalized = []
    for raw_key in keys:
        if raw_key:
            normalized.append(self._format_key(raw_key))

    if len(normalized) == 0:
        raise RuntimeError("No keys provided for press_keys")

    press = self._session.computer.press_keys
    outcome = await self._call(press, normalized, hold)
    self._ensure_success(outcome, "Failed to press keys")
async def screenshot(self) -> Dict[str, Any]:
    """Capture a desktop screenshot and try to fetch it as base64.

    Returns:
        A dict with ``"url"`` (the ``data`` attribute of the screenshot
        result, or ``None`` if absent) and ``"base64"`` (the downloaded
        image encoded as base64, or ``None`` when there is no usable URL or
        the download fails).

    Raises:
        RuntimeError: If the screenshot call itself reports failure.
    """
    capture = self._ensure_success(
        await self._call(self._session.computer.screenshot),
        "Failed to capture screenshot",
    )
    location = getattr(capture, "data", None)

    # Only a non-empty string is treated as a downloadable reference.
    if not isinstance(location, str) or not location:
        return {"url": location, "base64": None}

    encoded: Optional[str] = None
    try:
        encoded = await asyncio.to_thread(self._download_base64, location)
    except Exception as exc:  # noqa: BLE001
        # Best effort: the URL is still returned even if the download fails.
        logger.warning(
            "Failed to download screenshot from %s: %s",
            location,
            exc,
        )
    return {"url": location, "base64": encoded}
async def get_clickable_ui_elements(self, timeout_ms: int = 2000) -> Dict[str, Any]:
    """Return the clickable UI elements reported by the mobile session.

    Args:
        timeout_ms: Forwarded unchanged to the underlying SDK call.

    Returns:
        A dict with ``"elements"`` (a list, empty when the result carries no
        elements) and ``"count"`` (its length).

    Raises:
        RuntimeError: If the call reports failure.  The SDK error message is
            used when present and non-empty, otherwise a generic message.
    """
    result = await self._call(
        self._session.mobile.get_clickable_ui_elements, timeout_ms
    )
    # Use the shared helper: a plain ``getattr(..., default)`` only falls
    # back when the attribute is missing, so an empty/None ``error_message``
    # used to produce a blank exception.
    self._ensure_success(result, "Failed to get UI elements")
    elements = getattr(result, "elements", []) or []
    return {"elements": elements, "count": len(elements)}
Any] + ) -> Dict[str, Any]: + tool = await self._ensure_tool() + mapped_action = self.ACTION_MAP.get(action.lower()) + params: Dict[str, Any] = {} + + if mapped_action is None: + return { + "success": False, + "message": f"Unsupported browser action: {action}", + } + + def add_if(name: str, value: Any) -> None: + if value is not None: + params[name] = value + + if mapped_action == "go_to_url": + add_if("url", payload.get("url")) + if "url" not in params: + return {"success": False, "message": "url is required for navigate_to"} + elif mapped_action in ("click_element", "get_dropdown_options"): + add_if("index", payload.get("index")) + if "index" not in params: + return {"success": False, "message": "index is required for action"} + elif mapped_action == "input_text": + add_if("index", payload.get("index")) + add_if("text", payload.get("text")) + if len(params) != 2: + return { + "success": False, + "message": "index and text required for input_text", + } + elif mapped_action == "scroll_down" or mapped_action == "scroll_up": + add_if("scroll_amount", payload.get("amount")) + elif mapped_action == "scroll_to_text": + add_if("text", payload.get("text")) + if "text" not in params: + return {"success": False, "message": "text required for scroll_to_text"} + elif mapped_action == "send_keys": + add_if("keys", payload.get("keys")) + if "keys" not in params: + return {"success": False, "message": "keys required for send_keys"} + elif mapped_action == "switch_tab": + add_if("tab_id", payload.get("page_id")) + if "tab_id" not in params: + return {"success": False, "message": "page_id required for switch_tab"} + elif mapped_action == "open_tab": + add_if("url", payload.get("url")) + if "url" not in params: + return {"success": False, "message": "url required for open_tab"} + elif mapped_action == "select_dropdown_option": + add_if("index", payload.get("index")) + add_if("text", payload.get("text")) + if len(params) != 2: + return { + "success": False, + "message": "index and 
async def _collect_state(
    self, tool: AgentBayBrowserUseTool
) -> Optional[Dict[str, Any]]:
    """Fetch the current browser state from *tool* as a dict.

    Args:
        tool: Browser tool whose ``get_current_state()`` is awaited.

    Returns:
        ``None`` when the state cannot be obtained, the result carries an
        error, or the output is empty.  Otherwise a dict: the parsed JSON
        output when it is a JSON object, else ``{"raw_output": <output>}``.
        If the result has a ``base64_image`` it is added under
        ``"screenshot_base64"``.
    """
    try:
        result = await tool.get_current_state()
    except Exception as exc:  # noqa: BLE001
        logger.debug("Failed to obtain browser state: %s", exc)
        return None

    if result.error or not result.output:
        return None

    try:
        state = json.loads(result.output)
    except Exception:  # noqa: BLE001 - best effort: keep the raw text instead
        state = None

    # Valid JSON is not necessarily an object (it may be a list, number, ...);
    # callers expect a dict and the screenshot key is assigned below.
    if not isinstance(state, dict):
        state = {"raw_output": result.output}

    if result.base64_image:
        state["screenshot_base64"] = result.base64_image
    return state
async def cleanup(self) -> None:
    """Release everything this provider created.

    Cached browser services are cleaned up first. Then, if a client exists,
    each known session is deleted through it on a worker thread and the
    session table is emptied. Failures are logged as warnings and never
    raised. Finally the service cache, the locks, the client and all lazy
    wrappers are reset.
    """
    browsers = [
        cached
        for cached in self._service_cache.values()
        if isinstance(cached, AgentBayBrowserService)
    ]
    for browser in browsers:
        try:
            await browser.cleanup()
        except Exception:
            logger.warning("Failed to cleanup AgentBay browser", exc_info=True)

    if self._client:
        # Iterate over a snapshot: the table is cleared right after the loop.
        for image_id, session in tuple(self._sessions.items()):
            try:
                await asyncio.to_thread(self._client.delete, session)
            except Exception:
                logger.warning(
                    "Failed to delete AgentBay session %s for image %s",
                    getattr(session, "session_id", "unknown"),
                    image_id,
                    exc_info=True,
                )
        self._sessions.clear()

    self._service_cache.clear()
    self._session_locks.clear()
    self._client = None
    for wrapper_attr in (
        "_shell_wrapper",
        "_file_wrapper",
        "_browser_wrapper",
        "_computer_wrapper",
        "_mobile_wrapper",
    ):
        setattr(self, wrapper_attr, None)
async def _get_service(self, kind: str, role: str):
    """Return the concrete service of ``kind`` bound to the image for ``role``.

    The session for the role's image is ensured first. Services are cached
    per ``(image_id, kind)``, so repeated calls return the same instance.

    Raises:
        RuntimeError: No image is configured for ``role``.
        ValueError: ``kind`` is not shell, file, browser, computer or mobile.
    """
    image_id = self._get_image_id_for_role(role)
    if not image_id:
        raise RuntimeError(f"No image configured for AgentBay role '{role}'")

    session = await self._ensure_session(image_id)
    cache_key = (image_id, kind)
    try:
        return self._service_cache[cache_key]
    except KeyError:
        pass

    # Factories are lambdas so only the requested service class is resolved.
    factories = {
        "shell": lambda: AgentBayShellService(session),
        "file": lambda: AgentBayFileService(session),
        "browser": lambda: AgentBayBrowserService(session),
        "computer": lambda: AgentBayComputerService(session),
        "mobile": lambda: AgentBayMobileService(session),
    }
    build = factories.get(kind)
    if build is None:
        raise ValueError(f"Unknown service kind: {kind}")

    created = build()
    self._service_cache[cache_key] = created
    return created
def _get_image_id_for_role(self, role: str) -> Optional[str]:
    """Pick the configured image id for ``role``.

    Candidates are tried in a role-specific order; the first truthy one wins:

    * ``"computer"``: desktop, session default
    * ``"browser"``: browser, session default, desktop
    * ``"mobile"``: mobile, session default, desktop
    * any other role (shell/file): session default, desktop, browser, mobile

    Returns:
        The first truthy candidate, or the last candidate tried (a falsy
        value such as ``None``) when nothing is configured.
    """
    settings = self._settings
    configured = {
        "desktop": settings.desktop_image_id,
        "browser": settings.browser_image_id,
        "mobile": settings.mobile_image_id,
        "default": settings.session_defaults.image_id,
    }
    preference = {
        "computer": ("desktop", "default"),
        "browser": ("browser", "default", "desktop"),
        "mobile": ("mobile", "default", "desktop"),
    }
    # NOTE(review): the previous inline comment claimed shell/file prefer the
    # desktop image, but the code tries the session default first - confirm
    # which order is intended.
    fallback = ("default", "desktop", "browser", "mobile")

    selected = None
    for source in preference.get(role, fallback):
        selected = configured[source]
        if selected:
            break
    return selected
class AgentBayLazyBrowserService(BrowserService):
    """Browser facade that resolves the real service through the provider.

    Every action asks the provider for the ``"browser"`` service of the
    ``"browser"`` role, so nothing is requested from the provider before the
    first call.
    """

    def __init__(self, provider: AgentBaySandboxProvider):
        self._provider = provider

    async def _backend(self) -> AgentBayBrowserService:
        """Fetch the concrete browser service from the provider."""
        return await self._provider._get_service("browser", "browser")

    async def perform_action(
        self, action: str, payload: Dict[str, Any]
    ) -> Dict[str, Any]:
        """Forward ``action`` and ``payload`` to the concrete service."""
        backend = await self._backend()
        return await backend.perform_action(action, payload)

    async def current_state(self) -> Dict[str, Any]:
        """Return the concrete service's current browser state."""
        backend = await self._backend()
        return await backend.current_state()

    async def cleanup(self) -> None:
        """Clean up the browser service only if one is already cached.

        Looks in the provider's cache directly instead of going through
        ``_get_service``, so cleanup never triggers service creation.
        """
        provider = self._provider
        image_id = provider._get_image_id_for_role("browser")
        if image_id:
            cached = provider._service_cache.get((image_id, "browser"))
            if cached:
                await cached.cleanup()
class AgentBayLazyMobileService(MobileService):
    """Mobile facade that resolves the real service through the provider.

    Each method asks the provider for the ``"mobile"`` service of the
    ``"mobile"`` role and forwards its arguments unchanged.
    """

    def __init__(self, provider: AgentBaySandboxProvider):
        self._provider = provider

    async def _backend(self) -> AgentBayMobileService:
        """Fetch the concrete mobile service from the provider."""
        return await self._provider._get_service("mobile", "mobile")

    async def tap(self, x: int, y: int) -> None:
        """Forward a tap at ``(x, y)``."""
        backend = await self._backend()
        await backend.tap(x, y)

    async def swipe(
        self,
        start_x: int,
        start_y: int,
        end_x: int,
        end_y: int,
        duration_ms: int = 300,
    ) -> None:
        """Forward a swipe between two points lasting ``duration_ms``."""
        backend = await self._backend()
        await backend.swipe(start_x, start_y, end_x, end_y, duration_ms)

    async def input_text(self, text: str) -> None:
        """Forward text input."""
        backend = await self._backend()
        await backend.input_text(text)

    async def send_key(self, key_code: int) -> None:
        """Forward a key press identified by ``key_code``."""
        backend = await self._backend()
        await backend.send_key(key_code)

    async def screenshot(self) -> Dict[str, Any]:
        """Return the concrete service's screenshot result."""
        backend = await self._backend()
        return await backend.screenshot()

    async def get_clickable_ui_elements(self, timeout_ms: int = 2000) -> Dict[str, Any]:
        """Return the concrete service's clickable-element listing."""
        backend = await self._backend()
        return await backend.get_clickable_ui_elements(timeout_ms)
class ShellService(ABC):
    """Contract for running shell commands inside a sandbox.

    Only ``execute`` is mandatory. The session-management calls raise
    ``NotImplementedError`` unless a provider overrides them.
    """

    @staticmethod
    def _unsupported(operation: str) -> NotImplementedError:
        """Build the error raised by optional operations left unimplemented."""
        return NotImplementedError(f"{operation} is not supported by this provider")

    @abstractmethod
    async def execute(
        self,
        command: str,
        *,
        cwd: Optional[str] = None,
        timeout: Optional[int] = None,
        blocking: bool = False,
        session: Optional[str] = None,
    ) -> ShellCommandResult:
        """Run ``command`` in the sandbox and report the outcome."""

    async def check(self, session: str) -> ShellCommandResult:
        """Fetch the output of a long-running session."""
        raise self._unsupported("check")

    async def terminate(self, session: str) -> ShellCommandResult:
        """Stop a running session."""
        raise self._unsupported("terminate")

    async def list_sessions(self) -> Sequence[str]:
        """Enumerate the existing shell sessions."""
        raise self._unsupported("list_sessions")
class ComputerService(ABC):
    """Contract for desktop automation (mouse, keyboard, screen).

    Every operation is optional: the defaults raise ``NotImplementedError``
    so a provider only overrides what it supports.
    """

    @staticmethod
    def _unsupported(operation: str) -> NotImplementedError:
        """Build the error raised by operations left unimplemented."""
        return NotImplementedError(f"{operation} is not supported by this provider")

    async def move_mouse(self, x: int, y: int) -> None:
        """Place the cursor at ``(x, y)``."""
        raise self._unsupported("move_mouse")

    async def click_mouse(self, x: int, y: int, *, button: str, count: int = 1) -> None:
        """Click ``button`` at ``(x, y)``, ``count`` times."""
        raise self._unsupported("click_mouse")

    async def drag_mouse(
        self,
        from_x: int,
        from_y: int,
        to_x: int,
        to_y: int,
        *,
        button: str = "left",
    ) -> None:
        """Drag with ``button`` held from the first point to the second."""
        raise self._unsupported("drag_mouse")

    async def scroll(self, x: int, y: int, *, amount: int) -> None:
        """Scroll at ``(x, y)``; positive ``amount`` is up, negative is down."""
        raise self._unsupported("scroll")

    async def input_text(self, text: str) -> None:
        """Type ``text`` into the active element."""
        raise self._unsupported("input_text")

    async def press_keys(self, keys: Sequence[str], *, hold: bool = False) -> None:
        """Press the given keys together."""
        raise self._unsupported("press_keys")

    async def release_keys(self, keys: Sequence[str]) -> None:
        """Release keys that were previously held."""
        raise self._unsupported("release_keys")

    async def get_cursor_position(self) -> Dict[str, int]:
        """Report where the cursor currently is."""
        raise self._unsupported("get_cursor_position")

    async def get_screen_size(self) -> Dict[str, Any]:
        """Report screen size metadata."""
        raise self._unsupported("get_screen_size")

    async def screenshot(self) -> Dict[str, Any]:
        """Capture the screen and return metadata (e.g., url/base64)."""
        raise self._unsupported("screenshot")

    def supports_mouse_hold(self) -> bool:
        """Tell whether mouse down and mouse up can be issued separately."""
        return False
class SandboxProvider(ABC):
    """Root abstraction every sandbox backend implements.

    A provider must manage its lifecycle (``initialize`` / ``cleanup``),
    describe itself (``metadata``) and expose shell and file services. The
    browser, vision, computer and mobile services are optional and default
    to ``None``.
    """

    def __init__(self, name: str):
        self._name = name

    @property
    def name(self) -> str:
        """Provider name given at construction time."""
        return self._name

    # --- lifecycle -------------------------------------------------------

    @abstractmethod
    async def initialize(self) -> None:
        """Set up the resources of the sandbox session."""

    @abstractmethod
    async def cleanup(self) -> None:
        """Tear down the resources of the sandbox session."""

    # --- mandatory capabilities -----------------------------------------

    @abstractmethod
    def metadata(self) -> SandboxMetadata:
        """Describe the active sandbox."""

    @abstractmethod
    def shell_service(self) -> ShellService:
        """Provide the shell service."""

    @abstractmethod
    def file_service(self) -> FileService:
        """Provide the file service."""

    # --- optional capabilities (None means "not available") -------------

    def browser_service(self) -> Optional[BrowserService]:
        """Provide the browser service, if the backend has one."""
        return None

    def vision_service(self) -> Optional[VisionService]:
        """Provide the vision service, if the backend has one."""
        return None

    def computer_service(self) -> Optional[ComputerService]:
        """Provide the desktop automation service, if the backend has one."""
        return None

    def mobile_service(self) -> Optional[MobileService]:
        """Provide the mobile automation service, if the backend has one."""
        return None
async def _ensure_session(self, session_name: str) -> str:
    """Return the sandbox session id mapped to ``session_name``.

    On first use a fresh UUID is generated, a process session with that id
    is created in the sandbox, and the mapping is remembered. Later calls
    return the remembered id without touching the sandbox.
    """
    try:
        return self._sessions[session_name]
    except KeyError:
        pass

    new_id = str(uuid4())
    self._sandbox.process.create_session(new_id)
    # Recorded only after creation succeeded, so a failure is retried next time.
    self._sessions[session_name] = new_id
    return new_id
async def execute(
    self,
    command: str,
    *,
    cwd: Optional[str] = None,
    timeout: Optional[int] = None,
    blocking: bool = False,
    session: Optional[str] = None,
) -> ShellCommandResult:
    """Type ``command`` into a tmux session inside the sandbox.

    Args:
        command: Shell text to run; it is typed into the tmux session after
            a ``cd`` into the working directory.
        cwd: Directory relative to ``workspace_path``. Defaults to the
            workspace root.
        timeout: Upper bound in seconds for blocking mode (60 when falsy).
        blocking: When true, poll every two seconds until the tmux session
            disappears or the timeout elapses, then return the captured pane
            and kill the session. When false, return right after dispatch.
        session: tmux session name. A random ``session_<hex>`` name is used
            when omitted.

    Returns:
        A ``ShellCommandResult``. Failures while preparing the session or
        dispatching the command are reported with ``success=False`` instead
        of being raised.
    """
    import shlex  # local import: the file-level import block is outside this block

    session_name = self._default_session(session)
    working_dir = self.workspace_path
    if cwd:
        working_dir = f"{self.workspace_path}/{cwd.strip('/')}"

    try:
        await self._ensure_session(session_name)
    except Exception as exc:
        return ShellCommandResult(
            success=False,
            error=f"Failed to create tmux session {session_name}: {exc}",
        )

    # Everything interpolated into a shell line is quoted so that caller
    # supplied text ($, backticks, quotes, spaces, ;) reaches tmux verbatim
    # and is never interpreted by the control session's shell.
    target = shlex.quote(session_name)
    has_session = f"tmux has-session -t {target} 2>/dev/null || echo 'not_exists'"

    # Create tmux session if it does not exist yet.
    try:
        result, _ = await self._execute_raw_command(has_session)
        if "not_exists" in result:
            await self._execute_raw_command(f"tmux new-session -d -s {target}")
    except Exception as exc:
        return ShellCommandResult(
            success=False,
            error=f"Failed to prepare tmux session {session_name}: {exc}",
        )

    typed_line = f"cd {shlex.quote(working_dir)} && {command}"
    try:
        await self._execute_raw_command(
            f"tmux send-keys -t {target} {shlex.quote(typed_line)} Enter"
        )
    except Exception as exc:
        return ShellCommandResult(
            success=False,
            error=f"Failed to dispatch command to tmux: {exc}",
        )

    if not blocking:
        return ShellCommandResult(
            success=True,
            output=(
                f"Command sent to session '{session_name}'. "
                "Use check to retrieve output."
            ),
            session_name=session_name,
            completed=False,
            metadata={"cwd": working_dir},
        )

    # NOTE(review): the only early exit is the tmux session vanishing; nothing
    # here ends the session when the command finishes - confirm whether a
    # completion marker is expected.
    timeout_s = timeout or 60
    deadline = time.monotonic() + timeout_s  # monotonic: immune to clock jumps
    while time.monotonic() < deadline:
        await asyncio.sleep(2)
        result, _ = await self._execute_raw_command(has_session)
        if "not_exists" in result:
            break

    output, _ = await self._execute_raw_command(
        f"tmux capture-pane -t {target} -p -S - -E -"
    )
    await self._kill_tmux_session(session_name)

    return ShellCommandResult(
        success=True,
        output=output,
        session_name=session_name,
        completed=True,
        metadata={"cwd": working_dir},
    )
async def write(self, path: str, content: str, *, overwrite: bool = True) -> None:
    """Upload ``content`` (UTF-8) to ``path`` inside the sandbox.

    Args:
        path: Absolute path, or a path relative to the workspace.
        content: Text to store.
        overwrite: When false, refuse to replace an existing file.

    Raises:
        FileExistsError: ``overwrite`` is false and the file already exists.
    """
    full_path = self._resolve(path)

    if not overwrite:
        # The raise must sit outside the ``try``: inside it, the broad
        # handler below would swallow the FileExistsError as well.
        try:
            self._sandbox.fs.get_file_info(full_path)
        except Exception:
            # A failed lookup is treated as "file missing", as exists() does.
            pass
        else:
            raise FileExistsError(f"File '{path}' already exists")

    parent = full_path.rpartition("/")[0]
    if parent:
        self._sandbox.fs.create_folder(parent, "755")

    self._sandbox.fs.upload_file(content.encode("utf-8"), full_path)
    self._sandbox.fs.set_file_permissions(full_path, "644")
def _call_browser_api(
    self, endpoint: str, params: Dict[str, Any], method: str = "POST"
) -> Dict[str, Any]:
    """Call the sandbox's local automation API through ``curl``.

    Args:
        endpoint: Path segment appended to
            ``http://localhost:8003/api/automation/``.
        params: Sent as a URL-encoded query string for ``GET``, otherwise as
            a JSON body. Nothing is sent when empty.
        method: HTTP method passed to ``curl -X``.

    Returns:
        The decoded JSON object from the API, without ``screenshot_base64``
        when that field is not valid base64. A dict with ``success: False``
        and a ``message`` is returned when curl exits non-zero or the reply
        is not a JSON object.
    """
    import shlex  # local imports: the file-level import block is outside this block
    from urllib.parse import urlencode

    url = f"http://localhost:8003/api/automation/{endpoint}"
    argv = ["curl", "-s", "-X", method]
    if method == "GET" and params:
        argv += [f"{url}?{urlencode(params)}", "-H", "Content-Type: application/json"]
    else:
        argv += [url, "-H", "Content-Type: application/json"]
        if params:
            argv += ["-d", json.dumps(params)]
    # Quote every argument: payload text (e.g. a single quote typed into a
    # form) must never terminate the shell string or inject commands.
    command = " ".join(shlex.quote(arg) for arg in argv)

    result = self._sandbox.process.exec(command, timeout=30)
    if getattr(result, "exit_code", 0) != 0:
        return {
            "success": False,
            "message": f"Browser automation failed: {result.result}",
        }

    try:
        payload = json.loads(result.result or "{}")
    except json.JSONDecodeError as exc:
        return {
            "success": False,
            "message": f"Invalid browser response: {exc}",
        }
    if not isinstance(payload, dict):
        # Callers use dict methods on the reply; reject other JSON values.
        return {
            "success": False,
            "message": "Invalid browser response: expected a JSON object",
        }

    if "screenshot_base64" in payload:
        if not self._validate_base64(payload["screenshot_base64"]):
            payload.pop("screenshot_base64", None)
    return payload
def _compress_image(self, data: bytes, path: str) -> Tuple[bytes, str]:
    """Re-encode image bytes to reduce their size.

    The output format follows the MIME type guessed from *path*: PNG and GIF
    keep their format, everything else (including an unknown type) is written
    as JPEG. Images with transparency are flattened onto a white background,
    and images exceeding ``DEFAULT_MAX_WIDTH`` x ``DEFAULT_MAX_HEIGHT`` are
    scaled down with their aspect ratio preserved.

    Args:
        data: Raw image bytes.
        path: File path, used only to guess the MIME type.

    Returns:
        ``(encoded_bytes, mime_type)``. If decoding or encoding fails, the
        original *data* is returned with the guessed MIME type.
    """
    mime_type, _ = mimetypes.guess_type(path)
    if not mime_type:
        mime_type = "image/jpeg"
    try:
        with Image.open(io.BytesIO(data)) as source:
            img = source
            if img.mode in ("RGBA", "LA", "P"):
                # Normalise to RGBA so the alpha band is always available as
                # the paste mask; previously LA images lost their
                # transparency because no mask was applied.
                rgba = img if img.mode == "RGBA" else img.convert("RGBA")
                flattened = Image.new("RGB", rgba.size, (255, 255, 255))
                flattened.paste(rgba, mask=rgba.split()[-1])
                img = flattened

            width, height = img.size
            if width > self.DEFAULT_MAX_WIDTH or height > self.DEFAULT_MAX_HEIGHT:
                ratio = min(
                    self.DEFAULT_MAX_WIDTH / width, self.DEFAULT_MAX_HEIGHT / height
                )
                # Clamp to 1px: extreme aspect ratios would otherwise round a
                # side down to 0, which resize() rejects.
                new_size = (
                    max(1, int(width * ratio)),
                    max(1, int(height * ratio)),
                )
                img = img.resize(new_size, Image.Resampling.LANCZOS)

            output = io.BytesIO()
            if mime_type == "image/png":
                img.save(
                    output,
                    format="PNG",
                    optimize=True,
                    compress_level=self.DEFAULT_PNG_COMPRESS_LEVEL,
                )
            elif mime_type == "image/gif":
                img.save(output, format="GIF", optimize=True)
            else:
                img.save(
                    output,
                    format="JPEG",
                    quality=self.DEFAULT_JPEG_QUALITY,
                    optimize=True,
                )
                mime_type = "image/jpeg"
            return output.getvalue(), mime_type
    except Exception:
        # Best effort: an image that cannot be processed is passed through.
        logger.debug("Image compression failed, returning raw bytes", exc_info=True)
        return data, mime_type
def __init__(self, app_config: Config, sandbox_settings: SandboxSettings):
    """Store configuration; no sandbox exists until ``initialize`` runs."""
    super().__init__("daytona")
    self._config = app_config
    self._settings = sandbox_settings

    self._sandbox = None
    self._metadata = SandboxMetadata(provider="daytona")

    # Service wrappers are created in initialize() and dropped in cleanup().
    self._shell_service: Optional[DaytonaShellService] = None
    self._file_service: Optional[DaytonaFileService] = None
    self._browser_service: Optional[DaytonaBrowserService] = None
    self._vision_service: Optional[DaytonaVisionService] = None


async def initialize(self) -> None:
    """Create the Daytona sandbox and the services bound to it.

    Records the sandbox id in ``metadata().extra["sandbox_id"]`` and, when
    available, the preview links for ports 6080 (``"vnc"``) and 8080
    (``"website"``) in ``metadata().links``.

    Raises:
        ValueError: If no Daytona VNC password is configured.
    """
    password = self._config.daytona.VNC_password
    if not password:
        raise ValueError("Daytona VNC password must be configured")

    logger.info("Creating Daytona sandbox...")
    self._sandbox = create_sandbox(password=password)

    self._metadata.extra["sandbox_id"] = getattr(self._sandbox, "id", None)

    try:
        vnc_link = self._sandbox.get_preview_link(6080)
        website_link = self._sandbox.get_preview_link(8080)
        self._metadata.links["vnc"] = (
            vnc_link.url if hasattr(vnc_link, "url") else str(vnc_link)
        )
        self._metadata.links["website"] = (
            website_link.url if hasattr(website_link, "url") else str(website_link)
        )
    except Exception:
        # Preview links are informational only; the sandbox is still usable.
        logger.debug("Failed to fetch sandbox preview links", exc_info=True)

    self._shell_service = DaytonaShellService(self._sandbox)
    self._file_service = DaytonaFileService(self._sandbox)
    self._browser_service = DaytonaBrowserService(self._sandbox)
    self._vision_service = DaytonaVisionService(self._sandbox)


async def cleanup(self) -> None:
    """Release the shell service and delete the remote sandbox.

    The sandbox is deleted even if the shell service cleanup raises (that
    exception still propagates afterwards). Failure to delete the sandbox
    is logged, not raised. All service handles are dropped so the accessors
    no longer hand out objects bound to a deleted sandbox.
    """
    try:
        if self._shell_service:
            await self._shell_service.cleanup()
    finally:
        sandbox, self._sandbox = self._sandbox, None
        self._shell_service = None
        self._file_service = None
        self._browser_service = None
        self._vision_service = None

        sandbox_id = getattr(sandbox, "id", None) if sandbox else None
        if sandbox_id:
            try:
                await delete_sandbox(sandbox_id)
            except Exception:
                logger.warning(
                    f"Failed to delete sandbox {sandbox_id}", exc_info=True
                )


def metadata(self) -> SandboxMetadata:
    """Return the provider metadata (provider name, links, extra info)."""
    return self._metadata
shell_service(self) -> ShellService: + if not self._shell_service: + raise RuntimeError("Daytona sandbox not initialized") + return self._shell_service + + def file_service(self) -> FileService: + if not self._file_service: + raise RuntimeError("Daytona sandbox not initialized") + return self._file_service + + def browser_service(self) -> Optional[BrowserService]: + return self._browser_service + + def vision_service(self) -> Optional[VisionService]: + return self._vision_service diff --git a/app/sandbox/providers/factory.py b/app/sandbox/providers/factory.py new file mode 100644 index 000000000..ac46bcfdb --- /dev/null +++ b/app/sandbox/providers/factory.py @@ -0,0 +1,52 @@ +""" +Factory helpers for creating sandbox providers based on configuration. +""" + +from __future__ import annotations + +from typing import TYPE_CHECKING + +from app.config import SandboxSettings, config +from app.utils.logger import logger + +from .base import SandboxProvider + + +if TYPE_CHECKING: + from app.config import SandboxSettings + + +def _normalize_provider_name(name: str | None) -> str: + if not name: + return "daytona" + return name.strip().lower() + + +def create_sandbox_provider() -> SandboxProvider: + """ + Instantiate a sandbox provider according to configuration. + + Returns: + SandboxProvider: Concrete sandbox provider instance. + + Raises: + ValueError: If no matching provider is found. 
+ """ + + sandbox_settings: SandboxSettings = config.sandbox or SandboxSettings() + provider_name = _normalize_provider_name( + getattr(sandbox_settings, "provider", None) + ) + + logger.debug(f"Creating sandbox provider: {provider_name}") + + if provider_name == "daytona": + from .daytona_provider import DaytonaSandboxProvider + + return DaytonaSandboxProvider(config, sandbox_settings) + if provider_name == "agentbay": + from .agentbay_provider import AgentBaySandboxProvider + + return AgentBaySandboxProvider(config, sandbox_settings) + + raise ValueError(f"Unsupported sandbox provider: {provider_name}") diff --git a/app/tool/__init__.py b/app/tool/__init__.py index 636e9b8da..3343f9ba7 100644 --- a/app/tool/__init__.py +++ b/app/tool/__init__.py @@ -13,7 +13,6 @@ __all__ = [ "BaseTool", "Bash", - "BrowserUseTool", "Terminate", "StrReplaceEditor", "WebSearch", @@ -22,3 +21,6 @@ "PlanningTool", "Crawl4aiTool", ] + +if BrowserUseTool is not None: + __all__.append("BrowserUseTool") diff --git a/app/tool/computer_constants.py b/app/tool/computer_constants.py new file mode 100644 index 000000000..0194bcaeb --- /dev/null +++ b/app/tool/computer_constants.py @@ -0,0 +1,77 @@ +"""Shared constants for desktop automation tools.""" + +KEYBOARD_KEYS = [ + "a", + "b", + "c", + "d", + "e", + "f", + "g", + "h", + "i", + "j", + "k", + "l", + "m", + "n", + "o", + "p", + "q", + "r", + "s", + "t", + "u", + "v", + "w", + "x", + "y", + "z", + "0", + "1", + "2", + "3", + "4", + "5", + "6", + "7", + "8", + "9", + "enter", + "esc", + "backspace", + "tab", + "space", + "delete", + "ctrl", + "alt", + "shift", + "win", + "up", + "down", + "left", + "right", + "f1", + "f2", + "f3", + "f4", + "f5", + "f6", + "f7", + "f8", + "f9", + "f10", + "f11", + "f12", + "ctrl+c", + "ctrl+v", + "ctrl+x", + "ctrl+z", + "ctrl+a", + "ctrl+s", + "alt+tab", + "alt+f4", + "ctrl+alt+delete", +] + +MOUSE_BUTTONS = ["left", "right", "middle"] diff --git a/app/tool/computer_use_tool.py 
b/app/tool/computer_use_tool.py index 0ea57a734..06aee4e7d 100644 --- a/app/tool/computer_use_tool.py +++ b/app/tool/computer_use_tool.py @@ -10,82 +10,9 @@ from app.daytona.tool_base import Sandbox, SandboxToolsBase from app.tool.base import ToolResult +from app.tool.computer_constants import KEYBOARD_KEYS, MOUSE_BUTTONS -KEYBOARD_KEYS = [ - "a", - "b", - "c", - "d", - "e", - "f", - "g", - "h", - "i", - "j", - "k", - "l", - "m", - "n", - "o", - "p", - "q", - "r", - "s", - "t", - "u", - "v", - "w", - "x", - "y", - "z", - "0", - "1", - "2", - "3", - "4", - "5", - "6", - "7", - "8", - "9", - "enter", - "esc", - "backspace", - "tab", - "space", - "delete", - "ctrl", - "alt", - "shift", - "win", - "up", - "down", - "left", - "right", - "f1", - "f2", - "f3", - "f4", - "f5", - "f6", - "f7", - "f8", - "f9", - "f10", - "f11", - "f12", - "ctrl+c", - "ctrl+v", - "ctrl+x", - "ctrl+z", - "ctrl+a", - "ctrl+s", - "alt+tab", - "alt+f4", - "ctrl+alt+delete", -] -MOUSE_BUTTONS = ["left", "right", "middle"] _COMPUTER_USE_DESCRIPTION = """\ A comprehensive computer automation tool that allows interaction with the desktop environment. 
_BROWSER_DESCRIPTION = """\
Execute browser automation actions within the sandbox.
The supported actions depend on the selected sandbox provider.
"""

SANDBOX_BROWSER_TOOL_NAME = "sandbox_browser"


class SandboxBrowserTool(BaseTool):
    """Provider-agnostic browser automation tool.

    Forwards actions to the ``BrowserService`` supplied by the sandbox
    provider and converts the service's dict replies into ``ToolResult``s.
    """

    name: str = Field(default=SANDBOX_BROWSER_TOOL_NAME)
    description: str = _BROWSER_DESCRIPTION
    parameters: dict = {
        "type": "object",
        "properties": {
            "action": {
                "type": "string",
                "description": "Browser action to perform",
            },
            "url": {"type": "string"},
            "index": {"type": "integer"},
            "text": {"type": "string"},
            "amount": {"type": "integer"},
            "page_id": {"type": "integer"},
            "keys": {"type": "string"},
            "seconds": {"type": "integer"},
            "x": {"type": "integer"},
            "y": {"type": "integer"},
            "element_source": {"type": "string"},
            "element_target": {"type": "string"},
        },
        "required": ["action"],
    }

    browser_service: BrowserService = Field(exclude=True)

    def __init__(self, browser_service: BrowserService, **data):
        super().__init__(browser_service=browser_service, **data)

    async def execute(
        self,
        action: str,
        url: Optional[str] = None,
        index: Optional[int] = None,
        text: Optional[str] = None,
        amount: Optional[int] = None,
        page_id: Optional[int] = None,
        keys: Optional[str] = None,
        seconds: Optional[int] = None,
        x: Optional[int] = None,
        y: Optional[int] = None,
        element_source: Optional[str] = None,
        element_target: Optional[str] = None,
        **kwargs,
    ) -> ToolResult:
        """Perform *action* through the browser service.

        Only arguments that were actually supplied (not ``None``) are
        forwarded. Any failure, including an action the provider does not
        implement, is returned as a failed ``ToolResult`` rather than raised.
        """
        try:
            candidates: Dict[str, Optional[Any]] = {
                "url": url,
                "index": index,
                "text": text,
                "amount": amount,
                "page_id": page_id,
                "keys": keys,
                "seconds": seconds,
                "x": x,
                "y": y,
                "element_source": element_source,
                "element_target": element_target,
            }
            payload = {k: v for k, v in candidates.items() if v is not None}
            result = await self.browser_service.perform_action(action, payload)
            if result.get("success"):
                return self._success_with_screenshot(result)
            return self.fail_response(result.get("message", "Browser action failed"))
        except NotImplementedError as exc:
            return self.fail_response(str(exc))
        except Exception as exc:
            return self.fail_response(f"Browser action failed: {exc}")

    async def get_current_state(self) -> ToolResult:
        """Return the browser state last reported by the service."""
        try:
            state = await self.browser_service.current_state()
            return self._success_with_screenshot(state)
        except NotImplementedError:
            return self.fail_response("Browser state retrieval not supported")
        except Exception as exc:
            return self.fail_response(f"Failed to get browser state: {exc}")

    def _success_with_screenshot(self, data: Dict[str, Any]) -> ToolResult:
        """Build a success result, carrying any screenshot as ``base64_image``.

        A ``screenshot_base64`` entry is kept out of the textual output (it
        can be very large) and attached to the result's ``base64_image``
        field instead, matching what the previous implementation of this
        tool returned from ``get_current_state``.
        """
        screenshot = data.get("screenshot_base64")
        if not screenshot:
            return self.success_response(data)
        textual = {k: v for k, v in data.items() if k != "screenshot_base64"}
        response = self.success_response(textual)
        return ToolResult(output=response.output, base64_image=screenshot)
_COMPUTER_DESCRIPTION = """\
Desktop automation tool powered by the sandbox provider agentbay. It's a Linux desktop.
Supports mouse movement, clicks, scrolling, keyboard input, hotkeys, and screenshots.
"""


class SandboxComputerTool(BaseTool):
    """Provider-agnostic computer automation tool.

    Each action is delegated to the ``ComputerService`` supplied by the
    sandbox provider. The tool remembers the last pointer position it set or
    fetched so that actions without explicit coordinates can reuse it.
    """

    name: str = "computer_use"
    description: str = _COMPUTER_DESCRIPTION
    parameters: dict = {
        "type": "object",
        "properties": {
            "action": {
                "type": "string",
                "enum": [
                    "move_to",
                    "click",
                    "scroll",
                    "typing",
                    "press",
                    "wait",
                    "mouse_down",
                    "mouse_up",
                    "drag_to",
                    "hotkey",
                    "screenshot",
                ],
                "description": "The computer action to perform",
            },
            "x": {"type": "number", "description": "X coordinate for mouse actions"},
            "y": {"type": "number", "description": "Y coordinate for mouse actions"},
            "button": {
                "type": "string",
                "enum": MOUSE_BUTTONS,
                "description": "Mouse button for click/drag actions",
                "default": "left",
            },
            "num_clicks": {
                "type": "integer",
                "description": "Number of clicks",
                "enum": [1, 2, 3],
                "default": 1,
            },
            "amount": {
                "type": "integer",
                "description": "Scroll amount (positive for up, negative for down)",
                "minimum": -10,
                "maximum": 10,
            },
            "text": {"type": "string", "description": "Text to type"},
            "key": {
                "type": "string",
                "enum": KEYBOARD_KEYS,
                "description": "Key to press",
            },
            "keys": {
                "type": "string",
                "enum": KEYBOARD_KEYS,
                "description": "Key combination to press",
            },
            "duration": {
                "type": "number",
                "description": "Duration in seconds to wait",
                "default": 0.5,
            },
        },
        "required": ["action"],
    }

    computer_service: ComputerService = Field(exclude=True)
    # Last known pointer position; None until a move/click or a cursor query.
    mouse_x: Optional[int] = Field(default=None, exclude=True)
    mouse_y: Optional[int] = Field(default=None, exclude=True)

    def __init__(self, computer_service: ComputerService, **data):
        super().__init__(computer_service=computer_service, **data)

    async def execute(
        self,
        action: str,
        x: Optional[float] = None,
        y: Optional[float] = None,
        button: str = "left",
        num_clicks: int = 1,
        amount: Optional[int] = None,
        text: Optional[str] = None,
        key: Optional[str] = None,
        keys: Optional[str] = None,
        duration: float = 0.5,
        **kwargs,
    ) -> ToolResult:
        """Run one desktop action and report the outcome.

        Missing required arguments, unknown actions, actions the provider
        does not implement, and any other error are all returned as failed
        ``ToolResult``s; nothing is raised to the caller.
        """
        # Each entry defers building the coroutine until the action is chosen.
        handlers = {
            "move_to": lambda: self._move_to(x, y),
            "click": lambda: self._click(x, y, button, num_clicks),
            "scroll": lambda: self._scroll(amount),
            "typing": lambda: self._type_text(text),
            "press": lambda: self._press_key_spec(
                key, "key is required for press action", "Pressed keys"
            ),
            "hotkey": lambda: self._press_key_spec(
                keys, "keys are required for hotkey action", "Pressed key combination"
            ),
            "wait": lambda: self._wait(duration),
            "mouse_down": self._mouse_state_unsupported,
            "mouse_up": self._mouse_state_unsupported,
            "drag_to": lambda: self._drag_to(x, y, button),
            "screenshot": self._screenshot,
        }
        try:
            handler = handlers.get(action)
            if handler is None:
                return self.fail_response(f"Unknown action: {action}")
            return await handler()
        except NotImplementedError as exc:
            return self.fail_response(str(exc))
        except Exception as exc:  # noqa: BLE001
            return self.fail_response(f"Computer action failed: {exc}")

    async def _move_to(self, x: Optional[float], y: Optional[float]) -> ToolResult:
        """Move the pointer to (x, y); both coordinates are required."""
        if x is None or y is None:
            return self.fail_response("x and y coordinates are required")
        x_int, y_int = self._coerce_point(x, y)
        await self.computer_service.move_mouse(x_int, y_int)
        self._update_position(x_int, y_int)
        return ToolResult(output=f"Moved to ({x_int}, {y_int})")

    async def _click(
        self, x: Optional[float], y: Optional[float], button: str, num_clicks: int
    ) -> ToolResult:
        """Click at (x, y), or at the current position when either is missing."""
        target_x, target_y = await self._resolve_target(x, y)
        clicks = max(1, min(3, int(num_clicks)))  # clamp to 1..3
        await self.computer_service.click_mouse(
            target_x, target_y, button=button, count=clicks
        )
        self._update_position(target_x, target_y)
        return ToolResult(
            output=f"{clicks} {button} click(s) performed at ({target_x}, {target_y})"
        )

    async def _scroll(self, amount: Optional[int]) -> ToolResult:
        """Scroll at the current pointer position; amount is clamped to ±10."""
        if amount is None:
            return self.fail_response("Scroll amount is required")
        scroll_amount = max(-10, min(10, int(amount)))
        current_x, current_y = await self._ensure_position()
        await self.computer_service.scroll(current_x, current_y, amount=scroll_amount)
        direction = "up" if scroll_amount > 0 else "down"
        steps = abs(scroll_amount)
        return ToolResult(
            output=f"Scrolled {direction} {steps} step(s) at ({current_x}, {current_y})"
        )

    async def _type_text(self, text: Optional[str]) -> ToolResult:
        """Type *text* through the service's text input."""
        if text is None:
            return self.fail_response("Text is required for typing")
        await self.computer_service.input_text(str(text))
        return ToolResult(output=f"Typed: {text}")

    async def _press_key_spec(
        self, key_spec: Optional[str], missing_message: str, report_label: str
    ) -> ToolResult:
        """Press the keys named in a ``+``-separated spec (shared by press/hotkey)."""
        if key_spec is None:
            return self.fail_response(missing_message)
        key_sequence = self._split_keys(str(key_spec))
        if not key_sequence:
            # An empty or separator-only spec would otherwise send no keys
            # at all and still be reported as a success.
            return self.fail_response(f"No keys found in '{key_spec}'")
        await self.computer_service.press_keys(key_sequence, hold=False)
        return ToolResult(output=f"{report_label}: {'+'.join(key_sequence)}")

    async def _wait(self, duration: float) -> ToolResult:
        """Sleep for *duration* seconds, clamped to the range 0..10."""
        safe_duration = max(0.0, min(10.0, float(duration)))
        await asyncio.sleep(safe_duration)
        return ToolResult(output=f"Waited {safe_duration} seconds")

    async def _mouse_state_unsupported(self) -> ToolResult:
        """Report that separate mouse down/up actions are unavailable."""
        return self.fail_response(
            "Mouse down/up is not supported in the AgentBay computer API yet"
        )

    async def _drag_to(
        self, x: Optional[float], y: Optional[float], button: str
    ) -> ToolResult:
        """Drag from the current pointer position to (x, y)."""
        if x is None or y is None:
            return self.fail_response("x and y coordinates are required")
        start_x, start_y = await self._ensure_position()
        target_x, target_y = self._coerce_point(x, y)
        await self.computer_service.drag_mouse(
            start_x, start_y, target_x, target_y, button=button
        )
        self._update_position(target_x, target_y)
        return ToolResult(
            output=f"Dragged from ({start_x}, {start_y}) to ({target_x}, {target_y})"
        )

    async def _screenshot(self) -> ToolResult:
        """Capture a screenshot; the image is returned as ``base64_image``."""
        data = await self.computer_service.screenshot()
        message = "Screenshot captured"
        if data.get("url"):
            message += f": {data['url']}"
        return ToolResult(
            output=message,
            base64_image=data.get("base64"),
        )

    async def _resolve_target(
        self, x: Optional[float], y: Optional[float]
    ) -> tuple[int, int]:
        """Return integer (x, y), falling back to the current pointer position."""
        if x is None or y is None:
            return await self._ensure_position()
        return self._coerce_point(x, y)

    async def _ensure_position(self) -> tuple[int, int]:
        """Return the cached pointer position, querying the service if unknown.

        When the service cannot report a cursor position, (0, 0) is assumed
        and cached.
        """
        if self.mouse_x is not None and self.mouse_y is not None:
            return self.mouse_x, self.mouse_y
        try:
            position = await self.computer_service.get_cursor_position()
        except NotImplementedError:
            position = {"x": 0, "y": 0}
        self.mouse_x = int(position.get("x", 0))
        self.mouse_y = int(position.get("y", 0))
        return self.mouse_x, self.mouse_y

    def _update_position(self, x: int, y: int) -> None:
        """Remember (x, y) as the current pointer position."""
        self.mouse_x = x
        self.mouse_y = y

    @staticmethod
    def _coerce_point(x: float, y: float) -> tuple[int, int]:
        """Round a possibly fractional point to integer pixel coordinates."""
        return int(round(float(x))), int(round(float(y)))

    @staticmethod
    def _split_keys(key_spec: str) -> List[str]:
        """Split ``"ctrl+alt+delete"`` into its keys, dropping empty parts."""
        return [part.strip() for part in key_spec.split("+") if part.strip()]
-* This tool provides commands for creating, reading, updating, and deleting files in the workspace -* All operations are performed relative to the /workspace directory for security -* Use this when you need to manage files, edit code, or manipulate file contents in a sandbox -* Each action requires specific parameters as defined in the tool's dependencies -Key capabilities include: -* File creation: Create new files with specified content and permissions -* File modification: Replace specific strings or completely rewrite files -* File deletion: Remove files from the workspace -* File reading: Read file contents with optional line range specification +Perform basic file operations inside the sandbox environment. +Paths are interpreted relative to the provider's workspace unless absolute. """ -class SandboxFilesTool(SandboxToolsBase): +class SandboxFilesTool(BaseTool): + """Provider-agnostic file management tool.""" + name: str = "sandbox_files" description: str = _FILES_DESCRIPTION parameters: dict = { @@ -39,100 +28,19 @@ class SandboxFilesTool(SandboxToolsBase): "full_file_rewrite", "delete_file", ], - "description": "The file operation to perform", - }, - "file_path": { - "type": "string", - "description": "Path to the file, relative to /workspace (e.g., 'src/main.py')", - }, - "file_contents": { - "type": "string", - "description": "Content to write to the file", - }, - "old_str": { - "type": "string", - "description": "Text to be replaced (must appear exactly once)", - }, - "new_str": { - "type": "string", - "description": "Replacement text", - }, - "permissions": { - "type": "string", - "description": "File permissions in octal format (e.g., '644')", - "default": "644", }, + "file_path": {"type": "string"}, + "file_contents": {"type": "string"}, + "old_str": {"type": "string"}, + "new_str": {"type": "string"}, }, "required": ["action"], - "dependencies": { - "create_file": ["file_path", "file_contents"], - "str_replace": ["file_path", "old_str", "new_str"], 
- "full_file_rewrite": ["file_path", "file_contents"], - "delete_file": ["file_path"], - }, } - SNIPPET_LINES: int = Field(default=4, exclude=True) - # workspace_path: str = Field(default="/workspace", exclude=True) - # sandbox: Optional[Sandbox] = Field(default=None, exclude=True) - - def __init__( - self, sandbox: Optional[Sandbox] = None, thread_id: Optional[str] = None, **data - ): - """Initialize with optional sandbox and thread_id.""" - super().__init__(**data) - if sandbox is not None: - self._sandbox = sandbox - def clean_path(self, path: str) -> str: - """Clean and normalize a path to be relative to /workspace""" - return clean_path(path, self.workspace_path) + file_service: FileService = Field(exclude=True) - def _should_exclude_file(self, rel_path: str) -> bool: - """Check if a file should be excluded based on path, name, or extension""" - return should_exclude_file(rel_path) - - def _file_exists(self, path: str) -> bool: - """Check if a file exists in the sandbox""" - try: - self.sandbox.fs.get_file_info(path) - return True - except Exception: - return False - - async def get_workspace_state(self) -> dict: - """Get the current workspace state by reading all files""" - files_state = {} - try: - # Ensure sandbox is initialized - await self._ensure_sandbox() - - files = self.sandbox.fs.list_files(self.workspace_path) - for file_info in files: - rel_path = file_info.name - - # Skip excluded files and directories - if self._should_exclude_file(rel_path) or file_info.is_dir: - continue - - try: - full_path = f"{self.workspace_path}/{rel_path}" - content = self.sandbox.fs.download_file(full_path).decode() - files_state[rel_path] = { - "content": content, - "is_dir": file_info.is_dir, - "size": file_info.size, - "modified": file_info.mod_time, - } - except Exception as e: - print(f"Error reading file {rel_path}: {e}") - except UnicodeDecodeError: - print(f"Skipping binary file: {rel_path}") - - return files_state - - except Exception as e: - print(f"Error 
getting workspace state: {str(e)}") - return {} + def __init__(self, file_service: FileService, **data): + super().__init__(file_service=file_service, **data) async def execute( self, @@ -141,221 +49,69 @@ async def execute( file_contents: Optional[str] = None, old_str: Optional[str] = None, new_str: Optional[str] = None, - permissions: Optional[str] = "644", **kwargs, ) -> ToolResult: - """ - Execute a file operation in the sandbox environment. - Args: - action: The file operation to perform - file_path: Path to the file relative to /workspace - file_contents: Content to write to the file - old_str: Text to be replaced (for str_replace) - new_str: Replacement text (for str_replace) - permissions: File permissions in octal format - Returns: - ToolResult with the operation's output or error - """ - async with asyncio.Lock(): - try: - # File creation - if action == "create_file": - if not file_path or not file_contents: - return self.fail_response( - "file_path and file_contents are required for create_file" - ) - return await self._create_file( - file_path, file_contents, permissions + try: + if action == "create_file": + if not file_path or file_contents is None: + return self.fail_response( + "file_path and file_contents are required for create_file" ) - - # String replacement - elif action == "str_replace": - if not file_path or not old_str or not new_str: - return self.fail_response( - "file_path, old_str, and new_str are required for str_replace" - ) - return await self._str_replace(file_path, old_str, new_str) - - # Full file rewrite - elif action == "full_file_rewrite": - if not file_path or not file_contents: - return self.fail_response( - "file_path and file_contents are required for full_file_rewrite" - ) - return await self._full_file_rewrite( - file_path, file_contents, permissions + if await self.file_service.exists(file_path): + return self.fail_response( + f"File '{file_path}' already exists. Use full_file_rewrite instead." 
) - - # File deletion - elif action == "delete_file": - if not file_path: - return self.fail_response( - "file_path is required for delete_file" - ) - return await self._delete_file(file_path) - - else: - return self.fail_response(f"Unknown action: {action}") - - except Exception as e: - logger.error(f"Error executing file action: {e}") - return self.fail_response(f"Error executing file action: {e}") - - async def _create_file( - self, file_path: str, file_contents: str, permissions: str = "644" - ) -> ToolResult: - """Create a new file with the provided contents""" - try: - # Ensure sandbox is initialized - await self._ensure_sandbox() - - file_path = self.clean_path(file_path) - full_path = f"{self.workspace_path}/{file_path}" - if self._file_exists(full_path): - return self.fail_response( - f"File '{file_path}' already exists. Use full_file_rewrite to modify existing files." + await self.file_service.write(file_path, file_contents, overwrite=True) + return self.success_response( + {"message": f"File '{file_path}' created successfully."} ) - # Create parent directories if needed - parent_dir = "/".join(full_path.split("/")[:-1]) - if parent_dir: - self.sandbox.fs.create_folder(parent_dir, "755") - - # Write the file content - self.sandbox.fs.upload_file(file_contents.encode(), full_path) - self.sandbox.fs.set_file_permissions(full_path, permissions) - - message = f"File '{file_path}' created successfully." 
- - # Check if index.html was created and add 8080 server info (only in root workspace) - if file_path.lower() == "index.html": - try: - website_link = self.sandbox.get_preview_link(8080) - website_url = ( - website_link.url - if hasattr(website_link, "url") - else str(website_link).split("url='")[1].split("'")[0] + if action == "str_replace": + if not file_path or old_str is None or new_str is None: + return self.fail_response( + "file_path, old_str and new_str are required for str_replace" ) - message += f"\n\n[Auto-detected index.html - HTTP server available at: {website_url}]" - message += "\n[Note: Use the provided HTTP server URL above instead of starting a new server]" - except Exception as e: - logger.warning( - f"Failed to get website URL for index.html: {str(e)}" + content = await self.file_service.read(file_path) + occurrences = content.count(old_str) + if occurrences == 0: + return self.fail_response( + f"String '{old_str}' not found in '{file_path}'" ) - - return self.success_response(message) - except Exception as e: - return self.fail_response(f"Error creating file: {str(e)}") - - async def _str_replace( - self, file_path: str, old_str: str, new_str: str - ) -> ToolResult: - """Replace specific text in a file""" - try: - # Ensure sandbox is initialized - await self._ensure_sandbox() - - file_path = self.clean_path(file_path) - full_path = f"{self.workspace_path}/{file_path}" - if not self._file_exists(full_path): - return self.fail_response(f"File '{file_path}' does not exist") - - content = self.sandbox.fs.download_file(full_path).decode() - old_str = old_str.expandtabs() - new_str = new_str.expandtabs() - - occurrences = content.count(old_str) - if occurrences == 0: - return self.fail_response(f"String '{old_str}' not found in file") - if occurrences > 1: - lines = [ - i + 1 - for i, line in enumerate(content.split("\n")) - if old_str in line - ] - return self.fail_response( - f"Multiple occurrences found in lines {lines}. 
Please ensure string is unique" - ) - - # Perform replacement - new_content = content.replace(old_str, new_str) - self.sandbox.fs.upload_file(new_content.encode(), full_path) - - # Show snippet around the edit - replacement_line = content.split(old_str)[0].count("\n") - start_line = max(0, replacement_line - self.SNIPPET_LINES) - end_line = replacement_line + self.SNIPPET_LINES + new_str.count("\n") - snippet = "\n".join(new_content.split("\n")[start_line : end_line + 1]) - - message = f"Replacement successful." - - return self.success_response(message) - - except Exception as e: - return self.fail_response(f"Error replacing string: {str(e)}") - - async def _full_file_rewrite( - self, file_path: str, file_contents: str, permissions: str = "644" - ) -> ToolResult: - """Completely rewrite an existing file with new content""" - try: - # Ensure sandbox is initialized - await self._ensure_sandbox() - - file_path = self.clean_path(file_path) - full_path = f"{self.workspace_path}/{file_path}" - if not self._file_exists(full_path): - return self.fail_response( - f"File '{file_path}' does not exist. Use create_file to create a new file." + if occurrences > 1: + return self.fail_response( + f"String '{old_str}' occurs multiple times; please make it unique." + ) + updated = content.replace(old_str, new_str) + await self.file_service.write(file_path, updated, overwrite=True) + return self.success_response( + {"message": f"Updated '{file_path}' successfully."} ) - self.sandbox.fs.upload_file(file_contents.encode(), full_path) - self.sandbox.fs.set_file_permissions(full_path, permissions) - - message = f"File '{file_path}' completely rewritten successfully." 
- - # Check if index.html was rewritten and add 8080 server info (only in root workspace) - if file_path.lower() == "index.html": - try: - website_link = self.sandbox.get_preview_link(8080) - website_url = ( - website_link.url - if hasattr(website_link, "url") - else str(website_link).split("url='")[1].split("'")[0] + if action == "full_file_rewrite": + if not file_path or file_contents is None: + return self.fail_response( + "file_path and file_contents are required for full_file_rewrite" ) - message += f"\n\n[Auto-detected index.html - HTTP server available at: {website_url}]" - message += "\n[Note: Use the provided HTTP server URL above instead of starting a new server]" - except Exception as e: - logger.warning( - f"Failed to get website URL for index.html: {str(e)}" + if not await self.file_service.exists(file_path): + return self.fail_response( + f"File '{file_path}' does not exist. Use create_file instead." ) + await self.file_service.write(file_path, file_contents, overwrite=True) + return self.success_response( + {"message": f"File '{file_path}' rewritten successfully."} + ) - return self.success_response(message) - except Exception as e: - return self.fail_response(f"Error rewriting file: {str(e)}") - - async def _delete_file(self, file_path: str) -> ToolResult: - """Delete a file at the given path""" - try: - # Ensure sandbox is initialized - await self._ensure_sandbox() - - file_path = self.clean_path(file_path) - full_path = f"{self.workspace_path}/{file_path}" - if not self._file_exists(full_path): - return self.fail_response(f"File '{file_path}' does not exist") - - self.sandbox.fs.delete_file(full_path) - return self.success_response(f"File '{file_path}' deleted successfully.") - except Exception as e: - return self.fail_response(f"Error deleting file: {str(e)}") + if action == "delete_file": + if not file_path: + return self.fail_response("file_path is required for delete_file") + if not await self.file_service.exists(file_path): + return 
self.fail_response(f"File '{file_path}' does not exist") + await self.file_service.delete(file_path) + return self.success_response( + {"message": f"File '{file_path}' deleted successfully."} + ) - async def cleanup(self): - """Clean up sandbox resources.""" + return self.fail_response(f"Unknown action: {action}") - @classmethod - def create_with_context(cls, context: Context) -> "SandboxFilesTool[Context]": - """Factory method to create a SandboxFilesTool with a specific context.""" - raise NotImplementedError( - "create_with_context not implemented for SandboxFilesTool" - ) + except Exception as exc: + return self.fail_response(f"File action failed: {exc}") diff --git a/app/tool/sandbox/sb_mobile_tool.py b/app/tool/sandbox/sb_mobile_tool.py new file mode 100644 index 000000000..88d5c741f --- /dev/null +++ b/app/tool/sandbox/sb_mobile_tool.py @@ -0,0 +1,172 @@ +import asyncio +from typing import Literal, Optional + +from pydantic import Field + +from app.sandbox.providers import MobileService +from app.tool.base import BaseTool, ToolResult + + +_MOBILE_DESCRIPTION = """\ +Mobile automation tool powered by the sandbox provider. +Supports tap, swipe, text input, key presses, UI inspection, and screenshots on remote devices. 
+""" + + +KEY_NAME_TO_CODE = { + "home": 3, + "back": 4, + "volume_up": 24, + "volume_down": 25, + "power": 26, + "menu": 82, +} + + +class SandboxMobileTool(BaseTool): + """Provider-agnostic mobile automation tool.""" + + name: str = "sandbox_mobile" + description: str = _MOBILE_DESCRIPTION + parameters: dict = { + "type": "object", + "properties": { + "action": { + "type": "string", + "enum": [ + "tap", + "swipe", + "input_text", + "send_key", + "screenshot", + "list_clickable", + "wait", + ], + "description": "Mobile action to perform", + }, + "x": {"type": "number", "description": "X coordinate"}, + "y": {"type": "number", "description": "Y coordinate"}, + "start_x": {"type": "number", "description": "Swipe start X"}, + "start_y": {"type": "number", "description": "Swipe start Y"}, + "end_x": {"type": "number", "description": "Swipe end X"}, + "end_y": {"type": "number", "description": "Swipe end Y"}, + "duration_ms": { + "type": "integer", + "description": "Swipe duration in milliseconds", + "default": 300, + }, + "text": {"type": "string", "description": "Text to input"}, + "key": { + "type": "string", + "enum": list(KEY_NAME_TO_CODE.keys()), + "description": "Key to press", + }, + "timeout_ms": { + "type": "integer", + "description": "Timeout for UI queries", + "default": 2000, + }, + "duration": { + "type": "number", + "description": "Seconds to wait", + "default": 0.5, + }, + }, + "required": ["action"], + } + + mobile_service: MobileService = Field(exclude=True) + + def __init__(self, mobile_service: MobileService, **data): + super().__init__(mobile_service=mobile_service, **data) + + async def execute( + self, + action: Literal[ + "tap", + "swipe", + "input_text", + "send_key", + "screenshot", + "list_clickable", + "wait", + ], + x: Optional[float] = None, + y: Optional[float] = None, + start_x: Optional[float] = None, + start_y: Optional[float] = None, + end_x: Optional[float] = None, + end_y: Optional[float] = None, + duration_ms: int = 300, + text: 
Optional[str] = None, + key: Optional[str] = None, + timeout_ms: int = 2000, + duration: float = 0.5, + **kwargs, + ) -> ToolResult: + try: + if action == "tap": + if x is None or y is None: + return self.fail_response( + "x and y coordinates are required for tap" + ) + xi, yi = int(round(float(x))), int(round(float(y))) + await self.mobile_service.tap(xi, yi) + return ToolResult(output=f"Tapped at ({xi}, {yi})") + + if action == "swipe": + required = [start_x, start_y, end_x, end_y] + if any(value is None for value in required): + return self.fail_response( + "start_x, start_y, end_x, end_y are required for swipe" + ) + sx, sy = int(round(float(start_x))), int(round(float(start_y))) + ex, ey = int(round(float(end_x))), int(round(float(end_y))) + duration_clamped = max(50, min(5000, int(duration_ms))) + await self.mobile_service.swipe(sx, sy, ex, ey, duration_clamped) + return ToolResult( + output=( + f"Swiped from ({sx}, {sy}) to ({ex}, {ey}) in {duration_clamped} ms" + ) + ) + + if action == "input_text": + if text is None: + return self.fail_response("text is required for input_text") + await self.mobile_service.input_text(text) + return ToolResult(output=f"Typed: {text}") + + if action == "send_key": + if not key: + return self.fail_response("key is required for send_key") + key_code = KEY_NAME_TO_CODE.get(key.lower()) + if key_code is None: + return self.fail_response(f"Unsupported key: {key}") + await self.mobile_service.send_key(key_code) + return ToolResult(output=f"Sent key: {key.lower()}") + + if action == "screenshot": + data = await self.mobile_service.screenshot() + message = "Mobile screenshot captured" + if data.get("url"): + message += f": {data['url']}" + return ToolResult(output=message, base64_image=data.get("base64")) + + if action == "list_clickable": + timeout_clamped = max(0, int(timeout_ms)) + data = await self.mobile_service.get_clickable_ui_elements( + timeout_clamped + ) + return self.success_response(data) + + if action == "wait": + 
safe_duration = max(0.0, min(30.0, float(duration))) + await asyncio.sleep(safe_duration) + return ToolResult(output=f"Waited {safe_duration} seconds") + + return self.fail_response(f"Unknown action: {action}") + + except NotImplementedError as exc: + return self.fail_response(str(exc)) + except Exception as exc: # noqa: BLE001 + return self.fail_response(f"Mobile action failed: {exc}") diff --git a/app/tool/sandbox/sb_shell_tool.py b/app/tool/sandbox/sb_shell_tool.py index 8a45244d0..638fe66cf 100644 --- a/app/tool/sandbox/sb_shell_tool.py +++ b/app/tool/sandbox/sb_shell_tool.py @@ -1,27 +1,19 @@ -import asyncio -import time -from typing import Any, Dict, Optional, TypeVar -from uuid import uuid4 +from typing import Optional -from app.daytona.tool_base import Sandbox, SandboxToolsBase -from app.tool.base import ToolResult -from app.utils.logger import logger +from pydantic import Field + +from app.sandbox.providers import ShellCommandResult, ShellService +from app.tool.base import BaseTool, ToolResult -Context = TypeVar("Context") _SHELL_DESCRIPTION = """\ -Execute a shell command in the workspace directory. -IMPORTANT: Commands are non-blocking by default and run in a tmux session. -This is ideal for long-running operations like starting servers or build processes. -Uses sessions to maintain state between commands. -This tool is essential for running CLI tools, installing packages, and managing system operations. +Execute shell commands inside the sandbox environment. +Supports blocking and non-blocking execution depending on provider capability. """ -class SandboxShellTool(SandboxToolsBase): - """Tool for executing tasks in a Daytona sandbox with browser-use capabilities. - Uses sessions for maintaining state between commands and provides comprehensive process management. 
- """ +class SandboxShellTool(BaseTool): + """Provider-agnostic shell execution tool.""" name: str = "sandbox_shell" description: str = _SHELL_DESCRIPTION @@ -36,384 +28,96 @@ class SandboxShellTool(SandboxToolsBase): "terminate_command", "list_commands", ], - "description": "The shell action to perform", + "description": "Shell action to perform", }, "command": { "type": "string", - "description": "The shell command to execute. Use this for running CLI tools, installing packages, " - "or system operations. Commands can be chained using &&, ||, and | operators.", + "description": "Command to execute when using execute_command", }, "folder": { "type": "string", - "description": "Optional relative path to a subdirectory of /workspace where the command should be " - "executed. Example: 'data/pdfs'", + "description": "Optional working directory relative to sandbox root", }, "session_name": { "type": "string", - "description": "Optional name of the tmux session to use. Use named sessions for related commands " - "that need to maintain state. Defaults to a random session name.", + "description": "Existing session name for long-running commands", }, "blocking": { "type": "boolean", - "description": "Whether to wait for the command to complete. Defaults to false for non-blocking " - "execution.", + "description": "Wait for command completion when true", "default": False, }, "timeout": { "type": "integer", - "description": "Optional timeout in seconds for blocking commands. Defaults to 60. Ignored for " - "non-blocking commands.", + "description": "Timeout in seconds for blocking commands", "default": 60, }, - "kill_session": { - "type": "boolean", - "description": "Whether to terminate the tmux session after checking. 
Set to true when you're done " - "with the command.", - "default": False, - }, }, "required": ["action"], - "dependencies": { - "execute_command": ["command"], - "check_command_output": ["session_name"], - "terminate_command": ["session_name"], - "list_commands": [], - }, } - def __init__( - self, sandbox: Optional[Sandbox] = None, thread_id: Optional[str] = None, **data - ): - """Initialize with optional sandbox and thread_id.""" - super().__init__(**data) - if sandbox is not None: - self._sandbox = sandbox - - async def _ensure_session(self, session_name: str = "default") -> str: - """Ensure a session exists and return its ID.""" - if session_name not in self._sessions: - session_id = str(uuid4()) - try: - await self._ensure_sandbox() # Ensure sandbox is initialized - self.sandbox.process.create_session(session_id) - self._sessions[session_name] = session_id - except Exception as e: - raise RuntimeError(f"Failed to create session: {str(e)}") - return self._sessions[session_name] - - async def _cleanup_session(self, session_name: str): - """Clean up a session if it exists.""" - if session_name in self._sessions: - try: - await self._ensure_sandbox() # Ensure sandbox is initialized - self.sandbox.process.delete_session(self._sessions[session_name]) - del self._sessions[session_name] - except Exception as e: - print(f"Warning: Failed to cleanup session {session_name}: {str(e)}") - - async def _execute_raw_command(self, command: str) -> Dict[str, Any]: - """Execute a raw command directly in the sandbox.""" - # Ensure session exists for raw commands - session_id = await self._ensure_session("raw_commands") - - # Execute command in session - from app.daytona.sandbox import SessionExecuteRequest + shell_service: ShellService = Field(exclude=True) - req = SessionExecuteRequest( - command=command, run_async=False, cwd=self.workspace_path - ) + def __init__(self, shell_service: ShellService, **data): + super().__init__(shell_service=shell_service, **data) - response = 
self.sandbox.process.execute_session_command( - session_id=session_id, - req=req, - timeout=30, # Short timeout for utility commands - ) - - logs = self.sandbox.process.get_session_command_logs( - session_id=session_id, command_id=response.cmd_id - ) - - return {"output": logs, "exit_code": response.exit_code} - - async def _execute_command( + async def execute( self, - command: str, + action: str, + command: Optional[str] = None, folder: Optional[str] = None, session_name: Optional[str] = None, blocking: bool = False, timeout: int = 60, ) -> ToolResult: try: - # Ensure sandbox is initialized - await self._ensure_sandbox() - - # Set up working directory - cwd = self.workspace_path - if folder: - folder = folder.strip("/") - cwd = f"{self.workspace_path}/{folder}" - - # Generate a session name if not provided - if not session_name: - session_name = f"session_{str(uuid4())[:8]}" - - # Check if tmux session already exists - check_session = await self._execute_raw_command( - f"tmux has-session -t {session_name} 2>/dev/null || echo 'not_exists'" - ) - session_exists = "not_exists" not in check_session.get("output", "") - - if not session_exists: - # Create a new tmux session - await self._execute_raw_command( - f"tmux new-session -d -s {session_name}" - ) - - # Ensure we're in the correct directory and send command to tmux - full_command = f"cd {cwd} && {command}" - wrapped_command = full_command.replace('"', '\\"') # Escape double quotes - - # Send command to tmux session - await self._execute_raw_command( - f'tmux send-keys -t {session_name} "{wrapped_command}" Enter' - ) - - if blocking: - # For blocking execution, wait and capture output - start_time = time.time() - while (time.time() - start_time) < timeout: - # Wait a bit before checking - time.sleep(2) - - # Check if session still exists (command might have exited) - check_result = await self._execute_raw_command( - f"tmux has-session -t {session_name} 2>/dev/null || echo 'ended'" - ) - if "ended" in 
check_result.get("output", ""): - break - - # Get current output and check for common completion indicators - output_result = await self._execute_raw_command( - f"tmux capture-pane -t {session_name} -p -S - -E -" - ) - current_output = output_result.get("output", "") - - # Check for prompt indicators that suggest command completion - last_lines = current_output.split("\n")[-3:] - completion_indicators = [ - "$", - "#", - ">", - "Done", - "Completed", - "Finished", - "✓", - ] - if any( - indicator in line - for indicator in completion_indicators - for line in last_lines - ): - break - - # Capture final output - output_result = await self._execute_raw_command( - f"tmux capture-pane -t {session_name} -p -S - -E -" + if action == "execute_command": + if not command: + return self.fail_response("command is required for execute_command") + result = await self.shell_service.execute( + command, + cwd=folder, + timeout=timeout, + blocking=blocking, + session=session_name, ) - final_output = output_result.get("output", "") - - # Kill the session after capture - await self._execute_raw_command(f"tmux kill-session -t {session_name}") - - return self.success_response( - { - "output": final_output, - "session_name": session_name, - "cwd": cwd, - "completed": True, - } - ) - else: - # For non-blocking, just return immediately + return self._convert_result(result) + + if action == "check_command_output": + if not session_name: + return self.fail_response("session_name is required for check") + result = await self.shell_service.check(session_name) + return self._convert_result(result) + + if action == "terminate_command": + if not session_name: + return self.fail_response("session_name is required for terminate") + result = await self.shell_service.terminate(session_name) + return self._convert_result(result) + + if action == "list_commands": + sessions = list(await self.shell_service.list_sessions()) return self.success_response( - { - "session_name": session_name, - "cwd": cwd, - 
"message": f"Command sent to tmux session '{session_name}'. Use check_command_output to view results.", - "completed": False, - } + {"sessions": sessions, "count": len(sessions)} ) - except Exception as e: - # Attempt to clean up session in case of error - if session_name: - try: - await self._execute_raw_command( - f"tmux kill-session -t {session_name}" - ) - except: - pass - return self.fail_response(f"Error executing command: {str(e)}") - - async def _check_command_output( - self, session_name: str, kill_session: bool = False - ) -> ToolResult: - try: - # Ensure sandbox is initialized - await self._ensure_sandbox() - - # Check if session exists - check_result = await self._execute_raw_command( - f"tmux has-session -t {session_name} 2>/dev/null || echo 'not_exists'" - ) - if "not_exists" in check_result.get("output", ""): - return self.fail_response( - f"Tmux session '{session_name}' does not exist." - ) - - # Get output from tmux pane - output_result = await self._execute_raw_command( - f"tmux capture-pane -t {session_name} -p -S - -E -" - ) - output = output_result.get("output", "") - - # Kill session if requested - if kill_session: - await self._execute_raw_command(f"tmux kill-session -t {session_name}") - termination_status = "Session terminated." - else: - termination_status = "Session still running." - - return self.success_response( - { - "output": output, - "session_name": session_name, - "status": termination_status, - } - ) - - except Exception as e: - return self.fail_response(f"Error checking command output: {str(e)}") - - async def _terminate_command(self, session_name: str) -> ToolResult: - try: - # Ensure sandbox is initialized - await self._ensure_sandbox() - - # Check if session exists - check_result = await self._execute_raw_command( - f"tmux has-session -t {session_name} 2>/dev/null || echo 'not_exists'" - ) - if "not_exists" in check_result.get("output", ""): - return self.fail_response( - f"Tmux session '{session_name}' does not exist." 
- ) - - # Kill the session - await self._execute_raw_command(f"tmux kill-session -t {session_name}") - - return self.success_response( - {"message": f"Tmux session '{session_name}' terminated successfully."} - ) - - except Exception as e: - return self.fail_response(f"Error terminating command: {str(e)}") - - async def _list_commands(self) -> ToolResult: - try: - # Ensure sandbox is initialized - await self._ensure_sandbox() - - # List all tmux sessions - result = await self._execute_raw_command( - "tmux list-sessions 2>/dev/null || echo 'No sessions'" - ) - output = result.get("output", "") - - if "No sessions" in output or not output.strip(): - return self.success_response( - {"message": "No active tmux sessions found.", "sessions": []} - ) - - # Parse session list - sessions = [] - for line in output.split("\n"): - if line.strip(): - parts = line.split(":") - if parts: - session_name = parts[0].strip() - sessions.append(session_name) - - return self.success_response( - { - "message": f"Found {len(sessions)} active sessions.", - "sessions": sessions, - } - ) - - except Exception as e: - return self.fail_response(f"Error listing commands: {str(e)}") - - async def execute( - self, - action: str, - command: str, - folder: Optional[str] = None, - session_name: Optional[str] = None, - blocking: bool = False, - timeout: int = 60, - kill_session: bool = False, - ) -> ToolResult: - """ - Execute a browser action in the sandbox environment. 
- Args: - timeout: - blocking: - session_name: - folder: - command: - kill_session: - action: The browser action to perform - Returns: - ToolResult with the action's output or error - """ - async with asyncio.Lock(): - try: - # Navigation actions - if action == "execute_command": - if not command: - return self.fail_response("command is required for navigation") - return await self._execute_command( - command, folder, session_name, blocking, timeout - ) - elif action == "check_command_output": - if session_name is None: - return self.fail_response( - "session_name is required for navigation" - ) - return await self._check_command_output(session_name, kill_session) - elif action == "terminate_command": - if session_name is None: - return self.fail_response( - "session_name is required for click_element" - ) - return await self._terminate_command(session_name) - elif action == "list_commands": - return await self._list_commands() - else: - return self.fail_response(f"Unknown action: {action}") - except Exception as e: - logger.error(f"Error executing shell action: {e}") - return self.fail_response(f"Error executing shell action: {e}") - - async def cleanup(self): - """Clean up all sessions.""" - for session_name in list(self._sessions.keys()): - await self._cleanup_session(session_name) - - # Also clean up any tmux sessions - try: - await self._ensure_sandbox() - await self._execute_raw_command("tmux kill-server 2>/dev/null || true") - except Exception as e: - logger.error(f"Error shell box cleanup action: {e}") + return self.fail_response(f"Unknown action: {action}") + + except NotImplementedError as exc: + return self.fail_response(str(exc)) + except Exception as exc: + return self.fail_response(f"Shell action failed: {exc}") + + def _convert_result(self, result: ShellCommandResult) -> ToolResult: + if not result.success: + return self.fail_response(result.error or "Shell command failed") + + payload = { + "output": result.output or "", + "session_name": 
result.session_name, + "completed": result.completed, + "metadata": result.metadata, + } + # Remove None fields for cleaner JSON + payload = {k: v for k, v in payload.items() if v is not None} + return self.success_response(payload) diff --git a/app/tool/sandbox/sb_vision_tool.py b/app/tool/sandbox/sb_vision_tool.py index ffe847d48..020463a0d 100644 --- a/app/tool/sandbox/sb_vision_tool.py +++ b/app/tool/sandbox/sb_vision_tool.py @@ -1,35 +1,19 @@ -import base64 -import mimetypes -import os -from io import BytesIO from typing import Optional -from PIL import Image from pydantic import Field -from app.daytona.tool_base import Sandbox, SandboxToolsBase, ThreadMessage -from app.tool.base import ToolResult +from app.sandbox.providers import VisionService +from app.tool.base import BaseTool, ToolResult -# 最大文件大小(原图10MB,压缩后5MB) -MAX_IMAGE_SIZE = 10 * 1024 * 1024 -MAX_COMPRESSED_SIZE = 5 * 1024 * 1024 - -# 压缩设置 -DEFAULT_MAX_WIDTH = 1920 -DEFAULT_MAX_HEIGHT = 1080 -DEFAULT_JPEG_QUALITY = 85 -DEFAULT_PNG_COMPRESS_LEVEL = 6 - -_VISION_DESCRIPTION = """ -A sandbox-based vision tool that allows the agent to read image files inside the sandbox using the see_image action. -* Only the see_image action is supported, with the parameter being the relative path of the image under /workspace. -* The image will be compressed and converted to base64 for use in subsequent context. -* Supported formats: JPG, PNG, GIF, WEBP. Maximum size: 10MB. +_VISION_DESCRIPTION = """\ +Read and encode image files from the sandbox environment as base64 strings. 
""" -class SandboxVisionTool(SandboxToolsBase): +class SandboxVisionTool(BaseTool): + """Provider-agnostic vision tool.""" + name: str = "sandbox_vision" description: str = _VISION_DESCRIPTION parameters: dict = { @@ -38,141 +22,35 @@ class SandboxVisionTool(SandboxToolsBase): "action": { "type": "string", "enum": ["see_image"], - "description": "要执行的视觉动作,目前仅支持 see_image", }, "file_path": { "type": "string", - "description": "图片在 /workspace 下的相对路径,如 'screenshots/image.png'", + "description": "Image path inside sandbox", }, }, "required": ["action", "file_path"], - "dependencies": {"see_image": ["file_path"]}, } - # def __init__(self, project_id: str, thread_id: str, thread_manager: ThreadManager): - # super().__init__(project_id=project_id, thread_manager=thread_manager) - # self.thread_id = thread_id - # self.thread_manager = thread_manager - - vision_message: Optional[ThreadMessage] = Field(default=None, exclude=True) + vision_service: VisionService = Field(exclude=True) - def __init__( - self, sandbox: Optional[Sandbox] = None, thread_id: Optional[str] = None, **data - ): - """Initialize with optional sandbox and thread_id.""" - super().__init__(**data) - if sandbox is not None: - self._sandbox = sandbox - - def compress_image(self, image_bytes: bytes, mime_type: str, file_path: str): - """压缩图片,保持合理质量。""" - try: - img = Image.open(BytesIO(image_bytes)) - if img.mode in ("RGBA", "LA", "P"): - background = Image.new("RGB", img.size, (255, 255, 255)) - if img.mode == "P": - img = img.convert("RGBA") - background.paste( - img, mask=img.split()[-1] if img.mode == "RGBA" else None - ) - img = background - width, height = img.size - if width > DEFAULT_MAX_WIDTH or height > DEFAULT_MAX_HEIGHT: - ratio = min(DEFAULT_MAX_WIDTH / width, DEFAULT_MAX_HEIGHT / height) - new_width = int(width * ratio) - new_height = int(height * ratio) - img = img.resize((new_width, new_height), Image.Resampling.LANCZOS) - output = BytesIO() - if mime_type == "image/gif": - img.save(output, 
format="GIF", optimize=True) - output_mime = "image/gif" - elif mime_type == "image/png": - img.save( - output, - format="PNG", - optimize=True, - compress_level=DEFAULT_PNG_COMPRESS_LEVEL, - ) - output_mime = "image/png" - else: - img.save( - output, format="JPEG", quality=DEFAULT_JPEG_QUALITY, optimize=True - ) - output_mime = "image/jpeg" - compressed_bytes = output.getvalue() - return compressed_bytes, output_mime - except Exception: - return image_bytes, mime_type + def __init__(self, vision_service: VisionService, **data): + super().__init__(vision_service=vision_service, **data) async def execute( - self, action: str, file_path: Optional[str] = None, **kwargs + self, + action: str, + file_path: Optional[str] = None, + **kwargs, ) -> ToolResult: - """ - 执行视觉动作,目前仅支持 see_image。 - 参数: - action: 必须为 'see_image' - file_path: 图片相对路径 - """ if action != "see_image": - return self.fail_response(f"未知的视觉动作: {action}") + return self.fail_response(f"Unsupported vision action: {action}") if not file_path: - return self.fail_response("file_path 参数不能为空") + return self.fail_response("file_path is required") try: - await self._ensure_sandbox() - cleaned_path = self.clean_path(file_path) - full_path = f"{self.workspace_path}/{cleaned_path}" - try: - file_info = self.sandbox.fs.get_file_info(full_path) - if file_info.is_dir: - return self.fail_response(f"路径 '{cleaned_path}' 是目录,不是图片文件。") - except Exception: - return self.fail_response(f"图片文件未找到: '{cleaned_path}'") - if file_info.size > MAX_IMAGE_SIZE: - return self.fail_response( - f"图片文件 '{cleaned_path}' 过大 ({file_info.size / (1024*1024):.2f}MB),最大允许 {MAX_IMAGE_SIZE / (1024*1024)}MB。" - ) - try: - image_bytes = self.sandbox.fs.download_file(full_path) - except Exception: - return self.fail_response(f"无法读取图片文件: {cleaned_path}") - mime_type, _ = mimetypes.guess_type(full_path) - if not mime_type or not mime_type.startswith("image/"): - ext = os.path.splitext(cleaned_path)[1].lower() - if ext == ".jpg" or ext == ".jpeg": - 
mime_type = "image/jpeg" - elif ext == ".png": - mime_type = "image/png" - elif ext == ".gif": - mime_type = "image/gif" - elif ext == ".webp": - mime_type = "image/webp" - else: - return self.fail_response( - f"不支持或未知的图片格式: '{cleaned_path}'。支持: JPG, PNG, GIF, WEBP。" - ) - compressed_bytes, compressed_mime_type = self.compress_image( - image_bytes, mime_type, cleaned_path - ) - if len(compressed_bytes) > MAX_COMPRESSED_SIZE: - return self.fail_response( - f"图片文件 '{cleaned_path}' 压缩后仍过大 ({len(compressed_bytes) / (1024*1024):.2f}MB),最大允许 {MAX_COMPRESSED_SIZE / (1024*1024)}MB。" - ) - base64_image = base64.b64encode(compressed_bytes).decode("utf-8") - image_context_data = { - "mime_type": compressed_mime_type, - "base64": base64_image, - "file_path": cleaned_path, - "original_size": file_info.size, - "compressed_size": len(compressed_bytes), - } - message = ThreadMessage( - type="image_context", content=image_context_data, is_llm_message=False - ) - self.vision_message = message - # return self.success_response(f"成功加载并压缩图片 '{cleaned_path}' (由 {file_info.size / 1024:.1f}KB 压缩到 {len(compressed_bytes) / 1024:.1f}KB)。") + data = await self.vision_service.read_image(file_path) return ToolResult( - output=f"成功加载并压缩图片 '{cleaned_path}'", - base64_image=base64_image, + output=f"Loaded image '{file_path}' successfully.", + base64_image=data.get("base64"), ) - except Exception as e: - return self.fail_response(f"see_image 执行异常: {str(e)}") + except Exception as exc: + return self.fail_response(f"see_image failed: {exc}") diff --git a/config/config.example-agentbay.toml b/config/config.example-agentbay.toml new file mode 100644 index 000000000..fe992865f --- /dev/null +++ b/config/config.example-agentbay.toml @@ -0,0 +1,24 @@ +# Minimal configuration example for running OpenManus with the AgentBay sandbox provider. 
+ +[llm] +model = "gpt-4o-mini" # Replace with your preferred model +base_url = "https://api.openai.com/v1" # Replace with the endpoint of your LLM vendor +api_key = "YOUR_LLM_API_KEY" # Insert the corresponding API key +max_tokens = 4096 +temperature = 0.0 + +[sandbox] +provider = "agentbay" # Use AgentBay as the sandbox backend +use_sandbox = true + +[sandbox.agentbay] +api_key = "YOUR_AGENTBAY_API_KEY" # Create via https://help.aliyun.com/zh/agentbay/user-guide/service-management +endpoint = "wuyingai.cn-shanghai.aliyuncs.com" # Default endpoint; change if your account uses a different region +timeout_ms = 60000 +desktop_image_id = "linux_latest" # AgentBay desktop image with GUI capabilities +browser_image_id = "browser_latest" # Optional dedicated browser automation image +mobile_image_id = "mobile_latest" # Optional Android automation image + +[sandbox.agentbay.session_defaults] +image_id = "linux_latest" # Fallback image if role-specific IDs are not provided +is_vpc = false # Set true only if your AgentBay account requires VPC sessions diff --git a/config/config.example-daytona.toml b/config/config.example-daytona.toml index 10975df2e..0bb3e3621 100644 --- a/config/config.example-daytona.toml +++ b/config/config.example-daytona.toml @@ -88,6 +88,8 @@ temperature = 0.0 # Controls randomness for vision mode ## Sandbox configuration #[sandbox] #use_sandbox = false +## available providers: "daytona", "agentbay" +#provider = "daytona" #image = "python:3.12-slim" #work_dir = "/workspace" #memory_limit = "1g" # 512m @@ -95,6 +97,15 @@ temperature = 0.0 # Controls randomness for vision mode #timeout = 300 #network_enabled = true +#[sandbox.agentbay] +#api_key = "" # fallback to AGENTBAY_API_KEY env when empty +#endpoint = "wuyingai.cn-shanghai.aliyuncs.com" +#timeout_ms = 60000 +#env_file = "" +# [sandbox.agentbay.session_defaults] +# image_id = "" +# is_vpc = false + # Daytona configuration [daytona] daytona_api_key = "" diff --git a/requirements.txt 
b/requirements.txt index aa7e6dc93..87d166e37 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,4 +1,5 @@ pydantic~=2.10.6 +wuying-agentbay-sdk~=0.9.3 openai~=1.66.3 tenacity~=9.0.0 pyyaml~=6.0.2 @@ -10,7 +11,7 @@ tiktoken~=0.9.0 html2text~=2024.2.26 gymnasium~=1.1.1 -pillow~=11.1.0 +pillow~=10.4.0 browsergym~=0.13.3 uvicorn~=0.34.0 unidiff~=0.7.5