From ddb4fc34068741e61dc2e7824db81da8185034cd Mon Sep 17 00:00:00 2001 From: PRATHAM2002-DS Date: Thu, 16 Jan 2025 21:52:21 +0530 Subject: [PATCH] mesh-363: additional info for outputPort update event --- .../apache/atlas/repository/Constants.java | 4 ++++ .../store/graph/v2/EntityGraphMapper.java | 12 +++++++++-- .../java/org/apache/atlas/RequestContext.java | 20 +++++++++++++++++++ .../EntityNotificationListenerV2.java | 8 ++++++++ 4 files changed, 42 insertions(+), 2 deletions(-) diff --git a/common/src/main/java/org/apache/atlas/repository/Constants.java b/common/src/main/java/org/apache/atlas/repository/Constants.java index ff3741878b..28bcbb74a8 100644 --- a/common/src/main/java/org/apache/atlas/repository/Constants.java +++ b/common/src/main/java/org/apache/atlas/repository/Constants.java @@ -158,6 +158,10 @@ public final class Constants { public static final String INPUT_PORT_PRODUCT_EDGE_LABEL = "__Asset.inputPortDataProducts"; public static final String OUTPUT_PORT_PRODUCT_EDGE_LABEL = "__Asset.outputPortDataProducts"; + public static final String OUTPUT_PORTS = "outputPorts"; + public static final String ADDED_OUTPUT_PORTS = "addedOutputPorts"; + public static final String REMOVED_OUTPUT_PORTS = "removedOutputPorts"; + public static final String UD_RELATIONSHIP_EDGE_LABEL = "__Referenceable.userDefRelationshipTo"; public static final String UD_RELATIONSHIP_END_NAME_FROM = "userDefRelationshipFrom"; public static final String UD_RELATIONSHIP_END_NAME_TO = "userDefRelationshipTo"; diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java index 76f0be44cb..498521891c 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java @@ -2331,12 +2331,20 @@ private void addInternalProductAttr(AttributeMutationContext ctx, List c private void addOrRemoveDaapInternalAttr(AtlasVertex toVertex, String internalAttr, List createdElements, List deletedElements) { if (CollectionUtils.isNotEmpty(createdElements)) { List addedGuids = createdElements.stream().map(x -> ((AtlasEdge) x).getOutVertex().getProperty("__guid", String.class)).collect(Collectors.toList()); - addedGuids.forEach(guid -> AtlasGraphUtilsV2.addEncodedProperty(toVertex, internalAttr, guid)); + //addedGuids.forEach(guid -> AtlasGraphUtilsV2.addEncodedProperty(toVertex, internalAttr, guid)); + + if (internalAttr.equals(OUTPUT_PORT_GUIDS_ATTR)) { + RequestContext.get().setAddedOutputPorts(addedGuids); + } } if (CollectionUtils.isNotEmpty(deletedElements)) { List removedGuids = deletedElements.stream().map(x -> x.getOutVertex().getProperty("__guid", String.class)).collect(Collectors.toList()); - removedGuids.forEach(guid -> AtlasGraphUtilsV2.removeItemFromListPropertyValue(toVertex, internalAttr, guid)); + //removedGuids.forEach(guid -> AtlasGraphUtilsV2.removeItemFromListPropertyValue(toVertex, internalAttr, guid)); + + if (internalAttr.equals(OUTPUT_PORT_GUIDS_ATTR)) { + RequestContext.get().setRemovedOutputPorts(removedGuids); + } } } diff --git a/server-api/src/main/java/org/apache/atlas/RequestContext.java b/server-api/src/main/java/org/apache/atlas/RequestContext.java index 45e63641c5..b3ffadd437 100644 --- a/server-api/src/main/java/org/apache/atlas/RequestContext.java +++ b/server-api/src/main/java/org/apache/atlas/RequestContext.java @@ -110,6 +110,8 @@ public class RequestContext { private boolean delayTagNotifications = false; private Map> deletedClassificationAndVertices = new HashMap<>(); private Map> addedClassificationAndVertices = new HashMap<>(); + private final List addedOutputPorts = new ArrayList<>(); + private final List removedOutputPorts = new ArrayList<>(); private RequestContext() { @@ -175,6 +177,8 @@ public void clearCache() { this.delayTagNotifications = false; deletedClassificationAndVertices.clear(); addedClassificationAndVertices.clear(); + this.addedOutputPorts.clear(); + this.removedOutputPorts.clear(); if (metrics != null && !metrics.isEmpty()) { METRICS.debug(metrics.toString()); @@ -829,4 +833,20 @@ public boolean isEdgeLabelAlreadyProcessed(String processEdgeLabel) { return edgeLabels.contains(processEdgeLabel); } + public void setAddedOutputPorts(List addedOutputPorts) { + this.addedOutputPorts.addAll(addedOutputPorts); + } + + public List getAddedOutputPorts() { + return addedOutputPorts; + } + + public void setRemovedOutputPorts(List removedOutputPorts) { + this.removedOutputPorts.addAll(removedOutputPorts); + } + + public List getRemovedOutputPorts() { + return removedOutputPorts; + } + } \ No newline at end of file diff --git a/webapp/src/main/java/org/apache/atlas/notification/EntityNotificationListenerV2.java b/webapp/src/main/java/org/apache/atlas/notification/EntityNotificationListenerV2.java index 06b8f9678d..b3922e51b1 100644 --- a/webapp/src/main/java/org/apache/atlas/notification/EntityNotificationListenerV2.java +++ b/webapp/src/main/java/org/apache/atlas/notification/EntityNotificationListenerV2.java @@ -49,6 +49,7 @@ import java.util.*; import static org.apache.atlas.model.notification.EntityNotification.EntityNotificationV2.OperationType.*; +import static org.apache.atlas.repository.Constants.*; import static org.apache.atlas.repository.graph.GraphHelper.isInternalType; import static org.apache.atlas.repository.store.graph.v2.EntityGraphRetriever.CREATE_TIME; import static org.apache.atlas.repository.store.graph.v2.EntityGraphRetriever.DESCRIPTION; @@ -152,6 +153,9 @@ private void notifyEntityEvents(List entities, OperationType operat Map differentialEntities = RequestContext.get().getDifferentialEntitiesMap(); Map requestContextHeaders = RequestContext.get().getRequestContextHeaders(); + List addedOutputPorts = RequestContext.get().getAddedOutputPorts(); + List removedOutputPorts = RequestContext.get().getRemovedOutputPorts(); + List messages = new ArrayList<>(); for (AtlasEntity entity : entities) { @@ -162,6 +166,10 @@ private void notifyEntityEvents(List entities, OperationType operat if(differentialEntities != null){ if (differentialEntities.containsKey(entityGuid)) { + if (differentialEntities.get(entityGuid).hasRelationshipAttribute(OUTPUT_PORTS)) { + differentialEntities.get(entityGuid).setRelationshipAttribute(ADDED_OUTPUT_PORTS, addedOutputPorts); + differentialEntities.get(entityGuid).setRelationshipAttribute(REMOVED_OUTPUT_PORTS, removedOutputPorts); + } messages.add(new EntityNotificationV2(toNotificationHeader(entity), differentialEntities.get(entityGuid), operationType, RequestContext.get().getRequestTime(), requestContextHeaders)); }else {