Skip to content

Commit

Permalink
[KOGITO-9276] Adding support for businessKey to resume a process (apa…
Browse files Browse the repository at this point in the history
…che#3412)

* [KOGITO-9276] Adding support for businnesKey to resume a process

* [KOGITO-9276] Cleaner alternative

* [KOGITO-9276] Gonzalos comments
  • Loading branch information
fjtirado authored and rgdoliveira committed Mar 11, 2024
1 parent f480727 commit c9e33a2
Show file tree
Hide file tree
Showing 4 changed files with 94 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ default Optional<ProcessInstance<T>> findById(String id) {

Optional<ProcessInstance<T>> findById(String id, ProcessInstanceReadMode mode);

default Optional<ProcessInstance<T>> findByBusinessKey(String id) {
return stream().filter(pi -> id.equals(pi.businessKey())).findAny();
}

Stream<ProcessInstance<T>> stream(ProcessInstanceReadMode mode);

default Stream<ProcessInstance<T>> stream() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
import org.kie.kogito.correlation.SimpleCorrelation;
import org.kie.kogito.event.DataEvent;
import org.kie.kogito.event.EventDispatcher;
import org.kie.kogito.internal.utils.ConversionUtils;
import org.kie.kogito.process.Process;
import org.kie.kogito.process.ProcessInstance;
import org.kie.kogito.process.ProcessService;
Expand Down Expand Up @@ -74,31 +73,63 @@ public CompletableFuture<ProcessInstance<M>> dispatch(String trigger, DataEvent<
}
return CompletableFuture.completedFuture(null);
}
return resolveCorrelationId(event)
.map(kogitoReferenceId -> asCompletable(trigger, event, findById(kogitoReferenceId)))
.orElseGet(() -> {
// check processInstanceId
String processInstanceId = event.getKogitoReferenceId();
if (processInstanceId != null) {
return asCompletable(trigger, event, findById(processInstanceId));
}
// check businessKey
String businessKey = event.getKogitoBusinessKey();
if (businessKey != null) {
return asCompletable(trigger, event, findByBusinessKey(businessKey));
}
// try to start a new instance if possible
return CompletableFuture.supplyAsync(() -> startNewInstance(trigger, event), executor);
});
}

final String kogitoReferenceId = resolveCorrelationId(event);
if (!ConversionUtils.isEmpty(kogitoReferenceId)) {
return CompletableFuture.supplyAsync(() -> handleMessageWithReference(trigger, event, kogitoReferenceId), executor);
}
private CompletableFuture<ProcessInstance<M>> asCompletable(String trigger, DataEvent<D> event, Optional<ProcessInstance<M>> processInstance) {

//if the trigger is for a start event (model converter is set only for start node)
if (modelConverter.isPresent()) {
return CompletableFuture.supplyAsync(() -> startNewInstance(trigger, event), executor);
return CompletableFuture.supplyAsync(() -> processInstance.map(pi -> {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Sending signal {} to process instance id '{}'", trigger, pi.id());
}
signalProcessInstance(trigger, pi.id(), event);
return pi;
}).orElseGet(() -> startNewInstance(trigger, event)), executor);
}

