Skip to content

Commit

Permalink
m5l5 - Kafka
Browse files Browse the repository at this point in the history
  • Loading branch information
svok authored and evgnep committed Nov 25, 2024
1 parent bb4d80a commit def93b3
Show file tree
Hide file tree
Showing 20 changed files with 621 additions and 1 deletion.
17 changes: 16 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,19 @@ Marketplace -- это площадка, на которой пользовате

## Проектные модули

## Мониторинг и логирование

1. [deploy](deploy) - Инструменты мониторинга и деплоя
2. [ok-marketplace-lib-logging-common](ok-marketplace-libs/ok-marketplace-lib-logging-common) - Общие объявления для
логирования
3. [ok-marketplace-lib-logging-kermit](ok-marketplace-libs/ok-marketplace-lib-logging-kermit) - Библиотека логирования
на базе библиотеки
Kermit
4. [ok-marketplace-lib-logging-logback](ok-marketplace-libs/ok-marketplace-lib-logging-logback) - Библиотека логирования
на базе библиотеки Logback
5. [ok-marketplace-lib-logging-socket](ok-marketplace-libs/ok-marketplace-lib-logging-socket) - Библиотека логирования
на базе TCP-сокетов

### Транспортные модели, API

1. [specs](specs) - описание API в форме OpenAPI-спецификаций
Expand All @@ -68,13 +81,15 @@ Marketplace -- это площадка, на которой пользовате
моделей с KMP
5. [ok-marketplace-common](ok-marketplace-be/ok-marketplace-common) - модуль с общими классами для модулей проекта. В
частности, там располагаются внутренние модели и контекст.
6. [ok-marketplace-mappers-log1](ok-marketplace-be/ok-marketplace-mappers-log1) - Мапер между внутренними моделями и
6. [ok-marketplace-mappers-log1](ok-marketplace-be/ok-marketplace-api-log1) - Мапер между внутренними моделями и
моделями логирования первой версии

### Фреймворки и транспорты

1. [ok-marketplace-app-spring](ok-marketplace-be/ok-marketplace-app-spring) - Приложение на Spring Framework
2. [ok-marketplace-app-ktor](ok-marketplace-be/ok-marketplace-app-ktor) - Приложение на Ktor
3. [ok-marketplace-app-rabbit](ok-marketplace-be/ok-marketplace-app-rabbit) - Микросервис на RabbitMQ
4. [ok-marketplace-app-kafka](ok-marketplace-be/ok-marketplace-app-kafka) - Микросервис на Kafka

### Модули бизнес-логики

Expand Down
37 changes: 37 additions & 0 deletions deploy/docker-compose-kafka-kraft.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
version: '3.8'
services:
kafka:
image: apache/kafka:latest
hostname: kafka
container_name: kafka
ports:
- '9092:9092'
environment:
KAFKA_NODE_ID: 1
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT'
KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT_HOST://localhost:9092,PLAINTEXT://kafka:19092'
KAFKA_PROCESS_ROLES: 'broker,controller'
KAFKA_CONTROLLER_QUORUM_VOTERS: '1@kafka:29093'
KAFKA_LISTENERS: 'CONTROLLER://:29093,PLAINTEXT_HOST://:9092,PLAINTEXT://:19092'
KAFKA_INTER_BROKER_LISTENER_NAME: 'PLAINTEXT'
KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
CLUSTER_ID: '4L6g3nShT-eMCtK--X86sw'
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
KAFKA_LOG_DIRS: '/tmp/kraft-combined-logs'

app:
image: ru.otus.otuskotlin.marketplace/ok-marketplace-app-kafka:0.0.1
depends_on:
- kafka
links:
- kafka
environment:
KAFKA_HOSTS: kafka:19092
KAFKA_TOPIC_IN_V1: marketplace-ad-v1-in
KAFKA_TOPIC_OUT_V1: marketplace-ad-v1-out
KAFKA_TOPIC_IN_V2: marketplace-ad-v2-in
KAFKA_TOPIC_OUT_V2: marketplace-ad-v2-out

