diff --git a/pkg/scalers/cassandra_scaler.go b/pkg/scalers/cassandra_scaler.go index 9e6fbbd926d..85351ac06bd 100644 --- a/pkg/scalers/cassandra_scaler.go +++ b/pkg/scalers/cassandra_scaler.go @@ -24,17 +24,18 @@ type cassandraScaler struct { // CassandraMetadata defines metadata used by KEDA to query a Cassandra table. type CassandraMetadata struct { - username string - password string - clusterIPAddress string - port int - consistency gocql.Consistency - protocolVersion int - keyspace string - query string - targetQueryValue int64 - metricName string - scalerIndex int + username string + password string + clusterIPAddress string + port int + consistency gocql.Consistency + protocolVersion int + keyspace string + query string + targetQueryValue int64 + activationTargetQueryValue int64 + metricName string + scalerIndex int } var cassandraLog = logf.Log.WithName("cassandra_scaler") @@ -83,6 +84,15 @@ func ParseCassandraMetadata(config *ScalerConfig) (*CassandraMetadata, error) { return nil, fmt.Errorf("no targetQueryValue given") } + meta.activationTargetQueryValue = 0 + if val, ok := config.TriggerMetadata["activationTargetQueryValue"]; ok { + activationTargetQueryValue, err := strconv.ParseInt(val, 10, 64) + if err != nil { + return nil, fmt.Errorf("activationTargetQueryValue parsing error %s", err.Error()) + } + meta.activationTargetQueryValue = activationTargetQueryValue + } + if val, ok := config.TriggerMetadata["username"]; ok { meta.username = val } else { @@ -175,7 +185,7 @@ func (s *cassandraScaler) IsActive(ctx context.Context) (bool, error) { return false, fmt.Errorf("error inspecting cassandra: %s", err) } - return messages > 0, nil + return messages > s.metadata.activationTargetQueryValue, nil } // GetMetricSpecForScaling returns the MetricSpec for the Horizontal Pod Autoscaler. diff --git a/tests/scalers_go/cassandra/cassandra_test.go b/tests/scalers_go/cassandra/cassandra_test.go index 30f1b096def..43ed4e75eaf 100644 --- a/tests/scalers_go/cassandra/cassandra_test.go +++ b/tests/scalers_go/cassandra/cassandra_test.go @@ -208,14 +208,18 @@ spec: keyspace: "{{.CassandraKeyspace}}" query: "SELECT COUNT(*) FROM {{.CassandraKeyspace}}.{{.CassandraTableName}};" targetQueryValue: "1" + activationTargetQueryValue: "4" metricName: "{{.CassandraKeyspace}}" authenticationRef: name: keda-trigger-auth-cassandra-secret ` - insertDataTemplate = `BEGIN BATCH + insertDataTemplateA = `BEGIN BATCH INSERT INTO {{.CassandraKeyspace}}.{{.CassandraTableName}} (name, surname, age) VALUES ('Mary', 'Paul', 30); INSERT INTO {{.CassandraKeyspace}}.{{.CassandraTableName}} (name, surname, age) VALUES ('James', 'Miller', 25); INSERT INTO {{.CassandraKeyspace}}.{{.CassandraTableName}} (name, surname, age) VALUES ('Lisa', 'Wilson', 29); + APPLY BATCH;` + + insertDataTemplateB = `BEGIN BATCH INSERT INTO {{.CassandraKeyspace}}.{{.CassandraTableName}} (name, surname, age) VALUES ('Bob', 'Taylor', 33); INSERT INTO {{.CassandraKeyspace}}.{{.CassandraTableName}} (name, surname, age) VALUES ('Carol', 'Moore', 31); INSERT INTO {{.CassandraKeyspace}}.{{.CassandraTableName}} (name, surname, age) VALUES ('Richard', 'Brown', 23); @@ -235,6 +239,7 @@ func TestCassandraScaler(t *testing.T) { "replica count should be %s after 3 minute", minReplicaCount) // test scaling + testActivation(t, kc) testScaleUp(t, kc) testScaleDown(t, kc) @@ -275,9 +280,19 @@ func checkIfCassandraStatusIsReady(t *testing.T, name string) error { return errors.New("cassandra is not ready") } +func testActivation(t *testing.T, kc *kubernetes.Clientset) { + t.Log("--- testing activation ---") + result, err := getCassandraInsertCmd(insertDataTemplateA) + assert.NoErrorf(t, err, "cannot parse log - %s", err) + out, errOut, _ := ExecCommandOnSpecificPod(t, "cassandra-client-0", testNamespace, fmt.Sprintf("bash cqlsh -u %s -p %s %s.%s --execute=\"%s\"", cassandraUsername, cassandraPassword, deploymentName, testNamespace, result)) + t.Logf("Output: %s, Error: %s", out, errOut) + + AssertReplicaCountNotChangeDuringTimePeriod(t, kc, deploymentName, testNamespace, minReplicaCount, 60) +} + func testScaleUp(t *testing.T, kc *kubernetes.Clientset) { t.Log("--- testing scale up ---") - result, err := getCassandraInsertCmd() + result, err := getCassandraInsertCmd(insertDataTemplateB) assert.NoErrorf(t, err, "cannot parse log - %s", err) out, errOut, _ := ExecCommandOnSpecificPod(t, "cassandra-client-0", testNamespace, fmt.Sprintf("bash cqlsh -u %s -p %s %s.%s --execute=\"%s\"", cassandraUsername, cassandraPassword, deploymentName, testNamespace, result)) t.Logf("Output: %s, Error: %s", out, errOut) @@ -296,7 +311,7 @@ func testScaleDown(t *testing.T, kc *kubernetes.Clientset) { "replica count should be %s after 3 minutes", minReplicaCount) } -func getCassandraInsertCmd() (string, error) { +func getCassandraInsertCmd(insertDataTemplate string) (string, error) { tmpl, err := template.New("cassandra insert").Parse(insertDataTemplate) var tpl bytes.Buffer if err := tmpl.Execute(&tpl, templateData{CassandraKeyspace: cassandraKeyspace, CassandraTableName: cassandraTableName}); err != nil {