76 changes: 76 additions & 0 deletions python/ray/serve/_private/application_state.py
@@ -26,6 +26,7 @@
from ray.serve._private.config import DeploymentConfig
from ray.serve._private.constants import (
DEFAULT_AUTOSCALING_POLICY_NAME,
DEFAULT_QUEUE_BASED_AUTOSCALING_POLICY,
DEFAULT_REQUEST_ROUTER_PATH,
RAY_SERVE_ENABLE_TASK_EVENTS,
SERVE_LOGGER_NAME,
@@ -39,6 +40,10 @@
from ray.serve._private.deployment_state import DeploymentStateManager
from ray.serve._private.endpoint_state import EndpointState
from ray.serve._private.logging_utils import configure_component_logger
from ray.serve._private.queue_monitor import (
QueueMonitorConfig,
create_queue_monitor_actor,
)
from ray.serve._private.storage.kv_store import KVStoreBase
from ray.serve._private.usage import ServeUsageTag
from ray.serve._private.utils import (
@@ -73,6 +78,73 @@
CHECKPOINT_KEY = "serve-application-state-checkpoint"


def _is_task_consumer_deployment(deployment_info: DeploymentInfo) -> bool:
"""Check if a deployment is a TaskConsumer."""
try:
deployment_def = deployment_info.replica_config.deployment_def
Contributor:

This access is unsafe because it deserializes user code in the controller, and the controller does not have the user's runtime_env to execute that code.

For example, if user code depends on PyTorch and torch is listed as a pip dependency in the deployment's runtime_env, then this call will fail in cluster mode.
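For illustration, a minimal sketch of one way to avoid touching the pickled deployment_def in the controller: carry a plain, serializable flag on the deployment config instead. The `is_task_consumer` field below is hypothetical, not an existing Ray Serve attribute.

```python
# Hypothetical sketch: check a plain boolean carried on the (already
# serializable) deployment config instead of deserializing user code.
# `is_task_consumer` is an assumed field, not part of Ray Serve today.
def _is_task_consumer_deployment(deployment_info: DeploymentInfo) -> bool:
    return getattr(deployment_info.deployment_config, "is_task_consumer", False)
```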

if deployment_def is None:
return False
return getattr(deployment_def, "_is_task_consumer", False)
except Exception as e:
logger.debug(f"Error checking if deployment is TaskConsumer: {e}")
return False


def _get_queue_monitor_config(deployment_info: DeploymentInfo) -> Optional[QueueMonitorConfig]:
"""Extract QueueMonitorConfig from a TaskConsumer deployment."""
try:
deployment_def = deployment_info.replica_config.deployment_def
if hasattr(deployment_def, "get_queue_monitor_config"):
return deployment_def.get_queue_monitor_config()
except Exception as e:
logger.warning(f"Failed to get queue monitor config: {e}")
return None


def _configure_queue_based_autoscaling_for_task_consumers(deployment_infos: Dict[str, DeploymentInfo]) -> None:
"""
Configure queue-based autoscaling for TaskConsumers.

For TaskConsumer deployments with autoscaling enabled and no custom policy,
this function switches the autoscaling policy to queue-based autoscaling.

Args:
deployment_infos: Mapping from deployment name to DeploymentInfo; modified in place.
"""
for deployment_name, deployment_info in deployment_infos.items():
is_task_consumer = _is_task_consumer_deployment(deployment_info)
has_autoscaling = deployment_info.deployment_config.autoscaling_config is not None

# Set the queue-based autoscaling policy on a TaskConsumer only if the user
# hasn't set a custom policy, respecting the user's explicit choice.
if is_task_consumer and has_autoscaling:
logger.info(f"Deployment '{deployment_name}' is a TaskConsumer with autoscaling enabled")
is_default_policy = deployment_info.deployment_config.autoscaling_config.policy.is_default_policy_function()

if is_default_policy:
queue_monitor_config = _get_queue_monitor_config(deployment_info)
if queue_monitor_config is not None:
# Create QueueMonitor as a Ray actor (not Serve deployment)
# This avoids deadlock when autoscaling policy queries it from controller
try:
create_queue_monitor_actor(
deployment_name=deployment_name,
config=queue_monitor_config,
)
except Exception as e:
logger.error(f"Failed to create QueueMonitor actor for '{deployment_name}': {e}")
continue

# Switch to queue-based autoscaling policy
deployment_info.deployment_config.autoscaling_config.policy = (
AutoscalingPolicy(
policy_function=DEFAULT_QUEUE_BASED_AUTOSCALING_POLICY
)
)
logger.info(f"Switched TaskConsumer '{deployment_name}' to queue-based autoscaling policy")



class BuildAppStatus(Enum):
"""Status of the build application task."""

@@ -1185,6 +1257,10 @@ def deploy_apps(self, name_to_deployment_args: Dict[str, List[Dict]]) -> None:
)
for params in deployment_args
}

# Configure queue-based autoscaling for TaskConsumers
_configure_queue_based_autoscaling_for_task_consumers(deployment_infos)
Comment on lines +1310 to +1311
Contributor:

This only takes effect in imperative mode. If the user deploys through a config file, this code path is not executed.
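To make the reviewer's point concrete, a small illustrative sketch of the two deploy paths (the deployment class is a placeholder, and the path distinction is taken from this comment):

```python
from ray import serve

@serve.deployment
class MyTaskConsumer:  # placeholder; a real TaskConsumer would use the new API
    def __call__(self) -> str:
        return "ok"

# Imperative path: serve.run() reaches deploy_apps(), so the TaskConsumer
# autoscaling hook above runs.
serve.run(MyTaskConsumer.bind())

# Declarative path: `serve deploy config.yaml` goes through the config code
# path and, per this review comment, does not execute the hook.
```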


self._application_states[name].deploy_app(deployment_infos)

def deploy_app(self, name: str, deployment_args: List[Dict]) -> None:
5 changes: 5 additions & 0 deletions python/ray/serve/_private/autoscaling_state.py
@@ -23,6 +23,7 @@
aggregate_timeseries,
merge_instantaneous_total,
)
from ray.serve._private.queue_monitor import delete_queue_monitor_actor
from ray.serve._private.usage import ServeUsageTag
from ray.serve._private.utils import get_capacity_adjusted_num_replicas
from ray.serve.config import AutoscalingContext, AutoscalingPolicy
@@ -892,6 +893,10 @@ def deregister_deployment(self, deployment_id: DeploymentID):
)
app_state.deregister_deployment(deployment_id)

# Clean up QueueMonitor actor if it exists for this deployment
# This is needed for TaskConsumer deployments with queue-based autoscaling
delete_queue_monitor_actor(deployment_id.name)

def register_application(
self,
app_name: ApplicationName,
5 changes: 5 additions & 0 deletions python/ray/serve/_private/constants.py
@@ -378,6 +378,11 @@
"ray.serve.autoscaling_policy:default_autoscaling_policy"
)

# The default queue-based autoscaling policy to use for TaskConsumers if none is specified.
DEFAULT_QUEUE_BASED_AUTOSCALING_POLICY = (
"ray.serve.autoscaling_policy:default_queue_based_autoscaling_policy"
)

# Feature flag to enable collecting all queued and ongoing request
# metrics at handles instead of replicas. ON by default.
RAY_SERVE_COLLECT_AUTOSCALING_METRICS_ON_HANDLE = get_env_bool(
256 changes: 256 additions & 0 deletions python/ray/serve/_private/queue_monitor.py
@@ -0,0 +1,256 @@
import logging
from typing import Any, Dict

import pika
import redis

import ray
from ray.serve._private.constants import SERVE_LOGGER_NAME

logger = logging.getLogger(SERVE_LOGGER_NAME)

# Actor name prefix for QueueMonitor actors
QUEUE_MONITOR_ACTOR_PREFIX = "QUEUE_MONITOR::"


class QueueMonitorConfig:
"""Configuration for the QueueMonitor deployment."""
def __init__(self, broker_url: str, queue_name: str):
self.broker_url = broker_url
self.queue_name = queue_name

@property
def broker_type(self) -> str:
url_lower = self.broker_url.lower()
if url_lower.startswith("redis"):
return "redis"
elif url_lower.startswith("amqp") or url_lower.startswith("pyamqp"):
return "rabbitmq"
elif "sqs" in url_lower:
return "sqs"
else:
return "unknown"


class QueueMonitor:
Contributor:

Why did we not go with Flower here? Doesn't that abstract away all this logic?

Contributor Author:

Yes, it does abstract all this logic, but with Flower we would have to make a remote HTTP call, which Flower then intercepts, reads, and forwards to the broker; all of that combined is more time-consuming than calling the broker directly. Apart from that, we would have to do port management to run N instances of Flower inside the Serve controller. It is doable, but given these details, writing plain vanilla code to get the queue length seems more efficient and simpler.
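As context, a rough sketch of the two options being compared. The Flower URL, endpoint path, and response shape are assumptions for illustration only; the direct path mirrors the Redis code in this file.

```python
import redis
import requests

# Option discussed: go through Flower over HTTP; Flower then queries the broker
# itself, adding a network hop plus Flower processes/ports to manage.
# URL, endpoint path, and response shape are assumed, not verified here.
def queue_length_via_flower(flower_url: str, queue_name: str) -> int:
    resp = requests.get(f"{flower_url}/api/queues/length", timeout=5)
    resp.raise_for_status()
    lengths = {q["name"]: q["messages"] for q in resp.json()["active_queues"]}
    return lengths.get(queue_name, 0)

# Option chosen in this PR: query the broker directly (Redis LLEN here,
# mirroring QueueMonitor._get_redis_queue_length below).
def queue_length_direct(broker_url: str, queue_name: str) -> int:
    return redis.from_url(broker_url).llen(queue_name)
```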


"""
Actor that monitors queue length by directly querying the broker.
Uses native broker clients:
- Redis: Uses redis-py library with LLEN command
- RabbitMQ: Uses pika library with passive queue declaration
"""
def __init__(self, config: QueueMonitorConfig):
self._config = config
self._client: Any = None
self._last_queue_length: int = 0
self._is_initialized: bool = False

def initialize(self) -> None:
"""
Initialize connection to the broker.
Creates the appropriate client based on broker type and tests the connection.
"""
if self._is_initialized:
return

broker_type = self._config.broker_type
try:
if broker_type == "redis":
self._init_redis()
elif broker_type == "rabbitmq":
self._init_rabbitmq()
else:
raise ValueError(f"Unsupported broker type: {broker_type}. Supported: redis, rabbitmq")

self._is_initialized = True
logger.info(f"QueueMonitor initialized for queue '{self._config.queue_name}' (broker: {broker_type})")

except Exception as e:
logger.error(f"Failed to initialize QueueMonitor: {e}")
raise

def _init_redis(self) -> None:
"""Initialize Redis client."""
self._client = redis.from_url(self._config.broker_url)

# Test connection
self._client.ping()

def _init_rabbitmq(self) -> None:
"""Initialize RabbitMQ connection parameters."""
# Store connection parameters - we'll create connections as needed
self._connection_params = pika.URLParameters(self._config.broker_url)

# Test connection
connection = pika.BlockingConnection(self._connection_params)
connection.close()

def _get_redis_queue_length(self) -> int:
return self._client.llen(self._config.queue_name)

def _get_rabbitmq_queue_length(self) -> int:
connection = pika.BlockingConnection(self._connection_params)
try:
channel = connection.channel()

# Passive declaration - doesn't create queue, just gets info
result = channel.queue_declare(queue=self._config.queue_name, passive=True)

return result.method.message_count
finally:
connection.close()

def get_config(self) -> Dict[str, str]:
"""
Get the QueueMonitor configuration as a serializable dict.
Returns:
Dict with 'broker_url' and 'queue_name' keys
"""
return {
"broker_url": self._config.broker_url,
"queue_name": self._config.queue_name,
}

def get_queue_length(self) -> int:
"""
Get the current queue length from the broker.
Returns:
Number of pending tasks in the queue
"""
if not self._is_initialized:
logger.warning(f"QueueMonitor not initialized for queue '{self._config.queue_name}', returning 0")
return 0

try:
broker_type = self._config.broker_type

if broker_type == "redis":
queue_length = self._get_redis_queue_length()
elif broker_type == "rabbitmq":
queue_length = self._get_rabbitmq_queue_length()
else:
raise ValueError(f"Unsupported broker type: {broker_type}")

# Update cache
self._last_queue_length = queue_length

return queue_length

except Exception as e:
logger.warning(f"Failed to query queue length: {e}. Using last known value: {self._last_queue_length}")
return self._last_queue_length

def shutdown(self) -> None:
if self._client is not None:
try:
if hasattr(self._client, "close"):
self._client.close()
except Exception as e:
logger.warning(f"Error closing client: {e}")

self._client = None
self._is_initialized = False

def __del__(self):
self.shutdown()


@ray.remote(num_cpus=0)
class QueueMonitorActor(QueueMonitor):
"""
Ray actor version of QueueMonitor for direct access from ServeController.
This is used instead of a Serve deployment because the autoscaling policy
runs inside the ServeController, and using serve.get_deployment_handle()
from within the controller causes a deadlock.
"""
def __init__(self, config: QueueMonitorConfig):
super().__init__(config)
self.initialize()


def create_queue_monitor_actor(
deployment_name: str,
config: QueueMonitorConfig,
namespace: str = "serve",
) -> ray.actor.ActorHandle:
"""
Create a named QueueMonitor Ray actor.
Args:
deployment_name: Name of the deployment
config: QueueMonitorConfig with broker URL and queue name
namespace: Ray namespace for the actor
Returns:
ActorHandle for the QueueMonitor actor
"""
full_actor_name = f"{QUEUE_MONITOR_ACTOR_PREFIX}{deployment_name}"

# Check if actor already exists
try:
existing = ray.get_actor(full_actor_name, namespace=namespace)
logger.info(f"QueueMonitor actor '{full_actor_name}' already exists, reusing")

return existing
except ValueError:
pass # Actor doesn't exist, create it

actor = QueueMonitorActor.options(
name=full_actor_name,
namespace=namespace,
lifetime="detached",
Contributor:

This needs a comment on why a detached actor was chosen.

Contributor Author:

Since we recreate the actor when the controller goes down, a detached lifetime is no longer required; it was removed in #59430.
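For background on the trade-off discussed here, a minimal sketch of the two actor lifetimes (class and actor names are illustrative):

```python
import ray

@ray.remote
class Probe:
    def ping(self) -> str:
        return "ok"

# Default lifetime: the actor is owned by its creator (here, the controller),
# so it dies with the creator and must be recreated afterwards.
owned = Probe.options(name="probe_owned", namespace="serve").remote()

# lifetime="detached": the actor survives its creator and must be removed
# explicitly with ray.kill(); this is what the PR used before #59430.
detached = Probe.options(
    name="probe_detached", namespace="serve", lifetime="detached"
).remote()
```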

).remote(config)

logger.info(f"Created QueueMonitor actor '{full_actor_name}' in namespace '{namespace}'")
return actor


def get_queue_monitor_actor(
deployment_name: str,
namespace: str = "serve",
) -> ray.actor.ActorHandle:
"""
Get an existing QueueMonitor actor by name.
Args:
deployment_name: Name of the deployment
namespace: Ray namespace
Returns:
ActorHandle for the QueueMonitor actor
Raises:
ValueError: If actor doesn't exist
"""
full_actor_name = f"{QUEUE_MONITOR_ACTOR_PREFIX}{deployment_name}"
return ray.get_actor(full_actor_name, namespace=namespace)


def delete_queue_monitor_actor(
deployment_name: str,
namespace: str = "serve",
) -> bool:
"""
Delete a QueueMonitor actor by name.
Args:
deployment_name: Name of the deployment
namespace: Ray namespace
Returns:
True if actor was deleted, False if it didn't exist
"""
full_actor_name = f"{QUEUE_MONITOR_ACTOR_PREFIX}{deployment_name}"
try:
actor = ray.get_actor(full_actor_name, namespace=namespace)
ray.kill(actor)
logger.info(f"Deleted QueueMonitor actor '{full_actor_name}'")
return True
except ValueError:
# Actor doesn't exist
return False
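As a usage sketch (not part of this diff), here is how controller-side code such as a queue-based autoscaling policy could read the queue length through the named actor; the deployment name and broker URL below are illustrative, and a running Ray cluster is assumed.

```python
import ray

from ray.serve._private.queue_monitor import (
    QueueMonitorConfig,
    create_queue_monitor_actor,
    get_queue_monitor_actor,
)

# Create (or reuse) the named monitor actor for a deployment.
create_queue_monitor_actor(
    deployment_name="my_task_consumer",
    config=QueueMonitorConfig(
        broker_url="redis://localhost:6379/0", queue_name="celery"
    ),
)

# Later, e.g. from the autoscaling policy, look up the handle by name and
# query the broker-backed queue length.
monitor = get_queue_monitor_actor("my_task_consumer")
queue_length = ray.get(monitor.get_queue_length.remote())
```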