Skip to content

Commit

Permalink
Merge pull request #3617 from atlanhq/DG-1851-beta-helper
Browse files Browse the repository at this point in the history
DG-1851 Added the commiting fix for batch-statements
  • Loading branch information
hr2904 authored Oct 10, 2024
2 parents d093392 + f53ceb2 commit e839efd
Showing 1 changed file with 16 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@ public class EntityGraphMapper {

private static final boolean RESTRICT_PROPAGATION_THROUGH_HIERARCHY_DEFAULT = false;
public static final int CLEANUP_BATCH_SIZE = 200000;
public static final int CLASSIFICATION_EDGE_BATCH_LIMIT = 100;
private boolean DEFERRED_ACTION_ENABLED = AtlasConfiguration.TASKS_USE_ENABLED.getBoolean();
private boolean DIFFERENTIAL_AUDITS = STORE_DIFFERENTIAL_AUDITS.getBoolean();

Expand Down Expand Up @@ -3184,19 +3185,23 @@ public void cleanUpClassificationPropagation(String classificationName, int batc
List<AtlasEdge> classificationEdges = GraphHelper.getClassificationEdges(vertex, null, classificationName);
LOG.info("Found {} classification edges for vertex {}", classificationEdges.size(), GraphHelper.getGuid(vertex));
classificationEdgeCount += classificationEdges.size();
for (AtlasEdge edge : classificationEdges) {
try {
AtlasClassification classification = entityRetriever.toAtlasClassification(edge.getInVertex());
deletedClassifications.add(classification);
LOG.info("Deleting classification edge between vertex {} and classification {}", GraphHelper.getGuid(vertex), classification.getTypeName());
deleteDelegate.getHandler().deleteEdgeReference(edge, TypeCategory.CLASSIFICATION, false, true, null, vertex);
int batchSize = CLASSIFICATION_EDGE_BATCH_LIMIT;
for (int i = 0; i < classificationEdges.size(); i += batchSize) {
int end = Math.min(i + batchSize, classificationEdges.size());
List<AtlasEdge> batch = classificationEdges.subList(i, end);
for (AtlasEdge edge : batch) {
try {
AtlasClassification classification = entityRetriever.toAtlasClassification(edge.getInVertex());
deleteDelegate.getHandler().deleteEdgeReference(edge, TypeCategory.CLASSIFICATION, false, true, null, vertex);
LOG.info("Deleting classification edge between vertex {} and classification {}", GraphHelper.getGuid(vertex), classification.getTypeName());deleteDelegate.getHandler().deleteEdgeReference(edge, TypeCategory.CLASSIFICATION, false, true, null, vertex);
deletedClassifications.add(classification);
} catch (IllegalStateException | AtlasBaseException e) {LOG.error("Error deleting classification edge for vertex {}: {}", GraphHelper.getGuid(vertex), e.getMessage());
e.printStackTrace();
}
}
catch (IllegalStateException | AtlasBaseException e){
LOG.error("Error deleting classification edge for vertex {}: {}", GraphHelper.getGuid(vertex), e.getMessage());
e.printStackTrace();
}
}
transactionInterceptHelper.intercept();

}
try {
AtlasEntity entity = repairClassificationMappings(vertex);
LOG.info("Notifying about deleted classifications for entity {}", GraphHelper.getGuid(vertex));
Expand Down

0 comments on commit e839efd

Please sign in to comment.