AbstractConnectWriter.java
@@ -23,6 +23,7 @@
import org.apache.hudi.common.model.HoodieAvroRecord;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.common.schema.HoodieSchema;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.connect.utils.KafkaConnectUtils;
import org.apache.hudi.keygen.KeyGenerator;
@@ -66,7 +67,7 @@ public AbstractConnectWriter(KafkaConnectConfigs connectConfigs,

@Override
public void writeRecord(SinkRecord record) throws IOException {
AvroConvertor convertor = new AvroConvertor(schemaProvider.getSourceSchema());
AvroConvertor convertor = new AvroConvertor(HoodieSchema.fromAvroSchema(schemaProvider.getSourceSchema()));
Option<GenericRecord> avroRecord;
switch (connectConfigs.getKafkaValueConverter()) {
case KAFKA_AVRO_CONVERTER:
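Note: the only functional change in this writer is the constructor argument. A minimal sketch of the new call shape, assuming (as the diff shows) that HoodieSchema.fromAvroSchema wraps the org.apache.avro.Schema returned by the schema provider; imports for the writer's own classes are omitted:

    // Illustrative fragment, not part of this PR:
    org.apache.avro.Schema avroSchema = schemaProvider.getSourceSchema();      // unchanged: the provider still returns an Avro schema
    HoodieSchema hoodieSchema = HoodieSchema.fromAvroSchema(avroSchema);       // new: wrap it in the HoodieSchema abstraction
    AvroConvertor convertor = new AvroConvertor(hoodieSchema);                 // new: the convertor constructor now takes HoodieSchema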
ProtoClassBasedSchemaProvider.java
@@ -72,7 +72,7 @@ public ProtoClassBasedSchemaProvider(TypedProperties props, JavaSparkContext jss
String className = getStringWithAltKeys(config, PROTO_SCHEMA_CLASS_NAME);
ProtoConversionUtil.SchemaConfig schemaConfig = ProtoConversionUtil.SchemaConfig.fromProperties(props);
try {
schemaString = ProtoConversionUtil.getAvroSchemaForMessageClass(ReflectionUtils.getClass(className), schemaConfig).toString();
schemaString = ProtoConversionUtil.getSchemaForMessageClass(ReflectionUtils.getClass(className), schemaConfig).toString();
} catch (Exception e) {
throw new HoodieSchemaException(String.format("Error reading proto source schema for class: %s", className), e);
}
ProtoSchemaToAvroSchemaConverter.java
@@ -38,6 +38,6 @@ public ProtoSchemaToAvroSchemaConverter(TypedProperties config) {

public String convert(ParsedSchema schema) throws IOException {
ProtobufSchema protobufSchema = (ProtobufSchema) schema;
return ProtoConversionUtil.getAvroSchemaForMessageDescriptor(protobufSchema.toDescriptor(), schemaConfig).toString();
return ProtoConversionUtil.getSchemaForMessageDescriptor(protobufSchema.toDescriptor(), schemaConfig).toString();
}
}
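Both proto-schema entry points above are renamed and now return a HoodieSchema instead of an Avro Schema. A hedged sketch of the two call sites (SomeProtoMessage stands in for a generated protobuf class, props is a TypedProperties instance; packages follow the imports shown in this diff):

    import org.apache.hudi.common.schema.HoodieSchema;
    import org.apache.hudi.utilities.sources.helpers.ProtoConversionUtil;

    ProtoConversionUtil.SchemaConfig schemaConfig = ProtoConversionUtil.SchemaConfig.fromProperties(props);
    // Class-based lookup, as used by ProtoClassBasedSchemaProvider:
    HoodieSchema fromClass = ProtoConversionUtil.getSchemaForMessageClass(SomeProtoMessage.class, schemaConfig);
    // Descriptor-based lookup, as used by ProtoSchemaToAvroSchemaConverter for registry-provided schemas:
    HoodieSchema fromDescriptor = ProtoConversionUtil.getSchemaForMessageDescriptor(SomeProtoMessage.getDescriptor(), schemaConfig);
    // Callers that previously serialized the Avro schema keep working, since toString() still yields a parseable schema string:
    String schemaString = fromClass.toString();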
@@ -19,6 +19,7 @@
package org.apache.hudi.utilities.sources;

import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.schema.HoodieSchema;
import org.apache.hudi.common.table.checkpoint.Checkpoint;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.utilities.UtilHelpers;
@@ -112,8 +113,9 @@ protected JavaRDD<GenericRecord> toBatch(OffsetRange[] offsetRanges) {
throw new HoodieReadFromSourceException("Please provide a valid schema provider class when use ByteArrayDeserializer!");
}

//Don't want kafka offsets here so we use originalSchemaProvider
AvroConvertor convertor = new AvroConvertor(originalSchemaProvider.getSourceSchema());
// Don't want kafka offsets here so we use originalSchemaProvider
HoodieSchema sourceSchema = HoodieSchema.fromAvroSchema(originalSchemaProvider.getSourceSchema());
AvroConvertor convertor = new AvroConvertor(sourceSchema);
JavaRDD<ConsumerRecord<String, byte[]>> kafkaRDDByteArray = createKafkaRDD(this.props, sparkContext, offsetGen, offsetRanges);
kafkaRDD = kafkaRDDByteArray.filter(obj -> obj.value() != null)
.map(obj -> new ConsumerRecord<>(obj.topic(), obj.partition(), obj.offset(), obj.key(), convertor.fromAvroBinary(obj.value())));
@@ -126,7 +128,8 @@

protected JavaRDD<GenericRecord> maybeAppendKafkaOffsets(JavaRDD<ConsumerRecord<Object, Object>> kafkaRDD) {
if (this.shouldAddOffsets) {
AvroConvertor convertor = new AvroConvertor(schemaProvider.getSourceSchema());
HoodieSchema sourceSchema = HoodieSchema.fromAvroSchema(schemaProvider.getSourceSchema());
AvroConvertor convertor = new AvroConvertor(sourceSchema);
return kafkaRDD.map(convertor::withKafkaFieldsAppended);
} else {
return kafkaRDD.map(consumerRecord -> (GenericRecord) consumerRecord.value());
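A simplified sketch of how the wrapped schema flows through this source's Spark job (rawKafkaRecords and the flattened return type are illustrative; the real code keeps the ConsumerRecord wrapper as shown above):

    // Built once on the driver; the convertor serializes only the schema string and
    // re-parses it lazily on executors (see AvroConvertor below).
    HoodieSchema sourceSchema = HoodieSchema.fromAvroSchema(originalSchemaProvider.getSourceSchema());
    AvroConvertor convertor = new AvroConvertor(sourceSchema);
    JavaRDD<GenericRecord> decoded = rawKafkaRecords                 // JavaRDD<ConsumerRecord<String, byte[]>>
        .filter(rec -> rec.value() != null)                         // skip null payloads
        .map(rec -> convertor.fromAvroBinary(rec.value()));         // binary Avro -> GenericRecord

When shouldAddOffsets is set, the same wrapping feeds maybeAppendKafkaOffsets, which relies on convertor.withKafkaFieldsAppended to attach the Kafka offset, partition, and key columns.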
AvroConvertor.java
@@ -19,13 +19,13 @@
package org.apache.hudi.utilities.sources.helpers;

import org.apache.hudi.avro.MercifulJsonConverter;
import org.apache.hudi.common.schema.HoodieSchema;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.internal.schema.HoodieSchemaException;

import com.google.protobuf.Message;
import com.twitter.bijection.Injection;
import com.twitter.bijection.avro.GenericAvroCodecs;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.GenericRecordBuilder;
import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -51,12 +51,17 @@
public class AvroConvertor implements Serializable {

private static final long serialVersionUID = 1L;

/**
* Serializable string representation of the schema.
*/
private final String schemaStr;

/**
* To be lazily initialized on executors.
*/
private transient Schema schema;
private transient HoodieSchema schema;

private final String schemaStr;
private final String invalidCharMask;
private final boolean shouldSanitize;

@@ -65,7 +70,6 @@ public class AvroConvertor implements Serializable {
*/
private transient MercifulJsonConverter jsonConverter;


/**
* To be lazily initialized on executors.
*/
@@ -81,27 +85,25 @@ public AvroConvertor(String schemaStr, boolean shouldSanitize, String invalidCha
this.invalidCharMask = invalidCharMask;
}

public AvroConvertor(Schema schema) {
this(schema, SANITIZE_SCHEMA_FIELD_NAMES.defaultValue(), SCHEMA_FIELD_NAME_INVALID_CHAR_MASK.defaultValue());
public AvroConvertor(HoodieSchema hoodieSchema) {
this(hoodieSchema, SANITIZE_SCHEMA_FIELD_NAMES.defaultValue(), SCHEMA_FIELD_NAME_INVALID_CHAR_MASK.defaultValue());
}

public AvroConvertor(Schema schema, boolean shouldSanitize, String invalidCharMask) {
this.schemaStr = schema.toString();
this.schema = schema;
public AvroConvertor(HoodieSchema hoodieSchema, boolean shouldSanitize, String invalidCharMask) {
this.schemaStr = hoodieSchema.toString();
this.shouldSanitize = shouldSanitize;
this.invalidCharMask = invalidCharMask;
}

private void initSchema() {
if (schema == null) {
Schema.Parser parser = new Schema.Parser();
schema = parser.parse(schemaStr);
schema = HoodieSchema.parse(schemaStr);
}
}

private void initInjection() {
if (recordInjection == null) {
recordInjection = GenericAvroCodecs.toBinary(schema);
recordInjection = GenericAvroCodecs.toBinary(schema.toAvroSchema());
}
}

@@ -115,11 +117,11 @@ public GenericRecord fromJson(String json) {
try {
initSchema();
initJsonConvertor();
return jsonConverter.convert(json, schema);
return jsonConverter.convert(json, schema.toAvroSchema());
} catch (Exception e) {
String errorMessage = "Failed to convert JSON string to Avro record: ";
if (json != null) {
throw new HoodieSchemaException(errorMessage + json + "; schema: " + schemaStr, e);
throw new HoodieSchemaException(errorMessage + json + "; schema: " + schema, e);
} else {
throw new HoodieSchemaException(errorMessage + "JSON string was null.", e);
}
@@ -136,12 +138,9 @@ public Either<GenericRecord,String> fromJsonWithError(String json) {
return new Left(genericRecord);
}

public Schema getSchema() {
try {
return new Schema.Parser().parse(schemaStr);
} catch (Exception e) {
throw new HoodieSchemaException("Failed to parse json schema: " + schemaStr, e);
}
public HoodieSchema getSchema() {
initSchema();
return schema;
}

public GenericRecord fromAvroBinary(byte[] avroBinary) {
@@ -174,10 +173,10 @@ public GenericRecord fromProtoMessage(Message message) {
public GenericRecord withKafkaFieldsAppended(ConsumerRecord consumerRecord) {
initSchema();
GenericRecord recordValue = (GenericRecord) consumerRecord.value();
GenericRecordBuilder recordBuilder = new GenericRecordBuilder(this.schema);
for (Schema.Field field : recordValue.getSchema().getFields()) {
GenericRecordBuilder recordBuilder = new GenericRecordBuilder(this.schema.toAvroSchema());
recordValue.getSchema().getFields().forEach(field -> {
recordBuilder.set(field, recordValue.get(field.name()));
}
});
String recordKey = StringUtils.objToString(consumerRecord.key());
recordBuilder.set(KAFKA_SOURCE_OFFSET_COLUMN, consumerRecord.offset());
recordBuilder.set(KAFKA_SOURCE_PARTITION_COLUMN, consumerRecord.partition());
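A short usage sketch of the reworked convertor (the schema JSON and record values are hypothetical; HoodieSchema.parse and the HoodieSchema-based constructor are the APIs introduced above):

    import org.apache.avro.generic.GenericRecord;
    import org.apache.hudi.common.schema.HoodieSchema;

    HoodieSchema schema = HoodieSchema.parse(schemaJson);            // schemaJson: an Avro-compatible schema string
    AvroConvertor convertor = new AvroConvertor(schema);
    GenericRecord record = convertor.fromJson("{\"id\": 1}");        // assumes an "id" field in schemaJson
    HoodieSchema resolved = convertor.getSchema();                   // now lazily initialized instead of re-parsed per call
    org.apache.avro.Schema avro = resolved.toAvroSchema();           // unwrap only where an Avro-native API is required

Keeping schemaStr as the only serialized field, and rebuilding schema, jsonConverter, and recordInjection lazily, preserves the class's Spark-closure-friendly design while switching the in-memory type to HoodieSchema.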
ProtoConversionUtil.java
@@ -18,6 +18,9 @@
package org.apache.hudi.utilities.sources.helpers;

import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.schema.HoodieSchema;
import org.apache.hudi.common.schema.HoodieSchema.TimePrecision;
import org.apache.hudi.common.schema.HoodieSchemaType;
import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieException;
@@ -72,25 +75,26 @@
public class ProtoConversionUtil {

/**
* Creates an Avro {@link Schema} for the provided class. Assumes that the class is a protobuf {@link Message}.
* Creates a HoodieSchema for the provided class. Assumes that the class is a protobuf {@link Message}.
* @param clazz The protobuf class
* @param schemaConfig configuration used to determine how to handle particular cases when converting from the proto schema
* @return An Avro schema
* @return A HoodieSchema
*/
public static Schema getAvroSchemaForMessageClass(Class clazz, SchemaConfig schemaConfig) {

return new AvroSupport(schemaConfig).getSchema(clazz);
public static HoodieSchema getSchemaForMessageClass(Class clazz, SchemaConfig schemaConfig) {
Schema avroSchema = new AvroSupport(schemaConfig).getSchema(clazz);
return HoodieSchema.fromAvroSchema(avroSchema);
}

/**
* Creates an Avro {@link Schema} for the provided {@link Descriptors.Descriptor}.
* Creates a HoodieSchema for the provided {@link Descriptors.Descriptor}.
* Intended for use when the descriptor is provided by an external registry.
* @param descriptor The protobuf descriptor
* @param schemaConfig configuration used to determine how to handle particular cases when converting from the proto schema
* @return An Avro schema
* @return A HoodieSchema
*/
public static Schema getAvroSchemaForMessageDescriptor(Descriptors.Descriptor descriptor, SchemaConfig schemaConfig) {
return new AvroSupport(schemaConfig).getSchema(descriptor);
public static HoodieSchema getSchemaForMessageDescriptor(Descriptors.Descriptor descriptor, SchemaConfig schemaConfig) {
Schema avroSchema = new AvroSupport(schemaConfig).getSchema(descriptor);
return HoodieSchema.fromAvroSchema(avroSchema);
}

/**
@@ -99,7 +103,7 @@ public static Schema getAvroSchemaForMessageDescriptor(Descriptors.Descriptor de
* @param message the source message to convert
* @return an Avro GenericRecord
*/
public static GenericRecord convertToAvro(Schema schema, Message message) {
public static GenericRecord convertToAvro(HoodieSchema schema, Message message) {
return AvroSupport.convert(schema, message);
}

@@ -183,7 +187,7 @@ private AvroSupport(SchemaConfig schemaConfig) {
this.timestampsAsRecords = schemaConfig.isTimestampsAsRecords();
}

public static GenericRecord convert(Schema schema, Message message) {
public static GenericRecord convert(HoodieSchema schema, Message message) {
return (GenericRecord) convertObject(schema, message);
}

@@ -374,13 +378,13 @@ private static Descriptors.FieldDescriptor[] getOrderedFields(Schema schema, Mes
});
}

private static Object convertObject(Schema schema, Object value) {
private static Object convertObject(HoodieSchema schema, Object value) {
if (value == null) {
return null;
}
// if we've reached max recursion depth in the provided schema, write out message to bytes
if (RECURSION_OVERFLOW_SCHEMA.getFullName().equals(schema.getFullName())) {
GenericData.Record overflowRecord = new GenericData.Record(schema);
GenericData.Record overflowRecord = new GenericData.Record(schema.toAvroSchema());
Message messageValue = (Message) value;
overflowRecord.put(OVERFLOW_DESCRIPTOR_FIELD_NAME, messageValue.getDescriptorForType().getFullName());
overflowRecord.put(OVERFLOW_BYTES_FIELD_NAME, ByteBuffer.wrap(messageValue.toByteArray()));
@@ -390,7 +394,7 @@ private static Object convertObject(Schema schema, Object value) {
switch (schema.getType()) {
case ARRAY:
List<Object> arrayValue = (List<Object>) value;
List<Object> arrayCopy = new GenericData.Array<>(arrayValue.size(), schema);
List<Object> arrayCopy = new GenericData.Array<>(arrayValue.size(), schema.toAvroSchema());
for (Object obj : arrayValue) {
arrayCopy.add(convertObject(schema.getElementType(), obj));
}
@@ -413,10 +417,10 @@
byteBufferValue.position(start);
return ByteBuffer.wrap(bytesCopy, 0, length);
case ENUM:
return GenericData.get().createEnum(value.toString(), schema);
return GenericData.get().createEnum(value.toString(), schema.toAvroSchema());
case FIXED:
if (value instanceof byte[]) {
return GenericData.get().createFixed(null, (byte[]) value, schema);
return GenericData.get().createFixed(null, (byte[]) value, schema.toAvroSchema());
}
Object unsignedLongValue = value;
if (unsignedLongValue instanceof UInt64Value) {
@@ -427,7 +431,7 @@
throw new HoodieException("Unexpected Message type when converting as an unsigned long: " + unsignedLongValue.getClass().getName());
}
// convert the long to its unsigned value
return DECIMAL_CONVERSION.toFixed(new BigDecimal(toUnsignedBigInteger((Long) unsignedLongValue)), schema, schema.getLogicalType());
return DECIMAL_CONVERSION.toFixed(new BigDecimal(toUnsignedBigInteger((Long) unsignedLongValue)), schema.toAvroSchema(), schema.toAvroSchema().getLogicalType());
case BOOLEAN:
case DOUBLE:
case FLOAT:
@@ -440,7 +444,7 @@
Object tmpValue = value;
if (value instanceof Message) {
// check if this is a Timestamp
if (LogicalTypes.timestampMicros().equals(schema.getLogicalType())) {
if (schema.getType().equals(HoodieSchemaType.TIMESTAMP) && ((HoodieSchema.Timestamp) schema).getPrecision().equals(TimePrecision.MICROS)) {
if (value instanceof Timestamp) {
return Timestamps.toMicros((Timestamp) value);
} else if (value instanceof DynamicMessage) {
@@ -469,16 +473,16 @@
Map<Object, Object> mapValue = (Map) value;
Map<Object, Object> mapCopy = new HashMap<>(mapValue.size());
for (Map.Entry<Object, Object> entry : mapValue.entrySet()) {
mapCopy.put(convertObject(STRING_SCHEMA, entry.getKey()), convertObject(schema.getValueType(), entry.getValue()));
mapCopy.put(convertObject(HoodieSchema.fromAvroSchema(STRING_SCHEMA), entry.getKey()), convertObject(schema.getValueType(), entry.getValue()));
}
return mapCopy;
case NULL:
return null;
case RECORD:
GenericData.Record newRecord = new GenericData.Record(schema);
GenericData.Record newRecord = new GenericData.Record(schema.toAvroSchema());
Message messageValue = (Message) value;
Descriptors.FieldDescriptor[] orderedFields = getOrderedFields(schema, messageValue);
for (Schema.Field field : schema.getFields()) {
Descriptors.FieldDescriptor[] orderedFields = getOrderedFields(schema.toAvroSchema(), messageValue);
for (Schema.Field field : schema.toAvroSchema().getFields()) {
int position = field.pos();
Descriptors.FieldDescriptor fieldDescriptor = orderedFields[position];
Object convertedValue;
@@ -488,7 +492,7 @@
if (fieldSchema.getType() == Schema.Type.UNION && (fieldDescriptor == null || (!fieldDescriptor.isRepeated() && !messageValue.hasField(fieldDescriptor)))) {
convertedValue = null;
} else {
convertedValue = convertObject(fieldSchema, fieldDescriptor == null ? field.defaultVal() : messageValue.getField(fieldDescriptor));
convertedValue = convertObject(HoodieSchema.fromAvroSchema(fieldSchema), fieldDescriptor == null ? field.defaultVal() : messageValue.getField(fieldDescriptor));
}
newRecord.put(position, convertedValue);
}
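Rounding out the ProtoConversionUtil changes, a sketch of end-to-end conversion with the new signatures, reusing the fromClass schema from the earlier sketch (SomeProtoMessage remains a placeholder for a generated protobuf class):

    // Illustrative only:
    SomeProtoMessage message = SomeProtoMessage.getDefaultInstance();
    GenericRecord record = ProtoConversionUtil.convertToAvro(fromClass, message);

convertObject keeps HoodieSchema at every recursion step and unwraps via toAvroSchema() only where GenericData, the decimal conversion, or GenericRecordBuilder require an Avro Schema; the timestamp-micros check now goes through HoodieSchemaType.TIMESTAMP and TimePrecision.MICROS rather than comparing Avro logical types.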