From 33b363112fefc919d7f86feaac8cffe1e3569729 Mon Sep 17 00:00:00 2001 From: Gwenneg Lepage Date: Tue, 19 Mar 2024 17:08:23 +0100 Subject: [PATCH] Replay skipped notifications (#2568) --- .../routers/engine/ReplayService.java | 15 ++++++ .../routers/internal/InternalResource.java | 12 +++++ .../events/EndpointProcessor.java | 14 +++-- .../notifications/routers/ReplayResource.java | 52 +++++++++++++++++++ .../routers/ReplayResourceTest.java | 24 +++++++++ 5 files changed, 113 insertions(+), 4 deletions(-) create mode 100644 backend/src/main/java/com/redhat/cloud/notifications/routers/engine/ReplayService.java create mode 100644 engine/src/main/java/com/redhat/cloud/notifications/routers/ReplayResource.java create mode 100644 engine/src/test/java/com/redhat/cloud/notifications/routers/ReplayResourceTest.java diff --git a/backend/src/main/java/com/redhat/cloud/notifications/routers/engine/ReplayService.java b/backend/src/main/java/com/redhat/cloud/notifications/routers/engine/ReplayService.java new file mode 100644 index 0000000000..5c70568125 --- /dev/null +++ b/backend/src/main/java/com/redhat/cloud/notifications/routers/engine/ReplayService.java @@ -0,0 +1,15 @@ +package com.redhat.cloud.notifications.routers.engine; + +import jakarta.ws.rs.POST; +import jakarta.ws.rs.Path; +import org.eclipse.microprofile.rest.client.inject.RegisterRestClient; + +import static com.redhat.cloud.notifications.Constants.API_INTERNAL; + +@RegisterRestClient(configKey = "internal-engine") +public interface ReplayService { + + @Path(API_INTERNAL + "/replay") + @POST + void replay(); +} diff --git a/backend/src/main/java/com/redhat/cloud/notifications/routers/internal/InternalResource.java b/backend/src/main/java/com/redhat/cloud/notifications/routers/internal/InternalResource.java index f2e359e751..2bb5d48ba6 100644 --- a/backend/src/main/java/com/redhat/cloud/notifications/routers/internal/InternalResource.java +++ b/backend/src/main/java/com/redhat/cloud/notifications/routers/internal/InternalResource.java @@ -25,6 +25,7 @@ import com.redhat.cloud.notifications.routers.SecurityContextUtil; import com.redhat.cloud.notifications.routers.dailydigest.TriggerDailyDigestRequest; import com.redhat.cloud.notifications.routers.engine.DailyDigestService; +import com.redhat.cloud.notifications.routers.engine.ReplayService; import com.redhat.cloud.notifications.routers.internal.models.AddApplicationRequest; import com.redhat.cloud.notifications.routers.internal.models.RequestDefaultBehaviorGroupPropertyList; import com.redhat.cloud.notifications.routers.internal.models.ServerInfo; @@ -127,6 +128,10 @@ public class InternalResource { @Inject SubscriptionRepository subscriptionRepository; + @Inject + @RestClient + ReplayService replayService; + // This endpoint is used during the IQE tests to determine which version of the code is tested. @GET @Path("/version") @@ -142,6 +147,13 @@ public String getVersion() { } } + @POST + @Path("/replay") + @RolesAllowed(RBAC_INTERNAL_ADMIN) + public void replay() { + replayService.replay(); + } + @GET @Path("/serverInfo") @Produces(APPLICATION_JSON) diff --git a/engine/src/main/java/com/redhat/cloud/notifications/events/EndpointProcessor.java b/engine/src/main/java/com/redhat/cloud/notifications/events/EndpointProcessor.java index a581a39433..e0961955b5 100644 --- a/engine/src/main/java/com/redhat/cloud/notifications/events/EndpointProcessor.java +++ b/engine/src/main/java/com/redhat/cloud/notifications/events/EndpointProcessor.java @@ -80,6 +80,10 @@ void init() { } public void process(Event event) { + process(event, false); + } + + public void process(Event event, boolean skipEmails) { processedItems.increment(); final List endpoints; if (TestEventHelper.isIntegrationTestEvent(event)) { @@ -123,10 +127,12 @@ public void process(Event event) { } break; case EMAIL_SUBSCRIPTION: - if (isAggregatorEvent(event)) { - emailProcessor.processAggregation(event); - } else { - emailConnectorProcessor.process(event, endpointsByTypeEntry.getValue()); + if (!skipEmails) { + if (isAggregatorEvent(event)) { + emailProcessor.processAggregation(event); + } else { + emailConnectorProcessor.process(event, endpointsByTypeEntry.getValue()); + } } break; case WEBHOOK: diff --git a/engine/src/main/java/com/redhat/cloud/notifications/routers/ReplayResource.java b/engine/src/main/java/com/redhat/cloud/notifications/routers/ReplayResource.java new file mode 100644 index 0000000000..56022dd485 --- /dev/null +++ b/engine/src/main/java/com/redhat/cloud/notifications/routers/ReplayResource.java @@ -0,0 +1,52 @@ +package com.redhat.cloud.notifications.routers; + +import com.redhat.cloud.notifications.events.EndpointProcessor; +import com.redhat.cloud.notifications.models.Event; +import io.quarkus.logging.Log; +import jakarta.inject.Inject; +import jakarta.persistence.EntityManager; +import jakarta.ws.rs.POST; +import jakarta.ws.rs.Path; + +import java.time.LocalDateTime; +import java.util.List; + +import static com.redhat.cloud.notifications.Constants.API_INTERNAL; +import static java.time.Month.MARCH; + +@Path(API_INTERNAL + "/replay") +public class ReplayResource { + + private static final int MAX_RESULTS = 100; + + @Inject + EntityManager entityManager; + + @Inject + EndpointProcessor endpointProcessor; + + public List getEvents(int firstResult, int maxResults) { + String query = "FROM Event WHERE created > :start AND created <= :end"; + return entityManager.createQuery(query, Event.class) + .setParameter("start", LocalDateTime.of(2024, MARCH, 19, 9, 13)) + .setParameter("end", LocalDateTime.of(2024, MARCH, 19, 14, 23)) + .setFirstResult(firstResult) + .setMaxResults(maxResults) + .getResultList(); + } + + @POST + public void replay() { + Log.info("Replay endpoint was called"); + int firstResult = 0; + List events; + do { + Log.infof("Processing events from index %d", firstResult); + events = getEvents(firstResult, MAX_RESULTS); + firstResult += MAX_RESULTS; + for (Event event : events) { + endpointProcessor.process(event, true); + } + } while (MAX_RESULTS == events.size()); + } +} diff --git a/engine/src/test/java/com/redhat/cloud/notifications/routers/ReplayResourceTest.java b/engine/src/test/java/com/redhat/cloud/notifications/routers/ReplayResourceTest.java new file mode 100644 index 0000000000..7fc2997de3 --- /dev/null +++ b/engine/src/test/java/com/redhat/cloud/notifications/routers/ReplayResourceTest.java @@ -0,0 +1,24 @@ +package com.redhat.cloud.notifications.routers; + +import com.redhat.cloud.notifications.TestLifecycleManager; +import io.quarkus.test.common.QuarkusTestResource; +import io.quarkus.test.junit.QuarkusTest; +import org.junit.jupiter.api.Test; + +import static com.redhat.cloud.notifications.Constants.API_INTERNAL; +import static io.restassured.RestAssured.given; + +@QuarkusTest +@QuarkusTestResource(TestLifecycleManager.class) +public class ReplayResourceTest { + + @Test + void testReplay() { + given() + .basePath(API_INTERNAL) + .when() + .post("/replay") + .then() + .statusCode(204); + } +}