Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions docs/streams/upgrade-guide.html
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,15 @@ <h3 class="anchor-heading"><a id="streams_notable_changes" class="anchor-link"><

<p>Since 2.6.0 release, Kafka Streams depends on a RocksDB version that requires MacOS 10.14 or higher.</p>

<h3><a id="streams_api_changes_430" href="#streams_api_changes_430">Streams API changes in 4.3.0</a></h3>

<p>The streams thread metrics <code>commit-ratio</code>, <code>process-ratio</code>, <code>punctuate-ratio</code>, and <code>poll-ratio</code> have been updated.
They now report, over a rolling measurement window,
the ratio of time this thread spends performing the given action (<code>{action}</code>) to the total elapsed time in that window.
The effective window duration is determined by the metrics configuration: <code>metrics.sample.window.ms</code> (per-sample window length)
and <code>metrics.num.samples</code> (number of rolling windows).
</p>

<h3><a id="streams_api_changes_420" href="#streams_api_changes_420">Streams API changes in 4.2.0</a></h3>

<p>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,9 @@
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.internals.KafkaFutureImpl;
import org.apache.kafka.common.message.StreamsGroupHeartbeatResponseData;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.WindowedSum;
import org.apache.kafka.common.requests.StreamsGroupHeartbeatResponse;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.utils.LogContext;
Expand Down Expand Up @@ -375,6 +377,15 @@ public boolean isStartingRunningOrPartitionAssigned() {
private final boolean eosEnabled;
private final boolean processingThreadsEnabled;

private final WindowedSum pollLatencyWindowedSum = new WindowedSum();
private final WindowedSum totalCommitLatencyWindowedSum = new WindowedSum();
private final WindowedSum processLatencyWindowedSum = new WindowedSum();
private final WindowedSum punctuateLatencyWindowedSum = new WindowedSum();
private final WindowedSum runOnceLatencyWindowedSum = new WindowedSum();
private final MetricConfig metricsConfig;

private boolean latencyWindowsInitialized = false;

private volatile long fetchDeadlineClientInstanceId = -1;
private volatile KafkaFutureImpl<Uuid> mainConsumerInstanceIdFuture = new KafkaFutureImpl<>();
private volatile KafkaFutureImpl<Uuid> restoreConsumerInstanceIdFuture = new KafkaFutureImpl<>();
Expand Down Expand Up @@ -788,6 +799,7 @@ public StreamThread(final Time time,
this.shutdownErrorHook = shutdownErrorHook;
this.streamsUncaughtExceptionHandler = streamsUncaughtExceptionHandler;
this.cacheResizer = cacheResizer;
this.metricsConfig = streamsMetrics.metricsRegistry().config();

// The following sensors are created here but their references are not stored in this object, since within
// this object they are not recorded. The sensors are created here so that the stream threads starts with all
Expand Down Expand Up @@ -887,6 +899,7 @@ public void run() {
boolean cleanRun = false;
try {
taskManager.init();
initLatencyWindowsIfNeeded(time.milliseconds());
cleanRun = runLoop();
} catch (final Throwable e) {
failedStreamThreadSensor.record();
Expand Down Expand Up @@ -1275,11 +1288,13 @@ void runOnceWithoutProcessingThreads() {

now = time.milliseconds();
final long runOnceLatency = now - startMs;

recordWindowedSum(now, pollLatency, totalCommitLatency, totalProcessLatency, totalPunctuateLatency, runOnceLatency);
recordRatio(now, pollLatencyWindowedSum, pollRatioSensor);
recordRatio(now, totalCommitLatencyWindowedSum, commitRatioSensor);
recordRatio(now, processLatencyWindowedSum, processRatioSensor);
recordRatio(now, punctuateLatencyWindowedSum, punctuateRatioSensor);
processRecordsSensor.record(totalProcessed, now);
processRatioSensor.record((double) totalProcessLatency / runOnceLatency, now);
punctuateRatioSensor.record((double) totalPunctuateLatency / runOnceLatency, now);
pollRatioSensor.record((double) pollLatency / runOnceLatency, now);
commitRatioSensor.record((double) totalCommitLatency / runOnceLatency, now);

final long timeSinceLastLog = now - lastLogSummaryMs;
if (logSummaryIntervalMs > 0 && timeSinceLastLog > logSummaryIntervalMs) {
Expand Down Expand Up @@ -1362,8 +1377,10 @@ void runOnceWithProcessingThreads() {

now = time.milliseconds();
final long runOnceLatency = now - startMs;
pollRatioSensor.record((double) pollLatency / runOnceLatency, now);
commitRatioSensor.record((double) totalCommitLatency / runOnceLatency, now);

recordWindowedSum(now, pollLatency, totalCommitLatency, 0, 0, runOnceLatency);
recordRatio(now, pollLatencyWindowedSum, pollRatioSensor);
recordRatio(now, totalCommitLatencyWindowedSum, commitRatioSensor);

if (logSummaryIntervalMs > 0 && now - lastLogSummaryMs > logSummaryIntervalMs) {
log.info("Committed {} total tasks since the last update", totalCommittedSinceLastSummary);
Expand Down Expand Up @@ -2103,4 +2120,46 @@ Admin adminClient() {
Optional<StreamsRebalanceData> streamsRebalanceData() {
return streamsRebalanceData;
}

/**
* Initialize both WindowedSum instances at exactly the same timestamp so
* their windows are aligned from the very beginning.
*/
private void initLatencyWindowsIfNeeded(final long now) {
if (!latencyWindowsInitialized) {
// Start both windows at the same instant with a zero record
pollLatencyWindowedSum.record(metricsConfig, 0.0, now);
totalCommitLatencyWindowedSum.record(metricsConfig, 0, now);
processLatencyWindowedSum.record(metricsConfig, 0, now);
punctuateLatencyWindowedSum.record(metricsConfig, 0, now);
runOnceLatencyWindowedSum.record(metricsConfig, 0.0, now);
latencyWindowsInitialized = true;
}
}

private void recordWindowedSum(final long now,
final double pollLatency,
final double totalCommitLatency,
final double processLatency,
final double punctuateLatency,
final double runOnceLatency) {
pollLatencyWindowedSum.record(metricsConfig, pollLatency, now);
totalCommitLatencyWindowedSum.record(metricsConfig, totalCommitLatency, now);
processLatencyWindowedSum.record(metricsConfig, processLatency, now);
punctuateLatencyWindowedSum.record(metricsConfig, punctuateLatency, now);
runOnceLatencyWindowedSum.record(metricsConfig, runOnceLatency, now);
}

private void recordRatio(final long now, final WindowedSum windowedSum, final Sensor ratioSensor) {
final double runOnceLatencyWindow =
runOnceLatencyWindowedSum.measure(metricsConfig, now);

if (runOnceLatencyWindow > 0.0) {
final double latencyWindow =
windowedSum.measure(metricsConfig, now);
ratioSensor.record(latencyWindow / runOnceLatencyWindow);
Comment on lines +2158 to +2160

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
final double latencyWindow =
windowedSum.measure(metricsConfig, now);
ratioSensor.record(latencyWindow / runOnceLatencyWindow);
ratioSensor.record(windowedSum.measure(metricsConfig, now) / runOnceLatencyWindow);

} else {
ratioSensor.record(0.0, now);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -82,13 +82,17 @@ private ThreadMetrics() {}
private static final String PUNCTUATE_AVG_LATENCY_DESCRIPTION = "The average punctuate latency";
private static final String PUNCTUATE_MAX_LATENCY_DESCRIPTION = "The maximum punctuate latency";
private static final String PROCESS_RATIO_DESCRIPTION =
"The fraction of time the thread spent on processing active tasks";
"The ratio, over a rolling measurement window, of the time this thread spent " +
"processing active tasks to the total elapsed time in that window.";
private static final String PUNCTUATE_RATIO_DESCRIPTION =
"The fraction of time the thread spent on punctuating active tasks";
"The ratio, over a rolling measurement window, of the time this thread spent " +
"punctuating active tasks to the total elapsed time in that window.";
private static final String POLL_RATIO_DESCRIPTION =
"The fraction of time the thread spent on polling records from consumer";
"The ratio, over a rolling measurement window, of the time this thread spent " +
"polling records from the consumer to the total elapsed time in that window.";
private static final String COMMIT_RATIO_DESCRIPTION =
"The fraction of time the thread spent on committing all tasks";
"The ratio, over a rolling measurement window, of the time this thread spent " +
"committing all tasks to the total elapsed time in that window.";
private static final String BLOCKED_TIME_DESCRIPTION =
"The total time the thread spent blocked on kafka in nanoseconds";
private static final String THREAD_START_TIME_DESCRIPTION =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,8 @@ public class ThreadMetricsTest {
@Test
public void shouldGetProcessRatioSensor() {
final String operation = "process-ratio";
final String ratioDescription = "The fraction of time the thread spent on processing active tasks";
final String ratioDescription = "The ratio, over a rolling measurement window, of the time this thread spent " +
"processing active tasks to the total elapsed time in that window.";
when(streamsMetrics.threadLevelSensor(THREAD_ID, operation, RecordingLevel.INFO)).thenReturn(expectedSensor);
when(streamsMetrics.threadLevelTagMap(THREAD_ID)).thenReturn(tagMap);

Expand Down Expand Up @@ -148,7 +149,8 @@ public void shouldGetProcessRateSensor() {
@Test
public void shouldGetPollRatioSensor() {
final String operation = "poll-ratio";
final String ratioDescription = "The fraction of time the thread spent on polling records from consumer";
final String ratioDescription = "The ratio, over a rolling measurement window, of the time this thread " +
"spent polling records from the consumer to the total elapsed time in that window.";
when(streamsMetrics.threadLevelSensor(THREAD_ID, operation, RecordingLevel.INFO)).thenReturn(expectedSensor);
when(streamsMetrics.threadLevelTagMap(THREAD_ID)).thenReturn(tagMap);

Expand Down Expand Up @@ -268,7 +270,8 @@ public void shouldGetCommitSensor() {
@Test
public void shouldGetCommitRatioSensor() {
final String operation = "commit-ratio";
final String ratioDescription = "The fraction of time the thread spent on committing all tasks";
final String ratioDescription = "The ratio, over a rolling measurement window, of the time this thread spent " +
"committing all tasks to the total elapsed time in that window.";
when(streamsMetrics.threadLevelSensor(THREAD_ID, operation, RecordingLevel.INFO)).thenReturn(expectedSensor);
when(streamsMetrics.threadLevelTagMap(THREAD_ID)).thenReturn(tagMap);

Expand Down Expand Up @@ -327,7 +330,8 @@ public void shouldGetPunctuateSensor() {
@Test
public void shouldGetPunctuateRatioSensor() {
final String operation = "punctuate-ratio";
final String ratioDescription = "The fraction of time the thread spent on punctuating active tasks";
final String ratioDescription = "The ratio, over a rolling measurement window, of the time this thread spent " +
"punctuating active tasks to the total elapsed time in that window.";
when(streamsMetrics.threadLevelSensor(THREAD_ID, operation, RecordingLevel.INFO)).thenReturn(expectedSensor);
when(streamsMetrics.threadLevelTagMap(THREAD_ID)).thenReturn(tagMap);

Expand Down