Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
154 changes: 108 additions & 46 deletions core/src/main/scala/kafka/server/AutoTopicCreationManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import org.apache.kafka.common.message.CreateTopicsRequestData
import org.apache.kafka.common.message.CreateTopicsRequestData.{CreatableTopic, CreatableTopicConfig, CreatableTopicConfigCollection}
import org.apache.kafka.common.message.MetadataResponseData.MetadataResponseTopic
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.requests.{CreateTopicsRequest, CreateTopicsResponse, RequestContext, RequestHeader}
import org.apache.kafka.common.requests.{AbstractResponse, CreateTopicsRequest, CreateTopicsResponse, EnvelopeResponse, RequestContext, RequestHeader}
import org.apache.kafka.coordinator.group.GroupCoordinator
import org.apache.kafka.coordinator.share.ShareCoordinator
import org.apache.kafka.coordinator.transaction.TransactionLogConfig
Expand Down Expand Up @@ -198,6 +198,22 @@ class DefaultAutoTopicCreationManager(
.setTopics(topicsToCreate)
)

// Capture request header information for proper envelope response parsing
val requestHeaderForParsing = requestContext.map { context =>
val requestVersion =
channelManager.controllerApiVersions.toScala match {
case None =>
ApiKeys.CREATE_TOPICS.latestVersion()
case Some(nodeApiVersions) =>
nodeApiVersions.latestUsableVersion(ApiKeys.CREATE_TOPICS)
}

new RequestHeader(ApiKeys.CREATE_TOPICS,
requestVersion,
context.clientId,
context.correlationId)
}

