Skip to content

Commit 5111c92

Browse files
committed
naming changes
1 parent 7cabcea commit 5111c92

File tree

4 files changed

+48
-47
lines changed

4 files changed

+48
-47
lines changed

README.md

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -385,22 +385,22 @@ app.router.routes.append(Host('mcp.acme.corp', app=mcp.sse_app()))
385385

386386
For more information on mounting applications in Starlette, see the [Starlette documentation](https://www.starlette.io/routing/#submounting-routes).
387387

388-
#### Message Queue Options
388+
#### Message Dispatch Options
389389

390-
By default, the SSE server uses an in-memory message queue for incoming POST messages. For production deployments or distributed scenarios, you can use Redis:
390+
By default, the SSE server uses an in-memory message dispatch system for incoming POST messages. For production deployments or distributed scenarios, you can use Redis or implement your own message dispatch system that conforms to the `MessageDispatch` protocol:
391391

392392
```python
393-
# Using the built-in Redis message queue
393+
# Using the built-in Redis message dispatch
394394
from mcp.server.fastmcp import FastMCP
395-
from mcp.server.message_queue import RedisMessageQueue
395+
from mcp.server.message_queue import RedisMessageDispatch
396396

397-
# Create a Redis message queue
398-
redis_queue = RedisMessageQueue(
397+
# Create a Redis message dispatch
398+
redis_dispatch = RedisMessageDispatch(
399399
redis_url="redis://localhost:6379/0", prefix="mcp:pubsub:"
400400
)
401401

402-
# Pass the message queue instance to the server
403-
mcp = FastMCP("My App", message_queue=redis_queue)
402+
# Pass the message dispatch instance to the server
403+
mcp = FastMCP("My App", message_queue=redis_dispatch)
404404
```
405405

406406
To use Redis, add the Redis dependency:

src/mcp/server/message_queue/base.py

Lines changed: 17 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,8 @@
1212

1313

1414
@runtime_checkable
15-
class MessageQueue(Protocol):
16-
"""Abstract interface for SSE messaging.
15+
class MessageDispatch(Protocol):
16+
"""Abstract interface for SSE message dispatching.
1717
1818
This interface allows messages to be published to sessions and callbacks to be
1919
registered for message handling, enabling multiple servers to handle requests.
@@ -34,11 +34,11 @@ async def publish_message(
3434
...
3535

3636
@asynccontextmanager
37-
async def active_for_request(self, session_id: UUID, callback: MessageCallback):
38-
"""Request-scoped context manager that ensures the listener is active.
37+
async def subscribe(self, session_id: UUID, callback: MessageCallback):
38+
"""Request-scoped context manager that subscribes to messages for a session.
3939
4040
Args:
41-
session_id: The UUID of the session to activate
41+
session_id: The UUID of the session to subscribe to
4242
callback: Async callback function to handle messages for this session
4343
"""
4444
yield
@@ -55,49 +55,44 @@ async def session_exists(self, session_id: UUID) -> bool:
5555
...
5656

5757

58-
class InMemoryMessageQueue:
59-
"""Default in-memory implementation of the MessageQueue interface.
58+
class InMemoryMessageDispatch:
59+
"""Default in-memory implementation of the MessageDispatch interface.
6060
61-
This implementation immediately calls registered callbacks when messages
62-
are received.
61+
This implementation immediately dispatches messages to registered callbacks when
62+
messages are received without any queuing behavior.
6363
"""
6464

6565
def __init__(self) -> None:
6666
self._callbacks: dict[UUID, MessageCallback] = {}
67-
self._active_sessions: set[UUID] = set()
67+
# We don't need a separate _active_sessions set since _callbacks already tracks this
6868

6969
async def publish_message(
7070
self, session_id: UUID, message: types.JSONRPCMessage | Exception
7171
) -> bool:
7272
"""Publish a message for the specified session."""
73-
if not await self.session_exists(session_id):
73+
if session_id not in self._callbacks:
7474
logger.warning(f"Message received for unknown session {session_id}")
7575
return False
7676

77-
# Call the callback directly if registered
78-
if session_id in self._callbacks:
79-
await self._callbacks[session_id](message)
80-
logger.debug(f"Called callback for session {session_id}")
81-
else:
82-
logger.warning(f"No callback registered for session {session_id}")
77+
# Call the callback directly
78+
await self._callbacks[session_id](message)
79+
logger.debug(f"Called callback for session {session_id}")
8380

8481
return True
8582

8683
@asynccontextmanager
87-
async def active_for_request(self, session_id: UUID, callback: MessageCallback):
88-
"""Request-scoped context manager that ensures the listener is active."""
89-
self._active_sessions.add(session_id)
84+
async def subscribe(self, session_id: UUID, callback: MessageCallback):
85+
"""Request-scoped context manager that subscribes to messages for a session."""
9086
self._callbacks[session_id] = callback
9187
logger.debug(f"Registered session {session_id} with callback")
9288

9389
try:
9490
yield
9591
finally:
96-
self._active_sessions.discard(session_id)
9792
if session_id in self._callbacks:
9893
del self._callbacks[session_id]
9994
logger.debug(f"Unregistered session {session_id}")
10095

10196
async def session_exists(self, session_id: UUID) -> bool:
10297
"""Check if a session exists."""
103-
return session_id in self._active_sessions
98+
return session_id in self._callbacks

src/mcp/server/message_queue/redis.py

Lines changed: 22 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,8 @@
2121
logger = logging.getLogger(__name__)
2222

2323

24-
class RedisMessageQueue:
25-
"""Redis implementation of the MessageQueue interface using pubsub.
24+
class RedisMessageDispatch:
25+
"""Redis implementation of the MessageDispatch interface using pubsub.
2626
2727
This implementation uses Redis pubsub for real-time message distribution across
2828
multiple servers handling the same sessions.
@@ -31,7 +31,7 @@ class RedisMessageQueue:
3131
def __init__(
3232
self, redis_url: str = "redis://localhost:6379/0", prefix: str = "mcp:pubsub:"
3333
) -> None:
34-
"""Initialize Redis message queue.
34+
"""Initialize Redis message dispatch.
3535
3636
Args:
3737
redis_url: Redis connection string
@@ -44,15 +44,15 @@ def __init__(
4444
self._callbacks: dict[UUID, MessageCallback] = {}
4545
# Ensures only one polling task runs at a time for message handling
4646
self._limiter = CapacityLimiter(1)
47-
logger.debug(f"Initialized Redis message queue with URL: {redis_url}")
47+
logger.debug(f"Initialized Redis message dispatch with URL: {redis_url}")
4848

4949
def _session_channel(self, session_id: UUID) -> str:
5050
"""Get the Redis channel for a session."""
5151
return f"{self._prefix}session:{session_id.hex}"
5252

5353
@asynccontextmanager
54-
async def active_for_request(self, session_id: UUID, callback: MessageCallback):
55-
"""Request-scoped context manager that ensures the listener task is running."""
54+
async def subscribe(self, session_id: UUID, callback: MessageCallback):
55+
"""Request-scoped context manager that subscribes to messages for a session."""
5656
await self._redis.sadd(self._active_sessions_key, session_id.hex)
5757
self._callbacks[session_id] = callback
5858
channel = self._session_channel(session_id)
@@ -98,17 +98,23 @@ async def _listen_for_messages(self) -> None:
9898
msg: None | types.JSONRPCMessage | Exception = None
9999
try:
100100
json_data = json.loads(data)
101-
if isinstance(json_data, dict):
102-
json_dict: dict[str, Any] = json_data
103-
if json_dict.get("_exception", False):
104-
msg = Exception(
105-
f"{json_dict['type']}: {json_dict['message']}"
106-
)
101+
if not isinstance(json_data, dict):
102+
logger.error(f"Received non-dict JSON data: {type(json_data)}")
103+
continue
104+
105+
json_dict: dict[str, Any] = json_data
106+
if json_dict.get("_exception", False):
107+
msg = Exception(
108+
f"{json_dict['type']}: {json_dict['message']}"
109+
)
110+
else:
111+
msg = types.JSONRPCMessage.model_validate_json(data)
112+
113+
if msg:
114+
if session_id in self._callbacks:
115+
await self._callbacks[session_id](msg)
107116
else:
108-
msg = types.JSONRPCMessage.model_validate_json(data)
109-
110-
if msg and session_id in self._callbacks:
111-
await self._callbacks[session_id](msg)
117+
logger.warning(f"No callback registered for session {session_id}")
112118
except Exception as e:
113119
logger.error(f"Failed to process message: {e}")
114120

src/mcp/server/sse.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ def __init__(
8080

8181
super().__init__()
8282
self._endpoint = endpoint
83-
self._message_queue = message_queue or InMemoryMessageQueue()
83+
self._message_dispatch = message_queue or InMemoryMessageDispatch()
8484
logger.debug(f"SseServerTransport initialized with endpoint: {endpoint}")
8585

8686
@asynccontextmanager

0 commit comments

Comments
 (0)