def _create_application_snapshot(
    self,
    *,
    autoscaling_contexts: Dict[DeploymentID, AutoscalingContext],
    decisions: Dict[DeploymentID, int],
) -> ApplicationSnapshot:
    """Build an ApplicationSnapshot from one app-level autoscaling decision.

    Args:
        autoscaling_contexts: Per-deployment autoscaling contexts; their
            ``current_num_replicas`` values are summed into the current
            replica total.
        decisions: Per-deployment target replica counts; their values are
            summed into the target replica total, and their count is
            reported as ``num_deployments``.

    Returns:
        A snapshot whose scaling status is derived from comparing the
        target total against the current total (upscale, downscale, or
        stable).
    """
    total_current = sum(
        ctx.current_num_replicas for ctx in autoscaling_contexts.values()
    )
    total_target = sum(decisions.values())

    if total_target > total_current:
        scaling_status_raw = AutoscalingStatus.UPSCALE
    elif total_target < total_current:
        scaling_status_raw = AutoscalingStatus.DOWNSCALE
    else:
        scaling_status_raw = AutoscalingStatus.STABLE

    scaling_status = AutoscalingStatus.format_scaling_status(scaling_status_raw)

    # No errors are collected in this method; the list is always empty.
    errors: List[str] = []

    # The policy may be any callable. Callable objects such as
    # functools.partial or instances defining __call__ have no __name__,
    # and building a snapshot must never break the autoscaling decision
    # that triggered it, so fall back to the type's name / module.
    policy = self._policy
    policy_module = getattr(policy, "__module__", type(policy).__module__)
    policy_name = getattr(policy, "__name__", None) or type(policy).__name__
    policy_name_str = f"{policy_module}.{policy_name}"

    return ApplicationSnapshot(
        timestamp_str=time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
        app=self._app_name,
        num_deployments=len(decisions),
        total_current_replicas=total_current,
        total_target_replicas=total_target,
        scaling_status=scaling_status,
        policy_name=policy_name_str,
        errors=errors,
    )
def get_application_snapshot(
    self, app_name: ApplicationName
) -> Optional[ApplicationSnapshot]:
    """Look up the cached application-level snapshot for ``app_name``.

    Returns:
        Whatever the app's autoscaling state reports as its cached
        snapshot, or None when no (truthy) state is registered under
        ``app_name`` or that state has no app-level policy.
    """
    state = self._app_autoscaling_states.get(app_name)
    if state and state.has_policy():
        return state.get_application_snapshot()
    return None
