5 changes: 5 additions & 0 deletions python/ray/serve/_private/constants.py
@@ -424,6 +424,11 @@
"ray.serve.autoscaling_policy:default_autoscaling_policy"
)

# The default combined workload autoscaling policy to use for TaskConsumers if none is specified.
DEFAULT_COMBINED_WORKLOAD_AUTOSCALING_POLICY = (
"ray.serve.autoscaling_policy:default_combined_workload_autoscaling_policy"
)
Comment on lines +428 to +430

Contributor: This could be named better; "combined_workload" is an overloaded term.

Contributor (Author): @abrarsheikh, how about DEFAULT_QUEUE_AWARE_AUTOSCALING_POLICY? It seems okay to me and not that overloaded. Let me know your view on it, and I'll amend the code accordingly.

Contributor: I would choose default_async_inference_autoscaling_policy.

default_combined_workload_autoscaling_policy: to the reader it is not clear which workloads are being combined.
DEFAULT_QUEUE_AWARE_AUTOSCALING_POLICY: which queue? The other autoscaling policy is also queue-aware, but the queue there refers to ongoing requests.
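For context, a minimal sketch (not part of this diff) of how a TaskConsumer deployment might select the new policy by import path. The policy argument on AutoscalingConfig and the deployment shape are assumptions for illustration; only the import-path string itself comes from this diff:

from ray import serve
from ray.serve.config import AutoscalingConfig

@serve.deployment(
    autoscaling_config=AutoscalingConfig(
        min_replicas=0,
        max_replicas=10,
        target_ongoing_requests=4,
        # Assumed wiring: select the policy by its import path.
        policy="ray.serve.autoscaling_policy:default_combined_workload_autoscaling_policy",
    )
)
class MyTaskConsumer:
    ...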


# Feature flag to enable collecting all queued and ongoing request
# metrics at handles instead of replicas. ON by default.
RAY_SERVE_COLLECT_AUTOSCALING_METRICS_ON_HANDLE = get_env_bool(
28 changes: 20 additions & 8 deletions python/ray/serve/_private/queue_monitor.py
@@ -12,6 +12,18 @@
QUEUE_MONITOR_ACTOR_PREFIX = "QUEUE_MONITOR::"


def get_queue_monitor_actor_name(deployment_name: str) -> str:
"""Get the Ray actor name for a deployment's QueueMonitor.

Args:
deployment_name: Name of the deployment

Returns:
The full actor name in format "QUEUE_MONITOR::<deployment_name>"
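
    Example:
        >>> get_queue_monitor_actor_name("my_consumer")
        'QUEUE_MONITOR::my_consumer'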
"""
return f"{QUEUE_MONITOR_ACTOR_PREFIX}{deployment_name}"


@ray.remote(num_cpus=0)
class QueueMonitorActor:
"""
@@ -99,24 +111,24 @@ def create_queue_monitor_actor(
Returns:
ActorHandle for the QueueMonitor actor
"""
full_actor_name = f"{QUEUE_MONITOR_ACTOR_PREFIX}{deployment_name}"
actor_name = get_queue_monitor_actor_name(deployment_name)

# Check if actor already exists
try:
existing = get_queue_monitor_actor(deployment_name, namespace=namespace)
logger.info(f"QueueMonitor actor '{full_actor_name}' already exists, reusing")
logger.info(f"QueueMonitor actor '{actor_name}' already exists, reusing")
return existing
except ValueError:
actor = QueueMonitorActor.options(
name=full_actor_name,
name=actor_name,
namespace=namespace,
max_restarts=-1,
max_task_retries=-1,
resources={HEAD_NODE_RESOURCE_NAME: 0.001},
).remote(broker_url, queue_name, rabbitmq_http_url)

logger.info(
f"Created QueueMonitor actor '{full_actor_name}' in namespace '{namespace}'"
f"Created QueueMonitor actor '{actor_name}' in namespace '{namespace}'"
)
return actor

@@ -138,8 +150,8 @@ def get_queue_monitor_actor(
Raises:
ValueError: If actor doesn't exist
"""
full_actor_name = f"{QUEUE_MONITOR_ACTOR_PREFIX}{deployment_name}"
return ray.get_actor(full_actor_name, namespace=namespace)
actor_name = get_queue_monitor_actor_name(deployment_name)
return ray.get_actor(actor_name, namespace=namespace)


def kill_queue_monitor_actor(
@@ -156,8 +168,8 @@
Raises:
ValueError: If actor doesn't exist
"""
full_actor_name = f"{QUEUE_MONITOR_ACTOR_PREFIX}{deployment_name}"
actor_name = get_queue_monitor_actor_name(deployment_name)
actor = get_queue_monitor_actor(deployment_name, namespace=namespace)

ray.kill(actor, no_restart=True)
logger.info(f"Deleted QueueMonitor actor '{full_actor_name}'")
logger.info(f"Deleted QueueMonitor actor '{actor_name}'")
161 changes: 140 additions & 21 deletions python/ray/serve/autoscaling_policy.py
@@ -2,11 +2,14 @@
import math
Contributor: Many of the changes in this file overlap with changes in PR #58857. I suggest reviewing the other PR and making sure it's compatible with what we need to do here.

Contributor (Author): Ack, will review the other PR.

from typing import Any, Dict, Optional, Tuple

import ray
from ray.serve._private.constants import (
CONTROL_LOOP_INTERVAL_S,
SERVE_AUTOSCALING_DECISION_COUNTERS_KEY,
SERVE_LOGGER_NAME,
SERVE_NAMESPACE,
)
from ray.serve._private.queue_monitor import get_queue_monitor_actor_name
from ray.serve.config import AutoscalingConfig, AutoscalingContext
from ray.util.annotations import PublicAPI

@@ -119,62 +122,178 @@ def replica_queue_length_autoscaling_policy(
)
return curr_target_num_replicas, policy_state

decision_num_replicas = curr_target_num_replicas

desired_num_replicas = _calculate_desired_num_replicas(
config,
total_num_requests,
num_running_replicas=num_running_replicas,
override_min_replicas=capacity_adjusted_min_replicas,
override_max_replicas=capacity_adjusted_max_replicas,
)
# Scale up.

decision_num_replicas, decision_counter = _apply_scaling_decision_smoothing(
desired_num_replicas=desired_num_replicas,
curr_target_num_replicas=curr_target_num_replicas,
decision_counter=decision_counter,
config=config,
)

policy_state["decision_counter"] = decision_counter
policy_state[SERVE_AUTOSCALING_DECISION_COUNTERS_KEY] = decision_counter
return decision_num_replicas, policy_state


@PublicAPI(stability="alpha")
def combined_workload_autoscaling_policy(
ctx: AutoscalingContext,
) -> Tuple[int, Dict[str, Any]]:
"""
Autoscaling policy for TaskConsumer deployments based on total workload.

This policy scales replicas based on the combined workload from:
- Pending tasks in the message queue (via QueueMonitor)
- Ongoing HTTP requests (from context)

Formula:
total_workload = queue_length + total_num_requests
desired_replicas = ceil(total_workload / target_ongoing_requests)
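
    Example:
        With queue_length=30, total_num_requests=10, and
        target_ongoing_requests=4: desired_replicas = ceil(40 / 4) = 10.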

Args:
ctx: AutoscalingContext containing metrics, config, and state

Returns:
Tuple of (desired_num_replicas, updated_policy_state)
"""

# Extract state
policy_state: Dict[str, Any] = ctx.policy_state
current_num_replicas: int = ctx.current_num_replicas
curr_target_num_replicas: int = ctx.target_num_replicas
total_num_requests: int = ctx.total_num_requests
config: Optional[AutoscalingConfig] = ctx.config
capacity_adjusted_min_replicas: int = ctx.capacity_adjusted_min_replicas
capacity_adjusted_max_replicas: int = ctx.capacity_adjusted_max_replicas

# Get decision counter from state (for smoothing)
decision_counter = policy_state.get(SERVE_AUTOSCALING_DECISION_COUNTERS_KEY, 0)

# === STEP 1: Get queue length from QueueMonitor actor ===
try:
queue_monitor_actor_name = get_queue_monitor_actor_name(ctx.deployment_name)
queue_monitor_actor = ray.get_actor(
queue_monitor_actor_name, namespace=SERVE_NAMESPACE
)
queue_length = ray.get(
queue_monitor_actor.get_queue_length.remote(), timeout=5.0
)
Comment on lines +185 to +187

Contributor: This would make one RPC call per controller iteration. What is the impact of this on the scalability of the controller? Do we need to query the queue length in every iteration?
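One possible mitigation, sketched here rather than part of this PR: since policy_state persists across controller iterations, the queue length could be cached there and refreshed only every N iterations. The helper name and polling interval below are hypothetical; the sketch assumes the imports already present in this module (ray, SERVE_NAMESPACE, get_queue_monitor_actor_name):

QUEUE_POLL_EVERY_N_ITERATIONS = 10  # hypothetical tuning knob

def _get_queue_length_cached(ctx) -> int:
    state = ctx.policy_state
    tick = state.get("queue_poll_tick", 0)
    state["queue_poll_tick"] = tick + 1
    if tick % QUEUE_POLL_EVERY_N_ITERATIONS == 0:
        actor = ray.get_actor(
            get_queue_monitor_actor_name(ctx.deployment_name),
            namespace=SERVE_NAMESPACE,
        )
        # Refresh the cached value with one RPC every N iterations.
        state["cached_queue_length"] = ray.get(
            actor.get_queue_length.remote(), timeout=5.0
        )
    return state.get("cached_queue_length", 0)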

except Exception as e:
logger.warning(
f"[{ctx.deployment_name}] Could not query QueueMonitor: {e}, maintaining {curr_target_num_replicas} replicas"
)
return curr_target_num_replicas, policy_state

# Calculate total workload = queue tasks + HTTP requests
total_workload = queue_length + total_num_requests

if current_num_replicas == 0:
# When 0 replicas and there's workload, scale up the replicas
if total_workload > 0:
return (
max(
math.ceil(1 * config.get_upscaling_factor()),
curr_target_num_replicas,
),
policy_state,
)
return curr_target_num_replicas, policy_state

# === STEP 2: Calculate desired replicas ===
desired_num_replicas = _calculate_desired_num_replicas(
config,
total_num_requests=total_workload,
num_running_replicas=current_num_replicas,
override_min_replicas=capacity_adjusted_min_replicas,
override_max_replicas=capacity_adjusted_max_replicas,
)

# === STEP 3: Apply smoothing (same logic as default policy) ===
decision_num_replicas, decision_counter = _apply_scaling_decision_smoothing(
desired_num_replicas=desired_num_replicas,
curr_target_num_replicas=curr_target_num_replicas,
decision_counter=decision_counter,
config=config,
)

# Update policy state
policy_state[SERVE_AUTOSCALING_DECISION_COUNTERS_KEY] = decision_counter
return decision_num_replicas, policy_state


def _apply_scaling_decision_smoothing(
desired_num_replicas: int,
curr_target_num_replicas: int,
decision_counter: int,
config: AutoscalingConfig,
) -> Tuple[int, int]:
"""
Apply smoothing logic to prevent oscillation in scaling decisions.

This function implements delay-based smoothing: a scaling decision must be
made for a consecutive number of periods before actually scaling.

Args:
desired_num_replicas: The calculated desired number of replicas.
curr_target_num_replicas: Current target number of replicas.
decision_counter: Counter tracking consecutive scaling decisions.
Positive = consecutive scale-up decisions, negative = scale-down.
config: Autoscaling configuration containing delay settings.

Returns:
Tuple of (decision_num_replicas, updated_decision_counter).
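
    Example:
        With upscale_delay_s=30 and a control-loop interval of 0.1s
        (an assumed value), an upscale decision must repeat for more than
        int(30 / 0.1) = 300 consecutive iterations before it takes effect.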
"""
decision_num_replicas = curr_target_num_replicas

# Scale up
if desired_num_replicas > curr_target_num_replicas:
# If the previous decision was to scale down (the counter was
# negative), we reset it and then increment it (set to 1).
# Otherwise, just increment.
if decision_counter < 0:
decision_counter = 0
decision_counter += 1

# Only actually scale the replicas if we've made this decision for
# 'scale_up_consecutive_periods' in a row.
# Only scale after upscale_delay_s
if decision_counter > int(config.upscale_delay_s / CONTROL_LOOP_INTERVAL_S):
decision_counter = 0
decision_num_replicas = desired_num_replicas

# Scale down.
# Scale down
elif desired_num_replicas < curr_target_num_replicas:
# If the previous decision was to scale up (the counter was
# positive), reset it to zero before decrementing.

if decision_counter > 0:
decision_counter = 0
decision_counter -= 1

# Downscaling to zero is only allowed from 1 -> 0
is_scaling_to_zero = curr_target_num_replicas == 1
# Determine the delay to use
is_scaling_to_zero = curr_target_num_replicas == 1 and desired_num_replicas == 0
if is_scaling_to_zero:
# Check if the downscale_to_zero_delay_s is set
# Use downscale_to_zero_delay_s if set, otherwise fall back to downscale_delay_s
if config.downscale_to_zero_delay_s is not None:
delay_s = config.downscale_to_zero_delay_s
else:
delay_s = config.downscale_delay_s
else:
delay_s = config.downscale_delay_s
# The desired_num_replicas>0 for downscaling cases other than 1->0
# Ensure desired_num_replicas >= 1 for non-zero scaling cases
desired_num_replicas = max(1, desired_num_replicas)
# Only actually scale the replicas if we've made this decision for
# 'scale_down_consecutive_periods' in a row.

# Only scale after delay
if decision_counter < -int(delay_s / CONTROL_LOOP_INTERVAL_S):
decision_counter = 0
decision_num_replicas = desired_num_replicas
# Do nothing.

# No change
else:
decision_counter = 0

policy_state[SERVE_AUTOSCALING_DECISION_COUNTERS_KEY] = decision_counter
return decision_num_replicas, policy_state
return decision_num_replicas, decision_counter


default_autoscaling_policy = replica_queue_length_autoscaling_policy

default_combined_workload_autoscaling_policy = combined_workload_autoscaling_policy
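For intuition on the smoothing behavior, a minimal sketch driving the helper directly. The delay value is hypothetical, and the 0.1s value of CONTROL_LOOP_INTERVAL_S is an assumption used only for the arithmetic in the comments:

from ray.serve.autoscaling_policy import _apply_scaling_decision_smoothing
from ray.serve.config import AutoscalingConfig

# With upscale_delay_s=1.0 and a 0.1s control loop, the counter must
# exceed int(1.0 / 0.1) = 10 before the upscale is applied.
config = AutoscalingConfig(min_replicas=1, max_replicas=5, upscale_delay_s=1.0)

target, counter = 1, 0
for _ in range(11):
    target, counter = _apply_scaling_decision_smoothing(
        desired_num_replicas=3,
        curr_target_num_replicas=target,
        decision_counter=counter,
        config=config,
    )

print(target)  # 3: the decision held long enough to be applied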