[Drain] Support extensible element metadata propagation in ReduceFnRunner#38230
[Drain] Support extensible element metadata propagation in ReduceFnRunner#38230stankiewicz merged 4 commits intoapache:masterfrom
Conversation
|
R: @kennknowles |
|
Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control. If you'd like to restart, comment |
b1dfea2 to
036803e
Compare
|
@kennknowles this is generally finished, the task :runners:core-java:analyzeClassesDependencies is failing because of an undeclared dependency on beam.model:fn-execution. |
There was a problem hiding this comment.
Code Review
This pull request introduces a CombinedMetadata class and associated logic to support stateful propagation of pipeline metadata within ReduceFnRunner. The changes include new combiner and coder implementations, as well as updates to state handling and tests. Feedback identifies a performance regression due to per-element state access in processElement, a bug in onTrigger where metadata is lost when windows remain open, and an optimization opportunity for the CombinedMetadata coder to reduce serialization overhead.
36e54b3 to
cb00495
Compare
|
@kennknowles this is ready. |
There was a problem hiding this comment.
Code Review
This pull request introduces a CombinedMetadata class and supporting infrastructure to propagate and aggregate element-level metadata, such as causedByDrain, across pipeline operations like GroupByKey. The changes include updates to ReduceFnRunner for stateful metadata management, new combiners, and expanded test coverage for metadata propagation. Feedback focuses on optimizing performance and efficiency by switching from BagState to CombiningState for metadata aggregation, simplifying the coder implementation using delimited proto encoding, and avoiding redundant state writes for default metadata values.
well, I will go through those comments, and probably improve state.. |
7db2154 to
39263ba
Compare
There was a problem hiding this comment.
Code Review
This pull request introduces CombinedMetadata to encapsulate and propagate pipeline metadata, such as CausedByDrain, across the Beam pipeline. It includes a new CombinedMetadataCombiner for merging metadata in ReduceFnRunner and updates various runners and tests to support this propagation. Feedback was provided regarding the logic for persisting trigger metadata in state, specifically concerning future extensibility with non-idempotent metadata fields.
39263ba to
79f7db9
Compare
There was a problem hiding this comment.
Code Review
This pull request introduces a framework for propagating and combining element metadata, specifically the CausedByDrain status, across pipeline operations such as GroupByKey and windowing. Key additions include the CombinedMetadata class, a CombinedMetadataCombiner for aggregating metadata during element merging, and updates to ReduceFnRunner to manage metadata within the state. Furthermore, the Dataflow worker's WindmillSink was updated to support metadata propagation via protocol buffers, and new tests were added to verify metadata persistence across shuffles and windowing boundaries. I have no feedback to provide as there were no review comments to evaluate.
|
new state can and will increase data processed by few percent. |
kennknowles
left a comment
There was a problem hiding this comment.
Couple of comments, but nothing that we need to block on. The encoding is the only thing that will be very sensitive to changes.
| ? BeamFnApi.Elements.DrainMode.Enum.DRAINING | ||
| : BeamFnApi.Elements.DrainMode.Enum.NOT_DRAINING) | ||
| .build(); | ||
| proto.writeDelimitedTo(outStream); |
There was a problem hiding this comment.
Is this implemention the same as using ProtoCoder.of(ElementMetadata.class) ?
There was a problem hiding this comment.
it's almost the same, but as discussed, don't want to bring extension dependency.
| import org.apache.beam.sdk.values.CausedByDrain; | ||
|
|
||
| /** Combiner for CombinedMetadata. */ | ||
| class CombinedMetadataCombiner |
There was a problem hiding this comment.
Pretty sure this is the same as BinaryCombineFn default implementations. But since I hate inheritance, I'm happy to have it spelled out here.
|
before merging I will do job upgrade test. |
|
https://docs.google.com/document/d/1yOFbaH5DwcKV96VRYrGAyow9lzqpfAoDs3yvj8_AxRI/edit?usp=sharing done test for Dataflow v1, v1 with element metadata experiment, v2. |
Description
This PR refactors the metadata propagation logic in
ReduceFnRunnerto support extensible metadata.Previously, metadata tracking (specifically
causedByDrain) was not stored in state at all during execution inReduceFnRunner, which caused metatada loss failures when firing timers or merging panes. This PR fixes that by introducing a unified state map for element metadata.To make it easier to add future payloads (such as OpenTelemetry context maps or CDC insert/update markers) without modifying method signatures, this change groups targeted fields into a unified container and offloads combination rules to a clean aggregator class.
Key Changes
METADATA_TAG(persistingCombinedMetadata) inReduceFnRunner, ensuring metadata is no longer lost during grouping.CombinedMetadata(guided by@AutoValue) that groups element metadata together.CombinedMetadataCombiner.Fixes: #36884
Open question
Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
addresses #123), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, commentfixes #<ISSUE NUMBER>instead.CHANGES.mdwith noteworthy changes.See the Contributor Guide for more tips on how to make review process smoother.
To check the build health, please visit https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md
GitHub Actions Tests Status (on master branch)
See CI.md for more information about GitHub Actions CI or the workflows README to see a list of phrases to trigger workflows.