From be807036ee4e2da1d79cedce927ec86b2f528b70 Mon Sep 17 00:00:00 2001 From: Francisco Javier Tirado Sarti Date: Thu, 22 Feb 2024 13:59:06 +0100 Subject: [PATCH] [KOGITO-9276] Adding support for businnesKey to resume a process --- .../kie/kogito/process/ProcessInstances.java | 4 ++ .../event/impl/ProcessEventDispatcher.java | 38 +++++++++---------- .../impl/ProcessEventDispatcherTest.java | 6 ++- .../quarkus/workflows/AssuredTestUtils.java | 33 +++++++++++----- .../kogito/quarkus/workflows/EventFlowIT.java | 19 ++++++++-- 5 files changed, 66 insertions(+), 34 deletions(-) diff --git a/api/kogito-api/src/main/java/org/kie/kogito/process/ProcessInstances.java b/api/kogito-api/src/main/java/org/kie/kogito/process/ProcessInstances.java index d6233541d45..740713876de 100644 --- a/api/kogito-api/src/main/java/org/kie/kogito/process/ProcessInstances.java +++ b/api/kogito-api/src/main/java/org/kie/kogito/process/ProcessInstances.java @@ -29,6 +29,10 @@ default Optional> findById(String id) { Optional> findById(String id, ProcessInstanceReadMode mode); + default Optional> findByIdOrBusinessKey(String id) { + return findById(id).or(() -> stream().filter(pi -> id.equals(pi.businessKey())).findAny()); + } + Stream> stream(ProcessInstanceReadMode mode); default Stream> stream() { diff --git a/api/kogito-events-core/src/main/java/org/kie/kogito/event/impl/ProcessEventDispatcher.java b/api/kogito-events-core/src/main/java/org/kie/kogito/event/impl/ProcessEventDispatcher.java index 48d46a04fd1..75c543f8c87 100644 --- a/api/kogito-events-core/src/main/java/org/kie/kogito/event/impl/ProcessEventDispatcher.java +++ b/api/kogito-events-core/src/main/java/org/kie/kogito/event/impl/ProcessEventDispatcher.java @@ -37,9 +37,10 @@ import org.kie.kogito.correlation.SimpleCorrelation; import org.kie.kogito.event.DataEvent; import org.kie.kogito.event.EventDispatcher; -import org.kie.kogito.internal.utils.ConversionUtils; +import org.kie.kogito.event.cloudevents.CloudEventExtensionConstants; import org.kie.kogito.process.Process; import org.kie.kogito.process.ProcessInstance; +import org.kie.kogito.process.ProcessInstances; import org.kie.kogito.process.ProcessService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -74,20 +75,19 @@ public CompletableFuture> dispatch(String trigger, DataEvent< } return CompletableFuture.completedFuture(null); } - - final String kogitoReferenceId = resolveCorrelationId(event); - if (!ConversionUtils.isEmpty(kogitoReferenceId)) { - return CompletableFuture.supplyAsync(() -> handleMessageWithReference(trigger, event, kogitoReferenceId), executor); - } - - //if the trigger is for a start event (model converter is set only for start node) - if (modelConverter.isPresent()) { - return CompletableFuture.supplyAsync(() -> startNewInstance(trigger, event), executor); - } - if (LOGGER.isInfoEnabled()) { - LOGGER.info("No matches found for trigger {} in process {}. Skipping consumed message {}", trigger, process.id(), event); - } - return CompletableFuture.completedFuture(null); + return resolveCorrelationId(event) + .map(kogitoReferenceId -> CompletableFuture.supplyAsync(() -> handleMessageWithReference(trigger, event, kogitoReferenceId), executor)) + .orElseGet(() -> { + // if the trigger is for a start event (model converter is set only for start node) + if (modelConverter.isPresent()) { + return CompletableFuture.supplyAsync(() -> startNewInstance(trigger, event), executor); + } else { + if (LOGGER.isInfoEnabled()) { + LOGGER.info("No matches found for trigger {} in process {}. Skipping consumed message {}", trigger, process.id(), event); + } + return CompletableFuture.completedFuture(null); + } + }); } private Optional compositeCorrelation(DataEvent event) { @@ -95,10 +95,11 @@ private Optional compositeCorrelation(DataEvent event) correlationKeys.stream().map(k -> new SimpleCorrelation<>(k, resolve(event, k))).collect(Collectors.toSet()))) : Optional.empty(); } - private String resolveCorrelationId(DataEvent event) { + private Optional resolveCorrelationId(DataEvent event) { return compositeCorrelation(event).flatMap(process.correlations()::find) .map(CorrelationInstance::getCorrelatedId) - .orElseGet(event::getKogitoReferenceId); + .or(() -> Optional.ofNullable((String) event.getExtension(CloudEventExtensionConstants.BUSINESS_KEY))) + .or(() -> Optional.ofNullable(event.getKogitoReferenceId())); } private Object resolve(DataEvent event, String key) { @@ -117,8 +118,7 @@ private ProcessInstance handleMessageWithReference(String trigger, DataEvent< LOGGER.debug("Received message with reference id '{}' going to use it to send signal '{}'", instanceId, trigger); - return process.instances() - .findById(instanceId) + return process.instances().findByIdOrBusinessKey(instanceId) .map(instance -> { signalProcessInstance(trigger, instance.id(), event); return instance; diff --git a/api/kogito-events-core/src/test/java/org/kie/kogito/event/impl/ProcessEventDispatcherTest.java b/api/kogito-events-core/src/test/java/org/kie/kogito/event/impl/ProcessEventDispatcherTest.java index f7d4cda1ea0..497a38d8f11 100644 --- a/api/kogito-events-core/src/test/java/org/kie/kogito/event/impl/ProcessEventDispatcherTest.java +++ b/api/kogito-events-core/src/test/java/org/kie/kogito/event/impl/ProcessEventDispatcherTest.java @@ -78,6 +78,8 @@ void setup() { when(process.correlations()).thenReturn(correlationService); when(processInstances.findById(Mockito.anyString())).thenReturn(Optional.empty()); when(processInstances.findById("1")).thenReturn(Optional.of(processInstance)); + when(processInstances.findByIdOrBusinessKey(Mockito.anyString())).thenReturn(Optional.empty()); + when(processInstances.findByIdOrBusinessKey("1")).thenReturn(Optional.of(processInstance)); processService = mock(ProcessService.class); when(processService.createProcessInstance(eq(process), any(), any(), any(), any(), any(), any(), any())).thenReturn(processInstance); when(processService.signalProcessInstance(eq(process), any(), any(), any())).thenReturn(Optional.of(mock(DummyModel.class))); @@ -117,7 +119,7 @@ void testCloudEventNewInstanceWithoutReference() throws Exception { ArgumentCaptor signal = ArgumentCaptor.forClass(String.class); ArgumentCaptor referenceId = ArgumentCaptor.forClass(String.class); - verify(processInstances, never()).findById(any()); + verify(processInstances, never()).findByIdOrBusinessKey("invalidReference"); verify(processService, never()).signalProcessInstance(eq(process), any(), any(), signal.capture()); verify(processService, times(1)).createProcessInstance(eq(process), any(), any(DummyModel.class), any(), any(), signal.capture(), referenceId.capture(), isNull()); @@ -135,7 +137,7 @@ void testCloudEventNewInstanceWithReference() throws Exception { ArgumentCaptor referenceId = ArgumentCaptor.forClass(String.class); ArgumentCaptor>> headers = ArgumentCaptor.forClass(Map.class); - verify(processInstances, times(1)).findById("invalidReference"); + verify(processInstances, times(1)).findByIdOrBusinessKey("invalidReference"); verify(processService, never()).signalProcessInstance(eq(process), any(), any(), signal.capture()); verify(processService, times(1)).createProcessInstance(eq(process), any(), any(DummyModel.class), headers.capture(), any(), signal.capture(), referenceId.capture(), isNull()); diff --git a/quarkus/extensions/kogito-quarkus-serverless-workflow-extension/kogito-quarkus-serverless-workflow-integration-test/src/test/java/org/kie/kogito/quarkus/workflows/AssuredTestUtils.java b/quarkus/extensions/kogito-quarkus-serverless-workflow-extension/kogito-quarkus-serverless-workflow-integration-test/src/test/java/org/kie/kogito/quarkus/workflows/AssuredTestUtils.java index 92a031c10e9..b620b2de526 100644 --- a/quarkus/extensions/kogito-quarkus-serverless-workflow-extension/kogito-quarkus-serverless-workflow-integration-test/src/test/java/org/kie/kogito/quarkus/workflows/AssuredTestUtils.java +++ b/quarkus/extensions/kogito-quarkus-serverless-workflow-extension/kogito-quarkus-serverless-workflow-integration-test/src/test/java/org/kie/kogito/quarkus/workflows/AssuredTestUtils.java @@ -22,13 +22,16 @@ import java.time.Duration; import java.time.OffsetDateTime; import java.util.Collections; +import java.util.Optional; import java.util.UUID; import org.kie.kogito.event.CloudEventMarshaller; +import org.kie.kogito.event.cloudevents.CloudEventExtensionConstants; import io.cloudevents.CloudEvent; import io.cloudevents.core.builder.CloudEventBuilder; import io.restassured.http.ContentType; +import io.restassured.specification.RequestSpecification; import static io.restassured.RestAssured.given; import static java.util.concurrent.TimeUnit.SECONDS; @@ -40,8 +43,11 @@ private AssuredTestUtils() { } static String startProcess(String flowName) { - String id = startProcessNoCheck(flowName); + return startProcess(flowName, Optional.empty()); + } + static String startProcess(String flowName, Optional businessKey) { + String id = startProcessNoCheck(flowName, businessKey); given() .contentType(ContentType.JSON) .accept(ContentType.JSON) @@ -52,11 +58,16 @@ static String startProcess(String flowName) { } static String startProcessNoCheck(String flowName) { - return given() + return startProcessNoCheck(flowName, Optional.empty()); + } + + static String startProcessNoCheck(String flowName, Optional businessKey) { + RequestSpecification body = given() .contentType(ContentType.JSON) .when() - .body(Collections.singletonMap("workflowdata", Collections.emptyMap())) - .post("/" + flowName) + .body(Collections.singletonMap("workflowdata", Collections.emptyMap())); + businessKey.ifPresent(key -> body.queryParam("businessKey", key)); + return body.post("/" + flowName) .then() .statusCode(201) .extract().path("id"); @@ -73,15 +84,19 @@ static void waitForFinish(String flowName, String id, Duration duration) { .statusCode(404)); } - static CloudEvent buildCloudEvent(String id, String type, CloudEventMarshaller marshaller) { - return CloudEventBuilder.v1() + static CloudEvent buildCloudEvent(String id, Optional businessKey, String type, CloudEventMarshaller marshaller) { + io.cloudevents.core.v1.CloudEventBuilder builder = CloudEventBuilder.v1() .withId(UUID.randomUUID().toString()) .withSource(URI.create("")) .withType(type) .withTime(OffsetDateTime.now()) - .withExtension("kogitoprocrefid", id) - .withData(marshaller.cloudEventDataFactory().apply(Collections.singletonMap(type, "This has been injected by the event"))) - .build(); + .withData(marshaller.cloudEventDataFactory().apply(Collections.singletonMap(type, "This has been injected by the event"))); + businessKey.ifPresentOrElse(key -> builder.withExtension(CloudEventExtensionConstants.BUSINESS_KEY, key), () -> builder.withExtension(CloudEventExtensionConstants.PROCESS_REFERENCE_ID, id)); + return builder.build(); + } + + static CloudEvent buildCloudEvent(String id, String type, CloudEventMarshaller marshaller) { + return buildCloudEvent(id, Optional.empty(), type, marshaller); } } diff --git a/quarkus/extensions/kogito-quarkus-serverless-workflow-extension/kogito-quarkus-serverless-workflow-integration-test/src/test/java/org/kie/kogito/quarkus/workflows/EventFlowIT.java b/quarkus/extensions/kogito-quarkus-serverless-workflow-extension/kogito-quarkus-serverless-workflow-integration-test/src/test/java/org/kie/kogito/quarkus/workflows/EventFlowIT.java index 50fcb8e822f..91ede4a21f9 100644 --- a/quarkus/extensions/kogito-quarkus-serverless-workflow-extension/kogito-quarkus-serverless-workflow-integration-test/src/test/java/org/kie/kogito/quarkus/workflows/EventFlowIT.java +++ b/quarkus/extensions/kogito-quarkus-serverless-workflow-extension/kogito-quarkus-serverless-workflow-integration-test/src/test/java/org/kie/kogito/quarkus/workflows/EventFlowIT.java @@ -21,6 +21,7 @@ import java.io.IOException; import java.time.Duration; import java.util.Map; +import java.util.Optional; import org.awaitility.core.ConditionTimeoutException; import org.junit.jupiter.api.BeforeAll; @@ -57,7 +58,7 @@ static void init() { @Test void testNotStartingEvent() throws IOException { - doIt("nonStartEvent", "move"); + doIt("nonStartEvent", Optional.of("manolo"), "move"); } @Test @@ -108,25 +109,35 @@ void testNotStartingMultipleEventRainy() throws IOException { } private void sendEvents(String id, String... eventTypes) throws IOException { + sendEvents(id, Optional.empty(), eventTypes); + } + + private void sendEvents(String id, Optional businessKey, String... eventTypes) throws IOException { for (String eventType : eventTypes) { given() .contentType(ContentType.JSON) .when() - .body(generateCloudEvent(id, eventType)) + .body(generateCloudEvent(id, businessKey, eventType)) .post("/" + eventType) .then() .statusCode(202); } } + private void doIt(String flowName, Optional businessKey, String... eventTypes) throws IOException { + String id = startProcess(flowName, businessKey); + sendEvents(id, businessKey, eventTypes); + waitForFinish(flowName, id, Duration.ofSeconds(15)); + } + private void doIt(String flowName, String... eventTypes) throws IOException { String id = startProcess(flowName); sendEvents(id, eventTypes); waitForFinish(flowName, id, Duration.ofSeconds(15)); } - private byte[] generateCloudEvent(String id, String type) throws IOException { + private byte[] generateCloudEvent(String id, Optional businessKey, String type) throws IOException { CloudEventMarshaller marshaller = marshallers.getOrDefault(type, defaultMarshaller); - return marshaller.marshall(buildCloudEvent(id, type, marshaller)); + return marshaller.marshall(buildCloudEvent(id, businessKey, type, marshaller)); } }