-
Notifications
You must be signed in to change notification settings - Fork 14.9k
KAFKA-18369: State updater's *-ratio metrics are incorrect #21201
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 10 commits
d679df3
cd124a4
04467dc
3d1ab86
b3887d3
9f3107b
dcf8af0
b3d8b69
ba28acd
c8e6b5b
bcc1cf2
e14379d
dab548b
713b125
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -22,12 +22,14 @@ | |
| import org.apache.kafka.common.Uuid; | ||
| import org.apache.kafka.common.errors.TimeoutException; | ||
| import org.apache.kafka.common.internals.KafkaFutureImpl; | ||
| import org.apache.kafka.common.metrics.MetricConfig; | ||
| import org.apache.kafka.common.metrics.Metrics; | ||
| import org.apache.kafka.common.metrics.Sensor; | ||
| import org.apache.kafka.common.metrics.Sensor.RecordingLevel; | ||
| import org.apache.kafka.common.metrics.stats.Avg; | ||
| import org.apache.kafka.common.metrics.stats.Rate; | ||
| import org.apache.kafka.common.metrics.stats.Value; | ||
| import org.apache.kafka.common.metrics.stats.WindowedCount; | ||
| import org.apache.kafka.common.metrics.stats.WindowedSum; | ||
| import org.apache.kafka.common.utils.LogContext; | ||
| import org.apache.kafka.common.utils.Time; | ||
| import org.apache.kafka.streams.StreamsConfig; | ||
|
|
@@ -67,8 +69,9 @@ | |
| import java.util.stream.Stream; | ||
|
|
||
| import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.RATE_DESCRIPTION; | ||
| import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.RATIO_DESCRIPTION; | ||
| import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.THREAD_ID_TAG; | ||
| import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.THREAD_TIME_UNIT_DESCRIPTION; | ||
| import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.WINDOWED_RATIO_DESCRIPTION_PREFIX; | ||
|
|
||
| public class DefaultStateUpdater implements StateUpdater { | ||
|
|
||
|
|
@@ -84,6 +87,14 @@ private class StateUpdaterThread extends Thread { | |
| private final Map<TaskId, Task> updatingTasks = new ConcurrentHashMap<>(); | ||
| private final Map<TaskId, Task> pausedTasks = new ConcurrentHashMap<>(); | ||
|
|
||
| private final WindowedSum idleTimeWindowedSum = new WindowedSum(); | ||
| private final WindowedSum checkpointTimeWindowedSum = new WindowedSum(); | ||
| private final WindowedSum activeRestoreTimeWindowedSum = new WindowedSum(); | ||
| private final WindowedSum standbyRestoreTimeWindowedSum = new WindowedSum(); | ||
| private final MetricConfig metricsConfig; | ||
|
|
||
| private boolean timeWindowInitialized = false; | ||
|
|
||
| private long totalCheckpointLatency = 0L; | ||
|
|
||
| private volatile long fetchDeadlineClientInstanceId = -1L; | ||
|
|
@@ -95,6 +106,7 @@ public StateUpdaterThread(final String name, | |
| super(name); | ||
| this.changelogReader = changelogReader; | ||
| this.updaterMetrics = new StateUpdaterMetrics(metrics, name); | ||
| this.metricsConfig = metrics.metricsRegistry().config(); | ||
| } | ||
|
|
||
| public Collection<Task> updatingTasks() { | ||
|
|
@@ -144,6 +156,7 @@ public long numPausedActiveTasks() { | |
| public void run() { | ||
| log.info("State updater thread started"); | ||
| try { | ||
| initTimeWindowIfNeeded(time.milliseconds()); | ||
| while (isRunning.get()) { | ||
| runOnce(); | ||
| } | ||
|
|
@@ -713,19 +726,63 @@ private void measureCheckpointLatency(final Runnable actionToMeasure) { | |
/**
 * Records the state updater's time-ratio metrics for one measured interval.
 *
 * The restore time is derived as total latency minus wait latency minus the accumulated
 * checkpoint latency, clamped at zero. It is attributed entirely to either active restoration
 * or standby updating, depending on changelogReader.isRestoringActive().
 *
 * NOTE(review): this is a rendered diff without +/- markers; the direct idleRatioSensor /
 * checkpointRatioSensor records and the if/else block below look like the removed per-iteration
 * implementation, while recordWindowedSum(...) and recordRatios(...) look like the replacement.
 * Confirm against the actual file before relying on this listing.
 *
 * @param now              timestamp passed through to the metric recordings
 * @param totalLatency     total duration of the measured interval
 * @param totalWaitLatency portion of the interval counted as idle/wait time
 */
| private void recordMetrics(final long now, final long totalLatency, final long totalWaitLatency) { | ||
| final long totalRestoreLatency = Math.max(0L, totalLatency - totalWaitLatency - totalCheckpointLatency); | ||
|
|
||
| updaterMetrics.idleRatioSensor.record((double) totalWaitLatency / totalLatency, now); | ||
| updaterMetrics.checkpointRatioSensor.record((double) totalCheckpointLatency / totalLatency, now); | ||
| recordWindowedSum( | ||
| now, | ||
| (double) totalWaitLatency, | ||
|
||
| (double) totalCheckpointLatency, | ||
| (double) totalRestoreLatency * (changelogReader.isRestoringActive() ? 1.0d : 0.0d), | ||
| (double) totalRestoreLatency * (changelogReader.isRestoringActive() ? 0.0d : 1.0d) | ||
| ); | ||
|
|
||
| if (changelogReader.isRestoringActive()) { | ||
| updaterMetrics.activeRestoreRatioSensor.record((double) totalRestoreLatency / totalLatency, now); | ||
| updaterMetrics.standbyRestoreRatioSensor.record(0.0d, now); | ||
| } else { | ||
| updaterMetrics.standbyRestoreRatioSensor.record((double) totalRestoreLatency / totalLatency, now); | ||
| updaterMetrics.activeRestoreRatioSensor.record(0.0d, now); | ||
| } | ||
| recordRatios(now, totalLatency); | ||
|
|
||
// Reset the accumulator so the next interval starts with no checkpoint time.
| totalCheckpointLatency = 0L; | ||
| } | ||
|
|
||
/**
 * Records a zero-valued sample at {@code now} into each of the four windowed sums (idle,
 * checkpoint, active restore, standby restore) the first time it is called; later calls are
 * no-ops because {@code timeWindowInitialized} is set to true.
 *
 * Presumably this anchors the start of the first sample window at thread start rather than
 * at the first real recording; confirm against WindowedSum/SampledStat semantics.
 *
 * @param now timestamp used for the initial zero-valued samples
 */
| private void initTimeWindowIfNeeded(final long now) { | ||
| if (!timeWindowInitialized) { | ||
| idleTimeWindowedSum.record(metricsConfig, 0.0, now); | ||
| checkpointTimeWindowedSum.record(metricsConfig, 0.0, now); | ||
| activeRestoreTimeWindowedSum.record(metricsConfig, 0.0, now); | ||
| standbyRestoreTimeWindowedSum.record(metricsConfig, 0.0, now); | ||
| timeWindowInitialized = true; | ||
| } | ||
| } | ||
|
|
||
/**
 * Adds one interval's time breakdown to the four windowed sums, all stamped with {@code now}.
 *
 * @param now                timestamp of the recording
 * @param idleTime           time to add to the idle windowed sum
 * @param checkpointTime     time to add to the checkpoint windowed sum
 * @param activeRestoreTime  time to add to the active-restore windowed sum
 * @param standbyRestoreTime time to add to the standby-restore windowed sum
 */
| private void recordWindowedSum(final long now, | ||
| final double idleTime, | ||
| final double checkpointTime, | ||
| final double activeRestoreTime, | ||
| final double standbyRestoreTime) { | ||
| idleTimeWindowedSum.record(metricsConfig, idleTime, now); | ||
| checkpointTimeWindowedSum.record(metricsConfig, checkpointTime, now); | ||
| activeRestoreTimeWindowedSum.record(metricsConfig, activeRestoreTime, now); | ||
| standbyRestoreTimeWindowedSum.record(metricsConfig, standbyRestoreTime, now); | ||
| } | ||
|
|
||
/**
 * Reads the current value of each windowed sum at {@code now} and records it, divided by
 * {@code totalTime}, on the matching ratio sensor (idle, checkpoint, active restore,
 * standby restore).
 *
 * NOTE(review): the numerators are windowed sums, while the only visible caller passes the
 * latency of a single measured interval as {@code totalTime}. Confirm that numerator and
 * denominator cover the same time span; otherwise the recorded ratios may exceed 1.
 *
 * @param now       timestamp used both to measure the windowed sums and to record the ratios
 * @param totalTime denominator applied to every ratio
 */
| private void recordRatios(final long now, final long totalTime) { | ||
| final double idleTime = idleTimeWindowedSum.measure(metricsConfig, now); | ||
| final double checkpointTime = checkpointTimeWindowedSum.measure(metricsConfig, now); | ||
| final double activeRestoreTime = activeRestoreTimeWindowedSum.measure(metricsConfig, now); | ||
| final double standbyRestoreTime = standbyRestoreTimeWindowedSum.measure(metricsConfig, now); | ||
|
|
||
| recordRatio(now, totalTime, idleTime, updaterMetrics.idleRatioSensor); | ||
| recordRatio(now, totalTime, checkpointTime, updaterMetrics.checkpointRatioSensor); | ||
| recordRatio(now, totalTime, activeRestoreTime, updaterMetrics.activeRestoreRatioSensor); | ||
| recordRatio(now, totalTime, standbyRestoreTime, updaterMetrics.standbyRestoreRatioSensor); | ||
| } | ||
|
|
||
/**
 * Records {@code elapsedTime / totalTime} on the given sensor, or 0.0 when {@code totalTime}
 * is not positive, so a zero-length interval never produces a NaN or infinite ratio.
 *
 * @param now         timestamp of the recording
 * @param totalTime   denominator of the ratio
 * @param elapsedTime numerator of the ratio
 * @param ratioSensor sensor that receives the computed value
 */
| private void recordRatio(final long now, | ||
| final double totalTime, | ||
| final double elapsedTime, | ||
| final Sensor ratioSensor) { | ||
| if (totalTime > 0.0) { | ||
| ratioSensor.record(elapsedTime / totalTime, now); | ||
| } else { | ||
| ratioSensor.record(0.0, now); | ||
| } | ||
| } | ||
|
|
||
| } | ||
|
|
||
| private final Time time; | ||
|
|
@@ -1035,10 +1092,10 @@ private Stream<Task> streamOfNonPausedTasks() { | |
| private class StateUpdaterMetrics { | ||
| private static final String STATE_LEVEL_GROUP = "stream-state-updater-metrics"; | ||
|
|
||
| private static final String IDLE_RATIO_DESCRIPTION = RATIO_DESCRIPTION + "being idle"; | ||
| private static final String RESTORE_RATIO_DESCRIPTION = RATIO_DESCRIPTION + "restoring active tasks"; | ||
| private static final String UPDATE_RATIO_DESCRIPTION = RATIO_DESCRIPTION + "updating standby tasks"; | ||
| private static final String CHECKPOINT_RATIO_DESCRIPTION = RATIO_DESCRIPTION + "checkpointing tasks restored progress"; | ||
| private static final String IDLE_RATIO_DESCRIPTION = WINDOWED_RATIO_DESCRIPTION_PREFIX + THREAD_TIME_UNIT_DESCRIPTION + "being idle"; | ||
|
||
| private static final String RESTORE_RATIO_DESCRIPTION = WINDOWED_RATIO_DESCRIPTION_PREFIX + THREAD_TIME_UNIT_DESCRIPTION + "restoring active tasks"; | ||
| private static final String UPDATE_RATIO_DESCRIPTION = WINDOWED_RATIO_DESCRIPTION_PREFIX + THREAD_TIME_UNIT_DESCRIPTION + "updating standby tasks"; | ||
| private static final String CHECKPOINT_RATIO_DESCRIPTION = WINDOWED_RATIO_DESCRIPTION_PREFIX + THREAD_TIME_UNIT_DESCRIPTION + "checkpointing tasks restored progress"; | ||
| private static final String RESTORE_RECORDS_RATE_DESCRIPTION = RATE_DESCRIPTION + "records restored"; | ||
| private static final String RESTORE_RATE_DESCRIPTION = RATE_DESCRIPTION + "restore calls triggered"; | ||
|
|
||
|
|
@@ -1089,19 +1146,19 @@ private StateUpdaterMetrics(final StreamsMetricsImpl metrics, final String threa | |
| allMetricNames.push(metricName); | ||
|
|
||
| this.idleRatioSensor = metrics.threadLevelSensor(threadId, "idle-ratio", RecordingLevel.INFO); | ||
| this.idleRatioSensor.add(new MetricName("idle-ratio", STATE_LEVEL_GROUP, IDLE_RATIO_DESCRIPTION, threadLevelTags), new Avg()); | ||
| this.idleRatioSensor.add(new MetricName("idle-ratio", STATE_LEVEL_GROUP, IDLE_RATIO_DESCRIPTION, threadLevelTags), new Value()); | ||
| allSensors.add(this.idleRatioSensor); | ||
|
|
||
| this.activeRestoreRatioSensor = metrics.threadLevelSensor(threadId, "active-restore-ratio", RecordingLevel.INFO); | ||
| this.activeRestoreRatioSensor.add(new MetricName("active-restore-ratio", STATE_LEVEL_GROUP, RESTORE_RATIO_DESCRIPTION, threadLevelTags), new Avg()); | ||
| this.activeRestoreRatioSensor.add(new MetricName("active-restore-ratio", STATE_LEVEL_GROUP, RESTORE_RATIO_DESCRIPTION, threadLevelTags), new Value()); | ||
| allSensors.add(this.activeRestoreRatioSensor); | ||
|
|
||
| this.standbyRestoreRatioSensor = metrics.threadLevelSensor(threadId, "standby-update-ratio", RecordingLevel.INFO); | ||
| this.standbyRestoreRatioSensor.add(new MetricName("standby-update-ratio", STATE_LEVEL_GROUP, UPDATE_RATIO_DESCRIPTION, threadLevelTags), new Avg()); | ||
| this.standbyRestoreRatioSensor.add(new MetricName("standby-update-ratio", STATE_LEVEL_GROUP, UPDATE_RATIO_DESCRIPTION, threadLevelTags), new Value()); | ||
| allSensors.add(this.standbyRestoreRatioSensor); | ||
|
|
||
| this.checkpointRatioSensor = metrics.threadLevelSensor(threadId, "checkpoint-ratio", RecordingLevel.INFO); | ||
| this.checkpointRatioSensor.add(new MetricName("checkpoint-ratio", STATE_LEVEL_GROUP, CHECKPOINT_RATIO_DESCRIPTION, threadLevelTags), new Avg()); | ||
| this.checkpointRatioSensor.add(new MetricName("checkpoint-ratio", STATE_LEVEL_GROUP, CHECKPOINT_RATIO_DESCRIPTION, threadLevelTags), new Value()); | ||
| allSensors.add(this.checkpointRatioSensor); | ||
|
|
||
| this.restoreSensor = metrics.threadLevelSensor(threadId, "restore-records", RecordingLevel.INFO); | ||
|
|
||
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
|
|
@@ -150,12 +150,13 @@ public int hashCode() { | |||||
| public static final String OPERATIONS = " operations"; | ||||||
| public static final String TOTAL_DESCRIPTION = "The total number of "; | ||||||
| public static final String RATE_DESCRIPTION = "The average per-second number of "; | ||||||
| public static final String RATIO_DESCRIPTION = "The fraction of time the thread spent on "; | ||||||
| public static final String AVG_LATENCY_DESCRIPTION = "The average latency of "; | ||||||
| public static final String MAX_LATENCY_DESCRIPTION = "The maximum latency of "; | ||||||
| public static final String LATENCY_DESCRIPTION_SUFFIX = " in milliseconds"; | ||||||
| public static final String RATE_DESCRIPTION_PREFIX = "The average number of "; | ||||||
| public static final String RATE_DESCRIPTION_SUFFIX = " per second"; | ||||||
| public static final String WINDOWED_RATIO_DESCRIPTION_PREFIX = "The ratio, over a rolling measurement window, "; | ||||||
|
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.
Suggested change
is it only used like this?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Right now this prefix constant is only used for threads, but it could be used to describe ratios against other units in the future.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Good, it’s your call, but I’d keep it simple and let whoever comes next make further adjustments if they’re ever needed. |
||||||
| public static final String THREAD_TIME_UNIT_DESCRIPTION = "of the time this thread spent "; | ||||||
|
|
||||||
| public static final String RECORD_E2E_LATENCY = "record-e2e-latency"; | ||||||
| public static final String RECORD_E2E_LATENCY_DESCRIPTION_SUFFIX = | ||||||
|
|
||||||
Uh oh!
There was an error while loading. Please reload this page.