diff --git a/benchmark/pom.xml b/benchmark/pom.xml new file mode 100644 index 0000000..80a6778 --- /dev/null +++ b/benchmark/pom.xml @@ -0,0 +1,186 @@ + + + 4.0.0 + + flink-connector-gcp-parent + com.google.flink.connector.gcp + 0.0.0 + + + flink.contector.gcp + benchmark + 1.0 + jar + + JMH benchmark sample: Java + + + + + + org.openjdk.jmh + jmh-core + ${jmh.version} + + + org.openjdk.jmh + jmh-generator-annprocess + ${jmh.version} + provided + + + com.google.flink.connector.gcp + flink-examples-gcp + 0.0.0 + + + com.google.cloud.flink + flink-1.17-connector-bigquery + + 0.1.0-preview + + + + + UTF-8 + + + 1.37 + + + 1.8 + + + benchmarks + + + + + + org.apache.maven.plugins + maven-compiler-plugin + 3.8.0 + + ${javac.target} + ${javac.target} + ${javac.target} + + + + org.apache.maven.plugins + maven-shade-plugin + 3.2.1 + + + package + + shade + + + ${uberjar.name} + + + org.openjdk.jmh.Main + + + + + + + *:* + + META-INF/*.SF + META-INF/*.DSA + META-INF/*.RSA + + + + + + + + + + + + maven-clean-plugin + 2.5 + + + maven-deploy-plugin + 2.8.1 + + + maven-install-plugin + 2.5.1 + + + maven-jar-plugin + 2.4 + + + maven-javadoc-plugin + 2.9.1 + + + maven-resources-plugin + 2.6 + + + maven-site-plugin + 3.3 + + + maven-source-plugin + 2.2.1 + + + maven-surefire-plugin + 2.17 + + + + + + diff --git a/benchmark/src/main/java/flink/contector/gcp/AvroRandomRecordGenerator.java b/benchmark/src/main/java/flink/contector/gcp/AvroRandomRecordGenerator.java new file mode 100644 index 0000000..9b32180 --- /dev/null +++ b/benchmark/src/main/java/flink/contector/gcp/AvroRandomRecordGenerator.java @@ -0,0 +1,312 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
package flink.contector.gcp;

import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.commons.lang3.RandomUtils;

/**
 * Builds Avro generic data populated with random values for a given schema.
 *
 * <p>Every container generator (record, union, array, map) delegates to a single private
 * dispatcher, so all of them support the same set of Avro types and report unsupported types
 * the same way.
 *
 * <p>NOTE(review): a self-referencing record schema would recurse without bound because a
 * non-null union branch is always chosen; no depth limit is applied here.
 */
public class AvroRandomRecordGenerator {

  /** Returns a random alphabetic string whose length is drawn from {@code nextInt(2, 40)}. */
  public static String getRandomString() {
    return RandomStringUtils.randomAlphabetic(RandomUtils.nextInt(2, 40));
  }

  /** Returns a random non-negative {@code int} drawn from {@code nextInt(0, MAX_VALUE)}. */
  public static Integer getRandomInteger() {
    return RandomUtils.nextInt(0, Integer.MAX_VALUE);
  }

  /** Returns a random non-negative {@code long} drawn from {@code nextLong(0, MAX_VALUE)}. */
  public static Long getRandomLong() {
    return RandomUtils.nextLong(0L, Long.MAX_VALUE);
  }

  /** Returns a random non-negative {@code double} drawn from {@code nextDouble(0, MAX_VALUE)}. */
  public static Double getRandomDouble() {
    return RandomUtils.nextDouble(0, Double.MAX_VALUE);
  }

  /** Returns a random non-negative {@code float} drawn from {@code nextFloat(0, MAX_VALUE)}. */
  public static Float getRandomFloat() {
    return RandomUtils.nextFloat(0f, Float.MAX_VALUE);
  }

  /** Returns {@code TRUE} or {@code FALSE} depending on the parity of a random digit. */
  public static Boolean getRandomBoolean() {
    return RandomUtils.nextInt(0, 10) % 2 == 0 ? Boolean.TRUE : Boolean.FALSE;
  }

