1+ """
2+ Redis Message Queue Module for MCP Server
3+
4+ This module implements a Redis-backed message queue for handling messages between clients and servers.
5+ """
6+
7+ import json
8+ import logging
9+ from uuid import UUID
10+
11+ import mcp .types as types
12+
13+ try :
14+ import redis .asyncio as redis
15+ except ImportError :
16+ raise ImportError (
17+ "Redis support requires the 'redis' package. "
18+ "Install it with: 'uv add redis' or 'uv add \" mcp[redis]\" '"
19+ )
20+
21+ logger = logging .getLogger (__name__ )
22+
23+
24+ class RedisMessageQueue :
25+ """Redis implementation of the MessageQueue interface.
26+
27+ This implementation uses Redis lists to store messages for each session.
28+ Redis provides persistence and allows multiple servers to share the same queue.
29+ """
30+
31+ def __init__ (self , redis_url : str = "redis://localhost:6379/0" , prefix : str = "mcp:queue:" ) -> None :
32+ """Initialize Redis message queue.
33+
34+ Args:
35+ redis_url: Redis connection string
36+ prefix: Key prefix for Redis keys to avoid collisions
37+ """
38+ self ._redis = redis .Redis .from_url (redis_url , decode_responses = True )
39+ self ._prefix = prefix
40+ self ._active_sessions_key = f"{ prefix } active_sessions"
41+ logger .debug (f"Initialized Redis message queue with URL: { redis_url } " )
42+
43+ def _session_queue_key (self , session_id : UUID ) -> str :
44+ """Get the Redis key for a session's message queue."""
45+ return f"{ self ._prefix } session:{ session_id .hex } "
46+
47+ async def add_message (self , session_id : UUID , message : types .JSONRPCMessage | Exception ) -> bool :
48+ """Add a message to the queue for the specified session."""
49+ # Check if session exists
50+ if not await self .session_exists (session_id ):
51+ logger .warning (f"Message received for unknown session { session_id } " )
52+ return False
53+
54+ # Serialize the message
55+ if isinstance (message , Exception ):
56+ # For exceptions, store them as special format
57+ data = json .dumps ({
58+ "_exception" : True ,
59+ "type" : type (message ).__name__ ,
60+ "message" : str (message )
61+ })
62+ else :
63+ data = message .model_dump_json (by_alias = True , exclude_none = True )
64+
65+ # Push to the right side of the list (queue)
66+ await self ._redis .rpush (self ._session_queue_key (session_id ), data )
67+ logger .debug (f"Added message to Redis queue for session { session_id } " )
68+ return True
69+
70+ async def get_message (self , session_id : UUID , timeout : float = 0.1 ) -> types .JSONRPCMessage | Exception | None :
71+ """Get the next message for the specified session."""
72+ # Check if session exists
73+ if not await self .session_exists (session_id ):
74+ return None
75+
76+ # Pop from the left side of the list (queue)
77+ # Use BLPOP with timeout to avoid busy waiting
78+ result = await self ._redis .blpop ([self ._session_queue_key (session_id )], timeout )
79+
80+ if not result :
81+ return None
82+
83+ # result is a tuple of (key, value)
84+ _ , data = result
85+
86+ # Deserialize the message
87+ json_data = json .loads (data )
88+
89+ # Check if it's an exception
90+ if isinstance (json_data , dict ) and json_data .get ("_exception" ):
91+ # Reconstitute a generic exception
92+ return Exception (f"{ json_data ['type' ]} : { json_data ['message' ]} " )
93+
94+ # Regular message
95+ try :
96+ return types .JSONRPCMessage .model_validate_json (data )
97+ except Exception as e :
98+ logger .error (f"Failed to deserialize message: { e } " )
99+ return None
100+
101+ async def register_session (self , session_id : UUID ) -> None :
102+ """Register a new session with the queue."""
103+ # Add session ID to the set of active sessions
104+ await self ._redis .sadd (self ._active_sessions_key , session_id .hex )
105+ logger .debug (f"Registered session { session_id } in Redis" )
106+
107+ async def unregister_session (self , session_id : UUID ) -> None :
108+ """Unregister a session when it's closed."""
109+ # Remove session ID from active sessions
110+ await self ._redis .srem (self ._active_sessions_key , session_id .hex )
111+ # Delete the session's message queue
112+ await self ._redis .delete (self ._session_queue_key (session_id ))
113+ logger .debug (f"Unregistered session { session_id } from Redis" )
114+
115+ async def session_exists (self , session_id : UUID ) -> bool :
116+ """Check if a session exists."""
117+ return bool (await self ._redis .sismember (self ._active_sessions_key , session_id .hex ))
0 commit comments