Skip to content

Conversation

@srinathk10
Copy link
Contributor

Thank you for contributing to Ray! 🚀
Please review the Ray Contribution Guide before opening a pull request.

⚠️ Remove these instructions before submitting your PR.

💡 Tip: Mark as draft if you want early feedback, or ready for review when it's complete.

Description

Briefly describe what this PR accomplishes and why it's needed.

[Data] Fixes to DownstreamCapacityBackpressuePolicy

Fixes

In DownstreamCapacityBackpressuePolicy, when calculating available/utilized object store ratio per Op, also include the downstream in-eligible Ops.

Related issues

Link related issues: "Fixes #1234", "Closes #1234", or "Related to #1234".

Additional information

Optional: Add implementation details, API changes, usage examples, screenshots, etc.

@srinathk10 srinathk10 requested a review from a team as a code owner January 9, 2026 01:42
Copy link
Contributor

@gemini-code-assist gemini-code-assist bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request refactors the calculation of object store budget fractions to correctly account for downstream ineligible operators, which fixes an issue in DownstreamCapacityBackpressuePolicy. The logic is centralized into new helper functions, improving code clarity and reusability. The changes are well-tested and include necessary updates to test mocks. My main feedback is to address some code duplication that was introduced during the refactoring to further improve maintainability.

Comment on lines +455 to +460
def get_op_usage_object_store_with_downstream(self, op: PhysicalOperator) -> int:
"""Get total object store usage of the given operator and its downstream ineligible ops."""
total_usage = int(self.get_op_usage(op).object_store_memory)
for next_op in self._get_downstream_ineligible_ops(op):
total_usage += int(self.get_op_usage(next_op).object_store_memory)
return total_usage
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

To improve maintainability and reduce code duplication, you can extract the logic for calculating the total object store usage of downstream ineligible operators into a private helper method. This logic is also used in get_op_outputs_object_store_usage_with_downstream.

After applying the suggestion below, you should also update get_op_outputs_object_store_usage_with_downstream to use _get_downstream_ineligible_ops_usage.

Suggested change
def get_op_usage_object_store_with_downstream(self, op: PhysicalOperator) -> int:
"""Get total object store usage of the given operator and its downstream ineligible ops."""
total_usage = int(self.get_op_usage(op).object_store_memory)
for next_op in self._get_downstream_ineligible_ops(op):
total_usage += int(self.get_op_usage(next_op).object_store_memory)
return total_usage
def _get_downstream_ineligible_ops_usage(self, op: PhysicalOperator) -> int:
"""Get the total object store memory usage of downstream ineligible operators."""
usage = 0
for next_op in self._get_downstream_ineligible_ops(op):
usage += int(self.get_op_usage(next_op).object_store_memory)
return usage
def get_op_usage_object_store_with_downstream(self, op: PhysicalOperator) -> int:
"""Get total object store usage of the given operator and its downstream ineligible ops."""
total_usage = int(self.get_op_usage(op).object_store_memory)
total_usage += self._get_downstream_ineligible_ops_usage(op)
return total_usage

@srinathk10 srinathk10 added the go add ONLY when ready to merge, run all tests label Jan 9, 2026
@ray-gardener ray-gardener bot added the data Ray Data-related issues label Jan 9, 2026
@alexeykudinkin alexeykudinkin merged commit 56e6ec0 into master Jan 9, 2026
6 checks passed
@alexeykudinkin alexeykudinkin deleted the srinathk10/fix_downstream_capacity_policy branch January 9, 2026 19:35
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

data Ray Data-related issues go add ONLY when ready to merge, run all tests

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Ray fails to serialize self-reference objects

3 participants