-
Notifications
You must be signed in to change notification settings - Fork 7.2k
[Serve][3/N] Add application-level autoscaling snapshot #59995
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
d85a1a1
3c010b7
d48b54d
bbf35aa
625e755
03fce47
cadb873
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -6,6 +6,7 @@ | |
| from ray.serve._private.common import ( | ||
| RUNNING_REQUESTS_KEY, | ||
| ApplicationName, | ||
| ApplicationSnapshot, | ||
| AutoscalingSnapshotError, | ||
| AutoscalingStatus, | ||
| DeploymentID, | ||
|
|
@@ -821,6 +822,7 @@ def __init__( | |
| # user defined policy returns a dictionary of state that is persisted between autoscaling decisions | ||
| # content of the dictionary is determined by the user defined policy but is keyed by deployment id | ||
| self._policy_state: Optional[Dict[DeploymentID, Dict]] = None | ||
| self._cached_application_snapshot: Optional[ApplicationSnapshot] = None | ||
|
|
||
| @property | ||
| def deployments(self): | ||
|
|
@@ -852,6 +854,45 @@ def register( | |
| def has_policy(self) -> bool: | ||
| return self._policy is not None | ||
|
|
||
| def _create_application_snapshot( | ||
| self, | ||
| *, | ||
| autoscaling_contexts: Dict[DeploymentID, AutoscalingContext], | ||
| decisions: Dict[DeploymentID, int], | ||
| ) -> ApplicationSnapshot: | ||
| """Create a fully-populated ApplicationSnapshot using data from | ||
| app-level autoscaling decision. | ||
| """ | ||
| total_current = sum( | ||
| autoscaling_contexts[dep_id].current_num_replicas | ||
| for dep_id in autoscaling_contexts | ||
| ) | ||
nadongjun marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| total_target = sum(decisions.values()) | ||
|
|
||
| if total_target > total_current: | ||
| scaling_status_raw = AutoscalingStatus.UPSCALE | ||
| elif total_target < total_current: | ||
| scaling_status_raw = AutoscalingStatus.DOWNSCALE | ||
| else: | ||
| scaling_status_raw = AutoscalingStatus.STABLE | ||
|
|
||
| scaling_status = AutoscalingStatus.format_scaling_status(scaling_status_raw) | ||
|
|
||
| errors: List[str] = [] | ||
|
|
||
| policy_name_str = f"{self._policy.__module__}.{self._policy.__name__}" | ||
|
|
||
| return ApplicationSnapshot( | ||
| timestamp_str=time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()), | ||
| app=self._app_name, | ||
| num_deployments=len(decisions), | ||
| total_current_replicas=total_current, | ||
| total_target_replicas=total_target, | ||
| scaling_status=scaling_status, | ||
| policy_name=policy_name_str, | ||
| errors=errors, | ||
| ) | ||
|
|
||
| def register_deployment( | ||
| self, | ||
| deployment_id: DeploymentID, | ||
|
|
@@ -979,6 +1020,11 @@ def get_decision_num_replicas( | |
| if not _skip_bound_check | ||
| else num_replicas | ||
| ) | ||
| self._cached_application_snapshot = self._create_application_snapshot( | ||
| autoscaling_contexts=autoscaling_contexts, | ||
| decisions=results, | ||
| ) | ||
|
|
||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Missing validation for incomplete app-level policy decisionsHigh Severity The validation for app-level autoscaling policy decisions (lines 991-1003) checks that returned deployment IDs are valid but fails to verify that decisions include all deployments. The method docstring states it should "Decide scaling for all deployments" and the deployment-level path (lines 1031-1039) returns decisions for all deployments by iterating over |
||
| return results | ||
| else: | ||
| # Using deployment-level policy | ||
|
|
@@ -1061,6 +1107,10 @@ def drop_stale_handle_metrics(self, alive_serve_actor_ids: Set[str]): | |
| for dep_state in self._deployment_autoscaling_states.values(): | ||
| dep_state.drop_stale_handle_metrics(alive_serve_actor_ids) | ||
|
|
||
| def get_application_snapshot(self) -> Optional[ApplicationSnapshot]: | ||
| """Return the cached application snapshot if available.""" | ||
| return self._cached_application_snapshot | ||
|
|
||
|
|
||
| class AutoscalingStateManager: | ||
| """Manages all things autoscaling related. | ||
|
|
@@ -1245,3 +1295,12 @@ def get_deployment_snapshot( | |
| return None | ||
| dep_state = app_state._deployment_autoscaling_states.get(deployment_id) | ||
| return dep_state.get_deployment_snapshot() if dep_state else None | ||
|
|
||
| def get_application_snapshot( | ||
| self, app_name: ApplicationName | ||
| ) -> Optional[ApplicationSnapshot]: | ||
| """Get the cached application snapshot for an app with app-level policy.""" | ||
| app_state = self._app_autoscaling_states.get(app_name) | ||
| if not app_state or not app_state.has_policy(): | ||
| return None | ||
| return app_state.get_application_snapshot() | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,32 @@ | ||
| import time | ||
|
|
||
| from ray import serve | ||
|
|
||
|
|
||
| @serve.deployment | ||
| class Preprocessor: | ||
| def __call__(self, input_data: str) -> str: | ||
| time.sleep(0.01) | ||
| return f"preprocessed_{input_data}" | ||
|
|
||
|
|
||
| @serve.deployment | ||
| class Model: | ||
| def __call__(self, preprocessed_data: str) -> str: | ||
| time.sleep(0.02) | ||
| return f"result_{preprocessed_data}" | ||
|
|
||
|
|
||
| @serve.deployment | ||
| class Driver: | ||
| def __init__(self, preprocessor, model): | ||
| self._preprocessor = preprocessor | ||
| self._model = model | ||
|
|
||
| async def __call__(self, input_data: str) -> str: | ||
| preprocessed = await self._preprocessor.remote(input_data) | ||
| result = await self._model.remote(preprocessed) | ||
| return result | ||
|
|
||
|
|
||
| app = Driver.bind(Preprocessor.bind(), Model.bind()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Stale cached application snapshots not invalidated
Medium Severity
The cached application snapshot at
self._cached_application_snapshotis not cleared when the application policy is updated viaregister()or when deployments are added/removed viaregister_deployment()orderegister_deployment(). Sinceget_application_snapshot()returns this cached value directly and the controller's_emit_autoscaling_snapshots()calls it every control loop, stale snapshots with incorrectpolicy_name,num_deployments, or replica totals can be logged to autoscaling snapshot logs, misleading observability and debugging efforts.Additional Locations (2)
python/ray/serve/_private/autoscaling_state.py#L895-L927python/ray/serve/_private/autoscaling_state.py#L928-L935