Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Patch field order with avro #248

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ case class Field(

// converter from catalyst to avro
lazy val catalystToAvro: (Any) => Any ={
SchemaConverters.createConverterToAvro(dt, colName, "recordNamespace")
SchemaConverters.createConverterToAvro(dt, exeSchema.get,colName, "recordNamespace")
}

val dt =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ class Avro(f:Option[Field] = None) extends SHCDataType {
// Here we assume the top level type is structType
if (f.isDefined) {
val record = f.get.catalystToAvro(input)
AvroSerde.serialize(record, f.get.schema.get)
AvroSerde.serialize(record, f.get.exeSchema.get)
} else {
throw new UnsupportedOperationException(
"Avro coder: Without field metadata, 'toBytes' conversion can not be supported")
Expand Down Expand Up @@ -243,7 +243,8 @@ object SchemaConverters {
// writing Avro records out to disk.
def createConverterToAvro(
dataType: DataType,
structName: String,
avroType: Schema,
currentFieldName: String,
recordNamespace: String): (Any) => Any = {

dataType match {
Expand All @@ -257,7 +258,11 @@ object SchemaConverters {
case TimestampType => (item: Any) =>
if (item == null) null else item.asInstanceOf[Timestamp].getTime
case ArrayType(elementType, _) =>
val elementConverter = createConverterToAvro(elementType, structName, recordNamespace)
val elementConverter = createConverterToAvro(
elementType,
avroType.getElementType,
avroType.getElementType.getName,
recordNamespace)
(item: Any) => {
if (item == null) {
null
Expand All @@ -274,7 +279,11 @@ object SchemaConverters {
}
}
case MapType(StringType, valueType, _) =>
val valueConverter = createConverterToAvro(valueType, structName, recordNamespace)
val valueConverter = createConverterToAvro(
valueType,
avroType.getValueType,
avroType.getValueType.getName,
recordNamespace)
(item: Any) => {
if (item == null) {
null
Expand All @@ -287,23 +296,44 @@ object SchemaConverters {
}
}
case structType: StructType =>
val builder = SchemaBuilder.record(structName).namespace(recordNamespace)
val schema: Schema = SchemaConverters.convertSparkStructTypeToAvro(
structType, builder, recordNamespace)
// Avro schema is the user supplied one, not the one generated from the dataset
val schema: Schema = avroType
// Build in the structType order, which has been built from:
// schema.map{ x => SchemaConverters.toSqlType(x).dataType }.get
// where schema is the Avro schema => it is not the DataFrame field order!
val fieldConverters = structType.fields.map(field =>
createConverterToAvro(field.dataType, field.name, recordNamespace))
createConverterToAvro(
field.dataType,
schema.getField(field.name).schema(),
field.name,
recordNamespace))
(item: Any) => {
if (item == null) {
null
} else {
val record = new Record(schema)
val row = item.asInstanceOf[Row]
val rowIterator = row.toSeq.iterator
val fieldNamesIterator = structType.fieldNames.iterator
val convertersIterator = fieldConverters.iterator
val fieldNamesIterator = dataType.asInstanceOf[StructType].fieldNames.iterator
val rowIterator = item.asInstanceOf[Row].toSeq.iterator

while (convertersIterator.hasNext) {
val converter = convertersIterator.next()
record.put(fieldNamesIterator.next(), converter(rowIterator.next()))
if (row.schema == null) {
// It seems we can be here with a row without schema ...

// No schema: fields in the Row have to be in the expected order
// (defined by the avro schema)
while (convertersIterator.hasNext) {
val converter = convertersIterator.next()
record.put(fieldNamesIterator.next(), converter(rowIterator.next()))
}
} else {
// The row may come from a DataFrame with a different field order
// Take them by name and not by position
while (fieldNamesIterator.hasNext) {
val fieldname = fieldNamesIterator.next()
val converter = convertersIterator.next()
record.put(fieldname, converter(row.get(row.fieldIndex(fieldname))))
}
}
record
}
Expand Down
46 changes: 42 additions & 4 deletions core/src/test/scala/org/apache/spark/sql/AvroRecordSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ class AvroRecordSuite extends FunSuite with BeforeAndAfterEach with BeforeAndAft
println(sqlUser1)
val schema = SchemaConverters.toSqlType(avroSchema)
println(s"\nSqlschema: $schema")
val avroUser1 = SchemaConverters.createConverterToAvro(schema.dataType, "avro", "example.avro")(sqlUser1)
val avroUser1 = SchemaConverters.createConverterToAvro(schema.dataType, avroSchema,"avro", "example.avro")(sqlUser1)
val avroByte = AvroSerde.serialize(avroUser1, avroSchema)
val avroUser11 = AvroSerde.deserialize(avroByte, avroSchema)
println(s"$avroUser1")
Expand All @@ -77,7 +77,7 @@ class AvroRecordSuite extends FunSuite with BeforeAndAfterEach with BeforeAndAft
println(sqlConv)
val sqlSchema = SchemaConverters.toSqlType(avroSchema)
println(s"\nSqlschema: $sqlSchema")
val avroData = SchemaConverters.createConverterToAvro(sqlSchema.dataType, "avro", "example.avro")(sqlConv)
val avroData = SchemaConverters.createConverterToAvro(sqlSchema.dataType, avroSchema,"avro", "example.avro")(sqlConv)
val avroBytes = AvroSerde.serialize(avroData, avroSchema)
val desData = AvroSerde.deserialize(avroBytes, avroSchema)
println(s"$desData")
Expand Down Expand Up @@ -107,7 +107,7 @@ class AvroRecordSuite extends FunSuite with BeforeAndAfterEach with BeforeAndAft
println(sqlConv)
val sqlSchema = SchemaConverters.toSqlType(avroSchema)
println(s"\nSqlschema: $sqlSchema")
val avroData = SchemaConverters.createConverterToAvro(sqlSchema.dataType, "avro", "example.avro")(sqlConv)
val avroData = SchemaConverters.createConverterToAvro(sqlSchema.dataType, avroSchema,"avro", "example.avro")(sqlConv)
val avroBytes = AvroSerde.serialize(avroData, avroSchema)
val desData = AvroSerde.deserialize(avroBytes, avroSchema)
println(s"$desData")
Expand Down Expand Up @@ -238,7 +238,7 @@ class AvroRecordSuite extends FunSuite with BeforeAndAfterEach with BeforeAndAft
val sqlRec = SchemaConverters.createConverterToSQL(avroComplex)(avroRec)
println(s"\nsqlRec: $sqlRec")

val avroRec1 = SchemaConverters.createConverterToAvro(schema.dataType, "test_schema", "example.avro")(sqlRec)
val avroRec1 = SchemaConverters.createConverterToAvro(schema.dataType, avroComplex,"test_schema", "example.avro")(sqlRec)
println(s"\navroRec1: $avroRec1")
val avroByte = AvroSerde.serialize(avroRec1, avroComplex)
println("\nserialize")
Expand All @@ -247,4 +247,42 @@ class AvroRecordSuite extends FunSuite with BeforeAndAfterEach with BeforeAndAft
val sqlRec1 = SchemaConverters.createConverterToSQL(avroComplex)(avroRec11)
println(s"sqlRec1: $sqlRec1")
}

test("avro not dependent on schema field order") {
  // Writer schema used for serialization: fields ordered (name, bool).
  val schemaString =
    s"""{"namespace": "example.avro",
       | "type": "record", "name": "User",
       | "fields": [ {"name": "name", "type": "string"},
       | {"name": "bool", "type": "boolean"} ] }""".stripMargin
  val avroSchema: Schema = {
    val p = new Schema.Parser
    p.parse(schemaString)
  }
  // Dataset schema: the same two fields in the opposite order (bool, name).
  val schemaDatasetString =
    s"""{"namespace": "example.avro",
       | "type": "record", "name": "User",
       | "fields": [ {"name": "bool", "type": "boolean"},
       | {"name": "name", "type": "string"} ] }""".stripMargin
  val avroDatasetSchema: Schema = {
    val p = new Schema.Parser
    p.parse(schemaDatasetString)
  }

  val user1 = new GenericData.Record(avroDatasetSchema)
  user1.put("name", "Alyssa")
  user1.put("bool", true)

  val sqlUser1 = SchemaConverters.createConverterToSQL(avroDatasetSchema)(user1)
  println(s"user1 from sql: $sqlUser1")
  val schema = SchemaConverters.toSqlType(avroDatasetSchema)
  println(s"\nSqlschema: $schema")
  // Convert back to Avro against the writer schema, whose field order
  // differs from the dataset schema's: fields must be matched by name.
  val avroUser1 = SchemaConverters.createConverterToAvro(schema.dataType, avroSchema, "avro", "example.avro")(sqlUser1)
  // Actually verify field values survived the order mismatch
  // (previously this test only printed and asserted nothing).
  val avroRecord = avroUser1.asInstanceOf[GenericData.Record]
  assert(avroRecord.get("name").toString == "Alyssa")
  assert(avroRecord.get("bool") == true)
  val avroByte = AvroSerde.serialize(avroUser1, avroSchema)
  val avroUser11 = AvroSerde.deserialize(avroByte, avroSchema)
  // Print the round-tripped record (the original printed avroUser1,
  // the pre-serialization record, while labeling it "deserialized").
  println(s"user1 deserialized: $avroUser11")
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ object AvroRecord {
println(sqlUser1)
val schema = SchemaConverters.toSqlType(avroSchema)
println(s"\nSqlschema: $schema")
val avroUser1 = SchemaConverters.createConverterToAvro(schema.dataType, "avro", "example.avro")(sqlUser1)
val avroUser1 = SchemaConverters.createConverterToAvro(schema.dataType, avroSchema, "avro", "example.avro")(sqlUser1)
val avroByte = AvroSerde.serialize(avroUser1, avroSchema)
val avroUser11 = AvroSerde.deserialize(avroByte, avroSchema)
println(s"$avroUser1")
Expand Down