diff --git a/azpubsub/src/main/java/com/microsoft/azpubsub/security/auth/AzPubSubConfig.java b/azpubsub/src/main/java/com/microsoft/azpubsub/security/auth/AzPubSubConfig.java index eb63f1c66f646..2ab710daa2302 100644 --- a/azpubsub/src/main/java/com/microsoft/azpubsub/security/auth/AzPubSubConfig.java +++ b/azpubsub/src/main/java/com/microsoft/azpubsub/security/auth/AzPubSubConfig.java @@ -42,7 +42,27 @@ public class AzPubSubConfig extends AbstractConfig { "", Importance.MEDIUM, DSTS_METADATA_FILE_DOC) - ; + .define("azpubsub.topic.max.qps", + Type.INT, + 1000, + Importance.MEDIUM, + "Topic Qps") + .define("azpubsub.qps.throttling.level", + Type.INT, + 0, + Importance.MEDIUM, + "Topic Qps throttling level. 0: throttling is disabled; 1: throttling at topic level; 2: throttling at clientId + topic level.") + .define("azpubsub.clientid.topic.max.qps", + Type.INT, + 1000, + Importance.MEDIUM, + "Topic Qps") + .define("azpubsub.timer.task.execution.interval.in.ms", + Type.LONG, + 300000, + Importance.MEDIUM, + "The interval of timer background task executions, in milliseconds") + ; } public static AzPubSubConfig fromProps(Map configProviderProps) { diff --git a/azpubsub/src/main/java/com/microsoft/azpubsub/security/auth/ThreadCounterTimerTask.java b/azpubsub/src/main/java/com/microsoft/azpubsub/security/auth/ThreadCounterTimerTask.java new file mode 100644 index 0000000000000..6a2704913f2e2 --- /dev/null +++ b/azpubsub/src/main/java/com/microsoft/azpubsub/security/auth/ThreadCounterTimerTask.java @@ -0,0 +1,43 @@ +package com.microsoft.azpubsub.security.auth; + +import java.util.TimerTask; + +public class ThreadCounterTimerTask extends TimerTask { + private String topicName = null; + private String clientId = null; + private int threadCount = 0; + private Long ioThreadId = 0L; + private int throttlingLevel = 0; + private Long intervalInMs = 300000L; + private TopicThreadCounter topicThreadCounterInstance = null; + + public ThreadCounterTimerTask(long interval, int 
level) { + intervalInMs = interval; + throttlingLevel = level; + topicThreadCounterInstance = TopicThreadCounter.getInstance(this.intervalInMs, this.throttlingLevel); + } + + public void setTopicName(String topic) { + topicName = topic; + } + + public void setClientId(String client) { clientId = client; } + + public void setIoThreadId(Long threadId) { + ioThreadId = threadId; + } + + public int getThreadCount() { + return threadCount; + } + + public void setThrottlingLevel (int level) { throttlingLevel = level; } + public int getThrottlingLevel () { return throttlingLevel; } + + @Override + public void run() { + if(null != topicName) { + threadCount = topicThreadCounterInstance.add(topicName, System.currentTimeMillis(), ioThreadId, clientId); + } + } +} diff --git a/azpubsub/src/main/java/com/microsoft/azpubsub/security/auth/TopicThreadCounter.java b/azpubsub/src/main/java/com/microsoft/azpubsub/security/auth/TopicThreadCounter.java new file mode 100644 index 0000000000000..5bb7e24442e33 --- /dev/null +++ b/azpubsub/src/main/java/com/microsoft/azpubsub/security/auth/TopicThreadCounter.java @@ -0,0 +1,48 @@ +package com.microsoft.azpubsub.security.auth; + +import java.util.*; +import java.util.concurrent.ConcurrentHashMap; + +public class TopicThreadCounter { + private static TopicThreadCounter instance = null; + private static Object lock = new Object(); + private Long interval = null; + private int throttlingLevel = 0; + ConcurrentHashMap> topicThreadMap = new ConcurrentHashMap<>(); + + public TopicThreadCounter(Long intvl, int level) { + interval = intvl; + throttlingLevel = level; + } + + static TopicThreadCounter getInstance(Long interval, int level) { + synchronized (lock) { + if(null == instance) { + instance = new TopicThreadCounter(interval, level); + } + } + return instance; + } + + public int add(String topic, Long currentTimeInMs, Long threadId, String clientId) { + String key = TopicThreadCounter.makeKey(this.throttlingLevel, topic, clientId, threadId); 
+ if(!topicThreadMap.containsKey(key)) { + topicThreadMap.put(key, new TreeMap<>()); + } + topicThreadMap.get(key).put(currentTimeInMs, threadId); + NavigableMap subMap= topicThreadMap.get(key).tailMap(currentTimeInMs - interval, false); + HashSet hs = new HashSet<>(); + for(Map.Entry element: subMap.entrySet()) { + hs.add((Long)element.getValue()); + } + topicThreadMap.put(key, new TreeMap<>(subMap)); + return hs.size(); + } + + public static String makeKey(int throttlingLevel, String topic, String clientId, Long threaId) { + if(1 == throttlingLevel) return String.format("ClientId:%s|ThreadId:%d|Topic:%s", clientId, threaId, topic); + else if(2 == throttlingLevel) return String.format("ClientId:%s|ThreadId:%d", clientId, threaId); + else if(3 == throttlingLevel) return String.format("Topic:%s|ThreadId:%d", topic, threaId); + return topic; + } +} diff --git a/azpubsub/src/main/scala/com/microsoft/azpubsub/security/auth/AzPubSubAclAuthorizerV2.scala b/azpubsub/src/main/scala/com/microsoft/azpubsub/security/auth/AzPubSubAclAuthorizerV2.scala new file mode 100644 index 0000000000000..9b28ae7118ea0 --- /dev/null +++ b/azpubsub/src/main/scala/com/microsoft/azpubsub/security/auth/AzPubSubAclAuthorizerV2.scala @@ -0,0 +1,135 @@ +package com.microsoft.azpubsub.security.auth + +import java.net.InetAddress +import java.util +import java.util.Timer +import java.util.concurrent._ + +import com.yammer.metrics.core.{Meter, MetricName} +import kafka.metrics.KafkaMetricsGroup +import kafka.security.authorizer.AclAuthorizer +import kafka.utils.Logging +import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol} +import org.apache.kafka.server.authorizer.{Action, AuthorizableRequestContext, AuthorizationResult} + +import scala.collection.JavaConverters.asScalaSetConverter +import scala.collection.mutable + +/* + * AzPubSub ACL Authorizer to handle the certificate & role based principal type + */ + +object AzPubSubAclAuthorizerV2 { + val configThrottlingLevel = 
"azpubsub.qps.throttling.level" + val configTopicThrottlingQps = "azpubsub.topic.max.qps" + val configClientidTopicThrottlingQps = "azpubsub.clientid.topic.max.qps" + val configMetterSuccessRatePerSec = "AuthorizerSuccessPerSec" + val configMetterFailureRatePerSec = "AuthorizerFailurePerSec" + val configTimerTaskExecutionInterval = "azpubsub.timer.task.execution.interval.in.ms" +} + + +class AzPubSubAclAuthorizerV2 extends AclAuthorizer with Logging with KafkaMetricsGroup { + override def metricName(name: String, metricTags: scala.collection.Map[String, String]): MetricName = { + explicitMetricName("azpubsub.security", "AuthorizerMetrics", name, metricTags) + } + + override def configure(javaConfigs: util.Map[String, _]): Unit = { + super.configure(javaConfigs) + config = AzPubSubConfig.fromProps(javaConfigs) + throttlingLevel = config.getInt(AzPubSubAclAuthorizerV2.configThrottlingLevel) + throttlingTopicQps = config.getInt(AzPubSubAclAuthorizerV2.configTopicThrottlingQps) + throttlingClientIdTopicQps = config.getInt(AzPubSubAclAuthorizerV2.configTopicThrottlingQps) + timerTaskExecutionInterval = config.getLong(AzPubSubAclAuthorizerV2.configTimerTaskExecutionInterval) + topicThreadCounterTimerTask = new ThreadCounterTimerTask(timerTaskExecutionInterval, throttlingLevel) + trigger.scheduleAtFixedRate(topicThreadCounterTimerTask, 2, timerTaskExecutionInterval) + topicThreadCounterTimerTask.setIoThreadId(Thread.currentThread().getId) + } + + private var config: AzPubSubConfig = null + private var throttlingLevel: Int = 0 + private var throttlingTopicQps: Int = 0 + private var throttlingClientIdTopicQps: Int = 0 + private var timerTaskExecutionInterval: Long = 0 + private val successRate: Meter = newMeter(AzPubSubAclAuthorizerV2.configMetterSuccessRatePerSec, "success", TimeUnit.SECONDS) + private val failureRate: Meter = newMeter(AzPubSubAclAuthorizerV2.configMetterFailureRatePerSec, "failure", TimeUnit.SECONDS) + private var topicThreadCounterTimerTask : 
ThreadCounterTimerTask = null + private val trigger: Timer = new Timer(true) + + var ints = new mutable.HashMap[String, mutable.TreeSet[Long]] + + override def authorize(requestContext: AuthorizableRequestContext, actions: util.List[Action]): util.List[AuthorizationResult] = { + if(throttlingLevel > 0) { + actions.forEach( a => if( a.resourcePattern().resourceType() == org.apache.kafka.common.resource.ResourceType.TOPIC) { + topicThreadCounterTimerTask.setTopicName(a.resourcePattern.name()) + topicThreadCounterTimerTask.setClientId(requestContext.clientId()) + + val key = makeKey(a.resourcePattern().name(), requestContext.clientId()) + if(!ints.contains(key)) ints.put(key, new mutable.TreeSet[Long]()) + + val threadCount = Math.max(topicThreadCounterTimerTask.getThreadCount, 1) + + var count = 1 + while (throttlingLevel == 1 && ints.get(key).size * threadCount > throttlingTopicQps || throttlingLevel == 2 && ints.get(key).size * threadCount > throttlingClientIdTopicQps) { + val pivot = ints.get(key).get.minBy(x => x > System.currentTimeMillis - 1000) + val (_, after) = ints.get(key).get.partition(x => x > pivot) + ints.put(key, after) + Thread.sleep(count) + count *= 2 + } + + ints.get(key).get += System.currentTimeMillis + } ) + } + + var res : util.List[AuthorizationResult] = null + + if (requestContext.principal().getClass == classOf[AzPubSubPrincipal]) { + val tmpPrincipal = requestContext.principal().asInstanceOf[AzPubSubPrincipal] + for(role <- tmpPrincipal.getRoles.asScala) { + val context : AuthorizableRequestContext = new AuthorizableRequestContext { + override def listenerName(): String = requestContext.listenerName() + + override def securityProtocol(): SecurityProtocol = requestContext.securityProtocol() + + override def principal(): KafkaPrincipal = { + new KafkaPrincipal(tmpPrincipal.getPrincipalType, role) + } + + override def clientAddress(): InetAddress = requestContext.clientAddress() + + override def requestType(): Int = 
requestContext.requestType() + + override def requestVersion(): Int = requestContext.requestVersion() + + override def clientId(): String = requestContext.clientId() + + override def correlationId(): Int = requestContext.correlationId() + } + res = super.authorize(context, actions) + if (res.contains(AuthorizationResult.ALLOWED) ) { + successRate.mark() + return res + } + } + failureRate.mark() + return res + } + res = super.authorize(requestContext, actions) + if(null != res && res.contains(AuthorizationResult.ALLOWED)) { + successRate.mark() + } + else { + failureRate.mark() + } + return res + } + + private def makeKey(topic: String, clientId: String) : String = { + throttlingLevel match { + case 1 => String.format("ClientId:%s|Topic:%s", clientId, topic) + case 2 => String.format("ClientId:%s", clientId) + case _ => String.format("Topic:%s", topic) + } + } +} diff --git a/config/log4j.properties b/config/log4j.properties index 6e7f82bd20ad6..20ca886aa7219 100644 --- a/config/log4j.properties +++ b/config/log4j.properties @@ -14,8 +14,8 @@ # limitations under the License. 
# Unspecified loggers and loggers with additivity=true output to server.log and stdout -# Note that INFO only applies to unspecified loggers, the log level of the child logger is used otherwise -log4j.rootLogger=INFO, stdout, kafkaAppender +# Note that DEBUG only applies to unspecified loggers, the log level of the child logger is used otherwise +log4j.rootLogger=DEBUG, stdout, kafkaAppender log4j.appender.stdout=org.apache.log4j.ConsoleAppender log4j.appender.stdout.layout=org.apache.log4j.PatternLayout @@ -57,12 +57,13 @@ log4j.appender.authorizerAppender.File=${kafka.logs.dir}/kafka-authorizer.log log4j.appender.authorizerAppender.layout=org.apache.log4j.PatternLayout log4j.appender.authorizerAppender.layout.ConversionPattern=[%d] %p %m (%c)%n -# Change the line below to adjust ZK client logging -log4j.logger.org.apache.zookeeper=INFO +# Change the two lines below to adjust ZK client logging +log4j.logger.org.I0Itec.zkclient.ZkClient=DEBUG +log4j.logger.org.apache.zookeeper=DEBUG # Change the two lines below to adjust the general broker logging level (output to server.log and stdout) -log4j.logger.kafka=INFO -log4j.logger.org.apache.kafka=INFO +log4j.logger.kafka=DEBUG +log4j.logger.org.apache.kafka=DEBUG # Change to DEBUG or TRACE to enable request logging log4j.logger.kafka.request.logger=WARN, requestAppender @@ -79,7 +80,7 @@ log4j.additivity.kafka.network.RequestChannel$=false log4j.logger.kafka.controller=TRACE, controllerAppender log4j.additivity.kafka.controller=false -log4j.logger.kafka.log.LogCleaner=INFO, cleanerAppender +log4j.logger.kafka.log.LogCleaner=DEBUG, cleanerAppender log4j.additivity.kafka.log.LogCleaner=false log4j.logger.kafka.log.SkimpyOffsetMap=INFO, cleanerAppender @@ -88,7 +89,7 @@ log4j.additivity.kafka.log.SkimpyOffsetMap=false log4j.logger.state.change.logger=TRACE, stateChangeAppender log4j.additivity.state.change.logger=false -# Access denials are logged at INFO level, change to DEBUG to also log allowed accesses 
-log4j.logger.kafka.authorizer.logger=INFO, authorizerAppender +# Access denials are logged at DEBUG level, change to DEBUG to also log allowed accesses +log4j.logger.kafka.authorizer.logger=DEBUG, authorizerAppender log4j.additivity.kafka.authorizer.logger=false diff --git a/config/server.properties b/config/server.properties index 46208b1523f63..abb7a3df61396 100644 --- a/config/server.properties +++ b/config/server.properties @@ -28,21 +28,65 @@ broker.id=0 # listeners = listener_name://host_name:port # EXAMPLE: # listeners = PLAINTEXT://your.host.name:9092 -#listeners=PLAINTEXT://:9092 -# Hostname and port the broker will advertise to producers and consumers. If not set, +listeners=PLAINTEXT://:9092 +advertised.listeners=PLAINTEXT://:9092 +listener.security.protocol.map=PLAINTEXT:PLAINTEXT + +# SSL Settings +#ssl.protocol= TLS +#ssl.enabled.protocols=TLSv1.2,TLSv1.1,TLSv1 +#ssl.keystore.type=JKS +#ssl.keystore.location=D:\\Work\\AzPubSub\\SSL\\adityacerts\\certs\\broker_adinigamdev2_server.keystore.jks +#ssl.keystore.location=D:\\Work\\AzPubSub\\SSL\\scripts\\broker_WADE-Z240_server.keystore.jks +#ssl.keystore.password=abcdefgh +#ssl.key.password=abcdefgh +#ssl.truststore.type=JKS +#ssl.truststore.location=D:\\Work\\AzPubSub\\SSL\\adityacerts\\certs\\broker_adinigamdev2_server.truststore.jks +#ssl.truststore.location=D:\\Work\\AzPubSub\\SSL\\scripts\\broker_WADE-Z240_server.truststore.jks +#ssl.truststore.password=abcdefgh +#ssl.client.auth=required + +# using APPKI +ssl.protocol=TLSv1.2 +#ssl.keystore.type=Windows-MY +ssl.keymanager.algorithm=APPKIV2 +ssl.trustmanager.algorithm=AzPubSub +ssl.enabled.protocols=TLSv1.2 +# From kafka 2.0 onwards, host name verification of servers is enabled by default and getting auth failure errors were logged because, the kafka hostname (ip) didnt match the certificate CN (machinename). +# As the Kafka hostname (i.e. 
logical IP) and certificate CN doesnt match, disabling the hostname verification by setting the below property to empty +ssl.endpoint.identification.algorithm= +ssl.appki.provider.class=com.microsoft.autopilot.azpubsub.ssl.AzPubSubProvider +security.providers=com.microsoft.autopilot.azpubsub.ssl.AzPubSubSecurityProviderCreator +authorizer.class.name=com.microsoft.azpubsub.security.auth.AzPubSubAclAuthorizer +ssl.client.auth=required + +# Broker to Broker Settings +security.inter.broker.protocol=plaintext + +# SASL OAUTHBEARER +listener.name.sasl_plaintext.oauthbearer.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required unsecuredLoginStringClaim_sub="admin"; +sasl.enabled.mechanisms=OAUTHBEARER +listener.name.sasl_plaintext.oauthbearer.connections.max.reauth.ms=30000 + + +# backup, good settings. +#listener.name.sasl_plaintext.oauthbearer.sasl.server.callback.handler.class=com.microsoft.azpubsub.security.oauthbearer.OAuthAuthenticateValidatorCallbackHandler +#principal.builder.class=com.microsoft.azpubsub.security.authenticator.AzPubSubPrincipalBuilder +#allow.everyone.if.no.acl.found=true + + +# Hostname and port the broker will advertise to producers and consumers. If not set, # it uses the value for "listeners" if configured. Otherwise, it will use the value # returned from java.net.InetAddress.getCanonicalHostName(). -#advertised.listeners=PLAINTEXT://your.host.name:9092 +# advertised.listeners=PLAINTEXT://127.0.0.1:9092 # Maps listener names to security protocols, the default is for them to be the same. 
See the config documentation for more details -#listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL - # The number of threads that the server uses for receiving requests from the network and sending responses to the network num.network.threads=3 # The number of threads that the server uses for processing requests, which may include disk I/O -num.io.threads=8 +num.io.threads=3 # The send buffer (SO_SNDBUF) used by the socket server socket.send.buffer.bytes=102400 @@ -58,6 +102,7 @@ socket.request.max.bytes=104857600 # A comma separated list of directories under which to store log files log.dirs=/tmp/kafka-logs +log.retention.minutes=5 # The default number of log partitions per topic. More partitions allow greater # parallelism for consumption, but this will also result in more files across @@ -120,7 +165,7 @@ log.retention.check.interval.ms=300000 # server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002". # You can also append an optional chroot string to the urls to specify the # root directory for all kafka znodes. -zookeeper.connect=localhost:2181 +zookeeper.connect=127.0.0.1:2181 # Timeout in ms for connecting to zookeeper zookeeper.connection.timeout.ms=6000 @@ -134,3 +179,7 @@ zookeeper.connection.timeout.ms=6000 # We override this to 0 here as it makes for a better out-of-the-box experience for development and testing. # However, in production environments the default value of 3 seconds is more suitable as this will help to avoid unnecessary, and potentially expensive, rebalances during application startup. group.initial.rebalance.delay.ms=0 + +# leader rebalance +auto.leader.rebalance.enable=true +#leader.imbalance.check.interval.seconds=10