Skip to content

Commit

Permalink
KAFKA-18272: Deprecated protocol api usage should be logged at info l…
Browse files Browse the repository at this point in the history
…evel (#18313)

This makes it possible to enable request logs for deprecated protocol api versions without enabling it for the rest. Combined with the ability to enable/disable dynamically, it makes it a bit easier to collect the information about deprecated clients that is not available via metrics.

This isn't particularly useful in trunk/4.0 since there are no deprecated api versions in these versions, but it will be useful for older branches. I intend to backport to those branches and add a release note in the backport regarding the change in behavior.

I manually verified that:
1. If the request logger is configured at `INFO` level, only deprecated protocol api versions are logged and they are logged at `INFO` level.
2. If the request logger is configured at `DEBUG` level, all requests are logged but the log level is `INFO` for deprecated protocol api versions and `DEBUG` for the rest.
3. If the request logger is configured at `WARN` level (the default), no requests are logged.

Reviewers: Chia-Ping Tsai <[email protected]>
  • Loading branch information
ijuma committed Dec 27, 2024
1 parent 0a6028d commit 4dab5dd
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,10 @@ public int size() {
return size;
}

public boolean isApiVersionDeprecated() {
return apiKey().isVersionDeprecated(apiVersion());
}

public ResponseHeader toResponseHeader() {
return new ResponseHeader(data.correlationId(), apiKey().responseHeaderVersion(apiVersion()));
}
Expand Down
20 changes: 15 additions & 5 deletions core/src/main/scala/kafka/network/RequestChannel.scala
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,12 @@ object RequestChannel extends Logging {
private val ResponseQueueSizeMetric = "ResponseQueueSize"
val ProcessorMetricTag = "processor"

private def isRequestLoggingEnabled: Boolean = requestLogger.underlying.isDebugEnabled
/**
* Deprecated protocol apis are logged at info level while the rest are logged at debug level.
* That makes it possible to enable the former without enabling latter.
*/
private def isRequestLoggingEnabled(header: RequestHeader): Boolean = requestLogger.underlying.isDebugEnabled ||
(requestLogger.underlying.isInfoEnabled && header.isApiVersionDeprecated())

sealed trait BaseRequest
case object ShutdownRequest extends BaseRequest
Expand Down Expand Up @@ -84,7 +89,7 @@ object RequestChannel extends Logging {
// This is constructed on creation of a Request so that the JSON representation is computed before the request is
// processed by the api layer. Otherwise, a ProduceRequest can occur without its data (ie. it goes into purgatory).
val requestLog: Option[JsonNode] =
if (RequestChannel.isRequestLoggingEnabled) Some(RequestConvertToJson.request(loggableRequest))
if (RequestChannel.isRequestLoggingEnabled(context.header)) Some(RequestConvertToJson.request(loggableRequest))
else None

def header: RequestHeader = context.header
Expand Down Expand Up @@ -128,7 +133,7 @@ object RequestChannel extends Logging {
}

def responseNode(response: AbstractResponse): Option[JsonNode] = {
if (RequestChannel.isRequestLoggingEnabled)
if (RequestChannel.isRequestLoggingEnabled(context.header))
Some(RequestConvertToJson.response(response, context.apiVersion))
else
None
Expand Down Expand Up @@ -249,14 +254,19 @@ object RequestChannel extends Logging {
// the total time spent on authentication, which may be significant for SASL/SSL.
recordNetworkThreadTimeCallback.foreach(record => record.accept(networkThreadTimeNanos))

if (isRequestLoggingEnabled) {
if (isRequestLoggingEnabled(header)) {
val desc = RequestConvertToJson.requestDescMetrics(header, requestLog.toJava, response.responseLog.toJava,
context, session, isForwarded,
totalTimeMs, requestQueueTimeMs, apiLocalTimeMs,
apiRemoteTimeMs, apiThrottleTimeMs, responseQueueTimeMs,
responseSendTimeMs, temporaryMemoryBytes,
messageConversionsTimeMs)
requestLogger.debug("Completed request:" + desc.toString)
val logPrefix = "Completed request: {}"
// log deprecated apis at `info` level to allow them to be selectively enabled
if (header.isApiVersionDeprecated())
requestLogger.info(logPrefix, desc)
else
requestLogger.debug(logPrefix, desc)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -760,7 +760,7 @@ public static JsonNode requestHeaderNode(RequestHeader header) {
header.data(), header.headerVersion(), false
);
node.set("requestApiKeyName", new TextNode(header.apiKey().toString()));
if (header.apiKey().isVersionDeprecated(header.apiVersion())) {
if (header.isApiVersionDeprecated()) {
node.set("requestApiVersionDeprecated", BooleanNode.TRUE);
}
return node;
Expand Down

0 comments on commit 4dab5dd

Please sign in to comment.