
Conversation

@JacobZheng0927 (Contributor):

What changes were proposed in this pull request?

This PR optimizes BlockManager remove operations by introducing cached mappings to eliminate O(n) linear scans. The main changes are:

  1. Introduced three concurrent hash maps to track block ID associations (see the sketch after this list):

    • rddToBlockIds: Maps RDD ID to its block IDs
    • broadcastToBlockIds: Maps broadcast ID to its block IDs
    • sessionToBlockIds: Maps session UUID to its cache block IDs
  2. Added cache maintenance methods:

    • addToCache(blockId): Updates caches when blocks are stored
    • removeFromCache(blockId): Updates caches when blocks are deleted
  3. Reworked remove operations to use cached lookups:

    • removeRdd(), removeBroadcast(), and removeCache() now perform O(1) lookups instead of scanning all entries
  4. Integrated with block lifecycle:

    • doPutIterator() calls addToCache() after successful block storage
    • removeBlock() calls removeFromCache() when blocks are removed
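
To make this concrete, here is a minimal sketch of the new index structures and maintenance hooks as described above (names follow the PR description; the exact types in the patch may differ, session/cache blocks are elided, and the review below discusses a race in these read-modify-write updates):

import java.util.{Set => JSet}
import java.util.concurrent.ConcurrentHashMap

private val rddToBlockIds = new ConcurrentHashMap[Int, JSet[BlockId]]()
private val broadcastToBlockIds = new ConcurrentHashMap[Long, JSet[BlockId]]()
private val sessionToBlockIds = new ConcurrentHashMap[String, JSet[BlockId]]()

private def addToCache(blockId: BlockId): Unit = blockId match {
  case RDDBlockId(rddId, _) =>
    rddToBlockIds
      .computeIfAbsent(rddId, _ => ConcurrentHashMap.newKeySet[BlockId]())
      .add(blockId)
  case BroadcastBlockId(broadcastId, _) =>
    broadcastToBlockIds
      .computeIfAbsent(broadcastId, _ => ConcurrentHashMap.newKeySet[BlockId]())
      .add(blockId)
  case _ => // cache (session) blocks are handled analogously; other block types are not indexed
}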

Why are the changes needed?

Previously, removeRdd(), removeBroadcast(), and removeCache() required scanning all blocks in blockInfoManager.entries to find matches. This approach becomes a serious bottleneck when:

  1. Large block counts: In production deployments with millions or even tens of millions of cached blocks, linear scans can be prohibitively slow
  2. High cleanup frequency: Workloads that repeatedly create and discard RDDs or broadcast variables accumulate overhead quickly

The original removeRdd() method already contained a TODO noting that an additional mapping would be needed to avoid linear scans. This PR implements that improvement.
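
For reference, the pre-existing scan looks roughly like this (a paraphrase of the old removeRdd, not the exact code):

// Old O(n) path: every entry in blockInfoManager is visited just to
// find the blocks belonging to one RDD.
def removeRdd(rddId: Int): Int = {
  val blocksToRemove =
    blockInfoManager.entries.flatMap(_._1.asRDDId).filter(_.rddId == rddId).toSeq
  blocksToRemove.foreach { blockId => removeBlock(blockId, tellMaster = false) }
  blocksToRemove.size
}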

Does this PR introduce any user-facing change?

No.

How was this patch tested?

  • Unit tests: Verified the correctness of removeRdd(), removeBroadcast(), and removeCache(), including edge cases.
  • Stress tests: Ran multiple simple tasks using broadcast joins under sustained high concurrency to validate performance and stability of the optimized remove operations.

Before optimization

[screenshot]

After optimization

[screenshot]

The optimization delivers significant performance improvements for block cleanup under large data volumes, reducing the overhead caused by frequent GC when blocks accumulate.

Was this patch authored or co-authored using generative AI tooling?

No.

@JacobZheng0927 changed the title to [SPARK-53446][CORE] Optimize BlockManager remove operations with cached block mappings (Sep 4, 2025)
Option(rddToBlockIds.get(rddBlockId.rddId)).foreach { blockSet =>
  blockSet.remove(blockId)
  if (blockSet.isEmpty) {
    rddToBlockIds.remove(rddBlockId.rddId)
@cloud-fan (Contributor):

I think there is a race condition here:

  1. thread 1 calls addToCache, gets the set for its RDD id
  2. thread 2 calls removeFromCache, gets the set for the same RDD id, removes the last block id, and then removes this RDD id from the cache
  3. thread 1 adds the block id, but it's a no-op since that set is now dangling.

@mridulm (Contributor):
Agree with @cloud-fan, use compute instead.
For example, something like this:

// Use import java.util.{Set => JSet} and change the type of the map values to JSet[BlockId]

def removeFromCache(
<snip>

    def doRemove[K](map: ConcurrentHashMap[K, JSet[BlockId]], key: K, block: BlockId): Unit = {
      map.compute(key,
        (_, set) => {
          if (null != set) {
            set.remove(block)
            if (set.isEmpty) null else set
          } else {
            // missing
            null
          }
        }
      )
    }

<snip>

case rddBlockId: RDDBlockId =>
  doRemove(rddToBlockIds, rddBlockId.rddId, blockId)
case broadcastBlockId: BroadcastBlockId =>
  doRemove(broadcastToBlockIds, broadcastBlockId.broadcastId, blockId)

// and so on
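
The add side would need the same treatment: a computeIfAbsent followed by a separate add can still interleave with the compute-based removal above. A minimal sketch of an atomic counterpart (doAdd is a hypothetical helper mirroring doRemove):

def doAdd[K](map: ConcurrentHashMap[K, JSet[BlockId]], key: K, block: BlockId): Unit = {
  map.compute(key,
    (_, set) => {
      // create the set if absent and mutate it while the bin lock is held,
      // so a concurrent doRemove cannot drop the entry in between
      val s = if (set != null) set else ConcurrentHashMap.newKeySet[BlockId]()
      s.add(block)
      s
    }
  )
}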

exceptionWasThrown = false
if (res.isEmpty) {
  // the block was successfully stored
  addToCache(blockId)
Contributor:

is doPut the only entry point that can add blocks?

@mridulm (Contributor):

For adding a new block, yes; it goes through doPut

@cloud-fan (Contributor):

can we put this extra mapping in the lower level BlockInfoManager? Then it's easier to guarantee the consistency between the extra mapping and the original block id map.
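
One shape that suggestion could take, as a minimal sketch (registerBlock and the field names here are hypothetical; the real BlockInfoManager keeps more state and its own locking):

private[storage] class BlockInfoManager {
  private val infos = new ConcurrentHashMap[BlockId, BlockInfo]()
  // Secondary index maintained next to the primary map, so both are
  // updated in the same place and cannot drift apart.
  private val rddToBlockIds = new ConcurrentHashMap[Int, JSet[BlockId]]()

  def registerBlock(blockId: BlockId, info: BlockInfo): Unit = {
    infos.put(blockId, info)
    blockId.asRDDId.foreach { rddBlockId =>
      rddToBlockIds.compute(rddBlockId.rddId, (_, set) => {
        val s = if (set != null) set else ConcurrentHashMap.newKeySet[BlockId]()
        s.add(blockId)
        s
      })
    }
  }
}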

@mridulm (Contributor) left a comment:

Took a quick pass, thanks for working on this!


/**
 * Add a block ID to the appropriate cache mapping based on its type.
 */
private def addToCache(blockId: BlockId): Unit = {
@mridulm (Contributor):
nit: Move these helper methods towards the bottom.

Comment on lines +2092 to +2097
val blocksToRemove = Option(rddToBlockIds.get(rddId)) match {
  case Some(blockSet) =>
    blockSet.asScala.toSeq
  case None =>
    Seq.empty
}
@mridulm (Contributor):
Remove it proactively from the map. This also makes the size returned consistent with what is actually removed (in case of race conditions). Same for the other cases as well.

Suggested change

val blocksToRemove = Option(rddToBlockIds.remove(rddId))
  .map(_.asScala.toSeq).getOrElse(Seq.empty)

blockInfoManager.removeBlock(blockId)
removeFromCache(blockId)
hasRemoveBlock = true
@mridulm (Contributor):
Do this in the finally block as well.
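
That is, something like this sketch, assuming the try/finally already wrapping blockInfoManager.removeBlock in the PR:

try {
  <snip>
  blockInfoManager.removeBlock(blockId)
  removeFromCache(blockId)
  hasRemoveBlock = true
} finally {
  if (!hasRemoveBlock) {
    // error path: the block is still removed here, so the index
    // must be updated here as well to stay consistent
    blockInfoManager.removeBlock(blockId)
    removeFromCache(blockId)
  }
}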


@JacobZheng0927 (Contributor, Author):

Sorry, I don’t have time to continue this PR. @zml1206 has taken over in #52646.
