-
Notifications
You must be signed in to change notification settings - Fork 7.1k
[Data] Added AsList aggregation
#59920
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Code Review
This pull request introduces a new AsList aggregation function for Ray Data, allowing users to collect all values within a group into a single list, along with an end-to-end test for its functionality. It also adds support for the modulo (%) operator to Ray Data expressions, including its Operation enum, expression evaluation logic for both Python's operator and PyArrow compute functions, and a test case for with_column arithmetic operations. A TODO comment was added to the AggregateFnV2 docstring regarding generic types. Review comments point out that the PyArrow implementation of the modulo operator is incorrect, missing a pc.floor() call, and that the new test for this operator only covers the pandas execution path. Additionally, a change to the rows_same utility function is identified as a breaking change due to altering its return behavior from a boolean to raising an AssertionError. The AsList aggregation is noted to have potential memory issues for large groups, suggesting a warning be added to its docstring, and the new __mod__ method in expressions.py is missing type hints.
I am having trouble creating individual review comments. Click here to see my feedback.
python/ray/data/_internal/planner/plan_expression/expression_evaluator.py (132-134)
The implementation of the modulo operator for PyArrow is incorrect. It's missing a pc.floor() call. The current implementation left - (left / right) * right will produce incorrect results for integer modulo when left / right is not an integer, because pc.divide performs floating-point division. For example, 1 % 2 would evaluate to 0 instead of 1.
The correct formula for modulo is a - n * floor(a/n). The implementation should be updated to include pc.floor.
Note that the new test for this operator in test_with_column.py only exercises the pandas code path. Adding a pyarrow batch_format to the test parameterization would have caught this bug.
Operation.MOD: lambda left, right: pc.subtract(
left, pc.multiply(right, pc.floor(pc.divide(left, right)))
),
python/ray/data/_internal/util.py (1719-1727)
The change to rows_same removes the try...except block, which alters the function's contract. Previously, it returned True or False. Now, it returns True on success and raises an AssertionError on failure. This is a breaking change that could affect other tests that rely on the boolean return value.
While pd.testing.assert_frame_equal provides better error messages, changing the behavior of a shared utility function can have unintended consequences.
Please either revert this change to maintain the original contract of rows_same, or ensure all call sites of this function across the codebase are updated to handle the AssertionError. Given that not all call sites may be visible in this PR, reverting seems safer.
python/ray/data/aggregate.py (382-383)
The AsList aggregation collects all values for a group into a list in memory. For groups with a very large number of elements, this can lead to high memory usage and potentially Out-Of-Memory (OOM) errors on worker nodes.
It would be beneficial to add a note to the docstring to warn users about this potential memory issue, advising them to be cautious when using this aggregation on columns with large group sizes.
python/ray/data/expressions.py (331-333)
The new __mod__ method is missing type hints for the other parameter and the return value. For consistency with other binary operators in this class (like __add__ and __sub__), please add the type hints.
def __mod__(self, other: Any) -> "Expr":
"""Modulation operator (%)."""
return self._bin(other, Operation.MOD)
python/ray/data/_internal/planner/plan_expression/expression_evaluator.py
Show resolved
Hide resolved
| # Schema: {'id': int64, 'group_key': int64} | ||
| # Listing all elements per group: | ||
| result = ds.groupby("group_key").aggregate(AsList(on="id")).take_all() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's add sort for testing?
| result = ds.groupby("group_key").aggregate(AsList(on="id")).take_all() | |
| result = ds.groupby("group_key").aggregate(AsList(on="id")).sort("group_key").take_all() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That's not sufficient -- we need to order the list too which makes this code example really clumsy (hence why i'm skipping testing it
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Enabliling preserve_order should solve this.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Preserve order won't guarantee sequence order either b/c it depends on the order of arrival of the shards into HashShuffleAggregator
8868f11 to
9f65726
Compare
Signed-off-by: Alexey Kudinkin <[email protected]> # Conflicts: # python/ray/data/tests/test_with_column.py Signed-off-by: Alexey Kudinkin <[email protected]>
Signed-off-by: Alexey Kudinkin <[email protected]>
Signed-off-by: Alexey Kudinkin <[email protected]>
Signed-off-by: Alexey Kudinkin <[email protected]>
Signed-off-by: Alexey Kudinkin <[email protected]>
Signed-off-by: Alexey Kudinkin <[email protected]>
Signed-off-by: Alexey Kudinkin <[email protected]>
Signed-off-by: Alexey Kudinkin <[email protected]>
Signed-off-by: Alexey Kudinkin <[email protected]>
Signed-off-by: Alexey Kudinkin <[email protected]>
Signed-off-by: Alexey Kudinkin <[email protected]>
Signed-off-by: Alexey Kudinkin <[email protected]>
Signed-off-by: Alexey Kudinkin <[email protected]>
Signed-off-by: Alexey Kudinkin <[email protected]>
Signed-off-by: Alexey Kudinkin <[email protected]>
Signed-off-by: Alexey Kudinkin <[email protected]>
9f65726 to
5cdaa8a
Compare
Signed-off-by: Alexey Kudinkin <[email protected]>
Signed-off-by: Alexey Kudinkin <[email protected]>
owenowenisme
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
## Description Changes --- - Added `AsList` aggregation allowing to aggregate given column values into a single element as a list - Added `AsListVectorized` - Added modulo op to `Expr` ## Related issues > Link related issues: "Fixes ray-project#1234", "Closes ray-project#1234", or "Related to ray-project#1234". ## Additional information > Optional: Add implementation details, API changes, usage examples, screenshots, etc. --------- Signed-off-by: Alexey Kudinkin <[email protected]> Signed-off-by: jasonwrwang <[email protected]>
Description
Changes
AsListaggregation allowing to aggregate given column values into a single element as a listAsListVectorizedExprRelated issues
Additional information