Getting SchemaProjectorException after schema evolution with enum #728

Open
Cauac opened this issue Feb 28, 2024 · 0 comments

Context

An S3 Sink connector is configured to dump messages from a topic in AWS MSK into an S3 bucket. The Kafka messages are serialized in Avro format using a schema from Schema Registry. The schema contains various types, including enums. When we upgrade the schema, the new version is always BACKWARD compatible. It takes time before all our producers are synchronized and switched to the latest schema, so the Kafka topic can contain a mix of messages serialized with the previous and the latest schema (I don't think this is a unique scenario).

Issue

After one such update, our S3 Sink connector started to reject all messages with the previous schema even though the changes were BACKWARD compatible.
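
To make the BACKWARD claim concrete, here is a minimal sketch of the Avro resolution rule for enums: a reader can decode a writer's data if every writer symbol is known to the reader, or if the reader declares a default symbol. This is a simplified model and not Schema Registry's implementation. The function name `can_read_enum` is hypothetical, and the symbol lists are taken from the error message in this report (v1 has RED and GREEN, v2 adds BLACK).

```python
from typing import Optional, Sequence


def can_read_enum(reader_symbols: Sequence[str],
                  writer_symbols: Sequence[str],
                  reader_default: Optional[str] = None) -> bool:
    """Simplified Avro enum resolution check (hypothetical helper)."""
    # A reader-side default symbol absorbs any symbol the reader does not know.
    if reader_default is not None and reader_default in reader_symbols:
        return True
    # Otherwise every symbol the writer may emit must exist on the reader side.
    return set(writer_symbols) <= set(reader_symbols)


V1 = ["RED", "GREEN"]            # symbols listed in the source parameters
V2 = ["RED", "GREEN", "BLACK"]   # symbols listed in the target parameters

# BACKWARD: the new schema (v2) reads data written with the old one (v1).
backward_ok = can_read_enum(reader_symbols=V2, writer_symbols=V1)
# The reverse direction (v1 reading v2 data) is not safe without a default.
forward_ok = can_read_enum(reader_symbols=V1, writer_symbols=V2)
print(backward_ok, forward_ok)
```

Under this rule, adding BLACK to the enum is a backward-compatible change, which is why the rejection of v1 messages is unexpected.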

Test scenario

Here is a simplified test case where I was able to reproduce the same error.

Components

  • Kafka: 2.8.1 (AWS MSK)
  • Schema Registry: 7.6.0
  • Kafka Connect: 7.6.0
  • kafka-connect-s3: 10.5.8
  • kafka-connect-avro-converter: 7.6.0

Schema

[Image: Screenshot 2024-02-28 at 17 01 15 (schema definition, not preserved)]

S3 Connector

  config = {
    "name"                = "s3_sink_test_topic"
    "connector.class"     = "io.confluent.connect.s3.S3SinkConnector"
    "topics"              = "test-topic"
    "format.class"        = "io.confluent.connect.s3.format.parquet.ParquetFormat"
    "s3.bucket.name"      = "..."
    "s3.object.tagging"   = "true"
    "s3.region"           = "..."
    "storage.class"       = "io.confluent.connect.s3.storage.S3Storage"
    "topics.dir"          = "parquet"
    "partitioner.class"   = "io.confluent.connect.storage.partitioner.DailyPartitioner"
    "rotate.interval.ms"  = "3600000"
    "timezone"            = "UTC"
    "locale"              = "en"
    "flush.size"          = "5"

    "key.converter"                                = "org.apache.kafka.connect.storage.StringConverter"
    "value.converter"                              = "io.confluent.connect.avro.AvroConverter"
    "value.converter.schema.registry.url"          = "..."
    "enhanced.avro.schema.support"                 = "true"
    "schema.compatibility"                         = "BACKWARD"

    "errors.tolerance"                                = "all"
    "errors.deadletterqueue.topic.name"               = "test-topic-dlq"
    "errors.deadletterqueue.topic.replication.factor" = 3
    "errors.deadletterqueue.context.headers.enable"   = true
  }

Test dataset (test-data.json)

{"color": {"com.example.Color" : "RED"}}
{"color": {"com.example.Color" : "GREEN"}}
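
The nested object in each record looks like Avro's JSON encoding of a union branch, where a non-null value is wrapped in an object keyed by the branch's type name. That would mean `color` is declared as a union (for example, a nullable enum), but this is an assumption because the schema itself is only available as a screenshot. A small sketch that builds equivalent records, with the hypothetical helper `union_wrapped`:

```python
import json


def union_wrapped(type_name: str, value: str) -> dict:
    # Avro JSON encoding of a non-null union branch: {"<type name>": value}
    return {type_name: value}


# One JSON document per line, as expected by kafka-avro-console-producer.
lines = [
    json.dumps({"color": union_wrapped("com.example.Color", symbol)})
    for symbol in ["RED", "GREEN"]
]
print("\n".join(lines))
```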

Kafka producer

The following command was used to populate Kafka with test data using the v1 and v2 schemas.

kafka-avro-console-producer \
  --bootstrap-server <.....> \
  --topic test-topic \
  --producer.config client.properties \
  --property schema.registry.url=<.....> \
  --property value.schema.id=<....> \
  < test-data.json

Kafka topic overview

[Image: kafka-error (not preserved)]

Error message

__connect.errors.exception.stacktrace	org.apache.kafka.connect.errors.SchemaProjectorException: Error projecting color
	at org.apache.kafka.connect.data.SchemaProjector.projectStruct(SchemaProjector.java:113)
	at org.apache.kafka.connect.data.SchemaProjector.projectRequiredSchema(SchemaProjector.java:93)
	at org.apache.kafka.connect.data.SchemaProjector.project(SchemaProjector.java:73)
	at io.confluent.connect.storage.schema.StorageSchemaCompatibility.projectInternal(StorageSchemaCompatibility.java:406)
	at io.confluent.connect.storage.schema.StorageSchemaCompatibility.projectInternal(StorageSchemaCompatibility.java:394)
	at io.confluent.connect.storage.schema.StorageSchemaCompatibility.project(StorageSchemaCompatibility.java:367)
	at io.confluent.connect.s3.TopicPartitionWriter.checkRotationOrAppend(TopicPartitionWriter.java:326)
	at io.confluent.connect.s3.TopicPartitionWriter.executeState(TopicPartitionWriter.java:267)
	at io.confluent.connect.s3.TopicPartitionWriter.write(TopicPartitionWriter.java:218)
	at io.confluent.connect.s3.S3SinkTask.put(S3SinkTask.java:247)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:601)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:350)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:250)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:219)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:204)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:259)
	at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:236)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.kafka.connect.errors.SchemaProjectorException: Schema parameters not equal. source parameters: 
{io.confluent.connect.avro.Enum=com.example.Color, io.confluent.connect.avro.Enum.RED=RED, io.confluent.connect.avro.Enum.GREEN=GREEN} 
and target parameters:
 {io.confluent.connect.avro.Enum=com.example.Color, io.confluent.connect.avro.Enum.RED=RED, io.confluent.connect.avro.Enum.GREEN=GREEN, io.confluent.connect.avro.Enum.BLACK=BLACK}
	at org.apache.kafka.connect.data.SchemaProjector.checkMaybeCompatible(SchemaProjector.java:133)
	at org.apache.kafka.connect.data.SchemaProjector.project(SchemaProjector.java:60)
	at org.apache.kafka.connect.data.SchemaProjector.projectStruct(SchemaProjector.java:110)
	... 21 more
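
The "Caused by" line suggests that projection fails on a strict equality comparison of the Connect schema parameters, which is where the enum symbols end up. Below is a minimal Python model of that comparison, based only on what the error message above shows. It is not the Kafka Connect code itself, and the names `SchemaProjectorError`, `check_parameters_equal`, and `enum_parameters` are hypothetical.

```python
class SchemaProjectorError(Exception):
    """Stand-in for org.apache.kafka.connect.errors.SchemaProjectorException."""


PREFIX = "io.confluent.connect.avro.Enum"


def enum_parameters(full_name: str, symbols: list) -> dict:
    # Mirrors the parameter layout printed in the error message.
    params = {PREFIX: full_name}
    for symbol in symbols:
        params[f"{PREFIX}.{symbol}"] = symbol
    return params


def check_parameters_equal(source_params: dict, target_params: dict) -> None:
    # Strict equality: even one extra enum symbol on the target side
    # is treated as an incompatibility.
    if source_params != target_params:
        raise SchemaProjectorError(
            "Schema parameters not equal. "
            f"source parameters: {source_params} "
            f"and target parameters: {target_params}"
        )


v1 = enum_parameters("com.example.Color", ["RED", "GREEN"])
v2 = enum_parameters("com.example.Color", ["RED", "GREEN", "BLACK"])

check_parameters_equal(v2, v2)  # same schema version: passes silently

try:
    check_parameters_equal(v1, v2)  # v1 record projected onto the v2 schema
    projection_failed = False
except SchemaProjectorError as exc:
    projection_failed = True
    print(exc)
```

In this model, a v1 record can never be projected onto v2 once a symbol has been added, regardless of the compatibility level registered in Schema Registry.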