private Optional<ProcessInstance<M>> findById(String id) {
LOGGER.debug("Received message with process instance id '{}'", id);
Optional<ProcessInstance<M>> result = process.instances().findById(id);
if (LOGGER.isDebugEnabled() && result.isEmpty()) {
LOGGER.debug("No instance found for process instance id '{}'", id);
}
if (LOGGER.isInfoEnabled()) {
LOGGER.info("No matches found for trigger {} in process {}. Skipping consumed message {}", trigger, process.id(), event);
return result;

}

private Optional<ProcessInstance<M>> findByBusinessKey(String key) {
LOGGER.debug("Received message with business key '{}'", key);
Optional<ProcessInstance<M>> result = process.instances().findByBusinessKey(key);
if (LOGGER.isDebugEnabled() && result.isEmpty()) {
LOGGER.debug("No instance found for business key '{}'", key);
}
return CompletableFuture.completedFuture(null);
return result;
}

private Optional<CompositeCorrelation> compositeCorrelation(DataEvent<?> event) {
return correlationKeys != null && !correlationKeys.isEmpty() ? Optional.of(new CompositeCorrelation(
correlationKeys.stream().map(k -> new SimpleCorrelation<>(k, resolve(event, k))).collect(Collectors.toSet()))) : Optional.empty();
}

private String resolveCorrelationId(DataEvent<?> event) {
private Optional<String> resolveCorrelationId(DataEvent<?> event) {
return compositeCorrelation(event).flatMap(process.correlations()::find)
.map(CorrelationInstance::getCorrelatedId)
.orElseGet(event::getKogitoReferenceId);
.map(CorrelationInstance::getCorrelatedId);

}

private Object resolve(DataEvent<?> event, String key) {
Expand All @@ -113,22 +144,6 @@ private Object resolve(DataEvent<?> event, String key) {
}
}

private ProcessInstance<M> handleMessageWithReference(String trigger, DataEvent<D> event, String instanceId) {
LOGGER.debug("Received message with reference id '{}' going to use it to send signal '{}'",
instanceId,
trigger);
return process.instances()
.findById(instanceId)
.map(instance -> {
signalProcessInstance(trigger, instance.id(), event);
return instance;
})
.orElseGet(() -> {
LOGGER.info("Process instance with id '{}' not found for triggering signal '{}'", instanceId, trigger);
return startNewInstance(trigger, event);
});
}

private Optional<M> signalProcessInstance(String trigger, String id, DataEvent<D> event) {
return processService.signalProcessInstance((Process) process, id, dataResolver.apply(event), "Message-" + trigger);
}
Expand All @@ -139,7 +154,12 @@ private ProcessInstance<M> startNewInstance(String trigger, DataEvent<D> event)
return processService.createProcessInstance(process, event.getKogitoBusinessKey(), m.apply(dataResolver.apply(event)),
headersFromEvent(event), event.getKogitoStartFromNode(), trigger,
event.getKogitoProcessInstanceId(), compositeCorrelation(event).orElse(null));
}).orElse(null);
}).orElseGet(() -> {
if (LOGGER.isInfoEnabled()) {
LOGGER.info("No matches found for trigger {} in process {}. Skipping consumed message {}", trigger, process.id(), event);
}
return null;
});
}

protected Map<String, List<String>> headersFromEvent(DataEvent<D> event) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,16 @@
import java.time.Duration;
import java.time.OffsetDateTime;
import java.util.Collections;
import java.util.Optional;
import java.util.UUID;

import org.kie.kogito.event.CloudEventMarshaller;
import org.kie.kogito.event.cloudevents.CloudEventExtensionConstants;

import io.cloudevents.CloudEvent;
import io.cloudevents.core.builder.CloudEventBuilder;
import io.restassured.http.ContentType;
import io.restassured.specification.RequestSpecification;

import static io.restassured.RestAssured.given;
import static java.util.concurrent.TimeUnit.SECONDS;
Expand All @@ -40,8 +43,11 @@ private AssuredTestUtils() {
}

static String startProcess(String flowName) {
String id = startProcessNoCheck(flowName);
return startProcess(flowName, Optional.empty());
}

