Skip to content

Commit

Permalink
Adds metrics for task states (#14785)
Browse files Browse the repository at this point in the history
* Adds metrics for task states

* Adds tests
  • Loading branch information
noob-se7en authored Jan 22, 2025
1 parent 1e21ffe commit e05ef7b
Show file tree
Hide file tree
Showing 6 changed files with 89 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ rules:
tableType: "$6"
partition: "$7"
# Gauges that accept the controller taskType
- pattern: "\"org\\.apache\\.pinot\\.common\\.metrics\"<type=\"ControllerMetrics\", name=\"pinot\\.controller\\.(numMinionTasksInProgress|numMinionSubtasksRunning|numMinionSubtasksWaiting|numMinionSubtasksError|numMinionSubtasksUnknown|percentMinionSubtasksInQueue|percentMinionSubtasksInError)\\.(\\w+)\"><>(\\w+)"
- pattern: "\"org\\.apache\\.pinot\\.common\\.metrics\"<type=\"ControllerMetrics\", name=\"pinot\\.controller\\.(numMinionTasksInProgress|numMinionSubtasksRunning|numMinionSubtasksWaiting|numMinionSubtasksError|numMinionSubtasksUnknown|numMinionSubtasksDropped|numMinionSubtasksTimedOut|numMinionSubtasksAborted|percentMinionSubtasksInQueue|percentMinionSubtasksInError)\\.(\\w+)\"><>(\\w+)"
name: "pinot_controller_$1_$3"
cache: true
labels:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,11 +64,15 @@ public enum ControllerGauge implements AbstractMetrics.Gauge {
TIME_MS_SINCE_LAST_MINION_TASK_METADATA_UPDATE("TimeMsSinceLastMinionTaskMetadataUpdate", false),
TIME_MS_SINCE_LAST_SUCCESSFUL_MINION_TASK_GENERATION("TimeMsSinceLastSuccessfulMinionTaskGeneration", false),
LAST_MINION_TASK_GENERATION_ENCOUNTERS_ERROR("LastMinionTaskGenerationEncountersError", false),
// TODO: Unify below subtask metrics into a single metric with status label
NUM_MINION_TASKS_IN_PROGRESS("NumMinionTasksInProgress", true),
NUM_MINION_SUBTASKS_WAITING("NumMinionSubtasksWaiting", true),
NUM_MINION_SUBTASKS_RUNNING("NumMinionSubtasksRunning", true),
NUM_MINION_SUBTASKS_ERROR("NumMinionSubtasksError", true),
NUM_MINION_SUBTASKS_UNKNOWN("NumMinionSubtasksUnknown", true),
NUM_MINION_SUBTASKS_DROPPED("NumMinionSubtasksDropped", true),
NUM_MINION_SUBTASKS_TIMED_OUT("NumMinionSubtasksTimedOut", true),
NUM_MINION_SUBTASKS_ABORTED("NumMinionSubtasksAborted", true),
PERCENT_MINION_SUBTASKS_IN_QUEUE("PercentMinionSubtasksInQueue", true),
PERCENT_MINION_SUBTASKS_IN_ERROR("PercentMinionSubtasksInError", true),
TIER_BACKEND_TABLE_COUNT("TierBackendTableCount", true),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ public abstract class ControllerPrometheusMetricsTest extends PinotPrometheusMet
List.of(ControllerGauge.NUM_MINION_TASKS_IN_PROGRESS, ControllerGauge.NUM_MINION_SUBTASKS_RUNNING,
ControllerGauge.NUM_MINION_SUBTASKS_WAITING, ControllerGauge.NUM_MINION_SUBTASKS_ERROR,
ControllerGauge.NUM_MINION_SUBTASKS_UNKNOWN,
ControllerGauge.NUM_MINION_SUBTASKS_DROPPED,
ControllerGauge.NUM_MINION_SUBTASKS_TIMED_OUT,
ControllerGauge.NUM_MINION_SUBTASKS_ABORTED,
ControllerGauge.PERCENT_MINION_SUBTASKS_IN_QUEUE, ControllerGauge.PERCENT_MINION_SUBTASKS_IN_ERROR);

//local gauges that accept partition
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1126,14 +1126,18 @@ public PinotTaskConfig getTaskConfig() {
}
}

@JsonPropertyOrder({"total", "completed", "running", "waiting", "error", "unknown"})
@JsonPropertyOrder({"total", "completed", "running", "waiting", "error", "unknown", "dropped", "timedOut", "aborted"})
public static class TaskCount {
private int _waiting; // Number of tasks waiting to be scheduled on minions
private int _error; // Number of tasks in error
private int _running; // Number of tasks currently running in minions
private int _completed; // Number of tasks completed normally
private int _unknown; // Number of tasks with all other states
private int _total; // Total number of tasks in the batch
private int _dropped; // Total number of tasks dropped
// (Task can be dropped due to no available assigned instance, etc.)
private int _timedOut; // Total number of tasks timed out
private int _aborted; // Total number of tasks aborted

public TaskCount() {
}
Expand All @@ -1157,6 +1161,15 @@ public void addTaskState(TaskPartitionState state) {
case COMPLETED:
_completed++;
break;
case DROPPED:
_dropped++;
break;
case TIMED_OUT:
_timedOut++;
break;
case TASK_ABORTED:
_aborted++;
break;
default:
_unknown++;
break;
Expand Down Expand Up @@ -1188,13 +1201,28 @@ public int getUnknown() {
return _unknown;
}

public int getDropped() {
return _dropped;
}

public int getTimedOut() {
return _timedOut;
}

public int getAborted() {
return _aborted;
}

public void accumulate(TaskCount other) {
_waiting += other.getWaiting();
_running += other.getRunning();
_error += other.getError();
_completed += other.getCompleted();
_unknown += other.getUnknown();
_total += other.getTotal();
_dropped += other.getDropped();
_timedOut += other.getTimedOut();
_aborted += other.getAborted();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,12 @@ protected final void runTask(Properties periodicTaskProperties) {
taskTypeAccumulatedCount.getError());
_controllerMetrics.setValueOfGlobalGauge(ControllerGauge.NUM_MINION_SUBTASKS_UNKNOWN, taskType,
taskTypeAccumulatedCount.getUnknown());
_controllerMetrics.setValueOfGlobalGauge(ControllerGauge.NUM_MINION_SUBTASKS_DROPPED, taskType,
taskTypeAccumulatedCount.getDropped());
_controllerMetrics.setValueOfGlobalGauge(ControllerGauge.NUM_MINION_SUBTASKS_TIMED_OUT, taskType,
taskTypeAccumulatedCount.getTimedOut());
_controllerMetrics.setValueOfGlobalGauge(ControllerGauge.NUM_MINION_SUBTASKS_ABORTED, taskType,
taskTypeAccumulatedCount.getAborted());
int total = taskTypeAccumulatedCount.getTotal();
int percent = total != 0
? (taskTypeAccumulatedCount.getWaiting() + taskTypeAccumulatedCount.getRunning()) * 100 / total : 0;
Expand All @@ -133,6 +139,12 @@ protected final void runTask(Properties periodicTaskProperties) {
ControllerGauge.NUM_MINION_SUBTASKS_ERROR, taskCount.getError());
_controllerMetrics.setOrUpdateTableGauge(tableNameWithType, taskType,
ControllerGauge.NUM_MINION_SUBTASKS_UNKNOWN, taskCount.getUnknown());
_controllerMetrics.setOrUpdateTableGauge(tableNameWithType, taskType,
ControllerGauge.NUM_MINION_SUBTASKS_DROPPED, taskCount.getDropped());
_controllerMetrics.setOrUpdateTableGauge(tableNameWithType, taskType,
ControllerGauge.NUM_MINION_SUBTASKS_TIMED_OUT, taskCount.getTimedOut());
_controllerMetrics.setOrUpdateTableGauge(tableNameWithType, taskType,
ControllerGauge.NUM_MINION_SUBTASKS_ABORTED, taskCount.getAborted());
int tableTotal = taskCount.getTotal();
int tablePercent = tableTotal != 0 ? (taskCount.getWaiting() + taskCount.getRunning()) * 100 / tableTotal : 0;
_controllerMetrics.setOrUpdateTableGauge(tableNameWithType, taskType,
Expand Down Expand Up @@ -168,6 +180,9 @@ protected final void runTask(Properties periodicTaskProperties) {
_controllerMetrics.removeGlobalGauge(taskType, ControllerGauge.NUM_MINION_SUBTASKS_WAITING);
_controllerMetrics.removeGlobalGauge(taskType, ControllerGauge.NUM_MINION_SUBTASKS_ERROR);
_controllerMetrics.removeGlobalGauge(taskType, ControllerGauge.NUM_MINION_SUBTASKS_UNKNOWN);
_controllerMetrics.removeGlobalGauge(taskType, ControllerGauge.NUM_MINION_SUBTASKS_DROPPED);
_controllerMetrics.removeGlobalGauge(taskType, ControllerGauge.NUM_MINION_SUBTASKS_TIMED_OUT);
_controllerMetrics.removeGlobalGauge(taskType, ControllerGauge.NUM_MINION_SUBTASKS_ABORTED);
_controllerMetrics.removeGlobalGauge(taskType, ControllerGauge.PERCENT_MINION_SUBTASKS_IN_QUEUE);
_controllerMetrics.removeGlobalGauge(taskType, ControllerGauge.PERCENT_MINION_SUBTASKS_IN_ERROR);
// remove table task type level gauges
Expand Down Expand Up @@ -198,6 +213,9 @@ private void removeTableTaskTypeMetrics(Set<String> tableNameWithTypeSet, String
_controllerMetrics.removeTableGauge(tableNameWithType, taskType, ControllerGauge.NUM_MINION_SUBTASKS_WAITING);
_controllerMetrics.removeTableGauge(tableNameWithType, taskType, ControllerGauge.NUM_MINION_SUBTASKS_ERROR);
_controllerMetrics.removeTableGauge(tableNameWithType, taskType, ControllerGauge.NUM_MINION_SUBTASKS_UNKNOWN);
_controllerMetrics.removeTableGauge(tableNameWithType, taskType, ControllerGauge.NUM_MINION_SUBTASKS_DROPPED);
_controllerMetrics.removeTableGauge(tableNameWithType, taskType, ControllerGauge.NUM_MINION_SUBTASKS_TIMED_OUT);
_controllerMetrics.removeTableGauge(tableNameWithType, taskType, ControllerGauge.NUM_MINION_SUBTASKS_ABORTED);
_controllerMetrics.removeTableGauge(tableNameWithType, taskType,
ControllerGauge.PERCENT_MINION_SUBTASKS_IN_QUEUE);
_controllerMetrics.removeTableGauge(tableNameWithType, taskType,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ public void taskType1ButNoInProgressTask() {
Mockito.when(_pinotHelixTaskResourceManager.getTasksInProgress(taskType)).thenReturn(ImmutableSet.of());
_taskMetricsEmitter.runTask(null);

Assert.assertEquals(metricsRegistry.allMetrics().size(), 8);
Assert.assertEquals(metricsRegistry.allMetrics().size(), 11);
Assert.assertTrue(metricsRegistry.allMetrics().containsKey(
new YammerMetricName(ControllerMetrics.class, "pinot.controller.onlineMinionInstances")));
Assert.assertEquals(((YammerSettableGauge<?>) metricsRegistry.allMetrics().get(
Expand All @@ -99,6 +99,18 @@ public void taskType1ButNoInProgressTask() {
Assert.assertEquals(((YammerSettableGauge<?>) metricsRegistry.allMetrics().get(
new YammerMetricName(ControllerMetrics.class, "pinot.controller.numMinionSubtasksError.taskType1"))
.getMetric()).value(), 0L);
Assert.assertEquals(((YammerSettableGauge<?>) metricsRegistry.allMetrics().get(
new YammerMetricName(ControllerMetrics.class, "pinot.controller.numMinionSubtasksUnknown.taskType1"))
.getMetric()).value(), 0L);
Assert.assertEquals(((YammerSettableGauge<?>) metricsRegistry.allMetrics().get(
new YammerMetricName(ControllerMetrics.class, "pinot.controller.numMinionSubtasksDropped.taskType1"))
.getMetric()).value(), 0L);
Assert.assertEquals(((YammerSettableGauge<?>) metricsRegistry.allMetrics().get(
new YammerMetricName(ControllerMetrics.class, "pinot.controller.numMinionSubtasksTimedOut.taskType1"))
.getMetric()).value(), 0L);
Assert.assertEquals(((YammerSettableGauge<?>) metricsRegistry.allMetrics().get(
new YammerMetricName(ControllerMetrics.class, "pinot.controller.numMinionSubtasksAborted.taskType1"))
.getMetric()).value(), 0L);
Assert.assertEquals(((YammerSettableGauge<?>) metricsRegistry.allMetrics().get(
new YammerMetricName(ControllerMetrics.class, "pinot.controller.percentMinionSubtasksInQueue.taskType1"))
.getMetric()).value(), 0L);
Expand Down Expand Up @@ -144,7 +156,7 @@ public void taskType1WithTwoTablesEmitMetricTwice() {
private void runAndAssertForTaskType1WithTwoTables() {
PinotMetricsRegistry metricsRegistry = _controllerMetrics.getMetricsRegistry();
_taskMetricsEmitter.runTask(null);
Assert.assertEquals(metricsRegistry.allMetrics().size(), 20);
Assert.assertEquals(metricsRegistry.allMetrics().size(), 29);

Assert.assertTrue(metricsRegistry.allMetrics().containsKey(
new YammerMetricName(ControllerMetrics.class, "pinot.controller.onlineMinionInstances")));
Expand All @@ -160,6 +172,9 @@ private void runAndAssertForTaskType1WithTwoTables() {
Assert.assertEquals(((YammerSettableGauge<?>) metricsRegistry.allMetrics().get(
new YammerMetricName(ControllerMetrics.class, "pinot.controller.numMinionSubtasksError.taskType1"))
.getMetric()).value(), 1L);
Assert.assertEquals(((YammerSettableGauge<?>) metricsRegistry.allMetrics().get(
new YammerMetricName(ControllerMetrics.class, "pinot.controller.numMinionSubtasksDropped.taskType1"))
.getMetric()).value(), 0L);
Assert.assertEquals(((YammerSettableGauge<?>) metricsRegistry.allMetrics().get(
new YammerMetricName(ControllerMetrics.class, "pinot.controller.percentMinionSubtasksInQueue.taskType1"))
.getMetric()).value(), 50L);
Expand All @@ -179,6 +194,10 @@ private void runAndAssertForTaskType1WithTwoTables() {
new YammerMetricName(ControllerMetrics.class,
"pinot.controller.numMinionSubtasksError.table1_OFFLINE.taskType1"))
.getMetric()).value(), 0L);
Assert.assertEquals(((YammerSettableGauge<?>) metricsRegistry.allMetrics().get(
new YammerMetricName(ControllerMetrics.class,
"pinot.controller.numMinionSubtasksDropped.table1_OFFLINE.taskType1"))
.getMetric()).value(), 0L);
Assert.assertEquals(((YammerSettableGauge<?>) metricsRegistry.allMetrics().get(
new YammerMetricName(ControllerMetrics.class,
"pinot.controller.percentMinionSubtasksInQueue.table1_OFFLINE.taskType1"))
Expand All @@ -200,6 +219,10 @@ private void runAndAssertForTaskType1WithTwoTables() {
new YammerMetricName(ControllerMetrics.class,
"pinot.controller.numMinionSubtasksError.table2_OFFLINE.taskType1"))
.getMetric()).value(), 1L);
Assert.assertEquals(((YammerSettableGauge<?>) metricsRegistry.allMetrics().get(
new YammerMetricName(ControllerMetrics.class,
"pinot.controller.numMinionSubtasksDropped.table2_OFFLINE.taskType1"))
.getMetric()).value(), 0L);
Assert.assertEquals(((YammerSettableGauge<?>) metricsRegistry.allMetrics().get(
new YammerMetricName(ControllerMetrics.class,
"pinot.controller.percentMinionSubtasksInQueue.table2_OFFLINE.taskType1"))
Expand Down Expand Up @@ -231,7 +254,7 @@ private void oneTaskTypeWithOneTable(String taskType, String taskName1, String t

PinotMetricsRegistry metricsRegistry = _controllerMetrics.getMetricsRegistry();
_taskMetricsEmitter.runTask(null);
Assert.assertEquals(metricsRegistry.allMetrics().size(), 14);
Assert.assertEquals(metricsRegistry.allMetrics().size(), 20);

Assert.assertTrue(metricsRegistry.allMetrics().containsKey(
new YammerMetricName(ControllerMetrics.class, "pinot.controller.onlineMinionInstances")));
Expand All @@ -251,6 +274,10 @@ private void oneTaskTypeWithOneTable(String taskType, String taskName1, String t
new YammerMetricName(ControllerMetrics.class,
String.format("pinot.controller.numMinionSubtasksError.%s", taskType)))
.getMetric()).value(), 0L);
Assert.assertEquals(((YammerSettableGauge<?>) metricsRegistry.allMetrics().get(
new YammerMetricName(ControllerMetrics.class,
String.format("pinot.controller.numMinionSubtasksDropped.%s", taskType)))
.getMetric()).value(), 0L);
Assert.assertEquals(((YammerSettableGauge<?>) metricsRegistry.allMetrics().get(
new YammerMetricName(ControllerMetrics.class,
String.format("pinot.controller.percentMinionSubtasksInQueue.%s", taskType)))
Expand All @@ -272,6 +299,10 @@ private void oneTaskTypeWithOneTable(String taskType, String taskName1, String t
new YammerMetricName(ControllerMetrics.class,
String.format("pinot.controller.numMinionSubtasksError.%s.%s", tableName, taskType)))
.getMetric()).value(), 0L);
Assert.assertEquals(((YammerSettableGauge<?>) metricsRegistry.allMetrics().get(
new YammerMetricName(ControllerMetrics.class,
String.format("pinot.controller.numMinionSubtasksDropped.%s.%s", tableName, taskType)))
.getMetric()).value(), 0L);
Assert.assertEquals(((YammerSettableGauge<?>) metricsRegistry.allMetrics().get(
new YammerMetricName(ControllerMetrics.class,
String.format("pinot.controller.percentMinionSubtasksInQueue.%s.%s", tableName, taskType)))
Expand Down

0 comments on commit e05ef7b

Please sign in to comment.