Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package com.microsoft.azpubsub.kafka.admin
import kafka.admin.AclCommand
import kafka.admin.AclCommand.{AclCommandOptions, AdminClientService}
import kafka.admin.AclCommand.{AclCommandOptions, AdminClientService, confirmAction, getResourceFilter, getResourceFilterToAcls}
import kafka.utils.{CommandLineUtils, Exit, Json, Logging}
import org.apache.kafka.common.acl.{AccessControlEntry, AclOperation, AclPermissionType}
import org.apache.kafka.common.resource.{ResourcePattern, ResourcePatternFilter, ResourceType}
Expand Down Expand Up @@ -44,6 +44,32 @@ object AzPubSubAclCommand extends Logging {

class AzPubSubAdminClientService(opts: AzPubSubAclCommandOptions) extends AdminClientService(opts) {

private val Newline = scala.util.Properties.lineSeparator

override def removeAcls(): Unit = {
withAdminClient(opts) { adminClient =>
val filters = getResourceFilter(opts, dieIfNoResourceFound = false)
val resourceToAcls = getAcls(adminClient, filters)

val filterToAcl = getResourceFilterToAcls(opts)

for ((filter, acls) <- filterToAcl) {
val filteredPrincipalAclOperationsMap = getPrincipalAclOperationsMap(resourceToAcls, filter)
if (acls.isEmpty) {
if (confirmAction(opts, s"Are you sure you want to delete all ACLs for resource filter `$filter`? (y/n)"))
removeAcls(adminClient, acls, filter)
} else {
val updatedAcls = dropAclsWithSharedOperation(opts, acls, filteredPrincipalAclOperationsMap)
if (confirmAction(opts, s"Are you sure you want to remove ACLs: $Newline ${updatedAcls.map("\t" + _).mkString(Newline)} $Newline from resource filter `$filter`? (y/n)")) {
removeAcls(adminClient, updatedAcls, filter)
}
}
}

listAcls()
}
}

override def printAcls(filters: Set[ResourcePatternFilter], listPrincipals: Set[KafkaPrincipal], resourceToAcls: Map[ResourcePattern, Set[AccessControlEntry]]): Unit = {
if (!opts.options.has(opts.outputAsProducerConsumerOpt)) {
super.printAcls(filters, listPrincipals, resourceToAcls)
Expand All @@ -57,7 +83,7 @@ class AzPubSubAdminClientService(opts: AzPubSubAclCommandOptions) extends AdminC
val allPrincipalsFilteredResourceToAcls = resourceToAcls.mapValues(acls =>
acls.filterNot(acl => listPrincipals.forall(
principal => !principal.toString.equals(acl.principal)))).filter(entry => entry._2.nonEmpty)
var producerConsumerAclMap = aclToProducerConsumerMapping(allPrincipalsFilteredResourceToAcls)
val producerConsumerAclMap = aclToProducerConsumerMapping(allPrincipalsFilteredResourceToAcls)
outputAsJson(producerConsumerAclMap)
}
}
Expand All @@ -80,52 +106,52 @@ class AzPubSubAdminClientService(opts: AzPubSubAclCommandOptions) extends AdminC
}

def GetProducerAclOperations(): Set[AclOperation] = {
var dummyArgs = Array[String]("--bootstrap-server", "localhost:9092", "--add", "--allow-principal", "User:Bob", "--producer", "--topic", "Test-topic")
var dummyOpt = new AclCommandOptions(dummyArgs)
var resourceMap = AclCommand.getProducerResourceFilterToAcls(dummyOpt)
val dummyArgs = Array[String]("--bootstrap-server", "localhost:9092", "--add", "--allow-principal", "User:Bob", "--producer", "--topic", "Test-topic")
val dummyOpt = new AclCommandOptions(dummyArgs)
val resourceMap = AclCommand.getProducerResourceFilterToAcls(dummyOpt)
for ((key, value) <- resourceMap) {
if (key.resourceType() == ResourceType.TOPIC) {
var aclOperationList = Set[AclOperation]()
value.foreach(acl => aclOperationList += acl.operation())
return aclOperationList
}
}
return Set[AclOperation]()
Set[AclOperation]()
}

def GetConsumerAclOperations(): Set[AclOperation] = {
var dummyArgs = Array[String]("--bootstrap-server", "localhost:9092", "--add", "--allow-principal", "User:Bob", "--consumer", "--topic", "Test-topic", "--group", "Test-group")
var dummyOpt = new AclCommandOptions(dummyArgs)
var resourceMap = AclCommand.getConsumerResourceFilterToAcls(dummyOpt)
val dummyArgs = Array[String]("--bootstrap-server", "localhost:9092", "--add", "--allow-principal", "User:Bob", "--consumer", "--topic", "Test-topic", "--group", "Test-group")
val dummyOpt = new AclCommandOptions(dummyArgs)
val resourceMap = AclCommand.getConsumerResourceFilterToAcls(dummyOpt)
for ((key, value) <- resourceMap) {
if (key.resourceType() == ResourceType.TOPIC) {
var aclOperationList = Set[AclOperation]()
value.foreach(acl => aclOperationList += acl.operation())
return aclOperationList
}
}
return Set[AclOperation]()
Set[AclOperation]()
}

def GetGroupAclOperations(): Set[AclOperation] = {
var dummyArgs = Array[String]("--bootstrap-server", "localhost:9092", "--add", "--allow-principal", "User:Bob", "--consumer", "--topic", "Test-topic", "--group", "Test-group")
var dummyOpt = new AclCommandOptions(dummyArgs)
var resourceMap = AclCommand.getConsumerResourceFilterToAcls(dummyOpt)
val dummyArgs = Array[String]("--bootstrap-server", "localhost:9092", "--add", "--allow-principal", "User:Bob", "--consumer", "--topic", "Test-topic", "--group", "Test-group")
val dummyOpt = new AclCommandOptions(dummyArgs)
val resourceMap = AclCommand.getConsumerResourceFilterToAcls(dummyOpt)
for ((key, value) <- resourceMap) {
if (key.resourceType() == ResourceType.GROUP) {
var aclOperationList = Set[AclOperation]()
value.foreach(acl => aclOperationList += acl.operation())
return aclOperationList
}
}
return Set[AclOperation]()
Set[AclOperation]()
}

def aclToProducerConsumerMapping(resourceToAcls:Map[ResourcePattern,Set[AccessControlEntry]]):mutable.Map[ResourcePattern,mutable.Set[AzPubSubAccessControlEntry]] = {
var producerConsumerGroupAclMap = mutable.Map[ResourcePattern, mutable.Set[AzPubSubAccessControlEntry]]()
var producerAclOperations = GetProducerAclOperations()
var consumerAclOperations = GetConsumerAclOperations()
var groupAclOperations = GetGroupAclOperations()
val producerAclOperations = GetProducerAclOperations()
val consumerAclOperations = GetConsumerAclOperations()
val groupAclOperations = GetGroupAclOperations()

resourceToAcls.foreach(resource => {
producerConsumerGroupAclMap += (resource._1 -> mutable.Set[AzPubSubAccessControlEntry]())
Expand All @@ -140,35 +166,73 @@ class AzPubSubAdminClientService(opts: AzPubSubAclCommandOptions) extends AdminC
})
principalAclMap.foreach { case (principal, acls) => {
var strayAcls = acls
var filteredAclOperations = mutable.Set[AclOperation]()
var filteredAcls = acls.filter(x => (x.host() == "*" && x.permissionType() == AclPermissionType.ALLOW))
val filteredAclOperations = mutable.Set[AclOperation]()
val filteredAcls = acls.filter(x => (x.host() == "*" && x.permissionType() == AclPermissionType.ALLOW))
filteredAcls.foreach(x => filteredAclOperations.add(x.operation()))
if (resource._1.resourceType() == ResourceType.TOPIC) {
if (producerAclOperations.subsetOf(filteredAclOperations)) {
strayAcls = strayAcls.filterNot(x => (x.host() == "*" && x.permissionType() == AclPermissionType.ALLOW && producerAclOperations.contains(x.operation())))
var modifiedAcl = new AzPubSubAccessControlEntry(principal, "*", AclOperation.ANY, AclPermissionType.ALLOW, "PRODUCER")
val modifiedAcl = new AzPubSubAccessControlEntry(principal, "*", AclOperation.ANY, AclPermissionType.ALLOW, "PRODUCER")
producerConsumerGroupAclMap(resource._1).add(modifiedAcl)
}
if (consumerAclOperations.subsetOf(filteredAclOperations)) {
strayAcls = strayAcls.filterNot(x => (x.host() == "*" && x.permissionType() == AclPermissionType.ALLOW && consumerAclOperations.contains(x.operation())))
var modifiedAcl = new AzPubSubAccessControlEntry(principal, "*", AclOperation.ANY, AclPermissionType.ALLOW, "CONSUMER")
val modifiedAcl = new AzPubSubAccessControlEntry(principal, "*", AclOperation.ANY, AclPermissionType.ALLOW, "CONSUMER")
producerConsumerGroupAclMap(resource._1).add(modifiedAcl)
}
}
else if (resource._1.resourceType() == ResourceType.GROUP) {
if (groupAclOperations.subsetOf(filteredAclOperations)) {
strayAcls = strayAcls.filterNot(x => (x.host() == "*" && x.permissionType() == AclPermissionType.ALLOW && groupAclOperations.contains(x.operation())))
var modifiedAcl = new AzPubSubAccessControlEntry(principal, "*", AclOperation.ANY, AclPermissionType.ALLOW, "GROUP")
val modifiedAcl = new AzPubSubAccessControlEntry(principal, "*", AclOperation.ANY, AclPermissionType.ALLOW, "GROUP")
producerConsumerGroupAclMap(resource._1).add(modifiedAcl)
}
}
strayAcls.foreach(acl => {
var modifiedAcl = new AzPubSubAccessControlEntry(principal, acl.host(), acl.operation(), acl.permissionType(), "NONE")
val modifiedAcl = new AzPubSubAccessControlEntry(principal, acl.host(), acl.operation(), acl.permissionType(), "NONE")
producerConsumerGroupAclMap(resource._1).add(modifiedAcl)
})
}}
})
return producerConsumerGroupAclMap
producerConsumerGroupAclMap
}

