@@ -125,4 +125,13 @@ public class KafkaSourceOptions extends KafkaBaseOptions {
.noDefaultValue()
.withDescription(
"The time required for consumption mode to be timestamp.The endTimestamp configuration specifies the end timestamp of the messages and is only applicable in batch mode");

public static final Option<Boolean> STRIP_SCHEMA_REGISTRY_HEADER =
Options.key("strip_schema_registry_header")
.booleanType()
.defaultValue(false)
.withDescription(
"Whether to strip the Confluent Schema Registry wire format header "
+ "(magic byte, schema id and message indexes) before "
+ "protobuf deserialization.");
}
@@ -55,6 +55,7 @@
import org.apache.seatunnel.format.json.maxwell.MaxWellJsonDeserializationSchema;
import org.apache.seatunnel.format.json.ogg.OggJsonDeserializationSchema;
import org.apache.seatunnel.format.protobuf.ProtobufDeserializationSchema;
import org.apache.seatunnel.format.protobuf.SchemaRegistryAwareProtobufDeserializationSchema;
import org.apache.seatunnel.format.text.TextDeserializationSchema;
import org.apache.seatunnel.format.text.constant.TextFormatConstant;

@@ -100,6 +101,7 @@
import static org.apache.seatunnel.connectors.seatunnel.kafka.config.KafkaSourceOptions.START_MODE_END_TIMESTAMP;
import static org.apache.seatunnel.connectors.seatunnel.kafka.config.KafkaSourceOptions.START_MODE_OFFSETS;
import static org.apache.seatunnel.connectors.seatunnel.kafka.config.KafkaSourceOptions.START_MODE_TIMESTAMP;
import static org.apache.seatunnel.connectors.seatunnel.kafka.config.KafkaSourceOptions.STRIP_SCHEMA_REGISTRY_HEADER;
import static org.apache.seatunnel.connectors.seatunnel.kafka.config.KafkaSourceOptions.TOPIC;

public class KafkaSourceConfig implements Serializable {
@@ -408,7 +410,13 @@ private DeserializationSchema<SeaTunnelRow> createDeserializationSchema(
schema = new AvroDeserializationSchema(catalogTable);
break;
case PROTOBUF:
schema = new ProtobufDeserializationSchema(catalogTable);
boolean stripSchemaRegistryHeader =
readonlyConfig.get(STRIP_SCHEMA_REGISTRY_HEADER);
if (stripSchemaRegistryHeader) {
schema = new SchemaRegistryAwareProtobufDeserializationSchema(catalogTable);
} else {
schema = new ProtobufDeserializationSchema(catalogTable);
}
break;
default:
throw new SeaTunnelJsonFormatException(
@@ -1558,6 +1558,53 @@ private void sendData(DefaultSeaTunnelRowSerializer serializer) {
producer.flush();
}

private byte[] wrapWithSchemaRegistryHeader(byte[] protobufBytes) {
// Simulates the Confluent Schema Registry Protobuf wire format:
// magic byte (0) + 4-byte schema id + message indexes (simplified here to a single byte)
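// With schemaId = 1 this produces the 6-byte prefix [0x00, 0x00, 0x00, 0x00, 0x01, 0x01].
// A strictly Confluent-encoded header may be longer once the message-index varints are
// included, which is why the reader probes a range of offsets rather than a fixed length.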
byte magic = 0;
int schemaId = 1;
byte[] header = new byte[6];
header[0] = magic;
header[1] = (byte) ((schemaId >> 24) & 0xFF);
header[2] = (byte) ((schemaId >> 16) & 0xFF);
header[3] = (byte) ((schemaId >> 8) & 0xFF);
header[4] = (byte) (schemaId & 0xFF);
header[5] = 1; // simplified single-byte message-index section

byte[] result = new byte[header.length + protobufBytes.length];
System.arraycopy(header, 0, result, 0, header.length);
System.arraycopy(protobufBytes, 0, result, header.length, protobufBytes.length);
return result;
}

private void sendSchemaRegistryHeaderData(DefaultSeaTunnelRowSerializer serializer) {
// Produce Schema Registry wire-format records to Kafka
IntStream.range(0, 20)
.forEach(
i -> {
try {
SeaTunnelRow originalRow = buildSeaTunnelRow();
ProducerRecord<byte[], byte[]> originalRecord =
serializer.serializeRow(originalRow);
byte[] wrappedValue =
wrapWithSchemaRegistryHeader(originalRecord.value());
ProducerRecord<byte[], byte[]> wrappedRecord =
new ProducerRecord<>(
originalRecord.topic(),
originalRecord.partition(),
originalRecord.key(),
wrappedValue);
producer.send(wrappedRecord).get();
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(
"Error sending Kafka message with Schema Registry header",
e);
}
});

producer.flush();
}

@TestTemplate
public void testKafkaProtobufForTransformToAssert(TestContainer container)
throws IOException, InterruptedException, URISyntaxException {
@@ -1608,6 +1655,59 @@ public void testKafkaProtobufForTransformToAssert(TestContainer container)
}
}

@TestTemplate
public void testKafkaProtobufSchemaRegistryHeaderForTransformToAssert(TestContainer container)
throws IOException, InterruptedException, URISyntaxException {

String confFile =
"/protobuf/kafka_protobuf_schema_registry_header_transform_to_assert.conf";
String path = getTestConfigFile(confFile);
Config config = ConfigFactory.parseFile(new File(path));
Config sourceConfig = config.getConfigList("source").get(0);
ReadonlyConfig readonlyConfig = ReadonlyConfig.fromConfig(sourceConfig);
SeaTunnelRowType seaTunnelRowType = buildSeaTunnelRowType();

// Create serializer
DefaultSeaTunnelRowSerializer serializer =
getDefaultSeaTunnelRowSerializer(
"test_protobuf_schema_registry_topic_transform_fake_source",
seaTunnelRowType,
readonlyConfig);

// Produce Schema Registry wire-format records to Kafka
sendSchemaRegistryHeaderData(serializer);

// Execute the job and validate
Container.ExecResult execResult = container.executeJob(confFile);
Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr());

try (KafkaConsumer<byte[], byte[]> consumer =
new KafkaConsumer<>(kafkaByteConsumerConfig())) {
consumer.subscribe(Arrays.asList("verify_protobuf_schema_registry_transform"));
Map<TopicPartition, Long> offsets =
consumer.endOffsets(
Arrays.asList(
new TopicPartition(
"verify_protobuf_schema_registry_transform", 0)));
Long endOffset = offsets.entrySet().iterator().next().getValue();
Long lastProcessedOffset = -1L;

do {
ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<byte[], byte[]> record : records) {
if (lastProcessedOffset < record.offset()) {
String data = new String(record.value(), "UTF-8");
ObjectNode jsonNodes = JsonUtils.parseObject(data);
Assertions.assertEquals(2, jsonNodes.size());
Assertions.assertEquals("city_value", jsonNodes.get("city").asText());
Assertions.assertEquals("test data", jsonNodes.get("c_string").asText());
}
lastProcessedOffset = record.offset();
}
} while (lastProcessedOffset < endOffset - 1);
}
}

public static String getTestConfigFile(String configFile)
throws FileNotFoundException, URISyntaxException {
URL resource = KafkaIT.class.getResource(configFile);
@@ -0,0 +1,111 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

env {
parallelism = 1
job.mode = "BATCH"
spark.app.name = "SeaTunnel"
spark.executor.instances = 1
spark.executor.cores = 1
spark.executor.memory = "1g"
spark.master = local
}

source {
Kafka {
topic = "test_protobuf_schema_registry_topic_transform_fake_source"
format = protobuf
strip_schema_registry_header = true
protobuf_message_name = Person
protobuf_schema = """
syntax = "proto3";

package org.apache.seatunnel.format.protobuf;

option java_outer_classname = "ProtobufE2E";

message Person {
int32 c_int32 = 1;
int64 c_int64 = 2;
float c_float = 3;
double c_double = 4;
bool c_bool = 5;
string c_string = 6;
bytes c_bytes = 7;

message Address {
string street = 1;
string city = 2;
string state = 3;
string zip = 4;
}

Address address = 8;

map<string, float> attributes = 9;

repeated string phone_numbers = 10;
}
"""
schema = {
fields {
c_int32 = int
c_int64 = long
c_float = float
c_double = double
c_bool = boolean
c_string = string
c_bytes = bytes

Address {
city = string
state = string
street = string
}
attributes = "map<string,float>"
phone_numbers = "array<string>"
}
}
bootstrap.servers = "kafkaCluster:9092"
start_mode = "earliest"
plugin_output = "kafka_table"
}
}

transform {
Sql {
plugin_input = "kafka_table"
plugin_output = "kafka_table_transform"
query = "select Address.city,c_string from dual"
}
}

sink {
kafka {
topic = "verify_protobuf_schema_registry_transform"
plugin_input = "kafka_table_transform"
bootstrap.servers = "kafkaCluster:9092"
kafka.request.timeout.ms = 60000
kafka.config = {
acks = "all"
request.timeout.ms = 60000
buffer.memory = 33554432
}

}
}

@@ -0,0 +1,105 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.format.protobuf;

import org.apache.seatunnel.api.serialization.DeserializationSchema;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.Arrays;

/**
* A Protobuf deserialization schema that is aware of Confluent Schema Registry's wire format.
*
* <p>This schema will try to strip the Schema Registry header (magic byte, schema id and message
* indexes) before delegating to {@link ProtobufDeserializationSchema}. If stripping fails, it falls
* back to using the original payload, so it can safely be enabled for both plain and Schema
* Registry encoded messages.
*/
public class SchemaRegistryAwareProtobufDeserializationSchema
implements DeserializationSchema<SeaTunnelRow> {

private static final long serialVersionUID = -2134049729306615854L;

/**
* Maximum number of additional header bytes (beyond the 5 bytes magic + schema id) to probe
* when trying to locate the actual Protobuf message. This covers the variable-length "message
* indexes" part used by Schema Registry for Protobuf.
*/
private static final int MAX_ADDITIONAL_HEADER_BYTES = 16;

private static final Logger LOG =
LoggerFactory.getLogger(SchemaRegistryAwareProtobufDeserializationSchema.class);

private final ProtobufDeserializationSchema inner;
private final SeaTunnelRowType rowType;

public SchemaRegistryAwareProtobufDeserializationSchema(CatalogTable catalogTable) {
this.inner = new ProtobufDeserializationSchema(catalogTable);
this.rowType = catalogTable.getSeaTunnelRowType();
}

@Override
public SeaTunnelRow deserialize(byte[] message) throws IOException {
if (message == null || message.length == 0) {
return inner.deserialize(message);
}

int length = message.length;

// Confluent Schema Registry Protobuf wire format:
// 1 byte magic (0), 4 bytes schema id, N bytes message indexes (varints), then protobuf.
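// Example (assuming the common single-0-byte encoding of the message-index array [0]):
// for schema id 1 the prefix is [0x00, 0x00, 0x00, 0x00, 0x01, 0x00], i.e. 6 bytes
// before the Protobuf payload begins.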
if (length >= 6 && message[0] == 0) {
int candidateStart = 6;
if (candidateStart < length) {
try {
return inner.deserialize(Arrays.copyOfRange(message, candidateStart, length));
} catch (IOException | RuntimeException ignored) {
LOG.warn("Protobuf message not recognized at candidate offset 6, falling back");
}
}

int maxProbeStart = Math.min(5 + MAX_ADDITIONAL_HEADER_BYTES, length - 1);
for (int start = 5; start <= maxProbeStart; start++) {
if (start == candidateStart) {
continue;
}
try {
return inner.deserialize(Arrays.copyOfRange(message, start, length));
} catch (IOException | RuntimeException ignored) {
LOG.warn(
"Protobuf message not recognized at candidate offset {}, falling back",
start);
}
}
}

return inner.deserialize(message);
}

@Override
public SeaTunnelDataType<SeaTunnelRow> getProducedType() {
return this.rowType;
}
}
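
The probing loop above works whether or not a record carries the header, but the payload offset can also be derived deterministically. Below is a minimal sketch, not part of this change, assuming the Confluent layout of a zigzag-varint message-index count followed by that many zigzag-varint indexes (with the common case [0] written as a single 0 byte); the class and method names are illustrative only.

final class SchemaRegistryHeaderSketch {

    private SchemaRegistryHeaderSketch() {}

    // Returns the offset of the Protobuf payload, or 0 if the buffer does not look like a
    // Schema Registry record. Sketch only: assumes well-formed input, no bounds checking.
    static int payloadOffset(byte[] message) {
        if (message == null || message.length < 6 || message[0] != 0) {
            return 0; // no magic byte: treat the whole array as a plain Protobuf payload
        }
        int[] pos = {5}; // cursor, starting after magic byte (1) + schema id (4)
        long indexCount = readZigZagVarint(message, pos);
        for (long i = 0; i < indexCount; i++) {
            readZigZagVarint(message, pos); // skip each message index
        }
        return pos[0];
    }

    // Reads one zigzag-encoded varint at pos[0] and advances the cursor past it.
    private static long readZigZagVarint(byte[] buf, int[] pos) {
        long raw = 0;
        int shift = 0;
        while (true) {
            byte b = buf[pos[0]++];
            raw |= (long) (b & 0x7F) << shift;
            if ((b & 0x80) == 0) {
                break;
            }
            shift += 7;
        }
        return (raw >>> 1) ^ -(raw & 1); // undo zigzag encoding
    }
}

The probing approach in the class above avoids committing to these encoding details, at the cost of a few extra parse attempts per record.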