Skip to content

Commit

Permalink
Added refactors based on PR comments
Browse files Browse the repository at this point in the history
  • Loading branch information
hr2904 committed Oct 28, 2024
1 parent 3b831f2 commit 05408e0
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ public enum AtlasConfiguration {
DEBUG_METRICS_ENABLED("atlas.debug.metrics.enabled", false),
TASKS_USE_ENABLED("atlas.tasks.enabled", true),
TASKS_REQUEUE_GRAPH_QUERY("atlas.tasks.requeue.graph.query", false),
TASKS_IN_PROGRESS_GRAPH_QUERY("atlas.tasks.inprogress.graph.query", false),
TASKS_REQUEUE_POLL_INTERVAL("atlas.tasks.requeue.poll.interval.millis", 60000),
TASKS_QUEUE_SIZE("atlas.tasks.queue.size", 1000),
SESSION_TIMEOUT_SECS("atlas.session.timeout.secs", -1),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ public List<AtlasTask> getByGuidsES(List<String> guids) throws AtlasBaseExceptio
}

public List<AtlasTask> getInProgressTasks() {
if(AtlasConfiguration.TASKS_REQUEUE_GRAPH_QUERY.getBoolean()) {
if(AtlasConfiguration.TASKS_IN_PROGRESS_GRAPH_QUERY.getBoolean()) {
return registry.getInProgressTasks();
} else {
return registry.getInProgressTasksES();
Expand Down
24 changes: 14 additions & 10 deletions repository/src/main/java/org/apache/atlas/tasks/TaskRegistry.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.atlas.repository.graphdb.AtlasVertex;
import org.apache.atlas.repository.graphdb.DirectIndexQueryResult;
import org.apache.atlas.type.AtlasType;
import org.apache.atlas.utils.AtlasPerfMetrics;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections4.ListUtils;
import org.slf4j.Logger;
Expand All @@ -46,22 +47,28 @@
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import static org.apache.atlas.repository.Constants.*;
import static org.apache.atlas.repository.Constants.TASK_GUID;
import static org.apache.atlas.repository.Constants.TASK_STATUS;
import static org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2.setEncodedProperty;

@Component
public class TaskRegistry {
private static final Logger LOG = LoggerFactory.getLogger(TaskRegistry.class);
public static final int TASK_FETCH_BATCH_SIZE = 100;

private AtlasGraph graph;
private TaskService taskService;
private int queueSize;
private boolean useGraphQuery;

private static final List<Map<String, Object>> STATUS_CLAUSE_LIST = Arrays.asList(mapOf("match", mapOf(TASK_STATUS, AtlasTask.Status.IN_PROGRESS.toString())));
private static final Map<String, Object> QUERY_MAP = mapOf("bool", mapOf("must", STATUS_CLAUSE_LIST));

@Inject
public TaskRegistry(AtlasGraph graph, TaskService taskService) {
this.graph = graph;
Expand Down Expand Up @@ -128,16 +135,13 @@ public List<AtlasTask> getInProgressTasks() {
}

public List<AtlasTask> getInProgressTasksES() {
AtlasPerfMetrics.MetricRecorder metric = RequestContext.get().startMetricRecord("getInProgressTasksES");
List<AtlasTask> ret = new ArrayList<>();
int size = 100;
int from = 0;
while(true) {
List statusClauseList = new ArrayList();
statusClauseList.add(mapOf("match", mapOf(TASK_STATUS, AtlasTask.Status.IN_PROGRESS.toString())));

Map<String, Object> dsl = mapOf("query", mapOf("bool", mapOf("must", statusClauseList)));
Map<String, Object> dsl = mapOf("query", QUERY_MAP);
dsl.put("sort", Collections.singletonList(mapOf(Constants.TASK_CREATED_TIME, mapOf("order", "asc"))));
dsl.put("size", size);
dsl.put("size", TASK_FETCH_BATCH_SIZE);
dsl.put("from", from);
TaskSearchParams taskSearchParams = new TaskSearchParams();
taskSearchParams.setDsl(dsl);
Expand All @@ -147,15 +151,15 @@ public List<AtlasTask> getInProgressTasksES() {
break;
}
ret.addAll(results);
from+=size;
from += TASK_FETCH_BATCH_SIZE;
} catch (AtlasBaseException exception) {
LOG.error("Error fetching in progress tasks from ES, redirecting to GraphQuery", exception);
exception.printStackTrace();
ret = getInProgressTasks();
return ret;
}
}

RequestContext.get().endMetricRecord(metric);
return ret;
}

Expand Down Expand Up @@ -561,7 +565,7 @@ private AtlasVertex createVertex(AtlasTask task) {
return taskService.createTaskVertex(task);
}

private Map<String, Object> mapOf(String key, Object value) {
private static Map<String, Object> mapOf(String key, Object value) {
Map<String, Object> map = new HashMap<>();
map.put(key, value);

Expand Down

0 comments on commit 05408e0

Please sign in to comment.