Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
101 changes: 90 additions & 11 deletions data-avro/src/main/java/com/linkedin/data/avro/DataTranslator.java
Original file line number Diff line number Diff line change
Expand Up @@ -357,6 +357,7 @@ private Object translate(Object value, DataSchema dataSchema, Schema avroSchema)
Schema recordAvroSchema = record.getSchema();
RecordDataSchema recordDataSchema = (RecordDataSchema) dereferencedDataSchema;
dataMap = new DataMap(avroSchema.getFields().size());
String setFieldName = "";
for (RecordDataSchema.Field field : recordDataSchema.getFields())
{
String fieldName = field.getName();
Expand All @@ -370,16 +371,30 @@ private Object translate(Object value, DataSchema dataSchema, Schema avroSchema)
//explicitly check the avro record schema has this field as accessing a non-existent field throws
//under avro 1.10+
Schema.Field avroSchemaField = recordAvroSchema.getField(fieldName);
if (avroSchemaField == null)
{
continue;
Object fieldValue = null;

// If the field is field discriminator enum, but not present in the recordAvroSchema,
// then the recordAvroSchema is the schema generated from proto
// To make the avro object compatible with the one genarated from pegasus translated
// avro schema, we need to set the field discriminator enum value to the field name
// that was set in the record.
if (fieldName.equals(DataSchemaConstants.DISCRIMINATOR_FIELD) && avroSchemaField == null) {
fieldValue = setFieldName;
}
else {
if (avroSchemaField == null) {
continue;
}

Object fieldValue = record.get(avroSchemaField.pos());
fieldValue = record.get(avroSchemaField.pos());
}
if (fieldValue == null)
{
continue;
}
if (!fieldName.equals(DataSchemaConstants.DISCRIMINATOR_FIELD)) {
setFieldName = fieldName;
}
DataSchema fieldDataSchema = field.getType();
Schema fieldAvroSchema = avroSchema.getField(fieldName).schema();
if (fieldDataSchema.getDereferencedType() != DataSchema.Type.UNION && fieldAvroSchema.getType() == Schema.Type.UNION)
Expand Down Expand Up @@ -440,10 +455,47 @@ private Object translate(Object value, DataSchema dataSchema, Schema avroSchema)
return result;
}

private Schema findRecordSchemaWithoutFieldDiscriminator(Object value, Schema avroSchema) {
for (Schema schema : avroSchema.getTypes()) {
if (schema.getType() == Schema.Type.RECORD) {
Schema.Field fieldDiscriminator = schema.getField(DataSchemaConstants.DISCRIMINATOR_FIELD);

if (isEnumField(fieldDiscriminator)) {
GenericRecord record = (GenericRecord) value;

if (recordHasNonNullField(record) && record.get(DataSchemaConstants.DISCRIMINATOR_FIELD) == null) {
return schema;
}
}
}
}

return null;
}

private boolean isEnumField(Schema.Field field) {
return field != null && field.schema().getType() == Schema.Type.ENUM;
}

private boolean recordHasNonNullField(GenericRecord record) {
for (Schema.Field recordField : record.getSchema().getFields()) {
if (record.get(recordField.name()) != null) {
return true;
}
}

return false;
}

private final Map.Entry<DataSchema, Schema> findUnionMemberSchema(Object value, UnionDataSchema unionDataSchema, Schema avroSchema)
{
int index = _genericData.resolveUnion(avroSchema, value);
Schema memberAvroSchema = avroSchema.getTypes().get(index);
Schema memberAvroSchema = findRecordSchemaWithoutFieldDiscriminator(value, avroSchema);

if (memberAvroSchema == null) {
int index = _genericData.resolveUnion(avroSchema, value);
memberAvroSchema = avroSchema.getTypes().get(index);
}

String key;
switch (memberAvroSchema.getType())
{
Expand Down Expand Up @@ -479,6 +531,31 @@ private final Map.Entry<DataSchema, Schema> findUnionMemberSchema(Object value,
return new AbstractMap.SimpleEntry<>(memberDataSchema, memberAvroSchema);
}

private Object getDataMapForUnionField(String fieldName, Object fieldValue, DataSchema fieldDataSchema, Schema fieldAvroSchema)
{
DataMap dataMap = new DataMap(1);
_path.addLast(fieldName);
dataMap.put(fieldName, translate(fieldValue, fieldDataSchema, extractNonnullSchema(fieldAvroSchema)));
_path.removeLast();
return dataMap;

}

private Object translateAvroRecordToPegasusUnionWithAliasesWithoutDescriminator(GenericRecord record, UnionDataSchema unionDataSchema) {
for (Schema.Field field : record.getSchema().getFields())
{
Object fieldValue = record.get(field.name());
if (fieldValue != null)
{
DataSchema memberDataSchema = unionDataSchema.getTypeByMemberKey(field.name());
Schema memberAvroSchema = field.schema();
return getDataMapForUnionField(field.name(), fieldValue, memberDataSchema, memberAvroSchema);
}
}
appendMessage("cannot find required field in record %1$s", record);
return BAD_RESULT;
}

private Object translateAvroRecordToPegasusUnionWithAliases(Object value, UnionDataSchema unionDataSchema, Schema avroSchema)
{
Schema recordAvroSchema = extractNonnullSchema(avroSchema);
Expand All @@ -489,6 +566,7 @@ private Object translateAvroRecordToPegasusUnionWithAliases(Object value, UnionD
{
appendMessage("cannot find required field %1$s in record %2$s", DataSchemaConstants.DISCRIMINATOR_FIELD, record);
return BAD_RESULT;
// return translateAvroRecordToPegasusUnionWithAliasesWithoutDescriminator(record, unionDataSchema);
}
String fieldDiscriminator = fieldDiscriminatorValue.toString();

Expand All @@ -502,11 +580,12 @@ private Object translateAvroRecordToPegasusUnionWithAliases(Object value, UnionD
Schema fieldAvroSchema = recordAvroSchema.getField(fieldDiscriminator).schema();
DataSchema memberDataSchema = unionDataSchema.getTypeByMemberKey(fieldDiscriminator);

DataMap result = new DataMap(1);
_path.add(fieldDiscriminator);
result.put(fieldDiscriminator, translate(fieldValue, memberDataSchema, extractNonnullSchema(fieldAvroSchema)));
_path.removeLast();
return result;
// DataMap result = new DataMap(1);
// _path.add(fieldDiscriminator);
// result.put(fieldDiscriminator, translate(fieldValue, memberDataSchema, extractNonnullSchema(fieldAvroSchema)));
// _path.removeLast();
// return result;
return getDataMapForUnionField(fieldDiscriminator, fieldValue, memberDataSchema, fieldAvroSchema);
}
}
}
Expand Down
Loading