Skip to content

Commit a321441

Browse files
reiabreuclaude
andcommitted
utils: use WeakReference in BatchInserter to fix ThreadLocal retention of JCQueue
BatchInserter held a strong reference to its owning JCQueue, and the inserters live in instance-field ThreadLocals on the same JCQueue. This formed a cycle through ThreadLocalMap: value (BatchInserter) -> queue (JCQueue) -> thdLocalBatcher (ThreadLocal) = key Because the key was strongly reachable via the value, the weak-key expunge path in ThreadLocalMap never triggered, and the JCQueue (along with its metrics, recv/overflow queues and batch buffers) could not be GC'd for as long as any producer thread that ever published to it stayed alive. The fix stores the JCQueue as a WeakReference inside BatchInserter, cutting the value->key path. When the last external strong ref to the JCQueue is dropped, the ThreadLocal field it owns becomes weakly reachable, the ThreadLocalMap key can be expunged, and both the BatchInserter and the JCQueue are released. flush() and tryFlush() dereference the WeakReference once at entry and bail out cleanly if the queue has already been collected (dead topology in LocalCluster/embedded scenarios). publish() and tryPublish() are unchanged — they only manipulate currentBatch. Fixes #8810 Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
1 parent a18676b commit a321441

1 file changed

Lines changed: 18 additions & 2 deletions

File tree

storm-client/src/jvm/org/apache/storm/utils/JCQueue.java

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
package org.apache.storm.utils;
2020

2121
import java.io.Closeable;
22+
import java.lang.ref.WeakReference;
2223
import java.util.ArrayList;
2324
import java.util.List;
2425
import org.apache.storm.metrics2.StormMetricRegistry;
@@ -345,11 +346,16 @@ public boolean tryFlush() {
345346
/* Not thread safe. Have one instance per producer thread or synchronize externally */
346347
private static class BatchInserter implements Inserter {
347348
private final int batchSz;
348-
private final JCQueue queue;
349+
// WeakReference breaks the ThreadLocal retention cycle: thdLocalBatcher is an instance field
350+
// of JCQueue, so the ThreadLocalMap key (the ThreadLocal object) is kept strongly reachable
351+
// via value(BatchInserter) -> queue(JCQueue) -> field. A WeakReference here cuts that path,
352+
// allowing the key to become weakly-reachable and the entry to be expunged once the JCQueue
353+
// is no longer externally referenced.
354+
private final WeakReference<JCQueue> queueRef;
349355
private final ArrayList<Object> currentBatch;
350356

351357
BatchInserter(JCQueue queue, int batchSz) {
352-
this.queue = queue;
358+
this.queueRef = new WeakReference<>(queue);
353359
this.batchSz = batchSz;
354360
this.currentBatch = new ArrayList<>(batchSz + 1);
355361
}
@@ -402,6 +408,11 @@ public void flush() throws InterruptedException {
402408
if (currentBatch.isEmpty()) {
403409
return;
404410
}
411+
JCQueue queue = queueRef.get();
412+
if (queue == null) {
413+
currentBatch.clear();
414+
return;
415+
}
405416
boolean wasFull = currentBatch.size() >= batchSize();
406417
int publishCount = queue.tryPublishInternal(currentBatch);
407418
int retryCount = 0;
@@ -432,6 +443,11 @@ public boolean tryFlush() {
432443
if (currentBatch.isEmpty()) {
433444
return true;
434445
}
446+
JCQueue queue = queueRef.get();
447+
if (queue == null) {
448+
currentBatch.clear();
449+
return true;
450+
}
435451
boolean wasFull = currentBatch.size() >= batchSize();
436452
int publishCount = queue.tryPublishInternal(currentBatch);
437453
if (publishCount == 0) {

0 commit comments

Comments
 (0)