Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DG-2027 : Added metrics for number of mismatches found in a single taskFetch call #4119

Merged
merged 5 commits into from
Feb 11, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions common/src/main/java/org/apache/atlas/utils/AtlasPerfMetrics.java
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,23 @@ public void recordMetric(MetricRecorder recorder) {
}
}

public void recordMetric(MetricRecorder recorder, long invocations) {
if (recorder != null) {
final String name = recorder.name;
final long timeTaken = recorder.getElapsedTime();
Metric metric = metrics.get(name);

if (metric == null) {
metric = new Metric(name);

metrics.put(name, metric);
}

metric.invocations += invocations;
metric.totalTimeMSecs += timeTaken;
}
}

public void clear() {
metrics.clear();
}
Expand Down Expand Up @@ -149,5 +166,8 @@ public void incrementInvocations() {
invocations++;
}

public void setInvocations(long invocations) {
this.invocations = invocations;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.atlas.RequestContext;
import org.apache.atlas.model.tasks.AtlasTask;
import org.apache.atlas.repository.graphdb.AtlasVertex;
import org.apache.atlas.service.metrics.MetricsRegistry;
import org.apache.atlas.service.redis.RedisService;
import org.apache.atlas.type.AtlasType;
import org.apache.atlas.utils.AtlasPerfTracer;
Expand Down Expand Up @@ -49,13 +50,14 @@ public class TaskExecutor {
private final ICuratorFactory curatorFactory;
private final boolean isActiveActiveHAEnabled;
private final String zkRoot;
private final MetricsRegistry metricRegistry;

private TaskQueueWatcher watcher;
private Thread watcherThread;
private RedisService redisService;

public TaskExecutor(TaskRegistry registry, Map<String, TaskFactory> taskTypeFactoryMap, TaskManagement.Statistics statistics,
ICuratorFactory curatorFactory, RedisService redisService, final String zkRoot, boolean isActiveActiveHAEnabled) {
ICuratorFactory curatorFactory, RedisService redisService, final String zkRoot, boolean isActiveActiveHAEnabled, MetricsRegistry metricsRegistry) {
this.taskExecutorService = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder()
.setDaemon(true)
.setNameFormat(TASK_NAME_FORMAT + Thread.currentThread().getName())
Expand All @@ -68,11 +70,12 @@ public TaskExecutor(TaskRegistry registry, Map<String, TaskFactory> taskTypeFact
this.redisService = redisService;
this.isActiveActiveHAEnabled = isActiveActiveHAEnabled;
this.zkRoot = zkRoot;
this.metricRegistry = metricsRegistry;
}

public Thread startWatcherThread() {

watcher = new TaskQueueWatcher(taskExecutorService, registry, taskTypeFactoryMap, statistics, curatorFactory, redisService, zkRoot, isActiveActiveHAEnabled);
watcher = new TaskQueueWatcher(taskExecutorService, registry, taskTypeFactoryMap, statistics, curatorFactory, redisService, zkRoot, isActiveActiveHAEnabled, metricRegistry);
watcherThread = new Thread(watcher);
watcherThread.start();
return watcherThread;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.atlas.listener.ActiveStateChangeHandler;
import org.apache.atlas.model.tasks.AtlasTask;
import org.apache.atlas.service.Service;
import org.apache.atlas.service.metrics.MetricsRegistry;
import org.apache.atlas.service.redis.RedisService;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.configuration.Configuration;
Expand All @@ -45,6 +46,7 @@
@Order(7)
public class TaskManagement implements Service, ActiveStateChangeHandler {
private static final Logger LOG = LoggerFactory.getLogger(TaskManagement.class);
private final MetricsRegistry metricRegistry;

private TaskExecutor taskExecutor;
private final Configuration configuration;
Expand All @@ -61,19 +63,21 @@ public enum DeleteType {
}

@Inject
public TaskManagement(Configuration configuration, TaskRegistry taskRegistry, ICuratorFactory curatorFactory, RedisService redisService) {
public TaskManagement(Configuration configuration, TaskRegistry taskRegistry, ICuratorFactory curatorFactory, RedisService redisService, MetricsRegistry metricsRegistry) {
this.configuration = configuration;
this.registry = taskRegistry;
this.redisService = redisService;
this.statistics = new Statistics();
this.taskTypeFactoryMap = new HashMap<>();
this.curatorFactory = curatorFactory;
this.metricRegistry = metricsRegistry;
}

@VisibleForTesting
TaskManagement(Configuration configuration, TaskRegistry taskRegistry, TaskFactory taskFactory, ICuratorFactory curatorFactory, RedisService redisService) {
this.configuration = configuration;
this.registry = taskRegistry;
this.metricRegistry = null;
this.redisService = redisService;
this.statistics = new Statistics();
this.taskTypeFactoryMap = new HashMap<>();
Expand Down Expand Up @@ -251,7 +255,7 @@ private synchronized void startWatcherThread() {
if (this.taskExecutor == null) {
final boolean isActiveActiveHAEnabled = HAConfiguration.isActiveActiveHAEnabled(configuration);
final String zkRoot = HAConfiguration.getZookeeperProperties(configuration).getZkRoot();
this.taskExecutor = new TaskExecutor(registry, taskTypeFactoryMap, statistics, curatorFactory, redisService, zkRoot,isActiveActiveHAEnabled);
this.taskExecutor = new TaskExecutor(registry, taskTypeFactoryMap, statistics, curatorFactory, redisService, zkRoot,isActiveActiveHAEnabled, metricRegistry);
}

if (watcherThread == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@
import org.apache.atlas.AtlasConfiguration;
import org.apache.atlas.AtlasConstants;
import org.apache.atlas.ICuratorFactory;
import org.apache.atlas.RequestContext;
import org.apache.atlas.model.tasks.AtlasTask;
import org.apache.atlas.service.metrics.MetricsRegistry;
import org.apache.atlas.service.redis.RedisService;
import org.apache.commons.collections.CollectionUtils;
import org.slf4j.Logger;
Expand All @@ -41,6 +43,7 @@ public class TaskQueueWatcher implements Runnable {
private static final TaskExecutor.TaskLogger TASK_LOG = TaskExecutor.TaskLogger.getLogger();
private final String zkRoot;
private final boolean isActiveActiveHAEnabled;
private final MetricsRegistry metricRegistry;

private TaskRegistry registry;
private final ExecutorService executorService;
Expand All @@ -57,7 +60,7 @@ public class TaskQueueWatcher implements Runnable {

public TaskQueueWatcher(ExecutorService executorService, TaskRegistry registry,
Map<String, TaskFactory> taskTypeFactoryMap, TaskManagement.Statistics statistics,
ICuratorFactory curatorFactory, RedisService redisService, final String zkRoot, boolean isActiveActiveHAEnabled) {
ICuratorFactory curatorFactory, RedisService redisService, final String zkRoot, boolean isActiveActiveHAEnabled, MetricsRegistry metricsRegistry) {

this.registry = registry;
this.executorService = executorService;
Expand All @@ -67,6 +70,7 @@ public TaskQueueWatcher(ExecutorService executorService, TaskRegistry registry,
this.redisService = redisService;
this.zkRoot = zkRoot;
this.isActiveActiveHAEnabled = isActiveActiveHAEnabled;
this.metricRegistry = metricsRegistry;
}

public void shutdown() {
Expand All @@ -87,6 +91,8 @@ public void run() {
LOG.debug("TaskQueueWatcher: running {}:{}", Thread.currentThread().getName(), Thread.currentThread().getId());
}
while (shouldRun.get()) {
RequestContext requestContext = RequestContext.get();
requestContext.setMetricRegistry(this.metricRegistry);
TasksFetcher fetcher = new TasksFetcher(registry);
try {
if (!redisService.acquireDistributedLock(ATLAS_TASK_LOCK)) {
Expand Down Expand Up @@ -156,6 +162,8 @@ public void run() {
}

this.tasks = registry.getTasksForReQueue();
RequestContext requestContext = RequestContext.get();
requestContext.clearCache();
}

public List<AtlasTask> getTasks() {
Expand Down
33 changes: 27 additions & 6 deletions repository/src/main/java/org/apache/atlas/tasks/TaskRegistry.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.atlas.repository.graphdb.DirectIndexQueryResult;
import org.apache.atlas.repository.graphdb.janus.AtlasElasticsearchQuery;
import org.apache.atlas.type.AtlasType;
import org.apache.atlas.utils.AtlasMetricType;
import org.apache.atlas.utils.AtlasPerfMetrics;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections4.ListUtils;
Expand All @@ -53,6 +54,7 @@
import java.util.List;
import java.util.Arrays;
import java.util.Map;
import java.util.Objects;
import java.util.LinkedHashMap;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
Expand All @@ -67,6 +69,7 @@ public class TaskRegistry {
public static final int TASK_FETCH_BATCH_SIZE = 100;
public static final List<Map<String, Object>> SORT_ARRAY = Collections.singletonList(mapOf(Constants.TASK_CREATED_TIME, mapOf("order", "asc")));
public static final String JANUSGRAPH_VERTEX_INDEX = "janusgraph_vertex_index";
public static final String TASK_MISMATCH_TAG = "mismatchTask";

private AtlasGraph graph;
private TaskService taskService;
Expand Down Expand Up @@ -398,6 +401,7 @@ public List<AtlasTask> getTasksForReQueueIndexSearch() {
Map<String, Object> dsl = mapOf("query", mapOf("bool", mapOf("should", statusClauseList)));
dsl.put("sort", Collections.singletonList(mapOf(Constants.TASK_CREATED_TIME, mapOf("order", "asc"))));
dsl.put("size", size);
long mismatches = 0;
int totalFetched = 0;
while (true) {
int fetched = 0;
Expand Down Expand Up @@ -436,8 +440,14 @@ public List<AtlasTask> getTasksForReQueueIndexSearch() {
} else {
LOG.warn("Status mismatch for task with guid: {}. Expected PENDING/IN_PROGRESS but found: {}",
atlasTask.getGuid(), atlasTask.getStatus());
String docId = LongEncoding.encode(Long.parseLong(vertex.getIdForDisplay()));
repairMismatchedTask(atlasTask, docId);
mismatches++;
try {
String docId = LongEncoding.encode(Long.parseLong(vertex.getIdForDisplay()));
repairMismatchedTask(atlasTask, docId);
}
catch (Exception e){
e.printStackTrace();
}
}
} else {
LOG.warn("Null vertex while re-queuing tasks at index {}", fetched);
Expand All @@ -456,7 +466,14 @@ public List<AtlasTask> getTasksForReQueueIndexSearch() {
break;
}
}

if(mismatches > 0) {
AtlasPerfMetrics.Metric mismatchMetrics = new AtlasPerfMetrics.Metric(TASK_MISMATCH_TAG);
mismatchMetrics.setMetricType(AtlasMetricType.COUNTER);
mismatchMetrics.addTag("name", TASK_MISMATCH_TAG);
mismatchMetrics.setInvocations(mismatches);
mismatchMetrics.setTotalTimeMSecs(0);
RequestContext.get().addApplicationMetrics(mismatchMetrics);
}
return ret;
}

Expand All @@ -466,10 +483,14 @@ private void repairMismatchedTask(AtlasTask atlasTask, String docId) {
try {
// Create a map for the fields to be updated
Map<String, Object> fieldsToUpdate = new HashMap<>();
fieldsToUpdate.put("__task_endTime", atlasTask.getEndTime().getTime());
fieldsToUpdate.put("__task_timeTakenInSeconds", atlasTask.getTimeTakenInSeconds());
if(Objects.nonNull(atlasTask.getEndTime())) {
fieldsToUpdate.put("__task_endTime", atlasTask.getEndTime().getTime());
}
if(Objects.nonNull(atlasTask.getTimeTakenInSeconds())) {
fieldsToUpdate.put("__task_timeTakenInSeconds", atlasTask.getTimeTakenInSeconds());
}
fieldsToUpdate.put("__task_status", atlasTask.getStatus().toString());
fieldsToUpdate.put("__task_modificationTimestamp", atlasTask.getUpdatedTime().getTime()); // Set current timestamp
fieldsToUpdate.put("__task_modificationTimestamp", atlasTask.getUpdatedTime().getTime());

// Convert fieldsToUpdate map to JSON using Jackson
ObjectMapper objectMapper = new ObjectMapper();
Expand Down
6 changes: 6 additions & 0 deletions server-api/src/main/java/org/apache/atlas/RequestContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -669,6 +669,12 @@ public void endMetricRecord(MetricRecorder recorder) {
}
}

public void endMetricRecord(MetricRecorder recorder,long invocations){
if (metrics != null && recorder != null) {
metrics.recordMetric(recorder, invocations);
}
}

public void recordEntityGuidUpdate(AtlasEntity entity, String guidInRequest) {
recordEntityGuidUpdate(new EntityGuidPair(entity, guidInRequest));
}
Expand Down
Loading