Skip to content

Commit

Permalink
gRPC API for payments service
Browse files Browse the repository at this point in the history
  • Loading branch information
sergey-signal committed Sep 14, 2023
1 parent 8e598c1 commit 9f3ffa3
Show file tree
Hide file tree
Showing 6 changed files with 215 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@
import org.whispersystems.textsecuregcm.grpc.GrpcServerManagedWrapper;
import org.whispersystems.textsecuregcm.grpc.KeysAnonymousGrpcService;
import org.whispersystems.textsecuregcm.grpc.KeysGrpcService;
import org.whispersystems.textsecuregcm.grpc.PaymentsGrpcService;
import org.whispersystems.textsecuregcm.grpc.ProfileAnonymousGrpcService;
import org.whispersystems.textsecuregcm.grpc.ProfileGrpcService;
import org.whispersystems.textsecuregcm.grpc.UserAgentInterceptor;
Expand Down Expand Up @@ -588,7 +589,7 @@ public void run(WhisperServerConfiguration config, Environment environment) thro
FixerClient fixerClient = new FixerClient(currencyClient, config.getPaymentsServiceConfiguration().fixerApiKey().value());
CoinMarketCapClient coinMarketCapClient = new CoinMarketCapClient(currencyClient, config.getPaymentsServiceConfiguration().coinMarketCapApiKey().value(), config.getPaymentsServiceConfiguration().coinMarketCapCurrencyIds());
CurrencyConversionManager currencyManager = new CurrencyConversionManager(fixerClient, coinMarketCapClient,
cacheCluster, config.getPaymentsServiceConfiguration().paymentCurrencies(), Clock.systemUTC());
cacheCluster, config.getPaymentsServiceConfiguration().paymentCurrencies(), recurringJobExecutor, Clock.systemUTC());

