Skip to content

Commit b2fce7d

Browse files
committed
ruff
1 parent c3d5efc commit b2fce7d

File tree

5 files changed

+98
-82
lines changed

5 files changed

+98
-82
lines changed

src/mcp/server/fastmcp/server.py

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ class Settings(BaseSettings, Generic[LifespanResultT]):
7575
port: int = 8000
7676
sse_path: str = "/sse"
7777
message_path: str = "/messages/"
78-
78+
7979
# SSE message queue settings
8080
message_queue: Literal["memory", "redis"] = "memory"
8181
redis_url: str = "redis://localhost:6379/0"
@@ -488,20 +488,25 @@ def sse_app(self) -> Starlette:
488488
if self.settings.message_queue == "redis":
489489
try:
490490
from mcp.server.message_queue import RedisMessageQueue
491+
491492
message_queue = RedisMessageQueue(
492-
redis_url=self.settings.redis_url,
493-
prefix=self.settings.redis_prefix
493+
redis_url=self.settings.redis_url, prefix=self.settings.redis_prefix
494494
)
495495
logger.info(f"Using Redis message queue at {self.settings.redis_url}")
496496
except ImportError:
497-
logger.error("Redis message queue requested but 'redis' package not installed. ")
497+
logger.error(
498+
"Redis message queue requested but 'redis' package not installed. "
499+
)
498500
raise
499501
else:
500502
from mcp.server.message_queue import InMemoryMessageQueue
503+
501504
message_queue = InMemoryMessageQueue()
502505
logger.info("Using in-memory message queue")
503-
504-
sse = SseServerTransport(self.settings.message_path, message_queue=message_queue)
506+
507+
sse = SseServerTransport(
508+
self.settings.message_path, message_queue=message_queue
509+
)
505510

