Skip to content

Commit 2adbc81

Browse files
authored
Refactor AWS DynamoDB Streams scaler configuration (kedacore#6351)
* Refactor AWS DynamoDB Streams scaler configuration Signed-off-by: Omer Aplatony <[email protected]> * fixed unit tests Signed-off-by: Omer Aplatony <[email protected]> * Fix invalid value test Signed-off-by: Omer Aplatony <[email protected]> * go fmt Signed-off-by: Omer Aplatony <[email protected]> --------- Signed-off-by: Omer Aplatony <[email protected]>
1 parent 5c58849 commit 2adbc81

File tree

2 files changed

+55
-98
lines changed

2 files changed

+55
-98
lines changed

Diff for: pkg/scalers/aws_dynamodb_streams_scaler.go

+18-51
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ package scalers
33
import (
44
"context"
55
"fmt"
6-
"strconv"
76

87
"github.com/aws/aws-sdk-go-v2/aws"
98
"github.com/aws/aws-sdk-go-v2/service/dynamodb"
@@ -31,11 +30,11 @@ type awsDynamoDBStreamsScaler struct {
3130
}
3231

3332
type awsDynamoDBStreamsMetadata struct {
34-
targetShardCount int64
35-
activationTargetShardCount int64
36-
tableName string
37-
awsRegion string
38-
awsEndpoint string
33+
TargetShardCount int64 `keda:"name=shardCount, order=triggerMetadata, default=2"`
34+
ActivationTargetShardCount int64 `keda:"name=activationShardCount, order=triggerMetadata, default=0"`
35+
TableName string `keda:"name=tableName, order=triggerMetadata"`
36+
AwsRegion string `keda:"name=awsRegion, order=triggerMetadata"`
37+
AwsEndpoint string `keda:"name=awsEndpoint, order=triggerMetadata, optional"`
3938
awsAuthorization awsutils.AuthorizationMetadata
4039
triggerIndex int
4140
}
@@ -49,7 +48,7 @@ func NewAwsDynamoDBStreamsScaler(ctx context.Context, config *scalersconfig.Scal
4948

5049
logger := InitializeLogger(config, "aws_dynamodb_streams_scaler")
5150

52-
meta, err := parseAwsDynamoDBStreamsMetadata(config, logger)
51+
meta, err := parseAwsDynamoDBStreamsMetadata(config)
5352
if err != nil {
5453
return nil, fmt.Errorf("error parsing dynamodb stream metadata: %w", err)
5554
}
@@ -58,7 +57,7 @@ func NewAwsDynamoDBStreamsScaler(ctx context.Context, config *scalersconfig.Scal
5857
if err != nil {
5958
return nil, fmt.Errorf("error when creating dynamodbstream client: %w", err)
6059
}
61-
streamArn, err := getDynamoDBStreamsArn(ctx, dbClient, &meta.tableName)
60+
streamArn, err := getDynamoDBStreamsArn(ctx, dbClient, &meta.TableName)
6261
if err != nil {
6362
return nil, fmt.Errorf("error dynamodb stream arn: %w", err)
6463
}
@@ -74,43 +73,11 @@ func NewAwsDynamoDBStreamsScaler(ctx context.Context, config *scalersconfig.Scal
7473
}, nil
7574
}
7675

77-
func parseAwsDynamoDBStreamsMetadata(config *scalersconfig.ScalerConfig, logger logr.Logger) (*awsDynamoDBStreamsMetadata, error) {
76+
func parseAwsDynamoDBStreamsMetadata(config *scalersconfig.ScalerConfig) (*awsDynamoDBStreamsMetadata, error) {
7877
meta := awsDynamoDBStreamsMetadata{}
79-
meta.targetShardCount = defaultTargetDBStreamsShardCount
8078

81-
if val, ok := config.TriggerMetadata["awsRegion"]; ok && val != "" {
82-
meta.awsRegion = val
83-
} else {
84-
return nil, fmt.Errorf("no awsRegion given")
85-
}
86-
87-
if val, ok := config.TriggerMetadata["awsEndpoint"]; ok {
88-
meta.awsEndpoint = val
89-
}
90-
91-
if val, ok := config.TriggerMetadata["tableName"]; ok && val != "" {
92-
meta.tableName = val
93-
} else {
94-
return nil, fmt.Errorf("no tableName given")
95-
}
96-
97-
if val, ok := config.TriggerMetadata["shardCount"]; ok && val != "" {
98-
shardCount, err := strconv.ParseInt(val, 10, 64)
99-
if err != nil {
100-
meta.targetShardCount = defaultTargetDBStreamsShardCount
101-
logger.Error(err, "error parsing dyanmodb stream metadata shardCount, using default %n", defaultTargetDBStreamsShardCount)
102-
} else {
103-
meta.targetShardCount = shardCount
104-
}
105-
}
106-
if val, ok := config.TriggerMetadata["activationShardCount"]; ok && val != "" {
107-
shardCount, err := strconv.ParseInt(val, 10, 64)
108-
if err != nil {
109-
meta.activationTargetShardCount = defaultActivationTargetDBStreamsShardCount
110-
logger.Error(err, "error parsing dyanmodb stream metadata activationTargetShardCount, using default %n", defaultActivationTargetDBStreamsShardCount)
111-
} else {
112-
meta.activationTargetShardCount = shardCount
113-
}
79+
if err := config.TypedConfig(&meta); err != nil {
80+
return nil, fmt.Errorf("error parsing dynamodb stream metadata: %w", err)
11481
}
11582

11683
auth, err := awsutils.GetAwsAuthorization(config.TriggerUniqueKey, config.PodIdentity, config.TriggerMetadata, config.AuthParams, config.ResolvedEnv)
@@ -125,18 +92,18 @@ func parseAwsDynamoDBStreamsMetadata(config *scalersconfig.ScalerConfig, logger
12592
}
12693

12794
func createClientsForDynamoDBStreamsScaler(ctx context.Context, metadata *awsDynamoDBStreamsMetadata) (*dynamodb.Client, *dynamodbstreams.Client, error) {
128-
cfg, err := awsutils.GetAwsConfig(ctx, metadata.awsRegion, metadata.awsAuthorization)
95+
cfg, err := awsutils.GetAwsConfig(ctx, metadata.AwsRegion, metadata.awsAuthorization)
12996
if err != nil {
13097
return nil, nil, err
13198
}
13299
dbClient := dynamodb.NewFromConfig(*cfg, func(options *dynamodb.Options) {
133-
if metadata.awsEndpoint != "" {
134-
options.BaseEndpoint = aws.String(metadata.awsEndpoint)
100+
if metadata.AwsEndpoint != "" {
101+
options.BaseEndpoint = aws.String(metadata.AwsEndpoint)
135102
}
136103
})
137104
dbStreamClient := dynamodbstreams.NewFromConfig(*cfg, func(options *dynamodbstreams.Options) {
138-
if metadata.awsEndpoint != "" {
139-
options.BaseEndpoint = aws.String(metadata.awsEndpoint)
105+
if metadata.AwsEndpoint != "" {
106+
options.BaseEndpoint = aws.String(metadata.AwsEndpoint)
140107
}
141108
})
142109

@@ -176,9 +143,9 @@ func (s *awsDynamoDBStreamsScaler) Close(_ context.Context) error {
176143
func (s *awsDynamoDBStreamsScaler) GetMetricSpecForScaling(_ context.Context) []v2.MetricSpec {
177144
externalMetric := &v2.ExternalMetricSource{
178145
Metric: v2.MetricIdentifier{
179-
Name: GenerateMetricNameWithIndex(s.metadata.triggerIndex, kedautil.NormalizeString(fmt.Sprintf("aws-dynamodb-streams-%s", s.metadata.tableName))),
146+
Name: GenerateMetricNameWithIndex(s.metadata.triggerIndex, kedautil.NormalizeString(fmt.Sprintf("aws-dynamodb-streams-%s", s.metadata.TableName))),
180147
},
181-
Target: GetMetricTarget(s.metricType, s.metadata.targetShardCount),
148+
Target: GetMetricTarget(s.metricType, s.metadata.TargetShardCount),
182149
}
183150
metricSpec := v2.MetricSpec{External: externalMetric, Type: externalMetricType}
184151
return []v2.MetricSpec{metricSpec}
@@ -195,7 +162,7 @@ func (s *awsDynamoDBStreamsScaler) GetMetricsAndActivity(ctx context.Context, me
195162

196163
metric := GenerateMetricInMili(metricName, float64(shardCount))
197164

198-
return []external_metrics.ExternalMetricValue{metric}, shardCount > s.metadata.activationTargetShardCount, nil
165+
return []external_metrics.ExternalMetricValue{metric}, shardCount > s.metadata.ActivationTargetShardCount, nil
199166
}
200167

201168
// GetDynamoDBStreamShardCount Get DynamoDB Stream Shard Count

Diff for: pkg/scalers/aws_dynamodb_streams_scaler_test.go

+37-47
Original file line numberDiff line numberDiff line change
@@ -135,10 +135,10 @@ var testAwsDynamoDBStreamMetadata = []parseAwsDynamoDBStreamsMetadataTestData{
135135
"awsRegion": testAWSDynamoDBStreamsRegion},
136136
authParams: testAWSKinesisAuthentication,
137137
expected: &awsDynamoDBStreamsMetadata{
138-
targetShardCount: 2,
139-
activationTargetShardCount: 1,
140-
tableName: testAWSDynamoDBSmallTable,
141-
awsRegion: testAWSDynamoDBStreamsRegion,
138+
TargetShardCount: 2,
139+
ActivationTargetShardCount: 1,
140+
TableName: testAWSDynamoDBSmallTable,
141+
AwsRegion: testAWSDynamoDBStreamsRegion,
142142
awsAuthorization: awsutils.AuthorizationMetadata{
143143
AwsAccessKeyID: testAWSDynamoDBStreamsAccessKeyID,
144144
AwsSecretAccessKey: testAWSDynamoDBStreamsSecretAccessKey,
@@ -159,11 +159,11 @@ var testAwsDynamoDBStreamMetadata = []parseAwsDynamoDBStreamsMetadataTestData{
159159
"awsEndpoint": testAWSDynamoDBStreamsEndpoint},
160160
authParams: testAWSKinesisAuthentication,
161161
expected: &awsDynamoDBStreamsMetadata{
162-
targetShardCount: 2,
163-
activationTargetShardCount: 1,
164-
tableName: testAWSDynamoDBSmallTable,
165-
awsRegion: testAWSDynamoDBStreamsRegion,
166-
awsEndpoint: testAWSDynamoDBStreamsEndpoint,
162+
TargetShardCount: 2,
163+
ActivationTargetShardCount: 1,
164+
TableName: testAWSDynamoDBSmallTable,
165+
AwsRegion: testAWSDynamoDBStreamsRegion,
166+
AwsEndpoint: testAWSDynamoDBStreamsEndpoint,
167167
awsAuthorization: awsutils.AuthorizationMetadata{
168168
AwsAccessKeyID: testAWSDynamoDBStreamsAccessKeyID,
169169
AwsSecretAccessKey: testAWSDynamoDBStreamsSecretAccessKey,
@@ -204,10 +204,10 @@ var testAwsDynamoDBStreamMetadata = []parseAwsDynamoDBStreamsMetadataTestData{
204204
"awsRegion": testAWSDynamoDBStreamsRegion},
205205
authParams: testAWSKinesisAuthentication,
206206
expected: &awsDynamoDBStreamsMetadata{
207-
targetShardCount: defaultTargetDBStreamsShardCount,
208-
activationTargetShardCount: defaultActivationTargetDBStreamsShardCount,
209-
tableName: testAWSDynamoDBSmallTable,
210-
awsRegion: testAWSDynamoDBStreamsRegion,
207+
TargetShardCount: defaultTargetDBStreamsShardCount,
208+
ActivationTargetShardCount: defaultActivationTargetDBStreamsShardCount,
209+
TableName: testAWSDynamoDBSmallTable,
210+
AwsRegion: testAWSDynamoDBStreamsRegion,
211211
awsAuthorization: awsutils.AuthorizationMetadata{
212212
AwsAccessKeyID: testAWSDynamoDBStreamsAccessKeyID,
213213
AwsSecretAccessKey: testAWSDynamoDBStreamsSecretAccessKey,
@@ -224,20 +224,10 @@ var testAwsDynamoDBStreamMetadata = []parseAwsDynamoDBStreamsMetadataTestData{
224224
"tableName": testAWSDynamoDBSmallTable,
225225
"shardCount": "a",
226226
"awsRegion": testAWSDynamoDBStreamsRegion},
227-
authParams: testAWSKinesisAuthentication,
228-
expected: &awsDynamoDBStreamsMetadata{
229-
targetShardCount: defaultTargetDBStreamsShardCount,
230-
tableName: testAWSDynamoDBSmallTable,
231-
awsRegion: testAWSDynamoDBStreamsRegion,
232-
awsAuthorization: awsutils.AuthorizationMetadata{
233-
AwsAccessKeyID: testAWSDynamoDBStreamsAccessKeyID,
234-
AwsSecretAccessKey: testAWSDynamoDBStreamsSecretAccessKey,
235-
PodIdentityOwner: true,
236-
},
237-
triggerIndex: 4,
238-
},
239-
isError: false,
240-
comment: "properly formed table name and region, wrong shard count",
227+
authParams: testAWSKinesisAuthentication,
228+
expected: &awsDynamoDBStreamsMetadata{},
229+
isError: true,
230+
comment: "invalid value - should cause error",
241231
triggerIndex: 4,
242232
},
243233
{
@@ -278,9 +268,9 @@ var testAwsDynamoDBStreamMetadata = []parseAwsDynamoDBStreamsMetadataTestData{
278268
"awsSessionToken": testAWSDynamoDBStreamsSessionToken,
279269
},
280270
expected: &awsDynamoDBStreamsMetadata{
281-
targetShardCount: 2,
282-
tableName: testAWSDynamoDBSmallTable,
283-
awsRegion: testAWSDynamoDBStreamsRegion,
271+
TargetShardCount: 2,
272+
TableName: testAWSDynamoDBSmallTable,
273+
AwsRegion: testAWSDynamoDBStreamsRegion,
284274
awsAuthorization: awsutils.AuthorizationMetadata{
285275
AwsAccessKeyID: testAWSDynamoDBStreamsAccessKeyID,
286276
AwsSecretAccessKey: testAWSDynamoDBStreamsSecretAccessKey,
@@ -330,9 +320,9 @@ var testAwsDynamoDBStreamMetadata = []parseAwsDynamoDBStreamsMetadataTestData{
330320
"awsRoleArn": testAWSDynamoDBStreamsRoleArn,
331321
},
332322
expected: &awsDynamoDBStreamsMetadata{
333-
targetShardCount: 2,
334-
tableName: testAWSDynamoDBSmallTable,
335-
awsRegion: testAWSDynamoDBStreamsRegion,
323+
TargetShardCount: 2,
324+
TableName: testAWSDynamoDBSmallTable,
325+
AwsRegion: testAWSDynamoDBStreamsRegion,
336326
awsAuthorization: awsutils.AuthorizationMetadata{
337327
AwsRoleArn: testAWSDynamoDBStreamsRoleArn,
338328
PodIdentityOwner: true,
@@ -350,9 +340,9 @@ var testAwsDynamoDBStreamMetadata = []parseAwsDynamoDBStreamsMetadataTestData{
350340
"identityOwner": "operator"},
351341
authParams: map[string]string{},
352342
expected: &awsDynamoDBStreamsMetadata{
353-
targetShardCount: 2,
354-
tableName: testAWSDynamoDBSmallTable,
355-
awsRegion: testAWSDynamoDBStreamsRegion,
343+
TargetShardCount: 2,
344+
TableName: testAWSDynamoDBSmallTable,
345+
AwsRegion: testAWSDynamoDBStreamsRegion,
356346
awsAuthorization: awsutils.AuthorizationMetadata{
357347
PodIdentityOwner: false,
358348
},
@@ -370,15 +360,15 @@ var awsDynamoDBStreamMetricIdentifiers = []awsDynamoDBStreamsMetricIdentifier{
370360
}
371361

372362
var awsDynamoDBStreamsGetMetricTestData = []*awsDynamoDBStreamsMetadata{
373-
{tableName: testAWSDynamoDBBigTable},
374-
{tableName: testAWSDynamoDBSmallTable},
375-
{tableName: testAWSDynamoDBErrorTable},
376-
{tableName: testAWSDynamoDBInvalidTable},
363+
{TableName: testAWSDynamoDBBigTable},
364+
{TableName: testAWSDynamoDBSmallTable},
365+
{TableName: testAWSDynamoDBErrorTable},
366+
{TableName: testAWSDynamoDBInvalidTable},
377367
}
378368

379369
func TestParseAwsDynamoDBStreamsMetadata(t *testing.T) {
380370
for _, testData := range testAwsDynamoDBStreamMetadata {
381-
result, err := parseAwsDynamoDBStreamsMetadata(&scalersconfig.ScalerConfig{TriggerMetadata: testData.metadata, ResolvedEnv: testAwsDynamoDBStreamAuthentication, AuthParams: testData.authParams, TriggerIndex: testData.triggerIndex}, logr.Discard())
371+
result, err := parseAwsDynamoDBStreamsMetadata(&scalersconfig.ScalerConfig{TriggerMetadata: testData.metadata, ResolvedEnv: testAwsDynamoDBStreamAuthentication, AuthParams: testData.authParams, TriggerIndex: testData.triggerIndex})
382372
if err != nil && !testData.isError {
383373
t.Errorf("Expected success because %s got error, %s", testData.comment, err)
384374
}
@@ -395,11 +385,11 @@ func TestParseAwsDynamoDBStreamsMetadata(t *testing.T) {
395385
func TestAwsDynamoDBStreamsGetMetricSpecForScaling(t *testing.T) {
396386
for _, testData := range awsDynamoDBStreamMetricIdentifiers {
397387
ctx := context.Background()
398-
meta, err := parseAwsDynamoDBStreamsMetadata(&scalersconfig.ScalerConfig{TriggerMetadata: testData.metadataTestData.metadata, ResolvedEnv: testAwsDynamoDBStreamAuthentication, AuthParams: testData.metadataTestData.authParams, TriggerIndex: testData.triggerIndex}, logr.Discard())
388+
meta, err := parseAwsDynamoDBStreamsMetadata(&scalersconfig.ScalerConfig{TriggerMetadata: testData.metadataTestData.metadata, ResolvedEnv: testAwsDynamoDBStreamAuthentication, AuthParams: testData.metadataTestData.authParams, TriggerIndex: testData.triggerIndex})
399389
if err != nil {
400390
t.Fatal("Could not parse metadata:", err)
401391
}
402-
streamArn, err := getDynamoDBStreamsArn(ctx, &mockAwsDynamoDB{}, &meta.tableName)
392+
streamArn, err := getDynamoDBStreamsArn(ctx, &mockAwsDynamoDB{}, &meta.TableName)
403393
if err != nil {
404394
t.Fatal("Could not get dynamodb stream arn:", err)
405395
}
@@ -418,12 +408,12 @@ func TestAwsDynamoDBStreamsScalerGetMetrics(t *testing.T) {
418408
var err error
419409
var streamArn *string
420410
ctx := context.Background()
421-
streamArn, err = getDynamoDBStreamsArn(ctx, &mockAwsDynamoDB{}, &meta.tableName)
411+
streamArn, err = getDynamoDBStreamsArn(ctx, &mockAwsDynamoDB{}, &meta.TableName)
422412
if err == nil {
423413
scaler := awsDynamoDBStreamsScaler{"", meta, streamArn, &mockAwsDynamoDBStreams{}, logr.Discard()}
424414
value, _, err = scaler.GetMetricsAndActivity(context.Background(), "MetricName")
425415
}
426-
switch meta.tableName {
416+
switch meta.TableName {
427417
case testAWSDynamoDBErrorTable:
428418
assert.Error(t, err, "expect error because of dynamodb stream api error")
429419
case testAWSDynamoDBInvalidTable:
@@ -442,12 +432,12 @@ func TestAwsDynamoDBStreamsScalerIsActive(t *testing.T) {
442432
var err error
443433
var streamArn *string
444434
ctx := context.Background()
445-
streamArn, err = getDynamoDBStreamsArn(ctx, &mockAwsDynamoDB{}, &meta.tableName)
435+
streamArn, err = getDynamoDBStreamsArn(ctx, &mockAwsDynamoDB{}, &meta.TableName)
446436
if err == nil {
447437
scaler := awsDynamoDBStreamsScaler{"", meta, streamArn, &mockAwsDynamoDBStreams{}, logr.Discard()}
448438
_, value, err = scaler.GetMetricsAndActivity(context.Background(), "MetricName")
449439
}
450-
switch meta.tableName {
440+
switch meta.TableName {
451441
case testAWSDynamoDBErrorTable:
452442
assert.Error(t, err, "expect error because of dynamodb stream api error")
453443
case testAWSDynamoDBInvalidTable:

0 commit comments

Comments
 (0)