Run the aggregation process from a worker thread
gwenneg committed Oct 18, 2023
1 parent 50c8e0f commit 0a924cb
Showing 2 changed files with 48 additions and 36 deletions.
@@ -44,6 +44,7 @@
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.context.control.ActivateRequestContext;
import jakarta.inject.Inject;
import org.eclipse.microprofile.context.ManagedExecutor;
import org.eclipse.microprofile.reactive.messaging.Acknowledgment;
import org.eclipse.microprofile.reactive.messaging.Incoming;

@@ -115,6 +116,10 @@ public class EmailSubscriptionTypeProcessor extends SystemEndpointTypeProcessor
@Inject
EventRepository eventRepository;

    // This executor is used to run a task asynchronously using a worker thread from a thread pool managed by Quarkus.
@Inject
ManagedExecutor managedExecutor;

private Counter rejectedAggregationCommandCount;
private Counter processedAggregationCommandCount;
private Counter failedAggregationCommandCount;
@@ -194,45 +199,51 @@ private void sendEmail(Event event, Set<Endpoint> endpoints) {
}

public void processAggregation(Event event) {
    /*
     * The aggregation process is a long-running task. To avoid blocking the thread used to consume
     * Kafka messages from the ingress topic, we're performing the aggregation from a worker thread.
     */
    managedExecutor.submit(() -> {

        AggregationCommand aggregationCommand;
        Timer.Sample consumedTimer = Timer.start(registry);

        try {
            Action action = actionParser.fromJsonString(event.getPayload());
            Map<String, Object> map = action.getEvents().get(0).getPayload().getAdditionalProperties();
            aggregationCommand = objectMapper.convertValue(map, AggregationCommand.class);
        } catch (Exception e) {
            Log.error("Kafka aggregation payload parsing failed for event " + event.getId(), e);
            rejectedAggregationCommandCount.increment();
            return;
        }

        Log.infof("Processing received aggregation command: %s", aggregationCommand);
        processedAggregationCommandCount.increment();

        try {
            Optional<Application> app = applicationRepository.getApplication(aggregationCommand.getAggregationKey().getBundle(), aggregationCommand.getAggregationKey().getApplication());
            if (app.isPresent()) {
                event.setEventTypeDisplayName(
                        String.format("%s - %s - %s",
                                event.getEventTypeDisplayName(),
                                app.get().getDisplayName(),
                                app.get().getBundle().getDisplayName())
                );
                eventRepository.updateEventDisplayName(event);
            }
            processAggregateEmailsByAggregationKey(aggregationCommand, Optional.of(event));
        } catch (Exception e) {
            Log.warn("Error while processing aggregation", e);
            failedAggregationCommandCount.increment();
        } finally {
            consumedTimer.stop(registry.timer(
                    AGGREGATION_CONSUMED_TIMER_NAME,
                    TAG_KEY_BUNDLE, aggregationCommand.getAggregationKey().getBundle(),
                    TAG_KEY_APPLICATION, aggregationCommand.getAggregationKey().getApplication()
            ));
        }
    });
}

@Incoming(AGGREGATION_CHANNEL)
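
For context, the pattern introduced above boils down to injecting the Quarkus-managed ManagedExecutor (from MicroProfile Context Propagation) and submitting the long-running work to it, so the thread that consumes Kafka messages returns immediately. Below is a minimal, self-contained sketch of that pattern, not code from this repository: the class name LongRunningConsumer, the "ingress" channel and the runLongTask method are made up for illustration.

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import org.eclipse.microprofile.context.ManagedExecutor;
import org.eclipse.microprofile.reactive.messaging.Incoming;

@ApplicationScoped
public class LongRunningConsumer {

    @Inject
    ManagedExecutor managedExecutor;

    @Incoming("ingress")
    public void consume(String payload) {
        // Return quickly so the messaging thread is free to poll the next record;
        // the expensive work runs on a worker thread from the Quarkus-managed pool.
        managedExecutor.submit(() -> runLongTask(payload));
    }

    private void runLongTask(String payload) {
        // Placeholder for the long-running processing (e.g., the email aggregation).
    }
}
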
@@ -67,6 +67,7 @@
import static org.mockito.ArgumentMatchers.anySet;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@@ -207,7 +208,7 @@ void shouldSuccessfullySendTwoAggregatedEmails(String channel) {
micrometerAssertionHelper.assertCounterIncrement(AGGREGATION_COMMAND_ERROR_COUNTER_NAME, 0);

// Let's check that EndpointEmailSubscriptionResources#sendEmail was called for each aggregation.
verify(emailAggregationRepository, times(1)).getEmailAggregation(
verify(emailAggregationRepository, timeout(5000L).times(1)).getEmailAggregation(
eq(aggregationCommand1.getAggregationKey()),
eq(aggregationCommand1.getStart()),
eq(aggregationCommand1.getEnd()),
@@ -219,7 +220,7 @@ void shouldSuccessfullySendTwoAggregatedEmails(String channel) {
eq(aggregationCommand1.getAggregationKey()),
eq(aggregationCommand1.getEnd())
);
verify(emailAggregationRepository, times(1)).getEmailAggregation(
verify(emailAggregationRepository, timeout(5000L).times(1)).getEmailAggregation(
eq(aggregationCommand2.getAggregationKey()),
eq(aggregationCommand2.getStart()),
eq(aggregationCommand2.getEnd()),
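
On the test side, moving the aggregation to a worker thread makes the verified interactions asynchronous, which is why the verify(...) calls gain Mockito's timeout(): the verification now polls for up to 5 seconds until the interaction happens instead of failing immediately. A small, self-contained illustration of that verification mode follows; the mocked List and the single-thread executor are hypothetical, used only to keep the example short.

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;

import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class AsyncVerifyExample {

    @SuppressWarnings("unchecked")
    public static void main(String[] args) {
        List<String> repository = mock(List.class);
        ExecutorService executor = Executors.newSingleThreadExecutor();

        // The interaction under test happens on another thread, as in the commit above.
        executor.submit(() -> repository.add("aggregation"));

        // timeout(5000L) keeps retrying the verification for up to 5 seconds,
        // so the check no longer races against the worker thread.
        verify(repository, timeout(5000L).times(1)).add("aggregation");

        executor.shutdown();
    }
}
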
