diff --git a/app/agent/manus.py b/app/agent/manus.py index df40edbba..32e6ca282 100644 --- a/app/agent/manus.py +++ b/app/agent/manus.py @@ -85,6 +85,15 @@ async def initialize_mcp_servers(self) -> None: logger.info( f"Connected to MCP server {server_id} using command {server_config.command}" ) + elif server_config.type == "streamableHttp": + if server_config.url: + await self.connect_mcp_server( + server_config.url, + server_id, + use_http_stream=True, + headers=server_config.headers + ) + logger.info(f"Connected to streamableHttp server {server_id} at {server_config.url}") except Exception as e: logger.error(f"Failed to connect to MCP server {server_id}: {e}") @@ -93,7 +102,9 @@ async def connect_mcp_server( server_url: str, server_id: str = "", use_stdio: bool = False, + use_http_stream: bool = False, stdio_args: List[str] = None, + headers: dict = None, ) -> None: """Connect to an MCP server and add its tools.""" if use_stdio: @@ -101,6 +112,11 @@ async def connect_mcp_server( server_url, stdio_args or [], server_id ) self.connected_servers[server_id or server_url] = server_url + elif use_http_stream: # streamable http + await self.mcp_clients.connect_streamable_http( + server_url=server_url, server_id=server_id, headers=headers + ) + self.connected_servers[server_id or server_url] = server_url else: await self.mcp_clients.connect_sse(server_url, server_id) self.connected_servers[server_id or server_url] = server_url diff --git a/app/config.py b/app/config.py index a881e2a5e..5a8a82268 100644 --- a/app/config.py +++ b/app/config.py @@ -133,6 +133,7 @@ class MCPServerConfig(BaseModel): args: List[str] = Field( default_factory=list, description="Arguments for stdio command" ) + headers: Dict[str, str] = Field(default_factory=dict, description="HTTP headers") class MCPSettings(BaseModel): @@ -164,6 +165,7 @@ def load_server_config(cls) -> Dict[str, MCPServerConfig]: type=server_config["type"], url=server_config.get("url"), command=server_config.get("command"), + headers=server_config.get("headers", {}), args=server_config.get("args", []), ) return servers diff --git a/app/tool/mcp.py b/app/tool/mcp.py index 32fa8249e..641adf8d9 100644 --- a/app/tool/mcp.py +++ b/app/tool/mcp.py @@ -9,7 +9,7 @@ from app.logger import logger from app.tool.base import BaseTool, ToolResult from app.tool.tool_collection import ToolCollection - +from mcp.client.streamable_http import streamablehttp_client class MCPClientTool(BaseTool): """Represents a tool proxy that can be called on the MCP server from the client side.""" @@ -94,6 +94,55 @@ async def connect_stdio( await self._initialize_and_list_tools(server_id) + async def connect_streamable_http( + self, + server_url: str, + server_id: str = "", + headers: Optional[Dict[str, str]] = None, + ) -> None: + """Connect to a streamableHttp type MCP server (based on HTTP streaming)""" + if not server_url: + raise ValueError("Server URL is required for streamableHttp connection") + + server_id = server_id or server_url # Use URL as default server_id + + # Disconnect existing connection to avoid conflicts + if server_id in self.sessions: + await self.disconnect(server_id) + + exit_stack = AsyncExitStack() + self.exit_stacks[server_id] = exit_stack + + try: + # Initialize HTTP streaming connection (with custom headers and timeout) + streams_context = streamablehttp_client( + url=server_url, + headers=headers or {} + ) + # Enter context manager to establish streaming connection + streams = await exit_stack.enter_async_context(streams_context) + read_stream, write_stream, get_session_id = streams + if read_stream is None or write_stream is None: + raise ValueError("Invalid streams returned from streamablehttp_client") + + session = await exit_stack.enter_async_context( + ClientSession( + read_stream, + write_stream, + ) + ) + self.sessions[server_id] = session + + # Reuse existing logic to initialize tool list + await self._initialize_and_list_tools(server_id) + logger.info(f"Connected to streamableHttp server {server_id}") + + except Exception as e: + # Clean up resources if connection fails + await exit_stack.aclose() + self.exit_stacks.pop(server_id, None) + raise RuntimeError(f"Failed to connect to streamableHttp server: {str(e)}") + async def _initialize_and_list_tools(self, server_id: str) -> None: """Initialize session and populate tool map.""" session = self.sessions.get(server_id) diff --git a/config/mcp.example.json b/config/mcp.example.json index 194642528..276ed61b5 100644 --- a/config/mcp.example.json +++ b/config/mcp.example.json @@ -3,6 +3,13 @@ "server1": { "type": "sse", "url": "http://localhost:8000/sse" + }, + "server2": { + "type": "streamableHttp", + "url": "https://your_streamable_http_mcp_server/mcp", + "headers": { + "header": "content-type: application/json" + } } } -} +} \ No newline at end of file diff --git a/requirements.txt b/requirements.txt index aa7e6dc93..8cc53140b 100644 --- a/requirements.txt +++ b/requirements.txt @@ -28,7 +28,7 @@ docker~=7.1.0 pytest~=8.3.5 pytest-asyncio~=0.25.3 -mcp~=1.5.0 +mcp~=1.9.0 httpx>=0.27.0 tomli>=2.0.0 @@ -36,7 +36,10 @@ boto3~=1.37.18 requests~=2.32.3 beautifulsoup4~=4.13.3 -crawl4ai~=0.6.3 +crawl4ai>=0.6.3 huggingface-hub~=0.29.2 setuptools~=75.8.0 + +structlog +daytona \ No newline at end of file