Skip to content

Commit

Permalink
Merge pull request #3988 from atlanhq/mesh-363
Browse files Browse the repository at this point in the history
MESH-363 : Additional info for outputPort update event
  • Loading branch information
PRATHAM2002-DS authored Jan 16, 2025
2 parents 266fc57 + bb45a49 commit ef81419
Showing 1 changed file with 24 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2003,7 +2003,7 @@ public List mapArrayValue(AttributeMutationContext ctx, EntityMutationContext co

case INPUT_PORT_PRODUCT_EDGE_LABEL:
case OUTPUT_PORT_PRODUCT_EDGE_LABEL:
addInternalProductAttr(ctx, newElementsCreated, removedElements);
addInternalProductAttr(ctx, newElementsCreated, removedElements, currentElements);
break;

case UD_RELATIONSHIP_EDGE_LABEL:
Expand Down Expand Up @@ -2097,7 +2097,7 @@ public List appendArrayValue(AttributeMutationContext ctx, EntityMutationContext

case INPUT_PORT_PRODUCT_EDGE_LABEL:
case OUTPUT_PORT_PRODUCT_EDGE_LABEL:
addInternalProductAttr(ctx, newElementsCreated, null);
addInternalProductAttr(ctx, newElementsCreated, null, null);
break;

case UD_RELATIONSHIP_EDGE_LABEL:
Expand Down Expand Up @@ -2173,7 +2173,7 @@ public List removeArrayValue(AttributeMutationContext ctx, EntityMutationContext

case INPUT_PORT_PRODUCT_EDGE_LABEL:
case OUTPUT_PORT_PRODUCT_EDGE_LABEL:
addInternalProductAttr(ctx, null , removedElements);
addInternalProductAttr(ctx, null , removedElements, null);
break;
}

Expand Down Expand Up @@ -2306,7 +2306,7 @@ private static void validateCustomRelationshipCount(long size, AtlasVertex verte
}
}

private void addInternalProductAttr(AttributeMutationContext ctx, List<Object> createdElements, List<AtlasEdge> deletedElements) throws AtlasBaseException {
private void addInternalProductAttr(AttributeMutationContext ctx, List<Object> createdElements, List<AtlasEdge> deletedElements, List<Object> currentElements) throws AtlasBaseException {
MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("addInternalProductAttrForAppend");
AtlasVertex toVertex = ctx.getReferringVertex();
String toVertexType = getTypeName(toVertex);
Expand All @@ -2321,30 +2321,40 @@ private void addInternalProductAttr(AttributeMutationContext ctx, List<Object> c
? OUTPUT_PORT_GUIDS_ATTR
: INPUT_PORT_GUIDS_ATTR;

addOrRemoveDaapInternalAttr(toVertex, attrName, createdElements, deletedElements);
addOrRemoveDaapInternalAttr(toVertex, attrName, createdElements, deletedElements, currentElements);
}else{
throw new AtlasBaseException(AtlasErrorCode.BAD_REQUEST, "Can not update product relations while updating any asset");
}
RequestContext.get().endMetricRecord(metricRecorder);
}

private void addOrRemoveDaapInternalAttr(AtlasVertex toVertex, String internalAttr, List<Object> createdElements, List<AtlasEdge> deletedElements) {
private void addOrRemoveDaapInternalAttr(AtlasVertex toVertex, String internalAttr, List<Object> createdElements, List<AtlasEdge> deletedElements, List<Object> currentElements) {
List<String> addedGuids = new ArrayList<>();
List<String> removedGuids = new ArrayList<>();
if (CollectionUtils.isNotEmpty(createdElements)) {
List<String> addedGuids = createdElements.stream().map(x -> ((AtlasEdge) x).getOutVertex().getProperty("__guid", String.class)).collect(Collectors.toList());
addedGuids = createdElements.stream().map(x -> ((AtlasEdge) x).getOutVertex().getProperty("__guid", String.class)).collect(Collectors.toList());
addedGuids.forEach(guid -> AtlasGraphUtilsV2.addEncodedProperty(toVertex, internalAttr, guid));

if (internalAttr.equals(OUTPUT_PORT_GUIDS_ATTR)) {
RequestContext.get().setAddedOutputPorts(addedGuids);
}
}

if (CollectionUtils.isNotEmpty(deletedElements)) {
List<String> removedGuids = deletedElements.stream().map(x -> x.getOutVertex().getProperty("__guid", String.class)).collect(Collectors.toList());
removedGuids = deletedElements.stream().map(x -> x.getOutVertex().getProperty("__guid", String.class)).collect(Collectors.toList());
removedGuids.forEach(guid -> AtlasGraphUtilsV2.removeItemFromListPropertyValue(toVertex, internalAttr, guid));
}

if (internalAttr.equals(OUTPUT_PORT_GUIDS_ATTR)) {
RequestContext.get().setRemovedOutputPorts(removedGuids);
// Add more info to outputPort update event.
if (internalAttr.equals(OUTPUT_PORT_GUIDS_ATTR)) {
if (CollectionUtils.isNotEmpty(currentElements)) {
List<String> currentElementGuids = currentElements.stream()
.filter(x -> ((AtlasEdge) x).getProperty(STATE_PROPERTY_KEY, String.class).equals("ACTIVE"))
.map(x -> ((AtlasEdge) x).getOutVertex().getProperty("__guid", String.class))
.collect(Collectors.toList());

addedGuids = addedGuids.stream()
.filter(guid -> !currentElementGuids.contains(guid))
.collect(Collectors.toList());
}
RequestContext.get().setAddedOutputPorts(addedGuids);
RequestContext.get().setRemovedOutputPorts(removedGuids);
}
}

Expand Down

0 comments on commit ef81419

Please sign in to comment.