Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion internal/pkg/pipeline/task/compress/compress.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ func (c *core) Run(input <-chan *record.Record, output chan<- *record.Record) (e
return task.ErrNilInput
}


for {
r, ok := c.GetRecord(input)
if !ok {
Expand Down
2 changes: 1 addition & 1 deletion internal/pkg/pipeline/task/converter/sst.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func (c *sst) convert(data []byte, d string) ([]converterOutput, error) {
return nil, err
}
defer os.Remove(fileName)

sstData, err := os.ReadFile(fileName)
if err != nil {
return nil, err
Expand Down
54 changes: 27 additions & 27 deletions internal/pkg/pipeline/task/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,19 +46,15 @@ type kafka struct {
BatchSize int `yaml:"batch_size,omitempty" json:"batch_size,omitempty"` // number of messages to read/write in a batch
RetryLimit *int `yaml:"retry_limit,omitempty" json:"retry_limit,omitempty"` // number of retries for read errors
ExitOnEmpty bool `yaml:"exit_on_empty,omitempty" json:"exit_on_empty,omitempty"` // exit when no more messages are available
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any reason we are removing this?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The major reason for that, is the fact that ExitOnEmpty is "a syntax sugar" and this state can be emulated with a timeout 100d and 5 retries. It simplifies code and person understanding of taks

Copy link
Contributor

@prasadlohakpure prasadlohakpure Feb 23, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The reason I still think this config should be present is:

  1. The kafka task is going to be replacement for sqs task, and sqs task has this behaviour currently. So for continuing the same behaviour and ease in migration from sqs to kafka.
  2. This config allows early exit of tasks in case we are not certain about number of messages present, e.g.:
    • This will allow hourly dags such as pdp/keepa to exit early, instead of waiting for a specified timeout.
    • Having a fixed timeout can cause early/delayed exits, consuming unnecessary processing time/latency.

I am not sure whether we can tackle above scenarios using timeout + retries

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The ExitOnEmpty behaviour essentially was using timeout+retries behind the scenes.
So, the actual functionality is still the same but without the verbose "exit on empty" option.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The latency is still there even if we have this option in or out.

Copy link
Contributor

@prasadlohakpure prasadlohakpure Feb 23, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Got it, thanks for clarification.
I understand that behaviour can be emulated using above 2 params. Just that there are still going to be scenarios where I want to exit as soon as there are few messages left, or even no messages.

So it would be difficult to keep early exit condition inline with retries + timeout, everytime.

I think a better approach would have been finding available messages for current partition+consumer group, but that would complicate the implementation, and I am not sure if that is even possible.

ctx context.Context // parent context
timeout time.Duration // timeout duration calculated from Timeout
batchFlushInterval time.Duration // batch flush interval calculated from BatchFlushInterval
readErrorRetries int // number of retries left for read errors
emptyReadRetries int // a counter to track empty reads retries
}

func New() (task.Task, error) {
return &kafka{}, nil
}

func (k *kafka) Init() error {
k.ctx = context.Background()
if k.BootstrapServer == "" {
return fmt.Errorf("bootstrap_server is required")
}
Expand Down Expand Up @@ -86,15 +82,13 @@ func (k *kafka) Init() error {
}
k.timeout = time.Duration(k.Timeout)
k.batchFlushInterval = time.Duration(k.BatchFlushInterval)
k.readErrorRetries = *k.RetryLimit
k.emptyReadRetries = 0

// try connecting to kafka broker to validate config
dialer, err := k.dial()
if err != nil {
return fmt.Errorf("failed to create kafka dialer: %w", err)
}
dialCtx, cancel := context.WithTimeout(k.ctx, k.timeout)
dialCtx, cancel := context.WithTimeout(context.Background(), k.timeout)
defer cancel()
conn, err := dialer.DialContext(dialCtx, "tcp", k.BootstrapServer)
if err != nil {
Expand All @@ -110,17 +104,19 @@ func (k *kafka) Run(input <-chan *record.Record, output chan<- *record.Record) e
return task.ErrPresentInputOutput
}

runCtx := context.Background()

// if input is not nil, this is a sink task
if input != nil {
return k.write(input)
return k.write(input, runCtx)
}

// else, this is a source task
return k.read(output)
return k.read(output, runCtx)
}

// write writes records from the input channel to the Kafka topic
func (k *kafka) write(input <-chan *record.Record) error {
func (k *kafka) write(input <-chan *record.Record, runCtx context.Context) error {
if k.batchFlushInterval >= k.timeout {
return fmt.Errorf("batch_flush_interval (%s) must be less than timeout (%s)", k.batchFlushInterval, k.timeout)
}
Expand All @@ -145,7 +141,7 @@ func (k *kafka) write(input <-chan *record.Record) error {
}

// create a write context with timeout per message batch
wctx, cancel := context.WithTimeout(k.ctx, k.timeout)
wctx, cancel := context.WithTimeout(runCtx, k.timeout)
err := writer.WriteMessages(wctx, kg.Message{Value: r.Data})
cancel()
if err != nil {
Expand All @@ -156,7 +152,7 @@ func (k *kafka) write(input <-chan *record.Record) error {
}

// read reads messages from the Kafka topic and sends them to the output channel
func (k *kafka) read(output chan<- *record.Record) error {
func (k *kafka) read(output chan<- *record.Record, runCtx context.Context) error {
dialer, err := k.dial()
if err != nil {
return fmt.Errorf("failed to create kafka dialer: %w", err)
Expand All @@ -168,36 +164,40 @@ func (k *kafka) read(output chan<- *record.Record) error {
}
}()

// per-run retry counters (local to this invocation)
readErrRetries := *k.RetryLimit
emptyReads := 0

for {
select {
case <-k.ctx.Done():
case <-runCtx.Done():
return nil
default:
// read with a timeout so we can check for cancellation periodically
fetchCtx, cancel := context.WithTimeout(k.ctx, k.timeout)
fetchCtx, cancel := context.WithTimeout(runCtx, k.timeout)
m, err := reader.FetchMessage(fetchCtx)
cancel()

if err != nil {
err, ok := k.handleReadError(err)
err, ok := k.handleReadError(err, &readErrRetries, &emptyReads)
if !ok {
return err
}
continue
}
k.readErrorRetries = *k.RetryLimit
k.emptyReadRetries = 0
readErrRetries = *k.RetryLimit
emptyReads = 0

// process the message
k.SendData(k.ctx, m.Value, output)
k.SendData(runCtx, m.Value, output)

if k.GroupID == "" {
// if not using consumer group, no need to commit messages
continue
}

// commit the message after successful processing
cctx, cancel := context.WithTimeout(k.ctx, k.timeout)
cctx, cancel := context.WithTimeout(runCtx, k.timeout)
if err = reader.CommitMessages(cctx, m); err != nil {
// log the commit error but continue processing
// any new message will ensure that all previous
Expand Down Expand Up @@ -274,7 +274,7 @@ func (k *kafka) handleWriteError(err error) error {
}

// handleReadError processes errors returned from reader.FetchMessage
func (k *kafka) handleReadError(err error) (returnErr error, shouldRetry bool) {
func (k *kafka) handleReadError(err error, readErrRetries *int, emptyReads *int) (returnErr error, shouldRetry bool) {
if errors.Is(err, io.EOF) {
// this is not reliable for kafka end of topic detection
fmt.Printf("kafka reached end of topic: %v\n", k.Topic)
Expand All @@ -287,18 +287,18 @@ func (k *kafka) handleReadError(err error) (returnErr error, shouldRetry bool) {
}
if errors.Is(err, context.DeadlineExceeded) {
// not an error, just context deadline exceeded
k.emptyReadRetries++
fmt.Printf("kafka no message returned while reading message for attempt #%d with error: %v\n", k.emptyReadRetries, err)
if k.ExitOnEmpty && k.emptyReadRetries > *k.RetryLimit {
fmt.Printf("kafka no message returned, reached retry limit (%d), stopping reader", *k.RetryLimit)
(*emptyReads)++
fmt.Printf("kafka no message returned while reading message for attempt #%d with error: %v\n", *emptyReads, err)
if k.ExitOnEmpty && *emptyReads > *k.RetryLimit {
fmt.Printf("kafka no message returned, reached retry limit (%d), stopping reader\n", *k.RetryLimit)
return nil, false
}
return nil, true
}
// other errors - log and retry up to retry limit
fmt.Printf("kafka error while reading message for attempt #%d with error: %v\n", *k.RetryLimit-k.readErrorRetries+1, err)
k.readErrorRetries--
if k.readErrorRetries <= 0 {
fmt.Printf("kafka error while reading message for attempt #%d with error: %v\n", (*k.RetryLimit-*readErrRetries)+1, err)
(*readErrRetries)--
if *readErrRetries <= 0 {
return fmt.Errorf("kafka reached read error retry limit (%d), stopping reader", *k.RetryLimit), false
}
return nil, true
Expand Down
1 change: 1 addition & 0 deletions test/pipelines/kafka_group_read.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ tasks:
retry_limit: 1
cert: |
{{ indent 6 (secret "/kafka/ca-cert") }}
task_concurrency: 3

- name: echo_results
type: echo
Expand Down