Merged
Commits
29 commits
30898ae
Revisiting `HashShuffleAggregator` protocol to streamline, simplify a…
alexeykudinkin Jan 8, 2026
d813073
Added tests for `HashShuffleAggregator`
alexeykudinkin Jan 8, 2026
a1fe381
Rebased Join;
alexeykudinkin Jan 8, 2026
bce9f8d
Rebased Aggregate;
alexeykudinkin Jan 8, 2026
8b17302
Allow operators to override compaction thresholds;
alexeykudinkin Jan 8, 2026
38bd366
Missing changes
alexeykudinkin Jan 8, 2026
b3389f7
`lint`
alexeykudinkin Jan 8, 2026
03218f6
Fixed refs
alexeykudinkin Jan 8, 2026
e9baa9b
`lint`
alexeykudinkin Jan 8, 2026
1df562c
Tidying up
alexeykudinkin Jan 8, 2026
f8343df
Streamlined partition buckets init
alexeykudinkin Jan 8, 2026
b47ca98
Wired in `num_input_seqs`
alexeykudinkin Jan 8, 2026
b289924
Fixed invalid ref
alexeykudinkin Jan 8, 2026
0f593a9
`lint`
alexeykudinkin Jan 8, 2026
06af88b
Fixed refs
alexeykudinkin Jan 8, 2026
2480037
Bump up `_shuffle_block` num_cpus to 1
alexeykudinkin Jan 8, 2026
a05dc27
Reverting task retries for _shuffle_block
alexeykudinkin Jan 8, 2026
d92688d
Missing bazel target
alexeykudinkin Jan 8, 2026
a30e114
`lint`
alexeykudinkin Jan 8, 2026
608c10d
Reduce scope of the lock
alexeykudinkin Jan 9, 2026
7bc588a
Relocated `compaction_threshold` into the `PartitionBucket`
alexeykudinkin Jan 9, 2026
035730c
Updated compaction comment
alexeykudinkin Jan 9, 2026
e127c66
`lint`
alexeykudinkin Jan 9, 2026
a294375
Fixed tests
alexeykudinkin Jan 13, 2026
24e23ac
Fixed invalid ref
alexeykudinkin Jan 13, 2026
6175c8e
Fixed reported `incremental_resource_usage` for hash-shuffle
alexeykudinkin Jan 13, 2026
c247dd3
Fixed test
alexeykudinkin Jan 13, 2026
c5004f0
`lint`
alexeykudinkin Jan 13, 2026
116f46d
Tidying up
alexeykudinkin Jan 13, 2026
14 changes: 14 additions & 0 deletions python/ray/data/BUILD.bazel
@@ -649,6 +649,20 @@ py_test(
],
)

py_test(
name = "test_hash_shuffle_aggregator",
size = "medium",
srcs = ["tests/test_hash_shuffle_aggregator.py"],
tags = [
"exclusive",
"team:data",
],
deps = [
":conftest",
"//:ray_lib",
],
)

py_test(
name = "test_hudi",
size = "medium",
122 changes: 55 additions & 67 deletions python/ray/data/_internal/execution/operators/hash_aggregate.py
@@ -2,12 +2,11 @@
import math
from typing import TYPE_CHECKING, Any, Dict, Iterator, List, Optional, Tuple

from ray.data._internal.arrow_block import ArrowBlockAccessor
from ray.data._internal.execution.interfaces import PhysicalOperator
from ray.data._internal.execution.operators.hash_shuffle import (
BlockTransformer,
HashShufflingOperatorBase,
StatefulShuffleAggregation,
ShuffleAggregation,
)
from ray.data._internal.util import GiB, MiB
from ray.data.aggregate import AggregateFn
@@ -21,91 +20,69 @@
logger = logging.getLogger(__name__)


class ReducingShuffleAggregation(StatefulShuffleAggregation):
"""Aggregation performing reduction of the shuffled sequence using provided
list of aggregating functions.
class ReducingAggregation(ShuffleAggregation):
"""Stateless aggregation that reduces blocks using aggregation functions.

NOTE: That reductions are performed incrementally in a streaming fashion upon
accumulation of pre-configured buffer of rows to run aggregation on."""

_DEFAULT_BLOCKS_BUFFER_LIMIT = 1000
This implementation performs incremental reduction during compaction,
combining multiple partially-aggregated blocks into one. The final
aggregation is performed during finalization.
"""

def __init__(
self,
aggregator_id: int,
key_columns: Optional[Tuple[str]],
aggregation_fns: Tuple[AggregateFn],
key_columns: Tuple[str, ...],
aggregation_fns: Tuple[AggregateFn, ...],
):
self._sort_key: "SortKey" = self._get_sort_key(key_columns)
self._aggregation_fns: Tuple[AggregateFn, ...] = aggregation_fns

super().__init__(aggregator_id)

assert key_columns is not None, "Shuffle aggregation requires key columns"
@classmethod
def is_compacting(cls):
return True

self._sort_key: "SortKey" = ReducingShuffleAggregation._get_sort_key(
key_columns
)
self._aggregation_fns: Tuple[AggregateFn] = aggregation_fns
def compact(self, partition_shards: List[Block]) -> Block:
assert len(partition_shards) > 0, "Provided sequence must be non-empty"

self._aggregated_blocks: List[Block] = []
return self._combine(partition_shards, finalize=False)

def accept(self, input_seq_id: int, partition_id: int, partition_shard: Block):
def finalize(self, partition_shards_map: Dict[int, List[Block]]) -> Iterator[Block]:
assert (
input_seq_id == 0
), f"Single sequence is expected (got seq-id {input_seq_id})"

# Received partition shard is already partially aggregated, hence
# we simply add it to the list of aggregated blocks
#
# NOTE: We're not separating blocks by partition as it's ultimately not
# relevant for the aggregations performed
self._aggregated_blocks.append(partition_shard)

# Aggregation is performed incrementally, rather
# than being deferred to the finalization stage
if len(self._aggregated_blocks) > self._DEFAULT_BLOCKS_BUFFER_LIMIT:
# NOTE: This method will reset partially aggregated blocks to hold
# the new combined one
#
# TODO make aggregation async
self._combine_aggregated_blocks(should_finalize=False)

def finalize(self, partition_id: int) -> Iterator[Block]:
if len(self._aggregated_blocks) == 0:
block = ArrowBlockAccessor._empty_table()
else:
block = self._combine_aggregated_blocks(should_finalize=True)
len(partition_shards_map) == 1
), f"Single input-sequence is expected (got {len(partition_shards_map)})"

yield block
blocks = partition_shards_map[0]

def clear(self, partition_id: int):
self._aggregated_blocks: List[Block] = []
if not blocks:
return
Empty partitions silently yield no output blocks

Medium Severity

The new implementation changes behavior for empty partitions. The old `ReducingShuffleAggregation.finalize()` always yielded a block (returning `ArrowBlockAccessor._empty_table()` for empty partitions), while the new code in both `ReducingAggregation.finalize()` and `ConcatAggregation.finalize()` has `if not blocks: return`, which yields nothing. Additionally, `HashShuffleAggregator.finalize()` sets `blocks = iter([])` when `partition_shards_map` is empty. This behavior change could break downstream code expecting a block per partition and causes the new test at line 129 to fail with `IndexError` when accessing `results[0]`.

Additional Locations (2)

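For illustration, a minimal, self-contained sketch of the difference this comment describes, using a toy `Block` stand-in and a hypothetical `_combine` helper rather than the actual Ray Data classes: the old path always yields exactly one block per partition, while the new path yields nothing for an empty partition.

from typing import Iterator, List

# Hypothetical illustration only: `Block` and `_combine` are toy stand-ins,
# not the real Ray Data block type or combine step.
Block = dict


def _combine(blocks: List[Block]) -> Block:
    merged: Block = {}
    for block in blocks:
        merged.update(block)
    return merged


def old_finalize(blocks: List[Block]) -> Iterator[Block]:
    # Old behavior: always yield exactly one block, even for an empty partition.
    yield _combine(blocks) if blocks else {}


def new_finalize(blocks: List[Block]) -> Iterator[Block]:
    # New behavior: an empty partition yields no output block at all.
    if not blocks:
        return
    yield _combine(blocks)


assert len(list(old_finalize([]))) == 1
assert len(list(new_finalize([]))) == 0  # downstream `results[0]` would raise IndexError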


def _combine_aggregated_blocks(self, *, should_finalize: bool) -> Block:
assert len(self._aggregated_blocks) > 0
yield self._combine(blocks, finalize=True)

block_accessor = BlockAccessor.for_block(self._aggregated_blocks[0])
def _combine(self, blocks: List[Block], *, finalize: bool) -> Block:
"""Internal method to combine blocks with optional finalization."""
assert len(blocks) > 0

block_accessor = BlockAccessor.for_block(blocks[0])
combined_block, _ = block_accessor._combine_aggregated_blocks(
self._aggregated_blocks,
blocks,
sort_key=self._sort_key,
aggs=self._aggregation_fns,
finalize=should_finalize,
finalize=finalize,
)

# For combined block that's not yet finalized reset cached aggregated
# blocks to only hold newly combined one
if not should_finalize:
self._aggregated_blocks = [combined_block]

return combined_block

@staticmethod
def _get_sort_key(key_columns: Tuple[str]):
def _get_sort_key(key_columns: Tuple[str, ...]) -> "SortKey":
from ray.data._internal.planner.exchange.sort_task_spec import SortKey

return SortKey(key=list(key_columns), descending=False)


class HashAggregateOperator(HashShufflingOperatorBase):

_DEFAULT_MIN_NUM_SHARDS_COMPACTION_THRESHOLD = 100
_DEFAULT_MAX_NUM_SHARDS_COMPACTION_THRESHOLD = 2000

def __init__(
self,
data_context: DataContext,
Expand All @@ -116,6 +93,13 @@ def __init__(
num_partitions: Optional[int] = None,
aggregator_ray_remote_args_override: Optional[Dict[str, Any]] = None,
):
# Use new stateless ReducingAggregation factory
def _create_reducing_aggregation() -> ReducingAggregation:
return ReducingAggregation(
key_columns=key_columns,
aggregation_fns=aggregation_fns,
)

super().__init__(
name_factory=(
lambda num_partitions: f"HashAggregate(key_columns={key_columns}, "
Expand All @@ -124,6 +108,7 @@ def __init__(
input_ops=[input_op],
data_context=data_context,
key_columns=[key_columns],
num_input_seqs=1,
num_partitions=(
# NOTE: In case of global aggregations (ie with no key columns specified),
# we override number of partitions to 1, since the whole dataset
@@ -132,13 +117,7 @@ def __init__(
if len(key_columns) > 0
else 1
),
partition_aggregation_factory=(
lambda aggregator_id, target_partition_ids: ReducingShuffleAggregation(
aggregator_id,
key_columns,
aggregation_fns,
)
),
partition_aggregation_factory=_create_reducing_aggregation,
input_block_transformer=_create_aggregating_transformer(
key_columns, aggregation_fns
),
@@ -191,14 +170,23 @@ def _estimate_aggregator_memory_allocation(

return aggregator_total_memory_required

@classmethod
def _get_min_max_partition_shards_compaction_thresholds(
cls,
) -> Optional[Tuple[int, int]]:
return (
cls._DEFAULT_MIN_NUM_SHARDS_COMPACTION_THRESHOLD,
cls._DEFAULT_MAX_NUM_SHARDS_COMPACTION_THRESHOLD,
)


def _create_aggregating_transformer(
key_columns: Tuple[str], aggregation_fns: Tuple[AggregateFn]
) -> BlockTransformer:
"""Method creates input block transformer performing partial aggregation of
the block applied prior to block being shuffled (to reduce amount of bytes shuffled)"""

sort_key = ReducingShuffleAggregation._get_sort_key(key_columns)
sort_key = ReducingAggregation._get_sort_key(key_columns)

def _aggregate(block: Block) -> Block:
from ray.data._internal.planner.exchange.aggregate_task_spec import (
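The transformer body is truncated in this view. As a rough, hypothetical illustration of what the docstring above describes (partially aggregating each block before it is shuffled, so fewer bytes cross the shuffle boundary), a plain pyarrow sketch might look like the following; Ray's actual implementation goes through its aggregate task spec rather than this toy group-by:

import pyarrow as pa

# Hypothetical sketch, not Ray's implementation: collapse a single block to at
# most one row per key before it is shuffled, so the shuffle moves
# pre-reduced rows instead of raw rows.
def partially_aggregate(block: pa.Table, key: str, value: str) -> pa.Table:
    # Within-block group-by; the cross-block combine still happens later on
    # the aggregator side (see ReducingAggregation above).
    return block.group_by(key).aggregate([(value, "sum")])


block = pa.table({"k": ["a", "a", "b", "b", "b"], "v": [1, 2, 3, 4, 5]})
print(partially_aggregate(block, "k", "v"))  # 2 rows shuffled instead of 5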