  /** Returns a buffer wrapping a random byte array whose length comes from {@code nextInt(1, 25)}. */
  public static ByteBuffer getRandomBytes() {
    return ByteBuffer.wrap(RandomUtils.nextBytes(RandomUtils.nextInt(1, 25)));
  }

  /**
   * Generates a generic record with a random value for every field of the given schema.
   *
   * @param schema Avro schema of type RECORD
   * @return a record in which every field position has been populated
   * @throws IllegalArgumentException if {@code schema} is not a record schema or a field has an
   *     unsupported type
   */
  public static GenericData.Record generateRandomRecordData(Schema schema) {
    requireType(schema, Schema.Type.RECORD, "input schema must be a record schema");

    final GenericData.Record record = new GenericData.Record(schema);
    for (Schema.Field field : schema.getFields()) {
      record.put(field.pos(), generateRandomValue(field.schema()));
    }
    return record;
  }

  /**
   * Picks one of the symbols declared by the given enum schema at random.
   *
   * @param schema Avro schema of type ENUM
   * @return an enum symbol bound to {@code schema}
   * @throws IllegalArgumentException if {@code schema} is not an enum schema
   */
  public static GenericData.EnumSymbol generateRandomEnumSymbol(Schema schema) {
    requireType(schema, Schema.Type.ENUM, "input schema must be an enum schema");

    final int index = RandomUtils.nextInt(0, schema.getEnumSymbols().size());
    return new GenericData.EnumSymbol(schema, schema.getEnumSymbols().get(index));
  }

  /**
   * Generates a fixed value of exactly {@code schema.getFixedSize()} random bytes.
   *
   * @param schema Avro schema of type FIXED
   * @return a fixed value bound to {@code schema}
   * @throws IllegalArgumentException if {@code schema} is not a fixed schema
   */
  public static GenericData.Fixed generateRandomFixed(Schema schema) {
    requireType(schema, Schema.Type.FIXED, "input schema must be an fixed schema");

    return new GenericData.Fixed(schema, RandomUtils.nextBytes(schema.getFixedSize()));
  }

  /**
   * Generates a random value for one branch of the given union schema.
   *
   * <p>The second branch is preferred when it exists and is not {@code null}; this keeps the
   * previous behaviour for unions shaped like {@code ["null", T]}. Otherwise the first non-null
   * branch is used, so single-branch unions and {@code [T, "null"]} unions also get a value
   * instead of failing or always producing {@code null}.
   *
   * @param schema Avro schema of type UNION
   * @return a random value for the selected branch, or {@code null} if every branch is the null
   *     type
   * @throws IllegalArgumentException if {@code schema} is not a union schema
   */
  public static Object generateRandomUnion(Schema schema) {
    requireType(schema, Schema.Type.UNION, "input schema must be an union schema");

    Schema chosen = null;
    if (schema.getTypes().size() > 1 && !isNullType(schema.getTypes().get(1))) {
      chosen = schema.getTypes().get(1);
    } else {
      for (Schema branch : schema.getTypes()) {
        if (!isNullType(branch)) {
          chosen = branch;
          break;
        }
      }
    }
    return chosen == null ? null : generateRandomValue(chosen);
  }

  /**
   * Generates an array with a random number of elements drawn from {@code nextInt(1, 11)}.
   *
   * @param schema Avro schema of type ARRAY
   * @return an array bound to {@code schema} whose elements match the element type
   * @throws IllegalArgumentException if {@code schema} is not an array schema or the element
   *     type is unsupported
   */
  public static GenericData.Array<Object> generateRandomArray(Schema schema) {
    requireType(schema, Schema.Type.ARRAY, "input schema must be an array schema");

    final int elements = RandomUtils.nextInt(1, 11);
    final GenericData.Array<Object> arrayData = new GenericData.Array<>(elements, schema);
    for (int i = 0; i < elements; i++) {
      arrayData.add(generateRandomValue(schema.getElementType()));
    }
    return arrayData;
  }

