Skip to content

Commit

Permalink
Added more fine grained metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
hr2904 committed Oct 24, 2024
1 parent 19913d8 commit eec08a8
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3555,25 +3555,28 @@ public void deleteClassification(String entityGuid, String classificationName) t
if (AtlasPerfTracer.isPerfTraceEnabled(PERF_LOG)) {
perf = AtlasPerfTracer.getPerfTracer(PERF_LOG, "EntityGraphMapper.deleteClassification");
}

List<String> traitNames = getTraitNames(entityVertex);
AtlasPerfMetrics.MetricRecorder deleteClassification1 = RequestContext.get().startMetricRecord("deleteClassification-1");
List<String> traitNames = getTraitNames(entityVertex); // TODO: clean

if (CollectionUtils.isEmpty(traitNames)) {
throw new AtlasBaseException(AtlasErrorCode.NO_CLASSIFICATIONS_FOUND_FOR_ENTITY, entityGuid);
}

validateClassificationExists(traitNames, classificationName);

AtlasVertex classificationVertex = getClassificationVertex(entityVertex, classificationName);
AtlasVertex classificationVertex = getClassificationVertex(entityVertex, classificationName); // TODO: clean

// Get in progress task to see if there already is a propagation for this particular vertex
AtlasPerfMetrics.MetricRecorder taskManagementMetric = RequestContext.get().startMetricRecord("deleteClassification-taskManagement");
List<AtlasTask> inProgressTasks = taskManagement.getInProgressTasks();
RequestContext.get().endMetricRecord(taskManagementMetric);
AtlasPerfMetrics.MetricRecorder taskManagementMetric2 = RequestContext.get().startMetricRecord("deleteClassification-taskManagement2");
for (AtlasTask task : inProgressTasks) {
if (isTaskMatchingWithVertexIdAndEntityGuid(task, classificationVertex.getIdForDisplay(), entityGuid)) {
throw new AtlasBaseException(AtlasErrorCode.CLASSIFICATION_CURRENTLY_BEING_PROPAGATED, classificationName);
}
}

RequestContext.get().endMetricRecord(taskManagementMetric2);
AtlasClassification classification = entityRetriever.toAtlasClassification(classificationVertex);

