From 8999fd8691bdb7488ac4f1c7cbdf239121c53a85 Mon Sep 17 00:00:00 2001 From: Gwenneg Lepage Date: Mon, 16 Oct 2023 09:01:18 +0200 Subject: [PATCH] Lower the aggregation batch size and make it configurable from env var --- .rhcicd/clowdapp-engine.yaml | 3 +++ .../processors/email/EmailAggregator.java | 19 +++++-------------- .../processors/email/EmailAggregatorTest.java | 8 ++++---- 3 files changed, 12 insertions(+), 18 deletions(-) diff --git a/.rhcicd/clowdapp-engine.yaml b/.rhcicd/clowdapp-engine.yaml index 9a0ec6ceb6..c0cf9bad28 100644 --- a/.rhcicd/clowdapp-engine.yaml +++ b/.rhcicd/clowdapp-engine.yaml @@ -420,3 +420,6 @@ parameters: - name: NOTIFICATIONS_DRAWER_CONNECTOR_ENABLED description: Is the drawer connector enabled to process them instead of in the engine? value: "false" +- name: NOTIFICATIONS_AGGREGATION_BATCH_SIZE + description: Number of aggregation records loaded from the DB during each aggregation process iteration + value: "100" diff --git a/engine/src/main/java/com/redhat/cloud/notifications/processors/email/EmailAggregator.java b/engine/src/main/java/com/redhat/cloud/notifications/processors/email/EmailAggregator.java index 091c637969..80d53d27ee 100644 --- a/engine/src/main/java/com/redhat/cloud/notifications/processors/email/EmailAggregator.java +++ b/engine/src/main/java/com/redhat/cloud/notifications/processors/email/EmailAggregator.java @@ -1,6 +1,5 @@ package com.redhat.cloud.notifications.processors.email; -import com.redhat.cloud.notifications.config.FeatureFlipper; import com.redhat.cloud.notifications.db.repositories.EmailAggregationRepository; import com.redhat.cloud.notifications.db.repositories.EmailSubscriptionRepository; import com.redhat.cloud.notifications.db.repositories.EndpointRepository; @@ -45,20 +44,12 @@ public class EmailAggregator { @Inject EmailSubscriptionRepository emailSubscriptionRepository; - @Inject - FeatureFlipper featureFlipper; - // This is manually used from the JSON payload instead of converting it to an Action and using getEventType() private static final String EVENT_TYPE_KEY = "event_type"; private static final String RECIPIENTS_KEY = "recipients"; - @ConfigProperty(name = "notifications.get.aggregation.max.page.size", defaultValue = "10000") - int aggregationMaxPageSize; - - private Set getEmailSubscribers(EmailAggregationKey aggregationKey, EmailSubscriptionType emailSubscriptionType) { - return Set.copyOf(emailSubscriptionRepository - .getEmailSubscribersUserId(aggregationKey.getOrgId(), aggregationKey.getBundle(), aggregationKey.getApplication(), emailSubscriptionType)); - } + @ConfigProperty(name = "notifications.aggregation.batch-size", defaultValue = "10000") + int batchSize; private Map> getEmailSubscribersGroupedByEventType(EmailAggregationKey aggregationKey, EmailSubscriptionType emailSubscriptionType) { return emailSubscriptionRepository @@ -84,8 +75,8 @@ public Map> getAggregated(EmailAggregationKey aggregat List aggregations; do { // First, we retrieve paginated aggregations that match the given key. - aggregations = emailAggregationRepository.getEmailAggregation(aggregationKey, start, end, offset, aggregationMaxPageSize); - offset += aggregationMaxPageSize; + aggregations = emailAggregationRepository.getEmailAggregation(aggregationKey, start, end, offset, batchSize); + offset += batchSize; // For each aggregation... for (EmailAggregation aggregation : aggregations) { @@ -122,7 +113,7 @@ public Map> getAggregated(EmailAggregationKey aggregat }); } totalAggregatedElements += aggregations.size(); - } while (aggregationMaxPageSize == aggregations.size()); + } while (batchSize == aggregations.size()); Log.infof("%d elements were aggregated for key %s", totalAggregatedElements, aggregationKey); return aggregated diff --git a/engine/src/test/java/com/redhat/cloud/notifications/processors/email/EmailAggregatorTest.java b/engine/src/test/java/com/redhat/cloud/notifications/processors/email/EmailAggregatorTest.java index df4f64a696..8e9573d952 100644 --- a/engine/src/test/java/com/redhat/cloud/notifications/processors/email/EmailAggregatorTest.java +++ b/engine/src/test/java/com/redhat/cloud/notifications/processors/email/EmailAggregatorTest.java @@ -67,7 +67,7 @@ class EmailAggregatorTest { @BeforeEach void beforeEach() { - emailAggregator.aggregationMaxPageSize = 5; + emailAggregator.batchSize = 5; } @Test @@ -98,7 +98,7 @@ void shouldTestRecipientsFromSubscription() { // Test user subscription based on event type Map> result = aggregate(); verify(emailAggregationRepository, times(1)).getEmailAggregation(any(EmailAggregationKey.class), any(LocalDateTime.class), any(LocalDateTime.class), anyInt(), anyInt()); - verify(emailAggregationRepository, times(1)).getEmailAggregation(any(EmailAggregationKey.class), any(LocalDateTime.class), any(LocalDateTime.class), eq(0), eq(emailAggregator.aggregationMaxPageSize)); + verify(emailAggregationRepository, times(1)).getEmailAggregation(any(EmailAggregationKey.class), any(LocalDateTime.class), any(LocalDateTime.class), eq(0), eq(emailAggregator.batchSize)); reset(emailAggregationRepository); // just reset mockito counter // nobody subscribed to the right event type yet @@ -108,8 +108,8 @@ void shouldTestRecipientsFromSubscription() { // because after the previous aggregate() call the email_aggregation DB table was not purged, we already have 4 records on database result = aggregate(); verify(emailAggregationRepository, times(2)).getEmailAggregation(any(EmailAggregationKey.class), any(LocalDateTime.class), any(LocalDateTime.class), anyInt(), anyInt()); - verify(emailAggregationRepository, times(1)).getEmailAggregation(any(EmailAggregationKey.class), any(LocalDateTime.class), any(LocalDateTime.class), eq(0), eq(emailAggregator.aggregationMaxPageSize)); - verify(emailAggregationRepository, times(1)).getEmailAggregation(any(EmailAggregationKey.class), any(LocalDateTime.class), any(LocalDateTime.class), eq(5), eq(emailAggregator.aggregationMaxPageSize)); + verify(emailAggregationRepository, times(1)).getEmailAggregation(any(EmailAggregationKey.class), any(LocalDateTime.class), any(LocalDateTime.class), eq(0), eq(emailAggregator.batchSize)); + verify(emailAggregationRepository, times(1)).getEmailAggregation(any(EmailAggregationKey.class), any(LocalDateTime.class), any(LocalDateTime.class), eq(5), eq(emailAggregator.batchSize)); assertEquals(1, result.size()); User user = result.keySet().stream().findFirst().get(); assertTrue(user.getEmail().equals("user-2"));