Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -125,12 +125,13 @@ boolean readyToProcess(final long wallClockTime) {

if (!queue.isEmpty()) {
// this partition is ready for processing
logger.trace("Partition {} has buffered data, ready for processing", partition);
idlePartitionDeadlines.remove(partition);
queued.add(partition);
} else {
final Long fetchedLag = fetchedLags.getOrDefault(partition, -1L);

logger.trace("Fetched lag for {} is {}", partition, fetchedLag);
logger.trace("Fetched lag for partition {} is {}", partition, fetchedLag);

if (fetchedLag == -1L) {
// must wait to fetch metadata for the partition
Expand All @@ -141,7 +142,7 @@ boolean readyToProcess(final long wallClockTime) {
// must wait to poll the data we know to be on the broker
idlePartitionDeadlines.remove(partition);
logger.trace(
"Lag for {} is currently {}, but no data is buffered locally. Waiting to buffer some records.",
"Lag for partition {} is currently {}, but no data is buffered locally. Waiting to buffer some records.",
partition,
fetchedLag
);
Expand All @@ -157,7 +158,7 @@ boolean readyToProcess(final long wallClockTime) {
final long deadline = idlePartitionDeadlines.get(partition);
if (wallClockTime < deadline) {
logger.trace(
"Lag for {} is currently 0 and current time is {}. Waiting for new data to be produced for configured idle time {} (deadline is {}).",
"Lag for partition {} is currently 0 and current time is {}. Waiting for new data to be produced for configured idle time {} (deadline is {}).",
partition,
wallClockTime,
maxTaskIdleMs,
Expand All @@ -166,6 +167,7 @@ boolean readyToProcess(final long wallClockTime) {
return false;
} else {
// this partition is ready for processing due to the task idling deadline passing
logger.trace("Partition {} is ready for processing due to the task idling deadline passing", partition);
if (enforced == null) {
enforced = new HashMap<>();
}
Expand Down Expand Up @@ -255,24 +257,32 @@ record = queue.poll(wallClockTime);

if (record != null) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wondering if record be every be null here? -- Should we actually add an else and log an ERROR (or WARN) log, or even throw IllegalStateException as this should always be true?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Record should not be null. If it is null then the queue itself is also empty (which is detected in the outer loop).

I tried to add a else statement that throws an exception, and is detected by SpotBugs as "redundant null check". Therefore i think it's fine to drop it.

totalBuffered -= oldSize - queue.size();
logger.trace("Partition {} polling next record:, oldSize={}, newSize={}, totalBuffered={}, recordTimestamp={}",
queue.partition(), oldSize, queue.size(), totalBuffered, record.timestamp);

if (queue.isEmpty()) {
// if a certain queue has been drained, reset the flag
allBuffered = false;
logger.trace("Partition {} queue is now empty, allBuffered=false", queue.partition());
} else {
nonEmptyQueuesByTime.offer(queue);
}

// always update the stream-time to the record's timestamp yet to be processed if it is larger
if (record.timestamp > streamTime) {
final long oldStreamTime = streamTime;
streamTime = record.timestamp;
recordLatenessSensor.record(0, wallClockTime);
logger.trace("Partition {} stream time updated from {} to {}", queue.partition(), oldStreamTime, streamTime);
} else {
recordLatenessSensor.record(streamTime - record.timestamp, wallClockTime);
final long lateness = streamTime - record.timestamp;
recordLatenessSensor.record(lateness, wallClockTime);
}
}
} else {
logger.trace("Partition pulling nextRecord: no queue available");
}

return record;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -445,7 +445,7 @@ public Map<TopicPartition, OffsetAndMetadata> prepareCommit(final boolean clean)
// TODO: this should be removed after we decouple caching with emitting
flush();
if (!clean) {
log.debug("Skipped preparing {} task with id {} for commit since the task is getting closed dirty.", state(), id);
log.debug("Skipped preparing {} task for commit since the task is getting closed dirty.", state());
return null;
}
hasPendingTxCommit = eosEnabled;
Expand Down Expand Up @@ -725,23 +725,26 @@ public boolean isProcessable(final long wallClockTime) {
// a task is only closing / closed when 1) task manager is closing, 2) a rebalance is undergoing;
// in either case we can just log it and move on without notifying the thread since the consumer
// would soon be updated to not return any records for this task anymore.
log.info("Stream task {} is already in {} state, skip processing it.", id(), state());
log.info("Task is already in {} state, skip processing it.", state());

return false;
}

if (hasPendingTxCommit) {
// if the task has a pending TX commit, we should just retry the commit but not process any records
// thus, the task is not processable, even if there is available data in the record queue
log.debug("Task has a pending transaction commit, skip processing it.");
return false;
}
final boolean readyToProcess = partitionGroup.readyToProcess(wallClockTime);
if (!readyToProcess) {
if (timeCurrentIdlingStarted.isEmpty()) {
timeCurrentIdlingStarted = Optional.of(wallClockTime);
}
log.debug("Task started idling at time {}", timeCurrentIdlingStarted.get());
} else {
timeCurrentIdlingStarted = Optional.empty();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we also add a "resumed" log similar to "started idling" ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added .Since this branch is called more often I set it to TRACE

log.trace("Task is ready to process");
}
return readyToProcess;
}
Expand All @@ -764,7 +767,10 @@ record = partitionGroup.nextRecord(recordInfo, wallClockTime);

// if there is no record to process, return immediately
if (record == null) {
log.trace("Task has no next record to process.");
return false;
} else {
log.trace("Task fetched one record {} to process.", record);
}
}

Expand All @@ -773,15 +779,22 @@ record = partitionGroup.nextRecord(recordInfo, wallClockTime);

if (!(record instanceof CorruptedRecord)) {
doProcess(wallClockTime);
} else {
log.trace("Task skips processing corrupted record {}", record);
}

// update the consumed offset map after processing is done
consumedOffsets.put(partition, record.offset());
commitNeeded = true;

log.trace("Task processed record: topic={}, partition={}, offset={}",
record.topic(), record.partition(), record.offset());

// after processing this record, if its partition queue's buffered size has been
// decreased to the threshold, we can then resume the consumption on this partition
if (recordInfo.queue().size() <= maxBufferedSize) {
log.trace("Resume consumption for partition {}: buffered size {} is under the threshold {}",
partition, recordInfo.queue().size(), maxBufferedSize);
partitionsToResume.add(partition);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,15 @@ public class TaskExecutionMetadata {
private final Collection<Task> successfullyProcessed = new HashSet<>();
// map of topologies experiencing errors/currently under backoff
private final ConcurrentHashMap<String, NamedTopologyMetadata> topologyNameToErrorMetadata = new ConcurrentHashMap<>();
private final Logger log;

public TaskExecutionMetadata(final Set<String> allTopologyNames,
final Set<String> pausedTopologies,
final ProcessingMode processingMode) {
this.hasNamedTopologies = !(allTopologyNames.size() == 1 && allTopologyNames.contains(UNNAMED_TOPOLOGY));
this.pausedTopologies = pausedTopologies;
this.processingMode = processingMode;
this.log = new LogContext("").logger(TaskExecutionMetadata.class);
}

public boolean hasNamedTopologies() {
Expand All @@ -65,13 +67,17 @@ public boolean canProcessTask(final Task task, final long now) {
final String topologyName = task.id().topologyName();
if (!hasNamedTopologies) {
// TODO implement error handling/backoff for non-named topologies (needs KIP)
log.trace("Task {} processing check for unnamed topology '{}'", task.id(), topologyName);
return !pausedTopologies.contains(UNNAMED_TOPOLOGY);
} else {
if (pausedTopologies.contains(topologyName)) {
log.debug("Task {} can't be processed: topology '{}' is paused", task.id(), topologyName);
return false;
} else {
log.trace("Task {} processing check for named topology '{}'", task.id(), topologyName);
final NamedTopologyMetadata metadata = topologyNameToErrorMetadata.get(topologyName);
return metadata == null || (metadata.canProcess() && metadata.canProcessTask(task, now));

}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -833,7 +833,7 @@ public void shouldWaitForPollWhenLagIsNonzero() {
appender.getEvents(),
hasItem(Matchers.allOf(
Matchers.hasProperty("level", equalTo("TRACE")),
Matchers.hasProperty("message", equalTo("[test] Lag for topic-2 is currently 1, but no data is buffered locally. Waiting to buffer some records."))
Matchers.hasProperty("message", equalTo("[test] Lag for partition topic-2 is currently 1, but no data is buffered locally. Waiting to buffer some records."))
))
);
}
Expand Down Expand Up @@ -868,7 +868,7 @@ public void shouldIdleAsSpecifiedWhenLagIsZero() {
appender.getEvents(),
hasItem(Matchers.allOf(
Matchers.hasProperty("level", equalTo("TRACE")),
Matchers.hasProperty("message", equalTo("[test] Lag for topic-2 is currently 0 and current time is 0. Waiting for new data to be produced for configured idle time 1 (deadline is 1)."))
Matchers.hasProperty("message", equalTo("[test] Lag for partition topic-2 is currently 0 and current time is 0. Waiting for new data to be produced for configured idle time 1 (deadline is 1)."))
))
);
}
Expand Down Expand Up @@ -926,7 +926,7 @@ private void hasZeroFetchedLag(final PartitionGroup group, final TopicPartition
appender.setClassLogger(PartitionGroup.class, Level.TRACE);
assertFalse(group.readyToProcess(0L));
assertThat(appender.getEvents(), hasItem(Matchers.hasProperty("message",
startsWith(String.format("[test] Lag for %s is currently 0 and current time is %d. "
startsWith(String.format("[test] Lag for partition %s is currently 0 and current time is %d. "
+ "Waiting for new data to be produced for configured idle time", partition, 0L)))));
}
}
Expand All @@ -937,7 +937,7 @@ private void hasNonZeroFetchedLag(final PartitionGroup group, final TopicPartiti
appender.setClassLogger(PartitionGroup.class, Level.TRACE);
assertFalse(group.readyToProcess(0L));
assertThat(appender.getEvents(), hasItem(Matchers.hasProperty("message",
equalTo(String.format("[test] Lag for %s is currently %d, but no data is buffered locally. "
equalTo(String.format("[test] Lag for partition %s is currently %d, but no data is buffered locally. "
+ "Waiting to buffer some records.", partition, lag)))));
}
}
Expand Down