Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[KOGITO-9276] Adding support for businessKey to resume a process #3412

Merged
merged 3 commits into from
Feb 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ default Optional<ProcessInstance<T>> findById(String id) {

Optional<ProcessInstance<T>> findById(String id, ProcessInstanceReadMode mode);

default Optional<ProcessInstance<T>> findByBusinessKey(String id) {
return stream().filter(pi -> id.equals(pi.businessKey())).findAny();
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Slow default implementation (linear search)
See #3413

}

Stream<ProcessInstance<T>> stream(ProcessInstanceReadMode mode);

default Stream<ProcessInstance<T>> stream() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
import org.kie.kogito.correlation.SimpleCorrelation;
import org.kie.kogito.event.DataEvent;
import org.kie.kogito.event.EventDispatcher;
import org.kie.kogito.internal.utils.ConversionUtils;
import org.kie.kogito.process.Process;
import org.kie.kogito.process.ProcessInstance;
import org.kie.kogito.process.ProcessService;
Expand Down Expand Up @@ -74,31 +73,63 @@ public CompletableFuture<ProcessInstance<M>> dispatch(String trigger, DataEvent<
}
return CompletableFuture.completedFuture(null);
}
return resolveCorrelationId(event)
.map(kogitoReferenceId -> asCompletable(trigger, event, findById(kogitoReferenceId)))
.orElseGet(() -> {
// check processInstanceId
String processInstanceId = event.getKogitoReferenceId();
if (processInstanceId != null) {
return asCompletable(trigger, event, findById(processInstanceId));
}
// check businessKey
String businessKey = event.getKogitoBusinessKey();
if (businessKey != null) {
return asCompletable(trigger, event, findByBusinessKey(businessKey));
}
// try to start a new instance if possible
return CompletableFuture.supplyAsync(() -> startNewInstance(trigger, event), executor);
});
}

final String kogitoReferenceId = resolveCorrelationId(event);
if (!ConversionUtils.isEmpty(kogitoReferenceId)) {
return CompletableFuture.supplyAsync(() -> handleMessageWithReference(trigger, event, kogitoReferenceId), executor);
}
private CompletableFuture<ProcessInstance<M>> asCompletable(String trigger, DataEvent<D> event, Optional<ProcessInstance<M>> processInstance) {

//if the trigger is for a start event (model converter is set only for start node)
if (modelConverter.isPresent()) {
return CompletableFuture.supplyAsync(() -> startNewInstance(trigger, event), executor);
return CompletableFuture.supplyAsync(() -> processInstance.map(pi -> {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Sending signal {} to process instance id '{}'", trigger, pi.id());
}
signalProcessInstance(trigger, pi.id(), event);
fjtirado marked this conversation as resolved.
Show resolved Hide resolved
return pi;
}).orElseGet(() -> startNewInstance(trigger, event)), executor);
}

private Optional<ProcessInstance<M>> findById(String id) {
LOGGER.debug("Received message with process instance id '{}'", id);
Optional<ProcessInstance<M>> result = process.instances().findById(id);
if (LOGGER.isDebugEnabled() && result.isEmpty()) {
LOGGER.debug("No instance found for process instance id '{}'", id);
}
if (LOGGER.isInfoEnabled()) {
LOGGER.info("No matches found for trigger {} in process {}. Skipping consumed message {}", trigger, process.id(), event);
return result;

}

private Optional<ProcessInstance<M>> findByBusinessKey(String key) {
LOGGER.debug("Received message with business key '{}'", key);
Optional<ProcessInstance<M>> result = process.instances().findByBusinessKey(key);
if (LOGGER.isDebugEnabled() && result.isEmpty()) {
LOGGER.debug("No instance found for business key '{}'", key);
}
return CompletableFuture.completedFuture(null);
return result;
}

private Optional<CompositeCorrelation> compositeCorrelation(DataEvent<?> event) {
return correlationKeys != null && !correlationKeys.isEmpty() ? Optional.of(new CompositeCorrelation(
correlationKeys.stream().map(k -> new SimpleCorrelation<>(k, resolve(event, k))).collect(Collectors.toSet()))) : Optional.empty();
}

private String resolveCorrelationId(DataEvent<?> event) {
private Optional<String> resolveCorrelationId(DataEvent<?> event) {
return compositeCorrelation(event).flatMap(process.correlations()::find)
.map(CorrelationInstance::getCorrelatedId)
.orElseGet(event::getKogitoReferenceId);
.map(CorrelationInstance::getCorrelatedId);

}

private Object resolve(DataEvent<?> event, String key) {
Expand All @@ -113,22 +144,6 @@ private Object resolve(DataEvent<?> event, String key) {
}
}

private ProcessInstance<M> handleMessageWithReference(String trigger, DataEvent<D> event, String instanceId) {
LOGGER.debug("Received message with reference id '{}' going to use it to send signal '{}'",
instanceId,
trigger);
return process.instances()
.findById(instanceId)
.map(instance -> {
signalProcessInstance(trigger, instance.id(), event);
return instance;
})
.orElseGet(() -> {
LOGGER.info("Process instance with id '{}' not found for triggering signal '{}'", instanceId, trigger);
return startNewInstance(trigger, event);
});
}

private Optional<M> signalProcessInstance(String trigger, String id, DataEvent<D> event) {
return processService.signalProcessInstance((Process) process, id, dataResolver.apply(event), "Message-" + trigger);
}
Expand All @@ -139,7 +154,12 @@ private ProcessInstance<M> startNewInstance(String trigger, DataEvent<D> event)
return processService.createProcessInstance(process, event.getKogitoBusinessKey(), m.apply(dataResolver.apply(event)),
headersFromEvent(event), event.getKogitoStartFromNode(), trigger,
event.getKogitoProcessInstanceId(), compositeCorrelation(event).orElse(null));
}).orElse(null);
}).orElseGet(() -> {
if (LOGGER.isInfoEnabled()) {
LOGGER.info("No matches found for trigger {} in process {}. Skipping consumed message {}", trigger, process.id(), event);
}
return null;
});
}