Compares only: + app, total_target_replicas, scaling_status, num_deployments + """ + if not isinstance(other, ApplicationSnapshot): + return False + return ( + self.app == other.app + and self.total_target_replicas == other.total_target_replicas + and self.scaling_status == other.scaling_status + and self.num_deployments == other.num_deployments + ) + + RUNNING_REQUESTS_KEY = "running_requests" ONGOING_REQUESTS_KEY = "ongoing_requests" QUEUED_REQUESTS_KEY = "queued_requests" diff --git a/python/ray/serve/_private/controller.py b/python/ray/serve/_private/controller.py index 6396e0115a12..83a1bcda9fd1 100644 --- a/python/ray/serve/_private/controller.py +++ b/python/ray/serve/_private/controller.py @@ -21,6 +21,7 @@ from ray.serve._private.application_state import ApplicationStateManager, StatusOverview from ray.serve._private.autoscaling_state import AutoscalingStateManager from ray.serve._private.common import ( + ApplicationSnapshot, DeploymentID, DeploymentSnapshot, HandleMetricReport, @@ -244,10 +245,12 @@ async def __init__( # Caches for autoscaling observability self._last_autoscaling_snapshots: Dict[DeploymentID, DeploymentSnapshot] = {} + self._last_application_snapshots: Dict[str, ApplicationSnapshot] = {} self._autoscaling_enabled_deployments_cache: List[ Tuple[str, str, DeploymentDetails, Any] ] = [] - self._refresh_autoscaling_deployments_cache() + self._autoscaling_enabled_apps_cache: List[str] = [] + self._refresh_autoscaling_cache() def reconfigure_global_logging_config(self, global_logging_config: LoggingConfig): if ( @@ -429,26 +432,40 @@ def _update_proxy_nodes(self): new_proxy_nodes.add(self._controller_node_id) self._proxy_nodes = new_proxy_nodes - def _refresh_autoscaling_deployments_cache(self) -> None: + def _refresh_autoscaling_cache(self) -> None: result = [] active_dep_ids = set() + apps_with_app_level_policy = [] + for app_name in self.application_state_manager.list_app_names(): deployment_details = 
self.application_state_manager.list_deployment_details( app_name ) + if self.autoscaling_state_manager._application_has_policy(app_name): + apps_with_app_level_policy.append(app_name) + for dep_name, details in deployment_details.items(): active_dep_ids.add(DeploymentID(name=dep_name, app_name=app_name)) autoscaling_config = details.deployment_config.autoscaling_config if autoscaling_config: result.append((app_name, dep_name, details, autoscaling_config)) self._autoscaling_enabled_deployments_cache = result + + self._autoscaling_enabled_apps_cache = apps_with_app_level_policy + self._last_autoscaling_snapshots = { k: v for k, v in self._last_autoscaling_snapshots.items() if k in active_dep_ids } - def _emit_deployment_autoscaling_snapshots(self) -> None: + self._last_application_snapshots = { + k: v + for k, v in self._last_application_snapshots.items() + if k in apps_with_app_level_policy + } + + def _emit_autoscaling_snapshots(self) -> None: """Emit structured autoscaling snapshot logs in a single batch per loop.""" if self._autoscaling_logger is None: return @@ -475,6 +492,20 @@ def _emit_deployment_autoscaling_snapshots(self) -> None: snapshots_to_log.append(deployment_snapshot.dict(exclude_none=True)) self._last_autoscaling_snapshots[dep_id] = deployment_snapshot + for app_name in self._autoscaling_enabled_apps_cache: + app_snapshot = self.autoscaling_state_manager.get_application_snapshot( + app_name + ) + if app_snapshot is None: + continue + + last_app = self._last_application_snapshots.get(app_name) + if last_app is not None and last_app.is_scaling_equivalent(app_snapshot): + continue + + snapshots_to_log.append(app_snapshot.dict(exclude_none=True)) + self._last_application_snapshots[app_name] = app_snapshot + if snapshots_to_log: # Single write per control-loop iteration self._autoscaling_logger.info({"snapshots": snapshots_to_log}) @@ -563,16 +594,16 @@ async def run_control_loop_step( asm_update_start_time = time.time() any_target_state_changed = 
@serve.deployment
class Preprocessor:
    """First stage: prefixes the input after a short blocking sleep."""

    def __call__(self, input_data: str) -> str:
        time.sleep(0.01)
        tagged = f"preprocessed_{input_data}"
        return tagged


@serve.deployment
class Model:
    """Second stage: prefixes the preprocessed value after a short blocking sleep."""

    def __call__(self, preprocessed_data: str) -> str:
        time.sleep(0.02)
        scored = f"result_{preprocessed_data}"
        return scored


@serve.deployment
class Driver:
    """Entry point that chains the preprocessor handle into the model handle."""

    def __init__(self, preprocessor, model):
        self._preprocessor = preprocessor
        self._model = model

    async def __call__(self, input_data: str) -> str:
        # Stages run strictly in sequence: the model consumes the
        # preprocessor's awaited output.
        staged = await self._preprocessor.remote(input_data)
        return await self._model.remote(staged)


app = Driver.bind(
    Preprocessor.bind(),
    Model.bind(),
)
--- a/python/ray/serve/tests/test_controller.py +++ b/python/ray/serve/tests/test_controller.py @@ -3,7 +3,7 @@ import json import os import time -from typing import Any, Dict, List, Optional, Sequence +from typing import Any, Dict, List, Optional, Sequence, Tuple import pytest @@ -19,6 +19,7 @@ from ray.serve._private.deployment_info import DeploymentInfo from ray.serve._private.logging_utils import get_serve_logs_dir from ray.serve.autoscaling_policy import default_autoscaling_policy +from ray.serve.config import AutoscalingContext from ray.serve.context import _get_global_client from ray.serve.generated.serve_pb2 import DeploymentRoute from ray.serve.schema import ApplicationStatus, ServeDeploySchema @@ -328,7 +329,8 @@ def _parse_snapshot_payload(message: Any) -> Optional[Dict[str, Any]]: def _parse_batched_snapshots_from_log( log_paths: Sequence[str], - target_deployment_name: Optional[str] = None, + snapshot_type: str = "deployment", + target_name: Optional[str] = None, ) -> List[Dict[str, Any]]: """Parse autoscaling snapshots from batched log format. @@ -337,7 +339,9 @@ def _parse_batched_snapshots_from_log( Args: log_paths: List of log file paths to parse. - target_deployment_name: If provided, only return snapshots for this deployment. + snapshot_type: "deployment" or "application" + target_name: If provided, filter by deployment name (for deployment type) + or app name (for application type). Returns: List of individual snapshot dicts, sorted by timestamp. 
# Application-level autoscaling policy for testing (must be importable)
def simple_app_policy_for_test(
    contexts: Dict[DeploymentID, AutoscalingContext],
) -> Tuple[Dict[DeploymentID, int], Dict[DeploymentID, Dict[str, Any]]]:
    """Return a no-op scaling decision for every deployment in ``contexts``.

    Each deployment's decision is its context's ``target_num_replicas``, and
    each deployment gets its own fresh, empty policy-state dict.
    """
    keep_targets = {
        dep_id: context.target_num_replicas for dep_id, context in contexts.items()
    }
    # One distinct empty dict per deployment (never a shared instance).
    empty_states = {dep_id: {} for dep_id in contexts}
    return keep_targets, empty_states
@@ -420,7 +440,7 @@ def get_snapshots(): log_paths = glob.glob( os.path.join(serve_logs_dir, "autoscaling_snapshot_*.log") ) - return _parse_batched_snapshots_from_log(log_paths, DEPLOY_NAME) + return _parse_batched_snapshots_from_log(log_paths, "deployment", DEPLOY_NAME) def wait_for_replicas(current, timeout=10): """Wait for exact current replica count.""" @@ -477,7 +497,6 @@ def wait_for_replicas(current, timeout=10): assert req.result() == "ok" -# Test that no autoscaling snapshot logs are emitted for deployments without autoscaling_config def test_autoscaling_snapshot_not_emitted_without_config(serve_instance): """Ensure no deployment-type autoscaling snapshot logs are emitted without autoscaling_config.""" @@ -497,7 +516,9 @@ def app(): candidate_paths ), f"No autoscaling snapshot logs found; checked {serve_logs_dir}" - found = _parse_batched_snapshots_from_log(candidate_paths, DEPLOY_NAME) + found = _parse_batched_snapshots_from_log( + candidate_paths, "deployment", DEPLOY_NAME + ) assert not found, ( f"Found deployment-type autoscaling snapshot logs for deployment {DEPLOY_NAME} " @@ -534,7 +555,7 @@ def get_snapshots(): log_paths = glob.glob( os.path.join(serve_logs_dir, "autoscaling_snapshot_*.log") ) - return _parse_batched_snapshots_from_log(log_paths, DEPLOY_NAME) + return _parse_batched_snapshots_from_log(log_paths, "deployment", DEPLOY_NAME) # Wait until the first stable snapshot shows up def has_initial_snapshot(): @@ -546,14 +567,14 @@ def has_initial_snapshot(): controller = _get_global_client()._controller # ensure deployment is in autoscaling cache - ray.get(controller._refresh_autoscaling_deployments_cache.remote()) + ray.get(controller._refresh_autoscaling_cache.remote()) # Count current snapshots initial_count = len(get_snapshots()) # Force multiple emits for _ in range(5): - ray.get(controller._emit_deployment_autoscaling_snapshots.remote()) + ray.get(controller._emit_autoscaling_snapshots.remote()) final_count = len(get_snapshots()) @@ 
def test_application_autoscaling_snapshot_log_emitted_and_well_formed(serve_instance):
    """Check that application-level autoscaling snapshots are logged and complete.

    Deploys an app with an app-level autoscaling policy, waits for it to be
    RUNNING, then reads the batched snapshot logs and validates the first
    application snapshot's keys and values.
    """

    app_name = f"app_snap_{int(time.time())}"

    def autoscaled_deployment(name):
        # Both autoscaled deployments use the same settings; each call
        # builds an independent dict.
        return {
            "name": name,
            "autoscaling_config": {
                "min_replicas": 1,
                "max_replicas": 2,
                "initial_replicas": 1,
                "metrics_interval_s": 0.2,
                "look_back_period_s": 0.5,
            },
        }

    application = {
        "name": app_name,
        "route_prefix": f"/{app_name}",
        "import_path": "ray.serve.tests.test_config_files.app_level_autoscaling:app",
        "autoscaling_policy": {
            "policy_function": "ray.serve.tests.test_controller:simple_app_policy_for_test",
        },
        "deployments": [
            autoscaled_deployment("Preprocessor"),
            autoscaled_deployment("Model"),
        ],
    }
    config = {"applications": [application]}

    controller = _get_global_client()._controller
    ray.get(controller.apply_config.remote(config=ServeDeploySchema.parse_obj(config)))

    def app_is_running():
        """True once the app is reported with RUNNING status."""
        status = serve.status().applications.get(app_name)
        if status is None:
            return False
        return status.status == ApplicationStatus.RUNNING

    wait_for_condition(app_is_running, timeout=30)

    serve_logs_dir = get_serve_logs_dir()
    log_pattern = os.path.join(serve_logs_dir, "autoscaling_snapshot_*.log")

    def get_app_snapshots():
        """Read all application snapshots for this app from the batched logs."""
        return _parse_batched_snapshots_from_log(
            glob.glob(log_pattern), "application", app_name
        )

    # Block until at least one application snapshot has been written.
    wait_for_condition(lambda: len(get_app_snapshots()) > 0, timeout=15)

    all_snaps = get_app_snapshots()
    assert all_snaps, "Expected application snapshots"

    snap = all_snaps[0]

    required_keys = (
        "snapshot_type",
        "timestamp_str",
        "app",
        "num_deployments",
        "total_current_replicas",
        "total_target_replicas",
        "scaling_status",
        "policy_name",
        "errors",
    )
    for key in required_keys:
        assert key in snap, f"Missing required key: {key}"

    assert snap["snapshot_type"] == "application"
    assert snap["app"] == app_name
    # Two deployments carry an autoscaling_config: Preprocessor and Model.
    assert snap["num_deployments"] == 2
    for replica_field in ("total_current_replicas", "total_target_replicas"):
        assert isinstance(snap[replica_field], int)
    assert snap["scaling_status"] in ("scaling up", "scaling down", "stable")
    assert "simple_app_policy_for_test" in snap["policy_name"]