Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -793,6 +793,8 @@ project(':clients') {
compile libs.lz4
compile libs.snappy
compile libs.slf4jApi
compile files('../repository/jni4net.j-0.8.8.0.jar')
compile files('../repository/WcfSoapClient.j4n.jar')

testCompile libs.bcpkix
testCompile libs.junit
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package org.apache.kafka.common.security.dsts;

import javax.security.auth.Subject;
import javax.security.auth.callback.CallbackHandler;
import javax.security.auth.login.LoginException;
import javax.security.auth.spi.LoginModule;
import java.util.Map;
import org.apache.kafka.common.security.dsts.*;

public class DstsLoginModule implements LoginModule{
static {
DstsSaslServerProvider.initialize();
DstsSaslClientProvider.initialize();
}

@Override
public void initialize(Subject subject, CallbackHandler callbackHandler, Map<String, ?> sharedState, Map<String, ?> options) {

}

@Override
public boolean login() throws LoginException {
return true;
}

@Override
public boolean logout() throws LoginException {
return true;
}

@Override
public boolean commit() throws LoginException {
return true;
}

@Override
public boolean abort() throws LoginException {
return false;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
package org.apache.kafka.common.security.dsts;

import org.apache.kafka.common.errors.IllegalSaslStateException;
import org.apache.kafka.common.metrics.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.security.auth.callback.CallbackHandler;
import javax.security.sasl.SaslClient;
import javax.security.sasl.SaslClientFactory;
import javax.security.sasl.SaslException;
import java.io.IOException;
import java.util.Arrays;
import java.util.Map;

import net.sf.jni4net.Bridge;
import java.io.*;
import sample.authentication.wcfsoapclient.*;

/**
* SaslClient implementation for SASL/DSTS.
*
*/
public class DstsSaslClient implements SaslClient{
private static final Logger log = LoggerFactory.getLogger(DstsSaslClient.class);

enum State {
SEND_CLIENT_FIRST_MESSAGE,
RECEIVE_SERVER_FINAL_MESSAGE,
COMPLETE,
FAILED
};

private State state;

private void setState(State state) {
log.debug("Setting SASL/DSTS client state to {}", state);
this.state = state;
}

public DstsSaslClient(){
setState(State.SEND_CLIENT_FIRST_MESSAGE);
}

@Override
public byte[] evaluateChallenge(byte[] challenge) throws SaslException{
try {
switch (state) {
case SEND_CLIENT_FIRST_MESSAGE:
if (challenge != null && challenge.length != 0)
throw new SaslException("Expected empty challenge");
setState(State.RECEIVE_SERVER_FINAL_MESSAGE);

// Set Token
try {
Bridge.setVerbose(true);

Bridge.init(new File("D:\\git\\kafka\\core\\build\\distributions\\kafka_2.11-1.1.1-SNAPSHOT\\libs"));
Bridge.LoadAndRegisterAssemblyFrom(new java.io.File("D:\\git\\kafka\\core\\build\\distributions\\kafka_2.11-1.1.1-SNAPSHOT\\libs", "WcfSoapClient.j4n.dll"));
system.Object token = WcfSoapClient.GetSecurityToken();
return token.toString().getBytes();
}catch (Exception ex){
ex.printStackTrace();
}
return new byte[0];
case RECEIVE_SERVER_FINAL_MESSAGE:
setState(State.COMPLETE);
return null;
default:
throw new IllegalSaslStateException("Unexpected challenge in Sasl client state " + state);
}
}catch (SaslException e) {
setState(State.FAILED);
throw e;
}
}

@Override
public boolean isComplete() {
return state == State.COMPLETE;
}

@Override
public byte[] unwrap(byte[] incoming, int offset, int len) throws SaslException {
if (!isComplete())
throw new IllegalStateException("Authentication exchange has not completed");
return Arrays.copyOfRange(incoming, offset, offset + len);
}

@Override
public byte[] wrap(byte[] outgoing, int offset, int len) throws SaslException {
if (!isComplete())
throw new IllegalStateException("Authentication exchange has not completed");
return Arrays.copyOfRange(outgoing, offset, offset + len);
}

@Override
public Object getNegotiatedProperty(String propName) {
if (!isComplete())
throw new IllegalStateException("Authentication exchange has not completed");
return null;
}

@Override
public void dispose() throws SaslException {
}

@Override
public boolean hasInitialResponse() {
return true;
}

@Override
public String getMechanismName() {
return "DSTS";
}

public static class DstsSaslClientFactory implements SaslClientFactory {

@Override
public SaslClient createSaslClient(String[] mechanisms,
String authorizationId,
String protocol,
String serverName,
Map<String, ?> props,
CallbackHandler cbh) throws SaslException {

return new DstsSaslClient();
}

@Override
public String[] getMechanismNames(Map<String, ?> props) {
return new String[]{"DSTS"};
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package org.apache.kafka.common.security.dsts;

import java.security.Provider;
import java.security.Security;

import org.apache.kafka.common.security.dsts.DstsSaslClient.DstsSaslClientFactory;

public class DstsSaslClientProvider extends Provider {
private static final long DserialVersionUID = 1L;

@SuppressWarnings("deprecation")
protected DstsSaslClientProvider() {
super("Simple SASL/DSTS Client Provider", 1.0, "Simple SASL/DSTS Client Provider for Kafka");
put("SaslClientFactory.DSTS", DstsSaslClientFactory.class.getName());
}

public static void initialize() {
Security.addProvider(new DstsSaslClientProvider());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
package org.apache.kafka.common.security.dsts;

import org.apache.kafka.common.errors.IllegalSaslStateException;
import org.apache.kafka.common.errors.SaslAuthenticationException;
import org.apache.kafka.common.security.scram.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.security.auth.callback.CallbackHandler;
import javax.security.sasl.SaslException;
import javax.security.sasl.SaslServer;
import javax.security.sasl.SaslServerFactory;
import java.security.NoSuchAlgorithmException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Map;

/**
* Dsts SaslServer implementation for SASL/DSTS.
* <p>
* DSTS is a token based authentication method supported in Microsoft internally
*
*/
public class DstsSaslServer implements SaslServer {
private static final Logger log = LoggerFactory.getLogger(DstsSaslServer.class);

enum State {
RECEIVE_CLIENT_FIRST_MESSAGE,
COMPLETE,
FAILED
};

private State state;

public DstsSaslServer() {
setState(DstsSaslServer.State.RECEIVE_CLIENT_FIRST_MESSAGE);
}

/**
* @throws SaslAuthenticationException if the requested authorization id is not the same as username.
* <p>
* <b>Note:</b> This method may throw {@link SaslAuthenticationException} to provide custom error messages
* to clients. But care should be taken to avoid including any information in the exception message that
* should not be leaked to unauthenticated clients. It may be safer to throw {@link SaslException} in
* most cases so that a standard error message is returned to clients.
* </p>
*/
@Override
public byte[] evaluateResponse(byte[] response) throws SaslException, SaslAuthenticationException {
switch (state) {
case RECEIVE_CLIENT_FIRST_MESSAGE:
setState(State.COMPLETE);
return new byte[0];
default:
throw new IllegalSaslStateException("Unexpected challenge in Sasl server state " + state);
}
}

private void setState(State state) {
log.debug("Setting SASL/DSTS server state to {}", state);
this.state = state;
}

@Override
public String getMechanismName(){
return "DSTS";
}

@Override
public boolean isComplete(){
return state==State.COMPLETE;
}

@Override
public String getAuthorizationID(){
return "testID";
}

@Override
public byte[] unwrap(byte[] incoming, int offset, int len){
return Arrays.copyOfRange(incoming, offset, offset + len);
}

@Override
public byte[] wrap(byte[] outgoing, int offset, int len){
return Arrays.copyOfRange(outgoing, offset, offset + len);
}

@Override
public Object getNegotiatedProperty(String propName){
return null;
}

@Override
public void dispose(){
}

public static class DstsSaslServerFactory implements SaslServerFactory {
@Override
public SaslServer createSaslServer(String mechanism, String protocol, String serverName, Map<String, ?> props, CallbackHandler cbh)
throws SaslException {
return new DstsSaslServer();
}

@Override
public String[] getMechanismNames(Map<String, ?> props) {
return new String[]{"DSTS"};
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package org.apache.kafka.common.security.dsts;

import java.security.Provider;
import java.security.Security;

import org.apache.kafka.common.security.dsts.DstsSaslServer.DstsSaslServerFactory;

public class DstsSaslServerProvider extends Provider {
private static final long serialVersionUID = 1L;

@SuppressWarnings("deprecation")
protected DstsSaslServerProvider() {
super("SASL/SCRAM Server Provider", 1.0, "SASL/SCRAM Server Provider for Kafka");
put("SaslServerFactory.DSTS", DstsSaslServerFactory.class.getName());
}

public static void initialize() {
Security.addProvider(new DstsSaslServerProvider());
}
}
3 changes: 3 additions & 0 deletions config/kafka_server_jaas.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
KafkaServer {
org.apache.kafka.common.security.dsts.DstsLoginModule required;
};
6 changes: 4 additions & 2 deletions config/server.properties
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ broker.id=0
# listeners = listener_name://host_name:port
# EXAMPLE:
# listeners = PLAINTEXT://your.host.name:9092
#listeners=PLAINTEXT://:9092
listeners=PLAINTEXT://:9092,SASL_PLAINTEXT://:9094

# Hostname and port the broker will advertise to producers and consumers. If not set,
# it uses the value for "listeners" if configured. Otherwise, it will use the value
Expand Down Expand Up @@ -133,4 +133,6 @@ zookeeper.connection.timeout.ms=6000
# The default value for this is 3 seconds.
# We override this to 0 here as it makes for a better out-of-the-box experience for development and testing.
# However, in production environments the default value of 3 seconds is more suitable as this will help to avoid unnecessary, and potentially expensive, rebalances during application startup.
group.initial.rebalance.delay.ms=0
group.initial.rebalance.delay.ms=0

sasl.enabled.mechanisms=DSTS