Skip to content

Commit

Permalink
RHCLOUD-28805 | feature: tune redelivery conditions for the HTTP conn…
Browse files Browse the repository at this point in the history
…ectors (RedHatInsights#2362)

* feature: tune redelivery conditions for the HTTP connectors

With certain responses from the external services, we want the HTTP
connectors to automatically retry the redelivery.

RHCLOUD-28805

* Apply suggestions from code review

* Fix Sonar check

---------

Co-authored-by: Gwenneg Lepage <[email protected]>
  • Loading branch information
MikelAlejoBR and gwenneg authored Nov 30, 2023
1 parent 55f407d commit 13a4440
Show file tree
Hide file tree
Showing 8 changed files with 275 additions and 12 deletions.
11 changes: 11 additions & 0 deletions connector-common-http/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,17 @@
<artifactId>camel-quarkus-http</artifactId>
</dependency>

<!-- Test -->
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-junit5-mockito</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.camel.quarkus</groupId>
<artifactId>camel-quarkus-junit5</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package com.redhat.cloud.notifications.connector.http;

import com.redhat.cloud.notifications.connector.RedeliveryPredicate;
import io.quarkus.logging.Log;
import jakarta.enterprise.context.ApplicationScoped;
import org.apache.camel.Exchange;
import org.apache.camel.http.base.HttpOperationFailedException;
import org.apache.http.HttpStatus;

import java.io.IOException;

import static org.apache.camel.Exchange.EXCEPTION_CAUGHT;

@ApplicationScoped
public class HttpRedeliveryPredicate extends RedeliveryPredicate {
/**
* Check if we should attempt a redelivery.
* @param exchange the exchange that contains the generated exception.
* @return {@code true} if we received a "429 Too Many Requests" or a 5xx
* status code, or if the delivery failed because of a timeout.
*/
@Override
public boolean matches(final Exchange exchange) {
final Throwable t = exchange.getProperty(EXCEPTION_CAUGHT, Throwable.class);

if (t instanceof HttpOperationFailedException e) {
final boolean shouldRetry = e.getStatusCode() >= 500 || e.getStatusCode() == HttpStatus.SC_TOO_MANY_REQUESTS;

Log.debugf("The HTTP request failed with status code '%s' and body '%s'. %s", e.getStatusCode(), e.getResponseBody(), (shouldRetry) ? "Retrying..." : "Not retrying");

return shouldRetry;
}

return t instanceof IOException;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
package com.redhat.cloud.notifications.connector.http;

import io.quarkus.test.junit.QuarkusTest;
import jakarta.inject.Inject;
import org.apache.camel.Exchange;
import org.apache.camel.http.base.HttpOperationFailedException;
import org.apache.camel.quarkus.test.CamelQuarkusTestSupport;
import org.apache.http.HttpStatus;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;

import java.net.SocketTimeoutException;
import java.util.List;

@QuarkusTest
public class HttpRedeliveryPredicateTest extends CamelQuarkusTestSupport {
@Inject
HttpRedeliveryPredicate httpRedeliveryPredicate;

/**
* Tests that the redelivery predicate returns {@code true} for the status
* codes that we have defined that should trigger a redelivery.
*/
@Test
void testPositiveMatchStatusCodes() {
// List of statuses that should generate a positive match.
final List<Integer> positiveStatuses = List.of(
HttpStatus.SC_TOO_MANY_REQUESTS,
HttpStatus.SC_INTERNAL_SERVER_ERROR,
HttpStatus.SC_NOT_IMPLEMENTED,
HttpStatus.SC_BAD_GATEWAY,
HttpStatus.SC_SERVICE_UNAVAILABLE,
HttpStatus.SC_GATEWAY_TIMEOUT,
HttpStatus.SC_HTTP_VERSION_NOT_SUPPORTED,
HttpStatus.SC_INSUFFICIENT_STORAGE
);

// For each status that should positively match, mock the exception and
// assert that the predicate gives the correct result.
for (final int status : positiveStatuses) {
final Exchange exchange = this.createExchangeWithBody("");

final HttpOperationFailedException exception = Mockito.mock(HttpOperationFailedException.class);
Mockito.when(exception.getStatusCode()).thenReturn(status);

exchange.setProperty(Exchange.EXCEPTION_CAUGHT, exception);

// Call the predicate under test.
Assertions.assertTrue(
this.httpRedeliveryPredicate.matches(exchange),
String.format("the HTTP Redelivery Predicate should positively match for the status code '%s'", status)
);
}
}

/**
* Tests that the redelivery predicate returns {@code true} for when an
* IOException is thrown.
*/
@Test
void testPositiveMatchIOException() {
// Simulate a timeout exception.
final Exchange exchange = this.createExchangeWithBody("");

exchange.setProperty(Exchange.EXCEPTION_CAUGHT, new SocketTimeoutException());

// Call the predicate under test.
Assertions.assertTrue(
this.httpRedeliveryPredicate.matches(exchange),
"the HTTP Redelivery Predicate should positively match for a timeout"
);
}

/**
* Tests that the redelivery predicate returns {@code false} for the status
* codes that we have identified as the ones that we do not need to retry
* our requests for.
*/
@Test
void testNegativeMatches() {
final List<Integer> negativeStatuses = List.of(
HttpStatus.SC_CONTINUE,
HttpStatus.SC_SWITCHING_PROTOCOLS,
HttpStatus.SC_PROCESSING,
HttpStatus.SC_OK,
HttpStatus.SC_CREATED,
HttpStatus.SC_ACCEPTED,
HttpStatus.SC_NON_AUTHORITATIVE_INFORMATION,
HttpStatus.SC_NO_CONTENT,
HttpStatus.SC_RESET_CONTENT,
HttpStatus.SC_PARTIAL_CONTENT,
HttpStatus.SC_MULTI_STATUS,
HttpStatus.SC_MULTIPLE_CHOICES,
HttpStatus.SC_MOVED_PERMANENTLY,
HttpStatus.SC_MOVED_TEMPORARILY,
HttpStatus.SC_SEE_OTHER,
HttpStatus.SC_NOT_MODIFIED,
HttpStatus.SC_USE_PROXY,
HttpStatus.SC_TEMPORARY_REDIRECT,
HttpStatus.SC_BAD_REQUEST,
HttpStatus.SC_UNAUTHORIZED,
HttpStatus.SC_PAYMENT_REQUIRED,
HttpStatus.SC_FORBIDDEN,
HttpStatus.SC_NOT_FOUND,
HttpStatus.SC_METHOD_NOT_ALLOWED,
HttpStatus.SC_NOT_ACCEPTABLE,
HttpStatus.SC_PROXY_AUTHENTICATION_REQUIRED,
HttpStatus.SC_REQUEST_TIMEOUT,
HttpStatus.SC_CONFLICT,
HttpStatus.SC_GONE,
HttpStatus.SC_LENGTH_REQUIRED,
HttpStatus.SC_PRECONDITION_FAILED,
HttpStatus.SC_REQUEST_TOO_LONG,
HttpStatus.SC_REQUEST_URI_TOO_LONG,
HttpStatus.SC_UNSUPPORTED_MEDIA_TYPE,
HttpStatus.SC_REQUESTED_RANGE_NOT_SATISFIABLE,
HttpStatus.SC_EXPECTATION_FAILED,
HttpStatus.SC_INSUFFICIENT_SPACE_ON_RESOURCE,
HttpStatus.SC_METHOD_FAILURE,
HttpStatus.SC_UNPROCESSABLE_ENTITY,
HttpStatus.SC_LOCKED,
HttpStatus.SC_FAILED_DEPENDENCY
);

// For each status that should negatively match, mock the exception and
// assert that the predicate gives the correct result.
for (final int status : negativeStatuses) {
final Exchange exchange = this.createExchangeWithBody("");

final HttpOperationFailedException exception = Mockito.mock(HttpOperationFailedException.class);
Mockito.when(exception.getStatusCode()).thenReturn(status);

exchange.setProperty(Exchange.EXCEPTION_CAUGHT, exception);

// Call the predicate under test.
Assertions.assertFalse(
this.httpRedeliveryPredicate.matches(exchange),
String.format("the HTTP Redelivery Predicate should negatively match for the status code '%s'", status)
);
}
}

/**
* Tests that for any exception that isn't the ones that we have identified
* that should be potential positive matches, the predicate under test
* returns {@code false}.
*/
@Test
void testDifferentExceptionNegativeMatches() {
final List<Exception> exceptions = List.of(
Mockito.mock(ArithmeticException.class),
Mockito.mock(RuntimeException.class),
Mockito.mock(IllegalStateException.class)
);

// For each exception make sure that the predicate under test returns
// a negative match.
for (final Exception exception : exceptions) {
final Exchange exchange = this.createExchangeWithBody("");
exchange.setProperty(Exchange.EXCEPTION_CAUGHT, exception);

// Call the predicate under test.
Assertions.assertFalse(
this.httpRedeliveryPredicate.matches(exchange),
String.format("the HTTP Redelivery Predicate should negatively match for the exception '%s'", exception)
);
}
}
}
23 changes: 23 additions & 0 deletions connector-common-http/src/test/resources/application.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
notifications.connector.kafka.incoming.group-id=notifications-connector-common-http
notifications.connector.kafka.incoming.topic=${mp.messaging.tocamel.topic}
notifications.connector.kafka.outgoing.topic=${mp.messaging.fromcamel.topic}
notifications.connector.name=common-http
notifications.connector.redelivery.counter-name=camel.common-http.retry.counter
notifications.connector.supported-connector-headers=${notifications.connector.name}

quarkus.log.cloudwatch.enabled=false
quarkus.log.cloudwatch.level=INFO
quarkus.log.cloudwatch.log-stream-name=notifications-connector-common-http

camel.component.kafka.brokers=localhost:9092
camel.component.kafka.sasl-jaas-config=""
camel.component.kafka.sasl-mechanism=GSSAPI
camel.component.kafka.security-protocol=PLAINTEXT
camel.component.kafka.ssl-truststore-location=
camel.component.kafka.ssl-truststore-type=JKS
camel.component.kafka.retries=3
camel.component.kafka.retry-backoff-ms=200
camel.context.name=notifications-connector-common-http

mp.messaging.tocamel.topic=platform.notifications.tocamel
mp.messaging.fromcamel.topic=platform.notifications.fromcamel
Original file line number Diff line number Diff line change
Expand Up @@ -125,17 +125,26 @@ protected void testSuccessfulNotification() throws Exception {
@Test
protected void testFailedNotificationError500() throws Exception {
mockRemoteServerError(500, "My custom internal error");
testFailedNotification();

// We assume that the connector extends from the "HTTP Common" module,
// so in that case the "500" errors should trigger a redelivery. If
// the connector you are testing does not extend that module, you will
// have to override this test and set the expected redeliveries to
// zero.
testFailedNotification(this.connectorConfig.getRedeliveryMaxAttempts());
}

@Test
protected void testFailedNotificationError404() throws Exception {
mockRemoteServerError(404, "Page not found");
testFailedNotification();

// We expect the connector to not retry the notification since the
// mocked request returns a 404.
testFailedNotification(0);
}


protected JsonObject testFailedNotification() throws Exception {
protected JsonObject testFailedNotification(final int maxRedeliveriesCount) throws Exception {

mockKafkaSourceEndpoint(); // This is the entry point of the connector.
MockEndpoint kafkaSinkMockEndpoint = mockKafkaSinkEndpoint(); // This is where the return message to the engine is sent.
Expand All @@ -154,7 +163,7 @@ protected JsonObject testFailedNotification() throws Exception {
}
checkRouteMetrics(SUCCESS, 0, 0, 0);
checkRouteMetrics(CONNECTOR_TO_ENGINE, 0, 1, 1);
micrometerAssertionHelper.assertCounterIncrement(connectorConfig.getRedeliveryCounterName(), 0);
micrometerAssertionHelper.assertCounterIncrement(connectorConfig.getRedeliveryCounterName(), maxRedeliveriesCount);
return outcomingPayload;
}

Expand All @@ -176,7 +185,7 @@ void testRedeliveredNotification() throws Exception {
checkRouteMetrics(connectorConfig.getConnectorName(), 1, 1, 1);
checkRouteMetrics(SUCCESS, 0, 0, 0);
checkRouteMetrics(CONNECTOR_TO_ENGINE, 0, 1, 1);
micrometerAssertionHelper.assertCounterIncrement(connectorConfig.getRedeliveryCounterName(), 2);
micrometerAssertionHelper.assertCounterIncrement(connectorConfig.getRedeliveryCounterName(), this.connectorConfig.getRedeliveryMaxAttempts());
}

protected void saveRoutesMetrics(String... routeIds) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,14 +156,16 @@ void testFailureNotification() throws Exception {

getMockHttpRequest("/internal/recipients-resolver", verifyEmptyRequest);

JsonObject jsonObject = testFailedNotification();
// We expect the connector to retry the notifications since the mocked
// request returns a 500.
JsonObject jsonObject = testFailedNotification(this.connectorConfig.getRedeliveryMaxAttempts());
JsonObject data = new JsonObject(jsonObject.getString("data"));
JsonArray recipientsList = data.getJsonObject("details").getJsonArray(ExchangeProperty.RESOLVED_RECIPIENT_LIST);
assertNull(recipientsList);
}

@Override
protected JsonObject testFailedNotification() throws Exception {
protected JsonObject testFailedNotification(final int maxExpectedRedeliveries) throws Exception {

mockKafkaSourceEndpoint(); // This is the entry point of the connector.
MockEndpoint kafkaSinkMockEndpoint = mockKafkaSinkEndpoint(); // This is where the return message to the engine is sent.
Expand All @@ -182,7 +184,7 @@ protected JsonObject testFailedNotification() throws Exception {
}
checkRouteMetrics(SUCCESS, 0, 0, 0);
checkRouteMetrics(CONNECTOR_TO_ENGINE, 0, 1, 1);
micrometerAssertionHelper.assertCounterIncrement(connectorConfig.getRedeliveryCounterName(), 0);
micrometerAssertionHelper.assertCounterIncrement(connectorConfig.getRedeliveryCounterName(), maxExpectedRedeliveries);
return outcomingPayload;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,16 @@ void test404ChannelNotFound() throws Exception {
micrometerAssertionHelper.assertCounterIncrement(connectorConfig.getRedeliveryCounterName(), 0);
}

@Override
@Test
protected void testFailedNotificationError500() throws Exception {
mockRemoteServerError(500, "My custom internal error");

// We do not expect any redeliveries because the Slack connector does
// not depend on the "HTTP common" module.
testFailedNotification(0);
}

private void mock404ChannelNotFound() {
getClient()
.when(request().withMethod("POST"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,17 +116,18 @@ protected Predicate checkOutgoingPayload(JsonObject incomingPayload) {

@Test
protected void testFailedNotificationError500() throws Exception {
testFailedNotificationAndReturnedFlagsToEngine(500, "My custom internal error", INCREMENT_ENDPOINT_SERVER_ERRORS);
// We expect the connector to attempt redeliveries for the error.
testFailedNotificationAndReturnedFlagsToEngine(500, "My custom internal error", INCREMENT_ENDPOINT_SERVER_ERRORS, this.connectorConfig.getRedeliveryMaxAttempts());
}

@Test
protected void testFailedNotificationError404() throws Exception {
testFailedNotificationAndReturnedFlagsToEngine(404, "Page not found", DISABLE_ENDPOINT_CLIENT_ERRORS);
testFailedNotificationAndReturnedFlagsToEngine(404, "Page not found", DISABLE_ENDPOINT_CLIENT_ERRORS, 0);
}

private void testFailedNotificationAndReturnedFlagsToEngine(int httpReturnCode, String returnedBodyMessage, String flagNameThatShouldBeTrue) throws Exception {
private void testFailedNotificationAndReturnedFlagsToEngine(int httpReturnCode, String returnedBodyMessage, String flagNameThatShouldBeTrue, final int expectedRedeliveriesCount) throws Exception {
mockRemoteServerError(httpReturnCode, returnedBodyMessage);
JsonObject returnToEngine = super.testFailedNotification();
JsonObject returnToEngine = super.testFailedNotification(expectedRedeliveriesCount);
JsonObject data = new JsonObject(returnToEngine.getString("data"));
assertTrue(data.getBoolean(flagNameThatShouldBeTrue));
JsonObject details = data.getJsonObject("details");
Expand Down

0 comments on commit 13a4440

Please sign in to comment.