Skip to content

Commit

Permalink
Added alternative expression field in AWS CloudWatch scaler (#2997)
Browse files Browse the repository at this point in the history
* Added alternative expression field in AWS CloudWatch scaler

Signed-off-by: Dekel Barzilay <[email protected]>

* Updated CHANGELOG with AWS CloudWatch expression improvement

Signed-off-by: Dekel Barzilay <[email protected]>

* Updated CHANGELOG with AWS CloudWatch expression improvement

Signed-off-by: Dekel Barzilay <[email protected]>
  • Loading branch information
dekelev authored May 4, 2022
1 parent e1e893e commit 1d6aedb
Show file tree
Hide file tree
Showing 5 changed files with 296 additions and 80 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ To learn more about our roadmap, we recommend reading [this document](ROADMAP.md
- **General:** Updated HTTPClient to be proxy-aware, if available, from environment variables. ([#2577](https://github.com/kedacore/keda/issues/2577))
- **General:** Using manager client in KEDA Metrics Server to avoid flush request to Kubernetes Apiserver([2914](https://github.com/kedacore/keda/issues/2914))
- **ActiveMQ Scaler:** Add CorsHeader information to ActiveMQ Scaler ([#2884](https://github.com/kedacore/keda/issues/2884))
- **AWS CloudWatch:** Add support to use expressions([#2998](https://github.com/kedacore/keda/issues/2998))
- **Azure Application Insights Scaler:** Provide support for non-public clouds ([#2735](https://github.com/kedacore/keda/issues/2735))
- **Azure Blob Storage Scaler:** Add optional parameters for counting blobs recursively ([#1789](https://github.com/kedacore/keda/issues/1789))
- **Azure Event Hub Scaler:** Improve logging when blob container not found ([#2363](https://github.com/kedacore/keda/issues/2363))
Expand Down
161 changes: 90 additions & 71 deletions pkg/scalers/aws_cloudwatch_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,10 @@ type awsCloudwatchMetadata struct {
metricsName string
dimensionName []string
dimensionValue []string
expression string

targetMetricValue float64
minMetricValue float64
targetMetricValue int64
minMetricValue int64

metricCollectionTime int64
metricStat string
Expand Down Expand Up @@ -95,22 +96,6 @@ func getIntMetadataValue(metadata map[string]string, key string, required bool,
return defaultValue, nil
}

func getFloatMetadataValue(metadata map[string]string, key string, required bool, defaultValue float64) (float64, error) {
if val, ok := metadata[key]; ok && val != "" {
value, err := strconv.ParseFloat(val, 64)
if err != nil {
return 0, fmt.Errorf("error parsing %s metadata: %v", key, err)
}
return value, nil
}

if required {
return 0, fmt.Errorf("metadata %s not given", key)
}

return defaultValue, nil
}

func createCloudwatchClient(metadata *awsCloudwatchMetadata) *cloudwatch.CloudWatch {
sess := session.Must(session.NewSession(&aws.Config{
Region: aws.String(metadata.awsRegion),
Expand Down Expand Up @@ -153,28 +138,41 @@ func parseAwsCloudwatchMetadata(config *ScalerConfig) (*awsCloudwatchMetadata, e
return nil, fmt.Errorf("metric name not given")
}

if val, ok := config.TriggerMetadata["dimensionName"]; ok && val != "" {
meta.dimensionName = strings.Split(val, ";")
if config.TriggerMetadata["expression"] != "" {
if val, ok := config.TriggerMetadata["expression"]; ok && val != "" {
meta.expression = val
} else {
return nil, fmt.Errorf("expression not given")
}
} else {
return nil, fmt.Errorf("dimension name not given")
}
if val, ok := config.TriggerMetadata["dimensionName"]; ok && val != "" {
meta.dimensionName = strings.Split(val, ";")
} else {
return nil, fmt.Errorf("dimension name not given")
}

if val, ok := config.TriggerMetadata["dimensionValue"]; ok && val != "" {
meta.dimensionValue = strings.Split(val, ";")
} else {
return nil, fmt.Errorf("dimension value not given")
}
if val, ok := config.TriggerMetadata["dimensionValue"]; ok && val != "" {
meta.dimensionValue = strings.Split(val, ";")
} else {
return nil, fmt.Errorf("dimension value not given")
}

if len(meta.dimensionName) != len(meta.dimensionValue) {
return nil, fmt.Errorf("dimensionName and dimensionValue are not matching in size")
}

if len(meta.dimensionName) != len(meta.dimensionValue) {
return nil, fmt.Errorf("dimensionName and dimensionValue are not matching in size")
meta.metricUnit = config.TriggerMetadata["metricUnit"]
if err = checkMetricUnit(meta.metricUnit); err != nil {
return nil, err
}
}

meta.targetMetricValue, err = getFloatMetadataValue(config.TriggerMetadata, "targetMetricValue", true, 0)
meta.targetMetricValue, err = getIntMetadataValue(config.TriggerMetadata, "targetMetricValue", true, 0)
if err != nil {
return nil, err
}

meta.minMetricValue, err = getFloatMetadataValue(config.TriggerMetadata, "minMetricValue", true, 0)
meta.minMetricValue, err = getIntMetadataValue(config.TriggerMetadata, "minMetricValue", true, 0)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -210,11 +208,6 @@ func parseAwsCloudwatchMetadata(config *ScalerConfig) (*awsCloudwatchMetadata, e
return nil, err
}

meta.metricUnit = config.TriggerMetadata["metricUnit"]
if err = checkMetricUnit(meta.metricUnit); err != nil {
return nil, err
}

if val, ok := config.TriggerMetadata["awsRegion"]; ok && val != "" {
meta.awsRegion = val
} else {
Expand Down Expand Up @@ -287,19 +280,27 @@ func (c *awsCloudwatchScaler) GetMetrics(ctx context.Context, metricName string,

metric := external_metrics.ExternalMetricValue{
MetricName: metricName,
Value: *resource.NewQuantity(int64(metricValue), resource.DecimalSI),
Value: *resource.NewQuantity(metricValue, resource.DecimalSI),
Timestamp: metav1.Now(),
}

return append([]external_metrics.ExternalMetricValue{}, metric), nil
}

func (c *awsCloudwatchScaler) GetMetricSpecForScaling(context.Context) []v2beta2.MetricSpec {
var metricNameSuffix string

if c.metadata.expression != "" {
metricNameSuffix = c.metadata.metricsName
} else {
metricNameSuffix = c.metadata.dimensionName[0]
}

externalMetric := &v2beta2.ExternalMetricSource{
Metric: v2beta2.MetricIdentifier{
Name: GenerateMetricNameWithIndex(c.metadata.scalerIndex, kedautil.NormalizeString(fmt.Sprintf("aws-cloudwatch-%s", c.metadata.dimensionName[0]))),
Name: GenerateMetricNameWithIndex(c.metadata.scalerIndex, kedautil.NormalizeString(fmt.Sprintf("aws-cloudwatch-%s", metricNameSuffix))),
},
Target: GetMetricTarget(c.metricType, int64(c.metadata.targetMetricValue)),
Target: GetMetricTarget(c.metricType, c.metadata.targetMetricValue),
}
metricSpec := v2beta2.MetricSpec{External: externalMetric, Type: externalMetricType}
return []v2beta2.MetricSpec{metricSpec}
Expand All @@ -319,42 +320,60 @@ func (c *awsCloudwatchScaler) Close(context.Context) error {
return nil
}

func (c *awsCloudwatchScaler) GetCloudwatchMetrics() (float64, error) {
dimensions := []*cloudwatch.Dimension{}
for i := range c.metadata.dimensionName {
dimensions = append(dimensions, &cloudwatch.Dimension{
Name: &c.metadata.dimensionName[i],
Value: &c.metadata.dimensionValue[i],
})
}
func (c *awsCloudwatchScaler) GetCloudwatchMetrics() (int64, error) {
var input cloudwatch.GetMetricDataInput

startTime, endTime := computeQueryWindow(time.Now(), c.metadata.metricStatPeriod, c.metadata.metricEndTimeOffset, c.metadata.metricCollectionTime)

var metricUnit *string
if c.metadata.metricUnit != "" {
metricUnit = aws.String(c.metadata.metricUnit)
}

input := cloudwatch.GetMetricDataInput{
StartTime: aws.Time(startTime),
EndTime: aws.Time(endTime),
ScanBy: aws.String(cloudwatch.ScanByTimestampDescending),
MetricDataQueries: []*cloudwatch.MetricDataQuery{
{
Id: aws.String("c1"),
MetricStat: &cloudwatch.MetricStat{
Metric: &cloudwatch.Metric{
Namespace: aws.String(c.metadata.namespace),
Dimensions: dimensions,
MetricName: aws.String(c.metadata.metricsName),
if c.metadata.expression != "" {
input = cloudwatch.GetMetricDataInput{
StartTime: aws.Time(startTime),
EndTime: aws.Time(endTime),
ScanBy: aws.String(cloudwatch.ScanByTimestampDescending),
MetricDataQueries: []*cloudwatch.MetricDataQuery{
{
Expression: aws.String(c.metadata.expression),
Id: aws.String("q1"),
Period: aws.Int64(c.metadata.metricStatPeriod),
Label: aws.String(c.metadata.metricsName),
},
},
}
} else {
dimensions := []*cloudwatch.Dimension{}
for i := range c.metadata.dimensionName {
dimensions = append(dimensions, &cloudwatch.Dimension{
Name: &c.metadata.dimensionName[i],
Value: &c.metadata.dimensionValue[i],
})
}

var metricUnit *string
if c.metadata.metricUnit != "" {
metricUnit = aws.String(c.metadata.metricUnit)
}

input = cloudwatch.GetMetricDataInput{
StartTime: aws.Time(startTime),
EndTime: aws.Time(endTime),
ScanBy: aws.String(cloudwatch.ScanByTimestampDescending),
MetricDataQueries: []*cloudwatch.MetricDataQuery{
{
Id: aws.String("c1"),
MetricStat: &cloudwatch.MetricStat{
Metric: &cloudwatch.Metric{
Namespace: aws.String(c.metadata.namespace),
Dimensions: dimensions,
MetricName: aws.String(c.metadata.metricsName),
},
Period: aws.Int64(c.metadata.metricStatPeriod),
Stat: aws.String(c.metadata.metricStat),
Unit: metricUnit,
},
Period: aws.Int64(c.metadata.metricStatPeriod),
Stat: aws.String(c.metadata.metricStat),
Unit: metricUnit,
ReturnData: aws.Bool(true),
},
ReturnData: aws.Bool(true),
},
},
}
}

output, err := c.cwClient.GetMetricData(&input)
Expand All @@ -365,9 +384,9 @@ func (c *awsCloudwatchScaler) GetCloudwatchMetrics() (float64, error) {
}

cloudwatchLog.V(1).Info("Received Metric Data", "data", output)
var metricValue float64
var metricValue int64
if len(output.MetricDataResults) > 0 && len(output.MetricDataResults[0].Values) > 0 {
metricValue = *output.MetricDataResults[0].Values[0]
metricValue = int64(*output.MetricDataResults[0].Values[0])
} else {
cloudwatchLog.Info("empty metric data received, returning minMetricValue")
metricValue = c.metadata.minMetricValue
Expand Down
44 changes: 37 additions & 7 deletions pkg/scalers/aws_cloudwatch_scaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,17 @@ var testAWSCloudwatchMetadata = []parseAWSCloudwatchMetadataTestData{
testAWSAuthentication,
false,
"properly formed cloudwatch query and awsRegion"},
// properly formed cloudwatch expression query and awsRegion
{map[string]string{
"namespace": "AWS/SQS",
"expression": "SELECT MIN(MessageCount) FROM \"AWS/AmazonMQ\" WHERE Broker = 'production' and Queue = 'worker'",
"metricName": "ApproximateNumberOfMessagesVisible",
"targetMetricValue": "2",
"minMetricValue": "0",
"awsRegion": "eu-west-1"},
testAWSAuthentication,
false,
"properly formed cloudwatch expression query and awsRegion"},
// Properly formed cloudwatch query with optional parameters
{map[string]string{
"namespace": "AWS/SQS",
Expand Down Expand Up @@ -343,6 +354,7 @@ var testAWSCloudwatchMetadata = []parseAWSCloudwatchMetadataTestData{
var awsCloudwatchMetricIdentifiers = []awsCloudwatchMetricIdentifier{
{&testAWSCloudwatchMetadata[1], 0, "s0-aws-cloudwatch-QueueName"},
{&testAWSCloudwatchMetadata[1], 3, "s3-aws-cloudwatch-QueueName"},
{&testAWSCloudwatchMetadata[2], 5, "s5-aws-cloudwatch-ApproximateNumberOfMessagesVisible"},
}

var awsCloudwatchGetMetricTestData = []awsCloudwatchMetadata{
Expand Down Expand Up @@ -410,21 +422,39 @@ var awsCloudwatchGetMetricTestData = []awsCloudwatchMetadata{
awsAuthorization: awsAuthorizationMetadata{podIdentityOwner: false},
scalerIndex: 0,
},
{
namespace: "Custom",
metricsName: "HasDataFromExpression",
expression: "SELECT MIN(MessageCount) FROM \"AWS/AmazonMQ\" WHERE Broker = 'production' and Queue = 'worker'",
targetMetricValue: 100,
minMetricValue: 0,
metricCollectionTime: 60,
metricStat: "Average",
metricUnit: "SampleCount",
metricStatPeriod: 60,
metricEndTimeOffset: 60,
awsRegion: "us-west-2",
awsAuthorization: awsAuthorizationMetadata{podIdentityOwner: false},
scalerIndex: 0,
},
}

type mockCloudwatch struct {
cloudwatchiface.CloudWatchAPI
}

func (m *mockCloudwatch) GetMetricData(input *cloudwatch.GetMetricDataInput) (*cloudwatch.GetMetricDataOutput, error) {
switch *input.MetricDataQueries[0].MetricStat.Metric.MetricName {
case testAWSCloudwatchErrorMetric:
return nil, errors.New("error")
case testAWSCloudwatchNoValueMetric:
return &cloudwatch.GetMetricDataOutput{
MetricDataResults: []*cloudwatch.MetricDataResult{},
}, nil
if input.MetricDataQueries[0].MetricStat != nil {
switch *input.MetricDataQueries[0].MetricStat.Metric.MetricName {
case testAWSCloudwatchErrorMetric:
return nil, errors.New("error")
case testAWSCloudwatchNoValueMetric:
return &cloudwatch.GetMetricDataOutput{
MetricDataResults: []*cloudwatch.MetricDataResult{},
}, nil
}
}

return &cloudwatch.GetMetricDataOutput{
MetricDataResults: []*cloudwatch.MetricDataResult{
{
Expand Down
Loading

0 comments on commit 1d6aedb

Please sign in to comment.