Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -843,8 +843,17 @@ public InputT element(DoFn<InputT, OutputT> doFn) {
}

@Override
public Object sideInput(String tagId) {
throw new UnsupportedOperationException("SideInput parameters are not supported.");
public @Nullable Object sideInput(String tagId) {
PCollectionView<?> view =
checkStateNotNull(sideInputMapping.get(tagId), "Side input tag %s not found", tagId);
return sideInput(view);
}

@Override
public <T> T sideInput(PCollectionView<T> view) {
checkNotNull(view, "View passed to sideInput cannot be null");
return SimpleDoFnRunner.this.sideInput(
view, view.getWindowMappingFn().getSideInputWindow(window()));
}

@Override
Expand Down Expand Up @@ -1147,8 +1156,17 @@ public InputT element(DoFn<InputT, OutputT> doFn) {
}

@Override
public Object sideInput(String tagId) {
throw new UnsupportedOperationException("SideInput parameters are not supported.");
public @Nullable Object sideInput(String tagId) {
PCollectionView<?> view =
checkStateNotNull(sideInputMapping.get(tagId), "Side input tag %s not found", tagId);
return sideInput(view);
}

@Override
public <T> T sideInput(PCollectionView<T> view) {
checkNotNull(view, "View passed to sideInput cannot be null");
return SimpleDoFnRunner.this.sideInput(
view, view.getWindowMappingFn().getSideInputWindow(window()));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,7 @@ public TransformResult<InputT> finishBundle() {
} else {
resultBuilder = StepTransformResult.withoutHold(transform);
}

return resultBuilder
.addOutput(outputManager.bundles.values())
.withTimerUpdate(stepContext.getTimerUpdate())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import org.apache.beam.runners.core.SideInputReader;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.DoFnSchemaInformation;
import org.apache.beam.sdk.util.WindowedValueMultiReceiver;
Expand Down Expand Up @@ -68,17 +67,6 @@ public DoFnRunner<InputT, OutputT> createRunner(
windowingStrategy,
doFnSchemaInformation,
sideInputMapping);
boolean hasStreamingSideInput =
options.as(StreamingOptions.class).isStreaming() && !sideInputReader.isEmpty();
if (hasStreamingSideInput) {
return new StreamingSideInputDoFnRunner<>(
fnRunner,
new StreamingSideInputFetcher<>(
sideInputViews,
inputCoder,
windowingStrategy,
(StreamingModeExecutionContext.StreamingModeStepContext) userStepContext));
}
return fnRunner;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.beam.runners.core.DoFnRunner;
import org.apache.beam.runners.core.SideInputReader;
import org.apache.beam.runners.core.StateInternals;
Expand All @@ -43,6 +44,7 @@
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.state.BagState;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.transforms.DoFnSchemaInformation;
Expand All @@ -58,6 +60,7 @@
import org.apache.beam.sdk.values.WindowedValue;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.joda.time.Duration;
Expand All @@ -76,7 +79,7 @@
"rawtypes", // TODO(https://github.com/apache/beam/issues/20447)
"nullness" // TODO(https://github.com/apache/beam/issues/20497)
})
public class SimpleParDoFn<InputT, OutputT> implements ParDoFn {
public class SimpleParDoFn<InputT, OutputT, W extends BoundedWindow> implements ParDoFn {

// TODO: Remove once Distributions has shipped.
@VisibleForTesting
Expand Down Expand Up @@ -112,6 +115,8 @@ public class SimpleParDoFn<InputT, OutputT> implements ParDoFn {
// GroupAlsoByWindowViaWindowSetDoFn
private @Nullable DoFnSignature fnSignature;

private @Nullable StreamingSideInputFetcher<InputT, W> sideInputFetcher;

/** Creates a {@link SimpleParDoFn} using basic information about the step being executed. */
SimpleParDoFn(
PipelineOptions options,
Expand Down Expand Up @@ -317,8 +322,38 @@ public <TagT> void output(TupleTag<TagT> tag, WindowedValue<TagT> output) {
outputManager,
doFnSchemaInformation,
sideInputMapping);
if (hasStreamingSideInput) {
sideInputFetcher =
new StreamingSideInputFetcher<InputT, W>(
fnInfo.getSideInputViews(),
fnInfo.getInputCoder(),
(WindowingStrategy<?, W>) fnInfo.getWindowingStrategy(),
(StreamingModeExecutionContext.StreamingModeStepContext) userStepContext);
}

fnRunner.startBundle();
if (sideInputFetcher != null) {
Set<W> readyWindows = sideInputFetcher.getReadyWindows();
Iterable<BagState<WindowedValue<InputT>>> elementsBags =
sideInputFetcher.prefetchElements(readyWindows);
for (BagState<WindowedValue<InputT>> elementsBag : elementsBags) {
Iterable<WindowedValue<InputT>> elements = elementsBag.read();
for (WindowedValue<InputT> elem : elements) {
fnRunner.processElement(elem);
}
elementsBag.clear();

boolean hasState = fnSignature != null && !fnSignature.stateDeclarations().isEmpty();
if (hasState) {
// These elements are now processed. Register cleanup timers for all the unblocked
// windows.
registerStateCleanup(
(WindowingStrategy<?, W>) getDoFnInfo().getWindowingStrategy(),
(Collection<W>) readyWindows);
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The call to registerStateCleanup is located inside the loop that iterates over elementsBags. Since it is passed the entire readyWindows collection, it will redundantly register cleanup timers for all unblocked windows in every iteration of the loop. This should be moved outside the loop to improve performance, especially when multiple windows are unblocked simultaneously.

}
sideInputFetcher.releaseBlockedWindows(readyWindows);
}
}

@Override
Expand All @@ -334,14 +369,30 @@ public void processElement(Object untypedElem) throws Exception {

WindowedValue<InputT> elem = (WindowedValue<InputT>) untypedElem;

if (fnSignature != null && fnSignature.stateDeclarations().size() > 0) {
registerStateCleanup(
(WindowingStrategy<?, BoundedWindow>) getDoFnInfo().getWindowingStrategy(),
(Collection<BoundedWindow>) elem.getWindows());
}

boolean hasState = fnSignature != null && !fnSignature.stateDeclarations().isEmpty();
outputsPerElementTracker.onProcessElement();
fnRunner.processElement(elem);
if (sideInputFetcher != null) {
for (WindowedValue<InputT> exploded : elem.explodeWindows()) {
if (!sideInputFetcher.storeIfBlocked(exploded)) {
fnRunner.processElement(exploded);
if (hasState) {
registerStateCleanup(
(WindowingStrategy<?, W>) getDoFnInfo().getWindowingStrategy(),
(Collection<W>) exploded.getWindows());
}
}
// If the element was blocked, don't register a cleanup timer. The timer will be registered
// when
// the window is unblocked.
}
} else {
fnRunner.processElement(elem);
if (hasState) {
registerStateCleanup(
(WindowingStrategy<?, W>) getDoFnInfo().getWindowingStrategy(),
(Collection<W>) elem.getWindows());
}
}
outputsPerElementTracker.onProcessElementSuccess();
}

Expand All @@ -367,6 +418,14 @@ private void processUserTimer(TimerData timer) throws Exception {
if (fnSignature.timerDeclarations().containsKey(timer.getTimerId())
|| fnSignature.timerFamilyDeclarations().containsKey(timer.getTimerFamilyId())) {
BoundedWindow window = ((WindowNamespace) timer.getNamespace()).getWindow();
if (sideInputFetcher != null) {
// We must call this to ensure the side-input is cached for the timer. However since a user
// timer can only
// be set via element processing (or another timer) in the same window, the window should be
// unblocked once
// we get here.
Preconditions.checkState(!sideInputFetcher.storeIfBlocked(timer));
}
fnRunner.onTimer(
timer.getTimerId(),
timer.getTimerFamilyId(),
Expand All @@ -380,7 +439,6 @@ private void processUserTimer(TimerData timer) throws Exception {
}

private void processSystemTimer(TimerData timer) throws Exception {

// Timer owned by this class, for cleaning up state in expired windows
if (timer.getTimerId().equals(CLEANUP_TIMER_ID)) {
checkState(
Expand All @@ -396,6 +454,13 @@ private void processSystemTimer(TimerData timer) throws Exception {
WindowNamespace.class.getSimpleName(),
timer);

if (sideInputFetcher != null) {
// We must call this to ensure the side-input is cached for onWindowExpiration. Since we
// don't set cleanup
// timers until we actually call processElement, the window must be unblocked here.
Preconditions.checkState(!sideInputFetcher.storeIfBlocked(timer));
}

BoundedWindow window = ((WindowNamespace) timer.getNamespace()).getWindow();
Instant targetTime = earliestAllowableCleanupTime(window, fnInfo.getWindowingStrategy());

Expand Down Expand Up @@ -436,6 +501,9 @@ private void processSystemTimer(TimerData timer) throws Exception {
public void finishBundle() throws Exception {
if (fnRunner != null) {
fnRunner.finishBundle();
if (sideInputFetcher != null) {
sideInputFetcher.persist();
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

The sideInputFetcher field should be cleared at the end of the bundle. Since SimpleParDoFn instances are typically reused across bundles in the Dataflow worker, failing to null out this field can lead to stale state being carried over to subsequent bundles or potential memory leaks. Clearing it here ensures that each bundle starts with a clean state.

Suggested change
if (sideInputFetcher != null) {
sideInputFetcher.persist();
}
if (sideInputFetcher != null) {
sideInputFetcher.persist();
sideInputFetcher = null;
}

doFnInstanceManager.complete(fnInfo);
fnRunner = null;
fnInfo = null;
Expand Down Expand Up @@ -490,7 +558,7 @@ private void processTimers(
}
}

private <W extends BoundedWindow> void registerStateCleanup(
private void registerStateCleanup(
WindowingStrategy<?, W> windowingStrategy, Collection<W> windowsToCleanup) {
Coder<W> windowCoder = windowingStrategy.getWindowFn().windowCoder();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,14 +57,14 @@ public void startBundle() {
Iterable<BagState<WindowedValue<InputT>>> elementsBags =
sideInputFetcher.prefetchElements(readyWindows);

// Run the DoFn code now that all side inputs are ready.
for (BagState<WindowedValue<InputT>> elementsBag : elementsBags) {
Iterable<WindowedValue<InputT>> elements = elementsBag.read();
for (WindowedValue<InputT> elem : elements) {
simpleDoFnRunner.processElement(elem);
}
elementsBag.clear();
}

sideInputFetcher.releaseBlockedWindows(readyWindows);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
Expand Down Expand Up @@ -83,7 +84,6 @@ public StreamingSideInputFetcher(
this.stepContext = stepContext;

this.mainWindowCoder = windowingStrategy.getWindowFn().windowCoder();

this.sideInputViews = new HashMap<>();
for (PCollectionView<?> view : views) {
sideInputViews.put(view.getTagInternal().getId(), view);
Expand Down Expand Up @@ -188,11 +188,7 @@ public Iterable<BagState<TimerData>> prefetchTimers(Iterable<W> readyWindows) {
return timers;
}

/** Compute the set of side inputs that are not yet ready for the given main input window. */
public boolean storeIfBlocked(WindowedValue<InputT> elem) {
@SuppressWarnings("unchecked")
W window = (W) Iterables.getOnlyElement(elem.getWindows());

private Set<Windmill.GlobalDataRequest> checkIfBlocked(W window) {
Set<Windmill.GlobalDataRequest> blocked = blockedMap().get(window);
if (blocked == null) {
for (PCollectionView<?> view : sideInputViews.values()) {
Expand All @@ -205,7 +201,18 @@ public boolean storeIfBlocked(WindowedValue<InputT> elem) {
}
}
}
if (blocked != null) {
return blocked == null ? Collections.emptySet() : blocked;
}

/** Compute the set of side inputs that are not yet ready for the given main input window. */
public boolean storeIfBlocked(WindowedValue<InputT> elem) {
@SuppressWarnings("unchecked")
W window = (W) Iterables.getOnlyElement(elem.getWindows());

Set<Windmill.GlobalDataRequest> blocked = checkIfBlocked(window);
blockedMap().get(window);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The call to blockedMap().get(window) on line 213 is a side-effect-free operation whose result is ignored. It appears to be a leftover from refactoring and should be removed to keep the code clean.

Suggested change
Set<Windmill.GlobalDataRequest> blocked = checkIfBlocked(window);
blockedMap().get(window);
Set<Windmill.GlobalDataRequest> blocked = checkIfBlocked(window);


if (!blocked.isEmpty()) {
elementBag(window).add(elem);
watermarkHold(window).add(elem.getTimestamp());
stepContext.addBlockingSideInputs(blocked);
Expand All @@ -223,17 +230,12 @@ public boolean storeIfBlocked(TimerData timer) {
@SuppressWarnings("unchecked")
WindowNamespace<W> windowNamespace = (WindowNamespace<W>) timer.getNamespace();
W window = windowNamespace.getWindow();

boolean blocked = false;
for (PCollectionView<?> view : sideInputViews.values()) {
if (!stepContext.issueSideInputFetch(view, window, SideInputState.UNKNOWN)) {
blocked = true;
}
}
if (blocked) {
Set<Windmill.GlobalDataRequest> blocked = checkIfBlocked(window);
if (!blocked.isEmpty()) {
timerBag(window).add(timer);
return true;
}
return blocked;
return false;
}

public void persist() {
Expand Down
Loading
Loading