55 changes: 55 additions & 0 deletions deploy/docker-compose-kafka-zk.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
version: '3.8'
services:
zookeeper:
image: confluentinc/cp-zookeeper
container_name: zookeeper
healthcheck:
test: "[[ $$(echo srvr | nc localhost 2181 | grep -oG 'Mode: standalone') = \"Mode: standalone\" ]]"
interval: 10s
timeout: 1s
retries: 30
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
ports:
- "2181:2181"

kafka:
image: confluentinc/cp-kafka
container_name: kafka
depends_on:
zookeeper:
condition: service_healthy
healthcheck:
test: "test $$( /usr/bin/zookeeper-shell zookeeper:2181 get /brokers/ids/1 | grep { ) != ''"
interval: 3s
timeout: 2s
retries: 300
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
KAFKA_LISTENERS: "INTERNAL://:29092,EXTERNAL://:9092"
KAFKA_ADVERTISED_LISTENERS: "INTERNAL://kafka:29092,EXTERNAL://localhost:9092"
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT"
KAFKA_INTER_BROKER_LISTENER_NAME: "INTERNAL"
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
KAFKA_JMX_PORT: 9101
KAFKA_JMX_HOSTNAME: localhost
ports:
- "9092:9092"
- "9101:9101"

kafdrop:
image: obsidiandynamics/kafdrop
container_name: kafdrop
restart: "no"
ports:
- "9000:9000"
environment:
KAFKA_BROKERCONNECT: "kafka:29092"
# JVM_OPTS: "-Xms16M -Xmx48M -Xss180K -XX:-TieredCompilation -XX:+UseStringDeduplication -noverify"
depends_on:
- "kafka"
1 change: 1 addition & 0 deletions deploy/volumes/.gitignore
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
*/certs/
/ca/
/kfdata/
1 change: 1 addition & 0 deletions gradle/libs.versions.toml
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ spring-test = { module = "org.springframework.boot:spring-boot-starter-test" }

# Message Queues
rabbitmq-client = { module = "com.rabbitmq:amqp-client", version = "5.20.0" }
kafka-client = { module = "org.apache.kafka:kafka-clients", version = "3.7.0" }

# Testing
kotest-junit5 = { module = "io.kotest:kotest-runner-junit5", version.ref = "kotest" }
Expand Down
2 changes: 2 additions & 0 deletions ok-marketplace-be/build.gradle.kts
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
plugins {
alias(libs.plugins.kotlin.jvm) apply false
alias(libs.plugins.kotlin.multiplatform) apply false
alias(libs.plugins.muschko.remote) apply false
alias(libs.plugins.muschko.java) apply false
}

group = "ru.otus.otuskotlin.marketplace"
Expand Down
31 changes: 31 additions & 0 deletions ok-marketplace-be/ok-marketplace-app-kafka/actions.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
Получить список топиков
```shell
docker exec -ti kafka /usr/bin/kafka-topics --list --bootstrap-server localhost:9092
```

Отправить сообщение
```shell
docker exec -ti kafka /usr/bin/kafka-console-producer --topic topic1 --bootstrap-server localhost:9092
```
Каждая строка - одно сообщение. Прервать - Ctrl+Z

Получить сообщения
```shell
docker exec -ti kafka /usr/bin/kafka-console-consumer --from-beginning --topic topic1 --bootstrap-server localhost:9092
```

Получить сообщения как consumer1
```shell
docker exec -ti kafka /usr/bin/kafka-console-consumer --group consumer1 --topic topic1 --bootstrap-server localhost:9092

```

Отправить сообщение с ключом через двоеточие (key:value)
```shell
docker exec -ti kafka /usr/bin/kafka-console-producer \
--topic topic1 \
--property "parse.key=true" \
--property "key.separator=:" \
--bootstrap-server \
localhost:9092
```
35 changes: 35 additions & 0 deletions ok-marketplace-be/ok-marketplace-app-kafka/build.gradle.kts
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
plugins {
application
id("build-jvm")
alias(libs.plugins.muschko.java)
}

