KAFKA-17442: Handled persister errors with write async calls (KIP-932) (#16956)

The PR makes the persister write RPC async. It also handles errors from the persister, as per the review comment here:
Addressing review comment for PR: #16397 (comment)
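For readers skimming the diff below: the core of the change is replacing a blocking persister call (a get() on the write RPC's future) with a CompletableFuture pipeline that propagates failures to the caller. A minimal sketch of that before/after shape, using hypothetical names rather than Kafka's actual classes:

import java.util.concurrent.CompletableFuture;

public class AsyncWriteSketch {

    // Before: block the calling thread on the RPC and collapse failures to a boolean.
    static boolean writeBlocking(CompletableFuture<String> rpc) {
        try {
            rpc.get(); // blocks until the persister responds
            return true;
        } catch (Exception e) {
            return false; // caller only learns "it failed"
        }
    }

    // After: return a future and complete it exceptionally with the real cause.
    static CompletableFuture<Void> writeAsync(CompletableFuture<String> rpc) {
        CompletableFuture<Void> result = new CompletableFuture<>();
        rpc.whenComplete((response, exception) -> {
            if (exception != null) {
                result.completeExceptionally(exception); // propagate, do not block
            } else {
                result.complete(null);
            }
        });
        return result;
    }
}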

Reviewers: Andrew Schofield <[email protected]>, Abhinav Dixit <[email protected]>, Jun Rao <[email protected]>
apoorvmittal10 authored Sep 1, 2024
1 parent b527691 commit 89418b6
Showing 4 changed files with 440 additions and 223 deletions.
197 changes: 128 additions & 69 deletions core/src/main/java/kafka/server/share/SharePartition.java
@@ -16,10 +16,15 @@
*/
package kafka.server.share;

import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.CoordinatorNotAvailableException;
import org.apache.kafka.common.errors.FencedStateEpochException;
import org.apache.kafka.common.errors.InvalidRecordStateException;
import org.apache.kafka.common.errors.InvalidRequestException;
import org.apache.kafka.common.errors.NotLeaderOrFollowerException;
import org.apache.kafka.common.errors.UnknownServerException;
import org.apache.kafka.common.message.ShareFetchResponseData.AcquiredRecords;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.RecordBatch;
@@ -36,7 +41,6 @@
import org.apache.kafka.server.group.share.ReadShareGroupStateResult;
import org.apache.kafka.server.group.share.TopicData;
import org.apache.kafka.server.group.share.WriteShareGroupStateParameters;
- import org.apache.kafka.server.group.share.WriteShareGroupStateResult;
import org.apache.kafka.server.share.ShareAcknowledgementBatch;
import org.apache.kafka.server.util.timer.Timer;
import org.apache.kafka.server.util.timer.TimerTask;
@@ -342,9 +346,8 @@ public long nextFetchOffset() {
* in-flight records and the next fetch offset is updated to the next offset that should be
* fetched from the leader.
*
* @param memberId The member id of the client that is fetching the record.
* @param fetchPartitionData The fetched records for the share partition.
*
* @return A future which is completed when the records are acquired.
*/
public CompletableFuture<List<AcquiredRecords>> acquire(
@@ -463,21 +466,21 @@
* from the in-flight records once persisted. The next fetch offset is updated to the next offset
* that should be fetched from the leader, if required.
*
* @param memberId The member id of the client that is fetching the record.
* @param acknowledgementBatches The acknowledgement batch list for the share partition.
*
* @return A future which is completed when the records are acknowledged.
*/
- public CompletableFuture<Optional<Throwable>> acknowledge(
public CompletableFuture<Void> acknowledge(
String memberId,
List<ShareAcknowledgementBatch> acknowledgementBatches
) {
log.trace("Acknowledgement batch request for share partition: {}-{}", groupId, topicIdPartition);

CompletableFuture<Void> future = new CompletableFuture<>();
Throwable throwable = null;
- lock.writeLock().lock();
List<InFlightState> updatedStates = new ArrayList<>();
List<PersisterStateBatch> stateBatches = new ArrayList<>();
lock.writeLock().lock();
try {
// Avoided using enhanced for loop as need to check if the last batch have offsets
// in the range.
@@ -528,30 +531,30 @@ public CompletableFuture<Optional<Throwable>> acknowledge(

// If the acknowledgement is successful then persist state, complete the state transition
// and update the cached state for start offset. Else rollback the state transition.
- rollbackOrProcessStateUpdates(throwable, updatedStates, stateBatches);
rollbackOrProcessStateUpdates(future, throwable, updatedStates, stateBatches);
} finally {
lock.writeLock().unlock();
}

- return CompletableFuture.completedFuture(Optional.ofNullable(throwable));
return future;
}

/**
* Release the acquired records for the share partition. The next fetch offset is updated to the next offset
* that should be fetched from the leader.
*
* @param memberId The member id of the client whose records shall be released.
*
* @return A future which is completed when the records are released.
*/
- public CompletableFuture<Optional<Throwable>> releaseAcquiredRecords(String memberId) {
public CompletableFuture<Void> releaseAcquiredRecords(String memberId) {
log.trace("Release acquired records request for share partition: {}-{} memberId: {}", groupId, topicIdPartition, memberId);

CompletableFuture<Void> future = new CompletableFuture<>();
Throwable throwable = null;
- lock.writeLock().lock();
List<InFlightState> updatedStates = new ArrayList<>();
List<PersisterStateBatch> stateBatches = new ArrayList<>();

lock.writeLock().lock();
try {
RecordState recordState = RecordState.AVAILABLE;
// Iterate over multiple fetched batches. The state can vary per offset entry
@@ -584,11 +587,11 @@ && checkForStartOffsetWithinBatch(inFlightBatch.firstOffset(), inFlightBatch.las

// If the release acquired records is successful then persist state, complete the state transition
// and update the cached state for start offset. Else rollback the state transition.
- rollbackOrProcessStateUpdates(throwable, updatedStates, stateBatches);
rollbackOrProcessStateUpdates(future, throwable, updatedStates, stateBatches);
} finally {
lock.writeLock().unlock();
}
- return CompletableFuture.completedFuture(Optional.ofNullable(throwable));
return future;
}
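With both acknowledge and releaseAcquiredRecords now returning CompletableFuture<Void>, callers read errors from exceptional completion instead of a normally completed Optional<Throwable>. A caller-side sketch of the new contract; this stand-in releaseAcquiredRecords is illustrative, not the real SharePartition method:

import java.util.concurrent.CompletableFuture;

public class ReleaseCallerSketch {
    // Stand-in for SharePartition.releaseAcquiredRecords(memberId).
    static CompletableFuture<Void> releaseAcquiredRecords(String memberId) {
        return CompletableFuture.completedFuture(null);
    }

    public static void main(String[] args) {
        releaseAcquiredRecords("member-1").whenComplete((unused, throwable) -> {
            if (throwable != null) {
                // Previously the future completed normally with Optional.of(throwable).
                System.err.println("Release failed: " + throwable.getMessage());
            } else {
                System.out.println("Records released");
            }
        });
    }
}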

private Optional<Throwable> releaseAcquiredRecordsForPerOffsetBatch(String memberId,
@@ -880,14 +883,27 @@ private void initialize() {
}

TopicData<PartitionAllData> state = response.topicsData().get(0);
- if (state.topicId() != topicIdPartition.topicId() || state.partitions().size() != 1
-     || state.partitions().get(0).partition() != topicIdPartition.partition()) {
if (state.topicId() != topicIdPartition.topicId() || state.partitions().size() != 1) {
log.error("Failed to initialize the share partition: {}-{}. Invalid topic partition response: {}.",
groupId, topicIdPartition, response);
throw new IllegalStateException(String.format("Failed to initialize the share partition %s-%s", groupId, topicIdPartition));
}

PartitionAllData partitionData = state.partitions().get(0);
if (partitionData.partition() != topicIdPartition.partition()) {
log.error("Failed to initialize the share partition: {}-{}. Invalid partition response: {}.",
groupId, topicIdPartition, partitionData);
throw new IllegalStateException(String.format("Failed to initialize the share partition %s-%s",
groupId, topicIdPartition));
}

if (partitionData.errorCode() != Errors.NONE.code()) {
KafkaException ex = fetchPersisterError(partitionData.errorCode(), partitionData.errorMessage());
log.error("Failed to initialize the share partition: {}-{}. Exception occurred: {}.",
groupId, topicIdPartition, partitionData);
throw ex;
}

// Set the state epoch and end offset from the persisted state.
startOffset = partitionData.startOffset() != -1 ? partitionData.startOffset() : 0;
stateEpoch = partitionData.stateEpoch();
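The initialization path now validates the persister's read response in three ordered steps before applying any state: response shape, partition identity, then the per-partition error code. A simplified sketch of that ordering; ReadResponse and Partition here are stand-in types, not Kafka classes:

import java.util.List;
import java.util.UUID;

public class ReadStateValidationSketch {
    record Partition(int partition, short errorCode, long startOffset) { }
    record ReadResponse(UUID topicId, List<Partition> partitions) { }

    static Partition validate(ReadResponse response, UUID expectedTopicId, int expectedPartition) {
        // 1. Shape: exactly one partition entry for the expected topic.
        if (!response.topicId().equals(expectedTopicId) || response.partitions().size() != 1) {
            throw new IllegalStateException("Invalid topic partition response");
        }
        Partition data = response.partitions().get(0);
        // 2. Identity: the single entry must be the expected partition.
        if (data.partition() != expectedPartition) {
            throw new IllegalStateException("Invalid partition response");
        }
        // 3. Error code: fail before any cached state is touched.
        if (data.errorCode() != 0) {
            throw new RuntimeException("Persister error code: " + data.errorCode());
        }
        return data;
    }
}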
@@ -1338,21 +1354,36 @@ private Optional<Throwable> acknowledgeCompleteBatch(

// Visible for testing
void rollbackOrProcessStateUpdates(
CompletableFuture<Void> future,
Throwable throwable,
List<InFlightState> updatedStates,
List<PersisterStateBatch> stateBatches
) {
- if (stateBatches.isEmpty() && updatedStates.isEmpty())
-     return;

lock.writeLock().lock();
try {
- if (throwable != null || !isWriteShareGroupStateSuccessful(stateBatches)) {
if (throwable != null) {
// Log in DEBUG to avoid flooding of logs for a faulty client.
log.debug("Request failed for updating state, rollback any changed state"
+ " for the share partition: {}-{}", groupId, topicIdPartition);
updatedStates.forEach(state -> state.completeStateTransition(false));
- } else {
future.completeExceptionally(throwable);
return;
}

if (stateBatches.isEmpty() && updatedStates.isEmpty()) {
future.complete(null);
return;
}

writeShareGroupState(stateBatches).whenComplete((result, exception) -> {
if (exception != null) {
log.error("Failed to write state to persister for the share partition: {}-{}",
groupId, topicIdPartition, exception);
updatedStates.forEach(state -> state.completeStateTransition(false));
future.completeExceptionally(exception);
return;
}

log.trace("State change request successful for share partition: {}-{}",
groupId, topicIdPartition);
updatedStates.forEach(state -> {
Expand All @@ -1362,7 +1393,8 @@ void rollbackOrProcessStateUpdates(
});
// Update the cached state and start and end offsets after acknowledging/releasing the acquired records.
maybeUpdateCachedStateAndOffsets();
- }
future.complete(null);
});
} finally {
lock.writeLock().unlock();
}
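The reworked rollbackOrProcessStateUpdates follows a rollback-or-persist shape: on a validation error it rolls back the tentative transitions and fails the caller's future immediately; otherwise it writes asynchronously and only commits once the persister confirms. A condensed sketch of that flow, with InFlightState reduced to one method and the write modeled as a plain future:

import java.util.List;
import java.util.concurrent.CompletableFuture;

public class RollbackOrPersistSketch {
    interface InFlightState {
        void completeStateTransition(boolean commit);
    }

    static void rollbackOrProcess(CompletableFuture<Void> future,
                                  Throwable throwable,
                                  List<InFlightState> updatedStates,
                                  CompletableFuture<Void> persistCall) {
        if (throwable != null) {
            // Validation failed: undo tentative transitions, fail the caller's future.
            updatedStates.forEach(s -> s.completeStateTransition(false));
            future.completeExceptionally(throwable);
            return;
        }
        persistCall.whenComplete((result, exception) -> {
            if (exception != null) {
                // Persister write failed: roll back and propagate the cause.
                updatedStates.forEach(s -> s.completeStateTransition(false));
                future.completeExceptionally(exception);
            } else {
                // Persisted: commit the transitions and complete normally.
                updatedStates.forEach(s -> s.completeStateTransition(true));
                future.complete(null);
            }
        });
    }
}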
@@ -1494,46 +1526,71 @@ private long findLastOffsetAcknowledged() {
}

// Visible for testing
- boolean isWriteShareGroupStateSuccessful(List<PersisterStateBatch> stateBatches) {
-     WriteShareGroupStateResult response;
-     try {
-         response = persister.writeState(new WriteShareGroupStateParameters.Builder()
-             .setGroupTopicPartitionData(new GroupTopicPartitionData.Builder<PartitionStateBatchData>()
-                 .setGroupId(this.groupId)
-                 .setTopicsData(Collections.singletonList(new TopicData<>(topicIdPartition.topicId(),
-                     Collections.singletonList(PartitionFactory.newPartitionStateBatchData(
-                         topicIdPartition.partition(), stateEpoch, startOffset, 0, stateBatches))))
-             ).build()).build()).get();
-     } catch (InterruptedException | ExecutionException e) {
-         log.error("Failed to write the share group state for share partition: {}-{}", groupId, topicIdPartition, e);
-         throw new IllegalStateException(String.format("Failed to write the share group state for share partition %s-%s",
-             groupId, topicIdPartition), e);
-     }
CompletableFuture<Void> writeShareGroupState(List<PersisterStateBatch> stateBatches) {
CompletableFuture<Void> future = new CompletableFuture<>();
persister.writeState(new WriteShareGroupStateParameters.Builder()
.setGroupTopicPartitionData(new GroupTopicPartitionData.Builder<PartitionStateBatchData>()
.setGroupId(this.groupId)
.setTopicsData(Collections.singletonList(new TopicData<>(topicIdPartition.topicId(),
Collections.singletonList(PartitionFactory.newPartitionStateBatchData(
topicIdPartition.partition(), stateEpoch, startOffset, 0, stateBatches))))
).build()).build())
.whenComplete((result, exception) -> {
if (exception != null) {
log.error("Failed to write the share group state for share partition: {}-{}", groupId, topicIdPartition, exception);
future.completeExceptionally(new IllegalStateException(String.format("Failed to write the share group state for share partition %s-%s",
groupId, topicIdPartition), exception));
return;
}

- if (response == null || response.topicsData() == null || response.topicsData().size() != 1) {
-     log.error("Failed to write the share group state for share partition: {}-{}. Invalid state found: {}",
-         groupId, topicIdPartition, response);
-     throw new IllegalStateException(String.format("Failed to write the share group state for share partition %s-%s",
-         groupId, topicIdPartition));
- }
if (result == null || result.topicsData() == null || result.topicsData().size() != 1) {
log.error("Failed to write the share group state for share partition: {}-{}. Invalid state found: {}",
groupId, topicIdPartition, result);
future.completeExceptionally(new IllegalStateException(String.format("Failed to write the share group state for share partition %s-%s",
groupId, topicIdPartition)));
return;
}

- TopicData<PartitionErrorData> state = response.topicsData().get(0);
- if (state.topicId() != topicIdPartition.topicId() || state.partitions().size() != 1
-     || state.partitions().get(0).partition() != topicIdPartition.partition()) {
-     log.error("Failed to write the share group state for share partition: {}-{}. Invalid topic partition response: {}",
-         groupId, topicIdPartition, response);
-     throw new IllegalStateException(String.format("Failed to write the share group state for share partition %s-%s",
-         groupId, topicIdPartition));
- }
TopicData<PartitionErrorData> state = result.topicsData().get(0);
if (state.topicId() != topicIdPartition.topicId() || state.partitions().size() != 1
|| state.partitions().get(0).partition() != topicIdPartition.partition()) {
log.error("Failed to write the share group state for share partition: {}-{}. Invalid topic partition response: {}",
groupId, topicIdPartition, result);
future.completeExceptionally(new IllegalStateException(String.format("Failed to write the share group state for share partition %s-%s",
groupId, topicIdPartition)));
return;
}

- PartitionErrorData partitionData = state.partitions().get(0);
- if (partitionData.errorCode() != Errors.NONE.code()) {
-     Exception exception = Errors.forCode(partitionData.errorCode()).exception(partitionData.errorMessage());
-     log.error("Failed to write the share group state for share partition: {}-{} due to exception",
-         groupId, topicIdPartition, exception);
-     return false;
PartitionErrorData partitionData = state.partitions().get(0);
if (partitionData.errorCode() != Errors.NONE.code()) {
KafkaException ex = fetchPersisterError(partitionData.errorCode(), partitionData.errorMessage());
log.error("Failed to write the share group state for share partition: {}-{} due to exception",
groupId, topicIdPartition, ex);
future.completeExceptionally(ex);
return;
}
future.complete(null);
});
return future;
}

private KafkaException fetchPersisterError(short errorCode, String errorMessage) {
Errors error = Errors.forCode(errorCode);
switch (error) {
case NOT_COORDINATOR:
case COORDINATOR_NOT_AVAILABLE:
case COORDINATOR_LOAD_IN_PROGRESS:
return new CoordinatorNotAvailableException(errorMessage);
case GROUP_ID_NOT_FOUND:
case UNKNOWN_TOPIC_OR_PARTITION:
return new InvalidRequestException(errorMessage);
case FENCED_STATE_EPOCH:
return new FencedStateEpochException(errorMessage);
case FENCED_LEADER_EPOCH:
return new NotLeaderOrFollowerException(errorMessage);
default:
return new UnknownServerException(errorMessage);
}
- return true;
}
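fetchPersisterError collapses persister error codes into a few exception classes so callers can tell retriable coordinator conditions apart from fenced epochs and bad requests. A rough analogue of the mapping shape using plain JDK exceptions; the real method returns the Kafka exception types shown above:

public class ErrorMappingSketch {
    enum Code { NOT_COORDINATOR, GROUP_ID_NOT_FOUND, FENCED_STATE_EPOCH, UNKNOWN }

    static RuntimeException toException(Code code, String message) {
        switch (code) {
            case NOT_COORDINATOR:
                // Retriable: the coordinator can move or finish loading.
                return new IllegalStateException("coordinator not available: " + message);
            case GROUP_ID_NOT_FOUND:
                // Caller error: the request referenced an unknown group or topic.
                return new IllegalArgumentException("invalid request: " + message);
            case FENCED_STATE_EPOCH:
                // Fencing: this leader's view of the state epoch is stale.
                return new IllegalStateException("fenced state epoch: " + message);
            default:
                return new RuntimeException("unknown server error: " + message);
        }
    }
}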

private AcquisitionLockTimerTask scheduleAcquisitionLockTimeout(String memberId, long firstOffset, long lastOffset) {
@@ -1554,9 +1611,9 @@ private AcquisitionLockTimerTask scheduleAcquisitionLockTimeout(
long lastOffset,
long delayMs
) {
- AcquisitionLockTimerTask acquistionLockTimerTask = acquisitionLockTimerTask(memberId, firstOffset, lastOffset, delayMs);
- timer.add(acquistionLockTimerTask);
- return acquistionLockTimerTask;
AcquisitionLockTimerTask acquisitionLockTimerTask = acquisitionLockTimerTask(memberId, firstOffset, lastOffset, delayMs);
timer.add(acquisitionLockTimerTask);
return acquisitionLockTimerTask;
}

private AcquisitionLockTimerTask acquisitionLockTimerTask(
@@ -1598,15 +1655,17 @@ && checkForStartOffsetWithinBatch(inFlightBatch.firstOffset(), inFlightBatch.las
}
}

- if (!stateBatches.isEmpty() && !isWriteShareGroupStateSuccessful(stateBatches)) {
-     // Even if write share group state RPC call fails, we will still go ahead with the state transition.
-     log.error("Failed to write the share group state on acquisition lock timeout for share partition: {}-{} memberId: {}. " +
-         "Proceeding with state transition.", groupId, topicIdPartition, memberId);
if (!stateBatches.isEmpty()) {
writeShareGroupState(stateBatches).whenComplete((result, exception) -> {
if (exception != null) {
log.error("Failed to write the share group state on acquisition lock timeout for share partition: {}-{} memberId: {}",
groupId, topicIdPartition, memberId, exception);
}
// Even if write share group state RPC call fails, we will still go ahead with the state transition.
// Update the cached state and start and end offsets after releasing the acquisition lock on timeout.
maybeUpdateCachedStateAndOffsets();
});
}

- // Update the cached state and start and end offsets after releasing the acquisition lock on timeout.
- maybeUpdateCachedStateAndOffsets();
} finally {
lock.writeLock().unlock();
}
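On acquisition lock timeout the persister write is fire-and-forget: a failure is logged, but the cached-state update still runs inside the completion callback so the records are released either way. A minimal sketch of that pattern with stand-in names:

import java.util.concurrent.CompletableFuture;

public class TimeoutWriteSketch {
    static void onAcquisitionLockTimeout(CompletableFuture<Void> writeCall, Runnable updateCachedState) {
        writeCall.whenComplete((result, exception) -> {
            if (exception != null) {
                // Log and continue: a failed write must not block releasing the records.
                System.err.println("State write failed on timeout: " + exception);
            }
            // Runs after the write settles, success or failure.
            updateCachedState.run();
        });
    }
}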