  /**
   * Generates a map with random 10-letter keys; the number of insertions is drawn from
   * {@code nextInt(1, 11)}.
   *
   * @param schema Avro schema of type MAP
   * @return a map whose values match the schema's value type
   * @throws IllegalArgumentException if {@code schema} is not a map schema or the value type is
   *     unsupported
   */
  public static Map<String, Object> generateRandomMap(Schema schema) {
    requireType(schema, Schema.Type.MAP, "input schema must be an map schema");

    final int elements = RandomUtils.nextInt(1, 11);
    final Map<String, Object> mapData = new HashMap<>();
    for (int i = 0; i < elements; i++) {
      mapData.put(
          RandomStringUtils.randomAlphabetic(10), generateRandomValue(schema.getValueType()));
    }
    return mapData;
  }

  /**
   * Single dispatch point that produces a random value for any schema type.
   *
   * <p>The error message is built from {@code schema.getType()}. The earlier per-container
   * switches called {@code getValueType()}, which is only defined for MAP schemas and therefore
   * failed with a different exception before the intended message could be created.
   */
  private static Object generateRandomValue(Schema schema) {
    switch (schema.getType()) {
      case BOOLEAN:
        return getRandomBoolean();
      case INT:
        return getRandomInteger();
      case LONG:
        return getRandomLong();
      case DOUBLE:
        return getRandomDouble();
      case FLOAT:
        return getRandomFloat();
      case BYTES:
        return getRandomBytes();
      case STRING:
        return getRandomString();
      case RECORD:
        return generateRandomRecordData(schema);
      case ENUM:
        return generateRandomEnumSymbol(schema);
      case FIXED:
        return generateRandomFixed(schema);
      case UNION:
        return generateRandomUnion(schema);
      case ARRAY:
        return generateRandomArray(schema);
      case MAP:
        return generateRandomMap(schema);
      case NULL:
        // The only value the null type admits.
        return null;
      default:
        throw new IllegalArgumentException("Not Support type " + schema.getType());
    }
  }

  /** Throws {@link IllegalArgumentException} with {@code message} unless the schema has the expected type. */
  private static void requireType(Schema schema, Schema.Type expected, String message) {
    if (!expected.equals(schema.getType())) {
      throw new IllegalArgumentException(message);
    }
  }

  /** Tells whether the given schema is the Avro null type. */
  private static boolean isNullType(Schema schema) {
    return Schema.Type.NULL.equals(schema.getType());
  }
}
package flink.contector.gcp;

import com.google.cloud.flink.bigquery.sink.serializer.AvroToProtoSerializer;
import com.google.cloud.flink.bigquery.sink.serializer.BigQueryProtoSerializer;
import com.google.cloud.flink.bigquery.sink.serializer.BigQuerySchemaProvider;
import flink.connector.gcp.ProtoSerializer;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Level;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OperationsPerInvocation;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.Warmup;
import org.openjdk.jmh.infra.Blackhole;
import org.openjdk.jmh.profile.GCProfiler;
import org.openjdk.jmh.runner.Runner;
import org.openjdk.jmh.runner.RunnerException;
import org.openjdk.jmh.runner.options.Options;
import org.openjdk.jmh.runner.options.OptionsBuilder;

/**
 * JMH benchmark comparing the connector's {@link AvroToProtoSerializer} with the hand-written
 * {@link ProtoSerializer} on the same randomly generated Avro record.
 *
 * <p>Both serializers are instance state of this {@code Scope.Benchmark} object. They are
 * created in the trial-level setup, so keeping them in static fields would let one state
 * instance overwrite another's serializers.
 */
@State(Scope.Benchmark)
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.NANOSECONDS)
@Fork(1)
@Warmup(iterations = 5)
@Measurement(iterations = 5)
public class MyBenchmark {
  /** Serializations per benchmark invocation; JMH divides the score by this value. */
  private static final int NUMBER_OF_OPERATIONS = 100_000;

  /** Size of the pool of random records from which the benchmarked record is picked. */
  private static final int RECORD_COUNT = 10;

