Skip to content

Commit

Permalink
Define asynchronous ProfilesManager operations
Browse files Browse the repository at this point in the history
  • Loading branch information
katherine-signal committed Jul 19, 2023
1 parent 352e1b2 commit ade2e9c
Show file tree
Hide file tree
Showing 6 changed files with 247 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,11 @@
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.whispersystems.textsecuregcm.util.AsyncTimerUtil;
import org.whispersystems.textsecuregcm.util.AttributeValues;
import org.whispersystems.textsecuregcm.util.ExceptionUtils;
import org.whispersystems.textsecuregcm.util.SystemMapper;
Expand Down Expand Up @@ -521,7 +521,7 @@ public void clearUsernameHash(final Account account) {

@Nonnull
public CompletionStage<Void> updateAsync(final Account account) {
return record(UPDATE_TIMER, () -> {
return AsyncTimerUtil.record(UPDATE_TIMER, () -> {
final UpdateItemRequest updateItemRequest;
try {
// username, e164, and pni cannot be modified through this method
Expand Down Expand Up @@ -676,7 +676,7 @@ public Optional<Account> getByAccountIdentifier(final UUID uuid) {

@Nonnull
public CompletableFuture<Optional<Account>> getByAccountIdentifierAsync(final UUID uuid) {
return record(GET_BY_UUID_TIMER, () -> itemByKeyAsync(accountsTableName, KEY_ACCOUNT_UUID, AttributeValues.fromUUID(uuid))
return AsyncTimerUtil.record(GET_BY_UUID_TIMER, () -> itemByKeyAsync(accountsTableName, KEY_ACCOUNT_UUID, AttributeValues.fromUUID(uuid))
.thenApply(maybeItem -> maybeItem.map(Accounts::fromItem)))
.toCompletableFuture();
}
Expand Down Expand Up @@ -776,7 +776,7 @@ private CompletableFuture<Optional<Account>> getByIndirectLookupAsync(
final AttributeValue keyValue,
final Predicate<? super Map<String, AttributeValue>> predicate) {

return record(timer, () -> itemByKeyAsync(tableName, keyName, keyValue)
return AsyncTimerUtil.record(timer, () -> itemByKeyAsync(tableName, keyName, keyValue)
.thenCompose(maybeItem -> maybeItem
.filter(predicate)
.map(item -> item.get(KEY_ACCOUNT_UUID))
Expand Down Expand Up @@ -934,12 +934,6 @@ private static TransactWriteItem buildDelete(final String tableName, final Strin
.build())
.build();
}

@Nonnull
private static <T> CompletionStage<T> record(final Timer timer, final Supplier<CompletionStage<T>> toRecord) {
final Timer.Sample sample = Timer.start();
return toRecord.get().whenComplete((ignoreT, ignoreE) -> sample.stop(timer));
}

@Nonnull
private AccountCrawlChunk scanForChunk(final ScanRequest.Builder scanRequestBuilder, final int maxCount, final Timer timer) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.whispersystems.textsecuregcm.util.AsyncTimerUtil;
import org.whispersystems.textsecuregcm.util.AttributeValues;
import org.whispersystems.textsecuregcm.util.Util;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
Expand Down Expand Up @@ -95,6 +97,18 @@ public void set(final UUID uuid, final VersionedProfile profile) {
});
}

public CompletableFuture<Void> setAsync(final UUID uuid, final VersionedProfile profile) {
return AsyncTimerUtil.record(SET_PROFILES_TIMER, () -> dynamoDbAsyncClient.updateItem(UpdateItemRequest.builder()
.tableName(tableName)
.key(buildPrimaryKey(uuid, profile.getVersion()))
.updateExpression(buildUpdateExpression(profile))
.expressionAttributeNames(UPDATE_EXPRESSION_ATTRIBUTE_NAMES)
.expressionAttributeValues(buildUpdateExpressionAttributeValues(profile))
.build()
).thenRun(Util.NOOP)
).toCompletableFuture();
}

private static Map<String, AttributeValue> buildPrimaryKey(final UUID uuid, final String version) {
return Map.of(
KEY_ACCOUNT_UUID, AttributeValues.fromUUID(uuid),
Expand Down Expand Up @@ -198,6 +212,17 @@ public Optional<VersionedProfile> get(final UUID uuid, final String version) {
});
}

public CompletableFuture<Optional<VersionedProfile>> getAsync(final UUID uuid, final String version) {
return AsyncTimerUtil.record(GET_PROFILE_TIMER, () -> dynamoDbAsyncClient.getItem(GetItemRequest.builder()
.tableName(tableName)
.key(buildPrimaryKey(uuid, version))
.consistentRead(true)
.build())
.thenApply(response ->
response.hasItem() ? Optional.of(fromItem(response.item())) : Optional.<VersionedProfile>empty())
).toCompletableFuture();
}

private static VersionedProfile fromItem(final Map<String, AttributeValue> item) {
return new VersionedProfile(
AttributeValues.getString(item, ATTR_VERSION, null),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,13 @@
import java.io.IOException;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster;
import org.whispersystems.textsecuregcm.util.SystemMapper;
import org.whispersystems.textsecuregcm.util.Util;
import javax.annotation.Nullable;

public class ProfilesManager {

Expand All @@ -26,6 +29,7 @@ public class ProfilesManager {
private final FaultTolerantRedisCluster cacheCluster;
private final ObjectMapper mapper;


public ProfilesManager(final Profiles profiles,
final FaultTolerantRedisCluster cacheCluster) {
this.profiles = profiles;
Expand All @@ -34,52 +38,105 @@ public ProfilesManager(final Profiles profiles,
}

public void set(UUID uuid, VersionedProfile versionedProfile) {
memcacheSet(uuid, versionedProfile);
redisSet(uuid, versionedProfile);
profiles.set(uuid, versionedProfile);
}

public CompletableFuture<Void> setAsync(UUID uuid, VersionedProfile versionedProfile) {
return profiles.setAsync(uuid, versionedProfile)
.thenCompose(ignored -> redisSetAsync(uuid, versionedProfile));
}

public void deleteAll(UUID uuid) {
memcacheDelete(uuid);
redisDelete(uuid);
profiles.deleteAll(uuid);
}

public Optional<VersionedProfile> get(UUID uuid, String version) {
Optional<VersionedProfile> profile = memcacheGet(uuid, version);
Optional<VersionedProfile> profile = redisGet(uuid, version);

if (profile.isEmpty()) {
profile = profiles.get(uuid, version);
profile.ifPresent(versionedProfile -> memcacheSet(uuid, versionedProfile));
profile.ifPresent(versionedProfile -> redisSet(uuid, versionedProfile));
}

return profile;
}

private void memcacheSet(UUID uuid, VersionedProfile profile) {
public CompletableFuture<Optional<VersionedProfile>> getAsync(UUID uuid, String version) {
return redisGetAsync(uuid, version)
.thenCompose(maybeVersionedProfile -> maybeVersionedProfile
.map(versionedProfile -> CompletableFuture.completedFuture(maybeVersionedProfile))
.orElseGet(() -> profiles.getAsync(uuid, version)
.thenCompose(maybeVersionedProfileFromDynamo -> maybeVersionedProfileFromDynamo
.map(profile -> redisSetAsync(uuid, profile).thenApply(ignored -> maybeVersionedProfileFromDynamo))
.orElseGet(() -> CompletableFuture.completedFuture(maybeVersionedProfileFromDynamo)))));
}

private void redisSet(UUID uuid, VersionedProfile profile) {
try {
final String profileJson = mapper.writeValueAsString(profile);

cacheCluster.useCluster(connection -> connection.sync().hset(CACHE_PREFIX + uuid.toString(), profile.getVersion(), profileJson));
cacheCluster.useCluster(connection -> connection.sync().hset(getCacheKey(uuid), profile.getVersion(), profileJson));
} catch (JsonProcessingException e) {
throw new IllegalArgumentException(e);
}
}

private Optional<VersionedProfile> memcacheGet(UUID uuid, String version) {
private CompletableFuture<Void> redisSetAsync(UUID uuid, VersionedProfile profile) {
final String profileJson;

try {
final String json = cacheCluster.withCluster(connection -> connection.sync().hget(CACHE_PREFIX + uuid.toString(), version));
profileJson = mapper.writeValueAsString(profile);
} catch (JsonProcessingException e) {
throw new IllegalArgumentException(e);
}

if (json == null) return Optional.empty();
else return Optional.of(mapper.readValue(json, VersionedProfile.class));
} catch (IOException e) {
logger.warn("Error deserializing value...", e);
return Optional.empty();
return cacheCluster.withCluster(connection ->
connection.async().hset(getCacheKey(uuid), profile.getVersion(), profileJson))
.thenRun(Util.NOOP)
.toCompletableFuture();
}

private Optional<VersionedProfile> redisGet(UUID uuid, String version) {
try {
@Nullable final String json = cacheCluster.withCluster(connection -> connection.sync().hget(getCacheKey(uuid), version));

return parseProfileJson(json);
} catch (RedisException e) {
logger.warn("Redis exception", e);
return Optional.empty();
}
}

private void memcacheDelete(UUID uuid) {
cacheCluster.useCluster(connection -> connection.sync().del(CACHE_PREFIX + uuid.toString()));
private CompletableFuture<Optional<VersionedProfile>> redisGetAsync(UUID uuid, String version) {
return cacheCluster.withCluster(connection ->
connection.async().hget(getCacheKey(uuid), version))
.thenApply(this::parseProfileJson)
.exceptionally(throwable -> {
logger.warn("Failed to read versioned profile from Redis", throwable);
return Optional.empty();
})
.toCompletableFuture();
}

private Optional<VersionedProfile> parseProfileJson(@Nullable final String maybeJson) {
try {
if (maybeJson != null) {
return Optional.of(mapper.readValue(maybeJson, VersionedProfile.class));
}
return Optional.empty();
} catch (final IOException e) {
logger.warn("Error deserializing value...", e);
return Optional.empty();
}
}

private void redisDelete(UUID uuid) {
cacheCluster.useCluster(connection -> connection.sync().del(getCacheKey(uuid)));
}

private String getCacheKey(UUID uuid) {
return CACHE_PREFIX + uuid.toString();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package org.whispersystems.textsecuregcm.util;

import io.micrometer.core.instrument.Timer;
import javax.annotation.Nonnull;
import java.util.concurrent.CompletionStage;
import java.util.function.Supplier;

public class AsyncTimerUtil {
@Nonnull
public static <T> CompletionStage<T> record(final Timer timer, final Supplier<CompletionStage<T>> toRecord) {
final Timer.Sample sample = Timer.start();
return toRecord.get().whenComplete((ignoreT, ignoreE) -> sample.stop(timer));
}

}
Loading

0 comments on commit ade2e9c

Please sign in to comment.