Skip to content

Commit

Permalink
M5l3 WebSocket
Browse files Browse the repository at this point in the history
  • Loading branch information
svok authored and evgnep committed Aug 20, 2024
1 parent 74f4c00 commit 85cf1a7
Show file tree
Hide file tree
Showing 33 changed files with 934 additions and 4 deletions.
1 change: 1 addition & 0 deletions gradle/libs.versions.toml
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ ktor-server-cors = { module = "io.ktor:ktor-server-cors", version.ref = "ktor" }
ktor-server-yaml = { module = "io.ktor:ktor-server-config-yaml", version.ref = "ktor" }
ktor-server-negotiation = { module = "io.ktor:ktor-server-content-negotiation", version.ref = "ktor" }
ktor-server-calllogging = { module = "io.ktor:ktor-server-call-logging", version.ref = "ktor" }
ktor-server-websocket = { module = "io.ktor:ktor-server-websockets", version.ref = "ktor" }
ktor-server-test = { module = "io.ktor:ktor-server-test-host", version.ref = "ktor" }

# Spring
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,12 @@ fun MkplContext.toTransportAd(): IResponse = when (val cmd = command) {
MkplCommand.DELETE -> toTransportDelete()
MkplCommand.SEARCH -> toTransportSearch()
MkplCommand.OFFERS -> toTransportOffers()
MkplCommand.INIT -> toTransportInit()
MkplCommand.FINISH -> object: IResponse {
override val responseType: String? = null
override val result: ResponseResult? = null
override val errors: List<Error>? = null
}
MkplCommand.NONE -> throw UnknownMkplCommand(cmd)
}

Expand Down Expand Up @@ -51,6 +57,11 @@ fun MkplContext.toTransportOffers() = AdOffersResponse(
ads = adsResponse.toTransportAd()
)

fun MkplContext.toTransportInit() = AdInitResponse(
result = state.toResult(),
errors = errors.toTransportErrors(),
)

fun List<MkplAd>.toTransportAd(): List<AdResponseObject>? = this
.map { it.toTransportAd() }
.toList()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ fun MkplContext.toTransportAd(): IResponse = when (val cmd = command) {
MkplCommand.DELETE -> toTransportDelete()
MkplCommand.SEARCH -> toTransportSearch()
MkplCommand.OFFERS -> toTransportOffers()
MkplCommand.INIT -> toTransportInit()
MkplCommand.FINISH -> throw UnknownMkplCommand(cmd)
MkplCommand.NONE -> throw UnknownMkplCommand(cmd)
}

Expand Down Expand Up @@ -51,6 +53,11 @@ fun MkplContext.toTransportOffers() = AdOffersResponse(
ads = adsResponse.toTransportAd()
)

fun MkplContext.toTransportInit() = AdInitResponse(
result = state.toResult(),
errors = errors.toTransportErrors(),
)

