From b810d44a371554bf0fe7e4201594ced27fcddbf8 Mon Sep 17 00:00:00 2001 From: Gwenneg Lepage Date: Fri, 8 Dec 2023 16:07:23 +0100 Subject: [PATCH] RHCLOUD-29896 Change the OCM email sender name based on the source env (#2389) * RHCLOUD-29896 Change the OCM email sender name based on the source env * Fix test * Prevent NPEs * Apply review suggestion --- .../cloud/notifications/models/Event.java | 16 ++++- ...0.0__RHCLOUD-29896_source_env_in_event.sql | 2 + .../notifications/events/EventConsumer.java | 18 ++++-- .../events/KafkaHeadersExtractor.java | 48 ++++++++++++++ .../events/KafkaMessageDeduplicator.java | 61 ++++++------------ .../processors/email/EmailActorsResolver.java | 11 +++- .../events/EventConsumerTest.java | 4 +- .../events/KafkaHeadersExtractorTest.java | 63 +++++++++++++++++++ .../email/EmailActorsResolverTest.java | 25 ++++++-- 9 files changed, 193 insertions(+), 55 deletions(-) create mode 100644 database/src/main/resources/db/migration/V1.90.0__RHCLOUD-29896_source_env_in_event.sql create mode 100644 engine/src/main/java/com/redhat/cloud/notifications/events/KafkaHeadersExtractor.java create mode 100644 engine/src/test/java/com/redhat/cloud/notifications/events/KafkaHeadersExtractorTest.java diff --git a/common/src/main/java/com/redhat/cloud/notifications/models/Event.java b/common/src/main/java/com/redhat/cloud/notifications/models/Event.java index 4361130bf7..ff2b6a69e2 100644 --- a/common/src/main/java/com/redhat/cloud/notifications/models/Event.java +++ b/common/src/main/java/com/redhat/cloud/notifications/models/Event.java @@ -17,6 +17,7 @@ import java.util.Date; import java.util.Map; import java.util.Objects; +import java.util.Optional; import java.util.Set; import java.util.UUID; @@ -74,15 +75,20 @@ public class Event { private String renderedDrawerNotification; + private String sourceEnvironment; + @Transient private EventWrapper eventWrapper; public Event() { } - public Event(EventType eventType, String payload, EventWrapper eventWrapper) { + public Event(EventType eventType, String payload, EventWrapper eventWrapper, Optional sourceEnvironment) { this(eventWrapper.getAccountId(), eventWrapper.getOrgId(), eventType, eventWrapper.getId()); this.payload = payload; this.eventWrapper = eventWrapper; + if (sourceEnvironment.isPresent()) { + this.sourceEnvironment = sourceEnvironment.get(); + } } public Event(String accountId, String orgId, EventType eventType, UUID eventId) { @@ -236,6 +242,14 @@ public void setRenderedDrawerNotification(String renderedDrawerNotification) { this.renderedDrawerNotification = renderedDrawerNotification; } + public String getSourceEnvironment() { + return sourceEnvironment; + } + + public void setSourceEnvironment(String sourceEnvironment) { + this.sourceEnvironment = sourceEnvironment; + } + @Override public boolean equals(Object o) { if (this == o) { diff --git a/database/src/main/resources/db/migration/V1.90.0__RHCLOUD-29896_source_env_in_event.sql b/database/src/main/resources/db/migration/V1.90.0__RHCLOUD-29896_source_env_in_event.sql new file mode 100644 index 0000000000..2acd383454 --- /dev/null +++ b/database/src/main/resources/db/migration/V1.90.0__RHCLOUD-29896_source_env_in_event.sql @@ -0,0 +1,2 @@ +ALTER TABLE event + ADD COLUMN source_environment TEXT; diff --git a/engine/src/main/java/com/redhat/cloud/notifications/events/EventConsumer.java b/engine/src/main/java/com/redhat/cloud/notifications/events/EventConsumer.java index 6dc3efe226..7da18858c7 100644 --- a/engine/src/main/java/com/redhat/cloud/notifications/events/EventConsumer.java +++ b/engine/src/main/java/com/redhat/cloud/notifications/events/EventConsumer.java @@ -49,6 +49,7 @@ public class EventConsumer { static final String TAG_KEY_EVENT_TYPE_FQN = "event-type-fqn"; private static final String EVENT_TYPE_NOT_FOUND_MSG = "No event type found for key: %s"; + private static final String SOURCE_ENVIRONMENT_HEADER = "rh-source-environment"; @Inject MeterRegistry registry; @@ -71,6 +72,9 @@ public class EventConsumer { @Inject CloudEventTransformerFactory cloudEventTransformerFactory; + @Inject + KafkaHeadersExtractor kafkaHeadersExtractor; + ConsoleCloudEventParser cloudEventParser = new ConsoleCloudEventParser(); private Counter rejectedCounter; @@ -107,6 +111,11 @@ public CompletionStage process(Message message) { * we now have a bundle/app/eventType triplet or a fully qualified name for the event type. */ + Map> kafkaHeaders = kafkaHeadersExtractor.extract(message, + MESSAGE_ID_HEADER, + SOURCE_ENVIRONMENT_HEADER + ); + /* * Step 2 * The message ID is extracted from the event data - if it is not present we fallback to the kafka headers @@ -114,7 +123,7 @@ public CompletionStage process(Message message) { * apps time to change their integration and start sending the new header. The message ID will become * mandatory with cloud events. We may want to throw an exception when it is null. */ - final UUID messageId = getMessageId(eventWrapper, message); + final UUID messageId = getMessageId(eventWrapper, kafkaHeaders.get(MESSAGE_ID_HEADER)); String msgId = messageId == null ? "null" : messageId.toString(); Log.infof("Processing received event [id=%s, %s=%s, orgId=%s, %s]", @@ -178,7 +187,8 @@ public CompletionStage process(Message message) { * Step 6 * The EventType was found. It's time to create an Event from the current message and persist it. */ - Event event = new Event(eventType, payload, eventWrapperToProcess); + Optional sourceEnvironmentHeader = kafkaHeaders.get(SOURCE_ENVIRONMENT_HEADER); + Event event = new Event(eventType, payload, eventWrapperToProcess, sourceEnvironmentHeader); if (event.getId() == null) { // NOTIF-499 If there is no ID provided whatsoever we create one. event.setId(Objects.requireNonNullElseGet(messageId, UUID::randomUUID)); @@ -241,10 +251,10 @@ public CompletionStage process(Message message) { } } - private UUID getMessageId(EventWrapper eventWrapper, Message message) { + private UUID getMessageId(EventWrapper eventWrapper, Optional messageIdHeader) { UUID messageId = eventWrapper.getId(); if (messageId == null) { - messageId = kafkaMessageDeduplicator.findMessageId(eventWrapper.getKey(), message); + messageId = kafkaMessageDeduplicator.validateMessageId(eventWrapper.getKey(), messageIdHeader); } return messageId; diff --git a/engine/src/main/java/com/redhat/cloud/notifications/events/KafkaHeadersExtractor.java b/engine/src/main/java/com/redhat/cloud/notifications/events/KafkaHeadersExtractor.java new file mode 100644 index 0000000000..6365280966 --- /dev/null +++ b/engine/src/main/java/com/redhat/cloud/notifications/events/KafkaHeadersExtractor.java @@ -0,0 +1,48 @@ +package com.redhat.cloud.notifications.events; + +import io.quarkus.logging.Log; +import io.smallrye.reactive.messaging.kafka.api.KafkaMessageMetadata; +import jakarta.enterprise.context.ApplicationScoped; +import org.apache.kafka.common.header.Header; +import org.eclipse.microprofile.reactive.messaging.Message; + +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.Optional; + +import static java.nio.charset.StandardCharsets.UTF_8; + +@ApplicationScoped +public class KafkaHeadersExtractor { + + public Map> extract(Message message, String... headerKeys) { + Map> headers = new HashMap<>(); + if (headerKeys.length > 0) { + Optional metadata = message.getMetadata(KafkaMessageMetadata.class); + if (metadata.isPresent()) { + for (String headerKey : headerKeys) { + Iterator
headerValues = metadata.get().getHeaders().headers(headerKey).iterator(); + if (headerValues.hasNext()) { + Header header = headerValues.next(); + if (header.value() != null) { + String headerValue = new String(header.value(), UTF_8); + headers.put(headerKey, Optional.of(headerValue)); + } + if (headerValues.hasNext()) { + Log.warnf( + "Processed a Kafka payload with multiple [%s] header values. The emitter of that payload must change their integration and send only one value. Payload: %s", + headerKey, + message.getPayload()); + } + } + } + } + } + // The returned Map always contains all header keys to prevent NPE. + for (String headerKey : headerKeys) { + headers.putIfAbsent(headerKey, Optional.empty()); + } + return headers; + } +} diff --git a/engine/src/main/java/com/redhat/cloud/notifications/events/KafkaMessageDeduplicator.java b/engine/src/main/java/com/redhat/cloud/notifications/events/KafkaMessageDeduplicator.java index 744c30bd1e..d74961cc01 100644 --- a/engine/src/main/java/com/redhat/cloud/notifications/events/KafkaMessageDeduplicator.java +++ b/engine/src/main/java/com/redhat/cloud/notifications/events/KafkaMessageDeduplicator.java @@ -5,22 +5,16 @@ import io.micrometer.core.instrument.Counter; import io.micrometer.core.instrument.MeterRegistry; import io.quarkus.logging.Log; -import io.smallrye.reactive.messaging.kafka.api.KafkaMessageMetadata; import jakarta.annotation.PostConstruct; import jakarta.enterprise.context.ApplicationScoped; import jakarta.inject.Inject; import jakarta.persistence.EntityManager; import jakarta.persistence.NoResultException; import jakarta.transaction.Transactional; -import org.apache.kafka.common.header.Header; -import org.eclipse.microprofile.reactive.messaging.Message; -import java.util.Iterator; import java.util.Optional; import java.util.UUID; -import static java.nio.charset.StandardCharsets.UTF_8; - @ApplicationScoped public class KafkaMessageDeduplicator { @@ -49,47 +43,30 @@ void initCounters() { } /** - * Extracts the message ID from a Kafka message header value. If multiple header values are available, the first one + * Validates the message ID retrieved from the Kafka message headers. If multiple header values are available, the first one * will be used and the other ones will be ignored. An invalid header value will be counted and logged, but won't * interrupt the message processing: the deduplication will be disabled for the message. + * @deprecated The rh-message-id header will be replaced by the cloud events or actions id field soon. */ - public UUID findMessageId(EventTypeKey eventTypeKey, Message message) { - boolean found = false; - Optional metadata = message.getMetadata(KafkaMessageMetadata.class); - if (metadata.isPresent()) { - Iterator
headers = metadata.get().getHeaders().headers(MESSAGE_ID_HEADER).iterator(); - if (headers.hasNext()) { - found = true; - Header header = headers.next(); - if (header.value() == null) { - invalidMessageIdCounter.increment(); - Log.warnf("Application sent an EventType(%s) with an invalid Kafka header [%s=null]. They must change their " + - "integration and send a non-null value.", eventTypeKey, MESSAGE_ID_HEADER); - } else { - String headerValue = new String(header.value(), UTF_8); - try { - UUID messageId = UUID.fromString(headerValue); - // If the UUID version is 4, then its 15th character has to be "4". - if (!headerValue.substring(14, 15).equals(ACCEPTED_UUID_VERSION)) { - throw new IllegalArgumentException("Wrong UUID version received"); - } - validMessageIdCounter.increment(); - Log.tracef("Application sent an EventType(%s) with a valid Kafka header [%s=%s]", - eventTypeKey, MESSAGE_ID_HEADER, headerValue); - return messageId; - } catch (IllegalArgumentException e) { - invalidMessageIdCounter.increment(); - Log.warnf("Application sent an EventType(%s) with an invalid Kafka header [%s=%s]. They must change their " + - "integration and send a valid UUID (version 4).", eventTypeKey, MESSAGE_ID_HEADER, headerValue); - } + @Deprecated(forRemoval = true) + public UUID validateMessageId(EventTypeKey eventTypeKey, Optional messageIdHeader) { + if (messageIdHeader.isPresent()) { + try { + UUID messageId = UUID.fromString(messageIdHeader.get()); + // If the UUID version is 4, then its 15th character has to be "4". + if (!messageIdHeader.get().substring(14, 15).equals(ACCEPTED_UUID_VERSION)) { + throw new IllegalArgumentException("Wrong UUID version received"); } + validMessageIdCounter.increment(); + Log.tracef("Application sent an EventType(%s) with a valid Kafka header [%s=%s]", + eventTypeKey, MESSAGE_ID_HEADER, messageIdHeader.get()); + return messageId; + } catch (IllegalArgumentException e) { + invalidMessageIdCounter.increment(); + Log.warnf("Application sent an EventType(%s) with an invalid Kafka header [%s=%s]. They must change their " + + "integration and send a valid UUID (version 4).", eventTypeKey, MESSAGE_ID_HEADER, messageIdHeader.get()); } - if (headers.hasNext()) { - Log.warnf("Application sent an EventType(%s) with multiple Kafka headers [%s]. They must change their " + - "integration and send only one value.", eventTypeKey, MESSAGE_ID_HEADER); - } - } - if (!found) { + } else { missingMessageIdCounter.increment(); Log.tracef("Application sent an EventType(%s) but did not send any Kafka header [%s]", eventTypeKey, MESSAGE_ID_HEADER); diff --git a/engine/src/main/java/com/redhat/cloud/notifications/processors/email/EmailActorsResolver.java b/engine/src/main/java/com/redhat/cloud/notifications/processors/email/EmailActorsResolver.java index 7bf9f446ac..df5f87aa3a 100644 --- a/engine/src/main/java/com/redhat/cloud/notifications/processors/email/EmailActorsResolver.java +++ b/engine/src/main/java/com/redhat/cloud/notifications/processors/email/EmailActorsResolver.java @@ -11,7 +11,10 @@ public class EmailActorsResolver { * ConsoleDot applications will use. */ public static final String RH_INSIGHTS_SENDER = "\"Red Hat Insights\" noreply@redhat.com"; - public static final String OPENSHIFT_SENDER = "\"Red Hat OpenShift\" no-reply@openshift.com"; + public static final String OPENSHIFT_SENDER_STAGE = "\"Red Hat OpenShift (staging)\" no-reply@openshift.com"; + public static final String OPENSHIFT_SENDER_PROD = "\"Red Hat OpenShift\" no-reply@openshift.com"; + + private static final String STAGE_ENVIRONMENT = "stage"; /** * Determines which sender should be set in the email from the given event. @@ -26,7 +29,11 @@ public String getEmailSender(final Event event) { String bundle = event.getEventType().getApplication().getBundle().getName(); String application = event.getEventType().getApplication().getName(); if ("openshift".equals(bundle) && "cluster-manager".equals(application)) { - return OPENSHIFT_SENDER; + if (STAGE_ENVIRONMENT.equals(event.getSourceEnvironment())) { + return OPENSHIFT_SENDER_STAGE; + } else { + return OPENSHIFT_SENDER_PROD; + } } else { return RH_INSIGHTS_SENDER; } diff --git a/engine/src/test/java/com/redhat/cloud/notifications/events/EventConsumerTest.java b/engine/src/test/java/com/redhat/cloud/notifications/events/EventConsumerTest.java index 15f7c7942d..18ea87c596 100644 --- a/engine/src/test/java/com/redhat/cloud/notifications/events/EventConsumerTest.java +++ b/engine/src/test/java/com/redhat/cloud/notifications/events/EventConsumerTest.java @@ -287,14 +287,14 @@ void testNullMessageId() { TAG_KEY_APPLICATION, action.getApplication(), TAG_KEY_EVENT_TYPE_FQN, "" ).count()); - micrometerAssertionHelper.assertCounterIncrement(MESSAGE_ID_INVALID_COUNTER_NAME, 1); + micrometerAssertionHelper.assertCounterIncrement(MESSAGE_ID_MISSING_COUNTER_NAME, 1); assertNoCounterIncrement( REJECTED_COUNTER_NAME, PROCESSING_ERROR_COUNTER_NAME, PROCESSING_EXCEPTION_COUNTER_NAME, DUPLICATE_COUNTER_NAME, MESSAGE_ID_VALID_COUNTER_NAME, - MESSAGE_ID_MISSING_COUNTER_NAME + MESSAGE_ID_INVALID_COUNTER_NAME ); verifyExactlyOneProcessing(eventType, payload, action, false); verify(kafkaMessageDeduplicator, times(1)).registerMessageId(null); diff --git a/engine/src/test/java/com/redhat/cloud/notifications/events/KafkaHeadersExtractorTest.java b/engine/src/test/java/com/redhat/cloud/notifications/events/KafkaHeadersExtractorTest.java new file mode 100644 index 0000000000..eb596eeef4 --- /dev/null +++ b/engine/src/test/java/com/redhat/cloud/notifications/events/KafkaHeadersExtractorTest.java @@ -0,0 +1,63 @@ +package com.redhat.cloud.notifications.events; + +import io.quarkus.test.junit.QuarkusTest; +import io.smallrye.reactive.messaging.kafka.api.OutgoingKafkaRecordMetadata; +import jakarta.inject.Inject; +import org.apache.kafka.common.header.Headers; +import org.apache.kafka.common.header.internals.RecordHeaders; +import org.eclipse.microprofile.reactive.messaging.Message; +import org.junit.jupiter.api.Test; + +import java.util.Map; +import java.util.Optional; +import java.util.UUID; + +import static java.nio.charset.StandardCharsets.UTF_8; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +@QuarkusTest +public class KafkaHeadersExtractorTest { + + @Inject + KafkaHeadersExtractor kafkaHeadersExtractor; + + @Test + void testWithNoHeaderKeys() { + assertTrue(kafkaHeadersExtractor.extract(Message.of("{}")).isEmpty()); + } + + @Test + void test() { + String messageId = UUID.randomUUID().toString(); + String sourceEnvironment = "stage"; + + Message message = buildMessageWithHeaders(Map.of( + "rh-message-id", messageId, + "rh-source-environment", sourceEnvironment, + "rh-unused-header", "whatever" + )); + + Map> extractedHeaders = kafkaHeadersExtractor.extract(message, + "rh-message-id", + "rh-unknown-header", + "rh-source-environment" + ); + + assertEquals(messageId, extractedHeaders.get("rh-message-id").get()); + assertTrue(extractedHeaders.get("rh-unknown-header").isEmpty()); + assertEquals(sourceEnvironment, extractedHeaders.get("rh-source-environment").get()); + } + + private static Message buildMessageWithHeaders(Map headers) { + Headers recordHeaders = new RecordHeaders(); + for (Map.Entry header : headers.entrySet()) { + recordHeaders.add(header.getKey(), header.getValue().getBytes(UTF_8)); + } + OutgoingKafkaRecordMetadata metadata = OutgoingKafkaRecordMetadata.builder() + .withHeaders(recordHeaders) + .build(); + return Message.of("{}") + .addMetadata(metadata); + } +} diff --git a/engine/src/test/java/com/redhat/cloud/notifications/processors/email/EmailActorsResolverTest.java b/engine/src/test/java/com/redhat/cloud/notifications/processors/email/EmailActorsResolverTest.java index 72f2e6a62b..2a7ad54ecf 100644 --- a/engine/src/test/java/com/redhat/cloud/notifications/processors/email/EmailActorsResolverTest.java +++ b/engine/src/test/java/com/redhat/cloud/notifications/processors/email/EmailActorsResolverTest.java @@ -8,12 +8,14 @@ import jakarta.inject.Inject; import org.junit.jupiter.api.Test; -import static com.redhat.cloud.notifications.processors.email.EmailActorsResolver.OPENSHIFT_SENDER; +import static com.redhat.cloud.notifications.processors.email.EmailActorsResolver.OPENSHIFT_SENDER_PROD; +import static com.redhat.cloud.notifications.processors.email.EmailActorsResolver.OPENSHIFT_SENDER_STAGE; import static com.redhat.cloud.notifications.processors.email.EmailActorsResolver.RH_INSIGHTS_SENDER; import static org.junit.jupiter.api.Assertions.assertEquals; @QuarkusTest public class EmailActorsResolverTest { + @Inject EmailActorsResolver emailActorsResolver; @@ -27,10 +29,24 @@ void testDefaultEmailSender() { } /** - * Tests that the OpenShift sender is returned for OCM events. + * Tests that the OpenShift sender is returned for OCM events originating from the stage source environment. + */ + @Test + void testOpenshiftClusterManagerStageEmailSender() { + Event event = buildOCMEvent("stage"); + assertEquals(OPENSHIFT_SENDER_STAGE, this.emailActorsResolver.getEmailSender(event), "unexpected email sender returned from the function under test"); + } + + /** + * Tests that the OpenShift sender is returned for OCM events originating from source environments other than stage. */ @Test - void testOpenshiftClusterManagerEmailSender() { + void testOpenshiftClusterManagerDefaultEmailSender() { + Event event = buildOCMEvent("prod"); + assertEquals(OPENSHIFT_SENDER_PROD, this.emailActorsResolver.getEmailSender(event), "unexpected email sender returned from the function under test"); + } + + private static Event buildOCMEvent(String sourceEnvironment) { Bundle bundle = new Bundle(); bundle.setName("openshift"); @@ -44,7 +60,8 @@ void testOpenshiftClusterManagerEmailSender() { Event event = new Event(); event.setEventType(eventType); + event.setSourceEnvironment(sourceEnvironment); - assertEquals(OPENSHIFT_SENDER, this.emailActorsResolver.getEmailSender(event), "unexpected email sender returned from the function under test"); + return event; } }