diff --git a/python/ray/data/_internal/execution/backpressure_policy/concurrency_cap_backpressure_policy.py b/python/ray/data/_internal/execution/backpressure_policy/concurrency_cap_backpressure_policy.py index 4e6648b1421d..06cd1b5fe0f9 100644 --- a/python/ray/data/_internal/execution/backpressure_policy/concurrency_cap_backpressure_policy.py +++ b/python/ray/data/_internal/execution/backpressure_policy/concurrency_cap_backpressure_policy.py @@ -4,6 +4,9 @@ from typing import TYPE_CHECKING, Dict from .backpressure_policy import BackpressurePolicy +from .downstream_capacity_backpressure_policy import ( + get_available_object_store_budget_fraction, +) from ray._private.ray_constants import env_float from ray.data._internal.execution.operators.map_operator import MapOperator from ray.data._internal.execution.operators.task_pool_map_operator import ( @@ -160,8 +163,8 @@ def can_add_input(self, op: "PhysicalOperator") -> bool: # For this Op, if the objectstore budget (available) to total # ratio is above threshold, skip dynamic output queue size backpressure. - available_budget_fraction = ( - self._resource_manager.get_available_object_store_budget_fraction(op) + available_budget_fraction = get_available_object_store_budget_fraction( + self._resource_manager, op, consider_downstream_ineligible_ops=True ) if ( available_budget_fraction is not None diff --git a/python/ray/data/_internal/execution/backpressure_policy/downstream_capacity_backpressure_policy.py b/python/ray/data/_internal/execution/backpressure_policy/downstream_capacity_backpressure_policy.py index a0d8127a2a6a..23e547aff6ee 100644 --- a/python/ray/data/_internal/execution/backpressure_policy/downstream_capacity_backpressure_policy.py +++ b/python/ray/data/_internal/execution/backpressure_policy/downstream_capacity_backpressure_policy.py @@ -15,6 +15,66 @@ logger = logging.getLogger(__name__) +def get_available_object_store_budget_fraction( + resource_manager: "ResourceManager", + op: "PhysicalOperator", + consider_downstream_ineligible_ops: bool, +) -> Optional[float]: + """Get available object store memory budget fraction for the operator. + + Args: + resource_manager: The resource manager to use. + op: The operator to get the budget fraction for. + consider_downstream_ineligible_ops: If True, include downstream ineligible + ops in the calculation. If False, only consider this op's usage/budget. + + Returns: + The available budget fraction, or None if not available. + """ + op_usage = resource_manager.get_op_usage(op) + op_budget = resource_manager.get_budget(op) + if op_usage is None or op_budget is None: + return None + + if consider_downstream_ineligible_ops: + total_usage = resource_manager.get_op_usage_object_store_with_downstream(op) + else: + total_usage = op_usage.object_store_memory + + total_budget = op_budget.object_store_memory + total_mem = total_usage + total_budget + if total_mem == 0: + return None + + return total_budget / total_mem + + +def get_utilized_object_store_budget_fraction( + resource_manager: "ResourceManager", + op: "PhysicalOperator", + consider_downstream_ineligible_ops: bool, +) -> Optional[float]: + """Get utilized object store memory budget fraction for the operator. + + Args: + resource_manager: The resource manager to use. + op: The operator to get the utilized fraction for. + consider_downstream_ineligible_ops: If True, include downstream ineligible + ops in the calculation. If False, only consider this op's usage/budget. + + Returns: + The utilized budget fraction, or None if not available. + """ + available_fraction = get_available_object_store_budget_fraction( + resource_manager, + op, + consider_downstream_ineligible_ops=consider_downstream_ineligible_ops, + ) + if available_fraction is None: + return None + return 1 - available_fraction + + class DownstreamCapacityBackpressurePolicy(BackpressurePolicy): """Backpressure policy based on downstream processing capacity. @@ -124,8 +184,8 @@ def _should_apply_backpressure(self, op: "PhysicalOperator") -> bool: if self._should_skip_backpressure(op): return False - utilized_budget_fraction = ( - self._resource_manager.get_utilized_object_store_budget_fraction(op) + utilized_budget_fraction = get_utilized_object_store_budget_fraction( + self._resource_manager, op, consider_downstream_ineligible_ops=True ) if ( utilized_budget_fraction is not None diff --git a/python/ray/data/_internal/execution/resource_manager.py b/python/ray/data/_internal/execution/resource_manager.py index ba855ea1349d..1f4e8cd20f3e 100644 --- a/python/ray/data/_internal/execution/resource_manager.py +++ b/python/ray/data/_internal/execution/resource_manager.py @@ -437,10 +437,8 @@ def get_op_outputs_object_store_usage_with_downstream( # Outputs usage of the current operator. op_outputs_usage = self._mem_op_outputs[op] # Also account the downstream ineligible operators' memory usage. - op_outputs_usage += sum( - self.get_op_usage(next_op).object_store_memory - for next_op in self._get_downstream_ineligible_ops(op) - ) + for next_op in self._get_downstream_ineligible_ops(op): + op_outputs_usage += int(self.get_op_usage(next_op).object_store_memory) return op_outputs_usage def is_materializing_op(self, op: PhysicalOperator) -> bool: @@ -454,27 +452,12 @@ def has_materializing_downstream_op(self, op: PhysicalOperator) -> bool: for next_op in op.output_dependencies ) - def get_available_object_store_budget_fraction( - self, op: PhysicalOperator - ) -> Optional[float]: - """Get available object store memory budget fraction for the operator. Returns None if not available.""" - op_usage = self.get_op_usage(op) - op_budget = self.get_budget(op) - if op_usage is None or op_budget is None: - return None - total_mem = op_usage.object_store_memory + op_budget.object_store_memory - if total_mem == 0: - return None - return op_budget.object_store_memory / total_mem - - def get_utilized_object_store_budget_fraction( - self, op: PhysicalOperator - ) -> Optional[float]: - """Get utilized object store memory budget fraction for the operator. Returns None if not available.""" - available_fraction = self.get_available_object_store_budget_fraction(op) - if available_fraction is None: - return None - return 1 - available_fraction + def get_op_usage_object_store_with_downstream(self, op: PhysicalOperator) -> int: + """Get total object store usage of the given operator and its downstream ineligible ops.""" + total_usage = int(self.get_op_usage(op).object_store_memory) + for next_op in self._get_downstream_ineligible_ops(op): + total_usage += int(self.get_op_usage(next_op).object_store_memory) + return total_usage def _get_first_pending_shuffle_op(topology: "Topology") -> int: diff --git a/python/ray/data/tests/test_backpressure_policies.py b/python/ray/data/tests/test_backpressure_policies.py index 10fcabe4442e..15741d90b3cb 100644 --- a/python/ray/data/tests/test_backpressure_policies.py +++ b/python/ray/data/tests/test_backpressure_policies.py @@ -3,7 +3,7 @@ import time import unittest from collections import defaultdict -from unittest.mock import MagicMock +from unittest.mock import MagicMock, patch import pytest @@ -244,7 +244,13 @@ def test_can_add_input_with_materializing_downstream_op(self): map_op.metrics.num_tasks_running = 5 self.assertFalse(policy.can_add_input(map_op)) # 5 >= 5 - def test_can_add_input_with_object_store_memory_usage_ratio_above_threshold(self): + @patch( + "ray.data._internal.execution.backpressure_policy." + "concurrency_cap_backpressure_policy.get_available_object_store_budget_fraction" + ) + def test_can_add_input_with_object_store_memory_usage_ratio_above_threshold( + self, mock_get_budget_fraction + ): """Test can_add_input when object store memory usage ratio is above threshold.""" input_op = InputDataBuffer(DataContext.get_current(), input_data=[MagicMock()]) map_op = TaskPoolMapOperator( @@ -264,9 +270,7 @@ def test_can_add_input_with_object_store_memory_usage_ratio_above_threshold(self ConcurrencyCapBackpressurePolicy.AVAILABLE_OBJECT_STORE_BUDGET_THRESHOLD ) # Set fraction above threshold to skip dynamic backpressure - mock_resource_manager.get_available_object_store_budget_fraction.return_value = ( - threshold + 0.05 - ) + mock_get_budget_fraction.return_value = threshold + 0.05 mock_resource_manager.is_op_eligible.return_value = True mock_resource_manager.has_materializing_downstream_op.return_value = False @@ -295,7 +299,13 @@ def test_can_add_input_with_object_store_memory_usage_ratio_above_threshold(self self.assertEqual(policy._q_level_nbytes[map_op], initial_level) self.assertEqual(policy._q_level_dev[map_op], initial_dev) - def test_can_add_input_with_object_store_memory_usage_ratio_below_threshold(self): + @patch( + "ray.data._internal.execution.backpressure_policy." + "concurrency_cap_backpressure_policy.get_available_object_store_budget_fraction" + ) + def test_can_add_input_with_object_store_memory_usage_ratio_below_threshold( + self, mock_get_budget_fraction + ): """Test can_add_input when object store memory usage ratio is below threshold.""" input_op = InputDataBuffer(DataContext.get_current(), input_data=[MagicMock()]) map_op = TaskPoolMapOperator( @@ -315,9 +325,7 @@ def test_can_add_input_with_object_store_memory_usage_ratio_below_threshold(self ConcurrencyCapBackpressurePolicy.AVAILABLE_OBJECT_STORE_BUDGET_THRESHOLD ) # Set fraction below threshold to apply dynamic backpressure - mock_resource_manager.get_available_object_store_budget_fraction.return_value = ( - threshold - 0.05 - ) + mock_get_budget_fraction.return_value = threshold - 0.05 mock_resource_manager.is_op_eligible.return_value = True mock_resource_manager.has_materializing_downstream_op.return_value = False @@ -354,7 +362,11 @@ def test_can_add_input_with_object_store_memory_usage_ratio_below_threshold(self # Dev should also be updated self.assertNotEqual(policy._q_level_dev[map_op], initial_dev) - def test_can_add_input_effective_cap_calculation(self): + @patch( + "ray.data._internal.execution.backpressure_policy." + "concurrency_cap_backpressure_policy.get_available_object_store_budget_fraction" + ) + def test_can_add_input_effective_cap_calculation(self, mock_get_budget_fraction): """Test that effective cap calculation works correctly with different queue sizes.""" input_op = InputDataBuffer(DataContext.get_current(), input_data=[MagicMock()]) map_op = TaskPoolMapOperator( @@ -372,9 +384,7 @@ def test_can_add_input_effective_cap_calculation(self): ConcurrencyCapBackpressurePolicy.AVAILABLE_OBJECT_STORE_BUDGET_THRESHOLD ) # Set fraction below threshold to apply dynamic backpressure - mock_resource_manager.get_available_object_store_budget_fraction.return_value = ( - threshold - 0.05 - ) + mock_get_budget_fraction.return_value = threshold - 0.05 mock_resource_manager.is_op_eligible.return_value = True mock_resource_manager.has_materializing_downstream_op.return_value = False diff --git a/python/ray/data/tests/test_downstream_capacity_backpressure_policy.py b/python/ray/data/tests/test_downstream_capacity_backpressure_policy.py index fb59127ae170..7c84f0ed68f0 100644 --- a/python/ray/data/tests/test_downstream_capacity_backpressure_policy.py +++ b/python/ray/data/tests/test_downstream_capacity_backpressure_policy.py @@ -1,4 +1,4 @@ -from unittest.mock import MagicMock +from unittest.mock import MagicMock, patch import pytest @@ -23,6 +23,17 @@ class TestDownstreamCapacityBackpressurePolicy: + @pytest.fixture(autouse=True) + def setup_budget_fraction_mock(self): + """Fixture to patch get_utilized_object_store_budget_fraction for all tests.""" + with patch( + "ray.data._internal.execution.backpressure_policy." + "downstream_capacity_backpressure_policy." + "get_utilized_object_store_budget_fraction" + ) as mock_func: + self._mock_get_utilized_budget_fraction = mock_func + yield + def _mock_operator( self, op_class: type = PhysicalOperator, @@ -171,13 +182,13 @@ def _mock_resource_manager( return rm def _set_utilized_budget_fraction(self, rm, fraction): - """Helper to set utilized budget fraction on resource manager. + """Helper to set utilized budget fraction. The policy checks: utilized_fraction <= OBJECT_STORE_BUDGET_UTIL_THRESHOLD With threshold=0.9, skip backpressure when utilized_fraction <= 0.9. To trigger backpressure, set utilized_fraction > 0.9. """ - rm.get_utilized_object_store_budget_fraction.return_value = fraction + self._mock_get_utilized_budget_fraction.return_value = fraction return fraction def _set_queue_ratio(self, op, op_state, rm, queue_size, downstream_capacity): diff --git a/release/release_tests.yaml b/release/release_tests.yaml index 0e1192d54f0c..3a50486e9aa6 100644 --- a/release/release_tests.yaml +++ b/release/release_tests.yaml @@ -1653,7 +1653,6 @@ runtime_env: # Enable verbose stats for resource manager - RAY_DATA_DEBUG_RESOURCE_MANAGER=1 - - RAY_DATA_ENABLE_DYNAMIC_OUTPUT_QUEUE_SIZE_BACKPRESSURE=1 # 'type: gpu' means: use the 'ray-ml' image. type: gpu