environment.lifecycle().manage(apnSender);
environment.lifecycle().manage(apnPushNotificationScheduler);
Expand Down Expand Up @@ -644,6 +645,7 @@ public void run(WhisperServerConfiguration config, Environment environment) thro
final ServerBuilder<?> grpcServer = ServerBuilder.forPort(config.getGrpcPort())
.addService(ServerInterceptors.intercept(new KeysGrpcService(accountsManager, keys, rateLimiters), basicCredentialAuthenticationInterceptor))
.addService(new KeysAnonymousGrpcService(accountsManager, keys))
.addService(new PaymentsGrpcService(currencyManager))
.addService(ServerInterceptors.intercept(new ProfileGrpcService(clock, accountsManager, profilesManager, dynamicConfigurationManager,
config.getBadges(), asyncCdnS3Client, profileCdnPolicyGenerator, profileCdnPolicySigner, profileBadgeConverter, rateLimiters, zkProfileOperations, config.getCdnConfiguration().bucket()), basicCredentialAuthenticationInterceptor))
.addService(new ProfileAnonymousGrpcService(accountsManager, profilesManager, profileBadgeConverter, zkProfileOperations));
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
/*
* Copyright 2023 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/

package org.whispersystems.textsecuregcm.currency;

import com.google.common.annotations.VisibleForTesting;
Expand All @@ -13,13 +18,14 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.whispersystems.textsecuregcm.entities.CurrencyConversionEntity;
import org.whispersystems.textsecuregcm.entities.CurrencyConversionEntityList;
import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster;
import org.whispersystems.textsecuregcm.util.Util;

public class CurrencyConversionManager implements Managed {

Expand All @@ -32,31 +38,42 @@ public class CurrencyConversionManager implements Managed {

@VisibleForTesting
static final String COIN_MARKET_CAP_SHARED_CACHE_CURRENT_KEY = "CurrencyConversionManager::CoinMarketCapCacheCurrent";

private static final String COIN_MARKET_CAP_SHARED_CACHE_DATA_KEY = "CurrencyConversionManager::CoinMarketCapCacheData";

private final FixerClient fixerClient;
private final FixerClient fixerClient;

private final CoinMarketCapClient coinMarketCapClient;

private final FaultTolerantRedisCluster cacheCluster;

private final Clock clock;

private final List<String> currencies;

private final ScheduledExecutorService executor;

private final AtomicReference<CurrencyConversionEntityList> cached = new AtomicReference<>(null);

private Instant fixerUpdatedTimestamp = Instant.MIN;

private Map<String, BigDecimal> cachedFixerValues;

private Map<String, BigDecimal> cachedCoinMarketCapValues;

public CurrencyConversionManager(final FixerClient fixerClient,

public CurrencyConversionManager(
final FixerClient fixerClient,
final CoinMarketCapClient coinMarketCapClient,
final FaultTolerantRedisCluster cacheCluster,
final List<String> currencies,
final ScheduledExecutorService executor,
final Clock clock) {
this.fixerClient = fixerClient;
this.coinMarketCapClient = coinMarketCapClient;
this.cacheCluster = cacheCluster;
this.currencies = currencies;
this.executor = executor;
this.clock = clock;
}

Expand All @@ -66,22 +83,13 @@ public Optional<CurrencyConversionEntityList> getCurrencyConversions() {

@Override
public void start() throws Exception {
new Thread(() -> {
for (;;) {
try {
updateCacheIfNecessary();
} catch (Throwable t) {
logger.warn("Error updating currency conversions", t);
}

Util.sleep(15000);
executor.scheduleAtFixedRate(() -> {
try {
updateCacheIfNecessary();
} catch (Throwable t) {
logger.warn("Error updating currency conversions", t);
}
}).start();
}

@Override
public void stop() throws Exception {

}, 0, 15, TimeUnit.SECONDS);
}

@VisibleForTesting
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Copyright 2023 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/

package org.whispersystems.textsecuregcm.grpc;

import static java.util.Objects.requireNonNull;

import io.grpc.Status;
import java.math.BigDecimal;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import org.apache.commons.lang3.tuple.Pair;
import org.signal.chat.payments.GetCurrencyConversionsRequest;
import org.signal.chat.payments.GetCurrencyConversionsResponse;
import org.signal.chat.payments.ReactorPaymentsGrpc;
import org.whispersystems.textsecuregcm.auth.grpc.AuthenticationUtil;
import org.whispersystems.textsecuregcm.currency.CurrencyConversionManager;
import org.whispersystems.textsecuregcm.entities.CurrencyConversionEntityList;
import reactor.core.publisher.Mono;

public class PaymentsGrpcService extends ReactorPaymentsGrpc.PaymentsImplBase {

private final CurrencyConversionManager currencyManager;


public PaymentsGrpcService(final CurrencyConversionManager currencyManager) {
this.currencyManager = requireNonNull(currencyManager);
}

@Override
public Mono<GetCurrencyConversionsResponse> getCurrencyConversions(final GetCurrencyConversionsRequest request) {
AuthenticationUtil.requireAuthenticatedDevice();

final CurrencyConversionEntityList currencyConversionEntityList = currencyManager
.getCurrencyConversions()
.orElseThrow(Status.UNAVAILABLE::asRuntimeException);

final List<GetCurrencyConversionsResponse.CurrencyConversionEntity> currencyConversionEntities = currencyConversionEntityList
.getCurrencies()
.stream()
.map(cce -> GetCurrencyConversionsResponse.CurrencyConversionEntity.newBuilder()
.setBase(cce.getBase())
.putAllConversions(transformBigDecimalsToStrings(cce.getConversions()))
.build())
.toList();

return Mono.just(GetCurrencyConversionsResponse.newBuilder()
.addAllCurrencies(currencyConversionEntities).setTimestamp(currencyConversionEntityList.getTimestamp())
.build());
}

@Nonnull
private static Map<String, String> transformBigDecimalsToStrings(final Map<String, BigDecimal> conversions) {
AuthenticationUtil.requireAuthenticatedDevice();
return conversions.entrySet().stream()
.map(e -> Pair.of(e.getKey(), e.getValue().toString()))
.collect(Collectors.toMap(Pair::getKey, Pair::getValue));
}
}
36 changes: 36 additions & 0 deletions service/src/main/proto/org/signal/chat/payments.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Copyright 2023 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/

syntax = "proto3";

option java_multiple_files = true;

package org.signal.chat.payments;

/**
* Provides methods for working with payments.
*/
service Payments {
/**
*/
rpc GetCurrencyConversions(GetCurrencyConversionsRequest) returns (GetCurrencyConversionsResponse) {}
}

message GetCurrencyConversionsRequest {
}

