Commit b810d44
RHCLOUD-29896 Change the OCM email sender name based on the source env (RedHatInsights#2389)

* RHCLOUD-29896 Change the OCM email sender name based on the source env

* Fix test

* Prevent NPEs

* Apply review suggestion
gwenneg authored Dec 8, 2023
1 parent 15598df commit b810d44
Showing 9 changed files with 193 additions and 55 deletions.
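For context, the sender selection below keys off a new rh-source-environment Kafka header that emitting applications set on their ingress messages. A minimal sketch, assuming a plain Apache Kafka producer, of how an emitter might attach that header; the broker address, topic name, and payload are illustrative placeholders, not part of this change:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.nio.charset.StandardCharsets;
import java.util.Properties;

public class SourceEnvironmentHeaderExample {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // "ingress-topic" is a placeholder for the notifications ingress topic.
            ProducerRecord<String, String> record = new ProducerRecord<>("ingress-topic", "{\"bundle\":\"openshift\", ...}");
            // Header read by EventConsumer / KafkaHeadersExtractor in this change.
            record.headers().add("rh-source-environment", "stage".getBytes(StandardCharsets.UTF_8));
            producer.send(record);
        }
    }
}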
@@ -17,6 +17,7 @@
import java.util.Date;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;

@@ -74,15 +75,20 @@ public class Event {

private String renderedDrawerNotification;

private String sourceEnvironment;

@Transient
private EventWrapper<?, ?> eventWrapper;

public Event() { }

public Event(EventType eventType, String payload, EventWrapper<?, ?> eventWrapper) {
public Event(EventType eventType, String payload, EventWrapper<?, ?> eventWrapper, Optional<String> sourceEnvironment) {
this(eventWrapper.getAccountId(), eventWrapper.getOrgId(), eventType, eventWrapper.getId());
this.payload = payload;
this.eventWrapper = eventWrapper;
if (sourceEnvironment.isPresent()) {
this.sourceEnvironment = sourceEnvironment.get();
}
}

public Event(String accountId, String orgId, EventType eventType, UUID eventId) {
@@ -236,6 +242,14 @@ public void setRenderedDrawerNotification(String renderedDrawerNotification) {
this.renderedDrawerNotification = renderedDrawerNotification;
}

public String getSourceEnvironment() {
return sourceEnvironment;
}

public void setSourceEnvironment(String sourceEnvironment) {
this.sourceEnvironment = sourceEnvironment;
}

@Override
public boolean equals(Object o) {
if (this == o) {
@@ -0,0 +1,2 @@
ALTER TABLE event
ADD COLUMN source_environment TEXT;
@@ -49,6 +49,7 @@ public class EventConsumer {
static final String TAG_KEY_EVENT_TYPE_FQN = "event-type-fqn";

private static final String EVENT_TYPE_NOT_FOUND_MSG = "No event type found for key: %s";
private static final String SOURCE_ENVIRONMENT_HEADER = "rh-source-environment";

@Inject
MeterRegistry registry;
@@ -71,6 +72,9 @@ public class EventConsumer {
@Inject
CloudEventTransformerFactory cloudEventTransformerFactory;

@Inject
KafkaHeadersExtractor kafkaHeadersExtractor;

ConsoleCloudEventParser cloudEventParser = new ConsoleCloudEventParser();

private Counter rejectedCounter;
@@ -107,14 +111,19 @@ public CompletionStage<Void> process(Message<String> message) {
* we now have a bundle/app/eventType triplet or a fully qualified name for the event type.
*/

Map<String, Optional<String>> kafkaHeaders = kafkaHeadersExtractor.extract(message,
MESSAGE_ID_HEADER,
SOURCE_ENVIRONMENT_HEADER
);

/*
 * Step 2
 * The message ID is extracted from the event data; if it is not present, we fall back to the Kafka headers.
 * It can be null for now to give the onboarded apps time to change their integration and start sending the
 * new header. The message ID will become mandatory with cloud events. We may want to throw an exception when
 * it is null.
 */
final UUID messageId = getMessageId(eventWrapper, message);
final UUID messageId = getMessageId(eventWrapper, kafkaHeaders.get(MESSAGE_ID_HEADER));

String msgId = messageId == null ? "null" : messageId.toString();
Log.infof("Processing received event [id=%s, %s=%s, orgId=%s, %s]",
@@ -178,7 +187,8 @@ public CompletionStage<Void> process(Message<String> message) {
* Step 6
* The EventType was found. It's time to create an Event from the current message and persist it.
*/
Event event = new Event(eventType, payload, eventWrapperToProcess);
Optional<String> sourceEnvironmentHeader = kafkaHeaders.get(SOURCE_ENVIRONMENT_HEADER);
Event event = new Event(eventType, payload, eventWrapperToProcess, sourceEnvironmentHeader);
if (event.getId() == null) {
// NOTIF-499 If there is no ID provided whatsoever we create one.
event.setId(Objects.requireNonNullElseGet(messageId, UUID::randomUUID));
@@ -241,10 +251,10 @@ public CompletionStage<Void> process(Message<String> message) {
}
}

private UUID getMessageId(EventWrapper<?, ?> eventWrapper, Message<String> message) {
private UUID getMessageId(EventWrapper<?, ?> eventWrapper, Optional<String> messageIdHeader) {
UUID messageId = eventWrapper.getId();
if (messageId == null) {
messageId = kafkaMessageDeduplicator.findMessageId(eventWrapper.getKey(), message);
messageId = kafkaMessageDeduplicator.validateMessageId(eventWrapper.getKey(), messageIdHeader);
}

return messageId;
@@ -0,0 +1,48 @@
package com.redhat.cloud.notifications.events;

import io.quarkus.logging.Log;
import io.smallrye.reactive.messaging.kafka.api.KafkaMessageMetadata;
import jakarta.enterprise.context.ApplicationScoped;
import org.apache.kafka.common.header.Header;
import org.eclipse.microprofile.reactive.messaging.Message;

import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Optional;

import static java.nio.charset.StandardCharsets.UTF_8;

@ApplicationScoped
public class KafkaHeadersExtractor {

public <T> Map<String, Optional<String>> extract(Message<T> message, String... headerKeys) {
Map<String, Optional<String>> headers = new HashMap<>();
if (headerKeys.length > 0) {
Optional<KafkaMessageMetadata> metadata = message.getMetadata(KafkaMessageMetadata.class);
if (metadata.isPresent()) {
for (String headerKey : headerKeys) {
Iterator<Header> headerValues = metadata.get().getHeaders().headers(headerKey).iterator();
if (headerValues.hasNext()) {
Header header = headerValues.next();
if (header.value() != null) {
String headerValue = new String(header.value(), UTF_8);
headers.put(headerKey, Optional.of(headerValue));
}
if (headerValues.hasNext()) {
Log.warnf(
"Processed a Kafka payload with multiple [%s] header values. The emitter of that payload must change their integration and send only one value. Payload: %s",
headerKey,
message.getPayload());
}
}
}
}
}
// The returned Map always contains all header keys to prevent NPE.
for (String headerKey : headerKeys) {
headers.putIfAbsent(headerKey, Optional.empty());
}
return headers;
}
}
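For reference, a minimal call-site sketch mirroring how EventConsumer above uses this extractor (the variable names are illustrative):

Map<String, Optional<String>> headers = kafkaHeadersExtractor.extract(message, "rh-message-id", "rh-source-environment");
// Every requested key is guaranteed to be present in the map, so get() never returns null.
Optional<String> sourceEnvironment = headers.get("rh-source-environment"); // Optional.empty() when the header is absent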
@@ -5,22 +5,16 @@
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.MeterRegistry;
import io.quarkus.logging.Log;
import io.smallrye.reactive.messaging.kafka.api.KafkaMessageMetadata;
import jakarta.annotation.PostConstruct;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import jakarta.persistence.EntityManager;
import jakarta.persistence.NoResultException;
import jakarta.transaction.Transactional;
import org.apache.kafka.common.header.Header;
import org.eclipse.microprofile.reactive.messaging.Message;

import java.util.Iterator;
import java.util.Optional;
import java.util.UUID;

import static java.nio.charset.StandardCharsets.UTF_8;

@ApplicationScoped
public class KafkaMessageDeduplicator {

@@ -49,47 +43,30 @@ void initCounters() {
}

/**
* Extracts the message ID from a Kafka message header value. If multiple header values are available, the first one
* Validates the message ID retrieved from the Kafka message headers. If multiple header values are available, the first one
* will be used and the other ones will be ignored. An invalid header value will be counted and logged, but won't
* interrupt the message processing: the deduplication will be disabled for the message.
* @deprecated The rh-message-id header will be replaced by the cloud events or actions id field soon.
*/
public UUID findMessageId(EventTypeKey eventTypeKey, Message<String> message) {
boolean found = false;
Optional<KafkaMessageMetadata> metadata = message.getMetadata(KafkaMessageMetadata.class);
if (metadata.isPresent()) {
Iterator<Header> headers = metadata.get().getHeaders().headers(MESSAGE_ID_HEADER).iterator();
if (headers.hasNext()) {
found = true;
Header header = headers.next();
if (header.value() == null) {
invalidMessageIdCounter.increment();
Log.warnf("Application sent an EventType(%s) with an invalid Kafka header [%s=null]. They must change their " +
"integration and send a non-null value.", eventTypeKey, MESSAGE_ID_HEADER);
} else {
String headerValue = new String(header.value(), UTF_8);
try {
UUID messageId = UUID.fromString(headerValue);
// If the UUID version is 4, then its 15th character has to be "4".
if (!headerValue.substring(14, 15).equals(ACCEPTED_UUID_VERSION)) {
throw new IllegalArgumentException("Wrong UUID version received");
}
validMessageIdCounter.increment();
Log.tracef("Application sent an EventType(%s) with a valid Kafka header [%s=%s]",
eventTypeKey, MESSAGE_ID_HEADER, headerValue);
return messageId;
} catch (IllegalArgumentException e) {
invalidMessageIdCounter.increment();
Log.warnf("Application sent an EventType(%s) with an invalid Kafka header [%s=%s]. They must change their " +
"integration and send a valid UUID (version 4).", eventTypeKey, MESSAGE_ID_HEADER, headerValue);
}
@Deprecated(forRemoval = true)
public UUID validateMessageId(EventTypeKey eventTypeKey, Optional<String> messageIdHeader) {
if (messageIdHeader.isPresent()) {
try {
UUID messageId = UUID.fromString(messageIdHeader.get());
// If the UUID version is 4, then its 15th character has to be "4".
if (!messageIdHeader.get().substring(14, 15).equals(ACCEPTED_UUID_VERSION)) {
throw new IllegalArgumentException("Wrong UUID version received");
}
validMessageIdCounter.increment();
Log.tracef("Application sent an EventType(%s) with a valid Kafka header [%s=%s]",
eventTypeKey, MESSAGE_ID_HEADER, messageIdHeader.get());
return messageId;
} catch (IllegalArgumentException e) {
invalidMessageIdCounter.increment();
Log.warnf("Application sent an EventType(%s) with an invalid Kafka header [%s=%s]. They must change their " +
"integration and send a valid UUID (version 4).", eventTypeKey, MESSAGE_ID_HEADER, messageIdHeader.get());
}
if (headers.hasNext()) {
Log.warnf("Application sent an EventType(%s) with multiple Kafka headers [%s]. They must change their " +
"integration and send only one value.", eventTypeKey, MESSAGE_ID_HEADER);
}
}
if (!found) {
} else {
missingMessageIdCounter.increment();
Log.tracef("Application sent an EventType(%s) but did not send any Kafka header [%s]",
eventTypeKey, MESSAGE_ID_HEADER);
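A quick worked example of the version check above, using hypothetical header values; UUID.fromString parses both, but only the first passes the version-4 test on the character at index 14:

String v4 = "6f2a1c0e-5b7d-4e3a-9c1d-2f8b7a6e5d4c"; // substring(14, 15) is "4": valid counter incremented, ID returned
String v1 = "6f2a1c0e-5b7d-1e3a-9c1d-2f8b7a6e5d4c"; // substring(14, 15) is "1": IllegalArgumentException, invalid counter incremented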
@@ -11,7 +11,10 @@ public class EmailActorsResolver {
* ConsoleDot applications will use.
*/
public static final String RH_INSIGHTS_SENDER = "\"Red Hat Insights\" [email protected]";
public static final String OPENSHIFT_SENDER = "\"Red Hat OpenShift\" [email protected]";
public static final String OPENSHIFT_SENDER_STAGE = "\"Red Hat OpenShift (staging)\" [email protected]";
public static final String OPENSHIFT_SENDER_PROD = "\"Red Hat OpenShift\" [email protected]";

private static final String STAGE_ENVIRONMENT = "stage";

/**
* Determines which sender should be set in the email from the given event.
@@ -26,7 +29,11 @@ public String getEmailSender(final Event event) {
String bundle = event.getEventType().getApplication().getBundle().getName();
String application = event.getEventType().getApplication().getName();
if ("openshift".equals(bundle) && "cluster-manager".equals(application)) {
return OPENSHIFT_SENDER;
if (STAGE_ENVIRONMENT.equals(event.getSourceEnvironment())) {
return OPENSHIFT_SENDER_STAGE;
} else {
return OPENSHIFT_SENDER_PROD;
}
} else {
return RH_INSIGHTS_SENDER;
}
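To illustrate the resolution above (the values are hypothetical; any source environment other than "stage", including null, falls through to the production sender):

// For an event whose bundle/application pair is "openshift"/"cluster-manager":
event.setSourceEnvironment("stage"); // getEmailSender(event) returns OPENSHIFT_SENDER_STAGE
event.setSourceEnvironment("prod");  // getEmailSender(event) returns OPENSHIFT_SENDER_PROD
// For any other bundle/application pair, getEmailSender(event) returns RH_INSIGHTS_SENDER.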
@@ -287,14 +287,14 @@ void testNullMessageId() {
TAG_KEY_APPLICATION, action.getApplication(),
TAG_KEY_EVENT_TYPE_FQN, ""
).count());
micrometerAssertionHelper.assertCounterIncrement(MESSAGE_ID_INVALID_COUNTER_NAME, 1);
micrometerAssertionHelper.assertCounterIncrement(MESSAGE_ID_MISSING_COUNTER_NAME, 1);
assertNoCounterIncrement(
REJECTED_COUNTER_NAME,
PROCESSING_ERROR_COUNTER_NAME,
PROCESSING_EXCEPTION_COUNTER_NAME,
DUPLICATE_COUNTER_NAME,
MESSAGE_ID_VALID_COUNTER_NAME,
MESSAGE_ID_MISSING_COUNTER_NAME
MESSAGE_ID_INVALID_COUNTER_NAME
);
verifyExactlyOneProcessing(eventType, payload, action, false);
verify(kafkaMessageDeduplicator, times(1)).registerMessageId(null);
@@ -0,0 +1,63 @@
package com.redhat.cloud.notifications.events;

import io.quarkus.test.junit.QuarkusTest;
import io.smallrye.reactive.messaging.kafka.api.OutgoingKafkaRecordMetadata;
import jakarta.inject.Inject;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.junit.jupiter.api.Test;

import java.util.Map;
import java.util.Optional;
import java.util.UUID;

import static java.nio.charset.StandardCharsets.UTF_8;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;

@QuarkusTest
public class KafkaHeadersExtractorTest {

@Inject
KafkaHeadersExtractor kafkaHeadersExtractor;

@Test
void testWithNoHeaderKeys() {
assertTrue(kafkaHeadersExtractor.extract(Message.of("{}")).isEmpty());
}

@Test
void test() {
String messageId = UUID.randomUUID().toString();
String sourceEnvironment = "stage";

Message<String> message = buildMessageWithHeaders(Map.of(
"rh-message-id", messageId,
"rh-source-environment", sourceEnvironment,
"rh-unused-header", "whatever"
));

Map<String, Optional<String>> extractedHeaders = kafkaHeadersExtractor.extract(message,
"rh-message-id",
"rh-unknown-header",
"rh-source-environment"
);

assertEquals(messageId, extractedHeaders.get("rh-message-id").get());
assertTrue(extractedHeaders.get("rh-unknown-header").isEmpty());
assertEquals(sourceEnvironment, extractedHeaders.get("rh-source-environment").get());
}

private static Message<String> buildMessageWithHeaders(Map<String, String> headers) {
Headers recordHeaders = new RecordHeaders();
for (Map.Entry<String, String> header : headers.entrySet()) {
recordHeaders.add(header.getKey(), header.getValue().getBytes(UTF_8));
}
OutgoingKafkaRecordMetadata<String> metadata = OutgoingKafkaRecordMetadata.<String>builder()
.withHeaders(recordHeaders)
.build();
return Message.of("{}")
.addMetadata(metadata);
}
}