From 517a81073af26c89203f6279b4ca19bf3e0d89c2 Mon Sep 17 00:00:00 2001 From: hr2904 Date: Fri, 25 Oct 2024 15:37:51 +0530 Subject: [PATCH 1/2] Removed the getInProgressTasks method flow, which is taking 90% of the response time. --- .../repository/store/graph/v2/EntityGraphMapper.java | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java index b3d6495fbb..ea99bbeae2 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java @@ -3567,12 +3567,12 @@ public void deleteClassification(String entityGuid, String classificationName) t AtlasVertex classificationVertex = getClassificationVertex(entityVertex, classificationName); // Get in progress task to see if there already is a propagation for this particular vertex - List inProgressTasks = taskManagement.getInProgressTasks(); - for (AtlasTask task : inProgressTasks) { - if (isTaskMatchingWithVertexIdAndEntityGuid(task, classificationVertex.getIdForDisplay(), entityGuid)) { - throw new AtlasBaseException(AtlasErrorCode.CLASSIFICATION_CURRENTLY_BEING_PROPAGATED, classificationName); - } - } +// List inProgressTasks = taskManagement.getInProgressTasks(); +// for (AtlasTask task : inProgressTasks) { +// if (isTaskMatchingWithVertexIdAndEntityGuid(task, classificationVertex.getIdForDisplay(), entityGuid)) { +// throw new AtlasBaseException(AtlasErrorCode.CLASSIFICATION_CURRENTLY_BEING_PROPAGATED, classificationName); +// } +// } AtlasClassification classification = entityRetriever.toAtlasClassification(classificationVertex); From 6fc4e7aa55b7e3b8249be233bacbd21d513bff6c Mon Sep 17 00:00:00 2001 From: hr2904 Date: Sun, 27 Oct 2024 19:43:15 +0530 Subject: [PATCH 2/2] Re-added the getInProgressTasks method flow, but with ES fetching mechanism. --- .../store/graph/v2/EntityGraphMapper.java | 12 +++---- .../apache/atlas/tasks/TaskManagement.java | 6 +++- .../org/apache/atlas/tasks/TaskRegistry.java | 33 +++++++++++++++++-- 3 files changed, 42 insertions(+), 9 deletions(-) diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java index ea99bbeae2..b3d6495fbb 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java @@ -3567,12 +3567,12 @@ public void deleteClassification(String entityGuid, String classificationName) t AtlasVertex classificationVertex = getClassificationVertex(entityVertex, classificationName); // Get in progress task to see if there already is a propagation for this particular vertex -// List inProgressTasks = taskManagement.getInProgressTasks(); -// for (AtlasTask task : inProgressTasks) { -// if (isTaskMatchingWithVertexIdAndEntityGuid(task, classificationVertex.getIdForDisplay(), entityGuid)) { -// throw new AtlasBaseException(AtlasErrorCode.CLASSIFICATION_CURRENTLY_BEING_PROPAGATED, classificationName); -// } -// } + List inProgressTasks = taskManagement.getInProgressTasks(); + for (AtlasTask task : inProgressTasks) { + if (isTaskMatchingWithVertexIdAndEntityGuid(task, classificationVertex.getIdForDisplay(), entityGuid)) { + throw new AtlasBaseException(AtlasErrorCode.CLASSIFICATION_CURRENTLY_BEING_PROPAGATED, classificationName); + } + } AtlasClassification classification = entityRetriever.toAtlasClassification(classificationVertex); diff --git a/repository/src/main/java/org/apache/atlas/tasks/TaskManagement.java b/repository/src/main/java/org/apache/atlas/tasks/TaskManagement.java index eb39621193..28cc562e28 100644 --- a/repository/src/main/java/org/apache/atlas/tasks/TaskManagement.java +++ b/repository/src/main/java/org/apache/atlas/tasks/TaskManagement.java @@ -208,7 +208,11 @@ public List getByGuidsES(List guids) throws AtlasBaseExceptio } public List getInProgressTasks() { - return registry.getInProgressTasks(); + if(AtlasConfiguration.TASKS_REQUEUE_GRAPH_QUERY.getBoolean()) { + return registry.getInProgressTasks(); + } else { + return registry.getInProgressTasksES(); + } } public void deleteByGuid(String guid) throws AtlasBaseException { diff --git a/repository/src/main/java/org/apache/atlas/tasks/TaskRegistry.java b/repository/src/main/java/org/apache/atlas/tasks/TaskRegistry.java index 1becc45fba..44b317d042 100644 --- a/repository/src/main/java/org/apache/atlas/tasks/TaskRegistry.java +++ b/repository/src/main/java/org/apache/atlas/tasks/TaskRegistry.java @@ -50,8 +50,7 @@ import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; -import static org.apache.atlas.repository.Constants.TASK_GUID; -import static org.apache.atlas.repository.Constants.TASK_STATUS; +import static org.apache.atlas.repository.Constants.*; import static org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2.setEncodedProperty; @Component @@ -128,6 +127,36 @@ public List getInProgressTasks() { return ret; } + public List getInProgressTasksES() { + List ret = new ArrayList<>(); + int size = 100; + try { + int from = 0; + while(true) { + List statusClauseList = new ArrayList(); + statusClauseList.add(mapOf("match", mapOf(TASK_STATUS, AtlasTask.Status.IN_PROGRESS.toString()))); + + Map dsl = mapOf("query", mapOf("bool", mapOf("must", statusClauseList))); + dsl.put("sort", Collections.singletonList(mapOf(Constants.TASK_CREATED_TIME, mapOf("order", "asc")))); + dsl.put("size", size); + dsl.put("from", from); + TaskSearchParams taskSearchParams = new TaskSearchParams(); + taskSearchParams.setDsl(dsl); + List results = taskService.getTasks(taskSearchParams).getTasks(); + if (results.isEmpty()){ + break; + } + ret.addAll(results); + from+=size; + } + } catch (Exception exception) { + LOG.error("Error fetching in progress tasks!", exception); + exception.printStackTrace(); + } + + return ret; + } + @GraphTransaction public void updateStatus(AtlasVertex taskVertex, AtlasTask task) { if (taskVertex == null) {