KAFKA-18736: Do not send fields if not needed #19181

Open: wants to merge 2 commits into base: trunk

Member

@cadonna cadonna commented Mar 10, 2025

The Streams heartbeat request has some fields that are always sent. Those are:

  • group ID
  • member ID
  • member epoch
  • group instance ID (if static membership is used)

Then it has fields that are only sent when joining:

  • topology and topology epoch
  • rebalance timeout
  • process ID
  • endpoint
  • client tags

Finally, the assignment is only sent if it changed compared to the last sent request.
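
The three groups of fields above can be sketched roughly as follows. This is a minimal illustration and not the code in this PR: `HeartbeatFieldsSketch`, `nextRequest`, the map keys, and all placeholder values are hypothetical, and a plain `Map` stands in for `StreamsGroupHeartbeatRequestData`.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

// Hypothetical stand-in for the heartbeat state; not the actual Kafka class.
final class HeartbeatFieldsSketch {
    // Assignment included in the most recent request (null: nothing sent yet).
    private List<Integer> lastSentAssignment = null;

    /** Returns the fields that would be populated in the next request. */
    Map<String, Object> nextRequest(boolean joining, String instanceId, List<Integer> assignment) {
        Map<String, Object> fields = new LinkedHashMap<>();

        // Always sent (placeholder values).
        fields.put("groupId", "my-group");
        fields.put("memberId", "member-1");
        fields.put("memberEpoch", 5);
        if (instanceId != null) {
            // Only present when static membership is used.
            fields.put("instanceId", instanceId);
        }

        // Only sent when joining (placeholder values).
        if (joining) {
            fields.put("topology", "placeholder-topology");
            fields.put("rebalanceTimeoutMs", 1234);
            fields.put("processId", "placeholder-process-id");
            fields.put("endpoint", "localhost:8080");
            fields.put("clientTags", Map.of());
        }

        // Only sent if it differs from what the last request carried.
        if (!Objects.equals(assignment, lastSentAssignment)) {
            fields.put("assignment", assignment);
            lastSentAssignment = assignment;
        }
        return fields;
    }
}
```

In this sketch, a second call with the same assignment omits the `assignment` key, while any change causes it to be included again.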

Member Author

cadonna commented Mar 10, 2025

Call for review: @aliehsaeedii

PR Overview

This PR optimizes the Streams heartbeat request by ensuring that unchanged assignment fields are not unnecessarily sent. Key changes include the introduction of a nested helper class to track the last sent assignment data, the conditional sending of task assignments based on the member state, and ensuring that partitions and subtopologies are consistently sorted.

Reviewed Changes

File: clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java
Description: Introduces a new SentFields nested class to track sent request fields; conditionally builds and sends task assignments for JOINING versus non-JOINING states; applies sorting improvements for consistent ordering.

Copilot reviewed 2 out of 2 changed files in this pull request and generated no comments.

Comments suppressed due to low confidence (2)

clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java:127

  • Consider using Objects.equals(reconciledAssignment, sentFields.assignment) to safely compare assignments and avoid a potential NullPointerException if reconciledAssignment ever becomes null.
if (!reconciledAssignment.equals(sentFields.assignment)) {
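
A minimal sketch of the null-safe comparison suggested above. `NullSafeCompareSketch` and `assignmentChanged` are hypothetical names, and `Set<Integer>` stands in for the real assignment type. Note that `a.equals(b)` only throws when the receiver `a` is null; `Objects.equals` handles null on either side.

```java
import java.util.Objects;
import java.util.Set;

// Hypothetical helper, not part of the PR.
final class NullSafeCompareSketch {
    /** True if the assignment differs from the one last sent, tolerating nulls. */
    static boolean assignmentChanged(Set<Integer> reconciled, Set<Integer> lastSent) {
        // Objects.equals returns true for (null, null) and false if exactly one side is null.
        return !Objects.equals(reconciled, lastSent);
    }
}
```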

clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java:65

  • [nitpick] The name 'SentFields' could be more descriptive (for example, 'LastSentFields') to clearly indicate that it stores the fields of the most recent request.
static class SentFields {
Member

@bbejeck bbejeck left a comment

Thanks for the PR, @cadonna. I made a pass and overall LGTM, just a few minor comments.


if (joining) {
StreamsGroupHeartbeatRequestData.Topology topology = new StreamsGroupHeartbeatRequestData.Topology();
topology.setSubtopologies(getTopologyFromStreams(streamsRebalanceData.subtopologies()));

The naming here seems a little off: setSubtopologies -> getTopology -> x.subTopologies. But I feel like we've discussed this before, and I can't recall the discussion or the outcome.

)
public void testBuildingHeartbeatShutdownRequested(final MemberState memberState) {
final StreamsGroupHeartbeatRequestManager.HeartbeatState heartbeatState =
new StreamsGroupHeartbeatRequestManager.HeartbeatState(streamsRebalanceData, membershipManager, 1234);

nit: parameters on separate lines

assertEquals(requestDataBeforeReset.warmupTasks(), requestDataAfterReset.warmupTasks());
}

private static Stream<Arguments> provideNonJoiningStates() {

nit: extra spacing here

when(membershipManager.memberEpoch()).thenReturn(MEMBER_EPOCH);
when(membershipManager.groupInstanceId()).thenReturn(Optional.of(INSTANCE_ID));
final StreamsGroupHeartbeatRequestManager.HeartbeatState heartbeatState =
new StreamsGroupHeartbeatRequestManager.HeartbeatState(streamsRebalanceData, membershipManager, 1234);

nit: parameters on separate lines

Member

@chia7712 chia7712 left a comment

@cadonna thanks for this patch. I left a few minor questions. PTAL

@@ -119,9 +146,9 @@ private static List<StreamsGroupHeartbeatRequestData.TaskIds> convertTaskIdColle
             .map(entry -> {
                 StreamsGroupHeartbeatRequestData.TaskIds ids = new StreamsGroupHeartbeatRequestData.TaskIds();
                 ids.setSubtopologyId(entry.getKey());
-                ids.setPartitions(entry.getValue());
+                ids.setPartitions(entry.getValue().stream().sorted().collect(Collectors.toList()));

Excuse me, is the sorting operation necessary in production, or is it primarily used for testing purposes?
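
Whether the sort is needed in production is for the author to answer. As a sketch of what the sort changes: lists are compared element by element, so two lists built from the same set of partitions are only guaranteed to be equal if they are put into a fixed order first. `SortedPartitionsSketch` and `sortedPartitions` are hypothetical names used for illustration only.

```java
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical helper mirroring the sorted() call in the diff above.
final class SortedPartitionsSketch {
    /** Converts a set of partitions into a list with deterministic (ascending) order. */
    static List<Integer> sortedPartitions(Set<Integer> partitions) {
        return partitions.stream().sorted().collect(Collectors.toList());
    }
}
```

Without the sort, the iteration order of the source set decides the list order, which makes equality checks on the resulting lists (for example in test assertions) depend on that order.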

@@ -61,9 +61,23 @@ public class StreamsGroupHeartbeatRequestManager implements RequestManager {

static class HeartbeatState {

// Fields of StreamsGroupHeartbeatRequest sent in the most recent request
static class LastSentFields {

Given that this struct contains only one field, perhaps we could directly incorporate the assignment field into HeartbeatState?

topology.setEpoch(streamsRebalanceData.topologyEpoch());
data.setTopology(topology);
data.setRebalanceTimeoutMs(rebalanceTimeoutMs);
data.setProcessId(streamsRebalanceData.processId().toString());

Out of curiosity, what is the rationale for using String instead of Uuid as the data type for processId? By contrast, SubscriptionInfoData uses Uuid.

// Fields of StreamsGroupHeartbeatRequest sent in the most recent request
static class LastSentFields {

private StreamsRebalanceData.Assignment assignment = null;

Maybe we can initialize it to Assignment.EMPTY to avoid NPE in the future?

Timer.class,
(mock, context) -> {
when(mock.isExpired()).thenReturn(true);
});

; is unnecessary

.setValue(entry.getValue())
)
.collect(Collectors.toList()));
data.setActiveTasks(convertTaskIdCollection(Set.of()));

Excuse me, what is the rationale for initializing these fields as empty lists, rather than leaving them as null?

3 participants