Skip to content

Commit

Permalink
Merge pull request #3610 from atlanhq/DG-1853
Browse files Browse the repository at this point in the history
DG-1853 Created a task to separate out tag attributes update during classification update
  • Loading branch information
nikhilbonte21 authored Oct 9, 2024
2 parents 09f3e2e + 83540a7 commit 8a3f458
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@

import static org.apache.atlas.AtlasConfiguration.LABEL_MAX_LENGTH;
import static org.apache.atlas.AtlasConfiguration.STORE_DIFFERENTIAL_AUDITS;
import static org.apache.atlas.AtlasErrorCode.CLASSIFICATION_NOT_FOUND;
import static org.apache.atlas.model.TypeCategory.ARRAY;
import static org.apache.atlas.model.TypeCategory.CLASSIFICATION;
import static org.apache.atlas.model.instance.AtlasEntity.Status.ACTIVE;
Expand Down Expand Up @@ -3649,6 +3650,7 @@ public void updateClassifications(EntityMutationContext context, String guid, Li

Map<AtlasVertex, List<AtlasClassification>> addedPropagations = null;
Map<AtlasClassification, List<AtlasVertex>> removedPropagations = new HashMap<>();
String propagationType = null;

for (AtlasClassification classification : classifications) {
String classificationName = classification.getTypeName();
Expand Down Expand Up @@ -3690,7 +3692,7 @@ public void updateClassifications(EntityMutationContext context, String guid, Li
currentClassification.setAttribute(attributeName, updatedAttributes.get(attributeName));
}

isClassificationUpdated = true;
createAndQueueTask(CLASSIFICATION_PROPAGATION_TEXT_UPDATE, entityVertex, classificationVertex.getIdForDisplay(), classification.getTypeName());
}

// check for validity period update
Expand Down Expand Up @@ -3719,7 +3721,6 @@ public void updateClassifications(EntityMutationContext context, String guid, Li

if (isClassificationUpdated) {
List<AtlasVertex> propagatedEntityVertices = graphHelper.getAllPropagatedEntityVertices(classificationVertex);

notificationVertices.addAll(propagatedEntityVertices);
}

Expand Down Expand Up @@ -3769,7 +3770,7 @@ public void updateClassifications(EntityMutationContext context, String guid, Li
!Objects.equals(currentRestrictPropagationThroughLineage, updatedRestrictPropagationThroughLineage)) &&
taskManagement != null && DEFERRED_ACTION_ENABLED) {

String propagationType = CLASSIFICATION_PROPAGATION_ADD;
propagationType = CLASSIFICATION_PROPAGATION_ADD;
if(currentRestrictPropagationThroughLineage != updatedRestrictPropagationThroughLineage || currentRestrictPropagationThroughHierarchy != updatedRestrictPropagationThroughHierarchy){
propagationType = CLASSIFICATION_REFRESH_PROPAGATION;
}
Expand Down Expand Up @@ -3918,6 +3919,34 @@ public void deleteClassifications(String guid) throws AtlasBaseException {
}
}

public void updateClassificationTextPropagation(String classificationVertexId) throws AtlasBaseException {
if (StringUtils.isEmpty(classificationVertexId)) {
LOG.warn("updateClassificationTextPropagation(classificationVertexId={}): classification vertex id is empty", classificationVertexId);
return;
}
AtlasVertex classificationVertex = graph.getVertex(classificationVertexId);
AtlasClassification classification = entityRetriever.toAtlasClassification(classificationVertex);
LOG.info("Fetched classification : {} ", classification.toString());
List<AtlasVertex> impactedVertices = graphHelper.getAllPropagatedEntityVertices(classificationVertex);
LOG.info("impactedVertices : {}", impactedVertices.size());
int batchSize = 100;
for (int i = 0; i < impactedVertices.size(); i += batchSize) {
int end = Math.min(i + batchSize, impactedVertices.size());
List<AtlasVertex> batch = impactedVertices.subList(i, end);
for (AtlasVertex vertex : batch) {
String entityGuid = graphHelper.getGuid(vertex);
AtlasEntity entity = instanceConverter.getAndCacheEntity(entityGuid, true);

if (entity != null) {
vertex.setProperty(CLASSIFICATION_TEXT_KEY, fullTextMapperV2.getClassificationTextForEntity(entity));
entityChangeNotifier.onClassificationUpdatedToEntity(entity, Collections.singletonList(classification));
}
}
transactionInterceptHelper.intercept();
LOG.info("Updated classificationText from {} for {}", i, batchSize);
}
}

public List<String> deleteClassificationPropagation(String entityGuid, String classificationVertexId) throws AtlasBaseException {
try {
if (StringUtils.isEmpty(classificationVertexId)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
public class ClassificationPropagateTaskFactory implements TaskFactory {
private static final Logger LOG = LoggerFactory.getLogger(ClassificationPropagateTaskFactory.class);

public static final String CLASSIFICATION_PROPAGATION_TEXT_UPDATE = "CLASSIFICATION_PROPAGATION_TEXT_UPDATE";
public static final String CLASSIFICATION_PROPAGATION_ADD = "CLASSIFICATION_PROPAGATION_ADD";

//This should be used when referencing vertex to which classification is directly attached
Expand All @@ -59,6 +60,7 @@ public class ClassificationPropagateTaskFactory implements TaskFactory {


public static final List<String> supportedTypes = new ArrayList<String>() {{
add(CLASSIFICATION_PROPAGATION_TEXT_UPDATE);
add(CLASSIFICATION_PROPAGATION_ADD);
add(CLASSIFICATION_PROPAGATION_DELETE);
add(CLASSIFICATION_ONLY_PROPAGATION_DELETE);
Expand Down Expand Up @@ -90,6 +92,9 @@ public org.apache.atlas.tasks.AbstractTask create(AtlasTask task) {
case CLASSIFICATION_PROPAGATION_ADD:
return new ClassificationPropagationTasks.Add(task, graph, entityGraphMapper, deleteDelegate, relationshipStore);

case CLASSIFICATION_PROPAGATION_TEXT_UPDATE:
return new ClassificationPropagationTasks.UpdateText(task, graph, entityGraphMapper, deleteDelegate, relationshipStore);

case CLASSIFICATION_PROPAGATION_DELETE:
return new ClassificationPropagationTasks.Delete(task, graph, entityGraphMapper, deleteDelegate, relationshipStore);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,18 @@ protected void run(Map<String, Object> parameters) throws AtlasBaseException {
}
}

public static class UpdateText extends ClassificationTask {
public UpdateText(AtlasTask task, AtlasGraph graph, EntityGraphMapper entityGraphMapper, DeleteHandlerDelegate deleteDelegate, AtlasRelationshipStore relationshipStore) {
super(task, graph, entityGraphMapper, deleteDelegate, relationshipStore);
}

@Override
protected void run(Map<String, Object> parameters) throws AtlasBaseException {
String classificationVertexId = (String) parameters.get(PARAM_CLASSIFICATION_VERTEX_ID);
entityGraphMapper.updateClassificationTextPropagation(classificationVertexId);
}
}

public static class Delete extends ClassificationTask {
public Delete(AtlasTask task, AtlasGraph graph, EntityGraphMapper entityGraphMapper, DeleteHandlerDelegate deleteDelegate, AtlasRelationshipStore relationshipStore) {
super(task, graph, entityGraphMapper, deleteDelegate, relationshipStore);
Expand Down

0 comments on commit 8a3f458

Please sign in to comment.