Skip to content

Commit

Permalink
Add DELETE v1/archives
Browse files Browse the repository at this point in the history
  • Loading branch information
ravi-signal committed Apr 23, 2024
1 parent b3bd4cc commit 9ef1fee
Show file tree
Hide file tree
Showing 6 changed files with 191 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.stream.Collectors;
import io.micrometer.core.instrument.Timer;
import org.apache.commons.lang3.StringUtils;
import org.signal.libsignal.protocol.ecc.Curve;
import org.signal.libsignal.protocol.ecc.ECPublicKey;
Expand All @@ -38,6 +39,7 @@
import org.whispersystems.textsecuregcm.limits.RateLimiter;
import org.whispersystems.textsecuregcm.limits.RateLimiters;
import org.whispersystems.textsecuregcm.metrics.MetricsUtil;
import org.whispersystems.textsecuregcm.util.AsyncTimerUtil;
import org.whispersystems.textsecuregcm.util.ExceptionUtils;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
Expand All @@ -50,15 +52,23 @@ public class BackupManager {
static final String MESSAGE_BACKUP_NAME = "messageBackup";
static final long MAX_TOTAL_BACKUP_MEDIA_BYTES = 1024L * 1024L * 1024L * 50L;
static final long MAX_MEDIA_OBJECT_SIZE = 1024L * 1024L * 101L;

// If the last media usage recalculation is over MAX_QUOTA_STALENESS, force a recalculation before quota enforcement.
static final Duration MAX_QUOTA_STALENESS = Duration.ofDays(1);

// How many cdn object deletion requests can be outstanding at a time per backup deletion operation
private static final int DELETION_CONCURRENCY = 10;


private static final String ZK_AUTHN_COUNTER_NAME = MetricsUtil.name(BackupManager.class, "authentication");
private static final String ZK_AUTHZ_FAILURE_COUNTER_NAME = MetricsUtil.name(BackupManager.class,
"authorizationFailure");
private static final String USAGE_RECALCULATION_COUNTER_NAME = MetricsUtil.name(BackupManager.class,
"usageRecalculation");
private static final String DELETE_COUNT_DISTRIBUTION_NAME = MetricsUtil.name(BackupManager.class,
"deleteCount");
private static final Timer SYNCHRONOUS_DELETE_TIMER =
Metrics.timer(MetricsUtil.name(BackupManager.class, "synchronousDelete"));

private static final String SUCCESS_TAG_NAME = "success";
private static final String FAILURE_REASON_TAG_NAME = "reason";
Expand Down Expand Up @@ -317,7 +327,7 @@ private URI attachmentReadUri(final int cdn, final String key) throws IOExceptio
* Generate credentials that can be used to read from the backup CDN
*
* @param backupUser an already ZK authenticated backup user
* @param cdnNumber the cdn number to get backup credentials for
* @param cdnNumber the cdn number to get backup credentials for
* @return A map of headers to include with CDN requests
*/
public Map<String, String> generateReadAuth(final AuthenticatedBackupUser backupUser, final int cdnNumber) {
Expand Down Expand Up @@ -366,6 +376,16 @@ public CompletionStage<ListMediaResult> list(
));
}

public CompletableFuture<Void> deleteEntireBackup(final AuthenticatedBackupUser backupUser) {
checkBackupTier(backupUser, BackupTier.MESSAGES);
return backupsDb
// Try to swap out the backupDir for the user
.scheduleBackupDeletion(backupUser)
// If there was already a pending swap, try to delete the cdn objects directly
.exceptionallyCompose(ExceptionUtils.exceptionallyHandler(BackupsDb.PendingDeletionException.class,e ->
AsyncTimerUtil.record(SYNCHRONOUS_DELETE_TIMER, () ->
deletePrefix(backupUser.backupDir(), DELETION_CONCURRENCY))));
}

private sealed interface Either permits DeleteSuccess, DeleteFailure {}

