diff --git a/python/ray/data/BUILD.bazel b/python/ray/data/BUILD.bazel index 06efa7e3f216..ef1ef2dee13c 100644 --- a/python/ray/data/BUILD.bazel +++ b/python/ray/data/BUILD.bazel @@ -649,6 +649,20 @@ py_test( ], ) +py_test( + name = "test_hash_shuffle_aggregator", + size = "medium", + srcs = ["tests/test_hash_shuffle_aggregator.py"], + tags = [ + "exclusive", + "team:data", + ], + deps = [ + ":conftest", + "//:ray_lib", + ], +) + py_test( name = "test_hudi", size = "medium", diff --git a/python/ray/data/_internal/execution/operators/hash_aggregate.py b/python/ray/data/_internal/execution/operators/hash_aggregate.py index c81d95be71df..0401d8f9d5d3 100644 --- a/python/ray/data/_internal/execution/operators/hash_aggregate.py +++ b/python/ray/data/_internal/execution/operators/hash_aggregate.py @@ -2,12 +2,11 @@ import math from typing import TYPE_CHECKING, Any, Dict, Iterator, List, Optional, Tuple -from ray.data._internal.arrow_block import ArrowBlockAccessor from ray.data._internal.execution.interfaces import PhysicalOperator from ray.data._internal.execution.operators.hash_shuffle import ( BlockTransformer, HashShufflingOperatorBase, - StatefulShuffleAggregation, + ShuffleAggregation, ) from ray.data._internal.util import GiB, MiB from ray.data.aggregate import AggregateFn @@ -21,91 +20,69 @@ logger = logging.getLogger(__name__) -class ReducingShuffleAggregation(StatefulShuffleAggregation): - """Aggregation performing reduction of the shuffled sequence using provided - list of aggregating functions. +class ReducingAggregation(ShuffleAggregation): + """Stateless aggregation that reduces blocks using aggregation functions. - NOTE: That reductions are performed incrementally in a streaming fashion upon - accumulation of pre-configured buffer of rows to run aggregation on.""" - - _DEFAULT_BLOCKS_BUFFER_LIMIT = 1000 + This implementation performs incremental reduction during compaction, + combining multiple partially-aggregated blocks into one. The final + aggregation is performed during finalization. + """ def __init__( self, - aggregator_id: int, - key_columns: Optional[Tuple[str]], - aggregation_fns: Tuple[AggregateFn], + key_columns: Tuple[str, ...], + aggregation_fns: Tuple[AggregateFn, ...], ): + self._sort_key: "SortKey" = self._get_sort_key(key_columns) + self._aggregation_fns: Tuple[AggregateFn, ...] 
= aggregation_fns - super().__init__(aggregator_id) - - assert key_columns is not None, "Shuffle aggregation requires key columns" + @classmethod + def is_compacting(cls): + return True - self._sort_key: "SortKey" = ReducingShuffleAggregation._get_sort_key( - key_columns - ) - self._aggregation_fns: Tuple[AggregateFn] = aggregation_fns + def compact(self, partition_shards: List[Block]) -> Block: + assert len(partition_shards) > 0, "Provided sequence must be non-empty" - self._aggregated_blocks: List[Block] = [] + return self._combine(partition_shards, finalize=False) - def accept(self, input_seq_id: int, partition_id: int, partition_shard: Block): + def finalize(self, partition_shards_map: Dict[int, List[Block]]) -> Iterator[Block]: assert ( - input_seq_id == 0 - ), f"Single sequence is expected (got seq-id {input_seq_id})" - - # Received partition shard is already partially aggregated, hence - # we simply add it to the list of aggregated blocks - # - # NOTE: We're not separating blocks by partition as it's ultimately not - # relevant for the aggregations performed - self._aggregated_blocks.append(partition_shard) - - # Aggregation is performed incrementally, rather - # than being deferred to the finalization stage - if len(self._aggregated_blocks) > self._DEFAULT_BLOCKS_BUFFER_LIMIT: - # NOTE: This method will reset partially aggregated blocks to hold - # the new combined one - # - # TODO make aggregation async - self._combine_aggregated_blocks(should_finalize=False) - - def finalize(self, partition_id: int) -> Iterator[Block]: - if len(self._aggregated_blocks) == 0: - block = ArrowBlockAccessor._empty_table() - else: - block = self._combine_aggregated_blocks(should_finalize=True) + len(partition_shards_map) == 1 + ), f"Single input-sequence is expected (got {len(partition_shards_map)})" - yield block + blocks = partition_shards_map[0] - def clear(self, partition_id: int): - self._aggregated_blocks: List[Block] = [] + if not blocks: + return - def _combine_aggregated_blocks(self, *, should_finalize: bool) -> Block: - assert len(self._aggregated_blocks) > 0 + yield self._combine(blocks, finalize=True) - block_accessor = BlockAccessor.for_block(self._aggregated_blocks[0]) + def _combine(self, blocks: List[Block], *, finalize: bool) -> Block: + """Internal method to combine blocks with optional finalization.""" + assert len(blocks) > 0 + + block_accessor = BlockAccessor.for_block(blocks[0]) combined_block, _ = block_accessor._combine_aggregated_blocks( - self._aggregated_blocks, + blocks, sort_key=self._sort_key, aggs=self._aggregation_fns, - finalize=should_finalize, + finalize=finalize, ) - # For combined block that's not yet finalized reset cached aggregated - # blocks to only hold newly combined one - if not should_finalize: - self._aggregated_blocks = [combined_block] - return combined_block @staticmethod - def _get_sort_key(key_columns: Tuple[str]): + def _get_sort_key(key_columns: Tuple[str, ...]) -> "SortKey": from ray.data._internal.planner.exchange.sort_task_spec import SortKey return SortKey(key=list(key_columns), descending=False) class HashAggregateOperator(HashShufflingOperatorBase): + + _DEFAULT_MIN_NUM_SHARDS_COMPACTION_THRESHOLD = 100 + _DEFAULT_MAX_NUM_SHARDS_COMPACTION_THRESHOLD = 2000 + def __init__( self, data_context: DataContext, @@ -116,6 +93,13 @@ def __init__( num_partitions: Optional[int] = None, aggregator_ray_remote_args_override: Optional[Dict[str, Any]] = None, ): + # Use new stateless ReducingAggregation factory + def _create_reducing_aggregation() -> 
ReducingAggregation: + return ReducingAggregation( + key_columns=key_columns, + aggregation_fns=aggregation_fns, + ) + super().__init__( name_factory=( lambda num_partitions: f"HashAggregate(key_columns={key_columns}, " @@ -124,6 +108,7 @@ def __init__( input_ops=[input_op], data_context=data_context, key_columns=[key_columns], + num_input_seqs=1, num_partitions=( # NOTE: In case of global aggregations (ie with no key columns specified), # we override number of partitions to 1, since the whole dataset @@ -132,13 +117,7 @@ def __init__( if len(key_columns) > 0 else 1 ), - partition_aggregation_factory=( - lambda aggregator_id, target_partition_ids: ReducingShuffleAggregation( - aggregator_id, - key_columns, - aggregation_fns, - ) - ), + partition_aggregation_factory=_create_reducing_aggregation, input_block_transformer=_create_aggregating_transformer( key_columns, aggregation_fns ), @@ -191,6 +170,15 @@ def _estimate_aggregator_memory_allocation( return aggregator_total_memory_required + @classmethod + def _get_min_max_partition_shards_compaction_thresholds( + cls, + ) -> Optional[Tuple[int, int]]: + return ( + cls._DEFAULT_MIN_NUM_SHARDS_COMPACTION_THRESHOLD, + cls._DEFAULT_MAX_NUM_SHARDS_COMPACTION_THRESHOLD, + ) + def _create_aggregating_transformer( key_columns: Tuple[str], aggregation_fns: Tuple[AggregateFn] @@ -198,7 +186,7 @@ def _create_aggregating_transformer( """Method creates input block transformer performing partial aggregation of the block applied prior to block being shuffled (to reduce amount of bytes shuffled)""" - sort_key = ReducingShuffleAggregation._get_sort_key(key_columns) + sort_key = ReducingAggregation._get_sort_key(key_columns) def _aggregate(block: Block) -> Block: from ray.data._internal.planner.exchange.aggregate_task_spec import ( diff --git a/python/ray/data/_internal/execution/operators/hash_shuffle.py b/python/ray/data/_internal/execution/operators/hash_shuffle.py index 6e388bd491f3..7b5c5a83daad 100644 --- a/python/ray/data/_internal/execution/operators/hash_shuffle.py +++ b/python/ray/data/_internal/execution/operators/hash_shuffle.py @@ -3,6 +3,7 @@ import itertools import logging import math +import queue import random import threading import time @@ -80,10 +81,6 @@ BlockTransformer = Callable[[Block], Block] -StatefulShuffleAggregationFactory = Callable[ - [int, List[int]], "StatefulShuffleAggregation" -] - DEFAULT_HASH_SHUFFLE_AGGREGATOR_MAX_CONCURRENCY = env_integer( "RAY_DATA_DEFAULT_HASH_SHUFFLE_AGGREGATOR_MAX_CONCURRENCY", 8 @@ -94,104 +91,132 @@ ) -class StatefulShuffleAggregation(abc.ABC): - """Interface for a stateful aggregation to be used by hash-based shuffling - operators (inheriting from `HashShufflingOperatorBase`) and subsequent - aggregation of the dataset. +class ShuffleAggregation: + """Stateless implementation of shuffle "aggregation" operation, which for ex, + could be: + + - Concatenation: concatenates received shuffled partitions into a + single block (for ``repartition`` for ex). + - Join: joins corresponding shuffled partitions. + - Group Aggregation: applies aggregation on grouped data (like ``sum``, + ``count``, ``unique``, etc). - Any stateful aggregation has to adhere to the following protocol: + Each implementation is meant to be *stateless*, simply implementing + corresponding transformation on provided partition-shards. Accumulation and + state management is handled by the ``HashShuffleAggregator`` actor, + which invokes these methods as pure transformations. 
- - Individual input sequence(s) will be (hash-)partitioned into N - partitions each. + Each implementation must implement following methods: - - Accepting individual partition shards: for any given partition (identified - by partition-id) of the input sequence (identified by input-id) aggregation - will be receiving corresponding partition shards as the input sequence is - being shuffled. + - ``compact``: "compacts" an accumulated, *partial* list of partition shards. + Used extensively by reducing aggregating transformations like (``sum``, + ``count``, etc), to perform periodic aggregations during the shuffle stage + itself (for more details check out method's py-doc) - - Upon completion of the shuffling (ie once whole sequence is shuffled) - aggregation will receive a call to finalize the aggregation at which point - it's expected to produce and return resulting block. + - ``finalize``: finalizes provided *complete* list of partition's shards + (per input sequence). - - After successful finalization aggregation's `clear` method will be invoked - to clear any accumulated state to release resources. """ - def __init__(self, aggregator_id: int): - self._aggregator_id = aggregator_id + @classmethod + def is_compacting(cls): + """Returns whether this aggregation is capable of compacting partial + partition's shards list. + """ + return False + + def compact(self, partial_partition_shards: List[Block]) -> Block: + """Incrementally "compacts" provided partition shards of a *single* + partition from a single input blocks sequence. + + This operation is meant to incrementally process provided *partial* + list of the partition's shards. This is particularly beneficial for + aggregating transformations such as ``sum``, ``count``, ``unique``, etc., + which can effectively continuously incrementally aggregate partial partition + while shuffle is ongoing, therefore reducing amount of computation needed + during finalization. - def accept(self, input_seq_id: int, partition_id: int, partition_shard: Block): - """Accepts corresponding partition shard for the partition identified by - - Input sequence id - - Partition id + This operation can be invoked multiple times during the shuffle stage. + + For some transformation no meaningful compaction is possible, for which + is perfectly fine to return provided partition shards as they are. + + Args: + partial_partition_shards: Partial (incomplete) list of partition shards. + + Returns: + Potentially "compacted" block (if it's advantageous to do so). """ raise NotImplementedError() - def finalize(self, partition_id: int) -> Iterator[Block]: - """Finalizes aggregation of partitions (identified by partition-id) - from all input sequences returning resulting block. - """ + def finalize(self, partition_shards_map: Dict[int, List[Block]]) -> Iterator[Block]: + """Processes final, complete set of partition shards producing + output block of this aggregation. - raise NotImplementedError() + Called once after all shards for a partition have been received. - def clear(self, partition_id: int): - """Clears out any accumulated state for provided partition-id. + Args: + partition_shards_map: map input sequence id into final, complete + list of corresponding partition's shards. - NOTE: This method is invoked after aggregation is finalized for the given - partition.""" + For single input-sequence operations, this will be tuple of 1 list. 
+ For multi input-sequence operations (e.g., joins), this will + contain multiple lists corresponding to the same partition from + respective input-sequences (tuple[0] for first seq, etc). + + Returns: + Iterator of incrementally yielded output blocks for this partition. + """ raise NotImplementedError() -class Concat(StatefulShuffleAggregation): - """Trivial aggregation recombining dataset's individual partition - from the partition shards provided during shuffling stage. Returns - single combined `Block` (Pyarrow `Table`) +# Factory type for creating stateless aggregation components +ShuffleAggregationFactory = Callable[[], ShuffleAggregation] + + +class ConcatAggregation(ShuffleAggregation): + """Simple concatenation aggregation for hash shuffle. + + Concatenates all partition shards into a single block, optionally sorting + the result by key columns. """ def __init__( self, - aggregator_id: int, - target_partition_ids: List[int], *, - should_sort: bool, - key_columns: Optional[Tuple[str]] = None, + should_sort: bool = False, + key_columns: Optional[Tuple[str, ...]] = None, ): - super().__init__(aggregator_id) - - assert ( - not should_sort or key_columns - ), f"Key columns have to be specified when `should_sort=True` (got {list(key_columns)})" + if should_sort and not key_columns: + raise ValueError("Key columns must be specified when should_sort=True") self._should_sort = should_sort self._key_columns = key_columns - # Block builders for individual partitions (identified by partition index) - self._partition_block_builders: Dict[int, ArrowBlockBuilder] = { - partition_id: ArrowBlockBuilder() for partition_id in target_partition_ids - } + def finalize(self, partition_shards_map: Dict[int, List[Block]]) -> Iterator[Block]: + """Concatenates blocks and optionally sorts by key columns.""" - def accept(self, input_seq_id: int, partition_id: int, partition_shard: Block): - assert input_seq_id == 0, ( - f"Concat is unary stateful aggregation, got sequence " - f"index of {input_seq_id}" - ) - assert partition_id in self._partition_block_builders, ( - f"Received shard from unexpected partition '{partition_id}' " - f"(expecting {self._partition_block_builders.keys()})" - ) + assert ( + len(partition_shards_map) == 1 + ), f"Single input-sequence is expected (got {len(partition_shards_map)})" - self._partition_block_builders[partition_id].add_block(partition_shard) + blocks = partition_shards_map[0] + if not blocks: + return - def finalize(self, partition_id: int) -> Iterator[Block]: - block = self._partition_block_builders[partition_id].build() + result = _combine(blocks) - if self._should_sort and len(block) > 0: - block = block.sort_by([(k, "ascending") for k in self._key_columns]) + if self._should_sort and result.num_rows > 0: + result = result.sort_by([(k, "ascending") for k in self._key_columns]) - yield block + yield result - def clear(self, partition_id: int): - self._partition_block_builders.pop(partition_id) + +def _combine(partition_shards: List[Block]) -> Block: + builder = ArrowBlockBuilder() + for block in partition_shards: + builder.add_block(block) + return builder.build() @ray.remote @@ -346,6 +371,43 @@ def _shuffle_block( return original_block_metadata, partition_shards_stats +@dataclass +class PartitionBucket: + """Per-partition state for thread-safe block accumulation. + + Each partition has its own lock and queue, eliminating cross-partition + contention during the accept (submit) path. + + The queue is used for lock-free block accumulation (Queue.put is thread-safe). 
+ The lock is only acquired during compaction to ensure at most one compaction + runs at a time per partition. + """ + + lock: threading.Lock + queue: queue.Queue + + compaction_threshold: Optional[int] + + def drain_queue(self) -> List[Block]: + blocks = [] + + try: + while True: + blocks.append(self.queue.get_nowait()) + except queue.Empty: + pass + + return blocks + + @staticmethod + def create(compaction_threshold: Optional[int]) -> "PartitionBucket": + return PartitionBucket( + lock=threading.Lock(), + queue=queue.Queue(), + compaction_threshold=compaction_threshold, + ) + + @dataclass class _PartitionStats: num_rows: int @@ -455,6 +517,8 @@ class HashShufflingOperatorBase(PhysicalOperator, HashShuffleProgressBarMixin): simultaneously (as required by Join operator for ex). """ + _DEFAULT_SHUFFLE_BLOCK_NUM_CPUS = 1.0 + def __init__( self, name_factory: Callable[[int], str], @@ -462,7 +526,8 @@ def __init__( data_context: DataContext, *, key_columns: List[Tuple[str]], - partition_aggregation_factory: StatefulShuffleAggregationFactory, + partition_aggregation_factory: ShuffleAggregationFactory, + num_input_seqs: int, num_partitions: Optional[int] = None, partition_size_hint: Optional[int] = None, input_block_transformer: Optional[BlockTransformer] = None, @@ -546,13 +611,17 @@ def __init__( ray_remote_args.update(aggregator_ray_remote_args_override) self._aggregator_pool: AggregatorPool = AggregatorPool( + num_input_seqs=num_input_seqs, num_partitions=target_num_partitions, num_aggregators=num_aggregators, aggregation_factory=partition_aggregation_factory, aggregator_ray_remote_args=ray_remote_args, - target_max_block_size=None - if disallow_block_splitting - else data_context.target_max_block_size, + target_max_block_size=( + None if disallow_block_splitting else data_context.target_max_block_size + ), + min_max_shards_compaction_thresholds=( + self._get_min_max_partition_shards_compaction_thresholds() + ), ) # We track the running usage total because iterating @@ -637,7 +706,7 @@ def _do_add_input_inner(self, input_bundle: RefBundle, input_index: int): input_key_column_names = self._key_column_names[input_index] # Compose shuffling task resource bundle shuffle_task_resource_bundle = { - "num_cpus": 0.5, + "num_cpus": self._DEFAULT_SHUFFLE_BLOCK_NUM_CPUS, "memory": self._estimate_shuffling_memory_req( block_metadata, target_max_block_size=( @@ -664,13 +733,14 @@ def _do_add_input_inner(self, input_bundle: RefBundle, input_index: int): # - Block is first hash-partitioned into N partitions # - Individual partitions then are submitted to the corresponding # aggregators + # + # TODO HSA needs to be idempotent for _shuffle_block to be retriable + # https://anyscale1.atlassian.net/browse/DATA-1763 input_block_partition_shards_metadata_tuple_ref: ObjectRef[ Tuple[BlockMetadata, Dict[int, _PartitionStats]] ] = _shuffle_block.options( **shuffle_task_resource_bundle, num_returns=1, - # Make sure tasks are retried indefinitely - max_retries=-1, ).remote( block_ref, input_index, @@ -992,9 +1062,7 @@ def base_resource_usage(self) -> ExecutionResources: def incremental_resource_usage(self) -> ExecutionResources: return ExecutionResources( - # TODO fix (this hack is currently to force Ray to spin up more tasks when - # shuffling to autoscale hardware capacity) - cpu=0.01, + cpu=self._DEFAULT_SHUFFLE_BLOCK_NUM_CPUS, # cpu=self._shuffle_block_ray_remote_args.get("num_cpus", 0), # TODO estimate (twice avg block size) object_store_memory=0, @@ -1176,6 +1244,12 @@ def 
_estimate_aggregator_memory_allocation( def _gen_op_name(cls, num_partitions: int) -> str: raise NotImplementedError() + @classmethod + def _get_min_max_partition_shards_compaction_thresholds( + cls, + ) -> Optional[Tuple[int, int]]: + return None + class HashShuffleOperator(HashShufflingOperatorBase): # Add 30% buffer to account for data skew @@ -1191,6 +1265,13 @@ def __init__( should_sort: bool = False, aggregator_ray_remote_args_override: Optional[Dict[str, Any]] = None, ): + # Use new stateless ConcatAggregation factory + def _create_concat_aggregation() -> ConcatAggregation: + return ConcatAggregation( + should_sort=should_sort, + key_columns=key_columns if key_columns else None, + ) + super().__init__( name_factory=( lambda num_partitions: f"Shuffle(key_columns={key_columns}, num_partitions={num_partitions})" @@ -1198,16 +1279,10 @@ def __init__( input_ops=[input_op], data_context=data_context, key_columns=[key_columns], + num_input_seqs=1, num_partitions=num_partitions, aggregator_ray_remote_args_override=aggregator_ray_remote_args_override, - partition_aggregation_factory=( - lambda aggregator_id, target_partition_ids: Concat( - aggregator_id, - target_partition_ids, - should_sort=should_sort, - key_columns=key_columns, - ) - ), + partition_aggregation_factory=_create_concat_aggregation, shuffle_progress_bar_name="Shuffle", # NOTE: In cases like ``groupby`` blocks can't be split as this might violate an invariant that all rows # with the same key are in the same group (block) @@ -1283,17 +1358,20 @@ class AggregatorHealthInfo: class AggregatorPool: def __init__( self, + num_input_seqs: int, num_partitions: int, num_aggregators: int, - aggregation_factory: StatefulShuffleAggregationFactory, + aggregation_factory: ShuffleAggregationFactory, aggregator_ray_remote_args: Dict[str, Any], target_max_block_size: Optional[int], + min_max_shards_compaction_thresholds: Optional[Tuple[int, int]] = None, ): assert ( num_partitions >= 1 ), f"Number of partitions has to be >= 1 (got {num_partitions})" self._target_max_block_size = target_max_block_size + self._num_input_seqs = num_input_seqs self._num_partitions = num_partitions self._num_aggregators: int = num_aggregators self._aggregator_partition_map: Dict[ @@ -1304,9 +1382,9 @@ def __init__( self._aggregators: List[ray.actor.ActorHandle] = [] - self._aggregation_factory_ref: ObjectRef[ - StatefulShuffleAggregationFactory - ] = ray.put(aggregation_factory) + self._aggregation_factory_ref: ObjectRef[ShuffleAggregationFactory] = ray.put( + aggregation_factory + ) self._aggregator_ray_remote_args: Dict[ str, Any @@ -1315,6 +1393,10 @@ def __init__( self._aggregator_partition_map, ) + self._min_max_shards_compaction_thresholds = ( + min_max_shards_compaction_thresholds + ) + def start(self): # Check cluster resources before starting aggregators self._check_cluster_resources() @@ -1333,9 +1415,11 @@ def start(self): **self._aggregator_ray_remote_args ).remote( aggregator_id, + self._num_input_seqs, target_partition_ids, self._aggregation_factory_ref, self._target_max_block_size, + self._min_max_shards_compaction_thresholds, ) self._aggregators.append(aggregator) @@ -1537,52 +1621,185 @@ def start_health_monitoring(self): max_task_retries=-1 ) class HashShuffleAggregator: - """Actor handling of the assigned partitions during hash-shuffle operation + """Actor handling of the assigned partitions during hash-shuffle operation. 
+ + This actor uses per-(sequence, partition) locking to eliminate cross-partition + contention during the submit (accept) path. Each (sequence, partition) pair has + its own lock and block queue. + + The aggregation logic is delegated to a stateless `ShuffleAggregation` component + that operates on batches of blocks without maintaining internal state. + + For multi-sequence operations (e.g., joins), blocks from different input sequences + are stored separately and passed to finalize() as a dict keyed by sequence ID. NOTE: This actor might have ``max_concurrency`` > 1 (depending on the number of - assigned partitions, and has to be thread-safe! + assigned partitions), and is thread-safe via per-(sequence, partition) locks. """ + _DEBUG_DUMP_PERIOD_S = 10 + def __init__( self, aggregator_id: int, + num_input_seqs: int, target_partition_ids: List[int], - agg_factory: StatefulShuffleAggregationFactory, + agg_factory: ShuffleAggregationFactory, target_max_block_size: Optional[int], + min_max_shards_compaction_thresholds: Optional[Tuple[int, int]] = None, ): - self._lock = threading.Lock() - self._agg: StatefulShuffleAggregation = agg_factory( - aggregator_id, target_partition_ids + self._aggregator_id: int = aggregator_id + self._target_max_block_size: int = target_max_block_size + + self._max_num_blocks_compaction_threshold = ( + min_max_shards_compaction_thresholds[1] + if min_max_shards_compaction_thresholds is not None + else None + ) + + # Create stateless aggregation component + self._aggregation: ShuffleAggregation = agg_factory() + + min_num_blocks_compaction_threshold = ( + min_max_shards_compaction_thresholds[0] + if min_max_shards_compaction_thresholds is not None + else None + ) + + # Per-sequence mapping of partition-id to `PartitionState` with individual + # locks for thread-safe block accumulation + self._input_seq_partition_buckets: Dict[ + int, Dict[int, PartitionBucket] + ] = self._allocate_partition_buckets( + num_input_seqs, + target_partition_ids, + min_num_blocks_compaction_threshold, ) - self._target_max_block_size = target_max_block_size + + self._bg_thread = threading.Thread( + target=self._debug_dump, + name="hash_shuffle_aggregator_debug_dump", + daemon=True, + ) + self._bg_thread.start() def submit(self, input_seq_id: int, partition_id: int, partition_shard: Block): - with self._lock: - self._agg.accept(input_seq_id, partition_id, partition_shard) + """Accepts a partition shard for accumulation. + + Uses per-(sequence, partition) locking to avoid cross-partition contention. + Performs incremental compaction when the block count exceeds threshold. + """ + bucket = self._input_seq_partition_buckets[input_seq_id][partition_id] + + # Add partition shard into the queue + bucket.queue.put(partition_shard) + # Check whether queue exceeded compaction threshold + if ( + self._aggregation.is_compacting() + and bucket.queue.qsize() >= bucket.compaction_threshold + ): + # We're taking a lock to drain the queue to make sure that there's + # no concurrent compactions happening + with bucket.lock: + # Check queue size again to avoid running compaction after + # another one just drained the queue + if bucket.queue.qsize() < bucket.compaction_threshold: + return + + # Drain the queue to perform compaction + to_compact = bucket.drain_queue() + # We revise up compaction thresholds for partition after every + # compaction so that for "non-reducing" aggregations (like + # `Unique`, `AsList`) we amortize the cost of compaction processing + # the same elements multiple times. 
+ bucket.compaction_threshold = min( + bucket.compaction_threshold * 2, + self._max_num_blocks_compaction_threshold, + ) + + # For actual compaction we're releasing the lock + compacted = self._aggregation.compact(to_compact) + # Requeue compacted block back into the queue + bucket.queue.put(compacted) def finalize( self, partition_id: int ) -> Generator[Union[Block, "BlockMetadataWithSchema"], None, None]: + """Finalizes aggregation for a partition and yields output blocks. + + NOTE: Finalize is expected to be called + + - Only all `accept` calls are complete + - Only once per partition + + And therefore as such doesn't require explicit concurrency control + """ + exec_stats_builder = BlockExecStats.builder() + + # Collect partition shards from all input sequences for this partition + partition_shards_map: Dict[int, List[Block]] = {} + + # Find all sequences that have data for this partition + for seq_id, partition_map in list(self._input_seq_partition_buckets.items()): + if partition_id in partition_map: + partition_shards_map[seq_id] = partition_map[partition_id].drain_queue() + + # Accumulated partition shard lists could be empty in case of + # dataset being empty + if partition_shards_map: + # Finalization happens outside the lock (doesn't block other partitions) + # Convert dict to tuple ordered by sequence ID for finalize interface + blocks = self._aggregation.finalize(partition_shards_map) + else: + blocks = iter([]) - with self._lock: + if self._target_max_block_size is not None: + blocks = _shape_blocks(blocks, self._target_max_block_size) + + for block in blocks: + # Collect execution stats (and reset) + exec_stats = exec_stats_builder.build() exec_stats_builder = BlockExecStats.builder() - # Finalize given partition id - blocks = self._agg.finalize(partition_id) + yield block + yield BlockMetadataWithSchema.from_block(block, stats=exec_stats) + + def _debug_dump(self): + """Periodically dumps the state of the HashShuffleAggregator for debugging.""" + while True: + time.sleep(self._DEBUG_DUMP_PERIOD_S) - if self._target_max_block_size is not None: - blocks = _shape_blocks(blocks, self._target_max_block_size) + result = defaultdict(defaultdict) - for block in blocks: - # Collect execution stats (and reset) - exec_stats = exec_stats_builder.build() - exec_stats_builder = BlockExecStats.builder() + for seq_id, partition_map in list( + self._input_seq_partition_buckets.items() + ): + for partition_id, partition in list(partition_map.items()): + result[f"seq_{seq_id}"][f"partition_{partition_id}"] = { + # NOTE: qsize() is approximate but sufficient for debug logging + "num_blocks": partition.queue.qsize(), + "compaction_threshold": partition.compaction_threshold, + } + + logger.debug( + f"Hash shuffle aggregator id={self._aggregator_id}, " f"state: {result}" + ) - yield block - yield BlockMetadataWithSchema.from_block(block, stats=exec_stats) + @staticmethod + def _allocate_partition_buckets( + num_input_seqs: int, + target_partition_ids: List[int], + compaction_threshold: Optional[int], + ): + partition_buckets = defaultdict(defaultdict) + + for seq_id in range(num_input_seqs): + for part_id in target_partition_ids: + partition_buckets[seq_id][part_id] = PartitionBucket.create( + compaction_threshold + ) - # Clear any remaining state (to release resources) - self._agg.clear(partition_id) + return partition_buckets def _shape_blocks( diff --git a/python/ray/data/_internal/execution/operators/join.py b/python/ray/data/_internal/execution/operators/join.py index 
3a6170e5f774..665ac0c6dc15 100644 --- a/python/ray/data/_internal/execution/operators/join.py +++ b/python/ray/data/_internal/execution/operators/join.py @@ -4,7 +4,7 @@ from typing import TYPE_CHECKING, Any, Dict, Iterator, List, Optional, Set, Tuple, Type from ray._private.arrow_utils import get_pyarrow_version -from ray.data._internal.arrow_block import ArrowBlockAccessor, ArrowBlockBuilder +from ray.data._internal.arrow_block import ArrowBlockAccessor from ray.data._internal.arrow_ops.transform_pyarrow import ( MIN_PYARROW_VERSION_RUN_END_ENCODED_TYPES, MIN_PYARROW_VERSION_VIEW_TYPES, @@ -12,7 +12,8 @@ from ray.data._internal.execution.interfaces import PhysicalOperator from ray.data._internal.execution.operators.hash_shuffle import ( HashShufflingOperatorBase, - StatefulShuffleAggregation, + ShuffleAggregation, + _combine, ) from ray.data._internal.logical.operators.join_operator import JoinType from ray.data._internal.util import GiB, MiB @@ -51,92 +52,69 @@ class _DatasetPreprocessingResult: logger = logging.getLogger(__name__) -class JoiningShuffleAggregation(StatefulShuffleAggregation): - """Aggregation performing distributed joining of the 2 sequences, - by utilising hash-based shuffling. +class JoiningAggregation(ShuffleAggregation): + """Stateless aggregation for distributed joining of 2 sequences. - Hash-based shuffling applied to 2 input sequences and employing the same - partitioning scheme allows to + This implementation performs hash-based distributed joining by: + - Accumulating identical keys from both sequences into the same partition + - Performing join on individual partitions independently - - Accumulate identical keys from both sequences into the same - (numerical) partition. In other words, all keys such that - - hash(key) % num_partitions = partition_id - - - Perform join on individual partitions independently (from other partitions) - - For actual joining Pyarrow native joining functionality is utilised, providing - incredible performance while allowing keep the data from being deserialized. + For actual joining, Pyarrow native joining functionality is utilised. """ def __init__( self, *, - aggregator_id: int, join_type: JoinType, - left_key_col_names: Tuple[str], - right_key_col_names: Tuple[str], - target_partition_ids: List[int], - data_context: DataContext, + left_key_col_names: Tuple[str, ...], + right_key_col_names: Tuple[str, ...], left_columns_suffix: Optional[str] = None, right_columns_suffix: Optional[str] = None, + data_context: DataContext, ): - super().__init__(aggregator_id) - assert ( len(left_key_col_names) > 0 ), "At least 1 column to join on has to be provided" assert len(right_key_col_names) == len( left_key_col_names - ), "Number of column for both left and right join operands has to match" + ), "Number of columns for both left and right join operands has to match" assert join_type in _JOIN_TYPE_TO_ARROW_JOIN_VERB_MAP, ( f"Join type is not currently supported (got: {join_type}; " # noqa: C416 f"supported: {[jt for jt in JoinType]})" # noqa: C416 ) - self._left_key_col_names: Tuple[str] = left_key_col_names - self._right_key_col_names: Tuple[str] = right_key_col_names + self._left_key_col_names: Tuple[str, ...] = left_key_col_names + self._right_key_col_names: Tuple[str, ...] 
= right_key_col_names self._join_type: JoinType = join_type self._left_columns_suffix: Optional[str] = left_columns_suffix self._right_columns_suffix: Optional[str] = right_columns_suffix - # Partition builders for the partition corresponding to - # left and right input sequences respectively - self._left_input_seq_partition_builders: Dict[int, ArrowBlockBuilder] = { - partition_id: ArrowBlockBuilder() for partition_id in target_partition_ids - } - - self._right_input_seq_partition_builders: Dict[int, ArrowBlockBuilder] = { - partition_id: ArrowBlockBuilder() for partition_id in target_partition_ids - } - self._data_context = data_context + def finalize(self, partition_shards_map: Dict[int, List[Block]]) -> Iterator[Block]: + """Performs join on blocks from left (seq 0) and right (seq 1) sequences.""" - def accept(self, input_seq_id: int, partition_id: int, partition_shard: Block): - assert 0 <= input_seq_id < 2 - - partition_builder = self._get_partition_builder( - input_seq_id=input_seq_id, - partition_id=partition_id, - ) + assert ( + len(partition_shards_map) == 2 + ), f"Two input-sequences are expected (got {len(partition_shards_map)})" - partition_builder.add_block(partition_shard) + left_partition_shards = partition_shards_map[0] + right_partition_shards = partition_shards_map[1] - def finalize(self, partition_id: int) -> Iterator[Block]: + left_table = _combine(left_partition_shards) + right_table = _combine(right_partition_shards) - left_on, right_on = list(self._left_key_col_names), list( - self._right_key_col_names - ) + left_on = list(self._left_key_col_names) + right_on = list(self._right_key_col_names) + # Preprocess: split unsupported columns and add index columns if needed preprocess_result_l, preprocess_result_r = self._preprocess( - left_on, right_on, partition_id + left_table, right_table, left_on, right_on ) # Perform the join on supported columns arrow_join_type = _JOIN_TYPE_TO_ARROW_JOIN_VERB_MAP[self._join_type] - # Perform the join on supported columns supported = preprocess_result_l.supported_projection.join( preprocess_result_r.supported_projection, join_type=arrow_join_type, @@ -146,36 +124,26 @@ def finalize(self, partition_id: int) -> Iterator[Block]: right_suffix=self._right_columns_suffix, ) - # Add back unsupported columns (join type logic is in should_index_* variables) - supported = self._postprocess( + # Add back unsupported columns + result = self._postprocess( supported, preprocess_result_l.unsupported_projection, preprocess_result_r.unsupported_projection, ) - yield supported + yield result def _preprocess( self, + left_table: "pa.Table", + right_table: "pa.Table", left_on: List[str], right_on: List[str], - partition_id: int, ) -> Tuple[_DatasetPreprocessingResult, _DatasetPreprocessingResult]: - import pyarrow as pa - - left_seq_partition: pa.Table = self._get_partition_builder( - input_seq_id=0, partition_id=partition_id - ).build() - - right_seq_partition: pa.Table = self._get_partition_builder( - input_seq_id=1, partition_id=partition_id - ).build() - + """Preprocesses tables by splitting unsupported columns and adding indices.""" # Get supported columns - supported_l, unsupported_l = self._split_unsupported_columns(left_seq_partition) - supported_r, unsupported_r = self._split_unsupported_columns( - right_seq_partition - ) + supported_l, unsupported_l = self._split_unsupported_columns(left_table) + supported_r, unsupported_r = self._split_unsupported_columns(right_table) # Handle joins on unsupported columns conflicting_columns: Set[str] = 
set(unsupported_l.column_names) & set(left_on) @@ -248,21 +216,6 @@ def _postprocess( def _index_name(self, suffix: str) -> str: return f"__rd_index_level_{suffix}__" - def clear(self, partition_id: int): - self._left_input_seq_partition_builders.pop(partition_id) - self._right_input_seq_partition_builders.pop(partition_id) - - def _get_partition_builder(self, *, input_seq_id: int, partition_id: int): - if input_seq_id == 0: - partition_builder = self._left_input_seq_partition_builders[partition_id] - elif input_seq_id == 1: - partition_builder = self._right_input_seq_partition_builders[partition_id] - else: - raise ValueError( - f"Unexpected inpt sequence id of '{input_seq_id}' (expected 0 or 1)" - ) - return partition_builder - def _should_index_side( self, side: str, supported_table: "pa.Table", unsupported_table: "pa.Table" ) -> bool: @@ -403,16 +356,28 @@ def __init__( right_columns_suffix: Optional[str] = None, partition_size_hint: Optional[int] = None, aggregator_ray_remote_args_override: Optional[Dict[str, Any]] = None, - shuffle_aggregation_type: Optional[Type[StatefulShuffleAggregation]] = None, + shuffle_aggregation_type: Optional[Type[ShuffleAggregation]] = None, ): - if shuffle_aggregation_type is not None: - if not issubclass(shuffle_aggregation_type, StatefulShuffleAggregation): - raise TypeError( - f"shuffle_aggregation_type must be a subclass of StatefulShuffleAggregation, " - f"got {shuffle_aggregation_type}" - ) + # Use new stateless JoiningAggregation factory + def _create_joining_aggregation() -> JoiningAggregation: + if shuffle_aggregation_type is not None: + if not issubclass(shuffle_aggregation_type, ShuffleAggregation): + raise TypeError( + f"shuffle_aggregation_type must be a subclass of {ShuffleAggregation}, " + f"got {shuffle_aggregation_type}" + ) + + aggregation_class = shuffle_aggregation_type or JoiningAggregation + + return aggregation_class( + join_type=join_type, + left_key_col_names=left_key_columns, + right_key_col_names=right_key_columns, + left_columns_suffix=left_columns_suffix, + right_columns_suffix=right_columns_suffix, + data_context=data_context, + ) - aggregation_class = shuffle_aggregation_type or JoiningShuffleAggregation super().__init__( name_factory=( lambda num_partitions: f"Join(num_partitions={num_partitions})" @@ -420,20 +385,10 @@ def __init__( input_ops=[left_input_op, right_input_op], data_context=data_context, key_columns=[left_key_columns, right_key_columns], + num_input_seqs=2, num_partitions=num_partitions, partition_size_hint=partition_size_hint, - partition_aggregation_factory=( - lambda aggregator_id, target_partition_ids: aggregation_class( - aggregator_id=aggregator_id, - join_type=join_type, - left_key_col_names=left_key_columns, - right_key_col_names=right_key_columns, - target_partition_ids=target_partition_ids, - data_context=data_context, - left_columns_suffix=left_columns_suffix, - right_columns_suffix=right_columns_suffix, - ) - ), + partition_aggregation_factory=_create_joining_aggregation, aggregator_ray_remote_args_override=aggregator_ray_remote_args_override, shuffle_progress_bar_name="Shuffle", finalize_progress_bar_name="Join", diff --git a/python/ray/data/_internal/table_block.py b/python/ray/data/_internal/table_block.py index d3f890dd24e7..6ba93ef5c526 100644 --- a/python/ray/data/_internal/table_block.py +++ b/python/ray/data/_internal/table_block.py @@ -148,6 +148,9 @@ def build(self) -> Block: def num_rows(self) -> int: return self._num_rows + def num_blocks(self) -> int: + return len(self._tables) + def 
get_estimated_memory_usage(self) -> int: if self._num_rows == 0: return 0 diff --git a/python/ray/data/tests/test_consumption.py b/python/ray/data/tests/test_consumption.py index 721daa92b7fe..c041f83240b6 100644 --- a/python/ray/data/tests/test_consumption.py +++ b/python/ray/data/tests/test_consumption.py @@ -552,7 +552,12 @@ def test_take_all(ray_start_regular_shared): assert ray.data.range(5).take_all(4) -def test_union(ray_start_regular_shared): +def test_union(ray_start_regular_shared, restore_data_context): + # Set aggregator CPU to 0 to avoid deadlock in resource-constrained test env. + # Without this, the shuffle task (1 CPU) + aggregator actor (~0.25 CPU) would + # exceed the 1 CPU available in the test cluster, causing scheduler deadlock. + restore_data_context.hash_aggregate_operator_actor_num_cpus_override = 0 + ds = ray.data.range(20, override_num_blocks=10).materialize() # Test lazy union. diff --git a/python/ray/data/tests/test_hash_shuffle_aggregator.py b/python/ray/data/tests/test_hash_shuffle_aggregator.py new file mode 100644 index 000000000000..e4cd4039c8fe --- /dev/null +++ b/python/ray/data/tests/test_hash_shuffle_aggregator.py @@ -0,0 +1,185 @@ +"""Unit tests for HashShuffleAggregator.""" + +from typing import Dict, Iterator, List + +import pyarrow as pa +import pytest + +from ray.data._internal.arrow_ops import transform_pyarrow +from ray.data._internal.execution.operators.hash_shuffle import ( + HashShuffleAggregator, + ShuffleAggregation, +) +from ray.data._internal.planner.exchange.sort_task_spec import SortKey +from ray.data.block import Block + +# Access underlying class for direct instantiation (bypassing Ray actor) +_HashShuffleAggregatorClass = HashShuffleAggregator.__ray_actor_class__ + + +def make_block(n: int = 10, offset: int = 0) -> pa.Table: + return pa.table({"x": list(range(offset, offset + n))}) + + +def split_block(block: pa.Table, chunk_size: int) -> List[pa.Table]: + """Split block into chunks of given size.""" + return [block.slice(i, chunk_size) for i in range(0, block.num_rows, chunk_size)] + + +class MockCompactingAggregation(ShuffleAggregation): + """Tracks compact/finalize calls and input blocks.""" + + def __init__(self): + self.compact_calls: List[int] = [] + self.finalize_input: Dict[int, List[Block]] = {} + + @classmethod + def is_compacting(cls): + return True + + def compact(self, shards: List[Block]) -> Block: + self.compact_calls.append(len(shards)) + return pa.concat_tables(shards) if shards else make_block(0) + + def finalize(self, shards: Dict[int, List[Block]]) -> Iterator[Block]: + self.finalize_input = dict(shards) + blocks = [b for bs in shards.values() for b in bs] + yield pa.concat_tables(blocks) if blocks else make_block(0) + + +class MockNonCompactingAggregation(ShuffleAggregation): + """Tracks finalize input blocks.""" + + def __init__(self): + self.finalize_input: Dict[int, List[Block]] = {} + + @classmethod + def is_compacting(cls): + return False + + def compact(self, shards: List[Block]) -> Block: + raise RuntimeError("Should not be called") + + def finalize(self, shards: Dict[int, List[Block]]) -> Iterator[Block]: + self.finalize_input = dict(shards) + blocks = [b for bs in shards.values() for b in bs] + yield pa.concat_tables(blocks) if blocks else make_block(0) + + +class TestHashShuffleAggregator: + def test_compacting_workflow(self, ray_start_regular_shared): + """Tests compaction triggers, threshold doubling, multi-partition/sequence.""" + agg = MockCompactingAggregation() + aggregator = 
_HashShuffleAggregatorClass( + aggregator_id=0, + num_input_seqs=2, + target_partition_ids=[0, 1, 2], + agg_factory=lambda: agg, + target_max_block_size=None, + min_max_shards_compaction_thresholds=(3, 2000), + ) + + # Pre-generate blocks: split a 100-row block into 10 chunks of 10 rows + full_block = make_block(80) + input_seq0_part0 = split_block(full_block, 10) + + def get_compaction_thresholds(): + """Helper to extract compaction thresholds from partition buckets.""" + # Thresholds are now per-partition in PartitionBucket + return { + part_id: bucket.compaction_threshold + for part_id, bucket in aggregator._input_seq_partition_buckets[ + 0 + ].items() + if bucket.compaction_threshold is not None + } + + # Submit 2 blocks (below threshold=3) - no compaction + for b in input_seq0_part0[:2]: + aggregator.submit(0, 0, b) + assert agg.compact_calls == [] + assert get_compaction_thresholds() == {0: 3, 1: 3, 2: 3} + + # Submit 3rd block - triggers compaction, threshold doubles + aggregator.submit(0, 0, input_seq0_part0[2]) + assert agg.compact_calls == [3] + assert get_compaction_thresholds() == {0: 6, 1: 3, 2: 3} + + # Submit 5 more (queue: 1+5=6) - triggers at new threshold + for b in input_seq0_part0[3:8]: + aggregator.submit(0, 0, b) + + assert agg.compact_calls == [3, 6] + assert get_compaction_thresholds() == {0: 12, 1: 3, 2: 3} + + # Partition 1 has independent threshold (starts at 3) + for b in split_block(make_block(30, offset=1000), 10): + aggregator.submit(0, 1, b) + + assert agg.compact_calls == [3, 6, 3] + assert get_compaction_thresholds() == {0: 12, 1: 6, 2: 3} + + # Multiple sequences (join scenario) - seq_id=1 for partition 0 + input_seq1_part0 = split_block(make_block(20, offset=2000), 10) + for b in input_seq1_part0: + aggregator.submit(1, 0, b) + + # Finalize partition 0 - receives blocks from both sequences + results = list(aggregator.finalize(0)) + block, metadata = results + assert len(agg.finalize_input) == 2 # dict with 2 sequences + + # Verify output equals concatenation of seq0 (first 8 chunks) + seq1 + expected = transform_pyarrow.sort( + pa.concat_tables(tables=[*input_seq0_part0, *input_seq1_part0]), + sort_key=SortKey("x"), + ) + assert transform_pyarrow.sort(block, sort_key=SortKey("x")) == expected + + # Empty partition + results = list(aggregator.finalize(2)) + assert results[0] == make_block(0) + + def test_non_compacting_workflow(self, ray_start_regular_shared): + """Tests non-compacting aggregation with and without block splitting.""" + # Without splitting + full_block = make_block(50) + input_seq = split_block(full_block, 10) + + aggregator = _HashShuffleAggregatorClass( + aggregator_id=1, + num_input_seqs=1, + target_partition_ids=[0], + agg_factory=MockNonCompactingAggregation, + target_max_block_size=None, + ) + for b in input_seq: + aggregator.submit(0, 0, b) + + results = list(aggregator.finalize(0)) + block, metadata = results + assert block == full_block + + # With splitting - output blocks should reconstruct to original + full_block = make_block(500) + input_seq = split_block(full_block, 100) + + aggregator = _HashShuffleAggregatorClass( + aggregator_id=2, + num_input_seqs=1, + target_partition_ids=[0], + agg_factory=MockNonCompactingAggregation, + target_max_block_size=50, + ) + for b in input_seq: + aggregator.submit(0, 0, b) + + results = list(aggregator.finalize(0)) + output_blocks = [results[i] for i in range(0, len(results), 2)] + assert pa.concat_tables(output_blocks) == full_block + + +if __name__ == "__main__": + import sys + + 
sys.exit(pytest.main(["-v", __file__]))
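A minimal sketch, written against the stateless ``ShuffleAggregation`` protocol this patch introduces (``is_compacting`` / ``compact`` / ``finalize``), of what a third-party aggregation could look like. ``RowCountAggregation``, ``_num_rows`` and the single ``count`` output column are illustrative assumptions, not part of the patch; the signatures mirror the ones added to hash_shuffle.py and exercised by the new unit tests.

from typing import Dict, Iterator, List

import pyarrow as pa
import pyarrow.compute as pc

from ray.data._internal.execution.operators.hash_shuffle import ShuffleAggregation
from ray.data.block import Block


def _num_rows(block: pa.Table) -> int:
    # A previously compacted shard carries a single "count" column (an
    # assumption of this sketch); raw shards are counted by physical row count.
    if block.column_names == ["count"]:
        return pc.sum(block.column("count")).as_py() or 0
    return block.num_rows


class RowCountAggregation(ShuffleAggregation):
    """Hypothetical aggregation reducing each partition to a single row count."""

    @classmethod
    def is_compacting(cls):
        # Opt into periodic compaction: queued shards get collapsed into a
        # one-row partial count while the shuffle is still running.
        return True

    def compact(self, partial_partition_shards: List[Block]) -> Block:
        total = sum(_num_rows(b) for b in partial_partition_shards)
        return pa.table({"count": [total]})

    def finalize(self, partition_shards_map: Dict[int, List[Block]]) -> Iterator[Block]:
        # Unary aggregation: all shards arrive under input-sequence id 0.
        blocks = partition_shards_map.get(0, [])
        total = sum(_num_rows(b) for b in blocks)
        yield pa.table({"count": [total]})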
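The accept path of the new ``HashShuffleAggregator`` relies on ``queue.Queue.put`` already being thread-safe, while the per-bucket lock only serializes draining for compaction. A stripped-down, standalone sketch of that ``PartitionBucket`` pattern (``_Bucket`` here is a stand-in for the dataclass in the patch, not the real class):

import queue
import threading
from typing import Any, List


class _Bucket:
    def __init__(self) -> None:
        self.lock = threading.Lock()
        self.queue: "queue.Queue[Any]" = queue.Queue()

    def drain(self) -> List[Any]:
        # Mirrors PartitionBucket.drain_queue: pop until the queue reports empty.
        items: List[Any] = []
        try:
            while True:
                items.append(self.queue.get_nowait())
        except queue.Empty:
            pass
        return items


bucket = _Bucket()
for shard in ("a", "b", "c"):
    bucket.queue.put(shard)      # accept path: no lock required
with bucket.lock:                # compaction path: exclusive drain per partition
    drained = bucket.drain()
assert drained == ["a", "b", "c"]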
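After every compaction the bucket's threshold is doubled and capped at the operator-provided maximum (100 and 2000 are the ``HashAggregateOperator`` defaults added here), so later compactions batch progressively more shards and non-reducing aggregations amortize the cost of reprocessing the same rows. A quick arithmetic sketch of that schedule (``next_threshold`` is a hypothetical helper, not patch code):

def next_threshold(current: int, maximum: int = 2000) -> int:
    # Same update rule as in HashShuffleAggregator.submit:
    # min(current * 2, max_num_blocks_compaction_threshold)
    return min(current * 2, maximum)


thresholds = [100]
while thresholds[-1] < 2000:
    thresholds.append(next_threshold(thresholds[-1]))

assert thresholds == [100, 200, 400, 800, 1600, 2000]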