Skip to content

Commit

Permalink
Merge pull request #3675 from atlanhq/master
Browse files Browse the repository at this point in the history
Sync staging with master
  • Loading branch information
nikhilbonte21 authored Oct 29, 2024
2 parents d6f65e7 + 30d5879 commit 9dc8db1
Show file tree
Hide file tree
Showing 4 changed files with 45 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ public enum AtlasConfiguration {
DEBUG_METRICS_ENABLED("atlas.debug.metrics.enabled", false),
TASKS_USE_ENABLED("atlas.tasks.enabled", true),
TASKS_REQUEUE_GRAPH_QUERY("atlas.tasks.requeue.graph.query", false),
TASKS_IN_PROGRESS_GRAPH_QUERY("atlas.tasks.inprogress.graph.query", false),
TASKS_REQUEUE_POLL_INTERVAL("atlas.tasks.requeue.poll.interval.millis", 60000),
TASKS_QUEUE_SIZE("atlas.tasks.queue.size", 1000),
SESSION_TIMEOUT_SECS("atlas.session.timeout.secs", -1),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@
import static org.apache.atlas.model.instance.EntityMutations.EntityOperation.DELETE;
import static org.apache.atlas.model.instance.EntityMutations.EntityOperation.PARTIAL_UPDATE;
import static org.apache.atlas.model.instance.EntityMutations.EntityOperation.UPDATE;
import static org.apache.atlas.model.tasks.AtlasTask.Status.IN_PROGRESS;
import static org.apache.atlas.model.typedef.AtlasStructDef.AtlasAttributeDef.Cardinality.SET;
import static org.apache.atlas.repository.Constants.*;
import static org.apache.atlas.repository.graph.GraphHelper.getClassificationEdge;
Expand Down Expand Up @@ -3569,7 +3570,7 @@ public void deleteClassification(String entityGuid, String classificationName) t
// Get in progress task to see if there already is a propagation for this particular vertex
List<AtlasTask> inProgressTasks = taskManagement.getInProgressTasks();
for (AtlasTask task : inProgressTasks) {
if (isTaskMatchingWithVertexIdAndEntityGuid(task, classificationVertex.getIdForDisplay(), entityGuid)) {
if (IN_PROGRESS.equals(task.getStatus()) && isTaskMatchingWithVertexIdAndEntityGuid(task, classificationVertex.getIdForDisplay(), entityGuid)) {
throw new AtlasBaseException(AtlasErrorCode.CLASSIFICATION_CURRENTLY_BEING_PROPAGATED, classificationName);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,11 @@ public List<AtlasTask> getByGuidsES(List<String> guids) throws AtlasBaseExceptio
}

public List<AtlasTask> getInProgressTasks() {
return registry.getInProgressTasks();
if(AtlasConfiguration.TASKS_IN_PROGRESS_GRAPH_QUERY.getBoolean()) {
return registry.getInProgressTasks();
} else {
return registry.getInProgressTasksES();
}
}

public void deleteByGuid(String guid) throws AtlasBaseException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.atlas.repository.graphdb.AtlasVertex;
import org.apache.atlas.repository.graphdb.DirectIndexQueryResult;
import org.apache.atlas.type.AtlasType;
import org.apache.atlas.utils.AtlasPerfMetrics;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections4.ListUtils;
import org.slf4j.Logger;
Expand All @@ -46,6 +47,7 @@
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
Expand All @@ -57,12 +59,17 @@
@Component
public class TaskRegistry {
private static final Logger LOG = LoggerFactory.getLogger(TaskRegistry.class);
public static final int TASK_FETCH_BATCH_SIZE = 100;
public static final List<Map<String, Object>> SORT_ARRAY = Collections.singletonList(mapOf(Constants.TASK_CREATED_TIME, mapOf("order", "asc")));

private AtlasGraph graph;
private TaskService taskService;
private int queueSize;
private boolean useGraphQuery;

private static final List<Map<String, Object>> STATUS_CLAUSE_LIST = Arrays.asList(mapOf("match", mapOf(TASK_STATUS, AtlasTask.Status.IN_PROGRESS.toString())));
private static final Map<String, Object> QUERY_MAP = mapOf("bool", mapOf("must", STATUS_CLAUSE_LIST));

@Inject
public TaskRegistry(AtlasGraph graph, TaskService taskService) {
this.graph = graph;
Expand Down Expand Up @@ -128,6 +135,35 @@ public List<AtlasTask> getInProgressTasks() {
return ret;
}

public List<AtlasTask> getInProgressTasksES() {
AtlasPerfMetrics.MetricRecorder metric = RequestContext.get().startMetricRecord("getInProgressTasksES");
List<AtlasTask> ret = new ArrayList<>();
Map<String, Object> dsl = mapOf("query", QUERY_MAP);
dsl.put("sort", SORT_ARRAY);
dsl.put("size", TASK_FETCH_BATCH_SIZE);
int from = 0;
while(true) {
dsl.put("from", from);
TaskSearchParams taskSearchParams = new TaskSearchParams();
taskSearchParams.setDsl(dsl);
try {
List<AtlasTask> results = taskService.getTasks(taskSearchParams).getTasks();
if (results.isEmpty()){
break;
}
ret.addAll(results);
from += TASK_FETCH_BATCH_SIZE;
} catch (AtlasBaseException exception) {
LOG.error("Error fetching in progress tasks from ES, redirecting to GraphQuery", exception);
exception.printStackTrace();
ret = getInProgressTasks();
return ret;
}
}
RequestContext.get().endMetricRecord(metric);
return ret;
}

@GraphTransaction
public void updateStatus(AtlasVertex taskVertex, AtlasTask task) {
if (taskVertex == null) {
Expand Down Expand Up @@ -530,7 +566,7 @@ private AtlasVertex createVertex(AtlasTask task) {
return taskService.createTaskVertex(task);
}

private Map<String, Object> mapOf(String key, Object value) {
private static Map<String, Object> mapOf(String key, Object value) {
Map<String, Object> map = new HashMap<>();
map.put(key, value);

Expand Down

0 comments on commit 9dc8db1

Please sign in to comment.