From fef57dce0d4af0e5e7d208644a9a7c591e4d942e Mon Sep 17 00:00:00 2001 From: Jonathan Klabunde Tomer <125505367+jkt-signal@users.noreply.github.com> Date: Thu, 15 Feb 2024 14:58:43 -0800 Subject: [PATCH] use explicit `Timer` rather than micrometer annotation for send-message latency distribution --- .../controllers/MessageController.java | 261 +++++++++--------- 1 file changed, 136 insertions(+), 125 deletions(-) diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/controllers/MessageController.java b/service/src/main/java/org/whispersystems/textsecuregcm/controllers/MessageController.java index 78f043ee9..7befad701 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/controllers/MessageController.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/controllers/MessageController.java @@ -6,6 +6,7 @@ import static com.codahale.metrics.MetricRegistry.name; +import com.codahale.metrics.annotation.Timed; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; @@ -13,11 +14,12 @@ import com.google.protobuf.ByteString; import io.dropwizard.auth.Auth; import io.dropwizard.util.DataSize; -import io.micrometer.core.annotation.Timed; import io.micrometer.core.instrument.Counter; import io.micrometer.core.instrument.Metrics; import io.micrometer.core.instrument.Tag; import io.micrometer.core.instrument.Tags; +import io.micrometer.core.instrument.Timer; +import io.micrometer.core.instrument.Timer.Sample; import io.swagger.v3.oas.annotations.Operation; import io.swagger.v3.oas.annotations.Parameter; import io.swagger.v3.oas.annotations.media.Content; @@ -104,6 +106,7 @@ import org.whispersystems.textsecuregcm.limits.CardinalityEstimator; import org.whispersystems.textsecuregcm.limits.RateLimiters; import org.whispersystems.textsecuregcm.metrics.MessageMetrics; +import org.whispersystems.textsecuregcm.metrics.MetricsUtil; import org.whispersystems.textsecuregcm.metrics.UserAgentTagUtil; import org.whispersystems.textsecuregcm.providers.MultiRecipientMessageProvider; import org.whispersystems.textsecuregcm.push.MessageSender; @@ -174,6 +177,10 @@ private record MultiRecipientDeliveryData( private static final String REJECT_INVALID_ENVELOPE_TYPE = name(MessageController.class, "rejectInvalidEnvelopeType"); private static final String UNEXPECTED_MISSING_USER_COUNTER_NAME = name(MessageController.class, "unexpectedMissingDestinationForMultiRecipientMessage"); + private static final Timer SEND_MESSAGE_LATENCY_TIMER = + Timer.builder(MetricsUtil.name(MessageController.class, "sendMessageLatency")) + .publishPercentileHistogram(true) + .register(Metrics.globalRegistry); private static final String EPHEMERAL_TAG_NAME = "ephemeral"; private static final String SENDER_TYPE_TAG_NAME = "senderType"; @@ -222,7 +229,7 @@ public MessageController( this.spamChecker = spamChecker; } - @Timed(value = "chat.MessageController.sendMessageLatency", histogram = true) + @Timed @Path("/{destination}") @PUT @Consumes(MediaType.APPLICATION_JSON) @@ -236,155 +243,159 @@ public Response sendMessage(@Auth Optional source, @NotNull @Valid IncomingMessageList messages, @Context ContainerRequestContext context) throws RateLimitExceededException { + final Sample sample = Timer.start(); + try { + if (source.isEmpty() && accessKey.isEmpty() && !isStory) { + throw new WebApplicationException(Response.Status.UNAUTHORIZED); + } - if (source.isEmpty() && accessKey.isEmpty() && !isStory) { - throw new WebApplicationException(Response.Status.UNAUTHORIZED); - } - - final String senderType; - if (source.isPresent()) { - if (source.get().getAccount().isIdentifiedBy(destinationIdentifier)) { - senderType = SENDER_TYPE_SELF; + final String senderType; + if (source.isPresent()) { + if (source.get().getAccount().isIdentifiedBy(destinationIdentifier)) { + senderType = SENDER_TYPE_SELF; + } else { + senderType = SENDER_TYPE_IDENTIFIED; + } } else { - senderType = SENDER_TYPE_IDENTIFIED; + senderType = SENDER_TYPE_UNIDENTIFIED; } - } else { - senderType = SENDER_TYPE_UNIDENTIFIED; - } - boolean isSyncMessage = source.isPresent() && source.get().getAccount().isIdentifiedBy(destinationIdentifier); + boolean isSyncMessage = source.isPresent() && source.get().getAccount().isIdentifiedBy(destinationIdentifier); - if (isSyncMessage && destinationIdentifier.identityType() == IdentityType.PNI) { - throw new WebApplicationException(Status.FORBIDDEN); - } - - Optional destination; - if (!isSyncMessage) { - destination = accountsManager.getByServiceIdentifier(destinationIdentifier); - } else { - destination = source.map(AuthenticatedAccount::getAccount); - } + if (isSyncMessage && destinationIdentifier.identityType() == IdentityType.PNI) { + throw new WebApplicationException(Status.FORBIDDEN); + } - final Optional spamCheck = spamChecker.checkForSpam( - context, source.map(AuthenticatedAccount::getAccount), destination); - if (spamCheck.isPresent()) { - return spamCheck.get(); - } + Optional destination; + if (!isSyncMessage) { + destination = accountsManager.getByServiceIdentifier(destinationIdentifier); + } else { + destination = source.map(AuthenticatedAccount::getAccount); + } - final Optional spamReportToken = switch (senderType) { - case SENDER_TYPE_IDENTIFIED -> - reportSpamTokenProvider.makeReportSpamToken(context, source.get().getAccount(), destination); - default -> Optional.empty(); - }; + final Optional spamCheck = spamChecker.checkForSpam( + context, source.map(AuthenticatedAccount::getAccount), destination); + if (spamCheck.isPresent()) { + return spamCheck.get(); + } - int totalContentLength = 0; + final Optional spamReportToken = switch (senderType) { + case SENDER_TYPE_IDENTIFIED -> + reportSpamTokenProvider.makeReportSpamToken(context, source.get().getAccount(), destination); + default -> Optional.empty(); + }; - for (final IncomingMessage message : messages.messages()) { - int contentLength = 0; + int totalContentLength = 0; - if (StringUtils.isNotEmpty(message.content())) { - contentLength += message.content().length(); - } + for (final IncomingMessage message : messages.messages()) { + int contentLength = 0; - validateContentLength(contentLength, userAgent); - validateEnvelopeType(message.type(), userAgent); + if (StringUtils.isNotEmpty(message.content())) { + contentLength += message.content().length(); + } - totalContentLength += contentLength; - } + validateContentLength(contentLength, userAgent); + validateEnvelopeType(message.type(), userAgent); - try { - rateLimiters.getInboundMessageBytes().validate(destinationIdentifier.uuid(), totalContentLength); - } catch (final RateLimitExceededException e) { - if (dynamicConfigurationManager.getConfiguration().getInboundMessageByteLimitConfiguration().enforceInboundLimit()) { - messageByteLimitEstimator.add(destinationIdentifier.uuid().toString()); - throw e; + totalContentLength += contentLength; } - } - try { - // Stories will be checked by the client; we bypass access checks here for stories. - if (!isStory) { - OptionalAccess.verify(source.map(AuthenticatedAccount::getAccount), accessKey, destination); + try { + rateLimiters.getInboundMessageBytes().validate(destinationIdentifier.uuid(), totalContentLength); + } catch (final RateLimitExceededException e) { + if (dynamicConfigurationManager.getConfiguration().getInboundMessageByteLimitConfiguration().enforceInboundLimit()) { + messageByteLimitEstimator.add(destinationIdentifier.uuid().toString()); + throw e; + } } - boolean needsSync = !isSyncMessage && source.isPresent() && source.get().getAccount().hasEnabledLinkedDevice(); + try { + // Stories will be checked by the client; we bypass access checks here for stories. + if (!isStory) { + OptionalAccess.verify(source.map(AuthenticatedAccount::getAccount), accessKey, destination); + } - // We return 200 when stories are sent to a non-existent account. Since story sends bypass OptionalAccess.verify - // we leak information about whether a destination UUID exists if we return any other code (e.g. 404) from - // these requests. - if (isStory && destination.isEmpty()) { - return Response.ok(new SendMessageResponse(needsSync)).build(); - } + boolean needsSync = !isSyncMessage && source.isPresent() && source.get().getAccount().hasEnabledLinkedDevice(); - // if destination is empty we would either throw an exception in OptionalAccess.verify when isStory is false - // or else return a 200 response when isStory is true. - assert destination.isPresent(); + // We return 200 when stories are sent to a non-existent account. Since story sends bypass OptionalAccess.verify + // we leak information about whether a destination UUID exists if we return any other code (e.g. 404) from + // these requests. + if (isStory && destination.isEmpty()) { + return Response.ok(new SendMessageResponse(needsSync)).build(); + } - if (source.isPresent() && !isSyncMessage) { - checkMessageRateLimit(source.get(), destination.get(), userAgent); - } + // if destination is empty we would either throw an exception in OptionalAccess.verify when isStory is false + // or else return a 200 response when isStory is true. + assert destination.isPresent(); - if (isStory) { - rateLimiters.getStoriesLimiter().validate(destination.get().getUuid()); - } + if (source.isPresent() && !isSyncMessage) { + checkMessageRateLimit(source.get(), destination.get(), userAgent); + } - final Set excludedDeviceIds; + if (isStory) { + rateLimiters.getStoriesLimiter().validate(destination.get().getUuid()); + } - if (isSyncMessage) { - excludedDeviceIds = Set.of(source.get().getAuthenticatedDevice().getId()); - } else { - excludedDeviceIds = Collections.emptySet(); - } + final Set excludedDeviceIds; - DestinationDeviceValidator.validateCompleteDeviceList(destination.get(), - messages.messages().stream().map(IncomingMessage::destinationDeviceId).collect(Collectors.toSet()), - excludedDeviceIds); - - DestinationDeviceValidator.validateRegistrationIds(destination.get(), - messages.messages(), - IncomingMessage::destinationDeviceId, - IncomingMessage::destinationRegistrationId, - destination.get().getPhoneNumberIdentifier().equals(destinationIdentifier.uuid())); - - final List tags = List.of(UserAgentTagUtil.getPlatformTag(userAgent), - Tag.of(EPHEMERAL_TAG_NAME, String.valueOf(messages.online())), - Tag.of(SENDER_TYPE_TAG_NAME, senderType), - Tag.of(IDENTITY_TYPE_TAG_NAME, destinationIdentifier.identityType().name())); - - for (IncomingMessage incomingMessage : messages.messages()) { - Optional destinationDevice = destination.get().getDevice(incomingMessage.destinationDeviceId()); - - if (destinationDevice.isPresent()) { - Metrics.counter(SENT_MESSAGE_COUNTER_NAME, tags).increment(); - sendIndividualMessage( - source, - destination.get(), - destinationDevice.get(), - destinationIdentifier, - messages.timestamp(), - messages.online(), - isStory, - messages.urgent(), - incomingMessage, - userAgent, - spamReportToken); + if (isSyncMessage) { + excludedDeviceIds = Set.of(source.get().getAuthenticatedDevice().getId()); + } else { + excludedDeviceIds = Collections.emptySet(); } - } - return Response.ok(new SendMessageResponse(needsSync)).build(); - } catch (NoSuchUserException e) { - throw new WebApplicationException(Response.status(404).build()); - } catch (MismatchedDevicesException e) { - throw new WebApplicationException(Response.status(409) - .type(MediaType.APPLICATION_JSON_TYPE) - .entity(new MismatchedDevices(e.getMissingDevices(), - e.getExtraDevices())) - .build()); - } catch (StaleDevicesException e) { - throw new WebApplicationException(Response.status(410) - .type(MediaType.APPLICATION_JSON) - .entity(new StaleDevices(e.getStaleDevices())) - .build()); + DestinationDeviceValidator.validateCompleteDeviceList(destination.get(), + messages.messages().stream().map(IncomingMessage::destinationDeviceId).collect(Collectors.toSet()), + excludedDeviceIds); + + DestinationDeviceValidator.validateRegistrationIds(destination.get(), + messages.messages(), + IncomingMessage::destinationDeviceId, + IncomingMessage::destinationRegistrationId, + destination.get().getPhoneNumberIdentifier().equals(destinationIdentifier.uuid())); + + final List tags = List.of(UserAgentTagUtil.getPlatformTag(userAgent), + Tag.of(EPHEMERAL_TAG_NAME, String.valueOf(messages.online())), + Tag.of(SENDER_TYPE_TAG_NAME, senderType), + Tag.of(IDENTITY_TYPE_TAG_NAME, destinationIdentifier.identityType().name())); + + for (IncomingMessage incomingMessage : messages.messages()) { + Optional destinationDevice = destination.get().getDevice(incomingMessage.destinationDeviceId()); + + if (destinationDevice.isPresent()) { + Metrics.counter(SENT_MESSAGE_COUNTER_NAME, tags).increment(); + sendIndividualMessage( + source, + destination.get(), + destinationDevice.get(), + destinationIdentifier, + messages.timestamp(), + messages.online(), + isStory, + messages.urgent(), + incomingMessage, + userAgent, + spamReportToken); + } + } + + return Response.ok(new SendMessageResponse(needsSync)).build(); + } catch (NoSuchUserException e) { + throw new WebApplicationException(Response.status(404).build()); + } catch (MismatchedDevicesException e) { + throw new WebApplicationException(Response.status(409) + .type(MediaType.APPLICATION_JSON_TYPE) + .entity(new MismatchedDevices(e.getMissingDevices(), + e.getExtraDevices())) + .build()); + } catch (StaleDevicesException e) { + throw new WebApplicationException(Response.status(410) + .type(MediaType.APPLICATION_JSON) + .entity(new StaleDevices(e.getStaleDevices())) + .build()); + } + } finally { + sample.stop(SEND_MESSAGE_LATENCY_TIMER); } }