Skip to content

Commit

Permalink
Add media deletion endpoint
Browse files Browse the repository at this point in the history
  • Loading branch information
ravi-signal committed Jan 11, 2024
1 parent e934ead commit cc6cf81
Show file tree
Hide file tree
Showing 11 changed files with 340 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import java.net.URI;
import java.time.Clock;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Base64;
import java.util.HexFormat;
import java.util.List;
Expand All @@ -30,6 +31,8 @@
import org.whispersystems.textsecuregcm.auth.AuthenticatedBackupUser;
import org.whispersystems.textsecuregcm.metrics.MetricsUtil;
import org.whispersystems.textsecuregcm.util.ExceptionUtils;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class BackupManager {

Expand Down Expand Up @@ -125,12 +128,10 @@ public CompletableFuture<Void> setPublicKey(
*/
public CompletableFuture<MessageBackupUploadDescriptor> createMessageBackupUploadDescriptor(
final AuthenticatedBackupUser backupUser) {
final String encodedBackupId = encodeBackupIdForCdn(backupUser);

// this could race with concurrent updates, but the only effect would be last-writer-wins on the timestamp
return backupsDb
.addMessageBackup(backupUser)
.thenApply(result -> cdn3BackupCredentialGenerator.generateUpload(encodedBackupId, MESSAGE_BACKUP_NAME));
.thenApply(result -> cdn3BackupCredentialGenerator.generateUpload(cdnMessageBackupName(backupUser)));
}

/**
Expand Down Expand Up @@ -194,8 +195,7 @@ public CompletableFuture<Boolean> canStoreMedia(final AuthenticatedBackupUser ba

// The user is out of quota, and we have not recently recalculated the user's usage. Double check by doing a
// hard recalculation before actually forbidding the user from storing additional media.
final String mediaPrefix = "%s/%s/".formatted(encodeBackupIdForCdn(backupUser), MEDIA_DIRECTORY_NAME);
return this.remoteStorageManager.calculateBytesUsed(mediaPrefix)
return this.remoteStorageManager.calculateBytesUsed(cdnMediaDirectory(backupUser))
.thenCompose(usage -> backupsDb
.setMediaUsage(backupUser, usage)
.thenApply(ignored -> usage))
Expand Down Expand Up @@ -249,15 +249,14 @@ public CompletableFuture<StorageDescriptor> copyToBackup(
}

final MessageBackupUploadDescriptor dst = cdn3BackupCredentialGenerator.generateUpload(
encodeBackupIdForCdn(backupUser),
"%s/%s".formatted(MEDIA_DIRECTORY_NAME, encodeForCdn(destinationMediaId)));
cdnMediaPath(backupUser, destinationMediaId));

final int destinationLength = encryptionParameters.outputSize(sourceLength);

final URI sourceUri = attachmentReadUri(sourceCdn, sourceKey);
return this.backupsDb
// Write the ddb updates before actually updating backing storage
.trackMedia(backupUser, destinationLength)
.trackMedia(backupUser, 1, destinationLength)

// Actually copy the objects. If the copy fails, our estimated quota usage may not be exact
.thenComposeAsync(ignored -> remoteStorageManager.copy(sourceUri, sourceLength, encryptionParameters, dst))
Expand All @@ -267,7 +266,7 @@ public CompletableFuture<StorageDescriptor> copyToBackup(
throw ExceptionUtils.wrap(unwrapped);
}
// In cases where we know the copy fails without writing anything, we can try to restore the user's quota
return this.backupsDb.trackMedia(backupUser, -destinationLength).whenComplete((ignored, ignoredEx) -> {
return this.backupsDb.trackMedia(backupUser, -1, -destinationLength).whenComplete((ignored, ignoredEx) -> {
throw ExceptionUtils.wrap(unwrapped);
});
})
Expand Down Expand Up @@ -335,8 +334,7 @@ public CompletionStage<ListMediaResult> list(
.withDescription("credential does not support list operation")
.asRuntimeException();
}
final String mediaPrefix = "%s/%s/".formatted(encodeBackupIdForCdn(backupUser), MEDIA_DIRECTORY_NAME);
return remoteStorageManager.list(mediaPrefix, cursor, limit)
return remoteStorageManager.list(cdnMediaDirectory(backupUser), cursor, limit)
.thenApply(result ->
new ListMediaResult(
result
Expand All @@ -352,6 +350,74 @@ public CompletionStage<ListMediaResult> list(
));
}


private sealed interface Either permits DeleteSuccess, DeleteFailure {}

private record DeleteSuccess(long usage) implements Either {}

private record DeleteFailure(Throwable e) implements Either {}

public CompletableFuture<Void> delete(final AuthenticatedBackupUser backupUser,
final List<StorageDescriptor> storageDescriptors) {
if (backupUser.backupTier().compareTo(BackupTier.MESSAGES) < 0) {
Metrics.counter(ZK_AUTHZ_FAILURE_COUNTER_NAME).increment();
throw Status.PERMISSION_DENIED
.withDescription("credential does not support list operation")
.asRuntimeException();
}

if (storageDescriptors.stream().anyMatch(sd -> sd.cdn() != remoteStorageManager.cdnNumber())) {
throw Status.INVALID_ARGUMENT
.withDescription("unsupported media cdn provided")
.asRuntimeException();
}

return Flux
.fromIterable(storageDescriptors)

// Issue deletes for all storage descriptors (proceeds with default flux concurrency)
.flatMap(descriptor -> Mono.fromCompletionStage(
remoteStorageManager
.delete(cdnMediaPath(backupUser, descriptor.key))
// Squash errors/success into a single type
.handle((bytesDeleted, throwable) -> throwable != null
? new DeleteFailure(throwable)
: new DeleteSuccess(bytesDeleted))
))

// Update backupsDb with the change in usage
.collectList()
.<Void>flatMap(eithers -> {
// count up usage changes
long totalBytesDeleted = 0;
long totalCountDeleted = 0;
final List<Throwable> toThrow = new ArrayList<>();
for (Either either : eithers) {
switch (either) {
case DeleteFailure f:
toThrow.add(f.e());
break;
case DeleteSuccess s when s.usage() > 0:
totalBytesDeleted += s.usage();
totalCountDeleted++;
break;
default:
break;
}
}
final Mono<Void> result = toThrow.isEmpty()
? Mono.empty()
: Mono.error(toThrow.stream().reduce((t1, t2) -> {
t1.addSuppressed(t2);
return t1;
}).get());
return Mono
.fromCompletionStage(this.backupsDb.trackMedia(backupUser, -totalCountDeleted, -totalBytesDeleted))
.then(result);
})
.toFuture();
}

/**
* Authenticate the ZK anonymous backup credential's presentation
* <p>
Expand Down Expand Up @@ -452,12 +518,25 @@ static String encodeBackupIdForCdn(final AuthenticatedBackupUser backupUser) {
return encodeForCdn(BackupsDb.hashedBackupId(backupUser.backupId()));
}

private static String encodeForCdn(final byte[] bytes) {
@VisibleForTesting
static String encodeForCdn(final byte[] bytes) {
return Base64.getUrlEncoder().encodeToString(bytes);
}

private static byte[] decodeFromCdn(final String base64) {
return Base64.getUrlDecoder().decode(base64);
}

private static String cdnMessageBackupName(final AuthenticatedBackupUser backupUser) {
return "%s/%s".formatted(encodeBackupIdForCdn(backupUser), MESSAGE_BACKUP_NAME);
}

private static String cdnMediaDirectory(final AuthenticatedBackupUser backupUser) {
return "%s/%s/".formatted(encodeBackupIdForCdn(backupUser), MEDIA_DIRECTORY_NAME);
}

private static String cdnMediaPath(final AuthenticatedBackupUser backupUser, final byte[] mediaId) {
return "%s%s".formatted(cdnMediaDirectory(backupUser), encodeForCdn(mediaId));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -126,18 +126,19 @@ CompletableFuture<Optional<byte[]>> retrievePublicKey(byte[] backupId) {
* Update the quota in the backup table
*
* @param backupUser The backup user
* @param mediaLength The length of the media after encryption. A negative length implies the media is being removed
* @param mediaBytesDelta The length of the media after encryption. A negative length implies media being removed
* @param mediaCountDelta The number of media objects being added, or if negative, removed
* @return A stage that completes successfully once the table are updated.
*/
CompletableFuture<Void> trackMedia(final AuthenticatedBackupUser backupUser, final int mediaLength) {
CompletableFuture<Void> trackMedia(final AuthenticatedBackupUser backupUser, final long mediaCountDelta, final long mediaBytesDelta) {
final Instant now = clock.instant();
return dynamoClient
.updateItem(
// Update the media quota and TTL
UpdateBuilder.forUser(backupTableName, backupUser)
.setRefreshTimes(now)
.incrementMediaBytes(mediaLength)
.incrementMediaCount(Integer.signum(mediaLength))
.incrementMediaBytes(mediaBytesDelta)
.incrementMediaCount(mediaCountDelta)
.updateItemBuilder()
.build())
.thenRun(Util.NOOP);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,10 @@ private static ExternalServiceCredentialsGenerator credentialsGenerator(final Cl
.build();
}

public MessageBackupUploadDescriptor generateUpload(final String hashedBackupId, final String objectName) {
if (hashedBackupId.isBlank() || objectName.isBlank()) {
public MessageBackupUploadDescriptor generateUpload(final String key) {
if (key.isBlank()) {
throw new IllegalArgumentException("Upload descriptors must have non-empty keys");
}
final String key = "%s/%s".formatted(hashedBackupId, objectName);
final String entity = WRITE_ENTITY_PREFIX + key;
final ExternalServiceCredentials credentials = credentialsGenerator.generateFor(entity);
final String b64Key = Base64.getEncoder().encodeToString(key.getBytes(StandardCharsets.UTF_8));
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package org.whispersystems.textsecuregcm.backup;

import io.micrometer.core.instrument.Metrics;
import io.micrometer.core.instrument.Timer;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
Expand All @@ -21,8 +23,6 @@
import javax.annotation.Nullable;
import javax.validation.constraints.NotNull;
import javax.ws.rs.core.Response;
import io.micrometer.core.instrument.Metrics;
import io.micrometer.core.instrument.Timer;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -168,10 +168,7 @@ public CompletionStage<ListResult> list(
cursor.ifPresent(s -> queryParams.put("cursor", cursor.get()));

final HttpRequest request = HttpRequest.newBuilder().GET()
.uri(URI.create("%s/%s/%s".formatted(
storageManagerBaseUrl,
Cdn3BackupCredentialGenerator.CDN_PATH,
HttpUtils.queryParamString(queryParams.entrySet()))))
.uri(URI.create("%s%s".formatted(listUrl(), HttpUtils.queryParamString(queryParams.entrySet()))))
.header(CLIENT_ID_HEADER, clientId)
.header(CLIENT_SECRET_HEADER, clientSecret)
.build();
Expand Down Expand Up @@ -226,12 +223,13 @@ private static ListResult parseListResponse(final HttpResponse<InputStream> http
*/
record UsageResponse(@NotNull long numObjects, @NotNull long bytesUsed) {}


@Override
public CompletionStage<UsageInfo> calculateBytesUsed(final String prefix) {
final Timer.Sample sample = Timer.start();
final HttpRequest request = HttpRequest.newBuilder().GET()
.uri(URI.create("%s/usage%s".formatted(
storageManagerBaseUrl,
.uri(URI.create("%s%s".formatted(
usageUrl(),
HttpUtils.queryParamString(Map.of("prefix", prefix).entrySet()))))
.header(CLIENT_ID_HEADER, clientId)
.header(CLIENT_SECRET_HEADER, clientSecret)
Expand Down Expand Up @@ -260,5 +258,49 @@ private static UsageInfo parseUsageResponse(final HttpResponse<InputStream> http
return new UsageInfo(response.bytesUsed(), response.numObjects);
}

/**
* Serialized delete response from storage manager
*/
record DeleteResponse(@NotNull long bytesDeleted) {}

public CompletionStage<Long> delete(final String key) {
final HttpRequest request = HttpRequest.newBuilder().DELETE()
.uri(URI.create(deleteUrl(key)))
.header(CLIENT_ID_HEADER, clientId)
.header(CLIENT_SECRET_HEADER, clientSecret)
.build();
return this.storageManagerHttpClient.sendAsync(request, HttpResponse.BodyHandlers.ofInputStream())
.thenApply(response -> {
Metrics.counter(STORAGE_MANAGER_STATUS_COUNTER_NAME,
OPERATION_TAG_NAME, "delete",
STATUS_TAG_NAME, Integer.toString(response.statusCode()))
.increment();
try {
return parseDeleteResponse(response);
} catch (IOException e) {
throw ExceptionUtils.wrap(e);
}
});
}

private long parseDeleteResponse(final HttpResponse<InputStream> httpDeleteResponse) throws IOException {
if (!HttpUtils.isSuccessfulResponse(httpDeleteResponse.statusCode())) {
throw new IOException("Failed to retrieve usage: " + httpDeleteResponse.statusCode());
}
return SystemMapper.jsonMapper().readValue(httpDeleteResponse.body(), DeleteResponse.class).bytesDeleted();
}

private String deleteUrl(final String key) {
return "%s/%s/%s".formatted(storageManagerBaseUrl, Cdn3BackupCredentialGenerator.CDN_PATH, key);
}

private String usageUrl() {
return "%s/usage".formatted(storageManagerBaseUrl);
}

private String listUrl() {
return "%s/%s/".formatted(storageManagerBaseUrl, Cdn3BackupCredentialGenerator.CDN_PATH);
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -74,4 +74,12 @@ record Entry(String key, long length) {}
* @return The number of bytes used
*/
CompletionStage<UsageInfo> calculateBytesUsed(final String prefix);

/**
* Delete the specified object.
*
* @param key the key of the stored object to delete.
* @return the number of bytes freed by the deletion operation
*/
CompletionStage<Long> delete(final String key);
}
Original file line number Diff line number Diff line change
Expand Up @@ -683,4 +683,53 @@ public CompletionStage<ListResponse> listMedia(
.toList(),
result.cursor().orElse(null)));
}

public record DeleteMedia(@Size(min = 1, max = 1000) List<@Valid MediaToDelete> mediaToDelete) {

public record MediaToDelete(
@Schema(description = "The backup cdn where this media object is stored")
@NotNull
Integer cdn,

@Schema(description = "The mediaId of the object in URL-safe base64", implementation = String.class)
@JsonSerialize(using = ByteArrayBase64UrlAdapter.Serializing.class)
@JsonDeserialize(using = ByteArrayBase64UrlAdapter.Deserializing.class)
@NotNull
@ExactlySize(15)
byte[] mediaId
) {}
}

@POST
@Produces(MediaType.APPLICATION_JSON)
@Path("/media/delete")
@Operation(summary = "Delete media objects",
description = "Delete media objects stored with this backup-id")
@ApiResponse(responseCode = "204")
@ApiResponse(responseCode = "429", description = "Rate limited.")
@ApiResponseZkAuth
public CompletionStage<Response> deleteMedia(
@Auth final Optional<AuthenticatedAccount> account,

@Parameter(description = BackupAuthCredentialPresentationHeader.DESCRIPTION, schema = @Schema(implementation = String.class))
@NotNull
@HeaderParam(X_SIGNAL_ZK_AUTH) final BackupAuthCredentialPresentationHeader presentation,

@Parameter(description = BackupAuthCredentialPresentationSignature.DESCRIPTION, schema = @Schema(implementation = String.class))
@NotNull
@HeaderParam(X_SIGNAL_ZK_AUTH_SIGNATURE) final BackupAuthCredentialPresentationSignature signature,

@Valid @NotNull DeleteMedia deleteMedia) {
if (account.isPresent()) {
throw new BadRequestException("must not use authenticated connection for anonymous operations");
}

return backupManager
.authenticateBackupUser(presentation.presentation, signature.signature)
.thenCompose(authenticatedBackupUser -> backupManager.delete(authenticatedBackupUser,
deleteMedia.mediaToDelete().stream()
.map(media -> new BackupManager.StorageDescriptor(media.cdn(), media.mediaId))
.toList()))
.thenApply(Util.ASYNC_EMPTY_RESPONSE);
}
}
Loading

0 comments on commit cc6cf81

Please sign in to comment.