KAFKA-18135: ShareConsumer HB UnsupportedVersion msg mixed with Consumer HB #18101

Merged: 8 commits, Dec 28, 2024

@peterxcli peterxcli commented Dec 8, 2024

The UnsupportedVersion error is handled in the parent AbstractHeartbeatRequestManager, so the same handling is used by both the consumer and the share consumer. If a share consumer gets the error, it ends up with a message that is currently specific to the consumer:

case UNSUPPORTED_VERSION:
message = "The cluster doesn't yet support the new consumer group protocol." +
" Set group.protocol=classic to revert to the classic protocol until the cluster is upgraded.";

Handle UnsupportedVersion separately in the existing handleSpecificError. Note that the unsupported-version error for the consumer may also carry a message specific to SubscriptionPattern not being supported in HB v0, if a regex is used without the required v1.

  • add onSpecificFailure for consumerHB and shareHB
  • add testUnsupportedVersionGeneratedOnTheBroker to cover both handleSpecificError and handleSpecificFailure/exception

@@ -100,6 +100,9 @@ public abstract class AbstractHeartbeatRequestManager<R extends AbstractResponse
"group protocol. Set group.protocol=classic on the consumer configs to revert to the CLASSIC protocol " +
"until the cluster is upgraded.";

public static final String SHARE_PROTOCOL_NOT_SUPPORTED_MSG = "The cluster does not support the new SHARE " +
Member

I would change it to "The cluster does not support the share group protocol. To use share groups, the cluster must have the share group protocol enabled." I think this message will remain valid over the next few releases without us needing to update it.

Contributor

Also, I would expect this constant should go in the ShareMembershipMgr right?

@AndrewJSchofield
Member

We should get @lianetm to make sure she's happy too.

Contributor

@lianetm lianetm left a comment

Thanks for the changes! Took a first look, here are some comments

Comment on lines 414 to 417
if (!handleSpecificError(response, currentTimeMs)) {
logger.error("{} failed due to {}: {}", heartbeatRequestName(), error, errorMessage);
handleFatalFailure(error.exception(errorMessage));
}
Contributor

I wonder if it would be simpler to remove the UnsupportedVersion handling from here completely? In the end we're duplicating the logic we already have in the default case further down, right? We would only need the logic in the Share/Consumer managers, simply because even if the error is not specific, the handling is. Thoughts?

Contributor Author

You're right, I think the logic in default and UnsupportedVersion handling are the same. Thanks!

Comment on lines 105 to 111
if (errorMessage.contains(REGEX_RESOLUTION_NOT_SUPPORTED_MSG)) {
// If the error is about regex subscription, use the original error message
message = errorMessage;
}
Contributor

I would expect we don't need this here. This regex error is generated on the client when building the request (it generates a response with an exception), so the handling lands on the onFailure path (not on the onResponse path that uses this handleSpecificError).

I would expect we just need to have here the same handling we had for the UnsupportedVersion in the parent class before this PR:

logger.error("{} failed due to {}: {}", heartbeatRequestName(), error, errorMessage);
handleFatalFailure(error.exception(CONSUMER_PROTOCOL_NOT_SUPPORTED_MSG));

That being said, we do need to ensure that we handle responses for each consumer, to cover this bit that is now on the abstract mgr (so it could wrongly apply a consumer msg to the share consumer):

if (isHBApiUnsupportedErrorMsg(exception)) {
// This is expected to be the case where building the request fails because the node does not support
// the API. Propagate custom message.
handleFatalFailure(new UnsupportedVersionException(CONSUMER_PROTOCOL_NOT_SUPPORTED_MSG, exception));

Both consumers need to ensure they propagate their specific msgs when that exception is in a response.

Contributor Author

So I guess we should add an onSpecificFailure handler in each consumer, just as handleSpecificError does for error codes?

Contributor

Yes. The implementation of that onSpecificFailure/handleSpecificExceptionInResponse on each consumer should cover this bit I expect, with the correct msg:

if (isHBApiUnsupportedErrorMsg(exception)) {
// This is expected to be the case where building the request fails because the node does not support
// the API. Propagate custom message.
handleFatalFailure(new UnsupportedVersionException(CONSUMER_PROTOCOL_NOT_SUPPORTED_MSG, exception));

And in the AbstractHBMgr onFailure probably just need something like:

    if (exception instanceof RetriableException) {
        ...
    } else if (!onSpecificFailure(exception)) {
        handleFatalFailure(exception);
    }

We don't need any specifics for the regex case I would expect. It should land on the handleFatalFailure and propagate the msg it has (it will only ever be generated when building consumer HB, so no risk for the share consumer)

With that, it seems we cover the specifics of handling responses with exceptions (generated on the client), in a very similar way to how we handle responses with error codes (generated on the broker).
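The shape suggested above can be sketched as a small template-method arrangement. This is only an illustration under stated assumptions: the exception types, the manager classes, the placeholder message constants, and the `isApiUnsupported` helper are local stand-ins written for this sketch, not the Kafka classes, and the `CONSUMER_GROUP_HEARTBEAT` message text is assumed by analogy with the `SHARE_GROUP_HEARTBEAT` one quoted later in this thread.

```java
import java.util.ArrayList;
import java.util.List;

final class HeartbeatFailureSketch {

    /** Local stand-in for a retriable error (not the Kafka class). */
    static class RetriableFailure extends RuntimeException {
        RetriableFailure(String message) { super(message); }
    }

    /** Local stand-in for an unsupported-version error (not the Kafka class). */
    static class UnsupportedVersionFailure extends RuntimeException {
        UnsupportedVersionFailure(String message) { super(message); }
        UnsupportedVersionFailure(String message, Throwable cause) { super(message, cause); }
    }

    abstract static class AbstractManager {
        final List<Throwable> fatalFailures = new ArrayList<>();
        int retries = 0;

        /** Called when the request failed and no response was received. */
        final void onFailure(Throwable exception) {
            if (exception instanceof RetriableFailure) {
                retries++; // placeholder for the real backoff-and-retry logic
            } else if (!handleSpecificFailure(exception)) {
                handleFatalFailure(exception); // default: propagate the exception as-is
            }
        }

        void handleFatalFailure(Throwable error) { fatalFailures.add(error); }

        /** Returns true if the subclass fully handled the failure. */
        abstract boolean handleSpecificFailure(Throwable exception);

        /** Hypothetical helper: true if the message says the node lacks the given API. */
        static boolean isApiUnsupported(Throwable exception, String apiName) {
            return exception instanceof UnsupportedVersionFailure
                && ("The node does not support " + apiName).equals(exception.getMessage());
        }
    }

    static final class ConsumerManager extends AbstractManager {
        static final String NOT_SUPPORTED_MSG = "consumer group protocol not supported (placeholder text)";

        @Override
        boolean handleSpecificFailure(Throwable exception) {
            if (isApiUnsupported(exception, "CONSUMER_GROUP_HEARTBEAT")) {
                handleFatalFailure(new UnsupportedVersionFailure(NOT_SUPPORTED_MSG, exception));
                return true;
            }
            return false;
        }
    }

    static final class ShareManager extends AbstractManager {
        static final String NOT_SUPPORTED_MSG = "share group protocol not supported (placeholder text)";

        @Override
        boolean handleSpecificFailure(Throwable exception) {
            if (isApiUnsupported(exception, "SHARE_GROUP_HEARTBEAT")) {
                handleFatalFailure(new UnsupportedVersionFailure(NOT_SUPPORTED_MSG, exception));
                return true;
            }
            return false;
        }
    }

    public static void main(String[] args) {
        ShareManager share = new ShareManager();
        share.onFailure(new UnsupportedVersionFailure("The node does not support SHARE_GROUP_HEARTBEAT"));
        System.out.println(share.fatalFailures.get(0).getMessage());
    }
}
```

Any other non-retriable exception, such as the regex one, falls through to `handleFatalFailure` with its original message, which matches the expectation that no regex-specific handling is needed.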

Member

@AndrewJSchofield AndrewJSchofield left a comment

A few more changes requested. I have been working with the code on my local environment, and I think there's still something not totally right. This PR isn't making it worse in any way, but I think there's another improvement we can make here which would improve it more.

@AndrewJSchofield
Member

@peterxcli It seems to me that this doesn't quite work as well as it might. In my testing, connecting a share consumer to a broker which does not support SHARE_GROUP_HEARTBEAT does not emit the new message. What actually happens is that org.apache.kafka.common.errors.UnsupportedVersionException: The node does not support SHARE_GROUP_HEARTBEAT is thrown.

The consumer checks the ApiVersions response and notices that SHARE_GROUP_HEARTBEAT is not supported before it's even sent a request. As a result, in AbstractHeartbeatRequestManager.makeHeartbeatRequest, in the whenComplete call, the response is null. This means we end up in onFailure rather than onResponse, and this means the new code is never actually executed.

What do you think @lianetm? Am I missing something?

@AndrewJSchofield
Member

@peterxcli Actually, I've realised that this is a temporary problem. In AK 4.0, SHARE_GROUP_HEARTBEAT is marked as an unstable API. If the broker doesn't support the RPC at all, then it will behave as I said. In AK 4.1, I expect we'll mark the API as stable, and then the consumer will send the request. Then the new message will be used if share groups are not enabled. So, I think this is as good as it can be and the approach in the PR is OK.

@lianetm
Contributor

lianetm commented Dec 13, 2024

Just to answer @AndrewJSchofield's comment #18101 (comment): you got it right, we basically have 2 paths to get the UnsupportedVersion error:

  1. generated on the client when building the request if the broker does not support the API or version required => HB managers get a client response object with exception
  2. generated on the broker (API supported but not enabled) => HB managers get client response with ErrorCode

For the KafkaConsumer we can hit both paths now, but the share consumer will always go down path 1, as you described, until the API becomes stable.
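The two paths can be illustrated with a plain `CompletableFuture`, which is what a `whenComplete` callback dispatches on. This is a minimal sketch, not the manager's actual code: `Response`, `ErrorCode`, and `route` are hypothetical names, and the future is assumed to be already completed so the callback runs synchronously.

```java
import java.util.concurrent.CompletableFuture;

final class HeartbeatPathsSketch {

    /** Hypothetical error codes; only the two needed for the illustration. */
    enum ErrorCode { NONE, UNSUPPORTED_VERSION }

    /** Hypothetical response carrying a broker-side error code. */
    static final class Response {
        final ErrorCode error;
        Response(ErrorCode error) { this.error = error; }
    }

    /**
     * Reports which handler an already-completed future would reach.
     * The callback runs in the calling thread because the future is complete.
     */
    static String route(CompletableFuture<Response> future) {
        StringBuilder path = new StringBuilder();
        future.whenComplete((response, exception) -> {
            if (response != null) {
                // Path 2: the broker answered, possibly with an error code.
                path.append("onResponse:").append(response.error);
            } else if (exception != null) {
                // Path 1: the request was never answered; the client raised the exception.
                path.append("onFailure:").append(exception.getMessage());
            }
        });
        return path.toString();
    }

    public static void main(String[] args) {
        // Path 1: the client refuses to build the request for an unsupported API.
        CompletableFuture<Response> clientSide = new CompletableFuture<>();
        clientSide.completeExceptionally(
            new IllegalStateException("The node does not support SHARE_GROUP_HEARTBEAT"));
        System.out.println(route(clientSide));

        // Path 2: the broker supports the API but rejects the request.
        CompletableFuture<Response> brokerSide =
            CompletableFuture.completedFuture(new Response(ErrorCode.UNSUPPORTED_VERSION));
        System.out.println(route(brokerSide));
    }
}
```

In path 1 the response is null, so any message customization has to live on the failure side; in path 2 it has to live on the response side. That is why both hooks need a group-specific message.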

@@ -422,6 +425,29 @@ public void testHeartbeatResponseOnErrorHandling(final Errors error, final boole
}
}

@ParameterizedTest
@ValueSource(strings = {SHARE_PROTOCOL_NOT_SUPPORTED_MSG})
public void testUnsupportedVersion(String errorMsg) {
Contributor

@lianetm lianetm Dec 13, 2024

This is really testUnsupportedVersionGeneratedOnTheClient (client response with exception). Renaming it may clarify that, and we should ensure we have coverage for something like testUnsupportedVersionGeneratedOnTheBroker (client response with ErrorCode). Is that already covered?

The outcome of both tests should be the same, but they execute different code paths modified in this PR (handleSpecificError and handleSpecificFailure/exception).
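As a rough illustration of the two checks being asked for, the sketch below drives a toy share manager down each path and compares the resulting message. It uses plain Java rather than the project's JUnit setup; `ShareManager`, its two methods, and the check methods are hypothetical, and the message text is the wording proposed earlier in this review.

```java
final class UnsupportedVersionTestSketch {

    static final String SHARE_MSG = "The cluster does not support the share group protocol. "
        + "To use share groups, the cluster must have the share group protocol enabled.";

    /** Toy manager that records the last fatal error it raised. */
    static final class ShareManager {
        Throwable fatal;

        /** Client path: the request was never sent and an exception was raised locally. */
        void onFailure(Throwable exception) {
            String message = exception.getMessage();
            if (message != null && message.contains("does not support SHARE_GROUP_HEARTBEAT")) {
                fatal = new IllegalStateException(SHARE_MSG, exception);
            } else {
                fatal = exception;
            }
        }

        /** Broker path: a response arrived carrying an error code. */
        void onResponse(String errorCode, String errorMessage) {
            if ("UNSUPPORTED_VERSION".equals(errorCode)) {
                fatal = new IllegalStateException(SHARE_MSG);
            } else if (!"NONE".equals(errorCode)) {
                fatal = new IllegalStateException(errorMessage);
            }
        }
    }

    /** Mirrors the idea of testUnsupportedVersionGeneratedOnTheClient. */
    static String generatedOnTheClient() {
        ShareManager manager = new ShareManager();
        manager.onFailure(new RuntimeException("The node does not support SHARE_GROUP_HEARTBEAT"));
        return manager.fatal.getMessage();
    }

    /** Mirrors the idea of testUnsupportedVersionGeneratedOnTheBroker. */
    static String generatedOnTheBroker() {
        ShareManager manager = new ShareManager();
        manager.onResponse("UNSUPPORTED_VERSION", "broker-side text that should be replaced");
        return manager.fatal.getMessage();
    }

    public static void main(String[] args) {
        // Both paths should surface the same share-specific message.
        System.out.println(generatedOnTheClient().equals(generatedOnTheBroker()));
    }
}
```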

@AndrewJSchofield AndrewJSchofield added the KIP-932 Queues for Kafka label Dec 16, 2024
@github-actions github-actions bot added streams core Kafka Broker producer tools connect kraft mirror-maker-2 dependencies Pull requests that update a dependency file storage Pull requests that target the storage module tiered-storage Related to the Tiered Storage feature build Gradle build or GitHub Actions and removed small Small PRs labels Dec 18, 2024
@github-actions github-actions bot added the small Small PRs label Dec 18, 2024
@peterxcli
Contributor Author

peterxcli commented Dec 20, 2024

Rebase CI hotfix #18284.

@peterxcli
Contributor Author

Three unrelated tests failed.

UserQuotaTest testProducerConsumerOverrideLowerQuota(String, String).quorum=kraft.groupProtocol=classic org.opentest4j.AssertionFailedError: Client with id=QuotasTestProducer-1 should have been throttled, 0.0 ==> expected: but was: 6.92s
PlaintextAdminIntegrationTest testConsumerGroups(String, String).quorum=kraft.groupProtocol=consumer org.opentest4j.AssertionFailedError: Expected the offset for partition 0 to eventually become 1. 21.57s
UserClientIdQuotaTest testThrottledProducerConsumer(String, String).quorum=kraft.groupProtocol=classic org.opentest4j.AssertionFailedError: Client with id=QuotasTestProducer-!@#$%^&*() should have been throttled, 0.0 ==> expected: but was: 3.31s

Member

@AndrewJSchofield AndrewJSchofield left a comment

A couple of final comments. This is a nice improvement.

@@ -461,15 +434,24 @@ protected void handleFatalFailure(Throwable error) {
membershipManager().transitionToFatal();
}

/**
* Error handling specific failure to a group type when sending the request.
Member

I would add "and no response has been received" to the comment. This is the crucial difference between the usage of handleSpecificFailure and handleSpecificExceptionInResponse.

String message = CONSUMER_PROTOCOL_NOT_SUPPORTED_MSG;
if (errorMessage.equals(REGEX_RESOLUTION_NOT_SUPPORTED_MSG)) {
message = REGEX_RESOLUTION_NOT_SUPPORTED_MSG;
logger.error("{} failed due to regex resolution not support: {}", heartbeatRequestName(), message);
Member

nit: "regex resolution not supported".

…ove error handling

- Move SHARE_PROTOCOL_NOT_SUPPORTED_MSG constant to AbstractHeartbeatRequestManager
- Add test case for unsupported version error handling in ShareHeartbeatRequestManager
- Improve error message handling for regex resolution errors
- Split error handling into two distinct methods: handleSpecificFailure and
  handleSpecificExceptionInResponse
- Move UnsupportedVersion handling from onFailure to specific implementations
- Rename handleSpecificError to handleSpecificExceptionInResponse for clarity
- Improve error logging messages to better distinguish between client-side and
  broker-side version errors

This change makes the error handling more modular and clearer by separating
client-side request failures from broker response errors.
- Split UnsupportedVersion tests into client-side and broker-side cases
- Add separate test methods for each scenario in both Consumer and Share
  heartbeat managers
- Fix test helper methods to properly simulate client vs broker version errors
- Rename test methods to better describe test scenarios

This change improves test clarity by explicitly testing both client-side and
Member

@AndrewJSchofield AndrewJSchofield left a comment

lgtm

@AndrewJSchofield
Member

Failing test is known as flaky and tracked by https://issues.apache.org/jira/browse/KAFKA-18298.

@AndrewJSchofield AndrewJSchofield merged commit be4d1a6 into apache:trunk Dec 28, 2024
8 of 9 checks passed