From 8fcef5628f458cb68bcfaeb0046ee62047141efa Mon Sep 17 00:00:00 2001 From: Yousif Astarabadi <6870090+yousifa@users.noreply.github.com> Date: Thu, 19 Jun 2025 13:14:00 -0700 Subject: [PATCH 1/5] added streamablehttp support, bumped mcp version, added additional headers and streamable_http params to MCPClient --- pyproject.toml | 2 +- src/pipecat/services/mcp_service.py | 44 +++++++++++++++++++++++++++++ 2 files changed, 45 insertions(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index f7c73d49a5..cd1ae27ee8 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -64,7 +64,7 @@ langchain = [ "langchain~=0.3.20", "langchain-community~=0.3.20", "langchain-ope livekit = [ "livekit~=0.22.0", "livekit-api~=0.8.2", "tenacity~=9.0.0" ] lmnt = [ "websockets~=13.1" ] local = [ "pyaudio~=0.2.14" ] -mcp = [ "mcp[cli]~=1.6.0" ] +mcp = [ "mcp[cli]~=1.9.4" ] mem0 = [ "mem0ai~=0.1.94" ] mlx-whisper = [ "mlx-whisper~=0.4.2" ] moondream = [ "einops~=0.8.0", "timm~=1.0.13", "transformers~=4.48.0" ] diff --git a/src/pipecat/services/mcp_service.py b/src/pipecat/services/mcp_service.py index a644d8f1bd..3e1b04681f 100644 --- a/src/pipecat/services/mcp_service.py +++ b/src/pipecat/services/mcp_service.py @@ -12,6 +12,7 @@ from mcp.client.session_group import SseServerParameters from mcp.client.sse import sse_client from mcp.client.stdio import stdio_client + from mcp.client.streamable_http import streamablehttp_client except ModuleNotFoundError as e: logger.error(f"Exception: {e}") logger.error("In order to use an MCP client, you need to `pip install pipecat-ai[mcp]`.") @@ -27,12 +28,17 @@ def __init__( super().__init__(**kwargs) self._server_params = server_params self._session = ClientSession + self._additional_headers = additional_headers or {} + if isinstance(server_params, StdioServerParameters): self._client = stdio_client self._register_tools = self._stdio_register_tools elif isinstance(server_params, SseServerParameters): self._client = sse_client self._register_tools = self._sse_register_tools + elif isinstance(server_params, str) and streamable_http: + self._client = streamablehttp_client + self._register_tools = self._streamable_http_register_tools else: raise TypeError( f"{self} invalid argument type: `server_params` must be either StdioServerParameters or SseServerParameters." @@ -156,6 +162,44 @@ async def mcp_tool_wrapper( tools_schema = await self._list_tools(session, mcp_tool_wrapper, llm) return tools_schema + async def _streamable_http_register_tools(self, llm) -> ToolsSchema: + """Register all available mcp.run tools with the LLM service using streamable HTTP. + Args: + llm: The Pipecat LLM service to register tools with + Returns: + A ToolsSchema containing all registered tools + """ + + async def mcp_tool_wrapper( + function_name: str, + tool_call_id: str, + arguments: Dict[str, Any], + llm: any, + context: any, + result_callback: any, + ) -> None: + """Wrapper for mcp.run tool calls to match Pipecat's function call interface.""" + logger.debug(f"Executing tool '{function_name}' with call ID: {tool_call_id}") + logger.trace(f"Tool arguments: {json.dumps(arguments, indent=2)}") + try: + async with self._client(self._server_params, headers=self._additional_headers) as (read_stream, write_stream, _): + async with self._session(read_stream, write_stream) as session: + await session.initialize() + await self._call_tool(session, function_name, arguments, result_callback) + except Exception as e: + error_msg = f"Error calling mcp tool {function_name}: {str(e)}" + logger.error(error_msg) + logger.exception("Full exception details:") + await result_callback(error_msg) + + logger.debug("Starting registration of mcp.run tools using streamable HTTP") + + async with self._client(self._server_params, headers=self._additional_headers) as (read_stream, write_stream, _): + async with self._session(read_stream, write_stream) as session: + await session.initialize() + tools_schema = await self._list_tools(session, mcp_tool_wrapper, llm) + return tools_schema + async def _call_tool(self, session, function_name, arguments, result_callback): logger.debug(f"Calling mcp tool '{function_name}'") try: From c720cfc7c7cb6e16e319ddff8238fd7f54bc5105 Mon Sep 17 00:00:00 2001 From: Yousif Astarabadi <6870090+yousifa@users.noreply.github.com> Date: Tue, 24 Jun 2025 19:15:32 -0700 Subject: [PATCH 2/5] updated streamablehttp to use StreamableHttpParameters type --- src/pipecat/services/mcp_service.py | 21 +++++++++++++++------ 1 file changed, 15 insertions(+), 6 deletions(-) diff --git a/src/pipecat/services/mcp_service.py b/src/pipecat/services/mcp_service.py index 3e1b04681f..2f61343dd7 100644 --- a/src/pipecat/services/mcp_service.py +++ b/src/pipecat/services/mcp_service.py @@ -9,7 +9,7 @@ try: from mcp import ClientSession, StdioServerParameters - from mcp.client.session_group import SseServerParameters + from mcp.client.session_group import SseServerParameters, StreamableHttpParameters from mcp.client.sse import sse_client from mcp.client.stdio import stdio_client from mcp.client.streamable_http import streamablehttp_client @@ -22,13 +22,12 @@ class MCPClient(BaseObject): def __init__( self, - server_params: Union[StdioServerParameters, SseServerParameters], + server_params: Union[StdioServerParameters, SseServerParameters, StreamableHttpParameters], **kwargs, ): super().__init__(**kwargs) self._server_params = server_params self._session = ClientSession - self._additional_headers = additional_headers or {} if isinstance(server_params, StdioServerParameters): self._client = stdio_client @@ -36,7 +35,7 @@ def __init__( elif isinstance(server_params, SseServerParameters): self._client = sse_client self._register_tools = self._sse_register_tools - elif isinstance(server_params, str) and streamable_http: + elif isinstance(server_params, StreamableHttpParameters): self._client = streamablehttp_client self._register_tools = self._streamable_http_register_tools else: @@ -182,7 +181,12 @@ async def mcp_tool_wrapper( logger.debug(f"Executing tool '{function_name}' with call ID: {tool_call_id}") logger.trace(f"Tool arguments: {json.dumps(arguments, indent=2)}") try: - async with self._client(self._server_params, headers=self._additional_headers) as (read_stream, write_stream, _): + async with self._client( + url=self._server_params.url, + headers=self._server_params.headers, + timeout=self._server_params.timeout, + sse_read_timeout=self._server_params.sse_read_timeout, + ) as (read_stream, write_stream, _): async with self._session(read_stream, write_stream) as session: await session.initialize() await self._call_tool(session, function_name, arguments, result_callback) @@ -194,7 +198,12 @@ async def mcp_tool_wrapper( logger.debug("Starting registration of mcp.run tools using streamable HTTP") - async with self._client(self._server_params, headers=self._additional_headers) as (read_stream, write_stream, _): + async with self._client( + url=self._server_params.url, + headers=self._server_params.headers, + timeout=self._server_params.timeout, + sse_read_timeout=self._server_params.sse_read_timeout, + ) as (read_stream, write_stream, _): async with self._session(read_stream, write_stream) as session: await session.initialize() tools_schema = await self._list_tools(session, mcp_tool_wrapper, llm) From 495688681932a2f879c761ea5ebc6f3c26693be6 Mon Sep 17 00:00:00 2001 From: Yousif Astarabadi <6870090+yousifa@users.noreply.github.com> Date: Tue, 24 Jun 2025 19:16:13 -0700 Subject: [PATCH 3/5] updated error message with StreamableHttpParameters --- src/pipecat/services/mcp_service.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/pipecat/services/mcp_service.py b/src/pipecat/services/mcp_service.py index 2f61343dd7..dc631344ec 100644 --- a/src/pipecat/services/mcp_service.py +++ b/src/pipecat/services/mcp_service.py @@ -40,7 +40,7 @@ def __init__( self._register_tools = self._streamable_http_register_tools else: raise TypeError( - f"{self} invalid argument type: `server_params` must be either StdioServerParameters or SseServerParameters." + f"{self} invalid argument type: `server_params` must be either StdioServerParameters, SseServerParameters, or StreamableHttpParameters." ) async def register_tools(self, llm) -> ToolsSchema: From 1cac028bfee14aff11ccc3b094a615b06e301d8c Mon Sep 17 00:00:00 2001 From: Yousif Astarabadi <6870090+yousifa@users.noreply.github.com> Date: Tue, 24 Jun 2025 20:07:46 -0700 Subject: [PATCH 4/5] example using http transport for mcp client --- examples/foundational/39c-mcp-run-http.py | 131 ++++++++++++++++++++++ 1 file changed, 131 insertions(+) create mode 100644 examples/foundational/39c-mcp-run-http.py diff --git a/examples/foundational/39c-mcp-run-http.py b/examples/foundational/39c-mcp-run-http.py new file mode 100644 index 0000000000..363c3ea610 --- /dev/null +++ b/examples/foundational/39c-mcp-run-http.py @@ -0,0 +1,131 @@ +# +# Copyright (c) 2024–2025, Daily +# +# SPDX-License-Identifier: BSD 2-Clause License +# + +import argparse +import os + +from dotenv import load_dotenv +from loguru import logger +from mcp.client.session_group import StreamableHttpParameters + +from pipecat.audio.vad.silero import SileroVADAnalyzer +from pipecat.pipeline.pipeline import Pipeline +from pipecat.pipeline.runner import PipelineRunner +from pipecat.pipeline.task import PipelineParams, PipelineTask +from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext +from pipecat.services.openai.llm import OpenAILLMService +from pipecat.services.cartesia.tts import CartesiaTTSService +from pipecat.services.deepgram.stt import DeepgramSTTService +from pipecat.services.mcp_service import MCPClient +from pipecat.transports.base_transport import BaseTransport, TransportParams +from pipecat.transports.network.fastapi_websocket import FastAPIWebsocketParams +from pipecat.transports.services.daily import DailyParams + +load_dotenv(override=True) + +# We store functions so objects (e.g. SileroVADAnalyzer) don't get +# instantiated. The function will be called when the desired transport gets +# selected. +transport_params = { + "daily": lambda: DailyParams( + audio_in_enabled=True, + audio_out_enabled=True, + vad_analyzer=SileroVADAnalyzer(), + ), + "twilio": lambda: FastAPIWebsocketParams( + audio_in_enabled=True, + audio_out_enabled=True, + vad_analyzer=SileroVADAnalyzer(), + ), + "webrtc": lambda: TransportParams( + audio_in_enabled=True, + audio_out_enabled=True, + vad_analyzer=SileroVADAnalyzer(), + ), +} + + +async def run_example(transport: BaseTransport, _: argparse.Namespace, handle_sigint: bool): + logger.info(f"Starting bot") + + stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY")) + + tts = CartesiaTTSService( + api_key=os.getenv("CARTESIA_API_KEY"), + voice_id="71a7ad14-091c-4e8e-a314-022ece01c121", # British Reading Lady + ) + + llm = OpenAILLMService( + api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o-mini" + ) + + try: + mcp = MCPClient( + server_params=StreamableHttpParameters( + url="https://api.githubcopilot.com/mcp/", + headers={"Authorization": f"Bearer {os.getenv('GITHUB_PERSONAL_ACCESS_TOKEN')}"}, + ) + ) + except Exception as e: + logger.error(f"error setting up mcp") + logger.exception("error trace:") + + tools = await mcp.register_tools(llm) + + system = f""" + You are a helpful LLM in a WebRTC call. + Your goal is to answer questions about the user's GitHub repositories and account. + You have access to a number of tools provided by Github. Use any and all tools to help users. + Your output will be converted to audio so don't include special characters in your answers. + Don't overexplain what you are doing. + Just respond with short sentences when you are carrying out tool calls. + """ + + messages = [{"role": "system", "content": system}] + + context = OpenAILLMContext(messages, tools) + context_aggregator = llm.create_context_aggregator(context) + + pipeline = Pipeline( + [ + transport.input(), # Transport user input + stt, + context_aggregator.user(), # User spoken responses + llm, # LLM + tts, # TTS + transport.output(), # Transport bot output + context_aggregator.assistant(), # Assistant spoken responses and tool context + ] + ) + + task = PipelineTask( + pipeline, + params=PipelineParams( + enable_metrics=True, + enable_usage_metrics=True, + ), + ) + + @transport.event_handler("on_client_connected") + async def on_client_connected(transport, client): + logger.info(f"Client connected: {client}") + # Kick off the conversation. + await task.queue_frames([context_aggregator.user().get_context_frame()]) + + @transport.event_handler("on_client_disconnected") + async def on_client_disconnected(transport, client): + logger.info(f"Client disconnected") + await task.cancel() + + runner = PipelineRunner(handle_sigint=handle_sigint) + + await runner.run(task) + + +if __name__ == "__main__": + from pipecat.examples.run import main + + main(run_example, transport_params=transport_params) From f0bcc9d9ba55d64d15a9fdd3f4188e715ea4a14f Mon Sep 17 00:00:00 2001 From: Mark Backman Date: Wed, 25 Jun 2025 17:51:42 -0400 Subject: [PATCH 5/5] Add MCPClient docstrings. Removed google specific cleanup, changed example to openai --- examples/foundational/39c-mcp-run-http.py | 6 +- src/pipecat/services/mcp_service.py | 98 +++++++++++++++-------- 2 files changed, 71 insertions(+), 33 deletions(-) diff --git a/examples/foundational/39c-mcp-run-http.py b/examples/foundational/39c-mcp-run-http.py index 363c3ea610..b84755b5b9 100644 --- a/examples/foundational/39c-mcp-run-http.py +++ b/examples/foundational/39c-mcp-run-http.py @@ -63,6 +63,10 @@ async def run_example(transport: BaseTransport, _: argparse.Namespace, handle_si ) try: + # Github MCP docs: https://github.com/github/github-mcp-server + # Enable Github Copilot on your GitHub account. Free tier is ok. (https://github.com/settings/copilot) + # Generate a personal access token. It must be a Fine-grained token, classic tokens are not supported. (https://github.com/settings/personal-access-tokens) + # Set permissions you want to use (eg. "all repositories", "profile: read/write", etc) mcp = MCPClient( server_params=StreamableHttpParameters( url="https://api.githubcopilot.com/mcp/", @@ -128,4 +132,4 @@ async def on_client_disconnected(transport, client): if __name__ == "__main__": from pipecat.examples.run import main - main(run_example, transport_params=transport_params) + main(run_example, transport_params=transport_params) \ No newline at end of file diff --git a/src/pipecat/services/mcp_service.py b/src/pipecat/services/mcp_service.py index dc631344ec..d699af2e19 100644 --- a/src/pipecat/services/mcp_service.py +++ b/src/pipecat/services/mcp_service.py @@ -1,5 +1,13 @@ +# +# Copyright (c) 2024–2025, Daily +# +# SPDX-License-Identifier: BSD 2-Clause License +# + +"""MCP (Model Context Protocol) client for integrating external tools with LLMs.""" + import json -from typing import Any, Dict, List, Optional, Union +from typing import Any, Dict, List, Tuple from loguru import logger @@ -10,6 +18,7 @@ try: from mcp import ClientSession, StdioServerParameters from mcp.client.session_group import SseServerParameters, StreamableHttpParameters + from mcp.client.session import ClientSession from mcp.client.sse import sse_client from mcp.client.stdio import stdio_client from mcp.client.streamable_http import streamablehttp_client @@ -20,15 +29,29 @@ class MCPClient(BaseObject): + """Client for Model Context Protocol (MCP) servers. + + Enables integration with MCP servers to provide external tools and resources + to LLMs. Supports both stdio and SSE server connections with automatic tool + registration and schema conversion. + + Args: + server_params: Server connection parameters (stdio or SSE). + **kwargs: Additional arguments passed to the parent BaseObject. + + Raises: + TypeError: If server_params is not a supported parameter type. + """ + def __init__( self, - server_params: Union[StdioServerParameters, SseServerParameters, StreamableHttpParameters], + server_params: Tuple[StdioServerParameters, SseServerParameters, StreamableHttpParameters], **kwargs, ): super().__init__(**kwargs) self._server_params = server_params self._session = ClientSession - + if isinstance(server_params, StdioServerParameters): self._client = stdio_client self._register_tools = self._stdio_register_tools @@ -44,20 +67,32 @@ def __init__( ) async def register_tools(self, llm) -> ToolsSchema: + """Register all available MCP tools with an LLM service. + + Connects to the MCP server, discovers available tools, converts their + schemas to Pipecat format, and registers them with the LLM service. + + Args: + llm: The Pipecat LLM service to register tools with. + + Returns: + A ToolsSchema containing all successfully registered tools. + """ tools_schema = await self._register_tools(llm) return tools_schema + def _convert_mcp_schema_to_pipecat( self, tool_name: str, tool_schema: Dict[str, Any] ) -> FunctionSchema: """Convert an mcp tool schema to Pipecat's FunctionSchema format. + Args: tool_name: The name of the tool tool_schema: The mcp tool schema Returns: A FunctionSchema instance """ - logger.debug(f"Converting schema for tool '{tool_name}'") logger.trace(f"Original schema: {json.dumps(tool_schema, indent=2)}") @@ -76,7 +111,8 @@ def _convert_mcp_schema_to_pipecat( return schema async def _sse_register_tools(self, llm) -> ToolsSchema: - """Register all available mcp.run tools with the LLM service. + """Register all available mcp tools with the LLM service. + Args: llm: The Pipecat LLM service to register tools with Returns: @@ -91,15 +127,12 @@ async def mcp_tool_wrapper( context: any, result_callback: any, ) -> None: - """Wrapper for mcp.run tool calls to match Pipecat's function call interface.""" + """Wrapper for mcp tool calls to match Pipecat's function call interface.""" logger.debug(f"Executing tool '{function_name}' with call ID: {tool_call_id}") logger.trace(f"Tool arguments: {json.dumps(arguments, indent=2)}") try: async with self._client( - url=self._server_params.url, - headers=self._server_params.headers, - timeout=self._server_params.timeout, - sse_read_timeout=self._server_params.sse_read_timeout, + **self._server_params.model_dump() ) as (read, write): async with self._session(read, write) as session: await session.initialize() @@ -111,12 +144,10 @@ async def mcp_tool_wrapper( await result_callback(error_msg) logger.debug(f"SSE server parameters: {self._server_params}") + logger.debug("Starting registration of mcp tools") async with self._client( - url=self._server_params.url, - headers=self._server_params.headers, - timeout=self._server_params.timeout, - sse_read_timeout=self._server_params.sse_read_timeout, + **self._server_params.model_dump() ) as (read, write): async with self._session(read, write) as session: await session.initialize() @@ -124,7 +155,8 @@ async def mcp_tool_wrapper( return tools_schema async def _stdio_register_tools(self, llm) -> ToolsSchema: - """Register all available mcp.run tools with the LLM service. + """Register all available mcp tools with the LLM service. + Args: llm: The Pipecat LLM service to register tools with Returns: @@ -139,7 +171,7 @@ async def mcp_tool_wrapper( context: any, result_callback: any, ) -> None: - """Wrapper for mcp.run tool calls to match Pipecat's function call interface.""" + """Wrapper for mcp tool calls to match Pipecat's function call interface.""" logger.debug(f"Executing tool '{function_name}' with call ID: {tool_call_id}") logger.trace(f"Tool arguments: {json.dumps(arguments, indent=2)}") try: @@ -153,7 +185,7 @@ async def mcp_tool_wrapper( logger.exception("Full exception details:") await result_callback(error_msg) - logger.debug("Starting registration of mcp.run tools") + logger.debug("Starting registration of mcp tools") async with self._client(self._server_params) as streams: async with self._session(streams[0], streams[1]) as session: @@ -162,7 +194,7 @@ async def mcp_tool_wrapper( return tools_schema async def _streamable_http_register_tools(self, llm) -> ToolsSchema: - """Register all available mcp.run tools with the LLM service using streamable HTTP. + """Register all available mcp tools with the LLM service using streamable HTTP. Args: llm: The Pipecat LLM service to register tools with Returns: @@ -177,16 +209,17 @@ async def mcp_tool_wrapper( context: any, result_callback: any, ) -> None: - """Wrapper for mcp.run tool calls to match Pipecat's function call interface.""" + """Wrapper for mcp tool calls to match Pipecat's function call interface.""" logger.debug(f"Executing tool '{function_name}' with call ID: {tool_call_id}") logger.trace(f"Tool arguments: {json.dumps(arguments, indent=2)}") try: async with self._client( - url=self._server_params.url, - headers=self._server_params.headers, - timeout=self._server_params.timeout, - sse_read_timeout=self._server_params.sse_read_timeout, - ) as (read_stream, write_stream, _): + **self._server_params.model_dump() + ) as ( + read_stream, + write_stream, + _, + ): async with self._session(read_stream, write_stream) as session: await session.initialize() await self._call_tool(session, function_name, arguments, result_callback) @@ -196,14 +229,15 @@ async def mcp_tool_wrapper( logger.exception("Full exception details:") await result_callback(error_msg) - logger.debug("Starting registration of mcp.run tools using streamable HTTP") + logger.debug("Starting registration of mcp tools using streamable HTTP") async with self._client( - url=self._server_params.url, - headers=self._server_params.headers, - timeout=self._server_params.timeout, - sse_read_timeout=self._server_params.sse_read_timeout, - ) as (read_stream, write_stream, _): + **self._server_params.model_dump() + ) as ( + read_stream, + write_stream, + _, + ): async with self._session(read_stream, write_stream) as session: await session.initialize() tools_schema = await self._list_tools(session, mcp_tool_wrapper, llm) @@ -253,7 +287,7 @@ async def _list_tools(self, session, mcp_tool_wrapper, llm): # Convert the schema function_schema = self._convert_mcp_schema_to_pipecat( tool_name, - {"description": tool.description, "input_schema": tool.inputSchema}, + {"description": tool.description, "input_schema": tool.inputSchema} ) # Register the wrapped function @@ -272,4 +306,4 @@ async def _list_tools(self, session, mcp_tool_wrapper, llm): logger.debug(f"Completed registration of {len(tool_schemas)} tools") tools_schema = ToolsSchema(standard_tools=tool_schemas) - return tools_schema + return tools_schema \ No newline at end of file