static String startProcess(String flowName, Optional<String> businessKey) {
String id = startProcessNoCheck(flowName, businessKey);
given()
.contentType(ContentType.JSON)
.accept(ContentType.JSON)
Expand All @@ -52,11 +58,16 @@ static String startProcess(String flowName) {
}

static String startProcessNoCheck(String flowName) {
return given()
return startProcessNoCheck(flowName, Optional.empty());
}

static String startProcessNoCheck(String flowName, Optional<String> businessKey) {
RequestSpecification body = given()
.contentType(ContentType.JSON)
.when()
.body(Collections.singletonMap("workflowdata", Collections.emptyMap()))
.post("/" + flowName)
.body(Collections.singletonMap("workflowdata", Collections.emptyMap()));
businessKey.ifPresent(key -> body.queryParam("businessKey", key));
return body.post("/" + flowName)
.then()
.statusCode(201)
.extract().path("id");
Expand All @@ -73,15 +84,19 @@ static void waitForFinish(String flowName, String id, Duration duration) {
.statusCode(404));
}

static CloudEvent buildCloudEvent(String id, String type, CloudEventMarshaller<byte[]> marshaller) {
return CloudEventBuilder.v1()
static CloudEvent buildCloudEvent(String id, Optional<String> businessKey, String type, CloudEventMarshaller<byte[]> marshaller) {
io.cloudevents.core.v1.CloudEventBuilder builder = CloudEventBuilder.v1()
.withId(UUID.randomUUID().toString())
.withSource(URI.create(""))
.withType(type)
.withTime(OffsetDateTime.now())
.withExtension("kogitoprocrefid", id)
.withData(marshaller.cloudEventDataFactory().apply(Collections.singletonMap(type, "This has been injected by the event")))
.build();
.withData(marshaller.cloudEventDataFactory().apply(Collections.singletonMap(type, "This has been injected by the event")));
businessKey.ifPresentOrElse(key -> builder.withExtension(CloudEventExtensionConstants.BUSINESS_KEY, key), () -> builder.withExtension(CloudEventExtensionConstants.PROCESS_REFERENCE_ID, id));
return builder.build();
}

static CloudEvent buildCloudEvent(String id, String type, CloudEventMarshaller<byte[]> marshaller) {
return buildCloudEvent(id, Optional.empty(), type, marshaller);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.io.IOException;
import java.time.Duration;
import java.util.Map;
import java.util.Optional;

import org.awaitility.core.ConditionTimeoutException;
import org.junit.jupiter.api.BeforeAll;
Expand Down Expand Up @@ -57,7 +58,7 @@ static void init() {

@Test
void testNotStartingEvent() throws IOException {
doIt("nonStartEvent", "move");
doIt("nonStartEvent", Optional.of("manolo"), "move");
}

@Test
Expand Down Expand Up @@ -108,25 +109,35 @@ void testNotStartingMultipleEventRainy() throws IOException {
}

private void sendEvents(String id, String... eventTypes) throws IOException {
sendEvents(id, Optional.empty(), eventTypes);
}

private void sendEvents(String id, Optional<String> businessKey, String... eventTypes) throws IOException {
for (String eventType : eventTypes) {
given()
.contentType(ContentType.JSON)
.when()
.body(generateCloudEvent(id, eventType))
.body(generateCloudEvent(id, businessKey, eventType))
.post("/" + eventType)
.then()
.statusCode(202);
}
}

private void doIt(String flowName, Optional<String> businessKey, String... eventTypes) throws IOException {
String id = startProcess(flowName, businessKey);
sendEvents(id, businessKey, eventTypes);
waitForFinish(flowName, id, Duration.ofSeconds(15));
}

private void doIt(String flowName, String... eventTypes) throws IOException {
String id = startProcess(flowName);
sendEvents(id, eventTypes);
waitForFinish(flowName, id, Duration.ofSeconds(15));
}

private byte[] generateCloudEvent(String id, String type) throws IOException {
private byte[] generateCloudEvent(String id, Optional<String> businessKey, String type) throws IOException {
CloudEventMarshaller<byte[]> marshaller = marshallers.getOrDefault(type, defaultMarshaller);
return marshaller.marshall(buildCloudEvent(id, type, marshaller));
return marshaller.marshall(buildCloudEvent(id, businessKey, type, marshaller));
}
}

0 comments on commit c9e33a2

Please sign in to comment.