if (classification == null) {
Expand All @@ -3582,39 +3585,45 @@ public void deleteClassification(String entityGuid, String classificationName) t

// remove classification from propagated entities if propagation is turned on
final List<AtlasVertex> entityVertices;

if (isPropagationEnabled(classificationVertex)) {
//TODO : add for this flow metric
RequestContext.get().endMetricRecord(deleteClassification1);
AtlasPerfMetrics.MetricRecorder deleteClassification2 = RequestContext.get().startMetricRecord("deleteClassification-2");
if (isPropagationEnabled(classificationVertex)) { // TODO
if (taskManagement != null && DEFERRED_ACTION_ENABLED) {
boolean propagateDelete = true;
String classificationVertexId = classificationVertex.getIdForDisplay();

List<String> entityTaskGuids = (List<String>) entityVertex.getPropertyValues(PENDING_TASKS_PROPERTY_KEY, String.class);

if (CollectionUtils.isNotEmpty(entityTaskGuids)) {
List<AtlasTask> entityPendingTasks = taskManagement.getByGuidsES(entityTaskGuids);

boolean pendingTaskExists = entityPendingTasks.stream()
List<AtlasTask> entityPendingTasks = taskManagement.getByGuidsES(entityTaskGuids); // TODO
AtlasPerfMetrics.MetricRecorder deleteClassification25 = RequestContext.get().startMetricRecord("deleteClassification-25");
boolean pendingTaskExists = entityPendingTasks.stream() // TODO
.anyMatch(x -> isTaskMatchingWithVertexIdAndEntityGuid(x, classificationVertexId, entityGuid));

RequestContext.get().endMetricRecord(deleteClassification25);
if (pendingTaskExists) {
AtlasPerfMetrics.MetricRecorder deleteClassification26 = RequestContext.get().startMetricRecord("deleteClassification-26");
List<AtlasTask> entityClassificationPendingTasks = entityPendingTasks.stream()
.filter(t -> t.getParameters().containsKey("entityGuid")
&& t.getParameters().containsKey("classificationVertexId"))
.filter(t -> t.getParameters().get("entityGuid").equals(entityGuid)
&& t.getParameters().get("classificationVertexId").equals(classificationVertexId)
&& t.getType().equals(CLASSIFICATION_PROPAGATION_ADD))
.collect(Collectors.toList());
RequestContext.get().endMetricRecord(deleteClassification26);
AtlasPerfMetrics.MetricRecorder deleteClassification27 = RequestContext.get().startMetricRecord("deleteClassification-27");
for (AtlasTask entityClassificationPendingTask: entityClassificationPendingTasks) {
String taskGuid = entityClassificationPendingTask.getGuid();
taskManagement.deleteByGuid(taskGuid, TaskManagement.DeleteType.SOFT);
taskManagement.deleteByGuid(taskGuid, TaskManagement.DeleteType.SOFT); //TODO
AtlasGraphUtilsV2.deleteProperty(entityVertex, PENDING_TASKS_PROPERTY_KEY, taskGuid);
// propagateDelete = false; TODO: Uncomment when all unnecessary ADD tasks are resolved
}
RequestContext.get().endMetricRecord(deleteClassification27);
}
}

if (propagateDelete) {
createAndQueueTask(CLASSIFICATION_PROPAGATION_DELETE, entityVertex, classificationVertex.getIdForDisplay(), classificationName);
createAndQueueTask(CLASSIFICATION_PROPAGATION_DELETE, entityVertex, classificationVertex.getIdForDisplay(), classificationName); // TODO:
}

entityVertices = new ArrayList<>();
Expand All @@ -3633,14 +3642,15 @@ public void deleteClassification(String entityGuid, String classificationName) t
if (!entityVertices.contains(entityVertex)) {
entityVertices.add(entityVertex);
}

RequestContext.get().endMetricRecord(deleteClassification2);
AtlasPerfMetrics.MetricRecorder deleteClassification3 = RequestContext.get().startMetricRecord("deleteClassification-3");
// remove classifications from associated entity
if (LOG.isDebugEnabled()) {
LOG.debug("Removing classification: [{}] from: [{}][{}] with edge label: [{}]", classificationName,
getTypeName(entityVertex), entityGuid, CLASSIFICATION_LABEL);
}

AtlasEdge edge = getClassificationEdge(entityVertex, classificationVertex);
AtlasEdge edge = getClassificationEdge(entityVertex, classificationVertex); //TODO : clean

deleteDelegate.getHandler().deleteEdgeReference(edge, CLASSIFICATION, false, true, entityVertex);

Expand All @@ -3657,17 +3667,21 @@ public void deleteClassification(String entityGuid, String classificationName) t
updateModificationMetadata(entityVertex);

if (RequestContext.get().isDelayTagNotifications()) {
AtlasPerfMetrics.MetricRecorder deleteClassification31 = RequestContext.get().startMetricRecord("deleteClassification-31");
RequestContext.get().addDeletedClassificationAndVertices(classification, new ArrayList<>(entityVertices));
RequestContext.get().endMetricRecord(deleteClassification31);
} else if (CollectionUtils.isNotEmpty(entityVertices)) {
List<AtlasEntity> propagatedEntities = updateClassificationText(classification, entityVertices);

//Sending audit request for all entities at once
entityChangeNotifier.onClassificationsDeletedFromEntities(propagatedEntities, Collections.singletonList(classification));
entityChangeNotifier.onClassificationsDeletedFromEntities(propagatedEntities, Collections.singletonList(classification)); //TODO
}
RequestContext.get().endMetricRecord(deleteClassification3);
AtlasPerfTracer.log(perf);
}

private boolean isTaskMatchingWithVertexIdAndEntityGuid(AtlasTask task, String classificationVertexId, String entityGuid) {
AtlasPerfMetrics.MetricRecorder metric = RequestContext.get().startMetricRecord("isTaskMatchingWithVertexIdAndEntityGuid");
try {
if (CLASSIFICATION_PROPAGATION_ADD.equals(task.getType())) {
return task.getParameters().get(ClassificationTask.PARAM_CLASSIFICATION_VERTEX_ID).equals(classificationVertexId)
Expand All @@ -3676,6 +3690,7 @@ private boolean isTaskMatchingWithVertexIdAndEntityGuid(AtlasTask task, String c
} catch (NullPointerException npe) {
LOG.warn("Task classificationVertexId or entityGuid is null");
}
RequestContext.get().endMetricRecord(metric);
return false;
}

Expand Down Expand Up @@ -4384,9 +4399,11 @@ private void validateClassificationExists(List<String> existingClassifications,
}

private void validateClassificationExists(List<String> existingClassifications, String suppliedClassificationName) throws AtlasBaseException {
AtlasPerfMetrics.MetricRecorder metric = RequestContext.get().startMetricRecord("validateClassificationExists");
if (!existingClassifications.contains(suppliedClassificationName)) {
throw new AtlasBaseException(AtlasErrorCode.CLASSIFICATION_NOT_ASSOCIATED_WITH_ENTITY, suppliedClassificationName);
}
RequestContext.get().endMetricRecord(metric);
}

private AtlasEdge getOrCreateRelationship(AtlasVertex end1Vertex, AtlasVertex end2Vertex, String relationshipName,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.atlas.repository.graphdb.AtlasVertex;
import org.apache.atlas.repository.graphdb.DirectIndexQueryResult;
import org.apache.atlas.type.AtlasType;
import org.apache.atlas.utils.AtlasPerfMetrics;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections4.ListUtils;
import org.slf4j.Logger;
Expand Down Expand Up @@ -104,6 +105,7 @@ public List<AtlasTask> getPendingTasks() {
}

public List<AtlasTask> getInProgressTasks() {
AtlasPerfMetrics.MetricRecorder metric = RequestContext.get().startMetricRecord("getInProgressTasks");
List<AtlasTask> ret = new ArrayList<>();

try {
Expand All @@ -124,7 +126,7 @@ public List<AtlasTask> getInProgressTasks() {
} catch (Exception exception) {
LOG.error("Error fetching in progress tasks!", exception);
}

RequestContext.get().endMetricRecord(metric);
return ret;
}

Expand Down Expand Up @@ -166,6 +168,7 @@ public void softDelete(String guid) throws AtlasBaseException{
}

public List<AtlasTask> getByIdsES(List<String> guids) throws AtlasBaseException {
AtlasPerfMetrics.MetricRecorder metric = RequestContext.get().startMetricRecord("getByIdsES");
List<AtlasTask> ret = new ArrayList<>();

List<List<String>> chunkedGuids = ListUtils.partition(guids, 50);
Expand All @@ -188,7 +191,7 @@ public List<AtlasTask> getByIdsES(List<String> guids) throws AtlasBaseException
// adding filtering layer to filter exact tasks
ret.addAll(filterTasksByGuids(result.getTasks(), chunkedGuidList));
}

RequestContext.get().endMetricRecord(metric);
return ret;
}

Expand Down

0 comments on commit eec08a8

Please sign in to comment.