KAFKA-15859: Complete delayed RemoteListOffsets requests when replica moves away from broker (#17487)

Removed the ListOffsetsMetadata wrapper class. Complete the response when the replica is not a leader.
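In outline, a sketch of the completion chain this commit introduces, assembled from the hunks below (not a block lifted verbatim from the commit): when a replica becomes a follower or the partition is deleted, ReplicaManager now also pokes the remote list-offsets purgatory, and DelayedRemoteListOffsets.tryComplete fails the pending remote read for any partition this broker no longer leads:

// Sketch; all names appear in the diff below.
// 1. ReplicaManager, on become-follower / stop-replica:
delayedRemoteListOffsetsPurgatory.checkAndComplete(TopicPartitionOperationKey(topicPartition))

// 2. DelayedRemoteListOffsets.tryComplete, for each still-pending partition:
try {
  replicaManager.getPartitionOrException(partition)   // throws an ApiException if this broker is not the leader
} catch {
  case e: ApiException =>
    status.futureHolderOpt.foreach { futureHolder =>
      futureHolder.jobFuture.cancel(false)            // stop the remote offset read
      futureHolder.taskFuture.complete(Left(e))       // lets tryComplete build an error response for this partition
    }
}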

Reviewers: Jun Rao <[email protected]>
kamalcph authored Oct 16, 2024
1 parent 491395e commit 604564c
Showing 6 changed files with 149 additions and 47 deletions.
2 changes: 1 addition & 1 deletion core/src/main/java/kafka/log/remote/RemoteLogManager.java
@@ -206,7 +206,7 @@ public class RemoteLogManager implements Closeable {

private volatile boolean remoteLogManagerConfigured = false;
private final Timer remoteReadTimer;
- private DelayedOperationPurgatory<DelayedRemoteListOffsets> delayedRemoteListOffsetsPurgatory;
+ private volatile DelayedOperationPurgatory<DelayedRemoteListOffsets> delayedRemoteListOffsetsPurgatory;

/**
* Creates RemoteLogManager instance with the given arguments.
10 changes: 8 additions & 2 deletions core/src/main/scala/kafka/log/UnifiedLog.scala
@@ -1261,8 +1261,14 @@ class UnifiedLog(@volatile var logStartOffset: Long,
*
* @param targetTimestamp The given timestamp for offset fetching.
* @param remoteLogManager Optional RemoteLogManager instance if it exists.
- * @return The offset of the first message whose timestamp is greater than or equals to the given timestamp.
- *         None if no such message is found.
+ * @return the offset-result holder
+ * <ul>
+ *   <li>When the partition is not enabled with remote storage, then it contains offset of the first message
+ *       whose timestamp is greater than or equals to the given timestamp; None if no such message is found.
+ *   <li>When the partition is enabled with remote storage, then it contains the job/task future and gets
+ *       completed in the async fashion.
+ *   <li>All special timestamp offset results are returned immediately irrespective of the remote storage.
+ * </ul>
*/
@nowarn("cat=deprecation")
def fetchOffsetByTimestamp(targetTimestamp: Long, remoteLogManager: Option[RemoteLogManager] = None): OffsetResultHolder = {
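For context, a caller-side sketch of the two cases described in the new doc comment. The OffsetResultHolder field names (timestampAndOffsetOpt, futureHolderOpt) are assumptions inferred from this PR's surrounding types; they are not shown in this diff:

// Hypothetical caller; field names are assumptions.
val result = log.fetchOffsetByTimestamp(targetTimestamp, Some(remoteLogManager))
result.futureHolderOpt match {
  case Some(holder) =>
    // Remote-storage-enabled partition: the offset arrives asynchronously via the task future.
    holder.taskFuture.whenComplete((res, _) => println(s"async offset result: $res"))
  case None =>
    // Local partition or a special timestamp: the result is available immediately.
    println(s"immediate offset result: ${result.timestampAndOffsetOpt}")
}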
43 changes: 16 additions & 27 deletions core/src/main/scala/kafka/server/DelayedRemoteListOffsets.scala
@@ -17,47 +17,26 @@
package kafka.server

import com.yammer.metrics.core.Meter
- import kafka.log.AsyncOffsetReadFutureHolder
import kafka.utils.Pool
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.errors.ApiException
import org.apache.kafka.common.message.ListOffsetsResponseData.{ListOffsetsPartitionResponse, ListOffsetsTopicResponse}
import org.apache.kafka.common.protocol.Errors
- import org.apache.kafka.common.record.FileRecords.TimestampAndOffset
import org.apache.kafka.common.requests.ListOffsetsResponse
import org.apache.kafka.server.metrics.KafkaMetricsGroup

import java.util.concurrent.TimeUnit
import scala.collection.{Map, mutable}
import scala.jdk.CollectionConverters._

- case class ListOffsetsPartitionStatus(var responseOpt: Option[ListOffsetsPartitionResponse] = None,
- futureHolderOpt: Option[AsyncOffsetReadFutureHolder[Either[Exception, Option[TimestampAndOffset]]]] = None,
- lastFetchableOffset: Option[Long] = None,
- maybeOffsetsError: Option[ApiException] = None) {
- @volatile var completed = false
-
- override def toString: String = {
- s"[responseOpt: $responseOpt, lastFetchableOffset: $lastFetchableOffset, " +
- s"maybeOffsetsError: $maybeOffsetsError, completed: $completed]"
- }
- }
-
- case class ListOffsetsMetadata(statusByPartition: mutable.Map[TopicPartition, ListOffsetsPartitionStatus]) {
-
- override def toString: String = {
- s"ListOffsetsMetadata(statusByPartition=$statusByPartition)"
- }
- }

class DelayedRemoteListOffsets(delayMs: Long,
version: Int,
- metadata: ListOffsetsMetadata,
+ statusByPartition: mutable.Map[TopicPartition, ListOffsetsPartitionStatus],
+ replicaManager: ReplicaManager,
responseCallback: List[ListOffsetsTopicResponse] => Unit) extends DelayedOperation(delayMs) {

// Mark the status as completed, if there is no async task to track.
// If there is a task to track, then build the response as REQUEST_TIMED_OUT by default.
- metadata.statusByPartition.foreachEntry { (topicPartition, status) =>
+ statusByPartition.foreachEntry { (topicPartition, status) =>
status.completed = status.futureHolderOpt.isEmpty
if (status.futureHolderOpt.isDefined) {
status.responseOpt = Some(buildErrorResponse(Errors.REQUEST_TIMED_OUT, topicPartition.partition()))
@@ -69,7 +48,7 @@ class DelayedRemoteListOffsets(delayMs: Long,
* Call-back to execute when a delayed operation gets expired and hence forced to complete.
*/
override def onExpiration(): Unit = {
- metadata.statusByPartition.foreachEntry { (topicPartition, status) =>
+ statusByPartition.foreachEntry { (topicPartition, status) =>
if (!status.completed) {
debug(s"Expiring list offset request for partition $topicPartition with status $status")
status.futureHolderOpt.foreach(futureHolder => futureHolder.jobFuture.cancel(true))
@@ -83,7 +62,7 @@ class DelayedRemoteListOffsets(delayMs: Long,
* in subclasses and will be called exactly once in forceComplete()
*/
override def onComplete(): Unit = {
- val responseTopics = metadata.statusByPartition.groupBy(e => e._1.topic()).map {
+ val responseTopics = statusByPartition.groupBy(e => e._1.topic()).map {
case (topic, status) =>
new ListOffsetsTopicResponse().setName(topic).setPartitions(status.values.flatMap(s => s.responseOpt).toList.asJava)
}.toList
@@ -99,8 +78,18 @@ class DelayedRemoteListOffsets(delayMs: Long,
*/
override def tryComplete(): Boolean = {
var completable = true
- metadata.statusByPartition.foreachEntry { (partition, status) =>
+ statusByPartition.foreachEntry { (partition, status) =>
if (!status.completed) {
+ try {
+ replicaManager.getPartitionOrException(partition)
+ } catch {
+ case e: ApiException =>
+ status.futureHolderOpt.foreach { futureHolder =>
+ futureHolder.jobFuture.cancel(false)
+ futureHolder.taskFuture.complete(Left(e))
+ }
+ }
+
status.futureHolderOpt.foreach { futureHolder =>
if (futureHolder.taskFuture.isDone) {
val response = futureHolder.taskFuture.get() match {
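The tail of tryComplete is truncated by the diff viewer just after the `taskFuture.get() match {` shown above. Pieced together from what is visible (buildErrorResponse earlier in this file, the assertions in the new test below), this is a sketch only, not the elided code, of how the Left(e) planted by the new catch block would surface:

// Sketch; the real continuation is elided in this diff.
val response = futureHolder.taskFuture.get() match {
  case Left(e) =>
    // NotLeaderOrFollowerException maps to NOT_LEADER_OR_FOLLOWER with unknown timestamp/offset,
    // which is what the new test asserts for the affected partition.
    buildErrorResponse(Errors.forException(e), partition.partition())
  case Right(result) =>
    ??? // success path, elided here
}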
46 changes: 46 additions & 0 deletions core/src/main/scala/kafka/server/ListOffsetsPartitionStatus.scala
@@ -0,0 +1,46 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.server

import kafka.log.AsyncOffsetReadFutureHolder
import org.apache.kafka.common.errors.ApiException
import org.apache.kafka.common.message.ListOffsetsResponseData.ListOffsetsPartitionResponse
import org.apache.kafka.common.record.FileRecords.TimestampAndOffset

class ListOffsetsPartitionStatus(val futureHolderOpt: Option[AsyncOffsetReadFutureHolder[Either[Exception, Option[TimestampAndOffset]]]],
val lastFetchableOffset: Option[Long],
val maybeOffsetsError: Option[ApiException]) {

@volatile var responseOpt: Option[ListOffsetsPartitionResponse] = None
@volatile var completed = false

override def toString: String = {
s"[responseOpt: $responseOpt, lastFetchableOffset: $lastFetchableOffset, " +
s"maybeOffsetsError: $maybeOffsetsError, completed: $completed]"
}
}

object ListOffsetsPartitionStatus {
def apply(responseOpt: Option[ListOffsetsPartitionResponse],
futureHolderOpt: Option[AsyncOffsetReadFutureHolder[Either[Exception, Option[TimestampAndOffset]]]] = None,
lastFetchableOffset: Option[Long] = None,
maybeOffsetsError: Option[ApiException] = None): ListOffsetsPartitionStatus = {
val status = new ListOffsetsPartitionStatus(futureHolderOpt, lastFetchableOffset, maybeOffsetsError)
status.responseOpt = responseOpt
status
}
}
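A small usage sketch of the moved class (values are illustrative; `holder` stands for an AsyncOffsetReadFutureHolder produced by the log layer): the companion apply preserves the old call shape, while responseOpt is now a @volatile var filled in after construction:

// Illustrative only: a pending remote lookup for one partition.
val status = ListOffsetsPartitionStatus(
  responseOpt = None,               // set later by tryComplete/onComplete
  futureHolderOpt = Some(holder),   // async remote read tracked by the delayed operation
  lastFetchableOffset = Some(500L),
  maybeOffsetsError = None)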
11 changes: 6 additions & 5 deletions core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -463,11 +463,12 @@ class ReplicaManager(val config: KafkaConfig,
})
}

- private def completeDelayedFetchOrProduceRequests(topicPartition: TopicPartition): Unit = {
+ private def completeDelayedOperationsWhenNotPartitionLeader(topicPartition: TopicPartition): Unit = {
val topicPartitionOperationKey = TopicPartitionOperationKey(topicPartition)
delayedProducePurgatory.checkAndComplete(topicPartitionOperationKey)
delayedFetchPurgatory.checkAndComplete(topicPartitionOperationKey)
delayedRemoteFetchPurgatory.checkAndComplete(topicPartitionOperationKey)
+ delayedRemoteListOffsetsPurgatory.checkAndComplete(topicPartitionOperationKey)
}

/**
@@ -625,7 +626,7 @@ class ReplicaManager(val config: KafkaConfig,
}
// If we were the leader, we may have some operations still waiting for completion.
// We force completion to prevent them from timing out.
- completeDelayedFetchOrProduceRequests(topicPartition)
+ completeDelayedOperationsWhenNotPartitionLeader(topicPartition)
}

// Third delete the logs and checkpoint.
@@ -1556,7 +1557,7 @@ class ReplicaManager(val config: KafkaConfig,
if (delayedRemoteListOffsetsRequired(statusByPartition)) {
val delayMs: Long = if (timeoutMs > 0) timeoutMs else config.remoteLogManagerConfig.remoteListOffsetsRequestTimeoutMs()
// create delayed remote list offsets operation
- val delayedRemoteListOffsets = new DelayedRemoteListOffsets(delayMs, version, ListOffsetsMetadata(statusByPartition), responseCallback)
+ val delayedRemoteListOffsets = new DelayedRemoteListOffsets(delayMs, version, statusByPartition, this, responseCallback)
// create a list of (topic, partition) pairs to use as keys for this delayed remote list offsets operation
val listOffsetsRequestKeys = statusByPartition.keys.map(TopicPartitionOperationKey(_)).toSeq
// try to complete the request immediately, otherwise put it into the purgatory
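The hunk is cut off after this comment; the elided line is presumably the usual purgatory handoff. An assumption, but one consistent with the keys built above and with DelayedRemoteListOffsetsTest below:

// Assumed continuation, not visible in this diff:
delayedRemoteListOffsetsPurgatory.tryCompleteElseWatch(delayedRemoteListOffsets, listOffsetsRequestKeys)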
@@ -2451,7 +2452,7 @@ class ReplicaManager(val config: KafkaConfig,
s"epoch $controllerEpoch with correlation id $correlationId for ${partitionsToMakeFollower.size} partitions")

partitionsToMakeFollower.foreach { partition =>
- completeDelayedFetchOrProduceRequests(partition.topicPartition)
+ completeDelayedOperationsWhenNotPartitionLeader(partition.topicPartition)
}

if (isShuttingDown.get()) {
@@ -3028,7 +3029,7 @@ class ReplicaManager(val config: KafkaConfig,
replicaFetcherManager.addFetcherForPartitions(partitionAndOffsets)
stateChangeLogger.info(s"Started fetchers as part of become-follower for ${partitionsToStartFetching.size} partitions")

- partitionsToStartFetching.keySet.foreach(completeDelayedFetchOrProduceRequests)
+ partitionsToStartFetching.keySet.foreach(completeDelayedOperationsWhenNotPartitionLeader)

updateLeaderAndFollowerMetrics(followerTopicSet)
}
84 changes: 72 additions & 12 deletions core/src/test/scala/unit/kafka/server/DelayedRemoteListOffsetsTest.scala
@@ -18,6 +18,7 @@ package kafka.server

import kafka.log.AsyncOffsetReadFutureHolder
import org.apache.kafka.common.TopicPartition
+ import org.apache.kafka.common.errors.NotLeaderOrFollowerException
import org.apache.kafka.common.message.ListOffsetsResponseData.ListOffsetsTopicResponse
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.record.FileRecords.TimestampAndOffset
@@ -37,6 +38,7 @@ class DelayedRemoteListOffsetsTest {

val delayMs = 10
val timer = new MockTimer()
+ val replicaManager: ReplicaManager = mock(classOf[ReplicaManager])
type T = Either[Exception, Option[TimestampAndOffset]]
val purgatory =
new DelayedOperationPurgatory[DelayedRemoteListOffsets]("test-purgatory", timer, purgeInterval = 10)
@@ -71,14 +73,14 @@
true
})

- val metadata = ListOffsetsMetadata(mutable.Map(
+ val statusByPartition = mutable.Map(
new TopicPartition("test", 0) -> ListOffsetsPartitionStatus(None, Some(holder)),
new TopicPartition("test", 1) -> ListOffsetsPartitionStatus(None, Some(holder)),
new TopicPartition("test1", 0) -> ListOffsetsPartitionStatus(None, Some(holder))
- ))
+ )

- val delayedRemoteListOffsets = new DelayedRemoteListOffsets(delayMs, version = 5, metadata, responseCallback)
- val listOffsetsRequestKeys = metadata.statusByPartition.keys.map(TopicPartitionOperationKey(_)).toSeq
+ val delayedRemoteListOffsets = new DelayedRemoteListOffsets(delayMs, version = 5, statusByPartition, replicaManager, responseCallback)
+ val listOffsetsRequestKeys = statusByPartition.keys.map(TopicPartitionOperationKey(_)).toSeq
assertEquals(0, DelayedRemoteListOffsetsMetrics.aggregateExpirationMeter.count())
assertEquals(0, DelayedRemoteListOffsetsMetrics.partitionExpirationMeters.size)
purgatory.tryCompleteElseWatch(delayedRemoteListOffsets, listOffsetsRequestKeys)
@@ -123,14 +125,14 @@
true
})

- val metadata = ListOffsetsMetadata(mutable.Map(
+ val statusByPartition = mutable.Map(
new TopicPartition("test", 0) -> ListOffsetsPartitionStatus(None, Some(holder)),
new TopicPartition("test", 1) -> ListOffsetsPartitionStatus(None, Some(holder)),
new TopicPartition("test1", 0) -> ListOffsetsPartitionStatus(None, Some(holder))
- ))
+ )

- val delayedRemoteListOffsets = new DelayedRemoteListOffsets(delayMs, version = 5, metadata, responseCallback)
- val listOffsetsRequestKeys = metadata.statusByPartition.keys.map(TopicPartitionOperationKey(_)).toSeq
+ val delayedRemoteListOffsets = new DelayedRemoteListOffsets(delayMs, version = 5, statusByPartition, replicaManager, responseCallback)
+ val listOffsetsRequestKeys = statusByPartition.keys.map(TopicPartitionOperationKey(_)).toSeq
purgatory.tryCompleteElseWatch(delayedRemoteListOffsets, listOffsetsRequestKeys)

assertEquals(0, cancelledCount)
@@ -179,17 +181,75 @@
when(errorFutureHolder.taskFuture).thenAnswer(_ => errorTaskFuture)
when(errorFutureHolder.jobFuture).thenReturn(jobFuture)

- val metadata = ListOffsetsMetadata(mutable.Map(
+ val statusByPartition = mutable.Map(
new TopicPartition("test", 0) -> ListOffsetsPartitionStatus(None, Some(holder)),
new TopicPartition("test", 1) -> ListOffsetsPartitionStatus(None, Some(holder)),
new TopicPartition("test1", 0) -> ListOffsetsPartitionStatus(None, Some(errorFutureHolder))
))
)

- val delayedRemoteListOffsets = new DelayedRemoteListOffsets(delayMs, version = 5, metadata, responseCallback)
- val listOffsetsRequestKeys = metadata.statusByPartition.keys.map(TopicPartitionOperationKey(_)).toSeq
+ val delayedRemoteListOffsets = new DelayedRemoteListOffsets(delayMs, version = 5, statusByPartition, replicaManager, responseCallback)
+ val listOffsetsRequestKeys = statusByPartition.keys.map(TopicPartitionOperationKey(_)).toSeq
purgatory.tryCompleteElseWatch(delayedRemoteListOffsets, listOffsetsRequestKeys)

assertEquals(0, cancelledCount)
assertEquals(listOffsetsRequestKeys.size, numResponse)
}

+ @Test
+ def testPartialResponseWhenNotLeaderOrFollowerExceptionOnOnePartition(): Unit = {
+ var numResponse = 0
+ val responseCallback = (response: List[ListOffsetsTopicResponse]) => {
+ response.foreach { topic =>
+ topic.partitions().forEach { partition =>
+ if (topic.name().equals("test1") && partition.partitionIndex() == 0) {
+ assertEquals(Errors.NOT_LEADER_OR_FOLLOWER.code(), partition.errorCode())
+ assertEquals(ListOffsetsResponse.UNKNOWN_TIMESTAMP, partition.timestamp())
+ assertEquals(ListOffsetsResponse.UNKNOWN_OFFSET, partition.offset())
+ assertEquals(-1, partition.leaderEpoch())
+ } else {
+ assertEquals(Errors.NONE.code(), partition.errorCode())
+ assertEquals(100L, partition.timestamp())
+ assertEquals(100L, partition.offset())
+ assertEquals(50, partition.leaderEpoch())
+ }
+ numResponse += 1
+ }
+ }
+ }
+
+ val timestampAndOffset = new TimestampAndOffset(100L, 100L, Optional.of(50))
+ val taskFuture = new CompletableFuture[T]()
+ taskFuture.complete(Right(Some(timestampAndOffset)))
+
+ var cancelledCount = 0
+ val jobFuture = mock(classOf[CompletableFuture[Void]])
+ val holder: AsyncOffsetReadFutureHolder[T] = mock(classOf[AsyncOffsetReadFutureHolder[T]])
+ when(holder.taskFuture).thenAnswer(_ => taskFuture)
+ when(holder.jobFuture).thenReturn(jobFuture)
+ when(jobFuture.cancel(anyBoolean())).thenAnswer(_ => {
+ cancelledCount += 1
+ true
+ })
+
+ when(replicaManager.getPartitionOrException(new TopicPartition("test1", 0)))
+ .thenThrow(new NotLeaderOrFollowerException("Not leader or follower!"))
+ val errorFutureHolder: AsyncOffsetReadFutureHolder[T] = mock(classOf[AsyncOffsetReadFutureHolder[T]])
+ val errorTaskFuture = new CompletableFuture[T]()
+ when(errorFutureHolder.taskFuture).thenAnswer(_ => errorTaskFuture)
+ when(errorFutureHolder.jobFuture).thenReturn(jobFuture)
+
+ val statusByPartition = mutable.Map(
+ new TopicPartition("test", 0) -> ListOffsetsPartitionStatus(None, Some(holder)),
+ new TopicPartition("test", 1) -> ListOffsetsPartitionStatus(None, Some(holder)),
+ new TopicPartition("test1", 0) -> ListOffsetsPartitionStatus(None, Some(errorFutureHolder)),
+ new TopicPartition("test1", 1) -> ListOffsetsPartitionStatus(None, Some(holder))
+ )
+
+ val delayedRemoteListOffsets = new DelayedRemoteListOffsets(delayMs, version = 5, statusByPartition, replicaManager, responseCallback)
+ val listOffsetsRequestKeys = statusByPartition.keys.map(TopicPartitionOperationKey(_)).toSeq
+ purgatory.tryCompleteElseWatch(delayedRemoteListOffsets, listOffsetsRequestKeys)
+
+ assertEquals(1, cancelledCount)
+ assertEquals(listOffsetsRequestKeys.size, numResponse)
+ }
}
