Skip to content

Commit

Permalink
[KOGITO-9276] Cleaner alternative
Browse files Browse the repository at this point in the history
  • Loading branch information
fjtirado committed Feb 22, 2024
1 parent 6fee9c7 commit 2a03356
Show file tree
Hide file tree
Showing 4 changed files with 50 additions and 47 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -101,17 +101,6 @@ public Optional<ProcessInstance<?>> findById(String id, ProcessInstanceReadMode
return repository.findByIdInternal(process.id(), process.version(), UUID.fromString(id)).map(r -> unmarshall(r, mode));
}

@Override
public Optional<ProcessInstance<?>> findByIdOrBusinessKey(String id) {
LOGGER.debug("Find process instance id or businessKey: {}", id);
try {
UUID uuid = UUID.fromString(id);
return repository.findByIdInternal(process.id(), process.version(), uuid).map(r -> unmarshall(r, ProcessInstanceReadMode.MUTABLE));
} catch (IllegalArgumentException ex) {
return stream(ProcessInstanceReadMode.MUTABLE).filter(pi -> id.equals(pi.businessKey())).findAny();
}
}

@Override
public Stream<ProcessInstance<?>> stream(ProcessInstanceReadMode mode) {
LOGGER.debug("Find process instance values using mode: {}", mode);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ default Optional<ProcessInstance<T>> findById(String id) {

Optional<ProcessInstance<T>> findById(String id, ProcessInstanceReadMode mode);

default Optional<ProcessInstance<T>> findByIdOrBusinessKey(String id) {
return findById(id).or(() -> stream().filter(pi -> id.equals(pi.businessKey())).findAny());
default Optional<ProcessInstance<T>> findByBusinessKey(String id) {
return stream().filter(pi -> id.equals(pi.businessKey())).findAny();
}

Stream<ProcessInstance<T>> stream(ProcessInstanceReadMode mode);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,8 @@
import org.kie.kogito.correlation.SimpleCorrelation;
import org.kie.kogito.event.DataEvent;
import org.kie.kogito.event.EventDispatcher;
import org.kie.kogito.event.cloudevents.CloudEventExtensionConstants;
import org.kie.kogito.process.Process;
import org.kie.kogito.process.ProcessInstance;
import org.kie.kogito.process.ProcessInstances;
import org.kie.kogito.process.ProcessService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -76,30 +74,58 @@ public CompletableFuture<ProcessInstance<M>> dispatch(String trigger, DataEvent<
return CompletableFuture.completedFuture(null);
}
return resolveCorrelationId(event)
.map(kogitoReferenceId -> CompletableFuture.supplyAsync(() -> handleMessageWithReference(trigger, event, kogitoReferenceId), executor))
.map(kogitoReferenceId -> asCompletable(trigger, event, findById(kogitoReferenceId)))
.orElseGet(() -> {
// if the trigger is for a start event (model converter is set only for start node)
if (modelConverter.isPresent()) {
return CompletableFuture.supplyAsync(() -> startNewInstance(trigger, event), executor);
} else {
if (LOGGER.isInfoEnabled()) {
LOGGER.info("No matches found for trigger {} in process {}. Skipping consumed message {}", trigger, process.id(), event);
}
return CompletableFuture.completedFuture(null);
// check processInstanceId
String processInstanceId = event.getKogitoReferenceId();
if (processInstanceId != null) {
return asCompletable(trigger, event, findById(processInstanceId));
}
// check businessKey
String businessKey = event.getKogitoBusinessKey();
if (businessKey != null) {
return asCompletable(trigger, event, findByBusinessKey(businessKey));
}
// try to start a new instance if possible
return CompletableFuture.supplyAsync(() -> startNewInstance(trigger, event), executor);
});
}

private CompletableFuture<ProcessInstance<M>> asCompletable(String trigger, DataEvent<D> event, Optional<ProcessInstance<M>> processInstance) {
return CompletableFuture.supplyAsync(() -> processInstance.map(pi -> {
signalProcessInstance(trigger, pi.id(), event);
return pi;
}).orElseGet(() -> startNewInstance(trigger, event)), executor);
}

private Optional<ProcessInstance<M>> findById(String id) {
LOGGER.debug("Received message with process instance id '{}'", id);
Optional<ProcessInstance<M>> result = process.instances().findById(id);
if (LOGGER.isDebugEnabled() && result.isEmpty()) {
LOGGER.debug("No instance found for process instance id '{}'", id);
}
return result;

}

private Optional<ProcessInstance<M>> findByBusinessKey(String key) {
LOGGER.debug("Received message with business key '{}'", key);
Optional<ProcessInstance<M>> result = process.instances().findByBusinessKey(key);
if (LOGGER.isDebugEnabled() && result.isEmpty()) {
LOGGER.debug("No instance found for business key '{}'", key);
}
return result;
}

private Optional<CompositeCorrelation> compositeCorrelation(DataEvent<?> event) {
return correlationKeys != null && !correlationKeys.isEmpty() ? Optional.of(new CompositeCorrelation(
correlationKeys.stream().map(k -> new SimpleCorrelation<>(k, resolve(event, k))).collect(Collectors.toSet()))) : Optional.empty();
}

private Optional<String> resolveCorrelationId(DataEvent<?> event) {
return compositeCorrelation(event).flatMap(process.correlations()::find)
.map(CorrelationInstance::getCorrelatedId)
.or(() -> Optional.ofNullable((String) event.getExtension(CloudEventExtensionConstants.BUSINESS_KEY)))
.or(() -> Optional.ofNullable(event.getKogitoReferenceId()));
.map(CorrelationInstance::getCorrelatedId);

}

private Object resolve(DataEvent<?> event, String key) {
Expand All @@ -114,21 +140,6 @@ private Object resolve(DataEvent<?> event, String key) {
}
}

private ProcessInstance<M> handleMessageWithReference(String trigger, DataEvent<D> event, String instanceId) {
LOGGER.debug("Received message with reference id '{}' going to use it to send signal '{}'",
instanceId,
trigger);
return process.instances().findByIdOrBusinessKey(instanceId)
.map(instance -> {
signalProcessInstance(trigger, instance.id(), event);
return instance;
})
.orElseGet(() -> {
LOGGER.info("Process instance with id '{}' not found for triggering signal '{}'", instanceId, trigger);
return startNewInstance(trigger, event);
});
}

private Optional<M> signalProcessInstance(String trigger, String id, DataEvent<D> event) {
return processService.signalProcessInstance((Process) process, id, dataResolver.apply(event), "Message-" + trigger);
}
Expand All @@ -139,7 +150,12 @@ private ProcessInstance<M> startNewInstance(String trigger, DataEvent<D> event)
return processService.createProcessInstance(process, event.getKogitoBusinessKey(), m.apply(dataResolver.apply(event)),
headersFromEvent(event), event.getKogitoStartFromNode(), trigger,
event.getKogitoProcessInstanceId(), compositeCorrelation(event).orElse(null));
}).orElse(null);
}).orElseGet(() -> {
if (LOGGER.isInfoEnabled()) {
LOGGER.info("No matches found for trigger {} in process {}. Skipping consumed message {}", trigger, process.id(), event);
}
return null;
});
}

