feat: extend avro support (#73)
As of this commit, Avro support is almost complete. Missing:

- duration type. I'd like to think about it a little more before doing this
- we don't carry over some metadata (like precision/scale) to schema types
lucapette authored Dec 16, 2023
1 parent 0dd26ca commit 815a47c
Showing 28 changed files with 925 additions and 306 deletions.
@@ -17,6 +17,7 @@ repositories {
dependencies {
implementation("org.jetbrains.kotlin:kotlin-stdlib:1.9.10")
implementation("org.jetbrains.kotlin:kotlin-reflect:1.9.10")
implementation("org.jetbrains.kotlinx:kotlinx-datetime:0.5.0")
implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.7.3")
implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core-jvm:1.7.3")
implementation("org.jetbrains.kotlinx:kotlinx-serialization-json:1.6.0")
37 changes: 37 additions & 0 deletions libs/testing/src/main/avro/smoke-type.avsc
@@ -0,0 +1,37 @@
{
"namespace": "io.typestream.testing.avro",
"type": "record",
"name": "SmokeType",
"fields": [
{"name": "booleanField", "type": "boolean"},
{"name": "doubleField", "type": "double"},
{"name": "floatField", "type": "float"},
{"name": "intField", "type": "int"},
{"name": "longField", "type": "long"},
{"name": "stringField", "type": "string"},

{"name": "arrayField", "type": {"type": "array", "items": "string"}},
{"name": "enumField", "type": {"type": "enum", "name": "Color", "symbols": ["RED", "GREEN", "BLUE"]}},
{"name": "mapField", "type": {"type": "map", "values": "string"}},
{"name": "recordField", "type": {
"type": "record",
"name": "NestedRecord",
"fields": [
{"name": "nestedInt", "type": "int"},
{"name": "nestedString", "type": "string"}
]
}},

{"name": "dateField", "type": {"type": "int", "logicalType": "date"}},
{"name": "decimalField", "type": {"type": "bytes", "logicalType": "decimal", "precision": 9, "scale": 2}},
{"name": "localTimestampMicrosField", "type": {"type": "long", "logicalType": "local-timestamp-micros"}},
{"name": "localTimestampMillisField", "type": {"type": "long", "logicalType": "local-timestamp-millis"}},
{"name": "timeMicrosField", "type": {"type": "long", "logicalType": "time-micros"}},
{"name": "timeMillisField", "type": {"type": "int", "logicalType": "time-millis"}},
{"name": "timestampMicrosField", "type": {"type": "long", "logicalType": "timestamp-micros"}},
{"name": "timestampMillisField", "type": {"type": "long", "logicalType": "timestamp-millis"}},
{"name": "uuidField", "type": {"type": "string", "logicalType": "uuid"}},

{"name": "optionalField", "type": ["null", "string"], "default": null}
]
}
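
For context, here is a minimal sketch (not part of this commit) of how the new smoke-type schema can be inspected with the standard Avro parser. The field names are the ones defined above; the file path is assumed to resolve from the repository root.

// Illustrative sketch, not part of the diff: inspect the smoke-type schema.
import org.apache.avro.LogicalTypes
import org.apache.avro.Schema
import java.io.File

fun main() {
    // Parse the schema added above (path assumed relative to the repository root).
    val schema = Schema.Parser().parse(File("libs/testing/src/main/avro/smoke-type.avsc"))

    // Logical types sit on top of an underlying primitive type.
    val dateField = schema.getField("dateField").schema()
    println(dateField.type)             // INT
    println(dateField.logicalType.name) // date

    // Precision and scale are carried by the decimal logical type itself.
    val decimal = schema.getField("decimalField").schema().logicalType as LogicalTypes.Decimal
    println("${decimal.precision}/${decimal.scale}") // 9/2
}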
@@ -0,0 +1,44 @@
package io.typestream.testing.model

import io.typestream.testing.avro.Color
import io.typestream.testing.avro.NestedRecord
import java.time.Instant
import java.time.LocalDate
import java.time.LocalTime
import java.time.ZoneId
import java.util.UUID
import io.typestream.testing.avro.SmokeType as AvroSmokeType

data class SmokeType(override val id: String = "42") : TestRecord {
override fun toAvro(): AvroSmokeType {
val instant = Instant.ofEpochMilli(1576226562000L)
val localDate = LocalDate.ofInstant(instant, ZoneId.systemDefault())
val localTime = LocalTime.ofInstant(instant, ZoneId.systemDefault())
val localDateTime = localTime.atDate(localDate)

return AvroSmokeType.newBuilder()
.setBooleanField(true)
.setDoubleField(1.0)
.setFloatField(2.0f)
.setIntField(3)
.setLongField(4L)
.setStringField("5")
.setArrayField(listOf("a", "b", "c"))
.setEnumField(Color.RED)
.setMapField(mapOf("key" to "value"))
.setRecordField(NestedRecord.newBuilder().setNestedInt(11).setNestedString("12").build())
.setDateField(localDate)
.setDecimalField("13.00".toBigDecimal())
.setLocalTimestampMicrosField(localDateTime)
.setLocalTimestampMillisField(localDateTime)
.setTimeMicrosField(localTime)
.setTimeMillisField(localTime)
.setTimestampMicrosField(instant)
.setTimestampMillisField(instant)
.setUuidField(UUID.fromString("2F5C4556-B5A1-45CE-AB36-DA41AEFF7E8D"))
.setOptionalField("24")
.build()
}

override fun toProto() = TODO()
}
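
A quick usage sketch (an assumption, not part of the commit): the generated SmokeType accessors should return the converted java.time and java.math values that the setters above rely on.

// Illustrative sketch, not part of the diff.
import io.typestream.testing.model.SmokeType

fun main() {
    val record = SmokeType().toAvro()

    // Logical-type fields come back as converted values, mirroring the setters above.
    println(record.dateField)            // a java.time.LocalDate
    println(record.timestampMillisField) // a java.time.Instant
    println(record.decimalField)         // a java.math.BigDecimal, e.g. 13.00
    println(record.optionalField)        // "24"
}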
@@ -45,6 +45,11 @@ interface TestRecord {
Rating(key, value.bookId.toString(), value.userId.toString(), value.rating)
}

"smoke-type" -> {
require(value is io.typestream.testing.avro.SmokeType)
SmokeType()
}

"users" -> {
require(value is io.typestream.testing.avro.User)
User(value.id.toString(), value.name.toString())
@@ -13,7 +13,7 @@ import io.typestream.compiler.types.datastream.toAvroSchema
import io.typestream.compiler.types.datastream.toBytes
import io.typestream.compiler.types.datastream.toProtoMessage
import io.typestream.compiler.types.datastream.toProtoSchema
import io.typestream.kafka.AvroSerde
import io.typestream.kafka.avro.AvroSerde
import io.typestream.kafka.ProtoSerde
import io.typestream.kafka.StreamsBuilderWrapper
import org.apache.kafka.common.serialization.Serdes
3 changes: 1 addition & 2 deletions server/src/main/kotlin/io/typestream/compiler/types/Infer.kt
@@ -3,7 +3,6 @@ package io.typestream.compiler.types
import io.typestream.compiler.ast.Command
import io.typestream.compiler.ast.Cut
import io.typestream.compiler.ast.DataCommand
import io.typestream.compiler.ast.Each
import io.typestream.compiler.ast.Echo
import io.typestream.compiler.ast.Enrich
import io.typestream.compiler.ast.ShellCommand
@@ -26,7 +25,7 @@ fun inferType(commands: List<Command>): DataStream {
resultingType.merge(
DataStream(
"/bin/cut",
Schema.Struct(command.boundArgs.map { Schema.Named(it, Schema.String.empty) })
Schema.Struct(command.boundArgs.map { Schema.Field(it, Schema.String.zeroValue) })
)
)
} else {
@@ -2,43 +2,39 @@ package io.typestream.compiler.types.datastream

import io.typestream.compiler.types.DataStream
import io.typestream.compiler.types.schema.Schema
import org.apache.avro.LogicalTypes
import org.apache.avro.LogicalTypes.Decimal
import org.apache.avro.generic.GenericData
import org.apache.avro.generic.GenericRecord
import org.apache.avro.Schema as AvroSchema

fun DataStream.Companion.fromAvroGenericRecord(path: String, genericRecord: GenericRecord): DataStream {
val values = genericRecord.schema.fields.map { avroField ->
avroField.toNamedValue(genericRecord)
avroField.toSchemaField(genericRecord)
}

return DataStream(path, Schema.Struct(values))
}

fun DataStream.Companion.fromAvroSchema(path: String, avroSchema: AvroSchema): DataStream {
val values = avroSchema.fields.map(AvroSchema.Field::toNamedValue)
val values = avroSchema.fields.map(AvroSchema.Field::toSchemaField)

return DataStream(path, Schema.Struct(values))
}

fun DataStream.toAvroSchema(): AvroSchema {
val parser = AvroSchema.Parser()

//TODO we shouldn't assume top level value is a struct
require(schema is Schema.Struct) { "top level value must be a struct" }

val fields = schema.value.joinToString(",") { namedValue ->
"""
{
"name": "${namedValue.name}",
"type": ${toAvroType(namedValue.value)}
}
""".trimIndent()
val fields = schema.value.joinToString(",") { field ->
"""{"name": "${field.name}","type": ${toAvroType(field.value)}}""".trimIndent()
}

val schemaDefinition = """
{
"type": "record",
"name": "${path.replace("/", "_")}",
"name": "${path.replace("/", "_").replace("-", "_")}",
"namespace": "io.typestream.avro",
"fields": [${fields}]
}
@@ -49,40 +45,192 @@ fun DataStream.toAvroSchema(): AvroSchema {

private fun toAvroType(schema: Schema): String {
return when (schema) {
is Schema.UUID -> "{\"type\":\"string\",\"logicalType\":\"uuid\"}"
is Schema.String -> "{\"type\":\"string\",\"avro.java.string\":\"String\"}"
is Schema.Long -> "{\"type\":\"long\"}"
is Schema.Int -> "{\"type\":\"int\"}"
else -> error("Unsupported type: ${schema::class.simpleName}")
is Schema.Boolean -> """"boolean""""
is Schema.Date -> """{"type":"int","logicalType":"date"}"""
is Schema.DateTime -> when (schema.precision) {
Schema.DateTime.Precision.MILLIS -> """{"type":"long","logicalType":"local-timestamp-millis"}"""
Schema.DateTime.Precision.MICROS -> """{"type":"long","logicalType":"local-timestamp-micros"}"""
}

is Schema.Double -> """"double""""
is Schema.Enum -> """{"type":"enum","name": "${schema.symbols.joinToString("_")}","symbols":[${
schema.symbols.joinToString(",") { "\"$it\"" }
}]}""".trimIndent()

is Schema.Float -> """"float""""
is Schema.Decimal -> """{"type":"bytes","logicalType":"decimal","precision":9,"scale":2}"""
is Schema.Instant -> when (schema.precision) {
Schema.Instant.Precision.MILLIS -> """{"type":"long","logicalType":"timestamp-millis"}"""
Schema.Instant.Precision.MICROS -> """{"type":"long","logicalType":"timestamp-micros"}"""
}
is Schema.Int -> """"int""""
is Schema.UUID -> """{"type":"string","logicalType":"uuid"}"""
is Schema.String -> """"string""""
is Schema.Long -> """"long""""
is Schema.List -> """{"type":"array","items": ${toAvroType(schema.valueType)}}"""
is Schema.Map -> """{"type":"map","values":${toAvroType(schema.valueType)}}"""
is Schema.Optional -> """["null","string"], "default": null"""

is Schema.Struct -> {
val fields = schema.value.joinToString(",") { field ->
"""{"name": "${field.name}","type": ${toAvroType(field.value)}}"""
}

"""
{
"type": "record",
"name": "${schema.value.joinToString("_") { it.name }}",
"fields": [${fields}]
}
""".trimIndent()
}

is Schema.Time -> when (schema.precision) {
Schema.Time.Precision.MILLIS -> """{"type":"int","logicalType":"time-millis"}"""
Schema.Time.Precision.MICROS -> """{"type":"long","logicalType":"time-micros"}"""
}
}
}

private fun AvroSchema.Field.toNamedValue(): Schema.Named {
private fun AvroSchema.Field.toSchemaField(value: Any? = null): Schema.Field {
return when (schema().type) {
AvroSchema.Type.STRING -> Schema.Named(name(), Schema.String.empty)
AvroSchema.Type.INT -> Schema.Named(name(), Schema.Int(0))
AvroSchema.Type.LONG -> Schema.Named(name(), Schema.Long(0L))
AvroSchema.Type.ARRAY -> {
val elementType = schema().elementType
val avroField = AvroSchema.Field(name(), elementType)
val schemaType = avroField.toSchemaField().value

if (value is List<*> && value.isNotEmpty()) {
Schema.Field(name(), Schema.List(value.map { avroField.toSchemaField(it).value }, schemaType))
} else {
Schema.Field(name(), Schema.List(listOf(), schemaType))
}
}

AvroSchema.Type.BOOLEAN -> Schema.Field(name(), Schema.Boolean.fromAnyValue(value))

AvroSchema.Type.BYTES -> when (schema().logicalType) {
is Decimal -> Schema.Field(name(), Schema.Decimal.fromAnyValue(value))
else -> error("Unsupported type: ${schema().type}")
}

AvroSchema.Type.DOUBLE -> Schema.Field(name(), Schema.Double.fromAnyValue(value))
AvroSchema.Type.ENUM -> if (value != null) {
Schema.Field(name(), Schema.Enum(value.toString(), schema().enumSymbols))
} else {
Schema.Field(name(), Schema.Enum("", schema().enumSymbols))
}

AvroSchema.Type.FIXED -> when (schema().logicalType) {
is Decimal -> Schema.Field(name(), Schema.Decimal.fromAnyValue(value))
else -> error("Unsupported type: ${schema().type}")
}

AvroSchema.Type.FLOAT -> Schema.Field(name(), Schema.Float.fromAnyValue(value))
AvroSchema.Type.INT -> when (schema().logicalType) {
is LogicalTypes.Date -> Schema.Field(name(), Schema.Date.fromAnyValue(value))
is LogicalTypes.TimeMillis -> Schema.Field(
name(),
Schema.Time.fromAnyValue(value, Schema.Time.Precision.MILLIS)
)

else -> Schema.Field(name(), Schema.Int.fromAnyValue(value))
}


AvroSchema.Type.LONG -> when (schema().logicalType) {
is LogicalTypes.LocalTimestampMillis -> Schema.Field(
name(),
Schema.DateTime.fromAnyValue(value, Schema.DateTime.Precision.MILLIS)
)

is LogicalTypes.LocalTimestampMicros -> Schema.Field(
name(),
Schema.DateTime.fromAnyValue(value, Schema.DateTime.Precision.MICROS)
)

is LogicalTypes.TimeMicros -> Schema.Field(
name(),
Schema.Time.fromAnyValue(value, Schema.Time.Precision.MICROS)
)

is LogicalTypes.TimestampMillis -> Schema.Field(
name(),
Schema.Instant.fromAnyValue(value, Schema.Instant.Precision.MILLIS)
)

is LogicalTypes.TimestampMicros -> Schema.Field(
name(),
Schema.Instant.fromAnyValue(value, Schema.Instant.Precision.MICROS)
)

else -> Schema.Field(name(), Schema.Long.fromAnyValue(value))
}


AvroSchema.Type.MAP -> {
val valueType = schema().valueType
val avroField = AvroSchema.Field(name(), valueType)
val schemaType = avroField.toSchemaField().value

if (value is Map<*, *> && value.isNotEmpty()) {
Schema.Field(
name(),
Schema.Map(
value.map { it.key.toString() to avroField.toSchemaField(it.value).value }.toMap(),
schemaType
)
)
} else {
Schema.Field(name(), Schema.Map(mapOf(), schemaType))
}
}

AvroSchema.Type.RECORD -> {
val fields = schema().fields.map {
if (value is GenericRecord) {
it.toSchemaField(value.get(it.name()))
} else {
it.toSchemaField()
}
}
Schema.Field(name(), Schema.Struct(fields))
}

AvroSchema.Type.STRING -> when (schema().logicalType) {
LogicalTypes.uuid() -> Schema.Field(name(), Schema.UUID.fromAnyValue(value))
else -> Schema.Field(name(), Schema.String.fromAnyValue(value))
}

AvroSchema.Type.UNION -> {
val types = schema().types
if (types.size == 2 && types.count { it.type == AvroSchema.Type.NULL } == 1) {
val nonNullType = types.first { it.type != AvroSchema.Type.NULL }
val optionalType = AvroSchema.Field(name(), nonNullType, null).toSchemaField(value)

Schema.Field(name(), Schema.Optional(optionalType.value))
} else {
error("Unsupported type: ${schema().type}")
}
}

else -> error("Unsupported type: ${schema().type}")
}
}

private fun AvroSchema.Field.toNamedValue(genericRecord: GenericRecord): Schema.Named {
return when (schema().type) {
AvroSchema.Type.STRING -> Schema.Named(name(), Schema.String(genericRecord[name()].toString()))
AvroSchema.Type.INT -> Schema.Named(name(), Schema.Int(genericRecord[name()] as Int))
AvroSchema.Type.LONG -> Schema.Named(name(), Schema.Long(genericRecord[name()] as Long))
else -> error("Unsupported type: ${schema().type}")
private fun AvroSchema.Field.toSchemaField(genericRecord: GenericRecord): Schema.Field {
if (schema().type == AvroSchema.Type.RECORD) {
return toSchemaField(genericRecord.get(pos()))
}
return toSchemaField(genericRecord[name()])
}

fun DataStream.toAvroGenericRecord(): GenericRecord {
val genericRecord = GenericData.Record(toAvroSchema())

//TODO we shouldn't assume top level value is a struct
require(schema is Schema.Struct) { "Top level value must be a struct" }
schema.value.forEach { namedValue ->
genericRecord.put(namedValue.name, namedValue.value.value)
}

schema.value.forEach { field -> genericRecord.put(field.name, field.value.value) }

return genericRecord
}
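
To make the new conversion surface concrete, here is an illustrative round trip (not from the commit; the import paths and the topic path are assumptions, and it relies on Avro specific records implementing GenericRecord).

// Illustrative sketch, not part of the diff.
import io.typestream.compiler.types.DataStream
import io.typestream.compiler.types.datastream.fromAvroGenericRecord
import io.typestream.compiler.types.datastream.toAvroSchema
import io.typestream.testing.model.SmokeType

fun main() {
    // SpecificRecordBase implements GenericRecord, so the test record can feed the converter.
    val record = SmokeType().toAvro()

    // Avro -> typestream: every Avro field becomes a Schema.Field, logical types included.
    val dataStream = DataStream.fromAvroGenericRecord("/dev/kafka/local/topics/smoke-type", record)

    // typestream -> Avro: note the record name sanitization ("/" and "-" become "_").
    val avroSchema = dataStream.toAvroSchema()
    println(avroSchema.fullName) // io.typestream.avro._dev_kafka_local_topics_smoke_type
}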
@@ -6,7 +6,7 @@ import io.typestream.compiler.types.schema.Schema
fun DataStream.join(b: DataStream): DataStream {
val path = if (path == b.path) path else "${path}_${b.path.substringAfterLast("/")}"

val schema = Schema.Struct(listOf(Schema.Named(name, schema), Schema.Named(b.name, b.schema)))
val schema = Schema.Struct(listOf(Schema.Field(name, schema), Schema.Field(b.name, b.schema)))

return DataStream(path, schema)
}
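
And a small hypothetical example of the join shape after the Schema.Named -> Schema.Field rename (import paths, stream contents, and topic paths are made up; only constructors visible in this diff are used).

// Illustrative sketch, not part of the diff.
import io.typestream.compiler.types.DataStream
import io.typestream.compiler.types.datastream.join
import io.typestream.compiler.types.schema.Schema

fun main() {
    val users = DataStream(
        "/dev/kafka/local/topics/users",
        Schema.Struct(listOf(Schema.Field("name", Schema.String.zeroValue)))
    )
    val ratings = DataStream(
        "/dev/kafka/local/topics/ratings",
        Schema.Struct(listOf(Schema.Field("rating", Schema.String.zeroValue)))
    )

    val joined = users.join(ratings)
    println(joined.path)   // /dev/kafka/local/topics/users_ratings
    println(joined.schema) // a Schema.Struct wrapping one Schema.Field per side
}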