File tree Expand file tree Collapse file tree 5 files changed +127
-13
lines changed
Expand file tree Collapse file tree 5 files changed +127
-13
lines changed Original file line number Diff line number Diff line change 44"""
55
66from mcp .server .fastmcp import FastMCP
7+ from mcp .server .message_queue import RedisMessageQueue
78
8- mcp = FastMCP ()
9+ # Create a Redis message queue
10+ redis_queue = RedisMessageQueue (
11+ redis_url = "redis://localhost:6379/0" , prefix = "mcp:pubsub:"
12+ )
13+
14+ mcp = FastMCP (message_queue = redis_queue )
915
1016
1117@mcp .tool (
@@ -61,4 +67,4 @@ def multilingual_hello() -> str:
6167
6268
6369if __name__ == "__main__" :
64- mcp .run ()
70+ mcp .run (transport = "sse" )
Original file line number Diff line number Diff line change @@ -36,7 +36,8 @@ dependencies = [
3636rich = [" rich>=13.9.4" ]
3737cli = [" typer>=0.12.4" , " python-dotenv>=1.0.0" ]
3838ws = [" websockets>=15.0.1" ]
39- redis = [" redis>=5.0.0" ]
39+ redis = [" redis>=5.2.1" ]
40+ types-redis = [" types-redis>=4.6.0.20241004" ]
4041
4142[project .scripts ]
4243mcp = " mcp.cli:app [cli]"
Original file line number Diff line number Diff line change @@ -485,7 +485,6 @@ def sse_app(self) -> Starlette:
485485 """Return an instance of the SSE server app."""
486486 # Use a custom provided message queue if available
487487 message_queue = self .settings .message_queue
488-
489488 # If no message queue is provided, create an in-memory queue as default
490489 if message_queue is None :
491490 from mcp .server .message_queue import InMemoryMessageQueue
Original file line number Diff line number Diff line change 55from uuid import UUID
66
77import anyio
8- from anyio import CapacityLimiter , from_thread
8+ from anyio import CapacityLimiter
99import mcp .types as types
1010from mcp .server .message_queue .base import MessageCallback
1111
@@ -74,8 +74,8 @@ async def _listen_for_messages(self) -> None:
7474 async with self ._limiter :
7575 while True :
7676 message : None | dict [str , Any ] = await self ._pubsub .get_message ( # type: ignore
77- ignore_subscribe_messages = True
78- )
77+ ignore_subscribe_messages = True , timeout = None # type: ignore
78+ )
7979 if message is None :
8080 continue
8181
@@ -105,7 +105,7 @@ async def _listen_for_messages(self) -> None:
105105 msg = types .JSONRPCMessage .model_validate_json (data )
106106
107107 if msg and session_id in self ._callbacks :
108- from_thread . run ( self ._callbacks [session_id ], msg )
108+ await self ._callbacks [session_id ]( msg )
109109 except Exception as e :
110110 logger .error (f"Failed to process message: { e } " )
111111
You can’t perform that action at this time.
0 commit comments