Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 37 additions & 16 deletions pydantic_ai_slim/pydantic_ai/mcp.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,16 @@
from mcp.client.sse import sse_client
from mcp.client.stdio import StdioServerParameters, stdio_client
from mcp.client.streamable_http import GetSessionIdCallback, streamablehttp_client

# Try to import the new streamable_http_client. Fall back to None if not available (older MCP versions < 1.25.0).
# See: https://github.com/modelcontextprotocol/python-sdk/pull/1177
streamable_http_client: Any = None
try:
from mcp.client.streamable_http import (
streamable_http_client, # pyright: ignore[reportAttributeAccessIssue,reportUnknownVariableType]
)
except ImportError:
pass
from mcp.shared import exceptions as mcp_exceptions
from mcp.shared.context import RequestContext
from mcp.shared.message import SessionMessage
Expand Down Expand Up @@ -1153,22 +1163,33 @@ async def client_streams(
)

if self.http_client is not None:
# TODO: Clean up once https://github.com/modelcontextprotocol/python-sdk/pull/1177 lands.
@asynccontextmanager
async def httpx_client_factory(
headers: dict[str, str] | None = None,
timeout: httpx.Timeout | None = None,
auth: httpx.Auth | None = None,
) -> AsyncIterator[httpx.AsyncClient]:
assert self.http_client is not None
yield self.http_client

async with transport_client_partial(httpx_client_factory=httpx_client_factory) as (
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the transport_client_partial method can now be deleted

read_stream,
write_stream,
*_,
):
yield read_stream, write_stream
# Use the new streamable_http_client directly when http_client is provided and available.
# The deprecated streamablehttp_client wrapper would close the client via `async with client:`.
# See: https://github.com/modelcontextprotocol/python-sdk/pull/1177
if self._transport_client is streamablehttp_client and streamable_http_client is not None:
async with streamable_http_client(self.url, http_client=self.http_client) as (
read_stream,
write_stream,
_,
):
yield read_stream, write_stream
else:

@asynccontextmanager
async def httpx_client_factory(
headers: dict[str, str] | None = None,
timeout: httpx.Timeout | None = None,
auth: httpx.Auth | None = None,
) -> AsyncIterator[httpx.AsyncClient]:
assert self.http_client is not None
yield self.http_client

async with transport_client_partial(httpx_client_factory=httpx_client_factory) as (
read_stream,
write_stream,
*_,
):
yield read_stream, write_stream
else:
async with transport_client_partial(headers=self.headers) as (read_stream, write_stream, *_):
yield read_stream, write_stream
Expand Down
55 changes: 55 additions & 0 deletions tests/test_mcp.py
Original file line number Diff line number Diff line change
Expand Up @@ -2220,6 +2220,19 @@ async def test_agent_run_stream_with_mcp_server_http(allow_model_requests: None,


async def test_custom_http_client_not_closed():
"""Test that user-provided HTTP clients are not closed after MCP server operations.

This test verifies the fix for GitHub issue #3745:
https://github.com/pydantic/pydantic-ai/issues/3745

When users provide their own httpx.AsyncClient to MCPServerStreamableHTTP,
it should remain open after the server finishes its operations. The server
should not manage the lifecycle of user-provided clients.

This works by using the new MCP SDK's streamable_http_client function (when
available in MCP >= 1.25.0) which properly handles user-provided clients by
not closing them on exit.
"""
custom_http_client = cached_async_http_client()

assert not custom_http_client.is_closed
Expand All @@ -2234,6 +2247,48 @@ async def test_custom_http_client_not_closed():
assert not custom_http_client.is_closed


def test_streamable_http_uses_new_api_when_available():
"""Test that MCPServerStreamableHTTP uses the new streamable_http_client API.

When a custom http_client is provided and streamable_http_client is available
(MCP SDK >= 1.25.0), the code should use the new API that correctly handles
the HTTP client lifecycle.

Related to GitHub issue #3745:
https://github.com/pydantic/pydantic-ai/issues/3745
"""
import pydantic_ai.mcp as mcp_module

if mcp_module.streamable_http_client is None:
pytest.skip('streamable_http_client not available (MCP SDK < 1.25.0)')

assert mcp_module.streamable_http_client is not None


def test_streamable_http_fallback_when_old_sdk():
"""Test that MCPServerStreamableHTTP falls back gracefully with old MCP SDK.

When streamable_http_client is not available (MCP SDK < 1.25.0), the code
should use the deprecated streamablehttp_client with the factory approach.

Related to GitHub issue #3745:
https://github.com/pydantic/pydantic-ai/issues/3745
"""
from mcp.client.streamable_http import streamablehttp_client

import pydantic_ai.mcp as mcp_module

assert streamablehttp_client is not None

with patch.object(mcp_module, 'streamable_http_client', None):
assert mcp_module.streamable_http_client is None
server = MCPServerStreamableHTTP(
url='https://example.com/mcp',
http_client=None,
)
assert server is not None


# ============================================================================
# Tool and Resource Caching Tests
# ============================================================================
Expand Down
Loading