Skip to content

Commit

Permalink
Merge pull request #4119 from atlanhq/dg2027
Browse files Browse the repository at this point in the history
DG-2027: Added metrics for the number of mismatches found in a single taskFetch call
  • Loading branch information
hr2904 authored Feb 11, 2025
2 parents 72e6e57 + b34ec9a commit fb9a275
Show file tree
Hide file tree
Showing 6 changed files with 73 additions and 11 deletions.
20 changes: 20 additions & 0 deletions common/src/main/java/org/apache/atlas/utils/AtlasPerfMetrics.java
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,23 @@ public void recordMetric(MetricRecorder recorder) {
}
}

/**
 * Records the outcome of a finished recorder, crediting it with a
 * caller-supplied number of invocations.
 *
 * <p>The metric is looked up in {@code metrics} under the recorder's name and
 * created and stored on first use. The supplied invocation count and the
 * recorder's elapsed time are then added to the metric's running totals.
 * A {@code null} recorder is ignored.
 *
 * @param recorder    the recorder whose name and elapsed time are recorded; may be {@code null}
 * @param invocations the number of invocations to add to the metric
 */
public void recordMetric(MetricRecorder recorder, long invocations) {
    if (recorder == null) {
        return;
    }

    final String metricName = recorder.name;
    final long   elapsed    = recorder.getElapsedTime();

    Metric entry = metrics.get(metricName);

    if (entry == null) {
        // First time this name is seen: register a fresh metric for it.
        entry = new Metric(metricName);
        metrics.put(metricName, entry);
    }

    entry.invocations    += invocations;
    entry.totalTimeMSecs += elapsed;
}

/**
 * Empties the {@code metrics} collection by calling its {@code clear()} method.
 */
public void clear() {
metrics.clear();
}
Expand Down Expand Up @@ -149,5 +166,8 @@ public void incrementInvocations() {
invocations++;
}

/**
 * Overwrites this metric's invocation count with the given value
 * (the previous count is discarded, not added to).
 *
 * @param invocations the new invocation count
 */
