-
Notifications
You must be signed in to change notification settings - Fork 1
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
19 changed files
with
580 additions
and
2 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,37 @@ | ||
version: '3.8' | ||
services: | ||
kafka: | ||
image: apache/kafka:latest | ||
hostname: kafka | ||
container_name: kafka | ||
ports: | ||
- '9092:9092' | ||
environment: | ||
KAFKA_NODE_ID: 1 | ||
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT' | ||
KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT_HOST://localhost:9092,PLAINTEXT://kafka:19092' | ||
KAFKA_PROCESS_ROLES: 'broker,controller' | ||
KAFKA_CONTROLLER_QUORUM_VOTERS: '1@kafka:29093' | ||
KAFKA_LISTENERS: 'CONTROLLER://:29093,PLAINTEXT_HOST://:9092,PLAINTEXT://:19092' | ||
KAFKA_INTER_BROKER_LISTENER_NAME: 'PLAINTEXT' | ||
KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER' | ||
CLUSTER_ID: '4L6g3nShT-eMCtK--X86sw' | ||
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 | ||
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0 | ||
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1 | ||
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1 | ||
KAFKA_LOG_DIRS: '/tmp/kraft-combined-logs' | ||
|
||
app: | ||
image: ru.otus.otuskotlin.marketplace/ok-marketplace-app-kafka:0.0.1 | ||
depends_on: | ||
- kafka | ||
links: | ||
- kafka | ||
environment: | ||
KAFKA_HOSTS: kafka:19092 | ||
KAFKA_TOPIC_IN_V1: marketplace-ad-v1-in | ||
KAFKA_TOPIC_OUT_V1: marketplace-ad-v1-out | ||
KAFKA_TOPIC_IN_V2: marketplace-ad-v2-in | ||
KAFKA_TOPIC_OUT_V2: marketplace-ad-v2-out | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,55 @@ | ||
version: '3.8' | ||
services: | ||
zookeeper: | ||
image: confluentinc/cp-zookeeper | ||
container_name: zookeeper | ||
healthcheck: | ||
test: "[[ $$(echo srvr | nc localhost 2181 | grep -oG 'Mode: standalone') = \"Mode: standalone\" ]]" | ||
interval: 10s | ||
timeout: 1s | ||
retries: 30 | ||
environment: | ||
ZOOKEEPER_CLIENT_PORT: 2181 | ||
ZOOKEEPER_TICK_TIME: 2000 | ||
ports: | ||
- "2181:2181" | ||
|
||
kafka: | ||
image: confluentinc/cp-kafka | ||
container_name: kafka | ||
depends_on: | ||
zookeeper: | ||
condition: service_healthy | ||
healthcheck: | ||
test: "test $$( /usr/bin/zookeeper-shell zookeeper:2181 get /brokers/ids/1 | grep { ) != ''" | ||
interval: 3s | ||
timeout: 2s | ||
retries: 300 | ||
environment: | ||
KAFKA_BROKER_ID: 1 | ||
KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181' | ||
KAFKA_LISTENERS: "INTERNAL://:29092,EXTERNAL://:9092" | ||
KAFKA_ADVERTISED_LISTENERS: "INTERNAL://kafka:29092,EXTERNAL://localhost:9092" | ||
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT" | ||
KAFKA_INTER_BROKER_LISTENER_NAME: "INTERNAL" | ||
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 | ||
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1 | ||
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1 | ||
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0 | ||
KAFKA_JMX_PORT: 9101 | ||
KAFKA_JMX_HOSTNAME: localhost | ||
ports: | ||
- "9092:9092" | ||
- "9101:9101" | ||
|
||
kafdrop: | ||
image: obsidiandynamics/kafdrop | ||
container_name: kafdrop | ||
restart: "no" | ||
ports: | ||
- "9000:9000" | ||
environment: | ||
KAFKA_BROKERCONNECT: "kafka:29092" | ||
# JVM_OPTS: "-Xms16M -Xmx48M -Xss180K -XX:-TieredCompilation -XX:+UseStringDeduplication -noverify" | ||
depends_on: | ||
- "kafka" |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,2 +1,3 @@ | ||
*/certs/ | ||
/ca/ | ||
/kfdata/ |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,31 @@ | ||
Получить список топиков | ||
```shell | ||
docker exec -ti kafka /usr/bin/kafka-topics --list --bootstrap-server localhost:9092 | ||
``` | ||
|
||
Отправить сообщение | ||
```shell | ||
docker exec -ti kafka /usr/bin/kafka-console-producer --topic topic1 --bootstrap-server localhost:9092 | ||
``` | ||
Каждая строка - одно сообщение. Прервать - Ctrl+Z | ||
|
||
Получить сообщения | ||
```shell | ||
docker exec -ti kafka /usr/bin/kafka-console-consumer --from-beginning --topic topic1 --bootstrap-server localhost:9092 | ||
``` | ||
|
||
Получить сообщения как consumer1 | ||
```shell | ||
docker exec -ti kafka /usr/bin/kafka-console-consumer --group consumer1 --topic topic1 --bootstrap-server localhost:9092 | ||
|
||
``` | ||
|
||
Отправить сообщение с ключом через двоеточие (key:value) | ||
```shell | ||
docker exec -ti kafka /usr/bin/kafka-console-producer \ | ||
--topic topic1 \ | ||
--property "parse.key=true" \ | ||
--property "key.separator=:" \ | ||
--bootstrap-server \ | ||
localhost:9092 | ||
``` |
35 changes: 35 additions & 0 deletions
35
ok-marketplace-be/ok-marketplace-app-kafka/build.gradle.kts
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,35 @@ | ||
plugins { | ||
application | ||
id("build-jvm") | ||
alias(libs.plugins.muschko.java) | ||
} | ||
|
||
application { | ||
mainClass.set("ru.otus.otuskotlin.marketplace.app.kafka.MainKt") | ||
} | ||
|
||
docker { | ||
javaApplication { | ||
baseImage.set("openjdk:17.0.2-slim") | ||
} | ||
} | ||
|
||
dependencies { | ||
implementation(libs.kafka.client) | ||
implementation(libs.coroutines.core) | ||
implementation(libs.kotlinx.atomicfu) | ||
|
||
implementation("ru.otus.otuskotlin.marketplace.libs:ok-marketplace-lib-logging-logback") | ||
|
||
implementation(project(":ok-marketplace-app-common")) | ||
|
||
// transport models | ||
implementation(project(":ok-marketplace-common")) | ||
implementation(project(":ok-marketplace-api-v1-jackson")) | ||
implementation(project(":ok-marketplace-api-v1-mappers")) | ||
implementation(project(":ok-marketplace-api-v2-kmp")) | ||
// logic | ||
implementation(project(":ok-marketplace-biz")) | ||
|
||
testImplementation(kotlin("test-junit")) | ||
} |
36 changes: 36 additions & 0 deletions
36
ok-marketplace-be/ok-marketplace-app-kafka/src/main/kotlin/AppKafkaConfig.kt
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,36 @@ | ||
package ru.otus.otuskotlin.marketplace.app.kafka | ||
|
||
import ru.otus.otuskotlin.marketplace.app.common.IMkplAppSettings | ||
import ru.otus.otuskotlin.marketplace.biz.MkplAdProcessor | ||
import ru.otus.otuskotlin.marketplace.common.MkplCorSettings | ||
import ru.otus.otuskotlin.marketplace.logging.common.MpLoggerProvider | ||
import ru.otus.otuskotlin.marketplace.logging.jvm.mpLoggerLogback | ||
|
||
class AppKafkaConfig( | ||
val kafkaHosts: List<String> = KAFKA_HOSTS, | ||
val kafkaGroupId: String = KAFKA_GROUP_ID, | ||
val kafkaTopicInV1: String = KAFKA_TOPIC_IN_V1, | ||
val kafkaTopicOutV1: String = KAFKA_TOPIC_OUT_V1, | ||
val kafkaTopicInV2: String = KAFKA_TOPIC_IN_V2, | ||
val kafkaTopicOutV2: String = KAFKA_TOPIC_OUT_V2, | ||
override val corSettings: MkplCorSettings = MkplCorSettings( | ||
loggerProvider = MpLoggerProvider { mpLoggerLogback(it) } | ||
), | ||
override val processor: MkplAdProcessor = MkplAdProcessor(corSettings), | ||
): IMkplAppSettings { | ||
companion object { | ||
const val KAFKA_HOST_VAR = "KAFKA_HOSTS" | ||
const val KAFKA_TOPIC_IN_V1_VAR = "KAFKA_TOPIC_IN_V1" | ||
const val KAFKA_TOPIC_OUT_V1_VAR = "KAFKA_TOPIC_OUT_V1" | ||
const val KAFKA_TOPIC_IN_V2_VAR = "KAFKA_TOPIC_IN_V2" | ||
const val KAFKA_TOPIC_OUT_V2_VAR = "KAFKA_TOPIC_OUT_V2" | ||
const val KAFKA_GROUP_ID_VAR = "KAFKA_GROUP_ID" | ||
|
||
val KAFKA_HOSTS by lazy { (System.getenv(KAFKA_HOST_VAR) ?: "").split("\\s*[,; ]\\s*") } | ||
val KAFKA_GROUP_ID by lazy { System.getenv(KAFKA_GROUP_ID_VAR) ?: "marketplace" } | ||
val KAFKA_TOPIC_IN_V1 by lazy { System.getenv(KAFKA_TOPIC_IN_V1_VAR) ?: "marketplace-ad-v1-in" } | ||
val KAFKA_TOPIC_OUT_V1 by lazy { System.getenv(KAFKA_TOPIC_OUT_V1_VAR) ?: "marketplace-ad-v1-out" } | ||
val KAFKA_TOPIC_IN_V2 by lazy { System.getenv(KAFKA_TOPIC_IN_V2_VAR) ?: "marketplace-ad-v2-in" } | ||
val KAFKA_TOPIC_OUT_V2 by lazy { System.getenv(KAFKA_TOPIC_OUT_V2_VAR) ?: "marketplace-ad-v2-out" } | ||
} | ||
} |
106 changes: 106 additions & 0 deletions
106
ok-marketplace-be/ok-marketplace-app-kafka/src/main/kotlin/AppKafkaConsumer.kt
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,106 @@ | ||
package ru.otus.otuskotlin.marketplace.app.kafka | ||
|
||
import kotlinx.atomicfu.atomic | ||
import kotlinx.coroutines.Dispatchers | ||
import kotlinx.coroutines.NonCancellable | ||
import kotlinx.coroutines.runBlocking | ||
import kotlinx.coroutines.withContext | ||
import org.apache.kafka.clients.consumer.Consumer | ||
import org.apache.kafka.clients.consumer.ConsumerRecord | ||
import org.apache.kafka.clients.consumer.ConsumerRecords | ||
import org.apache.kafka.clients.consumer.KafkaConsumer | ||
import org.apache.kafka.clients.producer.Producer | ||
import org.apache.kafka.clients.producer.ProducerRecord | ||
import org.apache.kafka.common.errors.WakeupException | ||
import ru.otus.otuskotlin.marketplace.app.common.controllerHelper | ||
import ru.otus.otuskotlin.marketplace.common.MkplContext | ||
import java.time.Duration | ||
import java.util.* | ||
|
||
data class InputOutputTopics(val input: String, val output: String) | ||
|
||
interface ConsumerStrategy { | ||
fun topics(config: AppKafkaConfig): InputOutputTopics | ||
fun serialize(source: MkplContext): String | ||
fun deserialize(value: String, target: MkplContext) | ||
} | ||
|
||
class AppKafkaConsumer( | ||
private val config: AppKafkaConfig, | ||
consumerStrategies: List<ConsumerStrategy>, | ||
private val consumer: Consumer<String, String> = config.createKafkaConsumer(), | ||
private val producer: Producer<String, String> = config.createKafkaProducer() | ||
) { | ||
private val log = config.corSettings.loggerProvider.logger(this::class) | ||
private val process = atomic(true) // пояснить | ||
private val topicsAndStrategyByInputTopic: Map<String, TopicsAndStrategy> = consumerStrategies.associate { | ||
val topics = it.topics(config) | ||
topics.input to TopicsAndStrategy(topics.input, topics.output, it) | ||
} | ||
|
||
fun start() = runBlocking { startSusp() } | ||
|
||
suspend fun startSusp() { | ||
process.value = true | ||
try { | ||
consumer.subscribe(topicsAndStrategyByInputTopic.keys) | ||
while (process.value) { | ||
val records: ConsumerRecords<String, String> = withContext(Dispatchers.IO) { | ||
consumer.poll(Duration.ofSeconds(1)) | ||
} | ||
if (!records.isEmpty) | ||
log.debug("Receive ${records.count()} messages") | ||
|
||
records.forEach { record: ConsumerRecord<String, String> -> | ||
try { | ||
val (_, outputTopic, strategy) = topicsAndStrategyByInputTopic[record.topic()] | ||
?: throw RuntimeException("Receive message from unknown topic ${record.topic()}") | ||
|
||
val resp = config.controllerHelper( | ||
{ strategy.deserialize(record.value(), this) }, | ||
{ strategy.serialize(this) }, | ||
KafkaConsumer::class, | ||
"kafka-consumer" | ||
) | ||
sendResponse(resp, outputTopic) | ||
} catch (ex: Exception) { | ||
log.error("error", e = ex) | ||
} | ||
} | ||
} | ||
} catch (ex: WakeupException) { | ||
// ignore for shutdown | ||
} catch (ex: RuntimeException) { | ||
// exception handling | ||
withContext(NonCancellable) { | ||
throw ex | ||
} | ||
} finally { | ||
withContext(NonCancellable) { | ||
consumer.close() | ||
} | ||
} | ||
} | ||
|
||
private suspend fun sendResponse(json: String, outputTopic: String) { | ||
val resRecord = ProducerRecord( | ||
outputTopic, | ||
UUID.randomUUID().toString(), | ||
json | ||
) | ||
log.info("sending ${resRecord.key()} to $outputTopic:\n$json") | ||
withContext(Dispatchers.IO) { | ||
producer.send(resRecord) | ||
} | ||
} | ||
|
||
fun stop() { | ||
process.value = false | ||
} | ||
|
||
private data class TopicsAndStrategy( | ||
val inputTopic: String, | ||
val outputTopic: String, | ||
val strategy: ConsumerStrategy | ||
) | ||
} |
25 changes: 25 additions & 0 deletions
25
ok-marketplace-be/ok-marketplace-app-kafka/src/main/kotlin/ConsumerStrategyV1.kt
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,25 @@ | ||
package ru.otus.otuskotlin.marketplace.app.kafka | ||
|
||
import ru.otus.otuskotlin.marketplace.api.v1.apiV1RequestDeserialize | ||
import ru.otus.otuskotlin.marketplace.api.v1.apiV1ResponseSerialize | ||
import ru.otus.otuskotlin.marketplace.api.v1.models.IRequest | ||
import ru.otus.otuskotlin.marketplace.api.v1.models.IResponse | ||
import ru.otus.otuskotlin.marketplace.common.MkplContext | ||
import ru.otus.otuskotlin.marketplace.mappers.v1.fromTransport | ||
import ru.otus.otuskotlin.marketplace.mappers.v1.toTransportAd | ||
|
||
class ConsumerStrategyV1 : ConsumerStrategy { | ||
override fun topics(config: AppKafkaConfig): InputOutputTopics { | ||
return InputOutputTopics(config.kafkaTopicInV1, config.kafkaTopicOutV1) | ||
} | ||
|
||
override fun serialize(source: MkplContext): String { | ||
val response: IResponse = source.toTransportAd() | ||
return apiV1ResponseSerialize(response) | ||
} | ||
|
||
override fun deserialize(value: String, target: MkplContext) { | ||
val request: IRequest = apiV1RequestDeserialize(value) | ||
target.fromTransport(request) | ||
} | ||
} |
Oops, something went wrong.