Skip to content

Commit

Permalink
Fix Race Condition in removeReadyTimers() to Prevent Cleared Timers f…
Browse files Browse the repository at this point in the history
…rom Firing (#143)
  • Loading branch information
FuRyanf authored Mar 1, 2025
1 parent a6917b0 commit bb44789
Show file tree
Hide file tree
Showing 7 changed files with 121 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -398,7 +398,7 @@ class BeamModulePlugin implements Plugin<Project> {

// Automatically use the official release version if we are performing a release
// otherwise append '-SNAPSHOT'
project.version = '2.45.38'
project.version = '2.45.39'
if (isLinkedin(project)) {
project.ext.mavenGroupId = 'com.linkedin.beam'
}
Expand Down
4 changes: 2 additions & 2 deletions gradle.properties
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ signing.gnupg.useLegacyGpg=true
# buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy.
# To build a custom Beam version make sure you change it in both places, see
# https://github.com/apache/beam/issues/21302.
version=2.45.38
sdk_version=2.45.38
version=2.45.39
sdk_version=2.45.39

javaVersion=1.8

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -329,14 +329,9 @@ private void doProcessWatermark(Instant watermark, OpEmitter<OutT> emitter) {

timerInternalsFactory.setInputWatermark(actualInputWatermark);

Collection<? extends KeyedTimerData<?>> readyTimers = timerInternalsFactory.removeReadyTimers();
if (!readyTimers.isEmpty()) {
pushbackFnRunner.startBundle();
for (KeyedTimerData<?> keyedTimerData : readyTimers) {
fireTimer(keyedTimerData);
}
pushbackFnRunner.finishBundle();
}
pushbackFnRunner.startBundle();
timerInternalsFactory.fireReadyTimers(this::fireTimer);
pushbackFnRunner.finishBundle();

if (timerInternalsFactory.getOutputWatermark() == null
|| timerInternalsFactory.getOutputWatermark().isBefore(actualInputWatermark)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
*/
package org.apache.beam.runners.samza.runtime;

import java.util.Collection;
import java.util.Collections;
import org.apache.beam.runners.core.DoFnRunner;
import org.apache.beam.runners.core.DoFnRunners;
Expand Down Expand Up @@ -207,14 +206,9 @@ public void processElement(
public void processWatermark(Instant watermark, OpEmitter<KV<K, OutputT>> emitter) {
timerInternalsFactory.setInputWatermark(watermark);

Collection<KeyedTimerData<K>> readyTimers = timerInternalsFactory.removeReadyTimers();
if (!readyTimers.isEmpty()) {
fnRunner.startBundle();
for (KeyedTimerData<K> keyedTimerData : readyTimers) {
fireTimer(keyedTimerData.getKey(), keyedTimerData.getTimerData());
}
fnRunner.finishBundle();
}
fnRunner.startBundle();
timerInternalsFactory.fireReadyTimers(timer -> fireTimer(timer.getKey(), timer.getTimerData()));
fnRunner.finishBundle();

if (timerInternalsFactory.getOutputWatermark() == null
|| timerInternalsFactory.getOutputWatermark().isBefore(watermark)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,69 @@ public void removeProcessingTimer(KeyedTimerData<K> keyedTimerData) {
state.deletePersisted(keyedTimerData);
}

/**
 * Callback used to fire a single keyed timer.
 *
 * <p>{@code fireReadyTimers} invokes this once for each timer it decides to fire, after the timer
 * has been removed from the event-time buffer and its persisted entry has been deleted.
 *
 * @param <K> type of the key carried by the {@link KeyedTimerData} being fired
 */
@FunctionalInterface
public interface TimeFiringFn<K> {
/**
 * Fires the given timer.
 *
 * @param timerData the keyed timer to fire
 */
void fire(KeyedTimerData<K> timerData);
}

/**
 * Fires the event-time timers that are due at the current input watermark, handling at most
 * {@code maxReadyTimersToProcessOnce} timers per invocation.
 *
 * <p>A timer is <em>due</em> when its timestamp is not after the input watermark (timestamp ≤
 * input watermark). Timers are taken from the event-time buffer one at a time, in buffer order:
 *
 * <ol>
 *   <li>If the buffer is empty, timers are reloaded from state; if it is still empty, stop.
 *   <li>If the first buffered timer is not due, stop.
 *   <li>Otherwise the timer is removed from the buffer and checked against the value returned by
 *       {@code state.get(timer)}. It is fired only if that value is present and equals the
 *       timer's own timestamp in millis; a timer that fails the check is dropped from the buffer
 *       without firing and does not count towards the limit.
 *   <li>A timer that passes the check has its persisted entry deleted and is then handed to
 *       {@code firingFn}.
 * </ol>
 *
 * <p>Each timer is validated immediately before it is fired, rather than collecting all ready
 * timers up front, so the check in step 3 sees the state as it is at firing time.
 *
 * <p>If the limit is reached while the next buffered timer is still due, a warning is logged; the
 * remaining timers are left in the buffer for the next invocation.
 *
 * @param firingFn function invoked for each timer that passes validation
 */
public void fireReadyTimers(TimeFiringFn<K> firingFn) {
  int processedCount = 0;

  while (processedCount < maxReadyTimersToProcessOnce) {
    // If the buffer is empty, attempt to reload timers from state.
    if (eventTimeBuffer.isEmpty()) {
      state.reloadEventTimeTimers();
      // If still empty after reloading, there are no timers to process.
      if (eventTimeBuffer.isEmpty()) {
        break;
      }
    }

    // Stop at the first timer that is not yet due (timestamp > inputWatermark).
    if (eventTimeBuffer.first().getTimerData().getTimestamp().isAfter(inputWatermark)) {
      break;
    }

    final KeyedTimerData<K> timer = eventTimeBuffer.pollFirst();
    // Only fire when the persisted value still matches this buffered timer; otherwise the
    // buffered entry is stale and is silently discarded.
    final Long storedTimestamp = state.get(timer);
    if (storedTimestamp != null
        && storedTimestamp.equals(timer.getTimerData().getTimestamp().getMillis())) {
      // Delete before firing so the timer is no longer persisted while firingFn runs.
      state.deletePersisted(timer);
      firingFn.fire(timer);
      processedCount++;
    }
  }
  LOG.debug("Processed {} expired timers at this watermark.", processedCount);

  // Warn if we hit the processing limit while due timers remain. This uses the same
  // "timestamp <= inputWatermark" predicate as the firing loop above, so a deferred timer that
  // sits exactly at the watermark is reported as well.
  if (processedCount == maxReadyTimersToProcessOnce
      && !eventTimeBuffer.isEmpty()
      && !eventTimeBuffer.first().getTimerData().getTimestamp().isAfter(inputWatermark)) {
    LOG.warn(
        "Loaded {} expired timers, the remaining will be processed at next watermark.",
        maxReadyTimersToProcessOnce);
  }
}

public Instant getInputWatermark() {
return inputWatermark;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -221,14 +221,9 @@ public void processElement(
public void processWatermark(Instant watermark, OpEmitter<RawUnionValue> emitter) {
timerInternalsFactory.setInputWatermark(watermark);

Collection<KeyedTimerData<byte[]>> readyTimers = timerInternalsFactory.removeReadyTimers();
if (!readyTimers.isEmpty()) {
fnRunner.startBundle();
for (KeyedTimerData<byte[]> keyedTimerData : readyTimers) {
fireTimer(keyedTimerData.getKey(), keyedTimerData.getTimerData());
}
fnRunner.finishBundle();
}
fnRunner.startBundle();
timerInternalsFactory.fireReadyTimers(timer -> fireTimer(timer.getKey(), timer.getTimerData()));
fnRunner.finishBundle();

if (timerInternalsFactory.getOutputWatermark() == null
|| timerInternalsFactory.getOutputWatermark().isBefore(watermark)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -687,4 +687,50 @@ public void testByteArray() {
assertTrue(!map.containsKey(key2));
assertTrue(map.isEmpty());
}

@Test
public void testFireReadyTimersConsolidatedLogic() {
  // Build a timer factory on top of a fresh store with default Samza options.
  final SamzaPipelineOptions options =
      PipelineOptionsFactory.create().as(SamzaPipelineOptions.class);
  final KeyValueStore<ByteArray, StateValue<?>> kvStore = createStore();
  final SamzaTimerInternalsFactory<String> factory =
      createTimerInternalsFactory(null, "timer", options, kvStore);

  final StateNamespace globalNamespace = StateNamespaces.global();
  final TimerInternals keyedTimers = factory.timerInternalsForKey("testKey");

  // Register two event-time timers, the earlier one first.
  final TimerInternals.TimerData earlier =
      TimerInternals.TimerData.of(
          "timer1", globalNamespace, new Instant(10), new Instant(10), TimeDomain.EVENT_TIME);
  final TimerInternals.TimerData later =
      TimerInternals.TimerData.of(
          "timer2", globalNamespace, new Instant(20), new Instant(20), TimeDomain.EVENT_TIME);
  keyedTimers.setTimer(earlier);
  keyedTimers.setTimer(later);

  // Advance the watermark past both timers so that each of them is due.
  factory.setInputWatermark(new Instant(30));

  // Record every timer handed to the firing callback, in firing order.
  final List<KeyedTimerData<String>> fired = new ArrayList<>();
  factory.fireReadyTimers(fired::add);

  // Both timers must have fired, ordered by timestamp.
  assertEquals("Expected two fired timers", 2, fired.size());
  assertEquals("First fired timer should be timer1", earlier, fired.get(0).getTimerData());
  assertEquals("Second fired timer should be timer2", later, fired.get(1).getTimerData());

  // Firing must also have drained the event-time buffer.
  assertTrue("Event time buffer should be empty", factory.getEventTimeBuffer().isEmpty());

  kvStore.close();
}
}

0 comments on commit bb44789

Please sign in to comment.