Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@
from typing import TYPE_CHECKING, Dict

from .backpressure_policy import BackpressurePolicy
from .downstream_capacity_backpressure_policy import (
get_available_object_store_budget_fraction,
)
from ray._private.ray_constants import env_float
from ray.data._internal.execution.operators.map_operator import MapOperator
from ray.data._internal.execution.operators.task_pool_map_operator import (
Expand Down Expand Up @@ -160,8 +163,8 @@ def can_add_input(self, op: "PhysicalOperator") -> bool:

# For this op, if the ratio of available object store budget to the total
# (usage + budget) is above the threshold, skip dynamic output queue size backpressure.
available_budget_fraction = (
self._resource_manager.get_available_object_store_budget_fraction(op)
available_budget_fraction = get_available_object_store_budget_fraction(
self._resource_manager, op, consider_downstream_ineligible_ops=True
)
if (
available_budget_fraction is not None
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,66 @@
logger = logging.getLogger(__name__)


def get_available_object_store_budget_fraction(
    resource_manager: "ResourceManager",
    op: "PhysicalOperator",
    consider_downstream_ineligible_ops: bool,
) -> Optional[float]:
    """Return the share of object store memory still available to ``op``.

    The fraction is ``budget / (usage + budget)``, where ``budget`` is the
    object store memory of ``resource_manager.get_budget(op)``.

    Args:
        resource_manager: Source of the operator's usage and budget.
        op: The operator whose available fraction is computed.
        consider_downstream_ineligible_ops: If True, ``usage`` is taken from
            ``resource_manager.get_op_usage_object_store_with_downstream(op)``.
            If False, it is the object store memory of the operator's own
            usage only.

    Returns:
        The available fraction, or None when either the usage or the budget
        is missing, or when usage and budget sum to zero.
    """
    usage = resource_manager.get_op_usage(op)
    budget = resource_manager.get_budget(op)
    if usage is None or budget is None:
        return None

    used_mem = (
        resource_manager.get_op_usage_object_store_with_downstream(op)
        if consider_downstream_ineligible_ops
        else usage.object_store_memory
    )
    budget_mem = budget.object_store_memory

    denominator = used_mem + budget_mem
    # Nothing used and nothing budgeted: the ratio is undefined.
    return None if denominator == 0 else budget_mem / denominator


def get_utilized_object_store_budget_fraction(
    resource_manager: "ResourceManager",
    op: "PhysicalOperator",
    consider_downstream_ineligible_ops: bool,
) -> Optional[float]:
    """Return the share of object store memory already utilized by ``op``.

    This is the complement of the value returned by
    ``get_available_object_store_budget_fraction`` for the same arguments.

    Args:
        resource_manager: The resource manager to query.
        op: The operator whose utilized fraction is computed.
        consider_downstream_ineligible_ops: Forwarded unchanged to
            ``get_available_object_store_budget_fraction``.

    Returns:
        ``1 - available_fraction``, or None when the available fraction is
        itself None.
    """
    available = get_available_object_store_budget_fraction(
        resource_manager,
        op,
        consider_downstream_ineligible_ops=consider_downstream_ineligible_ops,
    )
    # Propagate "not available" instead of inventing a utilization figure.
    return None if available is None else 1 - available


class DownstreamCapacityBackpressurePolicy(BackpressurePolicy):
"""Backpressure policy based on downstream processing capacity.

Expand Down Expand Up @@ -124,8 +184,8 @@ def _should_apply_backpressure(self, op: "PhysicalOperator") -> bool:
if self._should_skip_backpressure(op):
return False

utilized_budget_fraction = (
self._resource_manager.get_utilized_object_store_budget_fraction(op)
utilized_budget_fraction = get_utilized_object_store_budget_fraction(
self._resource_manager, op, consider_downstream_ineligible_ops=True
)
if (
utilized_budget_fraction is not None
Expand Down
33 changes: 8 additions & 25 deletions python/ray/data/_internal/execution/resource_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -437,10 +437,8 @@ def get_op_outputs_object_store_usage_with_downstream(
# Outputs usage of the current operator.
op_outputs_usage = self._mem_op_outputs[op]
# Also account the downstream ineligible operators' memory usage.
op_outputs_usage += sum(
self.get_op_usage(next_op).object_store_memory
for next_op in self._get_downstream_ineligible_ops(op)
)
for next_op in self._get_downstream_ineligible_ops(op):
op_outputs_usage += int(self.get_op_usage(next_op).object_store_memory)
return op_outputs_usage

def is_materializing_op(self, op: PhysicalOperator) -> bool:
Expand All @@ -454,27 +452,12 @@ def has_materializing_downstream_op(self, op: PhysicalOperator) -> bool:
for next_op in op.output_dependencies
)

def get_available_object_store_budget_fraction(
    self, op: PhysicalOperator
) -> Optional[float]:
    """Return ``budget / (usage + budget)`` of object store memory for ``op``.

    Returns None when the operator's usage or budget is missing, or when
    usage and budget sum to zero.
    """
    usage = self.get_op_usage(op)
    budget = self.get_budget(op)
    if usage is None or budget is None:
        return None

    used_mem = usage.object_store_memory
    budget_mem = budget.object_store_memory
    denominator = used_mem + budget_mem
    # Nothing used and nothing budgeted: the ratio is undefined.
    return None if denominator == 0 else budget_mem / denominator

def get_utilized_object_store_budget_fraction(
    self, op: PhysicalOperator
) -> Optional[float]:
    """Return the complement of the available object store budget fraction.

    Returns None when ``get_available_object_store_budget_fraction`` returns
    None for ``op``.
    """
    available = self.get_available_object_store_budget_fraction(op)
    return None if available is None else 1 - available
def get_op_usage_object_store_with_downstream(self, op: PhysicalOperator) -> int:
    """Return the object store usage of ``op`` plus its downstream ineligible ops.

    Each operator's ``object_store_memory`` usage is converted to ``int``
    before being added to the total.
    """
    own_usage = int(self.get_op_usage(op).object_store_memory)
    downstream_usage = sum(
        int(self.get_op_usage(downstream_op).object_store_memory)
        for downstream_op in self._get_downstream_ineligible_ops(op)
    )
    return own_usage + downstream_usage
Comment on lines +455 to +460
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

To improve maintainability and reduce code duplication, you can extract the logic for calculating the total object store usage of downstream ineligible operators into a private helper method. This logic is also used in get_op_outputs_object_store_usage_with_downstream.

After applying the suggestion below, you should also update get_op_outputs_object_store_usage_with_downstream to use _get_downstream_ineligible_ops_usage.

Suggested change
def get_op_usage_object_store_with_downstream(self, op: PhysicalOperator) -> int:
"""Get total object store usage of the given operator and its downstream ineligible ops."""
total_usage = int(self.get_op_usage(op).object_store_memory)
for next_op in self._get_downstream_ineligible_ops(op):
total_usage += int(self.get_op_usage(next_op).object_store_memory)
return total_usage
def _get_downstream_ineligible_ops_usage(self, op: PhysicalOperator) -> int:
"""Get the total object store memory usage of downstream ineligible operators."""
usage = 0
for next_op in self._get_downstream_ineligible_ops(op):
usage += int(self.get_op_usage(next_op).object_store_memory)
return usage
def get_op_usage_object_store_with_downstream(self, op: PhysicalOperator) -> int:
"""Get total object store usage of the given operator and its downstream ineligible ops."""
total_usage = int(self.get_op_usage(op).object_store_memory)
total_usage += self._get_downstream_ineligible_ops_usage(op)
return total_usage



def _get_first_pending_shuffle_op(topology: "Topology") -> int:
Expand Down
36 changes: 23 additions & 13 deletions python/ray/data/tests/test_backpressure_policies.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import time
import unittest
from collections import defaultdict
from unittest.mock import MagicMock
from unittest.mock import MagicMock, patch

import pytest

Expand Down Expand Up @@ -244,7 +244,13 @@ def test_can_add_input_with_materializing_downstream_op(self):
map_op.metrics.num_tasks_running = 5
self.assertFalse(policy.can_add_input(map_op)) # 5 >= 5

def test_can_add_input_with_object_store_memory_usage_ratio_above_threshold(self):
@patch(
"ray.data._internal.execution.backpressure_policy."
"concurrency_cap_backpressure_policy.get_available_object_store_budget_fraction"
)
def test_can_add_input_with_object_store_memory_usage_ratio_above_threshold(
self, mock_get_budget_fraction
):
"""Test can_add_input when object store memory usage ratio is above threshold."""
input_op = InputDataBuffer(DataContext.get_current(), input_data=[MagicMock()])
map_op = TaskPoolMapOperator(
Expand All @@ -264,9 +270,7 @@ def test_can_add_input_with_object_store_memory_usage_ratio_above_threshold(self
ConcurrencyCapBackpressurePolicy.AVAILABLE_OBJECT_STORE_BUDGET_THRESHOLD
)
# Set fraction above threshold to skip dynamic backpressure
mock_resource_manager.get_available_object_store_budget_fraction.return_value = (
threshold + 0.05
)
mock_get_budget_fraction.return_value = threshold + 0.05
mock_resource_manager.is_op_eligible.return_value = True
mock_resource_manager.has_materializing_downstream_op.return_value = False

Expand Down Expand Up @@ -295,7 +299,13 @@ def test_can_add_input_with_object_store_memory_usage_ratio_above_threshold(self
self.assertEqual(policy._q_level_nbytes[map_op], initial_level)
self.assertEqual(policy._q_level_dev[map_op], initial_dev)

def test_can_add_input_with_object_store_memory_usage_ratio_below_threshold(self):
@patch(
"ray.data._internal.execution.backpressure_policy."
"concurrency_cap_backpressure_policy.get_available_object_store_budget_fraction"
)
def test_can_add_input_with_object_store_memory_usage_ratio_below_threshold(
self, mock_get_budget_fraction
):
"""Test can_add_input when object store memory usage ratio is below threshold."""
input_op = InputDataBuffer(DataContext.get_current(), input_data=[MagicMock()])
map_op = TaskPoolMapOperator(
Expand All @@ -315,9 +325,7 @@ def test_can_add_input_with_object_store_memory_usage_ratio_below_threshold(self
ConcurrencyCapBackpressurePolicy.AVAILABLE_OBJECT_STORE_BUDGET_THRESHOLD
)
# Set fraction below threshold to apply dynamic backpressure
mock_resource_manager.get_available_object_store_budget_fraction.return_value = (
threshold - 0.05
)
mock_get_budget_fraction.return_value = threshold - 0.05
mock_resource_manager.is_op_eligible.return_value = True
mock_resource_manager.has_materializing_downstream_op.return_value = False

Expand Down Expand Up @@ -354,7 +362,11 @@ def test_can_add_input_with_object_store_memory_usage_ratio_below_threshold(self
# Dev should also be updated
self.assertNotEqual(policy._q_level_dev[map_op], initial_dev)

def test_can_add_input_effective_cap_calculation(self):
@patch(
"ray.data._internal.execution.backpressure_policy."
"concurrency_cap_backpressure_policy.get_available_object_store_budget_fraction"
)
def test_can_add_input_effective_cap_calculation(self, mock_get_budget_fraction):
"""Test that effective cap calculation works correctly with different queue sizes."""
input_op = InputDataBuffer(DataContext.get_current(), input_data=[MagicMock()])
map_op = TaskPoolMapOperator(
Expand All @@ -372,9 +384,7 @@ def test_can_add_input_effective_cap_calculation(self):
ConcurrencyCapBackpressurePolicy.AVAILABLE_OBJECT_STORE_BUDGET_THRESHOLD
)
# Set fraction below threshold to apply dynamic backpressure
mock_resource_manager.get_available_object_store_budget_fraction.return_value = (
threshold - 0.05
)
mock_get_budget_fraction.return_value = threshold - 0.05
mock_resource_manager.is_op_eligible.return_value = True
mock_resource_manager.has_materializing_downstream_op.return_value = False

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from unittest.mock import MagicMock
from unittest.mock import MagicMock, patch

import pytest

Expand All @@ -23,6 +23,17 @@


class TestDownstreamCapacityBackpressurePolicy:
@pytest.fixture(autouse=True)
def setup_budget_fraction_mock(self):
    """Patch ``get_utilized_object_store_budget_fraction`` for all tests.

    The mock is stored on ``self._mock_get_utilized_budget_fraction`` so
    helpers can set its return value. The patch is active while the
    fixture is suspended at ``yield``.
    """
    target = (
        "ray.data._internal.execution.backpressure_policy."
        "downstream_capacity_backpressure_policy."
        "get_utilized_object_store_budget_fraction"
    )
    with patch(target) as budget_fraction_mock:
        self._mock_get_utilized_budget_fraction = budget_fraction_mock
        # Hand control to the test while the patch is in place.
        yield

def _mock_operator(
self,
op_class: type = PhysicalOperator,
Expand Down Expand Up @@ -171,13 +182,13 @@ def _mock_resource_manager(
return rm

def _set_utilized_budget_fraction(self, rm, fraction):
"""Helper to set utilized budget fraction on resource manager.
"""Helper to set utilized budget fraction.

The policy checks: utilized_fraction <= OBJECT_STORE_BUDGET_UTIL_THRESHOLD
With threshold=0.9, skip backpressure when utilized_fraction <= 0.9.
To trigger backpressure, set utilized_fraction > 0.9.
"""
rm.get_utilized_object_store_budget_fraction.return_value = fraction
self._mock_get_utilized_budget_fraction.return_value = fraction
return fraction

def _set_queue_ratio(self, op, op_state, rm, queue_size, downstream_capacity):
Expand Down
1 change: 0 additions & 1 deletion release/release_tests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1653,7 +1653,6 @@
runtime_env:
# Enable verbose stats for resource manager
- RAY_DATA_DEBUG_RESOURCE_MANAGER=1
- RAY_DATA_ENABLE_DYNAMIC_OUTPUT_QUEUE_SIZE_BACKPRESSURE=1

# 'type: gpu' means: use the 'ray-ml' image.
type: gpu
Expand Down