From 13a44409b096f576e80ca1279333ea3e4a0fd4dd Mon Sep 17 00:00:00 2001 From: Mikel Alejo Date: Thu, 30 Nov 2023 09:03:43 +0100 Subject: [PATCH] RHCLOUD-28805 | feature: tune redelivery conditions for the HTTP connectors (#2362) * feature: tune redelivery conditions for the HTTP connectors With certain responses from the external services, we want the HTTP connectors to automatically retry the redelivery. RHCLOUD-28805 * Apply suggestions from code review * Fix Sonar check --------- Co-authored-by: Gwenneg Lepage --- connector-common-http/pom.xml | 11 ++ .../http/HttpRedeliveryPredicate.java | 37 ++++ .../http/HttpRedeliveryPredicateTest.java | 170 ++++++++++++++++++ .../src/test/resources/application.properties | 23 +++ .../connector/ConnectorRoutesTest.java | 19 +- .../drawer/DrawerConnectorRoutesTest.java | 8 +- .../slack/SlackConnectorRoutesTest.java | 10 ++ .../webhook/WebhookConnectorRoutesTest.java | 9 +- 8 files changed, 275 insertions(+), 12 deletions(-) create mode 100644 connector-common-http/src/main/java/com/redhat/cloud/notifications/connector/http/HttpRedeliveryPredicate.java create mode 100644 connector-common-http/src/test/java/com/redhat/cloud/notifications/connector/http/HttpRedeliveryPredicateTest.java create mode 100644 connector-common-http/src/test/resources/application.properties diff --git a/connector-common-http/pom.xml b/connector-common-http/pom.xml index 9c69a30c5a..be423fa6b0 100644 --- a/connector-common-http/pom.xml +++ b/connector-common-http/pom.xml @@ -59,6 +59,17 @@ camel-quarkus-http + + + io.quarkus + quarkus-junit5-mockito + test + + + org.apache.camel.quarkus + camel-quarkus-junit5 + test + diff --git a/connector-common-http/src/main/java/com/redhat/cloud/notifications/connector/http/HttpRedeliveryPredicate.java b/connector-common-http/src/main/java/com/redhat/cloud/notifications/connector/http/HttpRedeliveryPredicate.java new file mode 100644 index 0000000000..22b328ca40 --- /dev/null +++ b/connector-common-http/src/main/java/com/redhat/cloud/notifications/connector/http/HttpRedeliveryPredicate.java @@ -0,0 +1,37 @@ +package com.redhat.cloud.notifications.connector.http; + +import com.redhat.cloud.notifications.connector.RedeliveryPredicate; +import io.quarkus.logging.Log; +import jakarta.enterprise.context.ApplicationScoped; +import org.apache.camel.Exchange; +import org.apache.camel.http.base.HttpOperationFailedException; +import org.apache.http.HttpStatus; + +import java.io.IOException; + +import static org.apache.camel.Exchange.EXCEPTION_CAUGHT; + +@ApplicationScoped +public class HttpRedeliveryPredicate extends RedeliveryPredicate { + /** + * Check if we should attempt a redelivery. + * @param exchange the exchange that contains the generated exception. + * @return {@code true} if we received a "429 Too Many Requests" or a 5xx + * status code, or if the delivery failed because of a timeout. + */ + @Override + public boolean matches(final Exchange exchange) { + final Throwable t = exchange.getProperty(EXCEPTION_CAUGHT, Throwable.class); + + if (t instanceof HttpOperationFailedException e) { + final boolean shouldRetry = e.getStatusCode() >= 500 || e.getStatusCode() == HttpStatus.SC_TOO_MANY_REQUESTS; + + Log.debugf("The HTTP request failed with status code '%s' and body '%s'. %s", e.getStatusCode(), e.getResponseBody(), (shouldRetry) ? "Retrying..." : "Not retrying"); + + return shouldRetry; + } + + return t instanceof IOException; + } + +} diff --git a/connector-common-http/src/test/java/com/redhat/cloud/notifications/connector/http/HttpRedeliveryPredicateTest.java b/connector-common-http/src/test/java/com/redhat/cloud/notifications/connector/http/HttpRedeliveryPredicateTest.java new file mode 100644 index 0000000000..c329fa530f --- /dev/null +++ b/connector-common-http/src/test/java/com/redhat/cloud/notifications/connector/http/HttpRedeliveryPredicateTest.java @@ -0,0 +1,170 @@ +package com.redhat.cloud.notifications.connector.http; + +import io.quarkus.test.junit.QuarkusTest; +import jakarta.inject.Inject; +import org.apache.camel.Exchange; +import org.apache.camel.http.base.HttpOperationFailedException; +import org.apache.camel.quarkus.test.CamelQuarkusTestSupport; +import org.apache.http.HttpStatus; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.mockito.Mockito; + +import java.net.SocketTimeoutException; +import java.util.List; + +@QuarkusTest +public class HttpRedeliveryPredicateTest extends CamelQuarkusTestSupport { + @Inject + HttpRedeliveryPredicate httpRedeliveryPredicate; + + /** + * Tests that the redelivery predicate returns {@code true} for the status + * codes that we have defined that should trigger a redelivery. + */ + @Test + void testPositiveMatchStatusCodes() { + // List of statuses that should generate a positive match. + final List positiveStatuses = List.of( + HttpStatus.SC_TOO_MANY_REQUESTS, + HttpStatus.SC_INTERNAL_SERVER_ERROR, + HttpStatus.SC_NOT_IMPLEMENTED, + HttpStatus.SC_BAD_GATEWAY, + HttpStatus.SC_SERVICE_UNAVAILABLE, + HttpStatus.SC_GATEWAY_TIMEOUT, + HttpStatus.SC_HTTP_VERSION_NOT_SUPPORTED, + HttpStatus.SC_INSUFFICIENT_STORAGE + ); + + // For each status that should positively match, mock the exception and + // assert that the predicate gives the correct result. + for (final int status : positiveStatuses) { + final Exchange exchange = this.createExchangeWithBody(""); + + final HttpOperationFailedException exception = Mockito.mock(HttpOperationFailedException.class); + Mockito.when(exception.getStatusCode()).thenReturn(status); + + exchange.setProperty(Exchange.EXCEPTION_CAUGHT, exception); + + // Call the predicate under test. + Assertions.assertTrue( + this.httpRedeliveryPredicate.matches(exchange), + String.format("the HTTP Redelivery Predicate should positively match for the status code '%s'", status) + ); + } + } + + /** + * Tests that the redelivery predicate returns {@code true} for when an + * IOException is thrown. + */ + @Test + void testPositiveMatchIOException() { + // Simulate a timeout exception. + final Exchange exchange = this.createExchangeWithBody(""); + + exchange.setProperty(Exchange.EXCEPTION_CAUGHT, new SocketTimeoutException()); + + // Call the predicate under test. + Assertions.assertTrue( + this.httpRedeliveryPredicate.matches(exchange), + "the HTTP Redelivery Predicate should positively match for a timeout" + ); + } + + /** + * Tests that the redelivery predicate returns {@code false} for the status + * codes that we have identified as the ones that we do not need to retry + * our requests for. + */ + @Test + void testNegativeMatches() { + final List negativeStatuses = List.of( + HttpStatus.SC_CONTINUE, + HttpStatus.SC_SWITCHING_PROTOCOLS, + HttpStatus.SC_PROCESSING, + HttpStatus.SC_OK, + HttpStatus.SC_CREATED, + HttpStatus.SC_ACCEPTED, + HttpStatus.SC_NON_AUTHORITATIVE_INFORMATION, + HttpStatus.SC_NO_CONTENT, + HttpStatus.SC_RESET_CONTENT, + HttpStatus.SC_PARTIAL_CONTENT, + HttpStatus.SC_MULTI_STATUS, + HttpStatus.SC_MULTIPLE_CHOICES, + HttpStatus.SC_MOVED_PERMANENTLY, + HttpStatus.SC_MOVED_TEMPORARILY, + HttpStatus.SC_SEE_OTHER, + HttpStatus.SC_NOT_MODIFIED, + HttpStatus.SC_USE_PROXY, + HttpStatus.SC_TEMPORARY_REDIRECT, + HttpStatus.SC_BAD_REQUEST, + HttpStatus.SC_UNAUTHORIZED, + HttpStatus.SC_PAYMENT_REQUIRED, + HttpStatus.SC_FORBIDDEN, + HttpStatus.SC_NOT_FOUND, + HttpStatus.SC_METHOD_NOT_ALLOWED, + HttpStatus.SC_NOT_ACCEPTABLE, + HttpStatus.SC_PROXY_AUTHENTICATION_REQUIRED, + HttpStatus.SC_REQUEST_TIMEOUT, + HttpStatus.SC_CONFLICT, + HttpStatus.SC_GONE, + HttpStatus.SC_LENGTH_REQUIRED, + HttpStatus.SC_PRECONDITION_FAILED, + HttpStatus.SC_REQUEST_TOO_LONG, + HttpStatus.SC_REQUEST_URI_TOO_LONG, + HttpStatus.SC_UNSUPPORTED_MEDIA_TYPE, + HttpStatus.SC_REQUESTED_RANGE_NOT_SATISFIABLE, + HttpStatus.SC_EXPECTATION_FAILED, + HttpStatus.SC_INSUFFICIENT_SPACE_ON_RESOURCE, + HttpStatus.SC_METHOD_FAILURE, + HttpStatus.SC_UNPROCESSABLE_ENTITY, + HttpStatus.SC_LOCKED, + HttpStatus.SC_FAILED_DEPENDENCY + ); + + // For each status that should negatively match, mock the exception and + // assert that the predicate gives the correct result. + for (final int status : negativeStatuses) { + final Exchange exchange = this.createExchangeWithBody(""); + + final HttpOperationFailedException exception = Mockito.mock(HttpOperationFailedException.class); + Mockito.when(exception.getStatusCode()).thenReturn(status); + + exchange.setProperty(Exchange.EXCEPTION_CAUGHT, exception); + + // Call the predicate under test. + Assertions.assertFalse( + this.httpRedeliveryPredicate.matches(exchange), + String.format("the HTTP Redelivery Predicate should negatively match for the status code '%s'", status) + ); + } + } + + /** + * Tests that for any exception that isn't the ones that we have identified + * that should be potential positive matches, the predicate under test + * returns {@code false}. + */ + @Test + void testDifferentExceptionNegativeMatches() { + final List exceptions = List.of( + Mockito.mock(ArithmeticException.class), + Mockito.mock(RuntimeException.class), + Mockito.mock(IllegalStateException.class) + ); + + // For each exception make sure that the predicate under test returns + // a negative match. + for (final Exception exception : exceptions) { + final Exchange exchange = this.createExchangeWithBody(""); + exchange.setProperty(Exchange.EXCEPTION_CAUGHT, exception); + + // Call the predicate under test. + Assertions.assertFalse( + this.httpRedeliveryPredicate.matches(exchange), + String.format("the HTTP Redelivery Predicate should negatively match for the exception '%s'", exception) + ); + } + } +} diff --git a/connector-common-http/src/test/resources/application.properties b/connector-common-http/src/test/resources/application.properties new file mode 100644 index 0000000000..54a5c64b38 --- /dev/null +++ b/connector-common-http/src/test/resources/application.properties @@ -0,0 +1,23 @@ +notifications.connector.kafka.incoming.group-id=notifications-connector-common-http +notifications.connector.kafka.incoming.topic=${mp.messaging.tocamel.topic} +notifications.connector.kafka.outgoing.topic=${mp.messaging.fromcamel.topic} +notifications.connector.name=common-http +notifications.connector.redelivery.counter-name=camel.common-http.retry.counter +notifications.connector.supported-connector-headers=${notifications.connector.name} + +quarkus.log.cloudwatch.enabled=false +quarkus.log.cloudwatch.level=INFO +quarkus.log.cloudwatch.log-stream-name=notifications-connector-common-http + +camel.component.kafka.brokers=localhost:9092 +camel.component.kafka.sasl-jaas-config="" +camel.component.kafka.sasl-mechanism=GSSAPI +camel.component.kafka.security-protocol=PLAINTEXT +camel.component.kafka.ssl-truststore-location= +camel.component.kafka.ssl-truststore-type=JKS +camel.component.kafka.retries=3 +camel.component.kafka.retry-backoff-ms=200 +camel.context.name=notifications-connector-common-http + +mp.messaging.tocamel.topic=platform.notifications.tocamel +mp.messaging.fromcamel.topic=platform.notifications.fromcamel diff --git a/connector-common/src/test/java/com/redhat/cloud/notifications/connector/ConnectorRoutesTest.java b/connector-common/src/test/java/com/redhat/cloud/notifications/connector/ConnectorRoutesTest.java index 208edf1df8..d619540d42 100644 --- a/connector-common/src/test/java/com/redhat/cloud/notifications/connector/ConnectorRoutesTest.java +++ b/connector-common/src/test/java/com/redhat/cloud/notifications/connector/ConnectorRoutesTest.java @@ -125,17 +125,26 @@ protected void testSuccessfulNotification() throws Exception { @Test protected void testFailedNotificationError500() throws Exception { mockRemoteServerError(500, "My custom internal error"); - testFailedNotification(); + + // We assume that the connector extends from the "HTTP Common" module, + // so in that case the "500" errors should trigger a redelivery. If + // the connector you are testing does not extend that module, you will + // have to override this test and set the expected redeliveries to + // zero. + testFailedNotification(this.connectorConfig.getRedeliveryMaxAttempts()); } @Test protected void testFailedNotificationError404() throws Exception { mockRemoteServerError(404, "Page not found"); - testFailedNotification(); + + // We expect the connector to not retry the notification since the + // mocked request returns a 404. + testFailedNotification(0); } - protected JsonObject testFailedNotification() throws Exception { + protected JsonObject testFailedNotification(final int maxRedeliveriesCount) throws Exception { mockKafkaSourceEndpoint(); // This is the entry point of the connector. MockEndpoint kafkaSinkMockEndpoint = mockKafkaSinkEndpoint(); // This is where the return message to the engine is sent. @@ -154,7 +163,7 @@ protected JsonObject testFailedNotification() throws Exception { } checkRouteMetrics(SUCCESS, 0, 0, 0); checkRouteMetrics(CONNECTOR_TO_ENGINE, 0, 1, 1); - micrometerAssertionHelper.assertCounterIncrement(connectorConfig.getRedeliveryCounterName(), 0); + micrometerAssertionHelper.assertCounterIncrement(connectorConfig.getRedeliveryCounterName(), maxRedeliveriesCount); return outcomingPayload; } @@ -176,7 +185,7 @@ void testRedeliveredNotification() throws Exception { checkRouteMetrics(connectorConfig.getConnectorName(), 1, 1, 1); checkRouteMetrics(SUCCESS, 0, 0, 0); checkRouteMetrics(CONNECTOR_TO_ENGINE, 0, 1, 1); - micrometerAssertionHelper.assertCounterIncrement(connectorConfig.getRedeliveryCounterName(), 2); + micrometerAssertionHelper.assertCounterIncrement(connectorConfig.getRedeliveryCounterName(), this.connectorConfig.getRedeliveryMaxAttempts()); } protected void saveRoutesMetrics(String... routeIds) { diff --git a/connector-drawer/src/test/java/com/redhat/cloud/notifications/connector/drawer/DrawerConnectorRoutesTest.java b/connector-drawer/src/test/java/com/redhat/cloud/notifications/connector/drawer/DrawerConnectorRoutesTest.java index f46c7de44d..42be90e2ec 100644 --- a/connector-drawer/src/test/java/com/redhat/cloud/notifications/connector/drawer/DrawerConnectorRoutesTest.java +++ b/connector-drawer/src/test/java/com/redhat/cloud/notifications/connector/drawer/DrawerConnectorRoutesTest.java @@ -156,14 +156,16 @@ void testFailureNotification() throws Exception { getMockHttpRequest("/internal/recipients-resolver", verifyEmptyRequest); - JsonObject jsonObject = testFailedNotification(); + // We expect the connector to retry the notifications since the mocked + // request returns a 500. + JsonObject jsonObject = testFailedNotification(this.connectorConfig.getRedeliveryMaxAttempts()); JsonObject data = new JsonObject(jsonObject.getString("data")); JsonArray recipientsList = data.getJsonObject("details").getJsonArray(ExchangeProperty.RESOLVED_RECIPIENT_LIST); assertNull(recipientsList); } @Override - protected JsonObject testFailedNotification() throws Exception { + protected JsonObject testFailedNotification(final int maxExpectedRedeliveries) throws Exception { mockKafkaSourceEndpoint(); // This is the entry point of the connector. MockEndpoint kafkaSinkMockEndpoint = mockKafkaSinkEndpoint(); // This is where the return message to the engine is sent. @@ -182,7 +184,7 @@ protected JsonObject testFailedNotification() throws Exception { } checkRouteMetrics(SUCCESS, 0, 0, 0); checkRouteMetrics(CONNECTOR_TO_ENGINE, 0, 1, 1); - micrometerAssertionHelper.assertCounterIncrement(connectorConfig.getRedeliveryCounterName(), 0); + micrometerAssertionHelper.assertCounterIncrement(connectorConfig.getRedeliveryCounterName(), maxExpectedRedeliveries); return outcomingPayload; } diff --git a/connector-slack/src/test/java/com/redhat/cloud/notifications/connector/slack/SlackConnectorRoutesTest.java b/connector-slack/src/test/java/com/redhat/cloud/notifications/connector/slack/SlackConnectorRoutesTest.java index a370ad1768..c239d1a87e 100644 --- a/connector-slack/src/test/java/com/redhat/cloud/notifications/connector/slack/SlackConnectorRoutesTest.java +++ b/connector-slack/src/test/java/com/redhat/cloud/notifications/connector/slack/SlackConnectorRoutesTest.java @@ -82,6 +82,16 @@ void test404ChannelNotFound() throws Exception { micrometerAssertionHelper.assertCounterIncrement(connectorConfig.getRedeliveryCounterName(), 0); } + @Override + @Test + protected void testFailedNotificationError500() throws Exception { + mockRemoteServerError(500, "My custom internal error"); + + // We do not expect any redeliveries because the Slack connector does + // not depend on the "HTTP common" module. + testFailedNotification(0); + } + private void mock404ChannelNotFound() { getClient() .when(request().withMethod("POST")) diff --git a/connector-webhook/src/test/java/com/redhat/cloud/notifications/connector/webhook/WebhookConnectorRoutesTest.java b/connector-webhook/src/test/java/com/redhat/cloud/notifications/connector/webhook/WebhookConnectorRoutesTest.java index fd535b7116..d8ec2188bc 100644 --- a/connector-webhook/src/test/java/com/redhat/cloud/notifications/connector/webhook/WebhookConnectorRoutesTest.java +++ b/connector-webhook/src/test/java/com/redhat/cloud/notifications/connector/webhook/WebhookConnectorRoutesTest.java @@ -116,17 +116,18 @@ protected Predicate checkOutgoingPayload(JsonObject incomingPayload) { @Test protected void testFailedNotificationError500() throws Exception { - testFailedNotificationAndReturnedFlagsToEngine(500, "My custom internal error", INCREMENT_ENDPOINT_SERVER_ERRORS); + // We expect the connector to attempt redeliveries for the error. + testFailedNotificationAndReturnedFlagsToEngine(500, "My custom internal error", INCREMENT_ENDPOINT_SERVER_ERRORS, this.connectorConfig.getRedeliveryMaxAttempts()); } @Test protected void testFailedNotificationError404() throws Exception { - testFailedNotificationAndReturnedFlagsToEngine(404, "Page not found", DISABLE_ENDPOINT_CLIENT_ERRORS); + testFailedNotificationAndReturnedFlagsToEngine(404, "Page not found", DISABLE_ENDPOINT_CLIENT_ERRORS, 0); } - private void testFailedNotificationAndReturnedFlagsToEngine(int httpReturnCode, String returnedBodyMessage, String flagNameThatShouldBeTrue) throws Exception { + private void testFailedNotificationAndReturnedFlagsToEngine(int httpReturnCode, String returnedBodyMessage, String flagNameThatShouldBeTrue, final int expectedRedeliveriesCount) throws Exception { mockRemoteServerError(httpReturnCode, returnedBodyMessage); - JsonObject returnToEngine = super.testFailedNotification(); + JsonObject returnToEngine = super.testFailedNotification(expectedRedeliveriesCount); JsonObject data = new JsonObject(returnToEngine.getString("data")); assertTrue(data.getBoolean(flagNameThatShouldBeTrue)); JsonObject details = data.getJsonObject("details");