Skip to content

Commit 9cfbb1c

Browse files
committed
Merge branch 'main' into ihrpr/auth-example
2 parents ebb9151 + e0d443c commit 9cfbb1c

File tree

27 files changed

+1424
-57
lines changed

27 files changed

+1424
-57
lines changed

README.md

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -410,8 +410,69 @@ app = Starlette(
410410
app.router.routes.append(Host('mcp.acme.corp', app=mcp.sse_app()))
411411
```
412412

413+
When mounting multiple MCP servers under different paths, you can configure the mount path in several ways:
414+
415+
```python
416+
from starlette.applications import Starlette
417+
from starlette.routing import Mount
418+
from mcp.server.fastmcp import FastMCP
419+
420+
# Create multiple MCP servers
421+
github_mcp = FastMCP("GitHub API")
422+
browser_mcp = FastMCP("Browser")
423+
curl_mcp = FastMCP("Curl")
424+
search_mcp = FastMCP("Search")
425+
426+
# Method 1: Configure mount paths via settings (recommended for persistent configuration)
427+
github_mcp.settings.mount_path = "/github"
428+
browser_mcp.settings.mount_path = "/browser"
429+
430+
# Method 2: Pass mount path directly to sse_app (preferred for ad-hoc mounting)
431+
# This approach doesn't modify the server's settings permanently
432+
433+
# Create Starlette app with multiple mounted servers
434+
app = Starlette(
435+
routes=[
436+
# Using settings-based configuration
437+
Mount("/github", app=github_mcp.sse_app()),
438+
Mount("/browser", app=browser_mcp.sse_app()),
439+
# Using direct mount path parameter
440+
Mount("/curl", app=curl_mcp.sse_app("/curl")),
441+
Mount("/search", app=search_mcp.sse_app("/search")),
442+
]
443+
)
444+
445+
# Method 3: For direct execution, you can also pass the mount path to run()
446+
if __name__ == "__main__":
447+
search_mcp.run(transport="sse", mount_path="/search")
448+
```
449+
413450
For more information on mounting applications in Starlette, see the [Starlette documentation](https://www.starlette.io/routing/#submounting-routes).
414451

452+
#### Message Dispatch Options
453+
454+
By default, the SSE server uses an in-memory message dispatch system for incoming POST messages. For production deployments or distributed scenarios, you can use Redis or implement your own message dispatch system that conforms to the `MessageDispatch` protocol:
455+
456+
```python
457+
# Using the built-in Redis message dispatch
458+
from mcp.server.fastmcp import FastMCP
459+
from mcp.server.message_queue import RedisMessageDispatch
460+
461+
# Create a Redis message dispatch
462+
redis_dispatch = RedisMessageDispatch(
463+
redis_url="redis://localhost:6379/0", prefix="mcp:pubsub:"
464+
)
465+
466+
# Pass the message dispatch instance to the server
467+
mcp = FastMCP("My App", message_queue=redis_dispatch)
468+
```
469+
470+
To use Redis, add the Redis dependency:
471+
472+
```bash
473+
uv add "mcp[redis]"
474+
```
475+
415476
## Examples
416477

417478
### Echo Server

examples/servers/simple-prompt/mcp_simple_prompt/server.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,12 +88,15 @@ async def get_prompt(
8888
)
8989

9090
if transport == "sse":
91+
from mcp.server.message_queue.redis import RedisMessageDispatch
9192
from mcp.server.sse import SseServerTransport
9293
from starlette.applications import Starlette
9394
from starlette.responses import Response
9495
from starlette.routing import Mount, Route
9596

96-
sse = SseServerTransport("/messages/")
97+
message_dispatch = RedisMessageDispatch("redis://localhost:6379/0")
98+
99+
sse = SseServerTransport("/messages/", message_dispatch=message_dispatch)
97100

98101
async def handle_sse(request):
99102
async with sse.connect_sse(

pyproject.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ dependencies = [
3737
rich = ["rich>=13.9.4"]
3838
cli = ["typer>=0.12.4", "python-dotenv>=1.0.0"]
3939
ws = ["websockets>=15.0.1"]
40+
redis = ["redis>=5.2.1", "types-redis>=4.6.0.20241004"]
4041

4142
[project.scripts]
4243
mcp = "mcp.cli:app [cli]"
@@ -55,6 +56,7 @@ dev = [
5556
"pytest-xdist>=3.6.1",
5657
"pytest-examples>=0.0.14",
5758
"pytest-pretty>=1.2.0",
59+
"fakeredis==2.28.1",
5860
]
5961
docs = [
6062
"mkdocs>=1.6.1",

src/mcp/client/sse.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,9 @@ async def sse_reader(
9898
await read_stream_writer.send(exc)
9999
continue
100100

101-
session_message = SessionMessage(message)
101+
session_message = SessionMessage(
102+
message=message
103+
)
102104
await read_stream_writer.send(session_message)
103105
case _:
104106
logger.warning(
@@ -148,3 +150,5 @@ async def post_writer(endpoint_url: str):
148150
finally:
149151
await read_stream_writer.aclose()
150152
await write_stream.aclose()
153+
await read_stream.aclose()
154+
await write_stream_reader.aclose()

src/mcp/client/stdio/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -144,7 +144,7 @@ async def stdout_reader():
144144
await read_stream_writer.send(exc)
145145
continue
146146

147-
session_message = SessionMessage(message)
147+
session_message = SessionMessage(message=message)
148148
await read_stream_writer.send(session_message)
149149
except anyio.ClosedResourceError:
150150
await anyio.lowlevel.checkpoint()

src/mcp/client/streamable_http.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -153,7 +153,7 @@ async def _handle_sse_event(
153153
):
154154
message.root.id = original_request_id
155155

156-
session_message = SessionMessage(message)
156+
session_message = SessionMessage(message=message)
157157
await read_stream_writer.send(session_message)
158158

159159
# Call resumption token callback if we have an ID
@@ -286,7 +286,7 @@ async def _handle_json_response(
286286
try:
287287
content = await response.aread()
288288
message = JSONRPCMessage.model_validate_json(content)
289-
session_message = SessionMessage(message)
289+
session_message = SessionMessage(message=message)
290290
await read_stream_writer.send(session_message)
291291
except Exception as exc:
292292
logger.error(f"Error parsing JSON response: {exc}")
@@ -333,7 +333,7 @@ async def _send_session_terminated_error(
333333
id=request_id,
334334
error=ErrorData(code=32600, message="Session terminated"),
335335
)
336-
session_message = SessionMessage(JSONRPCMessage(jsonrpc_error))
336+
session_message = SessionMessage(message=JSONRPCMessage(jsonrpc_error))
337337
await read_stream_writer.send(session_message)
338338

339339
async def post_writer(

src/mcp/client/websocket.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ async def ws_reader():
6060
async for raw_text in ws:
6161
try:
6262
message = types.JSONRPCMessage.model_validate_json(raw_text)
63-
session_message = SessionMessage(message)
63+
session_message = SessionMessage(message=message)
6464
await read_stream_writer.send(session_message)
6565
except ValidationError as exc:
6666
# If JSON parse or model validation fails, send the exception

src/mcp/server/fastmcp/server.py

Lines changed: 75 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
from mcp.server.lowlevel.server import LifespanResultT
4545
from mcp.server.lowlevel.server import Server as MCPServer
4646
from mcp.server.lowlevel.server import lifespan as default_lifespan
47+
from mcp.server.message_queue import MessageDispatch
4748
from mcp.server.session import ServerSession, ServerSessionT
4849
from mcp.server.sse import SseServerTransport
4950
from mcp.server.stdio import stdio_server
@@ -87,9 +88,15 @@ class Settings(BaseSettings, Generic[LifespanResultT]):
8788
# HTTP settings
8889
host: str = "0.0.0.0"
8990
port: int = 8000
91+
mount_path: str = "/" # Mount path (e.g. "/github", defaults to root path)
9092
sse_path: str = "/sse"
9193
message_path: str = "/messages/"
9294

95+
# SSE message queue settings
96+
message_dispatch: MessageDispatch | None = Field(
97+
None, description="Custom message dispatch instance"
98+
)
99+
93100
# resource settings
94101
warn_on_duplicate_resources: bool = True
95102

@@ -178,11 +185,16 @@ def name(self) -> str:
178185
def instructions(self) -> str | None:
179186
return self._mcp_server.instructions
180187

181-
def run(self, transport: Literal["stdio", "sse"] = "stdio") -> None:
188+
def run(
189+
self,
190+
transport: Literal["stdio", "sse"] = "stdio",
191+
mount_path: str | None = None,
192+
) -> None:
182193
"""Run the FastMCP server. Note this is a synchronous function.
183194
184195
Args:
185196
transport: Transport protocol to use ("stdio" or "sse")
197+
mount_path: Optional mount path for SSE transport
186198
"""
187199
TRANSPORTS = Literal["stdio", "sse"]
188200
if transport not in TRANSPORTS.__args__: # type: ignore
@@ -191,7 +203,7 @@ def run(self, transport: Literal["stdio", "sse"] = "stdio") -> None:
191203
if transport == "stdio":
192204
anyio.run(self.run_stdio_async)
193205
else: # transport == "sse"
194-
anyio.run(self.run_sse_async)
206+
anyio.run(lambda: self.run_sse_async(mount_path))
195207

196208
def _setup_handlers(self) -> None:
197209
"""Set up core MCP protocol handlers."""
@@ -552,11 +564,11 @@ async def run_stdio_async(self) -> None:
552564
self._mcp_server.create_initialization_options(),
553565
)
554566

555-
async def run_sse_async(self) -> None:
567+
async def run_sse_async(self, mount_path: str | None = None) -> None:
556568
"""Run the server using SSE transport."""
557569
import uvicorn
558570

559-
starlette_app = self.sse_app()
571+
starlette_app = self.sse_app(mount_path)
560572

561573
config = uvicorn.Config(
562574
starlette_app,
@@ -567,14 +579,59 @@ async def run_sse_async(self) -> None:
567579
server = uvicorn.Server(config)
568580
await server.serve()
569581

570-
def sse_app(self) -> Starlette:
582+
def _normalize_path(self, mount_path: str, endpoint: str) -> str:
583+
"""
584+
Combine mount path and endpoint to return a normalized path.
585+
586+
Args:
587+
mount_path: The mount path (e.g. "/github" or "/")
588+
endpoint: The endpoint path (e.g. "/messages/")
589+
590+
Returns:
591+
Normalized path (e.g. "/github/messages/")
592+
"""
593+
# Special case: root path
594+
if mount_path == "/":
595+
return endpoint
596+
597+
# Remove trailing slash from mount path
598+
if mount_path.endswith("/"):
599+
mount_path = mount_path[:-1]
600+
601+
# Ensure endpoint starts with slash
602+
if not endpoint.startswith("/"):
603+
endpoint = "/" + endpoint
604+
605+
# Combine paths
606+
return mount_path + endpoint
607+
608+
def sse_app(self, mount_path: str | None = None) -> Starlette:
571609
"""Return an instance of the SSE server app."""
610+
message_dispatch = self.settings.message_dispatch
611+
if message_dispatch is None:
612+
from mcp.server.message_queue import InMemoryMessageDispatch
613+
614+
message_dispatch = InMemoryMessageDispatch()
615+
logger.info("Using default in-memory message dispatch")
616+
572617
from starlette.middleware import Middleware
573618
from starlette.routing import Mount, Route
574619

620+
# Update mount_path in settings if provided
621+
if mount_path is not None:
622+
self.settings.mount_path = mount_path
623+
624+
# Create normalized endpoint considering the mount path
625+
normalized_message_endpoint = self._normalize_path(
626+
self.settings.mount_path, self.settings.message_path
627+
)
628+
575629
# Set up auth context and dependencies
576630

577-
sse = SseServerTransport(self.settings.message_path)
631+
sse = SseServerTransport(
632+
normalized_message_endpoint,
633+
message_dispatch=message_dispatch
634+
)
578635

579636
async def handle_sse(scope: Scope, receive: Receive, send: Send):
580637
# Add client ID from auth context into request context if available
@@ -589,7 +646,14 @@ async def handle_sse(scope: Scope, receive: Receive, send: Send):
589646
streams[1],
590647
self._mcp_server.create_initialization_options(),
591648
)
592-
return Response()
649+
return Response()
650+
651+
@asynccontextmanager
652+
async def lifespan(app: Starlette):
653+
try:
654+
yield
655+
finally:
656+
await message_dispatch.close()
593657

594658
# Create routes
595659
routes: list[Route | Mount] = []
@@ -666,7 +730,10 @@ async def sse_endpoint(request: Request) -> None:
666730

667731
# Create Starlette app with routes and middleware
668732
return Starlette(
669-
debug=self.settings.debug, routes=routes, middleware=middleware
733+
debug=self.settings.debug,
734+
routes=routes,
735+
middleware=middleware,
736+
lifespan=lifespan,
670737
)
671738

672739
async def list_prompts(self) -> list[MCPPrompt]:
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
"""
2+
Message Dispatch Module for MCP Server
3+
4+
This module implements dispatch interfaces for handling
5+
messages between clients and servers.
6+
"""
7+
8+
from mcp.server.message_queue.base import InMemoryMessageDispatch, MessageDispatch
9+
10+
# Try to import Redis implementation if available
11+
try:
12+
from mcp.server.message_queue.redis import RedisMessageDispatch
13+
except ImportError:
14+
RedisMessageDispatch = None
15+
16+
__all__ = ["MessageDispatch", "InMemoryMessageDispatch", "RedisMessageDispatch"]

0 commit comments

Comments
 (0)