From 0a924cbd5d795bba51f9ea0d595e740fdaf89a82 Mon Sep 17 00:00:00 2001 From: Gwenneg Lepage Date: Wed, 11 Oct 2023 21:38:24 +0200 Subject: [PATCH] Run the aggregation process from a worker thread --- .../email/EmailSubscriptionTypeProcessor.java | 79 +++++++++++-------- .../EmailSubscriptionTypeProcessorTest.java | 5 +- 2 files changed, 48 insertions(+), 36 deletions(-) diff --git a/engine/src/main/java/com/redhat/cloud/notifications/processors/email/EmailSubscriptionTypeProcessor.java b/engine/src/main/java/com/redhat/cloud/notifications/processors/email/EmailSubscriptionTypeProcessor.java index 5776e4a125..c185871d85 100644 --- a/engine/src/main/java/com/redhat/cloud/notifications/processors/email/EmailSubscriptionTypeProcessor.java +++ b/engine/src/main/java/com/redhat/cloud/notifications/processors/email/EmailSubscriptionTypeProcessor.java @@ -44,6 +44,7 @@ import jakarta.enterprise.context.ApplicationScoped; import jakarta.enterprise.context.control.ActivateRequestContext; import jakarta.inject.Inject; +import org.eclipse.microprofile.context.ManagedExecutor; import org.eclipse.microprofile.reactive.messaging.Acknowledgment; import org.eclipse.microprofile.reactive.messaging.Incoming; @@ -115,6 +116,10 @@ public class EmailSubscriptionTypeProcessor extends SystemEndpointTypeProcessor @Inject EventRepository eventRepository; + // This executor is used to run a task asynchronously using a worker thread from a threads pool managed by Quarkus. + @Inject + ManagedExecutor managedExecutor; + private Counter rejectedAggregationCommandCount; private Counter processedAggregationCommandCount; private Counter failedAggregationCommandCount; @@ -194,45 +199,51 @@ private void sendEmail(Event event, Set endpoints) { } public void processAggregation(Event event) { + /* + * The aggregation process is long-running task. To avoid blocking the thread used to consume + * Kafka messages from the ingress topic, we're performing the aggregation from a worker thread. + */ + managedExecutor.submit(() -> { - AggregationCommand aggregationCommand = null; - Timer.Sample consumedTimer = Timer.start(registry); + AggregationCommand aggregationCommand; + Timer.Sample consumedTimer = Timer.start(registry); - try { - Action action = actionParser.fromJsonString(event.getPayload()); - Map map = action.getEvents().get(0).getPayload().getAdditionalProperties(); - aggregationCommand = objectMapper.convertValue(map, AggregationCommand.class); - } catch (Exception e) { - Log.error("Kafka aggregation payload parsing failed for event " + event.getId(), e); - rejectedAggregationCommandCount.increment(); - return; - } + try { + Action action = actionParser.fromJsonString(event.getPayload()); + Map map = action.getEvents().get(0).getPayload().getAdditionalProperties(); + aggregationCommand = objectMapper.convertValue(map, AggregationCommand.class); + } catch (Exception e) { + Log.error("Kafka aggregation payload parsing failed for event " + event.getId(), e); + rejectedAggregationCommandCount.increment(); + return; + } - Log.infof("Processing received aggregation command: %s", aggregationCommand); - processedAggregationCommandCount.increment(); + Log.infof("Processing received aggregation command: %s", aggregationCommand); + processedAggregationCommandCount.increment(); - try { - Optional app = applicationRepository.getApplication(aggregationCommand.getAggregationKey().getBundle(), aggregationCommand.getAggregationKey().getApplication()); - if (app.isPresent()) { - event.setEventTypeDisplayName( - String.format("%s - %s - %s", - event.getEventTypeDisplayName(), - app.get().getDisplayName(), - app.get().getBundle().getDisplayName()) - ); - eventRepository.updateEventDisplayName(event); + try { + Optional app = applicationRepository.getApplication(aggregationCommand.getAggregationKey().getBundle(), aggregationCommand.getAggregationKey().getApplication()); + if (app.isPresent()) { + event.setEventTypeDisplayName( + String.format("%s - %s - %s", + event.getEventTypeDisplayName(), + app.get().getDisplayName(), + app.get().getBundle().getDisplayName()) + ); + eventRepository.updateEventDisplayName(event); + } + processAggregateEmailsByAggregationKey(aggregationCommand, Optional.of(event)); + } catch (Exception e) { + Log.warn("Error while processing aggregation", e); + failedAggregationCommandCount.increment(); + } finally { + consumedTimer.stop(registry.timer( + AGGREGATION_CONSUMED_TIMER_NAME, + TAG_KEY_BUNDLE, aggregationCommand.getAggregationKey().getBundle(), + TAG_KEY_APPLICATION, aggregationCommand.getAggregationKey().getApplication() + )); } - processAggregateEmailsByAggregationKey(aggregationCommand, Optional.of(event)); - } catch (Exception e) { - Log.warn("Error while processing aggregation", e); - failedAggregationCommandCount.increment(); - } finally { - consumedTimer.stop(registry.timer( - AGGREGATION_CONSUMED_TIMER_NAME, - TAG_KEY_BUNDLE, aggregationCommand.getAggregationKey().getBundle(), - TAG_KEY_APPLICATION, aggregationCommand.getAggregationKey().getApplication() - )); - } + }); } @Incoming(AGGREGATION_CHANNEL) diff --git a/engine/src/test/java/com/redhat/cloud/notifications/processors/email/EmailSubscriptionTypeProcessorTest.java b/engine/src/test/java/com/redhat/cloud/notifications/processors/email/EmailSubscriptionTypeProcessorTest.java index ac0f8682c5..0c9c5b9a4b 100644 --- a/engine/src/test/java/com/redhat/cloud/notifications/processors/email/EmailSubscriptionTypeProcessorTest.java +++ b/engine/src/test/java/com/redhat/cloud/notifications/processors/email/EmailSubscriptionTypeProcessorTest.java @@ -67,6 +67,7 @@ import static org.mockito.ArgumentMatchers.anySet; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.never; +import static org.mockito.Mockito.timeout; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -207,7 +208,7 @@ void shouldSuccessfullySendTwoAggregatedEmails(String channel) { micrometerAssertionHelper.assertCounterIncrement(AGGREGATION_COMMAND_ERROR_COUNTER_NAME, 0); // Let's check that EndpointEmailSubscriptionResources#sendEmail was called for each aggregation. - verify(emailAggregationRepository, times(1)).getEmailAggregation( + verify(emailAggregationRepository, timeout(5000L).times(1)).getEmailAggregation( eq(aggregationCommand1.getAggregationKey()), eq(aggregationCommand1.getStart()), eq(aggregationCommand1.getEnd()), @@ -219,7 +220,7 @@ void shouldSuccessfullySendTwoAggregatedEmails(String channel) { eq(aggregationCommand1.getAggregationKey()), eq(aggregationCommand1.getEnd()) ); - verify(emailAggregationRepository, times(1)).getEmailAggregation( + verify(emailAggregationRepository, timeout(5000L).times(1)).getEmailAggregation( eq(aggregationCommand2.getAggregationKey()), eq(aggregationCommand2.getStart()), eq(aggregationCommand2.getEnd()),