fun List<MkplAd>.toTransportAd(): List<AdResponseObject>? = this
.map { it.toTransportAd() }
.toList()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ suspend inline fun <T> IMkplAppSettings.controllerHelper(
logger.error(
msg = "Request $logId failed for ${clazz.simpleName}",
marker = "BIZ",
data = ctx.toLog(logId)
data = ctx.toLog(logId),
e = e,
)
ctx.state = MkplState.FAILING
ctx.errors.add(e.asMkplError())
Expand Down
1 change: 1 addition & 0 deletions ok-marketplace-be/ok-marketplace-app-ktor/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ kotlin {
implementation(libs.ktor.server.negotiation)
implementation(libs.ktor.server.headers.response)
implementation(libs.ktor.server.headers.caching)
implementation(libs.ktor.server.websocket)

// // Для того, чтоб получать содержимое запроса более одного раза
// В Application.main добавить `install(DoubleReceive)`
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,11 @@ import io.ktor.server.plugins.contentnegotiation.*
import io.ktor.server.plugins.cors.routing.*
import io.ktor.server.response.*
import io.ktor.server.routing.*
import io.ktor.server.websocket.*
import ru.otus.otuskotlin.marketplace.api.v2.apiV2Mapper
import ru.otus.otuskotlin.marketplace.app.ktor.v2.v2Ad
import ru.otus.otuskotlin.marketplace.app.ktor.plugins.initAppSettings
import ru.otus.otuskotlin.marketplace.app.ktor.v2.wsHandlerV2

fun Application.module(
appSettings: MkplAppSettings = initAppSettings()
Expand All @@ -28,6 +30,7 @@ fun Application.module(
*/
anyHost()
}
install(WebSockets)

routing {
get("/") {
Expand All @@ -38,6 +41,9 @@ fun Application.module(
json(apiV2Mapper)
}
v2Ad(appSettings)
webSocket("/ws") {
wsHandlerV2(appSettings)
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package ru.otus.otuskotlin.marketplace.app.ktor.base

import ru.otus.otuskotlin.marketplace.common.ws.IMkplWsSession
import ru.otus.otuskotlin.marketplace.common.ws.IMkplWsSessionRepo

class KtorWsSessionRepo: IMkplWsSessionRepo {
private val sessions: MutableSet<IMkplWsSession> = mutableSetOf()
override fun add(session: IMkplWsSession) {
sessions.add(session)
}

override fun clearAll() {
sessions.clear()
}

override fun remove(session: IMkplWsSession) {
sessions.remove(session)
}

override suspend fun <T> sendAll(obj: T) {
sessions.forEach { it.send(obj) }
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package ru.otus.otuskotlin.marketplace.app.ktor.base

import io.ktor.websocket.*
import ru.otus.otuskotlin.marketplace.api.v2.apiV2ResponseSerialize
import ru.otus.otuskotlin.marketplace.api.v2.models.IResponse
import ru.otus.otuskotlin.marketplace.common.ws.IMkplWsSession

data class KtorWsSessionV2(
private val session: WebSocketSession
) : IMkplWsSession {
override suspend fun <T> send(obj: T) {
require(obj is IResponse)
session.send(Frame.Text(apiV2ResponseSerialize(obj)))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,14 @@ package ru.otus.otuskotlin.marketplace.app.ktor.plugins

import io.ktor.server.application.*
import ru.otus.otuskotlin.marketplace.app.ktor.MkplAppSettings
import ru.otus.otuskotlin.marketplace.app.ktor.base.KtorWsSessionRepo
import ru.otus.otuskotlin.marketplace.biz.MkplAdProcessor
import ru.otus.otuskotlin.marketplace.common.MkplCorSettings

fun Application.initAppSettings(): MkplAppSettings {
val corSettings = MkplCorSettings(
loggerProvider = getLoggerProviderConf(),
wsSessions = KtorWsSessionRepo(),
)
return MkplAppSettings(
appUrls = environment.config.propertyOrNull("ktor.urls")?.getList() ?: emptyList(),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
package ru.otus.otuskotlin.marketplace.app.ktor.v2

import io.ktor.websocket.*
import kotlinx.coroutines.channels.ClosedReceiveChannelException
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.mapNotNull
import kotlinx.coroutines.flow.onCompletion
import kotlinx.coroutines.flow.receiveAsFlow
import ru.otus.otuskotlin.marketplace.api.v2.apiV2RequestDeserialize
import ru.otus.otuskotlin.marketplace.api.v2.apiV2ResponseSerialize
import ru.otus.otuskotlin.marketplace.api.v2.mappers.fromTransport
import ru.otus.otuskotlin.marketplace.api.v2.mappers.toTransportAd
import ru.otus.otuskotlin.marketplace.api.v2.mappers.toTransportInit
import ru.otus.otuskotlin.marketplace.api.v2.models.IRequest
import ru.otus.otuskotlin.marketplace.app.common.controllerHelper
import ru.otus.otuskotlin.marketplace.app.ktor.MkplAppSettings
import ru.otus.otuskotlin.marketplace.app.ktor.base.KtorWsSessionV2
import ru.otus.otuskotlin.marketplace.common.models.MkplCommand
import kotlin.reflect.KClass

private val clWsV2: KClass<*> = WebSocketSession::wsHandlerV2::class
suspend fun WebSocketSession.wsHandlerV2(appSettings: MkplAppSettings) = with(KtorWsSessionV2(this)) {
// Обновление реестра сессий
val sessions = appSettings.corSettings.wsSessions
sessions.add(this)

// Handle init request
appSettings.controllerHelper(
{
command = MkplCommand.INIT
wsSession = this@with
},
{ outgoing.send(Frame.Text(apiV2ResponseSerialize(toTransportInit()))) },
clWsV2,
"wsV2-init"
)

// Handle flow
incoming.receiveAsFlow()
.mapNotNull { it ->
val frame = it as? Frame.Text ?: return@mapNotNull
// Handle without flow destruction
try {
appSettings.controllerHelper(
{
fromTransport(apiV2RequestDeserialize<IRequest>(frame.readText()))
wsSession = this@with
},
{
val result = apiV2ResponseSerialize(toTransportAd())
// If change request, response is sent to everyone
outgoing.send(Frame.Text(result))
},
clWsV2,
"wsV2-handle"
)

} catch (_: ClosedReceiveChannelException) {
sessions.remove(this@with)
} catch (e: Throwable) {
println("FFF")
}
}
.onCompletion {
// Handle finish request
appSettings.controllerHelper(
{
command = MkplCommand.FINISH
wsSession = this@with
},
{ },
clWsV2,
"wsV2-finish"
)
sessions.remove(this@with)
}
.collect()
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,12 @@ import io.ktor.server.plugins.callloging.*
import io.ktor.server.plugins.contentnegotiation.*
import io.ktor.server.plugins.defaultheaders.*
import io.ktor.server.routing.*
import io.ktor.server.websocket.*
import org.slf4j.event.Level
import ru.otus.otuskotlin.marketplace.api.v1.apiV1Mapper
import ru.otus.otuskotlin.marketplace.app.ktor.v1.v1Ad
import ru.otus.otuskotlin.marketplace.app.ktor.plugins.initAppSettings
import ru.otus.otuskotlin.marketplace.app.ktor.v1.wsHandlerV1

// function with config (application.conf)
//fun main(args: Array<String>): Unit = io.ktor.server.cio.EngineMain.main(args)
Expand All @@ -37,6 +39,9 @@ fun Application.moduleJvm(
}
}
v1Ad(appSettings)
webSocket("/ws") {
wsHandlerV1(appSettings)
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package ru.otus.otuskotlin.marketplace.app.ktor.base

import io.ktor.websocket.*
import ru.otus.otuskotlin.marketplace.api.v1.apiV1ResponseSerialize
import ru.otus.otuskotlin.marketplace.api.v1.models.IResponse
import ru.otus.otuskotlin.marketplace.common.ws.IMkplWsSession

data class KtorWsSessionV1(
private val session: WebSocketSession
) : IMkplWsSession {
override suspend fun <T> send(obj: T) {
require(obj is IResponse)
session.send(Frame.Text(apiV1ResponseSerialize(obj)))
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package ru.otus.otuskotlin.marketplace.app.ktor.v1

import com.fasterxml.jackson.module.kotlin.readValue
import io.ktor.websocket.*
import kotlinx.coroutines.channels.ClosedReceiveChannelException
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.mapNotNull
import kotlinx.coroutines.flow.receiveAsFlow
import ru.otus.otuskotlin.marketplace.api.v1.apiV1Mapper
import ru.otus.otuskotlin.marketplace.api.v1.models.IRequest
import ru.otus.otuskotlin.marketplace.app.common.controllerHelper
import ru.otus.otuskotlin.marketplace.app.ktor.MkplAppSettings
import ru.otus.otuskotlin.marketplace.app.ktor.base.KtorWsSessionV1
import ru.otus.otuskotlin.marketplace.common.models.MkplCommand
import ru.otus.otuskotlin.marketplace.mappers.v1.fromTransport
import ru.otus.otuskotlin.marketplace.mappers.v1.toTransportAd
import ru.otus.otuskotlin.marketplace.mappers.v1.toTransportInit
import kotlin.reflect.KClass

private val clWsV1: KClass<*> = WebSocketSession::wsHandlerV1::class
suspend fun WebSocketSession.wsHandlerV1(appSettings: MkplAppSettings) = with(KtorWsSessionV1(this)) {
val sessions = appSettings.corSettings.wsSessions
sessions.add(this)

// Handle init request
appSettings.controllerHelper(
{
command = MkplCommand.INIT
wsSession = this@with
},
{ outgoing.send(Frame.Text(apiV1Mapper.writeValueAsString(toTransportInit()))) },
clWsV1,
"wsV1-init"
)

// Handle flow
incoming.receiveAsFlow().mapNotNull {
val frame = it as? Frame.Text ?: return@mapNotNull
// Handle without flow destruction
try {
appSettings.controllerHelper(
{
fromTransport(apiV1Mapper.readValue<IRequest>(frame.readText()))
wsSession = this@with
},
{
val result = apiV1Mapper.writeValueAsString(toTransportAd())
// If change request, response is sent to everyone
outgoing.send(Frame.Text(result))
},
clWsV1,
"wsV1-handle"
)

} catch (_: ClosedReceiveChannelException) {
sessions.remove(this@with)
} finally {
// Handle finish request
appSettings.controllerHelper(
{
command = MkplCommand.FINISH
wsSession = this@with
},
{ },
clWsV1,
"wsV1-finish"
)
sessions.remove(this@with)
}
}.collect()
}
Loading

0 comments on commit 85cf1a7

Please sign in to comment.