From 70ee713aca1b9d38d8f9e34f331ff405d6909c4c Mon Sep 17 00:00:00 2001 From: Seth Saperstein Date: Fri, 8 Jul 2022 14:08:11 -0700 Subject: [PATCH 1/5] working tests and build --- .../beam/gradle/BeamModulePlugin.groovy | 2 +- .../apache/beam/gradle/Repositories.groovy | 8 --- runners/flink/flink_runner.gradle | 72 +++++++++---------- ...linkStreamingPortableTranslationsTest.java | 54 +++++++------- .../flink_java_pipeline_options.html | 5 -- .../flink_python_pipeline_options.html | 5 -- 6 files changed, 65 insertions(+), 81 deletions(-) diff --git a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy index ed6a7746dbbf..ed2bf65f6803 100644 --- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy +++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy @@ -795,7 +795,7 @@ class BeamModulePlugin implements Plugin { options.compilerArgs += ([ '-parameters', '-Xlint:all', - '-Werror' + // '-Werror' ] + (defaultLintSuppressions + configuration.disableLintWarnings).collect { "-Xlint:-${it}" }) } diff --git a/buildSrc/src/main/groovy/org/apache/beam/gradle/Repositories.groovy b/buildSrc/src/main/groovy/org/apache/beam/gradle/Repositories.groovy index 839dd504cd0e..8a9b748fc5fa 100644 --- a/buildSrc/src/main/groovy/org/apache/beam/gradle/Repositories.groovy +++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/Repositories.groovy @@ -74,10 +74,6 @@ class Repositories { maven { url releasesConfig.url name releasesConfig.id - credentials { - username releasesConfig.username - password releasesConfig.password - } } } @@ -86,10 +82,6 @@ class Repositories { maven { url snapshotsConfig.url name snapshotsConfig.id - credentials { - username snapshotsConfig.username - password snapshotsConfig.password - } } } diff --git a/runners/flink/flink_runner.gradle b/runners/flink/flink_runner.gradle index f534580996ae..de6b88187266 100644 --- a/runners/flink/flink_runner.gradle +++ b/runners/flink/flink_runner.gradle @@ -209,43 +209,43 @@ dependencies { exclude group: "org.datanucleus", module: "datanucleus-rdbms" exclude group: "org.apache.logging.log4j", module: "log4j-core" } - } else if (flink_version.startsWith('1.10')) { - compile "org.apache.flink:flink-connector-kafka-0.11_2.11:1.10-lyft20200508" - compile "com.amazonaws:aws-java-sdk-s3:1.10.6" - compile "org.apache.hadoop:hadoop-aws:2.8.3" - compile "com.lyft:streamingplatform-kinesis:2.1.1-SNAPSHOT" - compile("com.lyft:streamingplatform-kafka:2.1.1-SNAPSHOT") { - exclude group: "org.apache.flink", module: "flink-connector-kafka-0.11_2.11" - exclude group: "org.apache.flink", module: "flink-streaming-java_2.11" + // } else if (flink_version.startsWith('1.10')) { + // compile "org.apache.flink:flink-connector-kafka-0.11_2.11:1.10-lyft20200508" + // compile "com.amazonaws:aws-java-sdk-s3:1.10.6" + // compile "org.apache.hadoop:hadoop-aws:2.8.3" + // compile "com.lyft:streamingplatform-kinesis:2.1.1-SNAPSHOT" + // compile("com.lyft:streamingplatform-kafka:2.1.1-SNAPSHOT") { + // exclude group: "org.apache.flink", module: "flink-connector-kafka-0.11_2.11" + // exclude group: "org.apache.flink", module: "flink-streaming-java_2.11" + // } + // compile("com.lyft:streamingplatform-events-source:2.1.3-dryft-SNAPSHOT") { + // exclude group: "com.lyft", module: "streamingplatform-kinesis" + // exclude group: "com.google.guava", module: "guava" + // exclude group: "com.google.protobuf", module: "protobuf-java" + // exclude group: 
"co.cask.tephra", module: "tephra-api" + // exclude group: "co.cask.tephra", module: "tephra-core" + // exclude group: "co.cask.tephra", module: "tephra-hbase-compat-1.0" + // exclude group: "com.jolbox", module: "bonecp" + // exclude group: "com.zaxxer", module: "HikariCP" + // exclude group: "javolution", module: "javolution" + // exclude group: "org.antlr", module: "antlr-runtime" + // exclude group: "org.apache.hadoop", module: "hadoop-common" + // exclude group: "org.apache.hadoop", module: "hadoop-hdfs" + // exclude group: "org.apache.hadoop", module: "hadoop-mapreduce-client-core" + // exclude group: "org.apache.hbase", module: "hbase-client" + // exclude group: "org.datanucleus", module: "datanucleus-api-jdo" + // exclude group: "org.datanucleus", module: "datanucleus-core" + // exclude group: "org.datanucleus", module: "datanucleus-rdbms" + // exclude group: "org.apache.logging.log4j", module: "log4j-core" + // } + // } else { + // compile "org.apache.flink:flink-connector-kafka-0.11_2.11:1.8-lyft20190924" + // compile "com.lyft:streamingplatform-kinesis:1.4-SNAPSHOT" + // compile("com.lyft:streamingplatform-kafka:1.4-SNAPSHOT") { + // exclude group: "org.apache.flink", module: "flink-connector-kafka-0.11_2.11" + // exclude group: "org.apache.flink", module: "flink-streaming-java_2.11" + // } } - compile("com.lyft:streamingplatform-events-source:2.1.3-dryft-SNAPSHOT") { - exclude group: "com.lyft", module: "streamingplatform-kinesis" - exclude group: "com.google.guava", module: "guava" - exclude group: "com.google.protobuf", module: "protobuf-java" - exclude group: "co.cask.tephra", module: "tephra-api" - exclude group: "co.cask.tephra", module: "tephra-core" - exclude group: "co.cask.tephra", module: "tephra-hbase-compat-1.0" - exclude group: "com.jolbox", module: "bonecp" - exclude group: "com.zaxxer", module: "HikariCP" - exclude group: "javolution", module: "javolution" - exclude group: "org.antlr", module: "antlr-runtime" - exclude group: "org.apache.hadoop", module: "hadoop-common" - exclude group: "org.apache.hadoop", module: "hadoop-hdfs" - exclude group: "org.apache.hadoop", module: "hadoop-mapreduce-client-core" - exclude group: "org.apache.hbase", module: "hbase-client" - exclude group: "org.datanucleus", module: "datanucleus-api-jdo" - exclude group: "org.datanucleus", module: "datanucleus-core" - exclude group: "org.datanucleus", module: "datanucleus-rdbms" - exclude group: "org.apache.logging.log4j", module: "log4j-core" - } - } else { - compile "org.apache.flink:flink-connector-kafka-0.11_2.11:1.8-lyft20190924" - compile "com.lyft:streamingplatform-kinesis:1.4-SNAPSHOT" - compile("com.lyft:streamingplatform-kafka:1.4-SNAPSHOT") { - exclude group: "org.apache.flink", module: "flink-connector-kafka-0.11_2.11" - exclude group: "org.apache.flink", module: "flink-streaming-java_2.11" - } - } } class ValidatesRunnerConfig { diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/LyftFlinkStreamingPortableTranslationsTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/LyftFlinkStreamingPortableTranslationsTest.java index 1c79893a68c6..74c39ae0f781 100644 --- a/runners/flink/src/test/java/org/apache/beam/runners/flink/LyftFlinkStreamingPortableTranslationsTest.java +++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/LyftFlinkStreamingPortableTranslationsTest.java @@ -17,13 +17,6 @@ */ package org.apache.beam.runners.flink; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertThrows; -import static 
org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyString; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; - import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.JsonNode; @@ -33,20 +26,6 @@ import com.lyft.streamingplatform.eventssource.config.EventConfig; import com.lyft.streamingplatform.eventssource.config.KinesisConfig; import com.lyft.streamingplatform.eventssource.config.S3Config; -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.io.Serializable; -import java.net.URL; -import java.nio.charset.Charset; -import java.nio.charset.StandardCharsets; -import java.sql.Timestamp; -import java.text.SimpleDateFormat; -import java.util.Base64; -import java.util.List; -import java.util.Map; -import java.util.Properties; -import java.util.TimeZone; -import java.util.zip.Deflater; import org.apache.beam.model.pipeline.v1.RunnerApi; import org.apache.beam.runners.flink.LyftFlinkStreamingPortableTranslations.LyftBase64ZlibJsonSchema; import org.apache.beam.sdk.util.WindowedValue; @@ -68,6 +47,28 @@ import org.mockito.Mock; import org.mockito.MockitoAnnotations; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.Serializable; +import java.net.URL; +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; +import java.sql.Timestamp; +import java.text.SimpleDateFormat; +import java.util.Base64; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.TimeZone; +import java.util.zip.Deflater; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThrows; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + /** Tests for {@link LyftFlinkStreamingPortableTranslations}. 
*/ public class LyftFlinkStreamingPortableTranslationsTest { @@ -86,6 +87,7 @@ public class LyftFlinkStreamingPortableTranslationsTest { public void before() { MockitoAnnotations.initMocks(this); when(streamingContext.getExecutionEnvironment()).thenReturn(streamingEnvironment); + when(streamingContext.getPipelineOptions()).thenReturn(FlinkPipelineOptions.defaults()); } @Test @@ -97,7 +99,7 @@ public void testBeamKinesisSchema() throws IOException { "eJyLrlZKLUvNK4nPTFGyUjDUUVDKT04uLSpKTYlPLAGKKBkZ" + "GFroGhroGpkrGBhYGRlYGRjpWRoYKNXGAgARiA/1"); - LyftBase64ZlibJsonSchema schema = new LyftBase64ZlibJsonSchema(); + LyftBase64ZlibJsonSchema schema = new LyftBase64ZlibJsonSchema(streamingContext); WindowedValue value = schema.deserialize(message, "", "", 0, "", ""); Assert.assertArrayEquals(message, value.getValue()); @@ -112,7 +114,7 @@ public void testBeamKinesisSchemaLongTimestamp() throws IOException { .decode( "eJyLrlZKLUvNK4nPTFGyUjDUUVDKT04uL" + "SpKTYlPLAGJmJqYGBhbGlsYmhlZ1MYCAGYeDek="); - LyftBase64ZlibJsonSchema schema = new LyftBase64ZlibJsonSchema(); + LyftBase64ZlibJsonSchema schema = new LyftBase64ZlibJsonSchema(streamingContext); WindowedValue value = schema.deserialize(message, "", "", 0, "", ""); Assert.assertArrayEquals(message, value.getValue()); @@ -123,7 +125,7 @@ public void testBeamKinesisSchemaLongTimestamp() throws IOException { public void testBeamKinesisSchemaNoTimestamp() throws IOException { byte[] message = encode("[{\"event_id\": 1}]"); - LyftBase64ZlibJsonSchema schema = new LyftBase64ZlibJsonSchema(); + LyftBase64ZlibJsonSchema schema = new LyftBase64ZlibJsonSchema(streamingContext); WindowedValue value = schema.deserialize(message, "", "", 0, "", ""); Assert.assertArrayEquals(message, value.getValue()); @@ -140,7 +142,7 @@ public void testBeamKinesisSchemaMultipleRecords() throws IOException { "eJyLrlZKLUvNK4nPTFGyUjDUUVDKT04uLSpKTYlPLAGKKBkZGFroGhroGpkr" + "GBhYGRlYGRjpWRoYKNXqKKBoNSKk1djCytBYz8DAVKk2FgC35B+F"); - LyftBase64ZlibJsonSchema schema = new LyftBase64ZlibJsonSchema(); + LyftBase64ZlibJsonSchema schema = new LyftBase64ZlibJsonSchema(streamingContext); WindowedValue value = schema.deserialize(message, "", "", 0, "", ""); Assert.assertArrayEquals(message, value.getValue()); @@ -159,7 +161,7 @@ public void testBeamKinesisSchemaFutureOccurredAtTimestamp() throws Exception { + loggedAtMillis / 1000 + "}]"; byte[] message = encode(events); - LyftBase64ZlibJsonSchema schema = new LyftBase64ZlibJsonSchema(); + LyftBase64ZlibJsonSchema schema = new LyftBase64ZlibJsonSchema(streamingContext); WindowedValue value = schema.deserialize(message, "", "", 0, "", ""); Assert.assertArrayEquals(message, value.getValue()); diff --git a/website/www/site/layouts/shortcodes/flink_java_pipeline_options.html b/website/www/site/layouts/shortcodes/flink_java_pipeline_options.html index 1377c248170a..ba77cbc9798a 100644 --- a/website/www/site/layouts/shortcodes/flink_java_pipeline_options.html +++ b/website/www/site/layouts/shortcodes/flink_java_pipeline_options.html @@ -77,11 +77,6 @@ Remove unneeded deep copy between operators. See https://issues.apache.org/jira/browse/BEAM-11146 Default: false - - filesToStage - Jar-Files to send to all workers and put on the classpath. The default value is all files from the classpath. - - finishBundleBeforeCheckpointing If set, finishes the current bundle and flushes all output before checkpointing the state of the operators. By default, starts checkpointing immediately and buffers any remaining bundle output as part of the checkpoint. 
The setting may affect the checkpoint alignment. diff --git a/website/www/site/layouts/shortcodes/flink_python_pipeline_options.html b/website/www/site/layouts/shortcodes/flink_python_pipeline_options.html index bf61037915ba..759b6d762e38 100644 --- a/website/www/site/layouts/shortcodes/flink_python_pipeline_options.html +++ b/website/www/site/layouts/shortcodes/flink_python_pipeline_options.html @@ -77,11 +77,6 @@ Remove unneeded deep copy between operators. See https://issues.apache.org/jira/browse/BEAM-11146 Default: false - - files_to_stage - Jar-Files to send to all workers and put on the classpath. The default value is all files from the classpath. - - finish_bundle_before_checkpointing If set, finishes the current bundle and flushes all output before checkpointing the state of the operators. By default, starts checkpointing immediately and buffers any remaining bundle output as part of the checkpoint. The setting may affect the checkpoint alignment. From 6baee2f9efc77fbea2a2f4d7a1b0ff847f1cf55d Mon Sep 17 00:00:00 2001 From: Seth Saperstein Date: Fri, 8 Jul 2022 19:14:31 -0700 Subject: [PATCH 2/5] working analytics events source --- runners/flink/flink_runner.gradle | 78 +++++------ ...yftFlinkStreamingPortableTranslations.java | 125 +++++++++++++++--- ...linkStreamingPortableTranslationsTest.java | 50 ++++++- 3 files changed, 190 insertions(+), 63 deletions(-) diff --git a/runners/flink/flink_runner.gradle b/runners/flink/flink_runner.gradle index de6b88187266..555d3ba79f35 100644 --- a/runners/flink/flink_runner.gradle +++ b/runners/flink/flink_runner.gradle @@ -184,12 +184,12 @@ dependencies { compile "org.apache.flink:flink-connector-kafka_2.11:1.13-lyft20210804" compile "com.amazonaws:aws-java-sdk-s3:1.10.6" compile "org.apache.hadoop:hadoop-aws:2.8.3" - compile "com.lyft:streamingplatform-kinesis:2.2.1-SNAPSHOT" - compile("com.lyft:streamingplatform-kafka:2.2.1-SNAPSHOT") { + compile "com.lyft:streamingplatform-kinesis:2.3.1-SNAPSHOT" + compile("com.lyft:streamingplatform-kafka:2.3.1-SNAPSHOT") { exclude group: "org.apache.flink", module: "flink-connector-kafka-0.11_2.11" exclude group: "org.apache.flink", module: "flink-streaming-java_2.11" } - compile("com.lyft:streamingplatform-events-source:2.2.1-SNAPSHOT") { + compile("com.lyft:streamingplatform-events-source:2.3.1-SNAPSHOT") { exclude group: "com.lyft", module: "streamingplatform-kinesis" exclude group: "com.google.guava", module: "guava" exclude group: "com.google.protobuf", module: "protobuf-java" @@ -209,42 +209,42 @@ dependencies { exclude group: "org.datanucleus", module: "datanucleus-rdbms" exclude group: "org.apache.logging.log4j", module: "log4j-core" } - // } else if (flink_version.startsWith('1.10')) { - // compile "org.apache.flink:flink-connector-kafka-0.11_2.11:1.10-lyft20200508" - // compile "com.amazonaws:aws-java-sdk-s3:1.10.6" - // compile "org.apache.hadoop:hadoop-aws:2.8.3" - // compile "com.lyft:streamingplatform-kinesis:2.1.1-SNAPSHOT" - // compile("com.lyft:streamingplatform-kafka:2.1.1-SNAPSHOT") { - // exclude group: "org.apache.flink", module: "flink-connector-kafka-0.11_2.11" - // exclude group: "org.apache.flink", module: "flink-streaming-java_2.11" - // } - // compile("com.lyft:streamingplatform-events-source:2.1.3-dryft-SNAPSHOT") { - // exclude group: "com.lyft", module: "streamingplatform-kinesis" - // exclude group: "com.google.guava", module: "guava" - // exclude group: "com.google.protobuf", module: "protobuf-java" - // exclude group: "co.cask.tephra", module: "tephra-api" - // exclude 
group: "co.cask.tephra", module: "tephra-core" - // exclude group: "co.cask.tephra", module: "tephra-hbase-compat-1.0" - // exclude group: "com.jolbox", module: "bonecp" - // exclude group: "com.zaxxer", module: "HikariCP" - // exclude group: "javolution", module: "javolution" - // exclude group: "org.antlr", module: "antlr-runtime" - // exclude group: "org.apache.hadoop", module: "hadoop-common" - // exclude group: "org.apache.hadoop", module: "hadoop-hdfs" - // exclude group: "org.apache.hadoop", module: "hadoop-mapreduce-client-core" - // exclude group: "org.apache.hbase", module: "hbase-client" - // exclude group: "org.datanucleus", module: "datanucleus-api-jdo" - // exclude group: "org.datanucleus", module: "datanucleus-core" - // exclude group: "org.datanucleus", module: "datanucleus-rdbms" - // exclude group: "org.apache.logging.log4j", module: "log4j-core" - // } - // } else { - // compile "org.apache.flink:flink-connector-kafka-0.11_2.11:1.8-lyft20190924" - // compile "com.lyft:streamingplatform-kinesis:1.4-SNAPSHOT" - // compile("com.lyft:streamingplatform-kafka:1.4-SNAPSHOT") { - // exclude group: "org.apache.flink", module: "flink-connector-kafka-0.11_2.11" - // exclude group: "org.apache.flink", module: "flink-streaming-java_2.11" - // } + } else if (flink_version.startsWith('1.10')) { + compile "org.apache.flink:flink-connector-kafka-0.11_2.11:1.10-lyft20200508" + compile "com.amazonaws:aws-java-sdk-s3:1.10.6" + compile "org.apache.hadoop:hadoop-aws:2.8.3" + compile "com.lyft:streamingplatform-kinesis:2.1.1-SNAPSHOT" + compile("com.lyft:streamingplatform-kafka:2.1.1-SNAPSHOT") { + exclude group: "org.apache.flink", module: "flink-connector-kafka-0.11_2.11" + exclude group: "org.apache.flink", module: "flink-streaming-java_2.11" + } + compile("com.lyft:streamingplatform-events-source:2.1.3-dryft-SNAPSHOT") { + exclude group: "com.lyft", module: "streamingplatform-kinesis" + exclude group: "com.google.guava", module: "guava" + exclude group: "com.google.protobuf", module: "protobuf-java" + exclude group: "co.cask.tephra", module: "tephra-api" + exclude group: "co.cask.tephra", module: "tephra-core" + exclude group: "co.cask.tephra", module: "tephra-hbase-compat-1.0" + exclude group: "com.jolbox", module: "bonecp" + exclude group: "com.zaxxer", module: "HikariCP" + exclude group: "javolution", module: "javolution" + exclude group: "org.antlr", module: "antlr-runtime" + exclude group: "org.apache.hadoop", module: "hadoop-common" + exclude group: "org.apache.hadoop", module: "hadoop-hdfs" + exclude group: "org.apache.hadoop", module: "hadoop-mapreduce-client-core" + exclude group: "org.apache.hbase", module: "hbase-client" + exclude group: "org.datanucleus", module: "datanucleus-api-jdo" + exclude group: "org.datanucleus", module: "datanucleus-core" + exclude group: "org.datanucleus", module: "datanucleus-rdbms" + exclude group: "org.apache.logging.log4j", module: "log4j-core" + } + } else { + compile "org.apache.flink:flink-connector-kafka-0.11_2.11:1.8-lyft20190924" + compile "com.lyft:streamingplatform-kinesis:1.4-SNAPSHOT" + compile("com.lyft:streamingplatform-kafka:1.4-SNAPSHOT") { + exclude group: "org.apache.flink", module: "flink-connector-kafka-0.11_2.11" + exclude group: "org.apache.flink", module: "flink-streaming-java_2.11" + } } } diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/LyftFlinkStreamingPortableTranslations.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/LyftFlinkStreamingPortableTranslations.java index 
1a2f60a2500e..86b6b9cd6f2b 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/LyftFlinkStreamingPortableTranslations.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/LyftFlinkStreamingPortableTranslations.java @@ -17,11 +17,6 @@ */ package org.apache.beam.runners.flink; -import static com.lyft.streamingplatform.analytics.EventUtils.BACKUP_DB_DATETIME_FORMATTER; -import static com.lyft.streamingplatform.analytics.EventUtils.DB_DATETIME_FORMATTER; -import static com.lyft.streamingplatform.analytics.EventUtils.GMT; -import static com.lyft.streamingplatform.analytics.EventUtils.ISO_DATETIME_FORMATTER; - import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.JsonNode; @@ -29,6 +24,7 @@ import com.google.auto.service.AutoService; import com.lyft.streamingplatform.LyftKafkaConsumerBuilder; import com.lyft.streamingplatform.LyftKafkaProducerBuilder; +import com.lyft.streamingplatform.analytics.AnalyticsEventKafkaConsumerEventBuilder; import com.lyft.streamingplatform.analytics.Event; import com.lyft.streamingplatform.analytics.EventField; import com.lyft.streamingplatform.eventssource.KinesisAndS3EventSource; @@ -39,20 +35,6 @@ import com.lyft.streamingplatform.eventssource.config.SourceContext; import com.lyft.streamingplatform.flink.FlinkLyftKinesisConsumer; import com.lyft.streamingplatform.flink.InitialRoundRobinKinesisShardAssigner; -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.sql.Timestamp; -import java.time.LocalDateTime; -import java.time.ZonedDateTime; -import java.time.format.DateTimeFormatter; -import java.time.format.DateTimeParseException; -import java.time.temporal.TemporalAccessor; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Properties; -import java.util.zip.DataFormatException; -import java.util.zip.Inflater; import org.apache.beam.model.pipeline.v1.RunnerApi; import org.apache.beam.runners.core.construction.NativeTransforms; import org.apache.beam.runners.core.construction.PTransformTranslation; @@ -73,11 +55,9 @@ import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor; -import org.apache.flink.streaming.api.graph.StreamConfig; import org.apache.flink.streaming.api.operators.AbstractStreamOperator; import org.apache.flink.streaming.api.operators.ChainingStrategy; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; -import org.apache.flink.streaming.api.operators.Output; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer; @@ -89,6 +69,27 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.sql.Timestamp; +import java.time.LocalDateTime; +import java.time.ZonedDateTime; +import java.time.format.DateTimeFormatter; +import java.time.format.DateTimeParseException; +import java.time.temporal.TemporalAccessor; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.zip.DataFormatException; +import java.util.zip.Inflater; + +import static 
com.lyft.streamingplatform.analytics.EventUtils.BACKUP_DB_DATETIME_FORMATTER; +import static com.lyft.streamingplatform.analytics.EventUtils.DB_DATETIME_FORMATTER; +import static com.lyft.streamingplatform.analytics.EventUtils.GMT; +import static com.lyft.streamingplatform.analytics.EventUtils.ISO_DATETIME_FORMATTER; + public class LyftFlinkStreamingPortableTranslations { private static final Logger LOG = @@ -101,6 +102,8 @@ public class LyftFlinkStreamingPortableTranslations { private static final String FLINK_S3_URN = "lyft:flinkS3Input"; private static final String BYTES_ENCODING = "bytes"; private static final String LYFT_BASE64_ZLIB_JSON = "lyft-base64-zlib-json"; + private static final String ANALYTICS_EVENT_KAFKA_CONSUMER_EVENT_BUILDER_URN = + "lyft:analyticsEventKafkaConsumerEventBuilder"; @AutoService(NativeTransforms.IsNativeTransform.class) public static class IsFlinkNativeTransform implements NativeTransforms.IsNativeTransform { @@ -111,7 +114,9 @@ public boolean test(RunnerApi.PTransform pTransform) { || FLINK_KINESIS_URN.equals(PTransformTranslation.urnForTransformOrNull(pTransform)) || FLINK_S3_AND_KINESIS_URN.equals( PTransformTranslation.urnForTransformOrNull(pTransform)) - || FLINK_S3_URN.equals(PTransformTranslation.urnForTransformOrNull(pTransform)); + || FLINK_S3_URN.equals(PTransformTranslation.urnForTransformOrNull(pTransform)) + || ANALYTICS_EVENT_KAFKA_CONSUMER_EVENT_BUILDER_URN.equals( + PTransformTranslation.urnForTransformOrNull(pTransform)); } } @@ -123,6 +128,9 @@ public void addTo( translatorMap.put(FLINK_KINESIS_URN, this::translateKinesisInput); translatorMap.put(FLINK_S3_AND_KINESIS_URN, this::translateS3AndKinesisInputs); translatorMap.put(FLINK_S3_URN, this::translateS3Inputs); + translatorMap.put( + ANALYTICS_EVENT_KAFKA_CONSUMER_EVENT_BUILDER_URN, + this::translateAnalyticsEventKafkaConsumerEventBuilder); } @VisibleForTesting @@ -304,6 +312,79 @@ public void processElement(StreamRecord> element) { } } + @VisibleForTesting + void translateAnalyticsEventKafkaConsumerEventBuilder( + String id, + RunnerApi.Pipeline pipeline, + FlinkStreamingPortablePipelineTranslator.StreamingTranslationContext context) { + RunnerApi.PTransform pTransform = pipeline.getComponents().getTransformsOrThrow(id); + + AnalyticsEventKafkaConsumerEventBuilder sourceBuilder = + new AnalyticsEventKafkaConsumerEventBuilder(); + ObjectMapper mapper = new ObjectMapper(); + + try { + JsonNode params = mapper.readTree(pTransform.getSpec().getPayload().toByteArray()); + + JsonNode eventNameNode = params.get("eventName"); + JsonNode eventNamesNode = params.get("eventNames"); + JsonNode propertiesNode = params.get("properties"); + JsonNode startingOffsetsTimestampNode = params.get("startingOffsetsTimestamp"); + JsonNode startingOffsetsNode = params.get("startingOffsets"); + JsonNode bootstrapServersNode = params.get("bootstrapServers"); + + String eventName = null; + List eventNames = null; + Properties properties = new Properties(); + long startingOffsetsTimestamp = 0; + String startingOffsets = null; + String bootstrapServers = null; + + if (eventNameNode != null) { + eventName = eventNameNode.textValue(); + } + if (eventNamesNode != null) { + eventNames = new ArrayList<>(); + for (JsonNode elem: eventNamesNode) { + eventNames.add(elem.textValue()); + } + } + if (propertiesNode != null) { + Map consumerProps = mapper.convertValue(propertiesNode, Map.class); + properties.putAll(consumerProps); + } + if (startingOffsetsTimestampNode != null) { + startingOffsetsTimestamp = 
startingOffsetsTimestampNode.numberValue().longValue(); + } + if (startingOffsetsNode != null) { + startingOffsets = startingOffsetsNode.textValue(); + } + if (bootstrapServersNode != null) { + bootstrapServers = bootstrapServersNode.textValue(); + } + + sourceBuilder = + sourceBuilder + .setProperties(properties) + .setStartingOffsets(startingOffsetsTimestamp) + .setStartingOffsets(startingOffsets) + .setBootstrapServers(bootstrapServers); + + if (eventNames != null) { + sourceBuilder = sourceBuilder.setEventNames(eventNames); + } else { + sourceBuilder = sourceBuilder.setEventName(eventName); + } + } catch (IOException e) { + throw new RuntimeException("Could not parse Analytics Event Kafka consumer properties.", e); + } + StreamExecutionEnvironment env = context.getExecutionEnvironment(); + DataStream dataStream = sourceBuilder.build(env); + context.addDataStream( + Iterables.getOnlyElement(pTransform.getOutputsMap().values()), + dataStream); + } + private void translateKinesisInput( String id, RunnerApi.Pipeline pipeline, diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/LyftFlinkStreamingPortableTranslationsTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/LyftFlinkStreamingPortableTranslationsTest.java index 74c39ae0f781..d0fed59ef957 100644 --- a/runners/flink/src/test/java/org/apache/beam/runners/flink/LyftFlinkStreamingPortableTranslationsTest.java +++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/LyftFlinkStreamingPortableTranslationsTest.java @@ -50,12 +50,14 @@ import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.Serializable; +import java.lang.reflect.Field; import java.net.URL; import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; import java.sql.Timestamp; import java.text.SimpleDateFormat; import java.util.Base64; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Properties; @@ -75,7 +77,7 @@ public class LyftFlinkStreamingPortableTranslationsTest { @Mock private FlinkStreamingPortablePipelineTranslator.StreamingTranslationContext streamingContext; - @Mock private StreamExecutionEnvironment streamingEnvironment; + private StreamExecutionEnvironment streamingEnvironment; @Mock private DataStream dataStream; @@ -84,10 +86,15 @@ public class LyftFlinkStreamingPortableTranslationsTest { @Mock private DataStreamSink streamSink; @Before - public void before() { + public void before() throws Exception { MockitoAnnotations.initMocks(this); + streamingEnvironment = new StreamExecutionEnvironment(); when(streamingContext.getExecutionEnvironment()).thenReturn(streamingEnvironment); when(streamingContext.getPipelineOptions()).thenReturn(FlinkPipelineOptions.defaults()); + addEnv("PUB_SUB_BOOTSTRAP_SERVERS", "bootstrap1"); + addEnv("SERVICE_NAME", "service1"); + addEnv("APP_NAME", "app1"); + addEnv("APPLICATION_ENV", "develop"); } @Test @@ -290,6 +297,30 @@ private void runAndAssertKafkaSink(String id, String topicName, byte[] payload) Assert.assertEquals(FlinkKafkaProducer.class, kafkaSinkCaptor.getValue().getClass()); } + @Test + public void testTranslateAnalyticsEventKafkaConsumerEventBuilder() throws JsonProcessingException { + String id = "1"; + String eventName = "foo"; + Properties properties = new Properties(); + + ImmutableMap.Builder builder = + ImmutableMap.builder() + .put("eventName", eventName) + .put("properties", properties); + + byte[] payload = new ObjectMapper().writeValueAsBytes(builder.build()); + 
runAndAssertAnalyticsEventSource(id, eventName, payload); + } + + private void runAndAssertAnalyticsEventSource(String id, String eventName, byte[] payload) { + RunnerApi.Pipeline pipeline = createPipeline(id, payload); + LyftFlinkStreamingPortableTranslations portableTranslations = + new LyftFlinkStreamingPortableTranslations(); + + portableTranslations.translateAnalyticsEventKafkaConsumerEventBuilder( + id, pipeline, streamingContext); + } + /** * utility method to create payload for tests. * @@ -460,4 +491,19 @@ public void testGetS3WithoutKinesisConfig() throws IOException { JsonNode jsonNode = mapper.readTree(eventsStr); return mapper.convertValue(jsonNode, Map.class); } + + private static void addEnv(String key, String value) throws Exception { + Class[] klasses = Collections.class.getDeclaredClasses(); + Map env = System.getenv(); + for (Class klass : klasses) { + if ("java.util.Collections$UnmodifiableMap".equals(klass.getName())) { + for (Field field : klass.getDeclaredFields()) { + if (field.getType().equals(Map.class)) { + field.setAccessible(true); + ((Map) field.get(env)).put(key, value); + } + } + } + } + } } From 98696b5d96bf8159bff7798bdd476a50f03df326 Mon Sep 17 00:00:00 2001 From: Seth Saperstein Date: Fri, 8 Jul 2022 21:08:20 -0700 Subject: [PATCH 3/5] test for analytic event consumer --- runners/flink/flink_runner.gradle | 74 +++++------ ...yftFlinkStreamingPortableTranslations.java | 115 ++++++++++++------ ...linkStreamingPortableTranslationsTest.java | 76 ++++++++++-- 3 files changed, 183 insertions(+), 82 deletions(-) diff --git a/runners/flink/flink_runner.gradle b/runners/flink/flink_runner.gradle index 555d3ba79f35..5de4e88b4936 100644 --- a/runners/flink/flink_runner.gradle +++ b/runners/flink/flink_runner.gradle @@ -209,43 +209,43 @@ dependencies { exclude group: "org.datanucleus", module: "datanucleus-rdbms" exclude group: "org.apache.logging.log4j", module: "log4j-core" } - } else if (flink_version.startsWith('1.10')) { - compile "org.apache.flink:flink-connector-kafka-0.11_2.11:1.10-lyft20200508" - compile "com.amazonaws:aws-java-sdk-s3:1.10.6" - compile "org.apache.hadoop:hadoop-aws:2.8.3" - compile "com.lyft:streamingplatform-kinesis:2.1.1-SNAPSHOT" - compile("com.lyft:streamingplatform-kafka:2.1.1-SNAPSHOT") { - exclude group: "org.apache.flink", module: "flink-connector-kafka-0.11_2.11" - exclude group: "org.apache.flink", module: "flink-streaming-java_2.11" - } - compile("com.lyft:streamingplatform-events-source:2.1.3-dryft-SNAPSHOT") { - exclude group: "com.lyft", module: "streamingplatform-kinesis" - exclude group: "com.google.guava", module: "guava" - exclude group: "com.google.protobuf", module: "protobuf-java" - exclude group: "co.cask.tephra", module: "tephra-api" - exclude group: "co.cask.tephra", module: "tephra-core" - exclude group: "co.cask.tephra", module: "tephra-hbase-compat-1.0" - exclude group: "com.jolbox", module: "bonecp" - exclude group: "com.zaxxer", module: "HikariCP" - exclude group: "javolution", module: "javolution" - exclude group: "org.antlr", module: "antlr-runtime" - exclude group: "org.apache.hadoop", module: "hadoop-common" - exclude group: "org.apache.hadoop", module: "hadoop-hdfs" - exclude group: "org.apache.hadoop", module: "hadoop-mapreduce-client-core" - exclude group: "org.apache.hbase", module: "hbase-client" - exclude group: "org.datanucleus", module: "datanucleus-api-jdo" - exclude group: "org.datanucleus", module: "datanucleus-core" - exclude group: "org.datanucleus", module: "datanucleus-rdbms" - exclude group: 
"org.apache.logging.log4j", module: "log4j-core" - } - } else { - compile "org.apache.flink:flink-connector-kafka-0.11_2.11:1.8-lyft20190924" - compile "com.lyft:streamingplatform-kinesis:1.4-SNAPSHOT" - compile("com.lyft:streamingplatform-kafka:1.4-SNAPSHOT") { - exclude group: "org.apache.flink", module: "flink-connector-kafka-0.11_2.11" - exclude group: "org.apache.flink", module: "flink-streaming-java_2.11" - } - } + // } else if (flink_version.startsWith('1.10')) { + // compile "org.apache.flink:flink-connector-kafka-0.11_2.11:1.10-lyft20200508" + // compile "com.amazonaws:aws-java-sdk-s3:1.10.6" + // compile "org.apache.hadoop:hadoop-aws:2.8.3" + // compile "com.lyft:streamingplatform-kinesis:2.1.1-SNAPSHOT" + // compile("com.lyft:streamingplatform-kafka:2.1.1-SNAPSHOT") { + // exclude group: "org.apache.flink", module: "flink-connector-kafka-0.11_2.11" + // exclude group: "org.apache.flink", module: "flink-streaming-java_2.11" + // } + // compile("com.lyft:streamingplatform-events-source:2.1.3-dryft-SNAPSHOT") { + // exclude group: "com.lyft", module: "streamingplatform-kinesis" + // exclude group: "com.google.guava", module: "guava" + // exclude group: "com.google.protobuf", module: "protobuf-java" + // exclude group: "co.cask.tephra", module: "tephra-api" + // exclude group: "co.cask.tephra", module: "tephra-core" + // exclude group: "co.cask.tephra", module: "tephra-hbase-compat-1.0" + // exclude group: "com.jolbox", module: "bonecp" + // exclude group: "com.zaxxer", module: "HikariCP" + // exclude group: "javolution", module: "javolution" + // exclude group: "org.antlr", module: "antlr-runtime" + // exclude group: "org.apache.hadoop", module: "hadoop-common" + // exclude group: "org.apache.hadoop", module: "hadoop-hdfs" + // exclude group: "org.apache.hadoop", module: "hadoop-mapreduce-client-core" + // exclude group: "org.apache.hbase", module: "hbase-client" + // exclude group: "org.datanucleus", module: "datanucleus-api-jdo" + // exclude group: "org.datanucleus", module: "datanucleus-core" + // exclude group: "org.datanucleus", module: "datanucleus-rdbms" + // exclude group: "org.apache.logging.log4j", module: "log4j-core" + // } + // } else { + // compile "org.apache.flink:flink-connector-kafka-0.11_2.11:1.8-lyft20190924" + // compile "com.lyft:streamingplatform-kinesis:1.4-SNAPSHOT" + // compile("com.lyft:streamingplatform-kafka:1.4-SNAPSHOT") { + // exclude group: "org.apache.flink", module: "flink-connector-kafka-0.11_2.11" + // exclude group: "org.apache.flink", module: "flink-streaming-java_2.11" + // } + } } class ValidatesRunnerConfig { diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/LyftFlinkStreamingPortableTranslations.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/LyftFlinkStreamingPortableTranslations.java index 86b6b9cd6f2b..ff876cb682ea 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/LyftFlinkStreamingPortableTranslations.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/LyftFlinkStreamingPortableTranslations.java @@ -325,43 +325,14 @@ void translateAnalyticsEventKafkaConsumerEventBuilder( try { JsonNode params = mapper.readTree(pTransform.getSpec().getPayload().toByteArray()); + ParamRetriever retriever = new ParamRetriever(params); - JsonNode eventNameNode = params.get("eventName"); - JsonNode eventNamesNode = params.get("eventNames"); - JsonNode propertiesNode = params.get("properties"); - JsonNode startingOffsetsTimestampNode = params.get("startingOffsetsTimestamp"); - 
JsonNode startingOffsetsNode = params.get("startingOffsets"); - JsonNode bootstrapServersNode = params.get("bootstrapServers"); - - String eventName = null; - List eventNames = null; - Properties properties = new Properties(); - long startingOffsetsTimestamp = 0; - String startingOffsets = null; - String bootstrapServers = null; - - if (eventNameNode != null) { - eventName = eventNameNode.textValue(); - } - if (eventNamesNode != null) { - eventNames = new ArrayList<>(); - for (JsonNode elem: eventNamesNode) { - eventNames.add(elem.textValue()); - } - } - if (propertiesNode != null) { - Map consumerProps = mapper.convertValue(propertiesNode, Map.class); - properties.putAll(consumerProps); - } - if (startingOffsetsTimestampNode != null) { - startingOffsetsTimestamp = startingOffsetsTimestampNode.numberValue().longValue(); - } - if (startingOffsetsNode != null) { - startingOffsets = startingOffsetsNode.textValue(); - } - if (bootstrapServersNode != null) { - bootstrapServers = bootstrapServersNode.textValue(); - } + String eventName = retriever.getString("eventName"); + List eventNames = retriever.getStrArray("eventNames"); + Properties properties = retriever.getProperties(); + long startingOffsetsTimestamp = retriever.getLong("startingOffsetsTimestamp"); + String startingOffsets = retriever.getString("startingOffsets"); + String bootstrapServers = retriever.getString("bootstrapServers"); sourceBuilder = sourceBuilder @@ -372,7 +343,8 @@ void translateAnalyticsEventKafkaConsumerEventBuilder( if (eventNames != null) { sourceBuilder = sourceBuilder.setEventNames(eventNames); - } else { + } + if (eventNames != null) { sourceBuilder = sourceBuilder.setEventName(eventName); } } catch (IOException e) { @@ -896,4 +868,73 @@ public long extractTimestamp(WindowedValue element) { return element.getTimestamp() != null ? 
element.getTimestamp().getMillis() : Long.MIN_VALUE; } } + + private class ParamRetriever { + private final JsonNode params; + private final ObjectMapper mapper = new ObjectMapper(); + + public ParamRetriever(JsonNode params) { + this.params = params; + } + + public String getString(String fieldName, String defaultValue) { + String value = defaultValue; + if (params.has(fieldName)) { + value = params.get(fieldName).textValue(); + } + return value; + } + + public String getString(String fieldName) { + return getString(fieldName, null); + } + + public long getLong(String fieldName, long defaultValue) { + long value = defaultValue; + if (params.has(fieldName)) { + value = params.get(fieldName).numberValue().longValue(); + } + return value; + } + + public long getLong(String fieldName) { + return getLong(fieldName, 0L); + } + + public List getStrArray(String fieldName, List defaultValue) { + List value = defaultValue; + if (params.has(fieldName)) { + value = new ArrayList<>(); + for (JsonNode jsonNode: params.get(fieldName)) { + value.add(jsonNode.textValue()); + } + } + return value; + } + + public List getStrArray(String fieldName) { + return getStrArray(fieldName, null); + } + + public Map getMap(String fieldName, Map defaultValue) { + Map value = defaultValue; + if (params.has(fieldName)) { + value = mapper.convertValue(params.get(fieldName), Map.class); + } + return value; + } + + public Map getMap(String fieldName) { + return getMap(fieldName, null); + } + + public Properties getProperties() { + Properties props = new Properties(); + Map propsParam = getMap("properties"); + if (propsParam != null) { + props.putAll(propsParam); + } + return props; + } + } } diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/LyftFlinkStreamingPortableTranslationsTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/LyftFlinkStreamingPortableTranslationsTest.java index d0fed59ef957..b07178b1b2d4 100644 --- a/runners/flink/src/test/java/org/apache/beam/runners/flink/LyftFlinkStreamingPortableTranslationsTest.java +++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/LyftFlinkStreamingPortableTranslationsTest.java @@ -56,6 +56,7 @@ import java.nio.charset.StandardCharsets; import java.sql.Timestamp; import java.text.SimpleDateFormat; +import java.util.ArrayList; import java.util.Base64; import java.util.Collections; import java.util.List; @@ -298,21 +299,80 @@ private void runAndAssertKafkaSink(String id, String topicName, byte[] payload) } @Test - public void testTranslateAnalyticsEventKafkaConsumerEventBuilder() throws JsonProcessingException { + public void testAnalyticsEventConsumer() throws JsonProcessingException { String id = "1"; String eventName = "foo"; - Properties properties = new Properties(); + byte[] payload = createAnalyticsEventKafkaConsumerEventPayload( + eventName, + null, + null, + 0L, + null, + null + ); + runAndAssertAnalyticsEventSource(id, payload); + } + + @Test + public void testAnalyticsEventConsumerWithMultipleEvents() throws JsonProcessingException { + String id = "1"; + List eventNames = new ArrayList<>(); + eventNames.add("event1"); + eventNames.add("event2"); + byte[] payload = createAnalyticsEventKafkaConsumerEventPayload( + null, + eventNames, + null, + 0L, + null, + null + ); + runAndAssertAnalyticsEventSource(id, payload); + } + + @Test(expected = RuntimeException.class) + public void testAnalyticsEventConsumerWithoutEvent() throws JsonProcessingException { + String id = "1"; + byte[] payload = 
createAnalyticsEventKafkaConsumerEventPayload( + null, + null, + null, + 0L, + null, + null + ); + runAndAssertAnalyticsEventSource(id, payload); + } + + private byte[] createAnalyticsEventKafkaConsumerEventPayload( + String eventName, + List eventNames, + Properties properties, + long startingOffsetsTimestamp, + String startingOffsets, + String bootstrapServers + ) throws JsonProcessingException { ImmutableMap.Builder builder = - ImmutableMap.builder() - .put("eventName", eventName) - .put("properties", properties); + ImmutableMap.builder(); - byte[] payload = new ObjectMapper().writeValueAsBytes(builder.build()); - runAndAssertAnalyticsEventSource(id, eventName, payload); + putIfNotNull(builder, "eventName", eventName); + putIfNotNull(builder, "eventNames", eventNames); + putIfNotNull(builder, "properties", properties); + putIfNotNull(builder, "startingOffsetsTimestamp", startingOffsetsTimestamp); + putIfNotNull(builder, "startingOffsets", startingOffsets); + putIfNotNull(builder, "bootstrapServers", bootstrapServers); + + return new ObjectMapper().writeValueAsBytes(builder.build()); + } + + private void putIfNotNull(ImmutableMap.Builder builder, String key, Object value) { + if (value != null) { + builder.put(key, value); + } } - private void runAndAssertAnalyticsEventSource(String id, String eventName, byte[] payload) { + private void runAndAssertAnalyticsEventSource(String id, byte[] payload) { RunnerApi.Pipeline pipeline = createPipeline(id, payload); LyftFlinkStreamingPortableTranslations portableTranslations = new LyftFlinkStreamingPortableTranslations(); From d6038905697e2c3236d54c22371134d3b0385d0c Mon Sep 17 00:00:00 2001 From: Seth Saperstein Date: Mon, 11 Jul 2022 14:47:04 -0700 Subject: [PATCH 4/5] proto event builder added. all tests passing --- ...yftFlinkStreamingPortableTranslations.java | 56 +++++++++- .../apache/beam/runners/flink/EnvUtil.java | 54 ++++++++++ ...linkStreamingPortableTranslationsTest.java | 102 ++++++++++++------ .../test/resources/s3_and_kinesis_config.json | 2 +- 4 files changed, 173 insertions(+), 41 deletions(-) create mode 100644 runners/flink/src/test/java/org/apache/beam/runners/flink/EnvUtil.java diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/LyftFlinkStreamingPortableTranslations.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/LyftFlinkStreamingPortableTranslations.java index ff876cb682ea..68b16dc4de60 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/LyftFlinkStreamingPortableTranslations.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/LyftFlinkStreamingPortableTranslations.java @@ -25,6 +25,7 @@ import com.lyft.streamingplatform.LyftKafkaConsumerBuilder; import com.lyft.streamingplatform.LyftKafkaProducerBuilder; import com.lyft.streamingplatform.analytics.AnalyticsEventKafkaConsumerEventBuilder; +import com.lyft.streamingplatform.analytics.AnalyticsEventKafkaConsumerProtoBuilder; import com.lyft.streamingplatform.analytics.Event; import com.lyft.streamingplatform.analytics.EventField; import com.lyft.streamingplatform.eventssource.KinesisAndS3EventSource; @@ -53,6 +54,7 @@ import org.apache.flink.api.common.serialization.SerializationSchema; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import 
org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor; import org.apache.flink.streaming.api.operators.AbstractStreamOperator; @@ -102,8 +104,10 @@ public class LyftFlinkStreamingPortableTranslations { private static final String FLINK_S3_URN = "lyft:flinkS3Input"; private static final String BYTES_ENCODING = "bytes"; private static final String LYFT_BASE64_ZLIB_JSON = "lyft-base64-zlib-json"; - private static final String ANALYTICS_EVENT_KAFKA_CONSUMER_EVENT_BUILDER_URN = - "lyft:analyticsEventKafkaConsumerEventBuilder"; + private static final String FLINK_ANALYTICS_EVENT_KAFKA_CONSUMER_EVENT_BUILDER_URN = + "lyft:flinkAnalyticsEventKafkaConsumerEventBuilder"; + private static final String FLINK_ANALYTICS_EVENT_KAFKA_CONSUMER_PROTO_BUILDER_URN = + "lyft:flinkAnalyticsEventKafkaConsumerProtoBuilder"; @AutoService(NativeTransforms.IsNativeTransform.class) public static class IsFlinkNativeTransform implements NativeTransforms.IsNativeTransform { @@ -115,7 +119,9 @@ public boolean test(RunnerApi.PTransform pTransform) { || FLINK_S3_AND_KINESIS_URN.equals( PTransformTranslation.urnForTransformOrNull(pTransform)) || FLINK_S3_URN.equals(PTransformTranslation.urnForTransformOrNull(pTransform)) - || ANALYTICS_EVENT_KAFKA_CONSUMER_EVENT_BUILDER_URN.equals( + || FLINK_ANALYTICS_EVENT_KAFKA_CONSUMER_EVENT_BUILDER_URN.equals( + PTransformTranslation.urnForTransformOrNull(pTransform)) + || FLINK_ANALYTICS_EVENT_KAFKA_CONSUMER_PROTO_BUILDER_URN.equals( PTransformTranslation.urnForTransformOrNull(pTransform)); } } @@ -129,8 +135,11 @@ public void addTo( translatorMap.put(FLINK_S3_AND_KINESIS_URN, this::translateS3AndKinesisInputs); translatorMap.put(FLINK_S3_URN, this::translateS3Inputs); translatorMap.put( - ANALYTICS_EVENT_KAFKA_CONSUMER_EVENT_BUILDER_URN, + FLINK_ANALYTICS_EVENT_KAFKA_CONSUMER_EVENT_BUILDER_URN, this::translateAnalyticsEventKafkaConsumerEventBuilder); + translatorMap.put( + FLINK_ANALYTICS_EVENT_KAFKA_CONSUMER_PROTO_BUILDER_URN, + this::translateAnalyticsEventKafkaConsumerProtoBuilder); } @VisibleForTesting @@ -344,7 +353,7 @@ void translateAnalyticsEventKafkaConsumerEventBuilder( if (eventNames != null) { sourceBuilder = sourceBuilder.setEventNames(eventNames); } - if (eventNames != null) { + if (eventName != null) { sourceBuilder = sourceBuilder.setEventName(eventName); } } catch (IOException e) { @@ -357,6 +366,43 @@ void translateAnalyticsEventKafkaConsumerEventBuilder( dataStream); } + @VisibleForTesting + void translateAnalyticsEventKafkaConsumerProtoBuilder( + String id, + RunnerApi.Pipeline pipeline, + FlinkStreamingPortablePipelineTranslator.StreamingTranslationContext context) { + RunnerApi.PTransform pTransform = pipeline.getComponents().getTransformsOrThrow(id); + AnalyticsEventKafkaConsumerProtoBuilder sourceBuilder = + new AnalyticsEventKafkaConsumerProtoBuilder(); + ObjectMapper mapper = new ObjectMapper(); + + try { + JsonNode params = mapper.readTree(pTransform.getSpec().getPayload().toByteArray()); + ParamRetriever retriever = new ParamRetriever(params); + + String eventName = retriever.getString("eventName"); + Properties properties = retriever.getProperties(); + long startingOffsetsTimestamp = retriever.getLong("startingOffsetsTimestamp"); + String startingOffsets = retriever.getString("startingOffsets"); + String bootstrapServers = retriever.getString("bootstrapServers"); + + sourceBuilder = + sourceBuilder + .setProperties(properties) + .setStartingOffsets(startingOffsetsTimestamp) + .setStartingOffsets(startingOffsets) + 
.setBootstrapServers(bootstrapServers) + .setEventName(eventName); + } catch (IOException e) { + throw new RuntimeException("Could not parse Analytics Event Kafka Proto consumer properties.", e); + } + StreamExecutionEnvironment env = context.getExecutionEnvironment(); + SingleOutputStreamOperator dataStream = sourceBuilder.build(env); + context.addDataStream( + Iterables.getOnlyElement(pTransform.getOutputsMap().values()), + dataStream); + } + private void translateKinesisInput( String id, RunnerApi.Pipeline pipeline, diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/EnvUtil.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/EnvUtil.java new file mode 100644 index 000000000000..36c0daee35c7 --- /dev/null +++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/EnvUtil.java @@ -0,0 +1,54 @@ +package org.apache.beam.runners.flink; + +import java.lang.reflect.Field; +import java.util.Collections; +import java.util.Map; + +/** + * utility class to add / remove environment variables for tests. + */ +public class EnvUtil { + + /** + * add environment variable using reflection. + * @param key + * @param value + * @throws Exception + */ + public static void addEnv(String key, String value) throws Exception { + + Class[] klasses = Collections.class.getDeclaredClasses(); + Map env = System.getenv(); + for (Class klass : klasses) { + if ("java.util.Collections$UnmodifiableMap".equals(klass.getName())) { + for (Field field : klass.getDeclaredFields()) { + if (field.getType().equals(Map.class)) { + field.setAccessible(true); + ((Map) field.get(env)).put(key, value); + } + } + } + } + } + + /** + * removes environment variable. + * @param key + * @throws Exception + */ + static void removeEnv(String key) throws Exception { + + Class[] klasses = Collections.class.getDeclaredClasses(); + Map env = System.getenv(); + for (Class klass : klasses) { + if ("java.util.Collections$UnmodifiableMap".equals(klass.getName())) { + for (Field field : klass.getDeclaredFields()) { + if (field.getType().equals(Map.class)) { + field.setAccessible(true); + ((Map) field.get(env)).remove(key); + } + } + } + } + } +} diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/LyftFlinkStreamingPortableTranslationsTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/LyftFlinkStreamingPortableTranslationsTest.java index b07178b1b2d4..7675b5f171da 100644 --- a/runners/flink/src/test/java/org/apache/beam/runners/flink/LyftFlinkStreamingPortableTranslationsTest.java +++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/LyftFlinkStreamingPortableTranslationsTest.java @@ -40,6 +40,7 @@ import org.apache.flink.streaming.api.operators.OneInputStreamOperator; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer; +import org.joda.time.DateTime; import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -50,7 +51,6 @@ import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.Serializable; -import java.lang.reflect.Field; import java.net.URL; import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; @@ -58,7 +58,6 @@ import java.text.SimpleDateFormat; import java.util.ArrayList; import java.util.Base64; -import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Properties; @@ -69,6 +68,7 @@ import static org.junit.Assert.assertThrows; import static org.mockito.ArgumentMatchers.any; 
import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -78,8 +78,6 @@ public class LyftFlinkStreamingPortableTranslationsTest { @Mock private FlinkStreamingPortablePipelineTranslator.StreamingTranslationContext streamingContext; - private StreamExecutionEnvironment streamingEnvironment; - @Mock private DataStream dataStream; @Mock private SingleOutputStreamOperator outputStreamOperator; @@ -89,13 +87,13 @@ public class LyftFlinkStreamingPortableTranslationsTest { @Before public void before() throws Exception { MockitoAnnotations.initMocks(this); - streamingEnvironment = new StreamExecutionEnvironment(); - when(streamingContext.getExecutionEnvironment()).thenReturn(streamingEnvironment); + StreamExecutionEnvironment env = new StreamExecutionEnvironment(); + when(streamingContext.getExecutionEnvironment()).thenReturn(env); when(streamingContext.getPipelineOptions()).thenReturn(FlinkPipelineOptions.defaults()); - addEnv("PUB_SUB_BOOTSTRAP_SERVERS", "bootstrap1"); - addEnv("SERVICE_NAME", "service1"); - addEnv("APP_NAME", "app1"); - addEnv("APPLICATION_ENV", "develop"); + EnvUtil.addEnv("PUB_SUB_BOOTSTRAP_SERVERS", "bootstrap1"); + EnvUtil.addEnv("SERVICE_NAME", "service1"); + EnvUtil.addEnv("APP_NAME", "app1"); + EnvUtil.addEnv("APPLICATION_ENV", "develop"); } @Test @@ -215,16 +213,20 @@ public void testKafkaInputWithNonDevKafkaBroker() throws JsonProcessingException private void runAndAssertKafkaInput(String id, String topicName, byte[] payload) { RunnerApi.Pipeline pipeline = createPipeline(id, payload); + FlinkStreamingPortablePipelineTranslator.StreamingTranslationContext context = mock(FlinkStreamingPortablePipelineTranslator.StreamingTranslationContext.class); + StreamExecutionEnvironment env = mock(StreamExecutionEnvironment.class); + when(context.getExecutionEnvironment()).thenReturn(env); + when(context.getPipelineOptions()).thenReturn(FlinkPipelineOptions.defaults()); // run new LyftFlinkStreamingPortableTranslations() - .translateKafkaInput(id, pipeline, streamingContext); + .translateKafkaInput(id, pipeline, context); // assert ArgumentCaptor kafkaSourceCaptor = ArgumentCaptor.forClass(FlinkKafkaConsumer.class); ArgumentCaptor kafkaSourceNameCaptor = ArgumentCaptor.forClass(String.class); - verify(streamingEnvironment) + verify(env) .addSource(kafkaSourceCaptor.capture(), kafkaSourceNameCaptor.capture()); Assert.assertEquals( WindowedValue.class, kafkaSourceCaptor.getValue().getProducedType().getTypeClass()); @@ -310,7 +312,7 @@ public void testAnalyticsEventConsumer() throws JsonProcessingException { null, null ); - runAndAssertAnalyticsEventSource(id, payload); + runAndAssertAnalyticsEventSource(id, payload, streamingContext); } @Test @@ -327,7 +329,7 @@ public void testAnalyticsEventConsumerWithMultipleEvents() throws JsonProcessing null, null ); - runAndAssertAnalyticsEventSource(id, payload); + runAndAssertAnalyticsEventSource(id, payload, streamingContext); } @@ -342,7 +344,38 @@ public void testAnalyticsEventConsumerWithoutEvent() throws JsonProcessingExcept null, null ); - runAndAssertAnalyticsEventSource(id, payload); + runAndAssertAnalyticsEventSource(id, payload, streamingContext); + } + + @Test + public void testAnalyticsEventProtoConsumer() throws JsonProcessingException { + String id = "1"; + String eventName = "event1"; + byte[] payload = createAnalyticsEventKafkaConsumerEventPayload( + eventName, + null, + null, + 0L, + null, + null + ); + 
+    runAndAssertAnalyticsEventSource(id, payload, streamingContext, true);
+  }
+
+  @Test
+  public void testAnalyticsEventProtoConsumerStartingOffsetTimestamp() throws JsonProcessingException {
+    String id = "1";
+    String eventName = "event1";
+    long timestamp = DateTime.now().getMillis();
+    byte[] payload = createAnalyticsEventKafkaConsumerEventPayload(
+        eventName,
+        null,
+        null,
+        timestamp,
+        null,
+        null
+    );
+    runAndAssertAnalyticsEventSource(id, payload, streamingContext, true);
   }
 
   private byte[] createAnalyticsEventKafkaConsumerEventPayload(
@@ -372,13 +405,23 @@ private void putIfNotNull(ImmutableMap.Builder builder, String k
     }
   }
 
-  private void runAndAssertAnalyticsEventSource(String id, byte[] payload) {
+  private void runAndAssertAnalyticsEventSource(String id, byte[] payload, FlinkStreamingPortablePipelineTranslator.StreamingTranslationContext context) {
+    runAndAssertAnalyticsEventSource(id, payload, context, false);
+  }
+
+  private void runAndAssertAnalyticsEventSource(String id, byte[] payload, FlinkStreamingPortablePipelineTranslator.StreamingTranslationContext context, boolean protoBuilder) {
     RunnerApi.Pipeline pipeline = createPipeline(id, payload);
 
     LyftFlinkStreamingPortableTranslations portableTranslations =
        new LyftFlinkStreamingPortableTranslations();
-    portableTranslations.translateAnalyticsEventKafkaConsumerEventBuilder(
-        id, pipeline, streamingContext);
+
+    if (protoBuilder) {
+      portableTranslations.translateAnalyticsEventKafkaConsumerProtoBuilder(
+          id, pipeline, context);
+    } else {
+      portableTranslations.translateAnalyticsEventKafkaConsumerEventBuilder(
+          id, pipeline, context);
+    }
   }
 
   /**
@@ -482,7 +525,11 @@ public void testGetTimestampFromEvent() {
   }
 
   @Test
-  public void testGetEventConfigs() throws IOException {
+  public void testGetEventConfigs() throws Exception {
+    // EventConfig uses the idl dependency rather than the reflection svc
+    // when the service name is not set. Used for testing.
+    EnvUtil.removeEnv("SERVICE_NAME");
+
     LyftFlinkStreamingPortableTranslations translations = new LyftFlinkStreamingPortableTranslations();
 
     ObjectMapper mapper = new ObjectMapper();
@@ -495,7 +542,7 @@ public void testGetEventConfigs() throws IOException {
     assertEquals(1, eventConfigs.size());
 
     EventConfig event = eventConfigs.get(0);
-    assertEquals("test_event", event.eventName);
+    assertEquals("event_pb_events_server_api_events_rideDroppedOff", event.eventName);
     assertEquals(5, event.latenessInSeconds);
     assertEquals(1, event.lookbackInDays);
   }
@@ -551,19 +598,4 @@ public void testGetS3WithoutKinesisConfig() throws IOException {
     JsonNode jsonNode = mapper.readTree(eventsStr);
     return mapper.convertValue(jsonNode, Map.class);
   }
-
-  private static void addEnv(String key, String value) throws Exception {
-    Class[] klasses = Collections.class.getDeclaredClasses();
-    Map env = System.getenv();
-    for (Class klass : klasses) {
-      if ("java.util.Collections$UnmodifiableMap".equals(klass.getName())) {
-        for (Field field : klass.getDeclaredFields()) {
-          if (field.getType().equals(Map.class)) {
-            field.setAccessible(true);
-            ((Map) field.get(env)).put(key, value);
-          }
-        }
-      }
-    }
-  }
 }
diff --git a/runners/flink/src/test/resources/s3_and_kinesis_config.json b/runners/flink/src/test/resources/s3_and_kinesis_config.json
index 539964989cdc..5fcc998e4870 100644
--- a/runners/flink/src/test/resources/s3_and_kinesis_config.json
+++ b/runners/flink/src/test/resources/s3_and_kinesis_config.json
@@ -1,7 +1,7 @@
 {
   "events": [
     {
-      "name": "test_event",
+      "name": "event_pb_events_server_api_events_rideDroppedOff",
       "max_out_of_orderness_millis": 5000,
       "lookback_days": 1
     }

From e63cef92bb376b3e6b50c3b8bcc5a2f607db8066 Mon Sep 17 00:00:00 2001
From: Seth Saperstein
Date: Fri, 15 Jul 2022 13:06:17 -0700
Subject: [PATCH 5/5] translation for kafka builder

---
 ...yftFlinkStreamingPortableTranslations.java | 175 ++++++++++--------
 ...linkStreamingPortableTranslationsTest.java |  62 ++++++-
 2 files changed, 154 insertions(+), 83 deletions(-)

diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/LyftFlinkStreamingPortableTranslations.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/LyftFlinkStreamingPortableTranslations.java
index 68b16dc4de60..82a8cd9b342b 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/LyftFlinkStreamingPortableTranslations.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/LyftFlinkStreamingPortableTranslations.java
@@ -23,6 +23,7 @@
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.auto.service.AutoService;
 import com.lyft.streamingplatform.LyftKafkaConsumerBuilder;
+import com.lyft.streamingplatform.LyftKafkaConsumerBuilderV2;
 import com.lyft.streamingplatform.LyftKafkaProducerBuilder;
 import com.lyft.streamingplatform.analytics.AnalyticsEventKafkaConsumerEventBuilder;
 import com.lyft.streamingplatform.analytics.AnalyticsEventKafkaConsumerProtoBuilder;
@@ -50,6 +51,8 @@
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
 import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.serialization.SerializationSchema;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -74,12 +77,12 @@
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.sql.Timestamp;
+import java.time.Duration;
 import java.time.LocalDateTime;
 import java.time.ZonedDateTime;
 import java.time.format.DateTimeFormatter;
 import java.time.format.DateTimeParseException;
 import java.time.temporal.TemporalAccessor;
-import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -108,6 +111,8 @@ public class LyftFlinkStreamingPortableTranslations {
       "lyft:flinkAnalyticsEventKafkaConsumerEventBuilder";
   private static final String FLINK_ANALYTICS_EVENT_KAFKA_CONSUMER_PROTO_BUILDER_URN =
       "lyft:flinkAnalyticsEventKafkaConsumerProtoBuilder";
+  private static final String FLINK_KAFKA_CONSUMER_BUILDER_URN =
+      "lyft:flinkLyftKafkaConsumerBuilder";
 
   @AutoService(NativeTransforms.IsNativeTransform.class)
   public static class IsFlinkNativeTransform implements NativeTransforms.IsNativeTransform {
@@ -122,6 +127,8 @@ public boolean test(RunnerApi.PTransform pTransform) {
           || FLINK_ANALYTICS_EVENT_KAFKA_CONSUMER_EVENT_BUILDER_URN.equals(
               PTransformTranslation.urnForTransformOrNull(pTransform))
           || FLINK_ANALYTICS_EVENT_KAFKA_CONSUMER_PROTO_BUILDER_URN.equals(
+              PTransformTranslation.urnForTransformOrNull(pTransform))
+          || FLINK_KAFKA_CONSUMER_BUILDER_URN.equals(
               PTransformTranslation.urnForTransformOrNull(pTransform));
     }
   }
@@ -140,6 +147,9 @@ public void addTo(
     translatorMap.put(
         FLINK_ANALYTICS_EVENT_KAFKA_CONSUMER_PROTO_BUILDER_URN,
         this::translateAnalyticsEventKafkaConsumerProtoBuilder);
+    translatorMap.put(
+        FLINK_KAFKA_CONSUMER_BUILDER_URN,
+        this::translateKafkaConsumerBuilder);
   }
 
   @VisibleForTesting
@@ -202,7 +212,60 @@ void translateKafkaInput(
             .addSource(kafkaSource, FlinkKafkaConsumer.class.getSimpleName() + "-" + topic));
   }
 
-  /**
+  @VisibleForTesting
+  void translateKafkaConsumerBuilder(
+      String id,
+      RunnerApi.Pipeline pipeline,
+      FlinkStreamingPortablePipelineTranslator.StreamingTranslationContext context) {
+    RunnerApi.PTransform pTransform = pipeline.getComponents().getTransformsOrThrow(id);
+
+    final Map<String, Object> params;
+    LyftKafkaConsumerBuilderV2<WindowedValue<byte[]>> builder = new LyftKafkaConsumerBuilderV2<>();
+    try {
+      ObjectMapper mapper = new ObjectMapper();
+      params = mapper.readValue(pTransform.getSpec().getPayload().toByteArray(), Map.class);
+
+      String topic = (String) params.get("topic");
+      String bootstrapServers = (String) params.get("bootstrapServers");
+      Properties properties = ParamParser.getProperties(params);
+      long startingOffsetsTimestamp = ParamParser.getLong(params, "startingOffsetsTimestamp");
+      String startingOffsets = (String) params.get("startingOffsets");
+      long maxOutOfOrdernessMillis = ParamParser.getLong(params, "max_out_of_orderness_millis");
+      builder
+          .setProperties(properties)
+          .setTopic(topic)
+          .setBootstrapServers(bootstrapServers)
+          .setStartingOffsets(startingOffsetsTimestamp)
+          .setStartingOffsets(startingOffsets)
+          .setDeserializationSchema(new ByteArrayWindowedValueSchema(context))
+          .setTimestampAssigner(new ByteArrayWindowedValueTimestampAssigner());
+
+      if (maxOutOfOrdernessMillis != 0L) {
+        builder =
+            builder.setWatermarkStrategy(
+                WatermarkStrategy.<WindowedValue<byte[]>>forBoundedOutOfOrderness(
+                    Duration.ofMillis(maxOutOfOrdernessMillis))
+                    .withIdleness(Duration.ofMillis(60_000)));
+      }
+    } catch (IOException e) {
+      throw new RuntimeException("Could not parse KafkaConsumerBuilder properties.", e);
+    }
+    StreamExecutionEnvironment env = context.getExecutionEnvironment();
+    DataStream<WindowedValue<byte[]>> dataStream = builder.build(env);
+    context.addDataStream(
+        Iterables.getOnlyElement(pTransform.getOutputsMap().values()),
+        dataStream);
+  }
+
+  private static class ByteArrayWindowedValueTimestampAssigner implements SerializableTimestampAssigner<WindowedValue<byte[]>> {
+
+    @Override
+    public long extractTimestamp(WindowedValue<byte[]> element, long recordTimestamp) {
+      return element.getTimestamp() != null ? element.getTimestamp().getMillis() : Long.MIN_VALUE;
+    }
+  }
+
+  /**
    * Deserializer for native Flink Kafka source that produces {@link WindowedValue} expected by Beam
    * operators.
    */
@@ -330,18 +393,17 @@ void translateAnalyticsEventKafkaConsumerEventBuilder(
     AnalyticsEventKafkaConsumerEventBuilder sourceBuilder =
         new AnalyticsEventKafkaConsumerEventBuilder();
 
-    ObjectMapper mapper = new ObjectMapper();
-
+    final Map<String, Object> params;
     try {
-      JsonNode params = mapper.readTree(pTransform.getSpec().getPayload().toByteArray());
-      ParamRetriever retriever = new ParamRetriever(params);
+      ObjectMapper mapper = new ObjectMapper();
+      params = mapper.readValue(pTransform.getSpec().getPayload().toByteArray(), Map.class);
 
-      String eventName = retriever.getString("eventName");
-      List eventNames = retriever.getStrArray("eventNames");
-      Properties properties = retriever.getProperties();
-      long startingOffsetsTimestamp = retriever.getLong("startingOffsetsTimestamp");
-      String startingOffsets = retriever.getString("startingOffsets");
-      String bootstrapServers = retriever.getString("bootstrapServers");
+      String eventName = (String) params.get("eventName");
+      List<String> eventNames = (List<String>) params.get("eventNames");
+      Properties properties = ParamParser.getProperties(params);
+      long startingOffsetsTimestamp = ParamParser.getLong(params, "startingOffsetsTimestamp");
+      String startingOffsets = (String) params.get("startingOffsets");
+      String bootstrapServers = (String) params.get("bootstrapServers");
 
       sourceBuilder =
           sourceBuilder
@@ -374,17 +436,17 @@ void translateAnalyticsEventKafkaConsumerProtoBuilder(
     RunnerApi.PTransform pTransform = pipeline.getComponents().getTransformsOrThrow(id);
     AnalyticsEventKafkaConsumerProtoBuilder sourceBuilder =
         new AnalyticsEventKafkaConsumerProtoBuilder();
-    ObjectMapper mapper = new ObjectMapper();
+    final Map<String, Object> params;
 
     try {
-      JsonNode params = mapper.readTree(pTransform.getSpec().getPayload().toByteArray());
-      ParamRetriever retriever = new ParamRetriever(params);
+      ObjectMapper mapper = new ObjectMapper();
+      params = mapper.readValue(pTransform.getSpec().getPayload().toByteArray(), Map.class);
 
-      String eventName = retriever.getString("eventName");
-      Properties properties = retriever.getProperties();
-      long startingOffsetsTimestamp = retriever.getLong("startingOffsetsTimestamp");
-      String startingOffsets = retriever.getString("startingOffsets");
-      String bootstrapServers = retriever.getString("bootstrapServers");
+      String eventName = (String) params.get("eventName");
+      Properties properties = ParamParser.getProperties(params);
+      long startingOffsetsTimestamp = ParamParser.getLong(params, "startingOffsetsTimestamp");
+      String startingOffsets = (String) params.get("startingOffsets");
+      String bootstrapServers = (String) params.get("bootstrapServers");
 
       sourceBuilder =
           sourceBuilder
@@ -915,72 +977,21 @@ public long extractTimestamp(WindowedValue element) {
     }
   }
 
-  private class ParamRetriever {
-    private final JsonNode params;
-    private final ObjectMapper mapper = new ObjectMapper();
-
-    public ParamRetriever(JsonNode params) {
-      this.params = params;
-    }
-
-    public String getString(String fieldName, String defaultValue) {
-      String value = defaultValue;
-      if (params.has(fieldName)) {
-        value = params.get(fieldName).textValue();
-      }
-      return value;
-    }
-
-    public String getString(String fieldName) {
-      return getString(fieldName, null);
-    }
-
-    public long getLong(String fieldName, long defaultValue) {
-      long value = defaultValue;
-      if (params.has(fieldName)) {
-        value = params.get(fieldName).numberValue().longValue();
-      }
-      return value;
-    }
-
-    public long getLong(String fieldName) {
-      return getLong(fieldName, 0L);
-    }
-
-    public List getStrArray(String fieldName, List defaultValue) {
-      List value = defaultValue;
-      if (params.has(fieldName)) {
-        value = new ArrayList<>();
-        for (JsonNode jsonNode: params.get(fieldName)) {
-          value.add(jsonNode.textValue());
-        }
-      }
-      return value;
-    }
-
-    public List getStrArray(String fieldName) {
-      return getStrArray(fieldName, null);
-    }
-
-    public Map getMap(String fieldName, Map defaultValue) {
-      Map value = defaultValue;
-      if (params.has(fieldName)) {
-        value = mapper.convertValue(params.get(fieldName), Map.class);
+  private static class ParamParser {
+    public static Properties getProperties(Map<String, Object> params) {
+      Properties props = new Properties();
+      if (params.get("properties") != null) {
+        props.putAll((Map) params.get("properties"));
       }
-      return value;
-    }
-
-    public Map getMap(String fieldName) {
-      return getMap(fieldName, null);
+      return props;
     }
 
-    public Properties getProperties() {
-      Properties props = new Properties();
-      Map propsParam = getMap("properties");
-      if (propsParam != null) {
-        props.putAll(propsParam);
+    public static long getLong(Map<String, Object> params, String fieldName) {
+      Number number = (Number) params.get(fieldName);
+      if (number != null) {
+        return number.longValue();
       }
-      return props;
+      return 0L;
     }
   }
 }
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/LyftFlinkStreamingPortableTranslationsTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/LyftFlinkStreamingPortableTranslationsTest.java
index 7675b5f171da..4fe748aa5c7a 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/LyftFlinkStreamingPortableTranslationsTest.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/LyftFlinkStreamingPortableTranslationsTest.java
@@ -300,6 +300,38 @@ private void runAndAssertKafkaSink(String id, String topicName, byte[] payload)
     Assert.assertEquals(FlinkKafkaProducer.class, kafkaSinkCaptor.getValue().getClass());
   }
 
+  @Test
+  public void testKafkaConsumerBuilder() throws JsonProcessingException {
+    String id = "1";
+    String topic = "foo";
+    String bootstrapServers = "testServer";
+    byte[] payload = createKafkaConsumerBuilderPayload(
+        topic,
+        bootstrapServers,
+        null,
+        0L,
+        null,
+        0L
+    );
+    runAndAssertKafkaBuilderSource(id, payload, streamingContext);
+  }
+
+  @Test
+  public void testKafkaConsumerBuilderWithCustomOutOfOrderness() throws JsonProcessingException {
+    String id = "1";
+    String topic = "foo";
+    String bootstrapServers = "testServer";
+    byte[] payload = createKafkaConsumerBuilderPayload(
+        topic,
+        bootstrapServers,
+        null,
+        0L,
+        null,
+        10_000
+    );
+    runAndAssertKafkaBuilderSource(id, payload, streamingContext);
+  }
+
   @Test
   public void testAnalyticsEventConsumer() throws JsonProcessingException {
     String id = "1";
@@ -378,6 +410,27 @@ public void testAnalyticsEventProtoConsumerStartingOffsetTimestamp() throws Json
     runAndAssertAnalyticsEventSource(id, payload, streamingContext, true);
   }
 
+  private byte[] createKafkaConsumerBuilderPayload(
+      String topic,
+      String bootstrapServers,
+      Properties properties,
+      long startingOffsetsTimestamp,
+      String startingOffsets,
+      long maxOutOfOrdernessMillis
+  ) throws JsonProcessingException {
+    ImmutableMap.Builder<String, Object> builder =
+        ImmutableMap.builder();
+
+    putIfNotNull(builder, "topic", topic);
+    putIfNotNull(builder, "bootstrapServers", bootstrapServers);
+    putIfNotNull(builder, "properties", properties);
+    putIfNotNull(builder, "startingOffsetsTimestamp", startingOffsetsTimestamp);
+    putIfNotNull(builder, "startingOffsets", startingOffsets);
+    putIfNotNull(builder, "max_out_of_orderness_millis", maxOutOfOrdernessMillis);
+
+    return new ObjectMapper().writeValueAsBytes(builder.build());
+  }
+
   private byte[] createAnalyticsEventKafkaConsumerEventPayload(
       String eventName,
       List eventNames,
@@ -405,6 +458,14 @@ private void putIfNotNull(ImmutableMap.Builder builder, String k
     }
   }
 
+  private void runAndAssertKafkaBuilderSource(String id, byte[] payload, FlinkStreamingPortablePipelineTranslator.StreamingTranslationContext context) {
+    RunnerApi.Pipeline pipeline = createPipeline(id, payload);
+    LyftFlinkStreamingPortableTranslations portableTranslations =
+        new LyftFlinkStreamingPortableTranslations();
+    portableTranslations.translateKafkaConsumerBuilder(
+        id, pipeline, context);
+  }
+
   private void runAndAssertAnalyticsEventSource(String id, byte[] payload, FlinkStreamingPortablePipelineTranslator.StreamingTranslationContext context) {
     runAndAssertAnalyticsEventSource(id, payload, context, false);
   }
@@ -414,7 +475,6 @@ private void runAndAssertAnalyticsEventSource(String id, byte[] payload, FlinkSt
     LyftFlinkStreamingPortableTranslations portableTranslations =
         new LyftFlinkStreamingPortableTranslations();
-
     if (protoBuilder) {
       portableTranslations.translateAnalyticsEventKafkaConsumerProtoBuilder(
           id, pipeline, context);