feat(kafka): add franz-go protocol binding #1277

Open
akme wants to merge 1 commit into cloudevents:main from akme:main

Conversation

@akme

@akme akme commented Apr 17, 2026

Summary

Adds a new Kafka protocol binding based on franz-go to address the feature request in #1233.

This PR introduces a new module at protocol/kafka_franz/v2 with:

  • CloudEvents message decoding from kgo.Record
  • CloudEvents encoding into kgo.Record (a hand-rolled wire-format sketch follows this list)
  • franz-go based sender / receiver protocol support
  • topic override support via cecontext.WithTopic
  • message key support via WithMessageKey
  • ACK-driven offset commits for protocol-owned franz-go clients
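
For illustration, below is a hand-rolled sketch of the binary-mode encoding the binding automates. Header names follow the CloudEvents Kafka protocol binding (ce_-prefixed attributes, content-type for the data content type); the toRecord helper is an assumption for this sketch, not the encoder API actually added in protocol/kafka_franz/v2.

// Sketch only: hand-rolled binary-mode encoding of a CloudEvent into a kgo.Record.
// The real encoder in protocol/kafka_franz/v2 may differ; the kgo and sdk-go v2
// event APIs used here are real.
package main

import (
    "fmt"

    cloudevents "github.com/cloudevents/sdk-go/v2"
    "github.com/twmb/franz-go/pkg/kgo"
)

// toRecord maps a CloudEvent to a kgo.Record in binary content mode:
// attributes become ce_-prefixed headers, the event data becomes the record value.
func toRecord(e cloudevents.Event, topic string, key []byte) *kgo.Record {
    rec := &kgo.Record{Topic: topic, Key: key, Value: e.Data()}
    rec.Headers = append(rec.Headers,
        kgo.RecordHeader{Key: "ce_specversion", Value: []byte(e.SpecVersion())},
        kgo.RecordHeader{Key: "ce_id", Value: []byte(e.ID())},
        kgo.RecordHeader{Key: "ce_type", Value: []byte(e.Type())},
        kgo.RecordHeader{Key: "ce_source", Value: []byte(e.Source())},
        kgo.RecordHeader{Key: "content-type", Value: []byte(e.DataContentType())},
    )
    return rec
}

func main() {
    e := cloudevents.NewEvent()
    e.SetID("example-1")
    e.SetType("com.example.created")
    e.SetSource("example/source")
    _ = e.SetData(cloudevents.ApplicationJSON, map[string]string{"hello": "world"})

    rec := toRecord(e, "events", []byte("order-42"))
    fmt.Printf("topic=%s headers=%d value=%dB\n", rec.Topic, len(rec.Headers), len(rec.Value))
}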

Why

Many Go services already use franz-go as their Kafka client. Without a native binding, users have to maintain custom adapters to integrate franz-go with the CloudEvents SDK.

This adds a first-class franz-go transport alongside the existing Sarama and Confluent Kafka bindings.

Additional changes

  • Adds unit tests covering:
    • message encoding / decoding
    • protocol send behavior
    • ACK to commit behavior
    • fetch error handling
  • Updates protocol documentation to list the new franz-go Kafka binding

Validation

  • env GOCACHE=/tmp/go-build-cache go test ./... in protocol/kafka_franz/v2

@akme akme requested a review from a team as a code owner April 17, 2026 08:58
@embano1
Member

embano1 commented Apr 26, 2026

Hi, sorry for my delayed response. This would be the 3rd Kafka protocol implementation we'd have to maintain. I understand the motivation, but I was wondering whether we should then drop an existing protocol which is not maintained anymore. I'm a bit worried that adding more protocol implementations for Kafka further increases our dependency footprint and maintenance effort, especially given the need to guard against the increasing risk of supply chain attacks.

@duglin thoughts?

@duglin
Contributor

duglin commented May 1, 2026

I'm not an expert on Kafka, so help me understand why we need more than one. Isn't Kafka itself standardized?

And if each impl has slightly different "extras" that people are looking for, then perhaps the SDK needs to be designed so that the protocol-specific stuff isn't part of the SDK, letting each user use whatever they want w/o needing us to add support for it. Then the SDK moves to just: how to extract a CE from a blob of data / how to produce a CE blob (e.g. JSON, or provide call-backs that let the protocol-specific code iterate over the CE attributes so they can add the headers, or whatever the protocol needs).

I know it's a bit radical, but I'm not thrilled with shifting the job of "sending data over some protocol" into a CE SDK, like we seem to be doing today. To me, CE isn't a protocol - it's more like an add-on for a "protocol user". I'm not sure why a CE SDK became the "manager of the transport". Imagine if there was a secondary CE-type of spec that people wanted to support at the same time as CE for the same message... which one would be responsible for sending the message?

@akme
Author

akme commented May 1, 2026

Thanks for the discussion here.

I agree that adding support for every Kafka client library directly in this repo would be cumbersome and difficult to maintain.

At the same time, I think this PR highlights a real onboarding gap. Users who want to use CloudEvents with a transport or client library that is not directly supported still need to figure out quite a bit of binding and integration logic themselves.

So maybe the right direction is not first-class support for each Kafka library, but some lightweight integration layer, examples, or guidance that makes it easier to bring in different transports or clients without duplicating too much logic.

Also agree that client-library sprawl should be avoided, but I’d still like us to find a way to make onboarding custom transports easier.

@duglin
Contributor

duglin commented May 1, 2026

Yes an "integration" layer is more what I had in mind. E.g. utils to help with:

  • convert a JSON object (structured CE) into a CE
  • convert a CE into a structured JSON object (see the round-trip sketch after this list)
  • a func that, given a list of "headers"-like things and a binding name (e.g. kafka) so we know how the CE attributes are encoded (e.g. ce-* vs ce_*), invokes a call-back for each header/attribute so the app doesn't need to understand how the CE attributes are encoded.
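
For the first two utils, a minimal round-trip sketch; it assumes only that the sdk-go v2 event.Event type keeps its existing JSON (un)marshalers, which already produce and consume the structured-mode envelope:

// Sketch of the first two utils using existing sdk-go v2 types: event.Event
// already (un)marshals the structured-mode JSON envelope via encoding/json.
package main

import (
    "encoding/json"
    "fmt"
    "log"

    cloudevents "github.com/cloudevents/sdk-go/v2"
)

func main() {
    // Structured JSON blob -> CE
    blob := []byte(`{"specversion":"1.0","id":"1","type":"com.example.created","source":"example/source","data":{"hello":"world"}}`)
    var e cloudevents.Event
    if err := json.Unmarshal(blob, &e); err != nil {
        log.Fatal(err)
    }

    // CE -> structured JSON blob
    out, err := json.Marshal(e)
    if err != nil {
        log.Fatal(err)
    }
    fmt.Printf("decoded %s, re-encoded %d bytes\n", e.Type(), len(out))
}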

Yes, this means the app will need to know whether it's doing structured vs binary CE, but we might be able to help with that with a func like:

ceProcess(headers, body, protocol, attrCallback) []byte

where it'll examine the headers to determine (based on the protocol) whether it's binary or structured. Then call the attrCallback for each attr and return the "data" as the []byte array.
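
A rough sketch of what such a helper could look like; the signature, the header map representation, and the prefix table are all illustrative of the suggestion above, not existing SDK API:

// Hypothetical sketch of the ceProcess idea; nothing here is existing sdk-go API.
package main

import (
    "fmt"
    "strings"
)

// AttrCallback receives one CloudEvents attribute (name without prefix) at a time.
type AttrCallback func(name, value string)

// prefixFor maps a binding name to its attribute header prefix.
func prefixFor(protocol string) string {
    if protocol == "kafka" {
        return "ce_"
    }
    return "ce-" // e.g. http, amqp
}

// CEProcess inspects the headers to decide between structured and binary mode,
// invokes cb for each CE attribute found in the headers (binary mode), and
// returns the payload bytes the caller should treat as the event data.
func CEProcess(headers map[string]string, body []byte, protocol string, cb AttrCallback) []byte {
    // Structured mode: the whole body is the event envelope (e.g.
    // application/cloudevents+json); the caller parses it with the JSON util above.
    if strings.HasPrefix(headers["content-type"], "application/cloudevents+") {
        return body
    }
    // Binary mode: attributes live in prefixed headers, data is the body.
    prefix := prefixFor(protocol)
    for k, v := range headers {
        if strings.HasPrefix(k, prefix) {
            cb(strings.TrimPrefix(k, prefix), v)
        }
    }
    return body
}

func main() {
    headers := map[string]string{"ce_id": "1", "ce_type": "com.example.created", "content-type": "application/json"}
    data := CEProcess(headers, []byte(`{"hello":"world"}`), "kafka", func(name, value string) {
        fmt.Printf("attr %s=%s\n", name, value)
    })
    fmt.Printf("data: %s\n", data)
}

In this sketch, structured mode is detected purely from the content-type header; a real helper would likely also need per-protocol knowledge of where that content type lives.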

And we might be able to do something similar on the outbound side of things.
