Skip to content

Commit

Permalink
Return futures from "send push notification" operations
Browse files Browse the repository at this point in the history
  • Loading branch information
jon-signal committed Jul 8, 2024
1 parent 2e36673 commit 6d166fd
Showing 1 changed file with 68 additions and 62 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import io.micrometer.core.instrument.Tags;
import java.time.Instant;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
Expand Down Expand Up @@ -49,35 +50,38 @@ public PushNotificationManager(final AccountsManager accountsManager,
this.pushLatencyManager = pushLatencyManager;
}

public void sendNewMessageNotification(final Account destination, final byte destinationDeviceId, final boolean urgent) throws NotPushRegisteredException {
public CompletableFuture<Optional<SendPushNotificationResult>> sendNewMessageNotification(final Account destination, final byte destinationDeviceId, final boolean urgent) throws NotPushRegisteredException {
final Device device = destination.getDevice(destinationDeviceId).orElseThrow(NotPushRegisteredException::new);
final Pair<String, PushNotification.TokenType> tokenAndType = getToken(device);

sendNotification(new PushNotification(tokenAndType.first(), tokenAndType.second(),
return sendNotification(new PushNotification(tokenAndType.first(), tokenAndType.second(),
PushNotification.NotificationType.NOTIFICATION, null, destination, device, urgent));
}

public void sendRegistrationChallengeNotification(final String deviceToken, final PushNotification.TokenType tokenType, final String challengeToken) {
sendNotification(new PushNotification(deviceToken, tokenType, PushNotification.NotificationType.CHALLENGE, challengeToken, null, null, true));
public CompletableFuture<SendPushNotificationResult> sendRegistrationChallengeNotification(final String deviceToken, final PushNotification.TokenType tokenType, final String challengeToken) {
return sendNotification(new PushNotification(deviceToken, tokenType, PushNotification.NotificationType.CHALLENGE, challengeToken, null, null, true))
.thenApply(maybeResponse -> maybeResponse.orElseThrow(() -> new AssertionError("Responses must be present for urgent notifications")));
}

public void sendRateLimitChallengeNotification(final Account destination, final String challengeToken)
public CompletableFuture<SendPushNotificationResult> sendRateLimitChallengeNotification(final Account destination, final String challengeToken)
throws NotPushRegisteredException {

final Device device = destination.getPrimaryDevice();
final Pair<String, PushNotification.TokenType> tokenAndType = getToken(device);

sendNotification(new PushNotification(tokenAndType.first(), tokenAndType.second(),
PushNotification.NotificationType.RATE_LIMIT_CHALLENGE, challengeToken, destination, device, true));
return sendNotification(new PushNotification(tokenAndType.first(), tokenAndType.second(),
PushNotification.NotificationType.RATE_LIMIT_CHALLENGE, challengeToken, destination, device, true))
.thenApply(maybeResponse -> maybeResponse.orElseThrow(() -> new AssertionError("Responses must be present for urgent notifications")));
}

public void sendAttemptLoginNotification(final Account destination, final String context) throws NotPushRegisteredException {
public CompletableFuture<SendPushNotificationResult> sendAttemptLoginNotification(final Account destination, final String context) throws NotPushRegisteredException {
final Device device = destination.getDevice(Device.PRIMARY_ID).orElseThrow(NotPushRegisteredException::new);
final Pair<String, PushNotification.TokenType> tokenAndType = getToken(device);

sendNotification(new PushNotification(tokenAndType.first(), tokenAndType.second(),
return sendNotification(new PushNotification(tokenAndType.first(), tokenAndType.second(),
PushNotification.NotificationType.ATTEMPT_LOGIN_NOTIFICATION_HIGH_PRIORITY,
context, destination, device, true));
context, destination, device, true))
.thenApply(maybeResponse -> maybeResponse.orElseThrow(() -> new AssertionError("Responses must be present for urgent notifications")));
}

public void handleMessagesRetrieved(final Account account, final Device device, final String userAgent) {
Expand All @@ -103,64 +107,66 @@ Pair<String, PushNotification.TokenType> getToken(final Device device) throws No
}

@VisibleForTesting
void sendNotification(final PushNotification pushNotification) {
CompletableFuture<Optional<SendPushNotificationResult>> sendNotification(final PushNotification pushNotification) {
if (pushNotification.tokenType() == PushNotification.TokenType.APN && !pushNotification.urgent()) {
// APNs imposes a per-device limit on background push notifications; schedule a notification for some time in the
// future (possibly even now!) rather than sending a notification directly
apnPushNotificationScheduler
return apnPushNotificationScheduler
.scheduleBackgroundNotification(pushNotification.destination(), pushNotification.destinationDevice())
.whenComplete(logErrors());
.whenComplete(logErrors())
.thenApply(ignored -> Optional.<SendPushNotificationResult>empty())
.toCompletableFuture();
}

} else {
final PushNotificationSender sender = switch (pushNotification.tokenType()) {
case FCM -> fcmSender;
case APN, APN_VOIP -> apnSender;
};

sender.sendNotification(pushNotification).whenComplete((result, throwable) -> {
if (throwable == null) {
Tags tags = Tags.of("tokenType", pushNotification.tokenType().name(),
"notificationType", pushNotification.notificationType().name(),
"urgent", String.valueOf(pushNotification.urgent()),
"accepted", String.valueOf(result.accepted()),
"unregistered", String.valueOf(result.unregistered()));

if (result.errorCode().isPresent()) {
tags = tags.and("errorCode", result.errorCode().get());
}

Metrics.counter(SENT_NOTIFICATION_COUNTER_NAME, tags).increment();

if (result.unregistered() && pushNotification.destination() != null
&& pushNotification.destinationDevice() != null) {

handleDeviceUnregistered(pushNotification.destination(),
pushNotification.destinationDevice(),
pushNotification.tokenType(),
result.errorCode(),
result.unregisteredTimestamp());
}

if (result.accepted() &&
pushNotification.tokenType() == PushNotification.TokenType.APN_VOIP &&
pushNotification.notificationType() == PushNotification.NotificationType.NOTIFICATION &&
pushNotification.destination() != null &&
pushNotification.destinationDevice() != null) {

apnPushNotificationScheduler.scheduleRecurringVoipNotification(
pushNotification.destination(),
pushNotification.destinationDevice())
.whenComplete(logErrors());
}
} else {
logger.debug("Failed to deliver {} push notification to {} ({})",
pushNotification.notificationType(), pushNotification.deviceToken(), pushNotification.tokenType(),
throwable);

Metrics.counter(FAILED_NOTIFICATION_COUNTER_NAME, "cause", throwable.getClass().getSimpleName()).increment();
final PushNotificationSender sender = switch (pushNotification.tokenType()) {
case FCM -> fcmSender;
case APN, APN_VOIP -> apnSender;
};

return sender.sendNotification(pushNotification).whenComplete((result, throwable) -> {
if (throwable == null) {
Tags tags = Tags.of("tokenType", pushNotification.tokenType().name(),
"notificationType", pushNotification.notificationType().name(),
"urgent", String.valueOf(pushNotification.urgent()),
"accepted", String.valueOf(result.accepted()),
"unregistered", String.valueOf(result.unregistered()));

if (result.errorCode().isPresent()) {
tags = tags.and("errorCode", result.errorCode().get());
}
});
}

Metrics.counter(SENT_NOTIFICATION_COUNTER_NAME, tags).increment();

if (result.unregistered() && pushNotification.destination() != null
&& pushNotification.destinationDevice() != null) {

handleDeviceUnregistered(pushNotification.destination(),
pushNotification.destinationDevice(),
pushNotification.tokenType(),
result.errorCode(),
result.unregisteredTimestamp());
}

if (result.accepted() &&
pushNotification.tokenType() == PushNotification.TokenType.APN_VOIP &&
pushNotification.notificationType() == PushNotification.NotificationType.NOTIFICATION &&
pushNotification.destination() != null &&
pushNotification.destinationDevice() != null) {

apnPushNotificationScheduler.scheduleRecurringVoipNotification(
pushNotification.destination(),
pushNotification.destinationDevice())
.whenComplete(logErrors());
}
} else {
logger.debug("Failed to deliver {} push notification to {} ({})",
pushNotification.notificationType(), pushNotification.deviceToken(), pushNotification.tokenType(),
throwable);

Metrics.counter(FAILED_NOTIFICATION_COUNTER_NAME, "cause", throwable.getClass().getSimpleName()).increment();
}
})
.thenApply(Optional::of);
}

private static <T> BiConsumer<T, Throwable> logErrors() {
Expand Down

0 comments on commit 6d166fd

Please sign in to comment.