
Kafka Sink - KafkaCustomProducer - Error occurred while publishing You must configure() before serialize() or use serializer constructor with SchemaRegistryClient #6191

@couyangm

Description


Describe the bug
An error occurs when the Kafka sink is configured with the Avro serializer and a schema registry.

To Reproduce
Steps to reproduce the behavior:

  1. Start ZooKeeper, Kafka, and the Confluent Schema Registry.
  2. Create the topic and the schema.
  3. Configure the Kafka sink (example below).
  4. Start Data Prepper.
  5. Observe the error (example below).
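Step 2 can be done through the Schema Registry REST API. The sketch below only builds the registration request with Python's standard library and sends nothing. It assumes the registry's default TopicNameStrategy, under which the value schema for `test_avro_topic` is registered under the subject `test_avro_topic-value`, and it reuses the schema from test_schema.avsc.

```python
import json
import urllib.request

# Schema copied from test_schema.avsc in this report.
SCHEMA = {
    "type": "record",
    "namespace": "test_avro_topic",
    "name": "namevalue",
    "fields": [
        {"name": "name", "type": "string"},
        {"name": "value", "type": "string"},
    ],
}


def build_register_request(registry_url: str, topic: str, schema: dict) -> urllib.request.Request:
    """Build (but do not send) a POST /subjects/<subject>/versions request.

    Assumes the default TopicNameStrategy, so the value schema of a topic
    is registered under the subject "<topic>-value".
    """
    subject = f"{topic}-value"
    # The registry expects the Avro schema itself as a JSON-encoded string.
    body = json.dumps({"schema": json.dumps(schema)}).encode("utf-8")
    return urllib.request.Request(
        url=f"{registry_url.rstrip('/')}/subjects/{subject}/versions",
        data=body,
        headers={"Content-Type": "application/vnd.schemaregistry.v1+json"},
        method="POST",
    )


req = build_register_request("http://localhost:8081", "test_avro_topic", SCHEMA)
print(req.get_method(), req.full_url)
# Sending it would be: urllib.request.urlopen(req)
```

Registering the schema up front like this separates "the registry is reachable and has the schema" from the serializer error reported below.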

Expected behavior
The Kafka producer serializes each event with the Avro serializer, which retrieves the schema from the configured registry URL, and sends it to the topic.
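For reference, a Confluent-compatible Avro serializer is expected to emit the Confluent wire format: a zero magic byte, the 4-byte big-endian schema ID assigned by the registry, then the Avro binary encoding of the record. The sketch below hand-encodes the sample event against the `namevalue` schema to show what a successful send should contain; the schema ID `1` is a hypothetical value, since the real ID is assigned by the registry.

```python
import struct


def zigzag_varint(n: int) -> bytes:
    """Encode an Avro long: zigzag-map the signed value, then write a base-128 varint."""
    z = (n << 1) ^ (n >> 63)
    out = bytearray()
    while z & ~0x7F:
        out.append((z & 0x7F) | 0x80)
        z >>= 7
    out.append(z)
    return bytes(out)


def encode_string(s: str) -> bytes:
    # Avro strings are a length (as a long) followed by UTF-8 bytes.
    data = s.encode("utf-8")
    return zigzag_varint(len(data)) + data


def encode_namevalue(event: dict) -> bytes:
    # Record fields are written in schema order, with no names or separators.
    return encode_string(event["name"]) + encode_string(event["value"])


def confluent_frame(schema_id: int, avro_payload: bytes) -> bytes:
    # Magic byte 0, then the schema ID as a 4-byte big-endian integer.
    return struct.pack(">bI", 0, schema_id) + avro_payload


event = {"name": "test1", "value": "Hello, world!"}
message = confluent_frame(1, encode_namevalue(event))  # schema ID 1 is hypothetical
print(message.hex())
```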

Samples
pipelines/pipelines.yaml

test-pipeline:
  source:
    file:
      path: /usr/share/data-prepper/input.json.log
      record_type: event
      format: json
  sink:
  - kafka:
      bootstrap_servers:
      - "localhost:9092"
      partition_key: "/name"
      topic:
        name: test_avro_topic
        serde_format: avro
      serde_format: avro
      schema:
        registry_url: "http://localhost:8081"
        version: 1
        is_schema_create: true
        schema_file_location: /usr/share/data-prepper/test_schema.avsc
      producer_properties:
        partitioner_ignore_keys: true
      encryption:
        type: none

input.json.log
{ "name": "test1", "value": "Hello, world!" }

test_schema.avsc

{
  "type": "record",
  "namespace": "test_avro_topic",
  "name": "namevalue",
  "fields": [
    {
      "name": "name",
      "type": "string"
    },
    {
      "name": "value",
      "type": "string"
    }
  ]
}
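To rule out a mismatch between the sample event and the schema, a simplified check confirms that every schema field is present in the event with a compatible type. It handles only flat records with primitive field types and is not full Avro validation.

```python
import json

# Contents of test_schema.avsc and input.json.log from this report.
SCHEMA = {
    "type": "record",
    "namespace": "test_avro_topic",
    "name": "namevalue",
    "fields": [
        {"name": "name", "type": "string"},
        {"name": "value", "type": "string"},
    ],
}
EVENT = json.loads('{ "name": "test1", "value": "Hello, world!" }')

# Simplified mapping for primitive Avro types only; unions, logical types,
# and nested records are out of scope for this sketch.
PRIMITIVE_TYPES = {
    "string": str,
    "boolean": bool,
    "int": int,
    "long": int,
    "float": (int, float),
    "double": (int, float),
}


def schema_problems(schema: dict, event: dict) -> list:
    """Return human-readable mismatches between a flat record schema and an event."""
    problems = []
    for field in schema["fields"]:
        name, avro_type = field["name"], field["type"]
        expected = PRIMITIVE_TYPES.get(avro_type) if isinstance(avro_type, str) else None
        if expected is None:
            problems.append(f"{name}: type {avro_type!r} not handled by this sketch")
        elif name not in event:
            problems.append(f"{name}: missing from event")
        elif not isinstance(event[name], expected):
            problems.append(f"{name}: expected {avro_type}, got {type(event[name]).__name__}")
    return problems


print(schema_problems(SCHEMA, EVENT))  # an empty list means the event matches
```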

Log output

2025-10-21T01:22:38,115 [main] WARN  org.opensearch.dataprepper.core.pipeline.server.config.DataPrepperServerConfiguration - Creating data prepper server without authentication. This is not secure.
2025-10-21T01:22:38,115 [main] WARN  org.opensearch.dataprepper.core.pipeline.server.config.DataPrepperServerConfiguration - In order to set up Http Basic authentication for the data prepper server, go here: https://github.com/opensearch-project/data-prepper/blob/main/docs/core_apis.md#authentication
2025-10-21T01:22:38,156 [test-pipeline-sink-worker-2-thread-1] INFO  org.opensearch.dataprepper.plugins.source.file.FileSource - Starting file source with /usr/share/data-prepper/input.json.log path.
2025-10-21T01:22:38,162 [main] WARN  org.opensearch.dataprepper.core.pipeline.server.HttpServerProvider - Creating Data Prepper server without TLS. This is not secure.
2025-10-21T01:22:38,162 [main] WARN  org.opensearch.dataprepper.core.pipeline.server.HttpServerProvider - In order to set up TLS for the Data Prepper server, go here: https://github.com/opensearch-project/data-prepper/blob/main/docs/configuration.md#server-configuration
2025-10-21T01:22:38,182 [file-source] INFO  org.opensearch.dataprepper.plugins.source.file.FileSource - Completed reading file.
2025-10-21T01:22:41,250 [test-pipeline-sink-worker-2-thread-1] INFO  org.opensearch.dataprepper.plugins.kafka.util.SinkPropertyConfigurer - Producer properties
2025-10-21T01:22:41,250 [test-pipeline-sink-worker-2-thread-1] INFO  org.opensearch.dataprepper.plugins.kafka.util.SinkPropertyConfigurer - property socket.connection.setup.timeout.max.ms value30000
2025-10-21T01:22:41,250 [test-pipeline-sink-worker-2-thread-1] INFO  org.opensearch.dataprepper.plugins.kafka.util.SinkPropertyConfigurer - property metadata.max.age.ms value300000
2025-10-21T01:22:41,250 [test-pipeline-sink-worker-2-thread-1] INFO  org.opensearch.dataprepper.plugins.kafka.util.SinkPropertyConfigurer - property socket.connection.setup.timeout.ms value10000
2025-10-21T01:22:41,250 [test-pipeline-sink-worker-2-thread-1] INFO  org.opensearch.dataprepper.plugins.kafka.util.SinkPropertyConfigurer - property reconnect.backoff.max.ms value1000
2025-10-21T01:22:41,250 [test-pipeline-sink-worker-2-thread-1] INFO  org.opensearch.dataprepper.plugins.kafka.util.SinkPropertyConfigurer - property request.timeout.ms value30000
2025-10-21T01:22:41,251 [test-pipeline-sink-worker-2-thread-1] INFO  org.opensearch.dataprepper.plugins.kafka.util.SinkPropertyConfigurer - property reconnect.backoff.ms value50
2025-10-21T01:22:41,251 [test-pipeline-sink-worker-2-thread-1] INFO  org.opensearch.dataprepper.plugins.kafka.util.SinkPropertyConfigurer - property bootstrap.servers valuelocalhost:9092
2025-10-21T01:22:41,251 [test-pipeline-sink-worker-2-thread-1] INFO  org.opensearch.dataprepper.plugins.kafka.util.SinkPropertyConfigurer - property delivery.timeout.ms value120000
2025-10-21T01:22:41,251 [test-pipeline-sink-worker-2-thread-1] INFO  org.opensearch.dataprepper.plugins.kafka.util.SinkPropertyConfigurer - property receive.buffer.bytes value52428800
2025-10-21T01:22:41,251 [test-pipeline-sink-worker-2-thread-1] INFO  org.opensearch.dataprepper.plugins.kafka.util.SinkPropertyConfigurer - property retry.backoff.ms value100
2025-10-21T01:22:41,251 [test-pipeline-sink-worker-2-thread-1] INFO  org.opensearch.dataprepper.plugins.kafka.util.SinkPropertyConfigurer - property buffer.memory value52428800
2025-10-21T01:22:41,251 [test-pipeline-sink-worker-2-thread-1] INFO  org.opensearch.dataprepper.plugins.kafka.util.SinkPropertyConfigurer - property partitioner.adaptive.partitioning.enable valuefalse
2025-10-21T01:22:41,251 [test-pipeline-sink-worker-2-thread-1] INFO  org.opensearch.dataprepper.plugins.kafka.util.SinkPropertyConfigurer - property metadata.max.idle.ms value300000
2025-10-21T01:22:41,251 [test-pipeline-sink-worker-2-thread-1] INFO  org.opensearch.dataprepper.plugins.kafka.util.SinkPropertyConfigurer - property schema.registry.url valuehttp://localhost:8081
2025-10-21T01:22:41,251 [test-pipeline-sink-worker-2-thread-1] INFO  org.opensearch.dataprepper.plugins.kafka.util.SinkPropertyConfigurer - property retries value5
2025-10-21T01:22:41,251 [test-pipeline-sink-worker-2-thread-1] INFO  org.opensearch.dataprepper.plugins.kafka.util.SinkPropertyConfigurer - property max.request.size value1048576
2025-10-21T01:22:41,251 [test-pipeline-sink-worker-2-thread-1] INFO  org.opensearch.dataprepper.plugins.kafka.util.SinkPropertyConfigurer - property metrics.sample.window.ms value300000
2025-10-21T01:22:41,251 [test-pipeline-sink-worker-2-thread-1] INFO  org.opensearch.dataprepper.plugins.kafka.util.SinkPropertyConfigurer - property max.block.ms value60000
2025-10-21T01:22:41,252 [test-pipeline-sink-worker-2-thread-1] INFO  org.opensearch.dataprepper.plugins.kafka.util.SinkPropertyConfigurer - property connections.max.idle.ms value540000
2025-10-21T01:22:41,252 [test-pipeline-sink-worker-2-thread-1] INFO  org.opensearch.dataprepper.plugins.kafka.util.SinkPropertyConfigurer - property send.buffer.bytes value52428800
2025-10-21T01:22:41,252 [test-pipeline-sink-worker-2-thread-1] INFO  org.opensearch.dataprepper.plugins.kafka.util.SinkPropertyConfigurer - property session.timeout.ms value30000
2025-10-21T01:22:41,252 [test-pipeline-sink-worker-2-thread-1] INFO  org.opensearch.dataprepper.plugins.kafka.util.SinkPropertyConfigurer - property partitioner.ignore.keys valuetrue
2025-10-21T01:22:41,252 [test-pipeline-sink-worker-2-thread-1] INFO  org.opensearch.dataprepper.plugins.kafka.util.SinkPropertyConfigurer - Producer properties ends
2025-10-21T01:22:41,564 [pool-5-thread-1] ERROR org.opensearch.dataprepper.plugins.kafka.producer.KafkaCustomProducer - Error occurred while publishing You must configure() before serialize() or use serializer constructor with SchemaRegistryClient
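The message appears to come from Confluent's Avro serializer, which needs a schema registry client either from its constructor or from `configure()`. Kafka's producer calls `configure()` only on serializers it instantiates itself from the `key.serializer`/`value.serializer` class-name properties; a serializer instance passed directly to the `KafkaProducer` constructor is used as-is. One plausible explanation, not confirmed against the Data Prepper source, is that the sink hands the producer an unconfigured serializer instance even though `schema.registry.url` appears in the logged producer properties. The Python model below uses a hypothetical class (not Confluent's or Data Prepper's code) to show the two states.

```python
class RegistryBackedSerializer:
    """Hypothetical stand-in for a schema-registry-backed serializer."""

    def __init__(self, registry_client=None):
        # Passing a client to the constructor makes configure() unnecessary.
        self.registry_client = registry_client

    def configure(self, configs: dict, is_key: bool) -> None:
        # Without a constructor client, configure() is where the registry URL is read.
        url = configs.get("schema.registry.url")
        if url is None:
            raise ValueError("schema.registry.url is required")
        self.registry_client = {"url": url}  # placeholder for a real client

    def serialize(self, topic: str, record: dict) -> bytes:
        if self.registry_client is None:
            raise RuntimeError(
                "You must configure() before serialize() or use serializer "
                "constructor with SchemaRegistryClient"
            )
        return repr(record).encode("utf-8")  # placeholder for Avro encoding


event = {"name": "test1", "value": "Hello, world!"}

# Instance created but never configured: reproduces the reported message.
unconfigured = RegistryBackedSerializer()
try:
    unconfigured.serialize("test_avro_topic", event)
    outcome = "serialized"
except RuntimeError as err:
    outcome = str(err)
print(outcome)

# Same class after configure(): serialization proceeds.
configured = RegistryBackedSerializer()
configured.configure({"schema.registry.url": "http://localhost:8081"}, is_key=False)
payload = configured.serialize("test_avro_topic", event)
```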

Environment

  • OS: Ubuntu 24.04.3 LTS
  • Version: opensearch-data-prepper-2.13.0-SNAPSHOT-linux-x64 (also observed on 2.12.1)

Additional context
The documentation for this plugin is incomplete, so arriving at this configuration took considerable trial and error and reading of the source code. Any part of it could be wrong.

Metadata

  • Assignees: no one assigned
  • Type: no type
  • Projects: status Unplanned
  • Milestone: no milestone
  • Relationships: none yet
  • Development: no branches or pull requests