diff --git a/CHANGELOG.md b/CHANGELOG.md index f0178742613..418211e8f5c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -65,6 +65,7 @@ To learn more about active deprecations, we recommend checking [GitHub Discussio - **General:** Improve the function used to normalize metric names ([#3789](https://github.com/kedacore/keda/issues/3789) - **Apache Kafka Scaler:** SASL/OAuthbearer Implementation ([#3681](https://github.com/kedacore/keda/issues/3681)) - **Apache Kafka Scaler:** Limit Kafka Partitions KEDA operates on ([#3830](https://github.com/kedacore/keda/issues/3830)) +- **Apache Kafka Scaler:** Implementation for Excluding Persistent Lag ([#3904](https://github.com/kedacore/keda/issues/3904)) - **Azure AD Pod Identity Authentication:** Improve error messages to emphasize problems around the integration with aad-pod-identity itself ([#3610](https://github.com/kedacore/keda/issues/3610)) - **Azure Event Hub Scaler:** Support Azure Active Direcotry Pod & Workload Identity for Storage Blobs ([#3569](https://github.com/kedacore/keda/issues/3569)) - **Azure Event Hub Scaler:** Support using connection strings for Event Hub namespace instead of the Event Hub itself. ([#3922](https://github.com/kedacore/keda/issues/3922)) diff --git a/pkg/scalers/kafka_scaler.go b/pkg/scalers/kafka_scaler.go index 603cc717724..8ce8a159694 100644 --- a/pkg/scalers/kafka_scaler.go +++ b/pkg/scalers/kafka_scaler.go @@ -17,11 +17,12 @@ import ( ) type kafkaScaler struct { - metricType v2.MetricTargetType - metadata kafkaMetadata - client sarama.Client - admin sarama.ClusterAdmin - logger logr.Logger + metricType v2.MetricTargetType + metadata kafkaMetadata + client sarama.Client + admin sarama.ClusterAdmin + logger logr.Logger + previousOffsets map[string]map[int32]int64 } type kafkaMetadata struct { @@ -33,6 +34,7 @@ type kafkaMetadata struct { activationLagThreshold int64 offsetResetPolicy offsetResetPolicy allowIdleConsumers bool + excludePersistentLag bool version sarama.KafkaVersion // If an invalid offset is found, whether to scale to 1 (false - the default) so consumption can @@ -105,12 +107,15 @@ func NewKafkaScaler(config *ScalerConfig) (Scaler, error) { return nil, err } + previousOffsets := make(map[string]map[int32]int64) + return &kafkaScaler{ - client: client, - admin: admin, - metricType: metricType, - metadata: kafkaMetadata, - logger: logger, + client: client, + admin: admin, + metricType: metricType, + metadata: kafkaMetadata, + logger: logger, + previousOffsets: previousOffsets, }, nil } @@ -270,6 +275,15 @@ func parseKafkaMetadata(config *ScalerConfig, logger logr.Logger) (kafkaMetadata meta.allowIdleConsumers = t } + meta.excludePersistentLag = false + if val, ok := config.TriggerMetadata["excludePersistentLag"]; ok { + t, err := strconv.ParseBool(val) + if err != nil { + return meta, fmt.Errorf("error parsing excludePersistentLag: %s", err) + } + meta.excludePersistentLag = t + } + meta.scaleToZeroOnInvalidOffset = false if val, ok := config.TriggerMetadata["scaleToZeroOnInvalidOffset"]; ok { t, err := strconv.ParseBool(val) @@ -293,13 +307,14 @@ func parseKafkaMetadata(config *ScalerConfig, logger logr.Logger) (kafkaMetadata } // IsActive determines if we need to scale from zero +// When replicas is zero, all lag will be deemed as persistent, hence use totalLagWithPersistent to determine scaling. 
func (s *kafkaScaler) IsActive(ctx context.Context) (bool, error) { - totalLag, err := s.getTotalLag() + _, totalLagWithPersistent, err := s.getTotalLag() if err != nil { return false, err } - return totalLag > s.metadata.activationLagThreshold, nil + return totalLagWithPersistent > s.metadata.activationLagThreshold, nil } func getKafkaClients(metadata kafkaMetadata) (sarama.Client, sarama.ClusterAdmin, error) { @@ -433,12 +448,16 @@ func (s *kafkaScaler) getConsumerOffsets(topicPartitions map[string][]int32) (*s return offsets, nil } -func (s *kafkaScaler) getLagForPartition(topic string, partitionID int32, offsets *sarama.OffsetFetchResponse, topicPartitionOffsets map[string]map[int32]int64) (int64, error) { +// getLagForPartition returns (lag, lagWithPersistent, error) +// When excludePersistentLag is set to `false` (default), lag will always be equal to lagWithPersistent +// When excludePersistentLag is set to `true`, if partition is deemed to have persistent lag, lag will be set to 0 and lagWithPersistent will be latestOffset - consumerOffset +// These return values will allow proper scaling from 0 -> 1 replicas by the IsActive func. +func (s *kafkaScaler) getLagForPartition(topic string, partitionID int32, offsets *sarama.OffsetFetchResponse, topicPartitionOffsets map[string]map[int32]int64) (int64, int64, error) { block := offsets.GetBlock(topic, partitionID) if block == nil { errMsg := fmt.Errorf("error finding offset block for topic %s and partition %d", topic, partitionID) s.logger.Error(errMsg, "") - return 0, errMsg + return 0, 0, errMsg } if block.Err > 0 { errMsg := fmt.Errorf("error finding offset block for topic %s and partition %d: %s", topic, partitionID, offsets.Err.Error()) @@ -455,17 +474,39 @@ func (s *kafkaScaler) getLagForPartition(topic string, partitionID int32, offset "invalid offset found for topic %s in group %s and partition %d, probably no offset is committed yet. Returning with lag of %d", topic, s.metadata.group, partitionID, retVal) s.logger.V(1).Info(msg) - return retVal, nil + return retVal, retVal, nil } if _, found := topicPartitionOffsets[topic]; !found { - return 0, fmt.Errorf("error finding partition offset for topic %s", topic) + return 0, 0, fmt.Errorf("error finding partition offset for topic %s", topic) } latestOffset := topicPartitionOffsets[topic][partitionID] if consumerOffset == invalidOffset && s.metadata.offsetResetPolicy == earliest { - return latestOffset, nil + return latestOffset, latestOffset, nil } - return latestOffset - consumerOffset, nil + + // This code block tries to prevent KEDA Kafka trigger from scaling the scale target based on erroneous events + if s.metadata.excludePersistentLag { + switch previousOffset, found := s.previousOffsets[topic][partitionID]; { + case !found: + // No record of previous offset, so store current consumer offset + // Allow this consumer lag to be considered in scaling + if _, topicFound := s.previousOffsets[topic]; !topicFound { + s.previousOffsets[topic] = map[int32]int64{partitionID: consumerOffset} + } else { + s.previousOffsets[topic][partitionID] = consumerOffset + } + case previousOffset == consumerOffset: + // Indicates consumer is still on the same offset as the previous polling cycle, there may be some issue with consuming this offset. 
+			// return 0, so this consumer lag is not considered for scaling
+			return 0, latestOffset - consumerOffset, nil
+		default:
+			// Successfully consumed some messages, so update the previous offset
+			s.previousOffsets[topic][partitionID] = consumerOffset
+		}
+	}
+
+	return latestOffset - consumerOffset, latestOffset - consumerOffset, nil
 }
 
 // Close closes the kafka admin and client
@@ -535,7 +576,7 @@ func (s *kafkaScaler) getConsumerAndProducerOffsets(topicPartitions map[string][
 
 // GetMetrics returns value for a supported metric and an error if there is a problem getting the metric
 func (s *kafkaScaler) GetMetrics(ctx context.Context, metricName string) ([]external_metrics.ExternalMetricValue, error) {
-	totalLag, err := s.getTotalLag()
+	totalLag, _, err := s.getTotalLag()
 	if err != nil {
 		return []external_metrics.ExternalMetricValue{}, err
 	}
@@ -544,24 +585,29 @@ func (s *kafkaScaler) GetMetrics(ctx context.Context, metricName string) ([]exte
 	return append([]external_metrics.ExternalMetricValue{}, metric), nil
 }
 
-func (s *kafkaScaler) getTotalLag() (int64, error) {
+// getTotalLag returns totalLag, totalLagWithPersistent, error
+// totalLag and totalLagWithPersistent are the sums of lag and lagWithPersistent returned by the getLagForPartition function, respectively.
+// totalLag may be less than totalLagWithPersistent when excludePersistentLag is set to `true`, because partitions deemed to have persistent lag are excluded from totalLag.
+func (s *kafkaScaler) getTotalLag() (int64, int64, error) {
 	topicPartitions, err := s.getTopicPartitions()
 	if err != nil {
-		return 0, err
+		return 0, 0, err
 	}
 	consumerOffsets, producerOffsets, err := s.getConsumerAndProducerOffsets(topicPartitions)
 	if err != nil {
-		return 0, err
+		return 0, 0, err
 	}
 
 	totalLag := int64(0)
+	totalLagWithPersistent := int64(0)
 	totalTopicPartitions := int64(0)
 
 	for topic, partitionsOffsets := range producerOffsets {
 		for partition := range partitionsOffsets {
-			lag, _ := s.getLagForPartition(topic, partition, consumerOffsets, producerOffsets)
+			lag, lagWithPersistent, _ := s.getLagForPartition(topic, partition, consumerOffsets, producerOffsets)
 			totalLag += lag
+			totalLagWithPersistent += lagWithPersistent
 		}
 		totalTopicPartitions += (int64)(len(partitionsOffsets))
 	}
 
@@ -573,7 +619,7 @@ func (s *kafkaScaler) getTotalLag() (int64, error) {
 			totalLag = totalTopicPartitions * s.metadata.lagThreshold
 		}
 	}
-	return totalLag, nil
+	return totalLag, totalLagWithPersistent, nil
 }
 
 type brokerOffsetResult struct {
diff --git a/pkg/scalers/kafka_scaler_test.go b/pkg/scalers/kafka_scaler_test.go
index c51445b4047..071dcebc45b 100644
--- a/pkg/scalers/kafka_scaler_test.go
+++ b/pkg/scalers/kafka_scaler_test.go
@@ -11,15 +11,16 @@ import (
 )
 
 type parseKafkaMetadataTestData struct {
-	metadata            map[string]string
-	isError             bool
-	numBrokers          int
-	brokers             []string
-	group               string
-	topic               string
-	partitionLimitation []int32
-	offsetResetPolicy   offsetResetPolicy
-	allowIdleConsumers  bool
+	metadata             map[string]string
+	isError              bool
+	numBrokers           int
+	brokers              []string
+	group                string
+	topic                string
+	partitionLimitation  []int32
+	offsetResetPolicy    offsetResetPolicy
+	allowIdleConsumers   bool
+	excludePersistentLag bool
 }
 
 type parseKafkaAuthParamsTestData struct {
@@ -54,45 +55,49 @@ var validWithoutAuthParams = map[string]string{}
 
 var parseKafkaMetadataTestDataset = []parseKafkaMetadataTestData{
 	// failure, no bootstrapServers
-	{map[string]string{}, true, 0, nil, "", "", nil, "", false},
+	{map[string]string{}, true, 0, nil, "", "", nil, "", false, false},
 	// failure,
no consumer group - {map[string]string{"bootstrapServers": "foobar:9092"}, true, 1, []string{"foobar:9092"}, "", "", nil, "latest", false}, + {map[string]string{"bootstrapServers": "foobar:9092"}, true, 1, []string{"foobar:9092"}, "", "", nil, "latest", false, false}, // success, no topic - {map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group"}, false, 1, []string{"foobar:9092"}, "my-group", "", nil, offsetResetPolicy("latest"), false}, + {map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group"}, false, 1, []string{"foobar:9092"}, "my-group", "", nil, offsetResetPolicy("latest"), false, false}, // success, ignore partitionLimitation if no topic - {map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "partitionLimitation": "1,2,3,4,5,6"}, false, 1, []string{"foobar:9092"}, "my-group", "", nil, offsetResetPolicy("latest"), false}, + {map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "partitionLimitation": "1,2,3,4,5,6"}, false, 1, []string{"foobar:9092"}, "my-group", "", nil, offsetResetPolicy("latest"), false, false}, // failure, version not supported - {map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "version": "1.2.3.4"}, true, 1, []string{"foobar:9092"}, "my-group", "my-topic", nil, offsetResetPolicy("latest"), false}, + {map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "version": "1.2.3.4"}, true, 1, []string{"foobar:9092"}, "my-group", "my-topic", nil, offsetResetPolicy("latest"), false, false}, // failure, lagThreshold is negative value - {map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "lagThreshold": "-1"}, true, 1, []string{"foobar:9092"}, "my-group", "my-topic", nil, offsetResetPolicy("latest"), false}, + {map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "lagThreshold": "-1"}, true, 1, []string{"foobar:9092"}, "my-group", "my-topic", nil, offsetResetPolicy("latest"), false, false}, // failure, lagThreshold is 0 - {map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "lagThreshold": "0"}, true, 1, []string{"foobar:9092"}, "my-group", "my-topic", nil, offsetResetPolicy("latest"), false}, + {map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "lagThreshold": "0"}, true, 1, []string{"foobar:9092"}, "my-group", "my-topic", nil, offsetResetPolicy("latest"), false, false}, // failure, activationLagThreshold is not int - {map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "lagThreshold": "10", "activationLagThreshold": "AA"}, true, 1, []string{"foobar:9092"}, "my-group", "my-topic", nil, offsetResetPolicy("latest"), false}, + {map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "lagThreshold": "10", "activationLagThreshold": "AA"}, true, 1, []string{"foobar:9092"}, "my-group", "my-topic", nil, offsetResetPolicy("latest"), false, false}, // success - {map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic"}, false, 1, []string{"foobar:9092"}, "my-group", "my-topic", nil, offsetResetPolicy("latest"), false}, + {map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic"}, false, 
1, []string{"foobar:9092"}, "my-group", "my-topic", nil, offsetResetPolicy("latest"), false, false}, // success, partitionLimitation as list - {map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "partitionLimitation": "1,2,3,4"}, false, 1, []string{"foobar:9092"}, "my-group", "my-topic", []int32{1, 2, 3, 4}, offsetResetPolicy("latest"), false}, + {map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "partitionLimitation": "1,2,3,4"}, false, 1, []string{"foobar:9092"}, "my-group", "my-topic", []int32{1, 2, 3, 4}, offsetResetPolicy("latest"), false, false}, // success, partitionLimitation as range - {map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "partitionLimitation": "1-4"}, false, 1, []string{"foobar:9092"}, "my-group", "my-topic", []int32{1, 2, 3, 4}, offsetResetPolicy("latest"), false}, + {map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "partitionLimitation": "1-4"}, false, 1, []string{"foobar:9092"}, "my-group", "my-topic", []int32{1, 2, 3, 4}, offsetResetPolicy("latest"), false, false}, // success, partitionLimitation mixed list + ranges - {map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "partitionLimitation": "1-4,8,10-12"}, false, 1, []string{"foobar:9092"}, "my-group", "my-topic", []int32{1, 2, 3, 4, 8, 10, 11, 12}, offsetResetPolicy("latest"), false}, + {map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "partitionLimitation": "1-4,8,10-12"}, false, 1, []string{"foobar:9092"}, "my-group", "my-topic", []int32{1, 2, 3, 4, 8, 10, 11, 12}, offsetResetPolicy("latest"), false, false}, // failure, partitionLimitation wrong data type - {map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "partitionLimitation": "a,b,c,d"}, true, 1, []string{"foobar:9092"}, "my-group", "my-topic", nil, offsetResetPolicy("latest"), false}, + {map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "partitionLimitation": "a,b,c,d"}, true, 1, []string{"foobar:9092"}, "my-group", "my-topic", nil, offsetResetPolicy("latest"), false, false}, // success, more brokers - {map[string]string{"bootstrapServers": "foo:9092,bar:9092", "consumerGroup": "my-group", "topic": "my-topic"}, false, 2, []string{"foo:9092", "bar:9092"}, "my-group", "my-topic", nil, offsetResetPolicy("latest"), false}, + {map[string]string{"bootstrapServers": "foo:9092,bar:9092", "consumerGroup": "my-group", "topic": "my-topic"}, false, 2, []string{"foo:9092", "bar:9092"}, "my-group", "my-topic", nil, offsetResetPolicy("latest"), false, false}, // success, offsetResetPolicy policy latest - {map[string]string{"bootstrapServers": "foo:9092,bar:9092", "consumerGroup": "my-group", "topic": "my-topic", "offsetResetPolicy": "latest"}, false, 2, []string{"foo:9092", "bar:9092"}, "my-group", "my-topic", nil, offsetResetPolicy("latest"), false}, + {map[string]string{"bootstrapServers": "foo:9092,bar:9092", "consumerGroup": "my-group", "topic": "my-topic", "offsetResetPolicy": "latest"}, false, 2, []string{"foo:9092", "bar:9092"}, "my-group", "my-topic", nil, offsetResetPolicy("latest"), false, false}, // failure, offsetResetPolicy policy wrong - {map[string]string{"bootstrapServers": "foo:9092,bar:9092", "consumerGroup": "my-group", "topic": 
"my-topic", "offsetResetPolicy": "foo"}, true, 2, []string{"foo:9092", "bar:9092"}, "my-group", "my-topic", nil, "", false}, + {map[string]string{"bootstrapServers": "foo:9092,bar:9092", "consumerGroup": "my-group", "topic": "my-topic", "offsetResetPolicy": "foo"}, true, 2, []string{"foo:9092", "bar:9092"}, "my-group", "my-topic", nil, "", false, false}, // success, offsetResetPolicy policy earliest - {map[string]string{"bootstrapServers": "foo:9092,bar:9092", "consumerGroup": "my-group", "topic": "my-topic", "offsetResetPolicy": "earliest"}, false, 2, []string{"foo:9092", "bar:9092"}, "my-group", "my-topic", nil, offsetResetPolicy("earliest"), false}, + {map[string]string{"bootstrapServers": "foo:9092,bar:9092", "consumerGroup": "my-group", "topic": "my-topic", "offsetResetPolicy": "earliest"}, false, 2, []string{"foo:9092", "bar:9092"}, "my-group", "my-topic", nil, offsetResetPolicy("earliest"), false, false}, // failure, allowIdleConsumers malformed - {map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "allowIdleConsumers": "notvalid"}, true, 1, []string{"foobar:9092"}, "my-group", "my-topic", nil, offsetResetPolicy("latest"), false}, + {map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "allowIdleConsumers": "notvalid"}, true, 1, []string{"foobar:9092"}, "my-group", "my-topic", nil, offsetResetPolicy("latest"), false, false}, // success, allowIdleConsumers is true - {map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "allowIdleConsumers": "true"}, false, 1, []string{"foobar:9092"}, "my-group", "my-topic", nil, offsetResetPolicy("latest"), true}, + {map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "allowIdleConsumers": "true"}, false, 1, []string{"foobar:9092"}, "my-group", "my-topic", nil, offsetResetPolicy("latest"), true, false}, + // failure, excludePersistentLag is malformed + {map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "excludePersistentLag": "notvalid"}, true, 1, []string{"foobar:9092"}, "my-group", "my-topic", nil, offsetResetPolicy("latest"), false, false}, + // success, excludePersistentLag is true + {map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "excludePersistentLag": "true"}, false, 1, []string{"foobar:9092"}, "my-group", "my-topic", nil, offsetResetPolicy("latest"), false, true}, // success, version supported - {map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "allowIdleConsumers": "true", "version": "1.0.0"}, false, 1, []string{"foobar:9092"}, "my-group", "my-topic", nil, offsetResetPolicy("latest"), true}, + {map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "allowIdleConsumers": "true", "version": "1.0.0"}, false, 1, []string{"foobar:9092"}, "my-group", "my-topic", nil, offsetResetPolicy("latest"), true, false}, } var parseKafkaAuthParamsTestDataset = []parseKafkaAuthParamsTestData{ @@ -225,6 +230,9 @@ func TestGetBrokers(t *testing.T) { if err == nil && meta.allowIdleConsumers != testData.allowIdleConsumers { t.Errorf("Expected allowIdleConsumers %t but got %t\n", testData.allowIdleConsumers, meta.allowIdleConsumers) } + if err == nil && meta.excludePersistentLag != testData.excludePersistentLag { + t.Errorf("Expected excludePersistentLag %t 
but got %t\n", testData.excludePersistentLag, meta.excludePersistentLag) + } } } @@ -282,7 +290,7 @@ func TestKafkaGetMetricSpecForScaling(t *testing.T) { if err != nil { t.Fatal("Could not parse metadata:", err) } - mockKafkaScaler := kafkaScaler{"", meta, nil, nil, logr.Discard()} + mockKafkaScaler := kafkaScaler{"", meta, nil, nil, logr.Discard(), make(map[string]map[int32]int64)} metricSpec := mockKafkaScaler.GetMetricSpecForScaling(context.Background()) metricName := metricSpec[0].External.Metric.Name @@ -309,7 +317,7 @@ func TestGetTopicPartitions(t *testing.T) { if err != nil { t.Fatal("Could not parse metadata:", err) } - mockKafkaScaler := kafkaScaler{"", meta, nil, &MockClusterAdmin{partitionIds: tt.partitionIds}, logr.Discard()} + mockKafkaScaler := kafkaScaler{"", meta, nil, &MockClusterAdmin{partitionIds: tt.partitionIds}, logr.Discard(), make(map[string]map[int32]int64)} patitions, err := mockKafkaScaler.getTopicPartitions() diff --git a/tests/helper/helper.go b/tests/helper/helper.go index 6b2079fbd23..1b25db557bf 100644 --- a/tests/helper/helper.go +++ b/tests/helper/helper.go @@ -550,3 +550,21 @@ func DeletePodsInNamespaceBySelector(t *testing.T, kc *kubernetes.Clientset, sel }) assert.NoErrorf(t, err, "cannot delete pods - %s", err) } + +// Wait for Pods identified by selector to complete termination +func WaitForPodsTerminated(t *testing.T, kc *kubernetes.Clientset, selector, namespace string, + iterations, intervalSeconds int) bool { + for i := 0; i < iterations; i++ { + pods, err := kc.CoreV1().Pods(namespace).List(context.Background(), metav1.ListOptions{LabelSelector: selector}) + if (err != nil && errors.IsNotFound(err)) || len(pods.Items) == 0 { + t.Logf("No pods with label %s", selector) + return true + } + + t.Logf("Waiting for pods with label %s to terminate", selector) + + time.Sleep(time.Duration(intervalSeconds) * time.Second) + } + + return false +} diff --git a/tests/scalers/kafka/kafka_test.go b/tests/scalers/kafka/kafka_test.go index d16598429b0..26acfa52285 100644 --- a/tests/scalers/kafka/kafka_test.go +++ b/tests/scalers/kafka/kafka_test.go @@ -22,19 +22,22 @@ const ( ) var ( - testNamespace = fmt.Sprintf("%s-ns", testName) - deploymentName = fmt.Sprintf("%s-deployment", testName) - kafkaName = fmt.Sprintf("%s-kafka", testName) - kafkaClientName = fmt.Sprintf("%s-client", testName) - scaledObjectName = fmt.Sprintf("%s-so", testName) - bootstrapServer = fmt.Sprintf("%s-kafka-bootstrap.%s:9092", kafkaName, testNamespace) - strimziOperatorVersion = "0.30.0" - topic1 = "kafka-topic" - topic2 = "kafka-topic2" - zeroInvalidOffsetTopic = "kafka-topic-zero-invalid-offset" - oneInvalidOffsetTopic = "kafka-topic-one-invalid-offset" - invalidOffsetGroup = "invalidOffset" - topicPartitions = 3 + testNamespace = fmt.Sprintf("%s-ns", testName) + deploymentName = fmt.Sprintf("%s-deployment", testName) + kafkaName = fmt.Sprintf("%s-kafka", testName) + kafkaClientName = fmt.Sprintf("%s-client", testName) + scaledObjectName = fmt.Sprintf("%s-so", testName) + bootstrapServer = fmt.Sprintf("%s-kafka-bootstrap.%s:9092", kafkaName, testNamespace) + strimziOperatorVersion = "0.30.0" + topic1 = "kafka-topic" + topic2 = "kafka-topic2" + zeroInvalidOffsetTopic = "kafka-topic-zero-invalid-offset" + oneInvalidOffsetTopic = "kafka-topic-one-invalid-offset" + invalidOffsetGroup = "invalidOffset" + persistentLagTopic = "kafka-topic-persistent-lag" + persistentLagGroup = "persistentLag" + persistentLagDeploymentGroup = "persistentLagDeploymentGroup" + topicPartitions = 3 ) type 
templateData struct {
@@ -53,6 +56,7 @@ type templateData struct {
 	Params               string
 	Commit               string
 	ScaleToZeroOnInvalid string
+	ExcludePersistentLag string
 }
 
 const (
@@ -181,6 +185,43 @@ spec:
       consumerGroup: {{.ResetPolicy}}
       lagThreshold: '1'
       scaleToZeroOnInvalidOffset: '{{.ScaleToZeroOnInvalid}}'
+      offsetResetPolicy: 'latest'`
+
+	persistentLagScaledObjectTemplate = `
+apiVersion: keda.sh/v1alpha1
+kind: ScaledObject
+metadata:
+  name: {{.ScaledObjectName}}
+  namespace: {{.TestNamespace}}
+  labels:
+    app: {{.DeploymentName}}
+spec:
+  pollingInterval: 15
+  scaleTargetRef:
+    name: {{.DeploymentName}}
+  advanced:
+    horizontalPodAutoscalerConfig:
+      behavior:
+        scaleUp:
+          stabilizationWindowSeconds: 30
+          policies:
+          - type: Percent
+            value: 100
+            periodSeconds: 15
+        scaleDown:
+          stabilizationWindowSeconds: 30
+          policies:
+          - type: Percent
+            value: 100
+            periodSeconds: 15
+  triggers:
+  - type: kafka
+    metadata:
+      topic: {{.TopicName}}
+      bootstrapServers: {{.BootstrapServer}}
+      consumerGroup: {{.ResetPolicy}}
+      lagThreshold: '1'
+      excludePersistentLag: '{{.ExcludePersistentLag}}'
       offsetResetPolicy: 'latest'`
 
 	kafkaClusterTemplate = `apiVersion: kafka.strimzi.io/v1beta2
@@ -261,6 +302,7 @@ func TestScaler(t *testing.T) {
 	addTopic(t, data, topic2, topicPartitions)
 	addTopic(t, data, zeroInvalidOffsetTopic, 1)
 	addTopic(t, data, oneInvalidOffsetTopic, 1)
+	addTopic(t, data, persistentLagTopic, topicPartitions)
 
 	// test scaling
 	testEarliestPolicy(t, kc, data)
@@ -268,6 +310,7 @@ func TestScaler(t *testing.T) {
 	testMultiTopic(t, kc, data)
 	testZeroOnInvalidOffset(t, kc, data)
 	testOneOnInvalidOffset(t, kc, data)
+	testPersistentLag(t, kc, data)
 
 	// cleanup
 	uninstallKafkaOperator(t)
@@ -426,6 +469,49 @@ func commitPartition(t *testing.T, topic string, group string) {
 	assert.NoErrorf(t, err, "cannot execute command - %s", err)
 }
 
+func testPersistentLag(t *testing.T, kc *kubernetes.Clientset, data templateData) {
+	t.Log("--- testing persistentLag: no scale out ---")
+
+	// Simulate consumption from the topic by the consumer group
+	// to avoid the edge case where scaling could be effectively disabled (the consumer never makes a commit)
+	data.Params = fmt.Sprintf("--topic %s --group %s --from-beginning", persistentLagTopic, persistentLagGroup)
+	data.Commit = StringTrue
+	data.TopicName = persistentLagTopic
+	data.ResetPolicy = persistentLagGroup
+	data.ExcludePersistentLag = StringTrue
+	KubectlApplyWithTemplate(t, data, "singleDeploymentTemplate", singleDeploymentTemplate)
+	KubectlApplyWithTemplate(t, data, "persistentLagScaledObjectTemplate", persistentLagScaledObjectTemplate)
+
+	// Scale application with kafka messages in persistentLagTopic
+	publishMessage(t, persistentLagTopic)
+	assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, 1, 60, 2),
+		"replica count should be %d after 2 minutes", 1)
+	// Recreate the deployment to deliberately assign different consumer groups to the deployment and the scaled object
+	// This simulates an inability to consume from the topic
+	// Scaled Object remains unchanged
+	KubernetesScaleDeployment(t, kc, deploymentName, 0, testNamespace)
+	assert.True(t, WaitForPodsTerminated(t, kc, "app=kafka-consumer", testNamespace, 60, 2),
+		"pods should be terminated after %d minutes", 2)
+
+	data.Params = fmt.Sprintf("--topic %s --group %s --from-beginning", persistentLagTopic, persistentLagDeploymentGroup)
+	KubectlApplyWithTemplate(t, data, "singleDeploymentTemplate", singleDeploymentTemplate)
+
+	messages := 5
+	for i := 0; i < messages; i++ {
+		publishMessage(t, persistentLagTopic)
+	}
+
+	// Persistent lag should not scale the workload above minimum replicas after 2 reconciliation cycles
+	assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, 1, 60, 2),
+		"replica count should be %d after 2 minutes", 1)
+
+	// Shouldn't scale pods
+	AssertReplicaCountNotChangeDuringTimePeriod(t, kc, deploymentName, testNamespace, 1, 30)
+
+	KubectlDeleteWithTemplate(t, data, "singleDeploymentTemplate", singleDeploymentTemplate)
+	KubectlDeleteWithTemplate(t, data, "persistentLagScaledObjectTemplate", persistentLagScaledObjectTemplate)
+}
+
 func installKafkaOperator(t *testing.T) {
 	_, err := ExecuteCommand("helm repo add strimzi https://strimzi.io/charts/")
 	assert.NoErrorf(t, err, "cannot execute command - %s", err)
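
For reference, the new excludePersistentLag option is consumed as trigger metadata on a kafka trigger. A minimal ScaledObject sketch using it follows; the namespace, deployment, broker, topic, and consumer-group names (and the lagThreshold value) are placeholders, while the field name and the quoted-string value style mirror the persistentLagScaledObjectTemplate used in the e2e test above.

apiVersion: keda.sh/v1alpha1
kind: ScaledObject
metadata:
  name: kafka-consumer-scaler            # placeholder name
  namespace: my-namespace                # placeholder namespace
spec:
  scaleTargetRef:
    name: my-kafka-consumer              # placeholder Deployment
  triggers:
    - type: kafka
      metadata:
        bootstrapServers: my-kafka-bootstrap.my-namespace:9092   # placeholder broker
        consumerGroup: my-consumer-group                         # placeholder consumer group
        topic: my-topic                                          # placeholder topic
        lagThreshold: '10'                                       # example threshold
        offsetResetPolicy: 'latest'
        # With excludePersistentLag enabled, partitions whose consumer offset has not
        # advanced since the previous polling cycle contribute 0 to the scaling lag,
        # while IsActive still sees the full lag so 0 -> 1 activation keeps working.
        excludePersistentLag: 'true'

When the key is absent, parsing defaults excludePersistentLag to false, so existing ScaledObjects keep their current behaviour.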