public void setInvocations(long invocations) {
this.invocations = invocations;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.atlas.RequestContext;
import org.apache.atlas.model.tasks.AtlasTask;
import org.apache.atlas.repository.graphdb.AtlasVertex;
import org.apache.atlas.service.metrics.MetricsRegistry;
import org.apache.atlas.service.redis.RedisService;
import org.apache.atlas.type.AtlasType;
import org.apache.atlas.utils.AtlasPerfTracer;
Expand Down Expand Up @@ -49,13 +50,14 @@ public class TaskExecutor {
private final ICuratorFactory curatorFactory;
private final boolean isActiveActiveHAEnabled;
private final String zkRoot;
private final MetricsRegistry metricRegistry;

private TaskQueueWatcher watcher;
private Thread watcherThread;
private RedisService redisService;

public TaskExecutor(TaskRegistry registry, Map<String, TaskFactory> taskTypeFactoryMap, TaskManagement.Statistics statistics,
ICuratorFactory curatorFactory, RedisService redisService, final String zkRoot, boolean isActiveActiveHAEnabled) {
ICuratorFactory curatorFactory, RedisService redisService, final String zkRoot, boolean isActiveActiveHAEnabled, MetricsRegistry metricsRegistry) {
this.taskExecutorService = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder()
.setDaemon(true)
.setNameFormat(TASK_NAME_FORMAT + Thread.currentThread().getName())
Expand All @@ -68,11 +70,12 @@ public TaskExecutor(TaskRegistry registry, Map<String, TaskFactory> taskTypeFact
this.redisService = redisService;
this.isActiveActiveHAEnabled = isActiveActiveHAEnabled;
this.zkRoot = zkRoot;
this.metricRegistry = metricsRegistry;
}

public Thread startWatcherThread() {

watcher = new TaskQueueWatcher(taskExecutorService, registry, taskTypeFactoryMap, statistics, curatorFactory, redisService, zkRoot, isActiveActiveHAEnabled);
watcher = new TaskQueueWatcher(taskExecutorService, registry, taskTypeFactoryMap, statistics, curatorFactory, redisService, zkRoot, isActiveActiveHAEnabled, metricRegistry);
watcherThread = new Thread(watcher);
watcherThread.start();
return watcherThread;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.atlas.listener.ActiveStateChangeHandler;
import org.apache.atlas.model.tasks.AtlasTask;
import org.apache.atlas.service.Service;
import org.apache.atlas.service.metrics.MetricsRegistry;
import org.apache.atlas.service.redis.RedisService;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.configuration.Configuration;
Expand All @@ -45,6 +46,7 @@
@Order(7)
public class TaskManagement implements Service, ActiveStateChangeHandler {
private static final Logger LOG = LoggerFactory.getLogger(TaskManagement.class);
private final MetricsRegistry metricRegistry;

private TaskExecutor taskExecutor;
private final Configuration configuration;
Expand All @@ -61,19 +63,21 @@ public enum DeleteType {
}

@Inject
public TaskManagement(Configuration configuration, TaskRegistry taskRegistry, ICuratorFactory curatorFactory, RedisService redisService) {
// Injected constructor: stores the supplied collaborators, including the
// metrics registry that is later handed to the TaskExecutor.
public TaskManagement(Configuration configuration, TaskRegistry taskRegistry, ICuratorFactory curatorFactory, RedisService redisService, MetricsRegistry metricsRegistry) {
this.configuration = configuration;
this.registry = taskRegistry;
this.redisService = redisService;
// Statistics and the task-type factory map are created fresh (and empty) per instance.
this.statistics = new Statistics();
this.taskTypeFactoryMap = new HashMap<>();
this.curatorFactory = curatorFactory;
this.metricRegistry = metricsRegistry;
}

@VisibleForTesting
TaskManagement(Configuration configuration, TaskRegistry taskRegistry, TaskFactory taskFactory, ICuratorFactory curatorFactory, RedisService redisService) {
this.configuration = configuration;
this.registry = taskRegistry;
this.metricRegistry = null;
this.redisService = redisService;
this.statistics = new Statistics();
this.taskTypeFactoryMap = new HashMap<>();
Expand Down Expand Up @@ -251,7 +255,7 @@ private synchronized void startWatcherThread() {
if (this.taskExecutor == null) {
final boolean isActiveActiveHAEnabled = HAConfiguration.isActiveActiveHAEnabled(configuration);
final String zkRoot = HAConfiguration.getZookeeperProperties(configuration).getZkRoot();
this.taskExecutor = new TaskExecutor(registry, taskTypeFactoryMap, statistics, curatorFactory, redisService, zkRoot,isActiveActiveHAEnabled);
this.taskExecutor = new TaskExecutor(registry, taskTypeFactoryMap, statistics, curatorFactory, redisService, zkRoot,isActiveActiveHAEnabled, metricRegistry);
}

if (watcherThread == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@
import org.apache.atlas.AtlasConfiguration;
import org.apache.atlas.AtlasConstants;
import org.apache.atlas.ICuratorFactory;
import org.apache.atlas.RequestContext;
import org.apache.atlas.model.tasks.AtlasTask;
import org.apache.atlas.service.metrics.MetricsRegistry;
import org.apache.atlas.service.redis.RedisService;
import org.apache.commons.collections.CollectionUtils;
import org.slf4j.Logger;
Expand All @@ -41,6 +43,7 @@ public class TaskQueueWatcher implements Runnable {
private static final TaskExecutor.TaskLogger TASK_LOG = TaskExecutor.TaskLogger.getLogger();
private final String zkRoot;
private final boolean isActiveActiveHAEnabled;
private final MetricsRegistry metricRegistry;

private TaskRegistry registry;
private final ExecutorService executorService;
Expand All @@ -57,7 +60,7 @@ public class TaskQueueWatcher implements Runnable {

public TaskQueueWatcher(ExecutorService executorService, TaskRegistry registry,
Map<String, TaskFactory> taskTypeFactoryMap, TaskManagement.Statistics statistics,
ICuratorFactory curatorFactory, RedisService redisService, final String zkRoot, boolean isActiveActiveHAEnabled) {
ICuratorFactory curatorFactory, RedisService redisService, final String zkRoot, boolean isActiveActiveHAEnabled, MetricsRegistry metricsRegistry) {

this.registry = registry;
this.executorService = executorService;
Expand All @@ -67,6 +70,7 @@ public TaskQueueWatcher(ExecutorService executorService, TaskRegistry registry,
this.redisService = redisService;
this.zkRoot = zkRoot;
this.isActiveActiveHAEnabled = isActiveActiveHAEnabled;
this.metricRegistry = metricsRegistry;
}

public void shutdown() {
Expand All @@ -87,6 +91,8 @@ public void run() {
LOG.debug("TaskQueueWatcher: running {}:{}", Thread.currentThread().getName(), Thread.currentThread().getId());
}
while (shouldRun.get()) {
RequestContext requestContext = RequestContext.get();
requestContext.setMetricRegistry(this.metricRegistry);
TasksFetcher fetcher = new TasksFetcher(registry);
try {
if (!redisService.acquireDistributedLock(ATLAS_TASK_LOCK)) {
Expand Down Expand Up @@ -156,6 +162,8 @@ public void run() {
}

this.tasks = registry.getTasksForReQueue();
RequestContext requestContext = RequestContext.get();
requestContext.clearCache();
}

public List<AtlasTask> getTasks() {
Expand Down
33 changes: 27 additions & 6 deletions repository/src/main/java/org/apache/atlas/tasks/TaskRegistry.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.atlas.repository.graphdb.DirectIndexQueryResult;
import org.apache.atlas.repository.graphdb.janus.AtlasElasticsearchQuery;
import org.apache.atlas.type.AtlasType;
import org.apache.atlas.utils.AtlasMetricType;
import org.apache.atlas.utils.AtlasPerfMetrics;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections4.ListUtils;
Expand All @@ -53,6 +54,7 @@
import java.util.List;
import java.util.Arrays;
import java.util.Map;
import java.util.Objects;
import java.util.LinkedHashMap;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
Expand All @@ -67,6 +69,7 @@ public class TaskRegistry {
public static final int TASK_FETCH_BATCH_SIZE = 100;
public static final List<Map<String, Object>> SORT_ARRAY = Collections.singletonList(mapOf(Constants.TASK_CREATED_TIME, mapOf("order", "asc")));
public static final String JANUSGRAPH_VERTEX_INDEX = "janusgraph_vertex_index";
public static final String TASK_MISMATCH_TAG = "mismatchTask";

private AtlasGraph graph;
private TaskService taskService;
Expand Down Expand Up @@ -398,6 +401,7 @@ public List<AtlasTask> getTasksForReQueueIndexSearch() {
Map<String, Object> dsl = mapOf("query", mapOf("bool", mapOf("should", statusClauseList)));
dsl.put("sort", Collections.singletonList(mapOf(Constants.TASK_CREATED_TIME, mapOf("order", "asc"))));
dsl.put("size", size);
long mismatches = 0;
int totalFetched = 0;
while (true) {
int fetched = 0;
Expand Down Expand Up @@ -436,8 +440,14 @@ public List<AtlasTask> getTasksForReQueueIndexSearch() {
} else {
LOG.warn("Status mismatch for task with guid: {}. Expected PENDING/IN_PROGRESS but found: {}",
atlasTask.getGuid(), atlasTask.getStatus());
String docId = LongEncoding.encode(Long.parseLong(vertex.getIdForDisplay()));
repairMismatchedTask(atlasTask, docId);
mismatches++;
try {
String docId = LongEncoding.encode(Long.parseLong(vertex.getIdForDisplay()));
repairMismatchedTask(atlasTask, docId);
}
catch (Exception e){
e.printStackTrace();
}
}
} else {
LOG.warn("Null vertex while re-queuing tasks at index {}", fetched);
Expand All @@ -456,7 +466,14 @@ public List<AtlasTask> getTasksForReQueueIndexSearch() {
break;
}
}

if(mismatches > 0) {
AtlasPerfMetrics.Metric mismatchMetrics = new AtlasPerfMetrics.Metric(TASK_MISMATCH_TAG);
mismatchMetrics.setMetricType(AtlasMetricType.COUNTER);
mismatchMetrics.addTag("name", TASK_MISMATCH_TAG);
mismatchMetrics.setInvocations(mismatches);
mismatchMetrics.setTotalTimeMSecs(0);
RequestContext.get().addApplicationMetrics(mismatchMetrics);
}
return ret;
}

Expand All @@ -466,10 +483,14 @@ private void repairMismatchedTask(AtlasTask atlasTask, String docId) {
try {
// Create a map for the fields to be updated
Map<String, Object> fieldsToUpdate = new HashMap<>();
fieldsToUpdate.put("__task_endTime", atlasTask.getEndTime().getTime());
fieldsToUpdate.put("__task_timeTakenInSeconds", atlasTask.getTimeTakenInSeconds());
if(Objects.nonNull(atlasTask.getEndTime())) {
fieldsToUpdate.put("__task_endTime", atlasTask.getEndTime().getTime());
}
if(Objects.nonNull(atlasTask.getTimeTakenInSeconds())) {
fieldsToUpdate.put("__task_timeTakenInSeconds", atlasTask.getTimeTakenInSeconds());
}
fieldsToUpdate.put("__task_status", atlasTask.getStatus().toString());
fieldsToUpdate.put("__task_modificationTimestamp", atlasTask.getUpdatedTime().getTime()); // Set current timestamp
fieldsToUpdate.put("__task_modificationTimestamp", atlasTask.getUpdatedTime().getTime());

// Convert fieldsToUpdate map to JSON using Jackson
ObjectMapper objectMapper = new ObjectMapper();
Expand Down
6 changes: 6 additions & 0 deletions server-api/src/main/java/org/apache/atlas/RequestContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -669,6 +669,12 @@ public void endMetricRecord(MetricRecorder recorder) {
}
}

/**
 * Forwards the recorder, together with an explicit invocation count, to
 * {@code metrics.recordMetric(recorder, invocations)}.
 *
 * <p>Does nothing when either {@code metrics} or {@code recorder} is {@code null}.
 *
 * @param recorder    the recorder to hand over; may be {@code null}
 * @param invocations the invocation count passed along with the recorder
 */
public void endMetricRecord(MetricRecorder recorder, long invocations) {
    if (metrics == null || recorder == null) {
        return;
    }

    metrics.recordMetric(recorder, invocations);
}

/**
 * Convenience overload: wraps the entity and the guid from the request in an
 * {@code EntityGuidPair} and delegates to the overload that accepts the pair.
 *
 * @param entity        the entity to pair with the guid
 * @param guidInRequest the guid as it appeared in the request
 */
public void recordEntityGuidUpdate(AtlasEntity entity, String guidInRequest) {
    final EntityGuidPair pair = new EntityGuidPair(entity, guidInRequest);

    recordEntityGuidUpdate(pair);
}
Expand Down

0 comments on commit fb9a275

Please sign in to comment.