Skip to content

Commit

Permalink
Replay skipped notifications (RedHatInsights#2568)
Browse files Browse the repository at this point in the history
  • Loading branch information
gwenneg authored Mar 19, 2024
1 parent a285a8a commit 33b3631
Show file tree
Hide file tree
Showing 5 changed files with 113 additions and 4 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package com.redhat.cloud.notifications.routers.engine;

import jakarta.ws.rs.POST;
import jakarta.ws.rs.Path;
import org.eclipse.microprofile.rest.client.inject.RegisterRestClient;

import static com.redhat.cloud.notifications.Constants.API_INTERNAL;

@RegisterRestClient(configKey = "internal-engine")
public interface ReplayService {

@Path(API_INTERNAL + "/replay")
@POST
void replay();
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.redhat.cloud.notifications.routers.SecurityContextUtil;
import com.redhat.cloud.notifications.routers.dailydigest.TriggerDailyDigestRequest;
import com.redhat.cloud.notifications.routers.engine.DailyDigestService;
import com.redhat.cloud.notifications.routers.engine.ReplayService;
import com.redhat.cloud.notifications.routers.internal.models.AddApplicationRequest;
import com.redhat.cloud.notifications.routers.internal.models.RequestDefaultBehaviorGroupPropertyList;
import com.redhat.cloud.notifications.routers.internal.models.ServerInfo;
Expand Down Expand Up @@ -127,6 +128,10 @@ public class InternalResource {
@Inject
SubscriptionRepository subscriptionRepository;

@Inject
@RestClient
ReplayService replayService;

// This endpoint is used during the IQE tests to determine which version of the code is tested.
@GET
@Path("/version")
Expand All @@ -142,6 +147,13 @@ public String getVersion() {
}
}

@POST
@Path("/replay")
@RolesAllowed(RBAC_INTERNAL_ADMIN)
public void replay() {
replayService.replay();
}

@GET
@Path("/serverInfo")
@Produces(APPLICATION_JSON)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,10 @@ void init() {
}

public void process(Event event) {
process(event, false);
}

public void process(Event event, boolean skipEmails) {
processedItems.increment();
final List<Endpoint> endpoints;
if (TestEventHelper.isIntegrationTestEvent(event)) {
Expand Down Expand Up @@ -123,10 +127,12 @@ public void process(Event event) {
}
break;
case EMAIL_SUBSCRIPTION:
if (isAggregatorEvent(event)) {
emailProcessor.processAggregation(event);
} else {
emailConnectorProcessor.process(event, endpointsByTypeEntry.getValue());
if (!skipEmails) {
if (isAggregatorEvent(event)) {
emailProcessor.processAggregation(event);
} else {
emailConnectorProcessor.process(event, endpointsByTypeEntry.getValue());
}
}
break;
case WEBHOOK:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package com.redhat.cloud.notifications.routers;

import com.redhat.cloud.notifications.events.EndpointProcessor;
import com.redhat.cloud.notifications.models.Event;
import io.quarkus.logging.Log;
import jakarta.inject.Inject;
import jakarta.persistence.EntityManager;
import jakarta.ws.rs.POST;
import jakarta.ws.rs.Path;

import java.time.LocalDateTime;
import java.util.List;

import static com.redhat.cloud.notifications.Constants.API_INTERNAL;
import static java.time.Month.MARCH;

@Path(API_INTERNAL + "/replay")
public class ReplayResource {

private static final int MAX_RESULTS = 100;

@Inject
EntityManager entityManager;

@Inject
EndpointProcessor endpointProcessor;

public List<Event> getEvents(int firstResult, int maxResults) {
String query = "FROM Event WHERE created > :start AND created <= :end";
return entityManager.createQuery(query, Event.class)
.setParameter("start", LocalDateTime.of(2024, MARCH, 19, 9, 13))
.setParameter("end", LocalDateTime.of(2024, MARCH, 19, 14, 23))
.setFirstResult(firstResult)
.setMaxResults(maxResults)
.getResultList();
}

@POST
public void replay() {
Log.info("Replay endpoint was called");
int firstResult = 0;
List<Event> events;
do {
Log.infof("Processing events from index %d", firstResult);
events = getEvents(firstResult, MAX_RESULTS);
firstResult += MAX_RESULTS;
for (Event event : events) {
endpointProcessor.process(event, true);
}
} while (MAX_RESULTS == events.size());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package com.redhat.cloud.notifications.routers;

import com.redhat.cloud.notifications.TestLifecycleManager;
import io.quarkus.test.common.QuarkusTestResource;
import io.quarkus.test.junit.QuarkusTest;
import org.junit.jupiter.api.Test;

import static com.redhat.cloud.notifications.Constants.API_INTERNAL;
import static io.restassured.RestAssured.given;

@QuarkusTest
@QuarkusTestResource(TestLifecycleManager.class)
public class ReplayResourceTest {

@Test
void testReplay() {
given()
.basePath(API_INTERNAL)
.when()
.post("/replay")
.then()
.statusCode(204);
}
}

0 comments on commit 33b3631

Please sign in to comment.