Skip to content

Commit

Permalink
Delete messages lazily on account and device deletion to prevent time…
Browse files Browse the repository at this point in the history
…outs when deleting accounts/devices with large queues
  • Loading branch information
jkt-signal committed Jun 4, 2024
1 parent 4ef6266 commit 01743e5
Show file tree
Hide file tree
Showing 15 changed files with 415 additions and 176 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -394,6 +394,7 @@ public void run(WhisperServerConfiguration config, Environment environment) thro
MessagesDynamoDb messagesDynamoDb = new MessagesDynamoDb(dynamoDbClient, dynamoDbAsyncClient,
config.getDynamoDbTables().getMessages().getTableName(),
config.getDynamoDbTables().getMessages().getExpiration(),
dynamicConfigurationManager,
messageDeletionAsyncExecutor);
RemoteConfigs remoteConfigs = new RemoteConfigs(dynamoDbClient,
config.getDynamoDbTables().getRemoteConfig().getTableName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,10 @@ public class DynamicConfiguration {
@Valid
DynamicMetricsConfiguration metricsConfiguration = new DynamicMetricsConfiguration(false);

@JsonProperty
@Valid
DynamicMessagesConfiguration messagesConfiguration = new DynamicMessagesConfiguration();

public Optional<DynamicExperimentEnrollmentConfiguration> getExperimentEnrollmentConfiguration(
final String experimentName) {
return Optional.ofNullable(experiments.get(experimentName));
Expand Down Expand Up @@ -121,4 +125,8 @@ public DynamicMetricsConfiguration getMetricsConfiguration() {
return metricsConfiguration;
}

public DynamicMessagesConfiguration getMessagesConfiguration() {
return messagesConfiguration;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Copyright 2024 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/

package org.whispersystems.textsecuregcm.configuration.dynamic;

import java.util.List;

import javax.validation.constraints.NotNull;

public record DynamicMessagesConfiguration(@NotNull List<DynamoKeyScheme> dynamoKeySchemes) {
public enum DynamoKeyScheme {
TRADITIONAL,
LAZY_DELETION;
}

public DynamicMessagesConfiguration() {
this(List.of(DynamoKeyScheme.TRADITIONAL));
}

public DynamoKeyScheme writeKeyScheme() {
return dynamoKeySchemes().getLast();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -724,7 +724,7 @@ public CompletableFuture<OutgoingMessageEntityList> getPendingMessages(@ReadOnly

return messagesManager.getMessagesForDevice(
auth.getAccount().getUuid(),
auth.getAuthenticatedDevice().getId(),
auth.getAuthenticatedDevice(),
false)
.map(messagesAndHasMore -> {
Stream<Envelope> envelopes = messagesAndHasMore.first().stream();
Expand Down Expand Up @@ -768,7 +768,7 @@ private static long estimateMessageListSizeBytes(final OutgoingMessageEntityList
public CompletableFuture<Response> removePendingMessage(@ReadOnly @Auth AuthenticatedAccount auth, @PathParam("uuid") UUID uuid) {
return messagesManager.delete(
auth.getAccount().getUuid(),
auth.getAuthenticatedDevice().getId(),
auth.getAuthenticatedDevice(),
uuid,
null)
.thenAccept(maybeDeletedMessage -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,8 +161,13 @@ int persistNextQueues(final Instant currentTime) {
logger.error("No account record found for account {}", accountUuid);
continue;
}
final Optional<Device> maybeDevice = maybeAccount.flatMap(account -> account.getDevice(deviceId));
if (maybeDevice.isEmpty()) {
logger.error("Account {} does not have a device with id {}", accountUuid, deviceId);
continue;
}
try {
persistQueue(maybeAccount.get(), deviceId);
persistQueue(maybeAccount.get(), maybeDevice.get());
} catch (final Exception e) {
persistQueueExceptionMeter.increment();
logger.warn("Failed to persist queue {}::{}; will schedule for retry", accountUuid, deviceId, e);
Expand All @@ -180,8 +185,9 @@ int persistNextQueues(final Instant currentTime) {
}

@VisibleForTesting
void persistQueue(final Account account, final byte deviceId) throws MessagePersistenceException {
void persistQueue(final Account account, final Device device) throws MessagePersistenceException {
final UUID accountUuid = account.getUuid();
final byte deviceId = device.getId();

final Timer.Sample sample = Timer.start();

Expand All @@ -196,7 +202,7 @@ void persistQueue(final Account account, final byte deviceId) throws MessagePers
do {
messages = messagesCache.getMessagesToPersist(accountUuid, deviceId, MESSAGE_BATCH_LIMIT);

int messagesRemovedFromCache = messagesManager.persistMessages(accountUuid, deviceId, messages);
int messagesRemovedFromCache = messagesManager.persistMessages(accountUuid, device, messages);
messageCount += messages.size();

if (messagesRemovedFromCache == 0) {
Expand Down Expand Up @@ -246,7 +252,7 @@ void unlinkLeastActiveDevice(final Account account, byte destinationDeviceId) th
.filter(d -> !d.isPrimary())
.flatMap(d ->
messagesManager
.getEarliestUndeliveredTimestampForDevice(account.getUuid(), d.getId())
.getEarliestUndeliveredTimestampForDevice(account.getUuid(), d)
.map(t -> Tuples.of(d, t)))
.sort(Comparator.comparing(Tuple2::getT2))
.map(Tuple2::getT1)
Expand Down
Loading

0 comments on commit 01743e5

Please sign in to comment.