Skip to content

Commit

Permalink
KAFKA-17450 Reduced the handlers for handling ShareAcknowledgeResponse (
Browse files Browse the repository at this point in the history
#17061)

Currently there are 4 handler functions present for handling ShareAcknowledge responses. ShareConsumeRequestManager had an interface and the respective handlers would implement it. Instead of having 4 different handlers for this, now using AcknowledgeRequestType, we could merge the code and have only 2 handler functions, one for ShareAcknowledge success and one for ShareAcknowledge failure, eliminating the need for the interface.

This PR also fixes a bug - We were not using the time at which the response was received while handling the ShareAcknowledge response, we were using an outdated time. Now the bug is fixed.

Reviewers: Manikumar Reddy <[email protected]>, Andrew Schofield <[email protected]>, Chia-Ping Tsai <[email protected]>
  • Loading branch information
ShivsundarR authored Sep 3, 2024
1 parent 359ddce commit 743e185
Showing 1 changed file with 84 additions and 135 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,7 @@ private Optional<UnsentRequest> maybeBuildRequest(AcknowledgeRequestState acknow
return Optional.empty();
}

UnsentRequest request = acknowledgeRequestState.buildRequest(currentTimeMs);
UnsentRequest request = acknowledgeRequestState.buildRequest();
if (request == null) {
asyncDone = false;
return Optional.empty();
Expand Down Expand Up @@ -371,8 +371,6 @@ public CompletableFuture<Map<TopicIdPartition, Acknowledgements>> commitSync(
sessionHandler,
nodeId,
acknowledgementsMapForNode,
this::handleShareAcknowledgeSuccess,
this::handleShareAcknowledgeFailure,
resultHandler,
AcknowledgeRequestType.COMMIT_SYNC
));
Expand Down Expand Up @@ -417,8 +415,6 @@ public void commitAsync(final Map<TopicIdPartition, Acknowledgements> acknowledg
sessionHandler,
nodeId,
acknowledgementsMapForNode,
this::handleShareAcknowledgeSuccess,
this::handleShareAcknowledgeFailure,
resultHandler,
AcknowledgeRequestType.COMMIT_ASYNC
));
Expand Down Expand Up @@ -492,8 +488,6 @@ public CompletableFuture<Void> acknowledgeOnClose(final Map<TopicIdPartition, Ac
sessionHandler,
nodeId,
acknowledgementsMapForNode,
this::handleShareAcknowledgeCloseSuccess,
this::handleShareAcknowledgeCloseFailure,
resultHandler,
AcknowledgeRequestType.CLOSE
));
Expand Down Expand Up @@ -613,74 +607,100 @@ private void handleShareAcknowledgeSuccess(Node fetchTarget,
ShareAcknowledgeRequestData requestData,
AcknowledgeRequestState acknowledgeRequestState,
ClientResponse resp,
long currentTimeMs) {
long responseCompletionTimeMs) {
try {
log.debug("Completed ShareAcknowledge request from node {} successfully", fetchTarget.id());
final ShareAcknowledgeResponse response = (ShareAcknowledgeResponse) resp.responseBody();
final ShareSessionHandler handler = acknowledgeRequestState.sessionHandler();
ShareAcknowledgeResponse response = (ShareAcknowledgeResponse) resp.responseBody();

if (acknowledgeRequestState.onClose()) {
response.data().responses().forEach(topic -> topic.partitions().forEach(partition -> {
TopicIdPartition tip = new TopicIdPartition(topic.topicId(),
partition.partitionIndex(),
metadata.topicNames().get(topic.topicId()));
if (partition.errorCode() != Errors.NONE.code()) {
metricsManager.recordFailedAcknowledgements(acknowledgeRequestState.getInFlightAcknowledgementsCount(tip));
}
acknowledgeRequestState.handleAcknowledgeErrorCode(tip, Errors.forCode(partition.errorCode()));
}));

final short requestVersion = resp.requestHeader().apiVersion();
acknowledgeRequestState.onSuccessfulAttempt(responseCompletionTimeMs);
acknowledgeRequestState.processingComplete();

if (!handler.handleResponse(response, requestVersion)) {
acknowledgeRequestState.onFailedAttempt(currentTimeMs);
if (response.error().exception() instanceof RetriableException && !acknowledgeRequestState.onClose()) {
// We retry the request until the timer expires, unless we are closing.
acknowledgeRequestState.retryRequest();
if (!closeFuture.isDone()) {
closeFuture.complete(null);
}

metricsManager.recordLatency(resp.requestLatencyMs());
} else {
if (!acknowledgeRequestState.sessionHandler.handleResponse(response, resp.requestHeader().apiVersion())) {
// Received a response-level error code.
acknowledgeRequestState.onFailedAttempt(responseCompletionTimeMs);

if (response.error().exception() instanceof RetriableException) {
// We retry the request until the timer expires, unless we are closing.
acknowledgeRequestState.moveAllToIncompleteAcks();
} else {
response.data().responses().forEach(shareAcknowledgeTopicResponse -> shareAcknowledgeTopicResponse.partitions().forEach(partitionData -> {
TopicIdPartition tip = new TopicIdPartition(shareAcknowledgeTopicResponse.topicId(),
partitionData.partitionIndex(),
metadata.topicNames().get(shareAcknowledgeTopicResponse.topicId()));

acknowledgeRequestState.handleAcknowledgeErrorCode(tip, response.error());
metricsManager.recordLatency(resp.requestLatencyMs());
}));
acknowledgeRequestState.processingComplete();
}
} else {
AtomicBoolean shouldRetry = new AtomicBoolean(false);
// Check all partition level error codes
response.data().responses().forEach(shareAcknowledgeTopicResponse -> shareAcknowledgeTopicResponse.partitions().forEach(partitionData -> {
Errors partitionError = Errors.forCode(partitionData.errorCode());
TopicIdPartition tip = new TopicIdPartition(shareAcknowledgeTopicResponse.topicId(),
partitionData.partitionIndex(),
metadata.topicNames().get(shareAcknowledgeTopicResponse.topicId()));

acknowledgeRequestState.handleAcknowledgeErrorCode(tip, response.error());
metricsManager.recordLatency(resp.requestLatencyMs());
}));
}
} else {
AtomicBoolean shouldRetry = new AtomicBoolean(false);
// Check all partition level error codes
response.data().responses().forEach(shareAcknowledgeTopicResponse -> shareAcknowledgeTopicResponse.partitions().forEach(partitionData -> {
Errors partitionError = Errors.forCode(partitionData.errorCode());
TopicIdPartition tip = new TopicIdPartition(shareAcknowledgeTopicResponse.topicId(),
partitionData.partitionIndex(),
metadata.topicNames().get(shareAcknowledgeTopicResponse.topicId()));
if (partitionError.exception() != null) {
if (partitionError.exception() instanceof RetriableException && !acknowledgeRequestState.onClose()) {
// Move to incomplete acknowledgements to retry
acknowledgeRequestState.moveToIncompleteAcks(tip);
shouldRetry.set(true);
if (partitionError.exception() != null) {
if (partitionError.exception() instanceof RetriableException) {
// Move to incomplete acknowledgements to retry
acknowledgeRequestState.moveToIncompleteAcks(tip);
shouldRetry.set(true);
} else {
metricsManager.recordFailedAcknowledgements(acknowledgeRequestState.getInFlightAcknowledgementsCount(tip));
acknowledgeRequestState.handleAcknowledgeErrorCode(tip, partitionError);
}
} else {
metricsManager.recordFailedAcknowledgements(acknowledgeRequestState.getInFlightAcknowledgementsCount(tip));
acknowledgeRequestState.handleAcknowledgeErrorCode(tip, partitionError);
}
}));

if (shouldRetry.get()) {
acknowledgeRequestState.onFailedAttempt(responseCompletionTimeMs);
} else {
acknowledgeRequestState.handleAcknowledgeErrorCode(tip, partitionError);
acknowledgeRequestState.onSuccessfulAttempt(responseCompletionTimeMs);
}
}));

if (shouldRetry.get()) {
acknowledgeRequestState.onFailedAttempt(currentTimeMs);
} else {
acknowledgeRequestState.onSuccessfulAttempt(currentTimeMs);
acknowledgeRequestState.processingComplete();
metricsManager.recordLatency(resp.requestLatencyMs());
}
acknowledgeRequestState.processingComplete();
}
metricsManager.recordLatency(resp.requestLatencyMs());
} finally {
log.debug("Removing pending request for node {} - success", fetchTarget.id());
nodesWithPendingRequests.remove(fetchTarget.id());

if (acknowledgeRequestState.onClose()) {
log.debug("Removing node from ShareSession {}", fetchTarget.id());
sessionHandlers.remove(fetchTarget.id());
}
}
}

private void handleShareAcknowledgeFailure(Node fetchTarget,
ShareAcknowledgeRequestData requestData,
AcknowledgeRequestState acknowledgeRequestState,
Throwable error,
long currentTimeMs) {
long responseCompletionTimeMs) {
try {
log.debug("Completed ShareAcknowledge request from node {} unsuccessfully {}", fetchTarget.id(), Errors.forException(error));
acknowledgeRequestState.sessionHandler().handleError(error);
acknowledgeRequestState.onFailedAttempt(currentTimeMs);
acknowledgeRequestState.onFailedAttempt(responseCompletionTimeMs);

requestData.topics().forEach(topic -> topic.partitions().forEach(partition -> {
TopicIdPartition tip = new TopicIdPartition(topic.topicId(),
Expand All @@ -689,62 +709,16 @@ private void handleShareAcknowledgeFailure(Node fetchTarget,
metricsManager.recordFailedAcknowledgements(acknowledgeRequestState.getInFlightAcknowledgementsCount(tip));
acknowledgeRequestState.handleAcknowledgeErrorCode(tip, Errors.forException(error));
}));
} finally {
log.debug("Removing pending request for node {} - failed", fetchTarget.id());
nodesWithPendingRequests.remove(fetchTarget.id());
}
}

private void handleShareAcknowledgeCloseSuccess(Node fetchTarget,
ShareAcknowledgeRequestData requestData,
AcknowledgeRequestState acknowledgeRequestState,
ClientResponse resp,
long currentTimeMs) {
try {
log.debug("Completed ShareAcknowledge on close request from node {} successfully", fetchTarget.id());
final ShareAcknowledgeResponse response = (ShareAcknowledgeResponse) resp.responseBody();

response.data().responses().forEach(topic -> topic.partitions().forEach(partition -> {
TopicIdPartition tip = new TopicIdPartition(topic.topicId(),
partition.partitionIndex(),
metadata.topicNames().get(topic.topicId()));
if (partition.errorCode() != Errors.NONE.code()) {
metricsManager.recordFailedAcknowledgements(acknowledgeRequestState.getInFlightAcknowledgementsCount(tip));
}
acknowledgeRequestState.handleAcknowledgeErrorCode(tip, Errors.forCode(partition.errorCode()));
}));

acknowledgeRequestState.onSuccessfulAttempt(currentTimeMs);
metricsManager.recordLatency(resp.requestLatencyMs());
acknowledgeRequestState.processingComplete();
} finally {
log.debug("Removing pending request for node {} - success", fetchTarget.id());
nodesWithPendingRequests.remove(fetchTarget.id());
sessionHandlers.remove(fetchTarget.id());
}
}

private void handleShareAcknowledgeCloseFailure(Node fetchTarget,
ShareAcknowledgeRequestData requestData,
AcknowledgeRequestState acknowledgeRequestState,
Throwable error,
long currentTimeMs) {
try {
log.debug("Completed ShareAcknowledge on close request from node {} unsuccessfully {}", fetchTarget.id(), Errors.forException(error));
acknowledgeRequestState.sessionHandler().handleError(error);
acknowledgeRequestState.onFailedAttempt(currentTimeMs);

requestData.topics().forEach(topic -> topic.partitions().forEach(partition -> {
TopicIdPartition tip = new TopicIdPartition(topic.topicId(),
partition.partitionIndex(),
metadata.topicNames().get(topic.topicId()));
metricsManager.recordFailedAcknowledgements(acknowledgeRequestState.getInFlightAcknowledgementsCount(tip));
acknowledgeRequestState.handleAcknowledgeErrorCode(tip, Errors.forException(error));
}));
} finally {
log.debug("Removing pending request for node {} - failed", fetchTarget.id());
nodesWithPendingRequests.remove(fetchTarget.id());
sessionHandlers.remove(fetchTarget.id());

if (acknowledgeRequestState.onClose()) {
log.debug("Removing node from ShareSession {}", fetchTarget.id());
sessionHandlers.remove(fetchTarget.id());
}
}
}

Expand Down Expand Up @@ -803,16 +777,6 @@ public class AcknowledgeRequestState extends TimedRequestState {
*/
private final Map<TopicIdPartition, Acknowledgements> inFlightAcknowledgements;

/**
* The handler to call on a successful response from ShareAcknowledge.
*/
private final ResponseHandler<ClientResponse> successHandler;

/**
* The handler to call on a failed response from ShareAcknowledge.
*/
private final ResponseHandler<Throwable> errorHandler;

/**
* This handles completing a future when all results are known.
*/
Expand All @@ -831,23 +795,19 @@ public class AcknowledgeRequestState extends TimedRequestState {
ShareSessionHandler sessionHandler,
int nodeId,
Map<TopicIdPartition, Acknowledgements> acknowledgementsMap,
ResponseHandler<ClientResponse> successHandler,
ResponseHandler<Throwable> errorHandler,
ResultHandler resultHandler,
AcknowledgeRequestType acknowledgeRequestType) {
super(logContext, owner, retryBackoffMs, retryBackoffMaxMs, deadlineTimer(time, deadlineMs));
this.sessionHandler = sessionHandler;
this.nodeId = nodeId;
this.successHandler = successHandler;
this.errorHandler = errorHandler;
this.acknowledgementsToSend = acknowledgementsMap;
this.resultHandler = resultHandler;
this.inFlightAcknowledgements = new HashMap<>();
this.incompleteAcknowledgements = new HashMap<>();
this.requestType = acknowledgeRequestType;
}

UnsentRequest buildRequest(long currentTimeMs) {
UnsentRequest buildRequest() {
// If this is the closing request, close the share session by setting the final epoch
if (onClose()) {
sessionHandler.notifyClose();
Expand All @@ -868,13 +828,9 @@ UnsentRequest buildRequest(long currentTimeMs) {

BiConsumer<ClientResponse, Throwable> responseHandler = (clientResponse, error) -> {
if (error != null) {
errorHandler.handle(nodeToSend, requestBuilder.data(), this, error, currentTimeMs);
processingComplete();
handleShareAcknowledgeFailure(nodeToSend, requestBuilder.data(), this, error, clientResponse.receivedTimeMs());
} else {
successHandler.handle(nodeToSend, requestBuilder.data(), this, clientResponse, currentTimeMs);
if (onClose() && !closeFuture.isDone()) {
closeFuture.complete(null);
}
handleShareAcknowledgeSuccess(nodeToSend, requestBuilder.data(), this, clientResponse, clientResponse.receivedTimeMs());
}
};

Expand Down Expand Up @@ -973,7 +929,11 @@ void processingComplete() {
resultHandler.completeIfEmpty();
}

void retryRequest() {
/**
* Moves all the in-flight acknowledgements to incomplete acknowledgements to retry
* in the next request.
*/
void moveAllToIncompleteAcks() {
incompleteAcknowledgements.putAll(inFlightAcknowledgements);
inFlightAcknowledgements.clear();
}
Expand All @@ -982,13 +942,14 @@ boolean maybeExpire() {
return numAttempts > 0 && isExpired();
}

/**
* Moves the in-flight acknowledgements for a given partition to incomplete acknowledgements to retry
* in the next request.
*/
public void moveToIncompleteAcks(TopicIdPartition tip) {
Acknowledgements acks = inFlightAcknowledgements.remove(tip);
if (acks != null) {
Acknowledgements existingAcks = incompleteAcknowledgements.putIfAbsent(tip, acks);
if (existingAcks != null) {
incompleteAcknowledgements.get(tip).merge(acks);
}
incompleteAcknowledgements.put(tip, acks);
}
}

Expand All @@ -1001,18 +962,6 @@ public boolean onCommitAsync() {
}
}

/**
* Defines the contract for handling responses from brokers.
* @param <T> Type of response, usually either {@link ClientResponse} or {@link Throwable}
*/
@FunctionalInterface
private interface ResponseHandler<T> {
/**
* Handle the response from the given {@link Node target}
*/
void handle(Node target, ShareAcknowledgeRequestData request, AcknowledgeRequestState requestState, T response, long currentTimeMs);
}

/**
* Sends a ShareAcknowledgeCommitCallback event to the application when it is done
* processing all the remaining acknowledgement request states.
Expand Down

0 comments on commit 743e185

Please sign in to comment.