Skip to content

Commit

Permalink
added ACL extraction mode
Browse files Browse the repository at this point in the history
  • Loading branch information
simplesteph committed Mar 6, 2018
1 parent 6bde564 commit 0baa827
Show file tree
Hide file tree
Showing 10 changed files with 157 additions and 37 deletions.
19 changes: 18 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ Overall we use the typesafe config library to configure this project (LINK TODO)
## Environment variables
The [default configurations](src/main/resources/application.conf) can be overwritten using the following environment variables:

- `DEBUG=true`: enable debug mode (print configs etc)
- `EXTRACT=true`: enable extract mode (get all the ACLs from Kafka formatted as a CSV)
- `REFRESH_FREQUENCY_MS=10000`: how often to check for changes in ACLs in Kafka and in the Source. 10000 ms by default
- `AUTHORIZER_CLASS`: override the authorizer class if you're not using the `SimpleAclAuthorizer`
- `AUTHORIZER_ZOOKEEPER_CONNECT`: zookeeper connection string
Expand Down Expand Up @@ -120,6 +120,23 @@ Add the entry to your `/etc/hosts` file
127.0.0.1 kafka1
```

## Extracting ACLs

You can initially extract all your existing ACLs from Kafka by running the program with the config `extract=true` or `export EXTRACT=true`.

Output should look like:
```
[2018-03-06 21:49:44,704] INFO Running ACL Extraction mode (ExtractAcl)
[2018-03-06 21:49:44,704] INFO Getting ACLs from Kafka (ExtractAcl)
[2018-03-06 21:49:44,704] INFO Closing Authorizer (ExtractAcl)
KafkaPrincipal,ResourceType,ResourceName,Operation,PermissionType,Host
User:bob,Group,bar,Write,Deny,12.34.56.78
User:alice,Topic,foo,Read,Allow,*
User:peter,Cluster,kafka-cluster,Create,Allow,*
```

You can then place this CSV anywhere and use it as your source of truth.

# Compatibility

Expand Down
7 changes: 5 additions & 2 deletions src/main/resources/application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,11 @@ ksm {
debug = false
debug = ${?DEBUG}

"refresh.frequency.ms" = 10000
"refresh.frequency.ms" = ${?REFRESH_FREQUENCY_MS}
extract = false
extract = ${?EXTRACT}

refresh.frequency.ms = 10000
refresh.frequency.ms = ${?REFRESH_FREQUENCY_MS}
}

authorizer {
Expand Down
1 change: 1 addition & 0 deletions src/main/scala/com/github/simplesteph/ksm/AppConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ class AppConfig(config: Config) {
// Scoped view over the `ksm` section of the application config.
object KSM {
  private val ksmConfig = config.getConfig("ksm")
  // How often (in milliseconds) the synchronizer checks Kafka and the source for ACL changes.
  val refreshFrequencyMs: Int = ksmConfig.getInt("refresh.frequency.ms")
  // When true, the app runs a one-shot ACL extraction (CSV to stdout) instead of the sync loop.
  val extract: Boolean = ksmConfig.getBoolean("extract")
}

}
25 changes: 25 additions & 0 deletions src/main/scala/com/github/simplesteph/ksm/ExtractAcl.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package com.github.simplesteph.ksm

import com.github.simplesteph.ksm.parser.AclParser
import kafka.security.auth.{ Acl, Authorizer, Resource }
import org.slf4j.{ Logger, LoggerFactory }

/**
 * One-shot extraction of the ACLs currently held by a Kafka [[Authorizer]].
 *
 * @param authorizer the Kafka authorizer to read ACLs from; it is closed after the read
 * @param aclParser  formatter used to render the ACLs (e.g. as CSV)
 */
class ExtractAcl(
  authorizer: Authorizer,
  aclParser: AclParser) {

  val log: Logger = LoggerFactory.getLogger(classOf[ExtractAcl].getSimpleName)

  /**
   * Reads every ACL from the authorizer, closes it, then prints the
   * formatted result to stdout (log output goes to the logger, so the
   * CSV on stdout can be redirected cleanly).
   */
  def extract(): Unit = {
    log.info("Running ACL Extraction mode")
    log.info("Getting ACLs from Kafka")
    val rawKafkaAcls = authorizer.getAcls()
    // release the authorizer (and its Zookeeper connection) as soon as we have the data
    log.info("Closing Authorizer")
    authorizer.close()
    // flatten Kafka's Resource -> Set[Acl] map into individual (Resource, Acl) pairs
    val flattened: Set[(Resource, Acl)] = AclSynchronizer.flattenKafkaAcls(rawKafkaAcls)
    log.info("================ CURRENT ACLS ===============")
    println(aclParser.formatAcls(flattened.toList))
  }

}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package com.github.simplesteph.ksm

import java.util.concurrent.{ Executors, ScheduledFuture, TimeUnit }

import com.github.simplesteph.ksm.parser.CsvAclParser
import com.typesafe.config.ConfigFactory
import org.slf4j.LoggerFactory

Expand All @@ -12,24 +13,28 @@ object KafkaSecurityManager extends App {
val config = ConfigFactory.load()
val appConfig: AppConfig = new AppConfig(config)

val aclSynchronizer = new AclSynchronizer(
appConfig.Authorizer.authorizer,
appConfig.Source.sourceAcl,
appConfig.Notification.notification)

val executor = Executors.newScheduledThreadPool(1)
val f: ScheduledFuture[_] = executor.scheduleAtFixedRate(aclSynchronizer, 1,
appConfig.KSM.refreshFrequencyMs, TimeUnit.MILLISECONDS)

Runtime.getRuntime.addShutdownHook(new Thread() {
override def run(): Unit = {
aclSynchronizer.close()
log.info("Kafka Security Manager is shutting down...")
f.cancel(false)
log.info("Waiting for thread to cleanly shutdown (10 seconds maximum)")
executor.shutdown()
executor.awaitTermination(10, TimeUnit.SECONDS)
log.info("Kafka Security Manager has shut down...")
}
})
if (appConfig.KSM.extract) {
new ExtractAcl(appConfig.Authorizer.authorizer, CsvAclParser).extract()
} else {
val aclSynchronizer = new AclSynchronizer(
appConfig.Authorizer.authorizer,
appConfig.Source.sourceAcl,
appConfig.Notification.notification)

val executor = Executors.newScheduledThreadPool(1)
val f: ScheduledFuture[_] = executor.scheduleAtFixedRate(aclSynchronizer, 1,
appConfig.KSM.refreshFrequencyMs, TimeUnit.MILLISECONDS)

Runtime.getRuntime.addShutdownHook(new Thread() {
override def run(): Unit = {
aclSynchronizer.close()
log.info("Kafka Security Manager is shutting down...")
f.cancel(false)
log.info("Waiting for thread to cleanly shutdown (10 seconds maximum)")
executor.shutdown()
executor.awaitTermination(10, TimeUnit.SECONDS)
log.info("Kafka Security Manager has shut down...")
}
})
}
}
14 changes: 14 additions & 0 deletions src/main/scala/com/github/simplesteph/ksm/parser/AclParser.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package com.github.simplesteph.ksm.parser

import java.io.Reader

import com.github.simplesteph.ksm.source.SourceAclResult
import kafka.security.auth.{ Acl, Resource }

/**
 * Contract for reading ACLs from a textual source and writing them back out.
 * Implementations (e.g. a CSV parser) must make the two operations round-trip.
 */
trait AclParser {

  /**
   * Parses ACLs from the given reader.
   *
   * @param reader character stream holding the serialized ACLs
   *               (Reader lets callers pass strings and files interchangeably)
   * @return the parsed ACLs plus any per-row parsing errors
   */
  def aclsFromReader(reader: Reader): SourceAclResult

  /**
   * Serializes the given (resource, ACL) pairs into this parser's textual format.
   *
   * @param acls the ACL bindings to render
   * @return the full serialized document as a single string
   */
  def formatAcls(acls: List[(Resource, Acl)]): String

}
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
package com.github.simplesteph.ksm.parser

import java.io.Reader
import java.io.{ OutputStream, Reader }

import com.github.simplesteph.ksm.source.SourceAclResult
import com.github.tototoshi.csv.{ CSVFormat, CSVReader, QUOTE_MINIMAL, Quoting }
import com.github.tototoshi.csv.{ CSVFormat, CSVReader, CSVWriter, QUOTE_MINIMAL, Quoting }
import kafka.security.auth._
import org.apache.kafka.common.utils.SecurityUtils

Expand All @@ -16,7 +16,7 @@ import scala.util.Try
* The CSV is expected to have headers as outlined below and in the example
* Empty lines in the CSV should be ignored
*/
object CsvParser {
object CsvAclParser extends AclParser {

final val KAFKA_PRINCIPAL_COL = "KafkaPrincipal"
final val RESOURCE_TYPE_COL = "ResourceType"
Expand All @@ -25,7 +25,7 @@ object CsvParser {
final val PERMISSION_TYPE_COL = "PermissionType"
final val HOST_COL = "Host"

final val EXPECTED_COLS = Set(
final val EXPECTED_COLS = List(
KAFKA_PRINCIPAL_COL,
RESOURCE_TYPE_COL, RESOURCE_NAME_COL, OPERATION_COL,
PERMISSION_TYPE_COL, HOST_COL)
Expand Down Expand Up @@ -64,7 +64,7 @@ object CsvParser {
* @param reader we use the reader interface to use string and files interchangeably in the parser
* @return sourceAclResult
*/
def aclsFromCsv(reader: Reader): SourceAclResult = {
override def aclsFromReader(reader: Reader): SourceAclResult = {
val csv = CSVReader.open(reader).allWithHeaders().filter(_.nonEmpty)

// parse the CSV
Expand All @@ -77,4 +77,29 @@ object CsvParser {
SourceAclResult(acls, errors)
}

/**
 * Renders one (resource, ACL) pair as a single CSV data row,
 * with columns in the same order as EXPECTED_COLS. No trailing newline.
 */
def asCsv(r: Resource, a: Acl): String =
  s"${a.principal},${r.resourceType},${r.name},${a.operation},${a.permissionType},${a.host}"

/**
 * Serializes the given ACL bindings as a CSV document: a header row
 * followed by one row per binding, each line (header included)
 * terminated by the platform line separator.
 */
override def formatAcls(acls: List[(Resource, Acl)]): String = {
  val sep = System.getProperty("line.separator")
  // header first, then one rendered row per (resource, acl) pair
  val rows = EXPECTED_COLS.mkString(",") :: acls.map { case (r, a) => asCsv(r, a) }
  // every row, including the last, carries a trailing separator
  rows.map(_ + sep).mkString
}

}

Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package com.github.simplesteph.ksm.source

import java.io.{ File, FileReader }

import com.github.simplesteph.ksm.parser.CsvParser
import com.github.simplesteph.ksm.parser.CsvAclParser
import com.typesafe.config.Config

class FileSourceAcl extends SourceAcl {
Expand Down Expand Up @@ -31,7 +31,7 @@ class FileSourceAcl extends SourceAcl {
val file = new File(filename)
if (file.lastModified() > lastModified) {
val reader = new FileReader(file)
val res = CsvParser.aclsFromCsv(reader)
val res = CsvAclParser.aclsFromReader(reader)
reader.close()
lastModified = file.lastModified()
Some(res)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import java.nio.charset.Charset
import java.util.Base64

import com.fasterxml.jackson.databind.ObjectMapper
import com.github.simplesteph.ksm.parser.CsvParser
import com.github.simplesteph.ksm.parser.CsvAclParser
import com.typesafe.config.Config
import org.slf4j.LoggerFactory
import skinny.http.{ HTTP, HTTPException, Request, Response }
Expand Down Expand Up @@ -54,7 +54,7 @@ class GitHubSourceAcl extends SourceAcl {
val b64encodedContent = objectMapper.readTree(response.textBody).get("content").asText()
val data = new String(Base64.getDecoder.decode(b64encodedContent), Charset.forName("UTF-8"))
// use the CSV Parser
Some(CsvParser.aclsFromCsv(new StringReader(data)))
Some(CsvAclParser.aclsFromReader(new StringReader(data)))
case 304 =>
None
case _ =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import kafka.security.auth._
import org.apache.kafka.common.utils.SecurityUtils
import org.scalatest.{FlatSpec, Matchers}

class CsvParserTest extends FlatSpec with Matchers {
class CsvAclParserTest extends FlatSpec with Matchers {

val row = Map(
"KafkaPrincipal" -> "User:alice",
Expand All @@ -21,7 +21,7 @@ class CsvParserTest extends FlatSpec with Matchers {
val acl = Acl(SecurityUtils.parseKafkaPrincipal("User:alice"), Allow, "*", Read)

"parseRow" should "correctly parse a Row" in {
CsvParser.parseRow(row) shouldBe((resource, acl))
CsvAclParser.parseRow(row) shouldBe((resource, acl))
}

"aclsFromCsv" should "correctly parse a Correct CSV" in {
Expand All @@ -42,7 +42,7 @@ class CsvParserTest extends FlatSpec with Matchers {
val res2 = Resource(Group, "bar")
val res3 = Resource(Cluster, "kafka-cluster")

val res = CsvParser.aclsFromCsv(new StringReader(csv))
val res = CsvAclParser.aclsFromReader(new StringReader(csv))

res.errs shouldBe List()

Expand Down Expand Up @@ -74,7 +74,7 @@ class CsvParserTest extends FlatSpec with Matchers {
val res2 = Resource(Group, "bar")
val res3 = Resource(Cluster, "kafka-cluster")

val res = CsvParser.aclsFromCsv(new StringReader(csv))
val res = CsvAclParser.aclsFromReader(new StringReader(csv))

res.errs.size shouldBe 2
val throwable1 = res.errs.head.get
Expand All @@ -101,9 +101,39 @@ class CsvParserTest extends FlatSpec with Matchers {
|User:peter,Cluster,kafka-cluster,Create,Allow
|""".stripMargin

val res = CsvParser.aclsFromCsv(new StringReader(csv))
val res = CsvAclParser.aclsFromReader(new StringReader(csv))

res.errs.size shouldBe 3

}

"asCsv" should "correctly write CSV Row" in {
val acl1 = Acl(SecurityUtils.parseKafkaPrincipal("User:alice"), Allow, "*", Read)
val res1 = Resource(Topic, "foo")
val res = CsvAclParser.asCsv(res1, acl1)
res shouldBe "User:alice,Topic,foo,Read,Allow,*"
}

"asCsv" should "correctly format entire ACL" in {
val csv =
"""KafkaPrincipal,ResourceType,ResourceName,Operation,PermissionType,Host
|User:alice,Topic,foo,Read,Allow,*
|User:bob,Group,bar,Write,Deny,12.34.56.78
|User:peter,Cluster,kafka-cluster,Create,Allow,*
|""".stripMargin


val acl1 = Acl(SecurityUtils.parseKafkaPrincipal("User:alice"), Allow, "*", Read)
val acl2 = Acl(SecurityUtils.parseKafkaPrincipal("User:bob"), Deny, "12.34.56.78", Write)
val acl3 = Acl(SecurityUtils.parseKafkaPrincipal("User:peter"), Allow, "*", Create)

val res1 = Resource(Topic, "foo")
val res2 = Resource(Group, "bar")
val res3 = Resource(Cluster, "kafka-cluster")

val res = CsvAclParser.formatAcls(List((res1, acl1),(res2, acl2), (res3, acl3)))

res shouldBe csv

}
}

0 comments on commit 0baa827

Please sign in to comment.