Skip to content

Commit

Permalink
Create new transaction contexts for async aggregation (RedHatInsights…
Browse files Browse the repository at this point in the history
  • Loading branch information
gwenneg authored Nov 10, 2023
1 parent 066b973 commit 69a3245
Show file tree
Hide file tree
Showing 4 changed files with 31 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,20 @@
import static com.redhat.cloud.notifications.models.EndpointType.EMAIL_SUBSCRIPTION;
import static com.redhat.cloud.notifications.models.EndpointType.WEBHOOK;
import static jakarta.persistence.LockModeType.PESSIMISTIC_WRITE;
import static jakarta.transaction.Transactional.TxType.REQUIRES_NEW;

@ApplicationScoped
public class EndpointRepository {

@Inject
EntityManager entityManager;

// A new transaction is required because this method is executed from a worker thread.
@Transactional(REQUIRES_NEW)
public Endpoint getOrCreateDefaultSystemSubscriptionWithNewTransaction(String accountId, String orgId, EndpointType endpointType) {
return getOrCreateDefaultSystemSubscription(accountId, orgId, endpointType);
}

/**
* The purpose of this method is to find or create an EMAIL_SUBSCRIPTION or DRAWER endpoint with empty properties. This
* endpoint is used to aggregate and store in the DB the email or drawer actions outcome, which will be used later by the
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
import java.util.Map;
import java.util.UUID;

import static jakarta.transaction.Transactional.TxType.REQUIRES_NEW;

@ApplicationScoped
public class EventRepository {

Expand Down Expand Up @@ -97,6 +99,12 @@ public void updateDrawerNotification(Event event) {
.executeUpdate();
}

// A new transaction is required because this method is executed from a worker thread.
@Transactional(REQUIRES_NEW)
public void updateEventDisplayNameWithNewTransaction(UUID eventId, String eventTypeDisplayName) {
updateEventDisplayName(eventId, eventTypeDisplayName);
}

@Transactional
public void updateEventDisplayName(UUID eventId, String eventTypeDisplayName) {
String hql = "UPDATE Event SET eventTypeDisplayName = :eventDisplayName WHERE id = :id";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
import jakarta.enterprise.context.Dependent;
import jakarta.enterprise.context.control.ActivateRequestContext;
import jakarta.inject.Inject;
import jakarta.transaction.Transactional;

@Dependent
public class AsyncAggregation implements Runnable {
Expand All @@ -29,9 +28,8 @@ public void setEvent(Event event) {

@Override
@ActivateRequestContext
@Transactional
public void run() {
emailSubscriptionTypeProcessor.processAggregationSync(event);
emailSubscriptionTypeProcessor.processAggregationSync(event, true);
}

@PreDestroy
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -210,11 +210,11 @@ public void processAggregation(Event event) {
asyncAggregations.destroy(asyncAggregation);
});
} else {
processAggregationSync(event);
processAggregationSync(event, false);
}
}

public void processAggregationSync(Event event) {
public void processAggregationSync(Event event, boolean async) {

AggregationCommand aggregationCommand;
Timer.Sample consumedTimer = Timer.start(registry);
Expand All @@ -240,9 +240,13 @@ public void processAggregationSync(Event event) {
app.get().getDisplayName(),
app.get().getBundle().getDisplayName()
);
eventRepository.updateEventDisplayName(event.getId(), eventTypeDisplayName);
if (async) {
eventRepository.updateEventDisplayNameWithNewTransaction(event.getId(), eventTypeDisplayName);
} else {
eventRepository.updateEventDisplayName(event.getId(), eventTypeDisplayName);
}
}
processAggregateEmailsByAggregationKey(aggregationCommand, Optional.of(event));
processAggregateEmailsByAggregationKey(aggregationCommand, Optional.of(event), async);
} catch (Exception e) {
Log.warn("Error while processing aggregation", e);
failedAggregationCommandCount.increment();
Expand All @@ -255,7 +259,7 @@ public void processAggregationSync(Event event) {
}
}

private void processAggregateEmailsByAggregationKey(AggregationCommand aggregationCommand, Optional<Event> aggregatorEvent) {
private void processAggregateEmailsByAggregationKey(AggregationCommand aggregationCommand, Optional<Event> aggregatorEvent, boolean async) {
TemplateInstance subject = null;
TemplateInstance body = null;
final long startTime = System.currentTimeMillis();
Expand All @@ -270,7 +274,12 @@ private void processAggregateEmailsByAggregationKey(AggregationCommand aggregati
body = templateService.compileTemplate(bodyData, "body");
}

Endpoint endpoint = endpointRepository.getOrCreateDefaultSystemSubscription(null, aggregationKey.getOrgId(), EndpointType.EMAIL_SUBSCRIPTION);
Endpoint endpoint;
if (async) {
endpoint = endpointRepository.getOrCreateDefaultSystemSubscriptionWithNewTransaction(null, aggregationKey.getOrgId(), EndpointType.EMAIL_SUBSCRIPTION);
} else {
endpoint = endpointRepository.getOrCreateDefaultSystemSubscription(null, aggregationKey.getOrgId(), EndpointType.EMAIL_SUBSCRIPTION);
}
Event event;
if (aggregatorEvent.isEmpty()) {
event = new Event();
Expand Down

0 comments on commit 69a3245

Please sign in to comment.