Schema Registry: Support custom schemaId framing #1688

@wolfchimneyrock

Description

Currently, the code in the schema_registry serdes that writes the schemaId into the payload is duplicated across the three supported serdes (Avro, Protobuf, and JSON) and is also hardcoded to the Confluent wire format.

There has been interest in the past in adding flexibility here (#679 and #1119). Now our organization is interested in deploying Apicurio Registry as a service (as a generic, not-just-Kafka solution) alongside some existing Confluent-compatible (Kafka-specific) usage.

It would be useful to be able to configure this in the serdes, both to add support for Apicurio's framing and to open this up to future customizations by others. My proposal, which I will open as a PR:

Interface:

The serdes classes get a new configuration option schemaid.location that accepts a callable; the callable returns a 2-tuple of functions that perform the reading and writing of the schemaId, respectively.

Two such callables are defined initially, confluent_payload_framing and apicurio_payload_framing, with the Confluent one as the default when the option isn't specified (to maintain backwards compatibility).

An example client wanting to use Apicurio framing:

from confluent_kafka.schema_registry import apicurio_payload_framing
from confluent_kafka.schema_registry.avro import AvroSerializer
...
avro_conf = {'schemaid.location': apicurio_payload_framing}
avro_serializer = AvroSerializer(schema_registry_client, schema_str, conf=avro_conf)

The simple contents of confluent_payload_framing (with the needed imports; the reader treats payload as a file-like object, matching the writer's fo):

import struct

from confluent_kafka.serialization import SerializationError

_MAGIC_BYTE = 0

def confluent_payload_framing(ctx):
    def reader(payload):
        # 1-byte magic + 4-byte big-endian unsigned schema ID
        header = payload.read(5)
        if len(header) != 5:
            raise SerializationError("Expecting data framing of 5 bytes or "
                                     "more but only {} bytes were read. This "
                                     "message was not produced with a Confluent "
                                     "Schema Registry serializer".format(len(header)))
        magic, schema_id = struct.unpack('>bI', header)
        if magic != _MAGIC_BYTE:
            raise SerializationError("Unexpected magic byte {}. This message "
                                     "was not produced with a Confluent "
                                     "Schema Registry serializer".format(magic))
        return schema_id

    def writer(fo, schema_id):
        fo.write(struct.pack('>bI', _MAGIC_BYTE, schema_id))

    return reader, writer
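For concreteness, the framing those two functions implement can be round-tripped by hand through a BytesIO buffer (a minimal self-contained check, with _MAGIC_BYTE stubbed in since the real constant lives inside the module):

```python
import io
import struct

_MAGIC_BYTE = 0  # stand-in for the module-level constant

# Round-trip the Confluent framing by hand: 1-byte magic followed by a
# 4-byte big-endian unsigned schema ID, which is what writer/reader do.
buf = io.BytesIO()
buf.write(struct.pack('>bI', _MAGIC_BYTE, 42))        # writer side
buf.seek(0)
magic, schema_id = struct.unpack('>bI', buf.read(5))  # reader side
assert magic == _MAGIC_BYTE and schema_id == 42
```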

Notice that ctx is passed in. This is to enable possible future support for locating the schemaId in Kafka headers.
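For comparison, apicurio_payload_framing could look like the following. This is only a sketch: it assumes Apicurio's default payload framing of a single magic byte followed by an 8-byte big-endian global ID ('>bq'), which should be verified against the Apicurio serdes, and SerializationError/_MAGIC_BYTE are stubbed in so the snippet is self-contained:

```python
import struct

_MAGIC_BYTE = 0  # stand-in; assumed shared with the Confluent framing


class SerializationError(Exception):
    """Stand-in for confluent_kafka.serialization.SerializationError."""


def apicurio_payload_framing(ctx):
    def reader(payload):
        # Assumed layout: 1-byte magic + 8-byte big-endian global ID
        header = payload.read(9)
        if len(header) != 9:
            raise SerializationError("Expecting data framing of 9 bytes or "
                                     "more but only {} bytes were "
                                     "read".format(len(header)))
        magic, schema_id = struct.unpack('>bq', header)
        if magic != _MAGIC_BYTE:
            raise SerializationError("Unexpected magic byte {}".format(magic))
        return schema_id

    def writer(fo, schema_id):
        fo.write(struct.pack('>bq', _MAGIC_BYTE, schema_id))

    return reader, writer
```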

As an alternative to returning a 2-tuple of callables, I could also make this return a class instance, but I thought the tuple was simpler.
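If the class form were preferred instead, the same contract might look like this (the name and method names are hypothetical, not part of the proposal; SerializationError and _MAGIC_BYTE are stubbed in for a self-contained sketch):

```python
import struct

_MAGIC_BYTE = 0  # stand-in for the module-level constant


class SerializationError(Exception):
    """Stand-in for confluent_kafka.serialization.SerializationError."""


class ConfluentPayloadFraming:
    """Hypothetical class-based equivalent of the 2-tuple of callables."""

    def __init__(self, ctx):
        self.ctx = ctx

    def read(self, payload):
        # 1-byte magic + 4-byte big-endian unsigned schema ID
        header = payload.read(5)
        if len(header) != 5:
            raise SerializationError(
                "Truncated framing: only {} bytes read".format(len(header)))
        magic, schema_id = struct.unpack('>bI', header)
        if magic != _MAGIC_BYTE:
            raise SerializationError(
                "Unexpected magic byte {}".format(magic))
        return schema_id

    def write(self, fo, schema_id):
        fo.write(struct.pack('>bI', _MAGIC_BYTE, schema_id))
```

A class would make it easier to carry shared state between reading and writing, at the cost of a slightly heavier interface than a pair of functions.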


Labels: component:schema-registry (any schema registry related issues rather than Kafka-isolated ones), investigate further (it's unclear what the issue is at this time but there is enough interest to look into it)
