Skip to content

Commit

Permalink
Merge pull request #3660 from atlanhq/master
Browse files Browse the repository at this point in the history
MM-3720 | Sync with Master
  • Loading branch information
aarshi0301 authored Oct 24, 2024
2 parents 6cffb84 + dac0b61 commit 6bbc192
Show file tree
Hide file tree
Showing 4 changed files with 60 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1489,16 +1489,31 @@ public void resetHasLineageOnInputOutputDelete(Collection<AtlasEdge> removedEdge

boolean isOutputEdge = PROCESS_OUTPUTS.equals(atlasEdge.getLabel());

AtlasVertex processVertex = atlasEdge.getOutVertex();
AtlasVertex assetVertex = atlasEdge.getInVertex();
String assetEdgeLabel = getLabel(getGuid(assetVertex), atlasEdge.getLabel());

boolean assetLabelPairAlreadyProcessed = RequestContext.get().isEdgeLabelAlreadyProcessed(assetEdgeLabel);

if (!assetLabelPairAlreadyProcessed) {
RequestContext.get().addEdgeLabel(assetEdgeLabel);
if (getStatus(assetVertex) == ACTIVE && !assetVertex.equals(deletedVertex)) {
updateAssetHasLineageStatus(assetVertex, atlasEdge, removedEdges);
}
}

AtlasVertex processVertex = atlasEdge.getOutVertex();
String processId = getGuid(processVertex);
String edgeLabel = isOutputEdge ? PROCESS_OUTPUTS : PROCESS_INPUTS;
String processEdgeLabel = getLabel(processId, edgeLabel);
boolean processLabelPairAlreadyProcessed = RequestContext.get().isEdgeLabelAlreadyProcessed(processEdgeLabel);

if (getStatus(assetVertex) == ACTIVE && !assetVertex.equals(deletedVertex)) {
updateAssetHasLineageStatus(assetVertex, atlasEdge, removedEdges);
if (processLabelPairAlreadyProcessed) {
continue;
}

if (getStatus(processVertex) == ACTIVE && !processVertex.equals(deletedVertex)) {
String edgeLabel = isOutputEdge ? PROCESS_OUTPUTS : PROCESS_INPUTS;
RequestContext.get().addEdgeLabel(processEdgeLabel);

if (getStatus(processVertex) == ACTIVE && !processVertex.equals(deletedVertex)) {
Iterator<AtlasEdge> edgeIterator = GraphHelper.getActiveEdges(processVertex, edgeLabel, AtlasEdgeDirection.BOTH);

boolean activeEdgeFound = false;
Expand All @@ -1520,6 +1535,14 @@ public void resetHasLineageOnInputOutputDelete(Collection<AtlasEdge> removedEdge

String oppositeEdgeLabel = isOutputEdge ? PROCESS_INPUTS : PROCESS_OUTPUTS;

processEdgeLabel = getLabel(processId, oppositeEdgeLabel);
processLabelPairAlreadyProcessed = RequestContext.get().isEdgeLabelAlreadyProcessed(processEdgeLabel);

if (processLabelPairAlreadyProcessed) {
continue;
}
RequestContext.get().addEdgeLabel(processEdgeLabel);

Iterator<AtlasEdge> processEdgeIterator = GraphHelper.getActiveEdges(processVertex, oppositeEdgeLabel, AtlasEdgeDirection.BOTH);

while (processEdgeIterator.hasNext()) {
Expand All @@ -1536,6 +1559,10 @@ public void resetHasLineageOnInputOutputDelete(Collection<AtlasEdge> removedEdge
RequestContext.get().endMetricRecord(metricRecorder);
}

private String getLabel(String guid, String label){
return guid + ":" + label;
}

private void updateAssetHasLineageStatus(AtlasVertex assetVertex, AtlasEdge currentEdge, Collection<AtlasEdge> removedEdges) {
AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("updateAssetHasLineageStatus");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1684,7 +1684,16 @@ private AtlasStruct mapVertexToStruct(AtlasVertex entityVertex, String edgeLabel

if (GraphHelper.elementExists(edge)) {
final AtlasVertex referenceVertex = edge.getInVertex();
ret = new AtlasStruct(getTypeName(referenceVertex));

String typeName = getTypeName(referenceVertex);

if (StringUtils.isEmpty(typeName)) {
LOG.error("typeName not found for in-vertex {} on edge {} from vertex {} ",
getGuid(referenceVertex), edge.getId(), getGuid(entityVertex));
return ret;
}

ret = new AtlasStruct(typeName);

mapAttributes(referenceVertex, ret, entityExtInfo, isMinExtInfo);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,10 @@ private void processCreateContract(AtlasEntity entity, EntityMutationContext con
authorizeContractCreateOrUpdate(entity, associatedAsset);

boolean contractSync = syncContractCertificateStatus(entity, contract);
contractString = DataContract.serialize(contract);
entity.setAttribute(ATTR_CONTRACT, contractString);
if (!isContractYaml(entity)) {
contractString = DataContract.serialize(contract);
entity.setAttribute(ATTR_CONTRACT, contractString);
}
String contractStringJSON = DataContract.serializeJSON(contract);
entity.setAttribute(ATTR_CONTRACT_JSON, contractStringJSON);

Expand Down Expand Up @@ -298,4 +300,8 @@ private static String getContractString(AtlasEntity entity) {
}
return contractString;
}

private static boolean isContractYaml(AtlasEntity entity) {
return !StringUtils.isEmpty((String) entity.getAttribute(ATTR_CONTRACT));
}
}
10 changes: 10 additions & 0 deletions server-api/src/main/java/org/apache/atlas/RequestContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ public class RequestContext {
private final Map<String, List<Object>> newElementsCreatedMap = new HashMap<>();

private final Map<String, Set<AtlasRelationship>> relationshipMutationMap = new HashMap<>();
private final Set<String> edgeLabels = new HashSet<>();

private String user;
private Set<String> userGroups;
Expand Down Expand Up @@ -830,4 +831,13 @@ public Map<String, String> getLexoRankCache() {
public void setLexoRankCache(Map<String, String> lexoRankCache) {
this.lexoRankCache = lexoRankCache;
}

public void addEdgeLabel(String processEdgeLabel) {
edgeLabels.add(processEdgeLabel);
}

public boolean isEdgeLabelAlreadyProcessed(String processEdgeLabel) {
return edgeLabels.contains(processEdgeLabel);
}

}

0 comments on commit 6bbc192

Please sign in to comment.