(Note: this is Scala code.)
But if I have some Scala code like:

    val stream = builder
      .stream(topicName)(Consumed.`with`(keySerde, valueSerde))
      .mapValues { v =>
        val rec = v.asInstanceOf[GenericRecord]
        // ...
      }

It requires me to do an explicit type cast, since `AWSKafkaAvroSerDe` implements `Serde[Object]`.
It would be much more ergonomic to have something like:

    package foo

    import com.amazonaws.services.schemaregistry.kafkastreams.AWSKafkaAvroSerDe
    import com.amazonaws.services.schemaregistry.utils.{AWSSchemaRegistryConstants, AvroRecordType}
    import org.apache.avro.generic.GenericRecord
    import org.apache.kafka.common.serialization.{Deserializer, Serde, Serializer}
    import software.amazon.awssdk.services.glue.model.DataFormat

    import scala.collection.JavaConverters._

    object GenericRecordAWSKafkaAvroSerDe {
      private val formatProps = Map(
        AWSSchemaRegistryConstants.AVRO_RECORD_TYPE -> AvroRecordType.GENERIC_RECORD.getName,
        AWSSchemaRegistryConstants.DATA_FORMAT -> DataFormat.AVRO.name(),
      )
    }

    class GenericRecordAWSKafkaAvroSerDe extends Serde[GenericRecord] {
      private val inner: AWSKafkaAvroSerDe = new AWSKafkaAvroSerDe()

      override def serializer(): Serializer[GenericRecord] =
        inner.serializer().asInstanceOf[Serializer[GenericRecord]]

      override def deserializer(): Deserializer[GenericRecord] =
        inner.deserializer().asInstanceOf[Deserializer[GenericRecord]]

      /**
       * Configure the serializer and de-serializer wrapper.
       *
       * @param serdeConfig configuration elements for the wrapper
       * @param isSerdeForRecordKeys true if key, false otherwise
       */
      final def configure(serdeConfig: Map[String, _], isSerdeForRecordKeys: Boolean): Unit = {
        val updatedConfig = GenericRecordAWSKafkaAvroSerDe.formatProps ++ serdeConfig
        inner.serializer.configure(updatedConfig.asJava, isSerdeForRecordKeys)
        inner.deserializer.configure(updatedConfig.asJava, isSerdeForRecordKeys)
      }
    }

and a corresponding one for SpecificRecord.
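
To make the `SpecificRecord` request concrete, here is a rough sketch of what such a wrapper might look like. The class name `SpecificRecordAWSKafkaAvroSerDe` is hypothetical and does not exist in the library today. This version also assumes it is acceptable to override the standard `Serde.configure(java.util.Map, Boolean)` method rather than add a Scala `Map` overload, so that Kafka can configure the wrapper itself. As with the `GenericRecord` version, the casts are unchecked and are only safe while the record type is set to `SPECIFIC_RECORD`.

```scala
package foo

import com.amazonaws.services.schemaregistry.kafkastreams.AWSKafkaAvroSerDe
import com.amazonaws.services.schemaregistry.utils.{AWSSchemaRegistryConstants, AvroRecordType}
import org.apache.avro.specific.SpecificRecord
import org.apache.kafka.common.serialization.{Deserializer, Serde, Serializer}
import software.amazon.awssdk.services.glue.model.DataFormat

// Hypothetical wrapper: not part of the AWS Glue Schema Registry library.
object SpecificRecordAWSKafkaAvroSerDe {
  // Defaults that pin the inner serde to Avro SpecificRecord handling.
  private val formatProps: Map[String, String] = Map(
    AWSSchemaRegistryConstants.AVRO_RECORD_TYPE -> AvroRecordType.SPECIFIC_RECORD.getName,
    AWSSchemaRegistryConstants.DATA_FORMAT -> DataFormat.AVRO.name()
  )
}

class SpecificRecordAWSKafkaAvroSerDe extends Serde[SpecificRecord] {
  private val inner = new AWSKafkaAvroSerDe()

  // Unchecked casts: safe only because the record type is configured as SPECIFIC_RECORD.
  override def serializer(): Serializer[SpecificRecord] =
    inner.serializer().asInstanceOf[Serializer[SpecificRecord]]

  override def deserializer(): Deserializer[SpecificRecord] =
    inner.deserializer().asInstanceOf[Deserializer[SpecificRecord]]

  // Assumption: overriding the Java-facing configure method is the desired entry point.
  override def configure(configs: java.util.Map[String, _], isKey: Boolean): Unit = {
    val merged = new java.util.HashMap[String, Any]()
    // Defaults go in first so that caller-supplied settings take precedence.
    SpecificRecordAWSKafkaAvroSerDe.formatProps.foreach { case (k, v) => merged.put(k, v) }
    merged.putAll(configs)
    inner.serializer().configure(merged, isKey)
    inner.deserializer().configure(merged, isKey)
  }
}
```

With a wrapper like this passed as the value serde, the `mapValues` body would receive a `SpecificRecord` directly and the `asInstanceOf` call in user code would no longer be needed.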