Skip to content
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions docs/upgrade.html
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,12 @@ <h5><a id="upgrade_420_notable" href="#upgrade_420_notable">Notable changes in 4
<li>The behavior of <code>org.apache.kafka.streams.KafkaStreams#removeStreamThread</code> has been changed. The consumer is no longer removed from the group once <code>removeStreamThread</code> finishes.
Instead, the consumer is removed from the group after <code>org.apache.kafka.streams.processor.internals.StreamThread</code> completes its <code>run</code> function.
</li>
<li>The streams thread metrics <code>commit-ratio</code>, <code>process-ratio</code>, <code>punctuate-ratio</code>, and <code>poll-ratio</code> have been updated.
They now report, over a rolling measurement window,
the ratio of time this thread spends performing the given action (<code>{action}</code>) to the total elapsed time in that window.
The effective window duration is determined by your metrics configuration: <code>metrics.sample.window.ms</code> (per-sample window length)
and <code>metrics.num.samples</code> (number of rolling windows).
</li>
<li>
The support for MX4J library, enabled through <code>kafka_mx4jenable</code> system property, was deprecated and will be removed in Kafka 5.0.
</li>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,9 @@
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.internals.KafkaFutureImpl;
import org.apache.kafka.common.message.StreamsGroupHeartbeatResponseData;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.WindowedSum;
import org.apache.kafka.common.requests.StreamsGroupHeartbeatResponse;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.utils.LogContext;
Expand Down Expand Up @@ -375,6 +377,15 @@ public boolean isStartingRunningOrPartitionAssigned() {
private final boolean eosEnabled;
private final boolean processingThreadsEnabled;

private final WindowedSum pollLatencyWindowedSum = new WindowedSum();
private final WindowedSum totalCommitLatencyWindowedSum = new WindowedSum();
private final WindowedSum processLatencyWindowedSum = new WindowedSum();
private final WindowedSum punctuateLatencyWindowedSum = new WindowedSum();
private final WindowedSum runOnceLatencyWindowedSum = new WindowedSum();
private final MetricConfig metricsConfig;

private boolean latencyWindowsInitialized = false;

private volatile long fetchDeadlineClientInstanceId = -1;
private volatile KafkaFutureImpl<Uuid> mainConsumerInstanceIdFuture = new KafkaFutureImpl<>();
private volatile KafkaFutureImpl<Uuid> restoreConsumerInstanceIdFuture = new KafkaFutureImpl<>();
Expand Down Expand Up @@ -788,6 +799,7 @@ public StreamThread(final Time time,
this.shutdownErrorHook = shutdownErrorHook;
this.streamsUncaughtExceptionHandler = streamsUncaughtExceptionHandler;
this.cacheResizer = cacheResizer;
this.metricsConfig = streamsMetrics.metricsRegistry().config();

// The following sensors are created here but their references are not stored in this object, since within
// this object they are not recorded. The sensors are created here so that the stream threads starts with all
Expand Down Expand Up @@ -887,6 +899,7 @@ public void run() {
boolean cleanRun = false;
try {
taskManager.init();
initLatencyWindowsIfNeeded(System.currentTimeMillis());
cleanRun = runLoop();
} catch (final Throwable e) {
failedStreamThreadSensor.record();
Expand Down Expand Up @@ -1275,11 +1288,13 @@ void runOnceWithoutProcessingThreads() {

now = time.milliseconds();
final long runOnceLatency = now - startMs;

recordWindowedSum(now, pollLatency, totalCommitLatency, totalProcessLatency, totalPunctuateLatency, runOnceLatency);
recordRatio(now, pollLatencyWindowedSum, pollRatioSensor);
recordRatio(now, totalCommitLatencyWindowedSum, commitRatioSensor);
recordRatio(now, processLatencyWindowedSum, processRatioSensor);
recordRatio(now, punctuateLatencyWindowedSum, punctuateRatioSensor);
processRecordsSensor.record(totalProcessed, now);
processRatioSensor.record((double) totalProcessLatency / runOnceLatency, now);
punctuateRatioSensor.record((double) totalPunctuateLatency / runOnceLatency, now);
pollRatioSensor.record((double) pollLatency / runOnceLatency, now);
commitRatioSensor.record((double) totalCommitLatency / runOnceLatency, now);

final long timeSinceLastLog = now - lastLogSummaryMs;
if (logSummaryIntervalMs > 0 && timeSinceLastLog > logSummaryIntervalMs) {
Expand Down Expand Up @@ -1362,8 +1377,10 @@ void runOnceWithProcessingThreads() {

now = time.milliseconds();
final long runOnceLatency = now - startMs;
pollRatioSensor.record((double) pollLatency / runOnceLatency, now);
commitRatioSensor.record((double) totalCommitLatency / runOnceLatency, now);

recordWindowedSum(now, pollLatency, totalCommitLatency, 0, 0, runOnceLatency);
recordRatio(now, pollLatencyWindowedSum, pollRatioSensor);
recordRatio(now, totalCommitLatencyWindowedSum, commitRatioSensor);

if (logSummaryIntervalMs > 0 && now - lastLogSummaryMs > logSummaryIntervalMs) {
log.info("Committed {} total tasks since the last update", totalCommittedSinceLastSummary);
Expand Down Expand Up @@ -2103,4 +2120,46 @@ Admin adminClient() {
Optional<StreamsRebalanceData> streamsRebalanceData() {
return streamsRebalanceData;
}

/**
 * Seeds every latency {@link WindowedSum} held by this thread (poll, commit, process,
 * punctuate and run-once) with a zero-valued record at one shared timestamp, so that
 * their windows are aligned from the very beginning.
 * <p>
 * Only the first invocation does any work; once the windows have been seeded, later
 * calls return immediately.
 *
 * @param now the timestamp, in milliseconds, attached to each seeding record
 */
private void initLatencyWindowsIfNeeded(final long now) {
    if (latencyWindowsInitialized) {
        return;
    }

    // Same order as the individual sums are updated elsewhere; a zero record adds nothing to any sum.
    final WindowedSum[] latencySums = {
        pollLatencyWindowedSum,
        totalCommitLatencyWindowedSum,
        processLatencyWindowedSum,
        punctuateLatencyWindowedSum,
        runOnceLatencyWindowedSum
    };
    for (final WindowedSum latencySum : latencySums) {
        latencySum.record(metricsConfig, 0.0, now);
    }

    latencyWindowsInitialized = true;
}

/**
 * Feeds one set of latency measurements into the corresponding windowed sums, using a
 * single timestamp for all of them.
 *
 * @param now                the timestamp, in milliseconds, attached to every record
 * @param pollLatency        time spent polling
 * @param totalCommitLatency time spent committing
 * @param processLatency     time spent processing
 * @param punctuateLatency   time spent punctuating
 * @param runOnceLatency     total time of the run-once iteration the values belong to
 */
private void recordWindowedSum(final long now,
                               final double pollLatency,
                               final double totalCommitLatency,
                               final double processLatency,
                               final double punctuateLatency,
                               final double runOnceLatency) {
    // Per-action sums first, then the run-once total that serves as the ratio denominator.
    pollLatencyWindowedSum.record(metricsConfig, pollLatency, now);
    totalCommitLatencyWindowedSum.record(metricsConfig, totalCommitLatency, now);
    processLatencyWindowedSum.record(metricsConfig, processLatency, now);
    punctuateLatencyWindowedSum.record(metricsConfig, punctuateLatency, now);

    runOnceLatencyWindowedSum.record(metricsConfig, runOnceLatency, now);
}

private void recordRatio(final long now, final WindowedSum windowedSum, final Sensor ratioSensor) {
final double runOnceLatencyWindow =
runOnceLatencyWindowedSum.measure(metricsConfig, now);

if (runOnceLatencyWindow > 0.0) {
final double latencyWindow =
windowedSum.measure(metricsConfig, now);
ratioSensor.record(latencyWindow / runOnceLatencyWindow);
Comment on lines +2158 to +2160

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
final double latencyWindow =
windowedSum.measure(metricsConfig, now);
ratioSensor.record(latencyWindow / runOnceLatencyWindow);
ratioSensor.record(windowedSum.measure(metricsConfig, now) / runOnceLatencyWindow);

} else {
ratioSensor.record(0.0, now);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -82,13 +82,17 @@ private ThreadMetrics() {}
private static final String PUNCTUATE_AVG_LATENCY_DESCRIPTION = "The average punctuate latency";
private static final String PUNCTUATE_MAX_LATENCY_DESCRIPTION = "The maximum punctuate latency";
private static final String PROCESS_RATIO_DESCRIPTION =
"The fraction of time the thread spent on processing active tasks";
"The ratio, over a rolling measurement window, of the time this thread spent " +
"processing active tasks to the total elapsed time in that window.";
private static final String PUNCTUATE_RATIO_DESCRIPTION =
"The fraction of time the thread spent on punctuating active tasks";
"The ratio, over a rolling measurement window, of the time this thread spent " +
"punctuating active tasks to the total elapsed time in that window.";
private static final String POLL_RATIO_DESCRIPTION =
"The fraction of time the thread spent on polling records from consumer";
"The ratio, over a rolling measurement window, of the time this thread spent " +
"polling records from the consumer to the total elapsed time in that window.";
private static final String COMMIT_RATIO_DESCRIPTION =
"The fraction of time the thread spent on committing all tasks";
"The ratio, over a rolling measurement window, of the time this thread spent " +
"committing all tasks to the total elapsed time in that window.";
private static final String BLOCKED_TIME_DESCRIPTION =
"The total time the thread spent blocked on kafka in nanoseconds";
private static final String THREAD_START_TIME_DESCRIPTION =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,8 @@ public class ThreadMetricsTest {
@Test
public void shouldGetProcessRatioSensor() {
final String operation = "process-ratio";
final String ratioDescription = "The fraction of time the thread spent on processing active tasks";
final String ratioDescription = "The ratio, over a rolling measurement window, of the time this thread spent " +
"processing active tasks to the total elapsed time in that window.";
when(streamsMetrics.threadLevelSensor(THREAD_ID, operation, RecordingLevel.INFO)).thenReturn(expectedSensor);
when(streamsMetrics.threadLevelTagMap(THREAD_ID)).thenReturn(tagMap);

Expand Down Expand Up @@ -148,7 +149,8 @@ public void shouldGetProcessRateSensor() {
@Test
public void shouldGetPollRatioSensor() {
final String operation = "poll-ratio";
final String ratioDescription = "The fraction of time the thread spent on polling records from consumer";
final String ratioDescription = "The ratio, over a rolling measurement window, of the time this thread " +
"spent polling records from the consumer to the total elapsed time in that window.";
when(streamsMetrics.threadLevelSensor(THREAD_ID, operation, RecordingLevel.INFO)).thenReturn(expectedSensor);
when(streamsMetrics.threadLevelTagMap(THREAD_ID)).thenReturn(tagMap);

Expand Down Expand Up @@ -268,7 +270,8 @@ public void shouldGetCommitSensor() {
@Test
public void shouldGetCommitRatioSensor() {
final String operation = "commit-ratio";
final String ratioDescription = "The fraction of time the thread spent on committing all tasks";
final String ratioDescription = "The ratio, over a rolling measurement window, of the time this thread spent " +
"committing all tasks to the total elapsed time in that window.";
when(streamsMetrics.threadLevelSensor(THREAD_ID, operation, RecordingLevel.INFO)).thenReturn(expectedSensor);
when(streamsMetrics.threadLevelTagMap(THREAD_ID)).thenReturn(tagMap);

Expand Down Expand Up @@ -327,7 +330,8 @@ public void shouldGetPunctuateSensor() {
@Test
public void shouldGetPunctuateRatioSensor() {
final String operation = "punctuate-ratio";
final String ratioDescription = "The fraction of time the thread spent on punctuating active tasks";
final String ratioDescription = "The ratio, over a rolling measurement window, of the time this thread spent " +
"punctuating active tasks to the total elapsed time in that window.";
when(streamsMetrics.threadLevelSensor(THREAD_ID, operation, RecordingLevel.INFO)).thenReturn(expectedSensor);
when(streamsMetrics.threadLevelTagMap(THREAD_ID)).thenReturn(tagMap);

Expand Down