Skip to content

Commit

Permalink
Run the aggregation process from a worker thread
Browse files Browse the repository at this point in the history
  • Loading branch information
gwenneg committed Oct 18, 2023
1 parent 50c8e0f commit 62d3b9b
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.context.control.ActivateRequestContext;
import jakarta.inject.Inject;
import org.eclipse.microprofile.context.ManagedExecutor;
import org.eclipse.microprofile.reactive.messaging.Acknowledgment;
import org.eclipse.microprofile.reactive.messaging.Incoming;

Expand Down Expand Up @@ -115,6 +116,10 @@ public class EmailSubscriptionTypeProcessor extends SystemEndpointTypeProcessor
@Inject
EventRepository eventRepository;

// This executor is used to run a task asynchronously using a worker thread from a threads pool managed by Quarkus.
@Inject
ManagedExecutor managedExecutor;

private Counter rejectedAggregationCommandCount;
private Counter processedAggregationCommandCount;
private Counter failedAggregationCommandCount;
Expand Down Expand Up @@ -194,45 +199,51 @@ private void sendEmail(Event event, Set<Endpoint> endpoints) {
}

public void processAggregation(Event event) {
/*
* The aggregation process is long-running task. To avoid blocking the thread used to consume
* Kafka messages from the ingress topic, we're performing the aggregation from a worker thread.
*/
managedExecutor.submit(() -> {

AggregationCommand aggregationCommand = null;
Timer.Sample consumedTimer = Timer.start(registry);
AggregationCommand aggregationCommand;
Timer.Sample consumedTimer = Timer.start(registry);

try {
Action action = actionParser.fromJsonString(event.getPayload());
Map<String, Object> map = action.getEvents().get(0).getPayload().getAdditionalProperties();
aggregationCommand = objectMapper.convertValue(map, AggregationCommand.class);
} catch (Exception e) {
Log.error("Kafka aggregation payload parsing failed for event " + event.getId(), e);
rejectedAggregationCommandCount.increment();
return;
}
try {
Action action = actionParser.fromJsonString(event.getPayload());
Map<String, Object> map = action.getEvents().get(0).getPayload().getAdditionalProperties();
aggregationCommand = objectMapper.convertValue(map, AggregationCommand.class);
} catch (Exception e) {
Log.error("Kafka aggregation payload parsing failed for event " + event.getId(), e);
rejectedAggregationCommandCount.increment();
return;
}

Log.infof("Processing received aggregation command: %s", aggregationCommand);
processedAggregationCommandCount.increment();
Log.infof("Processing received aggregation command: %s", aggregationCommand);
processedAggregationCommandCount.increment();

try {
Optional<Application> app = applicationRepository.getApplication(aggregationCommand.getAggregationKey().getBundle(), aggregationCommand.getAggregationKey().getApplication());
if (app.isPresent()) {
event.setEventTypeDisplayName(
String.format("%s - %s - %s",
event.getEventTypeDisplayName(),
app.get().getDisplayName(),
app.get().getBundle().getDisplayName())
);
eventRepository.updateEventDisplayName(event);
try {
Optional<Application> app = applicationRepository.getApplication(aggregationCommand.getAggregationKey().getBundle(), aggregationCommand.getAggregationKey().getApplication());
if (app.isPresent()) {
event.setEventTypeDisplayName(
String.format("%s - %s - %s",
event.getEventTypeDisplayName(),
app.get().getDisplayName(),
app.get().getBundle().getDisplayName())
);
eventRepository.updateEventDisplayName(event);
}
processAggregateEmailsByAggregationKey(aggregationCommand, Optional.of(event));
} catch (Exception e) {
Log.warn("Error while processing aggregation", e);
failedAggregationCommandCount.increment();
} finally {
consumedTimer.stop(registry.timer(
AGGREGATION_CONSUMED_TIMER_NAME,
TAG_KEY_BUNDLE, aggregationCommand.getAggregationKey().getBundle(),
TAG_KEY_APPLICATION, aggregationCommand.getAggregationKey().getApplication()
));
}
processAggregateEmailsByAggregationKey(aggregationCommand, Optional.of(event));
} catch (Exception e) {
Log.warn("Error while processing aggregation", e);
failedAggregationCommandCount.increment();
} finally {
consumedTimer.stop(registry.timer(
AGGREGATION_CONSUMED_TIMER_NAME,
TAG_KEY_BUNDLE, aggregationCommand.getAggregationKey().getBundle(),
TAG_KEY_APPLICATION, aggregationCommand.getAggregationKey().getApplication()
));
}
});
}

@Incoming(AGGREGATION_CHANNEL)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ void shouldSuccessfullySendTwoAggregatedEmails(String channel) {
inMemoryConnector.source(INGRESS_CHANNEL).send(buildAggregatorAction(aggregationCommand2));
}

micrometerAssertionHelper.awaitAndAssertTimerIncrement(AGGREGATION_CONSUMED_TIMER_NAME, 1);
micrometerAssertionHelper.awaitAndAssertTimerIncrement(AGGREGATION_CONSUMED_TIMER_NAME, 2);
micrometerAssertionHelper.awaitAndAssertCounterIncrement(AGGREGATION_COMMAND_PROCESSED_COUNTER_NAME, 2);
micrometerAssertionHelper.assertCounterIncrement(AGGREGATION_COMMAND_REJECTED_COUNTER_NAME, 0);
micrometerAssertionHelper.assertCounterIncrement(AGGREGATION_COMMAND_ERROR_COUNTER_NAME, 0);
Expand Down

0 comments on commit 62d3b9b

Please sign in to comment.