From ad814086af63e43eb43e797803c0a821b4fb2ffc Mon Sep 17 00:00:00 2001 From: harshit Date: Mon, 15 Dec 2025 05:33:08 +0000 Subject: [PATCH 01/18] add queue monitor Signed-off-by: harshit --- python/ray/serve/_private/queue_monitor.py | 323 ++++++++++++++++++ python/ray/serve/_private/test_utils.py | 2 +- .../serve/tests/unit/test_queue_monitor.py | 155 +++++++++ 3 files changed, 479 insertions(+), 1 deletion(-) create mode 100644 python/ray/serve/_private/queue_monitor.py create mode 100644 python/ray/serve/tests/unit/test_queue_monitor.py diff --git a/python/ray/serve/_private/queue_monitor.py b/python/ray/serve/_private/queue_monitor.py new file mode 100644 index 000000000000..67dc5069aeab --- /dev/null +++ b/python/ray/serve/_private/queue_monitor.py @@ -0,0 +1,323 @@ +import logging +from typing import Any, Dict + +import pika +import redis + +import ray +from ray.serve._private.constants import SERVE_LOGGER_NAME + +logger = logging.getLogger(SERVE_LOGGER_NAME) + +# Actor name prefix for QueueMonitor actors +QUEUE_MONITOR_ACTOR_PREFIX = "QUEUE_MONITOR::" + + +class QueueMonitorConfig: + """Configuration for the QueueMonitor deployment.""" + + def __init__( + self, + broker_url: str, + queue_name: str, + ): + self.broker_url = broker_url + self.queue_name = queue_name + + @property + def broker_type(self) -> str: + url_lower = self.broker_url.lower() + if url_lower.startswith("redis"): + return "redis" + elif url_lower.startswith("amqp") or url_lower.startswith("pyamqp"): + return "rabbitmq" + else: + return "unknown" + + +class QueueMonitor: + """ + Actor that monitors queue length by directly querying the broker. + + Returns pending tasks in the queue. + + Uses native broker clients: + - Redis: Uses redis-py library with LLEN command + - RabbitMQ: Uses pika library with passive queue declaration + """ + + def __init__(self, config: QueueMonitorConfig): + self._config = config + self._last_queue_length: int = 0 + self._is_initialized: bool = False + + # Redis connection state + self._redis_client: Any = None + + # RabbitMQ connection state + self._rabbitmq_connection: Any = None + self._rabbitmq_channel: Any = None + + def initialize(self) -> None: + """ + Initialize connection to the broker. + + Creates the appropriate client based on broker type and tests the connection. + """ + if self._is_initialized: + return + + broker_type = self._config.broker_type + try: + if broker_type == "redis": + self._init_redis() + elif broker_type == "rabbitmq": + self._init_rabbitmq() + else: + raise ValueError( + f"Unsupported broker type: {broker_type}. Supported: redis, rabbitmq" + ) + + self._is_initialized = True + logger.info( + f"QueueMonitor initialized for queue '{self._config.queue_name}' (broker: {broker_type})" + ) + + except Exception as e: + logger.error(f"Failed to initialize QueueMonitor: {e}") + raise + + def _init_redis(self) -> None: + """Initialize Redis client.""" + self._redis_client = redis.from_url(self._config.broker_url) + + # Test connection + self._redis_client.ping() + + def _init_rabbitmq(self) -> None: + """Initialize RabbitMQ connection and channel.""" + # Store connection parameters for reconnection + self._rabbitmq_connection_params = pika.URLParameters(self._config.broker_url) + + # Establish persistent connection and channel + self._rabbitmq_connection = pika.BlockingConnection( + self._rabbitmq_connection_params + ) + self._rabbitmq_channel = self._rabbitmq_connection.channel() + + def _ensure_redis_connection(self) -> None: + """Ensure Redis connection is open, reconnecting if necessary.""" + if self._redis_client is None or self._redis_client.ping() != "PONG": + logger.warning("Redis connection lost, reconnecting...") + self._redis_client = redis.from_url(self._config.broker_url) + + def _ensure_rabbitmq_connection(self) -> None: + """Ensure RabbitMQ connection is open, reconnecting if necessary.""" + if ( + self._rabbitmq_connection is None + or self._rabbitmq_connection.is_closed + or self._rabbitmq_channel is None + or self._rabbitmq_channel.is_closed + ): + logger.warning("RabbitMQ connection lost, reconnecting...") + self._rabbitmq_connection = pika.BlockingConnection( + self._rabbitmq_connection_params + ) + self._rabbitmq_channel = self._rabbitmq_connection.channel() + + def _get_redis_queue_length(self) -> int: + """ + Get pending tasks from Redis broker. + + Returns: + Number of pending tasks in the queue. + """ + self._ensure_redis_connection() + return self._redis_client.llen(self._config.queue_name) + + def _get_rabbitmq_queue_length(self) -> int: + """ + Get pending tasks from RabbitMQ broker. + + Returns: + Number of pending (ready) messages in the queue. + """ + self._ensure_rabbitmq_connection() + + # Passive declaration - doesn't create queue, just gets info + result = self._rabbitmq_channel.queue_declare( + queue=self._config.queue_name, passive=True + ) + + return result.method.message_count + + def get_config(self) -> Dict[str, Any]: + """ + Get the QueueMonitor configuration as a serializable dict. + + Returns: + Dict with 'broker_url' and 'queue_name' keys + """ + return { + "broker_url": self._config.broker_url, + "queue_name": self._config.queue_name, + } + + def get_queue_length(self) -> int: + """ + Get the current queue length from the broker. + + Returns: + Number of pending tasks in the queue. + """ + if not self._is_initialized: + logger.warning( + f"QueueMonitor not initialized for queue '{self._config.queue_name}', returning 0" + ) + return 0 + + try: + broker_type = self._config.broker_type + + if broker_type == "redis": + queue_length = self._get_redis_queue_length() + elif broker_type == "rabbitmq": + queue_length = self._get_rabbitmq_queue_length() + else: + raise ValueError(f"Unsupported broker type: {broker_type}") + + # Update cache + self._last_queue_length = queue_length + + return queue_length + + except Exception as e: + logger.warning( + f"Failed to query queue length: {e}. Using last known value: {self._last_queue_length}" + ) + return self._last_queue_length + + def shutdown(self) -> None: + # Close Redis client if present + if getattr(self, "_redis_client", None) is not None: + try: + if hasattr(self._redis_client, "close"): + self._redis_client.close() + except Exception as e: + logger.warning(f"Error closing Redis client: {e}") + self._redis_client = None + + # Close RabbitMQ connection if present + if getattr(self, "_rabbitmq_connection", None) is not None: + try: + if not self._rabbitmq_connection.is_closed: + self._rabbitmq_connection.close() + except Exception as e: + logger.warning(f"Error closing RabbitMQ connection: {e}") + self._rabbitmq_connection = None + self._rabbitmq_channel = None + + if hasattr(self, "_is_initialized"): + self._is_initialized = False + + def __del__(self): + self.shutdown() + + +@ray.remote(num_cpus=0, runtime_env={"pip": ["pika", "redis"]}) +class QueueMonitorActor(QueueMonitor): + """ + Ray actor version of QueueMonitor for direct access from ServeController. + + This is used instead of a Serve deployment because the autoscaling policy + runs inside the ServeController, and using serve.get_deployment_handle() + from within the controller causes a deadlock. + """ + + def __init__(self, config: QueueMonitorConfig): + super().__init__(config) + self.initialize() + + +def create_queue_monitor_actor( + deployment_name: str, + config: QueueMonitorConfig, + namespace: str = "serve", +) -> ray.actor.ActorHandle: + """ + Create a named QueueMonitor Ray actor. + + Args: + deployment_name: Name of the deployment + config: QueueMonitorConfig with broker URL and queue name + namespace: Ray namespace for the actor + + Returns: + ActorHandle for the QueueMonitor actor + """ + full_actor_name = f"{QUEUE_MONITOR_ACTOR_PREFIX}{deployment_name}" + + # Check if actor already exists + try: + existing = ray.get_actor(full_actor_name, namespace=namespace) + logger.info(f"QueueMonitor actor '{full_actor_name}' already exists, reusing") + + return existing + except ValueError: + pass # Actor doesn't exist, create it + + actor = QueueMonitorActor.options( + name=full_actor_name, + namespace=namespace, + ).remote(config) + + logger.info( + f"Created QueueMonitor actor '{full_actor_name}' in namespace '{namespace}'" + ) + return actor + + +def get_queue_monitor_actor( + deployment_name: str, + namespace: str = "serve", +) -> ray.actor.ActorHandle: + """ + Get an existing QueueMonitor actor by name. + + Args: + deployment_name: Name of the deployment + namespace: Ray namespace + + Returns: + ActorHandle for the QueueMonitor actor + + Raises: + ValueError: If actor doesn't exist + """ + full_actor_name = f"{QUEUE_MONITOR_ACTOR_PREFIX}{deployment_name}" + return ray.get_actor(full_actor_name, namespace=namespace) + + +def delete_queue_monitor_actor( + deployment_name: str, + namespace: str = "serve", +) -> bool: + """ + Delete a QueueMonitor actor by name. + + Args: + deployment_name: Name of the deployment + namespace: Ray namespace + + Returns: + True if actor was deleted, False if it didn't exist + """ + full_actor_name = f"{QUEUE_MONITOR_ACTOR_PREFIX}{deployment_name}" + try: + actor = ray.get_actor(full_actor_name, namespace=namespace) + ray.kill(actor) + logger.info(f"Deleted QueueMonitor actor '{full_actor_name}'") + return True + except ValueError: + # Actor doesn't exist + return False diff --git a/python/ray/serve/_private/test_utils.py b/python/ray/serve/_private/test_utils.py index db8cd4def7ca..623129481451 100644 --- a/python/ray/serve/_private/test_utils.py +++ b/python/ray/serve/_private/test_utils.py @@ -302,7 +302,7 @@ def check_num_replicas_eq( target: int, app_name: str = SERVE_DEFAULT_APP_NAME, use_controller: bool = False, -) -> int: +) -> bool: """Check if num replicas is == target.""" if use_controller: diff --git a/python/ray/serve/tests/unit/test_queue_monitor.py b/python/ray/serve/tests/unit/test_queue_monitor.py new file mode 100644 index 000000000000..ba66656de906 --- /dev/null +++ b/python/ray/serve/tests/unit/test_queue_monitor.py @@ -0,0 +1,155 @@ +import sys +from unittest.mock import MagicMock, patch + +import pytest + +from ray.serve._private.queue_monitor import ( + QueueMonitor, + QueueMonitorConfig, +) + + +class TestQueueMonitorConfig: + """Tests for QueueMonitorConfig class.""" + + def test_redis_broker_type(self): + """Test Redis broker type detection.""" + config = QueueMonitorConfig( + broker_url="redis://localhost:6379/0", + queue_name="test_queue", + ) + assert config.broker_type == "redis" + + def test_redis_with_password_broker_type(self): + """Test Redis with password broker type detection.""" + config = QueueMonitorConfig( + broker_url="rediss://user:password@localhost:6379/0", + queue_name="test_queue", + ) + assert config.broker_type == "redis" + + def test_rabbitmq_amqp_broker_type(self): + """Test RabbitMQ AMQP broker type detection.""" + config = QueueMonitorConfig( + broker_url="amqp://guest:guest@localhost:5672//", + queue_name="test_queue", + ) + assert config.broker_type == "rabbitmq" + + def test_rabbitmq_pyamqp_broker_type(self): + """Test RabbitMQ pyamqp broker type detection.""" + config = QueueMonitorConfig( + broker_url="pyamqp://guest:guest@localhost:5672//", + queue_name="test_queue", + ) + assert config.broker_type == "rabbitmq" + + def test_unknown_broker_type(self): + """Test unknown broker type detection.""" + config = QueueMonitorConfig( + broker_url="some://unknown/broker", + queue_name="test_queue", + ) + assert config.broker_type == "unknown" + + def test_config_stores_values(self): + """Test config stores broker_url and queue_name.""" + config = QueueMonitorConfig( + broker_url="amqp://guest:guest@localhost:5672//", + queue_name="my_queue", + ) + assert config.broker_url == "amqp://guest:guest@localhost:5672//" + assert config.queue_name == "my_queue" + + +class TestQueueMonitor: + """Tests for QueueMonitor class.""" + + @pytest.fixture + def redis_config(self): + """Provides a Redis QueueMonitorConfig.""" + return QueueMonitorConfig( + broker_url="redis://localhost:6379/0", + queue_name="test_queue", + ) + + @pytest.fixture + def rabbitmq_config(self): + """Provides a RabbitMQ QueueMonitorConfig.""" + return QueueMonitorConfig( + broker_url="amqp://guest:guest@localhost:5672//", + queue_name="test_queue", + ) + + @patch("ray.serve._private.queue_monitor.redis") + def test_get_redis_queue_length(self, mock_redis_module, redis_config): + """Test Redis queue length returns pending tasks.""" + mock_client = MagicMock() + mock_client.llen.return_value = 30 + mock_redis_module.from_url.return_value = mock_client + + monitor = QueueMonitor(redis_config) + monitor.initialize() + length = monitor.get_queue_length() + + assert length == 30 + mock_client.llen.assert_called_with("test_queue") + + @patch("ray.serve._private.queue_monitor.pika") + def test_get_rabbitmq_queue_length(self, mock_pika, rabbitmq_config): + """Test RabbitMQ queue length retrieval via AMQP.""" + mock_params = MagicMock() + mock_pika.URLParameters.return_value = mock_params + + # Mock connection and channel - connection is reused across calls + mock_connection = MagicMock() + mock_channel = MagicMock() + mock_result = MagicMock() + mock_result.method.message_count = 25 + + # Ensure connection appears open so it's not recreated + mock_connection.is_closed = False + mock_channel.is_closed = False + + mock_connection.channel.return_value = mock_channel + mock_channel.queue_declare.return_value = mock_result + mock_pika.BlockingConnection.return_value = mock_connection + + monitor = QueueMonitor(rabbitmq_config) + monitor.initialize() + length = monitor.get_queue_length() + + assert length == 25 + # Connection is established once during initialization + mock_pika.BlockingConnection.assert_called_once() + mock_channel.queue_declare.assert_called_with( + queue="test_queue", + passive=True, + ) + + @patch("ray.serve._private.queue_monitor.redis") + def test_get_queue_length_returns_cached_on_error( + self, mock_redis_module, redis_config + ): + """Test get_queue_length returns cached value on error.""" + mock_client = MagicMock() + mock_client.llen.return_value = 50 + mock_redis_module.from_url.return_value = mock_client + + monitor = QueueMonitor(redis_config) + monitor.initialize() + + # First successful query + length = monitor.get_queue_length() + assert length == 50 + + # Now make queries fail + mock_client.llen.side_effect = Exception("Connection lost") + + # Should return cached value + length = monitor.get_queue_length() + assert length == 50 + + +if __name__ == "__main__": + sys.exit(pytest.main(["-v", "-s", __file__])) From 1acaaf047383f3c4ed806e5994aad92346412527 Mon Sep 17 00:00:00 2001 From: harshit Date: Thu, 18 Dec 2025 04:33:38 +0000 Subject: [PATCH 02/18] move pika and redis import inside actor Signed-off-by: harshit --- python/ray/serve/_private/queue_monitor.py | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/python/ray/serve/_private/queue_monitor.py b/python/ray/serve/_private/queue_monitor.py index 67dc5069aeab..3677bc47d645 100644 --- a/python/ray/serve/_private/queue_monitor.py +++ b/python/ray/serve/_private/queue_monitor.py @@ -1,9 +1,6 @@ import logging from typing import Any, Dict -import pika -import redis - import ray from ray.serve._private.constants import SERVE_LOGGER_NAME @@ -36,6 +33,7 @@ def broker_type(self) -> str: class QueueMonitor: + """ Actor that monitors queue length by directly querying the broker. @@ -88,6 +86,8 @@ def initialize(self) -> None: raise def _init_redis(self) -> None: + import redis + """Initialize Redis client.""" self._redis_client = redis.from_url(self._config.broker_url) @@ -95,6 +95,8 @@ def _init_redis(self) -> None: self._redis_client.ping() def _init_rabbitmq(self) -> None: + import pika + """Initialize RabbitMQ connection and channel.""" # Store connection parameters for reconnection self._rabbitmq_connection_params = pika.URLParameters(self._config.broker_url) @@ -106,12 +108,16 @@ def _init_rabbitmq(self) -> None: self._rabbitmq_channel = self._rabbitmq_connection.channel() def _ensure_redis_connection(self) -> None: + import redis + """Ensure Redis connection is open, reconnecting if necessary.""" if self._redis_client is None or self._redis_client.ping() != "PONG": logger.warning("Redis connection lost, reconnecting...") self._redis_client = redis.from_url(self._config.broker_url) def _ensure_rabbitmq_connection(self) -> None: + import pika + """Ensure RabbitMQ connection is open, reconnecting if necessary.""" if ( self._rabbitmq_connection is None From 90afee6f42e0ef14b47d4be5e48ff917b0265425 Mon Sep 17 00:00:00 2001 From: harshit Date: Thu, 18 Dec 2025 04:44:08 +0000 Subject: [PATCH 03/18] fix tests Signed-off-by: harshit --- .../serve/tests/unit/test_queue_monitor.py | 25 +++++++++++-------- 1 file changed, 14 insertions(+), 11 deletions(-) diff --git a/python/ray/serve/tests/unit/test_queue_monitor.py b/python/ray/serve/tests/unit/test_queue_monitor.py index ba66656de906..5e9c974f121e 100644 --- a/python/ray/serve/tests/unit/test_queue_monitor.py +++ b/python/ray/serve/tests/unit/test_queue_monitor.py @@ -81,12 +81,12 @@ def rabbitmq_config(self): queue_name="test_queue", ) - @patch("ray.serve._private.queue_monitor.redis") - def test_get_redis_queue_length(self, mock_redis_module, redis_config): + @patch("redis.from_url") + def test_get_redis_queue_length(self, mock_from_url, redis_config): """Test Redis queue length returns pending tasks.""" mock_client = MagicMock() mock_client.llen.return_value = 30 - mock_redis_module.from_url.return_value = mock_client + mock_from_url.return_value = mock_client monitor = QueueMonitor(redis_config) monitor.initialize() @@ -95,11 +95,14 @@ def test_get_redis_queue_length(self, mock_redis_module, redis_config): assert length == 30 mock_client.llen.assert_called_with("test_queue") - @patch("ray.serve._private.queue_monitor.pika") - def test_get_rabbitmq_queue_length(self, mock_pika, rabbitmq_config): + @patch("pika.BlockingConnection") + @patch("pika.URLParameters") + def test_get_rabbitmq_queue_length( + self, mock_url_params, mock_blocking_connection, rabbitmq_config + ): """Test RabbitMQ queue length retrieval via AMQP.""" mock_params = MagicMock() - mock_pika.URLParameters.return_value = mock_params + mock_url_params.return_value = mock_params # Mock connection and channel - connection is reused across calls mock_connection = MagicMock() @@ -113,7 +116,7 @@ def test_get_rabbitmq_queue_length(self, mock_pika, rabbitmq_config): mock_connection.channel.return_value = mock_channel mock_channel.queue_declare.return_value = mock_result - mock_pika.BlockingConnection.return_value = mock_connection + mock_blocking_connection.return_value = mock_connection monitor = QueueMonitor(rabbitmq_config) monitor.initialize() @@ -121,20 +124,20 @@ def test_get_rabbitmq_queue_length(self, mock_pika, rabbitmq_config): assert length == 25 # Connection is established once during initialization - mock_pika.BlockingConnection.assert_called_once() + mock_blocking_connection.assert_called_once() mock_channel.queue_declare.assert_called_with( queue="test_queue", passive=True, ) - @patch("ray.serve._private.queue_monitor.redis") + @patch("redis.from_url") def test_get_queue_length_returns_cached_on_error( - self, mock_redis_module, redis_config + self, mock_from_url, redis_config ): """Test get_queue_length returns cached value on error.""" mock_client = MagicMock() mock_client.llen.return_value = 50 - mock_redis_module.from_url.return_value = mock_client + mock_from_url.return_value = mock_client monitor = QueueMonitor(redis_config) monitor.initialize() From ceb80ec9e17c5cdae634111399d11f7463753ca8 Mon Sep 17 00:00:00 2001 From: harshit Date: Thu, 18 Dec 2025 07:34:51 +0000 Subject: [PATCH 04/18] fix tests Signed-off-by: harshit --- .../ray/serve/tests/unit/test_queue_monitor.py | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/python/ray/serve/tests/unit/test_queue_monitor.py b/python/ray/serve/tests/unit/test_queue_monitor.py index 5e9c974f121e..cfb3ff1f233d 100644 --- a/python/ray/serve/tests/unit/test_queue_monitor.py +++ b/python/ray/serve/tests/unit/test_queue_monitor.py @@ -9,6 +9,23 @@ ) +# Create mock modules for pika and redis to allow patching even when not installed +@pytest.fixture(autouse=True) +def mock_broker_modules(): + """Inject mock pika and redis modules into sys.modules for testing.""" + mock_pika = MagicMock() + mock_redis = MagicMock() + + with patch.dict( + sys.modules, + { + "pika": mock_pika, + "redis": mock_redis, + }, + ): + yield {"pika": mock_pika, "redis": mock_redis} + + class TestQueueMonitorConfig: """Tests for QueueMonitorConfig class.""" From d65d94d3e016978da924156bb6d8166a8f664574 Mon Sep 17 00:00:00 2001 From: harshit Date: Fri, 19 Dec 2025 07:45:39 +0000 Subject: [PATCH 05/18] review comments Signed-off-by: harshit --- python/ray/serve/_private/queue_monitor.py | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/python/ray/serve/_private/queue_monitor.py b/python/ray/serve/_private/queue_monitor.py index 3677bc47d645..47778113ed6b 100644 --- a/python/ray/serve/_private/queue_monitor.py +++ b/python/ray/serve/_private/queue_monitor.py @@ -108,10 +108,17 @@ def _init_rabbitmq(self) -> None: self._rabbitmq_channel = self._rabbitmq_connection.channel() def _ensure_redis_connection(self) -> None: + """Ensure Redis connection is open, reconnecting if necessary.""" import redis - """Ensure Redis connection is open, reconnecting if necessary.""" - if self._redis_client is None or self._redis_client.ping() != "PONG": + needs_reconnect = self._redis_client is None + if not needs_reconnect: + try: + needs_reconnect = not self._redis_client.ping() + except redis.ConnectionError: + needs_reconnect = True + + if needs_reconnect: logger.warning("Redis connection lost, reconnecting...") self._redis_client = redis.from_url(self._config.broker_url) From 29cbde5a9fe37af99c394b989f2f1e9c75724b74 Mon Sep 17 00:00:00 2001 From: harshit Date: Sun, 4 Jan 2026 04:39:05 +0000 Subject: [PATCH 06/18] review changes Signed-off-by: harshit --- python/ray/serve/_private/queue_monitor.py | 207 +++--------------- python/ray/serve/tests/test_queue_monitor.py | 131 +++++++++++ .../serve/tests/unit/test_queue_monitor.py | 175 --------------- 3 files changed, 162 insertions(+), 351 deletions(-) create mode 100644 python/ray/serve/tests/test_queue_monitor.py delete mode 100644 python/ray/serve/tests/unit/test_queue_monitor.py diff --git a/python/ray/serve/_private/queue_monitor.py b/python/ray/serve/_private/queue_monitor.py index 47778113ed6b..761b9c78e16f 100644 --- a/python/ray/serve/_private/queue_monitor.py +++ b/python/ray/serve/_private/queue_monitor.py @@ -4,6 +4,11 @@ import ray from ray.serve._private.constants import SERVE_LOGGER_NAME +try: + import flower +except ImportError: + flower = None + logger = logging.getLogger(SERVE_LOGGER_NAME) # Actor name prefix for QueueMonitor actors @@ -17,22 +22,15 @@ def __init__( self, broker_url: str, queue_name: str, + rabbitmq_http_url: str = "http://guest:guest@localhost:15672/api/", ): self.broker_url = broker_url self.queue_name = queue_name - - @property - def broker_type(self) -> str: - url_lower = self.broker_url.lower() - if url_lower.startswith("redis"): - return "redis" - elif url_lower.startswith("amqp") or url_lower.startswith("pyamqp"): - return "rabbitmq" - else: - return "unknown" + self.rabbitmq_http_url = rabbitmq_http_url -class QueueMonitor: +@ray.remote(num_cpus=1) +class QueueMonitorActor: """ Actor that monitors queue length by directly querying the broker. @@ -45,124 +43,20 @@ class QueueMonitor: """ def __init__(self, config: QueueMonitorConfig): + if flower is None: + raise ImportError( + "QueueMonitor requires the 'flower' package to be installed to query broker " + "state. Please install it in the same environment as Serve." + ) + self._config = config self._last_queue_length: int = 0 self._is_initialized: bool = False - # Redis connection state - self._redis_client: Any = None - - # RabbitMQ connection state - self._rabbitmq_connection: Any = None - self._rabbitmq_channel: Any = None - - def initialize(self) -> None: - """ - Initialize connection to the broker. - - Creates the appropriate client based on broker type and tests the connection. - """ - if self._is_initialized: - return - - broker_type = self._config.broker_type - try: - if broker_type == "redis": - self._init_redis() - elif broker_type == "rabbitmq": - self._init_rabbitmq() - else: - raise ValueError( - f"Unsupported broker type: {broker_type}. Supported: redis, rabbitmq" - ) - - self._is_initialized = True - logger.info( - f"QueueMonitor initialized for queue '{self._config.queue_name}' (broker: {broker_type})" - ) - - except Exception as e: - logger.error(f"Failed to initialize QueueMonitor: {e}") - raise - - def _init_redis(self) -> None: - import redis - - """Initialize Redis client.""" - self._redis_client = redis.from_url(self._config.broker_url) - - # Test connection - self._redis_client.ping() - - def _init_rabbitmq(self) -> None: - import pika - - """Initialize RabbitMQ connection and channel.""" - # Store connection parameters for reconnection - self._rabbitmq_connection_params = pika.URLParameters(self._config.broker_url) - - # Establish persistent connection and channel - self._rabbitmq_connection = pika.BlockingConnection( - self._rabbitmq_connection_params + self._flower_broker = flower.Broker( + self._config.broker_url, self._config.rabbitmq_http_url ) - self._rabbitmq_channel = self._rabbitmq_connection.channel() - - def _ensure_redis_connection(self) -> None: - """Ensure Redis connection is open, reconnecting if necessary.""" - import redis - - needs_reconnect = self._redis_client is None - if not needs_reconnect: - try: - needs_reconnect = not self._redis_client.ping() - except redis.ConnectionError: - needs_reconnect = True - - if needs_reconnect: - logger.warning("Redis connection lost, reconnecting...") - self._redis_client = redis.from_url(self._config.broker_url) - - def _ensure_rabbitmq_connection(self) -> None: - import pika - - """Ensure RabbitMQ connection is open, reconnecting if necessary.""" - if ( - self._rabbitmq_connection is None - or self._rabbitmq_connection.is_closed - or self._rabbitmq_channel is None - or self._rabbitmq_channel.is_closed - ): - logger.warning("RabbitMQ connection lost, reconnecting...") - self._rabbitmq_connection = pika.BlockingConnection( - self._rabbitmq_connection_params - ) - self._rabbitmq_channel = self._rabbitmq_connection.channel() - - def _get_redis_queue_length(self) -> int: - """ - Get pending tasks from Redis broker. - - Returns: - Number of pending tasks in the queue. - """ - self._ensure_redis_connection() - return self._redis_client.llen(self._config.queue_name) - - def _get_rabbitmq_queue_length(self) -> int: - """ - Get pending tasks from RabbitMQ broker. - - Returns: - Number of pending (ready) messages in the queue. - """ - self._ensure_rabbitmq_connection() - - # Passive declaration - doesn't create queue, just gets info - result = self._rabbitmq_channel.queue_declare( - queue=self._config.queue_name, passive=True - ) - - return result.method.message_count + self._is_initialized = True def get_config(self) -> Dict[str, Any]: """ @@ -174,6 +68,7 @@ def get_config(self) -> Dict[str, Any]: return { "broker_url": self._config.broker_url, "queue_name": self._config.queue_name, + "rabbitmq_http_url": self._config.rabbitmq_http_url, } def get_queue_length(self) -> int: @@ -190,19 +85,16 @@ def get_queue_length(self) -> int: return 0 try: - broker_type = self._config.broker_type - - if broker_type == "redis": - queue_length = self._get_redis_queue_length() - elif broker_type == "rabbitmq": - queue_length = self._get_rabbitmq_queue_length() - else: - raise ValueError(f"Unsupported broker type: {broker_type}") + queues = self._flower_broker.queues([self._config.queue_name]) + print(f"queues: {queues}") + if queues is not None: + for q in queues: + if q.get("name") == self._config.queue_name: + queue_length = q.get("messages") + self._last_queue_length = queue_length + return queue_length - # Update cache - self._last_queue_length = queue_length - - return queue_length + return self._last_queue_length except Exception as e: logger.warning( @@ -211,45 +103,8 @@ def get_queue_length(self) -> int: return self._last_queue_length def shutdown(self) -> None: - # Close Redis client if present - if getattr(self, "_redis_client", None) is not None: - try: - if hasattr(self._redis_client, "close"): - self._redis_client.close() - except Exception as e: - logger.warning(f"Error closing Redis client: {e}") - self._redis_client = None - - # Close RabbitMQ connection if present - if getattr(self, "_rabbitmq_connection", None) is not None: - try: - if not self._rabbitmq_connection.is_closed: - self._rabbitmq_connection.close() - except Exception as e: - logger.warning(f"Error closing RabbitMQ connection: {e}") - self._rabbitmq_connection = None - self._rabbitmq_channel = None - - if hasattr(self, "_is_initialized"): - self._is_initialized = False - - def __del__(self): - self.shutdown() - - -@ray.remote(num_cpus=0, runtime_env={"pip": ["pika", "redis"]}) -class QueueMonitorActor(QueueMonitor): - """ - Ray actor version of QueueMonitor for direct access from ServeController. - - This is used instead of a Serve deployment because the autoscaling policy - runs inside the ServeController, and using serve.get_deployment_handle() - from within the controller causes a deadlock. - """ - - def __init__(self, config: QueueMonitorConfig): - super().__init__(config) - self.initialize() + self._flower_broker.close() + self._is_initialized = False def create_queue_monitor_actor( @@ -311,7 +166,7 @@ def get_queue_monitor_actor( return ray.get_actor(full_actor_name, namespace=namespace) -def delete_queue_monitor_actor( +def kill_queue_monitor_actor( deployment_name: str, namespace: str = "serve", ) -> bool: diff --git a/python/ray/serve/tests/test_queue_monitor.py b/python/ray/serve/tests/test_queue_monitor.py new file mode 100644 index 000000000000..150109013d5a --- /dev/null +++ b/python/ray/serve/tests/test_queue_monitor.py @@ -0,0 +1,131 @@ +import sys +from types import SimpleNamespace + +import pytest + +import ray +from ray.serve._private.queue_monitor import ( + QueueMonitorActor, + QueueMonitorConfig, +) + + +@pytest.fixture(autouse=True) +def ray_local_mode(): + """Run these tests in Ray local mode so driver-side mocks apply to actors.""" + if ray.is_initialized(): + ray.shutdown() + ray.init(local_mode=True, num_cpus=2, include_dashboard=False) + yield + ray.shutdown() + + +class StubBroker: + """Picklable stub for flower.Broker used by QueueMonitorActor.""" + + init_calls = [] + queues_responses = [] + queues_side_effects = [] + close_calls = 0 + + def __init__(self, broker_url: str, rabbitmq_http_url: str): + type(self).init_calls.append((broker_url, rabbitmq_http_url)) + + def queues(self, names): + if type(self).queues_side_effects: + exc = type(self).queues_side_effects.pop(0) + if exc is not None: + raise exc + if type(self).queues_responses: + return type(self).queues_responses.pop(0) + return None + + def close(self): + type(self).close_calls += 1 + + +@pytest.fixture(autouse=True) +def stub_flower(monkeypatch): + """Patch queue_monitor.flower with a picklable stub (avoids cloudpickle/MagicMock issues).""" + import ray.serve._private.queue_monitor as qm + + StubBroker.init_calls = [] + StubBroker.queues_responses = [] + StubBroker.queues_side_effects = [] + StubBroker.close_calls = 0 + + monkeypatch.setattr(qm, "flower", SimpleNamespace(Broker=StubBroker)) + yield StubBroker + + +class TestQueueMonitorConfig: + """Tests for QueueMonitorConfig class.""" + + def test_config_stores_values(self): + """Test config stores broker_url, queue_name, and rabbitmq_http_url.""" + config = QueueMonitorConfig( + broker_url="redis://localhost:6379/0", + queue_name="test_queue", + ) + + assert config.broker_url == "redis://localhost:6379/0" + assert config.queue_name == "test_queue" + assert config.rabbitmq_http_url == "http://guest:guest@localhost:15672/api/" + + def test_config_custom_rabbitmq_http_url(self): + config = QueueMonitorConfig( + broker_url="amqp://guest:guest@localhost:5672//", + queue_name="my_queue", + rabbitmq_http_url="http://user:pass@localhost:15672/api/", + ) + + assert config.broker_url == "amqp://guest:guest@localhost:5672//" + assert config.queue_name == "my_queue" + assert config.rabbitmq_http_url == "http://user:pass@localhost:15672/api/" + + +class TestQueueMonitor: + """Tests for QueueMonitor class.""" + + @pytest.fixture + def redis_config(self): + return QueueMonitorConfig( + broker_url="redis://localhost:6379/0", + queue_name="test_queue", + ) + + def test_get_queue_length(self, stub_flower, redis_config): + """Test queue length returns number of messages from Flower broker.""" + stub_flower.queues_responses = [[{"name": "test_queue", "messages": 30}]] + + monitor = QueueMonitorActor.remote(redis_config) + length = ray.get(monitor.get_queue_length.remote()) + + assert length == 30 + + def test_get_queue_length_returns_cached_on_error(self, stub_flower, redis_config): + """Test get_queue_length returns cached value on error.""" + stub_flower.queues_side_effects = [None, Exception("Connection lost")] + + stub_flower.queues_responses = [[{"name": "test_queue", "messages": 50}]] + + monitor = QueueMonitorActor.remote(redis_config) + + # First successful query + length = ray.get(monitor.get_queue_length.remote()) + assert length == 50 + + # Should return cached value + length = ray.get(monitor.get_queue_length.remote()) + assert length == 50 + + def test_shutdown_marks_uninitialized(self, stub_flower, redis_config): + monitor = QueueMonitorActor.remote(redis_config) + ray.get(monitor.shutdown.remote()) + + assert ray.get(monitor.get_queue_length.remote()) == 0 + assert stub_flower.close_calls == 1 + + +if __name__ == "__main__": + sys.exit(pytest.main(["-v", "-s", __file__])) diff --git a/python/ray/serve/tests/unit/test_queue_monitor.py b/python/ray/serve/tests/unit/test_queue_monitor.py deleted file mode 100644 index cfb3ff1f233d..000000000000 --- a/python/ray/serve/tests/unit/test_queue_monitor.py +++ /dev/null @@ -1,175 +0,0 @@ -import sys -from unittest.mock import MagicMock, patch - -import pytest - -from ray.serve._private.queue_monitor import ( - QueueMonitor, - QueueMonitorConfig, -) - - -# Create mock modules for pika and redis to allow patching even when not installed -@pytest.fixture(autouse=True) -def mock_broker_modules(): - """Inject mock pika and redis modules into sys.modules for testing.""" - mock_pika = MagicMock() - mock_redis = MagicMock() - - with patch.dict( - sys.modules, - { - "pika": mock_pika, - "redis": mock_redis, - }, - ): - yield {"pika": mock_pika, "redis": mock_redis} - - -class TestQueueMonitorConfig: - """Tests for QueueMonitorConfig class.""" - - def test_redis_broker_type(self): - """Test Redis broker type detection.""" - config = QueueMonitorConfig( - broker_url="redis://localhost:6379/0", - queue_name="test_queue", - ) - assert config.broker_type == "redis" - - def test_redis_with_password_broker_type(self): - """Test Redis with password broker type detection.""" - config = QueueMonitorConfig( - broker_url="rediss://user:password@localhost:6379/0", - queue_name="test_queue", - ) - assert config.broker_type == "redis" - - def test_rabbitmq_amqp_broker_type(self): - """Test RabbitMQ AMQP broker type detection.""" - config = QueueMonitorConfig( - broker_url="amqp://guest:guest@localhost:5672//", - queue_name="test_queue", - ) - assert config.broker_type == "rabbitmq" - - def test_rabbitmq_pyamqp_broker_type(self): - """Test RabbitMQ pyamqp broker type detection.""" - config = QueueMonitorConfig( - broker_url="pyamqp://guest:guest@localhost:5672//", - queue_name="test_queue", - ) - assert config.broker_type == "rabbitmq" - - def test_unknown_broker_type(self): - """Test unknown broker type detection.""" - config = QueueMonitorConfig( - broker_url="some://unknown/broker", - queue_name="test_queue", - ) - assert config.broker_type == "unknown" - - def test_config_stores_values(self): - """Test config stores broker_url and queue_name.""" - config = QueueMonitorConfig( - broker_url="amqp://guest:guest@localhost:5672//", - queue_name="my_queue", - ) - assert config.broker_url == "amqp://guest:guest@localhost:5672//" - assert config.queue_name == "my_queue" - - -class TestQueueMonitor: - """Tests for QueueMonitor class.""" - - @pytest.fixture - def redis_config(self): - """Provides a Redis QueueMonitorConfig.""" - return QueueMonitorConfig( - broker_url="redis://localhost:6379/0", - queue_name="test_queue", - ) - - @pytest.fixture - def rabbitmq_config(self): - """Provides a RabbitMQ QueueMonitorConfig.""" - return QueueMonitorConfig( - broker_url="amqp://guest:guest@localhost:5672//", - queue_name="test_queue", - ) - - @patch("redis.from_url") - def test_get_redis_queue_length(self, mock_from_url, redis_config): - """Test Redis queue length returns pending tasks.""" - mock_client = MagicMock() - mock_client.llen.return_value = 30 - mock_from_url.return_value = mock_client - - monitor = QueueMonitor(redis_config) - monitor.initialize() - length = monitor.get_queue_length() - - assert length == 30 - mock_client.llen.assert_called_with("test_queue") - - @patch("pika.BlockingConnection") - @patch("pika.URLParameters") - def test_get_rabbitmq_queue_length( - self, mock_url_params, mock_blocking_connection, rabbitmq_config - ): - """Test RabbitMQ queue length retrieval via AMQP.""" - mock_params = MagicMock() - mock_url_params.return_value = mock_params - - # Mock connection and channel - connection is reused across calls - mock_connection = MagicMock() - mock_channel = MagicMock() - mock_result = MagicMock() - mock_result.method.message_count = 25 - - # Ensure connection appears open so it's not recreated - mock_connection.is_closed = False - mock_channel.is_closed = False - - mock_connection.channel.return_value = mock_channel - mock_channel.queue_declare.return_value = mock_result - mock_blocking_connection.return_value = mock_connection - - monitor = QueueMonitor(rabbitmq_config) - monitor.initialize() - length = monitor.get_queue_length() - - assert length == 25 - # Connection is established once during initialization - mock_blocking_connection.assert_called_once() - mock_channel.queue_declare.assert_called_with( - queue="test_queue", - passive=True, - ) - - @patch("redis.from_url") - def test_get_queue_length_returns_cached_on_error( - self, mock_from_url, redis_config - ): - """Test get_queue_length returns cached value on error.""" - mock_client = MagicMock() - mock_client.llen.return_value = 50 - mock_from_url.return_value = mock_client - - monitor = QueueMonitor(redis_config) - monitor.initialize() - - # First successful query - length = monitor.get_queue_length() - assert length == 50 - - # Now make queries fail - mock_client.llen.side_effect = Exception("Connection lost") - - # Should return cached value - length = monitor.get_queue_length() - assert length == 50 - - -if __name__ == "__main__": - sys.exit(pytest.main(["-v", "-s", __file__])) From 091c25ced96ff616f069819930ad4024f8a8cb21 Mon Sep 17 00:00:00 2001 From: harshit Date: Mon, 5 Jan 2026 04:33:40 +0000 Subject: [PATCH 07/18] review changes Signed-off-by: harshit --- python/ray/serve/_private/queue_monitor.py | 11 ++++++++--- python/ray/serve/tests/BUILD.bazel | 1 + 2 files changed, 9 insertions(+), 3 deletions(-) diff --git a/python/ray/serve/_private/queue_monitor.py b/python/ray/serve/_private/queue_monitor.py index 761b9c78e16f..785951dbafea 100644 --- a/python/ray/serve/_private/queue_monitor.py +++ b/python/ray/serve/_private/queue_monitor.py @@ -29,7 +29,7 @@ def __init__( self.rabbitmq_http_url = rabbitmq_http_url -@ray.remote(num_cpus=1) +@ray.remote(num_cpus=0) class QueueMonitorActor: """ @@ -86,7 +86,6 @@ def get_queue_length(self) -> int: try: queues = self._flower_broker.queues([self._config.queue_name]) - print(f"queues: {queues}") if queues is not None: for q in queues: if q.get("name") == self._config.queue_name: @@ -94,7 +93,13 @@ def get_queue_length(self) -> int: self._last_queue_length = queue_length return queue_length - return self._last_queue_length + if self._last_queue_length is not None: + return self._last_queue_length + else: + logger.warning( + f"No queue length found for queue '{self._config.queue_name}', returning 0" + ) + return 0 except Exception as e: logger.warning( diff --git a/python/ray/serve/tests/BUILD.bazel b/python/ray/serve/tests/BUILD.bazel index f1327f9f8f60..1ed740fc9902 100644 --- a/python/ray/serve/tests/BUILD.bazel +++ b/python/ray/serve/tests/BUILD.bazel @@ -125,6 +125,7 @@ py_test_module_list( "test_multiplex.py", "test_proxy.py", "test_proxy_response_generator.py", + "test_queue_monitor.py", "test_ray_client.py", "test_record_routing_stats.py", "test_regression.py", From 931c3ca21a02bcac1f0fb258d1b1685ec459ee3f Mon Sep 17 00:00:00 2001 From: harshit Date: Tue, 6 Jan 2026 07:16:16 +0000 Subject: [PATCH 08/18] review changes Signed-off-by: harshit --- python/ray/serve/_private/broker.py | 278 ++++++++++++++++++ python/ray/serve/_private/queue_monitor.py | 26 +- python/ray/serve/tests/test_queue_monitor.py | 234 +++++++++------ .../tests/unit/test_queue_monitor_config.py | 28 ++ 4 files changed, 466 insertions(+), 100 deletions(-) create mode 100644 python/ray/serve/_private/broker.py create mode 100644 python/ray/serve/tests/unit/test_queue_monitor_config.py diff --git a/python/ray/serve/_private/broker.py b/python/ray/serve/_private/broker.py new file mode 100644 index 000000000000..177f8dfa4e96 --- /dev/null +++ b/python/ray/serve/_private/broker.py @@ -0,0 +1,278 @@ +import json +import logging +import numbers +import socket +from urllib.parse import quote, unquote, urljoin, urlparse + +from tornado import httpclient, ioloop + +try: + import redis +except ImportError: + redis = None + + +logger = logging.getLogger(__name__) + + +class BrokerBase: + def __init__(self, broker_url, *_, **__): + purl = urlparse(broker_url) + self.host = purl.hostname + self.port = purl.port + self.vhost = purl.path[1:] + + username = purl.username + password = purl.password + + self.username = unquote(username) if username else username + self.password = unquote(password) if password else password + + async def queues(self, names): + raise NotImplementedError + + def close(self): + """Close any open connections. Override in subclasses as needed.""" + pass + + +class RabbitMQ(BrokerBase): + def __init__(self, broker_url, http_api, io_loop=None, **__): + super().__init__(broker_url) + self.io_loop = io_loop or ioloop.IOLoop.instance() + + self.host = self.host or "localhost" + self.port = self.port or 15672 + self.vhost = quote(self.vhost, "") or "/" if self.vhost != "/" else self.vhost + self.username = self.username or "guest" + self.password = self.password or "guest" + + if not http_api: + http_api = f"http://{self.username}:{self.password}@{self.host}:{self.port}/api/{self.vhost}" + + try: + self.validate_http_api(http_api) + except ValueError: + logger.error("Invalid broker api url: %s", http_api) + + self.http_api = http_api + + async def queues(self, names): + url = urljoin(self.http_api, "queues/" + self.vhost) + api_url = urlparse(self.http_api) + username = unquote(api_url.username or "") or self.username + password = unquote(api_url.password or "") or self.password + + http_client = httpclient.AsyncHTTPClient() + try: + response = await http_client.fetch( + url, + auth_username=username, + auth_password=password, + connect_timeout=1.0, + request_timeout=2.0, + validate_cert=False, + ) + except (socket.error, httpclient.HTTPError) as e: + logger.error("RabbitMQ management API call failed: %s", e) + return [] + finally: + http_client.close() + + if response.code == 200: + info = json.loads(response.body.decode()) + return [x for x in info if x["name"] in names] + response.rethrow() + + @classmethod + def validate_http_api(cls, http_api): + url = urlparse(http_api) + if url.scheme not in ("http", "https"): + raise ValueError(f"Invalid http api schema: {url.scheme}") + + +class RedisBase(BrokerBase): + DEFAULT_SEP = "\x06\x16" + DEFAULT_PRIORITY_STEPS = [0, 3, 6, 9] + + def __init__(self, broker_url, *_, **kwargs): + super().__init__(broker_url) + self.redis = None + + if not redis: + raise ImportError("redis library is required") + + broker_options = kwargs.get("broker_options", {}) + self.priority_steps = broker_options.get( + "priority_steps", self.DEFAULT_PRIORITY_STEPS + ) + self.sep = broker_options.get("sep", self.DEFAULT_SEP) + self.broker_prefix = broker_options.get("global_keyprefix", "") + + def _q_for_pri(self, queue, pri): + if pri not in self.priority_steps: + raise ValueError("Priority not in priority steps") + # pylint: disable=consider-using-f-string + return "{0}{1}{2}".format(*((queue, self.sep, pri) if pri else (queue, "", ""))) + + async def queues(self, names): + queue_stats = [] + for name in names: + priority_names = [ + self.broker_prefix + self._q_for_pri(name, pri) + for pri in self.priority_steps + ] + queue_stats.append( + { + "name": name, + "messages": sum((self.redis.llen(x) for x in priority_names)), + } + ) + return queue_stats + + def close(self): + """Close the Redis connection.""" + if self.redis is not None: + self.redis.close() + self.redis = None + + +class Redis(RedisBase): + def __init__(self, broker_url, *args, **kwargs): + super().__init__(broker_url, *args, **kwargs) + self.host = self.host or "localhost" + self.port = self.port or 6379 + self.vhost = self._prepare_virtual_host(self.vhost) + self.redis = self._get_redis_client() + + def _prepare_virtual_host(self, vhost): + if not isinstance(vhost, numbers.Integral): + if not vhost or vhost == "/": + vhost = 0 + elif vhost.startswith("/"): + vhost = vhost[1:] + try: + vhost = int(vhost) + except ValueError as exc: + raise ValueError( + f"Database is int between 0 and limit - 1, not {vhost}" + ) from exc + return vhost + + def _get_redis_client_args(self): + return { + "host": self.host, + "port": self.port, + "db": self.vhost, + "username": self.username, + "password": self.password, + } + + def _get_redis_client(self): + return redis.Redis(**self._get_redis_client_args()) + + +class RedisSentinel(RedisBase): + def __init__(self, broker_url, *args, **kwargs): + super().__init__(broker_url, *args, **kwargs) + broker_options = kwargs.get("broker_options", {}) + broker_use_ssl = kwargs.get("broker_use_ssl", None) + self.host = self.host or "localhost" + self.port = self.port or 26379 + self.vhost = self._prepare_virtual_host(self.vhost) + self.master_name = self._prepare_master_name(broker_options) + self.redis = self._get_redis_client(broker_options, broker_use_ssl) + + def _prepare_virtual_host(self, vhost): + if not isinstance(vhost, numbers.Integral): + if not vhost or vhost == "/": + vhost = 0 + elif vhost.startswith("/"): + vhost = vhost[1:] + try: + vhost = int(vhost) + except ValueError as exc: + raise ValueError( + "Database is int between 0 and limit - 1, not {vhost}" + ) from exc + return vhost + + def _prepare_master_name(self, broker_options): + try: + master_name = broker_options["master_name"] + except KeyError as exc: + raise ValueError("master_name is required for Sentinel broker") from exc + return master_name + + def _get_redis_client(self, broker_options, broker_use_ssl): + connection_kwargs = { + "password": self.password, + "sentinel_kwargs": broker_options.get("sentinel_kwargs"), + } + if isinstance(broker_use_ssl, dict): + connection_kwargs["ssl"] = True + connection_kwargs.update(broker_use_ssl) + # get all sentinel hosts from Celery App config and use them to initialize Sentinel + sentinel = redis.sentinel.Sentinel( + [(self.host, self.port)], **connection_kwargs + ) + redis_client = sentinel.master_for(self.master_name) + return redis_client + + +class RedisSocket(RedisBase): + def __init__(self, broker_url, *args, **kwargs): + super().__init__(broker_url, *args, **kwargs) + self.redis = redis.Redis( + unix_socket_path="/" + self.vhost, password=self.password + ) + + +class RedisSsl(Redis): + """ + Redis SSL class offering connection to the broker over SSL. + This does not currently support SSL settings through the url, only through + the broker_use_ssl celery configuration. + """ + + def __init__(self, broker_url, *args, **kwargs): + if "broker_use_ssl" not in kwargs: + raise ValueError("rediss broker requires broker_use_ssl") + self.broker_use_ssl = kwargs.get("broker_use_ssl", {}) + super().__init__(broker_url, *args, **kwargs) + + def _get_redis_client_args(self): + client_args = super()._get_redis_client_args() + client_args["ssl"] = True + if isinstance(self.broker_use_ssl, dict): + client_args.update(self.broker_use_ssl) + return client_args + + +class Broker: + """Factory returning the appropriate broker client based on URL scheme. + + Supported schemes: + ``amqp`` or ``amqps`` -> :class:`RabbitMQ` + ``redis`` -> :class:`Redis` + ``rediss`` -> :class:`RedisSsl` + ``redis+socket`` -> :class:`RedisSocket` + ``sentinel`` -> :class:`RedisSentinel` + """ + + def __new__(cls, broker_url, *args, **kwargs): + scheme = urlparse(broker_url).scheme + if scheme in ("amqp", "amqps"): + return RabbitMQ(broker_url, *args, **kwargs) + if scheme == "redis": + return Redis(broker_url, *args, **kwargs) + if scheme == "rediss": + return RedisSsl(broker_url, *args, **kwargs) + if scheme == "redis+socket": + return RedisSocket(broker_url, *args, **kwargs) + if scheme == "sentinel": + return RedisSentinel(broker_url, *args, **kwargs) + raise NotImplementedError + + async def queues(self, names): + raise NotImplementedError diff --git a/python/ray/serve/_private/queue_monitor.py b/python/ray/serve/_private/queue_monitor.py index 785951dbafea..566c6c039a01 100644 --- a/python/ray/serve/_private/queue_monitor.py +++ b/python/ray/serve/_private/queue_monitor.py @@ -2,13 +2,9 @@ from typing import Any, Dict import ray +from ray.serve._private.broker import Broker from ray.serve._private.constants import SERVE_LOGGER_NAME -try: - import flower -except ImportError: - flower = None - logger = logging.getLogger(SERVE_LOGGER_NAME) # Actor name prefix for QueueMonitor actors @@ -39,22 +35,16 @@ class QueueMonitorActor: Uses native broker clients: - Redis: Uses redis-py library with LLEN command - - RabbitMQ: Uses pika library with passive queue declaration + - RabbitMQ: Uses HTTP management API """ def __init__(self, config: QueueMonitorConfig): - if flower is None: - raise ImportError( - "QueueMonitor requires the 'flower' package to be installed to query broker " - "state. Please install it in the same environment as Serve." - ) - self._config = config self._last_queue_length: int = 0 self._is_initialized: bool = False - self._flower_broker = flower.Broker( - self._config.broker_url, self._config.rabbitmq_http_url + self._broker = Broker( + self._config.broker_url, http_api=self._config.rabbitmq_http_url ) self._is_initialized = True @@ -71,7 +61,7 @@ def get_config(self) -> Dict[str, Any]: "rabbitmq_http_url": self._config.rabbitmq_http_url, } - def get_queue_length(self) -> int: + async def get_queue_length(self) -> int: """ Get the current queue length from the broker. @@ -85,7 +75,7 @@ def get_queue_length(self) -> int: return 0 try: - queues = self._flower_broker.queues([self._config.queue_name]) + queues = await self._broker.queues([self._config.queue_name]) if queues is not None: for q in queues: if q.get("name") == self._config.queue_name: @@ -108,7 +98,9 @@ def get_queue_length(self) -> int: return self._last_queue_length def shutdown(self) -> None: - self._flower_broker.close() + if self._broker is not None: + self._broker.close() + self._broker = None self._is_initialized = False diff --git a/python/ray/serve/tests/test_queue_monitor.py b/python/ray/serve/tests/test_queue_monitor.py index 150109013d5a..5fd398f515b7 100644 --- a/python/ray/serve/tests/test_queue_monitor.py +++ b/python/ray/serve/tests/test_queue_monitor.py @@ -1,130 +1,198 @@ +"""Integration tests for QueueMonitorActor using real Redis.""" +import os import sys -from types import SimpleNamespace import pytest +import redis import ray from ray.serve._private.queue_monitor import ( QueueMonitorActor, QueueMonitorConfig, + create_queue_monitor_actor, + get_queue_monitor_actor, + kill_queue_monitor_actor, ) +from ray.tests.conftest import external_redis # noqa: F401 -@pytest.fixture(autouse=True) -def ray_local_mode(): - """Run these tests in Ray local mode so driver-side mocks apply to actors.""" +@pytest.fixture +def ray_instance(): # noqa: F811 + """Initialize Ray with external Redis.""" if ray.is_initialized(): ray.shutdown() - ray.init(local_mode=True, num_cpus=2, include_dashboard=False) + ray.init(num_cpus=2, include_dashboard=False) yield ray.shutdown() -class StubBroker: - """Picklable stub for flower.Broker used by QueueMonitorActor.""" +@pytest.fixture +def redis_client(external_redis): # noqa: F811 + """Create a Redis client connected to the external Redis.""" + redis_address = os.environ.get("RAY_REDIS_ADDRESS") + host, port = redis_address.split(":") + client = redis.Redis(host=host, port=int(port), db=0) + yield client + client.close() - init_calls = [] - queues_responses = [] - queues_side_effects = [] - close_calls = 0 - def __init__(self, broker_url: str, rabbitmq_http_url: str): - type(self).init_calls.append((broker_url, rabbitmq_http_url)) +@pytest.fixture +def redis_config(external_redis): # noqa: F811 + """Create a QueueMonitorConfig with the external Redis URL.""" + redis_address = os.environ.get("RAY_REDIS_ADDRESS") + return QueueMonitorConfig( + broker_url=f"redis://{redis_address}/0", + queue_name="test_queue", + ) - def queues(self, names): - if type(self).queues_side_effects: - exc = type(self).queues_side_effects.pop(0) - if exc is not None: - raise exc - if type(self).queues_responses: - return type(self).queues_responses.pop(0) - return None - def close(self): - type(self).close_calls += 1 +class TestQueueMonitorActor: + """Integration tests for QueueMonitorActor with real Redis.""" + def test_get_queue_length(self, ray_instance, redis_client, redis_config): + """Test queue length returns number of messages from broker.""" + # Push some messages to the queue + for i in range(30): + redis_client.lpush("test_queue", f"message_{i}") -@pytest.fixture(autouse=True) -def stub_flower(monkeypatch): - """Patch queue_monitor.flower with a picklable stub (avoids cloudpickle/MagicMock issues).""" - import ray.serve._private.queue_monitor as qm + try: + monitor = QueueMonitorActor.remote(redis_config) + length = ray.get(monitor.get_queue_length.remote()) - StubBroker.init_calls = [] - StubBroker.queues_responses = [] - StubBroker.queues_side_effects = [] - StubBroker.close_calls = 0 + assert length == 30 + finally: + # Clean up + redis_client.delete("test_queue") - monkeypatch.setattr(qm, "flower", SimpleNamespace(Broker=StubBroker)) - yield StubBroker + def test_get_queue_length_empty_queue( + self, ray_instance, redis_client, redis_config + ): + """Test queue length returns 0 for empty queue.""" + # Ensure queue is empty + redis_client.delete("test_queue") + monitor = QueueMonitorActor.remote(redis_config) + length = ray.get(monitor.get_queue_length.remote()) -class TestQueueMonitorConfig: - """Tests for QueueMonitorConfig class.""" + assert length == 0 - def test_config_stores_values(self): - """Test config stores broker_url, queue_name, and rabbitmq_http_url.""" - config = QueueMonitorConfig( - broker_url="redis://localhost:6379/0", - queue_name="test_queue", - ) + def test_get_queue_length_returns_cached_on_error( + self, ray_instance, redis_client, redis_config + ): + """Test get_queue_length returns cached value on error.""" + # Push messages initially + for i in range(50): + redis_client.lpush("test_queue", f"message_{i}") + + try: + monitor = QueueMonitorActor.remote(redis_config) + + # First successful query + length = ray.get(monitor.get_queue_length.remote()) + assert length == 50 + + # Simulate error by deleting the queue and checking cached value is returned + # Note: The actual error path is harder to trigger with real Redis, + # so we verify the caching behavior by checking the value is consistent + length = ray.get(monitor.get_queue_length.remote()) + assert length == 50 + finally: + redis_client.delete("test_queue") + + def test_shutdown_marks_uninitialized( + self, ray_instance, redis_client, redis_config + ): + """Test shutdown cleans up resources and returns 0 for queue length.""" + redis_client.lpush("test_queue", "message") + + try: + monitor = QueueMonitorActor.remote(redis_config) + + # Verify monitor works before shutdown + length = ray.get(monitor.get_queue_length.remote()) + assert length == 1 + + # Shutdown the monitor + ray.get(monitor.shutdown.remote()) + + # After shutdown, should return 0 + length = ray.get(monitor.get_queue_length.remote()) + assert length == 0 + finally: + redis_client.delete("test_queue") + + def test_get_config(self, ray_instance, redis_config): + """Test get_config returns the configuration as a dict.""" + monitor = QueueMonitorActor.remote(redis_config) + config = ray.get(monitor.get_config.remote()) - assert config.broker_url == "redis://localhost:6379/0" - assert config.queue_name == "test_queue" - assert config.rabbitmq_http_url == "http://guest:guest@localhost:15672/api/" + assert config["broker_url"] == redis_config.broker_url + assert config["queue_name"] == redis_config.queue_name + assert config["rabbitmq_http_url"] == redis_config.rabbitmq_http_url - def test_config_custom_rabbitmq_http_url(self): - config = QueueMonitorConfig( - broker_url="amqp://guest:guest@localhost:5672//", - queue_name="my_queue", - rabbitmq_http_url="http://user:pass@localhost:15672/api/", - ) - assert config.broker_url == "amqp://guest:guest@localhost:5672//" - assert config.queue_name == "my_queue" - assert config.rabbitmq_http_url == "http://user:pass@localhost:15672/api/" +class TestQueueMonitorHelpers: + """Integration tests for queue monitor helper functions.""" + def test_create_queue_monitor_actor(self, ray_instance, redis_config): + """Test creating a named queue monitor actor.""" + actor = create_queue_monitor_actor("test_deployment", redis_config) -class TestQueueMonitor: - """Tests for QueueMonitor class.""" + # Verify the actor works + config = ray.get(actor.get_config.remote()) + assert config["queue_name"] == "test_queue" - @pytest.fixture - def redis_config(self): - return QueueMonitorConfig( - broker_url="redis://localhost:6379/0", - queue_name="test_queue", - ) + # Clean up + kill_queue_monitor_actor("test_deployment") - def test_get_queue_length(self, stub_flower, redis_config): - """Test queue length returns number of messages from Flower broker.""" - stub_flower.queues_responses = [[{"name": "test_queue", "messages": 30}]] + def test_create_queue_monitor_actor_reuses_existing( + self, ray_instance, redis_config + ): + """Test that creating an actor with the same name reuses the existing one.""" + actor1 = create_queue_monitor_actor("test_deployment", redis_config) + actor2 = create_queue_monitor_actor("test_deployment", redis_config) - monitor = QueueMonitorActor.remote(redis_config) - length = ray.get(monitor.get_queue_length.remote()) + # Both should reference the same actor + assert actor1 == actor2 - assert length == 30 + # Clean up + kill_queue_monitor_actor("test_deployment") - def test_get_queue_length_returns_cached_on_error(self, stub_flower, redis_config): - """Test get_queue_length returns cached value on error.""" - stub_flower.queues_side_effects = [None, Exception("Connection lost")] + def test_get_queue_monitor_actor(self, ray_instance, redis_config): + """Test retrieving an existing queue monitor actor.""" + actor = create_queue_monitor_actor("test_deployment", redis_config) + # Ensure actor is ready before looking it up by name + ray.get(actor.get_config.remote()) - stub_flower.queues_responses = [[{"name": "test_queue", "messages": 50}]] + retrieved_actor = get_queue_monitor_actor("test_deployment") + config = ray.get(retrieved_actor.get_config.remote()) + assert config["queue_name"] == "test_queue" - monitor = QueueMonitorActor.remote(redis_config) + # Clean up + kill_queue_monitor_actor("test_deployment") - # First successful query - length = ray.get(monitor.get_queue_length.remote()) - assert length == 50 + def test_get_queue_monitor_actor_not_found(self, ray_instance): + """Test retrieving a non-existent actor raises ValueError.""" + with pytest.raises(ValueError): + get_queue_monitor_actor("nonexistent_deployment") - # Should return cached value - length = ray.get(monitor.get_queue_length.remote()) - assert length == 50 + def test_kill_queue_monitor_actor(self, ray_instance, redis_config): + """Test killing a queue monitor actor.""" + actor = create_queue_monitor_actor("test_deployment", redis_config) + # Ensure actor is ready before trying to kill it + ray.get(actor.get_config.remote()) - def test_shutdown_marks_uninitialized(self, stub_flower, redis_config): - monitor = QueueMonitorActor.remote(redis_config) - ray.get(monitor.shutdown.remote()) + result = kill_queue_monitor_actor("test_deployment") + assert result is True + + # Should not be able to get the actor anymore + with pytest.raises(ValueError): + get_queue_monitor_actor("test_deployment") - assert ray.get(monitor.get_queue_length.remote()) == 0 - assert stub_flower.close_calls == 1 + def test_kill_queue_monitor_actor_not_found(self, ray_instance): + """Test killing a non-existent actor returns False.""" + result = kill_queue_monitor_actor("nonexistent_deployment") + assert result is False if __name__ == "__main__": diff --git a/python/ray/serve/tests/unit/test_queue_monitor_config.py b/python/ray/serve/tests/unit/test_queue_monitor_config.py new file mode 100644 index 000000000000..680e34eb4af2 --- /dev/null +++ b/python/ray/serve/tests/unit/test_queue_monitor_config.py @@ -0,0 +1,28 @@ +from ray.serve._private.queue_monitor import QueueMonitorConfig + + +class TestQueueMonitorConfig: + """Unit tests for QueueMonitorConfig class.""" + + def test_config_stores_values(self): + """Test config stores broker_url, queue_name, and rabbitmq_http_url.""" + config = QueueMonitorConfig( + broker_url="redis://localhost:6379/0", + queue_name="test_queue", + ) + + assert config.broker_url == "redis://localhost:6379/0" + assert config.queue_name == "test_queue" + assert config.rabbitmq_http_url == "http://guest:guest@localhost:15672/api/" + + def test_config_custom_rabbitmq_http_url(self): + """Test config stores custom rabbitmq_http_url.""" + config = QueueMonitorConfig( + broker_url="amqp://guest:guest@localhost:5672//", + queue_name="my_queue", + rabbitmq_http_url="http://user:pass@localhost:15672/api/", + ) + + assert config.broker_url == "amqp://guest:guest@localhost:5672//" + assert config.queue_name == "my_queue" + assert config.rabbitmq_http_url == "http://user:pass@localhost:15672/api/" From 8140bcc4fc89f5e85b16ee600a5de4e566efc09c Mon Sep 17 00:00:00 2001 From: harshit Date: Tue, 6 Jan 2026 08:00:59 +0000 Subject: [PATCH 09/18] review changes Signed-off-by: harshit --- python/ray/serve/tests/unit/test_queue_monitor_config.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/python/ray/serve/tests/unit/test_queue_monitor_config.py b/python/ray/serve/tests/unit/test_queue_monitor_config.py index 680e34eb4af2..647b422d7e8b 100644 --- a/python/ray/serve/tests/unit/test_queue_monitor_config.py +++ b/python/ray/serve/tests/unit/test_queue_monitor_config.py @@ -1,3 +1,7 @@ +import sys + +import pytest + from ray.serve._private.queue_monitor import QueueMonitorConfig @@ -26,3 +30,7 @@ def test_config_custom_rabbitmq_http_url(self): assert config.broker_url == "amqp://guest:guest@localhost:5672//" assert config.queue_name == "my_queue" assert config.rabbitmq_http_url == "http://user:pass@localhost:15672/api/" + + +if __name__ == "__main__": + sys.exit(pytest.main(["-v", "-s", __file__])) From ab285ce39d1123e991e445f80401a4b813792607 Mon Sep 17 00:00:00 2001 From: harshit Date: Tue, 6 Jan 2026 15:47:44 +0000 Subject: [PATCH 10/18] review changes Signed-off-by: harshit --- python/ray/serve/_private/broker.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/python/ray/serve/_private/broker.py b/python/ray/serve/_private/broker.py index 177f8dfa4e96..2a23dcf94ec8 100644 --- a/python/ray/serve/_private/broker.py +++ b/python/ray/serve/_private/broker.py @@ -1,3 +1,6 @@ +# Portions of this code are adapted from Flower's broker.py file. +# https://github.com/mher/flower/blob/master/flower/utils/broker.py + import json import logging import numbers From c550a52cdf61291639db6403844eaf4beea911ec Mon Sep 17 00:00:00 2001 From: harshit Date: Tue, 6 Jan 2026 15:48:57 +0000 Subject: [PATCH 11/18] review changes Signed-off-by: harshit --- python/ray/serve/_private/broker.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/ray/serve/_private/broker.py b/python/ray/serve/_private/broker.py index 2a23dcf94ec8..9e8c8ddeaa9b 100644 --- a/python/ray/serve/_private/broker.py +++ b/python/ray/serve/_private/broker.py @@ -196,7 +196,7 @@ def _prepare_virtual_host(self, vhost): vhost = int(vhost) except ValueError as exc: raise ValueError( - "Database is int between 0 and limit - 1, not {vhost}" + f"Database is int between 0 and limit - 1, not {vhost}" ) from exc return vhost From 2abc81df5ab69a93927c7c7d1742adc46eb45b1b Mon Sep 17 00:00:00 2001 From: harshit Date: Tue, 6 Jan 2026 16:17:43 +0000 Subject: [PATCH 12/18] add more tests Signed-off-by: harshit --- python/ray/serve/_private/queue_monitor.py | 4 ++ python/ray/serve/tests/test_queue_monitor.py | 33 --------- .../tests/unit/test_queue_monitor_config.py | 68 +++++++++++++++++++ 3 files changed, 72 insertions(+), 33 deletions(-) diff --git a/python/ray/serve/_private/queue_monitor.py b/python/ray/serve/_private/queue_monitor.py index 566c6c039a01..2793d6583dca 100644 --- a/python/ray/serve/_private/queue_monitor.py +++ b/python/ray/serve/_private/queue_monitor.py @@ -76,6 +76,7 @@ async def get_queue_length(self) -> int: try: queues = await self._broker.queues([self._config.queue_name]) + print(f"queues: {queues}") if queues is not None: for q in queues: if q.get("name") == self._config.queue_name: @@ -83,6 +84,8 @@ async def get_queue_length(self) -> int: self._last_queue_length = queue_length return queue_length + print(f"last_queue_length: {self._last_queue_length}") + if self._last_queue_length is not None: return self._last_queue_length else: @@ -92,6 +95,7 @@ async def get_queue_length(self) -> int: return 0 except Exception as e: + print(f"error 123123: {e}") logger.warning( f"Failed to query queue length: {e}. Using last known value: {self._last_queue_length}" ) diff --git a/python/ray/serve/tests/test_queue_monitor.py b/python/ray/serve/tests/test_queue_monitor.py index 5fd398f515b7..13936662259a 100644 --- a/python/ray/serve/tests/test_queue_monitor.py +++ b/python/ray/serve/tests/test_queue_monitor.py @@ -16,16 +16,6 @@ from ray.tests.conftest import external_redis # noqa: F401 -@pytest.fixture -def ray_instance(): # noqa: F811 - """Initialize Ray with external Redis.""" - if ray.is_initialized(): - ray.shutdown() - ray.init(num_cpus=2, include_dashboard=False) - yield - ray.shutdown() - - @pytest.fixture def redis_client(external_redis): # noqa: F811 """Create a Redis client connected to the external Redis.""" @@ -76,29 +66,6 @@ def test_get_queue_length_empty_queue( assert length == 0 - def test_get_queue_length_returns_cached_on_error( - self, ray_instance, redis_client, redis_config - ): - """Test get_queue_length returns cached value on error.""" - # Push messages initially - for i in range(50): - redis_client.lpush("test_queue", f"message_{i}") - - try: - monitor = QueueMonitorActor.remote(redis_config) - - # First successful query - length = ray.get(monitor.get_queue_length.remote()) - assert length == 50 - - # Simulate error by deleting the queue and checking cached value is returned - # Note: The actual error path is harder to trigger with real Redis, - # so we verify the caching behavior by checking the value is consistent - length = ray.get(monitor.get_queue_length.remote()) - assert length == 50 - finally: - redis_client.delete("test_queue") - def test_shutdown_marks_uninitialized( self, ray_instance, redis_client, redis_config ): diff --git a/python/ray/serve/tests/unit/test_queue_monitor_config.py b/python/ray/serve/tests/unit/test_queue_monitor_config.py index 647b422d7e8b..ce7d77a27115 100644 --- a/python/ray/serve/tests/unit/test_queue_monitor_config.py +++ b/python/ray/serve/tests/unit/test_queue_monitor_config.py @@ -1,4 +1,5 @@ import sys +from unittest.mock import AsyncMock, MagicMock, patch import pytest @@ -32,5 +33,72 @@ def test_config_custom_rabbitmq_http_url(self): assert config.rabbitmq_http_url == "http://user:pass@localhost:15672/api/" +class TestQueueMonitorCaching: + """Unit tests for QueueMonitor caching behavior using mocks.""" + + @pytest.mark.asyncio + async def test_returns_cached_value_on_broker_error(self): + """Test that get_queue_length returns cached value when broker raises error.""" + from ray.serve._private.queue_monitor import QueueMonitorActor + + config = QueueMonitorConfig( + broker_url="redis://localhost:6379/0", + queue_name="test_queue", + ) + + # Create a mock broker + mock_broker = MagicMock() + + with patch("ray.serve._private.queue_monitor.Broker", return_value=mock_broker): + # Create the actor's underlying class (not as a Ray actor for unit testing) + actor = QueueMonitorActor.__ray_actor_class__(config) + + # First call succeeds and returns 50 + mock_broker.queues = AsyncMock( + return_value=[{"name": "test_queue", "messages": 50}] + ) + length = await actor.get_queue_length() + assert length == 50 + + # Second call raises an error - should return cached value + mock_broker.queues = AsyncMock(side_effect=Exception("Connection failed")) + length = await actor.get_queue_length() + assert length == 50 # Returns cached value + + @pytest.mark.asyncio + async def test_updates_cache_on_successful_query(self): + """Test that cache is updated on successful queries.""" + from ray.serve._private.queue_monitor import QueueMonitorActor + + config = QueueMonitorConfig( + broker_url="redis://localhost:6379/0", + queue_name="test_queue", + ) + + mock_broker = MagicMock() + + with patch("ray.serve._private.queue_monitor.Broker", return_value=mock_broker): + actor = QueueMonitorActor.__ray_actor_class__(config) + + # First call returns 50 + mock_broker.queues = AsyncMock( + return_value=[{"name": "test_queue", "messages": 50}] + ) + length = await actor.get_queue_length() + assert length == 50 + + # Second call returns 100 - cache should update + mock_broker.queues = AsyncMock( + return_value=[{"name": "test_queue", "messages": 100}] + ) + length = await actor.get_queue_length() + assert length == 100 + + # Third call fails - should return latest cached value (100) + mock_broker.queues = AsyncMock(side_effect=Exception("Connection failed")) + length = await actor.get_queue_length() + assert length == 100 + + if __name__ == "__main__": sys.exit(pytest.main(["-v", "-s", __file__])) From 1f89f99932b3b65b4b24dd4d312edcea42cf036e Mon Sep 17 00:00:00 2001 From: harshit Date: Wed, 7 Jan 2026 14:44:00 +0000 Subject: [PATCH 13/18] reivew changes Signed-off-by: harshit --- python/ray/serve/_private/broker.py | 10 +- python/ray/serve/_private/queue_monitor.py | 144 +++++++----------- python/ray/serve/tests/test_queue_monitor.py | 100 +++++------- .../tests/unit/test_queue_monitor_config.py | 104 ------------- 4 files changed, 102 insertions(+), 256 deletions(-) delete mode 100644 python/ray/serve/tests/unit/test_queue_monitor_config.py diff --git a/python/ray/serve/_private/broker.py b/python/ray/serve/_private/broker.py index 9e8c8ddeaa9b..12ddfbf5f90b 100644 --- a/python/ray/serve/_private/broker.py +++ b/python/ray/serve/_private/broker.py @@ -1,5 +1,7 @@ -# Portions of this code are adapted from Flower's broker.py file. -# https://github.com/mher/flower/blob/master/flower/utils/broker.py +# This module provides broker clients for querying queue lengths from message brokers. +# Adapted from Flower's broker.py (https://github.com/mher/flower/blob/master/flower/utils/broker.py) +# with the following modification: +# - Added close() method to BrokerBase and RedisBase for resource cleanup import json import logging @@ -9,13 +11,15 @@ from tornado import httpclient, ioloop +from ray.serve._private.constants import SERVE_LOGGER_NAME + try: import redis except ImportError: redis = None -logger = logging.getLogger(__name__) +logger = logging.getLogger(SERVE_LOGGER_NAME) class BrokerBase: diff --git a/python/ray/serve/_private/queue_monitor.py b/python/ray/serve/_private/queue_monitor.py index 2793d6583dca..619cbeee7e10 100644 --- a/python/ray/serve/_private/queue_monitor.py +++ b/python/ray/serve/_private/queue_monitor.py @@ -11,23 +11,8 @@ QUEUE_MONITOR_ACTOR_PREFIX = "QUEUE_MONITOR::" -class QueueMonitorConfig: - """Configuration for the QueueMonitor deployment.""" - - def __init__( - self, - broker_url: str, - queue_name: str, - rabbitmq_http_url: str = "http://guest:guest@localhost:15672/api/", - ): - self.broker_url = broker_url - self.queue_name = queue_name - self.rabbitmq_http_url = rabbitmq_http_url - - @ray.remote(num_cpus=0) class QueueMonitorActor: - """ Actor that monitors queue length by directly querying the broker. @@ -38,27 +23,34 @@ class QueueMonitorActor: - RabbitMQ: Uses HTTP management API """ - def __init__(self, config: QueueMonitorConfig): - self._config = config - self._last_queue_length: int = 0 - self._is_initialized: bool = False + def __init__( + self, + broker_url: str, + queue_name: str, + rabbitmq_http_url: str = "http://guest:guest@localhost:15672/api/", + ): + self._broker_url = broker_url + self._queue_name = queue_name + self._rabbitmq_http_url = rabbitmq_http_url - self._broker = Broker( - self._config.broker_url, http_api=self._config.rabbitmq_http_url - ) - self._is_initialized = True + self._broker = Broker(self._broker_url, http_api=self._rabbitmq_http_url) + + def __ray_shutdown__(self): + if self._broker is not None: + self._broker.close() + self._broker = None def get_config(self) -> Dict[str, Any]: """ Get the QueueMonitor configuration as a serializable dict. Returns: - Dict with 'broker_url' and 'queue_name' keys + Dict with 'broker_url', 'queue_name', and 'rabbitmq_http_url' keys """ return { - "broker_url": self._config.broker_url, - "queue_name": self._config.queue_name, - "rabbitmq_http_url": self._config.rabbitmq_http_url, + "broker_url": self._broker_url, + "queue_name": self._queue_name, + "rabbitmq_http_url": self._rabbitmq_http_url, } async def get_queue_length(self) -> int: @@ -67,50 +59,25 @@ async def get_queue_length(self) -> int: Returns: Number of pending tasks in the queue. + + Raises: + ValueError: If queue is not found in broker response. """ - if not self._is_initialized: - logger.warning( - f"QueueMonitor not initialized for queue '{self._config.queue_name}', returning 0" - ) - return 0 - - try: - queues = await self._broker.queues([self._config.queue_name]) - print(f"queues: {queues}") - if queues is not None: - for q in queues: - if q.get("name") == self._config.queue_name: - queue_length = q.get("messages") - self._last_queue_length = queue_length - return queue_length - - print(f"last_queue_length: {self._last_queue_length}") - - if self._last_queue_length is not None: - return self._last_queue_length - else: - logger.warning( - f"No queue length found for queue '{self._config.queue_name}', returning 0" - ) - return 0 - - except Exception as e: - print(f"error 123123: {e}") - logger.warning( - f"Failed to query queue length: {e}. Using last known value: {self._last_queue_length}" - ) - return self._last_queue_length - - def shutdown(self) -> None: - if self._broker is not None: - self._broker.close() - self._broker = None - self._is_initialized = False + queues = await self._broker.queues([self._queue_name]) + if queues is not None: + for q in queues: + if q.get("name") == self._queue_name: + queue_length = q.get("messages") + return queue_length + + raise ValueError(f"Queue '{self._queue_name}' not found in broker response") def create_queue_monitor_actor( deployment_name: str, - config: QueueMonitorConfig, + broker_url: str, + queue_name: str, + rabbitmq_http_url: str = "http://guest:guest@localhost:15672/api/", namespace: str = "serve", ) -> ray.actor.ActorHandle: """ @@ -118,7 +85,9 @@ def create_queue_monitor_actor( Args: deployment_name: Name of the deployment - config: QueueMonitorConfig with broker URL and queue name + broker_url: URL of the message broker + queue_name: Name of the queue to monitor + rabbitmq_http_url: HTTP API URL for RabbitMQ management (only for RabbitMQ) namespace: Ray namespace for the actor Returns: @@ -128,22 +97,19 @@ def create_queue_monitor_actor( # Check if actor already exists try: - existing = ray.get_actor(full_actor_name, namespace=namespace) + existing = get_queue_monitor_actor(deployment_name, namespace=namespace) logger.info(f"QueueMonitor actor '{full_actor_name}' already exists, reusing") - return existing except ValueError: - pass # Actor doesn't exist, create it - - actor = QueueMonitorActor.options( - name=full_actor_name, - namespace=namespace, - ).remote(config) + actor = QueueMonitorActor.options( + name=full_actor_name, + namespace=namespace, + ).remote(broker_url, queue_name, rabbitmq_http_url) - logger.info( - f"Created QueueMonitor actor '{full_actor_name}' in namespace '{namespace}'" - ) - return actor + logger.info( + f"Created QueueMonitor actor '{full_actor_name}' in namespace '{namespace}'" + ) + return actor def get_queue_monitor_actor( @@ -170,7 +136,7 @@ def get_queue_monitor_actor( def kill_queue_monitor_actor( deployment_name: str, namespace: str = "serve", -) -> bool: +) -> None: """ Delete a QueueMonitor actor by name. @@ -178,15 +144,17 @@ def kill_queue_monitor_actor( deployment_name: Name of the deployment namespace: Ray namespace - Returns: - True if actor was deleted, False if it didn't exist + Raises: + ValueError: If actor doesn't exist """ full_actor_name = f"{QUEUE_MONITOR_ACTOR_PREFIX}{deployment_name}" + actor = get_queue_monitor_actor(deployment_name, namespace=namespace) + if actor is None: + logger.info(f"QueueMonitor actor '{full_actor_name}' does not exist") + return + try: - actor = ray.get_actor(full_actor_name, namespace=namespace) - ray.kill(actor) + ray.kill(actor, no_restart=True) logger.info(f"Deleted QueueMonitor actor '{full_actor_name}'") - return True - except ValueError: - # Actor doesn't exist - return False + except Exception as e: + logger.error(f"Failed to delete QueueMonitor actor '{full_actor_name}': {e}") diff --git a/python/ray/serve/tests/test_queue_monitor.py b/python/ray/serve/tests/test_queue_monitor.py index 13936662259a..6f5d227dc130 100644 --- a/python/ray/serve/tests/test_queue_monitor.py +++ b/python/ray/serve/tests/test_queue_monitor.py @@ -8,7 +8,6 @@ import ray from ray.serve._private.queue_monitor import ( QueueMonitorActor, - QueueMonitorConfig, create_queue_monitor_actor, get_queue_monitor_actor, kill_queue_monitor_actor, @@ -23,87 +22,59 @@ def redis_client(external_redis): # noqa: F811 host, port = redis_address.split(":") client = redis.Redis(host=host, port=int(port), db=0) yield client + # Cleanup: delete test queue after each test + client.delete("test_queue") client.close() @pytest.fixture -def redis_config(external_redis): # noqa: F811 - """Create a QueueMonitorConfig with the external Redis URL.""" +def redis_broker_url(external_redis): # noqa: F811 + """Get the Redis broker URL for the external Redis.""" redis_address = os.environ.get("RAY_REDIS_ADDRESS") - return QueueMonitorConfig( - broker_url=f"redis://{redis_address}/0", - queue_name="test_queue", - ) + return f"redis://{redis_address}/0" class TestQueueMonitorActor: """Integration tests for QueueMonitorActor with real Redis.""" - def test_get_queue_length(self, ray_instance, redis_client, redis_config): + def test_get_queue_length(self, ray_instance, redis_client, redis_broker_url): """Test queue length returns number of messages from broker.""" # Push some messages to the queue for i in range(30): redis_client.lpush("test_queue", f"message_{i}") - try: - monitor = QueueMonitorActor.remote(redis_config) - length = ray.get(monitor.get_queue_length.remote()) + monitor = QueueMonitorActor.remote(redis_broker_url, "test_queue") + length = ray.get(monitor.get_queue_length.remote()) - assert length == 30 - finally: - # Clean up - redis_client.delete("test_queue") + assert length == 30 def test_get_queue_length_empty_queue( - self, ray_instance, redis_client, redis_config + self, ray_instance, redis_client, redis_broker_url ): """Test queue length returns 0 for empty queue.""" - # Ensure queue is empty - redis_client.delete("test_queue") - - monitor = QueueMonitorActor.remote(redis_config) + monitor = QueueMonitorActor.remote(redis_broker_url, "test_queue") length = ray.get(monitor.get_queue_length.remote()) assert length == 0 - def test_shutdown_marks_uninitialized( - self, ray_instance, redis_client, redis_config - ): - """Test shutdown cleans up resources and returns 0 for queue length.""" - redis_client.lpush("test_queue", "message") - - try: - monitor = QueueMonitorActor.remote(redis_config) - - # Verify monitor works before shutdown - length = ray.get(monitor.get_queue_length.remote()) - assert length == 1 - - # Shutdown the monitor - ray.get(monitor.shutdown.remote()) - - # After shutdown, should return 0 - length = ray.get(monitor.get_queue_length.remote()) - assert length == 0 - finally: - redis_client.delete("test_queue") - - def test_get_config(self, ray_instance, redis_config): + def test_get_config(self, ray_instance, redis_broker_url): """Test get_config returns the configuration as a dict.""" - monitor = QueueMonitorActor.remote(redis_config) + monitor = QueueMonitorActor.remote(redis_broker_url, "test_queue") config = ray.get(monitor.get_config.remote()) - assert config["broker_url"] == redis_config.broker_url - assert config["queue_name"] == redis_config.queue_name - assert config["rabbitmq_http_url"] == redis_config.rabbitmq_http_url + assert config["broker_url"] == redis_broker_url + assert config["queue_name"] == "test_queue" + assert config["rabbitmq_http_url"] == "http://guest:guest@localhost:15672/api/" class TestQueueMonitorHelpers: """Integration tests for queue monitor helper functions.""" - def test_create_queue_monitor_actor(self, ray_instance, redis_config): + def test_create_queue_monitor_actor(self, ray_instance, redis_broker_url): """Test creating a named queue monitor actor.""" - actor = create_queue_monitor_actor("test_deployment", redis_config) + actor = create_queue_monitor_actor( + "test_deployment", redis_broker_url, "test_queue" + ) # Verify the actor works config = ray.get(actor.get_config.remote()) @@ -113,11 +84,15 @@ def test_create_queue_monitor_actor(self, ray_instance, redis_config): kill_queue_monitor_actor("test_deployment") def test_create_queue_monitor_actor_reuses_existing( - self, ray_instance, redis_config + self, ray_instance, redis_broker_url ): """Test that creating an actor with the same name reuses the existing one.""" - actor1 = create_queue_monitor_actor("test_deployment", redis_config) - actor2 = create_queue_monitor_actor("test_deployment", redis_config) + actor1 = create_queue_monitor_actor( + "test_deployment", redis_broker_url, "test_queue" + ) + actor2 = create_queue_monitor_actor( + "test_deployment", redis_broker_url, "test_queue" + ) # Both should reference the same actor assert actor1 == actor2 @@ -125,9 +100,11 @@ def test_create_queue_monitor_actor_reuses_existing( # Clean up kill_queue_monitor_actor("test_deployment") - def test_get_queue_monitor_actor(self, ray_instance, redis_config): + def test_get_queue_monitor_actor(self, ray_instance, redis_broker_url): """Test retrieving an existing queue monitor actor.""" - actor = create_queue_monitor_actor("test_deployment", redis_config) + actor = create_queue_monitor_actor( + "test_deployment", redis_broker_url, "test_queue" + ) # Ensure actor is ready before looking it up by name ray.get(actor.get_config.remote()) @@ -143,23 +120,24 @@ def test_get_queue_monitor_actor_not_found(self, ray_instance): with pytest.raises(ValueError): get_queue_monitor_actor("nonexistent_deployment") - def test_kill_queue_monitor_actor(self, ray_instance, redis_config): + def test_kill_queue_monitor_actor(self, ray_instance, redis_broker_url): """Test killing a queue monitor actor.""" - actor = create_queue_monitor_actor("test_deployment", redis_config) + actor = create_queue_monitor_actor( + "test_deployment", redis_broker_url, "test_queue" + ) # Ensure actor is ready before trying to kill it ray.get(actor.get_config.remote()) - result = kill_queue_monitor_actor("test_deployment") - assert result is True + kill_queue_monitor_actor("test_deployment") # Should not be able to get the actor anymore with pytest.raises(ValueError): get_queue_monitor_actor("test_deployment") def test_kill_queue_monitor_actor_not_found(self, ray_instance): - """Test killing a non-existent actor returns False.""" - result = kill_queue_monitor_actor("nonexistent_deployment") - assert result is False + """Test killing a non-existent actor raises ValueError.""" + with pytest.raises(ValueError): + kill_queue_monitor_actor("nonexistent_deployment") if __name__ == "__main__": diff --git a/python/ray/serve/tests/unit/test_queue_monitor_config.py b/python/ray/serve/tests/unit/test_queue_monitor_config.py deleted file mode 100644 index ce7d77a27115..000000000000 --- a/python/ray/serve/tests/unit/test_queue_monitor_config.py +++ /dev/null @@ -1,104 +0,0 @@ -import sys -from unittest.mock import AsyncMock, MagicMock, patch - -import pytest - -from ray.serve._private.queue_monitor import QueueMonitorConfig - - -class TestQueueMonitorConfig: - """Unit tests for QueueMonitorConfig class.""" - - def test_config_stores_values(self): - """Test config stores broker_url, queue_name, and rabbitmq_http_url.""" - config = QueueMonitorConfig( - broker_url="redis://localhost:6379/0", - queue_name="test_queue", - ) - - assert config.broker_url == "redis://localhost:6379/0" - assert config.queue_name == "test_queue" - assert config.rabbitmq_http_url == "http://guest:guest@localhost:15672/api/" - - def test_config_custom_rabbitmq_http_url(self): - """Test config stores custom rabbitmq_http_url.""" - config = QueueMonitorConfig( - broker_url="amqp://guest:guest@localhost:5672//", - queue_name="my_queue", - rabbitmq_http_url="http://user:pass@localhost:15672/api/", - ) - - assert config.broker_url == "amqp://guest:guest@localhost:5672//" - assert config.queue_name == "my_queue" - assert config.rabbitmq_http_url == "http://user:pass@localhost:15672/api/" - - -class TestQueueMonitorCaching: - """Unit tests for QueueMonitor caching behavior using mocks.""" - - @pytest.mark.asyncio - async def test_returns_cached_value_on_broker_error(self): - """Test that get_queue_length returns cached value when broker raises error.""" - from ray.serve._private.queue_monitor import QueueMonitorActor - - config = QueueMonitorConfig( - broker_url="redis://localhost:6379/0", - queue_name="test_queue", - ) - - # Create a mock broker - mock_broker = MagicMock() - - with patch("ray.serve._private.queue_monitor.Broker", return_value=mock_broker): - # Create the actor's underlying class (not as a Ray actor for unit testing) - actor = QueueMonitorActor.__ray_actor_class__(config) - - # First call succeeds and returns 50 - mock_broker.queues = AsyncMock( - return_value=[{"name": "test_queue", "messages": 50}] - ) - length = await actor.get_queue_length() - assert length == 50 - - # Second call raises an error - should return cached value - mock_broker.queues = AsyncMock(side_effect=Exception("Connection failed")) - length = await actor.get_queue_length() - assert length == 50 # Returns cached value - - @pytest.mark.asyncio - async def test_updates_cache_on_successful_query(self): - """Test that cache is updated on successful queries.""" - from ray.serve._private.queue_monitor import QueueMonitorActor - - config = QueueMonitorConfig( - broker_url="redis://localhost:6379/0", - queue_name="test_queue", - ) - - mock_broker = MagicMock() - - with patch("ray.serve._private.queue_monitor.Broker", return_value=mock_broker): - actor = QueueMonitorActor.__ray_actor_class__(config) - - # First call returns 50 - mock_broker.queues = AsyncMock( - return_value=[{"name": "test_queue", "messages": 50}] - ) - length = await actor.get_queue_length() - assert length == 50 - - # Second call returns 100 - cache should update - mock_broker.queues = AsyncMock( - return_value=[{"name": "test_queue", "messages": 100}] - ) - length = await actor.get_queue_length() - assert length == 100 - - # Third call fails - should return latest cached value (100) - mock_broker.queues = AsyncMock(side_effect=Exception("Connection failed")) - length = await actor.get_queue_length() - assert length == 100 - - -if __name__ == "__main__": - sys.exit(pytest.main(["-v", "-s", __file__])) From a5a08281a3c9d22f441966607d0614c936f50737 Mon Sep 17 00:00:00 2001 From: harshit Date: Thu, 8 Jan 2026 04:39:49 +0000 Subject: [PATCH 14/18] review changes Signed-off-by: harshit --- python/ray/serve/_private/queue_monitor.py | 6 +- python/ray/serve/tests/test_queue_monitor.py | 88 ++------------------ 2 files changed, 14 insertions(+), 80 deletions(-) diff --git a/python/ray/serve/_private/queue_monitor.py b/python/ray/serve/_private/queue_monitor.py index 619cbeee7e10..ec8767b65290 100644 --- a/python/ray/serve/_private/queue_monitor.py +++ b/python/ray/serve/_private/queue_monitor.py @@ -2,6 +2,7 @@ from typing import Any, Dict import ray +from ray._common.constants import HEAD_NODE_RESOURCE_NAME from ray.serve._private.broker import Broker from ray.serve._private.constants import SERVE_LOGGER_NAME @@ -104,6 +105,9 @@ def create_queue_monitor_actor( actor = QueueMonitorActor.options( name=full_actor_name, namespace=namespace, + max_restarts=-1, + max_task_retries=-1, + resources={HEAD_NODE_RESOURCE_NAME: 0.001}, ).remote(broker_url, queue_name, rabbitmq_http_url) logger.info( @@ -154,7 +158,7 @@ def kill_queue_monitor_actor( return try: - ray.kill(actor, no_restart=True) + del actor logger.info(f"Deleted QueueMonitor actor '{full_actor_name}'") except Exception as e: logger.error(f"Failed to delete QueueMonitor actor '{full_actor_name}': {e}") diff --git a/python/ray/serve/tests/test_queue_monitor.py b/python/ray/serve/tests/test_queue_monitor.py index 6f5d227dc130..bf15092b6c92 100644 --- a/python/ray/serve/tests/test_queue_monitor.py +++ b/python/ray/serve/tests/test_queue_monitor.py @@ -7,10 +7,7 @@ import ray from ray.serve._private.queue_monitor import ( - QueueMonitorActor, create_queue_monitor_actor, - get_queue_monitor_actor, - kill_queue_monitor_actor, ) from ray.tests.conftest import external_redis # noqa: F401 @@ -43,7 +40,9 @@ def test_get_queue_length(self, ray_instance, redis_client, redis_broker_url): for i in range(30): redis_client.lpush("test_queue", f"message_{i}") - monitor = QueueMonitorActor.remote(redis_broker_url, "test_queue") + monitor = create_queue_monitor_actor( + "test_deployment", redis_broker_url, "test_queue" + ) length = ray.get(monitor.get_queue_length.remote()) assert length == 30 @@ -52,14 +51,18 @@ def test_get_queue_length_empty_queue( self, ray_instance, redis_client, redis_broker_url ): """Test queue length returns 0 for empty queue.""" - monitor = QueueMonitorActor.remote(redis_broker_url, "test_queue") + monitor = create_queue_monitor_actor( + "test_deployment", redis_broker_url, "test_queue" + ) length = ray.get(monitor.get_queue_length.remote()) assert length == 0 def test_get_config(self, ray_instance, redis_broker_url): """Test get_config returns the configuration as a dict.""" - monitor = QueueMonitorActor.remote(redis_broker_url, "test_queue") + monitor = create_queue_monitor_actor( + "test_deployment", redis_broker_url, "test_queue" + ) config = ray.get(monitor.get_config.remote()) assert config["broker_url"] == redis_broker_url @@ -67,78 +70,5 @@ def test_get_config(self, ray_instance, redis_broker_url): assert config["rabbitmq_http_url"] == "http://guest:guest@localhost:15672/api/" -class TestQueueMonitorHelpers: - """Integration tests for queue monitor helper functions.""" - - def test_create_queue_monitor_actor(self, ray_instance, redis_broker_url): - """Test creating a named queue monitor actor.""" - actor = create_queue_monitor_actor( - "test_deployment", redis_broker_url, "test_queue" - ) - - # Verify the actor works - config = ray.get(actor.get_config.remote()) - assert config["queue_name"] == "test_queue" - - # Clean up - kill_queue_monitor_actor("test_deployment") - - def test_create_queue_monitor_actor_reuses_existing( - self, ray_instance, redis_broker_url - ): - """Test that creating an actor with the same name reuses the existing one.""" - actor1 = create_queue_monitor_actor( - "test_deployment", redis_broker_url, "test_queue" - ) - actor2 = create_queue_monitor_actor( - "test_deployment", redis_broker_url, "test_queue" - ) - - # Both should reference the same actor - assert actor1 == actor2 - - # Clean up - kill_queue_monitor_actor("test_deployment") - - def test_get_queue_monitor_actor(self, ray_instance, redis_broker_url): - """Test retrieving an existing queue monitor actor.""" - actor = create_queue_monitor_actor( - "test_deployment", redis_broker_url, "test_queue" - ) - # Ensure actor is ready before looking it up by name - ray.get(actor.get_config.remote()) - - retrieved_actor = get_queue_monitor_actor("test_deployment") - config = ray.get(retrieved_actor.get_config.remote()) - assert config["queue_name"] == "test_queue" - - # Clean up - kill_queue_monitor_actor("test_deployment") - - def test_get_queue_monitor_actor_not_found(self, ray_instance): - """Test retrieving a non-existent actor raises ValueError.""" - with pytest.raises(ValueError): - get_queue_monitor_actor("nonexistent_deployment") - - def test_kill_queue_monitor_actor(self, ray_instance, redis_broker_url): - """Test killing a queue monitor actor.""" - actor = create_queue_monitor_actor( - "test_deployment", redis_broker_url, "test_queue" - ) - # Ensure actor is ready before trying to kill it - ray.get(actor.get_config.remote()) - - kill_queue_monitor_actor("test_deployment") - - # Should not be able to get the actor anymore - with pytest.raises(ValueError): - get_queue_monitor_actor("test_deployment") - - def test_kill_queue_monitor_actor_not_found(self, ray_instance): - """Test killing a non-existent actor raises ValueError.""" - with pytest.raises(ValueError): - kill_queue_monitor_actor("nonexistent_deployment") - - if __name__ == "__main__": sys.exit(pytest.main(["-v", "-s", __file__])) From dd5175b6cf84028dd98c9ef24550df54fd60fe66 Mon Sep 17 00:00:00 2001 From: harshit Date: Thu, 8 Jan 2026 04:50:29 +0000 Subject: [PATCH 15/18] review changes Signed-off-by: harshit --- python/ray/serve/_private/queue_monitor.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/ray/serve/_private/queue_monitor.py b/python/ray/serve/_private/queue_monitor.py index ec8767b65290..55bb1caaa8b0 100644 --- a/python/ray/serve/_private/queue_monitor.py +++ b/python/ray/serve/_private/queue_monitor.py @@ -158,7 +158,7 @@ def kill_queue_monitor_actor( return try: - del actor + actor.__ray_terminate__.remote() logger.info(f"Deleted QueueMonitor actor '{full_actor_name}'") except Exception as e: logger.error(f"Failed to delete QueueMonitor actor '{full_actor_name}': {e}") From e026933a0079fbe6c7dddc1bff6589144dc7b1bf Mon Sep 17 00:00:00 2001 From: harshit Date: Thu, 8 Jan 2026 07:24:19 +0000 Subject: [PATCH 16/18] review changes Signed-off-by: harshit --- python/ray/serve/_private/queue_monitor.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/ray/serve/_private/queue_monitor.py b/python/ray/serve/_private/queue_monitor.py index 55bb1caaa8b0..4655ff4822d5 100644 --- a/python/ray/serve/_private/queue_monitor.py +++ b/python/ray/serve/_private/queue_monitor.py @@ -158,7 +158,7 @@ def kill_queue_monitor_actor( return try: - actor.__ray_terminate__.remote() + ray.kill(actor, no_restart=True) logger.info(f"Deleted QueueMonitor actor '{full_actor_name}'") except Exception as e: logger.error(f"Failed to delete QueueMonitor actor '{full_actor_name}': {e}") From 844c96661122313dd4ca3061213c0cc385f1006d Mon Sep 17 00:00:00 2001 From: harshit Date: Thu, 8 Jan 2026 08:06:07 +0000 Subject: [PATCH 17/18] review changes Signed-off-by: harshit --- python/ray/serve/_private/queue_monitor.py | 22 ++++++++++++---------- 1 file changed, 12 insertions(+), 10 deletions(-) diff --git a/python/ray/serve/_private/queue_monitor.py b/python/ray/serve/_private/queue_monitor.py index 4655ff4822d5..f8be01b8d638 100644 --- a/python/ray/serve/_private/queue_monitor.py +++ b/python/ray/serve/_private/queue_monitor.py @@ -62,13 +62,18 @@ async def get_queue_length(self) -> int: Number of pending tasks in the queue. Raises: - ValueError: If queue is not found in broker response. + ValueError: If queue is not found in broker response or + if queue data is missing the 'messages' field. """ queues = await self._broker.queues([self._queue_name]) if queues is not None: for q in queues: if q.get("name") == self._queue_name: queue_length = q.get("messages") + if queue_length is None: + raise ValueError( + f"Queue '{self._queue_name}' is missing 'messages' field" + ) return queue_length raise ValueError(f"Queue '{self._queue_name}' not found in broker response") @@ -152,13 +157,10 @@ def kill_queue_monitor_actor( ValueError: If actor doesn't exist """ full_actor_name = f"{QUEUE_MONITOR_ACTOR_PREFIX}{deployment_name}" - actor = get_queue_monitor_actor(deployment_name, namespace=namespace) - if actor is None: - logger.info(f"QueueMonitor actor '{full_actor_name}' does not exist") - return - try: - ray.kill(actor, no_restart=True) - logger.info(f"Deleted QueueMonitor actor '{full_actor_name}'") - except Exception as e: - logger.error(f"Failed to delete QueueMonitor actor '{full_actor_name}': {e}") + actor = get_queue_monitor_actor(deployment_name, namespace=namespace) + except ValueError: + raise ValueError(f"QueueMonitor actor '{full_actor_name}' does not exist") + + ray.kill(actor, no_restart=True) + logger.info(f"Deleted QueueMonitor actor '{full_actor_name}'") From ea3cb688e6bdd3ff1b72a13ea4b105e79329b59a Mon Sep 17 00:00:00 2001 From: harshit Date: Thu, 8 Jan 2026 16:28:51 +0000 Subject: [PATCH 18/18] review changes Signed-off-by: harshit --- python/ray/serve/_private/queue_monitor.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/python/ray/serve/_private/queue_monitor.py b/python/ray/serve/_private/queue_monitor.py index f8be01b8d638..2b0c3f5294ae 100644 --- a/python/ray/serve/_private/queue_monitor.py +++ b/python/ray/serve/_private/queue_monitor.py @@ -157,10 +157,7 @@ def kill_queue_monitor_actor( ValueError: If actor doesn't exist """ full_actor_name = f"{QUEUE_MONITOR_ACTOR_PREFIX}{deployment_name}" - try: - actor = get_queue_monitor_actor(deployment_name, namespace=namespace) - except ValueError: - raise ValueError(f"QueueMonitor actor '{full_actor_name}' does not exist") + actor = get_queue_monitor_actor(deployment_name, namespace=namespace) ray.kill(actor, no_restart=True) logger.info(f"Deleted QueueMonitor actor '{full_actor_name}'")