Skip to content

Commit

Permalink
Merge pull request #3986 from atlanhq/mesh-363
Browse files Browse the repository at this point in the history
MESH-363 : Additional info for outputPort update event
  • Loading branch information
PRATHAM2002-DS authored Jan 16, 2025
2 parents 18804aa + ddb4fc3 commit 751407e
Show file tree
Hide file tree
Showing 4 changed files with 42 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,10 @@ public final class Constants {
public static final String INPUT_PORT_PRODUCT_EDGE_LABEL = "__Asset.inputPortDataProducts";
public static final String OUTPUT_PORT_PRODUCT_EDGE_LABEL = "__Asset.outputPortDataProducts";

public static final String OUTPUT_PORTS = "outputPorts";
public static final String ADDED_OUTPUT_PORTS = "addedOutputPorts";
public static final String REMOVED_OUTPUT_PORTS = "removedOutputPorts";

public static final String UD_RELATIONSHIP_EDGE_LABEL = "__Referenceable.userDefRelationshipTo";
public static final String UD_RELATIONSHIP_END_NAME_FROM = "userDefRelationshipFrom";
public static final String UD_RELATIONSHIP_END_NAME_TO = "userDefRelationshipTo";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2331,12 +2331,20 @@ private void addInternalProductAttr(AttributeMutationContext ctx, List<Object> c
private void addOrRemoveDaapInternalAttr(AtlasVertex toVertex, String internalAttr, List<Object> createdElements, List<AtlasEdge> deletedElements) {
if (CollectionUtils.isNotEmpty(createdElements)) {
List<String> addedGuids = createdElements.stream().map(x -> ((AtlasEdge) x).getOutVertex().getProperty("__guid", String.class)).collect(Collectors.toList());
addedGuids.forEach(guid -> AtlasGraphUtilsV2.addEncodedProperty(toVertex, internalAttr, guid));
//addedGuids.forEach(guid -> AtlasGraphUtilsV2.addEncodedProperty(toVertex, internalAttr, guid));

if (internalAttr.equals(OUTPUT_PORT_GUIDS_ATTR)) {
RequestContext.get().setAddedOutputPorts(addedGuids);
}
}

if (CollectionUtils.isNotEmpty(deletedElements)) {
List<String> removedGuids = deletedElements.stream().map(x -> x.getOutVertex().getProperty("__guid", String.class)).collect(Collectors.toList());
removedGuids.forEach(guid -> AtlasGraphUtilsV2.removeItemFromListPropertyValue(toVertex, internalAttr, guid));
//removedGuids.forEach(guid -> AtlasGraphUtilsV2.removeItemFromListPropertyValue(toVertex, internalAttr, guid));

if (internalAttr.equals(OUTPUT_PORT_GUIDS_ATTR)) {
RequestContext.get().setRemovedOutputPorts(removedGuids);
}
}
}

Expand Down
20 changes: 20 additions & 0 deletions server-api/src/main/java/org/apache/atlas/RequestContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,8 @@ public class RequestContext {
private boolean delayTagNotifications = false;
private Map<AtlasClassification, Collection<Object>> deletedClassificationAndVertices = new HashMap<>();
private Map<AtlasClassification, Collection<Object>> addedClassificationAndVertices = new HashMap<>();
private final List<String> addedOutputPorts = new ArrayList<>();
private final List<String> removedOutputPorts = new ArrayList<>();


private RequestContext() {
Expand Down Expand Up @@ -181,6 +183,8 @@ public void clearCache() {
this.delayTagNotifications = false;
deletedClassificationAndVertices.clear();
addedClassificationAndVertices.clear();
this.addedOutputPorts.clear();
this.removedOutputPorts.clear();

if (metrics != null && !metrics.isEmpty()) {
METRICS.debug(metrics.toString());
Expand Down Expand Up @@ -866,4 +870,20 @@ public boolean isEdgeLabelAlreadyProcessed(String processEdgeLabel) {
return edgeLabels.contains(processEdgeLabel);
}

public void setAddedOutputPorts(List<String> addedOutputPorts) {
this.addedOutputPorts.addAll(addedOutputPorts);
}

public List<String> getAddedOutputPorts() {
return addedOutputPorts;
}

public void setRemovedOutputPorts(List<String> removedOutputPorts) {
this.removedOutputPorts.addAll(removedOutputPorts);
}

public List<String> getRemovedOutputPorts() {
return removedOutputPorts;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import java.util.*;

import static org.apache.atlas.model.notification.EntityNotification.EntityNotificationV2.OperationType.*;
import static org.apache.atlas.repository.Constants.*;
import static org.apache.atlas.repository.graph.GraphHelper.isInternalType;
import static org.apache.atlas.repository.store.graph.v2.EntityGraphRetriever.CREATE_TIME;
import static org.apache.atlas.repository.store.graph.v2.EntityGraphRetriever.DESCRIPTION;
Expand Down Expand Up @@ -152,6 +153,9 @@ private void notifyEntityEvents(List<AtlasEntity> entities, OperationType operat
Map<String,AtlasEntity> differentialEntities = RequestContext.get().getDifferentialEntitiesMap();
Map<String, String> requestContextHeaders = RequestContext.get().getRequestContextHeaders();

List<String> addedOutputPorts = RequestContext.get().getAddedOutputPorts();
List<String> removedOutputPorts = RequestContext.get().getRemovedOutputPorts();

List<EntityNotificationV2> messages = new ArrayList<>();

for (AtlasEntity entity : entities) {
Expand All @@ -162,6 +166,10 @@ private void notifyEntityEvents(List<AtlasEntity> entities, OperationType operat

if(differentialEntities != null){
if (differentialEntities.containsKey(entityGuid)) {
if (differentialEntities.get(entityGuid).hasRelationshipAttribute(OUTPUT_PORTS)) {
differentialEntities.get(entityGuid).setRelationshipAttribute(ADDED_OUTPUT_PORTS, addedOutputPorts);
differentialEntities.get(entityGuid).setRelationshipAttribute(REMOVED_OUTPUT_PORTS, removedOutputPorts);
}
messages.add(new EntityNotificationV2(toNotificationHeader(entity), differentialEntities.get(entityGuid),
operationType, RequestContext.get().getRequestTime(), requestContextHeaders));
}else {
Expand Down

0 comments on commit 751407e

Please sign in to comment.