From e364c9c88b5d89f2cd65f50036920b6ab58f203e Mon Sep 17 00:00:00 2001 From: rfu Date: Tue, 25 Feb 2025 18:24:54 -0800 Subject: [PATCH] Fix Race Condition in removeReadyTimers() to Prevent Cleared Timers from Firing --- .../beam/gradle/BeamModulePlugin.groovy | 2 +- gradle.properties | 4 +- .../beam/runners/samza/runtime/DoFnOp.java | 11 ++-- .../runners/samza/runtime/GroupByKeyOp.java | 12 ++--- .../runtime/SamzaTimerInternalsFactory.java | 52 +++++++++++++++++++ ...SplittableParDoProcessKeyedElementsOp.java | 11 ++-- .../SamzaTimerInternalsFactoryTest.java | 46 ++++++++++++++++ 7 files changed, 110 insertions(+), 28 deletions(-) diff --git a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy index f285742ae62c..091772d07c6a 100644 --- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy +++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy @@ -398,7 +398,7 @@ class BeamModulePlugin implements Plugin { // Automatically use the official release version if we are performing a release // otherwise append '-SNAPSHOT' - project.version = '2.45.38' + project.version = '2.45.39' if (isLinkedin(project)) { project.ext.mavenGroupId = 'com.linkedin.beam' } diff --git a/gradle.properties b/gradle.properties index e9b24ba8812d..84d1276d4994 100644 --- a/gradle.properties +++ b/gradle.properties @@ -30,8 +30,8 @@ signing.gnupg.useLegacyGpg=true # buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy. # To build a custom Beam version make sure you change it in both places, see # https://github.com/apache/beam/issues/21302. 
-version=2.45.38 -sdk_version=2.45.38 +version=2.45.39 +sdk_version=2.45.39 javaVersion=1.8 diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/DoFnOp.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/DoFnOp.java index 35661ae86fe1..1e51984364d6 100644 --- a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/DoFnOp.java +++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/DoFnOp.java @@ -329,14 +329,9 @@ private void doProcessWatermark(Instant watermark, OpEmitter emitter) { timerInternalsFactory.setInputWatermark(actualInputWatermark); - Collection> readyTimers = timerInternalsFactory.removeReadyTimers(); - if (!readyTimers.isEmpty()) { - pushbackFnRunner.startBundle(); - for (KeyedTimerData keyedTimerData : readyTimers) { - fireTimer(keyedTimerData); - } - pushbackFnRunner.finishBundle(); - } + pushbackFnRunner.startBundle(); + timerInternalsFactory.fireReadyTimers(this::fireTimer); + pushbackFnRunner.finishBundle(); if (timerInternalsFactory.getOutputWatermark() == null || timerInternalsFactory.getOutputWatermark().isBefore(actualInputWatermark)) { diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/GroupByKeyOp.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/GroupByKeyOp.java index 1b19275dd967..fa79b63b59b1 100644 --- a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/GroupByKeyOp.java +++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/GroupByKeyOp.java @@ -17,7 +17,6 @@ */ package org.apache.beam.runners.samza.runtime; -import java.util.Collection; import java.util.Collections; import org.apache.beam.runners.core.DoFnRunner; import org.apache.beam.runners.core.DoFnRunners; @@ -207,14 +206,9 @@ public void processElement( public void processWatermark(Instant watermark, OpEmitter> emitter) { timerInternalsFactory.setInputWatermark(watermark); - Collection> readyTimers = 
timerInternalsFactory.removeReadyTimers(); - if (!readyTimers.isEmpty()) { - fnRunner.startBundle(); - for (KeyedTimerData keyedTimerData : readyTimers) { - fireTimer(keyedTimerData.getKey(), keyedTimerData.getTimerData()); - } - fnRunner.finishBundle(); - } + fnRunner.startBundle(); + timerInternalsFactory.fireReadyTimers(timer -> fireTimer(timer.getKey(), timer.getTimerData())); + fnRunner.finishBundle(); if (timerInternalsFactory.getOutputWatermark() == null || timerInternalsFactory.getOutputWatermark().isBefore(watermark)) { diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaTimerInternalsFactory.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaTimerInternalsFactory.java index bd6547b805a4..d4aeb322aa81 100644 --- a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaTimerInternalsFactory.java +++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaTimerInternalsFactory.java @@ -207,6 +207,58 @@ public void removeProcessingTimer(KeyedTimerData keyedTimerData) { state.deletePersisted(keyedTimerData); } + /** Functional interface for firing a timer. */ + @FunctionalInterface + public interface TimeFiringFn { + void fire(KeyedTimerData timerData); + } + + /** + * Consolidated method to fire ready timers. It loops while the event time buffer has timers that + * are due based on the current input watermark. For each timer collected, it checks if the timer + * still exists in state; if so, it deletes it and fires it. Finally, if the event time buffer is + * empty, it reloads timers from state. + */ + public void fireReadyTimers(TimeFiringFn firingFn) { + // Process timers as long as there are timers in the buffer that are due. + while (!eventTimeBuffer.isEmpty() + && !eventTimeBuffer.first().getTimerData().getTimestamp().isAfter(inputWatermark)) { + + // Create a list of ready timers without deleting them from state yet. 
+ Collection<KeyedTimerData<K>> readyTimers = new ArrayList<>(); + while (!eventTimeBuffer.isEmpty() + && !eventTimeBuffer.first().getTimerData().getTimestamp().isAfter(inputWatermark) + && readyTimers.size() < maxReadyTimersToProcessOnce) { + readyTimers.add(eventTimeBuffer.pollFirst()); + } + + // For each timer in our ready list, verify it still exists in state. + for (KeyedTimerData<K> keyedTimerData : readyTimers) { + Long storedTimestamp = state.get(keyedTimerData); + if (storedTimestamp != null + && storedTimestamp.equals(keyedTimerData.getTimerData().getTimestamp().getMillis())) { + // The timer still exists in state; delete it now and fire it. + state.deletePersisted(keyedTimerData); + firingFn.fire(keyedTimerData); + } + } + + // Warn if this batch hit the processing limit while expired timers are still buffered. + if (readyTimers.size() == maxReadyTimersToProcessOnce + && !eventTimeBuffer.isEmpty() + && eventTimeBuffer.first().getTimerData().getTimestamp().isBefore(inputWatermark)) { + LOG.warn( + "Loaded {} expired timers, the remaining will be processed at next watermark.", + maxReadyTimersToProcessOnce); + } + + // If the buffer is empty, reload timers from state. 
+ if (eventTimeBuffer.isEmpty()) { + state.reloadEventTimeTimers(); + } + } + } + public Instant getInputWatermark() { return inputWatermark; } diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SplittableParDoProcessKeyedElementsOp.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SplittableParDoProcessKeyedElementsOp.java index cf164b145b63..0e947ce5d026 100644 --- a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SplittableParDoProcessKeyedElementsOp.java +++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SplittableParDoProcessKeyedElementsOp.java @@ -221,14 +221,9 @@ public void processElement( public void processWatermark(Instant watermark, OpEmitter emitter) { timerInternalsFactory.setInputWatermark(watermark); - Collection> readyTimers = timerInternalsFactory.removeReadyTimers(); - if (!readyTimers.isEmpty()) { - fnRunner.startBundle(); - for (KeyedTimerData keyedTimerData : readyTimers) { - fireTimer(keyedTimerData.getKey(), keyedTimerData.getTimerData()); - } - fnRunner.finishBundle(); - } + fnRunner.startBundle(); + timerInternalsFactory.fireReadyTimers(timer -> fireTimer(timer.getKey(), timer.getTimerData())); + fnRunner.finishBundle(); if (timerInternalsFactory.getOutputWatermark() == null || timerInternalsFactory.getOutputWatermark().isBefore(watermark)) { diff --git a/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/SamzaTimerInternalsFactoryTest.java b/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/SamzaTimerInternalsFactoryTest.java index e2db0cb7c842..42fb7545a757 100644 --- a/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/SamzaTimerInternalsFactoryTest.java +++ b/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/SamzaTimerInternalsFactoryTest.java @@ -687,4 +687,50 @@ public void testByteArray() { assertTrue(!map.containsKey(key2)); assertTrue(map.isEmpty()); } + + @Test + public void 
testFireReadyTimersConsolidatedLogic() { + // Set up the pipeline options and state store. + SamzaPipelineOptions pipelineOptions = + PipelineOptionsFactory.create().as(SamzaPipelineOptions.class); + KeyValueStore> store = createStore(); + final SamzaTimerInternalsFactory timerInternalsFactory = + createTimerInternalsFactory(null, "timer", pipelineOptions, store); + + final String key = "testKey"; + final StateNamespace namespace = StateNamespaces.global(); + // Get the TimerInternals for the key. + TimerInternals timerInternals = timerInternalsFactory.timerInternalsForKey(key); + + // Create two timers with different timestamps. + TimerInternals.TimerData timer1 = + TimerInternals.TimerData.of( + "timer1", namespace, new Instant(10), new Instant(10), TimeDomain.EVENT_TIME); + TimerInternals.TimerData timer2 = + TimerInternals.TimerData.of( + "timer2", namespace, new Instant(20), new Instant(20), TimeDomain.EVENT_TIME); + // Set the timers. + timerInternals.setTimer(timer1); + timerInternals.setTimer(timer2); + + // Set the input watermark such that both timers are due. + timerInternalsFactory.setInputWatermark(new Instant(30)); + + // List to collect fired timers. + final List> firedTimers = new ArrayList<>(); + + // Invoke the consolidated timer firing logic. + timerInternalsFactory.fireReadyTimers(timer -> firedTimers.add(timer)); + + // Verify that both timers have been fired in order. + assertEquals("Expected two fired timers", 2, firedTimers.size()); + assertEquals("First fired timer should be timer1", timer1, firedTimers.get(0).getTimerData()); + assertEquals("Second fired timer should be timer2", timer2, firedTimers.get(1).getTimerData()); + + // Also, the event time buffer should be empty after firing. + assertTrue( + "Event time buffer should be empty", timerInternalsFactory.getEventTimeBuffer().isEmpty()); + + store.close(); + } }