Skip to content

Commit

Permalink
Merge pull request #3666 from atlanhq/dg1875
Browse files Browse the repository at this point in the history
DG-1875 setClassification.deleteClassification optimisation fix
  • Loading branch information
hr2904 authored Oct 27, 2024
2 parents e60126c + 6fc4e7a commit 89a73b6
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,11 @@ public List<AtlasTask> getByGuidsES(List<String> guids) throws AtlasBaseExceptio
}

public List<AtlasTask> getInProgressTasks() {
return registry.getInProgressTasks();
if(AtlasConfiguration.TASKS_REQUEUE_GRAPH_QUERY.getBoolean()) {
return registry.getInProgressTasks();
} else {
return registry.getInProgressTasksES();
}
}

public void deleteByGuid(String guid) throws AtlasBaseException {
Expand Down
33 changes: 31 additions & 2 deletions repository/src/main/java/org/apache/atlas/tasks/TaskRegistry.java
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,7 @@
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import static org.apache.atlas.repository.Constants.TASK_GUID;
import static org.apache.atlas.repository.Constants.TASK_STATUS;
import static org.apache.atlas.repository.Constants.*;
import static org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2.setEncodedProperty;

@Component
Expand Down Expand Up @@ -128,6 +127,36 @@ public List<AtlasTask> getInProgressTasks() {
return ret;
}

public List<AtlasTask> getInProgressTasksES() {
List<AtlasTask> ret = new ArrayList<>();
int size = 100;
try {
int from = 0;
while(true) {
List statusClauseList = new ArrayList();
statusClauseList.add(mapOf("match", mapOf(TASK_STATUS, AtlasTask.Status.IN_PROGRESS.toString())));

Map<String, Object> dsl = mapOf("query", mapOf("bool", mapOf("must", statusClauseList)));
dsl.put("sort", Collections.singletonList(mapOf(Constants.TASK_CREATED_TIME, mapOf("order", "asc"))));
dsl.put("size", size);
dsl.put("from", from);
TaskSearchParams taskSearchParams = new TaskSearchParams();
taskSearchParams.setDsl(dsl);
List<AtlasTask> results = taskService.getTasks(taskSearchParams).getTasks();
if (results.isEmpty()){
break;
}
ret.addAll(results);
from+=size;
}
} catch (Exception exception) {
LOG.error("Error fetching in progress tasks!", exception);
exception.printStackTrace();
}

return ret;
}

@GraphTransaction
public void updateStatus(AtlasVertex taskVertex, AtlasTask task) {
if (taskVertex == null) {
Expand Down

0 comments on commit 89a73b6

Please sign in to comment.