Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions runners/google-cloud-dataflow-java/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -129,8 +129,6 @@ dependencies {
testImplementation library.java.google_cloud_dataflow_java_proto_library_all
testImplementation library.java.jackson_dataformat_yaml
testImplementation library.java.mockito_inline
testImplementation project(":sdks:java:io:kafka")
testImplementation library.java.kafka_clients
validatesRunner project(path: ":sdks:java:core", configuration: "shadowTest")
validatesRunner project(path: project.path, configuration: "testRuntimeMigration")
validatesRunner library.java.hamcrest
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -659,10 +659,7 @@ private List<PTransformOverride> getOverrides(boolean streaming) {

try {
overridesBuilder.add(KafkaIO.Read.KAFKA_READ_OVERRIDE);
overridesBuilder.add(
PTransformOverride.of(
KafkaReadWithRedistributeOverride.matcher(),
new KafkaReadWithRedistributeOverride.Factory()));
overridesBuilder.add(KafkaIO.Read.KAFKA_REDISTRIBUTE_OVERRIDE);
} catch (NoClassDefFoundError e) {
// Do nothing. io-kafka is an optional dependency of runners-google-cloud-dataflow-java
// and only needed when KafkaIO is used in the pipeline.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1817,6 +1817,13 @@ private boolean runnerPrefersLegacyRead(PipelineOptions options) {
return true;
}

/** A {@link PTransformOverride} for runners to override redistributed Kafka Read transforms. */
@Internal
public static final PTransformOverride KAFKA_REDISTRIBUTE_OVERRIDE =
PTransformOverride.of(
KafkaReadWithRedistributeOverride.matcher(),
new KafkaReadWithRedistributeOverride.Factory<>());

/**
* A {@link PTransformOverride} for runners to swap {@link ReadFromKafkaViaSDF} to legacy Kafka
* read if runners doesn't have a good support on executing unbounded Splittable DoFn.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,9 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.dataflow;
package org.apache.beam.sdk.io.kafka;

import java.util.Map;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.io.kafka.KafkaRecord;
import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.sdk.runners.PTransformMatcher;
import org.apache.beam.sdk.runners.PTransformOverrideFactory;
Expand Down Expand Up @@ -48,8 +46,8 @@ public boolean matches(AppliedPTransform<?, ?, ?> application) {
}

/**
* {@link PTransformOverrideFactory} for {@link KafkaIO.Read} that enables {@code
* withOffsetDeduplication} when {@code withRedistribute} is enabled.
* {@link PTransformOverrideFactory} for {@link org.apache.beam.sdk.io.kafka.KafkaIO.Read} that
* enables {@code withOffsetDeduplication} when {@code withRedistribute} is enabled.
*/
static class Factory<K, V>
implements PTransformOverrideFactory<
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.dataflow;
package org.apache.beam.sdk.io.kafka;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.nullValue;
Expand All @@ -25,7 +25,6 @@
import java.io.Serializable;
import java.util.Collections;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.runners.PTransformOverride;
import org.apache.beam.sdk.runners.TransformHierarchy.Node;
import org.apache.beam.sdk.testing.TestPipeline;
Expand Down Expand Up @@ -129,5 +128,6 @@ public void leaveCompositeTransform(Node node) {
}
};
p.traverseTopologically(visitor);
p.enableAbandonedNodeEnforcement(false);
}
}
Loading