
Conversation

@harshit-anyscale harshit-anyscale commented Dec 15, 2025

Summary

This PR is part 1 of 3 for adding queue-based autoscaling support for Ray Serve TaskConsumer deployments.

Background

TaskConsumers are workloads that consume tasks from message queues (Redis, RabbitMQ), and their scaling needs are fundamentally different from HTTP-based deployments. Instead of scaling based on HTTP request load, TaskConsumers should scale based on the number of pending tasks in the message queue.

Overall Architecture (Full Feature)

  ┌─────────────────┐      ┌──────────────────┐      ┌─────────────────┐
  │  Message Queue  │◄─────│  QueueMonitor    │      │ ServeController │
  │  (Redis/RMQ)    │      │  Actor           │◄─────│ Autoscaler      │
  └─────────────────┘      └──────────────────┘      └─────────────────┘
                                   │                         │
                                   │ get_queue_length()      │
                                   └─────────────────────────┘
                                             │
                                             ▼
                                ┌───────────────────────────┐
                                │ queue_based_autoscaling   │
                                │ _policy()                 │
                                │ desired = ceil(len/target)│
                                └───────────────────────────┘
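The policy box in the diagram computes the replica count as `desired = ceil(len/target)`. A minimal sketch of that formula in plain Python (function and parameter names are hypothetical; the actual policy lands in PR 2):

```python
import math

def desired_replicas(
    queue_length: int,
    target_tasks_per_replica: int,
    min_replicas: int = 1,
    max_replicas: int = 10,
) -> int:
    # desired = ceil(len / target), clamped to the deployment's replica bounds.
    desired = math.ceil(queue_length / target_tasks_per_replica)
    return max(min_replicas, min(max_replicas, desired))
```

For example, 25 pending tasks with a target of 10 tasks per replica yields 3 replicas.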

The full implementation consists of three PRs:

| PR | Description | Status |
|----------------|-----------------------------------------------------|------------|
| PR 1 (This PR) | QueueMonitor actor for querying broker queue length | 🔄 Current |
| PR 2 | Introduce default Queue-based autoscaling policy | Upcoming |
| PR 3 | Integration with TaskConsumer deployments | Upcoming |

This PR: QueueMonitor Actor

This PR introduces the QueueMonitor Ray actor that queries message brokers to get queue length for autoscaling decisions.

Key Features

  • Multi-broker support: Redis and RabbitMQ
  • Lightweight Ray actor: Runs with num_cpus=0, with pika and redis provided via the runtime env
  • Fault tolerance: Caches last known queue length on query failures
  • Named actor pattern: QUEUE_MONITOR::<deployment_name> for easy lookup
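The fault-tolerance bullet can be illustrated with a broker-agnostic sketch (class and method names are hypothetical, not the PR's actual code): on a failed broker query, the monitor serves the last value it saw instead of raising.

```python
class QueueLengthCache:
    """Illustrative only: fall back to the last known queue length
    when the broker query fails, so autoscaling keeps a usable signal."""

    def __init__(self) -> None:
        self._last_known = 0

    def get_queue_length(self, query_fn) -> int:
        try:
            self._last_known = query_fn()
        except Exception:
            pass  # broker unreachable: keep serving the cached value
        return self._last_known
```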

Queue Length Calculation

For accurate autoscaling, QueueMonitor returns total workload (pending tasks):

| Broker   | Pending Tasks    |
|----------|------------------|
| Redis    | `LLEN <queue>`   |
| RabbitMQ | `messages_ready` |
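The pending-only semantics in the table can be sketched as a small mapping (field names follow the table; the `stats` dict shape is an assumption for illustration). The key point is that RabbitMQ's `messages_ready` excludes in-flight deliveries, matching what Redis `LLEN` counts:

```python
def pending_tasks(broker: str, stats: dict) -> int:
    # Hypothetical helper: normalize each broker's stats to "pending tasks only".
    if broker == "redis":
        return stats["llen"]  # LLEN counts only queued (pending) entries
    if broker == "rabbitmq":
        # messages_ready, not messages: excludes unacknowledged (in-flight) work
        return stats["messages_ready"]
    raise ValueError(f"unsupported broker: {broker}")
```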

Components

  1. QueueMonitorConfig - Configuration dataclass with broker URL and queue name
  2. QueueMonitor - Core class that initializes broker connections and queries queue length
  3. QueueMonitorActor - Ray actor wrapper for remote access
  4. Helper functions:
    - create_queue_monitor_actor() - Create named actor
    - get_queue_monitor_actor() - Lookup existing actor
    - delete_queue_monitor_actor() - Cleanup on deployment deletion
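The named-actor pattern behind these helpers reduces to a deterministic name per deployment; a sketch (helper name hypothetical, prefix from the Key Features section):

```python
QUEUE_MONITOR_PREFIX = "QUEUE_MONITOR::"

def queue_monitor_actor_name(deployment_name: str) -> str:
    # Deterministic per-deployment name, so the create/get/delete
    # helpers can all resolve the same actor by name.
    return f"{QUEUE_MONITOR_PREFIX}{deployment_name}"
```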

Test Plan

  • Unit tests for QueueMonitorConfig (7 tests)
    • Broker type detection (Redis, RabbitMQ, SQS, unknown)
    • Config value storage
  • Unit tests for QueueMonitor (4 tests)
    • Redis queue length retrieval (pending)
    • RabbitMQ queue length retrieval
    • Error handling with cached value fallback

@harshit-anyscale harshit-anyscale self-assigned this Dec 15, 2025
@harshit-anyscale harshit-anyscale changed the title add queue monitor [1/n] queue-based autoscaling - add queue monitor Dec 15, 2025
Contributor

@gemini-code-assist gemini-code-assist bot left a comment


Code Review

This pull request introduces a QueueMonitor actor for monitoring queue lengths in Redis and RabbitMQ, which is a valuable addition for asynchronous task processing. The implementation is generally well-structured and includes unit tests. However, I've identified a significant performance concern with the RabbitMQ connection handling that should be addressed. Additionally, there's a minor inconsistency in broker type detection and an opportunity to improve test coverage for the new actor helper functions. My detailed feedback is in the comments below.

@harshit-anyscale harshit-anyscale changed the title [1/n] queue-based autoscaling - add queue monitor [1/3] queue-based autoscaling - add queue monitor Dec 15, 2025
@harshit-anyscale harshit-anyscale added the go add ONLY when ready to merge, run all tests label Dec 15, 2025
@harshit-anyscale harshit-anyscale force-pushed the queue-based-autoscaling-part-1 branch from 14a17b6 to a982d4b Compare December 15, 2025 11:47
@harshit-anyscale harshit-anyscale marked this pull request as ready for review December 15, 2025 11:47
@harshit-anyscale harshit-anyscale requested a review from a team as a code owner December 15, 2025 12:19
@ray-gardener ray-gardener bot added the serve Ray Serve Related Issue label Dec 15, 2025
Signed-off-by: harshit <[email protected]>
@harshit-anyscale harshit-anyscale force-pushed the queue-based-autoscaling-part-1 branch from ce6a041 to ad81408 Compare December 17, 2025 18:04
Collaborator

@aslonnie aslonnie left a comment


hmm.. does not need my review any more?

seems that import pika is still in there though?

@harshit-anyscale
Contributor Author

harshit-anyscale commented Dec 19, 2025

> hmm.. does not need my review any more?
>
> seems that import pika is still in there though?

nope, import pika is now resolved as well.

```python
if queues is not None:
    for q in queues:
        if q.get("name") == self._queue_name:
            queue_length = q.get("messages")
```

RabbitMQ uses wrong field causing inconsistent queue metrics

Medium Severity

The code retrieves q.get("messages") from RabbitMQ, but the PR description explicitly specifies that messages_ready should be used. In RabbitMQ, messages equals messages_ready + messages_unacknowledged (in-flight), while messages_ready counts only pending tasks. This creates inconsistent behavior: Redis llen returns only pending messages (equivalent to messages_ready), but RabbitMQ returns pending plus in-progress messages. For autoscaling, this could cause RabbitMQ deployments to scale differently than Redis deployments with the same actual workload.
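A minimal sketch of the fix this comment suggests, keeping the loop's shape (function name and queue-dict shape are illustrative, modeled on the RabbitMQ management API's queue listing):

```python
def pending_for_queue(queues, queue_name):
    # Use messages_ready (pending only) rather than messages
    # (pending + unacknowledged) so the metric matches Redis LLEN.
    if queues is None:
        return None
    for q in queues:
        if q.get("name") == queue_name:
            return q.get("messages_ready")
    return None
```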


Contributor

@abrarsheikh abrarsheikh left a comment


lgtm. Left some nits

@abrarsheikh abrarsheikh merged commit f580a27 into master Jan 8, 2026
6 checks passed
@abrarsheikh abrarsheikh deleted the queue-based-autoscaling-part-1 branch January 8, 2026 20:07
elliot-barn pushed a commit that referenced this pull request Jan 11, 2026
AYou0207 pushed a commit to AYou0207/ray that referenced this pull request Jan 13, 2026