Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support multiple queues at the IBMMQ scaler #6182

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ Here is an overview of all new **experimental** features:
- **AWS CloudWatch Scaler**: Add support for ignoreNullValues ([#5352](https://github.com/kedacore/keda/issues/5352))
- **GCP Scalers**: Added custom time horizon in GCP scalers ([#5778](https://github.com/kedacore/keda/issues/5778))
- **GitHub Scaler**: Fixed pagination, fetching repository list ([#5738](https://github.com/kedacore/keda/issues/5738))
- **IBMMQ Scaler**: Support multiple queues at the IBMMQ scaler ([#6181](https://github.com/kedacore/keda/issues/6181))
- **Kafka**: Fix logic to scale to zero on invalid offset even with earliest offsetResetPolicy ([#5689](https://github.com/kedacore/keda/issues/5689))
- **RabbitMQ Scaler**: Add connection name for AMQP ([#5958](https://github.com/kedacore/keda/issues/5958))
- TODO ([#XXX](https://github.com/kedacore/keda/issues/XXX))
Expand Down
96 changes: 54 additions & 42 deletions pkg/scalers/ibmmq_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,18 +26,18 @@ type ibmmqScaler struct {
}

type ibmmqMetadata struct {
Host string `keda:"name=host, order=triggerMetadata"`
QueueName string `keda:"name=queueName, order=triggerMetadata"`
QueueDepth int64 `keda:"name=queueDepth, order=triggerMetadata, default=20"`
ActivationQueueDepth int64 `keda:"name=activationQueueDepth, order=triggerMetadata, default=0"`
Username string `keda:"name=username, order=authParams;resolvedEnv;triggerMetadata"`
Password string `keda:"name=password, order=authParams;resolvedEnv;triggerMetadata"`
UnsafeSsl bool `keda:"name=unsafeSsl, order=triggerMetadata, default=false"`
TLS bool `keda:"name=tls, order=triggerMetadata, default=false"` // , deprecated=use unsafeSsl instead
CA string `keda:"name=ca, order=authParams, optional"`
Cert string `keda:"name=cert, order=authParams, optional"`
Key string `keda:"name=key, order=authParams, optional"`
KeyPassword string `keda:"name=keyPassword, order=authParams, optional"`
Host string `keda:"name=host, order=triggerMetadata"`
QueueName []string `keda:"name=queueName;queueNames, order=triggerMetadata"`
QueueDepth int64 `keda:"name=queueDepth, order=triggerMetadata, default=20"`
ActivationQueueDepth int64 `keda:"name=activationQueueDepth, order=triggerMetadata, default=0"`
Username string `keda:"name=username, order=authParams;resolvedEnv;triggerMetadata"`
Password string `keda:"name=password, order=authParams;resolvedEnv;triggerMetadata"`
UnsafeSsl bool `keda:"name=unsafeSsl, order=triggerMetadata, default=false"`
TLS bool `keda:"name=tls, order=triggerMetadata, default=false"` // , deprecated=use unsafeSsl instead
CA string `keda:"name=ca, order=authParams, optional"`
Cert string `keda:"name=cert, order=authParams, optional"`
Key string `keda:"name=key, order=authParams, optional"`
KeyPassword string `keda:"name=keyPassword, order=authParams, optional"`

triggerIndex int
}
Expand Down Expand Up @@ -129,54 +129,66 @@ func parseIBMMQMetadata(config *scalersconfig.ScalerConfig) (ibmmqMetadata, erro
}

func (s *ibmmqScaler) getQueueDepthViaHTTP(ctx context.Context) (int64, error) {
queue := s.metadata.QueueName
maxDepth := int64(0)
url := s.metadata.Host

var requestJSON = []byte(`{"type": "runCommandJSON", "command": "display", "qualifier": "qlocal", "name": "` + queue + `", "responseParameters" : ["CURDEPTH"]}`)
req, err := http.NewRequestWithContext(ctx, "POST", url, bytes.NewBuffer(requestJSON))
req, err := http.NewRequestWithContext(ctx, "POST", url, nil)
if err != nil {
return 0, fmt.Errorf("failed to request queue depth: %w", err)
return 0, fmt.Errorf("failed to create HTTP request: %w", err)
}
req.Header.Set("ibm-mq-rest-csrf-token", "value")
req.Header.Set("Content-Type", "application/json")

req.SetBasicAuth(s.metadata.Username, s.metadata.Password)

resp, err := s.httpClient.Do(req)
if err != nil {
return 0, fmt.Errorf("failed to contact MQ via REST: %w", err)
}
defer resp.Body.Close()
for _, queueName := range s.metadata.QueueName {
requestJSON := []byte(`{"type": "runCommandJSON", "command": "display", "qualifier": "qlocal", "name": "` + queueName + `", "responseParameters" : ["CURDEPTH"]}`)
req.Body = io.NopCloser(bytes.NewBuffer(requestJSON))

body, err := io.ReadAll(resp.Body)
if err != nil {
return 0, fmt.Errorf("failed to read body of request: %w", err)
}
resp, err := s.httpClient.Do(req)
if err != nil {
return 0, fmt.Errorf("failed to contact MQ via REST for queue %s: %w", queueName, err)
}
defer resp.Body.Close()

var response CommandResponse
err = json.Unmarshal(body, &response)
if err != nil {
return 0, fmt.Errorf("failed to parse JSON: %w", err)
}
if resp.StatusCode == http.StatusUnauthorized {
return 0, fmt.Errorf("authentication failed: incorrect username or password")
}

if response.CommandResponse == nil || len(response.CommandResponse) == 0 {
return 0, fmt.Errorf("failed to parse response from REST call")
}
body, err := io.ReadAll(resp.Body)
if err != nil {
return 0, fmt.Errorf("failed to read body of request for queue %s: %w", queueName, err)
}

var response CommandResponse
err = json.Unmarshal(body, &response)
if err != nil {
return 0, fmt.Errorf("failed to parse JSON for queue %s: %w", queueName, err)
}

if response.CommandResponse == nil || len(response.CommandResponse) == 0 {
return 0, fmt.Errorf("failed to parse response from REST call for queue %s", queueName)
}

if response.CommandResponse[0].Parameters == nil {
var reason string
message := strings.Join(response.CommandResponse[0].Message, " ")
if message != "" {
reason = fmt.Sprintf(", reason: %s", message)
}
return 0, fmt.Errorf("failed to get the current queue depth parameter for queue %s%s", queueName, reason)
}

if response.CommandResponse[0].Parameters == nil {
var reason string
message := strings.Join(response.CommandResponse[0].Message, " ")
if message != "" {
reason = fmt.Sprintf(", reason: %s", message)
depth := int64(response.CommandResponse[0].Parameters.Curdepth)
if depth > maxDepth {
maxDepth = depth
}
return 0, fmt.Errorf("failed to get the current queue depth parameter%s", reason)
}

return int64(response.CommandResponse[0].Parameters.Curdepth), nil
return maxDepth, nil
}

func (s *ibmmqScaler) GetMetricSpecForScaling(context.Context) []v2.MetricSpec {
metricName := kedautil.NormalizeString(fmt.Sprintf("ibmmq-%s", s.metadata.QueueName))
metricName := kedautil.NormalizeString(fmt.Sprintf("ibmmq-%s", s.metadata.QueueName[0]))
externalMetric := &v2.ExternalMetricSource{
Metric: v2.MetricIdentifier{
Name: GenerateMetricNameWithIndex(s.metadata.triggerIndex, metricName),
Expand Down
9 changes: 7 additions & 2 deletions pkg/scalers/ibmmq_scaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,10 @@ var testIBMMQMetadata = []parseIBMMQMetadataTestData{
{map[string]string{}, true, map[string]string{}},
// Properly formed metadata
{map[string]string{"host": testValidMQQueueURL, "queueName": "testQueue", "queueDepth": "10"}, false, map[string]string{"username": "testUsername", "password": "Pass123"}},
// Properly formed metadata with 2 queues
{map[string]string{"host": testValidMQQueueURL, "queueName": "testQueue1, testQueue2", "queueDepth": "10"}, false, map[string]string{"username": "testUsername", "password": "Pass123"}},
// Properly formed metadata with 2 queues with param queueNames
{map[string]string{"host": testValidMQQueueURL, "queueNames": "testQueue1, testQueue2", "queueDepth": "10"}, false, map[string]string{"username": "testUsername", "password": "Pass123"}},
// Invalid queueDepth using a string
{map[string]string{"host": testValidMQQueueURL, "queueName": "testQueue", "queueDepth": "AA"}, true, map[string]string{"username": "testUsername", "password": "Pass123"}},
// Invalid activationQueueDepth using a string
Expand Down Expand Up @@ -89,7 +93,7 @@ func TestIBMMQParseMetadata(t *testing.T) {
t.Error("Expected error but got success")
fmt.Println(testData)
}
if metadata != (ibmmqMetadata{}) && metadata.Password != "" && metadata.Password != testData.authParams["password"] {
if metadata.Password != "" && metadata.Password != testData.authParams["password"] {
t.Error("Expected password from configuration but found something else: ", metadata.Password)
fmt.Println(testData)
}
Expand Down Expand Up @@ -216,7 +220,8 @@ func TestIBMMQScalerGetQueueDepthViaHTTP(t *testing.T) {

scaler := ibmmqScaler{
metadata: ibmmqMetadata{
Host: server.URL,
Host: server.URL,
QueueName: []string{"TEST.QUEUE"},
},
httpClient: server.Client(),
}
Expand Down
Loading