Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[apache/incubator-kie-issues#1457] Allow grouping of events #2093

Merged
merged 8 commits into from
Oct 4, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import java.io.IOException;

import org.kie.kogito.event.process.MultipleProcessInstanceDataEvent;
import org.kie.kogito.event.process.ProcessInstanceDataEvent;
import org.kie.kogito.event.process.ProcessInstanceErrorDataEvent;
import org.kie.kogito.event.process.ProcessInstanceNodeDataEvent;
Expand Down Expand Up @@ -57,6 +58,8 @@ public ProcessInstanceDataEvent<?> deserialize(JsonParser jp, DeserializationCon
String type = node.get("type").asText();

switch (type) {
case "MultipleProcessInstanceDataEvent":
return jp.getCodec().treeToValue(node, MultipleProcessInstanceDataEvent.class);
case "ProcessInstanceErrorDataEvent":
return (ProcessInstanceDataEvent<?>) jp.getCodec().treeToValue(node, ProcessInstanceErrorDataEvent.class);
case "ProcessInstanceNodeDataEvent":
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import java.io.IOException;

import org.kie.kogito.event.usertask.MultipleUserTaskInstanceDataEvent;
import org.kie.kogito.event.usertask.UserTaskInstanceAssignmentDataEvent;
import org.kie.kogito.event.usertask.UserTaskInstanceAttachmentDataEvent;
import org.kie.kogito.event.usertask.UserTaskInstanceCommentDataEvent;
Expand Down Expand Up @@ -58,6 +59,8 @@ public UserTaskInstanceDataEvent<?> deserialize(JsonParser jp, DeserializationCo
String type = node.get("type").asText();

switch (type) {
case "MultipleUserTaskInstanceDataEvent":
return jp.getCodec().treeToValue(node, MultipleUserTaskInstanceDataEvent.class);
case "UserTaskInstanceAssignmentDataEvent":
return (UserTaskInstanceDataEvent<?>) jp.getCodec().treeToValue(node, UserTaskInstanceAssignmentDataEvent.class);
case "UserTaskInstanceAttachmentDataEvent":
Expand All @@ -76,4 +79,4 @@ public UserTaskInstanceDataEvent<?> deserialize(JsonParser jp, DeserializationCo

}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,15 @@
import java.util.Optional;

import org.eclipse.microprofile.faulttolerance.Retry;
import org.kie.kogito.event.process.MultipleProcessInstanceDataEvent;
import org.kie.kogito.event.process.ProcessDefinitionDataEvent;
import org.kie.kogito.event.process.ProcessInstanceDataEvent;
import org.kie.kogito.event.process.ProcessInstanceErrorDataEvent;
import org.kie.kogito.event.process.ProcessInstanceNodeDataEvent;
import org.kie.kogito.event.process.ProcessInstanceSLADataEvent;
import org.kie.kogito.event.process.ProcessInstanceStateDataEvent;
import org.kie.kogito.event.process.ProcessInstanceVariableDataEvent;
import org.kie.kogito.event.usertask.MultipleUserTaskInstanceDataEvent;
import org.kie.kogito.event.usertask.UserTaskInstanceAssignmentDataEvent;
import org.kie.kogito.event.usertask.UserTaskInstanceAttachmentDataEvent;
import org.kie.kogito.event.usertask.UserTaskInstanceCommentDataEvent;
Expand Down Expand Up @@ -76,6 +78,15 @@ public class IndexingService {
@Retry(maxRetries = 3, delay = 300, jitter = 100, retryOn = ConcurrentModificationException.class)
public void indexProcessInstanceEvent(ProcessInstanceDataEvent<?> event) {
ProcessInstanceStorage storage = manager.getProcessInstanceStorage();
if (event instanceof MultipleProcessInstanceDataEvent) {
for (ProcessInstanceDataEvent<?> item : ((MultipleProcessInstanceDataEvent) event).getData())
indexProccessInstanceEvent(storage, item);
} else {
indexProccessInstanceEvent(storage, event);
}
}

private void indexProccessInstanceEvent(ProcessInstanceStorage storage, ProcessInstanceDataEvent<?> event) {
if (event instanceof ProcessInstanceErrorDataEvent) {
storage.indexError((ProcessInstanceErrorDataEvent) event);
} else if (event instanceof ProcessInstanceNodeDataEvent) {
Expand All @@ -100,6 +111,16 @@ public void indexProcessDefinition(ProcessDefinitionDataEvent definitionDataEven
@Retry(maxRetries = 3, delay = 300, jitter = 100, retryOn = ConcurrentModificationException.class)
public <T> void indexUserTaskInstanceEvent(UserTaskInstanceDataEvent<T> event) {
UserTaskInstanceStorage storage = manager.getUserTaskInstanceStorage();
if (event instanceof MultipleUserTaskInstanceDataEvent) {
for (UserTaskInstanceDataEvent<?> item : ((MultipleUserTaskInstanceDataEvent) event).getData()) {
indexUserTaskInstanceEvent(storage, item);
}
} else {
indexUserTaskInstanceEvent(storage, event);
}
}

private void indexUserTaskInstanceEvent(UserTaskInstanceStorage storage, UserTaskInstanceDataEvent<?> event) {
if (event instanceof UserTaskInstanceAssignmentDataEvent) {
storage.indexAssignment((UserTaskInstanceAssignmentDataEvent) event);
} else if (event instanceof UserTaskInstanceAttachmentDataEvent) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import static io.restassured.RestAssured.given;
import static org.awaitility.Awaitility.await;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.jupiter.api.Assumptions.assumeTrue;

@Timeout(10000)
public abstract class AbstractMessagingConsumerIT {
Expand Down Expand Up @@ -121,6 +122,73 @@ void testJobEvent() throws Exception {
.body("data.Jobs[0].id", is(jobId)));
}

@Test
void testProcessInstanceEventCollection() throws Exception {
assumeTrue(shouldRunCollectionTests());
sendProcessInstanceEventCollection();

String processInstanceId1 = "processId-UUID1";
String processInstanceId2 = "processId-UUID2";

await()
.atMost(timeout)
.untilAsserted(() -> given().contentType(ContentType.JSON)
.body("{ \"query\" : \"{ ProcessInstances { id, state } }\" }")
.when().post("/graphql")
.then().log().ifValidationFails().statusCode(200)
.body("data.ProcessInstances.size()", is(2))
.body("data.ProcessInstances[0].id", is(processInstanceId1))
.body("data.ProcessInstances[0].state", is("ACTIVE"))
.body("data.ProcessInstances[1].id", is(processInstanceId2))
.body("data.ProcessInstances[1].state", is("ACTIVE")));

}

@Test
void testUserTaskInstanceEventCollection() throws Exception {
assumeTrue(shouldRunCollectionTests());
sendUserTaskInstanceEventCollection();

String taskId1 = "taskId-UUID1";
String taskId2 = "taskId-UUID2";

await()
.atMost(timeout)
.untilAsserted(() -> given().contentType(ContentType.JSON)
.body("{ \"query\" : \"{ UserTaskInstances { id, state } }\" }")
.when().post("/graphql")
.then().log().ifValidationFails().statusCode(200)
.body("data.UserTaskInstances.size()", is(2))
.body("data.UserTaskInstances[0].id", is(taskId1))
.body("data.UserTaskInstances[0].state", is("IN_PROGRESS"))
.body("data.UserTaskInstances[1].id", is(taskId2))
.body("data.UserTaskInstances[1].state", is("COMPLETED")));
}

@Test
void testProcessDefinitionEventCollection() throws Exception {
assumeTrue(shouldRunCollectionTests());
sendProcessDefinitionEventCollection();

String definitionId = "jsongreet";

await()
.atMost(timeout)
.untilAsserted(() -> given().contentType(ContentType.JSON)
.body("{ \"query\" : \"{ ProcessDefinitions { id, version } }\" }")
.when().post("/graphql")
.then().log().ifValidationFails().statusCode(200)
.body("data.ProcessDefinitions.size()", is(2))
.body("data.ProcessDefinitions[0].id", is(definitionId))
.body("data.ProcessDefinitions[0].version", is("1.0"))
.body("data.ProcessDefinitions[1].id", is(definitionId))
.body("data.ProcessDefinitions[1].version", is("1.1")));
}

protected boolean shouldRunCollectionTests() {
return true; // Default is to run the collection tests
}
fjtirado marked this conversation as resolved.
Show resolved Hide resolved

protected abstract void sendUserTaskInstanceEvent() throws Exception;

protected abstract void sendProcessInstanceEvent() throws Exception;
Expand All @@ -129,4 +197,9 @@ void testJobEvent() throws Exception {

protected abstract void sendJobEvent() throws Exception;

protected abstract void sendProcessInstanceEventCollection() throws Exception;

protected abstract void sendUserTaskInstanceEventCollection() throws Exception;

protected abstract void sendProcessDefinitionEventCollection() throws Exception;
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,14 @@
*/
package org.kie.kogito.index.service.messaging;

import java.net.URI;
import java.util.Collection;
import java.util.List;

import org.kie.kogito.event.process.MultipleProcessInstanceDataEvent;
import org.kie.kogito.event.process.ProcessDefinitionDataEvent;
import org.kie.kogito.event.process.ProcessInstanceDataEvent;
import org.kie.kogito.event.usertask.MultipleUserTaskInstanceDataEvent;
import org.kie.kogito.event.usertask.UserTaskInstanceDataEvent;
import org.kie.kogito.index.event.KogitoJobCloudEvent;
import org.kie.kogito.index.json.JsonUtils;
Expand Down Expand Up @@ -69,4 +75,25 @@ protected void sendJobEvent() throws Exception {
connector.source(KOGITO_JOBS_EVENTS).send(event);
}

protected void sendProcessInstanceEventCollection() throws Exception {
Collection<ProcessInstanceDataEvent<?>> events = List.of(
getProcessCloudEvent("travels", "processId-UUID1", ProcessInstanceState.ACTIVE, null, null, null, "user1"),
getProcessCloudEvent("travels", "processId-UUID2", ProcessInstanceState.ACTIVE, null, null, null, "user2"));
connector.source(KOGITO_PROCESSINSTANCES_EVENTS).send(new MultipleProcessInstanceDataEvent(URI.create("test"), events));
}

@Override
protected void sendUserTaskInstanceEventCollection() throws Exception {
Collection<UserTaskInstanceDataEvent<?>> events = List.of(
getUserTaskCloudEvent("taskId-UUID1", "travels", "processId-UUID1", null, null, "IN_PROGRESS"),
getUserTaskCloudEvent("taskId-UUID2", "travels", "processId-UUID1", null, null, "COMPLETED"));
connector.source(KOGITO_USERTASKINSTANCES_EVENTS).send(new MultipleUserTaskInstanceDataEvent(URI.create("test"), events));
}

@Override
protected void sendProcessDefinitionEventCollection() throws Exception {
connector.source(KOGITO_PROCESS_DEFINITIONS_EVENTS).send(JsonUtils.getObjectMapper().readValue(readFileContent("process_definition_event.json"), ProcessDefinitionDataEvent.class));
connector.source(KOGITO_PROCESS_DEFINITIONS_EVENTS).send(JsonUtils.getObjectMapper().readValue(readFileContent("process_definition_11_event.json"), ProcessDefinitionDataEvent.class));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,28 @@
*/
package org.kie.kogito.index.service.messaging;

import java.net.URI;
import java.util.Collection;
import java.util.List;

import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.kie.kogito.event.process.MultipleProcessInstanceDataEvent;
import org.kie.kogito.event.process.ProcessInstanceDataEvent;
import org.kie.kogito.event.usertask.MultipleUserTaskInstanceDataEvent;
import org.kie.kogito.event.usertask.UserTaskInstanceDataEvent;
import org.kie.kogito.index.model.ProcessInstanceState;
import org.kie.kogito.jackson.utils.ObjectMapperFactory;
import org.kie.kogito.test.quarkus.kafka.KafkaTestClient;
import org.kie.kogito.testcontainers.quarkus.KafkaQuarkusTestResource;

import static org.kie.kogito.index.service.messaging.ReactiveMessagingEventConsumer.KOGITO_JOBS_EVENTS;
import static org.kie.kogito.index.service.messaging.ReactiveMessagingEventConsumer.KOGITO_PROCESSINSTANCES_EVENTS;
import static org.kie.kogito.index.service.messaging.ReactiveMessagingEventConsumer.KOGITO_PROCESS_DEFINITIONS_EVENTS;
import static org.kie.kogito.index.service.messaging.ReactiveMessagingEventConsumer.KOGITO_USERTASKINSTANCES_EVENTS;
import static org.kie.kogito.index.test.TestUtils.getProcessCloudEvent;
import static org.kie.kogito.index.test.TestUtils.getUserTaskCloudEvent;
import static org.kie.kogito.index.test.TestUtils.readFileContent;

public abstract class AbstractMessagingKafkaConsumerIT extends AbstractMessagingConsumerIT {
Expand Down Expand Up @@ -67,6 +79,28 @@ protected void sendJobEvent() throws Exception {
send("job_event.json", KOGITO_JOBS_EVENTS);
}

@Override
protected void sendProcessInstanceEventCollection() throws Exception {
Collection<ProcessInstanceDataEvent<?>> events = List.of(
getProcessCloudEvent("travels", "processId-UUID1", ProcessInstanceState.ACTIVE, null, null, null, "user1"),
getProcessCloudEvent("travels", "processId-UUID2", ProcessInstanceState.ACTIVE, null, null, null, "user2"));
kafkaClient.produce(ObjectMapperFactory.get().writeValueAsString(new MultipleProcessInstanceDataEvent(URI.create("test"), events)), KOGITO_PROCESSINSTANCES_EVENTS);
}

@Override
protected void sendUserTaskInstanceEventCollection() throws Exception {
Collection<UserTaskInstanceDataEvent<?>> events = List.of(
getUserTaskCloudEvent("taskId-UUID1", "travels", "processId-UUID1", null, null, "IN_PROGRESS"),
getUserTaskCloudEvent("taskId-UUID2", "travels", "processId-UUID1", null, null, "COMPLETED"));
kafkaClient.produce(ObjectMapperFactory.get().writeValueAsString(new MultipleUserTaskInstanceDataEvent(URI.create("test"), events)), KOGITO_USERTASKINSTANCES_EVENTS);
}

@Override
protected void sendProcessDefinitionEventCollection() throws Exception {
kafkaClient.produce(readFileContent("process_definition_event.json"), KOGITO_PROCESS_DEFINITIONS_EVENTS);
kafkaClient.produce(readFileContent("process_definition_11_event.json"), KOGITO_PROCESS_DEFINITIONS_EVENTS);
}

private void send(String file, String topic) throws Exception {
String json = readFileContent(file);
kafkaClient.produce(json, topic);
Expand Down
Loading
Loading