Skip to content

Commit

Permalink
KAFKA-18339: Remove raw unversioned direct SASL protocol (KIP-896) (#…
Browse files Browse the repository at this point in the history
…18295)

Clients that support SASL but don't implement KIP-43 (eg Kafka producer/consumer 0.9.0.x) will
fail to connect after this change.

Added unit tests and also manually tested with the console producer 0.9.0.

While testing, I noticed that the logged message when a 0.9.0 Java client is used without sasl is
slightly misleading - fixed that too.

Reviewers: Manikumar Reddy <[email protected]>
  • Loading branch information
ijuma committed Dec 27, 2024
1 parent 4dab5dd commit 4d8ee92
Show file tree
Hide file tree
Showing 3 changed files with 64 additions and 55 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ public class SaslServerAuthenticator implements Authenticator {
* state and likewise ends at either {@link #COMPLETE} or {@link #FAILED}.
*/
private enum SaslState {
INITIAL_REQUEST, // May be GSSAPI token, SaslHandshake or ApiVersions for authentication
INITIAL_REQUEST, // May be SaslHandshake or ApiVersions for authentication
HANDSHAKE_OR_VERSIONS_REQUEST, // May be SaslHandshake or ApiVersions
HANDSHAKE_REQUEST, // After an ApiVersions request, next request must be SaslHandshake
AUTHENTICATE, // Authentication tokens (SaslHandshake v1 and above indicate SaslAuthenticate headers)
Expand Down Expand Up @@ -277,15 +277,11 @@ public void authenticate() throws IOException {
case REAUTH_PROCESS_HANDSHAKE:
case HANDSHAKE_OR_VERSIONS_REQUEST:
case HANDSHAKE_REQUEST:
case INITIAL_REQUEST:
handleKafkaRequest(clientToken);
break;
case REAUTH_BAD_MECHANISM:
throw new SaslAuthenticationException(reauthInfo.badMechanismErrorMessage);
case INITIAL_REQUEST:
if (handleKafkaRequest(clientToken))
break;
// For default GSSAPI, fall through to authenticate using the client token as the first GSSAPI packet.
// This is required for interoperability with 0.9.0.x clients which do not send handshake request
case AUTHENTICATE:
handleSaslToken(clientToken);
// When the authentication exchange is complete and no more tokens are expected from the client,
Expand Down Expand Up @@ -503,63 +499,51 @@ private void handleSaslToken(byte[] clientToken) throws IOException {
}
}

private boolean handleKafkaRequest(byte[] requestBytes) throws IOException, AuthenticationException {
boolean isKafkaRequest = false;
String clientMechanism = null;
/**
* @throws InvalidRequestException if the request is not in Kafka format or if the API key is invalid. Clients
* that support SASL without support for KIP-43 (e.g. Kafka Clients 0.9.x) are in the former bucket - the first
* packet such clients send is a GSSAPI token starting with 0x60.
*/
private void handleKafkaRequest(byte[] requestBytes) throws IOException, AuthenticationException {
try {
ByteBuffer requestBuffer = ByteBuffer.wrap(requestBytes);
RequestHeader header = RequestHeader.parse(requestBuffer);
ApiKeys apiKey = header.apiKey();

// A valid Kafka request header was received. SASL authentication tokens are now expected only
// following a SaslHandshakeRequest since this is not a GSSAPI client token from a Kafka 0.9.0.x client.
if (saslState == SaslState.INITIAL_REQUEST)
setSaslState(SaslState.HANDSHAKE_OR_VERSIONS_REQUEST);
isKafkaRequest = true;

// Raise an error prior to parsing if the api cannot be handled at this layer. This avoids
// unnecessary exposure to some of the more complex schema types.
if (apiKey != ApiKeys.API_VERSIONS && apiKey != ApiKeys.SASL_HANDSHAKE)
throw new IllegalSaslStateException("Unexpected Kafka request of type " + apiKey + " during SASL handshake.");
throw new InvalidRequestException("Unexpected Kafka request of type " + apiKey + " during SASL handshake.");

LOG.debug("Handling Kafka request {} during {}", apiKey, reauthInfo.authenticationOrReauthenticationText());


RequestContext requestContext = new RequestContext(header, connectionId, clientAddress(), Optional.of(clientPort()),
KafkaPrincipal.ANONYMOUS, listenerName, securityProtocol, ClientInformation.EMPTY, false);
RequestAndSize requestAndSize = requestContext.parseRequest(requestBuffer);

// A valid Kafka request was received, we can now update the sasl state
if (saslState == SaslState.INITIAL_REQUEST)
setSaslState(SaslState.HANDSHAKE_OR_VERSIONS_REQUEST);

if (apiKey == ApiKeys.API_VERSIONS)
handleApiVersionsRequest(requestContext, (ApiVersionsRequest) requestAndSize.request);
else
clientMechanism = handleHandshakeRequest(requestContext, (SaslHandshakeRequest) requestAndSize.request);
else {
String clientMechanism = handleHandshakeRequest(requestContext, (SaslHandshakeRequest) requestAndSize.request);
if (!reauthInfo.reauthenticating() || reauthInfo.saslMechanismUnchanged(clientMechanism)) {
createSaslServer(clientMechanism);
setSaslState(SaslState.AUTHENTICATE);
}
}
} catch (InvalidRequestException e) {
if (saslState == SaslState.INITIAL_REQUEST) {
// InvalidRequestException is thrown if the request is not in Kafka format or if the API key
// is invalid. For compatibility with 0.9.0.x where the first packet is a GSSAPI token
// starting with 0x60, revert to GSSAPI for both these exceptions.
if (LOG.isDebugEnabled()) {
StringBuilder tokenBuilder = new StringBuilder();
for (byte b : requestBytes) {
tokenBuilder.append(String.format("%02x", b));
if (tokenBuilder.length() >= 20)
break;
}
LOG.debug("Received client packet of length {} starting with bytes 0x{}, process as GSSAPI packet", requestBytes.length, tokenBuilder);
}
if (enabledMechanisms.contains(SaslConfigs.GSSAPI_MECHANISM)) {
LOG.debug("First client packet is not a SASL mechanism request, using default mechanism GSSAPI");
clientMechanism = SaslConfigs.GSSAPI_MECHANISM;
} else
throw new UnsupportedSaslMechanismException("Exception handling first SASL packet from client, GSSAPI is not supported by server", e);
} else
throw e;
}
if (clientMechanism != null && (!reauthInfo.reauthenticating()
|| reauthInfo.saslMechanismUnchanged(clientMechanism))) {
createSaslServer(clientMechanism);
setSaslState(SaslState.AUTHENTICATE);
// InvalidRequestException is thrown if the request is not in Kafka format or if the API key is invalid.
// If it's the initial request, this could be an ancient client (see method documentation for more details),
// a client configured with the wrong security protocol or a non kafka-client altogether (eg http client).
throw new InvalidRequestException("Invalid request, potential reasons: kafka client configured with the " +
"wrong security protocol, it does not support KIP-43 or it is not a kafka client.", e);
}
throw e;
}
return isKafkaRequest;
}

private String handleHandshakeRequest(RequestContext context, SaslHandshakeRequest handshakeRequest) throws IOException, UnsupportedSaslMechanismException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,10 @@
package org.apache.kafka.common.security.authenticator;

import org.apache.kafka.common.config.internals.BrokerSecurityConfigs;
import org.apache.kafka.common.errors.IllegalSaslStateException;
import org.apache.kafka.common.errors.InvalidRequestException;
import org.apache.kafka.common.errors.SaslAuthenticationException;
import org.apache.kafka.common.message.ApiMessageType;
import org.apache.kafka.common.message.RequestHeaderData;
import org.apache.kafka.common.message.SaslAuthenticateRequestData;
import org.apache.kafka.common.message.SaslHandshakeRequestData;
import org.apache.kafka.common.network.ChannelBuilders;
Expand Down Expand Up @@ -63,6 +64,7 @@
import java.nio.Buffer;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
Expand All @@ -77,7 +79,6 @@
import static org.apache.kafka.common.security.scram.internals.ScramMechanism.SCRAM_SHA_256;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyMap;
import static org.mockito.ArgumentMatchers.eq;
Expand Down Expand Up @@ -107,7 +108,7 @@ public void testOversizeRequest() throws IOException {
}

@Test
public void testUnexpectedRequestType() throws IOException {
public void testUnexpectedRequestTypeWithValidRequestHeader() throws IOException {
TransportLayer transportLayer = mock(TransportLayer.class);
Map<String, ?> configs = Collections.singletonMap(BrokerSecurityConfigs.SASL_ENABLED_MECHANISMS_CONFIG,
Collections.singletonList(SCRAM_SHA_256.mechanismName()));
Expand All @@ -126,13 +127,35 @@ public void testUnexpectedRequestType() throws IOException {
return headerBuffer.remaining();
});

try {
authenticator.authenticate();
fail("Expected authenticate() to raise an exception");
} catch (IllegalSaslStateException e) {
// expected exception
}
assertThrows(InvalidRequestException.class, () -> authenticator.authenticate());
verify(transportLayer, times(2)).read(any(ByteBuffer.class));
}

@Test
public void testInvalidRequestHeader() throws IOException {
TransportLayer transportLayer = mock(TransportLayer.class);
Map<String, ?> configs = Collections.singletonMap(BrokerSecurityConfigs.SASL_ENABLED_MECHANISMS_CONFIG,
Collections.singletonList(SCRAM_SHA_256.mechanismName()));
SaslServerAuthenticator authenticator = setupAuthenticator(configs, transportLayer,
SCRAM_SHA_256.mechanismName(), new DefaultChannelMetadataRegistry());

short invalidApiKeyId = (short) (Arrays.stream(ApiKeys.values()).mapToInt(k -> k.id).max().getAsInt() + 1);
ByteBuffer headerBuffer = RequestTestUtils.serializeRequestHeader(new RequestHeader(
new RequestHeaderData()
.setRequestApiKey(invalidApiKeyId)
.setRequestApiVersion((short) 0),
(short) 2));

when(transportLayer.read(any(ByteBuffer.class))).then(invocation -> {
invocation.<ByteBuffer>getArgument(0).putInt(headerBuffer.remaining());
return 4;
}).then(invocation -> {
// serialize only the request header. the authenticator should not parse beyond this
invocation.<ByteBuffer>getArgument(0).put(headerBuffer.duplicate());
return headerBuffer.remaining();
});

assertThrows(InvalidRequestException.class, () -> authenticator.authenticate());
verify(transportLayer, times(2)).read(any(ByteBuffer.class));
}

Expand Down
6 changes: 4 additions & 2 deletions core/src/main/scala/kafka/network/SocketServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import kafka.server.{ApiVersionManager, BrokerReconfigurable, KafkaConfig}
import org.apache.kafka.common.message.ApiMessageType.ListenerType
import kafka.utils._
import org.apache.kafka.common.config.ConfigException
import org.apache.kafka.common.errors.InvalidRequestException
import org.apache.kafka.common.errors.{InvalidRequestException, UnsupportedVersionException}
import org.apache.kafka.common.memory.{MemoryPool, SimpleMemoryPool}
import org.apache.kafka.common.metrics._
import org.apache.kafka.common.metrics.stats.{Avg, CumulativeSum, Meter, Rate}
Expand Down Expand Up @@ -1107,8 +1107,10 @@ private[kafka] class Processor(
val header = RequestHeader.parse(buffer)
if (apiVersionManager.isApiEnabled(header.apiKey, header.apiVersion)) {
header
} else {
} else if (header.isApiVersionDeprecated()) {
throw new InvalidRequestException(s"Received request api key ${header.apiKey} with version ${header.apiVersion} which is not enabled")
} else {
throw new UnsupportedVersionException(s"Received request api key ${header.apiKey} with version ${header.apiVersion} which is not supported")
}
}

Expand Down

0 comments on commit 4d8ee92

Please sign in to comment.