Skip to content

Commit

Permalink
[KOGITO-9276] Adding support for businnesKey to resume a process
Browse files Browse the repository at this point in the history
  • Loading branch information
fjtirado committed Feb 22, 2024
1 parent 40f080f commit be80703
Show file tree
Hide file tree
Showing 5 changed files with 66 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ default Optional<ProcessInstance<T>> findById(String id) {

Optional<ProcessInstance<T>> findById(String id, ProcessInstanceReadMode mode);

default Optional<ProcessInstance<T>> findByIdOrBusinessKey(String id) {
return findById(id).or(() -> stream().filter(pi -> id.equals(pi.businessKey())).findAny());
}

Stream<ProcessInstance<T>> stream(ProcessInstanceReadMode mode);

default Stream<ProcessInstance<T>> stream() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,10 @@
import org.kie.kogito.correlation.SimpleCorrelation;
import org.kie.kogito.event.DataEvent;
import org.kie.kogito.event.EventDispatcher;
import org.kie.kogito.internal.utils.ConversionUtils;
import org.kie.kogito.event.cloudevents.CloudEventExtensionConstants;
import org.kie.kogito.process.Process;
import org.kie.kogito.process.ProcessInstance;
import org.kie.kogito.process.ProcessInstances;
import org.kie.kogito.process.ProcessService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -74,31 +75,31 @@ public CompletableFuture<ProcessInstance<M>> dispatch(String trigger, DataEvent<
}
return CompletableFuture.completedFuture(null);
}

final String kogitoReferenceId = resolveCorrelationId(event);
if (!ConversionUtils.isEmpty(kogitoReferenceId)) {
return CompletableFuture.supplyAsync(() -> handleMessageWithReference(trigger, event, kogitoReferenceId), executor);
}

//if the trigger is for a start event (model converter is set only for start node)
if (modelConverter.isPresent()) {
return CompletableFuture.supplyAsync(() -> startNewInstance(trigger, event), executor);
}
if (LOGGER.isInfoEnabled()) {
LOGGER.info("No matches found for trigger {} in process {}. Skipping consumed message {}", trigger, process.id(), event);
}
return CompletableFuture.completedFuture(null);
return resolveCorrelationId(event)
.map(kogitoReferenceId -> CompletableFuture.supplyAsync(() -> handleMessageWithReference(trigger, event, kogitoReferenceId), executor))
.orElseGet(() -> {
// if the trigger is for a start event (model converter is set only for start node)
if (modelConverter.isPresent()) {
return CompletableFuture.supplyAsync(() -> startNewInstance(trigger, event), executor);
} else {
if (LOGGER.isInfoEnabled()) {
LOGGER.info("No matches found for trigger {} in process {}. Skipping consumed message {}", trigger, process.id(), event);
}
return CompletableFuture.completedFuture(null);
}
});
}

private Optional<CompositeCorrelation> compositeCorrelation(DataEvent<?> event) {
return correlationKeys != null && !correlationKeys.isEmpty() ? Optional.of(new CompositeCorrelation(
correlationKeys.stream().map(k -> new SimpleCorrelation<>(k, resolve(event, k))).collect(Collectors.toSet()))) : Optional.empty();
}

