Merged
1 change: 0 additions & 1 deletion internal/pkg/pipeline/task/compress/compress.go
@@ -53,7 +53,6 @@ func (c *core) Run(input <-chan *record.Record, output chan<- *record.Record) (e
 		return task.ErrNilInput
 	}
 
-
 	for {
 		r, ok := c.GetRecord(input)
 		if !ok {
2 changes: 1 addition & 1 deletion internal/pkg/pipeline/task/converter/sst.go
@@ -37,7 +37,7 @@ func (c *sst) convert(data []byte, d string) ([]converterOutput, error) {
 		return nil, err
 	}
 	defer os.Remove(fileName)
 
 	sstData, err := os.ReadFile(fileName)
 	if err != nil {
 		return nil, err
20 changes: 1 addition & 19 deletions internal/pkg/pipeline/task/kafka/README.md
@@ -6,7 +6,7 @@ The `kafka` task reads from or writes to Apache Kafka topics.
 
 The Kafka task operates in two modes depending on whether an input channel is provided:
 - **Write mode** (with input channel): receives records from the input channel and sends them as messages to the Kafka topic. Writes are buffered and flushed in batches (see `batch_size` and `batch_flush_interval`). The task validates `batch_flush_interval < timeout` at runtime and will return an error in write mode if it's violated.
-- **Read mode** (no input channel): polls messages from the Kafka topic and sends them to the output channel. The reader's polling is controlled by the configured `timeout` and `exit_on_empty` behavior (see below).
+- **Read mode** (no input channel): polls messages from the Kafka topic and sends them to the output channel. The reader's polling is controlled by the configured `timeout` and `retry_limit` behavior (see below).
 
 The task automatically determines its mode based on the presence of input/output channels.
 
@@ -17,21 +17,6 @@ When reading from a Kafka topic, there are two main modes of operation:
 - **Standalone reader** (no consumer group): omit `group_id`; the reader pulls messages directly from partitions. Offsets are not coordinated across instances and are not committed.
 - **Group consumer** (recommended for production): set `group_id`. Multiple instances with the same `group_id` split partitions between them and coordinate offsets. When `group_id` is set the task will commit offsets after processing messages.
 
-#### ***Read termination***
-
-The Kafka task exposes `exit_on_empty` to control how the reader responds to polling timeouts (`context.DeadlineExceeded`):
-
-- **Indefinite mode (default)**: `exit_on_empty: false` (or omitted). The reader treats `context.DeadlineExceeded` as a normal "no message available" event and continues polling indefinitely. Empty polls do not stop the reader in this mode.
-
-- **Exit-on-empty**: `exit_on_empty: true`. The reader counts consecutive `context.DeadlineExceeded` events in an `empty read` counter (starts at 0). When this counter becomes greater than `retry_limit` the reader stops gracefully. The counter is reset to 0 on any successful read.
-
-#### Separate retry counter for other errors
-
-There is a second retry counter for non-deadline errors (network, auth, broker errors). This `other error` counter is initialized to `retry_limit` in `Init()` and is decremented each time such an error occurs. When it reaches zero the reader stops with an error. The `other error` counter is reset to `retry_limit` on successful reads.
-Why two counters?
-- Keeps normal polling timeouts (no messages available) from exhausting retries intended for real errors.
-- Allows an explicit "exit on quiet" behavior when `exit_on_empty: true` without reducing tolerance for transient broker errors.
 
 ## Configuration Fields
 
 | Field | Type | Default | Description |
@@ -44,7 +29,6 @@ Why two counters?
 | `batch_size` | int | `100` | Number of messages to buffer/flush for write and reader |
 | `batch_flush_interval` | duration string | `2s` | Interval to flush incomplete write batches; must be less than `timeout` |
 | `retry_limit` | int | `5` | Number used to initialize retry counters for read behavior |
-| `exit_on_empty` | bool | `false` | When true the reader will stop after `retry_limit` consecutive empty polls (`context.DeadlineExceeded`). When false the reader continues polling indefinitely. |
 | `group_id` | string | - | Consumer group id for group consumption (optional) |
 | `server_auth_type` | string | `none` | `none` or `tls` — server certificate verification mode |
 | `cert` | string | - | CA certificate PEM/CRT content used when `server_auth_type: tls` (alternatively use `cert_path`) |
@@ -162,7 +146,6 @@ tasks:
     type: kafka
     bootstrap_server: kafka.local:9092
     topic: input-topic
-    exit_on_empty: true
     retry_limit: 10
     timeout: 5s
 ```
@@ -172,7 +155,6 @@ tasks:
 - Group consumers enable scaling: Kafka will assign partitions across group members so each message is delivered only once to the group. When `group_id` is set, the task will commit offsets after processing messages.
 - The task uses a single configured `timeout` (default 15s) for dial, read, write and commit operations. Dial attempts use the same `timeout` value for each connection attempt.
 - Writes use the kafka-go `Writer` with configured `BatchSize` and `BatchTimeout` (`batch_flush_interval`). The task calls `WriteMessages` per record; kafka-go will buffer and flush according to these settings. This means write throughput and latency are primarily controlled by those kafka-go settings rather than explicit batching logic in this task.
-- If you expect frequent short `timeout` values and `exit_on_empty: false`, the reader can busy-poll (repeated `DeadlineExceeded`) at the configured timeout rate. Consider increasing `timeout`, enabling `exit_on_empty` with an appropriate `retry_limit`, if you observe high CPU from tight polling loops.
 - `mtls` is a placeholder in the code and currently returns an error / not implemented; client certificate authentication is not provided yet.
 
 ## Troubleshooting
98 changes: 33 additions & 65 deletions internal/pkg/pipeline/task/kafka/kafka.go
@@ -45,20 +45,15 @@ type kafka struct {
 	GroupID     string `yaml:"group_id,omitempty" json:"group_id,omitempty"`       // the consumer group id (optional)
 	BatchSize   int    `yaml:"batch_size,omitempty" json:"batch_size,omitempty"`   // number of messages to read/write in a batch
 	RetryLimit  *int   `yaml:"retry_limit,omitempty" json:"retry_limit,omitempty"` // number of retries for read errors
-	ExitOnEmpty bool   `yaml:"exit_on_empty,omitempty" json:"exit_on_empty,omitempty"` // exit when no more messages are available
**Review thread on the removed `ExitOnEmpty` field:**

> **Contributor:** Any reason we are removing this?
>
> **Contributor:** The major reason is that `ExitOnEmpty` is "syntax sugar": the same state can be emulated with a timeout of 100d and 5 retries. Removing it simplifies the code and a person's understanding of the task.
>
> **@prasadlohakpure (Feb 23, 2026):** The reasons I still think this config should be present:
> 1. The kafka task is going to be a replacement for the sqs task, and the sqs task currently has this behaviour, so keeping it preserves that behaviour and eases migration from sqs to kafka.
> 2. This config allows early exit of tasks when we are not certain about the number of messages present, e.g.:
>    - It allows hourly dags such as pdp/keepa to exit early instead of waiting for a specified timeout.
>    - A fixed timeout can cause early or delayed exits, consuming unnecessary processing time and latency.
>
> I am not sure whether we can tackle the above scenarios using timeout + retries.
>
> **Author:** The `ExitOnEmpty` behaviour essentially was using timeout + retries behind the scenes. So the actual functionality is still the same, just without the verbose "exit on empty" option. The latency is still there whether this option is in or out.
>
> **@prasadlohakpure (Feb 23, 2026):** Got it, thanks for the clarification. I understand the behaviour can be emulated using the above two params. It's just that there will still be scenarios where I want to exit as soon as there are few messages left, or even none, so it would be difficult to keep the early-exit condition in line with retries + timeout every time. A better approach might have been finding the available messages for the current partition + consumer group, but that would complicate the implementation, and I am not sure that is even possible.
-	ctx                context.Context // parent context
 	timeout            time.Duration   // timeout duration calculated from Timeout
 	batchFlushInterval time.Duration   // batch flush interval calculated from BatchFlushInterval
-	readErrorRetries   int             // number of retries left for read errors
-	emptyReadRetries   int             // a counter to track empty reads retries
 }

 func New() (task.Task, error) {
 	return &kafka{}, nil
 }
 
 func (k *kafka) Init() error {
-	k.ctx = context.Background()
 	if k.BootstrapServer == "" {
 		return fmt.Errorf("bootstrap_server is required")
 	}
@@ -86,15 +81,13 @@ func (k *kafka) Init() error {
 	}
 	k.timeout = time.Duration(k.Timeout)
 	k.batchFlushInterval = time.Duration(k.BatchFlushInterval)
-	k.readErrorRetries = *k.RetryLimit
-	k.emptyReadRetries = 0
 
 	// try connecting to kafka broker to validate config
 	dialer, err := k.dial()
 	if err != nil {
 		return fmt.Errorf("failed to create kafka dialer: %w", err)
 	}
-	dialCtx, cancel := context.WithTimeout(k.ctx, k.timeout)
+	dialCtx, cancel := context.WithTimeout(context.Background(), k.timeout)
 	defer cancel()
 	conn, err := dialer.DialContext(dialCtx, "tcp", k.BootstrapServer)
 	if err != nil {
@@ -110,17 +103,19 @@ func (k *kafka) Run(input <-chan *record.Record, output chan<- *record.Record) e
 		return task.ErrPresentInputOutput
 	}
 
+	ctx := context.Background()
+
 	// if input is not nil, this is a sink task
 	if input != nil {
-		return k.write(input)
+		return k.write(ctx, input)
 	}
 
 	// else, this is a source task
-	return k.read(output)
+	return k.read(ctx, output)
 }
 
 // write writes records from the input channel to the Kafka topic
-func (k *kafka) write(input <-chan *record.Record) error {
+func (k *kafka) write(ctx context.Context, input <-chan *record.Record) error {
 	if k.batchFlushInterval >= k.timeout {
 		return fmt.Errorf("batch_flush_interval (%s) must be less than timeout (%s)", k.batchFlushInterval, k.timeout)
 	}
@@ -145,7 +140,7 @@ func (k *kafka) write(input <-chan *record.Record) error {
 	}
 
 	// create a write context with timeout per message batch
-	wctx, cancel := context.WithTimeout(k.ctx, k.timeout)
+	wctx, cancel := context.WithTimeout(ctx, k.timeout)
 	err := writer.WriteMessages(wctx, kg.Message{Value: r.Data})
 	cancel()
 	if err != nil {
@@ -156,7 +151,7 @@ func (k *kafka) write(input <-chan *record.Record) error {
 }
 
 // read reads messages from the Kafka topic and sends them to the output channel
-func (k *kafka) read(output chan<- *record.Record) error {
+func (k *kafka) read(ctx context.Context, output chan<- *record.Record) error {
 	dialer, err := k.dial()
 	if err != nil {
 		return fmt.Errorf("failed to create kafka dialer: %w", err)
@@ -168,40 +163,30 @@ func (k *kafka) read(output chan<- *record.Record) error {
 		}
 	}()
 
+	retriesNumber := 0
 	for {
-		select {
-		case <-k.ctx.Done():
-			return nil
-		default:
-			// read with a timeout so we can check for cancellation periodically
-			fetchCtx, cancel := context.WithTimeout(k.ctx, k.timeout)
-			m, err := reader.FetchMessage(fetchCtx)
-			cancel()
-
-			if err != nil {
-				err, ok := k.handleReadError(err)
-				if !ok {
-					return err
-				}
-				continue
+		fetchCtx, cancel := context.WithTimeout(ctx, k.timeout)
+		m, err := reader.FetchMessage(fetchCtx)
+		cancel()
+		if err != nil {
+			if !k.shouldRetry(err) {
+				return err
 			}
-			k.readErrorRetries = *k.RetryLimit
-			k.emptyReadRetries = 0
-
-			// process the message
-			k.SendData(k.ctx, m.Value, output)
-
-			if k.GroupID == "" {
-				// if not using consumer group, no need to commit messages
-				continue
+			retriesNumber++
+			fmt.Printf("kafka error while reading message for attempt #%d with error: %v\n", retriesNumber, err)
+			if retriesNumber > *k.RetryLimit {
+				fmt.Printf("kafka error while reading message, reached retry limit (%d), stopping reader\n", *k.RetryLimit)
+				return nil
 			}
+			continue
+		}
+		retriesNumber = 0
 
-			// commit the message after successful processing
-			cctx, cancel := context.WithTimeout(k.ctx, k.timeout)
+		k.SendData(ctx, m.Value, output)
+
+		// commit the message after successful processing
+		if k.GroupID != "" {
+			cctx, cancel := context.WithTimeout(ctx, k.timeout)
 			if err = reader.CommitMessages(cctx, m); err != nil {
+				// log the commit error but continue processing
+				// any new message will ensure that all previous
+				// messages are eventually committed
 				fmt.Printf("failed to commit message: %v\n", err)
 			}
 			cancel()
@@ -273,35 +258,18 @@ func (k *kafka) handleWriteError(err error) error {
 	return fmt.Errorf("failed to write message to kafka: %w", err)
 }
 
-// handleReadError processes errors returned from reader.FetchMessage
-func (k *kafka) handleReadError(err error) (returnErr error, shouldRetry bool) {
+// shouldRetry determines if a read error should be retried
+func (k *kafka) shouldRetry(err error) bool {
 	if errors.Is(err, io.EOF) {
 		// this is not reliable for kafka end of topic detection
 		fmt.Printf("kafka reached end of topic: %v\n", k.Topic)
-		return nil, false
+		return false
 	}
 	if errors.Is(err, context.Canceled) {
 		// not an error, just context cancellation
 		fmt.Printf("kafka reader context canceled: %v\n", err)
-		return nil, false
-	}
-	if errors.Is(err, context.DeadlineExceeded) {
-		// not an error, just context deadline exceeded
-		k.emptyReadRetries++
-		fmt.Printf("kafka no message returned while reading message for attempt #%d with error: %v\n", k.emptyReadRetries, err)
-		if k.ExitOnEmpty && k.emptyReadRetries > *k.RetryLimit {
-			fmt.Printf("kafka no message returned, reached retry limit (%d), stopping reader", *k.RetryLimit)
-			return nil, false
-		}
-		return nil, true
-	}
-	// other errors - log and retry up to retry limit
-	fmt.Printf("kafka error while reading message for attempt #%d with error: %v\n", *k.RetryLimit-k.readErrorRetries+1, err)
-	k.readErrorRetries--
-	if k.readErrorRetries <= 0 {
-		return fmt.Errorf("kafka reached read error retry limit (%d), stopping reader", *k.RetryLimit), false
+		return false
 	}
-	return nil, true
+
+	return true
 }
 
 // createDialer creates a kafka dialer with optional SASL mechanism and TLS configuration
2 changes: 1 addition & 1 deletion test/pipelines/kafka_group_read.yaml
@@ -8,10 +8,10 @@ tasks:
     username: test-caterpillar
     password: {{ secret "/kafka/test-caterpillar/secret" }}
     server_auth_type: tls
-    exit_on_empty: true
+    retry_limit: 1
     cert: |
       {{ indent 6 (secret "/kafka/ca-cert") }}
     task_concurrency: 3
 
   - name: echo_results
     type: echo
1 change: 0 additions & 1 deletion test/pipelines/kafka_read.yaml
@@ -7,7 +7,6 @@ tasks:
     username: test-caterpillar
     password: {{ secret "/kafka/test-caterpillar/secret" }}
     server_auth_type: tls
-    exit_on_empty: false # this is the default value
    timeout: 2s
     cert: |
       {{ indent 6 (secret "/kafka/ca-cert") }}