<
Since 2.6.0 release, Kafka Streams depends on a RocksDB version that requires MacOS 10.14 or higher.
+
+
+ The streams thread metrics commit-ratio, process-ratio, punctuate-ratio, and poll-ratio have been updated.
+ They now report, over a rolling measurement window,
+ the ratio of time this thread spends performing the given action ({action}) to the total elapsed time in that window.
+ The effective window duration is determined by the metrics configuration: metrics.sample.window.ms (per-sample window length)
+ and metrics.num.samples (number of rolling windows).
+
+
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 405ea720b6ecf..ba87bcd463840 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -42,7 +42,9 @@
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.internals.KafkaFutureImpl;
import org.apache.kafka.common.message.StreamsGroupHeartbeatResponseData;
+import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.metrics.stats.WindowedSum;
import org.apache.kafka.common.requests.StreamsGroupHeartbeatResponse;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.utils.LogContext;
@@ -375,6 +377,15 @@ public boolean isStartingRunningOrPartitionAssigned() {
private final boolean eosEnabled;
private final boolean processingThreadsEnabled;
+ private final WindowedSum pollLatencyWindowedSum = new WindowedSum();
+ private final WindowedSum totalCommitLatencyWindowedSum = new WindowedSum();
+ private final WindowedSum processLatencyWindowedSum = new WindowedSum();
+ private final WindowedSum punctuateLatencyWindowedSum = new WindowedSum();
+ private final WindowedSum runOnceLatencyWindowedSum = new WindowedSum();
+ private final MetricConfig metricsConfig;
+
+ private boolean latencyWindowsInitialized = false;
+
private volatile long fetchDeadlineClientInstanceId = -1;
private volatile KafkaFutureImpl mainConsumerInstanceIdFuture = new KafkaFutureImpl<>();
private volatile KafkaFutureImpl restoreConsumerInstanceIdFuture = new KafkaFutureImpl<>();
@@ -788,6 +799,7 @@ public StreamThread(final Time time,
this.shutdownErrorHook = shutdownErrorHook;
this.streamsUncaughtExceptionHandler = streamsUncaughtExceptionHandler;
this.cacheResizer = cacheResizer;
+ this.metricsConfig = streamsMetrics.metricsRegistry().config();
// The following sensors are created here but their references are not stored in this object, since within
// this object they are not recorded. The sensors are created here so that the stream threads starts with all
@@ -887,6 +899,7 @@ public void run() {
boolean cleanRun = false;
try {
taskManager.init();
+ initLatencyWindowsIfNeeded(time.milliseconds());
cleanRun = runLoop();
} catch (final Throwable e) {
failedStreamThreadSensor.record();
@@ -1275,11 +1288,13 @@ void runOnceWithoutProcessingThreads() {
now = time.milliseconds();
final long runOnceLatency = now - startMs;
+
+ recordWindowedSum(now, pollLatency, totalCommitLatency, totalProcessLatency, totalPunctuateLatency, runOnceLatency);
+ recordRatio(now, pollLatencyWindowedSum, pollRatioSensor);
+ recordRatio(now, totalCommitLatencyWindowedSum, commitRatioSensor);
+ recordRatio(now, processLatencyWindowedSum, processRatioSensor);
+ recordRatio(now, punctuateLatencyWindowedSum, punctuateRatioSensor);
processRecordsSensor.record(totalProcessed, now);
- processRatioSensor.record((double) totalProcessLatency / runOnceLatency, now);
- punctuateRatioSensor.record((double) totalPunctuateLatency / runOnceLatency, now);
- pollRatioSensor.record((double) pollLatency / runOnceLatency, now);
- commitRatioSensor.record((double) totalCommitLatency / runOnceLatency, now);
final long timeSinceLastLog = now - lastLogSummaryMs;
if (logSummaryIntervalMs > 0 && timeSinceLastLog > logSummaryIntervalMs) {
@@ -1362,8 +1377,10 @@ void runOnceWithProcessingThreads() {
now = time.milliseconds();
final long runOnceLatency = now - startMs;
- pollRatioSensor.record((double) pollLatency / runOnceLatency, now);
- commitRatioSensor.record((double) totalCommitLatency / runOnceLatency, now);
+
+ recordWindowedSum(now, pollLatency, totalCommitLatency, 0, 0, runOnceLatency);
+ recordRatio(now, pollLatencyWindowedSum, pollRatioSensor);
+ recordRatio(now, totalCommitLatencyWindowedSum, commitRatioSensor);
if (logSummaryIntervalMs > 0 && now - lastLogSummaryMs > logSummaryIntervalMs) {
log.info("Committed {} total tasks since the last update", totalCommittedSinceLastSummary);
@@ -2103,4 +2120,46 @@ Admin adminClient() {
Optional streamsRebalanceData() {
return streamsRebalanceData;
}
+
+ /**
+ * Initialize both WindowedSum instances at exactly the same timestamp so
+ * their windows are aligned from the very beginning.
+ */
+ private void initLatencyWindowsIfNeeded(final long now) {
+ if (!latencyWindowsInitialized) {
+ // Start both windows at the same instant with a zero record
+ pollLatencyWindowedSum.record(metricsConfig, 0.0, now);
+ totalCommitLatencyWindowedSum.record(metricsConfig, 0, now);
+ processLatencyWindowedSum.record(metricsConfig, 0, now);
+ punctuateLatencyWindowedSum.record(metricsConfig, 0, now);
+ runOnceLatencyWindowedSum.record(metricsConfig, 0.0, now);
+ latencyWindowsInitialized = true;
+ }
+ }
+
+ private void recordWindowedSum(final long now,
+ final double pollLatency,
+ final double totalCommitLatency,
+ final double processLatency,
+ final double punctuateLatency,
+ final double runOnceLatency) {
+ pollLatencyWindowedSum.record(metricsConfig, pollLatency, now);
+ totalCommitLatencyWindowedSum.record(metricsConfig, totalCommitLatency, now);
+ processLatencyWindowedSum.record(metricsConfig, processLatency, now);
+ punctuateLatencyWindowedSum.record(metricsConfig, punctuateLatency, now);
+ runOnceLatencyWindowedSum.record(metricsConfig, runOnceLatency, now);
+ }
+
+ private void recordRatio(final long now, final WindowedSum windowedSum, final Sensor ratioSensor) {
+ final double runOnceLatencyWindow =
+ runOnceLatencyWindowedSum.measure(metricsConfig, now);
+
+ if (runOnceLatencyWindow > 0.0) {
+ final double latencyWindow =
+ windowedSum.measure(metricsConfig, now);
+ ratioSensor.record(latencyWindow / runOnceLatencyWindow);
+ } else {
+ ratioSensor.record(0.0, now);
+ }
+ }
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/ThreadMetrics.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/ThreadMetrics.java
index b45bde5ddd366..73fa53ebcdd04 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/ThreadMetrics.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/ThreadMetrics.java
@@ -82,13 +82,17 @@ private ThreadMetrics() {}
private static final String PUNCTUATE_AVG_LATENCY_DESCRIPTION = "The average punctuate latency";
private static final String PUNCTUATE_MAX_LATENCY_DESCRIPTION = "The maximum punctuate latency";
private static final String PROCESS_RATIO_DESCRIPTION =
- "The fraction of time the thread spent on processing active tasks";
+ "The ratio, over a rolling measurement window, of the time this thread spent " +
+ "processing active tasks to the total elapsed time in that window.";
private static final String PUNCTUATE_RATIO_DESCRIPTION =
- "The fraction of time the thread spent on punctuating active tasks";
+ "The ratio, over a rolling measurement window, of the time this thread spent " +
+ "punctuating active tasks to the total elapsed time in that window.";
private static final String POLL_RATIO_DESCRIPTION =
- "The fraction of time the thread spent on polling records from consumer";
+ "The ratio, over a rolling measurement window, of the time this thread spent " +
+ "polling records from the consumer to the total elapsed time in that window.";
private static final String COMMIT_RATIO_DESCRIPTION =
- "The fraction of time the thread spent on committing all tasks";
+ "The ratio, over a rolling measurement window, of the time this thread spent " +
+ "committing all tasks to the total elapsed time in that window.";
private static final String BLOCKED_TIME_DESCRIPTION =
"The total time the thread spent blocked on kafka in nanoseconds";
private static final String THREAD_START_TIME_DESCRIPTION =
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/ThreadMetricsTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/ThreadMetricsTest.java
index 87891ab389730..63a3512049f12 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/ThreadMetricsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/ThreadMetricsTest.java
@@ -53,7 +53,8 @@ public class ThreadMetricsTest {
@Test
public void shouldGetProcessRatioSensor() {
final String operation = "process-ratio";
- final String ratioDescription = "The fraction of time the thread spent on processing active tasks";
+ final String ratioDescription = "The ratio, over a rolling measurement window, of the time this thread spent " +
+ "processing active tasks to the total elapsed time in that window.";
when(streamsMetrics.threadLevelSensor(THREAD_ID, operation, RecordingLevel.INFO)).thenReturn(expectedSensor);
when(streamsMetrics.threadLevelTagMap(THREAD_ID)).thenReturn(tagMap);
@@ -148,7 +149,8 @@ public void shouldGetProcessRateSensor() {
@Test
public void shouldGetPollRatioSensor() {
final String operation = "poll-ratio";
- final String ratioDescription = "The fraction of time the thread spent on polling records from consumer";
+ final String ratioDescription = "The ratio, over a rolling measurement window, of the time this thread " +
+ "spent polling records from the consumer to the total elapsed time in that window.";
when(streamsMetrics.threadLevelSensor(THREAD_ID, operation, RecordingLevel.INFO)).thenReturn(expectedSensor);
when(streamsMetrics.threadLevelTagMap(THREAD_ID)).thenReturn(tagMap);
@@ -268,7 +270,8 @@ public void shouldGetCommitSensor() {
@Test
public void shouldGetCommitRatioSensor() {
final String operation = "commit-ratio";
- final String ratioDescription = "The fraction of time the thread spent on committing all tasks";
+ final String ratioDescription = "The ratio, over a rolling measurement window, of the time this thread spent " +
+ "committing all tasks to the total elapsed time in that window.";
when(streamsMetrics.threadLevelSensor(THREAD_ID, operation, RecordingLevel.INFO)).thenReturn(expectedSensor);
when(streamsMetrics.threadLevelTagMap(THREAD_ID)).thenReturn(tagMap);
@@ -327,7 +330,8 @@ public void shouldGetPunctuateSensor() {
@Test
public void shouldGetPunctuateRatioSensor() {
final String operation = "punctuate-ratio";
- final String ratioDescription = "The fraction of time the thread spent on punctuating active tasks";
+ final String ratioDescription = "The ratio, over a rolling measurement window, of the time this thread spent " +
+ "punctuating active tasks to the total elapsed time in that window.";
when(streamsMetrics.threadLevelSensor(THREAD_ID, operation, RecordingLevel.INFO)).thenReturn(expectedSensor);
when(streamsMetrics.threadLevelTagMap(THREAD_ID)).thenReturn(tagMap);