From 6fee9c7d62c4a1b72e916518c597319a0733c2f4 Mon Sep 17 00:00:00 2001 From: Francisco Javier Tirado Sarti Date: Thu, 22 Feb 2024 13:59:06 +0100 Subject: [PATCH 1/3] [KOGITO-9276] Adding support for businnesKey to resume a process --- .../jdbc/JDBCProcessInstances.java | 11 ++++++ .../kie/kogito/process/ProcessInstances.java | 4 ++ .../event/impl/ProcessEventDispatcher.java | 38 +++++++++---------- .../impl/ProcessEventDispatcherTest.java | 6 ++- .../quarkus/workflows/AssuredTestUtils.java | 33 +++++++++++----- .../kogito/quarkus/workflows/EventFlowIT.java | 19 ++++++++-- 6 files changed, 77 insertions(+), 34 deletions(-) diff --git a/addons/common/persistence/jdbc/src/main/java/org/kie/kogito/persistence/jdbc/JDBCProcessInstances.java b/addons/common/persistence/jdbc/src/main/java/org/kie/kogito/persistence/jdbc/JDBCProcessInstances.java index 789da4f5c7b..b451b4b5866 100644 --- a/addons/common/persistence/jdbc/src/main/java/org/kie/kogito/persistence/jdbc/JDBCProcessInstances.java +++ b/addons/common/persistence/jdbc/src/main/java/org/kie/kogito/persistence/jdbc/JDBCProcessInstances.java @@ -101,6 +101,17 @@ public Optional> findById(String id, ProcessInstanceReadMode return repository.findByIdInternal(process.id(), process.version(), UUID.fromString(id)).map(r -> unmarshall(r, mode)); } + @Override + public Optional> findByIdOrBusinessKey(String id) { + LOGGER.debug("Find process instance id or businessKey: {}", id); + try { + UUID uuid = UUID.fromString(id); + return repository.findByIdInternal(process.id(), process.version(), uuid).map(r -> unmarshall(r, ProcessInstanceReadMode.MUTABLE)); + } catch (IllegalArgumentException ex) { + return stream(ProcessInstanceReadMode.MUTABLE).filter(pi -> id.equals(pi.businessKey())).findAny(); + } + } + @Override public Stream> stream(ProcessInstanceReadMode mode) { LOGGER.debug("Find process instance values using mode: {}", mode); diff --git a/api/kogito-api/src/main/java/org/kie/kogito/process/ProcessInstances.java b/api/kogito-api/src/main/java/org/kie/kogito/process/ProcessInstances.java index d6233541d45..740713876de 100644 --- a/api/kogito-api/src/main/java/org/kie/kogito/process/ProcessInstances.java +++ b/api/kogito-api/src/main/java/org/kie/kogito/process/ProcessInstances.java @@ -29,6 +29,10 @@ default Optional> findById(String id) { Optional> findById(String id, ProcessInstanceReadMode mode); + default Optional> findByIdOrBusinessKey(String id) { + return findById(id).or(() -> stream().filter(pi -> id.equals(pi.businessKey())).findAny()); + } + Stream> stream(ProcessInstanceReadMode mode); default Stream> stream() { diff --git a/api/kogito-events-core/src/main/java/org/kie/kogito/event/impl/ProcessEventDispatcher.java b/api/kogito-events-core/src/main/java/org/kie/kogito/event/impl/ProcessEventDispatcher.java index 48d46a04fd1..75c543f8c87 100644 --- a/api/kogito-events-core/src/main/java/org/kie/kogito/event/impl/ProcessEventDispatcher.java +++ b/api/kogito-events-core/src/main/java/org/kie/kogito/event/impl/ProcessEventDispatcher.java @@ -37,9 +37,10 @@ import org.kie.kogito.correlation.SimpleCorrelation; import org.kie.kogito.event.DataEvent; import org.kie.kogito.event.EventDispatcher; -import org.kie.kogito.internal.utils.ConversionUtils; +import org.kie.kogito.event.cloudevents.CloudEventExtensionConstants; import org.kie.kogito.process.Process; import org.kie.kogito.process.ProcessInstance; +import org.kie.kogito.process.ProcessInstances; import org.kie.kogito.process.ProcessService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -74,20 +75,19 @@ public CompletableFuture> dispatch(String trigger, DataEvent< } return CompletableFuture.completedFuture(null); } - - final String kogitoReferenceId = resolveCorrelationId(event); - if (!ConversionUtils.isEmpty(kogitoReferenceId)) { - return CompletableFuture.supplyAsync(() -> handleMessageWithReference(trigger, event, kogitoReferenceId), executor); - } - - //if the trigger is for a start event (model converter is set only for start node) - if (modelConverter.isPresent()) { - return CompletableFuture.supplyAsync(() -> startNewInstance(trigger, event), executor); - } - if (LOGGER.isInfoEnabled()) { - LOGGER.info("No matches found for trigger {} in process {}. Skipping consumed message {}", trigger, process.id(), event); - } - return CompletableFuture.completedFuture(null); + return resolveCorrelationId(event) + .map(kogitoReferenceId -> CompletableFuture.supplyAsync(() -> handleMessageWithReference(trigger, event, kogitoReferenceId), executor)) + .orElseGet(() -> { + // if the trigger is for a start event (model converter is set only for start node) + if (modelConverter.isPresent()) { + return CompletableFuture.supplyAsync(() -> startNewInstance(trigger, event), executor); + } else { + if (LOGGER.isInfoEnabled()) { + LOGGER.info("No matches found for trigger {} in process {}. Skipping consumed message {}", trigger, process.id(), event); + } + return CompletableFuture.completedFuture(null); + } + }); } private Optional compositeCorrelation(DataEvent event) { @@ -95,10 +95,11 @@ private Optional compositeCorrelation(DataEvent event) correlationKeys.stream().map(k -> new SimpleCorrelation<>(k, resolve(event, k))).collect(Collectors.toSet()))) : Optional.empty(); } - private String resolveCorrelationId(DataEvent event) { + private Optional resolveCorrelationId(DataEvent event) { return compositeCorrelation(event).flatMap(process.correlations()::find) .map(CorrelationInstance::getCorrelatedId) - .orElseGet(event::getKogitoReferenceId); + .or(() -> Optional.ofNullable((String) event.getExtension(CloudEventExtensionConstants.BUSINESS_KEY))) + .or(() -> Optional.ofNullable(event.getKogitoReferenceId())); } private Object resolve(DataEvent event, String key) { @@ -117,8 +118,7 @@ private ProcessInstance handleMessageWithReference(String trigger, DataEvent< LOGGER.debug("Received message with reference id '{}' going to use it to send signal '{}'", instanceId, trigger); - return process.instances() - .findById(instanceId) + return process.instances().findByIdOrBusinessKey(instanceId) .map(instance -> { signalProcessInstance(trigger, instance.id(), event); return instance; diff --git a/api/kogito-events-core/src/test/java/org/kie/kogito/event/impl/ProcessEventDispatcherTest.java b/api/kogito-events-core/src/test/java/org/kie/kogito/event/impl/ProcessEventDispatcherTest.java index f7d4cda1ea0..497a38d8f11 100644 --- a/api/kogito-events-core/src/test/java/org/kie/kogito/event/impl/ProcessEventDispatcherTest.java +++ b/api/kogito-events-core/src/test/java/org/kie/kogito/event/impl/ProcessEventDispatcherTest.java @@ -78,6 +78,8 @@ void setup() { when(process.correlations()).thenReturn(correlationService); when(processInstances.findById(Mockito.anyString())).thenReturn(Optional.empty()); when(processInstances.findById("1")).thenReturn(Optional.of(processInstance)); + when(processInstances.findByIdOrBusinessKey(Mockito.anyString())).thenReturn(Optional.empty()); + when(processInstances.findByIdOrBusinessKey("1")).thenReturn(Optional.of(processInstance)); processService = mock(ProcessService.class); when(processService.createProcessInstance(eq(process), any(), any(), any(), any(), any(), any(), any())).thenReturn(processInstance); when(processService.signalProcessInstance(eq(process), any(), any(), any())).thenReturn(Optional.of(mock(DummyModel.class))); @@ -117,7 +119,7 @@ void testCloudEventNewInstanceWithoutReference() throws Exception { ArgumentCaptor signal = ArgumentCaptor.forClass(String.class); ArgumentCaptor referenceId = ArgumentCaptor.forClass(String.class); - verify(processInstances, never()).findById(any()); + verify(processInstances, never()).findByIdOrBusinessKey("invalidReference"); verify(processService, never()).signalProcessInstance(eq(process), any(), any(), signal.capture()); verify(processService, times(1)).createProcessInstance(eq(process), any(), any(DummyModel.class), any(), any(), signal.capture(), referenceId.capture(), isNull()); @@ -135,7 +137,7 @@ void testCloudEventNewInstanceWithReference() throws Exception { ArgumentCaptor referenceId = ArgumentCaptor.forClass(String.class); ArgumentCaptor>> headers = ArgumentCaptor.forClass(Map.class); - verify(processInstances, times(1)).findById("invalidReference"); + verify(processInstances, times(1)).findByIdOrBusinessKey("invalidReference"); verify(processService, never()).signalProcessInstance(eq(process), any(), any(), signal.capture()); verify(processService, times(1)).createProcessInstance(eq(process), any(), any(DummyModel.class), headers.capture(), any(), signal.capture(), referenceId.capture(), isNull()); diff --git a/quarkus/extensions/kogito-quarkus-serverless-workflow-extension/kogito-quarkus-serverless-workflow-integration-test/src/test/java/org/kie/kogito/quarkus/workflows/AssuredTestUtils.java b/quarkus/extensions/kogito-quarkus-serverless-workflow-extension/kogito-quarkus-serverless-workflow-integration-test/src/test/java/org/kie/kogito/quarkus/workflows/AssuredTestUtils.java index 92a031c10e9..b620b2de526 100644 --- a/quarkus/extensions/kogito-quarkus-serverless-workflow-extension/kogito-quarkus-serverless-workflow-integration-test/src/test/java/org/kie/kogito/quarkus/workflows/AssuredTestUtils.java +++ b/quarkus/extensions/kogito-quarkus-serverless-workflow-extension/kogito-quarkus-serverless-workflow-integration-test/src/test/java/org/kie/kogito/quarkus/workflows/AssuredTestUtils.java @@ -22,13 +22,16 @@ import java.time.Duration; import java.time.OffsetDateTime; import java.util.Collections; +import java.util.Optional; import java.util.UUID; import org.kie.kogito.event.CloudEventMarshaller; +import org.kie.kogito.event.cloudevents.CloudEventExtensionConstants; import io.cloudevents.CloudEvent; import io.cloudevents.core.builder.CloudEventBuilder; import io.restassured.http.ContentType; +import io.restassured.specification.RequestSpecification; import static io.restassured.RestAssured.given; import static java.util.concurrent.TimeUnit.SECONDS; @@ -40,8 +43,11 @@ private AssuredTestUtils() { } static String startProcess(String flowName) { - String id = startProcessNoCheck(flowName); + return startProcess(flowName, Optional.empty()); + } + static String startProcess(String flowName, Optional businessKey) { + String id = startProcessNoCheck(flowName, businessKey); given() .contentType(ContentType.JSON) .accept(ContentType.JSON) @@ -52,11 +58,16 @@ static String startProcess(String flowName) { } static String startProcessNoCheck(String flowName) { - return given() + return startProcessNoCheck(flowName, Optional.empty()); + } + + static String startProcessNoCheck(String flowName, Optional businessKey) { + RequestSpecification body = given() .contentType(ContentType.JSON) .when() - .body(Collections.singletonMap("workflowdata", Collections.emptyMap())) - .post("/" + flowName) + .body(Collections.singletonMap("workflowdata", Collections.emptyMap())); + businessKey.ifPresent(key -> body.queryParam("businessKey", key)); + return body.post("/" + flowName) .then() .statusCode(201) .extract().path("id"); @@ -73,15 +84,19 @@ static void waitForFinish(String flowName, String id, Duration duration) { .statusCode(404)); } - static CloudEvent buildCloudEvent(String id, String type, CloudEventMarshaller marshaller) { - return CloudEventBuilder.v1() + static CloudEvent buildCloudEvent(String id, Optional businessKey, String type, CloudEventMarshaller marshaller) { + io.cloudevents.core.v1.CloudEventBuilder builder = CloudEventBuilder.v1() .withId(UUID.randomUUID().toString()) .withSource(URI.create("")) .withType(type) .withTime(OffsetDateTime.now()) - .withExtension("kogitoprocrefid", id) - .withData(marshaller.cloudEventDataFactory().apply(Collections.singletonMap(type, "This has been injected by the event"))) - .build(); + .withData(marshaller.cloudEventDataFactory().apply(Collections.singletonMap(type, "This has been injected by the event"))); + businessKey.ifPresentOrElse(key -> builder.withExtension(CloudEventExtensionConstants.BUSINESS_KEY, key), () -> builder.withExtension(CloudEventExtensionConstants.PROCESS_REFERENCE_ID, id)); + return builder.build(); + } + + static CloudEvent buildCloudEvent(String id, String type, CloudEventMarshaller marshaller) { + return buildCloudEvent(id, Optional.empty(), type, marshaller); } } diff --git a/quarkus/extensions/kogito-quarkus-serverless-workflow-extension/kogito-quarkus-serverless-workflow-integration-test/src/test/java/org/kie/kogito/quarkus/workflows/EventFlowIT.java b/quarkus/extensions/kogito-quarkus-serverless-workflow-extension/kogito-quarkus-serverless-workflow-integration-test/src/test/java/org/kie/kogito/quarkus/workflows/EventFlowIT.java index 50fcb8e822f..91ede4a21f9 100644 --- a/quarkus/extensions/kogito-quarkus-serverless-workflow-extension/kogito-quarkus-serverless-workflow-integration-test/src/test/java/org/kie/kogito/quarkus/workflows/EventFlowIT.java +++ b/quarkus/extensions/kogito-quarkus-serverless-workflow-extension/kogito-quarkus-serverless-workflow-integration-test/src/test/java/org/kie/kogito/quarkus/workflows/EventFlowIT.java @@ -21,6 +21,7 @@ import java.io.IOException; import java.time.Duration; import java.util.Map; +import java.util.Optional; import org.awaitility.core.ConditionTimeoutException; import org.junit.jupiter.api.BeforeAll; @@ -57,7 +58,7 @@ static void init() { @Test void testNotStartingEvent() throws IOException { - doIt("nonStartEvent", "move"); + doIt("nonStartEvent", Optional.of("manolo"), "move"); } @Test @@ -108,25 +109,35 @@ void testNotStartingMultipleEventRainy() throws IOException { } private void sendEvents(String id, String... eventTypes) throws IOException { + sendEvents(id, Optional.empty(), eventTypes); + } + + private void sendEvents(String id, Optional businessKey, String... eventTypes) throws IOException { for (String eventType : eventTypes) { given() .contentType(ContentType.JSON) .when() - .body(generateCloudEvent(id, eventType)) + .body(generateCloudEvent(id, businessKey, eventType)) .post("/" + eventType) .then() .statusCode(202); } } + private void doIt(String flowName, Optional businessKey, String... eventTypes) throws IOException { + String id = startProcess(flowName, businessKey); + sendEvents(id, businessKey, eventTypes); + waitForFinish(flowName, id, Duration.ofSeconds(15)); + } + private void doIt(String flowName, String... eventTypes) throws IOException { String id = startProcess(flowName); sendEvents(id, eventTypes); waitForFinish(flowName, id, Duration.ofSeconds(15)); } - private byte[] generateCloudEvent(String id, String type) throws IOException { + private byte[] generateCloudEvent(String id, Optional businessKey, String type) throws IOException { CloudEventMarshaller marshaller = marshallers.getOrDefault(type, defaultMarshaller); - return marshaller.marshall(buildCloudEvent(id, type, marshaller)); + return marshaller.marshall(buildCloudEvent(id, businessKey, type, marshaller)); } } From 2a03356802c2baf94469b063e05626fef3fdfe12 Mon Sep 17 00:00:00 2001 From: Francisco Javier Tirado Sarti Date: Thu, 22 Feb 2024 15:57:47 +0100 Subject: [PATCH 2/3] [KOGITO-9276] Cleaner alternative --- .../jdbc/JDBCProcessInstances.java | 11 --- .../kie/kogito/process/ProcessInstances.java | 4 +- .../event/impl/ProcessEventDispatcher.java | 76 +++++++++++-------- .../impl/ProcessEventDispatcherTest.java | 6 +- 4 files changed, 50 insertions(+), 47 deletions(-) diff --git a/addons/common/persistence/jdbc/src/main/java/org/kie/kogito/persistence/jdbc/JDBCProcessInstances.java b/addons/common/persistence/jdbc/src/main/java/org/kie/kogito/persistence/jdbc/JDBCProcessInstances.java index b451b4b5866..789da4f5c7b 100644 --- a/addons/common/persistence/jdbc/src/main/java/org/kie/kogito/persistence/jdbc/JDBCProcessInstances.java +++ b/addons/common/persistence/jdbc/src/main/java/org/kie/kogito/persistence/jdbc/JDBCProcessInstances.java @@ -101,17 +101,6 @@ public Optional> findById(String id, ProcessInstanceReadMode return repository.findByIdInternal(process.id(), process.version(), UUID.fromString(id)).map(r -> unmarshall(r, mode)); } - @Override - public Optional> findByIdOrBusinessKey(String id) { - LOGGER.debug("Find process instance id or businessKey: {}", id); - try { - UUID uuid = UUID.fromString(id); - return repository.findByIdInternal(process.id(), process.version(), uuid).map(r -> unmarshall(r, ProcessInstanceReadMode.MUTABLE)); - } catch (IllegalArgumentException ex) { - return stream(ProcessInstanceReadMode.MUTABLE).filter(pi -> id.equals(pi.businessKey())).findAny(); - } - } - @Override public Stream> stream(ProcessInstanceReadMode mode) { LOGGER.debug("Find process instance values using mode: {}", mode); diff --git a/api/kogito-api/src/main/java/org/kie/kogito/process/ProcessInstances.java b/api/kogito-api/src/main/java/org/kie/kogito/process/ProcessInstances.java index 740713876de..6cc0d95649c 100644 --- a/api/kogito-api/src/main/java/org/kie/kogito/process/ProcessInstances.java +++ b/api/kogito-api/src/main/java/org/kie/kogito/process/ProcessInstances.java @@ -29,8 +29,8 @@ default Optional> findById(String id) { Optional> findById(String id, ProcessInstanceReadMode mode); - default Optional> findByIdOrBusinessKey(String id) { - return findById(id).or(() -> stream().filter(pi -> id.equals(pi.businessKey())).findAny()); + default Optional> findByBusinessKey(String id) { + return stream().filter(pi -> id.equals(pi.businessKey())).findAny(); } Stream> stream(ProcessInstanceReadMode mode); diff --git a/api/kogito-events-core/src/main/java/org/kie/kogito/event/impl/ProcessEventDispatcher.java b/api/kogito-events-core/src/main/java/org/kie/kogito/event/impl/ProcessEventDispatcher.java index 75c543f8c87..1ede066bd93 100644 --- a/api/kogito-events-core/src/main/java/org/kie/kogito/event/impl/ProcessEventDispatcher.java +++ b/api/kogito-events-core/src/main/java/org/kie/kogito/event/impl/ProcessEventDispatcher.java @@ -37,10 +37,8 @@ import org.kie.kogito.correlation.SimpleCorrelation; import org.kie.kogito.event.DataEvent; import org.kie.kogito.event.EventDispatcher; -import org.kie.kogito.event.cloudevents.CloudEventExtensionConstants; import org.kie.kogito.process.Process; import org.kie.kogito.process.ProcessInstance; -import org.kie.kogito.process.ProcessInstances; import org.kie.kogito.process.ProcessService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -76,20 +74,49 @@ public CompletableFuture> dispatch(String trigger, DataEvent< return CompletableFuture.completedFuture(null); } return resolveCorrelationId(event) - .map(kogitoReferenceId -> CompletableFuture.supplyAsync(() -> handleMessageWithReference(trigger, event, kogitoReferenceId), executor)) + .map(kogitoReferenceId -> asCompletable(trigger, event, findById(kogitoReferenceId))) .orElseGet(() -> { - // if the trigger is for a start event (model converter is set only for start node) - if (modelConverter.isPresent()) { - return CompletableFuture.supplyAsync(() -> startNewInstance(trigger, event), executor); - } else { - if (LOGGER.isInfoEnabled()) { - LOGGER.info("No matches found for trigger {} in process {}. Skipping consumed message {}", trigger, process.id(), event); - } - return CompletableFuture.completedFuture(null); + // check processInstanceId + String processInstanceId = event.getKogitoReferenceId(); + if (processInstanceId != null) { + return asCompletable(trigger, event, findById(processInstanceId)); } + // check businessKey + String businessKey = event.getKogitoBusinessKey(); + if (businessKey != null) { + return asCompletable(trigger, event, findByBusinessKey(businessKey)); + } + // try to start a new instance if possible + return CompletableFuture.supplyAsync(() -> startNewInstance(trigger, event), executor); }); } + private CompletableFuture> asCompletable(String trigger, DataEvent event, Optional> processInstance) { + return CompletableFuture.supplyAsync(() -> processInstance.map(pi -> { + signalProcessInstance(trigger, pi.id(), event); + return pi; + }).orElseGet(() -> startNewInstance(trigger, event)), executor); + } + + private Optional> findById(String id) { + LOGGER.debug("Received message with process instance id '{}'", id); + Optional> result = process.instances().findById(id); + if (LOGGER.isDebugEnabled() && result.isEmpty()) { + LOGGER.debug("No instance found for process instance id '{}'", id); + } + return result; + + } + + private Optional> findByBusinessKey(String key) { + LOGGER.debug("Received message with business key '{}'", key); + Optional> result = process.instances().findByBusinessKey(key); + if (LOGGER.isDebugEnabled() && result.isEmpty()) { + LOGGER.debug("No instance found for business key '{}'", key); + } + return result; + } + private Optional compositeCorrelation(DataEvent event) { return correlationKeys != null && !correlationKeys.isEmpty() ? Optional.of(new CompositeCorrelation( correlationKeys.stream().map(k -> new SimpleCorrelation<>(k, resolve(event, k))).collect(Collectors.toSet()))) : Optional.empty(); @@ -97,9 +124,8 @@ private Optional compositeCorrelation(DataEvent event) private Optional resolveCorrelationId(DataEvent event) { return compositeCorrelation(event).flatMap(process.correlations()::find) - .map(CorrelationInstance::getCorrelatedId) - .or(() -> Optional.ofNullable((String) event.getExtension(CloudEventExtensionConstants.BUSINESS_KEY))) - .or(() -> Optional.ofNullable(event.getKogitoReferenceId())); + .map(CorrelationInstance::getCorrelatedId); + } private Object resolve(DataEvent event, String key) { @@ -114,21 +140,6 @@ private Object resolve(DataEvent event, String key) { } } - private ProcessInstance handleMessageWithReference(String trigger, DataEvent event, String instanceId) { - LOGGER.debug("Received message with reference id '{}' going to use it to send signal '{}'", - instanceId, - trigger); - return process.instances().findByIdOrBusinessKey(instanceId) - .map(instance -> { - signalProcessInstance(trigger, instance.id(), event); - return instance; - }) - .orElseGet(() -> { - LOGGER.info("Process instance with id '{}' not found for triggering signal '{}'", instanceId, trigger); - return startNewInstance(trigger, event); - }); - } - private Optional signalProcessInstance(String trigger, String id, DataEvent event) { return processService.signalProcessInstance((Process) process, id, dataResolver.apply(event), "Message-" + trigger); } @@ -139,7 +150,12 @@ private ProcessInstance startNewInstance(String trigger, DataEvent event) return processService.createProcessInstance(process, event.getKogitoBusinessKey(), m.apply(dataResolver.apply(event)), headersFromEvent(event), event.getKogitoStartFromNode(), trigger, event.getKogitoProcessInstanceId(), compositeCorrelation(event).orElse(null)); - }).orElse(null); + }).orElseGet(() -> { + if (LOGGER.isInfoEnabled()) { + LOGGER.info("No matches found for trigger {} in process {}. Skipping consumed message {}", trigger, process.id(), event); + } + return null; + }); } protected Map> headersFromEvent(DataEvent event) { diff --git a/api/kogito-events-core/src/test/java/org/kie/kogito/event/impl/ProcessEventDispatcherTest.java b/api/kogito-events-core/src/test/java/org/kie/kogito/event/impl/ProcessEventDispatcherTest.java index 497a38d8f11..f7d4cda1ea0 100644 --- a/api/kogito-events-core/src/test/java/org/kie/kogito/event/impl/ProcessEventDispatcherTest.java +++ b/api/kogito-events-core/src/test/java/org/kie/kogito/event/impl/ProcessEventDispatcherTest.java @@ -78,8 +78,6 @@ void setup() { when(process.correlations()).thenReturn(correlationService); when(processInstances.findById(Mockito.anyString())).thenReturn(Optional.empty()); when(processInstances.findById("1")).thenReturn(Optional.of(processInstance)); - when(processInstances.findByIdOrBusinessKey(Mockito.anyString())).thenReturn(Optional.empty()); - when(processInstances.findByIdOrBusinessKey("1")).thenReturn(Optional.of(processInstance)); processService = mock(ProcessService.class); when(processService.createProcessInstance(eq(process), any(), any(), any(), any(), any(), any(), any())).thenReturn(processInstance); when(processService.signalProcessInstance(eq(process), any(), any(), any())).thenReturn(Optional.of(mock(DummyModel.class))); @@ -119,7 +117,7 @@ void testCloudEventNewInstanceWithoutReference() throws Exception { ArgumentCaptor signal = ArgumentCaptor.forClass(String.class); ArgumentCaptor referenceId = ArgumentCaptor.forClass(String.class); - verify(processInstances, never()).findByIdOrBusinessKey("invalidReference"); + verify(processInstances, never()).findById(any()); verify(processService, never()).signalProcessInstance(eq(process), any(), any(), signal.capture()); verify(processService, times(1)).createProcessInstance(eq(process), any(), any(DummyModel.class), any(), any(), signal.capture(), referenceId.capture(), isNull()); @@ -137,7 +135,7 @@ void testCloudEventNewInstanceWithReference() throws Exception { ArgumentCaptor referenceId = ArgumentCaptor.forClass(String.class); ArgumentCaptor>> headers = ArgumentCaptor.forClass(Map.class); - verify(processInstances, times(1)).findByIdOrBusinessKey("invalidReference"); + verify(processInstances, times(1)).findById("invalidReference"); verify(processService, never()).signalProcessInstance(eq(process), any(), any(), signal.capture()); verify(processService, times(1)).createProcessInstance(eq(process), any(), any(DummyModel.class), headers.capture(), any(), signal.capture(), referenceId.capture(), isNull()); From 12742d992f63fdf6d76040554722eec2467e6cf8 Mon Sep 17 00:00:00 2001 From: Francisco Javier Tirado Sarti Date: Mon, 26 Feb 2024 17:15:58 +0100 Subject: [PATCH 3/3] [KOGITO-9276] Gonzalos comments --- .../org/kie/kogito/event/impl/ProcessEventDispatcher.java | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/api/kogito-events-core/src/main/java/org/kie/kogito/event/impl/ProcessEventDispatcher.java b/api/kogito-events-core/src/main/java/org/kie/kogito/event/impl/ProcessEventDispatcher.java index 1ede066bd93..704e854865c 100644 --- a/api/kogito-events-core/src/main/java/org/kie/kogito/event/impl/ProcessEventDispatcher.java +++ b/api/kogito-events-core/src/main/java/org/kie/kogito/event/impl/ProcessEventDispatcher.java @@ -92,7 +92,11 @@ public CompletableFuture> dispatch(String trigger, DataEvent< } private CompletableFuture> asCompletable(String trigger, DataEvent event, Optional> processInstance) { + return CompletableFuture.supplyAsync(() -> processInstance.map(pi -> { + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("Sending signal {} to process instance id '{}'", trigger, pi.id()); + } signalProcessInstance(trigger, pi.id(), event); return pi; }).orElseGet(() -> startNewInstance(trigger, event)), executor);