protected Map<String, List<String>> headersFromEvent(DataEvent<D> event) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,6 @@ void setup() {
when(process.correlations()).thenReturn(correlationService);
when(processInstances.findById(Mockito.anyString())).thenReturn(Optional.empty());
when(processInstances.findById("1")).thenReturn(Optional.of(processInstance));
when(processInstances.findByIdOrBusinessKey(Mockito.anyString())).thenReturn(Optional.empty());
when(processInstances.findByIdOrBusinessKey("1")).thenReturn(Optional.of(processInstance));
processService = mock(ProcessService.class);
when(processService.createProcessInstance(eq(process), any(), any(), any(), any(), any(), any(), any())).thenReturn(processInstance);
when(processService.signalProcessInstance(eq(process), any(), any(), any())).thenReturn(Optional.of(mock(DummyModel.class)));
Expand Down Expand Up @@ -119,7 +117,7 @@ void testCloudEventNewInstanceWithoutReference() throws Exception {
ArgumentCaptor<String> signal = ArgumentCaptor.forClass(String.class);
ArgumentCaptor<String> referenceId = ArgumentCaptor.forClass(String.class);

verify(processInstances, never()).findByIdOrBusinessKey("invalidReference");
verify(processInstances, never()).findById(any());
verify(processService, never()).signalProcessInstance(eq(process), any(), any(), signal.capture());
verify(processService, times(1)).createProcessInstance(eq(process), any(), any(DummyModel.class), any(), any(), signal.capture(), referenceId.capture(), isNull());

Expand All @@ -137,7 +135,7 @@ void testCloudEventNewInstanceWithReference() throws Exception {
ArgumentCaptor<String> referenceId = ArgumentCaptor.forClass(String.class);
ArgumentCaptor<Map<String, List<String>>> headers = ArgumentCaptor.forClass(Map.class);

verify(processInstances, times(1)).findByIdOrBusinessKey("invalidReference");
verify(processInstances, times(1)).findById("invalidReference");
verify(processService, never()).signalProcessInstance(eq(process), any(), any(), signal.capture());
verify(processService, times(1)).createProcessInstance(eq(process), any(), any(DummyModel.class), headers.capture(), any(), signal.capture(), referenceId.capture(), isNull());

Expand Down

0 comments on commit 2a03356

Please sign in to comment.