[GSoC 2026] Kafka Streams runner — Redistribute translator + ExecutableStage type-agnostic edge #38840

@junaiddshaukat

Tracking issue: #18479
Depends on: #38743 / #38764 (ExecutableStage / stateless ParDo translator, merged).

Summary

Direct follow-up to #38764, agreed with @je-ik on Slack:

  1. Implement Redistribute.arbitrarily() as a runner-native passthrough so that the GreedyPipelineFuser splits stages at the Redistribute boundary, which enables a chained ExecutableStage end-to-end test.
  2. Drop the byte[] type bound on ExecutableStageProcessor / KStreamsPayload at the processor edge, so that the runtime value type of non-byte[] outputs (e.g. Integer from MapElements) is no longer silently misrepresented as byte[].
  3. Add a chained Impulse -> MapElements<Integer> -> Redistribute.arbitrarily() -> ParDo(record) test under the EMBEDDED environment to verify that non-byte[] values flow from stage to stage.
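A minimal plain-Java sketch of the type-agnostic edge and the chained test's data flow, under a simplified model. Payload, Edge and TypedEdgeSketch are hypothetical stand-ins (the issue does not describe KStreamsPayload's actual fields or the processor API), the MapElements function is invented for illustration, and a synchronized list plays the role of SharedTestCollector<Integer>. Because the edge is generic in its value type, the Integer produced by the first stage reaches the collector as an Integer instead of being forced through a byte[] bound.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

public class TypedEdgeSketch {
  // Hypothetical payload: carries a value of any type T instead of a fixed byte[].
  static final class Payload<T> {
    private final T value;

    Payload(T value) {
      this.value = value;
    }

    T value() {
      return value;
    }
  }

  // Hypothetical stage edge, generic in both its input and output value types.
  static final class Edge<I, O> {
    private final Function<I, O> stageFn;
    private final Consumer<Payload<O>> downstream;

    Edge(Function<I, O> stageFn, Consumer<Payload<O>> downstream) {
      this.stageFn = stageFn;
      this.downstream = downstream;
    }

    // Applies the stage function and forwards the typed result; no cast to byte[].
    void process(Payload<I> in) {
      downstream.accept(new Payload<>(stageFn.apply(in.value())));
    }
  }

  // Stand-in for SharedTestCollector<Integer>: a thread-safe list of side-effected values.
  static final List<Integer> COLLECTED = Collections.synchronizedList(new ArrayList<>());

  public static List<Integer> runChain() {
    COLLECTED.clear();
    // Stage 2, the "record" ParDo: writes each Integer into the collector.
    Edge<Integer, Integer> recordStage = new Edge<>(x -> x, p -> COLLECTED.add(p.value()));
    // Redistribute.arbitrarily() modeled as a passthrough: hands payloads on unchanged.
    Consumer<Payload<Integer>> redistribute = recordStage::process;
    // Stage 1, a MapElements-like function (invented here): empty byte[] -> Integer.
    Edge<byte[], Integer> mapStage = new Edge<>(bytes -> bytes.length + 42, redistribute);
    // Impulse emits a single empty byte[].
    mapStage.process(new Payload<>(new byte[0]));
    return new ArrayList<>(COLLECTED);
  }

  public static void main(String[] args) {
    System.out.println(runChain()); // prints [42]
  }
}
```

Running main prints [42]: Impulse's single empty byte[] maps to 0 + 42, passes through the Redistribute stand-in unchanged, and is recorded by the second stage with its Integer type intact.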

Scope

  • RedistributeTranslator, registering beam:transform:redistribute_arbitrarily:v1 as a passthrough (no GBK, no state; a single-instance topology has nothing to actually redistribute).
  • Wire TrivialNativeTransformExpander.forKnownUrns(pipeline, knownUrns()) into prepareForTranslation so that the fuser respects runner-native URNs (matching the Spark/Flink pattern).
  • Type generalization on the ExecutableStageProcessor edge.
  • High-level chained test using SharedTestCollector<Integer> to assert that the values emitted as side effects by the final ParDo actually arrive.
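A toy model of why registering Redistribute as runner-native should split the chained test into two ExecutableStages. This is not Beam's GreedyPipelineFuser or TrivialNativeTransformExpander: it assumes a linear pipeline in which Redistribute has already been collapsed into a single node, and NativeUrnSketch, knownUrns() and fuse() are hypothetical names. The URN strings are Beam's standard ones; MapElements appears as beam:transform:pardo:v1 because it expands to a ParDo, and Impulse is treated as runner-translated rather than fused.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

public class NativeUrnSketch {
  // Standard Beam URNs for the transforms in the chained test.
  public static final String IMPULSE = "beam:transform:impulse:v1";
  public static final String PAR_DO = "beam:transform:pardo:v1";
  public static final String REDISTRIBUTE_ARBITRARILY =
      "beam:transform:redistribute_arbitrarily:v1";

  // Hypothetical knownUrns(): transforms the runner translates itself instead of fusing.
  public static Set<String> knownUrns() {
    return new LinkedHashSet<>(Arrays.asList(IMPULSE, REDISTRIBUTE_ARBITRARILY));
  }

  // Toy fuser for a linear chain: consecutive non-native transforms share one stage,
  // and every runner-native transform forms a group (and a boundary) on its own.
  public static List<List<String>> fuse(List<String> urns, Set<String> known) {
    List<List<String>> groups = new ArrayList<>();
    List<String> stage = new ArrayList<>();
    for (String urn : urns) {
      if (known.contains(urn)) {
        if (!stage.isEmpty()) {
          groups.add(stage);
          stage = new ArrayList<>();
        }
        groups.add(Collections.singletonList(urn));
      } else {
        stage.add(urn);
      }
    }
    if (!stage.isEmpty()) {
      groups.add(stage);
    }
    return groups;
  }

  public static void main(String[] args) {
    // Impulse -> MapElements (a ParDo) -> Redistribute.arbitrarily() -> ParDo(record)
    List<String> chain = Arrays.asList(IMPULSE, PAR_DO, REDISTRIBUTE_ARBITRARILY, PAR_DO);
    System.out.println(fuse(chain, knownUrns()));
  }
}
```

For this chain, fuse() returns four groups: Impulse and Redistribute stand alone as runner-translated transforms, and each ParDo lands in its own stage on either side of the Redistribute boundary.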

Out of scope

  • Redistribute.byKey() URN (rehashing semantics; deferred to the GBK sub-issue).
  • WatermarkManager (separate sub-issue).

cc @je-ik
