Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 59 additions & 0 deletions python/ray/serve/_private/autoscaling_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from ray.serve._private.common import (
RUNNING_REQUESTS_KEY,
ApplicationName,
ApplicationSnapshot,
AutoscalingSnapshotError,
AutoscalingStatus,
DeploymentID,
Expand Down Expand Up @@ -821,6 +822,7 @@ def __init__(
# user defined policy returns a dictionary of state that is persisted between autoscaling decisions
# content of the dictionary is determined by the user defined policy but is keyed by deployment id
self._policy_state: Optional[Dict[DeploymentID, Dict]] = None
self._cached_application_snapshot: Optional[ApplicationSnapshot] = None

@property
def deployments(self):
Expand Down Expand Up @@ -852,6 +854,45 @@ def register(
def has_policy(self) -> bool:
    """Report whether a policy is attached, i.e. ``self._policy`` is not None."""
    policy = self._policy
    return policy is not None

def _create_application_snapshot(
    self,
    *,
    autoscaling_contexts: Dict[DeploymentID, AutoscalingContext],
    decisions: Dict[DeploymentID, int],
) -> ApplicationSnapshot:
    """Build an ApplicationSnapshot from one app-level autoscaling decision.

    Args:
        autoscaling_contexts: Per-deployment contexts. Each context's
            ``current_num_replicas`` is added to the current replica total.
        decisions: Per-deployment target replica counts. Their sum is the
            target total and their count is reported as ``num_deployments``.

    Returns:
        A snapshot stamped with the current UTC time. Its scaling status is
        UPSCALE, DOWNSCALE or STABLE depending on whether the target total is
        above, below or equal to the current total. ``errors`` is always an
        empty list here.
    """
    total_current = sum(
        context.current_num_replicas for context in autoscaling_contexts.values()
    )
    total_target = sum(decisions.values())

    if total_target > total_current:
        scaling_status_raw = AutoscalingStatus.UPSCALE
    elif total_target < total_current:
        scaling_status_raw = AutoscalingStatus.DOWNSCALE
    else:
        scaling_status_raw = AutoscalingStatus.STABLE

    scaling_status = AutoscalingStatus.format_scaling_status(scaling_status_raw)

    errors: List[str] = []

    # Not every callable carries ``__name__`` (for example ``functools.partial``
    # objects and instances of classes that define ``__call__``). Fall back to
    # the callable's type so that building this observability record cannot
    # raise AttributeError in the middle of an autoscaling decision. Plain
    # functions still produce "<module>.<name>" exactly as before.
    policy = self._policy
    policy_type = type(policy)
    policy_module = getattr(policy, "__module__", policy_type.__module__)
    policy_name = getattr(policy, "__name__", policy_type.__name__)
    policy_name_str = f"{policy_module}.{policy_name}"

    return ApplicationSnapshot(
        timestamp_str=time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
        app=self._app_name,
        num_deployments=len(decisions),
        total_current_replicas=total_current,
        total_target_replicas=total_target,
        scaling_status=scaling_status,
        policy_name=policy_name_str,
        errors=errors,
    )

def register_deployment(
self,
deployment_id: DeploymentID,
Expand Down Expand Up @@ -979,6 +1020,11 @@ def get_decision_num_replicas(
if not _skip_bound_check
else num_replicas
)
self._cached_application_snapshot = self._create_application_snapshot(
autoscaling_contexts=autoscaling_contexts,
decisions=results,
)

return results
else:
# Using deployment-level policy
Expand Down Expand Up @@ -1061,6 +1107,10 @@ def drop_stale_handle_metrics(self, alive_serve_actor_ids: Set[str]):
for dep_state in self._deployment_autoscaling_states.values():
dep_state.drop_stale_handle_metrics(alive_serve_actor_ids)

def get_application_snapshot(self) -> Optional[ApplicationSnapshot]:
    """Return the most recently cached application snapshot.

    The result is ``None`` when no snapshot has been stored yet.
    """
    cached_snapshot = self._cached_application_snapshot
    return cached_snapshot


class AutoscalingStateManager:
"""Manages all things autoscaling related.
Expand Down Expand Up @@ -1245,3 +1295,12 @@ def get_deployment_snapshot(
return None
dep_state = app_state._deployment_autoscaling_states.get(deployment_id)
return dep_state.get_deployment_snapshot() if dep_state else None

def get_application_snapshot(
    self, app_name: ApplicationName
) -> Optional[ApplicationSnapshot]:
    """Look up the cached application snapshot for ``app_name``.

    Returns ``None`` when the application has no registered autoscaling
    state or when that state reports no policy; otherwise returns whatever
    the application's state currently has cached.
    """
    state = self._app_autoscaling_states.get(app_name)
    if state and state.has_policy():
        return state.get_application_snapshot()
    return None
29 changes: 29 additions & 0 deletions python/ray/serve/_private/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -882,6 +882,35 @@ def is_scaling_equivalent(self, other: "DeploymentSnapshot") -> bool:
)


class ApplicationSnapshot(BaseModel):
    """Observability record describing one application's autoscaling state."""

    # Discriminator written into the emitted record.
    snapshot_type: str = "application"
    timestamp_str: str
    app: str
    num_deployments: int
    # Replica totals across the application's deployments.
    total_current_replicas: int
    total_target_replicas: int
    scaling_status: str
    policy_name: str
    errors: List[str]

    def is_scaling_equivalent(self, other: "ApplicationSnapshot") -> bool:
        """Tell whether ``other`` matches this snapshot on scaling fields.

        Used to deduplicate autoscaling snapshot logs. Only ``app``,
        ``total_target_replicas``, ``scaling_status`` and ``num_deployments``
        take part in the comparison; every other field is ignored. Anything
        that is not an ApplicationSnapshot is never equivalent.
        """
        if not isinstance(other, ApplicationSnapshot):
            return False

        compared_fields = (
            "app",
            "total_target_replicas",
            "scaling_status",
            "num_deployments",
        )
        return all(
            getattr(self, field_name) == getattr(other, field_name)
            for field_name in compared_fields
        )


RUNNING_REQUESTS_KEY = "running_requests"
ONGOING_REQUESTS_KEY = "ongoing_requests"
QUEUED_REQUESTS_KEY = "queued_requests"
Expand Down
45 changes: 38 additions & 7 deletions python/ray/serve/_private/controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
from ray.serve._private.application_state import ApplicationStateManager, StatusOverview
from ray.serve._private.autoscaling_state import AutoscalingStateManager
from ray.serve._private.common import (
ApplicationSnapshot,
DeploymentID,
DeploymentSnapshot,
HandleMetricReport,
Expand Down Expand Up @@ -244,10 +245,12 @@ async def __init__(

# Caches for autoscaling observability
self._last_autoscaling_snapshots: Dict[DeploymentID, DeploymentSnapshot] = {}
self._last_application_snapshots: Dict[str, ApplicationSnapshot] = {}
self._autoscaling_enabled_deployments_cache: List[
Tuple[str, str, DeploymentDetails, Any]
] = []
self._refresh_autoscaling_deployments_cache()
self._autoscaling_enabled_apps_cache: List[str] = []
self._refresh_autoscaling_cache()

def reconfigure_global_logging_config(self, global_logging_config: LoggingConfig):
if (
Expand Down Expand Up @@ -429,26 +432,40 @@ def _update_proxy_nodes(self):
new_proxy_nodes.add(self._controller_node_id)
self._proxy_nodes = new_proxy_nodes

def _refresh_autoscaling_deployments_cache(self) -> None:
def _refresh_autoscaling_cache(self) -> None:
    """Rebuild the autoscaling observability caches from current app state.

    Recomputes two caches: the deployments that have an autoscaling config,
    and the applications for which the autoscaling state manager reports an
    application-level policy. It then prunes the last-emitted snapshot caches
    so that entries for deployments or applications no longer present in
    those sets are dropped.
    """
    autoscaling_deployments = []
    active_dep_ids = set()
    apps_with_app_level_policy = []

    for app_name in self.application_state_manager.list_app_names():
        deployment_details = self.application_state_manager.list_deployment_details(
            app_name
        )
        if self.autoscaling_state_manager._application_has_policy(app_name):
            apps_with_app_level_policy.append(app_name)

        for dep_name, details in deployment_details.items():
            active_dep_ids.add(DeploymentID(name=dep_name, app_name=app_name))
            autoscaling_config = details.deployment_config.autoscaling_config
            if autoscaling_config:
                autoscaling_deployments.append(
                    (app_name, dep_name, details, autoscaling_config)
                )

    self._autoscaling_enabled_deployments_cache = autoscaling_deployments
    # The cache itself stays a list so iteration order is preserved.
    self._autoscaling_enabled_apps_cache = apps_with_app_level_policy

    self._last_autoscaling_snapshots = {
        dep_id: snapshot
        for dep_id, snapshot in self._last_autoscaling_snapshots.items()
        if dep_id in active_dep_ids
    }

    # Prune against a set, as done for deployments above, so each membership
    # test is constant-time instead of a scan over the list of app names.
    policy_app_names = set(apps_with_app_level_policy)
    self._last_application_snapshots = {
        name: snapshot
        for name, snapshot in self._last_application_snapshots.items()
        if name in policy_app_names
    }

def _emit_autoscaling_snapshots(self) -> None:
"""Emit structured autoscaling snapshot logs in a single batch per loop."""
if self._autoscaling_logger is None:
return
Expand All @@ -475,6 +492,20 @@ def _emit_deployment_autoscaling_snapshots(self) -> None:
snapshots_to_log.append(deployment_snapshot.dict(exclude_none=True))
self._last_autoscaling_snapshots[dep_id] = deployment_snapshot

for app_name in self._autoscaling_enabled_apps_cache:
app_snapshot = self.autoscaling_state_manager.get_application_snapshot(
app_name
)
if app_snapshot is None:
continue

last_app = self._last_application_snapshots.get(app_name)
if last_app is not None and last_app.is_scaling_equivalent(app_snapshot):
continue

snapshots_to_log.append(app_snapshot.dict(exclude_none=True))
self._last_application_snapshots[app_name] = app_snapshot

if snapshots_to_log:
# Single write per control-loop iteration
self._autoscaling_logger.info({"snapshots": snapshots_to_log})
Expand Down Expand Up @@ -563,16 +594,16 @@ async def run_control_loop_step(
asm_update_start_time = time.time()
any_target_state_changed = self.application_state_manager.update()
if any_recovering or any_target_state_changed:
self._refresh_autoscaling_deployments_cache()
self._refresh_autoscaling_cache()
self.asm_update_duration_gauge_s.set(time.time() - asm_update_start_time)
except Exception:
logger.exception("Exception updating application state.")

try:
# Emit one autoscaling snapshot per deployment per loop using existing state.
self._emit_deployment_autoscaling_snapshots()
# Emit one autoscaling snapshot per loop using existing state.
self._emit_autoscaling_snapshots()
except Exception:
logger.exception("Exception emitting deployment autoscaling snapshots.")
logger.exception("Exception emitting autoscaling snapshots.")
# Update the proxy nodes set before updating the proxy states,
# so they are more consistent.
node_update_start_time = time.time()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
import time

from ray import serve


@serve.deployment
class Preprocessor:
    """Deployment that prefixes its input with ``preprocessed_``."""

    def __call__(self, input_data: str) -> str:
        """Pause briefly, then return the prefixed input."""
        time.sleep(0.01)
        tagged = f"preprocessed_{input_data}"
        return tagged


@serve.deployment
class Model:
    """Deployment that prefixes its input with ``result_``."""

    def __call__(self, preprocessed_data: str) -> str:
        """Pause briefly, then return the prefixed input."""
        time.sleep(0.02)
        output = f"result_{preprocessed_data}"
        return output


@serve.deployment
class Driver:
    """Entry deployment that chains the preprocessor and the model."""

    def __init__(self, preprocessor, model):
        """Keep the two downstream handles passed in at bind time."""
        self._preprocessor = preprocessor
        self._model = model

    async def __call__(self, input_data: str) -> str:
        """Send the input through the preprocessor, then through the model."""
        intermediate = await self._preprocessor.remote(input_data)
        return await self._model.remote(intermediate)


# Application graph: Driver is bound with one Preprocessor and one Model.
app = Driver.bind(Preprocessor.bind(), Model.bind())
Loading