Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: sasl jass auth support #79

Merged
merged 1 commit into from
Dec 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions docs/docs/tutorial/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,3 +39,5 @@ The following configuration options are available for Kafka sources:
| **`bootstrapServers`** | A comma-separated list of host:port pairs to use for establishing the initial connection to the Kafka cluster. | `localhost:9092` |
| **`schemaRegistry.url`** | The URL of the schema registry. | `http://localhost:8081` |
| `schemaRegistry.userInfo` | The user info to use for authenticating with the schema registry. | |
| `sasl.mechanism` | The SASL mechanism to use for authenticating with the Kafka cluster. | `PLAIN` |
| `sasl.jaasConfig` | The JAAS configuration to use for authenticating with the Kafka cluster. | |
20 changes: 13 additions & 7 deletions libs/konfig/src/main/kotlin/io/typestream/konfig/Konfig.kt
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import java.io.InputStream
import java.util.Properties
import kotlin.properties.ReadOnlyProperty
import kotlin.reflect.KClass
import kotlin.reflect.KParameter
import kotlin.reflect.full.primaryConstructor

class Konfig(source: InputStream) {
Expand Down Expand Up @@ -43,11 +44,11 @@ class Konfig(source: InputStream) {

private fun <T : Any> decodeParams(klass: KClass<T>, prefix: String): T {
val constructor = klass.primaryConstructor!!
val args = arrayOfNulls<Any>(constructor.parameters.size)
constructor.parameters.forEachIndexed { index, param ->
val args = mutableMapOf<KParameter, Any?>()
constructor.parameters.forEach { param ->
when (param.type.classifier) {
String::class -> args[index] = get(prefix, "${param.name}")
Int::class -> args[index] = get(prefix, "${param.name}")?.toInt()
String::class -> args[param] = get(prefix, "${param.name}")
Int::class -> args[param] = get(prefix, "${param.name}")?.toInt()
Map::class -> {
val map = HashMap<String, Any>()
val mapKey = props[prefix]
Expand All @@ -59,14 +60,19 @@ class Konfig(source: InputStream) {
}
}

args[index] = map
args[param] = map
}

is KClass<*> -> args[index] = decodeKlass(param.type.classifier as KClass<*>, prefix)
is KClass<*> -> {
val keyPrefix = "$prefix.${param.name}"
if (!param.type.isMarkedNullable || props.keys.count { it.toString().startsWith(keyPrefix) } > 0) {
args[param] = decodeKlass(param.type.classifier as KClass<*>, prefix)
}
}
else -> error("${param.type.classifier} is not supported")
}
}
return constructor.call(*args)
return constructor.callBy(args.filter { it.value != null })
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,21 @@ package io.typestream.konfig
import org.assertj.core.api.Assertions.assertThat
import org.junit.jupiter.api.Nested
import org.junit.jupiter.api.Test
import java.io.FileInputStream

internal class KonfigSourceTest {

val defaultConfig = """
server.host=localhost
server.port=4242
server.protocol=https
db.endpoint=http://db.local:5432
""".trimIndent().toByteArray().inputStream()

@KonfigSource(prefix = "server")
data class ServerConfig(val host: String, val port: Int, val protocol: String = "http")

@KonfigSource(prefix = "server")
data class ServerConfig(val host: String, val port: Int, val protocol: String?)
data class ServerConfigWithNullables(val host: String, val port: Int, val protocol: String? = null)

@KonfigSource(prefix = "db")
data class DbConfig(val endpoint: String)
Expand All @@ -19,6 +28,9 @@ internal class KonfigSourceTest {
@KonfigSource
data class NestedConfig(val db: DbConfig, val server: ServerConfig)

@KonfigSource
data class NestedConfigWithNullables(val db: DbConfig, val server: ServerConfig? = null)

@KonfigSource(prefix = "nested")
data class NestedWithPrefix(val db: DbConfig, val server: ServerConfig)

Expand All @@ -28,10 +40,10 @@ internal class KonfigSourceTest {
val serverConfig by konfig.inject<ServerConfig>()
}

val konfig = Konfig(FileInputStream("src/test/resources/application.properties"))
val konfig = Konfig(defaultConfig)
val app = App(konfig)

assertThat(app.serverConfig).extracting("host", "port").containsExactly("localhost", 4242)
assertThat(app.serverConfig).extracting("host", "port", "protocol").containsExactly("localhost", 4242, "https")
}

@Test
Expand All @@ -40,12 +52,28 @@ internal class KonfigSourceTest {
val config by konfig.inject<Config>()
}

val konfig = Konfig(FileInputStream("src/test/resources/application.properties"))
val konfig = Konfig(defaultConfig)
val app = App(konfig)

assertThat(app.config).extracting("serverHost", "serverPort").containsExactly("localhost", 4242)
}

@Test
fun `loads a simple config with nullables`() {
class App(konfig: Konfig) {
val config by konfig.inject<ServerConfigWithNullables>()
}

val konfig = Konfig("""
server.host=localhost
server.port=4242
db.endpoint=http://db.local:5432
""".trimIndent().toByteArray().inputStream())
val app = App(konfig)

assertThat(app.config).extracting("host", "port", "protocol").containsExactly("localhost", 4242, null)
}

@Nested
inner class NestedTest {
@Test
Expand All @@ -54,7 +82,7 @@ internal class KonfigSourceTest {
val config by konfig.inject<NestedConfig>()
}

val konfig = Konfig(FileInputStream("src/test/resources/application.properties"))
val konfig = Konfig(defaultConfig)
val app = App(konfig)

assertThat(app.config)
Expand All @@ -68,13 +96,32 @@ internal class KonfigSourceTest {
val config by konfig.inject<NestedWithPrefix>()
}

val konfig = Konfig(FileInputStream("src/test/resources/nested-prefix.properties"))
val konfig = Konfig(
"""
nested.server.host=localhost
nested.server.port=4242
nested.db.endpoint=http://db.local:5432
""".trimIndent().toByteArray().inputStream()
)
val app = App(konfig)

assertThat(app.config)
.extracting("db.endpoint", "server.host", "server.port")
.containsExactly("http://db.local:5432", "localhost", 4242)
}

@Test
fun `loads a nested config with nullables`() {
class App(konfig: Konfig) {
val config by konfig.inject<NestedConfigWithNullables>()
}

val konfig = Konfig("db.endpoint=http://db.local:5432".trimIndent().toByteArray().inputStream())
val app = App(konfig)

assertThat(app.config)
.extracting("db.endpoint").isEqualTo("http://db.local:5432")
}
}

@Test
Expand All @@ -83,7 +130,12 @@ internal class KonfigSourceTest {
val config by konfig.inject<ServerConfig>()
}

val konfig = Konfig(FileInputStream("src/test/resources/comments.properties"))
val konfig = Konfig("""
server.host=localhost
server.port=4242
# db.endpoint=http://db.local:5432
""".trimIndent().toByteArray().inputStream())

val app = App(konfig)

assertThat(app.config).extracting("host", "port").containsExactly("localhost", 4242)
Expand Down
3 changes: 0 additions & 3 deletions libs/konfig/src/test/resources/application.properties

This file was deleted.

3 changes: 0 additions & 3 deletions libs/konfig/src/test/resources/comments.properties

This file was deleted.

3 changes: 0 additions & 3 deletions libs/konfig/src/test/resources/nested-prefix.properties

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,16 +1,15 @@
package io.typestream.testing.konfig

import io.typestream.konfig.Konfig
import io.typestream.testing.TestKafka
import org.testcontainers.redpanda.RedpandaContainer
import java.io.ByteArrayInputStream

private fun kafkaProperties(testKafka: RedpandaContainer) = """
grpc.port=0
sources.kafka=local
sources.kafka.local.bootstrapServers=${testKafka.bootstrapServers}
sources.kafka.local.schemaRegistry.url=${testKafka.schemaRegistryAddress}
sources.kafka.local.fsRefreshRate=1
""".trimIndent()
""".trimIndent()

fun testKonfig(testKafka: TestKafka) =
io.typestream.konfig.Konfig(ByteArrayInputStream(kafkaProperties(testKafka).toByteArray()))
fun testKonfig(testKafka: TestKafka) = Konfig(kafkaProperties(testKafka).toByteArray().inputStream())
1 change: 1 addition & 0 deletions server/src/main/kotlin/io/typestream/config/KafkaConfig.kt
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,5 @@ data class KafkaConfig(
val bootstrapServers: String,
val fsRefreshRate: Int = 60,
val schemaRegistry: SchemaRegistryConfig,
val saslConfig: SaslConfig? = null,
)
6 changes: 6 additions & 0 deletions server/src/main/kotlin/io/typestream/config/SaslConfig.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package io.typestream.config

import io.typestream.konfig.KonfigSource

@KonfigSource("sasl")
data class SaslConfig(val mechanism: String = "PLAIN", val jaasConfig: String)
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import io.typestream.config.KafkaConfig
import org.apache.kafka.clients.admin.Admin
import org.apache.kafka.clients.admin.AdminClientConfig
import org.apache.kafka.clients.admin.KafkaAdminClient
import org.apache.kafka.common.config.SaslConfigs
import java.util.Properties


Expand All @@ -16,6 +17,12 @@ class KafkaAdminClient(kafkaConfig: KafkaConfig) {
val props = Properties()
props[AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG] = kafkaConfig.bootstrapServers

if (kafkaConfig.saslConfig != null) {
props[AdminClientConfig.SECURITY_PROTOCOL_CONFIG] = "SASL_SSL"
props[SaslConfigs.SASL_MECHANISM] = kafkaConfig.saslConfig.mechanism
props[SaslConfigs.SASL_JAAS_CONFIG] = kafkaConfig.saslConfig.jaasConfig
}

this.kafkaAdminClient = KafkaAdminClient.create(props)
logger.info { "kafka admin created" }
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import io.typestream.kafka.StreamsBuilderWrapper
import kotlinx.coroutines.flow.flow
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.config.SaslConfigs
import org.apache.kafka.streams.KafkaStreams
import org.apache.kafka.streams.StreamsConfig
import org.apache.kafka.streams.Topology
Expand Down Expand Up @@ -99,6 +100,12 @@ class KafkaStreamsJob(override val id: String, val program: Program, private val
props["schema.registry.userInfo"] = kafkaConfig.schemaRegistry.userInfo
}

if (kafkaConfig.saslConfig != null) {
props[StreamsConfig.SECURITY_PROTOCOL_CONFIG] = "SASL_SSL"
props[SaslConfigs.SASL_MECHANISM] = kafkaConfig.saslConfig.mechanism
props[SaslConfigs.SASL_JAAS_CONFIG] = kafkaConfig.saslConfig.jaasConfig
}

return props
}

Expand Down
2 changes: 1 addition & 1 deletion tools/src/main/kotlin/io/typestream/tools/Config.kt
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import io.typestream.konfig.KonfigSource


@KonfigSource("schemaRegistry")
data class SchemaRegistryConfig(val url: String, val userInfo: String?)
data class SchemaRegistryConfig(val url: String, val userInfo: String? = null)

data class KafkaConfig(val bootstrapServers: String, val schemaRegistry: SchemaRegistryConfig)

Expand Down