Skip to content

Commit

Permalink
Merge branch 'master' into beta-temp-0
Browse files Browse the repository at this point in the history
  • Loading branch information
nikhilbonte21 committed Oct 29, 2024
2 parents 96d2048 + 30d5879 commit f200f94
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ public enum AtlasConfiguration {
DEBUG_METRICS_ENABLED("atlas.debug.metrics.enabled", false),
TASKS_USE_ENABLED("atlas.tasks.enabled", true),
TASKS_REQUEUE_GRAPH_QUERY("atlas.tasks.requeue.graph.query", false),
TASKS_IN_PROGRESS_GRAPH_QUERY("atlas.tasks.inprogress.graph.query", false),
TASKS_REQUEUE_POLL_INTERVAL("atlas.tasks.requeue.poll.interval.millis", 60000),
TASKS_QUEUE_SIZE("atlas.tasks.queue.size", 1000),
SESSION_TIMEOUT_SECS("atlas.session.timeout.secs", -1),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ public List<AtlasTask> getByGuidsES(List<String> guids) throws AtlasBaseExceptio
}

public List<AtlasTask> getInProgressTasks() {
if(AtlasConfiguration.TASKS_REQUEUE_GRAPH_QUERY.getBoolean()) {
if(AtlasConfiguration.TASKS_IN_PROGRESS_GRAPH_QUERY.getBoolean()) {
return registry.getInProgressTasks();
} else {
return registry.getInProgressTasksES();
Expand Down
49 changes: 28 additions & 21 deletions repository/src/main/java/org/apache/atlas/tasks/TaskRegistry.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.atlas.repository.graphdb.AtlasVertex;
import org.apache.atlas.repository.graphdb.DirectIndexQueryResult;
import org.apache.atlas.type.AtlasType;
import org.apache.atlas.utils.AtlasPerfMetrics;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections4.ListUtils;
import org.slf4j.Logger;
Expand All @@ -46,22 +47,29 @@
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import static org.apache.atlas.repository.Constants.*;
import static org.apache.atlas.repository.Constants.TASK_GUID;
import static org.apache.atlas.repository.Constants.TASK_STATUS;
import static org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2.setEncodedProperty;

@Component
public class TaskRegistry {
private static final Logger LOG = LoggerFactory.getLogger(TaskRegistry.class);
public static final int TASK_FETCH_BATCH_SIZE = 100;
public static final List<Map<String, Object>> SORT_ARRAY = Collections.singletonList(mapOf(Constants.TASK_CREATED_TIME, mapOf("order", "asc")));

private AtlasGraph graph;
private TaskService taskService;
private int queueSize;
private boolean useGraphQuery;

private static final List<Map<String, Object>> STATUS_CLAUSE_LIST = Arrays.asList(mapOf("match", mapOf(TASK_STATUS, AtlasTask.Status.IN_PROGRESS.toString())));
private static final Map<String, Object> QUERY_MAP = mapOf("bool", mapOf("must", STATUS_CLAUSE_LIST));

@Inject
public TaskRegistry(AtlasGraph graph, TaskService taskService) {
this.graph = graph;
Expand Down Expand Up @@ -128,32 +136,31 @@ public List<AtlasTask> getInProgressTasks() {
}

public List<AtlasTask> getInProgressTasksES() {
AtlasPerfMetrics.MetricRecorder metric = RequestContext.get().startMetricRecord("getInProgressTasksES");
List<AtlasTask> ret = new ArrayList<>();
int size = 100;
try {
int from = 0;
Map<String, Object> dsl = mapOf("query", QUERY_MAP);
dsl.put("sort", SORT_ARRAY);
dsl.put("size", TASK_FETCH_BATCH_SIZE);
int from = 0;
while(true) {
List statusClauseList = new ArrayList();
statusClauseList.add(mapOf("match", mapOf(TASK_STATUS, AtlasTask.Status.IN_PROGRESS.toString())));

Map<String, Object> dsl = mapOf("query", mapOf("bool", mapOf("must", statusClauseList)));
dsl.put("sort", Collections.singletonList(mapOf(Constants.TASK_CREATED_TIME, mapOf("order", "asc"))));
dsl.put("size", size);
dsl.put("from", from);
TaskSearchParams taskSearchParams = new TaskSearchParams();
taskSearchParams.setDsl(dsl);
List<AtlasTask> results = taskService.getTasks(taskSearchParams).getTasks();
if (results.isEmpty()){
break;
try {
List<AtlasTask> results = taskService.getTasks(taskSearchParams).getTasks();
if (results.isEmpty()){
break;
}
ret.addAll(results);
from += TASK_FETCH_BATCH_SIZE;
} catch (AtlasBaseException exception) {
LOG.error("Error fetching in progress tasks from ES, redirecting to GraphQuery", exception);
exception.printStackTrace();
ret = getInProgressTasks();
return ret;
}
ret.addAll(results);
from+=size;
}
} catch (Exception exception) {
LOG.error("Error fetching in progress tasks!", exception);
exception.printStackTrace();
}

RequestContext.get().endMetricRecord(metric);
return ret;
}

Expand Down Expand Up @@ -559,7 +566,7 @@ private AtlasVertex createVertex(AtlasTask task) {
return taskService.createTaskVertex(task);
}

private Map<String, Object> mapOf(String key, Object value) {
private static Map<String, Object> mapOf(String key, Object value) {
Map<String, Object> map = new HashMap<>();
map.put(key, value);

Expand Down

0 comments on commit f200f94

Please sign in to comment.