Skip to content

Commit

Permalink
properly log and set correct status if any of ScaleObject's triggers …
Browse files Browse the repository at this point in the history
…are in error state (#2604)

Signed-off-by: Zbynek Roubalik <[email protected]>
  • Loading branch information
zroubalik authored Feb 10, 2022
1 parent ed68797 commit 10db6e1
Show file tree
Hide file tree
Showing 9 changed files with 109 additions and 35 deletions.
3 changes: 3 additions & 0 deletions .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,9 @@ issues:
- path: scale_handler.go
linters:
- gocyclo
- path: scale_scaledobjects.go
linters:
- gocyclo
# Exclude for clustertriggerauthentication_controller and triggerauthentication_controller, reason:
# controllers/clustertriggerauthentication_controller.go:1: 1-59 lines are duplicate of `controllers/triggerauthentication_controller.go:1-58` (dupl)
- path: triggerauthentication_controller.go
Expand Down
6 changes: 3 additions & 3 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,16 +33,16 @@

### Improvements

- **General:** Fix generation of metric names if any of ScaledObject's triggers is unavailable ([#2592](https://github.com/kedacore/keda/issues/2592))
- **General:** Fix failing tests based on the scale to zero bug ([#2603](https://github.com/kedacore/keda/issues/2603))
- **General**: Fix generation of metric names if any of ScaledObject's triggers is unavailable ([#2592](https://github.com/kedacore/keda/issues/2592))
- **General**: Fix logging in KEDA operator and properly set `ScaledObject.Status` in case there is a problem in a ScaledObject's trigger ([#2603](https://github.com/kedacore/keda/issues/2603))

### Breaking Changes

- TODO ([#XXX](https://github.com/kedacore/keda/pull/XXX))

### Other

- TODO ([#XXX](https://github.com/kedacore/keda/pull/XXX))
- **General:** Fix failing tests based on the scale to zero bug ([#2603](https://github.com/kedacore/keda/issues/2603))

## v2.6.0

Expand Down
7 changes: 7 additions & 0 deletions apis/keda/v1alpha1/condition_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,13 @@ const (
ConditionFallback ConditionType = "Fallback"
)

const (
// ScaledObjectConditionReadySucccesReason defines the default Reason for correct ScaledObject
ScaledObjectConditionReadySucccesReason = "ScaledObjectReady"
// ScaledObjectConditionReadySuccessMessage defines the default Message for correct ScaledObject
ScaledObjectConditionReadySuccessMessage = "ScaledObject is defined correctly and is ready for scaling"
)

// Condition to store the condition state
type Condition struct {
// Type of condition
Expand Down
4 changes: 2 additions & 2 deletions controllers/keda/scaledobject_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ func (r *ScaledObjectReconciler) Reconcile(ctx context.Context, req ctrl.Request
r.Recorder.Event(scaledObject, corev1.EventTypeNormal, eventreason.ScaledObjectReady, "ScaledObject is ready for scaling")
}
reqLogger.V(1).Info(msg)
conditions.SetReadyCondition(metav1.ConditionTrue, "ScaledObjectReady", msg)
conditions.SetReadyCondition(metav1.ConditionTrue, kedav1alpha1.ScaledObjectConditionReadySucccesReason, msg)
}

if err := kedacontrollerutil.SetStatusConditions(ctx, r.Client, reqLogger, scaledObject, &conditions); err != nil {
Expand Down Expand Up @@ -248,7 +248,7 @@ func (r *ScaledObjectReconciler) reconcileScaledObject(ctx context.Context, logg
}
logger.Info("Initializing Scaling logic according to ScaledObject Specification")
}
return "ScaledObject is defined correctly and is ready for scaling", nil
return kedav1alpha1.ScaledObjectConditionReadySuccessMessage, nil
}

// ensureScaledObjectLabel ensures that scaledobject.keda.sh/name=<scaledObject.Name> label exist in the ScaledObject
Expand Down
11 changes: 7 additions & 4 deletions pkg/scaling/cache/scalers_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ func (c *ScalersCache) GetMetricsForScaler(ctx context.Context, id int, metricNa
func (c *ScalersCache) IsScaledObjectActive(ctx context.Context, scaledObject *kedav1alpha1.ScaledObject) (bool, bool, []external_metrics.ExternalMetricValue) {
isActive := false
isError := false
// Let's collect status of all scalers, no matter if any scaler raises error or is active
for i, s := range c.Scalers {
isTriggerActive, err := s.Scaler.IsActive(ctx)
if err != nil {
Expand All @@ -92,19 +93,21 @@ func (c *ScalersCache) IsScaledObjectActive(ctx context.Context, scaledObject *k
}
}

logger := c.Logger.WithValues("scaledobject.Name", scaledObject.Name, "scaledObject.Namespace", scaledObject.Namespace,
"scaleTarget.Name", scaledObject.Spec.ScaleTargetRef.Name)

if err != nil {
c.Logger.V(1).Info("Error getting scale decision", "Error", err)
isError = true
logger.Error(err, "Error getting scale decision")
c.Recorder.Event(scaledObject, corev1.EventTypeWarning, eventreason.KEDAScalerFailed, err.Error())
} else if isTriggerActive {
isActive = true
if externalMetricsSpec := s.Scaler.GetMetricSpecForScaling(ctx)[0].External; externalMetricsSpec != nil {
c.Logger.V(1).Info("Scaler for scaledObject is active", "Metrics Name", externalMetricsSpec.Metric.Name)
logger.V(1).Info("Scaler for scaledObject is active", "Metrics Name", externalMetricsSpec.Metric.Name)
}
if resourceMetricsSpec := s.Scaler.GetMetricSpecForScaling(ctx)[0].Resource; resourceMetricsSpec != nil {
c.Logger.V(1).Info("Scaler for scaledObject is active", "Metrics Name", resourceMetricsSpec.Name)
logger.V(1).Info("Scaler for scaledObject is active", "Metrics Name", resourceMetricsSpec.Name)
}
break
}
}

Expand Down
7 changes: 7 additions & 0 deletions pkg/scaling/executor/scale_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,13 @@ func (e *scaleExecutor) setCondition(ctx context.Context, logger logr.Logger, ob
return err
}

func (e *scaleExecutor) setReadyCondition(ctx context.Context, logger logr.Logger, object interface{}, status metav1.ConditionStatus, reason string, message string) error {
active := func(conditions kedav1alpha1.Conditions, status metav1.ConditionStatus, reason string, message string) {
conditions.SetReadyCondition(status, reason, message)
}
return e.setCondition(ctx, logger, object, status, reason, message, active)
}

func (e *scaleExecutor) setActiveCondition(ctx context.Context, logger logr.Logger, object interface{}, status metav1.ConditionStatus, reason string, message string) error {
active := func(conditions kedav1alpha1.Conditions, status metav1.ConditionStatus, reason string, message string) {
conditions.SetActiveCondition(status, reason, message)
Expand Down
34 changes: 34 additions & 0 deletions pkg/scaling/executor/scale_scaledobjects.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,16 @@ func (e *scaleExecutor) RequestScale(ctx context.Context, scaledObject *kedav1al
minReplicas = *scaledObject.Spec.MinReplicaCount
}

// if the ScaledObject's triggers aren't in the error state,
// but ScaledObject.Status.ReadyCondition is set not set to 'true' -> set it back to 'true'
readyCondition := scaledObject.Status.Conditions.GetReadyCondition()
if !isError && !readyCondition.IsTrue() {
if err := e.setReadyCondition(ctx, logger, scaledObject, metav1.ConditionFalse,
kedav1alpha1.ScaledObjectConditionReadySucccesReason, kedav1alpha1.ScaledObjectConditionReadySuccessMessage); err != nil {
logger.Error(err, "error setting ready condition")
}
}

if isActive {
switch {
case scaledObject.Spec.IdleReplicaCount != nil && currentReplicas < minReplicas,
Expand All @@ -89,6 +99,17 @@ func (e *scaleExecutor) RequestScale(ctx context.Context, scaledObject *kedav1al

// Scale the ScaleTarget up
e.scaleFromZeroOrIdle(ctx, logger, scaledObject, currentScale)
case isError:
// some triggers are active, but some responded with error

// Set ScaledObject.Status.ReadyCondition to Unknown
msg := "Some triggers defined in ScaledObject are not working correctly"
logger.V(1).Info(msg)
if !readyCondition.IsUnknown() {
if err := e.setReadyCondition(ctx, logger, scaledObject, metav1.ConditionUnknown, "PartialTriggerError", msg); err != nil {
logger.Error(err, "error setting ready condition")
}
}
default:
// triggers are active, but we didn't need to scale (replica count > 0)

Expand All @@ -109,6 +130,19 @@ func (e *scaleExecutor) RequestScale(ctx context.Context, scaledObject *kedav1al

// Scale to the fallback replicas count
e.doFallbackScaling(ctx, scaledObject, currentScale, logger, currentReplicas)
case isError && scaledObject.Spec.Fallback == nil:
// there are no active triggers, but a scaler responded with an error
// AND
// there is not a fallback replicas count defined

// Set ScaledObject.Status.ReadyCondition to false
msg := "Triggers defined in ScaledObject are not working correctly"
logger.V(1).Info(msg)
if !readyCondition.IsFalse() {
if err := e.setReadyCondition(ctx, logger, scaledObject, metav1.ConditionFalse, "TriggerError", msg); err != nil {
logger.Error(err, "error setting ready condition")
}
}
case scaledObject.Spec.IdleReplicaCount != nil && currentReplicas > *scaledObject.Spec.IdleReplicaCount,
// there are no active triggers, Idle Replicas mode is enabled
// AND
Expand Down
20 changes: 10 additions & 10 deletions pkg/scaling/executor/scale_scaledobjects_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,8 +145,8 @@ func TestScaleToMinReplicasWhenNotActive(t *testing.T) {
mockScaleInterface.EXPECT().Get(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(scale, nil)
mockScaleInterface.EXPECT().Update(gomock.Any(), gomock.Any(), gomock.Eq(scale), gomock.Any())

client.EXPECT().Status().Return(statusWriter)
statusWriter.EXPECT().Patch(gomock.Any(), gomock.Any(), gomock.Any())
client.EXPECT().Status().Return(statusWriter).Times(2)
statusWriter.EXPECT().Patch(gomock.Any(), gomock.Any(), gomock.Any()).Times(2)

scaleExecutor.RequestScale(context.TODO(), &scaledObject, false, false)

Expand Down Expand Up @@ -206,8 +206,8 @@ func TestScaleToMinReplicasFromLowerInitialReplicaCount(t *testing.T) {
mockScaleInterface.EXPECT().Get(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(scale, nil)
mockScaleInterface.EXPECT().Update(gomock.Any(), gomock.Any(), gomock.Eq(scale), gomock.Any())

client.EXPECT().Status().Return(statusWriter)
statusWriter.EXPECT().Patch(gomock.Any(), gomock.Any(), gomock.Any())
client.EXPECT().Status().Return(statusWriter).Times(2)
statusWriter.EXPECT().Patch(gomock.Any(), gomock.Any(), gomock.Any()).Times(2)

scaleExecutor.RequestScale(context.TODO(), &scaledObject, false, false)

Expand Down Expand Up @@ -265,8 +265,8 @@ func TestScaleFromMinReplicasWhenActive(t *testing.T) {
mockScaleInterface.EXPECT().Get(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(scale, nil)
mockScaleInterface.EXPECT().Update(gomock.Any(), gomock.Any(), gomock.Eq(scale), gomock.Any())

client.EXPECT().Status().Times(2).Return(statusWriter)
statusWriter.EXPECT().Patch(gomock.Any(), gomock.Any(), gomock.Any()).Times(2)
client.EXPECT().Status().Times(2).Return(statusWriter).Times(3)
statusWriter.EXPECT().Patch(gomock.Any(), gomock.Any(), gomock.Any()).Times(3)

scaleExecutor.RequestScale(context.TODO(), &scaledObject, true, false)

Expand Down Expand Up @@ -328,8 +328,8 @@ func TestScaleToIdleReplicasWhenNotActive(t *testing.T) {
mockScaleInterface.EXPECT().Get(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(scale, nil)
mockScaleInterface.EXPECT().Update(gomock.Any(), gomock.Any(), gomock.Eq(scale), gomock.Any())

client.EXPECT().Status().Return(statusWriter)
statusWriter.EXPECT().Patch(gomock.Any(), gomock.Any(), gomock.Any())
client.EXPECT().Status().Return(statusWriter).Times(2)
statusWriter.EXPECT().Patch(gomock.Any(), gomock.Any(), gomock.Any()).Times(2)

scaleExecutor.RequestScale(context.TODO(), &scaledObject, false, false)

Expand Down Expand Up @@ -389,8 +389,8 @@ func TestScaleFromIdleToMinReplicasWhenActive(t *testing.T) {
mockScaleInterface.EXPECT().Get(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(scale, nil)
mockScaleInterface.EXPECT().Update(gomock.Any(), gomock.Any(), gomock.Eq(scale), gomock.Any())

client.EXPECT().Status().Times(2).Return(statusWriter)
statusWriter.EXPECT().Patch(gomock.Any(), gomock.Any(), gomock.Any()).Times(2)
client.EXPECT().Status().Times(2).Return(statusWriter).Times(3)
statusWriter.EXPECT().Patch(gomock.Any(), gomock.Any(), gomock.Any()).Times(3)

scaleExecutor.RequestScale(context.TODO(), &scaledObject, true, false)

Expand Down
52 changes: 36 additions & 16 deletions pkg/scaling/scale_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,11 @@ func TestCheckScaledObjectScalersWithError(t *testing.T) {
Name: "test",
Namespace: "test",
},
Spec: kedav1alpha1.ScaledObjectSpec{
ScaleTargetRef: &kedav1alpha1.ScaleTarget{
Name: "test",
},
},
}

cache := cache.ScalersCache{
Expand All @@ -71,34 +76,49 @@ func TestCheckScaledObjectScalersWithError(t *testing.T) {
assert.Equal(t, true, isError)
}

func TestCheckScaledObjectFindFirstActiveIgnoringOthers(t *testing.T) {
func TestCheckScaledObjectFindFirstActiveNotIgnoreOthers(t *testing.T) {
ctrl := gomock.NewController(t)
recorder := record.NewFakeRecorder(1)
activeScaler := mock_scalers.NewMockScaler(ctrl)
failingScaler := mock_scalers.NewMockScaler(ctrl)

metricsSpecs := []v2beta2.MetricSpec{createMetricSpec(1)}

activeFactory := func() (scalers.Scaler, error) {
scaler := mock_scalers.NewMockScaler(ctrl)
scaler.EXPECT().IsActive(gomock.Any()).Return(true, nil)
scaler.EXPECT().GetMetricSpecForScaling(gomock.Any()).Times(2).Return(metricsSpecs)
scaler.EXPECT().Close(gomock.Any())
return scaler, nil
}
activeScaler, err := activeFactory()
assert.Nil(t, err)

failingFactory := func() (scalers.Scaler, error) {
scaler := mock_scalers.NewMockScaler(ctrl)
scaler.EXPECT().IsActive(gomock.Any()).Return(false, errors.New("some error"))
scaler.EXPECT().Close(gomock.Any())
return scaler, nil
}
failingScaler, err := failingFactory()
assert.Nil(t, err)

scaledObject := &kedav1alpha1.ScaledObject{
ObjectMeta: metav1.ObjectMeta{
Name: "test",
Namespace: "test",
},
Spec: kedav1alpha1.ScaledObjectSpec{
ScaleTargetRef: &kedav1alpha1.ScaleTarget{
Name: "test",
},
},
}

metricsSpecs := []v2beta2.MetricSpec{createMetricSpec(1)}

activeScaler.EXPECT().IsActive(gomock.Any()).Return(true, nil)
activeScaler.EXPECT().GetMetricSpecForScaling(gomock.Any()).Times(2).Return(metricsSpecs)
activeScaler.EXPECT().Close(gomock.Any())
failingScaler.EXPECT().Close(gomock.Any())

factory := func() (scalers.Scaler, error) {
return mock_scalers.NewMockScaler(ctrl), nil
}
scalers := []cache.ScalerBuilder{{
Scaler: activeScaler,
Factory: factory,
Factory: activeFactory,
}, {
Scaler: failingScaler,
Factory: factory,
Factory: failingFactory,
}}

scalersCache := cache.ScalersCache{
Expand All @@ -111,7 +131,7 @@ func TestCheckScaledObjectFindFirstActiveIgnoringOthers(t *testing.T) {
scalersCache.Close(context.Background())

assert.Equal(t, true, isActive)
assert.Equal(t, false, isError)
assert.Equal(t, true, isError)
}

func createMetricSpec(averageValue int) v2beta2.MetricSpec {
Expand Down

0 comments on commit 10db6e1

Please sign in to comment.