File tree Expand file tree Collapse file tree 1 file changed +3
-1
lines changed
src/mcp/server/message_queue Expand file tree Collapse file tree 1 file changed +3
-1
lines changed Original file line number Diff line number Diff line change 55from uuid import UUID
66
77import anyio
8- from anyio import CapacityLimiter
8+ from anyio import CapacityLimiter , lowlevel
99
1010import mcp .types as types
1111from mcp .server .message_queue .base import MessageCallback
@@ -42,6 +42,7 @@ def __init__(
4242 self ._prefix = prefix
4343 self ._active_sessions_key = f"{ prefix } active_sessions"
4444 self ._callbacks : dict [UUID , MessageCallback ] = {}
45+ # Ensures only one polling task runs at a time for message handling
4546 self ._limiter = CapacityLimiter (1 )
4647 logger .debug (f"Initialized Redis message queue with URL: { redis_url } " )
4748
@@ -73,6 +74,7 @@ async def _listen_for_messages(self) -> None:
7374 """Background task that listens for messages on subscribed channels."""
7475 async with self ._limiter :
7576 while True :
77+ await lowlevel .checkpoint ()
7678 message : None | dict [str , Any ] = await self ._pubsub .get_message ( # type: ignore
7779 ignore_subscribe_messages = True ,
7880 timeout = None , # type: ignore
You can’t perform that action at this time.
0 commit comments