Skip to content

Commit

Permalink
Use a separate request scope for async aggregation
Browse files Browse the repository at this point in the history
  • Loading branch information
gwenneg committed Oct 19, 2023
1 parent 1d320d3 commit 0986f66
Show file tree
Hide file tree
Showing 3 changed files with 78 additions and 40 deletions.
4 changes: 4 additions & 0 deletions .rhcicd/clowdapp-engine.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,8 @@ objects:
value: ${NOTIFICATIONS_EMAIL_CONNECTOR_ENABLED}
- name: NOTIFICATIONS_DRAWER_CONNECTOR_ENABLED
value: ${NOTIFICATIONS_DRAWER_CONNECTOR_ENABLED}
- name: NOTIFICATIONS_ASYNC_AGGREGATION_ENABLED
value: ${NOTIFICATIONS_ASYNC_AGGREGATION_ENABLED}
parameters:
- name: BACKOFFICE_CLIENT_ENV
description: Back-office client environment
Expand Down Expand Up @@ -417,3 +419,5 @@ parameters:
- name: NOTIFICATIONS_DRAWER_CONNECTOR_ENABLED
description: Is the drawer connector enabled to process them instead of in the engine?
value: "false"
- name: NOTIFICATIONS_ASYNC_AGGREGATION_ENABLED
value: "false"
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,9 @@ public class FeatureFlipper {
@ConfigProperty(name = "notifications.drawer-connector.enabled", defaultValue = "false")
boolean drawerConnectorEnabled;

@ConfigProperty(name = "notifications.async-aggregation.enabled", defaultValue = "false")
boolean asyncAggregation;

void logFeaturesStatusAtStartup(@Observes StartupEvent event) {
Log.infof("=== %s startup status ===", FeatureFlipper.class.getSimpleName());
Log.infof("The behavior groups unique name constraint is %s", enforceBehaviorGroupNameUnicity ? "enabled" : "disabled");
Expand All @@ -116,6 +119,7 @@ void logFeaturesStatusAtStartup(@Observes StartupEvent event) {
Log.infof("The webhook connector is %s", webhookConnectorEnabled ? "enabled" : "disabled");
Log.infof("The email connector is %s", emailConnectorEnabled ? "enabled" : "disabled");
Log.infof("The drawer connector is %s", drawerConnectorEnabled ? "enabled" : "disabled");
Log.infof("The async aggregation is %s", asyncAggregation ? "enabled" : "disabled");
}

public boolean isEnforceBehaviorGroupNameUnicity() {
Expand Down Expand Up @@ -285,6 +289,15 @@ public void setDrawerConnectorEnabled(boolean drawerConnectorEnabled) {
this.drawerConnectorEnabled = drawerConnectorEnabled;
}

public boolean isAsyncAggregation() {
return asyncAggregation;
}

public void setAsyncAggregation(boolean asyncAggregation) {
checkTestLaunchMode();
this.asyncAggregation = asyncAggregation;
}

/**
* This method throws an {@link IllegalStateException} if it is invoked with a launch mode different from
* {@link io.quarkus.runtime.LaunchMode#TEST TEST}. It should be added to methods that allow overriding a
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,9 @@
import io.vertx.core.json.JsonObject;
import jakarta.annotation.PostConstruct;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.context.control.ActivateRequestContext;
import jakarta.inject.Inject;
import jakarta.transaction.Transactional;
import org.eclipse.microprofile.context.ManagedExecutor;

import java.time.LocalDateTime;
Expand Down Expand Up @@ -193,51 +195,70 @@ private void sendEmail(Event event, Set<Endpoint> endpoints) {
}

public void processAggregation(Event event) {
/*
* The aggregation process is long-running task. To avoid blocking the thread used to consume
* Kafka messages from the ingress topic, we're performing the aggregation from a worker thread.
*/
managedExecutor.submit(() -> {

AggregationCommand aggregationCommand;
Timer.Sample consumedTimer = Timer.start(registry);
if (featureFlipper.isAsyncAggregation()) {
/*
* The aggregation process is long-running task. To avoid blocking the thread used to consume
* Kafka messages from the ingress topic, we're performing the aggregation from a worker thread.
*/
managedExecutor.submit(new AsyncAggregation(event));
return;
}

try {
Action action = actionParser.fromJsonString(event.getPayload());
Map<String, Object> map = action.getEvents().get(0).getPayload().getAdditionalProperties();
aggregationCommand = objectMapper.convertValue(map, AggregationCommand.class);
} catch (Exception e) {
Log.error("Kafka aggregation payload parsing failed for event " + event.getId(), e);
rejectedAggregationCommandCount.increment();
return;
}
AggregationCommand aggregationCommand;
Timer.Sample consumedTimer = Timer.start(registry);

Log.infof("Processing received aggregation command: %s", aggregationCommand);
processedAggregationCommandCount.increment();
try {
Action action = actionParser.fromJsonString(event.getPayload());
Map<String, Object> map = action.getEvents().get(0).getPayload().getAdditionalProperties();
aggregationCommand = objectMapper.convertValue(map, AggregationCommand.class);
} catch (Exception e) {
Log.error("Kafka aggregation payload parsing failed for event " + event.getId(), e);
rejectedAggregationCommandCount.increment();
return;
}

try {
Optional<Application> app = applicationRepository.getApplication(aggregationCommand.getAggregationKey().getBundle(), aggregationCommand.getAggregationKey().getApplication());
if (app.isPresent()) {
event.setEventTypeDisplayName(
String.format("%s - %s - %s",
event.getEventTypeDisplayName(),
app.get().getDisplayName(),
app.get().getBundle().getDisplayName())
);
eventRepository.updateEventDisplayName(event);
}
processAggregateEmailsByAggregationKey(aggregationCommand, Optional.of(event));
} catch (Exception e) {
Log.warn("Error while processing aggregation", e);
failedAggregationCommandCount.increment();
} finally {
consumedTimer.stop(registry.timer(
AGGREGATION_CONSUMED_TIMER_NAME,
TAG_KEY_BUNDLE, aggregationCommand.getAggregationKey().getBundle(),
TAG_KEY_APPLICATION, aggregationCommand.getAggregationKey().getApplication()
));
Log.infof("Processing received aggregation command: %s", aggregationCommand);
processedAggregationCommandCount.increment();

try {
Optional<Application> app = applicationRepository.getApplication(aggregationCommand.getAggregationKey().getBundle(), aggregationCommand.getAggregationKey().getApplication());
if (app.isPresent()) {
event.setEventTypeDisplayName(
String.format("%s - %s - %s",
event.getEventTypeDisplayName(),
app.get().getDisplayName(),
app.get().getBundle().getDisplayName())
);
eventRepository.updateEventDisplayName(event);
}
});
processAggregateEmailsByAggregationKey(aggregationCommand, Optional.of(event));
} catch (Exception e) {
Log.warn("Error while processing aggregation", e);
failedAggregationCommandCount.increment();
} finally {
consumedTimer.stop(registry.timer(
AGGREGATION_CONSUMED_TIMER_NAME,
TAG_KEY_BUNDLE, aggregationCommand.getAggregationKey().getBundle(),
TAG_KEY_APPLICATION, aggregationCommand.getAggregationKey().getApplication()
));
}
}

class AsyncAggregation implements Runnable {

private final Event event;

public AsyncAggregation(Event event) {
this.event = event;
}

@Override
@Transactional
@ActivateRequestContext
public void run() {
processAggregation(event);
}
}

private void processAggregateEmailsByAggregationKey(AggregationCommand aggregationCommand, Optional<Event> aggregatorEvent) {
Expand Down

0 comments on commit 0986f66

Please sign in to comment.