-
Notifications
You must be signed in to change notification settings - Fork 1
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #1 from zillow/initial-port
Initial port
- Loading branch information
Showing
56 changed files
with
12,506 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,65 @@ | ||
name: Go | ||
|
||
on: | ||
push: | ||
branches: [main] | ||
tags: ['v*'] | ||
pull_request: | ||
branches: ['*'] | ||
|
||
permissions: | ||
contents: read | ||
|
||
jobs: | ||
build: | ||
runs-on: ubuntu-latest | ||
strategy: | ||
matrix: | ||
go: ["1.22.x"] | ||
include: | ||
- go: 1.22.x | ||
steps: | ||
- name: Checkout code | ||
uses: actions/checkout@v4 | ||
|
||
- name: Setup Go | ||
uses: actions/setup-go@v5 | ||
with: | ||
go-version: ${{ matrix.go }} | ||
cache-dependency-path: '**/go.sum' | ||
|
||
- name: Download Dependencies | ||
run: | | ||
go mod download | ||
- name: Test | ||
run: make cover | ||
|
||
- name: Upload coverage reports to Codecov | ||
uses: codecov/codecov-action@v4 | ||
with: | ||
token: ${{ secrets.CODECOV_TOKEN }} | ||
|
||
lint: | ||
name: Lint | ||
runs-on: ubuntu-latest | ||
|
||
steps: | ||
- uses: actions/checkout@v4 | ||
name: Check out repository | ||
- uses: actions/setup-go@v5 | ||
name: Set up Go | ||
with: | ||
go-version: 1.22.x | ||
cache: false # managed by golangci-lint | ||
|
||
- uses: golangci/golangci-lint-action@v6 | ||
name: Install golangci-lint | ||
with: | ||
version: latest | ||
# Hack: Use the official action to download, but not run. | ||
# make lint below will handle actually running the linter. | ||
args: --help | ||
|
||
- run: make lint | ||
name: Lint |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
Stewart Boyd <[email protected]> |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,16 @@ | ||
FROM golang:1.22 AS build | ||
|
||
ENV CGO_ENABLED=1 | ||
ENV GOPROXY=https://proxy.golang.org\|https://artifactory.zgtools.net/artifactory/api/go/devex-go\|direct | ||
ENV GONOSUMDB=*gitlab.zgtools.net* | ||
|
||
WORKDIR /go/src/zkafka | ||
COPY . . | ||
|
||
RUN go mod download | ||
RUN go build -o zkafka | ||
|
||
FROM debian | ||
COPY --from=build /go/src/zkafka / | ||
ENTRYPOINT ["/zkafka"] | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,23 @@ | ||
.PHONY: test-no-setup | ||
test-no-setup: | ||
./coverage.sh | ||
|
||
.PHONY: setup-test | ||
setup-test: | ||
docker compose -p $$RANDOM -f ./example/docker-compose.yaml up -d | ||
|
||
.PHONY: test-local | ||
test-local: setup-test test-no-setup | ||
|
||
.PHONY: cover | ||
cover: | ||
go test -v ./... -count=1 -coverprofile=cover.out -covermode atomic && \ | ||
go tool cover -html=cover.out -o cover.html | ||
|
||
.PHONY: example-producer | ||
example-producer: | ||
go run example/producer/producer.go | ||
|
||
.PHONY: example-worker | ||
example-worker: | ||
go run example/worker/worker.go |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,129 @@ | ||
# zkafka | ||
|
||
[data:image/s3,"s3://crabby-images/95837/95837cc59e0e3aa38e3ba7bd7a343f15816cfe5f" alt="License"](https://github.com/zillow/zkafka/blob/main/LICENSE) | ||
[data:image/s3,"s3://crabby-images/83888/83888bd459a40604aee084ae95b1e5eae0f70938" alt="GitHub Actions"](https://github.com/zillow/zkafka/actions/workflows/go.yml) | ||
[data:image/s3,"s3://crabby-images/bdc82/bdc821d8753425e266f6ca84dce34c4d8a237753" alt="Codecov"](https://codecov.io/gh/zillow/zkafka) | ||
|
||
|
||
## Install | ||
|
||
`go get -u github.com/zillow/zkafka` | ||
|
||
## About | ||
|
||
A library built on top of confluent-kafka-go for reading and writing to kafka with limited Schema Registry support. The | ||
library supports at least once message processing. It does so using a commit strategy built off auto commit and manual | ||
offset storage. | ||
|
||
--- | ||
**NOTE** | ||
|
||
confluent-kafka-go is a CGO module, and therefore so is zkafka. When building zkafka, make sure to set | ||
CGO_ENABLED=1. | ||
--- | ||
|
||
There are two quick definitions important to the understanding of the commit strategy | ||
|
||
1. **Commit** - involves communicating with kafka broker and durably persisting offsets on a kafka broker. | ||
2. **Store** - is the action of updating a local store of message offsets which will be persisted during the commit | ||
action | ||
|
||
## Commit Strategy: | ||
|
||
1. *Store* offset of a message for commit after processing | ||
2. *Commit* messages whose offsets have been stored at configurable intervals (`auto.commit.interval.ms`) | ||
3. *Commit* messages whose offsets have been stored when partitions are revoked | ||
(this is implicitly handled by librdkafka. To see this add debug=cgrp in ConsumerTopicConfig, and there'll be COMMIT logs after a rebalance. | ||
If doing this experience, set the `auto.commit.interval.ms` to a large value to avoid confusion between the rebalance commit) | ||
4. *Commit* messages whose offsets have been stored on close of reader | ||
(this is implicitly handled by librdkafka. To see this add debug=cgrp in ConsumerTopicConfig, and there'll be COMMIT logs after the client is closed, but before the client is destroyed) | ||
|
||
Errors returned on processing are still stored. This avoids issues due to poison pill messages (messages which will | ||
never be able to be processed without error) | ||
as well as transient errors blocking future message processing. Use WithOnDone option to register callback for | ||
additional processing of these messages. | ||
|
||
This strategy is based off | ||
of [Kafka Docs - Offset Management](https://docs.confluent.io/platform/current/clients/consumer.html#offset-management) | ||
where a strategy of asynchronous/synchronous commits is suggested to reduced duplicate messages. | ||
|
||
## Work | ||
|
||
zkafka also supports an abstraction built on top of the reader defined in the Work struct (`work.go`). Work introduces | ||
concurrency by way of the configurable option `Speedup(n int)`. This creates n goroutines which process messages as | ||
they are written to the golang channel assigned to that goroutine. Kafka key ordering is preserved (by a mechanism similar to kafka | ||
partitions) whereby a message sharing the same key will always be written to the same channel (internally, this is called a virtual partition). | ||
By default, the number of virtual partitions is equal 1. | ||
Speedup() can be increased beyond the number of assigned physical partitions without concern of data loss on account of the reader tracking in-work message offsets and only | ||
committing the lowest offset to be completed. Additionally, kafka key ordering is preserved even as the number of virtual partitions increases beyond the number of physical assigned | ||
partitions. | ||
|
||
## SchemaRegistry Support: | ||
|
||
There is limited support for schema registry in zkafka. A schemaID can be hardcoded via configuration. No | ||
communication is done with schema registry, but some primitive checks can be conducted if a schemaID is specified via | ||
configuration. | ||
|
||
### Producers | ||
|
||
Producers will include the schemaID in messages written to kafka (without any further verification). | ||
|
||
### Consumers | ||
|
||
Consumers will verify that the message they're consuming has the schemaID specified in configuration | ||
(if it's specified). Be careful here, as backwards compatible schema evolutions would be treated as an error condition | ||
as the new schemaID wouldn't match what's in the configuration. | ||
|
||
## Consumer/Producer Configuration | ||
|
||
See for description of configuration options and their defaults: | ||
|
||
1. [Consumer Configuration](https://docs.confluent.io/platform/current/installation/configuration/consumer-configs.html) | ||
2. [Producer Configurations](https://docs.confluent.io/platform/current/installation/configuration/producer-configs.html) | ||
|
||
These are primarily specified through the TopicConfig struct. TopicConfig includes strongly typed fields which translate | ||
to librdconfig values. To see translation see config.go. An escape hatch is provided for ad hoc config properties via | ||
the AdditionalProperties map. Here config values that don't have a strongly typed version in TopicConfig may be | ||
specified. Not all specified config values will work (for example `enable.auto.commit=false` would not work with this | ||
client because that value is explicitly set to true after reading of the AdditionalProperties map). | ||
|
||
```json5 | ||
|
||
{ | ||
"KafkaTopicConfig": { | ||
"Topic": "KafkaTopicName", | ||
"BootstrapServers": [ | ||
"localhost:9093" | ||
], | ||
// translates to librdkafka value "bootstrap.servers" | ||
// specify ad hoc configuration values which don't have a strongly typed version in the TopicConfig struct. | ||
"AdditionalProperties": { | ||
"auto.commit.interval.ms": 1000, | ||
"retry.backoff.ms": 10 | ||
} | ||
} | ||
} | ||
|
||
``` | ||
|
||
3. zkafka.ProcessError | ||
|
||
The `zkafka.ProcessError` can be used to control error handling on a per-message basis. Use of this type is entirely optional. The current options exposed through this type are as follows: | ||
1. `DisableDLTWrite`: if true, the message will not be written to a dead letter topic (if one is configured) | ||
2. `DisableCircuitBreaker`: if true, the message will not count as a failed message for purposes of controlling the circuit breaker. | ||
|
||
## Installation | ||
|
||
go get -u gitlab.zgtools.net/devex/archetypes/gomods/zkafka | ||
|
||
## Running Example | ||
|
||
``` | ||
make setup-test | ||
// <Terminal 1> | ||
make example-producer | ||
// <Terminal 2> | ||
make example-worker | ||
``` |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,57 @@ | ||
package zkafka | ||
|
||
import ( | ||
"github.com/confluentinc/confluent-kafka-go/v2/kafka" | ||
"go.opentelemetry.io/otel/propagation" | ||
) | ||
|
||
var _ propagation.TextMapCarrier = (*kMsgCarrier)(nil) | ||
var _ propagation.TextMapCarrier = (*msgCarrier)(nil) | ||
|
||
type kMsgCarrier struct { | ||
msg *kafka.Message | ||
} | ||
|
||
func (c *kMsgCarrier) Get(key string) string { | ||
for _, h := range c.msg.Headers { | ||
if h.Key == key { | ||
return string(h.Value) | ||
} | ||
} | ||
return "" | ||
} | ||
|
||
func (c *kMsgCarrier) Keys() []string { | ||
keys := make([]string, 0, len(c.msg.Headers)) | ||
for _, v := range c.msg.Headers { | ||
keys = append(keys, v.Key) | ||
} | ||
return keys | ||
} | ||
|
||
func (c *kMsgCarrier) Set(key, val string) { | ||
addStringAttribute(c.msg, key, []byte(val)) | ||
} | ||
|
||
type msgCarrier struct { | ||
msg *Message | ||
} | ||
|
||
func (c *msgCarrier) Get(key string) string { | ||
for k, v := range c.msg.Headers { | ||
if k == key { | ||
return string(v) | ||
} | ||
} | ||
return "" | ||
} | ||
|
||
func (c *msgCarrier) Keys() []string { | ||
keys := make([]string, 0, len(c.msg.Headers)) | ||
for k := range c.msg.Headers { | ||
keys = append(keys, k) | ||
} | ||
return keys | ||
} | ||
|
||
func (c *msgCarrier) Set(_, _ string) {} |
Oops, something went wrong.