Skip to content

Commit

Permalink
initial grpc service code in chat
Browse files Browse the repository at this point in the history
  • Loading branch information
jkt-signal committed Jun 27, 2023
1 parent cc3cab9 commit 8d995e4
Show file tree
Hide file tree
Showing 15 changed files with 545 additions and 189 deletions.
1 change: 1 addition & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -405,6 +405,7 @@
<goal>compile</goal>
<goal>compile-custom</goal>
<goal>test-compile</goal>
<goal>test-compile-custom</goal>
</goals>
</execution>
</executions>
Expand Down
2 changes: 2 additions & 0 deletions service/config/sample.yml
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ adminEventLoggingConfiguration:
projectId: some-project-id
logName: some-log-name

grpcPort: 8080

stripe:
apiKey: secret://stripe.apiKey
idempotencyKeyGenerator: secret://stripe.idempotencyKeyGenerator
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,11 @@ public class WhisperServerConfiguration extends Configuration {
@JsonProperty
private TurnSecretConfiguration turn;

@Valid
@NotNull
@JsonProperty
private int grpcPort;

public AdminEventLoggingConfiguration getAdminEventLoggingConfiguration() {
return adminEventLoggingConfiguration;
}
Expand Down Expand Up @@ -448,4 +453,9 @@ public RegistrationServiceConfiguration getRegistrationServiceConfiguration() {
public TurnSecretConfiguration getTurnSecretConfiguration() {
return turn;
}

public int getGrpcPort() {
return grpcPort;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,11 @@
import io.dropwizard.auth.basic.BasicCredentials;
import io.dropwizard.setup.Bootstrap;
import io.dropwizard.setup.Environment;
import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.lettuce.core.resource.ClientResources;
import io.micrometer.core.instrument.Metrics;
import io.micrometer.core.instrument.binder.grpc.MetricCollectingServerInterceptor;
import io.micrometer.core.instrument.binder.jvm.ExecutorServiceMetrics;
import java.io.ByteArrayInputStream;
import java.net.http.HttpClient;
Expand Down Expand Up @@ -105,6 +108,8 @@
import org.whispersystems.textsecuregcm.currency.CoinMarketCapClient;
import org.whispersystems.textsecuregcm.currency.CurrencyConversionManager;
import org.whispersystems.textsecuregcm.currency.FixerClient;
import org.whispersystems.textsecuregcm.grpc.GrpcServerManagedWrapper;
import org.whispersystems.textsecuregcm.grpc.UserAgentInterceptor;
import org.whispersystems.textsecuregcm.experiment.ExperimentEnrollmentManager;
import org.whispersystems.textsecuregcm.filters.RemoteDeprecationFilter;
import org.whispersystems.textsecuregcm.filters.RequestStatisticsFilter;
Expand Down Expand Up @@ -626,10 +631,22 @@ public void run(WhisperServerConfiguration config, Environment environment) thro
AuthFilter<BasicCredentials, DisabledPermittedAuthenticatedAccount> disabledPermittedAccountAuthFilter = new BasicCredentialAuthFilter.Builder<DisabledPermittedAuthenticatedAccount>().setAuthenticator(
disabledPermittedAccountAuthenticator).buildAuthFilter();

final ServerBuilder<?> grpcServer = ServerBuilder.forPort(config.getGrpcPort())
.intercept(new MetricCollectingServerInterceptor(Metrics.globalRegistry)); /* TODO: specialize metrics with user-agent platform */

RemoteDeprecationFilter remoteDeprecationFilter = new RemoteDeprecationFilter(dynamicConfigurationManager);
environment.servlets()
.addFilter("RemoteDeprecationFilter", new RemoteDeprecationFilter(dynamicConfigurationManager))
.addFilter("RemoteDeprecationFilter", remoteDeprecationFilter)
.addMappingForUrlPatterns(EnumSet.of(DispatcherType.REQUEST), false, "/*");

// Note: interceptors run in the reverse order they are added; the remote deprecation filter
// depends on the user-agent context so it has to come first here!
// http://grpc.github.io/grpc-java/javadoc/io/grpc/ServerBuilder.html#intercept-io.grpc.ServerInterceptor-
grpcServer.intercept(remoteDeprecationFilter);
grpcServer.intercept(new UserAgentInterceptor());

environment.lifecycle().manage(new GrpcServerManagedWrapper(grpcServer.build()));

environment.jersey().register(new RequestStatisticsFilter(TrafficSource.HTTP));
environment.jersey().register(MultiRecipientMessageProvider.class);
environment.jersey().register(new MetricsApplicationEventListener(TrafficSource.HTTP));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,15 @@

import static com.codahale.metrics.MetricRegistry.name;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.net.HttpHeaders;
import com.vdurmont.semver4j.Semver;

import io.grpc.Metadata;
import io.grpc.ServerCall;
import io.grpc.ServerCallHandler;
import io.grpc.ServerInterceptor;
import io.grpc.Status;
import io.micrometer.core.instrument.Metrics;
import java.io.IOException;
import java.util.Map;
Expand All @@ -22,6 +29,8 @@
import javax.servlet.http.HttpServletResponse;
import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicConfiguration;
import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicRemoteDeprecationConfiguration;
import org.whispersystems.textsecuregcm.grpc.StatusConstants;
import org.whispersystems.textsecuregcm.grpc.UserAgentInterceptor;
import org.whispersystems.textsecuregcm.storage.DynamicConfigurationManager;
import org.whispersystems.textsecuregcm.util.ua.ClientPlatform;
import org.whispersystems.textsecuregcm.util.ua.UnrecognizedUserAgentException;
Expand All @@ -34,7 +43,7 @@
* If a client platform does not have a configured minimum version, all traffic from that client
* platform is allowed.
*/
public class RemoteDeprecationFilter implements Filter {
public class RemoteDeprecationFilter implements Filter, ServerInterceptor {

private final DynamicConfigurationManager<DynamicConfiguration> dynamicConfigurationManager;

Expand All @@ -52,58 +61,82 @@ public RemoteDeprecationFilter(final DynamicConfigurationManager<DynamicConfigur

@Override
public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain) throws IOException, ServletException {
final String userAgentString = ((HttpServletRequest) request).getHeader(HttpHeaders.USER_AGENT);

UserAgent userAgent;
try {
userAgent = UserAgentUtil.parseUserAgentString(userAgentString);
} catch (final UnrecognizedUserAgentException e) {
userAgent = null;
}

if (shouldBlock(userAgent)) {
((HttpServletResponse) response).sendError(499);
} else {
chain.doFilter(request, response);
}
}

@Override
public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
final ServerCall<ReqT, RespT> call,
final Metadata headers,
final ServerCallHandler<ReqT, RespT> next) {

if (shouldBlock(UserAgentUtil.userAgentFromGrpcContext())) {
call.close(StatusConstants.UPGRADE_NEEDED_STATUS, new Metadata());
return new ServerCall.Listener<>() {};
} else {
return next.startCall(call, headers);
}
}

private boolean shouldBlock(final UserAgent userAgent) {
final DynamicRemoteDeprecationConfiguration configuration = dynamicConfigurationManager
.getConfiguration().getRemoteDeprecationConfiguration();

final Map<ClientPlatform, Semver> minimumVersionsByPlatform = configuration.getMinimumVersions();
final Map<ClientPlatform, Semver> versionsPendingDeprecationByPlatform = configuration.getVersionsPendingDeprecation();
final Map<ClientPlatform, Semver> versionsPendingDeprecationByPlatform = configuration
.getVersionsPendingDeprecation();
final Map<ClientPlatform, Set<Semver>> blockedVersionsByPlatform = configuration.getBlockedVersions();
final Map<ClientPlatform, Set<Semver>> versionsPendingBlockByPlatform = configuration.getVersionsPendingBlock();
final boolean allowUnrecognizedUserAgents = configuration.isUnrecognizedUserAgentAllowed();

boolean shouldBlock = false;

try {
final String userAgentString = ((HttpServletRequest) request).getHeader(HttpHeaders.USER_AGENT);
final UserAgent userAgent = UserAgentUtil.parseUserAgentString(userAgentString);

if (blockedVersionsByPlatform.containsKey(userAgent.getPlatform())) {
if (blockedVersionsByPlatform.get(userAgent.getPlatform()).contains(userAgent.getVersion())) {
recordDeprecation(userAgent, BLOCKED_CLIENT_REASON);
shouldBlock = true;
}
if (userAgent == null) {
if (configuration.isUnrecognizedUserAgentAllowed()) {
return false;
}
recordDeprecation(null, UNRECOGNIZED_UA_REASON);
return true;
}

if (minimumVersionsByPlatform.containsKey(userAgent.getPlatform())) {
if (userAgent.getVersion().isLowerThan(minimumVersionsByPlatform.get(userAgent.getPlatform()))) {
recordDeprecation(userAgent, EXPIRED_CLIENT_REASON);
shouldBlock = true;
}
if (blockedVersionsByPlatform.containsKey(userAgent.getPlatform())) {
if (blockedVersionsByPlatform.get(userAgent.getPlatform()).contains(userAgent.getVersion())) {
recordDeprecation(userAgent, BLOCKED_CLIENT_REASON);
shouldBlock = true;
}
}

if (versionsPendingBlockByPlatform.containsKey(userAgent.getPlatform())) {
if (versionsPendingBlockByPlatform.get(userAgent.getPlatform()).contains(userAgent.getVersion())) {
recordPendingDeprecation(userAgent, BLOCKED_CLIENT_REASON);
}
if (minimumVersionsByPlatform.containsKey(userAgent.getPlatform())) {
if (userAgent.getVersion().isLowerThan(minimumVersionsByPlatform.get(userAgent.getPlatform()))) {
recordDeprecation(userAgent, EXPIRED_CLIENT_REASON);
shouldBlock = true;
}
}

if (versionsPendingDeprecationByPlatform.containsKey(userAgent.getPlatform())) {
if (userAgent.getVersion().isLowerThan(versionsPendingDeprecationByPlatform.get(userAgent.getPlatform()))) {
recordPendingDeprecation(userAgent, EXPIRED_CLIENT_REASON);
}
}
} catch (final UnrecognizedUserAgentException e) {
if (!allowUnrecognizedUserAgents) {
recordDeprecation(null, UNRECOGNIZED_UA_REASON);
shouldBlock = true;
if (versionsPendingBlockByPlatform.containsKey(userAgent.getPlatform())) {
if (versionsPendingBlockByPlatform.get(userAgent.getPlatform()).contains(userAgent.getVersion())) {
recordPendingDeprecation(userAgent, BLOCKED_CLIENT_REASON);
}
}

if (shouldBlock) {
((HttpServletResponse) response).sendError(499);
} else {
chain.doFilter(request, response);
if (versionsPendingDeprecationByPlatform.containsKey(userAgent.getPlatform())) {
if (userAgent.getVersion().isLowerThan(versionsPendingDeprecationByPlatform.get(userAgent.getPlatform()))) {
recordPendingDeprecation(userAgent, EXPIRED_CLIENT_REASON);
}
}

return shouldBlock;
}

private void recordDeprecation(final UserAgent userAgent, final String reason) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Copyright 2023 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/

package org.whispersystems.textsecuregcm.grpc;

import java.io.IOException;
import java.util.concurrent.TimeUnit;

import io.dropwizard.lifecycle.Managed;
import io.grpc.Server;

public class GrpcServerManagedWrapper implements Managed {

private final Server server;

public GrpcServerManagedWrapper(final Server server) {
this.server = server;
}

@Override
public void start() throws IOException {
server.start();
}

@Override
public void stop() {
try {
server.shutdown().awaitTermination(5, TimeUnit.MINUTES);
} catch (InterruptedException e) {
server.shutdownNow();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
/*
* Copyright 2023 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/

package org.whispersystems.textsecuregcm.grpc;

import io.grpc.Status;

public abstract class StatusConstants {
public static final Status UPGRADE_NEEDED_STATUS = Status.INVALID_ARGUMENT.withDescription("signal-upgrade-required");
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Copyright 2023 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/

package org.whispersystems.textsecuregcm.grpc;

import com.google.common.annotations.VisibleForTesting;

import org.whispersystems.textsecuregcm.util.ua.UnrecognizedUserAgentException;
import org.whispersystems.textsecuregcm.util.ua.UserAgent;
import org.whispersystems.textsecuregcm.util.ua.UserAgentUtil;

import io.grpc.Context;
import io.grpc.Contexts;
import io.grpc.Metadata;
import io.grpc.ServerCall;
import io.grpc.ServerCallHandler;
import io.grpc.ServerInterceptor;
import io.grpc.Status;

public class UserAgentInterceptor implements ServerInterceptor {
@VisibleForTesting
public static final Metadata.Key<String> USER_AGENT_GRPC_HEADER =
Metadata.Key.of("user-agent", Metadata.ASCII_STRING_MARSHALLER);

@Override
public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(final ServerCall<ReqT, RespT> call,
final Metadata headers,
final ServerCallHandler<ReqT, RespT> next) {

UserAgent userAgent;
try {
userAgent = UserAgentUtil.parseUserAgentString(headers.get(USER_AGENT_GRPC_HEADER));
} catch (final UnrecognizedUserAgentException e) {
userAgent = null;
}

final Context context = Context.current().withValue(UserAgentUtil.USER_AGENT_CONTEXT_KEY, userAgent);
return Contexts.interceptCall(context, call, headers, next);
}

}
Loading

0 comments on commit 8d995e4

Please sign in to comment.