Skip to content

Commit

Permalink
feat: sasl jass auth support (#79)
Browse files Browse the repository at this point in the history
  • Loading branch information
lucapette authored Dec 17, 2023
1 parent 815a47c commit 63cbc52
Show file tree
Hide file tree
Showing 12 changed files with 100 additions and 29 deletions.
2 changes: 2 additions & 0 deletions docs/docs/tutorial/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,3 +39,5 @@ The following configuration options are available for Kafka sources:
| **`bootstrapServers`** | A comma-separated list of host:port pairs to use for establishing the initial connection to the Kafka cluster. | `localhost:9092` |
| **`schemaRegistry.url`** | The URL of the schema registry. | `http://localhost:8081` |
| `schemaRegistry.userInfo` | The user info to use for authenticating with the schema registry. | |
| `sasl.mechanism` | The SASL mechanism to use for authenticating with the Kafka cluster. | `PLAIN` |
| `sasl.jaasConfig` | The JAAS configuration to use for authenticating with the Kafka cluster. | |
20 changes: 13 additions & 7 deletions libs/konfig/src/main/kotlin/io/typestream/konfig/Konfig.kt
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import java.io.InputStream
import java.util.Properties
import kotlin.properties.ReadOnlyProperty
import kotlin.reflect.KClass
import kotlin.reflect.KParameter
import kotlin.reflect.full.primaryConstructor

class Konfig(source: InputStream) {
Expand Down Expand Up @@ -43,11 +44,11 @@ class Konfig(source: InputStream) {

private fun <T : Any> decodeParams(klass: KClass<T>, prefix: String): T {
val constructor = klass.primaryConstructor!!
val args = arrayOfNulls<Any>(constructor.parameters.size)
constructor.parameters.forEachIndexed { index, param ->
val args = mutableMapOf<KParameter, Any?>()
constructor.parameters.forEach { param ->
when (param.type.classifier) {
String::class -> args[index] = get(prefix, "${param.name}")
Int::class -> args[index] = get(prefix, "${param.name}")?.toInt()
String::class -> args[param] = get(prefix, "${param.name}")
Int::class -> args[param] = get(prefix, "${param.name}")?.toInt()
Map::class -> {
val map = HashMap<String, Any>()
val mapKey = props[prefix]
Expand All @@ -59,14 +60,19 @@ class Konfig(source: InputStream) {
}
}

args[index] = map
args[param] = map
}

is KClass<*> -> args[index] = decodeKlass(param.type.classifier as KClass<*>, prefix)
is KClass<*> -> {
val keyPrefix = "$prefix.${param.name}"
if (!param.type.isMarkedNullable || props.keys.count { it.toString().startsWith(keyPrefix) } > 0) {
args[param] = decodeKlass(param.type.classifier as KClass<*>, prefix)
}
}
else -> error("${param.type.classifier} is not supported")
}
}
return constructor.call(*args)
return constructor.callBy(args.filter { it.value != null })
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,21 @@ package io.typestream.konfig
import org.assertj.core.api.Assertions.assertThat
import org.junit.jupiter.api.Nested
import org.junit.jupiter.api.Test
import java.io.FileInputStream

internal class KonfigSourceTest {

val defaultConfig = """
server.host=localhost
server.port=4242
server.protocol=https
db.endpoint=http://db.local:5432
""".trimIndent().toByteArray().inputStream()

@KonfigSource(prefix = "server")
data class ServerConfig(val host: String, val port: Int, val protocol: String = "http")

@KonfigSource(prefix = "server")
data class ServerConfig(val host: String, val port: Int, val protocol: String?)
data class ServerConfigWithNullables(val host: String, val port: Int, val protocol: String? = null)

@KonfigSource(prefix = "db")
data class DbConfig(val endpoint: String)
Expand All @@ -19,6 +28,9 @@ internal class KonfigSourceTest {
@KonfigSource
data class NestedConfig(val db: DbConfig, val server: ServerConfig)

@KonfigSource
data class NestedConfigWithNullables(val db: DbConfig, val server: ServerConfig? = null)

@KonfigSource(prefix = "nested")
data class NestedWithPrefix(val db: DbConfig, val server: ServerConfig)

Expand All @@ -28,10 +40,10 @@ internal class KonfigSourceTest {
val serverConfig by konfig.inject<ServerConfig>()
}

val konfig = Konfig(FileInputStream("src/test/resources/application.properties"))
val konfig = Konfig(defaultConfig)
val app = App(konfig)

assertThat(app.serverConfig).extracting("host", "port").containsExactly("localhost", 4242)
assertThat(app.serverConfig).extracting("host", "port", "protocol").containsExactly("localhost", 4242, "https")
}

@Test
Expand All @@ -40,12 +52,28 @@ internal class KonfigSourceTest {
val config by konfig.inject<Config>()
}

val konfig = Konfig(FileInputStream("src/test/resources/application.properties"))
val konfig = Konfig(defaultConfig)
val app = App(konfig)

assertThat(app.config).extracting("serverHost", "serverPort").containsExactly("localhost", 4242)
}

@Test
fun `loads a simple config with nullables`() {
class App(konfig: Konfig) {
val config by konfig.inject<ServerConfigWithNullables>()
}

val konfig = Konfig("""
server.host=localhost
server.port=4242
db.endpoint=http://db.local:5432
""".trimIndent().toByteArray().inputStream())
val app = App(konfig)

assertThat(app.config).extracting("host", "port", "protocol").containsExactly("localhost", 4242, null)
}

@Nested
inner class NestedTest {
@Test
Expand All @@ -54,7 +82,7 @@ internal class KonfigSourceTest {
val config by konfig.inject<NestedConfig>()
}

val konfig = Konfig(FileInputStream("src/test/resources/application.properties"))
val konfig = Konfig(defaultConfig)
val app = App(konfig)

assertThat(app.config)
Expand All @@ -68,13 +96,32 @@ internal class KonfigSourceTest {
val config by konfig.inject<NestedWithPrefix>()
}

val konfig = Konfig(FileInputStream("src/test/resources/nested-prefix.properties"))
val konfig = Konfig(
"""
nested.server.host=localhost
nested.server.port=4242
nested.db.endpoint=http://db.local:5432
""".trimIndent().toByteArray().inputStream()
)
val app = App(konfig)

assertThat(app.config)
.extracting("db.endpoint", "server.host", "server.port")
.containsExactly("http://db.local:5432", "localhost", 4242)
}

@Test
fun `loads a nested config with nullables`() {
class App(konfig: Konfig) {
val config by konfig.inject<NestedConfigWithNullables>()
}

val konfig = Konfig("db.endpoint=http://db.local:5432".trimIndent().toByteArray().inputStream())
val app = App(konfig)

assertThat(app.config)
.extracting("db.endpoint").isEqualTo("http://db.local:5432")
}
}

@Test
Expand All @@ -83,7 +130,12 @@ internal class KonfigSourceTest {
val config by konfig.inject<ServerConfig>()
}

val konfig = Konfig(FileInputStream("src/test/resources/comments.properties"))
val konfig = Konfig("""
server.host=localhost
server.port=4242
# db.endpoint=http://db.local:5432
""".trimIndent().toByteArray().inputStream())

val app = App(konfig)

assertThat(app.config).extracting("host", "port").containsExactly("localhost", 4242)
Expand Down
3 changes: 0 additions & 3 deletions libs/konfig/src/test/resources/application.properties

This file was deleted.

3 changes: 0 additions & 3 deletions libs/konfig/src/test/resources/comments.properties

This file was deleted.

3 changes: 0 additions & 3 deletions libs/konfig/src/test/resources/nested-prefix.properties

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,16 +1,15 @@
package io.typestream.testing.konfig

import io.typestream.konfig.Konfig
import io.typestream.testing.TestKafka
import org.testcontainers.redpanda.RedpandaContainer
import java.io.ByteArrayInputStream

private fun kafkaProperties(testKafka: RedpandaContainer) = """
grpc.port=0
sources.kafka=local
sources.kafka.local.bootstrapServers=${testKafka.bootstrapServers}
sources.kafka.local.schemaRegistry.url=${testKafka.schemaRegistryAddress}
sources.kafka.local.fsRefreshRate=1
""".trimIndent()
""".trimIndent()

fun testKonfig(testKafka: TestKafka) =
io.typestream.konfig.Konfig(ByteArrayInputStream(kafkaProperties(testKafka).toByteArray()))
fun testKonfig(testKafka: TestKafka) = Konfig(kafkaProperties(testKafka).toByteArray().inputStream())
1 change: 1 addition & 0 deletions server/src/main/kotlin/io/typestream/config/KafkaConfig.kt
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,5 @@ data class KafkaConfig(
val bootstrapServers: String,
val fsRefreshRate: Int = 60,
val schemaRegistry: SchemaRegistryConfig,
val saslConfig: SaslConfig? = null,
)
6 changes: 6 additions & 0 deletions server/src/main/kotlin/io/typestream/config/SaslConfig.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package io.typestream.config

import io.typestream.konfig.KonfigSource

@KonfigSource("sasl")
data class SaslConfig(val mechanism: String = "PLAIN", val jaasConfig: String)
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import io.typestream.config.KafkaConfig
import org.apache.kafka.clients.admin.Admin
import org.apache.kafka.clients.admin.AdminClientConfig
import org.apache.kafka.clients.admin.KafkaAdminClient
import org.apache.kafka.common.config.SaslConfigs
import java.util.Properties


Expand All @@ -16,6 +17,12 @@ class KafkaAdminClient(kafkaConfig: KafkaConfig) {
val props = Properties()
props[AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG] = kafkaConfig.bootstrapServers

if (kafkaConfig.saslConfig != null) {
props[AdminClientConfig.SECURITY_PROTOCOL_CONFIG] = "SASL_SSL"
props[SaslConfigs.SASL_MECHANISM] = kafkaConfig.saslConfig.mechanism
props[SaslConfigs.SASL_JAAS_CONFIG] = kafkaConfig.saslConfig.jaasConfig
}

this.kafkaAdminClient = KafkaAdminClient.create(props)
logger.info { "kafka admin created" }
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import io.typestream.kafka.StreamsBuilderWrapper
import kotlinx.coroutines.flow.flow
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.config.SaslConfigs
import org.apache.kafka.streams.KafkaStreams
import org.apache.kafka.streams.StreamsConfig
import org.apache.kafka.streams.Topology
Expand Down Expand Up @@ -99,6 +100,12 @@ class KafkaStreamsJob(override val id: String, val program: Program, private val
props["schema.registry.userInfo"] = kafkaConfig.schemaRegistry.userInfo
}

if (kafkaConfig.saslConfig != null) {
props[StreamsConfig.SECURITY_PROTOCOL_CONFIG] = "SASL_SSL"
props[SaslConfigs.SASL_MECHANISM] = kafkaConfig.saslConfig.mechanism
props[SaslConfigs.SASL_JAAS_CONFIG] = kafkaConfig.saslConfig.jaasConfig
}

return props
}

Expand Down
2 changes: 1 addition & 1 deletion tools/src/main/kotlin/io/typestream/tools/Config.kt
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import io.typestream.konfig.KonfigSource


@KonfigSource("schemaRegistry")
data class SchemaRegistryConfig(val url: String, val userInfo: String?)
data class SchemaRegistryConfig(val url: String, val userInfo: String? = null)

data class KafkaConfig(val bootstrapServers: String, val schemaRegistry: SchemaRegistryConfig)

Expand Down

0 comments on commit 63cbc52

Please sign in to comment.