Skip to content

Commit

Permalink
M5l4 rabbit
Browse files Browse the repository at this point in the history
  • Loading branch information
svok authored and evgnep committed Aug 24, 2024
1 parent cd5c159 commit f688eab
Show file tree
Hide file tree
Showing 20 changed files with 769 additions and 2 deletions.
7 changes: 7 additions & 0 deletions gradle/libs.versions.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ coroutines-reactive = { module = "org.jetbrains.kotlinx:kotlinx-coroutines-react

jackson-kotlin = { module = "com.fasterxml.jackson.module:jackson-module-kotlin", version.ref = "jackson" }
jackson-datatype = { module = "com.fasterxml.jackson.datatype:jackson-datatype-jsr310", version.ref = "jackson" }
jackson-databind = { module = "com.fasterxml.jackson.core:jackson-databind", version.ref = "jackson" }

# Logging
logback = { module = "ch.qos.logback:logback-classic", version.ref = "logback" }
Expand Down Expand Up @@ -73,6 +74,9 @@ spring-webflux = { module = "org.springframework.boot:spring-boot-starter-webflu
spring-webflux-ui = { module = "org.springdoc:springdoc-openapi-starter-webflux-ui", version = "2.3.0" }
spring-test = { module = "org.springframework.boot:spring-boot-starter-test" }

# Message Queues
rabbitmq-client = { module = "com.rabbitmq:amqp-client", version = "5.20.0" }

# Testing
kotest-junit5 = { module = "io.kotest:kotest-runner-junit5", version.ref = "kotest" }
kotest-core = { module = "io.kotest:kotest-assertions-core", version.ref = "kotest" }
Expand All @@ -81,6 +85,7 @@ kotest-property = { module = "io.kotest:kotest-property", version.ref = "kotest"
mockito-kotlin = { module = "org.mockito.kotlin:mockito-kotlin", version = "5.2.1" }

testcontainers-core = { module = "org.testcontainers:testcontainers", version.ref = "testcontainers" }
testcontainers-rabbitmq = { module = "org.testcontainers:rabbitmq", version.ref = "testcontainers" }

[bundles]
kotest = ["kotest-junit5", "kotest-core", "kotest-datatest", "kotest-property"]
Expand All @@ -91,6 +96,7 @@ kotlin-jvm = { id = "org.jetbrains.kotlin.jvm", version.ref = "kotlin" }
openapi-generator = { id = "org.openapi.generator", version.ref = "openapi-generator" }
crowdproj-generator = { id = "com.crowdproj.generator", version = "0.2.0" }
kotlinx-serialization = { id = "org.jetbrains.kotlin.plugin.serialization", version.ref = "kotlin" }
shadowJar = { id = "com.github.johnrengelman.shadow", version = "8.1.1" }

# Spring
spring-boot = { id = "org.springframework.boot", version.ref = "spring-boot" }
Expand All @@ -102,3 +108,4 @@ ktor = { id = "io.ktor.plugin", version.ref = "ktor" }

# Docker
muschko-remote = { id = "com.bmuschko.docker-remote-api", version.ref = "muschko" }
muschko-java = { id = "com.bmuschko.docker-java-application", version.ref = "muschko" }
2 changes: 2 additions & 0 deletions ok-marketplace-be/ok-marketplace-app-ktor/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ kotlin {

// Stubs
implementation(project(":ok-marketplace-stubs"))
// RabbitMQ
// implementation(project(":ok-marketplace-app-rabbit"))

implementation(libs.kotlinx.serialization.core)
implementation(libs.kotlinx.serialization.json)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@ fun Application.moduleJvm(
}
module(appSettings)

// Неофициальное задание. Попробуйте сделать этот код работающим
// val rabbitServer = RabbitApp(appSettings, this@moduleJvm)
// rabbitServer?.start()

routing {
route("v1") {
install(ContentNegotiation) {
Expand All @@ -45,3 +49,4 @@ fun Application.moduleJvm(
}
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -25,3 +25,18 @@ ktor:
# logger: socket
# socketLogger:
# port: 24224

# Пример конфига для RabbitMQ
#rabbit:
# enable: false
# host: localhost
# port: 5672
## username: guest
## password: guest
# v1:
# keyIn: mkpl-ads-v1-in
# keyOut: mkpl-ads-v1-out
# exchange: mkpl-ads-v1-exchange
# queue: mkpl-ads-v1-queue
# consumerTag: "mkpl-ads-v1-consumer"
# exchangeType: direct
37 changes: 37 additions & 0 deletions ok-marketplace-be/ok-marketplace-app-rabbit/build.gradle.kts
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
plugins {
id("build-jvm")
application
alias(libs.plugins.shadowJar)
alias(libs.plugins.muschko.java)
}

application {
mainClass.set("ru.otus.otuskotlin.marketplace.app.rabbit.ApplicationKt")
}

dependencies {

implementation(kotlin("stdlib"))

implementation(libs.rabbitmq.client)
implementation(libs.jackson.databind)
implementation(libs.logback)
implementation(libs.coroutines.core)

implementation(project(":ok-marketplace-common"))
implementation(project(":ok-marketplace-app-common"))
implementation("ru.otus.otuskotlin.marketplace.libs:ok-marketplace-lib-logging-logback")

// v1 api
implementation(project(":ok-marketplace-api-v1-jackson"))
implementation(project(":ok-marketplace-api-v1-mappers"))

// v2 api
implementation(project(":ok-marketplace-api-v2-kmp"))

implementation(project(":ok-marketplace-biz"))
implementation(project(":ok-marketplace-stubs"))

testImplementation(libs.testcontainers.rabbitmq)
testImplementation(kotlin("test"))
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package ru.otus.otuskotlin.marketplace.app.rabbit

import kotlinx.coroutines.runBlocking
import ru.otus.otuskotlin.marketplace.app.rabbit.config.MkplAppSettings
import ru.otus.otuskotlin.marketplace.app.rabbit.config.RabbitConfig
import ru.otus.otuskotlin.marketplace.app.rabbit.mappers.fromArgs
import ru.otus.otuskotlin.marketplace.common.MkplCorSettings
import ru.otus.otuskotlin.marketplace.logging.common.MpLoggerProvider
import ru.otus.otuskotlin.marketplace.logging.jvm.mpLoggerLogback

fun main(vararg args: String) = runBlocking {
val appSettings = MkplAppSettings(
rabbit = RabbitConfig.fromArgs(*args),
corSettings = MkplCorSettings(
loggerProvider = MpLoggerProvider { mpLoggerLogback(it) }
)
)
val app = RabbitApp(appSettings = appSettings, this)
app.start()
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package ru.otus.otuskotlin.marketplace.app.rabbit

import kotlinx.coroutines.*
import ru.otus.otuskotlin.marketplace.app.rabbit.config.MkplAppSettings
import ru.otus.otuskotlin.marketplace.app.rabbit.controllers.IRabbitMqController
import ru.otus.otuskotlin.marketplace.app.rabbit.controllers.RabbitDirectControllerV1
import ru.otus.otuskotlin.marketplace.app.rabbit.controllers.RabbitDirectControllerV2
import java.util.concurrent.atomic.AtomicBoolean

// Класс запускает все контроллеры
@OptIn(ExperimentalCoroutinesApi::class)
class RabbitApp(
appSettings: MkplAppSettings,
private val scope: CoroutineScope = CoroutineScope(Dispatchers.Default),
) : AutoCloseable {
private val logger = appSettings.corSettings.loggerProvider.logger(this::class)
private val controllers: List<IRabbitMqController> = listOf(
RabbitDirectControllerV1(appSettings),
RabbitDirectControllerV2(appSettings),
)
private val runFlag = AtomicBoolean(true)

fun start() {
runFlag.set(true)
controllers.forEach {
scope.launch(
Dispatchers.IO.limitedParallelism(1) + CoroutineName("thread-${it.exchangeConfig.consumerTag}")
) {
while (runFlag.get()) {
try {
logger.info("Process...${it.exchangeConfig.consumerTag}")
it.process()
} catch (e: RuntimeException) {
// логируем, что-то делаем
logger.error("Обработка завалена, возможно из-за потери соединения с RabbitMQ. Рестартуем")
e.printStackTrace()
}
}
}
}
}

override fun close() {
runFlag.set(false)
controllers.forEach { it.close() }
scope.cancel()
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package ru.otus.otuskotlin.marketplace.app.rabbit.config

import ru.otus.otuskotlin.marketplace.app.common.IMkplAppSettings

interface IMkplAppRabbitSettings: IMkplAppSettings {
val rabbit: RabbitConfig
val controllersConfigV1: RabbitExchangeConfiguration
val controllersConfigV2: RabbitExchangeConfiguration
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package ru.otus.otuskotlin.marketplace.app.rabbit.config

import ru.otus.otuskotlin.marketplace.app.common.IMkplAppSettings
import ru.otus.otuskotlin.marketplace.biz.MkplAdProcessor
import ru.otus.otuskotlin.marketplace.common.MkplCorSettings

data class MkplAppSettings(
override val corSettings: MkplCorSettings = MkplCorSettings(),
override val processor: MkplAdProcessor = MkplAdProcessor(corSettings),
override val rabbit: RabbitConfig = RabbitConfig(),
override val controllersConfigV1: RabbitExchangeConfiguration = RabbitExchangeConfiguration.NONE,
override val controllersConfigV2: RabbitExchangeConfiguration = RabbitExchangeConfiguration.NONE,
): IMkplAppSettings, IMkplAppRabbitSettings
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package ru.otus.otuskotlin.marketplace.app.rabbit.config

data class RabbitConfig(
val host: String = HOST,
val port: Int = PORT,
val user: String = USER,
val password: String = PASSWORD
) {
companion object {
const val HOST = "localhost"
const val PORT = 5672
const val USER = "guest"
const val PASSWORD = "guest"
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package ru.otus.otuskotlin.marketplace.app.rabbit.config

// наш класс настроек взаимодействия с RMQ
data class RabbitExchangeConfiguration(
val keyIn: String = "",
val keyOut: String = "",
// Отправляем сообщение в обменник
val exchange: String = "",
// Подписываемся на очередь
val queue: String = "",
val consumerTag: String = "",
val exchangeType: String = "direct" // Объявляем обменник типа "direct" (сообщения передаются в те очереди, где ключ совпадает)
) {
companion object {
val NONE = RabbitExchangeConfiguration()
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package ru.otus.otuskotlin.marketplace.app.rabbit.controllers

import ru.otus.otuskotlin.marketplace.app.rabbit.config.RabbitExchangeConfiguration

interface IRabbitMqController {
val exchangeConfig: RabbitExchangeConfiguration
suspend fun process()
fun close()
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package ru.otus.otuskotlin.marketplace.app.rabbit.controllers

import com.rabbitmq.client.Channel
import com.rabbitmq.client.Delivery
import ru.otus.otuskotlin.marketplace.api.v1.apiV1Mapper
import ru.otus.otuskotlin.marketplace.api.v1.models.IRequest
import ru.otus.otuskotlin.marketplace.app.common.controllerHelper
import ru.otus.otuskotlin.marketplace.app.rabbit.config.MkplAppSettings
import ru.otus.otuskotlin.marketplace.common.MkplContext
import ru.otus.otuskotlin.marketplace.common.helpers.asMkplError
import ru.otus.otuskotlin.marketplace.common.models.MkplState
import ru.otus.otuskotlin.marketplace.mappers.v1.fromTransport
import ru.otus.otuskotlin.marketplace.mappers.v1.toTransportAd

// наследник RabbitProcessorBase, увязывает транспортную и бизнес-части
class RabbitDirectControllerV1(
private val appSettings: MkplAppSettings,
) : RabbitProcessorBase(
rabbitConfig = appSettings.rabbit,
exchangeConfig = appSettings.controllersConfigV1,
loggerProvider = appSettings.corSettings.loggerProvider,
) {
override suspend fun Channel.processMessage(message: Delivery) {
appSettings.controllerHelper(
{
val req = apiV1Mapper.readValue(message.body, IRequest::class.java)
fromTransport(req)
},
{
val res = toTransportAd()
apiV1Mapper.writeValueAsBytes(res).also {
basicPublish(exchangeConfig.exchange, exchangeConfig.keyOut, null, it)
}
},
this@RabbitDirectControllerV1::class,
"rabbitmq-v1-processor"
)
}

override fun Channel.onError(e: Throwable, delivery: Delivery) {
val context = MkplContext()
e.printStackTrace()
context.state = MkplState.FAILING
context.errors.add(e.asMkplError())
val response = context.toTransportAd()
apiV1Mapper.writeValueAsBytes(response).also {
basicPublish(exchangeConfig.exchange, exchangeConfig.keyOut, null, it)
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package ru.otus.otuskotlin.marketplace.app.rabbit.controllers

import com.rabbitmq.client.Channel
import com.rabbitmq.client.Delivery
import ru.otus.otuskotlin.marketplace.api.v2.apiV2RequestDeserialize
import ru.otus.otuskotlin.marketplace.api.v2.apiV2ResponseSerialize
import ru.otus.otuskotlin.marketplace.api.v2.mappers.fromTransport
import ru.otus.otuskotlin.marketplace.api.v2.mappers.toTransportAd
import ru.otus.otuskotlin.marketplace.api.v2.models.IRequest
import ru.otus.otuskotlin.marketplace.app.common.controllerHelper
import ru.otus.otuskotlin.marketplace.app.rabbit.config.MkplAppSettings
import ru.otus.otuskotlin.marketplace.common.MkplContext
import ru.otus.otuskotlin.marketplace.common.helpers.asMkplError
import ru.otus.otuskotlin.marketplace.common.models.MkplState

class RabbitDirectControllerV2(
private val appSettings: MkplAppSettings,
) : RabbitProcessorBase(
rabbitConfig = appSettings.rabbit,
exchangeConfig = appSettings.controllersConfigV2,
loggerProvider = appSettings.corSettings.loggerProvider,
) {

override suspend fun Channel.processMessage(message: Delivery) {
appSettings.controllerHelper(
{
val req = apiV2RequestDeserialize<IRequest>(String(message.body))
fromTransport(req)
},
{
val res = toTransportAd()
apiV2ResponseSerialize(res).also {
basicPublish(exchangeConfig.exchange, exchangeConfig.keyOut, null, it.toByteArray())
}
},
RabbitDirectControllerV2::class,
"rabbitmq-v2-processor"
)
}

override fun Channel.onError(e: Throwable, delivery: Delivery) {
val context = MkplContext()
e.printStackTrace()
context.state = MkplState.FAILING
context.errors.add(e.asMkplError())
val response = context.toTransportAd()
apiV2ResponseSerialize(response).also {
basicPublish(exchangeConfig.exchange, exchangeConfig.keyOut, null, it.toByteArray())
}
}
}
Loading

0 comments on commit f688eab

Please sign in to comment.