val requestCompletionHandler = new ControllerRequestCompletionHandler {
override def onTimeout(): Unit = {
clearInflightRequests(creatableTopics)
Expand All @@ -213,6 +229,33 @@ class DefaultAutoTopicCreationManager(
} else {
if (response.hasResponse) {
response.responseBody() match {
case envelopeResponse: EnvelopeResponse =>
// Unwrap the envelope response to get the actual CreateTopicsResponse
val envelopeError = envelopeResponse.error()
if (envelopeError != Errors.NONE) {
warn(s"Auto topic creation failed for ${creatableTopics.keys} with envelope error: ${envelopeError}")
} else {
requestHeaderForParsing match {
case Some(requestHeader) =>
try {
// Use the captured request header for proper envelope response parsing
val createTopicsResponse = AbstractResponse.parseResponse(
envelopeResponse.responseData(), requestHeader).asInstanceOf[CreateTopicsResponse]

createTopicsResponse.data().topics().forEach(topicResult => {
val error = Errors.forCode(topicResult.errorCode)
if (error != Errors.NONE) {
warn(s"Auto topic creation failed for ${topicResult.name} with error '${error.name}': ${topicResult.errorMessage}")
}
})
} catch {
case e: Exception =>
warn(s"Failed to parse envelope response for auto topic creation of ${creatableTopics.keys}", e)
}
case None =>
warn(s"Cannot parse envelope response without original request header information")
}
}
case createTopicsResponse: CreateTopicsResponse =>
createTopicsResponse.data().topics().forEach(topicResult => {
val error = Errors.forCode(topicResult.errorCode)
Expand All @@ -229,26 +272,13 @@ class DefaultAutoTopicCreationManager(
}
}

val request = requestContext.map { context =>
val requestVersion =
channelManager.controllerApiVersions.toScala match {
case None =>
// We will rely on the Metadata request to be retried in the case
// that the latest version is not usable by the controller.
ApiKeys.CREATE_TOPICS.latestVersion()
case Some(nodeApiVersions) =>
nodeApiVersions.latestUsableVersion(ApiKeys.CREATE_TOPICS)
}

// Borrow client information such as client id and correlation id from the original request,
// in order to correlate the create request with the original metadata request.
val requestHeader = new RequestHeader(ApiKeys.CREATE_TOPICS,
requestVersion,
context.clientId,
context.correlationId)
ForwardingManager.buildEnvelopeRequest(context,
createTopicsRequest.build(requestVersion).serializeWithHeader(requestHeader))
}.getOrElse(createTopicsRequest)
val request = (requestContext, requestHeaderForParsing) match {
case (Some(context), Some(requestHeader)) =>
ForwardingManager.buildEnvelopeRequest(context,
createTopicsRequest.build(requestHeader.apiVersion()).serializeWithHeader(requestHeader))
case _ =>
createTopicsRequest
}

channelManager.sendRequest(request, requestCompletionHandler)

Expand Down Expand Up @@ -364,6 +394,22 @@ class DefaultAutoTopicCreationManager(
.setTopics(topicsToCreate)
)

// Capture request header information for proper envelope response parsing
val requestHeaderForParsing = requestContext.map { context =>
val requestVersion =
channelManager.controllerApiVersions.toScala match {
case None =>
ApiKeys.CREATE_TOPICS.latestVersion()
case Some(nodeApiVersions) =>
nodeApiVersions.latestUsableVersion(ApiKeys.CREATE_TOPICS)
}

new RequestHeader(ApiKeys.CREATE_TOPICS,
requestVersion,
context.clientId,
context.correlationId)
}

val requestCompletionHandler = new ControllerRequestCompletionHandler {
override def onTimeout(): Unit = {
clearInflightRequests(creatableTopics)
Expand All @@ -382,36 +428,52 @@ class DefaultAutoTopicCreationManager(
warn(s"Auto topic creation failed for ${creatableTopics.keys} with version mismatch exception: ${versionException.getMessage}")
cacheTopicCreationErrors(creatableTopics.keys.toSet, versionException.getMessage, timeoutMs)
} else {
response.responseBody() match {
case createTopicsResponse: CreateTopicsResponse =>
cacheTopicCreationErrorsFromResponse(createTopicsResponse, timeoutMs)
case _ =>
debug(s"Auto topic creation completed for ${creatableTopics.keys} with response ${response.responseBody}.")
if (response.hasResponse) {
response.responseBody() match {
case envelopeResponse: EnvelopeResponse =>
// Unwrap the envelope response to get the actual CreateTopicsResponse
val envelopeError = envelopeResponse.error()
if (envelopeError != Errors.NONE) {
warn(s"Auto topic creation failed for ${creatableTopics.keys} with envelope error: ${envelopeError}")
cacheTopicCreationErrors(creatableTopics.keys.toSet, s"Envelope error: ${envelopeError}", timeoutMs)
} else {
requestHeaderForParsing match {
case Some(requestHeader) =>
try {
// Use the captured request header for proper envelope response parsing
val createTopicsResponse = AbstractResponse.parseResponse(
envelopeResponse.responseData(), requestHeader).asInstanceOf[CreateTopicsResponse]

cacheTopicCreationErrorsFromResponse(createTopicsResponse, timeoutMs)
} catch {
case e: Exception =>
warn(s"Failed to parse envelope response for auto topic creation of ${creatableTopics.keys}", e)
cacheTopicCreationErrors(creatableTopics.keys.toSet, s"Response parsing error: ${e.getMessage}", timeoutMs)
}
case None =>
warn(s"Cannot parse envelope response without original request header information")
cacheTopicCreationErrors(creatableTopics.keys.toSet, "Missing request header for envelope parsing", timeoutMs)
}
}
case createTopicsResponse: CreateTopicsResponse =>
cacheTopicCreationErrorsFromResponse(createTopicsResponse, timeoutMs)
case other =>
warn(s"Auto topic creation request received unexpected response type: ${other.getClass.getSimpleName}")
cacheTopicCreationErrors(creatableTopics.keys.toSet, s"Unexpected response type: ${other.getClass.getSimpleName}", timeoutMs)
}
}
debug(s"Auto topic creation completed for ${creatableTopics.keys} with response ${response.responseBody}.")
}
}
}

val request = requestContext.map { context =>
val requestVersion =
channelManager.controllerApiVersions.toScala match {
case None =>
// We will rely on the Metadata request to be retried in the case
// that the latest version is not usable by the controller.
ApiKeys.CREATE_TOPICS.latestVersion()
case Some(nodeApiVersions) =>
nodeApiVersions.latestUsableVersion(ApiKeys.CREATE_TOPICS)
}

// Borrow client information such as client id and correlation id from the original request,
// in order to correlate the create request with the original metadata request.
val requestHeader = new RequestHeader(ApiKeys.CREATE_TOPICS,
requestVersion,
context.clientId,
context.correlationId)
ForwardingManager.buildEnvelopeRequest(context,
createTopicsRequest.build(requestVersion).serializeWithHeader(requestHeader))
}.getOrElse(createTopicsRequest)
val request = (requestContext, requestHeaderForParsing) match {
case (Some(context), Some(requestHeader)) =>
ForwardingManager.buildEnvelopeRequest(context,
createTopicsRequest.build(requestHeader.apiVersion()).serializeWithHeader(requestHeader))
case _ =>
createTopicsRequest
}

channelManager.sendRequest(request, requestCompletionHandler)

Expand Down
Loading