From 62d3b9b8a788be6a642a50696b04a2c712cb88d3 Mon Sep 17 00:00:00 2001 From: Gwenneg Lepage Date: Wed, 11 Oct 2023 21:38:24 +0200 Subject: [PATCH] Run the aggregation process from a worker thread --- .../email/EmailSubscriptionTypeProcessor.java | 79 +++++++++++-------- .../EmailSubscriptionTypeProcessorTest.java | 2 +- 2 files changed, 46 insertions(+), 35 deletions(-) diff --git a/engine/src/main/java/com/redhat/cloud/notifications/processors/email/EmailSubscriptionTypeProcessor.java b/engine/src/main/java/com/redhat/cloud/notifications/processors/email/EmailSubscriptionTypeProcessor.java index 5776e4a125..c185871d85 100644 --- a/engine/src/main/java/com/redhat/cloud/notifications/processors/email/EmailSubscriptionTypeProcessor.java +++ b/engine/src/main/java/com/redhat/cloud/notifications/processors/email/EmailSubscriptionTypeProcessor.java @@ -44,6 +44,7 @@ import jakarta.enterprise.context.ApplicationScoped; import jakarta.enterprise.context.control.ActivateRequestContext; import jakarta.inject.Inject; +import org.eclipse.microprofile.context.ManagedExecutor; import org.eclipse.microprofile.reactive.messaging.Acknowledgment; import org.eclipse.microprofile.reactive.messaging.Incoming; @@ -115,6 +116,10 @@ public class EmailSubscriptionTypeProcessor extends SystemEndpointTypeProcessor @Inject EventRepository eventRepository; + // This executor is used to run a task asynchronously using a worker thread from a threads pool managed by Quarkus. + @Inject + ManagedExecutor managedExecutor; + private Counter rejectedAggregationCommandCount; private Counter processedAggregationCommandCount; private Counter failedAggregationCommandCount; @@ -194,45 +199,51 @@ private void sendEmail(Event event, Set endpoints) { } public void processAggregation(Event event) { + /* + * The aggregation process is long-running task. To avoid blocking the thread used to consume + * Kafka messages from the ingress topic, we're performing the aggregation from a worker thread. + */ + managedExecutor.submit(() -> { - AggregationCommand aggregationCommand = null; - Timer.Sample consumedTimer = Timer.start(registry); + AggregationCommand aggregationCommand; + Timer.Sample consumedTimer = Timer.start(registry); - try { - Action action = actionParser.fromJsonString(event.getPayload()); - Map map = action.getEvents().get(0).getPayload().getAdditionalProperties(); - aggregationCommand = objectMapper.convertValue(map, AggregationCommand.class); - } catch (Exception e) { - Log.error("Kafka aggregation payload parsing failed for event " + event.getId(), e); - rejectedAggregationCommandCount.increment(); - return; - } + try { + Action action = actionParser.fromJsonString(event.getPayload()); + Map map = action.getEvents().get(0).getPayload().getAdditionalProperties(); + aggregationCommand = objectMapper.convertValue(map, AggregationCommand.class); + } catch (Exception e) { + Log.error("Kafka aggregation payload parsing failed for event " + event.getId(), e); + rejectedAggregationCommandCount.increment(); + return; + } - Log.infof("Processing received aggregation command: %s", aggregationCommand); - processedAggregationCommandCount.increment(); + Log.infof("Processing received aggregation command: %s", aggregationCommand); + processedAggregationCommandCount.increment(); - try { - Optional app = applicationRepository.getApplication(aggregationCommand.getAggregationKey().getBundle(), aggregationCommand.getAggregationKey().getApplication()); - if (app.isPresent()) { - event.setEventTypeDisplayName( - String.format("%s - %s - %s", - event.getEventTypeDisplayName(), - app.get().getDisplayName(), - app.get().getBundle().getDisplayName()) - ); - eventRepository.updateEventDisplayName(event); + try { + Optional app = applicationRepository.getApplication(aggregationCommand.getAggregationKey().getBundle(), aggregationCommand.getAggregationKey().getApplication()); + if (app.isPresent()) { + event.setEventTypeDisplayName( + String.format("%s - %s - %s", + event.getEventTypeDisplayName(), + app.get().getDisplayName(), + app.get().getBundle().getDisplayName()) + ); + eventRepository.updateEventDisplayName(event); + } + processAggregateEmailsByAggregationKey(aggregationCommand, Optional.of(event)); + } catch (Exception e) { + Log.warn("Error while processing aggregation", e); + failedAggregationCommandCount.increment(); + } finally { + consumedTimer.stop(registry.timer( + AGGREGATION_CONSUMED_TIMER_NAME, + TAG_KEY_BUNDLE, aggregationCommand.getAggregationKey().getBundle(), + TAG_KEY_APPLICATION, aggregationCommand.getAggregationKey().getApplication() + )); } - processAggregateEmailsByAggregationKey(aggregationCommand, Optional.of(event)); - } catch (Exception e) { - Log.warn("Error while processing aggregation", e); - failedAggregationCommandCount.increment(); - } finally { - consumedTimer.stop(registry.timer( - AGGREGATION_CONSUMED_TIMER_NAME, - TAG_KEY_BUNDLE, aggregationCommand.getAggregationKey().getBundle(), - TAG_KEY_APPLICATION, aggregationCommand.getAggregationKey().getApplication() - )); - } + }); } @Incoming(AGGREGATION_CHANNEL) diff --git a/engine/src/test/java/com/redhat/cloud/notifications/processors/email/EmailSubscriptionTypeProcessorTest.java b/engine/src/test/java/com/redhat/cloud/notifications/processors/email/EmailSubscriptionTypeProcessorTest.java index ac0f8682c5..a227526029 100644 --- a/engine/src/test/java/com/redhat/cloud/notifications/processors/email/EmailSubscriptionTypeProcessorTest.java +++ b/engine/src/test/java/com/redhat/cloud/notifications/processors/email/EmailSubscriptionTypeProcessorTest.java @@ -201,7 +201,7 @@ void shouldSuccessfullySendTwoAggregatedEmails(String channel) { inMemoryConnector.source(INGRESS_CHANNEL).send(buildAggregatorAction(aggregationCommand2)); } - micrometerAssertionHelper.awaitAndAssertTimerIncrement(AGGREGATION_CONSUMED_TIMER_NAME, 1); + micrometerAssertionHelper.awaitAndAssertTimerIncrement(AGGREGATION_CONSUMED_TIMER_NAME, 2); micrometerAssertionHelper.awaitAndAssertCounterIncrement(AGGREGATION_COMMAND_PROCESSED_COUNTER_NAME, 2); micrometerAssertionHelper.assertCounterIncrement(AGGREGATION_COMMAND_REJECTED_COUNTER_NAME, 0); micrometerAssertionHelper.assertCounterIncrement(AGGREGATION_COMMAND_ERROR_COUNTER_NAME, 0);