def dropAclsWithSharedOperation(opt: AzPubSubAclCommandOptions, acls: Set[AccessControlEntry], principalAclOperationsMap: Map[String, Set[AclOperation]]): Set[AccessControlEntry] ={
val producerAclOperations = GetProducerAclOperations()
val consumerAclOperations = GetConsumerAclOperations()

var updatedAcls = acls
acls.foreach(acl => {
if (acl.operation() == AclOperation.DESCRIBE) {
if ((opt.options.has(opt.producerOpt) && consumerAclOperations.subsetOf(principalAclOperationsMap(acl.principal())))
|| (opt.options.has(opt.consumerOpt) && producerAclOperations.subsetOf(principalAclOperationsMap(acl.principal())))) {
updatedAcls -= acl
}
}
})

updatedAcls
}

def getPrincipalAclOperationsMap(resourceToAcls: Map[ResourcePattern, Set[AccessControlEntry]], filter: ResourcePatternFilter): Map[String, Set[AclOperation]] ={
val principalAclOperationsMap = mutable.Map[String, Set[AclOperation]]()

resourceToAcls.foreach(resource => {
if (resource._1.name() == filter.name() && resource._1.resourceType() == filter.resourceType() && resource._1.patternType() == filter.patternType()) {
resource._2.foreach(acl => {
if (acl.host() == "*" && acl.permissionType() == AclPermissionType.ALLOW) {
if (principalAclOperationsMap.contains(acl.principal())) {
principalAclOperationsMap(acl.principal()) += acl.operation()
}
else {
principalAclOperationsMap += (acl.principal() -> Set(acl.operation()))
}
}
})
}
})

principalAclOperationsMap.toMap
}
}

