Skip to content

Commit

Permalink
Make copy/delete streaming friendly
Browse files Browse the repository at this point in the history
  • Loading branch information
ravi-signal authored Jun 20, 2024
1 parent c27898a commit 4aadabf
Show file tree
Hide file tree
Showing 6 changed files with 434 additions and 319 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,14 @@
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Base64;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import org.signal.libsignal.protocol.ecc.Curve;
import org.signal.libsignal.protocol.ecc.ECPublicKey;
import org.signal.libsignal.zkgroup.GenericServerSecretParams;
Expand All @@ -32,7 +33,6 @@
import org.whispersystems.textsecuregcm.attachments.AttachmentGenerator;
import org.whispersystems.textsecuregcm.attachments.TusAttachmentGenerator;
import org.whispersystems.textsecuregcm.auth.AuthenticatedBackupUser;
import org.whispersystems.textsecuregcm.controllers.RateLimitExceededException;
import org.whispersystems.textsecuregcm.limits.RateLimiter;
import org.whispersystems.textsecuregcm.limits.RateLimiters;
import org.whispersystems.textsecuregcm.metrics.MetricsUtil;
Expand All @@ -56,6 +56,9 @@ public class BackupManager {
// How many cdn object deletion requests can be outstanding at a time per backup deletion operation
private static final int DELETION_CONCURRENCY = 10;

// How many cdn object copy requests can be outstanding at a time per batch copy-to-backup operation
private static final int COPY_CONCURRENCY = 10;


private static final String ZK_AUTHN_COUNTER_NAME = MetricsUtil.name(BackupManager.class, "authentication");
private static final String ZK_AUTHZ_FAILURE_COUNTER_NAME = MetricsUtil.name(BackupManager.class,
Expand Down Expand Up @@ -148,7 +151,7 @@ public CompletableFuture<BackupUploadDescriptor> createMessageBackupUploadDescri
.thenApply(result -> cdn3BackupCredentialGenerator.generateUpload(cdnMessageBackupName(backupUser)));
}

public CompletionStage<BackupUploadDescriptor> createTemporaryAttachmentUploadDescriptor(
public CompletableFuture<BackupUploadDescriptor> createTemporaryAttachmentUploadDescriptor(
final AuthenticatedBackupUser backupUser) {
checkBackupLevel(backupUser, BackupLevel.MEDIA);

Expand All @@ -160,7 +163,7 @@ public CompletionStage<BackupUploadDescriptor> createTemporaryAttachmentUploadDe
final String attachmentKey = Base64.getUrlEncoder().encodeToString(bytes);
final AttachmentGenerator.Descriptor descriptor = tusAttachmentGenerator.generateAttachment(attachmentKey);
return new BackupUploadDescriptor(3, attachmentKey, descriptor.headers(), descriptor.signedUploadLocation());
});
}).toCompletableFuture();
}

