Skip to content

Commit

Permalink
Revert async aggregation
Browse files Browse the repository at this point in the history
  • Loading branch information
gwenneg committed Oct 19, 2023
1 parent 1d320d3 commit e93b3f6
Showing 1 changed file with 34 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -110,10 +110,6 @@ public class EmailSubscriptionTypeProcessor extends SystemEndpointTypeProcessor
@Inject
EventRepository eventRepository;

// This executor is used to run a task asynchronously using a worker thread from a threads pool managed by Quarkus.
@Inject
ManagedExecutor managedExecutor;

private Counter rejectedAggregationCommandCount;
private Counter processedAggregationCommandCount;
private Counter failedAggregationCommandCount;
Expand Down Expand Up @@ -193,51 +189,45 @@ private void sendEmail(Event event, Set<Endpoint> endpoints) {
}

public void processAggregation(Event event) {
/*
* The aggregation process is long-running task. To avoid blocking the thread used to consume
* Kafka messages from the ingress topic, we're performing the aggregation from a worker thread.
*/
managedExecutor.submit(() -> {

AggregationCommand aggregationCommand;
Timer.Sample consumedTimer = Timer.start(registry);
AggregationCommand aggregationCommand;
Timer.Sample consumedTimer = Timer.start(registry);

try {
Action action = actionParser.fromJsonString(event.getPayload());
Map<String, Object> map = action.getEvents().get(0).getPayload().getAdditionalProperties();
aggregationCommand = objectMapper.convertValue(map, AggregationCommand.class);
} catch (Exception e) {
Log.error("Kafka aggregation payload parsing failed for event " + event.getId(), e);
rejectedAggregationCommandCount.increment();
return;
}
try {
Action action = actionParser.fromJsonString(event.getPayload());
Map<String, Object> map = action.getEvents().get(0).getPayload().getAdditionalProperties();
aggregationCommand = objectMapper.convertValue(map, AggregationCommand.class);
} catch (Exception e) {
Log.error("Kafka aggregation payload parsing failed for event " + event.getId(), e);
rejectedAggregationCommandCount.increment();
return;
}

Log.infof("Processing received aggregation command: %s", aggregationCommand);
processedAggregationCommandCount.increment();
Log.infof("Processing received aggregation command: %s", aggregationCommand);
processedAggregationCommandCount.increment();

try {
Optional<Application> app = applicationRepository.getApplication(aggregationCommand.getAggregationKey().getBundle(), aggregationCommand.getAggregationKey().getApplication());
if (app.isPresent()) {
event.setEventTypeDisplayName(
String.format("%s - %s - %s",
event.getEventTypeDisplayName(),
app.get().getDisplayName(),
app.get().getBundle().getDisplayName())
);
eventRepository.updateEventDisplayName(event);
}
processAggregateEmailsByAggregationKey(aggregationCommand, Optional.of(event));
} catch (Exception e) {
Log.warn("Error while processing aggregation", e);
failedAggregationCommandCount.increment();
} finally {
consumedTimer.stop(registry.timer(
AGGREGATION_CONSUMED_TIMER_NAME,
TAG_KEY_BUNDLE, aggregationCommand.getAggregationKey().getBundle(),
TAG_KEY_APPLICATION, aggregationCommand.getAggregationKey().getApplication()
));
try {
Optional<Application> app = applicationRepository.getApplication(aggregationCommand.getAggregationKey().getBundle(), aggregationCommand.getAggregationKey().getApplication());
if (app.isPresent()) {
event.setEventTypeDisplayName(
String.format("%s - %s - %s",
event.getEventTypeDisplayName(),
app.get().getDisplayName(),
app.get().getBundle().getDisplayName())
);
eventRepository.updateEventDisplayName(event);
}
});
processAggregateEmailsByAggregationKey(aggregationCommand, Optional.of(event));
} catch (Exception e) {
Log.warn("Error while processing aggregation", e);
failedAggregationCommandCount.increment();
} finally {
consumedTimer.stop(registry.timer(
AGGREGATION_CONSUMED_TIMER_NAME,
TAG_KEY_BUNDLE, aggregationCommand.getAggregationKey().getBundle(),
TAG_KEY_APPLICATION, aggregationCommand.getAggregationKey().getApplication()
));
}
}

private void processAggregateEmailsByAggregationKey(AggregationCommand aggregationCommand, Optional<Event> aggregatorEvent) {
Expand Down

0 comments on commit e93b3f6

Please sign in to comment.