Skip to content

Commit

Permalink
Fix Race Condition in removeReadyTimers() to Prevent Cleared Timers f…
Browse files Browse the repository at this point in the history
…rom Firing
  • Loading branch information
ModRyanFu committed Feb 26, 2025
1 parent a6917b0 commit d5649cf
Show file tree
Hide file tree
Showing 7 changed files with 131 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -398,7 +398,7 @@ class BeamModulePlugin implements Plugin<Project> {

// Automatically use the official release version if we are performing a release
// otherwise append '-SNAPSHOT'
project.version = '2.45.38'
project.version = '2.45.39'
if (isLinkedin(project)) {
project.ext.mavenGroupId = 'com.linkedin.beam'
}
Expand Down
4 changes: 2 additions & 2 deletions gradle.properties
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ signing.gnupg.useLegacyGpg=true
# buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy.
# To build a custom Beam version make sure you change it in both places, see
# https://github.com/apache/beam/issues/21302.
version=2.45.38
sdk_version=2.45.38
version=2.45.39
sdk_version=2.45.39

javaVersion=1.8

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -329,14 +329,9 @@ private void doProcessWatermark(Instant watermark, OpEmitter<OutT> emitter) {

timerInternalsFactory.setInputWatermark(actualInputWatermark);

Collection<? extends KeyedTimerData<?>> readyTimers = timerInternalsFactory.removeReadyTimers();
if (!readyTimers.isEmpty()) {
pushbackFnRunner.startBundle();
for (KeyedTimerData<?> keyedTimerData : readyTimers) {
fireTimer(keyedTimerData);
}
pushbackFnRunner.finishBundle();
}
pushbackFnRunner.startBundle();
timerInternalsFactory.fireReadyTimers(this::fireTimer);
pushbackFnRunner.finishBundle();

if (timerInternalsFactory.getOutputWatermark() == null
|| timerInternalsFactory.getOutputWatermark().isBefore(actualInputWatermark)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
*/
package org.apache.beam.runners.samza.runtime;

import java.util.Collection;
import java.util.Collections;
import org.apache.beam.runners.core.DoFnRunner;
import org.apache.beam.runners.core.DoFnRunners;
Expand Down Expand Up @@ -207,14 +206,9 @@ public void processElement(
public void processWatermark(Instant watermark, OpEmitter<KV<K, OutputT>> emitter) {
timerInternalsFactory.setInputWatermark(watermark);

Collection<KeyedTimerData<K>> readyTimers = timerInternalsFactory.removeReadyTimers();
if (!readyTimers.isEmpty()) {
fnRunner.startBundle();
for (KeyedTimerData<K> keyedTimerData : readyTimers) {
fireTimer(keyedTimerData.getKey(), keyedTimerData.getTimerData());
}
fnRunner.finishBundle();
}
fnRunner.startBundle();
timerInternalsFactory.fireReadyTimers(timer -> fireTimer(timer.getKey(), timer.getTimerData()));
fnRunner.finishBundle();

if (timerInternalsFactory.getOutputWatermark() == null
|| timerInternalsFactory.getOutputWatermark().isBefore(watermark)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,79 @@ public void removeProcessingTimer(KeyedTimerData<K> keyedTimerData) {
state.deletePersisted(keyedTimerData);
}

/**
 * Callback used by {@code fireReadyTimers} to fire a single timer.
 *
 * @param <K> key type of the {@link KeyedTimerData} handed to the callback
 */
@FunctionalInterface
public interface TimeFiringFn<K> {
/**
 * Fires the given timer.
 *
 * @param timerData the keyed timer to fire
 */
void fire(KeyedTimerData<K> timerData);
}

/**
 * Fires the event-time timers that are due, processing at most {@code
 * maxReadyTimersToProcessOnce} timers per invocation.
 *
 * <p>A timer is considered due when its timestamp is less than or equal to the current input
 * watermark. The method works in three steps:
 *
 * <ol>
 *   <li>Due timers are polled from the head of the in-memory event time buffer into a batch. If
 *       the buffer is empty it is reloaded from state first; collection stops when the batch is
 *       full, when nothing is left after a reload, or when the head timer is not yet due.
 *   <li>Each batched timer is looked up in state again right before it is fired. It is deleted
 *       from persistent storage and passed to {@code firingFn} only if state still holds the same
 *       timestamp for it; otherwise it is silently dropped, so a timer that is no longer present
 *       in state (or was re-set to a different time) does not fire.
 *   <li>If the batch hit the limit and the head of the buffer is still due, a warning is logged;
 *       the remaining timers are left in the buffer for a later invocation.
 * </ol>
 *
 * @param firingFn the function used to fire each timer that is still valid
 */
public void fireReadyTimers(TimeFiringFn<K> firingFn) {
  Collection<KeyedTimerData<K>> readyTimers = new ArrayList<>();

  // Collect due timers until the per-invocation limit is reached.
  while (readyTimers.size() < maxReadyTimersToProcessOnce) {
    // If the buffer is empty, attempt to reload timers from state.
    if (eventTimeBuffer.isEmpty()) {
      state.reloadEventTimeTimers();
      // Still empty after reloading: there is nothing left to process.
      if (eventTimeBuffer.isEmpty()) {
        break;
      }
    }

    // The buffer is non-empty here. Stop as soon as the head timer is not due yet
    // (due means timestamp <= inputWatermark).
    if (eventTimeBuffer.first().getTimerData().getTimestamp().isAfter(inputWatermark)) {
      break;
    }
    readyTimers.add(eventTimeBuffer.pollFirst());
  }

  // Re-check each timer against state immediately before firing: the stored timestamp must still
  // match, otherwise the timer is skipped instead of fired.
  for (KeyedTimerData<K> keyedTimerData : readyTimers) {
    Long storedTimestamp = state.get(keyedTimerData);
    if (storedTimestamp != null
        && storedTimestamp.equals(keyedTimerData.getTimerData().getTimestamp().getMillis())) {
      // Delete before firing so the timer is gone from state by the time user code runs.
      state.deletePersisted(keyedTimerData);
      firingFn.fire(keyedTimerData);
    }
  }

  // Warn when the limit was hit and due timers remain. This uses the same "due" predicate as the
  // collection loop (timestamp <= inputWatermark), so a timer sitting exactly at the watermark is
  // reported as well.
  if (readyTimers.size() == maxReadyTimersToProcessOnce
      && !eventTimeBuffer.isEmpty()
      && !eventTimeBuffer.first().getTimerData().getTimestamp().isAfter(inputWatermark)) {
    LOG.warn(
        "Loaded {} expired timers, the remaining will be processed at next watermark.",
        maxReadyTimersToProcessOnce);
  }
}

/**
 * Returns the current value of {@code inputWatermark}.
 *
 * @return the input watermark currently held by this factory
 */
public Instant getInputWatermark() {
return inputWatermark;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -221,14 +221,9 @@ public void processElement(
public void processWatermark(Instant watermark, OpEmitter<RawUnionValue> emitter) {
timerInternalsFactory.setInputWatermark(watermark);

Collection<KeyedTimerData<byte[]>> readyTimers = timerInternalsFactory.removeReadyTimers();
if (!readyTimers.isEmpty()) {
fnRunner.startBundle();
for (KeyedTimerData<byte[]> keyedTimerData : readyTimers) {
fireTimer(keyedTimerData.getKey(), keyedTimerData.getTimerData());
}
fnRunner.finishBundle();
}
fnRunner.startBundle();
timerInternalsFactory.fireReadyTimers(timer -> fireTimer(timer.getKey(), timer.getTimerData()));
fnRunner.finishBundle();

if (timerInternalsFactory.getOutputWatermark() == null
|| timerInternalsFactory.getOutputWatermark().isBefore(watermark)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -687,4 +687,50 @@ public void testByteArray() {
assertTrue(!map.containsKey(key2));
assertTrue(map.isEmpty());
}

/**
 * Sets two event-time timers for one key, advances the input watermark past both, and checks that
 * {@code fireReadyTimers} hands them to the callback in timestamp order and drains the buffer.
 */
@Test
public void testFireReadyTimersConsolidatedLogic() {
  // Collects every timer handed to the firing callback, in firing order.
  final List<KeyedTimerData<String>> fired = new ArrayList<>();

  // Factory under test, backed by a fresh store.
  SamzaPipelineOptions options = PipelineOptionsFactory.create().as(SamzaPipelineOptions.class);
  KeyValueStore<ByteArray, StateValue<?>> kvStore = createStore();
  final SamzaTimerInternalsFactory<String> factory =
      createTimerInternalsFactory(null, "timer", options, kvStore);

  final String userKey = "testKey";
  final StateNamespace globalNamespace = StateNamespaces.global();
  TimerInternals internalsForKey = factory.timerInternalsForKey(userKey);

  // Two event-time timers at t=10 and t=20, registered in that order.
  Instant earlyTime = new Instant(10);
  TimerInternals.TimerData earlyTimer =
      TimerInternals.TimerData.of(
          "timer1", globalNamespace, earlyTime, new Instant(10), TimeDomain.EVENT_TIME);
  Instant lateTime = new Instant(20);
  TimerInternals.TimerData lateTimer =
      TimerInternals.TimerData.of(
          "timer2", globalNamespace, lateTime, new Instant(20), TimeDomain.EVENT_TIME);
  internalsForKey.setTimer(earlyTimer);
  internalsForKey.setTimer(lateTimer);

  // A watermark of 30 makes both timers due.
  factory.setInputWatermark(new Instant(30));

  factory.fireReadyTimers(fired::add);

  // Both timers fired, earliest first.
  assertEquals("Expected two fired timers", 2, fired.size());
  assertEquals("First fired timer should be timer1", earlyTimer, fired.get(0).getTimerData());
  assertEquals("Second fired timer should be timer2", lateTimer, fired.get(1).getTimerData());

  // Nothing should be left in the in-memory buffer.
  boolean bufferDrained = factory.getEventTimeBuffer().isEmpty();
  assertTrue("Event time buffer should be empty", bufferDrained);

  kvStore.close();
}
}

0 comments on commit d5649cf

Please sign in to comment.