From ced1b22b398fcdb36bae51353fbef3b7fc36b395 Mon Sep 17 00:00:00 2001 From: lucliu1108 Date: Sun, 12 Oct 2025 18:13:03 -0500 Subject: [PATCH 01/12] add logs for task readiness --- .../streams/processor/internals/PartitionGroup.java | 8 +++++--- .../streams/processor/internals/StreamTask.java | 12 +++++++++++- 2 files changed, 16 insertions(+), 4 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java index 5fb313ff6052d..919b908b572cd 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java @@ -125,12 +125,13 @@ boolean readyToProcess(final long wallClockTime) { if (!queue.isEmpty()) { // this partition is ready for processing + logger.trace("Partition {} has buffered data, ready for processing", partition); idlePartitionDeadlines.remove(partition); queued.add(partition); } else { final Long fetchedLag = fetchedLags.getOrDefault(partition, -1L); - logger.trace("Fetched lag for {} is {}", partition, fetchedLag); + logger.trace("Fetched lag for partition {} is {}", partition, fetchedLag); if (fetchedLag == -1L) { // must wait to fetch metadata for the partition @@ -141,7 +142,7 @@ boolean readyToProcess(final long wallClockTime) { // must wait to poll the data we know to be on the broker idlePartitionDeadlines.remove(partition); logger.trace( - "Lag for {} is currently {}, but no data is buffered locally. Waiting to buffer some records.", + "Lag for partition {} is currently {}, but no data is buffered locally. Waiting to buffer some records.", partition, fetchedLag ); @@ -157,7 +158,7 @@ boolean readyToProcess(final long wallClockTime) { final long deadline = idlePartitionDeadlines.get(partition); if (wallClockTime < deadline) { logger.trace( - "Lag for {} is currently 0 and current time is {}. Waiting for new data to be produced for configured idle time {} (deadline is {}).", + "Lag for partition {} is currently 0 and current time is {}. Waiting for new data to be produced for configured idle time {} (deadline is {}).", partition, wallClockTime, maxTaskIdleMs, @@ -166,6 +167,7 @@ boolean readyToProcess(final long wallClockTime) { return false; } else { // this partition is ready for processing due to the task idling deadline passing + logger.trace("Partition {} is ready for processing due to the task idling deadline passing", partition); if (enforced == null) { enforced = new HashMap<>(); } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java index 42b57e46aa4f3..69ba2f6969793 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java @@ -450,7 +450,7 @@ public Map prepareCommit(final boolean clean) } hasPendingTxCommit = eosEnabled; - log.debug("Prepared {} task for committing", state()); + log.debug("Prepared {} task {} for committing", state(), id); return committableOffsetsAndMetadata(); } else { log.debug("Skipped preparing {} task for commit since there is nothing to commit", state()); @@ -742,6 +742,9 @@ public boolean isProcessable(final long wallClockTime) { if (hasPendingTxCommit) { // if the task has a pending TX commit, we should just retry the commit but not process any records // thus, the task is not processable, even if there is available data in the record queue + if (log.isDebugEnabled()) { + log.debug("Stream task {} has a pending transaction commit, skip processing it.", id()); + } return false; } final boolean readyToProcess = partitionGroup.readyToProcess(wallClockTime); @@ -749,6 +752,7 @@ public boolean isProcessable(final long wallClockTime) { if (timeCurrentIdlingStarted.isEmpty()) { timeCurrentIdlingStarted = Optional.of(wallClockTime); } + log.debug("Task {} started idling at time {}", id, timeCurrentIdlingStarted.get()); } else { timeCurrentIdlingStarted = Optional.empty(); } @@ -773,6 +777,7 @@ record = partitionGroup.nextRecord(recordInfo, wallClockTime); // if there is no record to process, return immediately if (record == null) { + log.trace("Task {} has no next record to process.", id()); return false; } } @@ -788,9 +793,14 @@ record = partitionGroup.nextRecord(recordInfo, wallClockTime); consumedOffsets.put(partition, record.offset()); commitNeeded = true; + log.trace("Task {} processed record: topic={}, partition={}, offset={}, remainingBuffered={}", + id, record.topic(), record.partition(), record.offset(), recordInfo.queue().size()); + // after processing this record, if its partition queue's buffered size has been // decreased to the threshold, we can then resume the consumption on this partition if (recordInfo.queue().size() <= maxBufferedSize) { + log.trace("Resume consumption for partition {}: buffered size {} is under the threshold {}", + partition, recordInfo.queue().size(), maxBufferedSize); partitionsToResume.add(partition); } From 69c60506cfd91cc9b37cc2a449c0d6272a0092b9 Mon Sep 17 00:00:00 2001 From: lucliu1108 Date: Sun, 12 Oct 2025 19:01:15 -0500 Subject: [PATCH 02/12] add more logs --- .../streams/processor/internals/StreamTask.java | 12 ++++++++---- .../processor/internals/TaskExecutionMetadata.java | 4 ++++ streams/src/test/resources/log4j2.yaml | 9 +++++++++ 3 files changed, 21 insertions(+), 4 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java index 69ba2f6969793..50eda39d724b1 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java @@ -752,7 +752,9 @@ public boolean isProcessable(final long wallClockTime) { if (timeCurrentIdlingStarted.isEmpty()) { timeCurrentIdlingStarted = Optional.of(wallClockTime); } - log.debug("Task {} started idling at time {}", id, timeCurrentIdlingStarted.get()); + if (log.isDebugEnabled()) { + log.debug("Task {} started idling at time {}", id, timeCurrentIdlingStarted.get()); + } } else { timeCurrentIdlingStarted = Optional.empty(); } @@ -793,14 +795,16 @@ record = partitionGroup.nextRecord(recordInfo, wallClockTime); consumedOffsets.put(partition, record.offset()); commitNeeded = true; - log.trace("Task {} processed record: topic={}, partition={}, offset={}, remainingBuffered={}", - id, record.topic(), record.partition(), record.offset(), recordInfo.queue().size()); + if (log.isTraceEnabled()) { + log.trace("Task {} processed record: topic={}, partition={}, offset={}", + id, record.topic(), record.partition(), record.offset()); + } // after processing this record, if its partition queue's buffered size has been // decreased to the threshold, we can then resume the consumption on this partition if (recordInfo.queue().size() <= maxBufferedSize) { log.trace("Resume consumption for partition {}: buffered size {} is under the threshold {}", - partition, recordInfo.queue().size(), maxBufferedSize); + partition, recordInfo.queue().size(), maxBufferedSize); partitionsToResume.add(partition); } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutionMetadata.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutionMetadata.java index 86001ba413073..45f14ccf40b4d 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutionMetadata.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutionMetadata.java @@ -44,6 +44,7 @@ public class TaskExecutionMetadata { private final Collection successfullyProcessed = new HashSet<>(); // map of topologies experiencing errors/currently under backoff private final ConcurrentHashMap topologyNameToErrorMetadata = new ConcurrentHashMap<>(); + private final Logger log; public TaskExecutionMetadata(final Set allTopologyNames, final Set pausedTopologies, @@ -51,6 +52,7 @@ public TaskExecutionMetadata(final Set allTopologyNames, this.hasNamedTopologies = !(allTopologyNames.size() == 1 && allTopologyNames.contains(UNNAMED_TOPOLOGY)); this.pausedTopologies = pausedTopologies; this.processingMode = processingMode; + this.log = new LogContext("[TaskExecutionMetadata]").logger(TaskExecutionMetadata.class); } public boolean hasNamedTopologies() { @@ -65,9 +67,11 @@ public boolean canProcessTask(final Task task, final long now) { final String topologyName = task.id().topologyName(); if (!hasNamedTopologies) { // TODO implement error handling/backoff for non-named topologies (needs KIP) + log.debug("Task {} has no named topologies", task.id()); return !pausedTopologies.contains(UNNAMED_TOPOLOGY); } else { if (pausedTopologies.contains(topologyName)) { + log.debug("Task {} has paused topologies", task.id()); return false; } else { final NamedTopologyMetadata metadata = topologyNameToErrorMetadata.get(topologyName); diff --git a/streams/src/test/resources/log4j2.yaml b/streams/src/test/resources/log4j2.yaml index 0942036a33c80..44d4c8cf6024a 100644 --- a/streams/src/test/resources/log4j2.yaml +++ b/streams/src/test/resources/log4j2.yaml @@ -63,3 +63,12 @@ Configuration: - name: org.apache.kafka.streams.StreamsConfig level: ERROR + + - name: org.apache.kafka.streams.processor.internals.StreamTask + level: TRACE + + - name: org.apache.kafka.streams.processor.internals.PartitionGroup + level: TRACE + + - name: org.apache.kafka.streams.processor.internals.TaskExecutionMetadata + level: TRACE \ No newline at end of file From beafd373f79ad63f0693c1e3a0e64f26bd88f4b4 Mon Sep 17 00:00:00 2001 From: lucliu1108 Date: Sun, 12 Oct 2025 19:06:28 -0500 Subject: [PATCH 03/12] remove redundant debug level info for log4j2 --- streams/src/test/resources/log4j2.yaml | 11 +---------- 1 file changed, 1 insertion(+), 10 deletions(-) diff --git a/streams/src/test/resources/log4j2.yaml b/streams/src/test/resources/log4j2.yaml index 44d4c8cf6024a..60c2de0a7ae3f 100644 --- a/streams/src/test/resources/log4j2.yaml +++ b/streams/src/test/resources/log4j2.yaml @@ -62,13 +62,4 @@ Configuration: level: ERROR - name: org.apache.kafka.streams.StreamsConfig - level: ERROR - - - name: org.apache.kafka.streams.processor.internals.StreamTask - level: TRACE - - - name: org.apache.kafka.streams.processor.internals.PartitionGroup - level: TRACE - - - name: org.apache.kafka.streams.processor.internals.TaskExecutionMetadata - level: TRACE \ No newline at end of file + level: ERROR \ No newline at end of file From c514cbd4d02f5cb858ce40d2aeb622de02152ef8 Mon Sep 17 00:00:00 2001 From: lucliu1108 Date: Sun, 12 Oct 2025 19:09:33 -0500 Subject: [PATCH 04/12] fix spacing --- streams/src/test/resources/log4j2.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/streams/src/test/resources/log4j2.yaml b/streams/src/test/resources/log4j2.yaml index 60c2de0a7ae3f..0942036a33c80 100644 --- a/streams/src/test/resources/log4j2.yaml +++ b/streams/src/test/resources/log4j2.yaml @@ -62,4 +62,4 @@ Configuration: level: ERROR - name: org.apache.kafka.streams.StreamsConfig - level: ERROR \ No newline at end of file + level: ERROR From c60d7af56471f72bdae7112f64388a4b957e4e77 Mon Sep 17 00:00:00 2001 From: lucliu1108 Date: Tue, 14 Oct 2025 12:15:26 -0500 Subject: [PATCH 05/12] modify expected log output for tests --- .../streams/processor/internals/PartitionGroupTest.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java index e29af81095bb7..9d80563d76054 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java @@ -833,7 +833,7 @@ public void shouldWaitForPollWhenLagIsNonzero() { appender.getEvents(), hasItem(Matchers.allOf( Matchers.hasProperty("level", equalTo("TRACE")), - Matchers.hasProperty("message", equalTo("[test] Lag for topic-2 is currently 1, but no data is buffered locally. Waiting to buffer some records.")) + Matchers.hasProperty("message", equalTo("[test] Lag for partition topic-2 is currently 1, but no data is buffered locally. Waiting to buffer some records.")) )) ); } @@ -868,7 +868,7 @@ public void shouldIdleAsSpecifiedWhenLagIsZero() { appender.getEvents(), hasItem(Matchers.allOf( Matchers.hasProperty("level", equalTo("TRACE")), - Matchers.hasProperty("message", equalTo("[test] Lag for topic-2 is currently 0 and current time is 0. Waiting for new data to be produced for configured idle time 1 (deadline is 1).")) + Matchers.hasProperty("message", equalTo("[test] Lag for partition topic-2 is currently 0 and current time is 0. Waiting for new data to be produced for configured idle time 1 (deadline is 1).")) )) ); } @@ -926,7 +926,7 @@ private void hasZeroFetchedLag(final PartitionGroup group, final TopicPartition appender.setClassLogger(PartitionGroup.class, Level.TRACE); assertFalse(group.readyToProcess(0L)); assertThat(appender.getEvents(), hasItem(Matchers.hasProperty("message", - startsWith(String.format("[test] Lag for %s is currently 0 and current time is %d. " + startsWith(String.format("[test] Lag for partition %s is currently 0 and current time is %d. " + "Waiting for new data to be produced for configured idle time", partition, 0L))))); } } @@ -937,7 +937,7 @@ private void hasNonZeroFetchedLag(final PartitionGroup group, final TopicPartiti appender.setClassLogger(PartitionGroup.class, Level.TRACE); assertFalse(group.readyToProcess(0L)); assertThat(appender.getEvents(), hasItem(Matchers.hasProperty("message", - equalTo(String.format("[test] Lag for %s is currently %d, but no data is buffered locally. " + equalTo(String.format("[test] Lag for partition %s is currently %d, but no data is buffered locally. " + "Waiting to buffer some records.", partition, lag))))); } } From ffecc92523af01d2f5f145a8decc66a3b275db46 Mon Sep 17 00:00:00 2001 From: lucliu1108 Date: Thu, 16 Oct 2025 14:44:08 -0500 Subject: [PATCH 06/12] revise --- .../streams/processor/internals/StreamTask.java | 14 +++++++------- .../processor/internals/TaskExecutionMetadata.java | 5 +++-- 2 files changed, 10 insertions(+), 9 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java index 0a106b655b3d3..5b1c4708acbf1 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java @@ -743,9 +743,7 @@ public boolean isProcessable(final long wallClockTime) { if (timeCurrentIdlingStarted.isEmpty()) { timeCurrentIdlingStarted = Optional.of(wallClockTime); } - if (log.isDebugEnabled()) { - log.debug("Task {} started idling at time {}", id, timeCurrentIdlingStarted.get()); - } + log.debug("Task {} started idling at time {}", id, timeCurrentIdlingStarted.get()); } else { timeCurrentIdlingStarted = Optional.empty(); } @@ -772,6 +770,8 @@ record = partitionGroup.nextRecord(recordInfo, wallClockTime); if (record == null) { log.trace("Task {} has no next record to process.", id()); return false; + } else { + log.trace("Task {} fetched one record {} to process.", id, record); } } @@ -780,16 +780,16 @@ record = partitionGroup.nextRecord(recordInfo, wallClockTime); if (!(record instanceof CorruptedRecord)) { doProcess(wallClockTime); + } else { + log.trace("Task {} skips processing corrupted record {}", id, record); } // update the consumed offset map after processing is done consumedOffsets.put(partition, record.offset()); commitNeeded = true; - if (log.isTraceEnabled()) { - log.trace("Task {} processed record: topic={}, partition={}, offset={}", - id, record.topic(), record.partition(), record.offset()); - } + log.trace("Task {} processed record: topic={}, partition={}, offset={}", + id, record.topic(), record.partition(), record.offset()); // after processing this record, if its partition queue's buffered size has been // decreased to the threshold, we can then resume the consumption on this partition diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutionMetadata.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutionMetadata.java index 45f14ccf40b4d..8586ec4aa28f0 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutionMetadata.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutionMetadata.java @@ -67,14 +67,15 @@ public boolean canProcessTask(final Task task, final long now) { final String topologyName = task.id().topologyName(); if (!hasNamedTopologies) { // TODO implement error handling/backoff for non-named topologies (needs KIP) - log.debug("Task {} has no named topologies", task.id()); + log.debug("Task {} processing check for unnamed topology '{}'", task.id(), topologyName); return !pausedTopologies.contains(UNNAMED_TOPOLOGY); } else { if (pausedTopologies.contains(topologyName)) { - log.debug("Task {} has paused topologies", task.id()); + log.debug("Task {} can't be processed: topology '{}' is paused", task.id(), topologyName); return false; } else { final NamedTopologyMetadata metadata = topologyNameToErrorMetadata.get(topologyName); + log.debug("Task {} processing check for topology '{}'", task.id(), topologyName); return metadata == null || (metadata.canProcess() && metadata.canProcessTask(task, now)); } } From d67ea53de55716a66470f2360c423b34e170f009 Mon Sep 17 00:00:00 2001 From: lucliu1108 Date: Thu, 16 Oct 2025 15:16:30 -0500 Subject: [PATCH 07/12] revise --- .../apache/kafka/streams/processor/internals/StreamTask.java | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java index 5b1c4708acbf1..825d6363cf2ba 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java @@ -733,9 +733,7 @@ public boolean isProcessable(final long wallClockTime) { if (hasPendingTxCommit) { // if the task has a pending TX commit, we should just retry the commit but not process any records // thus, the task is not processable, even if there is available data in the record queue - if (log.isDebugEnabled()) { - log.debug("Stream task {} has a pending transaction commit, skip processing it.", id()); - } + log.debug("Stream task {} has a pending transaction commit, skip processing it.", id()); return false; } final boolean readyToProcess = partitionGroup.readyToProcess(wallClockTime); From a1cd7302125279bddd32181122b2241f3409ca2c Mon Sep 17 00:00:00 2001 From: lucliu1108 Date: Thu, 16 Oct 2025 15:28:37 -0500 Subject: [PATCH 08/12] reformat --- .../apache/kafka/streams/processor/internals/StreamTask.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java index 825d6363cf2ba..04f547afd7088 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java @@ -733,7 +733,7 @@ public boolean isProcessable(final long wallClockTime) { if (hasPendingTxCommit) { // if the task has a pending TX commit, we should just retry the commit but not process any records // thus, the task is not processable, even if there is available data in the record queue - log.debug("Stream task {} has a pending transaction commit, skip processing it.", id()); + log.debug("Stream task {} has a pending transaction commit, skip processing it.", id()); return false; } final boolean readyToProcess = partitionGroup.readyToProcess(wallClockTime); From d0b0e8269d78fdec5a83e088129eff44bd2113ce Mon Sep 17 00:00:00 2001 From: lucliu1108 Date: Fri, 17 Oct 2025 13:38:38 -0500 Subject: [PATCH 09/12] add more logs to partitiongroup --- .../streams/processor/internals/PartitionGroup.java | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java index 919b908b572cd..cc5a69b9e2e95 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java @@ -257,24 +257,32 @@ record = queue.poll(wallClockTime); if (record != null) { totalBuffered -= oldSize - queue.size(); + logger.trace("Partition {} polling next record:, oldSize={}, newSize={}, totalBuffered={}, recordTimestamp={}", + queue.partition(), oldSize, queue.size(), totalBuffered, record.timestamp); if (queue.isEmpty()) { // if a certain queue has been drained, reset the flag allBuffered = false; + logger.trace("Partition {} queue is now empty, allBuffered=false", queue.partition()); } else { nonEmptyQueuesByTime.offer(queue); } // always update the stream-time to the record's timestamp yet to be processed if it is larger if (record.timestamp > streamTime) { + final long oldStreamTime = streamTime; streamTime = record.timestamp; recordLatenessSensor.record(0, wallClockTime); + logger.trace("Partition {} stream time updated from {} to {}", queue.partition(), oldStreamTime, streamTime); } else { - recordLatenessSensor.record(streamTime - record.timestamp, wallClockTime); + final long lateness = streamTime - record.timestamp; + recordLatenessSensor.record(lateness, wallClockTime); } } + } else { + logger.trace("Partition pulling nextRecord: no queue available, totalBuffered={}", totalBuffered); } - + return record; } From 6105060f2e835338c8e4d29b353993f10c58f04e Mon Sep 17 00:00:00 2001 From: lucliu1108 Date: Tue, 21 Oct 2025 14:45:52 -0500 Subject: [PATCH 10/12] adjust logs --- .../processor/internals/PartitionGroup.java | 2 +- .../processor/internals/StreamTask.java | 21 ++++++++++--------- .../internals/TaskExecutionMetadata.java | 7 ++++--- 3 files changed, 16 insertions(+), 14 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java index cc5a69b9e2e95..a0c57c46d6683 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java @@ -280,7 +280,7 @@ record = queue.poll(wallClockTime); } } } else { - logger.trace("Partition pulling nextRecord: no queue available, totalBuffered={}", totalBuffered); + logger.trace("Partition pulling nextRecord: no queue available"); } return record; diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java index 04f547afd7088..51da66afac337 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java @@ -445,12 +445,12 @@ public Map prepareCommit(final boolean clean) // TODO: this should be removed after we decouple caching with emitting flush(); if (!clean) { - log.debug("Skipped preparing {} task with id {} for commit since the task is getting closed dirty.", state(), id); + log.debug("Skipped preparing {} task for commit since the task is getting closed dirty.", state()); return null; } hasPendingTxCommit = eosEnabled; - log.debug("Prepared {} task {} for committing", state(), id); + log.debug("Prepared {} task for committing", state()); return committableOffsetsAndMetadata(); } else { log.debug("Skipped preparing {} task for commit since there is nothing to commit", state()); @@ -725,7 +725,7 @@ public boolean isProcessable(final long wallClockTime) { // a task is only closing / closed when 1) task manager is closing, 2) a rebalance is undergoing; // in either case we can just log it and move on without notifying the thread since the consumer // would soon be updated to not return any records for this task anymore. - log.info("Stream task {} is already in {} state, skip processing it.", id(), state()); + log.info("Task is already in {} state, skip processing it.", state()); return false; } @@ -733,7 +733,7 @@ public boolean isProcessable(final long wallClockTime) { if (hasPendingTxCommit) { // if the task has a pending TX commit, we should just retry the commit but not process any records // thus, the task is not processable, even if there is available data in the record queue - log.debug("Stream task {} has a pending transaction commit, skip processing it.", id()); + log.debug("Task has a pending transaction commit, skip processing it."); return false; } final boolean readyToProcess = partitionGroup.readyToProcess(wallClockTime); @@ -741,9 +741,10 @@ public boolean isProcessable(final long wallClockTime) { if (timeCurrentIdlingStarted.isEmpty()) { timeCurrentIdlingStarted = Optional.of(wallClockTime); } - log.debug("Task {} started idling at time {}", id, timeCurrentIdlingStarted.get()); + log.debug("Task started idling at time {}", timeCurrentIdlingStarted.get()); } else { timeCurrentIdlingStarted = Optional.empty(); + log.trace("Task is ready to process"); } return readyToProcess; } @@ -766,10 +767,10 @@ record = partitionGroup.nextRecord(recordInfo, wallClockTime); // if there is no record to process, return immediately if (record == null) { - log.trace("Task {} has no next record to process.", id()); + log.trace("Task has no next record to process."); return false; } else { - log.trace("Task {} fetched one record {} to process.", id, record); + log.trace("Task fetched one record {} to process.", record); } } @@ -779,15 +780,15 @@ record = partitionGroup.nextRecord(recordInfo, wallClockTime); if (!(record instanceof CorruptedRecord)) { doProcess(wallClockTime); } else { - log.trace("Task {} skips processing corrupted record {}", id, record); + log.trace("Task skips processing corrupted record {}", record); } // update the consumed offset map after processing is done consumedOffsets.put(partition, record.offset()); commitNeeded = true; - log.trace("Task {} processed record: topic={}, partition={}, offset={}", - id, record.topic(), record.partition(), record.offset()); + log.trace("Task processed record: topic={}, partition={}, offset={}", + record.topic(), record.partition(), record.offset()); // after processing this record, if its partition queue's buffered size has been // decreased to the threshold, we can then resume the consumption on this partition diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutionMetadata.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutionMetadata.java index 8586ec4aa28f0..c5eaf47dc42a0 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutionMetadata.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutionMetadata.java @@ -52,7 +52,7 @@ public TaskExecutionMetadata(final Set allTopologyNames, this.hasNamedTopologies = !(allTopologyNames.size() == 1 && allTopologyNames.contains(UNNAMED_TOPOLOGY)); this.pausedTopologies = pausedTopologies; this.processingMode = processingMode; - this.log = new LogContext("[TaskExecutionMetadata]").logger(TaskExecutionMetadata.class); + this.log = new LogContext("").logger(TaskExecutionMetadata.class); } public boolean hasNamedTopologies() { @@ -67,16 +67,17 @@ public boolean canProcessTask(final Task task, final long now) { final String topologyName = task.id().topologyName(); if (!hasNamedTopologies) { // TODO implement error handling/backoff for non-named topologies (needs KIP) - log.debug("Task {} processing check for unnamed topology '{}'", task.id(), topologyName); + log.trace("Task {} processing check for unnamed topology '{}'", task.id(), topologyName); return !pausedTopologies.contains(UNNAMED_TOPOLOGY); } else { if (pausedTopologies.contains(topologyName)) { log.debug("Task {} can't be processed: topology '{}' is paused", task.id(), topologyName); return false; } else { + log.trace("Task {} processing check for named topology '{}'", task.id(), topologyName); final NamedTopologyMetadata metadata = topologyNameToErrorMetadata.get(topologyName); - log.debug("Task {} processing check for topology '{}'", task.id(), topologyName); return metadata == null || (metadata.canProcess() && metadata.canProcessTask(task, now)); + } } } From 563541d1fa97e1e3c37fa45d3432e0ed06f00c1c Mon Sep 17 00:00:00 2001 From: lucliu1108 Date: Wed, 22 Oct 2025 16:51:26 -0500 Subject: [PATCH 11/12] Update streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutionMetadata.java Co-authored-by: Matthias J. Sax --- .../streams/processor/internals/TaskExecutionMetadata.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutionMetadata.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutionMetadata.java index c5eaf47dc42a0..8b056cd693343 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutionMetadata.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutionMetadata.java @@ -71,7 +71,7 @@ public boolean canProcessTask(final Task task, final long now) { return !pausedTopologies.contains(UNNAMED_TOPOLOGY); } else { if (pausedTopologies.contains(topologyName)) { - log.debug("Task {} can't be processed: topology '{}' is paused", task.id(), topologyName); + log.trace("Task {} can't be processed: topology '{}' is paused", task.id(), topologyName); return false; } else { log.trace("Task {} processing check for named topology '{}'", task.id(), topologyName); From 8b711dcbd1c59c83f1d550c21ba859bb654d4d1b Mon Sep 17 00:00:00 2001 From: lucliu1108 Date: Wed, 22 Oct 2025 16:51:34 -0500 Subject: [PATCH 12/12] Update streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java Co-authored-by: Matthias J. Sax --- .../apache/kafka/streams/processor/internals/PartitionGroup.java | 1 - 1 file changed, 1 deletion(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java index a0c57c46d6683..6a1514b9c5f83 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java @@ -282,7 +282,6 @@ record = queue.poll(wallClockTime); } else { logger.trace("Partition pulling nextRecord: no queue available"); } - return record; }