  private final Random random = new Random();
  private final BigQuerySchemaProvider bigQuerySchemaProvider;

  private BigQueryProtoSerializer<GenericRecord> serializer;
  private BigQueryProtoSerializer<GenericRecord> fastSerializer;

  private GenericData.Record[] generatedRecords;
  private GenericData.Record generatedRecord;

  public MyBenchmark() {
    bigQuerySchemaProvider = TestBigQuerySchemas.getSchemaWithRequiredPrimitiveTypes();
  }

  /** Runs every benchmark in this class with the GC profiler attached. */
  public static void main(String[] args) throws RunnerException {
    final Options options =
        new OptionsBuilder()
            .include(MyBenchmark.class.getSimpleName())
            .addProfiler(GCProfiler.class)
            .build();
    new Runner(options).run();
  }

  /** Creates both serializers and the random record once per trial. */
  @Setup(Level.Trial)
  public void prepare() throws Exception {
    serializer = new AvroToProtoSerializer();
    serializer.init(bigQuerySchemaProvider);

    fastSerializer = new ProtoSerializer();

    // Generate a small pool of Avro records and benchmark one of them, chosen at random.
    generatedRecords = new GenericData.Record[RECORD_COUNT];
    for (int i = 0; i < RECORD_COUNT; i++) {
      generatedRecords[i] =
          AvroRandomRecordGenerator.generateRandomRecordData(
              bigQuerySchemaProvider.getAvroSchema());
    }
    generatedRecord = generatedRecords[random.nextInt(RECORD_COUNT)];
  }

  /** Measures the hand-written serializer. */
  @Benchmark
  @OperationsPerInvocation(NUMBER_OF_OPERATIONS)
  public void testFastAvroSerialization(Blackhole bh) throws Exception {
    for (int i = 0; i < NUMBER_OF_OPERATIONS; i++) {
      bh.consume(fastSerializer.serialize(generatedRecord));
    }
  }

  /** Measures the connector's generic Avro-to-proto serializer. */
  @Benchmark
  @OperationsPerInvocation(NUMBER_OF_OPERATIONS)
  public void testAvroSerialization(Blackhole bh) throws Exception {
    for (int i = 0; i < NUMBER_OF_OPERATIONS; i++) {
      bh.consume(serializer.serialize(generatedRecord));
    }
  }
}
package flink.contector.gcp;

import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableSchema;
import com.google.cloud.flink.bigquery.sink.serializer.BigQuerySchemaProvider;
import com.google.cloud.flink.bigquery.sink.serializer.BigQuerySchemaProviderImpl;
import com.google.protobuf.Descriptors.Descriptor;
import java.util.Arrays;
import java.util.List;

/** Factory methods for the BigQuery table schemas used by the benchmark. */
public class TestBigQuerySchemas {

  // Static factory holder; never instantiated.
  private TestBigQuerySchemas() {}

  /**
   * Builds a provider for a table of twelve STRING columns that all share the given mode.
   *
   * @param mode BigQuery field mode applied to every column
   */
  public static BigQuerySchemaProvider getSchemaWithStringTypes(String mode) {
    return providerFor(
        Arrays.asList(
            column("word", "STRING", mode),
            column("countStr", "STRING", mode),
            column("word1", "STRING", mode),
            column("countStr1", "STRING", mode),
            column("word2", "STRING", mode),
            column("countStr2", "STRING", mode),
            column("word3", "STRING", mode),
            column("countStr3", "STRING", mode),
            column("word4", "STRING", mode),
            column("countStr4", "STRING", mode),
            column("word5", "STRING", mode),
            column("countStr5", "STRING", mode)));
  }

