90 changes: 90 additions & 0 deletions python/ray/serve/_private/application_state.py
@@ -26,6 +26,7 @@
from ray.serve._private.config import DeploymentConfig
from ray.serve._private.constants import (
DEFAULT_AUTOSCALING_POLICY_NAME,
DEFAULT_QUEUE_BASED_AUTOSCALING_POLICY,
DEFAULT_REQUEST_ROUTER_PATH,
RAY_SERVE_ENABLE_TASK_EVENTS,
SERVE_LOGGER_NAME,
@@ -39,6 +40,10 @@
from ray.serve._private.deployment_state import DeploymentStateManager
from ray.serve._private.endpoint_state import EndpointState
from ray.serve._private.logging_utils import configure_component_logger
from ray.serve._private.queue_monitor import (
QueueMonitorConfig,
create_queue_monitor_actor,
)
from ray.serve._private.storage.kv_store import KVStoreBase
from ray.serve._private.usage import ServeUsageTag
from ray.serve._private.utils import (
@@ -74,6 +79,87 @@
CHECKPOINT_KEY = "serve-application-state-checkpoint"


def _is_task_consumer_deployment(deployment_info: DeploymentInfo) -> bool:
"""Check if a deployment is a TaskConsumer."""
try:
deployment_def = deployment_info.replica_config.deployment_def
Contributor comment: this access is unsafe because it deserializes user code in the controller; the controller does not have the user's runtime_env to execute this code.

For example, if user code depends on PyTorch and torch is listed as a pip dependency in the deployment's runtime_env, then this call will fail in cluster mode.

if deployment_def is None:
return False
return getattr(deployment_def, "_is_task_consumer", False)
except Exception as e:
logger.debug(f"Error checking if deployment is TaskConsumer: {e}")
return False
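
A minimal sketch of one way to address the reviewer's concern above: record the TaskConsumer flag as plain metadata at decoration time and read only that metadata on the controller, so deployment_def never has to be deserialized there. The DeploymentMetadata container and its fields are hypothetical illustrations, not part of the Ray Serve API.

from dataclasses import dataclass
from typing import Optional


@dataclass
class DeploymentMetadata:
    # Hypothetical plain-data record captured when the deployment decorator is
    # applied, so the controller never needs to unpickle the user's class.
    is_task_consumer: bool = False
    queue_name: Optional[str] = None


def is_task_consumer_from_metadata(metadata: DeploymentMetadata) -> bool:
    # Reads a plain bool instead of calling getattr() on the deserialized
    # deployment_def, so it works without the user's runtime_env.
    return metadata.is_task_consumer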


def _get_queue_monitor_config(
deployment_info: DeploymentInfo,
) -> Optional[QueueMonitorConfig]:
"""Extract QueueMonitorConfig from a TaskConsumer deployment."""
try:
deployment_def = deployment_info.replica_config.deployment_def
if hasattr(deployment_def, "get_queue_monitor_config"):
return deployment_def.get_queue_monitor_config()
except Exception as e:
logger.warning(f"Failed to get queue monitor config: {e}")
return None
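
For context on the create_queue_monitor_actor call used below (and the comment there about using a plain Ray actor rather than a Serve deployment), here is a rough sketch of how such a helper could work, assuming a named, detached actor that the autoscaling policy can later look up by name from the controller. The class body, naming scheme, and options are assumptions; the real implementation lives in queue_monitor.py, which is outside this hunk.

import ray


@ray.remote
class _QueueMonitor:
    # Hypothetical monitor that tracks the depth of the consumed task queue.
    def __init__(self, config):
        self._config = config
        self._queue_length = 0

    def get_queue_length(self) -> int:
        return self._queue_length


def create_queue_monitor_actor(deployment_name: str, config) -> "ray.actor.ActorHandle":
    # A named, detached actor can be fetched with ray.get_actor() from the
    # controller (e.g. inside the autoscaling policy) without a Serve handle,
    # which is what avoids the deadlock mentioned in the comment below.
    return _QueueMonitor.options(
        name=f"QUEUE_MONITOR:{deployment_name}",  # hypothetical naming scheme
        lifetime="detached",
        get_if_exists=True,
    ).remote(config)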


def _configure_queue_based_autoscaling_for_task_consumers(
deployment_infos: Dict[str, DeploymentInfo]
) -> None:
"""
Configure queue-based autoscaling for TaskConsumers.

For TaskConsumer deployments with autoscaling enabled and no custom policy,
this function switches the autoscaling policy to queue-based autoscaling.

Args:
deployment_infos: Deployment infos dict
"""
for deployment_name, deployment_info in deployment_infos.items():
is_task_consumer = _is_task_consumer_deployment(deployment_info)
has_autoscaling = (
deployment_info.deployment_config.autoscaling_config is not None
)

# Set queue-based autoscaling policy on TaskConsumer only if user hasn't set a custom policy. This respects user's explicit choice.
if is_task_consumer and has_autoscaling:
logger.info(
f"Deployment '{deployment_name}' is a TaskConsumer with autoscaling enabled"
)
is_default_policy = (
deployment_info.deployment_config.autoscaling_config.policy.is_default_policy_function()
)

if is_default_policy:
queue_monitor_config = _get_queue_monitor_config(deployment_info)
if queue_monitor_config is not None:
# Create QueueMonitor as a Ray actor (not Serve deployment)
# This avoids deadlock when autoscaling policy queries it from controller
try:
create_queue_monitor_actor(
deployment_name=deployment_name,
config=queue_monitor_config,
)
except Exception as e:
logger.error(
f"Failed to create QueueMonitor actor for '{deployment_name}': {e}"
)
continue
Comment on lines +144 to +148
Contributor comment: why fail silently? Should we fail the deployment here?


# Switch to queue-based autoscaling policy
deployment_info.deployment_config.autoscaling_config.policy = (
AutoscalingPolicy(
policy_function=DEFAULT_QUEUE_BASED_AUTOSCALING_POLICY
)
)
logger.info(
f"Switched TaskConsumer '{deployment_name}' to queue-based autoscaling policy"
)

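
Regarding the reviewer's question above about failing silently, a minimal sketch of the alternative, reusing the names from the loop above: re-raise so the application build fails visibly instead of continuing with the default policy. The wrapping exception type is an assumption; the controller may expect a specific Serve error class.

try:
    create_queue_monitor_actor(
        deployment_name=deployment_name,
        config=queue_monitor_config,
    )
except Exception as e:
    # Propagate instead of `continue`, so the deployment surfaces the error
    # rather than silently running without queue-based autoscaling.
    raise RuntimeError(
        f"Failed to create QueueMonitor actor for '{deployment_name}'; "
        "queue-based autoscaling cannot be enabled for this TaskConsumer."
    ) from e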


class BuildAppStatus(Enum):
"""Status of the build application task."""

@@ -1220,6 +1306,10 @@ def deploy_apps(
)
for params in deployment_args
}

# Configure queue-based autoscaling for TaskConsumers
_configure_queue_based_autoscaling_for_task_consumers(deployment_infos)
Comment on lines +1310 to +1311
Contributor comment: this only takes effect in imperative mode. If the user deploys through a config file, this code path is not executed.


self._application_states[name].deploy_app(
deployment_infos, external_scaler_enabled
)
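To illustrate the reviewer's point above about the two deployment paths, a small sketch: the imperative serve.run() path reaches deploy_apps(), where the new hook runs, while a declarative deploy from a config file goes through the controller's config-apply path, which this PR does not modify. The toy deployment below is only for illustration.

from ray import serve


@serve.deployment
class Echo:
    def __call__(self, request) -> str:
        return "ok"


app = Echo.bind()

# Imperative mode: serve.run() reaches deploy_apps(), so
# _configure_queue_based_autoscaling_for_task_consumers() is invoked.
serve.run(app)

# Declarative mode (the reviewer's point):
#   $ serve deploy config.yaml
# goes through the config-apply path, which is untouched by this PR, so a
# TaskConsumer deployed this way would keep the default autoscaling policy.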
5 changes: 5 additions & 0 deletions python/ray/serve/_private/autoscaling_state.py
@@ -23,6 +23,7 @@
aggregate_timeseries,
merge_instantaneous_total,
)
from ray.serve._private.queue_monitor import delete_queue_monitor_actor
from ray.serve._private.usage import ServeUsageTag
from ray.serve._private.utils import get_capacity_adjusted_num_replicas
from ray.serve.config import AutoscalingContext, AutoscalingPolicy
@@ -941,6 +942,10 @@ def deregister_deployment(self, deployment_id: DeploymentID):
)
app_state.deregister_deployment(deployment_id)

# Clean up QueueMonitor actor if it exists for this deployment
# This is needed for TaskConsumer deployments with queue-based autoscaling
delete_queue_monitor_actor(deployment_id.name)

def register_application(
self,
app_name: ApplicationName,
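A rough sketch of what the delete_queue_monitor_actor helper called above could do, assuming the monitor was created as a named, detached actor as sketched earlier; the actual naming convention and namespace handling live in queue_monitor.py and may differ.

import ray


def delete_queue_monitor_actor(deployment_name: str) -> None:
    # Best-effort cleanup of the named QueueMonitor actor, if one exists.
    actor_name = f"QUEUE_MONITOR:{deployment_name}"  # hypothetical naming scheme
    try:
        actor = ray.get_actor(actor_name)
    except ValueError:
        # No monitor was ever created for this deployment; nothing to do.
        return
    ray.kill(actor, no_restart=True)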
5 changes: 5 additions & 0 deletions python/ray/serve/_private/constants.py
@@ -378,6 +378,11 @@
"ray.serve.autoscaling_policy:default_autoscaling_policy"
)

# The default queue-based autoscaling policy to use for TaskConsumers if none is specified.
DEFAULT_QUEUE_BASED_AUTOSCALING_POLICY = (
"ray.serve.autoscaling_policy:default_queue_based_autoscaling_policy"
)

# Feature flag to enable collecting all queued and ongoing request
# metrics at handles instead of replicas. ON by default.
RAY_SERVE_COLLECT_AUTOSCALING_METRICS_ON_HANDLE = get_env_bool(
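For context on the new DEFAULT_QUEUE_BASED_AUTOSCALING_POLICY constant, a rough sketch of the shape such a policy function could take: scale the replica count with the observed queue depth, clamped to the configured bounds. The context fields read here (queue length, per-replica target, min/max replicas) are assumptions; the actual policy implementation referenced by the constant is not part of this diff.

import math


def default_queue_based_autoscaling_policy(ctx) -> int:
    # `ctx` is assumed to expose the queue depth reported by the QueueMonitor
    # actor plus the deployment's autoscaling bounds; field names are illustrative.
    queue_length = getattr(ctx, "total_queue_length", 0)
    per_replica_target = getattr(ctx, "target_tasks_per_replica", 10)
    min_replicas = getattr(ctx, "min_replicas", 1)
    max_replicas = getattr(ctx, "max_replicas", 1)
    desired = math.ceil(queue_length / max(per_replica_target, 1))
    # Clamp to the deployment's configured bounds.
    return max(min_replicas, min(desired, max_replicas))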