
Conversation

@alexeykudinkin
Contributor

Context

This change revises the HashShuffleAggregator protocol by:

  • Removing the global (per-aggregator) lock
  • Making the shard-accepting flow lock-free (sketched below)
  • Relocating all state from ShuffleAggregation into the Aggregator itself
  • Adding dynamic compaction (an exponentially increasing compaction period) to amortize compaction costs
  • Adding debugging state dumps
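
For illustration only, here is a minimal sketch of what the per-partition state described above could look like. `PartitionBucket`, `accept`, and the `Block` alias are placeholders, not the PR's actual code:

```python
import threading
from dataclasses import dataclass, field
from queue import Queue
from typing import Any

Block = Any  # illustrative stand-in for Ray Data's Block type


@dataclass
class PartitionBucket:
    """Per-partition accumulation state (sketch)."""

    # The lock only guards compaction/finalization of this partition,
    # never the whole aggregator.
    lock: threading.Lock = field(default_factory=threading.Lock)
    # Shards are submitted through a thread-safe queue, so the accept
    # path needs no explicit locking.
    queue: Queue = field(default_factory=Queue)

    def accept(self, block: Block) -> None:
        # Lock-free from the caller's perspective: Queue.put handles its
        # own internal synchronization.
        self.queue.put(block)
```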


@alexeykudinkin alexeykudinkin requested a review from a team as a code owner January 8, 2026 19:08
@alexeykudinkin alexeykudinkin added the go add ONLY when ready to merge, run all tests label Jan 8, 2026
@alexeykudinkin alexeykudinkin changed the base branch from master to ak/hsh-shfl-strm January 8, 2026 19:09
@gemini-code-assist bot left a comment

Code Review

This pull request is a significant and well-executed refactoring of the HashShuffleAggregator protocol. It moves from a stateful to a stateless aggregation model, which simplifies the aggregation logic and removes the need for state management within aggregation components. The introduction of per-partition locking and a lock-free queue for block submission is a major performance improvement, removing the global lock bottleneck. The addition of dynamic compaction with an exponentially increasing period is a clever optimization to amortize compaction costs. The new tests for HashShuffleAggregator are comprehensive and cover the new features well.

I've found a critical issue related to undefined constants in the new compaction logic that could lead to runtime errors. I've also identified a couple of minor issues: a typo in a new method name and a bug in an error message. Overall, this is a high-quality change that significantly improves the architecture of hash-based shuffling in Ray Data.

def clear(self, partition_id: int):
self._aggregated_blocks: List[Block] = []
if not blocks:
return

Empty partitions silently yield no output blocks

Medium Severity

The new implementation changes behavior for empty partitions. The old ReducingShuffleAggregation.finalize() always yielded a block (returning ArrowBlockAccessor._empty_table() for empty partitions), while the new code in both ReducingAggregation.finalize() and ConcatAggregation.finalize() has if not blocks: return which yields nothing. Additionally, HashShuffleAggregator.finalize() sets blocks = iter([]) when partition_shards_map is empty. This behavior change could break downstream code expecting a block per partition and causes the new test at line 129 to fail with IndexError when accessing results[0].
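
A minimal sketch of one possible fix, restoring the old one-block-per-partition contract. The finalize signature and the `_finalize_non_empty` helper are assumptions; only `ArrowBlockAccessor._empty_table()` is taken from the old code referenced above:

```python
def finalize(self, partition_id: int, blocks: List[Block]) -> Iterator[Block]:
    if not blocks:
        # Preserve the previous contract: every partition yields at least
        # one (possibly empty) block instead of yielding nothing.
        yield ArrowBlockAccessor._empty_table()
        return
    # Hypothetical helper standing in for the existing reduce/concat path.
    yield from self._finalize_non_empty(blocks)
```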


Base automatically changed from ak/hsh-shfl-strm to master January 8, 2026 19:44
if (
self._aggregation.is_compacting()
and bucket.queue.qsize()
>= self._current_compaction_thresholds[partition_id]

TypeError when compacting aggregation lacks compaction thresholds

Low Severity

When is_compacting() returns True but min_max_shards_compaction_thresholds is None, the comparison bucket.queue.qsize() >= self._current_compaction_thresholds[partition_id] will raise a TypeError because _current_compaction_thresholds is a defaultdict that returns None (the value of _min_num_blocks_compaction_threshold). Currently this doesn't trigger because only ReducingAggregation returns is_compacting() = True, and it's paired with HashAggregateOperator which provides thresholds. However, custom aggregations returning is_compacting() = True with operators not overriding _get_min_max_partition_shards_compaction_thresholds() would crash.
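
A sketch of a defensive guard; the `_compact` helper is a hypothetical name, and only the surrounding condition mirrors the quoted code:

```python
# .get() does not trigger the defaultdict's default factory, so a missing
# or None threshold simply disables compaction instead of raising TypeError.
threshold = self._current_compaction_thresholds.get(partition_id)
if (
    self._aggregation.is_compacting()
    and threshold is not None
    and bucket.queue.qsize() >= threshold
):
    self._compact(partition_id, bucket)  # hypothetical helper
```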


if (
self._aggregation.is_compacting()
and bucket.queue.qsize()
>= self._current_compaction_thresholds[partition_id]
Contributor

Copied over from an earlier conversation:

Does the dictionary also need to be protected by a lock?
Not really (because it's read-only + GIL).

Hmm, I understand the GIL; however, it's not read-only, because it's a defaultdict. Are you sure it's thread-safe to access an item in a defaultdict without a lock?
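
For what it's worth, one way to sidestep the question entirely is to pre-populate a plain dict so the hot path never inserts a default. This is only a sketch and assumes the number of output partitions is known up front as `num_partitions`:

```python
# Pre-populating turns every later lookup into a pure read: no
# __missing__/default insertion can happen from the accept path, so the
# GIL-only reasoning about concurrent reads holds.
self._current_compaction_thresholds: Dict[int, int] = {
    partition_id: self._min_num_blocks_compaction_threshold
    for partition_id in range(num_partitions)  # assumed to be known here
}
```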

# NOTE: We revise compaction thresholds for partition after every
# compaction to amortize the cost of compaction.
self._current_compaction_thresholds[partition_id] = min(
self._current_compaction_thresholds[partition_id] * 2,
Contributor

Copied over from an earlier conversation:

I'm a bit confused about why you are multiplying by 2.
Increasing the threshold.

I want to understand the motivation behind multiplying by 2. Why scale by 2 when you could keep it constant? It's not intuitive to me why 2 is used.

Contributor

@alexeykudinkin discussed offline; I think you should update your comment to refer only to Concat aggregations, as this does not affect sum or count.

Contributor Author

The motivation is the same as doubling the size of a list whenever it is resized while appending: it amortizes the cost of copying the existing data over N appends.

Similarly here for non-reducing aggregations (like AsList): because the compacted block is added back into the queue after compaction, we want to reduce the number of times the same data gets copied over and over again.
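
To make the amortization argument concrete, here is a toy model (the `shard_copies` helper and the numbers are purely illustrative) in which each compaction rewrites everything accumulated so far and puts the result back into the queue as a single block:

```python
def shard_copies(num_shards: int, threshold: int, doubling: bool) -> int:
    """Count shard-copies performed under a toy compaction model."""
    copied = 0
    accumulated = 0    # shards' worth of data currently held for this partition
    queued_blocks = 0  # blocks sitting in the partition's queue
    for _ in range(num_shards):
        accumulated += 1
        queued_blocks += 1
        if queued_blocks >= threshold:
            copied += accumulated  # compaction copies all data held so far
            queued_blocks = 1      # the compacted block re-enters the queue
            if doubling:
                threshold *= 2
    return copied


# shard_copies(1024, 8, doubling=False) -> ~75,000 shard-copies (quadratic growth)
# shard_copies(1024, 8, doubling=True)  -> ~2,000 shard-copies (roughly linear)
```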

Contributor Author

Let me update the comment

exec_stats_builder = BlockExecStats.builder()

# Collect partition shards from all input sequences for this partition
partition_shards_map: Dict[int, List[Block]] = {}
Contributor

Let's create a typevar to denote that this is a partition id.
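
Something along these lines, presumably; a `typing.NewType` (rather than a `TypeVar`) is probably the right fit here, since the id is always an int. Sketch only:

```python
from typing import Any, Dict, List, NewType

Block = Any  # stand-in for Ray Data's Block type
PartitionId = NewType("PartitionId", int)

# The mapping's keys are now self-describing at type-check time.
partition_shards_map: Dict[PartitionId, List[Block]] = {}
```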

)
self._target_max_block_size = target_max_block_size

self._current_compaction_thresholds: Dict[int, int] = defaultdict(
Contributor

Copied over from an earlier conversation:

Hmm, I feel that a more accurate way to trigger compaction would be based on object-store usage. Do you think it makes more sense to refactor PartitionState to accurately keep track of byte size too, and then compact based on that? I'm curious about your intention behind making it block-count based.
It's put in place to manage the performance of PyArrow table concatenation.
For aggregations (sum, etc.) it allows us to run most of the aggregation while the shuffle itself is ongoing (i.e., finalization is really fast).

Hmm, I understand the intention; it's more that if blocks are very large (1 GiB per block) or very small (1 KB per block), this heuristic isn't very strong. Or are you assuming block sizes are roughly 128 MiB?

Contributor

Discussed offline: this is the simplest approach, so he is going with it, but it can be experimented with later.
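
For reference, the byte-size-based alternative discussed above might look roughly like this. `queued_bytes` and `_compact` are assumed names; only `_target_max_block_size` appears in the PR:

```python
# Hypothetical byte-based trigger: compact once the queued shards for a
# partition exceed the target max block size, regardless of block count.
if (
    self._aggregation.is_compacting()
    and bucket.queued_bytes >= self._target_max_block_size
):
    self._compact(partition_id, bucket)
```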


# Per-sequence mapping of partition-id to `PartitionState` with individual
# locks for thread-safe block accumulation
self._input_seq_partition_buckets: Dict[
Contributor

Let's use typevars

@ray-gardener ray-gardener bot added the data Ray Data-related issues label Jan 9, 2026
lock: threading.Lock
queue: queue.Queue # Queue[Block]

def drain_queue(self) -> List[Block]:
Contributor

For associative reducing aggregations, you can fold block by block instead of holding onto references to all of the blocks.

def drain(self, fold_fn: Callable[[Block, Block], Block]) -> Optional[Block]:
    # Fold blocks as they are pulled off the queue, keeping only the running
    # accumulator alive instead of a list of all drained blocks.
    acc = None
    while True:
        try:
            block = self.queue.get(False)
            acc = block if acc is None else fold_fn(acc, block)
        except Empty:  # queue.Empty
            break
    return acc
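
For example, a `fold_fn` for a sum-style reduction could look like this; it assumes blocks are Arrow tables that are already pre-aggregated per shard, keyed on a hypothetical "key" column with a "value" column:

```python
import pyarrow as pa

def fold_fn(acc: pa.Table, block: pa.Table) -> pa.Table:
    # Concatenate the accumulator with the next block and re-aggregate, so
    # only one table per partition is kept alive at a time.
    combined = pa.concat_tables([acc, block])
    grouped = combined.group_by("key").aggregate([("value", "sum")])
    # Rename the aggregate column back to "value" so the fold stays
    # associative across successive applications.
    return grouped.select(["key", "value_sum"]).rename_columns(["key", "value"])

# combined_block = bucket.drain(fold_fn)
```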

"compaction_threshold": self._current_compaction_thresholds[
partition_id
],
}

Undefined attribute accessed in debug dump thread

High Severity

The _debug_dump method references self._current_compaction_thresholds[partition_id] which is never defined in HashShuffleAggregator.__init__. The compaction threshold is actually stored in each PartitionBucket.compaction_threshold (accessed via partition.compaction_threshold where partition is the loop variable). This causes the debug daemon thread to crash with AttributeError 10 seconds after aggregator initialization. The test file also references this undefined attribute and will fail.
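
A sketch of the likely fix, reading the threshold from the per-partition state the dump loop already iterates over; the `buckets`/`dump` variable names and the "queued_blocks" key are assumptions:

```python
for partition_id, partition in buckets.items():
    dump[partition_id] = {
        "queued_blocks": partition.queue.qsize(),
        # Read from the bucket itself rather than the non-existent
        # aggregator-level _current_compaction_thresholds attribute.
        "compaction_threshold": partition.compaction_threshold,
    }
```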

