@harshit-anyscale (Contributor) commented on Dec 10, 2025

This PR adds queue-based autoscaling for Ray Serve TaskConsumer deployments. TaskConsumers are workloads that consume tasks from message queues (Redis, RabbitMQ), and their scaling needs are fundamentally different from those of HTTP-based deployments.

Instead of scaling based on HTTP request load, TaskConsumers should scale based on the number of pending tasks in the message queue.

Key Features

  • Automatic queue depth monitoring via QueueMonitor Ray actor
  • New queue_based_autoscaling_policy that scales replicas based on ceil(queue_length / target_ongoing_requests) (a sketch of this formula follows the list below)
  • Transparent integration: TaskConsumers with autoscaling enabled automatically use queue-based scaling
  • Support for Redis and RabbitMQ brokers
  • Actor recovery mechanism for fault tolerance
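
For illustration, here is a minimal sketch of the scaling formula from the second bullet above. It is not the actual implementation in `autoscaling_policy.py`; the function name and the exact clamping/scale-from-zero behavior are assumptions based on this description.

```python
import math


def desired_replicas(
    queue_length: int,
    target_ongoing_requests: int,
    min_replicas: int,
    max_replicas: int,
) -> int:
    """Sketch of queue-based scaling: ceil(queue_length / target), clamped to bounds."""
    if queue_length <= 0:
        # No pending work: fall back to the configured floor
        # (which may be 0 if scale-to-zero is enabled).
        return min_replicas
    raw = math.ceil(queue_length / target_ongoing_requests)
    # Clamp to the user's configured replica bounds.
    return max(min_replicas, min(max_replicas, raw))


# Example: 20 queued tasks with target_ongoing_requests=5 -> 4 replicas.
assert desired_replicas(20, 5, min_replicas=1, max_replicas=10) == 4
```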

Architecture


  ┌─────────────────┐      ┌──────────────────┐      ┌─────────────────┐
  │  Message Queue  │◄─────│  QueueMonitor    │      │ ServeController │
  │  (Redis/RMQ)    │      │  Actor           │◄─────│ Autoscaler      │
  └─────────────────┘      └──────────────────┘      └─────────────────┘
                                   │                         │
                                   │ get_queue_length()      │
                                   └─────────────────────────┘
                                             │
                                             ▼
                                ┌───────────────────────────┐
                                │ queue_based_autoscaling   │
                                │ _policy()                 │
                                │ desired = ceil(len/target)│
                                └───────────────────────────┘
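
As a rough sketch of the controller-side flow in the diagram, the autoscaler can resolve the per-deployment monitor by its well-known actor name and ask it for the current queue depth. The actor and method names follow the description above; the lookup helper and its error handling are illustrative assumptions, not the actual autoscaler code.

```python
from typing import Optional

import ray


def fetch_queue_length(deployment_name: str, namespace: str) -> Optional[int]:
    """Look up the named QueueMonitor actor and return the current queue depth."""
    actor_name = f"QUEUE_MONITOR::{deployment_name}"
    try:
        monitor = ray.get_actor(actor_name, namespace=namespace)
        return ray.get(monitor.get_queue_length.remote(), timeout=5)
    except Exception:
        # If the monitor is unreachable, the policy can fall back to a
        # cached value or leave the replica count unchanged.
        return None
```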

Components

  1. QueueMonitor Actor (queue_monitor.py)
    - Lightweight Ray actor that queries queue length from the message broker
    - Supports Redis (using the LLEN command) and RabbitMQ (using a passive queue declaration); see the broker-query sketch after this list
    - Named actor format: QUEUE_MONITOR::<deployment_name>
    - Detached lifecycle with automatic cleanup on deployment deletion
  2. Queue-Based Autoscaling Policy (autoscaling_policy.py)
    - New policy function: queue_based_autoscaling_policy
    - Formula: desired_replicas = ceil(queue_length / target_ongoing_requests)
    - Reuses existing smoothing/delay logic to prevent oscillation
    - Stores QueueMonitor config in policy_state for actor recovery
  3. Automatic Policy Switching (application_state.py)
    - Detects TaskConsumer deployments via _is_task_consumer marker
    - Automatically creates QueueMonitor actor and switches to queue-based policy
    - Only applies when user hasn't specified a custom autoscaling policy
  4. Lifecycle Management (autoscaling_state.py)
    - QueueMonitor actors are cleaned up when deployments are deleted
    - Prevents actor leaks and ensures test isolation
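
To make component 1 concrete, here is a hedged sketch of the two broker queries it describes, assuming the redis-py and pika client libraries; the helper names are illustrative, and the actual QueueMonitor in queue_monitor.py may structure its connections and configuration differently.

```python
import pika
import redis


def redis_queue_length(url: str, queue_name: str) -> int:
    # Redis-backed task queues are plain lists, so LLEN gives the pending count.
    client = redis.Redis.from_url(url)
    return client.llen(queue_name)


def rabbitmq_queue_length(url: str, queue_name: str) -> int:
    # A passive declaration does not create or modify the queue; it only
    # returns metadata, including the current message count.
    connection = pika.BlockingConnection(pika.URLParameters(url))
    try:
        channel = connection.channel()
        result = channel.queue_declare(queue=queue_name, passive=True)
        return result.method.message_count
    finally:
        connection.close()
```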

Files Changed

| File | Changes |
| --- | --- |
| python/ray/serve/_private/queue_monitor.py | New file: QueueMonitor actor for broker queries |
| python/ray/serve/autoscaling_policy.py | Add queue_based_autoscaling_policy, refactor smoothing logic |
| python/ray/serve/_private/application_state.py | Auto-configure queue-based autoscaling for TaskConsumers |
| python/ray/serve/_private/autoscaling_state.py | Clean up QueueMonitor actors on deployment deletion |
| python/ray/serve/_private/constants.py | Add DEFAULT_QUEUE_BASED_AUTOSCALING_POLICY constant |
| python/ray/serve/task_consumer.py | Add _is_task_consumer marker and get_queue_monitor_config() |
| python/ray/serve/tests/test_task_processor.py | Integration tests for queue-based autoscaling |
| python/ray/serve/tests/unit/test_queue_autoscaling_policy.py | New file: unit tests for the policy |
| python/ray/serve/tests/unit/test_queue_monitor.py | New file: unit tests for QueueMonitor |

Test Plan

  • Unit tests for queue_based_autoscaling_policy (19 tests)
    • Basic scaling formula verification
    • Scale from zero handling
    • Min/max replicas enforcement
    • Upscale/downscale delays
    • QueueMonitor unavailability handling
    • Actor recovery from policy_state
  • Unit tests for QueueMonitor (8 tests)
    • Redis and RabbitMQ initialization
    • Queue length queries
    • Error handling and fallback to cached values
  • Integration tests (3 tests)
    • test_task_consumer_scales_up_based_on_queue_depth - Verifies scaling from 1→4 replicas with 20 queued tasks (see the formula check sketched after this list)
    • test_task_consumer_autoscaling_respects_max_replicas - Verifies max_replicas cap
    • test_task_consumer_autoscaling_respects_min_replicas - Verifies min_replicas floor
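
For reference, a minimal unit-style check of the formula behind the first integration test above. The real tests in test_queue_autoscaling_policy.py exercise the actual policy; the target of 5 ongoing requests here is an assumption inferred from the 20-tasks-to-4-replicas example.

```python
import math


def test_scaling_formula_matches_integration_expectation():
    # 20 queued tasks, assumed target_ongoing_requests=5, bounds 1..10.
    queue_length, target, min_replicas, max_replicas = 20, 5, 1, 10
    desired = max(min_replicas, min(max_replicas, math.ceil(queue_length / target)))
    assert desired == 4
```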

Follow-up PRs

  1. A PR to include the number of prefetched tasks and the number of tasks Celery is currently executing in the queue_monitor scaling logic.
  2. Additional test cases (e.g., edge cases, failure scenarios, multi-deployment tests) will be added in a follow-up PR.

Signed-off-by: harshit <[email protected]>

@gemini-code-assist (bot) left a comment


Code Review

This pull request introduces queue-based autoscaling for TaskConsumer deployments in Ray Serve. A new QueueMonitor component, implemented as a Ray actor, is added to directly query message broker queue lengths (supporting Redis and RabbitMQ). The task_consumer decorator is updated to mark deployments as TaskConsumer and provide a QueueMonitorConfig for their associated queue.

During application deployment, ApplicationState now identifies TaskConsumer deployments with default autoscaling policies, creates a QueueMonitor actor for them, and switches their policy to the new queue_based_autoscaling_policy. This policy calculates desired replicas based on queue depth (ceil(queue_length / target_ongoing_requests)), respects min_replicas and max_replicas, handles scaling from zero, and incorporates the existing smoothing logic for scaling decisions. The AutoscalingState is updated to clean up QueueMonitor actors when deployments are deregistered. Unit and integration tests were added to validate the QueueMonitor functionality, the new autoscaling policy, and its interaction with TaskConsumer deployments, including scenarios for scaling up, respecting replica bounds, and actor recovery.

Review comments suggest improving exception handling by catching more specific exceptions or logging full tracebacks, and updating the docstring for _apply_scaling_decision_smoothing to accurately reflect its parameters. It was also noted that relying on __del__ for resource cleanup in QueueMonitor is discouraged; explicit shutdown() methods or context managers are recommended instead.

@harshit-anyscale changed the title from "lint changes" to "add queue length based autoscaling" on Dec 10, 2025
@harshit-anyscale self-assigned this on Dec 10, 2025
@harshit-anyscale added the go label (add ONLY when ready to merge, run all tests) on Dec 11, 2025

@abrarsheikh (Contributor) left a comment


I suggest breaking this PR down into smaller chunks; it's OK to keep it as is if you want to iterate on the implementation.

def _is_task_consumer_deployment(deployment_info: DeploymentInfo) -> bool:
    """Check if a deployment is a TaskConsumer."""
    try:
        deployment_def = deployment_info.replica_config.deployment_def

This access is unsafe because it deserializes user code in the controller, and the controller does not have the user's runtime env to execute that code.

For example, if the user code depends on PyTorch and torch is listed as a pip dependency in the deployment's runtime_env, then this call will fail in cluster mode.

Comment on lines +1310 to +1311
# Configure queue-based autoscaling for TaskConsumers
_configure_queue_based_autoscaling_for_task_consumers(deployment_infos)

This only takes effect in imperative mode. If the user deploys through a config, this code path is not executed.

Comment on lines +144 to +148
except Exception as e:
    logger.error(
        f"Failed to create QueueMonitor actor for '{deployment_name}': {e}"
    )
    continue

Why fail silently? Should we fail the deployment here?

actor = QueueMonitorActor.options(
    name=full_actor_name,
    namespace=namespace,
    lifetime="detached",

This needs a comment explaining why a detached actor was chosen.

@harshit-anyscale (author) replied:

Since we recreate the actor if the controller goes down, a detached lifetime is no longer required; it was removed in #59430.

return "unknown"


class QueueMonitor:

Why did we not go with Flower here? Doesn't it abstract away all of this logic?

@harshit-anyscale (author) replied:

Yes, it does abstract away all of this logic, but with Flower we would have to make a remote HTTP call, which Flower would intercept, read, and then forward to the broker; all of that combined is more time-consuming than calling the broker directly. Apart from that, we would have to manage ports to run N instances of Flower inside the Serve controller. It is doable, but given these details, writing plain vanilla code to get the queue length seems more efficient and simpler.

(Screenshot attached: 2025-12-15, 13:03.)

@harshit-anyscale (author) commented on Dec 15, 2025

Creating new, smaller PRs for this bigger change; I will address these comments in those.
