From 7dce1831701ca4fb7b6963719e79731fb4caa95e Mon Sep 17 00:00:00 2001 From: Chris Eager Date: Fri, 16 Jun 2023 10:46:02 -0500 Subject: [PATCH] Add worker thread pool to PushFeedbackProcessor --- .../textsecuregcm/WhisperServerService.java | 33 +++-- .../storage/PushFeedbackProcessor.java | 118 +++++++++++------- .../workers/CrawlAccountsCommand.java | 15 ++- .../storage/PushFeedbackProcessorTest.java | 5 +- 4 files changed, 107 insertions(+), 64 deletions(-) diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java index e3d5fb06c..642ef5a78 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java @@ -387,26 +387,33 @@ public void run(WhisperServerConfiguration config, Environment environment) thro ScheduledExecutorService recurringJobExecutor = environment.lifecycle() .scheduledExecutorService(name(getClass(), "recurringJob-%d")).threads(6).build(); - ScheduledExecutorService websocketScheduledExecutor = environment.lifecycle().scheduledExecutorService(name(getClass(), "websocket-%d")).threads(8).build(); - ExecutorService keyspaceNotificationDispatchExecutor = environment.lifecycle().executorService(name(getClass(), "keyspaceNotification-%d")).maxThreads(16).workQueue(keyspaceNotificationDispatchQueue).build(); - ExecutorService apnSenderExecutor = environment.lifecycle().executorService(name(getClass(), "apnSender-%d")).maxThreads(1).minThreads(1).build(); + ScheduledExecutorService websocketScheduledExecutor = environment.lifecycle() + .scheduledExecutorService(name(getClass(), "websocket-%d")).threads(8).build(); + ExecutorService keyspaceNotificationDispatchExecutor = environment.lifecycle() + .executorService(name(getClass(), "keyspaceNotification-%d")).maxThreads(16) + .workQueue(keyspaceNotificationDispatchQueue).build(); + ExecutorService apnSenderExecutor = environment.lifecycle().executorService(name(getClass(), "apnSender-%d")) + .maxThreads(1).minThreads(1).build(); ExecutorService fcmSenderExecutor = environment.lifecycle().executorService(name(getClass(), "fcmSender-%d")) .maxThreads(32).minThreads(32).workQueue(fcmSenderQueue).build(); ExecutorService secureValueRecoveryServiceExecutor = environment.lifecycle() .executorService(name(getClass(), "secureValueRecoveryService-%d")).maxThreads(1).minThreads(1).build(); ExecutorService storageServiceExecutor = environment.lifecycle() .executorService(name(getClass(), "storageService-%d")).maxThreads(1).minThreads(1).build(); - ExecutorService accountDeletionExecutor = environment.lifecycle().executorService(name(getClass(), "accountCleaner-%d")).maxThreads(16).minThreads(16).build(); + ExecutorService accountDeletionExecutor = environment.lifecycle() + .executorService(name(getClass(), "accountCleaner-%d")).maxThreads(16).minThreads(16).build(); + ExecutorService pushFeedbackUpdateExecutor = environment.lifecycle() + .executorService(name(getClass(), "pushFeedback-%d")).maxThreads(4).minThreads(4).build(); Scheduler messageDeliveryScheduler = Schedulers.fromExecutorService( - ExecutorServiceMetrics.monitor(Metrics.globalRegistry, - environment.lifecycle().executorService(name(getClass(), "messageDelivery-%d")) - .minThreads(20) - .maxThreads(20) - .workQueue(messageDeliveryQueue) - .build(), - MetricsUtil.name(getClass(), "messageDeliveryExecutor"), MetricsUtil.PREFIX), - "messageDelivery"); + ExecutorServiceMetrics.monitor(Metrics.globalRegistry, + environment.lifecycle().executorService(name(getClass(), "messageDelivery-%d")) + .minThreads(20) + .maxThreads(20) + .workQueue(messageDeliveryQueue) + .build(), + MetricsUtil.name(getClass(), "messageDeliveryExecutor"), MetricsUtil.PREFIX), + "messageDelivery"); // TODO: generally speaking this is a DynamoDB I/O executor for the accounts table; we should eventually have a general executor for speaking to the accounts table, but most of the server is still synchronous so this isn't widely useful yet ExecutorService batchIdentityCheckExecutor = environment.lifecycle().executorService(name(getClass(), "batchIdentityCheck-%d")).minThreads(32).maxThreads(32).build(); ExecutorService multiRecipientMessageExecutor = environment.lifecycle() @@ -582,7 +589,7 @@ public void run(WhisperServerConfiguration config, Environment environment) thro final List accountDatabaseCrawlerListeners = List.of( new NonNormalizedAccountCrawlerListener(accountsManager, metricsCluster), // PushFeedbackProcessor may update device properties - new PushFeedbackProcessor(accountsManager)); + new PushFeedbackProcessor(accountsManager, pushFeedbackUpdateExecutor)); AccountDatabaseCrawlerCache accountDatabaseCrawlerCache = new AccountDatabaseCrawlerCache(cacheCluster, AccountDatabaseCrawlerCache.GENERAL_PURPOSE_PREFIX); diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/PushFeedbackProcessor.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/PushFeedbackProcessor.java index 32a408a46..2fbff4c06 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/PushFeedbackProcessor.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/PushFeedbackProcessor.java @@ -13,20 +13,35 @@ import java.util.List; import java.util.Optional; import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; +import io.micrometer.core.instrument.Counter; +import io.micrometer.core.instrument.Metrics; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.whispersystems.textsecuregcm.metrics.MetricsUtil; import org.whispersystems.textsecuregcm.util.Constants; import org.whispersystems.textsecuregcm.util.Util; public class PushFeedbackProcessor extends AccountDatabaseCrawlerListener { + private static final Logger log = LoggerFactory.getLogger(PushFeedbackProcessor.class); + private final MetricRegistry metricRegistry = SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME); - private final Meter expired = metricRegistry.meter(name(getClass(), "unregistered", "expired")); - private final Meter recovered = metricRegistry.meter(name(getClass(), "unregistered", "recovered")); + private final Meter expired = metricRegistry.meter(name(getClass(), "unregistered", "expired")); + private final Meter recovered = metricRegistry.meter(name(getClass(), "unregistered", "recovered")); + + private static final Counter UPDATED_ACCOUNT_COUNTER = Metrics.counter( + MetricsUtil.name(PushFeedbackProcessor.class, "updatedAccounts")); + private final AccountsManager accountsManager; + private final ExecutorService updateExecutor; - public PushFeedbackProcessor(AccountsManager accountsManager) { + public PushFeedbackProcessor(AccountsManager accountsManager, ExecutorService updateExecutor) { this.accountsManager = accountsManager; + this.updateExecutor = updateExecutor; } @Override @@ -38,51 +53,68 @@ public void onCrawlEnd() { @Override protected void onCrawlChunk(Optional fromUuid, List chunkAccounts) { - for (Account account : chunkAccounts) { - boolean update = false; - - for (Device device : account.getDevices()) { - if (deviceNeedsUpdate(device)) { - if (deviceExpired(device)) { - if (device.isEnabled()) { - expired.mark(); - update = true; + + final List> updateFutures = chunkAccounts.stream() + .filter(account -> { + boolean update = false; + + for (Device device : account.getDevices()) { + if (deviceNeedsUpdate(device)) { + if (deviceExpired(device)) { + if (device.isEnabled()) { + expired.mark(); + update = true; + } + } else { + recovered.mark(); + update = true; + } } - } else { - recovered.mark(); - update = true; } - } - } - - if (update) { - // fetch a new version, since the chunk is shared and implicitly read-only - accountsManager.getByAccountIdentifier(account.getUuid()).ifPresent(accountToUpdate -> { - accountsManager.update(accountToUpdate, a -> { - for (Device device : a.getDevices()) { - if (deviceNeedsUpdate(device)) { - if (deviceExpired(device)) { - if (!Util.isEmpty(device.getApnId())) { - if (device.getId() == 1) { - device.setUserAgent("OWI"); - } else { - device.setUserAgent("OWP"); + + return update; + }) + .map(account -> CompletableFuture.runAsync(() -> { + // fetch a new version, since the chunk is shared and implicitly read-only + accountsManager.getByAccountIdentifier(account.getUuid()).ifPresent(accountToUpdate -> { + accountsManager.update(accountToUpdate, a -> { + for (Device device : a.getDevices()) { + if (deviceNeedsUpdate(device)) { + if (deviceExpired(device)) { + if (!Util.isEmpty(device.getApnId())) { + if (device.getId() == 1) { + device.setUserAgent("OWI"); + } else { + device.setUserAgent("OWP"); + } + } else if (!Util.isEmpty(device.getGcmId())) { + device.setUserAgent("OWA"); + } + device.setGcmId(null); + device.setApnId(null); + device.setVoipApnId(null); + device.setFetchesMessages(false); + } else { + device.setUninstalledFeedbackTimestamp(0); + } } - } else if (!Util.isEmpty(device.getGcmId())) { - device.setUserAgent("OWA"); } - device.setGcmId(null); - device.setApnId(null); - device.setVoipApnId(null); - device.setFetchesMessages(false); - } else { - device.setUninstalledFeedbackTimestamp(0); - } + }); + }); + }, updateExecutor) + .whenComplete((ignored, throwable) -> { + if (throwable != null) { + log.warn("Failed to update account {}", account.getUuid(), throwable); + } else { + UPDATED_ACCOUNT_COUNTER.increment(); } - } - }); - }); - } + })) + .toList(); + + try { + CompletableFuture.allOf(updateFutures.toArray(new CompletableFuture[0])).join(); + } catch (final Exception e) { + log.debug("Failed to update one or more accounts in chunk", e); } } diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/workers/CrawlAccountsCommand.java b/service/src/main/java/org/whispersystems/textsecuregcm/workers/CrawlAccountsCommand.java index 29618a5fb..fae59d68a 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/workers/CrawlAccountsCommand.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/workers/CrawlAccountsCommand.java @@ -92,12 +92,6 @@ protected void run(final Environment environment, final Namespace namespace, final FaultTolerantRedisCluster metricsCluster = new FaultTolerantRedisCluster("metrics_cluster", configuration.getMetricsClusterConfiguration(), deps.redisClusterClientResources()); - // TODO listeners must be ordered so that ones that directly update accounts come last, so that read-only ones are not working with stale data - final List accountDatabaseCrawlerListeners = List.of( - new NonNormalizedAccountCrawlerListener(accountsManager, metricsCluster), - // PushFeedbackProcessor may update device properties - new PushFeedbackProcessor(accountsManager)); - final DynamicConfigurationManager dynamicConfigurationManager = new DynamicConfigurationManager<>(configuration.getAppConfig().getApplication(), configuration.getAppConfig().getEnvironment(), @@ -111,6 +105,15 @@ protected void run(final Environment environment, final Namespace namespace, final AccountDatabaseCrawler crawler = switch ((CrawlType) namespace.get(CRAWL_TYPE)) { case GENERAL_PURPOSE -> { + final ExecutorService pushFeedbackUpdateExecutor = environment.lifecycle() + .executorService(name(getClass(), "pushFeedback-%d")).maxThreads(workers).minThreads(workers).build(); + + // TODO listeners must be ordered so that ones that directly update accounts come last, so that read-only ones are not working with stale data + final List accountDatabaseCrawlerListeners = List.of( + new NonNormalizedAccountCrawlerListener(accountsManager, metricsCluster), + // PushFeedbackProcessor may update device properties + new PushFeedbackProcessor(accountsManager, pushFeedbackUpdateExecutor)); + final AccountDatabaseCrawlerCache accountDatabaseCrawlerCache = new AccountDatabaseCrawlerCache( cacheCluster, AccountDatabaseCrawlerCache.GENERAL_PURPOSE_PREFIX); diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/tests/storage/PushFeedbackProcessorTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/tests/storage/PushFeedbackProcessorTest.java index 131fc30e8..958b868df 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/tests/storage/PushFeedbackProcessorTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/tests/storage/PushFeedbackProcessorTest.java @@ -22,6 +22,7 @@ import java.util.Optional; import java.util.Set; import java.util.UUID; +import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -101,7 +102,7 @@ void setup() { @Test void testEmpty() throws AccountDatabaseCrawlerRestartException { - PushFeedbackProcessor processor = new PushFeedbackProcessor(accountsManager); + PushFeedbackProcessor processor = new PushFeedbackProcessor(accountsManager, Executors.newSingleThreadExecutor()); processor.timeAndProcessCrawlChunk(Optional.of(UUID.randomUUID()), Collections.emptyList()); verifyNoInteractions(accountsManager); @@ -109,7 +110,7 @@ void testEmpty() throws AccountDatabaseCrawlerRestartException { @Test void testUpdate() throws AccountDatabaseCrawlerRestartException { - PushFeedbackProcessor processor = new PushFeedbackProcessor(accountsManager); + PushFeedbackProcessor processor = new PushFeedbackProcessor(accountsManager, Executors.newSingleThreadExecutor()); processor.timeAndProcessCrawlChunk(Optional.of(UUID.randomUUID()), List.of(uninstalledAccount, mixedAccount, stillActiveAccount, freshAccount, cleanAccount));