-
Notifications
You must be signed in to change notification settings - Fork 284
Update kafka error handling #3201
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
Codecov Report❌ Patch coverage is Additional details and impacted files@@ Coverage Diff @@
## main #3201 +/- ##
==========================================
+ Coverage 79.63% 79.92% +0.28%
==========================================
Files 1177 1184 +7
Lines 21702 21824 +122
Branches 4221 4251 +30
==========================================
+ Hits 17282 17442 +160
+ Misses 3678 3635 -43
- Partials 742 747 +5 ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM. Left 1 question
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull Request Overview
This PR enhances the Kafka destination with improved error handling, configurable batching capabilities, and more robust testing. The main focus is on transitioning from throwing integration errors to returning detailed multi-status responses for better granular error handling.
Key changes include:
- Enhanced error handling with HTTP status code mapping and multi-status responses
- Added configurable batching fields (
enable_batching
andbatch_keys
) to the action definition - Comprehensive test coverage expansion for Kafka utilities including error scenarios and connection management
Reviewed Changes
Copilot reviewed 7 out of 8 changed files in this pull request and generated 2 comments.
Show a summary per file
File | Description |
---|---|
packages/destination-actions/src/destinations/kafka/utils.ts |
Core refactoring: added error mapping functions, multi-status response handling, batching statistics, and retry configuration |
packages/destination-actions/src/destinations/kafka/send/index.ts |
Added new batching configuration fields and updated performBatch to return multi-status response |
packages/destination-actions/src/destinations/kafka/send/generated-types.ts |
Added type definitions for new batching fields |
packages/destination-actions/src/destinations/kafka/index.ts |
Updated authentication mechanism description to remove AWS IAM references |
packages/destination-actions/src/destinations/kafka/generated-types.ts |
Updated generated types to reflect authentication mechanism description changes |
packages/destination-actions/src/destinations/kafka/__tests__/utils.test.ts |
Comprehensive new test suite covering validation, error handling, SSL configuration, and producer caching |
packages/destination-actions/src/destinations/kafka/send/__tests__/index.test.ts |
Updated existing tests to reflect new retry configuration in Kafka initialization |
packages/destination-actions/src/destinations/kafka/send/index.ts
Outdated
Show resolved
Hide resolved
await sendData(settings, [payload], features, statsContext) | ||
}, | ||
performBatch: async (_request, { settings, payload, features, statsContext }) => { | ||
await sendData(settings, payload, features, statsContext) | ||
return await sendData(settings, payload, features, statsContext) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why are you returning in the performBatch() but not perform()
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Because, going forward we will only be allowing batch mode kafka destination. No stream mode. This is because parsing responses for non http actions are problematic for tracing in event-tester
packages/destination-actions/src/destinations/kafka/send/index.ts
Outdated
Show resolved
Hide resolved
8abd8d9
This reverts commit ae32e45.
JIRA: https://twilio-engineering.atlassian.net/browse/STRATCONN-6074
Summary
This pull request introduces several improvements and new features to the Kafka destination, focusing on enhanced batching capabilities, improved error handling, and more robust testing. The main changes include adding support for configurable batching of messages, refactoring error handling to provide more granular responses, and expanding test coverage for Kafka utility functions.
Batching and Payload Enhancements:
Payload
interface and the action definition (enable_batching
andbatch_keys
) to support configurable batching of events before sending them to Kafka. These fields are hidden and default to batching by partition keys. [1] [2]sendData
utility to process batches according to the new configuration, and to record batch key statistics for observability.Error Handling Improvements:
sendData
to map Kafka errors to appropriate HTTP status codes, distinguishing between retriable and non-retriable errors, and to return detailedMultiStatusResponse
objects for each message.kafkaErrorToHttpStatus
andfillKafkaMultiStatusErrorResponse
for consistent error mapping and response population.Testing and Validation:
Documentation Updates:
mechanism
field in both the settings type and the UI to remove references to AWS IAM, reflecting current support. [1] [2]Other Internal Refactoring:
These changes collectively make the Kafka integration more robust, observable, and adaptable to new batching requirements.
Screenshots:
Multi-status
Batch keys and batching enabled
Testing Batch keys:
Notebook: https://segment.datadoghq.com/notebook/12973425/kafka-analyisis?range=1394904&start=1756983270126&live=false
Instance: https://app.segment.build/arijit-dev/destinations/actions-kafka/sources/http_api/instances/6880c59b62bcbb1aabdd3887/actions/mQBdMEYYKWsh6Fx2JCQeEt
Note: We need to run a ctl-plane-migration to enable batching as default for kafka and run
actions-cli push-default
to register defaults for batch keys.PR: https://github.com/segmentio/control-plane/pull/6191
Testing
Include any additional information about the testing you have completed to
ensure your changes behave as expected. For a speedy review, please check
any of the tasks you completed below during your testing.