Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MESH-316 | Support Unarchive #3984

Merged
merged 2 commits into from
Jan 16, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
package org.apache.atlas.repository.migration;

import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.repository.graph.GraphHelper;
import org.apache.atlas.repository.graphdb.AtlasEdge;
import org.apache.atlas.repository.graphdb.AtlasEdgeDirection;
import org.apache.atlas.repository.graphdb.AtlasGraph;
import org.apache.atlas.repository.graphdb.AtlasVertex;
import org.apache.atlas.repository.store.graph.v2.TransactionInterceptHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Iterator;
import java.util.Set;

import static org.apache.atlas.model.instance.AtlasEntity.Status.ACTIVE;
import static org.apache.atlas.model.instance.AtlasEntity.Status.DELETED;
import static org.apache.atlas.repository.graph.GraphHelper.getStatus;

public class BulkUpdateProductsRestorationService {

private static final Logger LOG = LoggerFactory.getLogger(BulkUpdateProductsRestorationService.class);

private final AtlasGraph graph;
private final Set<String> productGuids;
private final GraphHelper graphHelper;
private final TransactionInterceptHelper transactionInterceptHelper;

public BulkUpdateProductsRestorationService(AtlasGraph graph, Set<String> productGuids, GraphHelper graphHelper, TransactionInterceptHelper transactionInterceptHelper) {
this.graph = graph;
this.productGuids = productGuids;
this.graphHelper = graphHelper;
this.transactionInterceptHelper = transactionInterceptHelper;
}

public void productState() throws AtlasBaseException {
try {
int count = 0;
int totalUpdatedCount = 0;
for (String productGuid: productGuids) {
LOG.info("Restoring state for Product: {}", productGuid);

if (productGuid != null && !productGuid.trim().isEmpty()) {
AtlasVertex productVertex = graphHelper.getVertexForGUID(productGuid);

AtlasEntity.Status vertexStatus = getStatus(productVertex);

if (ACTIVE.equals(vertexStatus)) {
LOG.info("Removing edges for Active Product: {}", productGuid);
boolean isCommitRequired = deleteEdgeForActiveProduct(productVertex);
if (isCommitRequired) {
count++;
totalUpdatedCount++;
}
} else {
LOG.info("Restoring edges for Archived Product: {}", productGuid);
boolean isCommitRequired = deleteEdgeForArchivedProduct(productVertex);
if (isCommitRequired) {
count++;
totalUpdatedCount++;
}
}

if (count == 20) {
LOG.info("Committing batch of 20 products...");
commitChanges();
count = 0;
}
}
}

if (count > 0) {
LOG.info("Committing remaining {} products...", count);
commitChanges();
}

LOG.info("Total products updated: {}", totalUpdatedCount);
} catch (Exception e) {
LOG.error("Error while restoring state for Products: {}", productGuids, e);
throw new AtlasBaseException(e);
}
}


public boolean deleteEdgeForActiveProduct(AtlasVertex productVertex) {
boolean isCommitRequired = false;
try {
Iterator<AtlasEdge> existingEdges = productVertex.getEdges(AtlasEdgeDirection.BOTH).iterator();

if (existingEdges == null || !existingEdges.hasNext()) {
LOG.info("No edges found for Product: {}", productVertex);
return isCommitRequired;
}

while (existingEdges.hasNext()) {
AtlasEdge edge = existingEdges.next();

AtlasEntity.Status edgeStatus = getStatus(edge);
LOG.info("Edge status: {}", edgeStatus);

if (DELETED.equals(edgeStatus)) {
graph.removeEdge(edge);
isCommitRequired = true;
}
}
} catch (Exception e) {
LOG.error("Error while deleting soft edges for Active Product: {}", productVertex, e);
throw new RuntimeException(e);
}
return isCommitRequired;
}


private boolean deleteEdgeForArchivedProduct(AtlasVertex productVertex) {
boolean isCommitRequired = false;
try {
Long updatedTime = productVertex.getProperty("__modificationTimestamp", Long.class);
Iterator<AtlasEdge> existingEdges = productVertex.getEdges(AtlasEdgeDirection.BOTH).iterator();

while (existingEdges.hasNext()) {
AtlasEdge edge = existingEdges.next();
Long modifiedTimestamp = edge.getProperty("__modificationTimestamp", Long.class);

if (!updatedTime.equals(modifiedTimestamp)) {
LOG.info("Removing edge with different timestamp: {}", edge);
graph.removeEdge(edge);
isCommitRequired = true;
} else {
LOG.info("Keeping edge with matching timestamp: {}", edge);
}
}
} catch (Exception e) {
LOG.error("Error while deleting edges for Archived Product: {}", productVertex, e);
throw new RuntimeException(e);
}
return isCommitRequired;
}

public void commitChanges() throws AtlasBaseException {
try {
transactionInterceptHelper.intercept();
LOG.info("Committed a entity to the graph");
} catch (Exception e) {
LOG.error("Failed to commit asset: ", e);
throw e;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3006,13 +3006,16 @@ private List<AtlasEdge> removeUnusedArrayEntries(AtlasAttribute attribute, List<
continue;
}

boolean deleted = false;
if (edgeLabelsForHardDeletion.contains(edge.getLabel())) {
graph.removeEdge(edge);
} else {
deleted = deleteDelegate.getHandler().deleteEdgeReference(edge, entryType.getTypeCategory(), attribute.isOwnedRef(),
true, attribute.getRelationshipEdgeDirection(), entityVertex);
}
boolean deleted = deleteDelegate.getHandler().deleteEdgeReference(edge, entryType.getTypeCategory(), attribute.isOwnedRef(),
true, attribute.getRelationshipEdgeDirection(), entityVertex);

// boolean deleted = false;
// if (edgeLabelsForHardDeletion.contains(edge.getLabel())) {
// graph.removeEdge(edge);
// } else {
// deleted = deleteDelegate.getHandler().deleteEdgeReference(edge, entryType.getTypeCategory(), attribute.isOwnedRef(),
// true, attribute.getRelationshipEdgeDirection(), entityVertex);
// }

if (!deleted) {
additionalElements.add(edge);
Expand Down
28 changes: 28 additions & 0 deletions webapp/src/main/java/org/apache/atlas/web/rest/MigrationREST.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
import org.apache.atlas.repository.migration.BulkUpdateProductsRestorationService;

import javax.inject.Inject;
import javax.inject.Singleton;
Expand Down Expand Up @@ -350,6 +351,33 @@ public Boolean updateUniqueQualifiedName(final Set<String> assetGuids) throws Ex
return Boolean.TRUE;
}

@POST
@Path("product/remove-edges")
@Timed
@Consumes(MediaType.APPLICATION_JSON)
public Boolean bulkProductsRedundantEdgeRemoval(Set<String> guids) throws Exception {
AtlasPerfTracer perf = null;
try {
if (CollectionUtils.isEmpty(guids)) {
throw new AtlasBaseException(AtlasErrorCode.BAD_REQUEST, "Product GUIDs are required for removing redundant edges");
}

if (AtlasPerfTracer.isPerfTraceEnabled(PERF_LOG)) {
perf = AtlasPerfTracer.getPerfTracer(PERF_LOG, "MigrationREST.bulkProductsRedundantEdgeRemoval(" + guids + ")");
}

BulkUpdateProductsRestorationService migrationService = new BulkUpdateProductsRestorationService(graph, guids, new GraphHelper(graph), transactionInterceptHelper);
migrationService.productState();

} catch (Exception e) {
LOG.error("Error while removing edges for guid: {}", guids, e);
throw e;
} finally {
AtlasPerfTracer.log(perf);
}
return Boolean.TRUE;
}

private List<AtlasEntity> getEntitiesByIndexSearch(IndexSearchParams indexSearchParams, Boolean minExtInfo, boolean ignoreRelationships) throws AtlasBaseException {
List<AtlasEntity> entities = new ArrayList<>();
String indexName = "janusgraph_vertex_index";
Expand Down
Loading