From a429d5720fcee900703ed31575c2ecf6eef87b6b Mon Sep 17 00:00:00 2001 From: Sergey Okatov Date: Tue, 4 Jun 2024 23:17:45 +0500 Subject: [PATCH] m5l5 - Kafka (#23) * m5l5 - Kafka * m5l5 - Kafka: refining --- README.md | 17 ++- deploy/docker-compose-kafka-kraft.yaml | 37 ++++++ deploy/docker-compose-kafka-zk.yml | 55 +++++++++ deploy/volumes/.gitignore | 1 + gradle.properties | 2 +- gradle/libs.versions.toml | 1 + ok-marketplace-be/build.gradle.kts | 2 + .../ok-marketplace-app-kafka/actions.md | 31 +++++ .../ok-marketplace-app-kafka/build.gradle.kts | 35 ++++++ .../src/main/kotlin/AppKafkaConfig.kt | 36 ++++++ .../src/main/kotlin/AppKafkaConsumer.kt | 110 ++++++++++++++++++ .../src/main/kotlin/ConsumerStrategyV1.kt | 25 ++++ .../src/main/kotlin/ConsumerStrategyV2.kt | 25 ++++ .../src/main/kotlin/IConsumerStrategy.kt | 21 ++++ .../src/main/kotlin/InputOutputTopics.kt | 15 +++ .../src/main/kotlin/Main.kt | 7 ++ .../src/main/kotlin/kafkaUtils.kt | 28 +++++ .../src/main/resources/logback.xml | 13 +++ .../src/test/kotlin/KafkaControllerTest.kt | 79 +++++++++++++ .../src/test/kotlin/SimpleKafkaTest.kt | 83 +++++++++++++ ok-marketplace-be/settings.gradle.kts | 1 + 21 files changed, 622 insertions(+), 2 deletions(-) create mode 100644 deploy/docker-compose-kafka-kraft.yaml create mode 100644 deploy/docker-compose-kafka-zk.yml create mode 100644 ok-marketplace-be/ok-marketplace-app-kafka/actions.md create mode 100644 ok-marketplace-be/ok-marketplace-app-kafka/build.gradle.kts create mode 100644 ok-marketplace-be/ok-marketplace-app-kafka/src/main/kotlin/AppKafkaConfig.kt create mode 100644 ok-marketplace-be/ok-marketplace-app-kafka/src/main/kotlin/AppKafkaConsumer.kt create mode 100644 ok-marketplace-be/ok-marketplace-app-kafka/src/main/kotlin/ConsumerStrategyV1.kt create mode 100644 ok-marketplace-be/ok-marketplace-app-kafka/src/main/kotlin/ConsumerStrategyV2.kt create mode 100644 ok-marketplace-be/ok-marketplace-app-kafka/src/main/kotlin/IConsumerStrategy.kt create mode 100644 ok-marketplace-be/ok-marketplace-app-kafka/src/main/kotlin/InputOutputTopics.kt create mode 100644 ok-marketplace-be/ok-marketplace-app-kafka/src/main/kotlin/Main.kt create mode 100644 ok-marketplace-be/ok-marketplace-app-kafka/src/main/kotlin/kafkaUtils.kt create mode 100644 ok-marketplace-be/ok-marketplace-app-kafka/src/main/resources/logback.xml create mode 100644 ok-marketplace-be/ok-marketplace-app-kafka/src/test/kotlin/KafkaControllerTest.kt create mode 100644 ok-marketplace-be/ok-marketplace-app-kafka/src/test/kotlin/SimpleKafkaTest.kt diff --git a/README.md b/README.md index 01522c3..b21f915 100644 --- a/README.md +++ b/README.md @@ -59,6 +59,19 @@ Marketplace -- это площадка, на которой пользовате ## Проектные модули +## Мониторинг и логирование + +1. [deploy](deploy) - Инструменты мониторинга и деплоя +2. [ok-marketplace-lib-logging-common](ok-marketplace-libs/ok-marketplace-lib-logging-common) - Общие объявления для + логирования +3. [ok-marketplace-lib-logging-kermit](ok-marketplace-libs/ok-marketplace-lib-logging-kermit) - Библиотека логирования + на базе библиотеки + Kermit +4. [ok-marketplace-lib-logging-logback](ok-marketplace-libs/ok-marketplace-lib-logging-logback) - Библиотека логирования + на базе библиотеки Logback +5. [ok-marketplace-lib-logging-socket](ok-marketplace-libs/ok-marketplace-lib-logging-socket) - Библиотека логирования + на базе TCP-сокетов + ### Транспортные модели, API 1. [specs](specs) - описание API в форме OpenAPI-спецификаций @@ -69,13 +82,15 @@ Marketplace -- это площадка, на которой пользовате моделей с KMP 5. [ok-marketplace-common](ok-marketplace-be/ok-marketplace-common) - модуль с общими классами для модулей проекта. В частности, там располагаются внутренние модели и контекст. -6. [ok-marketplace-mappers-log1](ok-marketplace-be/ok-marketplace-mappers-log1) - Мапер между внутренними моделями и +6. [ok-marketplace-mappers-log1](ok-marketplace-be/ok-marketplace-api-log1) - Мапер между внутренними моделями и моделями логирования первой версии ### Фреймворки и транспорты 1. [ok-marketplace-app-spring](ok-marketplace-be/ok-marketplace-app-spring) - Приложение на Spring Framework 2. [ok-marketplace-app-ktor](ok-marketplace-be/ok-marketplace-app-ktor) - Приложение на Ktor +3. [ok-marketplace-app-rabbit](ok-marketplace-be/ok-marketplace-app-rabbit) - Микросервис на RabbitMQ +4. [ok-marketplace-app-kafka](ok-marketplace-be/ok-marketplace-app-kafka) - Микросервис на Kafka ### Модули бизнес-логики diff --git a/deploy/docker-compose-kafka-kraft.yaml b/deploy/docker-compose-kafka-kraft.yaml new file mode 100644 index 0000000..c4e5117 --- /dev/null +++ b/deploy/docker-compose-kafka-kraft.yaml @@ -0,0 +1,37 @@ +version: '3.8' +services: + kafka: + image: apache/kafka:latest + hostname: kafka + container_name: kafka + ports: + - '9092:9092' + environment: + KAFKA_NODE_ID: 1 + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT' + KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT_HOST://localhost:9092,PLAINTEXT://kafka:19092' + KAFKA_PROCESS_ROLES: 'broker,controller' + KAFKA_CONTROLLER_QUORUM_VOTERS: '1@kafka:29093' + KAFKA_LISTENERS: 'CONTROLLER://:29093,PLAINTEXT_HOST://:9092,PLAINTEXT://:19092' + KAFKA_INTER_BROKER_LISTENER_NAME: 'PLAINTEXT' + KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER' + CLUSTER_ID: '4L6g3nShT-eMCtK--X86sw' + KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 + KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0 + KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1 + KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1 + KAFKA_LOG_DIRS: '/tmp/kraft-combined-logs' + + app: + image: ru.otus.otuskotlin.marketplace/ok-marketplace-app-kafka:0.0.1 + depends_on: + - kafka + links: + - kafka + environment: + KAFKA_HOSTS: kafka:19092 + KAFKA_TOPIC_IN_V1: marketplace-ad-v1-in + KAFKA_TOPIC_OUT_V1: marketplace-ad-v1-out + KAFKA_TOPIC_IN_V2: marketplace-ad-v2-in + KAFKA_TOPIC_OUT_V2: marketplace-ad-v2-out + diff --git a/deploy/docker-compose-kafka-zk.yml b/deploy/docker-compose-kafka-zk.yml new file mode 100644 index 0000000..e846cbc --- /dev/null +++ b/deploy/docker-compose-kafka-zk.yml @@ -0,0 +1,55 @@ +version: '3.8' +services: + zookeeper: + image: confluentinc/cp-zookeeper + container_name: zookeeper + healthcheck: + test: "[[ $$(echo srvr | nc localhost 2181 | grep -oG 'Mode: standalone') = \"Mode: standalone\" ]]" + interval: 10s + timeout: 1s + retries: 30 + environment: + ZOOKEEPER_CLIENT_PORT: 2181 + ZOOKEEPER_TICK_TIME: 2000 + ports: + - "2181:2181" + + kafka: + image: confluentinc/cp-kafka + container_name: kafka + depends_on: + zookeeper: + condition: service_healthy + healthcheck: + test: "test $$( /usr/bin/zookeeper-shell zookeeper:2181 get /brokers/ids/1 | grep { ) != ''" + interval: 3s + timeout: 2s + retries: 300 + environment: + KAFKA_BROKER_ID: 1 + KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181' + KAFKA_LISTENERS: "INTERNAL://:29092,EXTERNAL://:9092" + KAFKA_ADVERTISED_LISTENERS: "INTERNAL://kafka:29092,EXTERNAL://localhost:9092" + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT" + KAFKA_INTER_BROKER_LISTENER_NAME: "INTERNAL" + KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 + KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1 + KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1 + KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0 + KAFKA_JMX_PORT: 9101 + KAFKA_JMX_HOSTNAME: localhost + ports: + - "9092:9092" + - "9101:9101" + + kafdrop: + image: obsidiandynamics/kafdrop + container_name: kafdrop + restart: "no" + ports: + - "9000:9000" + environment: + KAFKA_BROKERCONNECT: "kafka:29092" +# JVM_OPTS: "-Xms16M -Xmx48M -Xss180K -XX:-TieredCompilation -XX:+UseStringDeduplication -noverify" + depends_on: + - "kafka" diff --git a/deploy/volumes/.gitignore b/deploy/volumes/.gitignore index 9890ed5..701d192 100644 --- a/deploy/volumes/.gitignore +++ b/deploy/volumes/.gitignore @@ -1,2 +1,3 @@ */certs/ /ca/ +/kfdata/ diff --git a/gradle.properties b/gradle.properties index fb560e2..733f509 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,7 +1,7 @@ kotlin.code.style=official +org.gradle.jvmargs=-Xmx2g -XX:MaxMetaspaceSize=512m -Dfile.encoding=UTF-8 kotlin.mpp.enableCInteropCommonization=true kotlin.native.ignoreDisabledTargets=true -org.gradle.jvmargs=-Xmx2g -XX:MaxMetaspaceSize=512m -Dfile.encoding=UTF-8 #kotlin.native.cacheKind.linuxX64=none kotlinVersion=1.9.22 diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index 09ab485..9a9bd26 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -76,6 +76,7 @@ spring-test = { module = "org.springframework.boot:spring-boot-starter-test" } # Message Queues rabbitmq-client = { module = "com.rabbitmq:amqp-client", version = "5.20.0" } +kafka-client = { module = "org.apache.kafka:kafka-clients", version = "3.7.0" } # Testing kotest-junit5 = { module = "io.kotest:kotest-runner-junit5", version.ref = "kotest" } diff --git a/ok-marketplace-be/build.gradle.kts b/ok-marketplace-be/build.gradle.kts index bfd4495..305ef06 100644 --- a/ok-marketplace-be/build.gradle.kts +++ b/ok-marketplace-be/build.gradle.kts @@ -1,6 +1,8 @@ plugins { alias(libs.plugins.kotlin.jvm) apply false alias(libs.plugins.kotlin.multiplatform) apply false + alias(libs.plugins.muschko.remote) apply false + alias(libs.plugins.muschko.java) apply false } group = "ru.otus.otuskotlin.marketplace" diff --git a/ok-marketplace-be/ok-marketplace-app-kafka/actions.md b/ok-marketplace-be/ok-marketplace-app-kafka/actions.md new file mode 100644 index 0000000..2411e41 --- /dev/null +++ b/ok-marketplace-be/ok-marketplace-app-kafka/actions.md @@ -0,0 +1,31 @@ +Получить список топиков +```shell +docker exec -ti kafka /usr/bin/kafka-topics --list --bootstrap-server localhost:9092 +``` + +Отправить сообщение +```shell +docker exec -ti kafka /usr/bin/kafka-console-producer --topic topic1 --bootstrap-server localhost:9092 +``` +Каждая строка - одно сообщение. Прервать - Ctrl+Z + +Получить сообщения +```shell +docker exec -ti kafka /usr/bin/kafka-console-consumer --from-beginning --topic topic1 --bootstrap-server localhost:9092 +``` + +Получить сообщения как consumer1 +```shell +docker exec -ti kafka /usr/bin/kafka-console-consumer --group consumer1 --topic topic1 --bootstrap-server localhost:9092 + +``` + +Отправить сообщение с ключом через двоеточие (key:value) +```shell +docker exec -ti kafka /usr/bin/kafka-console-producer \ + --topic topic1 \ + --property "parse.key=true" \ + --property "key.separator=:" \ + --bootstrap-server \ + localhost:9092 +``` diff --git a/ok-marketplace-be/ok-marketplace-app-kafka/build.gradle.kts b/ok-marketplace-be/ok-marketplace-app-kafka/build.gradle.kts new file mode 100644 index 0000000..53e03ee --- /dev/null +++ b/ok-marketplace-be/ok-marketplace-app-kafka/build.gradle.kts @@ -0,0 +1,35 @@ +plugins { + application + id("build-jvm") + alias(libs.plugins.muschko.java) +} + +application { + mainClass.set("ru.otus.otuskotlin.marketplace.app.kafka.MainKt") +} + +docker { + javaApplication { + baseImage.set("openjdk:17.0.2-slim") + } +} + +dependencies { + implementation(libs.kafka.client) + implementation(libs.coroutines.core) + implementation(libs.kotlinx.atomicfu) + + implementation("ru.otus.otuskotlin.marketplace.libs:ok-marketplace-lib-logging-logback") + + implementation(project(":ok-marketplace-app-common")) + + // transport models + implementation(project(":ok-marketplace-common")) + implementation(project(":ok-marketplace-api-v1-jackson")) + implementation(project(":ok-marketplace-api-v1-mappers")) + implementation(project(":ok-marketplace-api-v2-kmp")) + // logic + implementation(project(":ok-marketplace-biz")) + + testImplementation(kotlin("test-junit")) +} diff --git a/ok-marketplace-be/ok-marketplace-app-kafka/src/main/kotlin/AppKafkaConfig.kt b/ok-marketplace-be/ok-marketplace-app-kafka/src/main/kotlin/AppKafkaConfig.kt new file mode 100644 index 0000000..25ccb7a --- /dev/null +++ b/ok-marketplace-be/ok-marketplace-app-kafka/src/main/kotlin/AppKafkaConfig.kt @@ -0,0 +1,36 @@ +package ru.otus.otuskotlin.marketplace.app.kafka + +import ru.otus.otuskotlin.marketplace.app.common.IMkplAppSettings +import ru.otus.otuskotlin.marketplace.biz.MkplAdProcessor +import ru.otus.otuskotlin.marketplace.common.MkplCorSettings +import ru.otus.otuskotlin.marketplace.logging.common.MpLoggerProvider +import ru.otus.otuskotlin.marketplace.logging.jvm.mpLoggerLogback + +class AppKafkaConfig( + val kafkaHosts: List = KAFKA_HOSTS, + val kafkaGroupId: String = KAFKA_GROUP_ID, + val kafkaTopicInV1: String = KAFKA_TOPIC_IN_V1, + val kafkaTopicOutV1: String = KAFKA_TOPIC_OUT_V1, + val kafkaTopicInV2: String = KAFKA_TOPIC_IN_V2, + val kafkaTopicOutV2: String = KAFKA_TOPIC_OUT_V2, + override val corSettings: MkplCorSettings = MkplCorSettings( + loggerProvider = MpLoggerProvider { mpLoggerLogback(it) } + ), + override val processor: MkplAdProcessor = MkplAdProcessor(corSettings), +): IMkplAppSettings { + companion object { + const val KAFKA_HOST_VAR = "KAFKA_HOSTS" + const val KAFKA_TOPIC_IN_V1_VAR = "KAFKA_TOPIC_IN_V1" + const val KAFKA_TOPIC_OUT_V1_VAR = "KAFKA_TOPIC_OUT_V1" + const val KAFKA_TOPIC_IN_V2_VAR = "KAFKA_TOPIC_IN_V2" + const val KAFKA_TOPIC_OUT_V2_VAR = "KAFKA_TOPIC_OUT_V2" + const val KAFKA_GROUP_ID_VAR = "KAFKA_GROUP_ID" + + val KAFKA_HOSTS by lazy { (System.getenv(KAFKA_HOST_VAR) ?: "").split("\\s*[,; ]\\s*") } + val KAFKA_GROUP_ID by lazy { System.getenv(KAFKA_GROUP_ID_VAR) ?: "marketplace" } + val KAFKA_TOPIC_IN_V1 by lazy { System.getenv(KAFKA_TOPIC_IN_V1_VAR) ?: "marketplace-ad-v1-in" } + val KAFKA_TOPIC_OUT_V1 by lazy { System.getenv(KAFKA_TOPIC_OUT_V1_VAR) ?: "marketplace-ad-v1-out" } + val KAFKA_TOPIC_IN_V2 by lazy { System.getenv(KAFKA_TOPIC_IN_V2_VAR) ?: "marketplace-ad-v2-in" } + val KAFKA_TOPIC_OUT_V2 by lazy { System.getenv(KAFKA_TOPIC_OUT_V2_VAR) ?: "marketplace-ad-v2-out" } + } +} diff --git a/ok-marketplace-be/ok-marketplace-app-kafka/src/main/kotlin/AppKafkaConsumer.kt b/ok-marketplace-be/ok-marketplace-app-kafka/src/main/kotlin/AppKafkaConsumer.kt new file mode 100644 index 0000000..6e07c94 --- /dev/null +++ b/ok-marketplace-be/ok-marketplace-app-kafka/src/main/kotlin/AppKafkaConsumer.kt @@ -0,0 +1,110 @@ +package ru.otus.otuskotlin.marketplace.app.kafka + +import kotlinx.atomicfu.atomic +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.NonCancellable +import kotlinx.coroutines.runBlocking +import kotlinx.coroutines.withContext +import org.apache.kafka.clients.consumer.Consumer +import org.apache.kafka.clients.consumer.ConsumerRecord +import org.apache.kafka.clients.consumer.ConsumerRecords +import org.apache.kafka.clients.consumer.KafkaConsumer +import org.apache.kafka.clients.producer.Producer +import org.apache.kafka.clients.producer.ProducerRecord +import org.apache.kafka.common.errors.WakeupException +import ru.otus.otuskotlin.marketplace.app.common.controllerHelper +import java.time.Duration +import java.util.* + +/** + * Основной класс для обслуживания Кафка-интерфейса + */ +class AppKafkaConsumer( + private val config: AppKafkaConfig, + consumerStrategies: List, + private val consumer: Consumer = config.createKafkaConsumer(), + private val producer: Producer = config.createKafkaProducer() +) : AutoCloseable { + private val log = config.corSettings.loggerProvider.logger(this::class) + private val process = atomic(true) // пояснить + private val topicsAndStrategyByInputTopic: Map = consumerStrategies.associate { + val topics = it.topics(config) + topics.input to TopicsAndStrategy(topics.input, topics.output, it) + } + + /** + * Блокирующая функция старта получения и обработки сообщений из Кафки. Для неблокирующей версии см. [[startSusp]] + */ + fun start(): Unit = runBlocking { startSusp() } + + /** + * Неблокирующая функция старта получения и обработки сообщений из Кафки. Блокирующая версия - см. [[start]] + */ + suspend fun startSusp() { + process.value = true + try { + consumer.subscribe(topicsAndStrategyByInputTopic.keys) + while (process.value) { + val records: ConsumerRecords = withContext(Dispatchers.IO) { + consumer.poll(Duration.ofSeconds(1)) + } + if (!records.isEmpty) + log.debug("Receive ${records.count()} messages") + + records.forEach { record: ConsumerRecord -> + try { + val (_, outputTopic, strategy) = topicsAndStrategyByInputTopic[record.topic()] + ?: throw RuntimeException("Receive message from unknown topic ${record.topic()}") + + val resp = config.controllerHelper( + { strategy.deserialize(record.value(), this) }, + { strategy.serialize(this) }, + KafkaConsumer::class, + "kafka-consumer" + ) + sendResponse(resp, outputTopic) + } catch (ex: Exception) { + log.error("error", e = ex) + } + } + } + } catch (ex: WakeupException) { + // ignore for shutdown + } catch (ex: RuntimeException) { + // exception handling + withContext(NonCancellable) { + throw ex + } + } finally { + withContext(NonCancellable) { + consumer.close() + } + } + } + + private suspend fun sendResponse(json: String, outputTopic: String) { + val resRecord = ProducerRecord( + outputTopic, +// null, + UUID.randomUUID().toString(), + json + ) + log.info("sending ${resRecord.key()} to $outputTopic:\n$json") + withContext(Dispatchers.IO) { + producer.send(resRecord) + } + } + + /** + * Корректное завершение для методов [[start]], [[startSusp]] + */ + override fun close() { + process.value = false + } + + private data class TopicsAndStrategy( + val inputTopic: String, + val outputTopic: String, + val strategy: IConsumerStrategy + ) +} diff --git a/ok-marketplace-be/ok-marketplace-app-kafka/src/main/kotlin/ConsumerStrategyV1.kt b/ok-marketplace-be/ok-marketplace-app-kafka/src/main/kotlin/ConsumerStrategyV1.kt new file mode 100644 index 0000000..5e8bfaf --- /dev/null +++ b/ok-marketplace-be/ok-marketplace-app-kafka/src/main/kotlin/ConsumerStrategyV1.kt @@ -0,0 +1,25 @@ +package ru.otus.otuskotlin.marketplace.app.kafka + +import ru.otus.otuskotlin.marketplace.api.v1.apiV1RequestDeserialize +import ru.otus.otuskotlin.marketplace.api.v1.apiV1ResponseSerialize +import ru.otus.otuskotlin.marketplace.api.v1.models.IRequest +import ru.otus.otuskotlin.marketplace.api.v1.models.IResponse +import ru.otus.otuskotlin.marketplace.common.MkplContext +import ru.otus.otuskotlin.marketplace.mappers.v1.fromTransport +import ru.otus.otuskotlin.marketplace.mappers.v1.toTransportAd + +class ConsumerStrategyV1 : IConsumerStrategy { + override fun topics(config: AppKafkaConfig): InputOutputTopics { + return InputOutputTopics(config.kafkaTopicInV1, config.kafkaTopicOutV1) + } + + override fun serialize(source: MkplContext): String { + val response: IResponse = source.toTransportAd() + return apiV1ResponseSerialize(response) + } + + override fun deserialize(value: String, target: MkplContext) { + val request: IRequest = apiV1RequestDeserialize(value) + target.fromTransport(request) + } +} diff --git a/ok-marketplace-be/ok-marketplace-app-kafka/src/main/kotlin/ConsumerStrategyV2.kt b/ok-marketplace-be/ok-marketplace-app-kafka/src/main/kotlin/ConsumerStrategyV2.kt new file mode 100644 index 0000000..7464e39 --- /dev/null +++ b/ok-marketplace-be/ok-marketplace-app-kafka/src/main/kotlin/ConsumerStrategyV2.kt @@ -0,0 +1,25 @@ +package ru.otus.otuskotlin.marketplace.app.kafka + +import ru.otus.otuskotlin.marketplace.api.v2.apiV2RequestDeserialize +import ru.otus.otuskotlin.marketplace.api.v2.apiV2ResponseSerialize +import ru.otus.otuskotlin.marketplace.api.v2.mappers.fromTransport +import ru.otus.otuskotlin.marketplace.api.v2.mappers.toTransportAd +import ru.otus.otuskotlin.marketplace.api.v2.models.IRequest +import ru.otus.otuskotlin.marketplace.api.v2.models.IResponse +import ru.otus.otuskotlin.marketplace.common.MkplContext + +class ConsumerStrategyV2 : IConsumerStrategy { + override fun topics(config: AppKafkaConfig): InputOutputTopics { + return InputOutputTopics(config.kafkaTopicInV2, config.kafkaTopicOutV2) + } + + override fun serialize(source: MkplContext): String { + val response: IResponse = source.toTransportAd() + return apiV2ResponseSerialize(response) + } + + override fun deserialize(value: String, target: MkplContext) { + val request: IRequest = apiV2RequestDeserialize(value) + target.fromTransport(request) + } +} diff --git a/ok-marketplace-be/ok-marketplace-app-kafka/src/main/kotlin/IConsumerStrategy.kt b/ok-marketplace-be/ok-marketplace-app-kafka/src/main/kotlin/IConsumerStrategy.kt new file mode 100644 index 0000000..19815c9 --- /dev/null +++ b/ok-marketplace-be/ok-marketplace-app-kafka/src/main/kotlin/IConsumerStrategy.kt @@ -0,0 +1,21 @@ +package ru.otus.otuskotlin.marketplace.app.kafka + +import ru.otus.otuskotlin.marketplace.common.MkplContext + +/** + * Интерфейс стратегии для обслуживания версии API + */ +interface IConsumerStrategy { + /** + * Топики, для которых применяется стратегия + */ + fun topics(config: AppKafkaConfig): InputOutputTopics + /** + * Сериализатор для версии API + */ + fun serialize(source: MkplContext): String + /** + * Десериализатор для версии API + */ + fun deserialize(value: String, target: MkplContext) +} diff --git a/ok-marketplace-be/ok-marketplace-app-kafka/src/main/kotlin/InputOutputTopics.kt b/ok-marketplace-be/ok-marketplace-app-kafka/src/main/kotlin/InputOutputTopics.kt new file mode 100644 index 0000000..28efac7 --- /dev/null +++ b/ok-marketplace-be/ok-marketplace-app-kafka/src/main/kotlin/InputOutputTopics.kt @@ -0,0 +1,15 @@ +package ru.otus.otuskotlin.marketplace.app.kafka + +/** + * Структура для хранения входящего и исходящего топиков для версии API + */ +data class InputOutputTopics( + /** + * Топик входящих в приложение сообщений + */ + val input: String, + /** + * Топик для исходящих из приложения сообщений + */ + val output: String, +) diff --git a/ok-marketplace-be/ok-marketplace-app-kafka/src/main/kotlin/Main.kt b/ok-marketplace-be/ok-marketplace-app-kafka/src/main/kotlin/Main.kt new file mode 100644 index 0000000..9950293 --- /dev/null +++ b/ok-marketplace-be/ok-marketplace-app-kafka/src/main/kotlin/Main.kt @@ -0,0 +1,7 @@ +package ru.otus.otuskotlin.marketplace.app.kafka + +fun main() { + val config = AppKafkaConfig() + val consumer = AppKafkaConsumer(config, listOf(ConsumerStrategyV1(), ConsumerStrategyV2())) + consumer.start() +} diff --git a/ok-marketplace-be/ok-marketplace-app-kafka/src/main/kotlin/kafkaUtils.kt b/ok-marketplace-be/ok-marketplace-app-kafka/src/main/kotlin/kafkaUtils.kt new file mode 100644 index 0000000..5f75ecc --- /dev/null +++ b/ok-marketplace-be/ok-marketplace-app-kafka/src/main/kotlin/kafkaUtils.kt @@ -0,0 +1,28 @@ +package ru.otus.otuskotlin.marketplace.app.kafka + +import org.apache.kafka.clients.consumer.ConsumerConfig +import org.apache.kafka.clients.consumer.KafkaConsumer +import org.apache.kafka.clients.producer.KafkaProducer +import org.apache.kafka.clients.producer.ProducerConfig +import org.apache.kafka.common.serialization.StringDeserializer +import org.apache.kafka.common.serialization.StringSerializer +import java.util.* + +fun AppKafkaConfig.createKafkaConsumer() : KafkaConsumer { + val props = Properties().apply { + put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaHosts) + put(ConsumerConfig.GROUP_ID_CONFIG, kafkaGroupId) + put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer::class.java) + put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer::class.java) + } + return KafkaConsumer(props) +} + +fun AppKafkaConfig.createKafkaProducer(): KafkaProducer { + val props = Properties().apply { + put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaHosts) + put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer::class.java) + put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer::class.java) + } + return KafkaProducer(props) +} diff --git a/ok-marketplace-be/ok-marketplace-app-kafka/src/main/resources/logback.xml b/ok-marketplace-be/ok-marketplace-app-kafka/src/main/resources/logback.xml new file mode 100644 index 0000000..ed5c82c --- /dev/null +++ b/ok-marketplace-be/ok-marketplace-app-kafka/src/main/resources/logback.xml @@ -0,0 +1,13 @@ + + + + %d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n + + + + + + + + + \ No newline at end of file diff --git a/ok-marketplace-be/ok-marketplace-app-kafka/src/test/kotlin/KafkaControllerTest.kt b/ok-marketplace-be/ok-marketplace-app-kafka/src/test/kotlin/KafkaControllerTest.kt new file mode 100644 index 0000000..23468c7 --- /dev/null +++ b/ok-marketplace-be/ok-marketplace-app-kafka/src/test/kotlin/KafkaControllerTest.kt @@ -0,0 +1,79 @@ +package ru.otus.otuskotlin.marketplace.app.kafka + +import org.apache.kafka.clients.consumer.ConsumerRecord +import org.apache.kafka.clients.consumer.MockConsumer +import org.apache.kafka.clients.consumer.OffsetResetStrategy +import org.apache.kafka.clients.producer.MockProducer +import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.serialization.StringSerializer +import org.junit.Test +import ru.otus.otuskotlin.marketplace.api.v1.apiV1RequestSerialize +import ru.otus.otuskotlin.marketplace.api.v1.apiV1ResponseDeserialize +import ru.otus.otuskotlin.marketplace.api.v1.models.AdCreateObject +import ru.otus.otuskotlin.marketplace.api.v1.models.AdCreateRequest +import ru.otus.otuskotlin.marketplace.api.v1.models.AdCreateResponse +import ru.otus.otuskotlin.marketplace.api.v1.models.AdDebug +import ru.otus.otuskotlin.marketplace.api.v1.models.AdRequestDebugMode +import ru.otus.otuskotlin.marketplace.api.v1.models.AdRequestDebugStubs +import ru.otus.otuskotlin.marketplace.api.v1.models.AdVisibility +import ru.otus.otuskotlin.marketplace.api.v1.models.DealSide +import java.util.* +import kotlin.test.assertEquals + + +class KafkaControllerTest { + @Test + fun runKafka() { + val consumer = MockConsumer(OffsetResetStrategy.EARLIEST) + val producer = MockProducer(true, StringSerializer(), StringSerializer()) + val config = AppKafkaConfig() + val inputTopic = config.kafkaTopicInV1 + val outputTopic = config.kafkaTopicOutV1 + + val app = AppKafkaConsumer(config, listOf(ConsumerStrategyV1()), consumer = consumer, producer = producer) + consumer.schedulePollTask { + consumer.rebalance(Collections.singletonList(TopicPartition(inputTopic, 0))) + consumer.addRecord( + ConsumerRecord( + inputTopic, + PARTITION, + 0L, + "test-1", + apiV1RequestSerialize( + AdCreateRequest( + ad = AdCreateObject( + title = "Требуется болт", + description = "some testing ad to check them all", + visibility = AdVisibility.OWNER_ONLY, + adType = DealSide.DEMAND, + ), + debug = AdDebug( + mode = AdRequestDebugMode.STUB, + stub = AdRequestDebugStubs.SUCCESS, + ), + ), + ) + ) + ) + app.close() + } + + val startOffsets: MutableMap = mutableMapOf() + val tp = TopicPartition(inputTopic, PARTITION) + startOffsets[tp] = 0L + consumer.updateBeginningOffsets(startOffsets) + + app.start() + + val message = producer.history().first() + val result = apiV1ResponseDeserialize(message.value()) + assertEquals(outputTopic, message.topic()) + assertEquals("Требуется болт", result.ad?.title) + } + + companion object { + const val PARTITION = 0 + } +} + + diff --git a/ok-marketplace-be/ok-marketplace-app-kafka/src/test/kotlin/SimpleKafkaTest.kt b/ok-marketplace-be/ok-marketplace-app-kafka/src/test/kotlin/SimpleKafkaTest.kt new file mode 100644 index 0000000..819092e --- /dev/null +++ b/ok-marketplace-be/ok-marketplace-app-kafka/src/test/kotlin/SimpleKafkaTest.kt @@ -0,0 +1,83 @@ +package ru.otus.otuskotlin.marketplace.app.kafka + +import org.apache.kafka.clients.consumer.KafkaConsumer +import org.apache.kafka.clients.producer.KafkaProducer +import org.apache.kafka.clients.producer.ProducerRecord +import java.time.Duration +import java.time.Instant +import java.util.* +import kotlin.test.Ignore +import kotlin.test.Test + +/** + * Для запуска этого теста, требуется поднять Кафку на порту 9092. + * Поднять можно с помощью /deploy/docker-compose-kafka-cp.yml + */ +@Ignore +class SimpleKafkaTest { + private val topicName = "producer-topic" + + /** + * Отправка сообщения в Кафку + */ + @Test + fun producerTest() { + // create instance for properties to access producer configs + val props = Properties().apply { + //Assign localhost id + put("bootstrap.servers", "localhost:9092") + //Set acknowledgements for producer requests. + put("acks", "all") + //If the request fails, the producer can automatically retry, + put("retries", 0) + //Specify buffer size in config + put("batch.size", 16384) + //The buffer.memory controls the total amount of memory available to the producer for buffering. + put("buffer.memory", 33554432) + put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer") + put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer") + } + + KafkaProducer(props).use { producer -> + (0..<10).forEach { + val key = "key#$it" + val value = "Message numbere $it" + producer.send(ProducerRecord(topicName, key, value)) + } + println("Message sent successfully") + } + } + + /** + * Получение сообщения из топика Кафки + */ + @Test + fun consumerTest() { + val props = Properties().apply { + put("bootstrap.servers", "localhost:9092") + // Here the consumer group should be set + put("group.id", "test") + put("enable.auto.commit", "true") + put("auto.commit.interval.ms", "1000") + put("session.timeout.ms", "30000") + // https://kafka.apache.org/documentation/#consumerconfigs_auto.offset.reset + put("auto.offset.reset", "earliest") + put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer") + put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer") + } + KafkaConsumer(props).use { consumer -> + + //Kafka Consumer subscribes list of topics here. + consumer.subscribe(listOf(topicName)) + val timeWithTimeout = Instant.now() + Duration.ofSeconds(2) + + while (timeWithTimeout > Instant.now()) { + val records = consumer.poll(Duration.ofMillis(100)) + records.forEach { record -> + // print the offset,key and value for the consumer records. + println("topic = ${record.topic()}, offset = ${record.offset()}, key = ${record.key()}, value = ${record.value()}") + } + } + } + } +} diff --git a/ok-marketplace-be/settings.gradle.kts b/ok-marketplace-be/settings.gradle.kts index a6ff22f..f1d8b9e 100644 --- a/ok-marketplace-be/settings.gradle.kts +++ b/ok-marketplace-be/settings.gradle.kts @@ -43,4 +43,5 @@ include(":ok-marketplace-app-common") include(":ok-marketplace-app-spring") include(":ok-marketplace-app-ktor") include(":ok-marketplace-app-rabbit") +include(":ok-marketplace-app-kafka")