  /**
   * Builds a provider for a table of ten columns mixing INTEGER, FLOAT, STRING and BOOLEAN
   * types, all sharing the given mode.
   *
   * @param mode BigQuery field mode applied to every column
   */
  public static BigQuerySchemaProvider getSchemaWithPrimitiveTypes(String mode) {
    return providerFor(
        Arrays.asList(
            column("number", "INTEGER", mode),
            column("price", "FLOAT", mode),
            column("species", "STRING", mode),
            column("flighted", "BOOLEAN", mode),
            column("number1", "INTEGER", mode),
            column("price1", "FLOAT", mode),
            column("species1", "STRING", mode),
            column("flighted1", "BOOLEAN", mode),
            column("number2", "STRING", mode),
            column("price2", "STRING", mode)));
  }

  /** Primitive-type schema with every column in REQUIRED mode. */
  public static BigQuerySchemaProvider getSchemaWithRequiredPrimitiveTypes() {
    return getSchemaWithPrimitiveTypes("REQUIRED");
  }

  /** Primitive-type schema with every column in NULLABLE mode. */
  public static BigQuerySchemaProvider getSchemaWithNullablePrimitiveTypes() {
    return getSchemaWithPrimitiveTypes("NULLABLE");
  }

  /** Creates one table column with the given name, type and mode. */
  private static TableFieldSchema column(String name, String type, String mode) {
    return new TableFieldSchema().setName(name).setType(type).setMode(mode);
  }