Expand Down Expand Up @@ -494,7 +514,10 @@ public Flux<ExpiredBackup> getExpiredBackups(final int segments, final Scheduler
*/
public CompletableFuture<Void> expireBackup(final ExpiredBackup expiredBackup) {
return backupsDb.startExpiration(expiredBackup)
.thenCompose(ignored -> deletePrefix(expiredBackup.prefixToDelete()))
// the deletion operation is effectively single threaded -- it's expected that the caller can increase
// concurrency by deleting more backups at once, rather than increasing concurrency deleting an individual
// backup
.thenCompose(ignored -> deletePrefix(expiredBackup.prefixToDelete(), 1))
.thenCompose(ignored -> backupsDb.finishExpiration(expiredBackup));
}

Expand All @@ -504,7 +527,7 @@ public CompletableFuture<Void> expireBackup(final ExpiredBackup expiredBackup) {
* @param prefixToDelete The prefix to expire.
* @return A stage that completes when all objects with the given prefix have been deleted
*/
private CompletableFuture<Void> deletePrefix(final String prefixToDelete) {
private CompletableFuture<Void> deletePrefix(final String prefixToDelete, int concurrentDeletes) {
if (prefixToDelete.length() != BackupsDb.BACKUP_DIRECTORY_PATH_LENGTH
&& prefixToDelete.length() != BackupsDb.MEDIA_DIRECTORY_PATH_LENGTH) {
throw new IllegalArgumentException("Unexpected prefix deletion for " + prefixToDelete);
Expand All @@ -519,10 +542,9 @@ private CompletableFuture<Void> deletePrefix(final String prefixToDelete) {
return Mono.fromCompletionStage(() -> this.remoteStorageManager.list(prefix, listResult.cursor(), 1000));
})
.flatMap(listResult -> Flux.fromIterable(listResult.objects()))
// Delete the objects. concatMap effectively makes the deletion operation single threaded -- it's expected
// the caller can increase concurrency by deleting more backups at once, rather than increasing concurrency
// deleting an individual backup
.concatMap(result -> Mono.fromCompletionStage(() -> remoteStorageManager.delete(prefix + result.key())))
.flatMap(
result -> Mono.fromCompletionStage(() -> remoteStorageManager.delete(prefix + result.key())),
concurrentDeletes)
.count()
.doOnSuccess(itemsRemoved -> DistributionSummary.builder(DELETE_COUNT_DISTRIBUTION_NAME)
.publishPercentileHistogram(true)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package org.whispersystems.textsecuregcm.backup;

import io.grpc.Status;
import java.io.IOException;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.security.SecureRandom;
Expand Down Expand Up @@ -259,6 +260,55 @@ CompletableFuture<Void> addMessageBackup(final AuthenticatedBackupUser backupUse
.thenRun(Util.NOOP);
}

/**
* Indicates that we couldn't schedule a deletion because one was already scheduled. The caller may want to delete the
* objects directly.
*/
class PendingDeletionException extends IOException {}

/**
* Attempt to mark a backup as expired and swap in a new empty backupDir for the user.
* <p>
* After successful completion, the backupDir for the backup-id will be swapped to a new empty directory on the cdn,
* and the row will be immediately marked eligible for expiration via {@link #getExpiredBackups}.
* <p>
* If there is already a pending deletion, this will not swap the backupDir. The expiration timestamps will be
* updated, but the existing backupDir will remain. The caller should handle this case and start the deletion
* immediately by catching {@link PendingDeletionException}.
*
* @param backupUser The backupUser whose data should be eventually deleted
* @return A future that completes successfully if the user's data is now inaccessible, or with a
* {@link PendingDeletionException} if the backupDir could not be changed.
*/
CompletableFuture<Void> scheduleBackupDeletion(final AuthenticatedBackupUser backupUser) {
final byte[] hashedBackupId = hashedBackupId(backupUser);

// Clear usage metadata, swap names of things we intend to delete, and record our intent to delete in attr_expired_prefix
return dynamoClient.updateItem(new UpdateBuilder(backupTableName, BackupTier.MEDIA, hashedBackupId)
.clearMediaUsage(clock)
.expireDirectoryNames(secureRandom, ExpiredBackup.ExpirationType.ALL)
.setRefreshTimes(Instant.ofEpochSecond(0))
.addSetExpression("#expiredPrefix = :expiredPrefix",
Map.entry("#expiredPrefix", ATTR_EXPIRED_PREFIX),
Map.entry(":expiredPrefix", AttributeValues.s(backupUser.backupDir())))
.withConditionExpression("attribute_not_exists(#expiredPrefix) OR #expiredPrefix = :expiredPrefix")
.updateItemBuilder()
.build())
.exceptionallyCompose(ExceptionUtils.exceptionallyHandler(ConditionalCheckFailedException.class, e ->
// We already have a pending deletion for this backup-id. This is most likely to occur when the caller
// is toggling backups on and off. In this case, it should be pretty cheap to directly delete the backup.
// Instead of changing the backupDir, just make sure the row has expired/ timestamps and tell the caller we
// couldn't schedule the deletion.
dynamoClient.updateItem(new UpdateBuilder(backupTableName, BackupTier.MEDIA, hashedBackupId)
.setRefreshTimes(Instant.ofEpochSecond(0))
.updateItemBuilder()
.build())
.thenApply(ignore -> {
throw ExceptionUtils.wrap(new PendingDeletionException());
})))
.thenRun(Util.NOOP);
}

record BackupDescription(int cdn, Optional<Long> mediaUsedSpace) {}

/**
Expand Down Expand Up @@ -349,15 +399,7 @@ CompletableFuture<Void> startExpiration(final ExpiredBackup expiredBackup) {

// Clear usage metadata, swap names of things we intend to delete, and record our intent to delete in attr_expired_prefix
return dynamoClient.updateItem(new UpdateBuilder(backupTableName, BackupTier.MEDIA, expiredBackup.hashedBackupId())
.addSetExpression("#mediaBytesUsed = :mediaBytesUsed",
Map.entry("#mediaBytesUsed", ATTR_MEDIA_BYTES_USED),
Map.entry(":mediaBytesUsed", AttributeValues.n(0L)))
.addSetExpression("#mediaCount = :mediaCount",
Map.entry("#mediaCount", ATTR_MEDIA_COUNT),
Map.entry(":mediaCount", AttributeValues.n(0L)))
.addSetExpression("#mediaRecalc = :mediaRecalc",
Map.entry("#mediaRecalc", ATTR_MEDIA_USAGE_LAST_RECALCULATION),
Map.entry(":mediaRecalc", AttributeValues.n(clock.instant().getEpochSecond())))
.clearMediaUsage(clock)
.expireDirectoryNames(secureRandom, expiredBackup.expirationType())
.addRemoveExpression(Map.entry("#mediaRefresh", ATTR_LAST_MEDIA_REFRESH))
.addSetExpression("#expiredPrefix = :expiredPrefix",
Expand Down Expand Up @@ -587,6 +629,19 @@ UpdateBuilder incrementMediaBytes(long delta) {
return this;
}

UpdateBuilder clearMediaUsage(final Clock clock) {
addSetExpression("#mediaBytesUsed = :mediaBytesUsed",
Map.entry("#mediaBytesUsed", ATTR_MEDIA_BYTES_USED),
Map.entry(":mediaBytesUsed", AttributeValues.n(0L)));
addSetExpression("#mediaCount = :mediaCount",
Map.entry("#mediaCount", ATTR_MEDIA_COUNT),
Map.entry(":mediaCount", AttributeValues.n(0L)));
addSetExpression("#mediaRecalc = :mediaRecalc",
Map.entry("#mediaRecalc", ATTR_MEDIA_USAGE_LAST_RECALCULATION),
Map.entry(":mediaRecalc", AttributeValues.n(clock.instant().getEpochSecond())));
return this;
}

UpdateBuilder setDirectoryNamesIfMissing(final SecureRandom secureRandom) {
final String backupDir = generateDirName(secureRandom);
final String mediaDir = generateDirName(secureRandom);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import javax.ws.rs.BadRequestException;
import javax.ws.rs.ClientErrorException;
import javax.ws.rs.Consumes;
import javax.ws.rs.DELETE;
import javax.ws.rs.GET;
import javax.ws.rs.HeaderParam;
import javax.ws.rs.POST;
Expand Down Expand Up @@ -855,4 +856,33 @@ public CompletionStage<Response> deleteMedia(
.toList()))
.thenApply(Util.ASYNC_EMPTY_RESPONSE);
}

@DELETE
@Produces(MediaType.APPLICATION_JSON)
@Operation(summary = "Delete entire backup",
description = """
Delete all backup metadata, objects, and stored public key. To use backups again, a public key must be resupplied.
""")
@ApiResponse(responseCode = "204", description = "The backup has been successfully removed")
@ApiResponse(responseCode = "429", description = "Rate limited.")
@ApiResponseZkAuth
public CompletionStage<Response> deleteBackup(
@ReadOnly @Auth final Optional<AuthenticatedAccount> account,

@Parameter(description = BackupAuthCredentialPresentationHeader.DESCRIPTION, schema = @Schema(implementation = String.class))
@NotNull
@HeaderParam(X_SIGNAL_ZK_AUTH) final BackupAuthCredentialPresentationHeader presentation,

@Parameter(description = BackupAuthCredentialPresentationSignature.DESCRIPTION, schema = @Schema(implementation = String.class))
@NotNull
@HeaderParam(X_SIGNAL_ZK_AUTH_SIGNATURE) final BackupAuthCredentialPresentationSignature signature) {
if (account.isPresent()) {
throw new BadRequestException("must not use authenticated connection for anonymous operations");
}
return backupManager
.authenticateBackupUser(presentation.presentation, signature.signature)
.thenCompose(backupManager::deleteEntireBackup)
.thenApply(Util.ASYNC_EMPTY_RESPONSE);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -506,6 +506,39 @@ public void list(final String cursorVal) {

}

@Test
public void deleteEntireBackup() {
final AuthenticatedBackupUser original = backupUser(TestRandomUtil.nextBytes(16), BackupTier.MEDIA);

testClock.pin(Instant.ofEpochSecond(10));

// Deleting should swap the backupDir for the user
backupManager.deleteEntireBackup(original).join();
verifyNoInteractions(remoteStorageManager);
final AuthenticatedBackupUser after = retrieveBackupUser(original.backupId(), BackupTier.MEDIA);
assertThat(original.backupDir()).isNotEqualTo(after.backupDir());
assertThat(original.mediaDir()).isNotEqualTo(after.mediaDir());

// Trying again should do the deletion inline
when(remoteStorageManager.list(anyString(), any(), anyLong()))
.thenReturn(CompletableFuture.completedFuture(new RemoteStorageManager.ListResult(
Collections.emptyList(),
Optional.empty()
)));
backupManager.deleteEntireBackup(after).join();
verify(remoteStorageManager, times(1))
.list(eq(after.backupDir() + "/"), eq(Optional.empty()), anyLong());

// The original prefix to expire should be flagged as requiring expiration
final ExpiredBackup expiredBackup = backupManager
.getExpiredBackups(1, Schedulers.immediate(), Instant.ofEpochSecond(1L))
.collectList().block()
.getFirst();
assertThat(expiredBackup.hashedBackupId()).isEqualTo(hashedBackupId(original.backupId()));
assertThat(expiredBackup.prefixToDelete()).isEqualTo(original.backupDir());
assertThat(expiredBackup.expirationType()).isEqualTo(ExpiredBackup.ExpirationType.GARBAGE_COLLECTION);
}

@Test
public void delete() {
final AuthenticatedBackupUser backupUser = backupUser(TestRandomUtil.nextBytes(16), BackupTier.MEDIA);
Expand Down Expand Up @@ -778,6 +811,9 @@ private static byte[] hashedBackupId(final byte[] backupId) {
}
}

/**
* Create BackupUser with the provided backupId and tier
*/
private AuthenticatedBackupUser backupUser(final byte[] backupId, final BackupTier backupTier) {
// Won't actually validate the public key, but need to have a public key to perform BackupsDB operations
byte[] privateKey = new byte[32];
Expand All @@ -787,7 +823,14 @@ private AuthenticatedBackupUser backupUser(final byte[] backupId, final BackupTi
} catch (InvalidKeyException e) {
throw new RuntimeException(e);
}
return new AuthenticatedBackupUser(backupId, backupTier, BackupsDb.generateDirName(secureRandom),
BackupsDb.generateDirName(secureRandom));
return retrieveBackupUser(backupId, backupTier);
}

/**
* Retrieve an existing BackupUser from the database
*/
private AuthenticatedBackupUser retrieveBackupUser(final byte[] backupId, final BackupTier backupTier) {
final BackupsDb.AuthenticationData authData = backupsDb.retrieveAuthenticationData(backupId).join().get();
return new AuthenticatedBackupUser(backupId, backupTier, authData.backupDir(), authData.mediaDir());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -209,4 +209,10 @@ public void expirationFailed(ExpiredBackup.ExpirationType expirationType) {
private AuthenticatedBackupUser backupUser(final byte[] backupId, final BackupTier backupTier) {
return new AuthenticatedBackupUser(backupId, backupTier, "myBackupDir", "myMediaDir");
}

private AuthenticatedBackupUser backupUserFromDb(final byte[] backupId, final BackupTier backupTier) {
final BackupsDb.AuthenticationData authenticationData = backupsDb.retrieveAuthenticationData(backupId).join().get();
return new AuthenticatedBackupUser(backupId, backupTier,
authenticationData.backupDir(), authenticationData.mediaDir());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ public void setUp() {
GET, v1/archives/media/upload/form,
POST, v1/archives/,
PUT, v1/archives/keys, '{"backupIdPublicKey": "aaaaa"}'
DELETE, v1/archives,
PUT, v1/archives/media, '{
"sourceAttachment": {"cdn": 3, "key": "abc"},
"objectLength": 10,
Expand Down Expand Up @@ -603,6 +604,22 @@ public void readAuthInvalidParam() throws VerificationFailedException {
assertThat(response.getStatus()).isEqualTo(400);
}

@Test
public void deleteEntireBackup() throws VerificationFailedException {
final BackupAuthCredentialPresentation presentation =
backupAuthTestUtil.getPresentation(BackupTier.MEDIA, backupKey, aci);
when(backupManager.authenticateBackupUser(any(), any()))
.thenReturn(CompletableFuture.completedFuture(backupUser(presentation.getBackupId(), BackupTier.MEDIA)));
when(backupManager.deleteEntireBackup(any())).thenReturn(CompletableFuture.completedFuture(null));
Response response = resources.getJerseyTest()
.target("v1/archives/")
.request()
.header("X-Signal-ZK-Auth", Base64.getEncoder().encodeToString(presentation.serialize()))
.header("X-Signal-ZK-Auth-Signature", "aaa")
.delete();
assertThat(response.getStatus()).isEqualTo(204);
}

private static AuthenticatedBackupUser backupUser(byte[] backupId, BackupTier backupTier) {
return new AuthenticatedBackupUser(backupId, backupTier, "myBackupDir", "myMediaDir");
}
Expand Down

0 comments on commit 9ef1fee

Please sign in to comment.