-
Notifications
You must be signed in to change notification settings - Fork 1
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
* m5l5 - Kafka * m5l5 - Kafka: refining
- Loading branch information
Showing
21 changed files
with
622 additions
and
2 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,37 @@ | ||
version: '3.8' | ||
services: | ||
kafka: | ||
image: apache/kafka:latest | ||
hostname: kafka | ||
container_name: kafka | ||
ports: | ||
- '9092:9092' | ||
environment: | ||
KAFKA_NODE_ID: 1 | ||
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT' | ||
KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT_HOST://localhost:9092,PLAINTEXT://kafka:19092' | ||
KAFKA_PROCESS_ROLES: 'broker,controller' | ||
KAFKA_CONTROLLER_QUORUM_VOTERS: '1@kafka:29093' | ||
KAFKA_LISTENERS: 'CONTROLLER://:29093,PLAINTEXT_HOST://:9092,PLAINTEXT://:19092' | ||
KAFKA_INTER_BROKER_LISTENER_NAME: 'PLAINTEXT' | ||
KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER' | ||
CLUSTER_ID: '4L6g3nShT-eMCtK--X86sw' | ||
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 | ||
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0 | ||
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1 | ||
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1 | ||
KAFKA_LOG_DIRS: '/tmp/kraft-combined-logs' | ||
|
||
app: | ||
image: ru.otus.otuskotlin.marketplace/ok-marketplace-app-kafka:0.0.1 | ||
depends_on: | ||
- kafka | ||
links: | ||
- kafka | ||
environment: | ||
KAFKA_HOSTS: kafka:19092 | ||
KAFKA_TOPIC_IN_V1: marketplace-ad-v1-in | ||
KAFKA_TOPIC_OUT_V1: marketplace-ad-v1-out | ||
KAFKA_TOPIC_IN_V2: marketplace-ad-v2-in | ||
KAFKA_TOPIC_OUT_V2: marketplace-ad-v2-out | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,55 @@ | ||
version: '3.8' | ||
services: | ||
zookeeper: | ||
image: confluentinc/cp-zookeeper | ||
container_name: zookeeper | ||
healthcheck: | ||
test: "[[ $$(echo srvr | nc localhost 2181 | grep -oG 'Mode: standalone') = \"Mode: standalone\" ]]" | ||
interval: 10s | ||
timeout: 1s | ||
retries: 30 | ||
environment: | ||
ZOOKEEPER_CLIENT_PORT: 2181 | ||
ZOOKEEPER_TICK_TIME: 2000 | ||
ports: | ||
- "2181:2181" | ||
|
||
kafka: | ||
image: confluentinc/cp-kafka | ||
container_name: kafka | ||
depends_on: | ||
zookeeper: | ||
condition: service_healthy | ||
healthcheck: | ||
test: "test $$( /usr/bin/zookeeper-shell zookeeper:2181 get /brokers/ids/1 | grep { ) != ''" | ||
interval: 3s | ||
timeout: 2s | ||
retries: 300 | ||
environment: | ||
KAFKA_BROKER_ID: 1 | ||
KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181' | ||
KAFKA_LISTENERS: "INTERNAL://:29092,EXTERNAL://:9092" | ||
KAFKA_ADVERTISED_LISTENERS: "INTERNAL://kafka:29092,EXTERNAL://localhost:9092" | ||
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT" | ||
KAFKA_INTER_BROKER_LISTENER_NAME: "INTERNAL" | ||
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 | ||
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1 | ||
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1 | ||
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0 | ||
KAFKA_JMX_PORT: 9101 | ||
KAFKA_JMX_HOSTNAME: localhost | ||
ports: | ||
- "9092:9092" | ||
- "9101:9101" | ||
|
||
kafdrop: | ||
image: obsidiandynamics/kafdrop | ||
container_name: kafdrop | ||
restart: "no" | ||
ports: | ||
- "9000:9000" | ||
environment: | ||
KAFKA_BROKERCONNECT: "kafka:29092" | ||
# JVM_OPTS: "-Xms16M -Xmx48M -Xss180K -XX:-TieredCompilation -XX:+UseStringDeduplication -noverify" | ||
depends_on: | ||
- "kafka" |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,2 +1,3 @@ | ||
*/certs/ | ||
/ca/ | ||
/kfdata/ |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,31 @@ | ||
Получить список топиков | ||
```shell | ||
docker exec -ti kafka /usr/bin/kafka-topics --list --bootstrap-server localhost:9092 | ||
``` | ||
|
||
Отправить сообщение | ||
```shell | ||
docker exec -ti kafka /usr/bin/kafka-console-producer --topic topic1 --bootstrap-server localhost:9092 | ||
``` | ||
Каждая строка - одно сообщение. Прервать - Ctrl+Z | ||
|
||
Получить сообщения | ||
```shell | ||
docker exec -ti kafka /usr/bin/kafka-console-consumer --from-beginning --topic topic1 --bootstrap-server localhost:9092 | ||
``` | ||
|
||
Получить сообщения как consumer1 | ||
```shell | ||
docker exec -ti kafka /usr/bin/kafka-console-consumer --group consumer1 --topic topic1 --bootstrap-server localhost:9092 | ||
|
||
``` | ||
|
||
Отправить сообщение с ключом через двоеточие (key:value) | ||
```shell | ||
docker exec -ti kafka /usr/bin/kafka-console-producer \ | ||
--topic topic1 \ | ||
--property "parse.key=true" \ | ||
--property "key.separator=:" \ | ||
--bootstrap-server \ | ||
localhost:9092 | ||
``` |
35 changes: 35 additions & 0 deletions
35
ok-marketplace-be/ok-marketplace-app-kafka/build.gradle.kts
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,35 @@ | ||
plugins { | ||
application | ||
id("build-jvm") | ||
alias(libs.plugins.muschko.java) | ||
} | ||
|
||
application { | ||
mainClass.set("ru.otus.otuskotlin.marketplace.app.kafka.MainKt") | ||
} | ||
|
||
docker { | ||
javaApplication { | ||
baseImage.set("openjdk:17.0.2-slim") | ||
} | ||
} | ||
|
||
dependencies { | ||
implementation(libs.kafka.client) | ||
implementation(libs.coroutines.core) | ||
implementation(libs.kotlinx.atomicfu) | ||
|
||
implementation("ru.otus.otuskotlin.marketplace.libs:ok-marketplace-lib-logging-logback") | ||
|
||
implementation(project(":ok-marketplace-app-common")) | ||
|
||
// transport models | ||
implementation(project(":ok-marketplace-common")) | ||
implementation(project(":ok-marketplace-api-v1-jackson")) | ||
implementation(project(":ok-marketplace-api-v1-mappers")) | ||
implementation(project(":ok-marketplace-api-v2-kmp")) | ||
// logic | ||
implementation(project(":ok-marketplace-biz")) | ||
|
||
testImplementation(kotlin("test-junit")) | ||
} |
36 changes: 36 additions & 0 deletions
36
ok-marketplace-be/ok-marketplace-app-kafka/src/main/kotlin/AppKafkaConfig.kt
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,36 @@ | ||
package ru.otus.otuskotlin.marketplace.app.kafka | ||
|
||
import ru.otus.otuskotlin.marketplace.app.common.IMkplAppSettings | ||
import ru.otus.otuskotlin.marketplace.biz.MkplAdProcessor | ||
import ru.otus.otuskotlin.marketplace.common.MkplCorSettings | ||
import ru.otus.otuskotlin.marketplace.logging.common.MpLoggerProvider | ||
import ru.otus.otuskotlin.marketplace.logging.jvm.mpLoggerLogback | ||
|
||
class AppKafkaConfig( | ||
val kafkaHosts: List<String> = KAFKA_HOSTS, | ||
val kafkaGroupId: String = KAFKA_GROUP_ID, | ||
val kafkaTopicInV1: String = KAFKA_TOPIC_IN_V1, | ||
val kafkaTopicOutV1: String = KAFKA_TOPIC_OUT_V1, | ||
val kafkaTopicInV2: String = KAFKA_TOPIC_IN_V2, | ||
val kafkaTopicOutV2: String = KAFKA_TOPIC_OUT_V2, | ||
override val corSettings: MkplCorSettings = MkplCorSettings( | ||
loggerProvider = MpLoggerProvider { mpLoggerLogback(it) } | ||
), | ||
override val processor: MkplAdProcessor = MkplAdProcessor(corSettings), | ||
): IMkplAppSettings { | ||
companion object { | ||
const val KAFKA_HOST_VAR = "KAFKA_HOSTS" | ||
const val KAFKA_TOPIC_IN_V1_VAR = "KAFKA_TOPIC_IN_V1" | ||
const val KAFKA_TOPIC_OUT_V1_VAR = "KAFKA_TOPIC_OUT_V1" | ||
const val KAFKA_TOPIC_IN_V2_VAR = "KAFKA_TOPIC_IN_V2" | ||
const val KAFKA_TOPIC_OUT_V2_VAR = "KAFKA_TOPIC_OUT_V2" | ||
const val KAFKA_GROUP_ID_VAR = "KAFKA_GROUP_ID" | ||
|
||
val KAFKA_HOSTS by lazy { (System.getenv(KAFKA_HOST_VAR) ?: "").split("\\s*[,; ]\\s*") } | ||
val KAFKA_GROUP_ID by lazy { System.getenv(KAFKA_GROUP_ID_VAR) ?: "marketplace" } | ||
val KAFKA_TOPIC_IN_V1 by lazy { System.getenv(KAFKA_TOPIC_IN_V1_VAR) ?: "marketplace-ad-v1-in" } | ||
val KAFKA_TOPIC_OUT_V1 by lazy { System.getenv(KAFKA_TOPIC_OUT_V1_VAR) ?: "marketplace-ad-v1-out" } | ||
val KAFKA_TOPIC_IN_V2 by lazy { System.getenv(KAFKA_TOPIC_IN_V2_VAR) ?: "marketplace-ad-v2-in" } | ||
val KAFKA_TOPIC_OUT_V2 by lazy { System.getenv(KAFKA_TOPIC_OUT_V2_VAR) ?: "marketplace-ad-v2-out" } | ||
} | ||
} |
110 changes: 110 additions & 0 deletions
110
ok-marketplace-be/ok-marketplace-app-kafka/src/main/kotlin/AppKafkaConsumer.kt
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,110 @@ | ||
package ru.otus.otuskotlin.marketplace.app.kafka | ||
|
||
import kotlinx.atomicfu.atomic | ||
import kotlinx.coroutines.Dispatchers | ||
import kotlinx.coroutines.NonCancellable | ||
import kotlinx.coroutines.runBlocking | ||
import kotlinx.coroutines.withContext | ||
import org.apache.kafka.clients.consumer.Consumer | ||
import org.apache.kafka.clients.consumer.ConsumerRecord | ||
import org.apache.kafka.clients.consumer.ConsumerRecords | ||
import org.apache.kafka.clients.consumer.KafkaConsumer | ||
import org.apache.kafka.clients.producer.Producer | ||
import org.apache.kafka.clients.producer.ProducerRecord | ||
import org.apache.kafka.common.errors.WakeupException | ||
import ru.otus.otuskotlin.marketplace.app.common.controllerHelper | ||
import java.time.Duration | ||
import java.util.* | ||
|
||
/** | ||
* Основной класс для обслуживания Кафка-интерфейса | ||
*/ | ||
class AppKafkaConsumer( | ||
private val config: AppKafkaConfig, | ||
consumerStrategies: List<IConsumerStrategy>, | ||
private val consumer: Consumer<String, String> = config.createKafkaConsumer(), | ||
private val producer: Producer<String, String> = config.createKafkaProducer() | ||
) : AutoCloseable { | ||
private val log = config.corSettings.loggerProvider.logger(this::class) | ||
private val process = atomic(true) // пояснить | ||
private val topicsAndStrategyByInputTopic: Map<String, TopicsAndStrategy> = consumerStrategies.associate { | ||
val topics = it.topics(config) | ||
topics.input to TopicsAndStrategy(topics.input, topics.output, it) | ||
} | ||
|
||
/** | ||
* Блокирующая функция старта получения и обработки сообщений из Кафки. Для неблокирующей версии см. [[startSusp]] | ||
*/ | ||
fun start(): Unit = runBlocking { startSusp() } | ||
|
||
/** | ||
* Неблокирующая функция старта получения и обработки сообщений из Кафки. Блокирующая версия - см. [[start]] | ||
*/ | ||
suspend fun startSusp() { | ||
process.value = true | ||
try { | ||
consumer.subscribe(topicsAndStrategyByInputTopic.keys) | ||
while (process.value) { | ||
val records: ConsumerRecords<String, String> = withContext(Dispatchers.IO) { | ||
consumer.poll(Duration.ofSeconds(1)) | ||
} | ||
if (!records.isEmpty) | ||
log.debug("Receive ${records.count()} messages") | ||
|
||
records.forEach { record: ConsumerRecord<String, String> -> | ||
try { | ||
val (_, outputTopic, strategy) = topicsAndStrategyByInputTopic[record.topic()] | ||
?: throw RuntimeException("Receive message from unknown topic ${record.topic()}") | ||
|
||
val resp = config.controllerHelper( | ||
{ strategy.deserialize(record.value(), this) }, | ||
{ strategy.serialize(this) }, | ||
KafkaConsumer::class, | ||
"kafka-consumer" | ||
) | ||
sendResponse(resp, outputTopic) | ||
} catch (ex: Exception) { | ||
log.error("error", e = ex) | ||
} | ||
} | ||
} | ||
} catch (ex: WakeupException) { | ||
// ignore for shutdown | ||
} catch (ex: RuntimeException) { | ||
// exception handling | ||
withContext(NonCancellable) { | ||
throw ex | ||
} | ||
} finally { | ||
withContext(NonCancellable) { | ||
consumer.close() | ||
} | ||
} | ||
} | ||
|
||
private suspend fun sendResponse(json: String, outputTopic: String) { | ||
val resRecord = ProducerRecord( | ||
outputTopic, | ||
// null, | ||
UUID.randomUUID().toString(), | ||
json | ||
) | ||
log.info("sending ${resRecord.key()} to $outputTopic:\n$json") | ||
withContext(Dispatchers.IO) { | ||
producer.send(resRecord) | ||
} | ||
} | ||
|
||
/** | ||
* Корректное завершение для методов [[start]], [[startSusp]] | ||
*/ | ||
override fun close() { | ||
process.value = false | ||
} | ||
|
||
private data class TopicsAndStrategy( | ||
val inputTopic: String, | ||
val outputTopic: String, | ||
val strategy: IConsumerStrategy | ||
) | ||
} |
Oops, something went wrong.