From e02bf98e04dfe847609271823129da3f3686e217 Mon Sep 17 00:00:00 2001 From: Francisco Javier Tirado Sarti Date: Thu, 22 Feb 2024 13:59:06 +0100 Subject: [PATCH] [KOGITO-9276] Adding support for businnesKey to resume a process --- .../event/impl/ProcessEventDispatcher.java | 34 +++++++++---------- .../quarkus/workflows/AssuredTestUtils.java | 33 +++++++++++++----- .../kogito/quarkus/workflows/EventFlowIT.java | 19 ++++++++--- 3 files changed, 56 insertions(+), 30 deletions(-) diff --git a/api/kogito-events-core/src/main/java/org/kie/kogito/event/impl/ProcessEventDispatcher.java b/api/kogito-events-core/src/main/java/org/kie/kogito/event/impl/ProcessEventDispatcher.java index 48d46a04fd1..0f076984357 100644 --- a/api/kogito-events-core/src/main/java/org/kie/kogito/event/impl/ProcessEventDispatcher.java +++ b/api/kogito-events-core/src/main/java/org/kie/kogito/event/impl/ProcessEventDispatcher.java @@ -37,7 +37,7 @@ import org.kie.kogito.correlation.SimpleCorrelation; import org.kie.kogito.event.DataEvent; import org.kie.kogito.event.EventDispatcher; -import org.kie.kogito.internal.utils.ConversionUtils; +import org.kie.kogito.event.cloudevents.CloudEventExtensionConstants; import org.kie.kogito.process.Process; import org.kie.kogito.process.ProcessInstance; import org.kie.kogito.process.ProcessService; @@ -74,20 +74,19 @@ public CompletableFuture> dispatch(String trigger, DataEvent< } return CompletableFuture.completedFuture(null); } - - final String kogitoReferenceId = resolveCorrelationId(event); - if (!ConversionUtils.isEmpty(kogitoReferenceId)) { - return CompletableFuture.supplyAsync(() -> handleMessageWithReference(trigger, event, kogitoReferenceId), executor); - } - - //if the trigger is for a start event (model converter is set only for start node) - if (modelConverter.isPresent()) { - return CompletableFuture.supplyAsync(() -> startNewInstance(trigger, event), executor); - } - if (LOGGER.isInfoEnabled()) { - LOGGER.info("No matches found for trigger {} in process {}. Skipping consumed message {}", trigger, process.id(), event); - } - return CompletableFuture.completedFuture(null); + return resolveCorrelationId(event) + .map(kogitoReferenceId -> CompletableFuture.supplyAsync(() -> handleMessageWithReference(trigger, event, kogitoReferenceId), executor)) + .orElseGet(() -> { + // if the trigger is for a start event (model converter is set only for start node) + if (modelConverter.isPresent()) { + return CompletableFuture.supplyAsync(() -> startNewInstance(trigger, event), executor); + } else { + if (LOGGER.isInfoEnabled()) { + LOGGER.info("No matches found for trigger {} in process {}. Skipping consumed message {}", trigger, process.id(), event); + } + return CompletableFuture.completedFuture(null); + } + }); } private Optional compositeCorrelation(DataEvent event) { @@ -95,10 +94,11 @@ private Optional compositeCorrelation(DataEvent event) correlationKeys.stream().map(k -> new SimpleCorrelation<>(k, resolve(event, k))).collect(Collectors.toSet()))) : Optional.empty(); } - private String resolveCorrelationId(DataEvent event) { + private Optional resolveCorrelationId(DataEvent event) { return compositeCorrelation(event).flatMap(process.correlations()::find) .map(CorrelationInstance::getCorrelatedId) - .orElseGet(event::getKogitoReferenceId); + .or(() -> Optional.ofNullable((String) event.getExtension(CloudEventExtensionConstants.BUSINESS_KEY))) + .or(() -> Optional.ofNullable(event.getKogitoReferenceId())); } private Object resolve(DataEvent event, String key) { diff --git a/quarkus/extensions/kogito-quarkus-serverless-workflow-extension/kogito-quarkus-serverless-workflow-integration-test/src/test/java/org/kie/kogito/quarkus/workflows/AssuredTestUtils.java b/quarkus/extensions/kogito-quarkus-serverless-workflow-extension/kogito-quarkus-serverless-workflow-integration-test/src/test/java/org/kie/kogito/quarkus/workflows/AssuredTestUtils.java index 92a031c10e9..93caef6e121 100644 --- a/quarkus/extensions/kogito-quarkus-serverless-workflow-extension/kogito-quarkus-serverless-workflow-integration-test/src/test/java/org/kie/kogito/quarkus/workflows/AssuredTestUtils.java +++ b/quarkus/extensions/kogito-quarkus-serverless-workflow-extension/kogito-quarkus-serverless-workflow-integration-test/src/test/java/org/kie/kogito/quarkus/workflows/AssuredTestUtils.java @@ -22,13 +22,16 @@ import java.time.Duration; import java.time.OffsetDateTime; import java.util.Collections; +import java.util.Optional; import java.util.UUID; import org.kie.kogito.event.CloudEventMarshaller; +import org.kie.kogito.event.cloudevents.CloudEventExtensionConstants; import io.cloudevents.CloudEvent; import io.cloudevents.core.builder.CloudEventBuilder; import io.restassured.http.ContentType; +import io.restassured.specification.RequestSpecification; import static io.restassured.RestAssured.given; import static java.util.concurrent.TimeUnit.SECONDS; @@ -40,8 +43,11 @@ private AssuredTestUtils() { } static String startProcess(String flowName) { - String id = startProcessNoCheck(flowName); + return startProcess(flowName, Optional.empty()); + } + static String startProcess(String flowName, Optional businessKey) { + String id = startProcessNoCheck(flowName, businessKey); given() .contentType(ContentType.JSON) .accept(ContentType.JSON) @@ -52,11 +58,16 @@ static String startProcess(String flowName) { } static String startProcessNoCheck(String flowName) { - return given() + return startProcessNoCheck(flowName, Optional.empty()); + } + + static String startProcessNoCheck(String flowName, Optional businessKey) { + RequestSpecification body = given() .contentType(ContentType.JSON) .when() - .body(Collections.singletonMap("workflowdata", Collections.emptyMap())) - .post("/" + flowName) + .body(Collections.singletonMap("workflowdata", Collections.emptyMap())); + businessKey.ifPresent(key -> body.queryParam("businessKey", key)); + return body.post("/" + flowName) .then() .statusCode(201) .extract().path("id"); @@ -73,15 +84,19 @@ static void waitForFinish(String flowName, String id, Duration duration) { .statusCode(404)); } - static CloudEvent buildCloudEvent(String id, String type, CloudEventMarshaller marshaller) { - return CloudEventBuilder.v1() + static CloudEvent buildCloudEvent(String id, Optional businessKey, String type, CloudEventMarshaller marshaller) { + io.cloudevents.core.v1.CloudEventBuilder builder = CloudEventBuilder.v1() .withId(UUID.randomUUID().toString()) .withSource(URI.create("")) .withType(type) .withTime(OffsetDateTime.now()) - .withExtension("kogitoprocrefid", id) - .withData(marshaller.cloudEventDataFactory().apply(Collections.singletonMap(type, "This has been injected by the event"))) - .build(); + .withData(marshaller.cloudEventDataFactory().apply(Collections.singletonMap(type, "This has been injected by the event"))); + businessKey.ifPresentOrElse(key -> builder.withExtension(CloudEventExtensionConstants.BUSINESS_KEY, key), () -> builder.withExtension("kogitoprocrefid", id)); + return builder.build(); + } + + static CloudEvent buildCloudEvent(String id, String type, CloudEventMarshaller marshaller) { + return buildCloudEvent(id, Optional.empty(), type, marshaller); } } diff --git a/quarkus/extensions/kogito-quarkus-serverless-workflow-extension/kogito-quarkus-serverless-workflow-integration-test/src/test/java/org/kie/kogito/quarkus/workflows/EventFlowIT.java b/quarkus/extensions/kogito-quarkus-serverless-workflow-extension/kogito-quarkus-serverless-workflow-integration-test/src/test/java/org/kie/kogito/quarkus/workflows/EventFlowIT.java index 50fcb8e822f..91ede4a21f9 100644 --- a/quarkus/extensions/kogito-quarkus-serverless-workflow-extension/kogito-quarkus-serverless-workflow-integration-test/src/test/java/org/kie/kogito/quarkus/workflows/EventFlowIT.java +++ b/quarkus/extensions/kogito-quarkus-serverless-workflow-extension/kogito-quarkus-serverless-workflow-integration-test/src/test/java/org/kie/kogito/quarkus/workflows/EventFlowIT.java @@ -21,6 +21,7 @@ import java.io.IOException; import java.time.Duration; import java.util.Map; +import java.util.Optional; import org.awaitility.core.ConditionTimeoutException; import org.junit.jupiter.api.BeforeAll; @@ -57,7 +58,7 @@ static void init() { @Test void testNotStartingEvent() throws IOException { - doIt("nonStartEvent", "move"); + doIt("nonStartEvent", Optional.of("manolo"), "move"); } @Test @@ -108,25 +109,35 @@ void testNotStartingMultipleEventRainy() throws IOException { } private void sendEvents(String id, String... eventTypes) throws IOException { + sendEvents(id, Optional.empty(), eventTypes); + } + + private void sendEvents(String id, Optional businessKey, String... eventTypes) throws IOException { for (String eventType : eventTypes) { given() .contentType(ContentType.JSON) .when() - .body(generateCloudEvent(id, eventType)) + .body(generateCloudEvent(id, businessKey, eventType)) .post("/" + eventType) .then() .statusCode(202); } } + private void doIt(String flowName, Optional businessKey, String... eventTypes) throws IOException { + String id = startProcess(flowName, businessKey); + sendEvents(id, businessKey, eventTypes); + waitForFinish(flowName, id, Duration.ofSeconds(15)); + } + private void doIt(String flowName, String... eventTypes) throws IOException { String id = startProcess(flowName); sendEvents(id, eventTypes); waitForFinish(flowName, id, Duration.ofSeconds(15)); } - private byte[] generateCloudEvent(String id, String type) throws IOException { + private byte[] generateCloudEvent(String id, Optional businessKey, String type) throws IOException { CloudEventMarshaller marshaller = marshallers.getOrDefault(type, defaultMarshaller); - return marshaller.marshall(buildCloudEvent(id, type, marshaller)); + return marshaller.marshall(buildCloudEvent(id, businessKey, type, marshaller)); } }