Skip to content

Commit

Permalink
Merge pull request #3601 from atlanhq/DG-1853-beta
Browse files Browse the repository at this point in the history
DG-1853 Changed flow from sync to async for updateClassificationText
  • Loading branch information
hr2904 authored Oct 8, 2024
2 parents 8b72977 + d185e8a commit 7e1f28b
Show file tree
Hide file tree
Showing 3 changed files with 89 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@

import static org.apache.atlas.AtlasConfiguration.LABEL_MAX_LENGTH;
import static org.apache.atlas.AtlasConfiguration.STORE_DIFFERENTIAL_AUDITS;
import static org.apache.atlas.AtlasErrorCode.CLASSIFICATION_NOT_FOUND;
import static org.apache.atlas.model.TypeCategory.ARRAY;
import static org.apache.atlas.model.TypeCategory.CLASSIFICATION;
import static org.apache.atlas.model.instance.AtlasEntity.Status.ACTIVE;
Expand Down Expand Up @@ -3139,7 +3140,8 @@ private void updateInConsistentOwnedMapVertices(AttributeMutationContext ctx, At
}
}

public void cleanUpClassificationPropagation(String classificationName, int batchLimit) throws AtlasBaseException {

public void cleanUpClassificationPropagation(String classificationName, int batchLimit) {
int CLEANUP_MAX = batchLimit <= 0 ? CLEANUP_BATCH_SIZE : batchLimit * CLEANUP_BATCH_SIZE;
int cleanedUpCount = 0;
final int CHUNK_SIZE_TEMP = 50;
Expand Down Expand Up @@ -3991,11 +3993,14 @@ public void updateClassifications(EntityMutationContext context, String guid, Li
String entityTypeName = AtlasGraphUtilsV2.getTypeName(entityVertex);
AtlasEntityType entityType = typeRegistry.getEntityTypeByName(entityTypeName);
List<AtlasClassification> updatedClassifications = new ArrayList<>();
List<AtlasVertex> entitiesToPropagateTo = new ArrayList<>();
Set<AtlasVertex> notificationVertices = new HashSet<AtlasVertex>() {{ add(entityVertex); }};
HashMap<String, String> updatedClassificationsVertices = new HashMap<>();
List<AtlasVertex> entitiesToPropagateTo = new ArrayList<>();
Set<AtlasVertex> notificationVertices = new HashSet<AtlasVertex>() {{ add(entityVertex); }};

Map<AtlasVertex, List<AtlasClassification>> addedPropagations = null;
Map<AtlasClassification, List<AtlasVertex>> removedPropagations = new HashMap<>();
String propagationType = null;
Queue<StaggeredTask> tasksToBeCreated = new LinkedList<>();

for (AtlasClassification classification : classifications) {
MetricRecorder metricRecorderClassification = RequestContext.get().startMetricRecord("updateClassifications_classification");
Expand Down Expand Up @@ -4068,8 +4073,9 @@ public void updateClassifications(EntityMutationContext context, String guid, Li
if (isClassificationUpdated) {
List<AtlasVertex> propagatedEntityVertices = graphHelper.getAllPropagatedEntityVertices(classificationVertex);

notificationVertices.addAll(propagatedEntityVertices);
}
notificationVertices.addAll(propagatedEntityVertices);
updatedClassificationsVertices.put(classificationName, classificationVertex.getIdForDisplay());
}

if (LOG.isDebugEnabled()) {
LOG.debug("updating vertex {} for trait {}", string(classificationVertex), classificationName);
Expand Down Expand Up @@ -4117,14 +4123,14 @@ public void updateClassifications(EntityMutationContext context, String guid, Li
!Objects.equals(currentRestrictPropagationThroughLineage, updatedRestrictPropagationThroughLineage)) &&
taskManagement != null && DEFERRED_ACTION_ENABLED) {

String propagationType = CLASSIFICATION_PROPAGATION_ADD;
if(currentRestrictPropagationThroughLineage != updatedRestrictPropagationThroughLineage || currentRestrictPropagationThroughHierarchy != updatedRestrictPropagationThroughHierarchy){
propagationType = CLASSIFICATION_REFRESH_PROPAGATION;
}
if (removePropagation || !updatedTagPropagation) {
propagationType = CLASSIFICATION_PROPAGATION_DELETE;
}
createAndQueueTask(propagationType, entityVertex, classificationVertex.getIdForDisplay(), classificationName, currentRestrictPropagationThroughLineage,currentRestrictPropagationThroughHierarchy);
propagationType = CLASSIFICATION_PROPAGATION_ADD;
if(currentRestrictPropagationThroughLineage != updatedRestrictPropagationThroughLineage || currentRestrictPropagationThroughHierarchy != updatedRestrictPropagationThroughHierarchy){
propagationType = CLASSIFICATION_REFRESH_PROPAGATION;
}
if (removePropagation || !updatedTagPropagation) {
propagationType = CLASSIFICATION_PROPAGATION_DELETE;
}
tasksToBeCreated.add(new StaggeredTask(propagationType, entityVertex, classificationVertex.getIdForDisplay(), classificationName, currentRestrictPropagationThroughLineage,currentRestrictPropagationThroughHierarchy));
updatedTagPropagation = null;
}

Expand Down Expand Up @@ -4197,11 +4203,16 @@ public void updateClassifications(EntityMutationContext context, String guid, Li
AtlasEntity entity = instanceConverter.getAndCacheEntity(entityGuid, ENTITY_CHANGE_NOTIFY_IGNORE_RELATIONSHIP_ATTRIBUTES);

if (entity != null) {
vertex.setProperty(CLASSIFICATION_TEXT_KEY, fullTextMapperV2.getClassificationTextForEntity(entity));
entityChangeNotifier.onClassificationUpdatedToEntity(entity, updatedClassifications);
}
RequestContext.get().endMetricRecord(metricRecorderEntity);
}
RequestContext.get().endMetricRecord(metricRecorderEntity);}
for(String classification : updatedClassificationsVertices.keySet()){
createAndQueueTask(CLASSIFICATION_PROPAGATION_TEXT_UPDATE, entityVertex, updatedClassificationsVertices.get(classification), classification);
}
while (!tasksToBeCreated.isEmpty()){
StaggeredTask task = tasksToBeCreated.poll();
createAndQueueTask(task.taskType, task.entityVertex, task.classificationVertexId, task.classificationName, task.currentPropagateThroughLineage, task.currentRestrictPropagationThroughHierarchy);
}

if (MapUtils.isNotEmpty(removedPropagations)) {
AtlasPerfMetrics.MetricRecorder metricRecorderEntity = RequestContext.get().startMetricRecord("updateClassifications_removeProp");
Expand Down Expand Up @@ -4281,6 +4292,34 @@ public void deleteClassifications(String guid) throws AtlasBaseException {
}
}

/**
 * Recomputes the stored classification text ({@code CLASSIFICATION_TEXT_KEY}) for every
 * entity vertex returned by {@code graphHelper.getAllPropagatedEntityVertices} for the
 * given classification vertex.
 *
 * <p>Vertices are processed in fixed-size batches, and
 * {@code transactionInterceptHelper.intercept()} is invoked after each batch
 * (presumably to commit the work done so far -- confirm against TransactionInterceptHelper).
 *
 * <p>This method is best-effort: it never throws. Any failure is logged through
 * {@code LOG} together with the classification vertex id and the processing stops.
 *
 * @param classificationVertexId id of the classification vertex whose propagated entities
 *                               should have their classification text refreshed; an empty
 *                               or {@code null} id is logged and ignored
 */
public void updateClassificationTextPropagation(String classificationVertexId) {
    if (StringUtils.isEmpty(classificationVertexId)) {
        LOG.warn("updateClassificationTextPropagation(classificationVertexId={}): classification vertex id is empty", classificationVertexId);
        return;
    }

    final int batchSize = 100;

    try {
        AtlasVertex classificationVertex = graph.getVertex(classificationVertexId);

        // The vertex may have been removed between task creation and task execution.
        if (classificationVertex == null) {
            LOG.warn("updateClassificationTextPropagation(classificationVertexId={}): classification vertex not found", classificationVertexId);
            return;
        }

        List<AtlasVertex> impactedVertices = graphHelper.getAllPropagatedEntityVertices(classificationVertex);
        int total = impactedVertices.size();

        for (int start = 0; start < total; start += batchSize) {
            int end = Math.min(start + batchSize, total);

            for (AtlasVertex vertex : impactedVertices.subList(start, end)) {
                String entityGuid = graphHelper.getGuid(vertex);
                AtlasEntity entity = instanceConverter.getAndCacheEntity(entityGuid, ENTITY_CHANGE_NOTIFY_IGNORE_RELATIONSHIP_ATTRIBUTES);

                // Entities that cannot be loaded are skipped rather than failing the batch.
                if (entity != null) {
                    vertex.setProperty(CLASSIFICATION_TEXT_KEY, fullTextMapperV2.getClassificationTextForEntity(entity));
                }
            }

            transactionInterceptHelper.intercept();
        }
    } catch (Exception e) {
        // Keep the original non-throwing contract, but report through the logger instead of stderr
        // so the failure is visible alongside the rest of the application logs.
        LOG.error("updateClassificationTextPropagation(classificationVertexId={}): failed to update classification text", classificationVertexId, e);
    }
}

public List<String> deleteClassificationPropagation(String entityGuid, String classificationVertexId) throws AtlasBaseException {
try {
if (StringUtils.isEmpty(classificationVertexId)) {
Expand Down Expand Up @@ -5183,4 +5222,22 @@ private void isAuthorizedToLink(AtlasVertex vertex) throws AtlasBaseException {
"read on source Entity, link/unlink operation denied: ", sourceEntity.getAttribute(NAME));

}

/**
 * Immutable holder for the arguments of a task whose creation is postponed.
 *
 * <p>Instances capture the values that are later handed to {@code createAndQueueTask};
 * every field is assigned once in the constructor and only read afterwards.
 */
class StaggeredTask {
    /** Task type constant passed as the first argument when the task is queued. */
    final String taskType;
    /** Entity vertex the task is created for. */
    final AtlasVertex entityVertex;
    /** Display id of the classification vertex. */
    final String classificationVertexId;
    /** Name of the classification. */
    final String classificationName;
    /** Lineage propagation flag value captured when the task was staged. */
    final Boolean currentPropagateThroughLineage;
    /** Hierarchy propagation restriction flag value captured when the task was staged. */
    final Boolean currentRestrictPropagationThroughHierarchy;

    /**
     * Captures all task arguments as-is; no validation or copying is performed.
     *
     * @param taskType                                   task type constant
     * @param entityVertex                               entity vertex the task is created for
     * @param classificationVertexId                     id of the classification vertex
     * @param classificationName                         name of the classification
     * @param currentPropagateThroughLineage             lineage flag value at staging time
     * @param currentRestrictPropagationThroughHierarchy hierarchy flag value at staging time
     */
    public StaggeredTask(String taskType,
                         AtlasVertex entityVertex,
                         String classificationVertexId,
                         String classificationName,
                         Boolean currentPropagateThroughLineage,
                         Boolean currentRestrictPropagationThroughHierarchy) {
        this.taskType = taskType;
        this.entityVertex = entityVertex;
        this.classificationVertexId = classificationVertexId;
        this.classificationName = classificationName;
        this.currentPropagateThroughLineage = currentPropagateThroughLineage;
        this.currentRestrictPropagationThroughHierarchy = currentRestrictPropagationThroughHierarchy;
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
public class ClassificationPropagateTaskFactory implements TaskFactory {
private static final Logger LOG = LoggerFactory.getLogger(ClassificationPropagateTaskFactory.class);

public static final String CLASSIFICATION_PROPAGATION_TEXT_UPDATE = "CLASSIFICATION_PROPAGATION_TEXT_UPDATE";
public static final String CLASSIFICATION_PROPAGATION_ADD = "CLASSIFICATION_PROPAGATION_ADD";

//This should be used when referencing vertex to which classification is directly attached
Expand Down Expand Up @@ -90,6 +91,9 @@ public org.apache.atlas.tasks.AbstractTask create(AtlasTask task) {
case CLASSIFICATION_PROPAGATION_ADD:
return new ClassificationPropagationTasks.Add(task, graph, entityGraphMapper, deleteDelegate, relationshipStore);

case CLASSIFICATION_PROPAGATION_TEXT_UPDATE:
return new ClassificationPropagationTasks.UpdateText(task, graph, entityGraphMapper, deleteDelegate, relationshipStore);

case CLASSIFICATION_PROPAGATION_DELETE:
return new ClassificationPropagationTasks.Delete(task, graph, entityGraphMapper, deleteDelegate, relationshipStore);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,18 @@ protected void run(Map<String, Object> parameters) throws AtlasBaseException {
}
}

/**
 * Task that refreshes classification text by delegating to
 * {@code EntityGraphMapper.updateClassificationTextPropagation}.
 */
public static class UpdateText extends ClassificationTask {
    public UpdateText(AtlasTask task,
                      AtlasGraph graph,
                      EntityGraphMapper entityGraphMapper,
                      DeleteHandlerDelegate deleteDelegate,
                      AtlasRelationshipStore relationshipStore) {
        super(task, graph, entityGraphMapper, deleteDelegate, relationshipStore);
    }

    /**
     * Reads the classification vertex id from the task parameters and passes it to the mapper.
     *
     * @param parameters task parameters; the value under {@code PARAM_CLASSIFICATION_VERTEX_ID}
     *                   is cast to {@code String}
     */
    @Override
    protected void run(Map<String, Object> parameters) throws AtlasBaseException {
        entityGraphMapper.updateClassificationTextPropagation(
                (String) parameters.get(PARAM_CLASSIFICATION_VERTEX_ID));
    }
}

public static class Delete extends ClassificationTask {
public Delete(AtlasTask task, AtlasGraph graph, EntityGraphMapper entityGraphMapper, DeleteHandlerDelegate deleteDelegate, AtlasRelationshipStore relationshipStore) {
super(task, graph, entityGraphMapper, deleteDelegate, relationshipStore);
Expand Down

0 comments on commit 7e1f28b

Please sign in to comment.