Skip to content

Commit e776316

Browse files
mergify[bot]belimawroakrizankhushijain21
authored
[8.17](backport #42401) Handle authorization errors in Kafka output (#42844)
* Handle authorization errors in Kafka output (#42401) When there is an authorisation error in the Kafka output, the events are dropped and an error message is logged. (cherry picked from commit 5720300) * Update CHANGELOG.next.asciidoc * Fix changelog --------- Co-authored-by: Tiago Queiroz <[email protected]> Co-authored-by: Olga Naydyonock <[email protected]> Co-authored-by: Khushi Jain <[email protected]>
1 parent 4df6821 commit e776316

File tree

3 files changed

+68
-0
lines changed

3 files changed

+68
-0
lines changed

CHANGELOG.next.asciidoc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
7777
- Support Elastic Agent control protocol chunking {pull}37343[37343]
7878
- Lower logging level to debug when attempting to configure beats with unknown fields from autodiscovered events/environments {pull}37816[37816]
7979
- Set timeout of 1 minute for FQDN requests {pull}37756[37756]
80+
- The Kafka output now drops events when there is an authorization error. {issue}42343[42343] {pull}42401[42401]
8081
- 'add_cloud_metadata' processor - improve AWS provider HTTP client overriding to support custom certificate bundle handling {pull}44189[44189]
8182
- The Elasticsearch output now correctly applies exponential backoff when being throttled by 429s ("too many requests") from Elasticsearch. {issue}36926[36926] {pull}45073[45073]
8283

libbeat/outputs/kafka/client.go

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,20 @@ type msgRef struct {
7070

7171
// Package-level error values used by the Kafka client.
var (
7272
// errNoTopicsSelected is a sentinel error carrying the message
// "no topic could be selected".
errNoTopicsSelected = errors.New("no topic could be selected")
73+
74+
// authErrors lists the authentication/authorisation errors that cause
75+
// the event to be dropped. isAuthError matches an error against every
// entry of this list using errors.Is.
76+
authErrors = []error{
77+
sarama.ErrTopicAuthorizationFailed,
78+
sarama.ErrGroupAuthorizationFailed,
79+
sarama.ErrClusterAuthorizationFailed,
80+
// The SASL errors below are believed to be handled before the
81+
// connection is established (NOTE(review): unconfirmed); they are
82+
// also listed here just in case.
83+
sarama.ErrUnsupportedSASLMechanism,
84+
sarama.ErrIllegalSASLState,
85+
sarama.ErrSASLAuthenticationFailed,
86+
}
7387
)
7488

7589
func newKafkaClient(
@@ -368,6 +382,10 @@ func (r *msgRef) fail(msg *message, err error) {
368382
len(msg.key)+len(msg.value))
369383
r.client.observer.PermanentErrors(1)
370384

385+
case isAuthError(err):
386+
r.client.log.Errorf("Kafka (topic=%v): authorisation error: %s", msg.topic, err)
387+
r.client.observer.PermanentErrors(1)
388+
371389
case errors.Is(err, breaker.ErrBreakerOpen):
372390
// Add this message to the failed list, but don't overwrite r.err since
373391
// all the breaker error means is "there were a lot of other errors".
@@ -425,3 +443,13 @@ func (c *client) Test(d testing.Driver) {
425443
}
426444

427445
}
446+
447+
// isAuthError reports whether err matches, according to errors.Is, any of
// the errors listed in the package-level authErrors slice. It returns false
// when no entry matches.
func isAuthError(err error) bool {
448+
for _, e := range authErrors {
449+
if errors.Is(err, e) {
450+
return true
451+
}
452+
}
453+
454+
return false
455+
}

libbeat/tests/integration/kafka_test.go

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,3 +87,42 @@ func TestKafkaOutputCanConnectAndPublish(t *testing.T) {
8787
10*time.Second,
8888
"did not find finished batch log")
8989
}
90+
91+
// TestAuthorisationErrors starts mockbeat against a sarama mock broker that
// answers every produce request with an authorisation error, and asserts
// that mockbeat logs one "authorisation error" line per error.
func TestAuthorisationErrors(t *testing.T) {
92+
leader := sarama.NewMockBroker(t, 1)
93+
defer leader.Close()
94+
95+
// The mock broker must respond to a single metadata request.
96+
metadataResponse := new(sarama.MetadataResponse)
97+
metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
98+
metadataResponse.AddTopicPartition(kafkaTopic, 0, leader.BrokerID(), nil, nil, nil, sarama.ErrNoError)
99+
leader.Returns(metadataResponse)
100+
101+
// Authorisation errors the broker will return, one per produce response.
authErrors := []sarama.KError{
102+
sarama.ErrTopicAuthorizationFailed,
103+
sarama.ErrGroupAuthorizationFailed,
104+
sarama.ErrClusterAuthorizationFailed,
105+
}
106+
107+
// The mock broker must return one produce response per error we want
108+
// to test. If fewer calls are made, the test will fail.
109+
for _, err := range authErrors {
110+
producerResponse := new(sarama.ProduceResponse)
111+
producerResponse.AddTopicPartition(kafkaTopic, 0, err)
112+
leader.Returns(producerResponse)
113+
}
114+
115+
// Start mockbeat with the appropriate configuration.
116+
mockbeat := NewBeat(t, "mockbeat", "../../libbeat.test")
117+
mockbeat.WriteConfigFile(fmt.Sprintf(kafkaCfg, kafkaTopic, kafkaVersion, leader.Addr()))
118+
mockbeat.Start()
119+
120+
// Wait for mockbeat to log each of the errors.
// NOTE(review): the expected log line hard-codes topic "test_topic" while
// the broker and config use kafkaTopic; presumably they are equal — confirm.
121+
for _, err := range authErrors {
122+
t.Log("waiting for:", err)
123+
mockbeat.WaitForLogs(
124+
fmt.Sprintf("Kafka (topic=test_topic): authorisation error: %s", err),
125+
10*time.Second,
126+
"did not find error log: %s", err)
127+
}
128+
}

0 commit comments

Comments
 (0)