diff --git a/examples/foundational/39c-mcp-run-http.py b/examples/foundational/39c-mcp-run-http.py new file mode 100644 index 0000000000..b84755b5b9 --- /dev/null +++ b/examples/foundational/39c-mcp-run-http.py @@ -0,0 +1,135 @@ +# +# Copyright (c) 2024–2025, Daily +# +# SPDX-License-Identifier: BSD 2-Clause License +# + +import argparse +import os + +from dotenv import load_dotenv +from loguru import logger +from mcp.client.session_group import StreamableHttpParameters + +from pipecat.audio.vad.silero import SileroVADAnalyzer +from pipecat.pipeline.pipeline import Pipeline +from pipecat.pipeline.runner import PipelineRunner +from pipecat.pipeline.task import PipelineParams, PipelineTask +from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext +from pipecat.services.openai.llm import OpenAILLMService +from pipecat.services.cartesia.tts import CartesiaTTSService +from pipecat.services.deepgram.stt import DeepgramSTTService +from pipecat.services.mcp_service import MCPClient +from pipecat.transports.base_transport import BaseTransport, TransportParams +from pipecat.transports.network.fastapi_websocket import FastAPIWebsocketParams +from pipecat.transports.services.daily import DailyParams + +load_dotenv(override=True) + +# We store functions so objects (e.g. SileroVADAnalyzer) don't get +# instantiated. The function will be called when the desired transport gets +# selected. +transport_params = { + "daily": lambda: DailyParams( + audio_in_enabled=True, + audio_out_enabled=True, + vad_analyzer=SileroVADAnalyzer(), + ), + "twilio": lambda: FastAPIWebsocketParams( + audio_in_enabled=True, + audio_out_enabled=True, + vad_analyzer=SileroVADAnalyzer(), + ), + "webrtc": lambda: TransportParams( + audio_in_enabled=True, + audio_out_enabled=True, + vad_analyzer=SileroVADAnalyzer(), + ), +} + + +async def run_example(transport: BaseTransport, _: argparse.Namespace, handle_sigint: bool): + logger.info(f"Starting bot") + + stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY")) + + tts = CartesiaTTSService( + api_key=os.getenv("CARTESIA_API_KEY"), + voice_id="71a7ad14-091c-4e8e-a314-022ece01c121", # British Reading Lady + ) + + llm = OpenAILLMService( + api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o-mini" + ) + + try: + # Github MCP docs: https://github.com/github/github-mcp-server + # Enable Github Copilot on your GitHub account. Free tier is ok. (https://github.com/settings/copilot) + # Generate a personal access token. It must be a Fine-grained token, classic tokens are not supported. (https://github.com/settings/personal-access-tokens) + # Set permissions you want to use (eg. "all repositories", "profile: read/write", etc) + mcp = MCPClient( + server_params=StreamableHttpParameters( + url="https://api.githubcopilot.com/mcp/", + headers={"Authorization": f"Bearer {os.getenv('GITHUB_PERSONAL_ACCESS_TOKEN')}"}, + ) + ) + except Exception as e: + logger.error(f"error setting up mcp") + logger.exception("error trace:") + + tools = await mcp.register_tools(llm) + + system = f""" + You are a helpful LLM in a WebRTC call. + Your goal is to answer questions about the user's GitHub repositories and account. + You have access to a number of tools provided by Github. Use any and all tools to help users. + Your output will be converted to audio so don't include special characters in your answers. + Don't overexplain what you are doing. + Just respond with short sentences when you are carrying out tool calls. + """ + + messages = [{"role": "system", "content": system}] + + context = OpenAILLMContext(messages, tools) + context_aggregator = llm.create_context_aggregator(context) + + pipeline = Pipeline( + [ + transport.input(), # Transport user input + stt, + context_aggregator.user(), # User spoken responses + llm, # LLM + tts, # TTS + transport.output(), # Transport bot output + context_aggregator.assistant(), # Assistant spoken responses and tool context + ] + ) + + task = PipelineTask( + pipeline, + params=PipelineParams( + enable_metrics=True, + enable_usage_metrics=True, + ), + ) + + @transport.event_handler("on_client_connected") + async def on_client_connected(transport, client): + logger.info(f"Client connected: {client}") + # Kick off the conversation. + await task.queue_frames([context_aggregator.user().get_context_frame()]) + + @transport.event_handler("on_client_disconnected") + async def on_client_disconnected(transport, client): + logger.info(f"Client disconnected") + await task.cancel() + + runner = PipelineRunner(handle_sigint=handle_sigint) + + await runner.run(task) + + +if __name__ == "__main__": + from pipecat.examples.run import main + + main(run_example, transport_params=transport_params) \ No newline at end of file diff --git a/pyproject.toml b/pyproject.toml index 8fdab47423..6b1756e7de 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -64,7 +64,7 @@ langchain = [ "langchain~=0.3.20", "langchain-community~=0.3.20", "langchain-ope livekit = [ "livekit~=0.22.0", "livekit-api~=0.8.2", "tenacity~=9.0.0" ] lmnt = [ "websockets~=13.1" ] local = [ "pyaudio~=0.2.14" ] -mcp = [ "mcp[cli]~=1.6.0" ] +mcp = [ "mcp[cli]~=1.9.4" ] mem0 = [ "mem0ai~=0.1.94" ] mlx-whisper = [ "mlx-whisper~=0.4.2" ] moondream = [ "einops~=0.8.0", "timm~=1.0.13", "transformers~=4.48.0" ] diff --git a/src/pipecat/services/mcp_service.py b/src/pipecat/services/mcp_service.py index 48b0f9f1dd..d699af2e19 100644 --- a/src/pipecat/services/mcp_service.py +++ b/src/pipecat/services/mcp_service.py @@ -7,7 +7,7 @@ """MCP (Model Context Protocol) client for integrating external tools with LLMs.""" import json -from typing import Any, Dict, List, Optional, Union +from typing import Any, Dict, List, Tuple from loguru import logger @@ -17,9 +17,11 @@ try: from mcp import ClientSession, StdioServerParameters - from mcp.client.session_group import SseServerParameters + from mcp.client.session_group import SseServerParameters, StreamableHttpParameters + from mcp.client.session import ClientSession from mcp.client.sse import sse_client from mcp.client.stdio import stdio_client + from mcp.client.streamable_http import streamablehttp_client except ModuleNotFoundError as e: logger.error(f"Exception: {e}") logger.error("In order to use an MCP client, you need to `pip install pipecat-ai[mcp]`.") @@ -43,21 +45,25 @@ class MCPClient(BaseObject): def __init__( self, - server_params: Union[StdioServerParameters, SseServerParameters], + server_params: Tuple[StdioServerParameters, SseServerParameters, StreamableHttpParameters], **kwargs, ): super().__init__(**kwargs) self._server_params = server_params self._session = ClientSession + if isinstance(server_params, StdioServerParameters): self._client = stdio_client self._register_tools = self._stdio_register_tools elif isinstance(server_params, SseServerParameters): self._client = sse_client self._register_tools = self._sse_register_tools + elif isinstance(server_params, StreamableHttpParameters): + self._client = streamablehttp_client + self._register_tools = self._streamable_http_register_tools else: raise TypeError( - f"{self} invalid argument type: `server_params` must be either StdioServerParameters or SseServerParameters." + f"{self} invalid argument type: `server_params` must be either StdioServerParameters, SseServerParameters, or StreamableHttpParameters." ) async def register_tools(self, llm) -> ToolsSchema: @@ -75,6 +81,7 @@ async def register_tools(self, llm) -> ToolsSchema: tools_schema = await self._register_tools(llm) return tools_schema + def _convert_mcp_schema_to_pipecat( self, tool_name: str, tool_schema: Dict[str, Any] ) -> FunctionSchema: @@ -104,7 +111,7 @@ def _convert_mcp_schema_to_pipecat( return schema async def _sse_register_tools(self, llm) -> ToolsSchema: - """Register all available mcp.run tools with the LLM service. + """Register all available mcp tools with the LLM service. Args: llm: The Pipecat LLM service to register tools with @@ -120,15 +127,12 @@ async def mcp_tool_wrapper( context: any, result_callback: any, ) -> None: - """Wrapper for mcp.run tool calls to match Pipecat's function call interface.""" + """Wrapper for mcp tool calls to match Pipecat's function call interface.""" logger.debug(f"Executing tool '{function_name}' with call ID: {tool_call_id}") logger.trace(f"Tool arguments: {json.dumps(arguments, indent=2)}") try: async with self._client( - url=self._server_params.url, - headers=self._server_params.headers, - timeout=self._server_params.timeout, - sse_read_timeout=self._server_params.sse_read_timeout, + **self._server_params.model_dump() ) as (read, write): async with self._session(read, write) as session: await session.initialize() @@ -140,12 +144,10 @@ async def mcp_tool_wrapper( await result_callback(error_msg) logger.debug(f"SSE server parameters: {self._server_params}") + logger.debug("Starting registration of mcp tools") async with self._client( - url=self._server_params.url, - headers=self._server_params.headers, - timeout=self._server_params.timeout, - sse_read_timeout=self._server_params.sse_read_timeout, + **self._server_params.model_dump() ) as (read, write): async with self._session(read, write) as session: await session.initialize() @@ -153,7 +155,7 @@ async def mcp_tool_wrapper( return tools_schema async def _stdio_register_tools(self, llm) -> ToolsSchema: - """Register all available mcp.run tools with the LLM service. + """Register all available mcp tools with the LLM service. Args: llm: The Pipecat LLM service to register tools with @@ -169,7 +171,7 @@ async def mcp_tool_wrapper( context: any, result_callback: any, ) -> None: - """Wrapper for mcp.run tool calls to match Pipecat's function call interface.""" + """Wrapper for mcp tool calls to match Pipecat's function call interface.""" logger.debug(f"Executing tool '{function_name}' with call ID: {tool_call_id}") logger.trace(f"Tool arguments: {json.dumps(arguments, indent=2)}") try: @@ -183,7 +185,7 @@ async def mcp_tool_wrapper( logger.exception("Full exception details:") await result_callback(error_msg) - logger.debug("Starting registration of mcp.run tools") + logger.debug("Starting registration of mcp tools") async with self._client(self._server_params) as streams: async with self._session(streams[0], streams[1]) as session: @@ -191,6 +193,56 @@ async def mcp_tool_wrapper( tools_schema = await self._list_tools(session, mcp_tool_wrapper, llm) return tools_schema + async def _streamable_http_register_tools(self, llm) -> ToolsSchema: + """Register all available mcp tools with the LLM service using streamable HTTP. + Args: + llm: The Pipecat LLM service to register tools with + Returns: + A ToolsSchema containing all registered tools + """ + + async def mcp_tool_wrapper( + function_name: str, + tool_call_id: str, + arguments: Dict[str, Any], + llm: any, + context: any, + result_callback: any, + ) -> None: + """Wrapper for mcp tool calls to match Pipecat's function call interface.""" + logger.debug(f"Executing tool '{function_name}' with call ID: {tool_call_id}") + logger.trace(f"Tool arguments: {json.dumps(arguments, indent=2)}") + try: + async with self._client( + **self._server_params.model_dump() + ) as ( + read_stream, + write_stream, + _, + ): + async with self._session(read_stream, write_stream) as session: + await session.initialize() + await self._call_tool(session, function_name, arguments, result_callback) + except Exception as e: + error_msg = f"Error calling mcp tool {function_name}: {str(e)}" + logger.error(error_msg) + logger.exception("Full exception details:") + await result_callback(error_msg) + + logger.debug("Starting registration of mcp tools using streamable HTTP") + + async with self._client( + **self._server_params.model_dump() + ) as ( + read_stream, + write_stream, + _, + ): + async with self._session(read_stream, write_stream) as session: + await session.initialize() + tools_schema = await self._list_tools(session, mcp_tool_wrapper, llm) + return tools_schema + async def _call_tool(self, session, function_name, arguments, result_callback): logger.debug(f"Calling mcp tool '{function_name}'") try: @@ -235,7 +287,7 @@ async def _list_tools(self, session, mcp_tool_wrapper, llm): # Convert the schema function_schema = self._convert_mcp_schema_to_pipecat( tool_name, - {"description": tool.description, "input_schema": tool.inputSchema}, + {"description": tool.description, "input_schema": tool.inputSchema} ) # Register the wrapped function @@ -254,4 +306,4 @@ async def _list_tools(self, session, mcp_tool_wrapper, llm): logger.debug(f"Completed registration of {len(tool_schemas)} tools") tools_schema = ToolsSchema(standard_tools=tool_schemas) - return tools_schema + return tools_schema \ No newline at end of file