diff --git a/repository/src/main/java/org/apache/atlas/repository/migration/BulkUpdateProductsRestorationService.java b/repository/src/main/java/org/apache/atlas/repository/migration/BulkUpdateProductsRestorationService.java new file mode 100644 index 0000000000..3daa6588ce --- /dev/null +++ b/repository/src/main/java/org/apache/atlas/repository/migration/BulkUpdateProductsRestorationService.java @@ -0,0 +1,149 @@ +package org.apache.atlas.repository.migration; + +import org.apache.atlas.exception.AtlasBaseException; +import org.apache.atlas.model.instance.AtlasEntity; +import org.apache.atlas.repository.graph.GraphHelper; +import org.apache.atlas.repository.graphdb.AtlasEdge; +import org.apache.atlas.repository.graphdb.AtlasEdgeDirection; +import org.apache.atlas.repository.graphdb.AtlasGraph; +import org.apache.atlas.repository.graphdb.AtlasVertex; +import org.apache.atlas.repository.store.graph.v2.TransactionInterceptHelper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Iterator; +import java.util.Set; + +import static org.apache.atlas.model.instance.AtlasEntity.Status.ACTIVE; +import static org.apache.atlas.model.instance.AtlasEntity.Status.DELETED; +import static org.apache.atlas.repository.graph.GraphHelper.getStatus; + +public class BulkUpdateProductsRestorationService { + + private static final Logger LOG = LoggerFactory.getLogger(BulkUpdateProductsRestorationService.class); + + private final AtlasGraph graph; + private final Set productGuids; + private final GraphHelper graphHelper; + private final TransactionInterceptHelper transactionInterceptHelper; + + public BulkUpdateProductsRestorationService(AtlasGraph graph, Set productGuids, GraphHelper graphHelper, TransactionInterceptHelper transactionInterceptHelper) { + this.graph = graph; + this.productGuids = productGuids; + this.graphHelper = graphHelper; + this.transactionInterceptHelper = transactionInterceptHelper; + } + + public void productState() throws AtlasBaseException { + try { + int count = 0; + int totalUpdatedCount = 0; + for (String productGuid: productGuids) { + LOG.info("Restoring state for Product: {}", productGuid); + + if (productGuid != null && !productGuid.trim().isEmpty()) { + AtlasVertex productVertex = graphHelper.getVertexForGUID(productGuid); + + AtlasEntity.Status vertexStatus = getStatus(productVertex); + + if (ACTIVE.equals(vertexStatus)) { + LOG.info("Removing edges for Active Product: {}", productGuid); + boolean isCommitRequired = deleteEdgeForActiveProduct(productVertex); + if (isCommitRequired) { + count++; + totalUpdatedCount++; + } + } else { + LOG.info("Restoring edges for Archived Product: {}", productGuid); + boolean isCommitRequired = deleteEdgeForArchivedProduct(productVertex); + if (isCommitRequired) { + count++; + totalUpdatedCount++; + } + } + + if (count == 20) { + LOG.info("Committing batch of 20 products..."); + commitChanges(); + count = 0; + } + } + } + + if (count > 0) { + LOG.info("Committing remaining {} products...", count); + commitChanges(); + } + + LOG.info("Total products updated: {}", totalUpdatedCount); + } catch (Exception e) { + LOG.error("Error while restoring state for Products: {}", productGuids, e); + throw new AtlasBaseException(e); + } + } + + + public boolean deleteEdgeForActiveProduct(AtlasVertex productVertex) { + boolean isCommitRequired = false; + try { + Iterator existingEdges = productVertex.getEdges(AtlasEdgeDirection.BOTH).iterator(); + + if (existingEdges == null || !existingEdges.hasNext()) { + LOG.info("No edges found for Product: {}", productVertex); + return isCommitRequired; + } + + while (existingEdges.hasNext()) { + AtlasEdge edge = existingEdges.next(); + + AtlasEntity.Status edgeStatus = getStatus(edge); + LOG.info("Edge status: {}", edgeStatus); + + if (DELETED.equals(edgeStatus)) { + graph.removeEdge(edge); + isCommitRequired = true; + } + } + } catch (Exception e) { + LOG.error("Error while deleting soft edges for Active Product: {}", productVertex, e); + throw new RuntimeException(e); + } + return isCommitRequired; + } + + + private boolean deleteEdgeForArchivedProduct(AtlasVertex productVertex) { + boolean isCommitRequired = false; + try { + Long updatedTime = productVertex.getProperty("__modificationTimestamp", Long.class); + Iterator existingEdges = productVertex.getEdges(AtlasEdgeDirection.BOTH).iterator(); + + while (existingEdges.hasNext()) { + AtlasEdge edge = existingEdges.next(); + Long modifiedTimestamp = edge.getProperty("__modificationTimestamp", Long.class); + + if (!updatedTime.equals(modifiedTimestamp)) { + LOG.info("Removing edge with different timestamp: {}", edge); + graph.removeEdge(edge); + isCommitRequired = true; + } else { + LOG.info("Keeping edge with matching timestamp: {}", edge); + } + } + } catch (Exception e) { + LOG.error("Error while deleting edges for Archived Product: {}", productVertex, e); + throw new RuntimeException(e); + } + return isCommitRequired; + } + + public void commitChanges() throws AtlasBaseException { + try { + transactionInterceptHelper.intercept(); + LOG.info("Committed a entity to the graph"); + } catch (Exception e) { + LOG.error("Failed to commit asset: ", e); + throw e; + } + } +} \ No newline at end of file diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java index 86f7a41cb0..241c358a80 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java @@ -3006,13 +3006,16 @@ private List removeUnusedArrayEntries(AtlasAttribute attribute, List< continue; } - boolean deleted = false; - if (edgeLabelsForHardDeletion.contains(edge.getLabel())) { - graph.removeEdge(edge); - } else { - deleted = deleteDelegate.getHandler().deleteEdgeReference(edge, entryType.getTypeCategory(), attribute.isOwnedRef(), - true, attribute.getRelationshipEdgeDirection(), entityVertex); - } + boolean deleted = deleteDelegate.getHandler().deleteEdgeReference(edge, entryType.getTypeCategory(), attribute.isOwnedRef(), + true, attribute.getRelationshipEdgeDirection(), entityVertex); + +// boolean deleted = false; +// if (edgeLabelsForHardDeletion.contains(edge.getLabel())) { +// graph.removeEdge(edge); +// } else { +// deleted = deleteDelegate.getHandler().deleteEdgeReference(edge, entryType.getTypeCategory(), attribute.isOwnedRef(), +// true, attribute.getRelationshipEdgeDirection(), entityVertex); +// } if (!deleted) { additionalElements.add(edge); diff --git a/webapp/src/main/java/org/apache/atlas/web/rest/MigrationREST.java b/webapp/src/main/java/org/apache/atlas/web/rest/MigrationREST.java index c50e472d84..a3e18d00fd 100644 --- a/webapp/src/main/java/org/apache/atlas/web/rest/MigrationREST.java +++ b/webapp/src/main/java/org/apache/atlas/web/rest/MigrationREST.java @@ -28,6 +28,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Service; +import org.apache.atlas.repository.migration.BulkUpdateProductsRestorationService; import javax.inject.Inject; import javax.inject.Singleton; @@ -350,6 +351,33 @@ public Boolean updateUniqueQualifiedName(final Set assetGuids) throws Ex return Boolean.TRUE; } + @POST + @Path("product/remove-edges") + @Timed + @Consumes(MediaType.APPLICATION_JSON) + public Boolean bulkProductsRedundantEdgeRemoval(Set guids) throws Exception { + AtlasPerfTracer perf = null; + try { + if (CollectionUtils.isEmpty(guids)) { + throw new AtlasBaseException(AtlasErrorCode.BAD_REQUEST, "Product GUIDs are required for removing redundant edges"); + } + + if (AtlasPerfTracer.isPerfTraceEnabled(PERF_LOG)) { + perf = AtlasPerfTracer.getPerfTracer(PERF_LOG, "MigrationREST.bulkProductsRedundantEdgeRemoval(" + guids + ")"); + } + + BulkUpdateProductsRestorationService migrationService = new BulkUpdateProductsRestorationService(graph, guids, new GraphHelper(graph), transactionInterceptHelper); + migrationService.productState(); + + } catch (Exception e) { + LOG.error("Error while removing edges for guid: {}", guids, e); + throw e; + } finally { + AtlasPerfTracer.log(perf); + } + return Boolean.TRUE; + } + private List getEntitiesByIndexSearch(IndexSearchParams indexSearchParams, Boolean minExtInfo, boolean ignoreRelationships) throws AtlasBaseException { List entities = new ArrayList<>(); String indexName = "janusgraph_vertex_index";