506511
async def handle_sse(request: Request) -> None:
507512
async with sse.connect_sse(

src/mcp/server/message_queue/__init__.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
"""
22
Message Queue Module for MCP Server
33
4-
This module implements queue interfaces for handling messages between clients and servers.
4+
This module implements queue interfaces for handling
5+
messages between clients and servers.
56
"""
67

78
from mcp.server.message_queue.base import InMemoryMessageQueue, MessageQueue
@@ -12,4 +13,4 @@
1213
except ImportError:
1314
RedisMessageQueue = None
1415

15-
__all__ = ["MessageQueue", "InMemoryMessageQueue", "RedisMessageQueue"]
16+
__all__ = ["MessageQueue", "InMemoryMessageQueue", "RedisMessageQueue"]
Lines changed: 42 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,3 @@
1-
"""
2-
Base Message Queue Protocol and In-Memory Implementation
3-
4-
This module defines the message queue protocol and provides a default in-memory implementation.
5-
"""
6-
71
import logging
82
from typing import Protocol, runtime_checkable
93
from uuid import UUID
@@ -16,57 +10,61 @@
1610
@runtime_checkable
1711
class MessageQueue(Protocol):
1812
"""Abstract interface for an SSE message queue.
19-
20-
This interface allows messages to be queued and processed by any SSE server instance,
13+
14+
This interface allows messages to be queued and processed by any SSE server instance
2115
enabling multiple servers to handle requests for the same session.
2216
"""
23-
24-
async def add_message(self, session_id: UUID, message: types.JSONRPCMessage | Exception) -> bool:
17+
18+
async def add_message(
19+
self, session_id: UUID, message: types.JSONRPCMessage | Exception
20+
) -> bool:
2521
"""Add a message to the queue for the specified session.
26-
22+
2723
Args:
2824
session_id: The UUID of the session this message is for
2925
message: The message to queue
30-
26+
3127
Returns:
3228
bool: True if message was accepted, False if session not found
3329
"""
3430
...
35-
36-
async def get_message(self, session_id: UUID, timeout: float = 0.1) -> types.JSONRPCMessage | Exception | None:
31+
32+
async def get_message(
33+
self, session_id: UUID, timeout: float = 0.1
34+
) -> types.JSONRPCMessage | Exception | None:
3735
"""Get the next message for the specified session.
38-
36+
3937
Args:
4038
session_id: The UUID of the session to get messages for
4139
timeout: Maximum time to wait for a message, in seconds
42-
40+
4341
Returns:
4442
The next message or None if no message is available
4543
"""
4644
...
47-
45+
4846
async def register_session(self, session_id: UUID) -> None:
4947
"""Register a new session with the queue.
50-
48+
5149
Args:
5250
session_id: The UUID of the new session to register
5351
"""
5452
...
55-
53+
5654
async def unregister_session(self, session_id: UUID) -> None:
5755
"""Unregister a session when it's closed.
58-
56+
5957
Args:
6058
session_id: The UUID of the session to unregister
6159
"""
6260
...
63-
61+
6462
async def session_exists(self, session_id: UUID) -> bool:
6563
"""Check if a session exists.
66-
64+
6765
Args:
6866
session_id: The UUID of the session to check
69-
67+
7068
Returns:
7169
bool: True if the session is active, False otherwise
7270
"""
@@ -75,54 +73,59 @@ async def session_exists(self, session_id: UUID) -> bool:
7573

7674
class InMemoryMessageQueue:
7775
"""Default in-memory implementation of the MessageQueue interface.
78-
79-
This implementation keeps messages in memory for each session until they're retrieved.
76+
77+
This implementation keeps messages in memory for
78+
each session until they're retrieved.
8079
"""
81-
80+
8281
def __init__(self) -> None:
8382
self._message_queues: dict[UUID, list[types.JSONRPCMessage | Exception]] = {}
8483
self._active_sessions: set[UUID] = set()
85-
86-
async def add_message(self, session_id: UUID, message: types.JSONRPCMessage | Exception) -> bool:
84+
85+
async def add_message(
86+
self, session_id: UUID, message: types.JSONRPCMessage | Exception
87+
) -> bool:
8788
"""Add a message to the queue for the specified session."""
8889
if session_id not in self._active_sessions:
8990
logger.warning(f"Message received for unknown session {session_id}")
9091
return False
91-
92+
9293
if session_id not in self._message_queues:
9394
self._message_queues[session_id] = []
94-
95+
9596
self._message_queues[session_id].append(message)
9697
logger.debug(f"Added message to queue for session {session_id}")
9798
return True
98-
99-
async def get_message(self, session_id: UUID, timeout: float = 0.1) -> types.JSONRPCMessage | Exception | None:
99+
100+
async def get_message(
101+
self, session_id: UUID, timeout: float = 0.1
102+
) -> types.JSONRPCMessage | Exception | None:
100103
"""Get the next message for the specified session."""
101104
if session_id not in self._active_sessions:
102105
return None
103-
106+
104107
queue = self._message_queues.get(session_id, [])
105108
if not queue:
106109
return None
107-
110+
108111
message = queue.pop(0)
109112
if not queue: # Clean up empty queue
110113
del self._message_queues[session_id]
111-
114+
112115
return message
113-
116+
114117
async def register_session(self, session_id: UUID) -> None:
115118
"""Register a new session with the queue."""
116119
self._active_sessions.add(session_id)
117120
logger.debug(f"Registered session {session_id}")
118-
121+
119122
async def unregister_session(self, session_id: UUID) -> None:
120123
"""Unregister a session when it's closed."""
121124
self._active_sessions.discard(session_id)
122125
if session_id in self._message_queues:
123126
del self._message_queues[session_id]
124127
logger.debug(f"Unregistered session {session_id}")
125-
128+
126129
async def session_exists(self, session_id: UUID) -> bool:
127130
"""Check if a session exists."""
128-
return session_id in self._active_sessions
131+
return session_id in self._active_sessions
Lines changed: 36 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,3 @@
1-
"""
2-
Redis Message Queue Module for MCP Server
3-
4-
This module implements a Redis-backed message queue for handling messages between clients and servers.
5-
"""
6-
71
import json
82
import logging
93
from uuid import UUID
@@ -23,14 +17,16 @@
2317

2418
class RedisMessageQueue:
2519
"""Redis implementation of the MessageQueue interface.
26-
20+
2721
This implementation uses Redis lists to store messages for each session.
2822
Redis provides persistence and allows multiple servers to share the same queue.
2923
"""
30-
31-
def __init__(self, redis_url: str = "redis://localhost:6379/0", prefix: str = "mcp:queue:") -> None:
24+
25+
def __init__(
26+
self, redis_url: str = "redis://localhost:6379/0", prefix: str = "mcp:queue:"
27+
) -> None:
3228
"""Initialize Redis message queue.
33-
29+
3430
Args:
3531
redis_url: Redis connection string
3632
prefix: Key prefix for Redis keys to avoid collisions
@@ -39,79 +35,87 @@ def __init__(self, redis_url: str = "redis://localhost:6379/0", prefix: str = "m
3935
self._prefix = prefix
4036
self._active_sessions_key = f"{prefix}active_sessions"
4137
logger.debug(f"Initialized Redis message queue with URL: {redis_url}")
42-
38+
4339
def _session_queue_key(self, session_id: UUID) -> str:
4440
"""Get the Redis key for a session's message queue."""
4541
return f"{self._prefix}session:{session_id.hex}"
46-
47-
async def add_message(self, session_id: UUID, message: types.JSONRPCMessage | Exception) -> bool:
42+
43+
async def add_message(
44+
self, session_id: UUID, message: types.JSONRPCMessage | Exception
45+
) -> bool:
4846
"""Add a message to the queue for the specified session."""
4947
# Check if session exists
5048
if not await self.session_exists(session_id):
5149
logger.warning(f"Message received for unknown session {session_id}")
5250
return False
53-
51+
5452
# Serialize the message
5553
if isinstance(message, Exception):
5654
# For exceptions, store them as special format
57-
data = json.dumps({
58-
"_exception": True,
59-
"type": type(message).__name__,
60-
"message": str(message)
61-
})
55+
data = json.dumps(
56+
{
57+
"_exception": True,
58+
"type": type(message).__name__,
59+
"message": str(message),
60+
}
61+
)
6262
else:
6363
data = message.model_dump_json(by_alias=True, exclude_none=True)
64-
64+
6565
# Push to the right side of the list (queue)
6666
await self._redis.rpush(self._session_queue_key(session_id), data)
6767
logger.debug(f"Added message to Redis queue for session {session_id}")
6868
return True
69-
70-
async def get_message(self, session_id: UUID, timeout: float = 0.1) -> types.JSONRPCMessage | Exception | None:
69+
70+
async def get_message(
71+
self, session_id: UUID, timeout: float = 0.1
72+
) -> types.JSONRPCMessage | Exception | None:
7173
"""Get the next message for the specified session."""
7274
# Check if session exists
7375
if not await self.session_exists(session_id):
7476
return None
75-
77+
7678
# Pop from the left side of the list (queue)
7779
# Use BLPOP with timeout to avoid busy waiting
7880
result = await self._redis.blpop([self._session_queue_key(session_id)], timeout)
79-
81+
8082
if not result:
8183
return None
82-
84+
8385
# result is a tuple of (key, value)
8486
_, data = result
85-
87+
8688
# Deserialize the message
8789
json_data = json.loads(data)
88-
90+
8991
# Check if it's an exception
9092
if isinstance(json_data, dict) and json_data.get("_exception"):
9193
# Reconstitute a generic exception
9294
return Exception(f"{json_data['type']}: {json_data['message']}")
93-
95+
9496
# Regular message
9597
try:
9698
return types.JSONRPCMessage.model_validate_json(data)
9799
except Exception as e:
98100
logger.error(f"Failed to deserialize message: {e}")
99101
return None
100-
102+
101103
async def register_session(self, session_id: UUID) -> None:
102104
"""Register a new session with the queue."""
103105
# Add session ID to the set of active sessions
104106
await self._redis.sadd(self._active_sessions_key, session_id.hex)
105107
logger.debug(f"Registered session {session_id} in Redis")
106-
108+
107109
async def unregister_session(self, session_id: UUID) -> None:
108110
"""Unregister a session when it's closed."""
109111
# Remove session ID from active sessions
110112
await self._redis.srem(self._active_sessions_key, session_id.hex)
111113
# Delete the session's message queue
112114
await self._redis.delete(self._session_queue_key(session_id))
113115
logger.debug(f"Unregistered session {session_id} from Redis")
114-
116+
115117
async def session_exists(self, session_id: UUID) -> bool:
116118
"""Check if a session exists."""
117-
return bool(await self._redis.sismember(self._active_sessions_key, session_id.hex))
119+
return bool(
120+
await self._redis.sismember(self._active_sessions_key, session_id.hex)
121+
)

src/mcp/server/sse.py

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -66,14 +66,16 @@ class SseServerTransport:
6666
_endpoint: str
6767
_message_queue: MessageQueue
6868

69-
def __init__(self, endpoint: str, message_queue: MessageQueue | None = None) -> None:
69+
def __init__(
70+
self, endpoint: str, message_queue: MessageQueue | None = None
71+
) -> None:
7072
"""
7173
Creates a new SSE server transport, which will direct the client to POST
7274
messages to the relative or absolute URL given.
73-
75+
7476
Args:
7577
endpoint: The endpoint URL for SSE connections
76-
message_queue: Optional message queue to use. If None, creates an InMemoryMessageQueue.
78+
message_queue: Optional message queue to use
7779
"""
7880

7981
super().__init__()
@@ -107,6 +109,7 @@ async def connect_sse(self, scope: Scope, receive: Receive, send: Send):
107109
](0)
108110

109111
message_polling_active = True
112+
110113
async def poll_queue():
111114
"""Background task to poll for messages in the queue"""
112115
logger.debug(f"Starting queue polling for session {session_id}")

0 commit comments

Comments
 (0)