From 56f8d4aacdf7da7dff7cbd6aadad3e816f2cc16b Mon Sep 17 00:00:00 2001 From: Omer Aplatony Date: Fri, 20 Dec 2024 10:59:06 +0200 Subject: [PATCH 1/3] refactor liklus scaler Signed-off-by: Omer Aplatony --- pkg/scalers/liiklus_scaler.go | 80 ++++---------- pkg/scalers/liiklus_scaler_test.go | 170 +++++++++++++++++++---------- 2 files changed, 136 insertions(+), 114 deletions(-) diff --git a/pkg/scalers/liiklus_scaler.go b/pkg/scalers/liiklus_scaler.go index 9ca9947fd0a..4d6782ea03a 100644 --- a/pkg/scalers/liiklus_scaler.go +++ b/pkg/scalers/liiklus_scaler.go @@ -3,7 +3,6 @@ package scalers import ( "context" "fmt" - "strconv" "time" "github.com/go-logr/logr" @@ -27,12 +26,12 @@ type liiklusScaler struct { } type liiklusMetadata struct { - lagThreshold int64 - activationLagThreshold int64 - address string - topic string - group string - groupVersion uint32 + LagThreshold int64 `keda:"name=lagThreshold,order=triggerMetadata,default=10"` + ActivationLagThreshold int64 `keda:"name=activationLagThreshold,order=triggerMetadata,default=0"` + Address string `keda:"name=address,order=triggerMetadata"` + Topic string `keda:"name=topic,order=triggerMetadata"` + Group string `keda:"name=group,order=triggerMetadata"` + GroupVersion uint32 `keda:"name=groupVersion,order=triggerMetadata,default=0"` triggerIndex int } @@ -70,7 +69,7 @@ func NewLiiklusScaler(config *scalersconfig.ScalerConfig) (Scaler, error) { return nil, err } - conn, err := grpc.NewClient(lm.address, + conn, err := grpc.NewClient(lm.Address, grpc.WithDefaultServiceConfig(grpcConfig), grpc.WithTransportCredentials(insecure.NewCredentials())) if err != nil { @@ -94,21 +93,21 @@ func (s *liiklusScaler) GetMetricsAndActivity(ctx context.Context, metricName st return nil, false, err } - if totalLag/uint64(s.metadata.lagThreshold) > uint64(len(lags)) { - totalLag = uint64(s.metadata.lagThreshold) * uint64(len(lags)) + if totalLag/uint64(s.metadata.LagThreshold) > uint64(len(lags)) { + totalLag = uint64(s.metadata.LagThreshold) * uint64(len(lags)) } metric := GenerateMetricInMili(metricName, float64(totalLag)) - return []external_metrics.ExternalMetricValue{metric}, totalLag > uint64(s.metadata.activationLagThreshold), nil + return []external_metrics.ExternalMetricValue{metric}, totalLag > uint64(s.metadata.ActivationLagThreshold), nil } func (s *liiklusScaler) GetMetricSpecForScaling(context.Context) []v2.MetricSpec { externalMetric := &v2.ExternalMetricSource{ Metric: v2.MetricIdentifier{ - Name: GenerateMetricNameWithIndex(s.metadata.triggerIndex, kedautil.NormalizeString(fmt.Sprintf("liiklus-%s", s.metadata.topic))), + Name: GenerateMetricNameWithIndex(s.metadata.triggerIndex, kedautil.NormalizeString(fmt.Sprintf("liiklus-%s", s.metadata.Topic))), }, - Target: GetMetricTarget(s.metricType, s.metadata.lagThreshold), + Target: GetMetricTarget(s.metricType, s.metadata.LagThreshold), } metricSpec := v2.MetricSpec{External: externalMetric, Type: liiklusMetricType} return []v2.MetricSpec{metricSpec} @@ -131,9 +130,9 @@ func (s *liiklusScaler) getLag(ctx context.Context) (uint64, map[uint32]uint64, ctx1, cancel1 := context.WithTimeout(ctx, 10*time.Second) defer cancel1() gor, err := s.client.GetOffsets(ctx1, &liiklus_service.GetOffsetsRequest{ - Topic: s.metadata.topic, - Group: s.metadata.group, - GroupVersion: s.metadata.groupVersion, + Topic: s.metadata.Topic, + Group: s.metadata.Group, + GroupVersion: s.metadata.GroupVersion, }) if err != nil { return 0, nil, err @@ -142,7 +141,7 @@ func (s *liiklusScaler) getLag(ctx context.Context) (uint64, map[uint32]uint64, ctx2, cancel2 := context.WithTimeout(ctx, 10*time.Second) defer cancel2() geor, err := s.client.GetEndOffsets(ctx2, &liiklus_service.GetEndOffsetsRequest{ - Topic: s.metadata.topic, + Topic: s.metadata.Topic, }) if err != nil { return 0, nil, err @@ -159,50 +158,17 @@ func (s *liiklusScaler) getLag(ctx context.Context) (uint64, map[uint32]uint64, } func parseLiiklusMetadata(config *scalersconfig.ScalerConfig) (*liiklusMetadata, error) { - lagThreshold := defaultLiiklusLagThreshold - activationLagThreshold := defaultLiiklusActivationLagThreshold - - if val, ok := config.TriggerMetadata[liiklusLagThresholdMetricName]; ok { - t, err := strconv.ParseInt(val, 10, 64) - if err != nil { - return nil, fmt.Errorf("error parsing %s: %w", liiklusLagThresholdMetricName, err) - } - lagThreshold = t + meta := &liiklusMetadata{} + if err := config.TypedConfig(meta); err != nil { + return nil, fmt.Errorf("error parsing liiklus metadata: %w", err) } - - if val, ok := config.TriggerMetadata[liiklusActivationLagThresholdMetricName]; ok { - t, err := strconv.ParseInt(val, 10, 64) - if err != nil { - return nil, fmt.Errorf("error parsing %s: %w", liiklusActivationLagThresholdMetricName, err) - } - activationLagThreshold = t - } - - groupVersion := uint32(0) - if val, ok := config.TriggerMetadata["groupVersion"]; ok { - t, err := strconv.ParseUint(val, 10, 32) - if err != nil { - return nil, fmt.Errorf("error parsing groupVersion: %w", err) - } - groupVersion = uint32(t) - } - switch { - case config.TriggerMetadata["topic"] == "": + case meta.Topic == "": return nil, ErrLiiklusNoTopic - case config.TriggerMetadata["address"] == "": + case meta.Address == "": return nil, ErrLiiklusNoAddress - case config.TriggerMetadata["group"] == "": + case meta.Group == "": return nil, ErrLiiklusNoGroup } - - return &liiklusMetadata{ - topic: config.TriggerMetadata["topic"], - address: config.TriggerMetadata["address"], - group: config.TriggerMetadata["group"], - groupVersion: groupVersion, - lagThreshold: lagThreshold, - activationLagThreshold: activationLagThreshold, - triggerIndex: config.TriggerIndex, - }, nil + return meta, nil } diff --git a/pkg/scalers/liiklus_scaler_test.go b/pkg/scalers/liiklus_scaler_test.go index 268157beb9f..45e6162f9e2 100644 --- a/pkg/scalers/liiklus_scaler_test.go +++ b/pkg/scalers/liiklus_scaler_test.go @@ -2,8 +2,7 @@ package scalers import ( "context" - "errors" - "strconv" + "fmt" "testing" "github.com/go-logr/logr" @@ -15,12 +14,10 @@ import ( ) type parseLiiklusMetadataTestData struct { - metadata map[string]string - err error - liiklusAddress string - group string - topic string - threshold int64 + name string + metadata map[string]string + ExpectedErr error + ExpectedMetatada *liiklusMetadata } type liiklusMetricIdentifier struct { @@ -30,12 +27,64 @@ type liiklusMetricIdentifier struct { } var parseLiiklusMetadataTestDataset = []parseLiiklusMetadataTestData{ - {map[string]string{}, ErrLiiklusNoTopic, "", "", "", 0}, - {map[string]string{"topic": "foo"}, ErrLiiklusNoAddress, "", "", "", 0}, - {map[string]string{"topic": "foo", "address": "bar:6565"}, ErrLiiklusNoGroup, "", "", "", 0}, - {map[string]string{"topic": "foo", "address": "bar:6565", "group": "mygroup"}, nil, "bar:6565", "mygroup", "foo", 10}, - {map[string]string{"topic": "foo", "address": "bar:6565", "group": "mygroup", "activationLagThreshold": "aa"}, strconv.ErrSyntax, "bar:6565", "mygroup", "foo", 10}, - {map[string]string{"topic": "foo", "address": "bar:6565", "group": "mygroup", "lagThreshold": "15"}, nil, "bar:6565", "mygroup", "foo", 15}, + { + name: "Empty metadata", + metadata: map[string]string{}, + ExpectedErr: fmt.Errorf("error parsing liiklus metadata: " + + "missing required parameter \"address\" in [triggerMetadata]\n" + + "missing required parameter \"topic\" in [triggerMetadata]\n" + + "missing required parameter \"group\" in [triggerMetadata]"), + ExpectedMetatada: nil, + }, + { + name: "Empty address", + metadata: map[string]string{"topic": "foo"}, + ExpectedErr: fmt.Errorf("error parsing liiklus metadata: " + + "missing required parameter \"address\" in [triggerMetadata]\n" + + "missing required parameter \"group\" in [triggerMetadata]"), + ExpectedMetatada: nil, + }, + { + name: "Empty group", + metadata: map[string]string{"topic": "foo", "address": "using-mock"}, + ExpectedErr: fmt.Errorf("error parsing liiklus metadata: " + + "missing required parameter \"group\" in [triggerMetadata]"), + ExpectedMetatada: nil, + }, + { + name: "Valid", + metadata: map[string]string{"topic": "foo", "address": "using-mock", "group": "mygroup"}, + ExpectedErr: nil, + ExpectedMetatada: &liiklusMetadata{ + LagThreshold: defaultLiiklusLagThreshold, + ActivationLagThreshold: defaultLiiklusActivationLagThreshold, + Address: "using-mock", + Topic: "foo", + Group: "mygroup", + GroupVersion: 0, + triggerIndex: 0, + }, + }, + { + name: "Invalid activationLagThreshold", + metadata: map[string]string{"topic": "foo", "address": "using-mock", "group": "mygroup", "activationLagThreshold": "invalid"}, + ExpectedErr: fmt.Errorf("error parsing liiklus metadata: unable to set param \"activationLagThreshold\" value \"invalid\": unable to unmarshal to field type int64: invalid character 'i' looking for beginning of value"), + ExpectedMetatada: nil, + }, + { + name: "Custom lagThreshold", + metadata: map[string]string{"topic": "foo", "address": "using-mock", "group": "mygroup", "lagThreshold": "20"}, + ExpectedErr: nil, + ExpectedMetatada: &liiklusMetadata{ + LagThreshold: 20, + ActivationLagThreshold: defaultLiiklusActivationLagThreshold, + Address: "using-mock", + Topic: "foo", + Group: "mygroup", + GroupVersion: 0, + triggerIndex: 0, + }, + }, } var liiklusMetricIdentifiers = []liiklusMetricIdentifier{ @@ -45,38 +94,44 @@ var liiklusMetricIdentifiers = []liiklusMetricIdentifier{ func TestLiiklusParseMetadata(t *testing.T) { for _, testData := range parseLiiklusMetadataTestDataset { - meta, err := parseLiiklusMetadata(&scalersconfig.ScalerConfig{TriggerMetadata: testData.metadata}) - if err != nil && testData.err == nil { - t.Error("Expected success but got error", err) - continue - } - if testData.err != nil && err == nil { - t.Error("Expected error but got success") - continue - } - if testData.err != nil && err != nil && !errors.Is(err, testData.err) { - t.Errorf("Expected error %v but got %v", testData.err, err) - continue - } - if err != nil { - continue - } - if testData.liiklusAddress != meta.address { - t.Errorf("Expected address %q but got %q\n", testData.liiklusAddress, meta.address) - continue - } - if meta.group != testData.group { - t.Errorf("Expected group %q but got %q\n", testData.group, meta.group) - continue - } - if meta.topic != testData.topic { - t.Errorf("Expected topic %q but got %q\n", testData.topic, meta.topic) - continue - } - if meta.lagThreshold != testData.threshold { - t.Errorf("Expected threshold %d but got %d\n", testData.threshold, meta.lagThreshold) - continue - } + t.Run(testData.name, func(t *testing.T) { + meta, err := parseLiiklusMetadata(&scalersconfig.ScalerConfig{TriggerMetadata: testData.metadata}) + + // error cases + if testData.ExpectedErr != nil { + if err == nil { + t.Errorf("Expected error %v but got success", testData.ExpectedErr) + } else if err.Error() != testData.ExpectedErr.Error() { + t.Errorf("Expected error %v but got %v", testData.ExpectedErr, err) + } + return // Skip the rest of the checks for error cases + } + + // success cases + if err != nil { + t.Errorf("Expected success but got error %v", err) + } + if testData.ExpectedMetatada != nil { + if testData.ExpectedMetatada.Address != meta.Address { + t.Errorf("Expected address %q but got %q", testData.ExpectedMetatada.Address, meta.Address) + } + if meta.Group != testData.ExpectedMetatada.Group { + t.Errorf("Expected group %q but got %q", testData.ExpectedMetatada.Group, meta.Group) + } + if meta.Topic != testData.ExpectedMetatada.Topic { + t.Errorf("Expected topic %q but got %q", testData.ExpectedMetatada.Topic, meta.Topic) + } + if meta.LagThreshold != testData.ExpectedMetatada.LagThreshold { + t.Errorf("Expected threshold %d but got %d", testData.ExpectedMetatada.LagThreshold, meta.LagThreshold) + } + if meta.ActivationLagThreshold != testData.ExpectedMetatada.ActivationLagThreshold { + t.Errorf("Expected activation threshold %d but got %d", testData.ExpectedMetatada.ActivationLagThreshold, meta.ActivationLagThreshold) + } + if meta.GroupVersion != testData.ExpectedMetatada.GroupVersion { + t.Errorf("Expected group version %d but got %d", testData.ExpectedMetatada.GroupVersion, meta.GroupVersion) + } + } + }) } } @@ -172,16 +227,17 @@ func TestLiiklusScalerGetMetricsBehavior(t *testing.T) { func TestLiiklusGetMetricSpecForScaling(t *testing.T) { for _, testData := range liiklusMetricIdentifiers { - meta, err := parseLiiklusMetadata(&scalersconfig.ScalerConfig{TriggerMetadata: testData.metadataTestData.metadata, TriggerIndex: testData.triggerIndex}) - if err != nil { - t.Fatal("Could not parse metadata:", err) - } - mockLiiklusScaler := liiklusScaler{"", meta, nil, nil, logr.Discard()} - - metricSpec := mockLiiklusScaler.GetMetricSpecForScaling(context.Background()) - metricName := metricSpec[0].External.Metric.Name - if metricName != testData.name { - t.Error("Wrong External metric source name:", metricName) - } + t.Run(testData.name, func(t *testing.T) { + meta, err := parseLiiklusMetadata(&scalersconfig.ScalerConfig{TriggerMetadata: testData.metadataTestData.metadata}) + if err != nil { + t.Fatal("Could not parse metadata:", err) + } + meta.triggerIndex = testData.triggerIndex + mockLiiklusScaler := liiklusScaler{"", meta, nil, nil, logr.Discard()} + metricSpec := mockLiiklusScaler.GetMetricSpecForScaling(context.Background()) + if metricSpec[0].External.Metric.Name != testData.name { + t.Errorf("Wrong External metric source name: %s", metricSpec[0].External.Metric.Name) + } + }) } } From 4934fd095132403bbef5a4e781424fa6985a0850 Mon Sep 17 00:00:00 2001 From: Omer Aplatony Date: Fri, 20 Dec 2024 21:46:58 +0200 Subject: [PATCH 2/3] Fix unit test for scaling Signed-off-by: Omer Aplatony --- pkg/scalers/liiklus_scaler.go | 9 +-------- pkg/scalers/liiklus_scaler_test.go | 3 +-- 2 files changed, 2 insertions(+), 10 deletions(-) diff --git a/pkg/scalers/liiklus_scaler.go b/pkg/scalers/liiklus_scaler.go index 4d6782ea03a..1f78ce154a8 100644 --- a/pkg/scalers/liiklus_scaler.go +++ b/pkg/scalers/liiklus_scaler.go @@ -162,13 +162,6 @@ func parseLiiklusMetadata(config *scalersconfig.ScalerConfig) (*liiklusMetadata, if err := config.TypedConfig(meta); err != nil { return nil, fmt.Errorf("error parsing liiklus metadata: %w", err) } - switch { - case meta.Topic == "": - return nil, ErrLiiklusNoTopic - case meta.Address == "": - return nil, ErrLiiklusNoAddress - case meta.Group == "": - return nil, ErrLiiklusNoGroup - } + meta.triggerIndex = config.TriggerIndex return meta, nil } diff --git a/pkg/scalers/liiklus_scaler_test.go b/pkg/scalers/liiklus_scaler_test.go index 45e6162f9e2..2f22bba9abc 100644 --- a/pkg/scalers/liiklus_scaler_test.go +++ b/pkg/scalers/liiklus_scaler_test.go @@ -228,11 +228,10 @@ func TestLiiklusScalerGetMetricsBehavior(t *testing.T) { func TestLiiklusGetMetricSpecForScaling(t *testing.T) { for _, testData := range liiklusMetricIdentifiers { t.Run(testData.name, func(t *testing.T) { - meta, err := parseLiiklusMetadata(&scalersconfig.ScalerConfig{TriggerMetadata: testData.metadataTestData.metadata}) + meta, err := parseLiiklusMetadata(&scalersconfig.ScalerConfig{TriggerMetadata: testData.metadataTestData.metadata, TriggerIndex: testData.triggerIndex}) if err != nil { t.Fatal("Could not parse metadata:", err) } - meta.triggerIndex = testData.triggerIndex mockLiiklusScaler := liiklusScaler{"", meta, nil, nil, logr.Discard()} metricSpec := mockLiiklusScaler.GetMetricSpecForScaling(context.Background()) if metricSpec[0].External.Metric.Name != testData.name { From 9924ef17e97ec4198e21c1cad008535930c642ed Mon Sep 17 00:00:00 2001 From: Omer Aplatony Date: Sun, 22 Dec 2024 19:55:42 +0200 Subject: [PATCH 3/3] Removed unused constants Signed-off-by: Omer Aplatony --- pkg/scalers/liiklus_scaler.go | 9 +-------- pkg/scalers/liiklus_scaler_test.go | 6 +++--- 2 files changed, 4 insertions(+), 11 deletions(-) diff --git a/pkg/scalers/liiklus_scaler.go b/pkg/scalers/liiklus_scaler.go index 1f78ce154a8..3b623d35af9 100644 --- a/pkg/scalers/liiklus_scaler.go +++ b/pkg/scalers/liiklus_scaler.go @@ -36,14 +36,7 @@ type liiklusMetadata struct { } const ( - defaultLiiklusLagThreshold int64 = 10 - defaultLiiklusActivationLagThreshold int64 = 0 -) - -const ( - liiklusLagThresholdMetricName = "lagThreshold" - liiklusActivationLagThresholdMetricName = "activationLagThreshold" - liiklusMetricType = "External" + liiklusMetricType = "External" ) var ( diff --git a/pkg/scalers/liiklus_scaler_test.go b/pkg/scalers/liiklus_scaler_test.go index 2f22bba9abc..dc4022dcd6f 100644 --- a/pkg/scalers/liiklus_scaler_test.go +++ b/pkg/scalers/liiklus_scaler_test.go @@ -56,8 +56,8 @@ var parseLiiklusMetadataTestDataset = []parseLiiklusMetadataTestData{ metadata: map[string]string{"topic": "foo", "address": "using-mock", "group": "mygroup"}, ExpectedErr: nil, ExpectedMetatada: &liiklusMetadata{ - LagThreshold: defaultLiiklusLagThreshold, - ActivationLagThreshold: defaultLiiklusActivationLagThreshold, + LagThreshold: 10, + ActivationLagThreshold: 0, Address: "using-mock", Topic: "foo", Group: "mygroup", @@ -77,7 +77,7 @@ var parseLiiklusMetadataTestDataset = []parseLiiklusMetadataTestData{ ExpectedErr: nil, ExpectedMetatada: &liiklusMetadata{ LagThreshold: 20, - ActivationLagThreshold: defaultLiiklusActivationLagThreshold, + ActivationLagThreshold: 0, Address: "using-mock", Topic: "foo", Group: "mygroup",