From 2b8f2fd3b27987df7a95cae0b8aac4787968e803 Mon Sep 17 00:00:00 2001 From: Zbynek Roubalik Date: Thu, 14 Jul 2022 16:35:45 +0200 Subject: [PATCH] Kafka Scaler: check `lagThreshold` is a positive number (#3367) --- CHANGELOG.md | 1 + pkg/scalers/kafka_scaler.go | 9 ++++++--- pkg/scalers/kafka_scaler_test.go | 8 ++++++-- 3 files changed, 13 insertions(+), 5 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 8900ec5ed5b..cba40809649 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -61,6 +61,7 @@ To learn more about our roadmap, we recommend reading [this document](ROADMAP.md - **ActiveMQ Scaler:** KEDA doesn't respect restAPITemplate ([#3188](https://github.com/kedacore/keda/issues/3188)) - **Azure Eventhub Scaler:** KEDA operator crashes on nil memory panic if the eventhub connectionstring for Azure Eventhub Scaler contains an invalid character ([#3082](https://github.com/kedacore/keda/issues/3082)) - **Azure Pipelines Scaler:** Fix issue with Azure Pipelines wrong PAT Auth. ([#3159](https://github.com/kedacore/keda/issues/3159)) +- **Kafka Scaler:** Check `lagThreshold` is a positive number ([#3366](https://github.com/kedacore/keda/issues/3366)) ### Deprecations diff --git a/pkg/scalers/kafka_scaler.go b/pkg/scalers/kafka_scaler.go index 961a3827d77..9f44b9aeaa9 100644 --- a/pkg/scalers/kafka_scaler.go +++ b/pkg/scalers/kafka_scaler.go @@ -177,7 +177,7 @@ func parseKafkaMetadata(config *ScalerConfig) (kafkaMetadata, error) { meta.topic = config.TriggerMetadata["topic"] default: meta.topic = "" - kafkaLog.V(1).Info(fmt.Sprintf("consumer group %s has no topic specified, "+ + kafkaLog.V(1).Info(fmt.Sprintf("consumer group %q has no topic specified, "+ "will use all topics subscribed by the consumer group for scaling", meta.group)) } @@ -186,7 +186,7 @@ func parseKafkaMetadata(config *ScalerConfig) (kafkaMetadata, error) { if config.TriggerMetadata["offsetResetPolicy"] != "" { policy := offsetResetPolicy(config.TriggerMetadata["offsetResetPolicy"]) if policy != earliest && policy != latest { - return meta, fmt.Errorf("err offsetResetPolicy policy %s given", policy) + return meta, fmt.Errorf("err offsetResetPolicy policy %q given", policy) } meta.offsetResetPolicy = policy } @@ -196,7 +196,10 @@ func parseKafkaMetadata(config *ScalerConfig) (kafkaMetadata, error) { if val, ok := config.TriggerMetadata[lagThresholdMetricName]; ok { t, err := strconv.ParseInt(val, 10, 64) if err != nil { - return meta, fmt.Errorf("error parsing %s: %s", lagThresholdMetricName, err) + return meta, fmt.Errorf("error parsing %q: %s", lagThresholdMetricName, err) + } + if t <= 0 { + return meta, fmt.Errorf("%q must be positive number", lagThresholdMetricName) } meta.lagThreshold = t } diff --git a/pkg/scalers/kafka_scaler_test.go b/pkg/scalers/kafka_scaler_test.go index 166e37d628f..acbecd0a998 100644 --- a/pkg/scalers/kafka_scaler_test.go +++ b/pkg/scalers/kafka_scaler_test.go @@ -56,6 +56,10 @@ var parseKafkaMetadataTestDataset = []parseKafkaMetadataTestData{ {map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group"}, false, 1, []string{"foobar:9092"}, "my-group", "", offsetResetPolicy("latest"), false}, // failure, version not supported {map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "version": "1.2.3.4"}, true, 1, []string{"foobar:9092"}, "my-group", "my-topic", offsetResetPolicy("latest"), false}, + // failure, lagThreshold is negative value + {map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "lagThreshold": "-1"}, true, 1, []string{"foobar:9092"}, "my-group", "my-topic", offsetResetPolicy("latest"), false}, + // failure, lagThreshold is 0 + {map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "lagThreshold": "0"}, true, 1, []string{"foobar:9092"}, "my-group", "my-topic", offsetResetPolicy("latest"), false}, // success {map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic"}, false, 1, []string{"foobar:9092"}, "my-group", "my-topic", offsetResetPolicy("latest"), false}, // success, more brokers @@ -118,8 +122,8 @@ var parseKafkaAuthParamsTestDataset = []parseKafkaAuthParamsTestData{ } var kafkaMetricIdentifiers = []kafkaMetricIdentifier{ - {&parseKafkaMetadataTestDataset[4], 0, "s0-kafka-my-topic"}, - {&parseKafkaMetadataTestDataset[4], 1, "s1-kafka-my-topic"}, + {&parseKafkaMetadataTestDataset[6], 0, "s0-kafka-my-topic"}, + {&parseKafkaMetadataTestDataset[6], 1, "s1-kafka-my-topic"}, {&parseKafkaMetadataTestDataset[2], 1, "s1-kafka-my-group-topics"}, }