
[Drain] Support extensible element metadata propagation in ReduceFnRunner#38230

Merged
stankiewicz merged 4 commits intoapache:masterfrom
stankiewicz:drain_combiner
May 6, 2026


@stankiewicz
Contributor

@stankiewicz stankiewicz commented Apr 17, 2026

Description

This PR refactors the metadata propagation logic in ReduceFnRunner to support extensible metadata.

Previously, metadata (specifically causedByDrain) was not stored in state at all during execution in ReduceFnRunner, so it was lost when timers fired or panes were merged. This PR fixes that by introducing a unified state map for element metadata.

To make it easier to add future payloads (such as OpenTelemetry context maps or CDC insert/update markers) without modifying method signatures, this change groups the metadata fields into a single container and moves the combination rules into a dedicated combiner class.

Key Changes

  • Runner Fix: Added stateful tracking using METADATA_TAG (persisting CombinedMetadata) in ReduceFnRunner, ensuring metadata is no longer lost during grouping.
  • Extensible Container: Created CombinedMetadata (an @AutoValue class) that groups element metadata fields together.
  • Metadata Combiners: Encapsulated combination logic within CombinedMetadataCombiner.
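To illustrate the runner fix, here is a minimal plain-Java sketch, assuming that a pane's drain flag is the OR of its elements' flags. `DrainFlag`, `Pane`, and `ToyWindowRunner` are hypothetical stand-ins for `CausedByDrain`, a fired pane, and ReduceFnRunner's per-window state; they are not the PR's classes. The only point is that metadata is folded into per-window state on every element and read back when the trigger fires, instead of being dropped.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for Beam's CausedByDrain; not the real enum.
enum DrainFlag {
  NORMAL,
  CAUSED_BY_DRAIN;

  // Assumed rule: a pane counts as caused by drain if any input was.
  DrainFlag combine(DrainFlag other) {
    return (this == CAUSED_BY_DRAIN || other == CAUSED_BY_DRAIN) ? CAUSED_BY_DRAIN : NORMAL;
  }
}

// A fired pane: how many elements it held and the metadata attached to it.
final class Pane {
  final int elementCount;
  final DrainFlag metadata;

  Pane(int elementCount, DrainFlag metadata) {
    this.elementCount = elementCount;
    this.metadata = metadata;
  }
}

// Toy per-window state standing in for the element buffer and METADATA_TAG.
final class ToyWindowRunner {
  private final Map<String, Integer> bufferedCounts = new HashMap<>();
  private final Map<String, DrainFlag> metadataState = new HashMap<>();

  void processElement(String window, DrainFlag metadata) {
    bufferedCounts.merge(window, 1, Integer::sum);
    // Fold the element's metadata into state so it survives until the timer fires.
    metadataState.merge(window, metadata, DrainFlag::combine);
  }

  // Fires the window's trigger, emits a pane, and clears that window's state.
  Pane onTrigger(String window) {
    int count = bufferedCounts.getOrDefault(window, 0);
    DrainFlag metadata = metadataState.getOrDefault(window, DrainFlag.NORMAL);
    bufferedCounts.remove(window);
    metadataState.remove(window);
    return new Pane(count, metadata);
  }
}
```

A window that received one drained element among several normal ones fires a pane marked CAUSED_BY_DRAIN; a window with only normal elements fires a NORMAL pane.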

Fixes: #36884

Open question

  • Is this a breaking change?


@stankiewicz
Contributor Author

R: @kennknowles

@github-actions
Contributor

Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control. If you'd like to restart, comment assign set of reviewers

@stankiewicz stankiewicz changed the title [Refactor] Support extensible element metadata propagation in ReduceFnRunner [Drain] Support extensible element metadata propagation in ReduceFnRunner Apr 17, 2026
@apache apache deleted a comment from gemini-code-assist Bot Apr 17, 2026
@kennknowles kennknowles self-requested a review April 20, 2026 14:08
@stankiewicz
Contributor Author

@kennknowles this is mostly finished. The :runners:core-java:analyzeClassesDependencies task is failing because of an undeclared dependency on beam.model:fn-execution, so I need to add that dependency to runners/core-java/build.gradle.

Contributor

@gemini-code-assist gemini-code-assist Bot left a comment


Code Review

This pull request introduces a CombinedMetadata class and associated logic to support stateful propagation of pipeline metadata within ReduceFnRunner. The changes include new combiner and coder implementations, as well as updates to state handling and tests. Feedback identifies a performance regression due to per-element state access in processElement, a bug in onTrigger where metadata is lost when windows remain open, and an optimization opportunity for the CombinedMetadata coder to reduce serialization overhead.

Comment thread runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java Outdated
Comment thread runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java Outdated
@stankiewicz stankiewicz marked this pull request as draft April 22, 2026 18:12
@github-actions github-actions Bot added the flink label Apr 24, 2026
@stankiewicz stankiewicz marked this pull request as ready for review April 24, 2026 16:31
@stankiewicz stankiewicz force-pushed the drain_combiner branch 3 times, most recently from 36e54b3 to cb00495 May 4, 2026 12:48
@stankiewicz
Contributor Author

@kennknowles this is ready.

Contributor

@gemini-code-assist gemini-code-assist Bot left a comment


Code Review

This pull request introduces a CombinedMetadata class and supporting infrastructure to propagate and aggregate element-level metadata, such as causedByDrain, across pipeline operations like GroupByKey. The changes include updates to ReduceFnRunner for stateful metadata management, new combiners, and expanded test coverage for metadata propagation. Feedback focuses on optimizing performance and efficiency by switching from BagState to CombiningState for metadata aggregation, simplifying the coder implementation using delimited proto encoding, and avoiding redundant state writes for default metadata values.
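The difference between the two state shapes can be sketched with toy in-memory classes, assuming OR semantics for the drain flag. `ToyBagState` and `ToyCombiningState` are hypothetical: they do not implement Beam's `BagState` or `CombiningState` interfaces and only count writes. A bag keeps one entry per element and must be scanned on read, while a combining cell keeps a single accumulator; skipping writes of the default value leaves state untouched in the common, non-drain case.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical drain flag; NORMAL is the default value.
enum Drain { NORMAL, CAUSED_BY_DRAIN }

// Bag-like state: one stored entry per element, combined only on read.
final class ToyBagState {
  final List<Drain> entries = new ArrayList<>();
  int writes = 0;

  void add(Drain value) {
    entries.add(value);
    writes++;
  }

  Drain read() {
    for (Drain d : entries) {
      if (d == Drain.CAUSED_BY_DRAIN) {
        return Drain.CAUSED_BY_DRAIN;
      }
    }
    return Drain.NORMAL;
  }
}

// Combining-like state: a single accumulator, combined on write.
final class ToyCombiningState {
  private Drain accumulator = Drain.NORMAL;
  int writes = 0;

  void add(Drain value) {
    // OR with the default value cannot change the accumulator, so skip the write.
    if (value == Drain.NORMAL) {
      return;
    }
    accumulator = Drain.CAUSED_BY_DRAIN;
    writes++;
  }

  Drain read() {
    return accumulator;
  }
}
```

With 1,000 normal elements and one drained element, both toys read back CAUSED_BY_DRAIN, but the bag performs 1,001 writes and the combining cell performs one.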

Comment thread runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java Outdated
Comment thread runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java Outdated
Comment thread runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java Outdated
Comment thread runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java Outdated
@stankiewicz
Contributor Author

@kennknowles this is ready.

Well, I will first go through those comments and probably improve the state handling.

@stankiewicz stankiewicz force-pushed the drain_combiner branch 3 times, most recently from 7db2154 to 39263ba May 5, 2026 11:08
Contributor

@gemini-code-assist gemini-code-assist Bot left a comment


Code Review

This pull request introduces CombinedMetadata to encapsulate and propagate pipeline metadata, such as CausedByDrain, across the Beam pipeline. It includes a new CombinedMetadataCombiner for merging metadata in ReduceFnRunner and updates various runners and tests to support this propagation. Feedback was provided regarding the logic for persisting trigger metadata in state, specifically concerning future extensibility with non-idempotent metadata fields.
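One way the extensibility concern could play out, as a hypothetical sketch: if the metadata attached to a fired pane is written back into state and later combined again, that is harmless for an idempotent rule such as OR on the drain flag, but a non-idempotent field would be double-counted. `Meta` and its `elementCount` field are illustrative only and are not part of the PR.

```java
// Hypothetical metadata with one idempotent and one non-idempotent field.
final class Meta {
  final boolean causedByDrain; // combined with OR: combine(x, x) equals x
  final long elementCount;     // hypothetical field, combined with +: combine(x, x) != x

  Meta(boolean causedByDrain, long elementCount) {
    this.causedByDrain = causedByDrain;
    this.elementCount = elementCount;
  }

  Meta combine(Meta other) {
    return new Meta(causedByDrain || other.causedByDrain, elementCount + other.elementCount);
  }

  // Simulates a firing whose output metadata is persisted back into the same state.
  static Meta refireWithPersistedOutput(Meta state) {
    Meta fired = state;          // metadata attached to the emitted pane
    return state.combine(fired); // written back and combined with the existing state
  }
}
```

After the write-back the drain flag is unchanged, while the count goes from 3 to 6 even though no new elements arrived.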

Contributor

@gemini-code-assist gemini-code-assist Bot left a comment


Code Review

This pull request introduces a framework for propagating and combining element metadata, specifically the CausedByDrain status, across pipeline operations such as GroupByKey and windowing. Key additions include the CombinedMetadata class, a CombinedMetadataCombiner for aggregating metadata during element merging, and updates to ReduceFnRunner to manage metadata within the state. Furthermore, the Dataflow worker's WindmillSink was updated to support metadata propagation via protocol buffers, and new tests were added to verify metadata persistence across shuffles and windowing boundaries. I have no feedback to provide as there were no review comments to evaluate.

@stankiewicz
Contributor Author

The new state can and will increase the amount of data processed by a few percent.
For this pipeline, data processed increased by 2%:

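```java
import java.util.concurrent.ThreadLocalRandom;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.JsonToRow;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.WithKeys;
import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.CausedByDrain;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.joda.time.Duration;

// TAXI_RIDES_TOPIC (Pub/Sub topic) and TAXI_RIDE_INFO_SCHEMA (row schema with an
// INT32 "passenger_count" field) are not shown; assumed defined in the enclosing class.
public static void main(String[] args) {
  PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
  Pipeline p = Pipeline.create(options);

  p
      // Read taxi ride data
      .apply("ReadFromPubSub", PubsubIO.readStrings().fromTopic(TAXI_RIDES_TOPIC))
      // Convert JSON strings to Beam Rows
      .apply("JsonToRow", JsonToRow.withSchema(TAXI_RIDE_INFO_SCHEMA))
      // Key each row with a random integer from 0 to 10 (inclusive)
      .apply(
          "WithRandomKey",
          WithKeys.of(
                  new SerializableFunction<Row, Integer>() {
                    @Override
                    public Integer apply(Row input) {
                      return ThreadLocalRandom.current().nextInt(11);
                    }
                  })
              .withKeyType(TypeDescriptors.integers()))
      // Set up sliding windows: 5 minutes long, starting every minute
      .apply(
          "SlidingWindow",
          Window.<KV<Integer, Row>>into(
              SlidingWindows.of(Duration.standardMinutes(5)).every(Duration.standardMinutes(1))))
      // Filters nothing: re-emits every element marked as caused by drain,
      // so that GroupByKey has drain metadata to combine.
      .apply(
          "FilterRows",
          ParDo.of(
              new DoFn<KV<Integer, Row>, KV<Integer, Row>>() {
                @ProcessElement
                public void processElement(
                    @Element KV<Integer, Row> kv, OutputReceiver<KV<Integer, Row>> receiver) {
                  receiver.builder(kv).setCausedByDrain(CausedByDrain.CAUSED_BY_DRAIN).output();
                }
              }))
      // GroupByKey: ReduceFnRunner combines the elements' metadata per key and window
      .apply("GroupByKey", GroupByKey.<Integer, Row>create())
      // Sum passengers per key; the combined drain flag is passed as a DoFn parameter
      .apply(
          "ProcessGroupedRows",
          ParDo.of(
              new DoFn<KV<Integer, Iterable<Row>>, Void>() {
                @ProcessElement
                public void processElement(
                    @Element KV<Integer, Iterable<Row>> kv, CausedByDrain drain) {
                  Integer key = kv.getKey();
                  Iterable<Row> rows = kv.getValue();

                  int passengerSum = 0;
                  for (Row row : rows) {
                    Integer passengers = row.getInt32("passenger_count");
                    if (passengers != null) {
                      passengerSum += passengers;
                    }
                  }
                  System.out.println(
                      "Partition "
                          + key
                          + " had a sum of "
                          + passengerSum
                          + " passengers. Aggregation was drained? - "
                          + drain);
                }
              }));

  p.run();
}
```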

Member

@kennknowles kennknowles left a comment


A couple of comments, but nothing we need to block on. The encoding is the only part that will be very sensitive to changes.

```java
        ? BeamFnApi.Elements.DrainMode.Enum.DRAINING
        : BeamFnApi.Elements.DrainMode.Enum.NOT_DRAINING)
    .build();
proto.writeDelimitedTo(outStream);
```
Member


Is this implementation the same as using ProtoCoder.of(ElementMetadata.class)?

```java
public static <T extends Message> ProtoCoder<T> of(Class<T> protoMessageClass) {
```

Contributor Author


It's almost the same, but as discussed, I don't want to bring in the extension dependency.
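For context on the encoding discussion: protobuf's `writeDelimitedTo` writes the message's serialized size as a base-128 varint, followed by the message bytes. The sketch below reproduces only that framing in plain Java on opaque payloads, which shows why the wire format is sensitive to change: a reader must consume exactly the prefix and byte count the writer produced. `DelimitedFraming` is hypothetical; it does not reproduce ElementMetadata's field layout and is not the coder in this PR.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

final class DelimitedFraming {
  // Writes an unsigned base-128 varint: 7 bits per byte, high bit set on all but the last.
  static void writeVarint(OutputStream out, int value) throws IOException {
    while ((value & ~0x7F) != 0) {
      out.write((value & 0x7F) | 0x80);
      value >>>= 7;
    }
    out.write(value);
  }

  static int readVarint(InputStream in) throws IOException {
    int result = 0;
    for (int shift = 0; shift < 32; shift += 7) {
      int b = in.read();
      if (b < 0) {
        throw new IOException("truncated varint");
      }
      result |= (b & 0x7F) << shift;
      if ((b & 0x80) == 0) {
        return result;
      }
    }
    throw new IOException("malformed varint");
  }

  // Length prefix, then the payload bytes.
  static void writeDelimited(OutputStream out, byte[] payload) throws IOException {
    writeVarint(out, payload.length);
    out.write(payload);
  }

  static byte[] readDelimited(InputStream in) throws IOException {
    int length = readVarint(in);
    byte[] payload = new byte[length];
    int read = 0;
    while (read < length) {
      int n = in.read(payload, read, length - read);
      if (n < 0) {
        throw new IOException("truncated payload");
      }
      read += n;
    }
    return payload;
  }
}
```

A 300-byte payload gets the two-byte prefix `0xAC 0x02`; a 1-byte payload gets the single-byte prefix `0x01`.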

```java
import org.apache.beam.sdk.values.CausedByDrain;

/** Combiner for CombinedMetadata. */
class CombinedMetadataCombiner
```
Member


Pretty sure this is the same as BinaryCombineFn default implementations. But since I hate inheritance, I'm happy to have it spelled out here.
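For comparison, Beam's `Combine.BinaryCombineFn` roughly derives the CombineFn methods from a single `apply(left, right)` plus an `identity()` value. The plain-Java sketch below puts the two styles side by side for a hypothetical `Metadata` type with OR semantics: `ExplicitMetadataCombiner` spells out `createAccumulator`, `addInput`, `mergeAccumulators`, and `extractOutput`, while `FoldingCombiner` is a simplified, hypothetical analogue of the derived behavior, not Beam's class. Both give the same result when the merge is associative and commutative and has an identity element.

```java
import java.util.function.BinaryOperator;

// Hypothetical immutable metadata (stand-in for CombinedMetadata).
final class Metadata {
  static final Metadata DEFAULT = new Metadata(false);
  final boolean causedByDrain;

  Metadata(boolean causedByDrain) {
    this.causedByDrain = causedByDrain;
  }

  // Associative and commutative, with DEFAULT as the identity element.
  static Metadata merge(Metadata a, Metadata b) {
    return (a.causedByDrain || b.causedByDrain) ? new Metadata(true) : DEFAULT;
  }
}

// Every CombineFn-style method written out explicitly.
final class ExplicitMetadataCombiner {
  Metadata createAccumulator() {
    return Metadata.DEFAULT;
  }

  Metadata addInput(Metadata accumulator, Metadata input) {
    return Metadata.merge(accumulator, input);
  }

  Metadata mergeAccumulators(Iterable<Metadata> accumulators) {
    Metadata result = createAccumulator();
    for (Metadata acc : accumulators) {
      result = Metadata.merge(result, acc);
    }
    return result;
  }

  Metadata extractOutput(Metadata accumulator) {
    return accumulator;
  }
}

// Simplified analogue of deriving everything from an identity plus a binary apply.
final class FoldingCombiner<V> {
  private final V identity;
  private final BinaryOperator<V> apply;

  FoldingCombiner(V identity, BinaryOperator<V> apply) {
    this.identity = identity;
    this.apply = apply;
  }

  V combineAll(Iterable<V> inputs) {
    V result = identity;
    for (V input : inputs) {
      result = apply.apply(result, input);
    }
    return result;
  }
}
```

Spelling the methods out costs a few lines but makes each step of the combine lifecycle visible without reading a base class.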

@stankiewicz
Contributor Author

Before merging, I will run a job upgrade test.

@stankiewicz
Contributor Author

https://docs.google.com/document/d/1yOFbaH5DwcKV96VRYrGAyow9lzqpfAoDs3yvj8_AxRI/edit?usp=sharing

Tested on Dataflow v1, v1 with the element metadata experiment, and v2.
The baseline job was updated to a revision that includes this change, and the update succeeded. All versions (v1 and v2) combined metadata, and v1 with the experiment was able to aggregate the drain flag because it received it correctly from Streaming Engine.

@stankiewicz stankiewicz merged commit da391d0 into apache:master May 6, 2026
34 of 35 checks passed


Development

Successfully merging this pull request may close these issues.

[Feature Request]: Propagate drain mode up to reduceFn

2 participants