application {
mainClass.set("ru.otus.otuskotlin.marketplace.app.kafka.MainKt")
}

docker {
javaApplication {
baseImage.set("openjdk:17.0.2-slim")
}
}

dependencies {
implementation(libs.kafka.client)
implementation(libs.coroutines.core)
implementation(libs.kotlinx.atomicfu)

implementation("ru.otus.otuskotlin.marketplace.libs:ok-marketplace-lib-logging-logback")

implementation(project(":ok-marketplace-app-common"))

// transport models
implementation(project(":ok-marketplace-common"))
implementation(project(":ok-marketplace-api-v1-jackson"))
implementation(project(":ok-marketplace-api-v1-mappers"))
implementation(project(":ok-marketplace-api-v2-kmp"))
// logic
implementation(project(":ok-marketplace-biz"))

testImplementation(kotlin("test-junit"))
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package ru.otus.otuskotlin.marketplace.app.kafka

import ru.otus.otuskotlin.marketplace.app.common.IMkplAppSettings
import ru.otus.otuskotlin.marketplace.biz.MkplAdProcessor
import ru.otus.otuskotlin.marketplace.common.MkplCorSettings
import ru.otus.otuskotlin.marketplace.logging.common.MpLoggerProvider
import ru.otus.otuskotlin.marketplace.logging.jvm.mpLoggerLogback

class AppKafkaConfig(
val kafkaHosts: List<String> = KAFKA_HOSTS,
val kafkaGroupId: String = KAFKA_GROUP_ID,
val kafkaTopicInV1: String = KAFKA_TOPIC_IN_V1,
val kafkaTopicOutV1: String = KAFKA_TOPIC_OUT_V1,
val kafkaTopicInV2: String = KAFKA_TOPIC_IN_V2,
val kafkaTopicOutV2: String = KAFKA_TOPIC_OUT_V2,
override val corSettings: MkplCorSettings = MkplCorSettings(
loggerProvider = MpLoggerProvider { mpLoggerLogback(it) }
),
override val processor: MkplAdProcessor = MkplAdProcessor(corSettings),
): IMkplAppSettings {
companion object {
const val KAFKA_HOST_VAR = "KAFKA_HOSTS"
const val KAFKA_TOPIC_IN_V1_VAR = "KAFKA_TOPIC_IN_V1"
const val KAFKA_TOPIC_OUT_V1_VAR = "KAFKA_TOPIC_OUT_V1"
const val KAFKA_TOPIC_IN_V2_VAR = "KAFKA_TOPIC_IN_V2"
const val KAFKA_TOPIC_OUT_V2_VAR = "KAFKA_TOPIC_OUT_V2"
const val KAFKA_GROUP_ID_VAR = "KAFKA_GROUP_ID"

val KAFKA_HOSTS by lazy { (System.getenv(KAFKA_HOST_VAR) ?: "").split("\\s*[,; ]\\s*") }
val KAFKA_GROUP_ID by lazy { System.getenv(KAFKA_GROUP_ID_VAR) ?: "marketplace" }
val KAFKA_TOPIC_IN_V1 by lazy { System.getenv(KAFKA_TOPIC_IN_V1_VAR) ?: "marketplace-ad-v1-in" }
val KAFKA_TOPIC_OUT_V1 by lazy { System.getenv(KAFKA_TOPIC_OUT_V1_VAR) ?: "marketplace-ad-v1-out" }
val KAFKA_TOPIC_IN_V2 by lazy { System.getenv(KAFKA_TOPIC_IN_V2_VAR) ?: "marketplace-ad-v2-in" }
val KAFKA_TOPIC_OUT_V2 by lazy { System.getenv(KAFKA_TOPIC_OUT_V2_VAR) ?: "marketplace-ad-v2-out" }
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
package ru.otus.otuskotlin.marketplace.app.kafka

import kotlinx.atomicfu.atomic
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.NonCancellable
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext
import org.apache.kafka.clients.consumer.Consumer
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.clients.consumer.ConsumerRecords
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.clients.producer.Producer
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.errors.WakeupException
import ru.otus.otuskotlin.marketplace.app.common.controllerHelper
import java.time.Duration
import java.util.*

/**
* Основной класс для обслуживания Кафка-интерфейса
*/
class AppKafkaConsumer(
private val config: AppKafkaConfig,
consumerStrategies: List<IConsumerStrategy>,
private val consumer: Consumer<String, String> = config.createKafkaConsumer(),
private val producer: Producer<String, String> = config.createKafkaProducer()
) : AutoCloseable {
private val log = config.corSettings.loggerProvider.logger(this::class)
private val process = atomic(true) // пояснить
private val topicsAndStrategyByInputTopic: Map<String, TopicsAndStrategy> = consumerStrategies.associate {
val topics = it.topics(config)
topics.input to TopicsAndStrategy(topics.input, topics.output, it)
}

/**
* Блокирующая функция старта получения и обработки сообщений из Кафки. Для неблокирующей версии см. [[startSusp]]
*/
fun start(): Unit = runBlocking { startSusp() }

/**
* Неблокирующая функция старта получения и обработки сообщений из Кафки. Блокирующая версия - см. [[start]]
*/
suspend fun startSusp() {
process.value = true
try {
consumer.subscribe(topicsAndStrategyByInputTopic.keys)
while (process.value) {
val records: ConsumerRecords<String, String> = withContext(Dispatchers.IO) {
consumer.poll(Duration.ofSeconds(1))
}
if (!records.isEmpty)
log.debug("Receive ${records.count()} messages")

records.forEach { record: ConsumerRecord<String, String> ->
try {
val (_, outputTopic, strategy) = topicsAndStrategyByInputTopic[record.topic()]
?: throw RuntimeException("Receive message from unknown topic ${record.topic()}")

val resp = config.controllerHelper(
{ strategy.deserialize(record.value(), this) },
{ strategy.serialize(this) },
KafkaConsumer::class,
"kafka-consumer"
)
sendResponse(resp, outputTopic)
} catch (ex: Exception) {
log.error("error", e = ex)
}
}
}
} catch (ex: WakeupException) {
// ignore for shutdown
} catch (ex: RuntimeException) {
// exception handling
withContext(NonCancellable) {
throw ex
}
} finally {
withContext(NonCancellable) {
consumer.close()
}
}
}

private suspend fun sendResponse(json: String, outputTopic: String) {
val resRecord = ProducerRecord(
outputTopic,
// null,
UUID.randomUUID().toString(),
json
)
log.info("sending ${resRecord.key()} to $outputTopic:\n$json")
withContext(Dispatchers.IO) {
producer.send(resRecord)
}
}

/**
* Корректное завершение для методов [[start]], [[startSusp]]
*/
override fun close() {
process.value = false
}

private data class TopicsAndStrategy(
val inputTopic: String,
val outputTopic: String,
val strategy: IConsumerStrategy
)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package ru.otus.otuskotlin.marketplace.app.kafka

import ru.otus.otuskotlin.marketplace.api.v1.apiV1RequestDeserialize
import ru.otus.otuskotlin.marketplace.api.v1.apiV1ResponseSerialize
import ru.otus.otuskotlin.marketplace.api.v1.models.IRequest
import ru.otus.otuskotlin.marketplace.api.v1.models.IResponse
import ru.otus.otuskotlin.marketplace.common.MkplContext
import ru.otus.otuskotlin.marketplace.mappers.v1.fromTransport
import ru.otus.otuskotlin.marketplace.mappers.v1.toTransportAd

class ConsumerStrategyV1 : IConsumerStrategy {
override fun topics(config: AppKafkaConfig): InputOutputTopics {
return InputOutputTopics(config.kafkaTopicInV1, config.kafkaTopicOutV1)
}

override fun serialize(source: MkplContext): String {
val response: IResponse = source.toTransportAd()
return apiV1ResponseSerialize(response)
}

override fun deserialize(value: String, target: MkplContext) {
val request: IRequest = apiV1RequestDeserialize(value)
target.fromTransport(request)
}
}
Loading

0 comments on commit def93b3

Please sign in to comment.