Skip to content

Commit

Permalink
[Fix apache/incubator-kie-issues#1457] Converter deserialization
Browse files Browse the repository at this point in the history
  • Loading branch information
fjtirado committed Oct 4, 2024
1 parent 5e3ef94 commit be1654b
Show file tree
Hide file tree
Showing 6 changed files with 8 additions and 68 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public ProcessInstanceDataEvent<?> deserialize(JsonParser jp, DeserializationCon
String type = node.get("type").asText();

switch (type) {
case "MultipleProcessInstanceDataEvent":
case MultipleProcessInstanceDataEvent.TYPE:
return jp.getCodec().treeToValue(node, MultipleProcessInstanceDataEvent.class);
case "ProcessInstanceErrorDataEvent":
return (ProcessInstanceDataEvent<?>) jp.getCodec().treeToValue(node, ProcessInstanceErrorDataEvent.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public UserTaskInstanceDataEvent<?> deserialize(JsonParser jp, DeserializationCo
String type = node.get("type").asText();

switch (type) {
case "MultipleUserTaskInstanceDataEvent":
case MultipleUserTaskInstanceDataEvent.TYPE:
return jp.getCodec().treeToValue(node, MultipleUserTaskInstanceDataEvent.class);
case "UserTaskInstanceAssignmentDataEvent":
return (UserTaskInstanceDataEvent<?>) jp.getCodec().treeToValue(node, UserTaskInstanceAssignmentDataEvent.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public ProcessInstanceDataEvent<?> deserialize(JsonParser jp, DeserializationCon
String type = node.get("type").asText();

switch (type) {
case "MultipleProcessInstanceDataEvent":
case MultipleProcessInstanceDataEvent.TYPE:
return jp.getCodec().treeToValue(node, MultipleProcessInstanceDataEvent.class);
case "ProcessInstanceErrorDataEvent":
return (ProcessInstanceDataEvent<?>) jp.getCodec().treeToValue(node, ProcessInstanceErrorDataEvent.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public UserTaskInstanceDataEvent<?> deserialize(JsonParser jp, DeserializationCo
String type = node.get("type").asText();

switch (type) {
case "MultipleUserTaskInstanceDataEvent":
case MultipleUserTaskInstanceDataEvent.TYPE:
return jp.getCodec().treeToValue(node, MultipleUserTaskInstanceDataEvent.class);
case "UserTaskInstanceAssignmentDataEvent":
return (UserTaskInstanceDataEvent<?>) jp.getCodec().treeToValue(node, UserTaskInstanceAssignmentDataEvent.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.eclipse.microprofile.reactive.messaging.Message;
import org.kie.kogito.event.AbstractDataEvent;
import org.kie.kogito.event.DataEvent;
import org.kie.kogito.event.process.MultipleProcessInstanceDataEvent;
import org.kie.kogito.event.process.ProcessDefinitionDataEvent;
import org.kie.kogito.event.process.ProcessDefinitionEventBody;
import org.kie.kogito.event.process.ProcessInstanceDataEvent;
Expand All @@ -38,6 +39,7 @@
import org.kie.kogito.event.process.ProcessInstanceStateEventBody;
import org.kie.kogito.event.process.ProcessInstanceVariableDataEvent;
import org.kie.kogito.event.process.ProcessInstanceVariableEventBody;
import org.kie.kogito.event.usertask.MultipleUserTaskInstanceDataEvent;
import org.kie.kogito.event.usertask.UserTaskInstanceAssignmentDataEvent;
import org.kie.kogito.event.usertask.UserTaskInstanceAssignmentEventBody;
import org.kie.kogito.event.usertask.UserTaskInstanceAttachmentDataEvent;
Expand Down Expand Up @@ -88,9 +90,7 @@ public boolean canConvert(Message<?> message, Type type) {
}

private boolean isIndexable(Type type) {
return type == ProcessInstanceDataEvent.class
|| type == ProcessDefinitionDataEvent.class
|| type == UserTaskInstanceDataEvent.class
return type == ProcessDefinitionDataEvent.class
|| type == KogitoJobCloudEvent.class;
}

Expand All @@ -106,12 +106,8 @@ public Message<?> convert(Message<?> message, Type type) {
MessageReader messageReader = VertxMessageFactory.createReader(httpHeaders, buffer);
cloudEvent = messageReader.toEvent();

if (type.getTypeName().equals(ProcessInstanceDataEvent.class.getTypeName())) {
return message.withPayload(buildProcessInstanceDataEventVariant(cloudEvent));
} else if (type.getTypeName().equals(KogitoJobCloudEvent.class.getTypeName())) {
if (type.getTypeName().equals(KogitoJobCloudEvent.class.getTypeName())) {
return message.withPayload(buildKogitoJobCloudEvent(cloudEvent));
} else if (type.getTypeName().equals(UserTaskInstanceDataEvent.class.getTypeName())) {
return message.withPayload(buildUserTaskInstanceDataEvent(cloudEvent));
} else if (type.getTypeName().equals(ProcessDefinitionDataEvent.class.getTypeName())) {
return message.withPayload(buildProcessDefinitionEvent(cloudEvent));
}
Expand All @@ -137,41 +133,6 @@ public void setObjectMapper(ObjectMapper objectMapper) {
this.objectMapper = objectMapper;
}

private DataEvent<?> buildProcessInstanceDataEventVariant(CloudEvent cloudEvent) throws IOException {
switch (cloudEvent.getType()) {
case "ProcessInstanceErrorDataEvent":
return buildDataEvent(cloudEvent, objectMapper, ProcessInstanceErrorDataEvent::new, ProcessInstanceErrorEventBody.class);
case "ProcessInstanceNodeDataEvent":
return buildDataEvent(cloudEvent, objectMapper, ProcessInstanceNodeDataEvent::new, ProcessInstanceNodeEventBody.class);
case "ProcessInstanceSLADataEvent":
return buildDataEvent(cloudEvent, objectMapper, ProcessInstanceSLADataEvent::new, ProcessInstanceSLAEventBody.class);
case "ProcessInstanceStateDataEvent":
return buildDataEvent(cloudEvent, objectMapper, ProcessInstanceStateDataEvent::new, ProcessInstanceStateEventBody.class);
case "ProcessInstanceVariableDataEvent":
return buildDataEvent(cloudEvent, objectMapper, ProcessInstanceVariableDataEvent::new, ProcessInstanceVariableEventBody.class);
default:
throw new IllegalArgumentException("Unknown ProcessInstanceDataEvent variant: " + cloudEvent.getType());
}
}

private DataEvent<?> buildUserTaskInstanceDataEvent(CloudEvent cloudEvent) throws IOException {
switch (cloudEvent.getType()) {
case "UserTaskInstanceAssignmentDataEvent":
return buildDataEvent(cloudEvent, objectMapper, UserTaskInstanceAssignmentDataEvent::new, UserTaskInstanceAssignmentEventBody.class);
case "UserTaskInstanceAttachmentDataEvent":
return buildDataEvent(cloudEvent, objectMapper, UserTaskInstanceAttachmentDataEvent::new, UserTaskInstanceAttachmentEventBody.class);
case "UserTaskInstanceCommentDataEvent":
return buildDataEvent(cloudEvent, objectMapper, UserTaskInstanceCommentDataEvent::new, UserTaskInstanceCommentEventBody.class);
case "UserTaskInstanceDeadlineDataEvent":
return buildDataEvent(cloudEvent, objectMapper, UserTaskInstanceDeadlineDataEvent::new, UserTaskInstanceDeadlineEventBody.class);
case "UserTaskInstanceStateDataEvent":
return buildDataEvent(cloudEvent, objectMapper, UserTaskInstanceStateDataEvent::new, UserTaskInstanceStateEventBody.class);
case "UserTaskInstanceVariableDataEvent":
return buildDataEvent(cloudEvent, objectMapper, UserTaskInstanceVariableDataEvent::new, UserTaskInstanceVariableEventBody.class);
default:
throw new IllegalArgumentException("Unknown UserTaskInstanceDataEvent variant: " + cloudEvent.getType());
}
}

private KogitoJobCloudEvent buildKogitoJobCloudEvent(CloudEvent cloudEvent) throws IOException {
KogitoJobCloudEvent jobCloudEvent = new KogitoJobCloudEvent();
Expand All @@ -188,16 +149,6 @@ private KogitoJobCloudEvent buildKogitoJobCloudEvent(CloudEvent cloudEvent) thro
return jobCloudEvent;
}

private static <E extends AbstractDataEvent<T>, T> E buildDataEvent(CloudEvent cloudEvent, ObjectMapper objectMapper, Supplier<E> supplier, Class<T> clazz) throws IOException {
E dataEvent = supplier.get();
applyCloudEventAttributes(cloudEvent, dataEvent);
applyExtensions(cloudEvent, dataEvent);
if (cloudEvent.getData() != null) {
dataEvent.setData(objectMapper.readValue(cloudEvent.getData().toBytes(), clazz));
}
return dataEvent;
}

private static void applyCloudEventAttributes(CloudEvent cloudEvent, AbstractDataEvent<?> dataEvent) {
dataEvent.setSpecVersion(cloudEvent.getSpecVersion());
dataEvent.setId(cloudEvent.getId());
Expand All @@ -208,8 +159,4 @@ private static void applyCloudEventAttributes(CloudEvent cloudEvent, AbstractDat
dataEvent.setSubject(cloudEvent.getSubject());
dataEvent.setTime(cloudEvent.getTime());
}

private static void applyExtensions(CloudEvent cloudEvent, AbstractDataEvent<?> dataEvent) {
cloudEvent.getExtensionNames().forEach(extensionName -> dataEvent.addExtensionAttribute(extensionName, cloudEvent.getExtension(extensionName)));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -181,13 +181,6 @@ void testProcessDefinitionEventCollection() throws Exception {
.body("data.ProcessDefinitions[1].version", is("1.1")));
}

<<<<<<< Upstream, based on e6cc0b002938513d7f447f46b31b3feeee032d53
<<<<<<< Upstream, based on e6cc0b002938513d7f447f46b31b3feeee032d53
=======

>>>>>>> 462bcee Update data-index/data-index-service/data-index-service-common/src/test/java/org/kie/kogito/index/service/messaging/AbstractMessagingConsumerIT.java
=======
>>>>>>> e621c56 Update data-index/data-index-service/data-index-service-postgresql/src/test/java/org/kie/kogito/index/service/messaging/PostgreSqlMessagingKafkaConsumerIT.java
protected abstract void sendUserTaskInstanceEvent() throws Exception;

protected abstract void sendProcessInstanceEvent() throws Exception;
Expand Down

0 comments on commit be1654b

Please sign in to comment.