  /**
   * Derives the descriptor and Avro schema from the columns via the connector's provider
   * implementation and wraps them in a {@link TestSchemaProvider}.
   */
  private static BigQuerySchemaProvider providerFor(List<TableFieldSchema> columns) {
    final BigQuerySchemaProvider derived =
        new BigQuerySchemaProviderImpl(new TableSchema().setFields(columns));
    // Descriptor is resolved before the Avro schema, matching the original call order.
    final Descriptor descriptor = derived.getDescriptor();
    return new TestSchemaProvider(derived.getAvroSchema(), descriptor);
  }
}
+ */ + +package flink.contector.gcp; + +import com.google.cloud.flink.bigquery.sink.serializer.BigQuerySchemaProvider; +import com.google.protobuf.DescriptorProtos.DescriptorProto; +import com.google.protobuf.Descriptors.Descriptor; +import org.apache.avro.Schema; + +/** + * Class inheriting {@link BigQuerySchemaProvider} for {@link AvroToProtoSerializerTest} and {@link + * BigQuerySchemaProviderTest}. + */ +public class TestSchemaProvider implements BigQuerySchemaProvider { + private final Schema schema; + private final Descriptor descriptor; + + TestSchemaProvider(Schema schema, Descriptor descriptor) { + this.schema = schema; + this.descriptor = descriptor; + } + + @Override + public DescriptorProto getDescriptorProto() { + return this.getDescriptor().toProto(); + } + + @Override + public Descriptor getDescriptor() { + return this.descriptor; + } + + public Schema getAvroSchema() { + return this.schema; + } +} diff --git a/flink-examples-gcp/gmk-to-bq-wc.yaml b/flink-examples-gcp/gmk-to-bq-wc.yaml index 53b206a..682b81d 100644 --- a/flink-examples-gcp/gmk-to-bq-wc.yaml +++ b/flink-examples-gcp/gmk-to-bq-wc.yaml @@ -1,16 +1,18 @@ apiVersion: flink.apache.org/v1beta1 kind: FlinkDeployment metadata: - name: gmk-to-bq-wordcount + name: gmk-to-bq-wordcount-talat spec: - image: us-central1-docker.pkg.dev/managed-flink-shared-dev/flink-connector-repo/flink-examples-gcp:latest + image: us-central1-docker.pkg.dev/managed-flink-shared-dev/flink-connector-repo/flink-examples-gcp:serializer-2 flinkVersion: v1_19 flinkConfiguration: taskmanager.numberOfTaskSlots: "1" + rest.flamegraph.enabled: "true" + execution.checkpointing.tolerable-failed-checkpoints: "10000" serviceAccount: flink jobManager: resource: - memory: "2048m" + memory: "4096m" cpu: 1 podTemplate: apiVersion: v1 @@ -29,17 +31,17 @@ spec: key: password taskManager: resource: - memory: "2048m" + memory: "4096m" cpu: 1 job: entryClass: flink.connector.gcp.GMKToBQWordCount jarURI: 
local:///opt/flink/usrlib/gmf-examples.jar # Replace these with your values: - args: ["--brokers", "bootstrap..us-central1.managedkafka-..cloud-staging.goog:9092", - "--gmk-username", "@.iam.gserviceaccount.com", - "--kafka-topic", "my-topic", - "--project-id", "", - "--dataset-name", "", - "--table-name", ""] - parallelism: 5 + args: ["--brokers", "bootstrap.gmf-load-test-clstr.us-central1.managedkafka.managed-flink-shared-dev.cloud.goog:9092", + "--gmk-username", "clairemccarthy-srvc-accnt@managed-flink-shared-dev.iam.gserviceaccount.com", + "--kafka-topic", "load-test-topic", + "--project-id", "managed-flink-shared-dev", + "--dataset-name", "test_ds", + "--table-name", "test_table", "--bq-sink-parallelism", "1"] + parallelism: 1 upgradeMode: stateless \ No newline at end of file diff --git a/flink-examples-gcp/src/main/java/flink/connector/gcp/GMKToBQWordCount.java b/flink-examples-gcp/src/main/java/flink/connector/gcp/GMKToBQWordCount.java index 7bc28d0..d233cad 100644 --- a/flink-examples-gcp/src/main/java/flink/connector/gcp/GMKToBQWordCount.java +++ b/flink-examples-gcp/src/main/java/flink/connector/gcp/GMKToBQWordCount.java @@ -23,6 +23,8 @@ import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.utils.MultipleParameterTool; +import org.apache.flink.configuration.CheckpointingOptions; +import org.apache.flink.configuration.Configuration; import org.apache.flink.connector.base.DeliveryGuarantee; import org.apache.flink.connector.kafka.source.KafkaSource; import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo; @@ -32,7 +34,6 @@ import com.google.cloud.flink.bigquery.common.config.BigQueryConnectOptions; import com.google.cloud.flink.bigquery.sink.BigQuerySink; import com.google.cloud.flink.bigquery.sink.BigQuerySinkConfig; -import com.google.cloud.flink.bigquery.sink.serializer.AvroToProtoSerializer; import 
com.google.cloud.flink.bigquery.sink.serializer.BigQuerySchemaProvider; import com.google.cloud.flink.bigquery.sink.serializer.BigQuerySchemaProviderImpl; import org.apache.avro.Schema; @@ -57,7 +58,15 @@ public static void main(String[] args) throws Exception { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.getConfig().setGlobalJobParameters(parameters); + Configuration config = new Configuration(); + config.set(CheckpointingOptions.CHECKPOINT_STORAGE, "filesystem"); + config.set( + CheckpointingOptions.CHECKPOINTS_DIRECTORY, + "gs://clairemccarthy-checkpoint/checkpoints/"); + env.configure(config); env.enableCheckpointing(checkpointInterval); + env.getCheckpointConfig().enableUnalignedCheckpoints(); + env.getCheckpointConfig().setMinPauseBetweenCheckpoints(60000L); KafkaSource source = KafkaSource.builder() @@ -86,7 +95,7 @@ public static void main(String[] args) throws Exception { .connectOptions(sinkConnectOptions) .deliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE) .schemaProvider(schemaProvider) - .serializer(new AvroToProtoSerializer()) + .serializer(new ProtoSerializer()) .build(); env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source") diff --git a/flink-examples-gcp/src/main/java/flink/connector/gcp/ProtoSerializer.java b/flink-examples-gcp/src/main/java/flink/connector/gcp/ProtoSerializer.java new file mode 100644 index 0000000..0fa24ff --- /dev/null +++ b/flink-examples-gcp/src/main/java/flink/connector/gcp/ProtoSerializer.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
package flink.connector.gcp;

import com.google.cloud.flink.bigquery.sink.exceptions.BigQuerySerializationException;
import com.google.cloud.flink.bigquery.sink.serializer.BigQueryProtoSerializer;
import com.google.protobuf.ByteString;
import com.google.protobuf.CodedOutputStream;
import java.io.IOException;
import org.apache.avro.generic.GenericRecord;

/**
 * Hand-written serializer that writes a fixed ten-field layout straight to the protobuf wire
 * format, without consulting a schema or descriptor.
 *
 * <p>Avro field positions 0..9 are written to proto field numbers 1..10 as: int64, double,
 * string, bool, int64, double, string, bool, string, string.
 *
 * <p>NOTE(review): this only produces valid rows when the destination table's descriptor uses
 * exactly these field numbers and types — confirm against the schema the job is configured with.
 */
public class ProtoSerializer extends BigQueryProtoSerializer<GenericRecord> {

  /**
   * Serializes the first ten fields of the record into a protobuf-encoded {@link ByteString}.
   *
   * @param message record whose fields at positions 0..9 follow the layout described on the class
   * @return the encoded row
   * @throws BigQuerySerializationException if a field is {@code null} or of an unexpected type,
   *     or if writing to the in-memory output fails
   */
  @Override
  public ByteString serialize(GenericRecord message) throws BigQuerySerializationException {
    final ByteString.Output output = ByteString.newOutput();
    final CodedOutputStream codedOutputStream = CodedOutputStream.newInstance(output);

    try {
      codedOutputStream.writeInt64(1, longAt(message, 0));
      codedOutputStream.writeDouble(2, doubleAt(message, 1));
      codedOutputStream.writeString(3, stringAt(message, 2));
      codedOutputStream.writeBool(4, booleanAt(message, 3));
      codedOutputStream.writeInt64(5, longAt(message, 4));
      codedOutputStream.writeDouble(6, doubleAt(message, 5));
      codedOutputStream.writeString(7, stringAt(message, 6));
      codedOutputStream.writeBool(8, booleanAt(message, 7));
      codedOutputStream.writeString(9, stringAt(message, 8));
      codedOutputStream.writeString(10, stringAt(message, 9));
      // CodedOutputStream buffers internally; push everything into the ByteString output.
      codedOutputStream.flush();
    } catch (IOException e) {
      // Attach the cause explicitly so the original stack trace is not lost.
      final BigQuerySerializationException wrapped =
          new BigQuerySerializationException(e.getMessage());
      wrapped.initCause(e);
      throw wrapped;
    }

    return output.toByteString();
  }

  /** Reads an integral field; any {@link Number} is accepted and widened to {@code long}. */
  private static long longAt(GenericRecord record, int pos) throws BigQuerySerializationException {
    final Object value = record.get(pos);
    if (value instanceof Number) {
      return ((Number) value).longValue();
    }
    throw unexpected(pos, "a number", value);
  }

  /** Reads a floating-point field; any {@link Number} is accepted and widened to {@code double}. */
  private static double doubleAt(GenericRecord record, int pos)
      throws BigQuerySerializationException {
    final Object value = record.get(pos);
    if (value instanceof Number) {
      return ((Number) value).doubleValue();
    }
    throw unexpected(pos, "a number", value);
  }

  /** Reads a boolean field. */
  private static boolean booleanAt(GenericRecord record, int pos)
      throws BigQuerySerializationException {
    final Object value = record.get(pos);
    if (value instanceof Boolean) {
      return (Boolean) value;
    }
    throw unexpected(pos, "a boolean", value);
  }

  /**
   * Reads a text field. Any {@link CharSequence} is accepted because Avro may represent strings
   * with its own character-sequence type rather than {@link String}.
   */
  private static String stringAt(GenericRecord record, int pos)
      throws BigQuerySerializationException {
    final Object value = record.get(pos);
    if (value instanceof CharSequence) {
      return value.toString();
    }
    throw unexpected(pos, "a string", value);
  }

  /** Builds the exception reported when a field is missing or has the wrong runtime type. */
  private static BigQuerySerializationException unexpected(
      int pos, String expected, Object actual) {
    final String found = actual == null ? "null" : actual.getClass().getName();
    return new BigQuerySerializationException(
        "Field at position " + pos + " must be " + expected + " but was " + found);
  }
}