From 3cdac48e8652a0187061157e6102f0887e7aaf2c Mon Sep 17 00:00:00 2001 From: Sanskar Jaiswal Date: Tue, 10 May 2022 15:10:11 +0530 Subject: [PATCH] Avoid scaling triggered by events/messages when ScaledObject is paused (#3011) --- CHANGELOG.md | 1 + pkg/scaling/executor/scale_scaledobjects.go | 35 +++++++++++-------- tests/scalers/azure-queue-pause.test.ts | 38 ++++++++++++++++++++- 3 files changed, 58 insertions(+), 16 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 523ce87e16e..dbeee8100bc 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -64,6 +64,7 @@ To learn more about our roadmap, we recommend reading [this document](ROADMAP.md - **General**: Fix CVE-2022-21221 in `github.com/valyala/fasthttp` ([#2775](https://github.com/kedacore/keda/issue/2775)) - **General**: Bump Golang to 1.17.9 ([#3016](https://github.com/kedacore/keda/issues/3016)) +- **General**: Fix autoscaling behaviour while paused. ([#3009](https://github.com/kedacore/keda/issues/3009)) ## v2.7.0 diff --git a/pkg/scaling/executor/scale_scaledobjects.go b/pkg/scaling/executor/scale_scaledobjects.go index 25797b43ceb..b3e2db9fcb8 100644 --- a/pkg/scaling/executor/scale_scaledobjects.go +++ b/pkg/scaling/executor/scale_scaledobjects.go @@ -93,23 +93,26 @@ func (e *scaleExecutor) RequestScale(ctx context.Context, scaledObject *kedav1al } status := scaledObject.Status.DeepCopy() - if pausedCount != nil && *pausedCount != currentReplicas && status.PausedReplicaCount == nil { - _, err := e.updateScaleOnScaleTarget(ctx, scaledObject, currentScale, *pausedCount) - if err != nil { - logger.Error(err, "error scaling target to paused replicas count", "paused replicas", *pausedCount) - if err := e.setReadyCondition(ctx, logger, scaledObject, metav1.ConditionUnknown, - kedav1alpha1.ScaledObjectConditionReadySucccesReason, kedav1alpha1.ScaledObjectConditionReadySuccessMessage); err != nil { - logger.Error(err, "error setting ready condition") + if pausedCount != nil { + // Scale the target to the paused replica count + if *pausedCount != currentReplicas { + _, err := e.updateScaleOnScaleTarget(ctx, scaledObject, currentScale, *pausedCount) + if err != nil { + logger.Error(err, "error scaling target to paused replicas count", "paused replicas", *pausedCount) + if err := e.setReadyCondition(ctx, logger, scaledObject, metav1.ConditionUnknown, + kedav1alpha1.ScaledObjectConditionReadySucccesReason, kedav1alpha1.ScaledObjectConditionReadySuccessMessage); err != nil { + logger.Error(err, "error setting ready condition") + } + return } - return - } - status.PausedReplicaCount = pausedCount - err = kedacontrollerutil.UpdateScaledObjectStatus(ctx, e.client, logger, scaledObject, status) - if err != nil { - logger.Error(err, "error updating status paused replica count") - return + status.PausedReplicaCount = pausedCount + err = kedacontrollerutil.UpdateScaledObjectStatus(ctx, e.client, logger, scaledObject, status) + if err != nil { + logger.Error(err, "error updating status paused replica count") + return + } + logger.Info("Successfully scaled target to paused replicas count", "paused replicas", *pausedCount) } - logger.Info("Successfully scaled target to paused replicas count", "paused replicas", *pausedCount) return } @@ -355,6 +358,8 @@ func getIdleOrMinimumReplicaCount(scaledObject *kedav1alpha1.ScaledObject) (bool return false, *scaledObject.Spec.MinReplicaCount } +// GetPausedReplicaCount returns the paused replica count of the ScaledObject. +// If not paused, it returns nil. func GetPausedReplicaCount(scaledObject *kedav1alpha1.ScaledObject) (*int32, error) { if scaledObject.Annotations != nil { if val, ok := scaledObject.Annotations[kedacontrollerutil.PausedReplicasAnnotation]; ok { diff --git a/tests/scalers/azure-queue-pause.test.ts b/tests/scalers/azure-queue-pause.test.ts index 7624e5b6fc7..777bed7b669 100644 --- a/tests/scalers/azure-queue-pause.test.ts +++ b/tests/scalers/azure-queue-pause.test.ts @@ -4,7 +4,7 @@ import * as azure from 'azure-storage' import * as sh from 'shelljs' import * as tmp from 'tmp' import test from 'ava' -import {createNamespace, waitForDeploymentReplicaCount} from "./helpers"; +import {createNamespace, waitForDeploymentReplicaCount, sleep} from "./helpers"; const testNamespace = 'pause-test' const deploymentFile = tmp.fileSync() @@ -60,6 +60,24 @@ test.serial( } ) +test.serial.cb( + 'Deployment should remain at pausedReplicaCount (0) even with messages on storage', + t => { + const queueSvc = azure.createQueueService(connectionString) + queueSvc.messageEncoder = new azure.QueueMessageEncoder.TextBase64QueueMessageEncoder() + async.mapLimit( + Array(1000).keys(), + 20, + (n, cb) => queueSvc.createMessage(queueName, `test ${n}`, cb), + async () => { + t.true(await checkIfReplicaCountGreater(0, 'test-deployment', testNamespace, 60, 1000), 'replica count remain 0 after 1 minute') + queueSvc.clearMessages(queueName, _ => {}) + t.end() + } + ) + } +) + test.serial(`Updsating ScaledObject (without annotation) should work`, async t => { fs.writeFileSync(scaledObjectFile.name, scaledObjectYaml) t.is( @@ -118,6 +136,24 @@ test.after.always.cb('clean up workload test related deployments', t => { t.end() }) + +// checks if the current replica count is greater than the given target count for a given interval. +// returns false if it is greater, otherwise true. +async function checkIfReplicaCountGreater(target: number, name: string, namespace: string, iterations = 10, interval = 3000): Promise { + for (let i = 0; i < iterations; i++) { + let replicaCountStr = sh.exec(`kubectl get deployment.apps/${name} --namespace ${namespace} -o jsonpath="{.spec.replicas}"`).stdout + try { + const replicaCount = parseInt(replicaCountStr, 10) + if (replicaCount > target) { + return false + } + } catch { } + + await sleep(interval) + } + return true +} + const deployYaml = `apiVersion: v1 kind: Secret metadata: