Skip to content

Commit

Permalink
feat: Added tracking product role on send events (#162)
Browse files Browse the repository at this point in the history
  • Loading branch information
manuraf authored Jun 26, 2024
1 parent 8f91a6c commit dc8771d
Show file tree
Hide file tree
Showing 5 changed files with 28 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import static com.mongodb.client.model.Projections.include;
import static it.pagopa.selfcare.user.UserUtils.mapPropsForTrackEvent;
import static it.pagopa.selfcare.user.event.constant.CdcStartAtConstant.*;
import static it.pagopa.selfcare.user.model.TrackEventInput.toTrackEventInput;
import static it.pagopa.selfcare.user.model.constants.EventsMetric.*;
import static it.pagopa.selfcare.user.model.constants.EventsName.EVENT_USER_CDC_NAME;
import static java.util.Arrays.asList;
Expand Down Expand Up @@ -166,11 +167,11 @@ protected void consumerUserInstitutionRepositoryEvent(ChangeStreamDocument<UserI
result -> {
log.info("UserInfo collection successfully updated from UserInstitution document having id: {}", userInstitutionId);
updateLastResumeToken(document.getResumeToken());
telemetryClient.trackEvent(EVENT_USER_CDC_NAME, mapPropsForTrackEvent(toTrackEventInput(userInstitutionChanged, null)), Map.of(USER_INFO_UPDATE_SUCCESS, 1D));
telemetryClient.trackEvent(EVENT_USER_CDC_NAME, mapPropsForTrackEvent(toTrackEventInputByUserInstitution(userInstitutionChanged)), Map.of(USER_INFO_UPDATE_SUCCESS, 1D));
},
failure -> {
log.error("Error during UserInfo collection updating, from UserInstitution document having id: {} , message: {}", userInstitutionId, failure.getMessage());
telemetryClient.trackEvent(EVENT_USER_CDC_NAME, mapPropsForTrackEvent(toTrackEventInput(userInstitutionChanged, null)), Map.of(USER_INFO_UPDATE_FAILURE, 1D));
telemetryClient.trackEvent(EVENT_USER_CDC_NAME, mapPropsForTrackEvent(toTrackEventInputByUserInstitution(userInstitutionChanged)), Map.of(USER_INFO_UPDATE_FAILURE, 1D));
});
}

Expand All @@ -191,26 +192,27 @@ public void consumerToSendScUserEvent(ChangeStreamDocument<UserInstitution> docu
assert document.getDocumentKey() != null;
UserInstitution userInstitutionChanged = document.getFullDocument();

log.info("Starting consumerUserInstitutionRepositoryEvent ... ");
log.info("Starting consumerToSendScUserEvent ... ");

userRegistryApi.findByIdUsingGET(USERS_FIELD_LIST_WITHOUT_FISCAL_CODE, userInstitutionChanged.getUserId())
.onFailure(this::checkIfIsRetryableException)
.retry().withBackOff(Duration.ofSeconds(retryMinBackOff), Duration.ofSeconds(retryMaxBackOff)).atMost(maxRetry)
.onItem().transformToUni(userResource -> Multi.createFrom().iterable(UserUtils.groupingProductAndReturnMinStateProduct(userInstitutionChanged.getProducts()))
.map(onboardedProduct -> notificationMapper.toUserNotificationToSend(userInstitutionChanged, onboardedProduct, userResource))
.onItem().transformToUniAndMerge(userNotificationToSend -> eventHubRestClient.sendMessage(userNotificationToSend)
.onItem().invoke(() -> telemetryClient.trackEvent(EVENT_USER_CDC_NAME, mapPropsForTrackEvent(toTrackEventInput(userInstitutionChanged, userNotificationToSend.getProductId())), Map.of(EVENTS_USER_INSTITUTION_PRODUCT_SUCCESS, 1D)))
.onFailure().invoke(() -> telemetryClient.trackEvent(EVENT_USER_CDC_NAME, mapPropsForTrackEvent(toTrackEventInput(userInstitutionChanged, userNotificationToSend.getProductId())), Map.of(EVENTS_USER_INSTITUTION_PRODUCT_FAILURE, 1D))))
.onFailure().retry().withBackOff(Duration.ofSeconds(retryMinBackOff), Duration.ofSeconds(retryMaxBackOff)).atMost(maxRetry)
.onItem().invoke(() -> telemetryClient.trackEvent(EVENT_USER_CDC_NAME, mapPropsForTrackEvent(toTrackEventInput(userNotificationToSend)), Map.of(EVENTS_USER_INSTITUTION_PRODUCT_SUCCESS, 1D)))
.onFailure().invoke(() -> telemetryClient.trackEvent(EVENT_USER_CDC_NAME, mapPropsForTrackEvent(toTrackEventInput(userNotificationToSend)), Map.of(EVENTS_USER_INSTITUTION_PRODUCT_FAILURE, 1D))))
.toUni()
)
.subscribe().with(
result -> {
log.info("SendEvents successfully performed from UserInstitution document having id: {}", document.getDocumentKey().toJson());
telemetryClient.trackEvent(EVENT_USER_CDC_NAME, mapPropsForTrackEvent(toTrackEventInput(userInstitutionChanged, null)), Map.of(EVENTS_USER_INSTITUTION_SUCCESS, 1D));
telemetryClient.trackEvent(EVENT_USER_CDC_NAME, mapPropsForTrackEvent(toTrackEventInputByUserInstitution(userInstitutionChanged)), Map.of(EVENTS_USER_INSTITUTION_SUCCESS, 1D));
},
failure -> {
log.error("Error during SendEvents from UserInstitution document having id: {} , message: {}", document.getDocumentKey().toJson(), failure.getMessage());
telemetryClient.trackEvent(EVENT_USER_CDC_NAME, mapPropsForTrackEvent(toTrackEventInput(userInstitutionChanged, null)), Map.of(EVENTS_USER_INSTITUTION_FAILURE, 1D));
telemetryClient.trackEvent(EVENT_USER_CDC_NAME, mapPropsForTrackEvent(toTrackEventInputByUserInstitution(userInstitutionChanged)), Map.of(EVENTS_USER_INSTITUTION_FAILURE, 1D));
});
}

Expand All @@ -219,12 +221,11 @@ private boolean checkIfIsRetryableException(Throwable throwable) {
(throwable instanceof WebApplicationException webApplicationException && webApplicationException.getResponse().getStatus() == 429);
}

private TrackEventInput toTrackEventInput(UserInstitution userInstitution, String productId) {
private TrackEventInput toTrackEventInputByUserInstitution(UserInstitution userInstitution) {
return TrackEventInput.builder()
.documentKey(userInstitution.getInstitutionId())
.documentKey(userInstitution.getId().toHexString())
.userId(userInstitution.getUserId())
.institutionId(userInstitution.getInstitutionId())
.productId(productId)
.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@

import static it.pagopa.selfcare.user.UserUtils.mapPropsForTrackEvent;
import static it.pagopa.selfcare.user.constant.TemplateMailConstant.*;
import static it.pagopa.selfcare.user.model.TrackEventInput.toTrackEventInput;
import static it.pagopa.selfcare.user.model.constants.EventsMetric.EVENTS_USER_INSTITUTION_PRODUCT_FAILURE;
import static it.pagopa.selfcare.user.model.constants.EventsMetric.EVENTS_USER_INSTITUTION_PRODUCT_SUCCESS;
import static it.pagopa.selfcare.user.model.constants.EventsName.EVENT_USER_MS_NAME;
Expand Down Expand Up @@ -69,13 +70,7 @@ public Uni<UserNotificationToSend> sendKafkaNotification(UserNotificationToSend
}

private Runnable trackTelemetryEvent(UserNotificationToSend userNotificationToSend, String metricsName) {
TrackEventInput trackEventInput = TrackEventInput.builder()
.documentKey(userNotificationToSend.getInstitutionId())
.userId(Optional.ofNullable(userNotificationToSend.getUser()).map(UserToNotify::getUserId).orElse(null))
.institutionId(userNotificationToSend.getInstitutionId())
.productId(userNotificationToSend.getProductId())
.build();
return () -> telemetryClient.trackEvent(EVENT_USER_MS_NAME, mapPropsForTrackEvent(trackEventInput), Map.of(metricsName, 1D));
return () -> telemetryClient.trackEvent(EVENT_USER_MS_NAME, mapPropsForTrackEvent(toTrackEventInput(userNotificationToSend)), Map.of(metricsName, 1D));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -563,7 +563,7 @@ public Uni<Void> sendEventsByDateAndUserIdAndInstitutionId(LocalDateTime fromDat
String userIdToUse = userId != null ? userId : userInstitution.getUserId();
Uni<UserResource> userResourceUni = userRegistryService.findByIdUsingGET(USERS_FIELD_LIST_WITHOUT_FISCAL_CODE, userIdToUse);
TrackEventInput trackEventInput = TrackEventInput.builder()
.documentKey(userInstitution.getInstitutionId())
.documentKey(userInstitution.getId().toHexString())
.userId(userIdToUse)
.institutionId(userInstitution.getInstitutionId())
.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ public static Map<String, String> mapPropsForTrackEvent(TrackEventInput trackEve
Optional.ofNullable(trackEventInput.getUserId()).ifPresent(value -> propertiesMap.put("userId", value));
Optional.ofNullable(trackEventInput.getProductId()).ifPresent(value -> propertiesMap.put("productId", value));
Optional.ofNullable(trackEventInput.getInstitutionId()).ifPresent(value -> propertiesMap.put("institutionId", value));
Optional.ofNullable(trackEventInput.getProductRole()).ifPresent(value -> propertiesMap.put("productRole", value));
Optional.ofNullable(trackEventInput.getException()).ifPresent(value -> propertiesMap.put("exec", value));
return propertiesMap;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,26 @@
import lombok.Builder;
import lombok.Data;

import java.util.Optional;

@Data
@Builder(toBuilder = true)
public class TrackEventInput {

private String documentKey;
private String userId;
private String productId;

private String institutionId;

private String exception;
private String productRole;

public static TrackEventInput toTrackEventInput(UserNotificationToSend userNotificationToSend) {
return TrackEventInput.builder()
.documentKey(userNotificationToSend.getId())
.userId(Optional.ofNullable(userNotificationToSend.getUser()).map(UserToNotify::getUserId).orElse(null))
.institutionId(userNotificationToSend.getInstitutionId())
.productId(userNotificationToSend.getProductId())
.productRole(Optional.ofNullable(userNotificationToSend.getUser()).map(UserToNotify::getProductRole).orElse(null))
.build();
}
}

0 comments on commit dc8771d

Please sign in to comment.