Skip to content

Commit

Permalink
m5l5 - Kafka: refining
Browse files Browse the repository at this point in the history
  • Loading branch information
svok committed Jun 4, 2024
1 parent 88468e3 commit aa35c15
Show file tree
Hide file tree
Showing 6 changed files with 70 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,33 +13,33 @@ import org.apache.kafka.clients.producer.Producer
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.errors.WakeupException
import ru.otus.otuskotlin.marketplace.app.common.controllerHelper
import ru.otus.otuskotlin.marketplace.common.MkplContext
import java.time.Duration
import java.util.*

data class InputOutputTopics(val input: String, val output: String)

interface ConsumerStrategy {
fun topics(config: AppKafkaConfig): InputOutputTopics
fun serialize(source: MkplContext): String
fun deserialize(value: String, target: MkplContext)
}

/**
* Основной класс для обслуживания Кафка-интерфейса
*/
class AppKafkaConsumer(
private val config: AppKafkaConfig,
consumerStrategies: List<ConsumerStrategy>,
consumerStrategies: List<IConsumerStrategy>,
private val consumer: Consumer<String, String> = config.createKafkaConsumer(),
private val producer: Producer<String, String> = config.createKafkaProducer()
) {
) : AutoCloseable {
private val log = config.corSettings.loggerProvider.logger(this::class)
private val process = atomic(true) // пояснить
private val topicsAndStrategyByInputTopic: Map<String, TopicsAndStrategy> = consumerStrategies.associate {
val topics = it.topics(config)
topics.input to TopicsAndStrategy(topics.input, topics.output, it)
}

fun start() = runBlocking { startSusp() }
/**
* Блокирующая функция старта получения и обработки сообщений из Кафки. Для неблокирующей версии см. [[startSusp]]
*/
fun start(): Unit = runBlocking { startSusp() }

/**
* Неблокирующая функция старта получения и обработки сообщений из Кафки. Блокирующая версия - см. [[start]]
*/
suspend fun startSusp() {
process.value = true
try {
Expand Down Expand Up @@ -85,6 +85,7 @@ class AppKafkaConsumer(
private suspend fun sendResponse(json: String, outputTopic: String) {
val resRecord = ProducerRecord(
outputTopic,
// null,
UUID.randomUUID().toString(),
json
)
Expand All @@ -94,13 +95,16 @@ class AppKafkaConsumer(
}
}

fun stop() {
/**
* Корректное завершение для методов [[start]], [[startSusp]]
*/
override fun close() {
process.value = false
}

private data class TopicsAndStrategy(
val inputTopic: String,
val outputTopic: String,
val strategy: ConsumerStrategy
val strategy: IConsumerStrategy
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import ru.otus.otuskotlin.marketplace.common.MkplContext
import ru.otus.otuskotlin.marketplace.mappers.v1.fromTransport
import ru.otus.otuskotlin.marketplace.mappers.v1.toTransportAd

class ConsumerStrategyV1 : ConsumerStrategy {
class ConsumerStrategyV1 : IConsumerStrategy {
override fun topics(config: AppKafkaConfig): InputOutputTopics {
return InputOutputTopics(config.kafkaTopicInV1, config.kafkaTopicOutV1)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import ru.otus.otuskotlin.marketplace.api.v2.models.IRequest
import ru.otus.otuskotlin.marketplace.api.v2.models.IResponse
import ru.otus.otuskotlin.marketplace.common.MkplContext

class ConsumerStrategyV2 : ConsumerStrategy {
class ConsumerStrategyV2 : IConsumerStrategy {
override fun topics(config: AppKafkaConfig): InputOutputTopics {
return InputOutputTopics(config.kafkaTopicInV2, config.kafkaTopicOutV2)
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package ru.otus.otuskotlin.marketplace.app.kafka

import ru.otus.otuskotlin.marketplace.common.MkplContext

/**
* Интерфейс стратегии для обслуживания версии API
*/
interface IConsumerStrategy {
/**
* Топики, для которых применяется стратегия
*/
fun topics(config: AppKafkaConfig): InputOutputTopics
/**
* Сериализатор для версии API
*/
fun serialize(source: MkplContext): String
/**
* Десериализатор для версии API
*/
fun deserialize(value: String, target: MkplContext)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package ru.otus.otuskotlin.marketplace.app.kafka

/**
* Структура для хранения входящего и исходящего топиков для версии API
*/
data class InputOutputTopics(
/**
* Топик входящих в приложение сообщений
*/
val input: String,
/**
* Топик для исходящих из приложения сообщений
*/
val output: String,
)
Original file line number Diff line number Diff line change
Expand Up @@ -39,21 +39,23 @@ class KafkaControllerTest {
PARTITION,
0L,
"test-1",
apiV1RequestSerialize(AdCreateRequest(
ad = AdCreateObject(
title = "Требуется болт",
description = "some testing ad to check them all",
visibility = AdVisibility.OWNER_ONLY,
adType = DealSide.DEMAND
apiV1RequestSerialize(
AdCreateRequest(
ad = AdCreateObject(
title = "Требуется болт",
description = "some testing ad to check them all",
visibility = AdVisibility.OWNER_ONLY,
adType = DealSide.DEMAND,
),
debug = AdDebug(
mode = AdRequestDebugMode.STUB,
stub = AdRequestDebugStubs.SUCCESS,
),
),
debug = AdDebug(
mode = AdRequestDebugMode.STUB,
stub = AdRequestDebugStubs.SUCCESS
)
))
)
)
)
app.stop()
app.close()
}

val startOffsets: MutableMap<TopicPartition, Long> = mutableMapOf()
Expand Down

0 comments on commit aa35c15

Please sign in to comment.