From f1f827bc7e89478b9507281a08769276dc149cb0 Mon Sep 17 00:00:00 2001 From: rickbrouwer Date: Wed, 8 Jan 2025 16:23:36 +0100 Subject: [PATCH] Handling StatusNotFound in IBMMQ scaler Signed-off-by: rickbrouwer --- CHANGELOG.md | 2 +- pkg/scalers/ibmmq_scaler.go | 81 ++++++++++++++++++++------------ pkg/scalers/ibmmq_scaler_test.go | 2 +- 3 files changed, 53 insertions(+), 32 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 963665fd4b7..a514f0dee2c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -71,7 +71,7 @@ Here is an overview of all new **experimental** features: ### Improvements -- TODO ([#XXX](https://github.com/kedacore/keda/issues/XXX)) +- **IBMMQ Scaler**: Handling StatusNotFound in IBMMQ scaler ([#6472](https://github.com/kedacore/keda/pull/6472)) ### Fixes diff --git a/pkg/scalers/ibmmq_scaler.go b/pkg/scalers/ibmmq_scaler.go index 5963ca1b222..9c823468ed4 100644 --- a/pkg/scalers/ibmmq_scaler.go +++ b/pkg/scalers/ibmmq_scaler.go @@ -8,7 +8,6 @@ import ( "io" "net/http" "net/url" - "strings" "github.com/go-logr/logr" v2 "k8s.io/api/autoscaling/v2" @@ -54,6 +53,13 @@ type Response struct { Message []string `json:"message"` } +// ErrorResponse Structure for error messages from IBM MQ +type ErrorResponse struct { + Error []struct { + Message string `json:"message"` + } `json:"error"` +} + // Parameters Contains the current depth of the IBM MQ Queue type Parameters struct { Curdepth int `json:"curdepth"` @@ -132,18 +138,18 @@ func parseIBMMQMetadata(config *scalersconfig.ScalerConfig) (ibmmqMetadata, erro func (s *ibmmqScaler) getQueueDepthViaHTTP(ctx context.Context) (int64, error) { depths := make([]int64, 0, len(s.metadata.QueueName)) - url := s.metadata.Host - req, err := http.NewRequestWithContext(ctx, "POST", url, nil) + req, err := http.NewRequestWithContext(ctx, "POST", s.metadata.Host, nil) if err != nil { return 0, fmt.Errorf("failed to create HTTP request: %w", err) } + req.Header.Set("ibm-mq-rest-csrf-token", "value") req.Header.Set("Content-Type", "application/json") req.SetBasicAuth(s.metadata.Username, s.metadata.Password) for _, queueName := range s.metadata.QueueName { - requestJSON := []byte(`{"type": "runCommandJSON", "command": "display", "qualifier": "qlocal", "name": "` + queueName + `", "responseParameters" : ["CURDEPTH"]}`) + requestJSON := []byte(fmt.Sprintf(`{"type": "runCommandJSON", "command": "display", "qualifier": "qlocal", "name": "%s", "responseParameters": ["CURDEPTH"]}`, queueName)) req.Body = io.NopCloser(bytes.NewBuffer(requestJSON)) resp, err := s.httpClient.Do(req) @@ -152,47 +158,62 @@ func (s *ibmmqScaler) getQueueDepthViaHTTP(ctx context.Context) (int64, error) { } defer resp.Body.Close() - if resp.StatusCode == http.StatusUnauthorized { - return 0, fmt.Errorf("authentication failed: incorrect username or password") - } - body, err := io.ReadAll(resp.Body) if err != nil { return 0, fmt.Errorf("failed to read body of request for queue %s: %w", queueName, err) } - var response CommandResponse - err = json.Unmarshal(body, &response) - if err != nil { - return 0, fmt.Errorf("failed to parse JSON for queue %s: %w", queueName, err) - } + switch resp.StatusCode { + case http.StatusUnauthorized: + return 0, fmt.Errorf("authentication failed: incorrect username or password") + case http.StatusNotFound: + var errorResponse ErrorResponse + if err := json.Unmarshal(body, &errorResponse); err != nil { + return 0, fmt.Errorf("failed to parse error response JSON for queue %s: %w", queueName, err) + } + if len(errorResponse.Error) > 0 && errorResponse.Error[0].Message != "" { + return 0, fmt.Errorf("%s", errorResponse.Error[0].Message) + } + return 0, fmt.Errorf("failed to get the current queue depth parameter for queue %s", queueName) + case http.StatusOK: + var response CommandResponse + if err := json.Unmarshal(body, &response); err != nil { + return 0, fmt.Errorf("failed to parse JSON for queue %s: %w", queueName, err) + } - if len(response.CommandResponse) == 0 { - return 0, fmt.Errorf("failed to parse response from REST call for queue %s", queueName) - } + // Check for valid response with message + if len(response.CommandResponse) > 0 && len(response.CommandResponse[0].Message) > 0 { + return 0, fmt.Errorf("%s", response.CommandResponse[0].Message[0]) + } - if response.CommandResponse[0].Parameters == nil { - var reason string - message := strings.Join(response.CommandResponse[0].Message, " ") - if message != "" { - reason = fmt.Sprintf(", reason: %s", message) + // Check for valid response with parameters + if len(response.CommandResponse) == 0 || response.CommandResponse[0].Parameters == nil { + return 0, fmt.Errorf("failed to get the current queue depth parameter for queue %s", queueName) } - return 0, fmt.Errorf("failed to get the current queue depth parameter for queue %s%s", queueName, reason) + + depths = append(depths, int64(response.CommandResponse[0].Parameters.Curdepth)) + default: + return 0, fmt.Errorf("unexpected status code %d for queue %s", resp.StatusCode, queueName) } + } - depth := int64(response.CommandResponse[0].Parameters.Curdepth) - depths = append(depths, depth) + return calculateDepth(depths, s.metadata.Operation), nil +} + +func calculateDepth(depths []int64, operation string) int64 { + if len(depths) == 0 { + return 0 } - switch s.metadata.Operation { + switch operation { case sumOperation: - return sumDepths(depths), nil + return sumDepths(depths) case avgOperation: - return avgDepths(depths), nil + return avgDepths(depths) case maxOperation: - return maxDepth(depths), nil + return maxDepths(depths) default: - return 0, nil + return 0 } } @@ -211,7 +232,7 @@ func avgDepths(depths []int64) int64 { return sumDepths(depths) / int64(len(depths)) } -func maxDepth(depths []int64) int64 { +func maxDepths(depths []int64) int64 { if len(depths) == 0 { return 0 } diff --git a/pkg/scalers/ibmmq_scaler_test.go b/pkg/scalers/ibmmq_scaler_test.go index f5c1d73aa2b..f4aeaa3d58c 100644 --- a/pkg/scalers/ibmmq_scaler_test.go +++ b/pkg/scalers/ibmmq_scaler_test.go @@ -197,7 +197,7 @@ var testQueueDepthResults = []queueDepthResultTestData{ "message": "MQWB0009E: Could not query the queue manager 'testqmgR'.", "explanation": "The REST API was invoked specifying a queue manager name which cannot be located."}] }`, - responseStatus: http.StatusOK, + responseStatus: http.StatusNotFound, expectedValue: 0, isError: true, },