From 3b831f2779fb70281015c9e8d6a678ac8f3ea1e4 Mon Sep 17 00:00:00 2001 From: hr2904 Date: Mon, 28 Oct 2024 14:15:47 +0530 Subject: [PATCH 1/3] Added code refactors based on PR comments --- .../org/apache/atlas/tasks/TaskRegistry.java | 24 ++++++++++--------- 1 file changed, 13 insertions(+), 11 deletions(-) diff --git a/repository/src/main/java/org/apache/atlas/tasks/TaskRegistry.java b/repository/src/main/java/org/apache/atlas/tasks/TaskRegistry.java index 44b317d042..0ae105b633 100644 --- a/repository/src/main/java/org/apache/atlas/tasks/TaskRegistry.java +++ b/repository/src/main/java/org/apache/atlas/tasks/TaskRegistry.java @@ -130,8 +130,7 @@ public List getInProgressTasks() { public List getInProgressTasksES() { List ret = new ArrayList<>(); int size = 100; - try { - int from = 0; + int from = 0; while(true) { List statusClauseList = new ArrayList(); statusClauseList.add(mapOf("match", mapOf(TASK_STATUS, AtlasTask.Status.IN_PROGRESS.toString()))); @@ -142,17 +141,20 @@ public List getInProgressTasksES() { dsl.put("from", from); TaskSearchParams taskSearchParams = new TaskSearchParams(); taskSearchParams.setDsl(dsl); - List results = taskService.getTasks(taskSearchParams).getTasks(); - if (results.isEmpty()){ - break; + try { + List results = taskService.getTasks(taskSearchParams).getTasks(); + if (results.isEmpty()){ + break; + } + ret.addAll(results); + from+=size; + } catch (AtlasBaseException exception) { + LOG.error("Error fetching in progress tasks from ES, redirecting to GraphQuery", exception); + exception.printStackTrace(); + ret = getInProgressTasks(); + return ret; } - ret.addAll(results); - from+=size; } - } catch (Exception exception) { - LOG.error("Error fetching in progress tasks!", exception); - exception.printStackTrace(); - } return ret; } From 05408e0dd1863307a06f5607d7e3394ae657a8b8 Mon Sep 17 00:00:00 2001 From: hr2904 Date: Mon, 28 Oct 2024 17:47:21 +0530 Subject: [PATCH 2/3] Added refactors based on PR comments --- .../org/apache/atlas/AtlasConfiguration.java | 1 + .../apache/atlas/tasks/TaskManagement.java | 2 +- .../org/apache/atlas/tasks/TaskRegistry.java | 24 +++++++++++-------- 3 files changed, 16 insertions(+), 11 deletions(-) diff --git a/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java b/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java index 3e23999c40..c12fcbb24e 100644 --- a/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java +++ b/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java @@ -84,6 +84,7 @@ public enum AtlasConfiguration { DEBUG_METRICS_ENABLED("atlas.debug.metrics.enabled", false), TASKS_USE_ENABLED("atlas.tasks.enabled", true), TASKS_REQUEUE_GRAPH_QUERY("atlas.tasks.requeue.graph.query", false), + TASKS_IN_PROGRESS_GRAPH_QUERY("atlas.tasks.inprogress.graph.query", false), TASKS_REQUEUE_POLL_INTERVAL("atlas.tasks.requeue.poll.interval.millis", 60000), TASKS_QUEUE_SIZE("atlas.tasks.queue.size", 1000), SESSION_TIMEOUT_SECS("atlas.session.timeout.secs", -1), diff --git a/repository/src/main/java/org/apache/atlas/tasks/TaskManagement.java b/repository/src/main/java/org/apache/atlas/tasks/TaskManagement.java index 28cc562e28..c0e071f64e 100644 --- a/repository/src/main/java/org/apache/atlas/tasks/TaskManagement.java +++ b/repository/src/main/java/org/apache/atlas/tasks/TaskManagement.java @@ -208,7 +208,7 @@ public List getByGuidsES(List guids) throws AtlasBaseExceptio } public List getInProgressTasks() { - if(AtlasConfiguration.TASKS_REQUEUE_GRAPH_QUERY.getBoolean()) { + if(AtlasConfiguration.TASKS_IN_PROGRESS_GRAPH_QUERY.getBoolean()) { return registry.getInProgressTasks(); } else { return registry.getInProgressTasksES(); diff --git a/repository/src/main/java/org/apache/atlas/tasks/TaskRegistry.java b/repository/src/main/java/org/apache/atlas/tasks/TaskRegistry.java index 0ae105b633..441aa9c52d 100644 --- a/repository/src/main/java/org/apache/atlas/tasks/TaskRegistry.java +++ b/repository/src/main/java/org/apache/atlas/tasks/TaskRegistry.java @@ -32,6 +32,7 @@ import org.apache.atlas.repository.graphdb.AtlasVertex; import org.apache.atlas.repository.graphdb.DirectIndexQueryResult; import org.apache.atlas.type.AtlasType; +import org.apache.atlas.utils.AtlasPerfMetrics; import org.apache.commons.collections.CollectionUtils; import org.apache.commons.collections4.ListUtils; import org.slf4j.Logger; @@ -46,22 +47,28 @@ import java.util.Iterator; import java.util.LinkedList; import java.util.List; +import java.util.Arrays; import java.util.Map; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; -import static org.apache.atlas.repository.Constants.*; +import static org.apache.atlas.repository.Constants.TASK_GUID; +import static org.apache.atlas.repository.Constants.TASK_STATUS; import static org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2.setEncodedProperty; @Component public class TaskRegistry { private static final Logger LOG = LoggerFactory.getLogger(TaskRegistry.class); + public static final int TASK_FETCH_BATCH_SIZE = 100; private AtlasGraph graph; private TaskService taskService; private int queueSize; private boolean useGraphQuery; + private static final List> STATUS_CLAUSE_LIST = Arrays.asList(mapOf("match", mapOf(TASK_STATUS, AtlasTask.Status.IN_PROGRESS.toString()))); + private static final Map QUERY_MAP = mapOf("bool", mapOf("must", STATUS_CLAUSE_LIST)); + @Inject public TaskRegistry(AtlasGraph graph, TaskService taskService) { this.graph = graph; @@ -128,16 +135,13 @@ public List getInProgressTasks() { } public List getInProgressTasksES() { + AtlasPerfMetrics.MetricRecorder metric = RequestContext.get().startMetricRecord("getInProgressTasksES"); List ret = new ArrayList<>(); - int size = 100; int from = 0; while(true) { - List statusClauseList = new ArrayList(); - statusClauseList.add(mapOf("match", mapOf(TASK_STATUS, AtlasTask.Status.IN_PROGRESS.toString()))); - - Map dsl = mapOf("query", mapOf("bool", mapOf("must", statusClauseList))); + Map dsl = mapOf("query", QUERY_MAP); dsl.put("sort", Collections.singletonList(mapOf(Constants.TASK_CREATED_TIME, mapOf("order", "asc")))); - dsl.put("size", size); + dsl.put("size", TASK_FETCH_BATCH_SIZE); dsl.put("from", from); TaskSearchParams taskSearchParams = new TaskSearchParams(); taskSearchParams.setDsl(dsl); @@ -147,7 +151,7 @@ public List getInProgressTasksES() { break; } ret.addAll(results); - from+=size; + from += TASK_FETCH_BATCH_SIZE; } catch (AtlasBaseException exception) { LOG.error("Error fetching in progress tasks from ES, redirecting to GraphQuery", exception); exception.printStackTrace(); @@ -155,7 +159,7 @@ public List getInProgressTasksES() { return ret; } } - + RequestContext.get().endMetricRecord(metric); return ret; } @@ -561,7 +565,7 @@ private AtlasVertex createVertex(AtlasTask task) { return taskService.createTaskVertex(task); } - private Map mapOf(String key, Object value) { + private static Map mapOf(String key, Object value) { Map map = new HashMap<>(); map.put(key, value); From 0a233e0408f57f623146a0d00e106647749bf7f2 Mon Sep 17 00:00:00 2001 From: hr2904 Date: Mon, 28 Oct 2024 18:45:20 +0530 Subject: [PATCH 3/3] Added refactors based on PR comments - 2 --- .../src/main/java/org/apache/atlas/tasks/TaskRegistry.java | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/repository/src/main/java/org/apache/atlas/tasks/TaskRegistry.java b/repository/src/main/java/org/apache/atlas/tasks/TaskRegistry.java index 441aa9c52d..8c08a26121 100644 --- a/repository/src/main/java/org/apache/atlas/tasks/TaskRegistry.java +++ b/repository/src/main/java/org/apache/atlas/tasks/TaskRegistry.java @@ -60,6 +60,7 @@ public class TaskRegistry { private static final Logger LOG = LoggerFactory.getLogger(TaskRegistry.class); public static final int TASK_FETCH_BATCH_SIZE = 100; + public static final List> SORT_ARRAY = Collections.singletonList(mapOf(Constants.TASK_CREATED_TIME, mapOf("order", "asc"))); private AtlasGraph graph; private TaskService taskService; @@ -137,11 +138,11 @@ public List getInProgressTasks() { public List getInProgressTasksES() { AtlasPerfMetrics.MetricRecorder metric = RequestContext.get().startMetricRecord("getInProgressTasksES"); List ret = new ArrayList<>(); + Map dsl = mapOf("query", QUERY_MAP); + dsl.put("sort", SORT_ARRAY); + dsl.put("size", TASK_FETCH_BATCH_SIZE); int from = 0; while(true) { - Map dsl = mapOf("query", QUERY_MAP); - dsl.put("sort", Collections.singletonList(mapOf(Constants.TASK_CREATED_TIME, mapOf("order", "asc")))); - dsl.put("size", TASK_FETCH_BATCH_SIZE); dsl.put("from", from); TaskSearchParams taskSearchParams = new TaskSearchParams(); taskSearchParams.setDsl(dsl);