private String resolveCorrelationId(DataEvent<?> event) {
private Optional<String> resolveCorrelationId(DataEvent<?> event) {
return compositeCorrelation(event).flatMap(process.correlations()::find)
.map(CorrelationInstance::getCorrelatedId)
.orElseGet(event::getKogitoReferenceId);
.or(() -> Optional.ofNullable((String) event.getExtension(CloudEventExtensionConstants.BUSINESS_KEY)))
.or(() -> Optional.ofNullable(event.getKogitoReferenceId()));
}

private Object resolve(DataEvent<?> event, String key) {
Expand All @@ -117,8 +118,7 @@ private ProcessInstance<M> handleMessageWithReference(String trigger, DataEvent<
LOGGER.debug("Received message with reference id '{}' going to use it to send signal '{}'",
instanceId,
trigger);
return process.instances()
.findById(instanceId)
return process.instances().findByIdOrBusinessKey(instanceId)
.map(instance -> {
signalProcessInstance(trigger, instance.id(), event);
return instance;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@ void setup() {
when(process.correlations()).thenReturn(correlationService);
when(processInstances.findById(Mockito.anyString())).thenReturn(Optional.empty());
when(processInstances.findById("1")).thenReturn(Optional.of(processInstance));
when(processInstances.findByIdOrBusinessKey(Mockito.anyString())).thenReturn(Optional.empty());
when(processInstances.findByIdOrBusinessKey("1")).thenReturn(Optional.of(processInstance));
processService = mock(ProcessService.class);
when(processService.createProcessInstance(eq(process), any(), any(), any(), any(), any(), any(), any())).thenReturn(processInstance);
when(processService.signalProcessInstance(eq(process), any(), any(), any())).thenReturn(Optional.of(mock(DummyModel.class)));
Expand Down Expand Up @@ -117,7 +119,7 @@ void testCloudEventNewInstanceWithoutReference() throws Exception {
ArgumentCaptor<String> signal = ArgumentCaptor.forClass(String.class);
ArgumentCaptor<String> referenceId = ArgumentCaptor.forClass(String.class);

verify(processInstances, never()).findById(any());
verify(processInstances, never()).findByIdOrBusinessKey("invalidReference");
verify(processService, never()).signalProcessInstance(eq(process), any(), any(), signal.capture());
verify(processService, times(1)).createProcessInstance(eq(process), any(), any(DummyModel.class), any(), any(), signal.capture(), referenceId.capture(), isNull());

Expand All @@ -135,7 +137,7 @@ void testCloudEventNewInstanceWithReference() throws Exception {
ArgumentCaptor<String> referenceId = ArgumentCaptor.forClass(String.class);
ArgumentCaptor<Map<String, List<String>>> headers = ArgumentCaptor.forClass(Map.class);

verify(processInstances, times(1)).findById("invalidReference");
verify(processInstances, times(1)).findByIdOrBusinessKey("invalidReference");
verify(processService, never()).signalProcessInstance(eq(process), any(), any(), signal.capture());
verify(processService, times(1)).createProcessInstance(eq(process), any(), any(DummyModel.class), headers.capture(), any(), signal.capture(), referenceId.capture(), isNull());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,16 @@
import java.time.Duration;
import java.time.OffsetDateTime;
import java.util.Collections;
import java.util.Optional;
import java.util.UUID;

import org.kie.kogito.event.CloudEventMarshaller;
import org.kie.kogito.event.cloudevents.CloudEventExtensionConstants;

import io.cloudevents.CloudEvent;
import io.cloudevents.core.builder.CloudEventBuilder;
import io.restassured.http.ContentType;
import io.restassured.specification.RequestSpecification;

import static io.restassured.RestAssured.given;
import static java.util.concurrent.TimeUnit.SECONDS;
Expand All @@ -40,8 +43,11 @@ private AssuredTestUtils() {
}

static String startProcess(String flowName) {
String id = startProcessNoCheck(flowName);
return startProcess(flowName, Optional.empty());
}

static String startProcess(String flowName, Optional<String> businessKey) {
String id = startProcessNoCheck(flowName, businessKey);
given()
.contentType(ContentType.JSON)
.accept(ContentType.JSON)
Expand All @@ -52,11 +58,16 @@ static String startProcess(String flowName) {
}

static String startProcessNoCheck(String flowName) {
return given()
return startProcessNoCheck(flowName, Optional.empty());
}

static String startProcessNoCheck(String flowName, Optional<String> businessKey) {
RequestSpecification body = given()
.contentType(ContentType.JSON)
.when()
.body(Collections.singletonMap("workflowdata", Collections.emptyMap()))
.post("/" + flowName)
.body(Collections.singletonMap("workflowdata", Collections.emptyMap()));
businessKey.ifPresent(key -> body.queryParam("businessKey", key));
return body.post("/" + flowName)
.then()
.statusCode(201)
.extract().path("id");
Expand All @@ -73,15 +84,19 @@ static void waitForFinish(String flowName, String id, Duration duration) {
.statusCode(404));
}

static CloudEvent buildCloudEvent(String id, String type, CloudEventMarshaller<byte[]> marshaller) {
return CloudEventBuilder.v1()
static CloudEvent buildCloudEvent(String id, Optional<String> businessKey, String type, CloudEventMarshaller<byte[]> marshaller) {
io.cloudevents.core.v1.CloudEventBuilder builder = CloudEventBuilder.v1()
.withId(UUID.randomUUID().toString())
.withSource(URI.create(""))
.withType(type)
.withTime(OffsetDateTime.now())
.withExtension("kogitoprocrefid", id)
.withData(marshaller.cloudEventDataFactory().apply(Collections.singletonMap(type, "This has been injected by the event")))
.build();
.withData(marshaller.cloudEventDataFactory().apply(Collections.singletonMap(type, "This has been injected by the event")));
businessKey.ifPresentOrElse(key -> builder.withExtension(CloudEventExtensionConstants.BUSINESS_KEY, key), () -> builder.withExtension(CloudEventExtensionConstants.PROCESS_REFERENCE_ID, id));
return builder.build();
}

static CloudEvent buildCloudEvent(String id, String type, CloudEventMarshaller<byte[]> marshaller) {
return buildCloudEvent(id, Optional.empty(), type, marshaller);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.io.IOException;
import java.time.Duration;
import java.util.Map;
import java.util.Optional;

import org.awaitility.core.ConditionTimeoutException;
import org.junit.jupiter.api.BeforeAll;
Expand Down Expand Up @@ -57,7 +58,7 @@ static void init() {

@Test
void testNotStartingEvent() throws IOException {
doIt("nonStartEvent", "move");
doIt("nonStartEvent", Optional.of("manolo"), "move");
}

@Test
Expand Down Expand Up @@ -108,25 +109,35 @@ void testNotStartingMultipleEventRainy() throws IOException {
}

private void sendEvents(String id, String... eventTypes) throws IOException {
sendEvents(id, Optional.empty(), eventTypes);
}

private void sendEvents(String id, Optional<String> businessKey, String... eventTypes) throws IOException {
for (String eventType : eventTypes) {
given()
.contentType(ContentType.JSON)
.when()
.body(generateCloudEvent(id, eventType))
.body(generateCloudEvent(id, businessKey, eventType))
.post("/" + eventType)
.then()
.statusCode(202);
}
}

private void doIt(String flowName, Optional<String> businessKey, String... eventTypes) throws IOException {
String id = startProcess(flowName, businessKey);
sendEvents(id, businessKey, eventTypes);
waitForFinish(flowName, id, Duration.ofSeconds(15));
}

private void doIt(String flowName, String... eventTypes) throws IOException {
String id = startProcess(flowName);
sendEvents(id, eventTypes);
waitForFinish(flowName, id, Duration.ofSeconds(15));
}

private byte[] generateCloudEvent(String id, String type) throws IOException {
private byte[] generateCloudEvent(String id, Optional<String> businessKey, String type) throws IOException {
CloudEventMarshaller<byte[]> marshaller = marshallers.getOrDefault(type, defaultMarshaller);
return marshaller.marshall(buildCloudEvent(id, type, marshaller));
return marshaller.marshall(buildCloudEvent(id, businessKey, type, marshaller));
}
}

0 comments on commit be80703

Please sign in to comment.