message GetCurrencyConversionsResponse {

message CurrencyConversionEntity {

string base = 1;

map<string, string> conversions = 2;
}

uint64 timestamp = 1;

repeated CurrencyConversionEntity currencies = 2;
}
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
/*
* Copyright 2023 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/

package org.whispersystems.textsecuregcm.currency;

import static org.assertj.core.api.Assertions.assertThat;
Expand All @@ -12,6 +17,8 @@
import java.time.temporal.ChronoUnit;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.whispersystems.textsecuregcm.entities.CurrencyConversionEntityList;
Expand All @@ -22,6 +29,8 @@ class CurrencyConversionManagerTest {
@RegisterExtension
static final RedisClusterExtension REDIS_CLUSTER_EXTENSION = RedisClusterExtension.builder().build();

static final ScheduledExecutorService EXECUTOR = Executors.newSingleThreadScheduledExecutor();

@Test
void testCurrencyCalculations() throws IOException {
FixerClient fixerClient = mock(FixerClient.class);
Expand All @@ -35,7 +44,7 @@ void testCurrencyCalculations() throws IOException {
));

CurrencyConversionManager manager = new CurrencyConversionManager(fixerClient, coinMarketCapClient, REDIS_CLUSTER_EXTENSION.getRedisCluster(),
List.of("FOO"), Clock.systemUTC());
List.of("FOO"), EXECUTOR, Clock.systemUTC());

manager.updateCacheIfNecessary();

Expand Down Expand Up @@ -64,7 +73,7 @@ void testCurrencyCalculations_noTrailingZeros() throws IOException {
));

CurrencyConversionManager manager = new CurrencyConversionManager(fixerClient, coinMarketCapClient, REDIS_CLUSTER_EXTENSION.getRedisCluster(),
List.of("FOO"), Clock.systemUTC());
List.of("FOO"), EXECUTOR, Clock.systemUTC());

manager.updateCacheIfNecessary();

Expand Down Expand Up @@ -93,7 +102,7 @@ void testCurrencyCalculations_accuracy() throws IOException {
));

CurrencyConversionManager manager = new CurrencyConversionManager(fixerClient, coinMarketCapClient, REDIS_CLUSTER_EXTENSION.getRedisCluster(),
List.of("FOO"), Clock.systemUTC());
List.of("FOO"), EXECUTOR, Clock.systemUTC());

manager.updateCacheIfNecessary();

Expand Down Expand Up @@ -122,7 +131,7 @@ void testCurrencyCalculationsTimeoutNoRun() throws IOException {
));

CurrencyConversionManager manager = new CurrencyConversionManager(fixerClient, coinMarketCapClient, REDIS_CLUSTER_EXTENSION.getRedisCluster(),
List.of("FOO"), Clock.systemUTC());
List.of("FOO"), EXECUTOR, Clock.systemUTC());

manager.updateCacheIfNecessary();

Expand Down Expand Up @@ -154,7 +163,7 @@ void testCurrencyCalculationsCoinMarketCapTimeoutWithRun() throws IOException {
));

CurrencyConversionManager manager = new CurrencyConversionManager(fixerClient, coinMarketCapClient, REDIS_CLUSTER_EXTENSION.getRedisCluster(),
List.of("FOO"), Clock.systemUTC());
List.of("FOO"), EXECUTOR, Clock.systemUTC());

manager.updateCacheIfNecessary();

Expand Down Expand Up @@ -195,7 +204,7 @@ void testCurrencyCalculationsFixerTimeoutWithRun() throws IOException {
when(clock.millis()).thenReturn(currentTime.toEpochMilli());

CurrencyConversionManager manager = new CurrencyConversionManager(fixerClient, coinMarketCapClient, REDIS_CLUSTER_EXTENSION.getRedisCluster(),
List.of("FOO"), clock);
List.of("FOO"), EXECUTOR, clock);

manager.updateCacheIfNecessary();

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
* Copyright 2023 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/

package org.whispersystems.textsecuregcm.grpc;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.Mockito.when;
import static org.whispersystems.textsecuregcm.grpc.GrpcTestUtils.assertStatusException;

import io.grpc.Status;
import java.math.BigDecimal;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import org.junit.jupiter.api.Test;
import org.mockito.Mock;
import org.signal.chat.payments.GetCurrencyConversionsRequest;
import org.signal.chat.payments.GetCurrencyConversionsResponse;
import org.signal.chat.payments.PaymentsGrpc;
import org.whispersystems.textsecuregcm.currency.CurrencyConversionManager;
import org.whispersystems.textsecuregcm.entities.CurrencyConversionEntity;
import org.whispersystems.textsecuregcm.entities.CurrencyConversionEntityList;

class PaymentsGrpcServiceTest extends SimpleBaseGrpcTest<PaymentsGrpcService, PaymentsGrpc.PaymentsBlockingStub> {

@Mock
private CurrencyConversionManager currencyManager;

@Override
protected PaymentsGrpcService createServiceBeforeEachTest() {
return new PaymentsGrpcService(currencyManager);
}

@Test
void testGetCurrencyConversions() {
final long timestamp = System.currentTimeMillis();
when(currencyManager.getCurrencyConversions()).thenReturn(Optional.of(
new CurrencyConversionEntityList(List.of(
new CurrencyConversionEntity("FOO", Map.of(
"USD", new BigDecimal("2.35"),
"EUR", new BigDecimal("1.89")
)),
new CurrencyConversionEntity("BAR", Map.of(
"USD", new BigDecimal("1.50"),
"EUR", new BigDecimal("0.98")
))
), timestamp)));

final GetCurrencyConversionsResponse currencyConversions = authenticatedServiceStub().getCurrencyConversions(
GetCurrencyConversionsRequest.newBuilder().build());

assertEquals(timestamp, currencyConversions.getTimestamp());
assertEquals(2, currencyConversions.getCurrenciesCount());
assertEquals("FOO", currencyConversions.getCurrencies(0).getBase());
assertEquals("2.35", currencyConversions.getCurrencies(0).getConversionsMap().get("USD"));
}

@Test
void testUnavailable() {
when(currencyManager.getCurrencyConversions()).thenReturn(Optional.empty());
assertStatusException(Status.UNAVAILABLE, () -> authenticatedServiceStub().getCurrencyConversions(
GetCurrencyConversionsRequest.newBuilder().build()));
}

@Test
public void testUnauthenticated() throws Exception {
assertStatusException(Status.UNAUTHENTICATED, () -> unauthenticatedServiceStub().getCurrencyConversions(
GetCurrencyConversionsRequest.newBuilder().build()));
}
}

0 comments on commit 9f3ffa3

Please sign in to comment.