From 63cbc5216b0da4ee4f754fad0d69a244f8d092c6 Mon Sep 17 00:00:00 2001 From: Luca Pette Date: Sun, 17 Dec 2023 22:23:05 +0100 Subject: [PATCH] feat: sasl jass auth support (#79) --- docs/docs/tutorial/configuration.md | 2 + .../kotlin/io/typestream/konfig/Konfig.kt | 20 ++++-- .../io/typestream/konfig/KonfigSourceTest.kt | 68 ++++++++++++++++--- .../src/test/resources/application.properties | 3 - .../src/test/resources/comments.properties | 3 - .../test/resources/nested-prefix.properties | 3 - .../typestream/testing/konfig/TestKonfig.kt | 7 +- .../io/typestream/config/KafkaConfig.kt | 1 + .../kotlin/io/typestream/config/SaslConfig.kt | 6 ++ .../io/typestream/kafka/KafkaAdminClient.kt | 7 ++ .../typestream/scheduler/KafkaStreamsJob.kt | 7 ++ .../main/kotlin/io/typestream/tools/Config.kt | 2 +- 12 files changed, 100 insertions(+), 29 deletions(-) delete mode 100644 libs/konfig/src/test/resources/application.properties delete mode 100644 libs/konfig/src/test/resources/comments.properties delete mode 100644 libs/konfig/src/test/resources/nested-prefix.properties create mode 100644 server/src/main/kotlin/io/typestream/config/SaslConfig.kt diff --git a/docs/docs/tutorial/configuration.md b/docs/docs/tutorial/configuration.md index 55f3f74..235a299 100644 --- a/docs/docs/tutorial/configuration.md +++ b/docs/docs/tutorial/configuration.md @@ -39,3 +39,5 @@ The following configuration options are available for Kafka sources: | **`bootstrapServers`** | A comma-separated list of host:port pairs to use for establishing the initial connection to the Kafka cluster. | `localhost:9092` | | **`schemaRegistry.url`** | The URL of the schema registry. | `http://localhost:8081` | | `schemaRegistry.userInfo` | The user info to use for authenticating with the schema registry. | | +| `sasl.mechanism` | The SASL mechanism to use for authenticating with the Kafka cluster. | `PLAIN` | +| `sasl.jaasConfig` | The JAAS configuration to use for authenticating with the Kafka cluster. | | diff --git a/libs/konfig/src/main/kotlin/io/typestream/konfig/Konfig.kt b/libs/konfig/src/main/kotlin/io/typestream/konfig/Konfig.kt index 0ec9a03..286b0b6 100644 --- a/libs/konfig/src/main/kotlin/io/typestream/konfig/Konfig.kt +++ b/libs/konfig/src/main/kotlin/io/typestream/konfig/Konfig.kt @@ -4,6 +4,7 @@ import java.io.InputStream import java.util.Properties import kotlin.properties.ReadOnlyProperty import kotlin.reflect.KClass +import kotlin.reflect.KParameter import kotlin.reflect.full.primaryConstructor class Konfig(source: InputStream) { @@ -43,11 +44,11 @@ class Konfig(source: InputStream) { private fun decodeParams(klass: KClass, prefix: String): T { val constructor = klass.primaryConstructor!! - val args = arrayOfNulls(constructor.parameters.size) - constructor.parameters.forEachIndexed { index, param -> + val args = mutableMapOf() + constructor.parameters.forEach { param -> when (param.type.classifier) { - String::class -> args[index] = get(prefix, "${param.name}") - Int::class -> args[index] = get(prefix, "${param.name}")?.toInt() + String::class -> args[param] = get(prefix, "${param.name}") + Int::class -> args[param] = get(prefix, "${param.name}")?.toInt() Map::class -> { val map = HashMap() val mapKey = props[prefix] @@ -59,14 +60,19 @@ class Konfig(source: InputStream) { } } - args[index] = map + args[param] = map } - is KClass<*> -> args[index] = decodeKlass(param.type.classifier as KClass<*>, prefix) + is KClass<*> -> { + val keyPrefix = "$prefix.${param.name}" + if (!param.type.isMarkedNullable || props.keys.count { it.toString().startsWith(keyPrefix) } > 0) { + args[param] = decodeKlass(param.type.classifier as KClass<*>, prefix) + } + } else -> error("${param.type.classifier} is not supported") } } - return constructor.call(*args) + return constructor.callBy(args.filter { it.value != null }) } } diff --git a/libs/konfig/src/test/kotlin/io/typestream/konfig/KonfigSourceTest.kt b/libs/konfig/src/test/kotlin/io/typestream/konfig/KonfigSourceTest.kt index 8d631a9..0c164b4 100644 --- a/libs/konfig/src/test/kotlin/io/typestream/konfig/KonfigSourceTest.kt +++ b/libs/konfig/src/test/kotlin/io/typestream/konfig/KonfigSourceTest.kt @@ -3,12 +3,21 @@ package io.typestream.konfig import org.assertj.core.api.Assertions.assertThat import org.junit.jupiter.api.Nested import org.junit.jupiter.api.Test -import java.io.FileInputStream internal class KonfigSourceTest { + val defaultConfig = """ + server.host=localhost + server.port=4242 + server.protocol=https + db.endpoint=http://db.local:5432 + """.trimIndent().toByteArray().inputStream() + + @KonfigSource(prefix = "server") + data class ServerConfig(val host: String, val port: Int, val protocol: String = "http") + @KonfigSource(prefix = "server") - data class ServerConfig(val host: String, val port: Int, val protocol: String?) + data class ServerConfigWithNullables(val host: String, val port: Int, val protocol: String? = null) @KonfigSource(prefix = "db") data class DbConfig(val endpoint: String) @@ -19,6 +28,9 @@ internal class KonfigSourceTest { @KonfigSource data class NestedConfig(val db: DbConfig, val server: ServerConfig) + @KonfigSource + data class NestedConfigWithNullables(val db: DbConfig, val server: ServerConfig? = null) + @KonfigSource(prefix = "nested") data class NestedWithPrefix(val db: DbConfig, val server: ServerConfig) @@ -28,10 +40,10 @@ internal class KonfigSourceTest { val serverConfig by konfig.inject() } - val konfig = Konfig(FileInputStream("src/test/resources/application.properties")) + val konfig = Konfig(defaultConfig) val app = App(konfig) - assertThat(app.serverConfig).extracting("host", "port").containsExactly("localhost", 4242) + assertThat(app.serverConfig).extracting("host", "port", "protocol").containsExactly("localhost", 4242, "https") } @Test @@ -40,12 +52,28 @@ internal class KonfigSourceTest { val config by konfig.inject() } - val konfig = Konfig(FileInputStream("src/test/resources/application.properties")) + val konfig = Konfig(defaultConfig) val app = App(konfig) assertThat(app.config).extracting("serverHost", "serverPort").containsExactly("localhost", 4242) } + @Test + fun `loads a simple config with nullables`() { + class App(konfig: Konfig) { + val config by konfig.inject() + } + + val konfig = Konfig(""" + server.host=localhost + server.port=4242 + db.endpoint=http://db.local:5432 + """.trimIndent().toByteArray().inputStream()) + val app = App(konfig) + + assertThat(app.config).extracting("host", "port", "protocol").containsExactly("localhost", 4242, null) + } + @Nested inner class NestedTest { @Test @@ -54,7 +82,7 @@ internal class KonfigSourceTest { val config by konfig.inject() } - val konfig = Konfig(FileInputStream("src/test/resources/application.properties")) + val konfig = Konfig(defaultConfig) val app = App(konfig) assertThat(app.config) @@ -68,13 +96,32 @@ internal class KonfigSourceTest { val config by konfig.inject() } - val konfig = Konfig(FileInputStream("src/test/resources/nested-prefix.properties")) + val konfig = Konfig( + """ + nested.server.host=localhost + nested.server.port=4242 + nested.db.endpoint=http://db.local:5432 + """.trimIndent().toByteArray().inputStream() + ) val app = App(konfig) assertThat(app.config) .extracting("db.endpoint", "server.host", "server.port") .containsExactly("http://db.local:5432", "localhost", 4242) } + + @Test + fun `loads a nested config with nullables`() { + class App(konfig: Konfig) { + val config by konfig.inject() + } + + val konfig = Konfig("db.endpoint=http://db.local:5432".trimIndent().toByteArray().inputStream()) + val app = App(konfig) + + assertThat(app.config) + .extracting("db.endpoint").isEqualTo("http://db.local:5432") + } } @Test @@ -83,7 +130,12 @@ internal class KonfigSourceTest { val config by konfig.inject() } - val konfig = Konfig(FileInputStream("src/test/resources/comments.properties")) + val konfig = Konfig(""" + server.host=localhost + server.port=4242 + # db.endpoint=http://db.local:5432 + """.trimIndent().toByteArray().inputStream()) + val app = App(konfig) assertThat(app.config).extracting("host", "port").containsExactly("localhost", 4242) diff --git a/libs/konfig/src/test/resources/application.properties b/libs/konfig/src/test/resources/application.properties deleted file mode 100644 index 2545c2f..0000000 --- a/libs/konfig/src/test/resources/application.properties +++ /dev/null @@ -1,3 +0,0 @@ -server.host=localhost -server.port=4242 -db.endpoint=http://db.local:5432 \ No newline at end of file diff --git a/libs/konfig/src/test/resources/comments.properties b/libs/konfig/src/test/resources/comments.properties deleted file mode 100644 index 1ee5242..0000000 --- a/libs/konfig/src/test/resources/comments.properties +++ /dev/null @@ -1,3 +0,0 @@ -# comment -server.host=localhost -server.port=4242 diff --git a/libs/konfig/src/test/resources/nested-prefix.properties b/libs/konfig/src/test/resources/nested-prefix.properties deleted file mode 100644 index b5456bd..0000000 --- a/libs/konfig/src/test/resources/nested-prefix.properties +++ /dev/null @@ -1,3 +0,0 @@ -nested.server.host=localhost -nested.server.port=4242 -nested.db.endpoint=http://db.local:5432 diff --git a/libs/testing/src/main/kotlin/io/typestream/testing/konfig/TestKonfig.kt b/libs/testing/src/main/kotlin/io/typestream/testing/konfig/TestKonfig.kt index e5034a5..eb1c823 100644 --- a/libs/testing/src/main/kotlin/io/typestream/testing/konfig/TestKonfig.kt +++ b/libs/testing/src/main/kotlin/io/typestream/testing/konfig/TestKonfig.kt @@ -1,8 +1,8 @@ package io.typestream.testing.konfig +import io.typestream.konfig.Konfig import io.typestream.testing.TestKafka import org.testcontainers.redpanda.RedpandaContainer -import java.io.ByteArrayInputStream private fun kafkaProperties(testKafka: RedpandaContainer) = """ grpc.port=0 @@ -10,7 +10,6 @@ sources.kafka=local sources.kafka.local.bootstrapServers=${testKafka.bootstrapServers} sources.kafka.local.schemaRegistry.url=${testKafka.schemaRegistryAddress} sources.kafka.local.fsRefreshRate=1 - """.trimIndent() +""".trimIndent() -fun testKonfig(testKafka: TestKafka) = - io.typestream.konfig.Konfig(ByteArrayInputStream(kafkaProperties(testKafka).toByteArray())) +fun testKonfig(testKafka: TestKafka) = Konfig(kafkaProperties(testKafka).toByteArray().inputStream()) diff --git a/server/src/main/kotlin/io/typestream/config/KafkaConfig.kt b/server/src/main/kotlin/io/typestream/config/KafkaConfig.kt index 7f5c052..a193b25 100644 --- a/server/src/main/kotlin/io/typestream/config/KafkaConfig.kt +++ b/server/src/main/kotlin/io/typestream/config/KafkaConfig.kt @@ -4,4 +4,5 @@ data class KafkaConfig( val bootstrapServers: String, val fsRefreshRate: Int = 60, val schemaRegistry: SchemaRegistryConfig, + val saslConfig: SaslConfig? = null, ) diff --git a/server/src/main/kotlin/io/typestream/config/SaslConfig.kt b/server/src/main/kotlin/io/typestream/config/SaslConfig.kt new file mode 100644 index 0000000..ae52609 --- /dev/null +++ b/server/src/main/kotlin/io/typestream/config/SaslConfig.kt @@ -0,0 +1,6 @@ +package io.typestream.config + +import io.typestream.konfig.KonfigSource + +@KonfigSource("sasl") +data class SaslConfig(val mechanism: String = "PLAIN", val jaasConfig: String) diff --git a/server/src/main/kotlin/io/typestream/kafka/KafkaAdminClient.kt b/server/src/main/kotlin/io/typestream/kafka/KafkaAdminClient.kt index e2d6339..e5b882e 100644 --- a/server/src/main/kotlin/io/typestream/kafka/KafkaAdminClient.kt +++ b/server/src/main/kotlin/io/typestream/kafka/KafkaAdminClient.kt @@ -5,6 +5,7 @@ import io.typestream.config.KafkaConfig import org.apache.kafka.clients.admin.Admin import org.apache.kafka.clients.admin.AdminClientConfig import org.apache.kafka.clients.admin.KafkaAdminClient +import org.apache.kafka.common.config.SaslConfigs import java.util.Properties @@ -16,6 +17,12 @@ class KafkaAdminClient(kafkaConfig: KafkaConfig) { val props = Properties() props[AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG] = kafkaConfig.bootstrapServers + if (kafkaConfig.saslConfig != null) { + props[AdminClientConfig.SECURITY_PROTOCOL_CONFIG] = "SASL_SSL" + props[SaslConfigs.SASL_MECHANISM] = kafkaConfig.saslConfig.mechanism + props[SaslConfigs.SASL_JAAS_CONFIG] = kafkaConfig.saslConfig.jaasConfig + } + this.kafkaAdminClient = KafkaAdminClient.create(props) logger.info { "kafka admin created" } } diff --git a/server/src/main/kotlin/io/typestream/scheduler/KafkaStreamsJob.kt b/server/src/main/kotlin/io/typestream/scheduler/KafkaStreamsJob.kt index e8f9072..fffac9a 100644 --- a/server/src/main/kotlin/io/typestream/scheduler/KafkaStreamsJob.kt +++ b/server/src/main/kotlin/io/typestream/scheduler/KafkaStreamsJob.kt @@ -12,6 +12,7 @@ import io.typestream.kafka.StreamsBuilderWrapper import kotlinx.coroutines.flow.flow import org.apache.kafka.clients.consumer.ConsumerConfig import org.apache.kafka.clients.consumer.KafkaConsumer +import org.apache.kafka.common.config.SaslConfigs import org.apache.kafka.streams.KafkaStreams import org.apache.kafka.streams.StreamsConfig import org.apache.kafka.streams.Topology @@ -99,6 +100,12 @@ class KafkaStreamsJob(override val id: String, val program: Program, private val props["schema.registry.userInfo"] = kafkaConfig.schemaRegistry.userInfo } + if (kafkaConfig.saslConfig != null) { + props[StreamsConfig.SECURITY_PROTOCOL_CONFIG] = "SASL_SSL" + props[SaslConfigs.SASL_MECHANISM] = kafkaConfig.saslConfig.mechanism + props[SaslConfigs.SASL_JAAS_CONFIG] = kafkaConfig.saslConfig.jaasConfig + } + return props } diff --git a/tools/src/main/kotlin/io/typestream/tools/Config.kt b/tools/src/main/kotlin/io/typestream/tools/Config.kt index eb10625..e6aa667 100644 --- a/tools/src/main/kotlin/io/typestream/tools/Config.kt +++ b/tools/src/main/kotlin/io/typestream/tools/Config.kt @@ -5,7 +5,7 @@ import io.typestream.konfig.KonfigSource @KonfigSource("schemaRegistry") -data class SchemaRegistryConfig(val url: String, val userInfo: String?) +data class SchemaRegistryConfig(val url: String, val userInfo: String? = null) data class KafkaConfig(val bootstrapServers: String, val schemaRegistry: SchemaRegistryConfig)