Skip to content

Commit

Permalink
[Fix apache/incubator-kie-issues#1457] Grouping of events 2nd approach
Browse files Browse the repository at this point in the history
  • Loading branch information
gmunozfe committed Sep 18, 2024
1 parent f634bfd commit 3291fa2
Show file tree
Hide file tree
Showing 2 changed files with 83 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
*/
package org.kie.kogito.index.service.messaging;

import java.util.Collection;

import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.kie.kogito.event.DataEvent;
import org.kie.kogito.event.process.ProcessDefinitionDataEvent;
Expand All @@ -29,6 +31,7 @@
import org.slf4j.LoggerFactory;

import io.quarkus.arc.properties.IfBuildProperty;
import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.annotations.Blocking;

import jakarta.enterprise.context.ApplicationScoped;
Expand Down Expand Up @@ -57,34 +60,101 @@ public class BlockingMessagingEventConsumer {
@Incoming(KOGITO_PROCESSINSTANCES_EVENTS)
@Blocking
@Transactional
public void onProcessInstanceEvent(ProcessInstanceDataEvent<?> event) {
LOGGER.debug("Process instance consumer received ProcessInstanceDataEvent: \n{}", event);
indexingService.indexProcessInstanceEvent(event);
eventPublisher.fire(event);
public Uni<Void> onProcessInstanceEvent(Object input) {
if (input instanceof Collection) {
@SuppressWarnings("unchecked")
Collection<ProcessInstanceDataEvent<?>> events = (Collection<ProcessInstanceDataEvent<?>>) input;
LOGGER.debug("Process instance consumer received grouped ProcessInstanceDataEvents: \n{}", events);
for (ProcessInstanceDataEvent<?> event : events) {
handleProcessInstanceEvent(event);
}
} else if (input instanceof ProcessInstanceDataEvent) {
ProcessInstanceDataEvent<?> event = (ProcessInstanceDataEvent<?>) input;
LOGGER.debug("Process instance consumer received ProcessInstanceDataEvent: \n{}", event);
handleProcessInstanceEvent(event);
} else {
LOGGER.error("Unknown event type received: {}", input.getClass());
}
return Uni.createFrom().voidItem();
}

@Incoming(KOGITO_USERTASKINSTANCES_EVENTS)
@Blocking
@Transactional
public void onUserTaskInstanceEvent(UserTaskInstanceDataEvent<?> event) {
LOGGER.debug("Task instance received UserTaskInstanceDataEvent \n{}", event);
indexingService.indexUserTaskInstanceEvent(event);
eventPublisher.fire(event);
public Uni<Void> onUserTaskInstanceEvent(Object input) {
if (input instanceof Collection) {
@SuppressWarnings("unchecked")
Collection<UserTaskInstanceDataEvent<?>> events = (Collection<UserTaskInstanceDataEvent<?>>) input;
LOGGER.debug("UserTask instance consumer received grouped UserTaskInstanceDataEvent: \n{}", events);
for (UserTaskInstanceDataEvent<?> event : events) {
handleUserTaskInstanceEvent(event);
}
} else if (input instanceof UserTaskInstanceDataEvent) {
UserTaskInstanceDataEvent<?> event = (UserTaskInstanceDataEvent<?>) input;
LOGGER.debug("Process instance consumer received UserTaskInstanceDataEvent: \n{}", event);
handleUserTaskInstanceEvent(event);
} else {
LOGGER.error("Unknown event type received: {}", input.getClass());
}
return Uni.createFrom().voidItem();
}

@Incoming(KOGITO_JOBS_EVENTS)
@Blocking
@Transactional
public void onJobEvent(KogitoJobCloudEvent event) {
public Uni<Void> onJobEvent(KogitoJobCloudEvent event) {
LOGGER.debug("Job received KogitoJobCloudEvent \n{}", event);
indexingService.indexJob(event.getData());
return Uni.createFrom().item(event)
.onItem().invoke(e -> indexingService.indexJob(e.getData()))
.onFailure().invoke(t -> LOGGER.error("Error processing job KogitoJobCloudEvent: {}", t.getMessage(), t))
.onItem().ignore().andContinueWithNull();
}

@Incoming(KOGITO_PROCESS_DEFINITIONS_EVENTS)
@Blocking
@Transactional
public void onProcessDefinitionDataEvent(ProcessDefinitionDataEvent event) {
LOGGER.debug("Job received KogitoJobCloudEvent \n{}", event);
indexingService.indexProcessDefinition(event);
public Uni<Void> onProcessDefinitionDataEvent(Object input) {
if (input instanceof Collection) {
@SuppressWarnings("unchecked")
Collection<ProcessDefinitionDataEvent> events = (Collection<ProcessDefinitionDataEvent>) input;
LOGGER.debug("Process definition instance consumer received grouped ProcessDefinitionDataEvent: \n{}", events);
for (ProcessDefinitionDataEvent event : events) {
handleProcessDefinitionEvent(event);
}
} else if (input instanceof ProcessDefinitionDataEvent) {
ProcessDefinitionDataEvent event = (ProcessDefinitionDataEvent) input;
LOGGER.debug("Process definition consumer received ProcessDefinitionDataEvent: \n{}", event);
handleProcessDefinitionEvent(event);
} else {
LOGGER.error("Unknown event type received: {}", input.getClass());
}
return Uni.createFrom().voidItem();
}

private void handleProcessInstanceEvent(ProcessInstanceDataEvent<?> event) {
try {
indexingService.indexProcessInstanceEvent(event);
eventPublisher.fire(event);
} catch (Exception ex) {
LOGGER.error("Error processing process instance event: {}", event, ex);
}
}

private void handleUserTaskInstanceEvent(UserTaskInstanceDataEvent<?> event) {
try {
indexingService.indexUserTaskInstanceEvent(event);
eventPublisher.fire(event);
} catch (Exception ex) {
LOGGER.error("Error processing userTask instance event: {}", event, ex);
}
}

private void handleProcessDefinitionEvent(ProcessDefinitionDataEvent event) {
try {
indexingService.indexProcessDefinition(event);
eventPublisher.fire(event);
} catch (Exception ex) {
LOGGER.error("Error processing process definition event: {}", event, ex);
}
}
}

0 comments on commit 3291fa2

Please sign in to comment.