Skip to content

Commit

Permalink
use explicit Timer rather than micrometer annotation for send-messa…
Browse files Browse the repository at this point in the history
…ge latency distribution
  • Loading branch information
jkt-signal committed Feb 15, 2024
1 parent d884700 commit fef57dc
Showing 1 changed file with 136 additions and 125 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,20 @@

import static com.codahale.metrics.MetricRegistry.name;

import com.codahale.metrics.annotation.Timed;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.net.HttpHeaders;
import com.google.protobuf.ByteString;
import io.dropwizard.auth.Auth;
import io.dropwizard.util.DataSize;
import io.micrometer.core.annotation.Timed;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.Metrics;
import io.micrometer.core.instrument.Tag;
import io.micrometer.core.instrument.Tags;
import io.micrometer.core.instrument.Timer;
import io.micrometer.core.instrument.Timer.Sample;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.Parameter;
import io.swagger.v3.oas.annotations.media.Content;
Expand Down Expand Up @@ -104,6 +106,7 @@
import org.whispersystems.textsecuregcm.limits.CardinalityEstimator;
import org.whispersystems.textsecuregcm.limits.RateLimiters;
import org.whispersystems.textsecuregcm.metrics.MessageMetrics;
import org.whispersystems.textsecuregcm.metrics.MetricsUtil;
import org.whispersystems.textsecuregcm.metrics.UserAgentTagUtil;
import org.whispersystems.textsecuregcm.providers.MultiRecipientMessageProvider;
import org.whispersystems.textsecuregcm.push.MessageSender;
Expand Down Expand Up @@ -174,6 +177,10 @@ private record MultiRecipientDeliveryData(

private static final String REJECT_INVALID_ENVELOPE_TYPE = name(MessageController.class, "rejectInvalidEnvelopeType");
private static final String UNEXPECTED_MISSING_USER_COUNTER_NAME = name(MessageController.class, "unexpectedMissingDestinationForMultiRecipientMessage");
private static final Timer SEND_MESSAGE_LATENCY_TIMER =
Timer.builder(MetricsUtil.name(MessageController.class, "sendMessageLatency"))
.publishPercentileHistogram(true)
.register(Metrics.globalRegistry);

private static final String EPHEMERAL_TAG_NAME = "ephemeral";
private static final String SENDER_TYPE_TAG_NAME = "senderType";
Expand Down Expand Up @@ -222,7 +229,7 @@ public MessageController(
this.spamChecker = spamChecker;
}

@Timed(value = "chat.MessageController.sendMessageLatency", histogram = true)
@Timed
@Path("/{destination}")
@PUT
@Consumes(MediaType.APPLICATION_JSON)
Expand All @@ -236,155 +243,159 @@ public Response sendMessage(@Auth Optional<AuthenticatedAccount> source,
@NotNull @Valid IncomingMessageList messages,
@Context ContainerRequestContext context) throws RateLimitExceededException {

final Sample sample = Timer.start();
try {
if (source.isEmpty() && accessKey.isEmpty() && !isStory) {
throw new WebApplicationException(Response.Status.UNAUTHORIZED);
}

if (source.isEmpty() && accessKey.isEmpty() && !isStory) {
throw new WebApplicationException(Response.Status.UNAUTHORIZED);
}

final String senderType;
if (source.isPresent()) {
if (source.get().getAccount().isIdentifiedBy(destinationIdentifier)) {
senderType = SENDER_TYPE_SELF;
final String senderType;
if (source.isPresent()) {
if (source.get().getAccount().isIdentifiedBy(destinationIdentifier)) {
senderType = SENDER_TYPE_SELF;
} else {
senderType = SENDER_TYPE_IDENTIFIED;
}
} else {
senderType = SENDER_TYPE_IDENTIFIED;
senderType = SENDER_TYPE_UNIDENTIFIED;
}
} else {
senderType = SENDER_TYPE_UNIDENTIFIED;
}

boolean isSyncMessage = source.isPresent() && source.get().getAccount().isIdentifiedBy(destinationIdentifier);
boolean isSyncMessage = source.isPresent() && source.get().getAccount().isIdentifiedBy(destinationIdentifier);

if (isSyncMessage && destinationIdentifier.identityType() == IdentityType.PNI) {
throw new WebApplicationException(Status.FORBIDDEN);
}

Optional<Account> destination;
if (!isSyncMessage) {
destination = accountsManager.getByServiceIdentifier(destinationIdentifier);
} else {
destination = source.map(AuthenticatedAccount::getAccount);
}
if (isSyncMessage && destinationIdentifier.identityType() == IdentityType.PNI) {
throw new WebApplicationException(Status.FORBIDDEN);
}

final Optional<Response> spamCheck = spamChecker.checkForSpam(
context, source.map(AuthenticatedAccount::getAccount), destination);
if (spamCheck.isPresent()) {
return spamCheck.get();
}
Optional<Account> destination;
if (!isSyncMessage) {
destination = accountsManager.getByServiceIdentifier(destinationIdentifier);
} else {
destination = source.map(AuthenticatedAccount::getAccount);
}

final Optional<byte[]> spamReportToken = switch (senderType) {
case SENDER_TYPE_IDENTIFIED ->
reportSpamTokenProvider.makeReportSpamToken(context, source.get().getAccount(), destination);
default -> Optional.empty();
};
final Optional<Response> spamCheck = spamChecker.checkForSpam(
context, source.map(AuthenticatedAccount::getAccount), destination);
if (spamCheck.isPresent()) {
return spamCheck.get();
}

int totalContentLength = 0;
final Optional<byte[]> spamReportToken = switch (senderType) {
case SENDER_TYPE_IDENTIFIED ->
reportSpamTokenProvider.makeReportSpamToken(context, source.get().getAccount(), destination);
default -> Optional.empty();
};

for (final IncomingMessage message : messages.messages()) {
int contentLength = 0;
int totalContentLength = 0;

if (StringUtils.isNotEmpty(message.content())) {
contentLength += message.content().length();
}
for (final IncomingMessage message : messages.messages()) {
int contentLength = 0;

validateContentLength(contentLength, userAgent);
validateEnvelopeType(message.type(), userAgent);
if (StringUtils.isNotEmpty(message.content())) {
contentLength += message.content().length();
}

totalContentLength += contentLength;
}
validateContentLength(contentLength, userAgent);
validateEnvelopeType(message.type(), userAgent);

try {
rateLimiters.getInboundMessageBytes().validate(destinationIdentifier.uuid(), totalContentLength);
} catch (final RateLimitExceededException e) {
if (dynamicConfigurationManager.getConfiguration().getInboundMessageByteLimitConfiguration().enforceInboundLimit()) {
messageByteLimitEstimator.add(destinationIdentifier.uuid().toString());
throw e;
totalContentLength += contentLength;
}
}

try {
// Stories will be checked by the client; we bypass access checks here for stories.
if (!isStory) {
OptionalAccess.verify(source.map(AuthenticatedAccount::getAccount), accessKey, destination);
try {
rateLimiters.getInboundMessageBytes().validate(destinationIdentifier.uuid(), totalContentLength);
} catch (final RateLimitExceededException e) {
if (dynamicConfigurationManager.getConfiguration().getInboundMessageByteLimitConfiguration().enforceInboundLimit()) {
messageByteLimitEstimator.add(destinationIdentifier.uuid().toString());
throw e;
}
}

boolean needsSync = !isSyncMessage && source.isPresent() && source.get().getAccount().hasEnabledLinkedDevice();
try {
// Stories will be checked by the client; we bypass access checks here for stories.
if (!isStory) {
OptionalAccess.verify(source.map(AuthenticatedAccount::getAccount), accessKey, destination);
}

// We return 200 when stories are sent to a non-existent account. Since story sends bypass OptionalAccess.verify
// we leak information about whether a destination UUID exists if we return any other code (e.g. 404) from
// these requests.
if (isStory && destination.isEmpty()) {
return Response.ok(new SendMessageResponse(needsSync)).build();
}
boolean needsSync = !isSyncMessage && source.isPresent() && source.get().getAccount().hasEnabledLinkedDevice();

// if destination is empty we would either throw an exception in OptionalAccess.verify when isStory is false
// or else return a 200 response when isStory is true.
assert destination.isPresent();
// We return 200 when stories are sent to a non-existent account. Since story sends bypass OptionalAccess.verify
// we leak information about whether a destination UUID exists if we return any other code (e.g. 404) from
// these requests.
if (isStory && destination.isEmpty()) {
return Response.ok(new SendMessageResponse(needsSync)).build();
}

if (source.isPresent() && !isSyncMessage) {
checkMessageRateLimit(source.get(), destination.get(), userAgent);
}
// if destination is empty we would either throw an exception in OptionalAccess.verify when isStory is false
// or else return a 200 response when isStory is true.
assert destination.isPresent();

if (isStory) {
rateLimiters.getStoriesLimiter().validate(destination.get().getUuid());
}
if (source.isPresent() && !isSyncMessage) {
checkMessageRateLimit(source.get(), destination.get(), userAgent);
}

final Set<Byte> excludedDeviceIds;
if (isStory) {
rateLimiters.getStoriesLimiter().validate(destination.get().getUuid());
}

if (isSyncMessage) {
excludedDeviceIds = Set.of(source.get().getAuthenticatedDevice().getId());
} else {
excludedDeviceIds = Collections.emptySet();
}
final Set<Byte> excludedDeviceIds;

DestinationDeviceValidator.validateCompleteDeviceList(destination.get(),
messages.messages().stream().map(IncomingMessage::destinationDeviceId).collect(Collectors.toSet()),
excludedDeviceIds);

DestinationDeviceValidator.validateRegistrationIds(destination.get(),
messages.messages(),
IncomingMessage::destinationDeviceId,
IncomingMessage::destinationRegistrationId,
destination.get().getPhoneNumberIdentifier().equals(destinationIdentifier.uuid()));

final List<Tag> tags = List.of(UserAgentTagUtil.getPlatformTag(userAgent),
Tag.of(EPHEMERAL_TAG_NAME, String.valueOf(messages.online())),
Tag.of(SENDER_TYPE_TAG_NAME, senderType),
Tag.of(IDENTITY_TYPE_TAG_NAME, destinationIdentifier.identityType().name()));

for (IncomingMessage incomingMessage : messages.messages()) {
Optional<Device> destinationDevice = destination.get().getDevice(incomingMessage.destinationDeviceId());

if (destinationDevice.isPresent()) {
Metrics.counter(SENT_MESSAGE_COUNTER_NAME, tags).increment();
sendIndividualMessage(
source,
destination.get(),
destinationDevice.get(),
destinationIdentifier,
messages.timestamp(),
messages.online(),
isStory,
messages.urgent(),
incomingMessage,
userAgent,
spamReportToken);
if (isSyncMessage) {
excludedDeviceIds = Set.of(source.get().getAuthenticatedDevice().getId());
} else {
excludedDeviceIds = Collections.emptySet();
}
}

return Response.ok(new SendMessageResponse(needsSync)).build();
} catch (NoSuchUserException e) {
throw new WebApplicationException(Response.status(404).build());
} catch (MismatchedDevicesException e) {
throw new WebApplicationException(Response.status(409)
.type(MediaType.APPLICATION_JSON_TYPE)
.entity(new MismatchedDevices(e.getMissingDevices(),
e.getExtraDevices()))
.build());
} catch (StaleDevicesException e) {
throw new WebApplicationException(Response.status(410)
.type(MediaType.APPLICATION_JSON)
.entity(new StaleDevices(e.getStaleDevices()))
.build());
DestinationDeviceValidator.validateCompleteDeviceList(destination.get(),
messages.messages().stream().map(IncomingMessage::destinationDeviceId).collect(Collectors.toSet()),
excludedDeviceIds);

DestinationDeviceValidator.validateRegistrationIds(destination.get(),
messages.messages(),
IncomingMessage::destinationDeviceId,
IncomingMessage::destinationRegistrationId,
destination.get().getPhoneNumberIdentifier().equals(destinationIdentifier.uuid()));

final List<Tag> tags = List.of(UserAgentTagUtil.getPlatformTag(userAgent),
Tag.of(EPHEMERAL_TAG_NAME, String.valueOf(messages.online())),
Tag.of(SENDER_TYPE_TAG_NAME, senderType),
Tag.of(IDENTITY_TYPE_TAG_NAME, destinationIdentifier.identityType().name()));

for (IncomingMessage incomingMessage : messages.messages()) {
Optional<Device> destinationDevice = destination.get().getDevice(incomingMessage.destinationDeviceId());

if (destinationDevice.isPresent()) {
Metrics.counter(SENT_MESSAGE_COUNTER_NAME, tags).increment();
sendIndividualMessage(
source,
destination.get(),
destinationDevice.get(),
destinationIdentifier,
messages.timestamp(),
messages.online(),
isStory,
messages.urgent(),
incomingMessage,
userAgent,
spamReportToken);
}
}

return Response.ok(new SendMessageResponse(needsSync)).build();
} catch (NoSuchUserException e) {
throw new WebApplicationException(Response.status(404).build());
} catch (MismatchedDevicesException e) {
throw new WebApplicationException(Response.status(409)
.type(MediaType.APPLICATION_JSON_TYPE)
.entity(new MismatchedDevices(e.getMissingDevices(),
e.getExtraDevices()))
.build());
} catch (StaleDevicesException e) {
throw new WebApplicationException(Response.status(410)
.type(MediaType.APPLICATION_JSON)
.entity(new StaleDevices(e.getStaleDevices()))
.build());
}
} finally {
sample.stop(SEND_MESSAGE_LATENCY_TIMER);
}
}

Expand Down

0 comments on commit fef57dc

Please sign in to comment.