Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions app/agent/manus.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,15 @@ async def initialize_mcp_servers(self) -> None:
logger.info(
f"Connected to MCP server {server_id} using command {server_config.command}"
)
elif server_config.type == "streamableHttp":
if server_config.url:
await self.connect_mcp_server(
server_config.url,
server_id,
use_http_stream=True,
headers=server_config.headers
)
logger.info(f"Connected to streamableHttp server {server_id} at {server_config.url}")
except Exception as e:
logger.error(f"Failed to connect to MCP server {server_id}: {e}")

Expand All @@ -93,14 +102,21 @@ async def connect_mcp_server(
server_url: str,
server_id: str = "",
use_stdio: bool = False,
use_http_stream: bool = False,
stdio_args: List[str] = None,
headers: dict = None,
) -> None:
"""Connect to an MCP server and add its tools."""
if use_stdio:
await self.mcp_clients.connect_stdio(
server_url, stdio_args or [], server_id
)
self.connected_servers[server_id or server_url] = server_url
elif use_http_stream: # streamable http
await self.mcp_clients.connect_streamable_http(
server_url=server_url, server_id=server_id, headers=headers
)
self.connected_servers[server_id or server_url] = server_url
else:
await self.mcp_clients.connect_sse(server_url, server_id)
self.connected_servers[server_id or server_url] = server_url
Expand Down
2 changes: 2 additions & 0 deletions app/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ class MCPServerConfig(BaseModel):
args: List[str] = Field(
default_factory=list, description="Arguments for stdio command"
)
headers: Dict[str, str] = Field(default_factory=dict, description="HTTP headers")


class MCPSettings(BaseModel):
Expand Down Expand Up @@ -164,6 +165,7 @@ def load_server_config(cls) -> Dict[str, MCPServerConfig]:
type=server_config["type"],
url=server_config.get("url"),
command=server_config.get("command"),
headers=server_config.get("headers", {}),
args=server_config.get("args", []),
)
return servers
Expand Down
51 changes: 50 additions & 1 deletion app/tool/mcp.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from app.logger import logger
from app.tool.base import BaseTool, ToolResult
from app.tool.tool_collection import ToolCollection

from mcp.client.streamable_http import streamablehttp_client

class MCPClientTool(BaseTool):
"""Represents a tool proxy that can be called on the MCP server from the client side."""
Expand Down Expand Up @@ -94,6 +94,55 @@ async def connect_stdio(

await self._initialize_and_list_tools(server_id)

async def connect_streamable_http(
self,
server_url: str,
server_id: str = "",
headers: Optional[Dict[str, str]] = None,
) -> None:
"""Connect to a streamableHttp type MCP server (based on HTTP streaming)"""
if not server_url:
raise ValueError("Server URL is required for streamableHttp connection")

server_id = server_id or server_url # Use URL as default server_id

# Disconnect existing connection to avoid conflicts
if server_id in self.sessions:
await self.disconnect(server_id)

exit_stack = AsyncExitStack()
self.exit_stacks[server_id] = exit_stack

try:
# Initialize HTTP streaming connection (with custom headers and timeout)
streams_context = streamablehttp_client(
url=server_url,
headers=headers or {}
)
# Enter context manager to establish streaming connection
streams = await exit_stack.enter_async_context(streams_context)
read_stream, write_stream, get_session_id = streams
if read_stream is None or write_stream is None:
raise ValueError("Invalid streams returned from streamablehttp_client")

session = await exit_stack.enter_async_context(
ClientSession(
read_stream,
write_stream,
)
)
self.sessions[server_id] = session

# Reuse existing logic to initialize tool list
await self._initialize_and_list_tools(server_id)
logger.info(f"Connected to streamableHttp server {server_id}")

except Exception as e:
# Clean up resources if connection fails
await exit_stack.aclose()
self.exit_stacks.pop(server_id, None)
raise RuntimeError(f"Failed to connect to streamableHttp server: {str(e)}")

async def _initialize_and_list_tools(self, server_id: str) -> None:
"""Initialize session and populate tool map."""
session = self.sessions.get(server_id)
Expand Down
9 changes: 8 additions & 1 deletion config/mcp.example.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,13 @@
"server1": {
"type": "sse",
"url": "http://localhost:8000/sse"
},
"server2": {
"type": "streamableHttp",
"url": "https://your_streamable_http_mcp_server/mcp",
"headers": {
"header": "content-type: application/json"
}
}
}
}
}
7 changes: 5 additions & 2 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,18 @@ docker~=7.1.0
pytest~=8.3.5
pytest-asyncio~=0.25.3

mcp~=1.5.0
mcp~=1.9.0
httpx>=0.27.0
tomli>=2.0.0

boto3~=1.37.18

requests~=2.32.3
beautifulsoup4~=4.13.3
crawl4ai~=0.6.3
crawl4ai>=0.6.3

huggingface-hub~=0.29.2
setuptools~=75.8.0

structlog
daytona