Expand Down
14 changes: 7 additions & 7 deletions core/src/main/scala/kafka/admin/AclCommand.scala
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ object AclCommand extends Logging {

class AdminClientService(val opts: AclCommandOptions) extends AclCommandService with Logging {

private def withAdminClient(opts: AclCommandOptions)(f: Admin => Unit): Unit = {
protected def withAdminClient(opts: AclCommandOptions)(f: Admin => Unit): Unit = {
val props = if (opts.options.has(opts.commandConfigOpt))
Utils.loadProps(opts.options.valueOf(opts.commandConfigOpt))
else
Expand Down Expand Up @@ -171,7 +171,7 @@ object AclCommand extends Logging {
}
}

private def removeAcls(adminClient: Admin, acls: Set[AccessControlEntry], filter: ResourcePatternFilter): Unit = {
protected def removeAcls(adminClient: Admin, acls: Set[AccessControlEntry], filter: ResourcePatternFilter): Unit = {
if (acls.isEmpty)
adminClient.deleteAcls(List(new AclBindingFilter(filter, AccessControlEntryFilter.ANY)).asJava).all().get()
else {
Expand All @@ -180,7 +180,7 @@ object AclCommand extends Logging {
}
}

private def getAcls(adminClient: Admin, filters: Set[ResourcePatternFilter]): Map[ResourcePattern, Set[AccessControlEntry]] = {
protected def getAcls(adminClient: Admin, filters: Set[ResourcePatternFilter]): Map[ResourcePattern, Set[AccessControlEntry]] = {
val aclBindings =
if (filters.isEmpty) adminClient.describeAcls(AclBindingFilter.ANY).values().get().asScala.toList
else {
Expand Down Expand Up @@ -428,7 +428,7 @@ object AclCommand extends Logging {
resourceToAcl
}

private def getResourceFilterToAcls(opts: AclCommandOptions): Map[ResourcePatternFilter, Set[AccessControlEntry]] = {
def getResourceFilterToAcls(opts: AclCommandOptions): Map[ResourcePatternFilter, Set[AccessControlEntry]] = {
var resourceToAcls = Map.empty[ResourcePatternFilter, Set[AccessControlEntry]]

//if none of the --producer or --consumer options are specified , just construct ACLs from CLI options.
Expand Down Expand Up @@ -530,14 +530,14 @@ object AclCommand extends Logging {
Set.empty[String]
}

private def getPrincipals(opts: AclCommandOptions, principalOptionSpec: ArgumentAcceptingOptionSpec[String]): Set[KafkaPrincipal] = {
def getPrincipals(opts: AclCommandOptions, principalOptionSpec: ArgumentAcceptingOptionSpec[String]): Set[KafkaPrincipal] = {
if (opts.options.has(principalOptionSpec))
opts.options.valuesOf(principalOptionSpec).asScala.map(s => JSecurityUtils.parseKafkaPrincipal(s.trim)).toSet
else
Set.empty[KafkaPrincipal]
}

private def getResourceFilter(opts: AclCommandOptions, dieIfNoResourceFound: Boolean = true): Set[ResourcePatternFilter] = {
def getResourceFilter(opts: AclCommandOptions, dieIfNoResourceFound: Boolean = true): Set[ResourcePatternFilter] = {
val patternType: PatternType = opts.options.valueOf(opts.resourcePatternType)

var resourceFilters = Set.empty[ResourcePatternFilter]
Expand All @@ -563,7 +563,7 @@ object AclCommand extends Logging {
resourceFilters
}

private def confirmAction(opts: AclCommandOptions, msg: String): Boolean = {
def confirmAction(opts: AclCommandOptions, msg: String): Boolean = {
if (opts.options.has(opts.forceOpt))
return true
println(msg)
Expand Down