protected Map<String, List<String>> headersFromEvent(DataEvent<D> event) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,16 @@
import java.time.Duration;
import java.time.OffsetDateTime;
import java.util.Collections;
import java.util.Optional;
import java.util.UUID;

import org.kie.kogito.event.CloudEventMarshaller;
import org.kie.kogito.event.cloudevents.CloudEventExtensionConstants;

import io.cloudevents.CloudEvent;
import io.cloudevents.core.builder.CloudEventBuilder;
import io.restassured.http.ContentType;
import io.restassured.specification.RequestSpecification;

import static io.restassured.RestAssured.given;
import static java.util.concurrent.TimeUnit.SECONDS;
Expand All @@ -40,8 +43,11 @@ private AssuredTestUtils() {
}

static String startProcess(String flowName) {
String id = startProcessNoCheck(flowName);
return startProcess(flowName, Optional.empty());
}

static String startProcess(String flowName, Optional<String> businessKey) {
String id = startProcessNoCheck(flowName, businessKey);
given()
.contentType(ContentType.JSON)
.accept(ContentType.JSON)
Expand All @@ -52,11 +58,16 @@ static String startProcess(String flowName) {
}

static String startProcessNoCheck(String flowName) {
return given()
return startProcessNoCheck(flowName, Optional.empty());
}

static String startProcessNoCheck(String flowName, Optional<String> businessKey) {
RequestSpecification body = given()
.contentType(ContentType.JSON)
.when()
.body(Collections.singletonMap("workflowdata", Collections.emptyMap()))
.post("/" + flowName)
.body(Collections.singletonMap("workflowdata", Collections.emptyMap()));
businessKey.ifPresent(key -> body.queryParam("businessKey", key));
return body.post("/" + flowName)
.then()
.statusCode(201)
.extract().path("id");
Expand All @@ -73,15 +84,19 @@ static void waitForFinish(String flowName, String id, Duration duration) {
.statusCode(404));
}

static CloudEvent buildCloudEvent(String id, String type, CloudEventMarshaller<byte[]> marshaller) {
return CloudEventBuilder.v1()
static CloudEvent buildCloudEvent(String id, Optional<String> businessKey, String type, CloudEventMarshaller<byte[]> marshaller) {
io.cloudevents.core.v1.CloudEventBuilder builder = CloudEventBuilder.v1()
.withId(UUID.randomUUID().toString())
.withSource(URI.create(""))
.withType(type)
.withTime(OffsetDateTime.now())
.withExtension("kogitoprocrefid", id)
.withData(marshaller.cloudEventDataFactory().apply(Collections.singletonMap(type, "This has been injected by the event")))
.build();
.withData(marshaller.cloudEventDataFactory().apply(Collections.singletonMap(type, "This has been injected by the event")));
businessKey.ifPresentOrElse(key -> builder.withExtension(CloudEventExtensionConstants.BUSINESS_KEY, key), () -> builder.withExtension(CloudEventExtensionConstants.PROCESS_REFERENCE_ID, id));
fjtirado marked this conversation as resolved.
Show resolved Hide resolved
return builder.build();
}

static CloudEvent buildCloudEvent(String id, String type, CloudEventMarshaller<byte[]> marshaller) {
return buildCloudEvent(id, Optional.empty(), type, marshaller);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.io.IOException;
import java.time.Duration;
import java.util.Map;
import java.util.Optional;

import org.awaitility.core.ConditionTimeoutException;
import org.junit.jupiter.api.BeforeAll;
Expand Down Expand Up @@ -57,7 +58,7 @@ static void init() {

@Test
void testNotStartingEvent() throws IOException {
doIt("nonStartEvent", "move");
doIt("nonStartEvent", Optional.of("manolo"), "move");
}

@Test
Expand Down Expand Up @@ -108,25 +109,35 @@ void testNotStartingMultipleEventRainy() throws IOException {
}

private void sendEvents(String id, String... eventTypes) throws IOException {
sendEvents(id, Optional.empty(), eventTypes);
}

private void sendEvents(String id, Optional<String> businessKey, String... eventTypes) throws IOException {
for (String eventType : eventTypes) {
given()
.contentType(ContentType.JSON)
.when()
.body(generateCloudEvent(id, eventType))
.body(generateCloudEvent(id, businessKey, eventType))
.post("/" + eventType)
.then()
.statusCode(202);
}
}

private void doIt(String flowName, Optional<String> businessKey, String... eventTypes) throws IOException {
String id = startProcess(flowName, businessKey);
sendEvents(id, businessKey, eventTypes);
waitForFinish(flowName, id, Duration.ofSeconds(15));
}

private void doIt(String flowName, String... eventTypes) throws IOException {
String id = startProcess(flowName);
sendEvents(id, eventTypes);
waitForFinish(flowName, id, Duration.ofSeconds(15));
}

private byte[] generateCloudEvent(String id, String type) throws IOException {
private byte[] generateCloudEvent(String id, Optional<String> businessKey, String type) throws IOException {
CloudEventMarshaller<byte[]> marshaller = marshallers.getOrDefault(type, defaultMarshaller);
return marshaller.marshall(buildCloudEvent(id, type, marshaller));
return marshaller.marshall(buildCloudEvent(id, businessKey, type, marshaller));
}
}
Loading