/**
Expand Down Expand Up @@ -195,19 +198,96 @@ public CompletableFuture<BackupInfo> backupInfo(final AuthenticatedBackupUser ba
}

/**
* Check if there is enough capacity to store the requested amount of media
* Copy an encrypted object to the backup cdn, adding a layer of encryption
* <p>
* Implementation notes: <p> This method guarantees that any object that gets successfully copied to the backup cdn
* will also be deducted from the user's quota. </p>
* <p>
* However, the converse isn't true. It's possible we may charge the user for media they failed to copy. As a result,
* the quota may be over reported. It should be recalculated before taking quota enforcement actions.
*
* @param backupUser an already ZK authenticated backup user
* @param mediaLength the desired number of media bytes to store
* @return true if mediaLength bytes can be stored
* @return A Flux that emits the locations of the double-encrypted objects on the backup cdn, or includes an error
* detailing why the object could not be copied.
*/
public CompletableFuture<Boolean> canStoreMedia(final AuthenticatedBackupUser backupUser, final long mediaLength) {
public Flux<CopyResult> copyToBackup(final AuthenticatedBackupUser backupUser, List<CopyParameters> toCopy) {
checkBackupLevel(backupUser, BackupLevel.MEDIA);

return Mono
// Figure out how many objects we're allowed to copy, updating the quota usage for the amount we are allowed
.fromFuture(enforceQuota(backupUser, toCopy))

// Copy the ones we have enough quota to hold
.flatMapMany(quotaResult -> Flux.concat(

// These fit in our remaining quota, so perform the copy. If the copy fails, our estimated quota usage may not
// be exact since we already updated our usage. We make a best-effort attempt to undo the usage update if we
// know that the copied failed for sure though.
Flux.fromIterable(quotaResult.requestsToCopy()).flatMapSequential(
copyParams -> copyToBackup(backupUser, copyParams)
.flatMap(copyResult -> switch (copyResult.outcome()) {
case SUCCESS -> Mono.just(copyResult);
case SOURCE_WRONG_LENGTH, SOURCE_NOT_FOUND, OUT_OF_QUOTA -> Mono
.fromFuture(this.backupsDb.trackMedia(backupUser, -1, -copyParams.destinationObjectSize()))
.thenReturn(copyResult);
}),
COPY_CONCURRENCY),

// There wasn't enough quota remaining to perform these copies
Flux.fromIterable(quotaResult.requestsToReject())
.map(arg -> new CopyResult(CopyResult.Outcome.OUT_OF_QUOTA, arg.destinationMediaId(), null))));
}

private Mono<CopyResult> copyToBackup(final AuthenticatedBackupUser backupUser, final CopyParameters copyParameters) {
return Mono.fromCompletionStage(() -> remoteStorageManager.copy(
copyParameters.sourceCdn(), copyParameters.sourceKey(), copyParameters.sourceLength(),
copyParameters.encryptionParameters(),
cdnMediaPath(backupUser, copyParameters.destinationMediaId())))

// Successfully copied!
.thenReturn(new CopyResult(
CopyResult.Outcome.SUCCESS, copyParameters.destinationMediaId(), remoteStorageManager.cdnNumber()))

// Otherwise, squash per-item copy errors that don't fail the entire operation
.onErrorResume(
// If the error maps to an explicit result type
throwable ->
CopyResult.fromCopyError(throwable, copyParameters.destinationMediaId()).isPresent(),
// return that result type instead of propagating the error
throwable ->
Mono.just(CopyResult.fromCopyError(throwable, copyParameters.destinationMediaId()).orElseThrow()));
}

private record QuotaResult(List<CopyParameters> requestsToCopy, List<CopyParameters> requestsToReject) {}

/**
* Determine which copy requests can be performed with the user's remaining quota and update the used quota. If a copy
* request subsequently fails, the caller should attempt to restore the quota for the failed copy.
*
* @param backupUser The user quota to update
* @param toCopy The proposed copy requests
* @return QuotaResult indicating which requests fit into the remaining quota and which requests should be rejected
* with {@link CopyResult.Outcome#OUT_OF_QUOTA}
*/
private CompletableFuture<QuotaResult> enforceQuota(
final AuthenticatedBackupUser backupUser,
final List<CopyParameters> toCopy) {
final long totalBytesAdded = toCopy.stream()
.mapToLong(copyParameters -> {
if (copyParameters.sourceLength() > MAX_MEDIA_OBJECT_SIZE) {
throw Status.INVALID_ARGUMENT
.withDescription("Invalid sourceObject size")
.asRuntimeException();
}
return copyParameters.destinationObjectSize();
})
.sum();

return backupsDb.getMediaUsage(backupUser)
.thenComposeAsync(info -> {
final boolean canStore = MAX_TOTAL_BACKUP_MEDIA_BYTES - info.usageInfo().bytesUsed() >= mediaLength;
long remainingQuota = MAX_TOTAL_BACKUP_MEDIA_BYTES - info.usageInfo().bytesUsed();
final boolean canStore = remainingQuota >= totalBytesAdded;
if (canStore || info.lastRecalculationTime().isAfter(clock.instant().minus(MAX_QUOTA_STALENESS))) {
return CompletableFuture.completedFuture(canStore);
return CompletableFuture.completedFuture(remainingQuota);
}

// The user is out of quota, and we have not recently recalculated the user's usage. Double check by doing a
Expand All @@ -221,68 +301,45 @@ public CompletableFuture<Boolean> canStoreMedia(final AuthenticatedBackupUser ba
Metrics.counter(USAGE_RECALCULATION_COUNTER_NAME, "usageChanged", String.valueOf(usageChanged))
.increment();
})
.thenApply(newUsage -> MAX_TOTAL_BACKUP_MEDIA_BYTES - newUsage.bytesUsed() >= mediaLength);
.thenApply(newUsage -> MAX_TOTAL_BACKUP_MEDIA_BYTES - newUsage.bytesUsed());
})
.thenCompose(remainingQuota -> {
// Figure out how many of the requested objects fit in the remaining quota
final int index = indexWhereTotalExceeds(toCopy, CopyParameters::destinationObjectSize,
remainingQuota);
final QuotaResult result = new QuotaResult(toCopy.subList(0, index),
toCopy.subList(index, toCopy.size()));
if (index == 0) {
// Skip the usage update if we're not able to write anything
return CompletableFuture.completedFuture(result);
}

// Update the usage
final long quotaToConsume = result.requestsToCopy.stream()
.mapToLong(CopyParameters::destinationObjectSize)
.sum();
return backupsDb.trackMedia(backupUser, index, quotaToConsume).thenApply(ignored -> result);
});
}

