Skip to content

Commit 1e81f36

Browse files
committed
comments fix
1 parent 0bfd800 commit 1e81f36

File tree

3 files changed

+13
-10
lines changed

3 files changed

+13
-10
lines changed

src/mcp/server/message_queue/base.py

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
class MessageWrapper(BaseModel):
1717
message_id: str
1818
payload: str
19-
19+
2020
def get_json_rpc_message(self) -> types.JSONRPCMessage | ValidationError:
2121
"""Parse the payload into a JSONRPCMessage or return ValidationError."""
2222
try:
@@ -48,9 +48,12 @@ async def publish_message(
4848
...
4949

5050
async def publish_message_sync(
51-
self, session_id: UUID, message: types.JSONRPCMessage | str, timeout: float = 30.0
51+
self,
52+
session_id: UUID,
53+
message: types.JSONRPCMessage | str,
54+
timeout: float = 120.0,
5255
) -> bool:
53-
"""Publish a message for the specified session and wait for consumption confirmation.
56+
"""Publish a message for the specified session and wait for confirmation.
5457
5558
This method blocks until the message has been fully consumed by the subscriber,
5659
or until the timeout is reached.
@@ -121,12 +124,15 @@ async def publish_message(
121124

122125
logger.debug(f"Message dispatched to session {session_id}")
123126
return True
124-
127+
125128
async def publish_message_sync(
126-
self, session_id: UUID, message: types.JSONRPCMessage | str, timeout: float = 30.0
129+
self,
130+
session_id: UUID,
131+
message: types.JSONRPCMessage | str,
132+
timeout: float = 30.0,
127133
) -> bool:
128134
"""Publish a message for the specified session and wait for consumption.
129-
135+
130136
For InMemoryMessageDispatch, this is the same as publish_message since
131137
the callback is executed synchronously.
132138
"""

src/mcp/server/message_queue/redis.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -130,10 +130,8 @@ async def _handle_ack_message(self, channel: str, data: str) -> None:
130130
if not channel.startswith(ack_prefix):
131131
return
132132

133-
# Validate channel format exactly matches our expected format
134133
session_hex = channel[len(ack_prefix) :]
135134
try:
136-
# Validate this is a valid UUID hex and channel has correct format
137135
session_id = UUID(hex=session_hex)
138136
expected_channel = self._ack_channel(session_id)
139137
if channel != expected_channel:

src/mcp/server/sse.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -174,7 +174,6 @@ async def handle_post_message(
174174
logger.debug(f"Validated client message: {message}")
175175
except ValidationError as err:
176176
logger.error(f"Failed to parse message: {err}")
177-
# Still publish the invalid message, but using synchronized version
178177
response = Response("Could not parse message", status_code=400)
179178
await response(scope, receive, send)
180179
# Pass raw JSON string; receiver will recreate identical ValidationError
@@ -184,7 +183,7 @@ async def handle_post_message(
184183

185184
logger.debug(f"Publishing message for session {session_id}: {message}")
186185

187-
# Use sync publish to block until the message is processed
186+
# Use sync publish, block POST response until the message is processed
188187
result = await self._message_dispatch.publish_message_sync(session_id, message)
189188

190189
if result:

0 commit comments

Comments
 (0)