diff --git a/.golangci.yml b/.golangci.yml index 31a7ca0..33a5523 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -16,15 +16,6 @@ linters-settings: - default gosimple: go: '1.17' - govet: - check-shadowing: true - settings: - printf: - funcs: - - (gitlab.zgtools.net/devex/archetypes/gomods/zlog.Logger).Debug - - (gitlab.zgtools.net/devex/archetypes/gomods/zlog.Logger).Info - - (gitlab.zgtools.net/devex/archetypes/gomods/zlog.Logger).Warn - - (gitlab.zgtools.net/devex/archetypes/gomods/zlog.Logger).Error depguard: rules: Main: diff --git a/Makefile b/Makefile index f64a146..1feb319 100644 --- a/Makefile +++ b/Makefile @@ -2,24 +2,35 @@ MODULE_DIRS = . -.PHONY: setup-test -setup-test: - docker compose -p $$RANDOM -f ./example/compose.yaml up -d - -.PHONY: test-local -test-local: setup-test cover +# Sets up kafka broker using docker compose +.PHONY: setup +setup: + docker compose -f ./example/compose.yaml up -d +# Assumes setup has been executed. Runs go test with coverage .PHONY: cover cover: - export GO_TAGS=--tags=integration; ./coverage.sh --tags=integration + export GO_TAGS=--tags=integration; ./coverage.sh + +# Runs setup and executes tests with coverage. +.PHONY: test-local +test-local: setup cover .PHONY: example-producer example-producer: - go run example/producer/producer.go + go run example/producer/main.go .PHONY: example-worker example-worker: - go run example/worker/worker.go + go run example/worker/main.go + +.PHONY: example-deadletter-worker +example-deadletter-worker: + go run example/worker-deadletter/main.go + +.PHONY: example-delay-worker +example-delay-worker: + go run example/worker-delay/main.go .PHONY: lint lint: golangci-lint diff --git a/README.md b/README.md index e36c098..3657f6b 100644 --- a/README.md +++ b/README.md @@ -11,119 +11,344 @@ ## About -A library built on top of confluent-kafka-go for reading and writing to kafka with limited Schema Registry support. The -library supports at least once message processing. It does so using a commit strategy built off auto commit and manual -offset storage. +`zkafka` is built to simplify message processing in Kafka. This library aims to minimize boilerplate code, allowing the developer to focus on writing the business logic for each Kafka message. `zkafka` takes care of various responsibilities, including: ---- -**NOTE** +1. Reading from the worker's configured topics +2. Managing message offsets reliably - Kafka offset management can be complex, but `zkafka` handles it. Developers only need to write code to process a single message and indicate whether or not it encountered an error. +3. Distributing messages to virtual partitions (details will be explained later) +4. Implementing dead lettering for failed messages +5. Providing inspectable and customizable behavior through lifecycle functions (Callbacks) - Developers can add metrics or logging at specific points in the message processing lifecycle. -confluent-kafka-go is a CGO module, and therefore so is zkafka. When building zkafka, make sure to set -CGO_ENABLED=1. ---- +`zkafka` provides stateless message processing semantics ( sometimes, called lambda message processing). +This is a churched-up way of saying, "You write code which executes on each message individually (without knowledge of other messages)". +It is purpose-built with this type of usage in mind. 
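+
+Lifecycle hooks (the callbacks mentioned in item 5 above) are registered as plain options when the work instance is created; work creation itself is walked through under Features below. A minimal sketch using the `PostFanout` hook (the `wf`, `topicConfig`, and `processor` variables are the ones built in that walkthrough, and `sync/atomic` is imported):
+
+```go
+	// Count fan-out passes; any metrics or logging could go here instead.
+	var fanOutCount atomic.Int64
+	work := wf.Create(topicConfig, processor,
+		zkafka.WithLifecycleHooks(zkafka.LifecycleHooks{
+			// Called after the reader attempts a fan-out of read messages to the virtual partitions.
+			PostFanout: func(ctx context.Context) {
+				fanOutCount.Add(1)
+			},
+		}),
+	)
+```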
Additionally, the worker implementation guarantees at least once processing (details of how that's achieved are shown in the [Commit Strategy](#commit-strategy) section).
-There are two quick definitions important to the understanding of the commit strategy
+**NOTE**:
+`zkafka` is built on top of [confluent-kafka-go](https://github.com/confluentinc/confluent-kafka-go),
+which is a CGO module. Therefore, so is `zkafka`. When building with `zkafka`, make sure to set CGO_ENABLED=1.
+
+### Features
+
+The following subsections detail some useful features. To make the following sections more accessible, there are runnable examples in the `./example` directory.
+The best way to learn is to experiment with the examples. Dive in!
+
+#### Stateless Message Processing
+
+`zkafka` makes stateless message processing easy. All you have to do is write a concrete `processor` implementation and wire it up (shown below).
+
+```go
+type processor interface {
+	Process(ctx context.Context, message *zkafka.Message) error
+}
+```
+If you want to skip ahead and see a working processor, check out the examples, specifically `example/worker/main.go`.
+
+The anatomy of that example is described here:
+
+A `zkafka.Client` needs to be created which can connect to the kafka broker. Typically, authentication
+information must also be specified at this point (today that would include username/password).
+
+```go
+	client := zkafka.NewClient(zkafka.Config{ BootstrapServers: []string{"localhost:29092"} })
+```
+
+Next, this client should be passed to create a `zkafka.WorkFactory` instance.
+The factory design, used by this library, adds a little boilerplate but allows default policies to be injected and
+propagated to all instantiated work instances. We find that useful at Zillow for transparently injecting the nuts and bolts
+of components that address our cross-cutting concerns (typically those revolving around telemetry).
+
+```go
+	wf := zkafka.NewWorkFactory(client)
+```
+
+Next, we create the work instance. This is where the dots begin to connect.
+`zkafka.Work` objects are responsible for continually polling the topics they've been instructed to listen to (the set of which is specified in the config object)
+and executing the specified code (defined in the user-controlled `processor` and `lifecycle` functions, not shown here).
+```go
+	topicConfig := zkafka.ConsumerTopicConfig{Topic: "my-topic", GroupID: "mygroup", ClientID: "myclient"}
+	// this implements the interface specified above and will be executed for each read message
+	processor := &Processor{}
+	work := wf.Create(topicConfig, processor)
+```
+
+All that's left now is to kick off the run loop (this will connect to the Kafka broker, create a Kafka consumer group, undergo consumer group assignment, and then begin polling for messages).
+The run loop executes a single reader (Kafka consumer) which reads messages and then fans them out to N processors (sized by the virtual partition pool size, described later).
+It's a processing pipeline with a reader at the front and processors at the back.
+
+The run loop takes two arguments, both responsible for signaling that the run loop should exit.
+
+1. A `context.Context` object. When this object is canceled, the internal
+   work loop begins an abrupt shutdown. This involves exiting the reader loop and processor loops immediately.
+
+2. A signal channel. Closing this channel tells zkafka to begin a graceful shutdown.
+   Graceful shutdown means the reader stops reading new messages, and the processors attempt to finish their in-flight work.
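+
+In practice, the shutdown channel is usually wired to OS signals, exactly as the bundled examples do. A minimal sketch (reusing the `work` instance created above, with `os`, `os/signal`, `syscall`, and `log` imported):
+
+```go
+	// Close the shutdown channel on SIGINT/SIGTERM so the worker can drain in-flight work.
+	stopCh := make(chan os.Signal, 1)
+	signal.Notify(stopCh, os.Interrupt, syscall.SIGTERM)
+	shutdown := make(chan struct{})
+	go func() {
+		<-stopCh
+		close(shutdown)
+	}()
+	if err := work.Run(context.Background(), shutdown); err != nil {
+		log.Panic(err)
+	}
+```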
+
+At Zillow, we deploy to a kubernetes cluster and use a strategy that relies on both
+mechanisms. When k8s indicates shutdown is imminent, we close the `shutdown` channel. Graceful
+shutdown is time-boxed, and if the deadline is reached, the outer `context` object
+is canceled, signaling a more aggressive teardown. The below example passes in a nil shutdown signal (which is valid).
+That's done for brevity in the readme; production use cases should take advantage of the shutdown channel, as in the sketch above and the runnable examples.
+
+```go
+	err = work.Run(context.Background(), nil)
+```
+
+#### Hyper Scalability
+
+`zkafka.Work` supports a concept called `virtual partitions`. This extends
+the Kafka `partition` concept. Message ordering is guaranteed within a Kafka partition,
+and the same holds true for a `virtual partition`. Every `zkafka.Work` object manages a pool
+of goroutines called processors (1 by default and controlled by the `zkafka.Speedup(n int)` option).
+Each processor reads from a goroutine channel called a `virtual partition`.
+When a message is read by the reader, it is assigned to one of the virtual partitions based on `hash(message.Key) % virtual partition count`.
+This follows the same mechanism used by Kafka. With this strategy, messages with the same key will be assigned
+to the same virtual partition.
+
+This allows for another layer of scalability. To increase throughput and maintain the same
+message ordering guarantees, there is no longer a need to increase the Kafka partition count (which can be operationally challenging).
+Instead, you can use `zkafka.Speedup()` to increase the virtual partition count.
+
+```shell
+// sets up Kafka broker locally
+make setup;
+// terminal 1. Starts producing messages. To juice up the production rate, remove the time.Sleep() in the producer and turn acks off.
+make example-producer
+// terminal 2. Starts a worker with speedup=5.
+make example-worker
+```
+
+#### Configurable Dead Letter Topics
+
+A `zkafka.Work` instance can be configured to write to a Dead Letter Topic (DLT) when message processing fails.
+This can be accomplished with the `zkafka.WithDeadLetterTopic()` option. Or, more conveniently, it can be controlled by adding
+a non-nil value to the `zkafka.ConsumerTopicConfig` `DeadLetterTopicConfig` field. Minimally, the topic name of the dead letter topic
+must be specified (when configured this way, no ClientID needs to be specified; the enclosing consumer topic config's client ID is used).
+
+```go
+	zkafka.ConsumerTopicConfig{
+		...
+		// When DeadLetterTopicConfig is specified, a dead letter topic will be configured and written to
+		// when a processing error occurs.
+		DeadLetterTopicConfig: &zkafka.ProducerTopicConfig{
+			Topic: "zkafka-example-deadletter-topic",
+		},
+	}
+```
+
+With the above configuration, failed messages are written to `zkafka-example-deadletter-topic` when a processing error occurs.
+
+To execute a local example of this pattern:
+
+```shell
+// sets up kafka broker locally
+make setup;
+// terminal 1. Starts producing messages (1 per second)
+make example-producer
+// terminal 2. Starts a worker which fails processing and writes to a DLT. Log statements show when messages
+// are written to a DLT
+make example-deadletter-worker
+```
+
+The returned processor error determines whether a message is written to a dead letter topic. In some situations,
+you might not want to route an error to a DLT. An example might be malformed data.
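+
+For instance, a processor might suppress the DLT write only for permanently malformed payloads while letting transient failures flow to the DLT. A sketch using the `zkafka.ProcessError` type described just below (`MyEvent` and `handle` are hypothetical):
+
+```go
+func (p Processor) Process(ctx context.Context, msg *zkafka.Message) error {
+	evt := MyEvent{}
+	if err := msg.Decode(&evt); err != nil {
+		// Malformed data will never decode successfully, so a DLT write adds no value.
+		return zkafka.ProcessError{Err: err, DisableDLTWrite: true}
+	}
+	if err := handle(ctx, evt); err != nil {
+		// Transient failures take the default path and are written to the configured DLT.
+		return err
+	}
+	return nil
+}
+```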
+
+You have control over this behavior by way of the `zkafka.ProcessError` type.
+
+```go
+	return zkafka.ProcessError{
+		Err:             err,
+		DisableDLTWrite: true,
+	}
+```
+
+The above returned error would skip writing to the DLT.
+
+#### Process Delay Workers
+
+Process Delay Workers can be an important piece of an automated retry policy. A simple example of this would be
+two workers daisy-chained together as follows:
+
+```go
+	workerConfig1 := zkafka.ConsumerTopicConfig{
+		ClientID: "svc1",
+		GroupID:  "grp1",
+		Topic:    "topicA",
+		// When DeadLetterTopicConfig is specified, a dead letter topic will be configured and written to
+		// when a processing error occurs.
+		DeadLetterTopicConfig: &zkafka.ProducerTopicConfig{
+			Topic: "topicB",
+		},
+	}
+
+	workerConfig2 := zkafka.ConsumerTopicConfig{
+		ClientID: "svc1",
+		GroupID:  "grp1",
+		Topic:    "topicB",
+		// When DeadLetterTopicConfig is specified, a dead letter topic will be configured and written to
+		// when a processing error occurs.
+		DeadLetterTopicConfig: &zkafka.ProducerTopicConfig{
+			Topic: "topicC",
+		},
+	}
+```
+
+With the above configuration, messages flow as follows:
+
+1. Worker1 reads from `topicA`
+2. If message processing fails, the message is written to `topicB` via the DLT configuration
+3. Worker2 reads from `topicB`
+4. If message processing fails, the message is written to `topicC` via the DLT configuration
+
+This creates a retry pipeline. The issue is that worker2 would ideally process on a delay (giving whatever transient error is occurring a chance to resolve).
+Luckily, `zkafka` supports such a pattern. By specifying `ProcessDelayMillis` in the config, a worker is created which delays processing of
+a read message until at least the configured delay has elapsed since the message was written.
+
+```go
+	topicConfig := zkafka.ConsumerTopicConfig{
+		...
+		// This value instructs the kafka worker to inspect the message timestamp, and not call the processor callback until
+		// at least the process delay duration has passed
+		ProcessDelayMillis: &processDelayMillis,
+	}
+```
+
+The time the worker actually waits varies. If the message is already old (maybe the worker had been stopped previously),
+the worker will detect that the time passed since the message was written already exceeds the delay. In such a case, it won't delay any further.
+
+To execute a local example of this pattern:
+
+```shell
+// sets up kafka broker locally
+make setup;
+// terminal 1. Starts producing messages (1 per second)
+make example-producer
+// terminal 2. Starts the delay processor. Prints out the duration since msg.Timestamp, i.e.
+// how long passed between when the message was written and when the process callback was executed.
+make example-delay-worker
+```
+
+### Commit Strategy:
+
+A `zkafka.Work`er commit strategy allows for at least once message processing.
+
+There are two quick definitions important to the understanding of the commit strategy:

1. **Commit** - involves communicating with kafka broker and durably persisting offsets on a kafka broker.
2. **Store** - is the action of updating a local store of message offsets which will be persisted during the commit action

-## Commit Strategy:
+The `zkafka.Work` instance will store message offsets as message processing concludes. Because the worker manages
+offset storage, the library sets `enable.auto.offset.store`=false.
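+
+In librdkafka terms, the consumer underlying a worker behaves roughly as if it were configured with the following properties (an illustrative sketch of the settings described in this section, not a complete configuration):
+
+```properties
+# offsets are committed in the background from the set of stored offsets
+enable.auto.commit=true
+# zkafka stores an offset itself, only after the message finishes processing
+enable.auto.offset.store=false
+# how often stored offsets are committed to the broker (default shown)
+auto.commit.interval.ms=5000
+```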
Additionally, the library offloads actually committing messages +to a background process managed by `librdkafka` (The frequency at which commits are communicated to the broker is controlled by `auto.commit.interval.ms`, default=5s). +Additionally, during rebalance events, explicit commits are executed. -1. *Store* offset of a message for commit after processing -2. *Commit* messages whose offsets have been stored at configurable intervals (`auto.commit.interval.ms`) -3. *Commit* messages whose offsets have been stored when partitions are revoked -(this is implicitly handled by librdkafka. To see this add debug=cgrp in ConsumerTopicConfig, and there'll be COMMIT logs after a rebalance. -If doing this experience, set the `auto.commit.interval.ms` to a large value to avoid confusion between the rebalance commit) -4. *Commit* messages whose offsets have been stored on close of reader -(this is implicitly handled by librdkafka. To see this add debug=cgrp in ConsumerTopicConfig, and there'll be COMMIT logs after the client is closed, but before the client is destroyed) +This strategy is based off of [Kafka Docs - Offset Management](https://docs.confluent.io/platform/current/clients/consumer.html#offset-management) +where a strategy of asynchronous/synchronous commits is suggested to reduce duplicate messages. -Errors returned on processing are still stored. This avoids issues due to poison pill messages (messages which will -never be able to be processed without error) -as well as transient errors blocking future message processing. Use WithOnDone option to register callback for -additional processing of these messages. +The above results in the following algorithm: -This strategy is based off -of [Kafka Docs - Offset Management](https://docs.confluent.io/platform/current/clients/consumer.html#offset-management) -where a strategy of asynchronous/synchronous commits is suggested to reduced duplicate messages. -## Work +1. Before message processing is started, an internal heap structure is used to track in-flight messages. +2. After message processing concludes, a heap structure managed by `zkafka` marks the message as complete (regardless of whether processing errored or not). +3. The inflight heap and the work completed heap are compared. Since offsets increase incrementally (by 1), it can be determined whether message processing + finished out of order. If the inflight heap's lowest offset is the same as the completed, then that message is safe to be **Stored**. This can be done repetitively + until the inflight heap is empty, or inflight messages haven't yet been marked as complete. -zkafka also supports an abstraction built on top of the reader defined in the Work struct (`work.go`). Work introduces -concurrency by way of the configurable option `Speedup(n int)`. This creates n goroutines which process messages as -they are written to the golang channel assigned to that goroutine. Kafka key ordering is preserved (by a mechanism similar to kafka -partitions) whereby a message sharing the same key will always be written to the same channel (internally, this is called a virtual partition). -By default, the number of virtual partitions is equal 1. -Speedup() can be increased beyond the number of assigned physical partitions without concern of data loss on account of the reader tracking in-work message offsets and only -committing the lowest offset to be completed. 
Additionally, kafka key ordering is preserved even as the number of virtual partitions increases beyond the number of physical assigned
-partitions.
+The remaining steps are implicitly handled by `librdkafka`:
+1. *Commit* messages whose offsets have been stored at configurable intervals (`auto.commit.interval.ms`)
+2. *Commit* messages whose offsets have been stored when partitions are revoked
+   (this is implicitly handled by `librdkafka`. To see this add debug=cgrp in ConsumerTopicConfig, and there'll be COMMIT logs after a rebalance.
+   If doing this experiment, set `auto.commit.interval.ms` to a large value to avoid confusing the rebalance commit with the interval commit)
+3. *Commit* messages whose offsets have been stored on close of reader
+   (this is implicitly handled by `librdkafka`. To see this add debug=cgrp in ConsumerTopicConfig, and there'll be COMMIT logs after the client is closed, but before the client is destroyed)
-## SchemaRegistry Support:
+Errors returned on processing are still stored. This avoids issues due to poison pill messages (messages that will
+never be able to be processed without error)
+as well as transient errors blocking future message processing. Use dead lettering to sequester these failed messages, or use the `WithOnDone()` option to register a callback for
+special processing of these messages.
+
+
+### SchemaRegistry Support:

There is limited support for schema registry in zkafka. A schemaID can be hardcoded via configuration. No
communication is done with schema registry, but some primitive checks can be conducted if a schemaID is specified via
configuration.

-### Producers
+Below is a breakdown of schema registry interactions into two subcategories. One is `Raw Handling`, where the configurable
+formatter is bypassed entirely in favor of operating on the value byte arrays directly. The other is `Native Support`, which
+attempts to create Confluent-compatible serializations without communicating with schema registry directly.
-Producers will include the schemaID in messages written to kafka (without any further verification).
+#### Producers
-### Consumers
+##### Raw Handling
-Consumers will verify that the message they're consuming has the schemaID specified in configuration
-(if it's specified). Be careful here, as backwards compatible schema evolutions would be treated as an error condition
-as the new schemaID wouldn't match what's in the configuration.
+For a producer, this would involve using the `zkafka.Writer.WriteRaw()` method, which takes in a byte array directly.
-## Consumer/Producer Configuration
+##### Native Support
-See for description of configuration options and their defaults:
+Producers will include the schemaID in messages written to kafka (without any further verification).
-1. [Consumer Configuration](https://docs.confluent.io/platform/current/installation/configuration/consumer-configs.html)
-2. [Producer Configurations](https://docs.confluent.io/platform/current/installation/configuration/producer-configs.html)
+#### Consumers
-These are primarily specified through the TopicConfig struct. TopicConfig includes strongly typed fields which translate
-to librdconfig values. To see translation see config.go. An escape hatch is provided for ad hoc config properties via
-the AdditionalProperties map. Here config values that don't have a strongly typed version in TopicConfig may be
-specified.
Not all specified config values will work (for example `enable.auto.commit=false` would not work with this -client because that value is explicitly set to true after reading of the AdditionalProperties map). +##### Raw Handling -```json5 - -{ - "KafkaTopicConfig": { - "Topic": "KafkaTopicName", - "BootstrapServers": [ - "localhost:9092" - ], - // translates to librdkafka value "bootstrap.servers" - // specify ad hoc configuration values which don't have a strongly typed version in the TopicConfig struct. - "AdditionalProperties": { - "auto.commit.interval.ms": 1000, - "retry.backoff.ms": 10 - } - } -} +For a consumer, this would involve accessing the value byte array through the `zkafka.Message.Value()` method. +```go +type Processor struct{} +func (p Processor) Process(_ context.Context, msg *zkafka.Message) error { + e := MyEvent{} + // The Decode method uses the configured formatter and the configured schema registry ID. + //err := msg.Decode(&e) + // For schema registry, however, it might be better to bypass the configurable formatters, and deserialize the data by accessing the byte array directly + // The below function is a hypothetical function that inspects the data in in the kafka message's value and communicates with schema registry for verification + myUnmarshallFunction(msg.Value(), &e) + ... +} ``` -3. zkafka.ProcessError - -The `zkafka.ProcessError` can be used to control error handling on a per-message basis. Use of this type is entirely optional. The current options exposed through this type are as follows: -1. `DisableDLTWrite`: if true, the message will not be written to a dead letter topic (if one is configured) -2. `DisableCircuitBreaker`: if true, the message will not count as a failed message for purposes of controlling the circuit breaker. +##### Native Support +Consumers will verify that the message they're consuming has the schemaID specified in the configuration +(if it's specified). Be careful here, as backwards compatible schema evolutions would be treated as an error condition +as the new schemaID wouldn't match what's in the configuration. -## Installation +### Consumer/Producer Configuration -go get -u gitlab.zgtools.net/devex/archetypes/gomods/zkafka +See for description of configuration options and their defaults: -## Running Example +1. [Librdkafka Configuration](https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md) +2. [Consumer Configuration](https://docs.confluent.io/platform/current/installation/configuration/consumer-configs.html) +3. [Producer Configurations](https://docs.confluent.io/platform/current/installation/configuration/producer-configs.html) -``` -make setup-test +These are primarily specified through the TopicConfig structs (`ProducerTopicConfig` and `ConsumerTopicConfig`). +TopicConfigs includes strongly typed fields that translate +to librdconfig values. To see translation see `config.go`. An escape hatch is provided for ad hoc config properties via +the AdditionalProperties map. Here config values that don't have a strongly typed version in TopicConfig may be +specified. Not all specified config values will work (for example `enable.auto.commit=false` would not work with this +client because that value is explicitly set to true after reading of the AdditionalProperties map). 
-// -make example-producer +```go +deliveryTimeoutMS := 100 +enableIdempotence := false +requiredAcks := "0" + +pcfg := ProducerTopicConfig{ + ClientID: "myclientid", + Topic: "mytopic", + DeliveryTimeoutMs: &deliveryTimeoutMS, + EnableIdempotence: &enableIdempotence, + RequestRequiredAcks: &requiredAcks, + AdditionalProps: map[string]any{ + "linger.ms": float64(5), + }, +} -// -make example-worker +ccfg := ConsumerTopicConfig{ + ClientID: "myclientid2", + GroupID: "mygroup", + Topic: "mytopic", + AdditionalProps: map[string]any{ + "auto.commit.interval.ms": float32(20), + }, +} ``` diff --git a/coverage.sh b/coverage.sh index 03ec218..ee8252c 100755 --- a/coverage.sh +++ b/coverage.sh @@ -1,6 +1,7 @@ #!/usr/bin/env bash set -x +# allows for GO test args to be passed in (Specifically added to control whether or not to pass in `--tags=integration`). go_tags=$GO_TAGS go_tags="${go_tags:---tags=unit}" diff --git a/example/consumer/consumer.go b/example/consumer/consumer.go deleted file mode 100644 index bb94796..0000000 --- a/example/consumer/consumer.go +++ /dev/null @@ -1,75 +0,0 @@ -package main - -import ( - "context" - "fmt" - "log" - - "github.com/zillow/zfmt" - "github.com/zillow/zkafka" -) - -func main() { - // configure broker connectivity options with zkafka.Config - cfg := zkafka.Config{ - BootstrapServers: []string{"localhost:9092"}, - } - - // configure consumer options with zkafka.ConsumerTopicConfig. See zkafka for full option values - topicConfig := zkafka.ConsumerTopicConfig{ - ClientID: "xxx", - GroupID: "golang-example-consumer-3", - Topic: "two-multi-partition", - // defaults to 1 second. Use 10 seconds for example to give time to establish connection - ReadTimeoutMillis: ptr(10000), - // Specify the formatter used to deserialize the contents of kafka message - Formatter: zfmt.JSONFmt, - AdditionalProps: map[string]any{ - "auto.offset.reset": "earliest", - }, - } - - ctx := context.Background() - // Create a reader using Config and ConsumerTopicConfig - reader, err := zkafka.NewClient(cfg).Reader(ctx, topicConfig) - if err != nil { - log.Fatal(err) - } - for { - // Poll for 1 message for up to 10 seconds. Return nil if no messages available. - // To continually poll a kafka message, we suggest using zkafka in conjunction with zwork. This offers a - // good mechanism for processing messages using stateless messaging semantics using golang - msg, err := reader.Read(ctx) - if err != nil { - log.Fatal(err) - } - if msg == nil { - log.Printf("No messages available") - return - } - - // zkafka.Message (the type of msg variable) wraps a kafka message exposing higher level APIs for interacting with the data. - // This includes a decode method for easily - // deserializing the kafka value byte array. In this case, we're using the JSONFmt (specified in TopicConfig). - item := DummyEvent{} - if err = msg.Decode(&item); err != nil { - log.Fatal(err) - } - - // print out the contents of kafka message used to hydrate the DummyEvent struct - fmt.Printf("dummy event %+v\n", item) - - // call msg.Done to commit the work with the kafka broker - msg.Done() - } -} - -// DummyEvent is a deserializable struct for producing/consuming kafka message values. 
-type DummyEvent struct { - Name string `json:"name"` - Age int `json:"age"` -} - -func ptr[T any](v T) *T { - return &v -} diff --git a/example/producer/producer.go b/example/producer/main.go similarity index 77% rename from example/producer/producer.go rename to example/producer/main.go index 13c0cdf..94cfe77 100644 --- a/example/producer/producer.go +++ b/example/producer/main.go @@ -6,7 +6,6 @@ import ( "math/rand" "time" - "github.com/google/uuid" "github.com/zillow/zfmt" "github.com/zillow/zkafka" ) @@ -14,18 +13,19 @@ import ( func main() { ctx := context.Background() writer, err := zkafka.NewClient(zkafka.Config{ - BootstrapServers: []string{"localhost:9092"}, + BootstrapServers: []string{"localhost:29092"}, }).Writer(ctx, zkafka.ProducerTopicConfig{ ClientID: "example", - Topic: "two-multi-partition", + Topic: "zkafka-example-topic", Formatter: zfmt.JSONFmt, }) + randomNames := []string{"stewy", "lydia", "asif", "mike", "justin"} if err != nil { log.Panic(err) } for { event := DummyEvent{ - Name: uuid.NewString(), + Name: randomNames[rand.Intn(len(randomNames))], Age: rand.Intn(100), } diff --git a/example/worker-deadletter/main.go b/example/worker-deadletter/main.go new file mode 100644 index 0000000..697eebc --- /dev/null +++ b/example/worker-deadletter/main.go @@ -0,0 +1,106 @@ +package main + +import ( + "context" + "errors" + "fmt" + "log" + "os" + "os/signal" + "syscall" + "time" + + "github.com/zillow/zkafka" +) + +// Demonstrates reading from a topic via the zkafka.Work struct which is more convenient, typically, than using the consumer directly +func main() { + ctx := context.Background() + client := zkafka.NewClient(zkafka.Config{ + BootstrapServers: []string{"localhost:29092"}, + }, + zkafka.LoggerOption(stdLogger{}), + ) + // It's important to close the client after consumption to gracefully leave the consumer group + // (this commits completed work, and informs the broker that this consumer is leaving the group which yields a faster rebalance) + defer client.Close() + + readTimeoutMillis := 10000 + topicConfig := zkafka.ConsumerTopicConfig{ + // ClientID is used for caching inside zkafka, and observability within streamz dashboards. But it's not an important + // part of consumer group semantics. A typical convention is to use the service name executing the kafka worker + ClientID: "service-name", + // GroupID is the consumer group. If multiple instances of the same consumer group read messages for the same + // topic the topic's partitions will be split between the collection. The broker remembers + // what offset has been committed for a consumer group, and therefore work can be picked up where it was left off + // across releases + GroupID: "zkafka/example/example-consumer", + Topic: "zkafka-example-topic", + // Controls how long ReadMessage() wait in work before returning a nil message. The default is 1s, but is increased in this example + // to reduce the number of debug logs which come when a nil message is returned + ReadTimeoutMillis: &readTimeoutMillis, + // When DeadLetterTopicConfig is specified a dead letter topic will be configured and written to + // when a processing error occurs. + DeadLetterTopicConfig: &zkafka.ProducerTopicConfig{ + Topic: "zkafka-example-deadletter-topic", + }, + AdditionalProps: map[string]any{ + // only important the first time a consumer group connects. + "auto.offset.reset": "earliest", + }, + } + // optionally set up a channel to signal when worker shutdown should occur. 
+ // A nil channel is also acceptable, but this example demonstrates how to make utility of the signal. + // The channel should be closed, instead of simply written to, to properly broadcast to the separate worker threads. + stopCh := make(chan os.Signal, 1) + signal.Notify(stopCh, os.Interrupt, syscall.SIGTERM) + shutdown := make(chan struct{}) + + go func() { + <-stopCh + close(shutdown) + }() + + wf := zkafka.NewWorkFactory(client) + // Register a processor which is executed per message. + work := wf.Create(topicConfig, &Processor{}) + if err := work.Run(ctx, shutdown); err != nil { + log.Panic(err) + } +} + +type Processor struct{} + +func (p Processor) Process(_ context.Context, msg *zkafka.Message) error { + // Processing errors result in the message being written to the configured dead letter topic (DLT). + // Any error object works, but finer grained controlled cn be accomplished by returning a + // `zkafka.ProcessError`. In this example, we control the behavior of the circuit breaker and can optionally + // skip writing the DLT (this example doesn't opt to do that) + // + // Because debug logging is on, the producer log (for when a message is written to the DLT) will show in std out + return zkafka.ProcessError{ + Err: errors.New("processing failed"), + DisableCircuitBreak: true, + DisableDLTWrite: false, + } +} + +type stdLogger struct { +} + +func (l stdLogger) Debugw(_ context.Context, msg string, keysAndValues ...interface{}) { + log.Printf("Debugw-"+msg, keysAndValues...) +} + +func (l stdLogger) Infow(_ context.Context, msg string, keysAndValues ...interface{}) { + log.Printf("Infow-"+msg, keysAndValues...) +} + +func (l stdLogger) Errorw(_ context.Context, msg string, keysAndValues ...interface{}) { + log.Printf("Errorw-"+msg, keysAndValues...) +} + +func (l stdLogger) Warnw(_ context.Context, msg string, keysAndValues ...interface{}) { + prefix := fmt.Sprintf("Warnw-%s-"+msg, time.Now().Format(time.RFC3339Nano)) + log.Printf(prefix, keysAndValues...) +} diff --git a/example/worker-delay/main.go b/example/worker-delay/main.go new file mode 100644 index 0000000..4725233 --- /dev/null +++ b/example/worker-delay/main.go @@ -0,0 +1,68 @@ +package main + +import ( + "context" + "log" + "os" + "os/signal" + "syscall" + "time" + + "github.com/zillow/zkafka" +) + +// Demonstrates reading from a topic via the zkafka.Work struct which is more convenient, typically, than using the consumer directly +func main() { + ctx := context.Background() + client := zkafka.NewClient(zkafka.Config{ + BootstrapServers: []string{"localhost:29092"}, + }, + // optionally add a logger, which implements zkafka.Logger, to see detailed information about message processsing + //zkafka.LoggerOption(), + ) + // It's important to close the client after consumption to gracefully leave the consumer group + // (this commits completed work, and informs the broker that this consumer is leaving the group which yields a faster rebalance) + defer client.Close() + + processDelayMillis := 10 * 1000 + topicConfig := zkafka.ConsumerTopicConfig{ + // ClientID is used for caching inside zkafka, and observability within streamz dashboards. But it's not an important + // part of consumer group semantics. A typical convention is to use the service name executing the kafka worker + ClientID: "service-name", + // GroupID is the consumer group. If multiple instances of the same consumer group read messages for the same + // topic the topic's partitions will be split between the collection. 
The broker remembers + // what offset has been committed for a consumer group, and therefore work can be picked up where it was left off + // across releases + GroupID: "zkafka/example/example-consumer", + Topic: "zkafka-example-topic", + // This value instructs the kafka worker to inspect the message timestamp, and not call the processor call back until + // at least the process delay duration has passed + ProcessDelayMillis: &processDelayMillis, + } + // optionally set up a channel to signal when worker shutdown should occur. + // A nil channel is also acceptable, but this example demonstrates how to make utility of the signal. + // The channel should be closed, instead of simply written to, to properly broadcast to the separate worker threads. + stopCh := make(chan os.Signal, 1) + signal.Notify(stopCh, os.Interrupt, syscall.SIGTERM) + shutdown := make(chan struct{}) + + go func() { + <-stopCh + close(shutdown) + }() + + wf := zkafka.NewWorkFactory(client) + // Register a processor which is executed per message. + // Speedup is used to create multiple processor goroutines. Order is still maintained with this setup by way of `virtual partitions` + work := wf.Create(topicConfig, &Processor{}, zkafka.Speedup(5)) + if err := work.Run(ctx, shutdown); err != nil { + log.Panic(err) + } +} + +type Processor struct{} + +func (p Processor) Process(_ context.Context, msg *zkafka.Message) error { + log.Printf(" offset: %d, partition: %d. Time since msg.Timestamp %s", msg.Offset, msg.Partition, time.Since(msg.TimeStamp)) + return nil +} diff --git a/example/worker/worker.go b/example/worker/main.go similarity index 78% rename from example/worker/worker.go rename to example/worker/main.go index 0c3125b..f505582 100644 --- a/example/worker/worker.go +++ b/example/worker/main.go @@ -1,7 +1,6 @@ package main import ( - "bytes" "context" "log" "os" @@ -17,9 +16,10 @@ import ( func main() { ctx := context.Background() client := zkafka.NewClient(zkafka.Config{ - BootstrapServers: []string{"localhost:9092"}, + BootstrapServers: []string{"localhost:29092"}, }, - zkafka.LoggerOption(stdLogger{}), + // optionally add a logger, which implements zkafka.Logger, to see detailed information about message processsing + //zkafka.LoggerOption(), ) // It's important to close the client after consumption to gracefully leave the consumer group // (this commits completed work, and informs the broker that this consumer is leaving the group which yields a faster rebalance) @@ -33,12 +33,12 @@ func main() { // topic the topic's partitions will be split between the collection. The broker remembers // what offset has been committed for a consumer group, and therefore work can be picked up where it was left off // across releases - GroupID: "concierge/example/example-consumery", - Topic: "two-multi-partition", + GroupID: "zkafka/example/example-consumer", + Topic: "zkafka-example-topic", // The formatter is registered internally to the `zkafka.Message` and used when calling `msg.Decode()` // string fmt can be used for both binary and pure strings encoded in the value field of the kafka message. Other options include // json, proto, avro, etc. - Formatter: zfmt.StringFmt, + Formatter: zfmt.JSONFmt, AdditionalProps: map[string]any{ // only important the first time a consumer group connects. 
Subsequent connections will start // consuming messages @@ -71,8 +71,8 @@ type Processor struct{} func (p Processor) Process(_ context.Context, msg *zkafka.Message) error { // sleep to simulate random amount of work time.Sleep(100 * time.Millisecond) - var buf bytes.Buffer - err := msg.Decode(&buf) + event := DummyEvent{} + err := msg.Decode(&event) if err != nil { return err } @@ -82,24 +82,12 @@ func (p Processor) Process(_ context.Context, msg *zkafka.Message) error { //data := msg.Value() //str := string(data) - log.Printf("message: %s, offset: %d, partition: %d \n", buf.String(), msg.Offset, msg.Partition) + log.Printf(" offset: %d, partition: %d. event.Name: %s, event.Age %d\n", msg.Offset, msg.Partition, event.Name, event.Age) return nil } -type stdLogger struct{} - -func (l stdLogger) Debugw(_ context.Context, msg string, keysAndValues ...any) { - log.Printf("Debugw-"+msg, keysAndValues...) -} - -func (l stdLogger) Infow(_ context.Context, msg string, keysAndValues ...any) { - log.Printf("Infow-"+msg, keysAndValues...) -} - -func (l stdLogger) Errorw(_ context.Context, msg string, keysAndValues ...any) { - log.Printf("Errorw-"+msg, keysAndValues...) -} - -func (l stdLogger) Warnw(_ context.Context, msg string, keysAndValues ...any) { - log.Printf("Warnw-"+msg, keysAndValues...) +// DummyEvent is a deserializable struct for producing/consuming kafka message values. +type DummyEvent struct { + Name string `json:"name"` + Age int `json:"age"` } diff --git a/lifecycle.go b/lifecycle.go index ce8f12a..3e5d414 100644 --- a/lifecycle.go +++ b/lifecycle.go @@ -1,7 +1,5 @@ package zkafka -//go:generate mockgen -package=mock_zkafka -destination=./mocks/mock_lifecycle.go -source=./lifecycle.go - import ( "context" "errors" @@ -64,7 +62,7 @@ type LifecycleHooks struct { // Called prior to executing write operation PreWrite func(ctx context.Context, meta LifecyclePreWriteMeta) (LifecyclePreWriteResp, error) - // Call after the reader attempts a fanout call. + // Call after the reader attempts a fanOut call. PostFanout func(ctx context.Context) } diff --git a/mocks/mock_lifecycle.go b/mocks/mock_lifecycle.go deleted file mode 100644 index d7d006e..0000000 --- a/mocks/mock_lifecycle.go +++ /dev/null @@ -1,5 +0,0 @@ -// Code generated by MockGen. DO NOT EDIT. -// Source: ./lifecycle.go - -// Package mock_zkafka is a generated GoMock package. 
-package mock_zkafka diff --git a/test/integration_test.go b/test/integration_test.go index 53d1c94..83bc3e9 100644 --- a/test/integration_test.go +++ b/test/integration_test.go @@ -1204,7 +1204,7 @@ func createTopic(t *testing.T, bootstrapServer, topic string, partitions int) { func getBootstrap() string { bootstrapServer, ok := os.LookupEnv("KAFKA_BOOTSTRAP_SERVER") if !ok { - bootstrapServer = "localhost:9092" // local development + bootstrapServer = "localhost:29092" // local development } return bootstrapServer } diff --git a/test/worker_test.go b/test/worker_test.go index 7000947..7aeabab 100644 --- a/test/worker_test.go +++ b/test/worker_test.go @@ -44,11 +44,11 @@ func TestWork_Run_FailsWithLogsWhenFailedToGetReader(t *testing.T) { cp.EXPECT().Reader(gomock.Any(), gomock.Any()).Times(1).Return(nil, errors.New("no kafka client reader created")).MinTimes(1) kwf := zkafka.NewWorkFactory(cp, zkafka.WithLogger(l)) - fanoutCount := atomic.Int64{} + fanOutCount := atomic.Int64{} w := kwf.Create(zkafka.ConsumerTopicConfig{Topic: topicName}, &fakeProcessor{}, zkafka.WithLifecycleHooks(zkafka.LifecycleHooks{PostFanout: func(ctx context.Context) { - fanoutCount.Add(1) + fanOutCount.Add(1) }})) ctx, cancel := context.WithCancel(ctx) @@ -59,7 +59,7 @@ func TestWork_Run_FailsWithLogsWhenFailedToGetReader(t *testing.T) { }) pollWait(func() bool { - return fanoutCount.Load() >= 1 + return fanOutCount.Load() >= 1 }, pollOpts{ exit: cancel, timeoutExit: func() { @@ -1408,7 +1408,7 @@ func TestWork_CircuitBreaker_WithoutBusyLoopBreaker_DoesNotWaitsForCircuitToOpen l := stdLogger{includeDebug: true} kwf := zkafka.NewWorkFactory(kcp, zkafka.WithLogger(l)) - fanoutCount := atomic.Int64{} + fanOutCount := atomic.Int64{} processorCount := atomic.Int64{} w := kwf.Create( zkafka.ConsumerTopicConfig{Topic: topicName}, @@ -1420,7 +1420,7 @@ func TestWork_CircuitBreaker_WithoutBusyLoopBreaker_DoesNotWaitsForCircuitToOpen }, zkafka.DisableBusyLoopBreaker(), zkafka.WithLifecycleHooks(zkafka.LifecycleHooks{PostFanout: func(ctx context.Context) { - fanoutCount.Add(1) + fanOutCount.Add(1) }}), zkafka.CircuitBreakAfter(1), zkafka.CircuitBreakFor(10*time.Second), @@ -1435,16 +1435,16 @@ func TestWork_CircuitBreaker_WithoutBusyLoopBreaker_DoesNotWaitsForCircuitToOpen }) pollWait(func() bool { - return fanoutCount.Load() >= 100 + return fanOutCount.Load() >= 100 }, pollOpts{ exit: cancel, timeoutExit: func() { - require.Failf(t, "Timed out during poll", "Fanout Count %d", fanoutCount.Load()) + require.Failf(t, "Timed out during poll", "Fanout Count %d", fanOutCount.Load()) }, maxWait: 10 * time.Second, }) require.LessOrEqual(t, processorCount.Load(), int64(2), "circuit breaker should prevent processor from being called after circuit break opens, since circuit breaker won't close again until after test completes. At most two messages are read prior to circuit breaker opening") - require.LessOrEqual(t, time.Since(start), time.Second, "without busy loop breaker we expect fanout to called rapidly. Circuit break is open for 10 seconds. So asserting that fanout was called 100 times in a second is a rough assertion that busy loop breaker is not in effect. Typically these 100 calls should be on the order of micro or nanoseconds. But with resource contention in the pipeline we're more conservative with timing based assertions") + require.LessOrEqual(t, time.Since(start), time.Second, "without busy loop breaker we expect fanOut to called rapidly. Circuit break is open for 10 seconds. 
So asserting that fanOut was called 100 times in a second is a rough assertion that busy loop breaker is not in effect. Typically these 100 calls should be on the order of micro or nanoseconds. But with resource contention in the pipeline we're more conservative with timing based assertions") t.Log("begin") cancel() t.Log("nend") @@ -1531,7 +1531,7 @@ func TestWork_DontDeadlockWhenCircuitBreakerIsInHalfOpen(t *testing.T) { }, } - fanoutCount := atomic.Int64{} + fanOutCount := atomic.Int64{} w := wf.Create(zkafka.ConsumerTopicConfig{Topic: topicName}, &p, // go into half state almost immediately after processing the message. @@ -1540,7 +1540,7 @@ func TestWork_DontDeadlockWhenCircuitBreakerIsInHalfOpen(t *testing.T) { zkafka.CircuitBreakAfter(1), zkafka.WithLifecycleHooks(zkafka.LifecycleHooks{PostFanout: func(ctx context.Context) { time.Sleep(time.Millisecond * 100) - fanoutCount.Add(1) + fanOutCount.Add(1) }}), ) @@ -1555,7 +1555,7 @@ func TestWork_DontDeadlockWhenCircuitBreakerIsInHalfOpen(t *testing.T) { start := time.Now() for { // if we don't hit a deadlock we should get to 10 loops of Do execution quickly (especially since there's no messages to process after subsequent read) - if fanoutCount.Load() >= 10 { + if fanOutCount.Load() >= 10 { cancel() break } @@ -1844,12 +1844,12 @@ func TestWork_ShutdownCausesRunExit(t *testing.T) { l := zkafka.NoopLogger{} kwf := zkafka.NewWorkFactory(kcp, zkafka.WithLogger(l)) - fanoutCount := atomic.Int64{} + fanOutCount := atomic.Int64{} w := kwf.Create( zkafka.ConsumerTopicConfig{Topic: topicName}, &fakeProcessor{}, zkafka.WithLifecycleHooks(zkafka.LifecycleHooks{PostFanout: func(ctx context.Context) { - fanoutCount.Add(1) + fanOutCount.Add(1) }}), ) @@ -1858,7 +1858,7 @@ func TestWork_ShutdownCausesRunExit(t *testing.T) { } go func() { pollWait(func() bool { - return fanoutCount.Load() >= 1 + return fanOutCount.Load() >= 1 }, pollOpts{ maxWait: 10 * time.Second, }) diff --git a/work.go b/work.go index 5456d3d..1a5de2a 100644 --- a/work.go +++ b/work.go @@ -33,7 +33,7 @@ import ( // 1. Read a kafka.Message using the provided reader. // 2. Select the virtual partition pool allocated for a specific topic // 3. Select and write the `kafka.Message` to the pool's virtual partition based on a hash of the `kafka.Message.Key` (virtual partition selection) -// 4. A goroutine is assigned for each virtual partition. Its responsibility is to continously read from its virtual partition, call the Process callback function, and then store the offset of the message. +// 4. A goroutine is assigned for each virtual partition. Its responsibility is to continuously read from its virtual partition, call the Process callback function, and then store the offset of the message. // // Additional responsibilities includes: // 1. 
Logging @@ -314,7 +314,8 @@ func (w *Work) processVirtualPartition(ctx context.Context, partitionIndex int, remainingDelay := delayCalc.remaining(w.processDelay(), msg.TimeStamp) if !w.execDelay(ctx, shutdown, remainingDelay) { - // while waiting for processDelay we received some shutdown signal, so the message should be removed from in flight so it doesn't block during final rebalance + // while waiting for processDelay we received some shutdown signal, so the message should be removed from in flight, + // so it doesn't block during final rebalance w.removeInWork(msg) continue } @@ -658,6 +659,9 @@ func (f WorkFactory) Create(topicConfig ConsumerTopicConfig, processor processor if topicConfig.DeadLetterTopicConfig != nil { cfg := *topicConfig.DeadLetterTopicConfig + if cfg.ClientID == "" { + cfg.ClientID = topicConfig.ClientID + } options = append(options, WithDeadLetterTopic(cfg)) } @@ -754,7 +758,7 @@ func selectPartitionIndex(key string, isKeyNil bool, partitionCount int) (int, e } // workUnit encapsulates the work being written to a virtual partition. It includes -// the context passed in that current iteration of fanout(), the kafka message to be processed and the +// the context passed in that current iteration of fanOut(), the kafka message to be processed and the // successFunc callback to be called when the work is done (indicating success or failure) type workUnit struct { ctx context.Context @@ -808,7 +812,7 @@ type delayCalculator struct { // now=3:53, w.processDelay=5s // timestamp=2:00 -> 0s delay. (delayed long enough). remainingDelay=5s - (3:53 - 2:00) => -1:52:55s. A negative processDelay doesn't end up pausing // timestamp=3:48 => 0s delay. remainingDelay=5s-(3:53-3:48) =>0s. A 0 (more accurately <=0) processDelay doesn't end up pausing -// timetsamp=3:49 => 1s delay. remainingDelay=5s-(3:53-3:49) => 1s +// timestamp=3:49 => 1s delay. remainingDelay=5s-(3:53-3:49) => 1s // timestamp=3:53 => 5s delay. // timestamp:3:54 => 5s delay. // timestamp:4:00 => 5s delay (the result is capped by the `targetDelay` diff --git a/work_test.go b/work_test.go index 37f4483..9eb7024 100644 --- a/work_test.go +++ b/work_test.go @@ -493,7 +493,7 @@ func Test_busyLoopBreaker_waitRespectsMaxPause(t *testing.T) { <-blocker } -// Test_busyLoopBreaker_waitRespectsRelease asserts that calling release() cancels that wait occuring at the wait() site +// Test_busyLoopBreaker_waitRespectsRelease asserts that calling release() cancels that wait occurring at the wait() site func Test_busyLoopBreaker_waitRespectsRelease(t *testing.T) { defer recoverThenFail(t) blb := busyLoopBreaker{