public record StorageDescriptor(int cdn, byte[] key) {}

public record StorageDescriptorWithLength(int cdn, byte[] key, long length) {}

/**
* Copy an encrypted object to the backup cdn, adding a layer of encryption
* <p>
* Implementation notes: <p> This method guarantees that any object that gets successfully copied to the backup cdn
* will also be deducted from the user's quota. </p>
* <p>
* However, the converse isn't true. It's possible we may charge the user for media they failed to copy. As a result,
* the quota may be over reported and it should be recalculated before taking quota enforcement actions.
*
* @return A stage that completes successfully with location of the twice-encrypted object on the backup cdn. The
* returned CompletionStage can be completed exceptionally with the following exceptions.
* <ul>
* <li> {@link InvalidLengthException} If the expectedSourceLength does not match the length of the sourceUri </li>
* <li> {@link SourceObjectNotFoundException} If the no object at sourceUri is found </li>
* <li> {@link java.io.IOException} If there was a generic IO issue </li>
* </ul>
* @return the largest index i such that sum(ts[0],...ts[i - 1]) <= max
*/
public CompletableFuture<StorageDescriptor> copyToBackup(
final AuthenticatedBackupUser backupUser,
final int sourceCdn,
final String sourceKey,
final int sourceLength,
final MediaEncryptionParameters encryptionParameters,
final byte[] destinationMediaId) {
checkBackupLevel(backupUser, BackupLevel.MEDIA);
if (sourceLength > MAX_MEDIA_OBJECT_SIZE) {
throw Status.INVALID_ARGUMENT
.withDescription("Invalid sourceObject size")
.asRuntimeException();
private static <T> int indexWhereTotalExceeds(List<T> ts, Function<T, Long> valueFunction, long max) {
long sum = 0;
for (int index = 0; index < ts.size(); index++) {
sum += valueFunction.apply(ts.get(index));
if (sum > max) {
return index;
}
}
return ts.size();
}

final String destination = cdnMediaPath(backupUser, destinationMediaId);
final int destinationLength = encryptionParameters.outputSize(sourceLength);
return this.backupsDb
// Write the ddb updates before actually updating backing storage
.trackMedia(backupUser, 1, destinationLength)

// Actually copy the objects. If the copy fails, our estimated quota usage may not be exact
.thenComposeAsync(ignored ->
remoteStorageManager.copy(sourceCdn, sourceKey, sourceLength, encryptionParameters, destination))
.exceptionallyCompose(throwable -> {
final Throwable unwrapped = ExceptionUtils.unwrap(throwable);
if (!(unwrapped instanceof SourceObjectNotFoundException) && !(unwrapped instanceof InvalidLengthException)) {
throw ExceptionUtils.wrap(unwrapped);
}
// In cases where we know the copy fails without writing anything, we can try to restore the user's quota
return this.backupsDb.trackMedia(backupUser, -1, -destinationLength).whenComplete((ignored, ignoredEx) -> {
throw ExceptionUtils.wrap(unwrapped);
});
})
// indicates where the backup was stored
.thenApply(ignore -> new StorageDescriptor(remoteStorageManager.cdnNumber(), destinationMediaId));

}
public record StorageDescriptor(int cdn, byte[] key) {}

public record StorageDescriptorWithLength(int cdn, byte[] key, long length) {}

/**
* Generate credentials that can be used to read from the backup CDN
Expand Down Expand Up @@ -348,66 +405,60 @@ public CompletableFuture<Void> deleteEntireBackup(final AuthenticatedBackupUser
deletePrefix(backupUser.backupDir(), DELETION_CONCURRENCY))));
}

private sealed interface Either permits DeleteSuccess, DeleteFailure {}

private record DeleteSuccess(long usage) implements Either {}

private record DeleteFailure(Throwable e) implements Either {}

public CompletableFuture<Void> delete(final AuthenticatedBackupUser backupUser,
public Flux<StorageDescriptor> deleteMedia(final AuthenticatedBackupUser backupUser,
final List<StorageDescriptor> storageDescriptors) {
checkBackupLevel(backupUser, BackupLevel.MESSAGES);

// Check for a cdn we don't know how to process
if (storageDescriptors.stream().anyMatch(sd -> sd.cdn() != remoteStorageManager.cdnNumber())) {
throw Status.INVALID_ARGUMENT
.withDescription("unsupported media cdn provided")
.asRuntimeException();
}

return Flux
.fromIterable(storageDescriptors)

// Issue deletes for all storage descriptors (proceeds with default flux concurrency)
.flatMap(descriptor -> Mono.fromCompletionStage(
remoteStorageManager
.delete(cdnMediaPath(backupUser, descriptor.key))
// Squash errors/success into a single type
.handle((bytesDeleted, throwable) -> throwable != null
? new DeleteFailure(throwable)
: new DeleteSuccess(bytesDeleted))
))

// Update backupsDb with the change in usage
.collectList()
.<Void>flatMap(eithers -> {
// count up usage changes
long totalBytesDeleted = 0;
long totalCountDeleted = 0;
final List<Throwable> toThrow = new ArrayList<>();
for (Either either : eithers) {
switch (either) {
case DeleteFailure f:
toThrow.add(f.e());
break;
case DeleteSuccess s when s.usage() > 0:
totalBytesDeleted += s.usage();
totalCountDeleted++;
break;
default:
break;
}
}
final Mono<Void> result = toThrow.isEmpty()
? Mono.empty()
: Mono.error(toThrow.stream().reduce((t1, t2) -> {
t1.addSuppressed(t2);
return t1;
}).get());
return Mono
.fromCompletionStage(this.backupsDb.trackMedia(backupUser, -totalCountDeleted, -totalBytesDeleted))
.then(result);
})
.toFuture();
return Flux.usingWhen(

// Gather usage updates into the UsageBatcher to apply during the cleanup operation
Mono.just(new UsageBatcher()),

// Deletes the objects, returning their former location. Tracks bytes removed so the quota can be updated on
// completion
batcher -> Flux.fromIterable(storageDescriptors)
.flatMapSequential(sd -> Mono
// Delete the object
.fromCompletionStage(remoteStorageManager.delete(cdnMediaPath(backupUser, sd.key())))
// Track how much the remote storage manager indicated was deleted as part of the operation
.doOnNext(deletedBytes -> batcher.update(-deletedBytes))
.thenReturn(sd), DELETION_CONCURRENCY),

// On cleanup, update the quota using whatever updates were accumulated in the batcher
batcher ->
Mono.fromFuture(backupsDb.trackMedia(backupUser, batcher.countDelta.get(), batcher.usageDelta.get())));
}

/**
* Track pending media usage updates
*/
private static class UsageBatcher {

AtomicLong countDelta = new AtomicLong();
AtomicLong usageDelta = new AtomicLong();

/**
* Stage a usage update that will be applied later
*
* @param bytesDelta The amount of bytes that should be tracked as used (or if negative, freed). If the delta is
* non-zero, the count will also be updated.
*/
void update(long bytesDelta) {
if (bytesDelta < 0) {
countDelta.decrementAndGet();
} else if (bytesDelta > 0) {
countDelta.incrementAndGet();
}
usageDelta.addAndGet(bytesDelta);
}
}

private static final ECPublicKey INVALID_PUBLIC_KEY = Curve.generateKeyPair().getPublicKey();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Copyright 2024 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.whispersystems.textsecuregcm.backup;

/**
* Descriptor for a single copy-and-encrypt operation
*
* @param sourceCdn The cdn of the object to copy
* @param sourceKey The mediaId within the cdn of the object to copy
* @param sourceLength The length of the object to copy
* @param encryptionParameters Encryption parameters to double encrypt the object
* @param destinationMediaId The mediaId of the destination object
*/
public record CopyParameters(
int sourceCdn,
String sourceKey,
int sourceLength,
MediaEncryptionParameters encryptionParameters,
byte[] destinationMediaId) {

/**
* @return The size of the double-encrypted destination object after it is copied
*/
long destinationObjectSize() {
return encryptionParameters().outputSize(sourceLength());
}
}
Loading

0 comments on commit 4aadabf

Please sign in to comment.