Skip to content

Commit

Permalink
Lower the aggregation batch size and make it configurable from env var
Browse files Browse the repository at this point in the history
  • Loading branch information
gwenneg committed Oct 16, 2023
1 parent 37ce4cd commit fdd296f
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 18 deletions.
5 changes: 5 additions & 0 deletions .rhcicd/clowdapp-engine.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,8 @@ objects:
value: ${MP_MESSAGING_OUTGOING_TOCAMEL_ENABLED}
- name: MP_MESSAGING_OUTGOING_DRAWER_ENABLED
value: ${MP_MESSAGING_OUTGOING_DRAWER_ENABLED}
- name: NOTIFICATIONS_AGGREGATION_BATCH_SIZE
value: ${NOTIFICATIONS_AGGREGATION_BATCH_SIZE}
- name: NOTIFICATIONS_DISABLE_WEBHOOK_ENDPOINTS_ON_FAILURE
value: ${NOTIFICATIONS_DISABLE_WEBHOOK_ENDPOINTS_ON_FAILURE}
- name: NOTIFICATIONS_EMAILS_ONLY_MODE_ENABLED
Expand Down Expand Up @@ -318,6 +320,9 @@ parameters:
value: "true"
- name: MP_MESSAGING_OUTGOING_DRAWER_ENABLED
value: "false"
- name: NOTIFICATIONS_AGGREGATION_BATCH_SIZE
description: Number of aggregation records loaded from the DB during each aggregation process iteration
value: "100"
- name: NOTIFICATIONS_DISABLE_WEBHOOK_ENDPOINTS_ON_FAILURE
value: "false"
- name: NOTIFICATIONS_EMAILS_ONLY_MODE_ENABLED
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package com.redhat.cloud.notifications.processors.email;

import com.redhat.cloud.notifications.config.FeatureFlipper;
import com.redhat.cloud.notifications.db.repositories.EmailAggregationRepository;
import com.redhat.cloud.notifications.db.repositories.EmailSubscriptionRepository;
import com.redhat.cloud.notifications.db.repositories.EndpointRepository;
Expand Down Expand Up @@ -45,20 +44,12 @@ public class EmailAggregator {
@Inject
EmailSubscriptionRepository emailSubscriptionRepository;

@Inject
FeatureFlipper featureFlipper;

// This is manually used from the JSON payload instead of converting it to an Action and using getEventType()
private static final String EVENT_TYPE_KEY = "event_type";
private static final String RECIPIENTS_KEY = "recipients";

@ConfigProperty(name = "notifications.get.aggregation.max.page.size", defaultValue = "10000")
int aggregationMaxPageSize;

private Set<String> getEmailSubscribers(EmailAggregationKey aggregationKey, EmailSubscriptionType emailSubscriptionType) {
return Set.copyOf(emailSubscriptionRepository
.getEmailSubscribersUserId(aggregationKey.getOrgId(), aggregationKey.getBundle(), aggregationKey.getApplication(), emailSubscriptionType));
}
@ConfigProperty(name = "notifications.aggregation.batch-size", defaultValue = "100")
int batchSize;

private Map<String, Set<String>> getEmailSubscribersGroupedByEventType(EmailAggregationKey aggregationKey, EmailSubscriptionType emailSubscriptionType) {
return emailSubscriptionRepository
Expand All @@ -84,8 +75,8 @@ public Map<User, Map<String, Object>> getAggregated(EmailAggregationKey aggregat
List<EmailAggregation> aggregations;
do {
// First, we retrieve paginated aggregations that match the given key.
aggregations = emailAggregationRepository.getEmailAggregation(aggregationKey, start, end, offset, aggregationMaxPageSize);
offset += aggregationMaxPageSize;
aggregations = emailAggregationRepository.getEmailAggregation(aggregationKey, start, end, offset, batchSize);
offset += batchSize;

// For each aggregation...
for (EmailAggregation aggregation : aggregations) {
Expand Down Expand Up @@ -122,7 +113,7 @@ public Map<User, Map<String, Object>> getAggregated(EmailAggregationKey aggregat
});
}
totalAggregatedElements += aggregations.size();
} while (aggregationMaxPageSize == aggregations.size());
} while (batchSize == aggregations.size());
Log.infof("%d elements were aggregated for key %s", totalAggregatedElements, aggregationKey);

return aggregated
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ class EmailAggregatorTest {

@BeforeEach
void beforeEach() {
emailAggregator.aggregationMaxPageSize = 5;
emailAggregator.batchSize = 5;
}

@Test
Expand Down Expand Up @@ -98,7 +98,7 @@ void shouldTestRecipientsFromSubscription() {
// Test user subscription based on event type
Map<User, Map<String, Object>> result = aggregate();
verify(emailAggregationRepository, times(1)).getEmailAggregation(any(EmailAggregationKey.class), any(LocalDateTime.class), any(LocalDateTime.class), anyInt(), anyInt());
verify(emailAggregationRepository, times(1)).getEmailAggregation(any(EmailAggregationKey.class), any(LocalDateTime.class), any(LocalDateTime.class), eq(0), eq(emailAggregator.aggregationMaxPageSize));
verify(emailAggregationRepository, times(1)).getEmailAggregation(any(EmailAggregationKey.class), any(LocalDateTime.class), any(LocalDateTime.class), eq(0), eq(emailAggregator.batchSize));
reset(emailAggregationRepository); // just reset mockito counter

// nobody subscribed to the right event type yet
Expand All @@ -108,8 +108,8 @@ void shouldTestRecipientsFromSubscription() {
// because after the previous aggregate() call the email_aggregation DB table was not purged, we already have 4 records on database
result = aggregate();
verify(emailAggregationRepository, times(2)).getEmailAggregation(any(EmailAggregationKey.class), any(LocalDateTime.class), any(LocalDateTime.class), anyInt(), anyInt());
verify(emailAggregationRepository, times(1)).getEmailAggregation(any(EmailAggregationKey.class), any(LocalDateTime.class), any(LocalDateTime.class), eq(0), eq(emailAggregator.aggregationMaxPageSize));
verify(emailAggregationRepository, times(1)).getEmailAggregation(any(EmailAggregationKey.class), any(LocalDateTime.class), any(LocalDateTime.class), eq(5), eq(emailAggregator.aggregationMaxPageSize));
verify(emailAggregationRepository, times(1)).getEmailAggregation(any(EmailAggregationKey.class), any(LocalDateTime.class), any(LocalDateTime.class), eq(0), eq(emailAggregator.batchSize));
verify(emailAggregationRepository, times(1)).getEmailAggregation(any(EmailAggregationKey.class), any(LocalDateTime.class), any(LocalDateTime.class), eq(5), eq(emailAggregator.batchSize));
assertEquals(1, result.size());
User user = result.keySet().stream().findFirst().get();
assertTrue(user.getEmail().equals("user-2"));
Expand Down

0 comments on commit fdd296f

Please sign in to comment.