diff --git a/build.gradle b/build.gradle index 7586eb5912425..0fc5927e37356 100644 --- a/build.gradle +++ b/build.gradle @@ -793,6 +793,8 @@ project(':clients') { compile libs.lz4 compile libs.snappy compile libs.slf4jApi + compile files('../repository/jni4net.j-0.8.8.0.jar') + compile files('../repository/WcfSoapClient.j4n.jar') testCompile libs.bcpkix testCompile libs.junit diff --git a/clients/src/main/java/org/apache/kafka/common/security/dsts/DstsLoginModule.java b/clients/src/main/java/org/apache/kafka/common/security/dsts/DstsLoginModule.java new file mode 100644 index 0000000000000..0edd36a237f46 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/security/dsts/DstsLoginModule.java @@ -0,0 +1,40 @@ +package org.apache.kafka.common.security.dsts; + +import javax.security.auth.Subject; +import javax.security.auth.callback.CallbackHandler; +import javax.security.auth.login.LoginException; +import javax.security.auth.spi.LoginModule; +import java.util.Map; +import org.apache.kafka.common.security.dsts.*; + +public class DstsLoginModule implements LoginModule{ + static { + DstsSaslServerProvider.initialize(); + DstsSaslClientProvider.initialize(); + } + + @Override + public void initialize(Subject subject, CallbackHandler callbackHandler, Map sharedState, Map options) { + + } + + @Override + public boolean login() throws LoginException { + return true; + } + + @Override + public boolean logout() throws LoginException { + return true; + } + + @Override + public boolean commit() throws LoginException { + return true; + } + + @Override + public boolean abort() throws LoginException { + return false; + } +} diff --git a/clients/src/main/java/org/apache/kafka/common/security/dsts/DstsSaslClient.java b/clients/src/main/java/org/apache/kafka/common/security/dsts/DstsSaslClient.java new file mode 100644 index 0000000000000..779e547149cfa --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/security/dsts/DstsSaslClient.java @@ -0,0 +1,136 @@ +package org.apache.kafka.common.security.dsts; + +import org.apache.kafka.common.errors.IllegalSaslStateException; +import org.apache.kafka.common.metrics.Stat; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.security.auth.callback.CallbackHandler; +import javax.security.sasl.SaslClient; +import javax.security.sasl.SaslClientFactory; +import javax.security.sasl.SaslException; +import java.io.IOException; +import java.util.Arrays; +import java.util.Map; + +import net.sf.jni4net.Bridge; +import java.io.*; +import sample.authentication.wcfsoapclient.*; + +/** + * SaslClient implementation for SASL/DSTS. + * + */ +public class DstsSaslClient implements SaslClient{ + private static final Logger log = LoggerFactory.getLogger(DstsSaslClient.class); + + enum State { + SEND_CLIENT_FIRST_MESSAGE, + RECEIVE_SERVER_FINAL_MESSAGE, + COMPLETE, + FAILED + }; + + private State state; + + private void setState(State state) { + log.debug("Setting SASL/DSTS client state to {}", state); + this.state = state; + } + + public DstsSaslClient(){ + setState(State.SEND_CLIENT_FIRST_MESSAGE); + } + + @Override + public byte[] evaluateChallenge(byte[] challenge) throws SaslException{ + try { + switch (state) { + case SEND_CLIENT_FIRST_MESSAGE: + if (challenge != null && challenge.length != 0) + throw new SaslException("Expected empty challenge"); + setState(State.RECEIVE_SERVER_FINAL_MESSAGE); + + // Set Token + try { + Bridge.setVerbose(true); + + Bridge.init(new File("D:\\git\\kafka\\core\\build\\distributions\\kafka_2.11-1.1.1-SNAPSHOT\\libs")); + Bridge.LoadAndRegisterAssemblyFrom(new java.io.File("D:\\git\\kafka\\core\\build\\distributions\\kafka_2.11-1.1.1-SNAPSHOT\\libs", "WcfSoapClient.j4n.dll")); + system.Object token = WcfSoapClient.GetSecurityToken(); + return token.toString().getBytes(); + }catch (Exception ex){ + ex.printStackTrace(); + } + return new byte[0]; + case RECEIVE_SERVER_FINAL_MESSAGE: + setState(State.COMPLETE); + return null; + default: + throw new IllegalSaslStateException("Unexpected challenge in Sasl client state " + state); + } + }catch (SaslException e) { + setState(State.FAILED); + throw e; + } + } + + @Override + public boolean isComplete() { + return state == State.COMPLETE; + } + + @Override + public byte[] unwrap(byte[] incoming, int offset, int len) throws SaslException { + if (!isComplete()) + throw new IllegalStateException("Authentication exchange has not completed"); + return Arrays.copyOfRange(incoming, offset, offset + len); + } + + @Override + public byte[] wrap(byte[] outgoing, int offset, int len) throws SaslException { + if (!isComplete()) + throw new IllegalStateException("Authentication exchange has not completed"); + return Arrays.copyOfRange(outgoing, offset, offset + len); + } + + @Override + public Object getNegotiatedProperty(String propName) { + if (!isComplete()) + throw new IllegalStateException("Authentication exchange has not completed"); + return null; + } + + @Override + public void dispose() throws SaslException { + } + + @Override + public boolean hasInitialResponse() { + return true; + } + + @Override + public String getMechanismName() { + return "DSTS"; + } + + public static class DstsSaslClientFactory implements SaslClientFactory { + + @Override + public SaslClient createSaslClient(String[] mechanisms, + String authorizationId, + String protocol, + String serverName, + Map props, + CallbackHandler cbh) throws SaslException { + + return new DstsSaslClient(); + } + + @Override + public String[] getMechanismNames(Map props) { + return new String[]{"DSTS"}; + } + } +} diff --git a/clients/src/main/java/org/apache/kafka/common/security/dsts/DstsSaslClientProvider.java b/clients/src/main/java/org/apache/kafka/common/security/dsts/DstsSaslClientProvider.java new file mode 100644 index 0000000000000..5c60502275d4a --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/security/dsts/DstsSaslClientProvider.java @@ -0,0 +1,20 @@ +package org.apache.kafka.common.security.dsts; + +import java.security.Provider; +import java.security.Security; + +import org.apache.kafka.common.security.dsts.DstsSaslClient.DstsSaslClientFactory; + +public class DstsSaslClientProvider extends Provider { + private static final long DserialVersionUID = 1L; + + @SuppressWarnings("deprecation") + protected DstsSaslClientProvider() { + super("Simple SASL/DSTS Client Provider", 1.0, "Simple SASL/DSTS Client Provider for Kafka"); + put("SaslClientFactory.DSTS", DstsSaslClientFactory.class.getName()); + } + + public static void initialize() { + Security.addProvider(new DstsSaslClientProvider()); + } +} diff --git a/clients/src/main/java/org/apache/kafka/common/security/dsts/DstsSaslServer.java b/clients/src/main/java/org/apache/kafka/common/security/dsts/DstsSaslServer.java new file mode 100644 index 0000000000000..08e4428faa896 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/security/dsts/DstsSaslServer.java @@ -0,0 +1,110 @@ +package org.apache.kafka.common.security.dsts; + +import org.apache.kafka.common.errors.IllegalSaslStateException; +import org.apache.kafka.common.errors.SaslAuthenticationException; +import org.apache.kafka.common.security.scram.*; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.security.auth.callback.CallbackHandler; +import javax.security.sasl.SaslException; +import javax.security.sasl.SaslServer; +import javax.security.sasl.SaslServerFactory; +import java.security.NoSuchAlgorithmException; +import java.util.Arrays; +import java.util.Collection; +import java.util.Map; + +/** + * Dsts SaslServer implementation for SASL/DSTS. + *

+ * DSTS is a token based authentication method supported in Microsoft internally + * + */ +public class DstsSaslServer implements SaslServer { + private static final Logger log = LoggerFactory.getLogger(DstsSaslServer.class); + + enum State { + RECEIVE_CLIENT_FIRST_MESSAGE, + COMPLETE, + FAILED + }; + + private State state; + + public DstsSaslServer() { + setState(DstsSaslServer.State.RECEIVE_CLIENT_FIRST_MESSAGE); + } + + /** + * @throws SaslAuthenticationException if the requested authorization id is not the same as username. + *

+ * Note: This method may throw {@link SaslAuthenticationException} to provide custom error messages + * to clients. But care should be taken to avoid including any information in the exception message that + * should not be leaked to unauthenticated clients. It may be safer to throw {@link SaslException} in + * most cases so that a standard error message is returned to clients. + *

+ */ + @Override + public byte[] evaluateResponse(byte[] response) throws SaslException, SaslAuthenticationException { + switch (state) { + case RECEIVE_CLIENT_FIRST_MESSAGE: + setState(State.COMPLETE); + return new byte[0]; + default: + throw new IllegalSaslStateException("Unexpected challenge in Sasl server state " + state); + } + } + + private void setState(State state) { + log.debug("Setting SASL/DSTS server state to {}", state); + this.state = state; + } + + @Override + public String getMechanismName(){ + return "DSTS"; + } + + @Override + public boolean isComplete(){ + return state==State.COMPLETE; + } + + @Override + public String getAuthorizationID(){ + return "testID"; + } + + @Override + public byte[] unwrap(byte[] incoming, int offset, int len){ + return Arrays.copyOfRange(incoming, offset, offset + len); + } + + @Override + public byte[] wrap(byte[] outgoing, int offset, int len){ + return Arrays.copyOfRange(outgoing, offset, offset + len); + } + + @Override + public Object getNegotiatedProperty(String propName){ + return null; + } + + @Override + public void dispose(){ + } + + public static class DstsSaslServerFactory implements SaslServerFactory { + @Override + public SaslServer createSaslServer(String mechanism, String protocol, String serverName, Map props, CallbackHandler cbh) + throws SaslException { + return new DstsSaslServer(); + } + + @Override + public String[] getMechanismNames(Map props) { + return new String[]{"DSTS"}; + } + } +} diff --git a/clients/src/main/java/org/apache/kafka/common/security/dsts/DstsSaslServerProvider.java b/clients/src/main/java/org/apache/kafka/common/security/dsts/DstsSaslServerProvider.java new file mode 100644 index 0000000000000..e81e0839eb97d --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/security/dsts/DstsSaslServerProvider.java @@ -0,0 +1,20 @@ +package org.apache.kafka.common.security.dsts; + +import java.security.Provider; +import java.security.Security; + +import org.apache.kafka.common.security.dsts.DstsSaslServer.DstsSaslServerFactory; + +public class DstsSaslServerProvider extends Provider { + private static final long serialVersionUID = 1L; + + @SuppressWarnings("deprecation") + protected DstsSaslServerProvider() { + super("SASL/SCRAM Server Provider", 1.0, "SASL/SCRAM Server Provider for Kafka"); + put("SaslServerFactory.DSTS", DstsSaslServerFactory.class.getName()); + } + + public static void initialize() { + Security.addProvider(new DstsSaslServerProvider()); + } +} diff --git a/config/kafka_server_jaas.conf b/config/kafka_server_jaas.conf new file mode 100644 index 0000000000000..24c47c2d47326 --- /dev/null +++ b/config/kafka_server_jaas.conf @@ -0,0 +1,3 @@ +KafkaServer { + org.apache.kafka.common.security.dsts.DstsLoginModule required; +}; \ No newline at end of file diff --git a/config/server.properties b/config/server.properties index 898d8ebc28c5c..357a10b85e9fa 100644 --- a/config/server.properties +++ b/config/server.properties @@ -28,7 +28,7 @@ broker.id=0 # listeners = listener_name://host_name:port # EXAMPLE: # listeners = PLAINTEXT://your.host.name:9092 -#listeners=PLAINTEXT://:9092 +listeners=PLAINTEXT://:9092,SASL_PLAINTEXT://:9094 # Hostname and port the broker will advertise to producers and consumers. If not set, # it uses the value for "listeners" if configured. Otherwise, it will use the value @@ -133,4 +133,6 @@ zookeeper.connection.timeout.ms=6000 # The default value for this is 3 seconds. # We override this to 0 here as it makes for a better out-of-the-box experience for development and testing. # However, in production environments the default value of 3 seconds is more suitable as this will help to avoid unnecessary, and potentially expensive, rebalances during application startup. -group.initial.rebalance.delay.ms=0 \ No newline at end of file +group.initial.rebalance.delay.ms=0 + +sasl.enabled.mechanisms=DSTS \ No newline at end of file