diff --git a/engine/src/main/java/com/redhat/cloud/notifications/db/repositories/EndpointRepository.java b/engine/src/main/java/com/redhat/cloud/notifications/db/repositories/EndpointRepository.java index 9b23430286..3c38de1e41 100644 --- a/engine/src/main/java/com/redhat/cloud/notifications/db/repositories/EndpointRepository.java +++ b/engine/src/main/java/com/redhat/cloud/notifications/db/repositories/EndpointRepository.java @@ -30,6 +30,7 @@ import static com.redhat.cloud.notifications.models.EndpointType.EMAIL_SUBSCRIPTION; import static com.redhat.cloud.notifications.models.EndpointType.WEBHOOK; import static jakarta.persistence.LockModeType.PESSIMISTIC_WRITE; +import static jakarta.transaction.Transactional.TxType.REQUIRES_NEW; @ApplicationScoped public class EndpointRepository { @@ -37,6 +38,12 @@ public class EndpointRepository { @Inject EntityManager entityManager; + // A new transaction is required because this method is executed from a worker thread. + @Transactional(REQUIRES_NEW) + public Endpoint getOrCreateDefaultSystemSubscriptionWithNewTransaction(String accountId, String orgId, EndpointType endpointType) { + return getOrCreateDefaultSystemSubscription(accountId, orgId, endpointType); + } + /** * The purpose of this method is to find or create an EMAIL_SUBSCRIPTION or DRAWER endpoint with empty properties. This * endpoint is used to aggregate and store in the DB the email or drawer actions outcome, which will be used later by the diff --git a/engine/src/main/java/com/redhat/cloud/notifications/db/repositories/EventRepository.java b/engine/src/main/java/com/redhat/cloud/notifications/db/repositories/EventRepository.java index 25a7343601..9758fad529 100644 --- a/engine/src/main/java/com/redhat/cloud/notifications/db/repositories/EventRepository.java +++ b/engine/src/main/java/com/redhat/cloud/notifications/db/repositories/EventRepository.java @@ -15,6 +15,8 @@ import java.util.Map; import java.util.UUID; +import static jakarta.transaction.Transactional.TxType.REQUIRES_NEW; + @ApplicationScoped public class EventRepository { @@ -97,6 +99,12 @@ public void updateDrawerNotification(Event event) { .executeUpdate(); } + // A new transaction is required because this method is executed from a worker thread. + @Transactional(REQUIRES_NEW) + public void updateEventDisplayNameWithNewTransaction(UUID eventId, String eventTypeDisplayName) { + updateEventDisplayName(eventId, eventTypeDisplayName); + } + @Transactional public void updateEventDisplayName(UUID eventId, String eventTypeDisplayName) { String hql = "UPDATE Event SET eventTypeDisplayName = :eventDisplayName WHERE id = :id"; diff --git a/engine/src/main/java/com/redhat/cloud/notifications/processors/email/AsyncAggregation.java b/engine/src/main/java/com/redhat/cloud/notifications/processors/email/AsyncAggregation.java index 9f597f5c7a..d9a912e418 100644 --- a/engine/src/main/java/com/redhat/cloud/notifications/processors/email/AsyncAggregation.java +++ b/engine/src/main/java/com/redhat/cloud/notifications/processors/email/AsyncAggregation.java @@ -7,7 +7,6 @@ import jakarta.enterprise.context.Dependent; import jakarta.enterprise.context.control.ActivateRequestContext; import jakarta.inject.Inject; -import jakarta.transaction.Transactional; @Dependent public class AsyncAggregation implements Runnable { @@ -29,9 +28,8 @@ public void setEvent(Event event) { @Override @ActivateRequestContext - @Transactional public void run() { - emailSubscriptionTypeProcessor.processAggregationSync(event); + emailSubscriptionTypeProcessor.processAggregationSync(event, true); } @PreDestroy diff --git a/engine/src/main/java/com/redhat/cloud/notifications/processors/email/EmailSubscriptionTypeProcessor.java b/engine/src/main/java/com/redhat/cloud/notifications/processors/email/EmailSubscriptionTypeProcessor.java index de56fe211e..dffc5fef9a 100644 --- a/engine/src/main/java/com/redhat/cloud/notifications/processors/email/EmailSubscriptionTypeProcessor.java +++ b/engine/src/main/java/com/redhat/cloud/notifications/processors/email/EmailSubscriptionTypeProcessor.java @@ -210,11 +210,11 @@ public void processAggregation(Event event) { asyncAggregations.destroy(asyncAggregation); }); } else { - processAggregationSync(event); + processAggregationSync(event, false); } } - public void processAggregationSync(Event event) { + public void processAggregationSync(Event event, boolean async) { AggregationCommand aggregationCommand; Timer.Sample consumedTimer = Timer.start(registry); @@ -240,9 +240,13 @@ public void processAggregationSync(Event event) { app.get().getDisplayName(), app.get().getBundle().getDisplayName() ); - eventRepository.updateEventDisplayName(event.getId(), eventTypeDisplayName); + if (async) { + eventRepository.updateEventDisplayNameWithNewTransaction(event.getId(), eventTypeDisplayName); + } else { + eventRepository.updateEventDisplayName(event.getId(), eventTypeDisplayName); + } } - processAggregateEmailsByAggregationKey(aggregationCommand, Optional.of(event)); + processAggregateEmailsByAggregationKey(aggregationCommand, Optional.of(event), async); } catch (Exception e) { Log.warn("Error while processing aggregation", e); failedAggregationCommandCount.increment(); @@ -255,7 +259,7 @@ public void processAggregationSync(Event event) { } } - private void processAggregateEmailsByAggregationKey(AggregationCommand aggregationCommand, Optional aggregatorEvent) { + private void processAggregateEmailsByAggregationKey(AggregationCommand aggregationCommand, Optional aggregatorEvent, boolean async) { TemplateInstance subject = null; TemplateInstance body = null; final long startTime = System.currentTimeMillis(); @@ -270,7 +274,12 @@ private void processAggregateEmailsByAggregationKey(AggregationCommand aggregati body = templateService.compileTemplate(bodyData, "body"); } - Endpoint endpoint = endpointRepository.getOrCreateDefaultSystemSubscription(null, aggregationKey.getOrgId(), EndpointType.EMAIL_SUBSCRIPTION); + Endpoint endpoint; + if (async) { + endpoint = endpointRepository.getOrCreateDefaultSystemSubscriptionWithNewTransaction(null, aggregationKey.getOrgId(), EndpointType.EMAIL_SUBSCRIPTION); + } else { + endpoint = endpointRepository.getOrCreateDefaultSystemSubscription(null, aggregationKey.getOrgId(), EndpointType.EMAIL_SUBSCRIPTION); + } Event event; if (aggregatorEvent.isEmpty()) { event = new Event();