From bebd3aea6495495139f4d918dcb2ecafc027a8ef Mon Sep 17 00:00:00 2001 From: james Date: Tue, 15 Aug 2023 00:57:07 +0100 Subject: [PATCH] Adding lq --- .../kotlin/uk/gibby/driver/Surreal.kt | 48 ++++++++++++++++--- .../gibby/driver/rpc/functions/LiveQuery.kt | 18 +++++++ .../gibby/driver/rpc/model/LiveQueryAction.kt | 7 +++ .../uk/gibby/driver/rpc/model/RpcResponse.kt | 5 ++ .../driver/rpc/model/RpcResponseSerializer.kt | 8 +++- src/commonTest/kotlin/LiveQueryTest.kt | 27 +++++++++++ 6 files changed, 106 insertions(+), 7 deletions(-) create mode 100644 src/commonMain/kotlin/uk/gibby/driver/rpc/functions/LiveQuery.kt create mode 100644 src/commonMain/kotlin/uk/gibby/driver/rpc/model/LiveQueryAction.kt create mode 100644 src/commonTest/kotlin/LiveQueryTest.kt diff --git a/src/commonMain/kotlin/uk/gibby/driver/Surreal.kt b/src/commonMain/kotlin/uk/gibby/driver/Surreal.kt index eef928b..825af94 100644 --- a/src/commonMain/kotlin/uk/gibby/driver/Surreal.kt +++ b/src/commonMain/kotlin/uk/gibby/driver/Surreal.kt @@ -12,6 +12,7 @@ import uk.gibby.driver.rpc.model.RpcRequest import uk.gibby.driver.rpc.model.RpcResponse import uk.gibby.driver.rpc.model.RpcResponseSerializer import kotlinx.serialization.encodeToString +import uk.gibby.driver.rpc.model.LiveQueryAction /** * SurrealDB driver @@ -25,6 +26,7 @@ class Surreal(private val host: String, private val port: Int = 8000) { private var count = 0L private var connection: DefaultClientWebSocketSession? = null private val requests = ConcurrentMap>() + private val liveQueries = ConcurrentMap>() private val context = CoroutineScope(Dispatchers.Default) /** @@ -47,13 +49,36 @@ class Surreal(private val host: String, private val port: Int = 8000) { requests.forEach { (_, r) -> r.cancel(CancellationException("Failed to decode incoming response: ${it.readText()}\n${e.message}"))} throw e } - val request = requests[response.id] - if (request == null) requests.forEach { (_, r) -> r.cancel(CancellationException("Received a request with an unknown id: ${response.id} body: $response"))} - else when(response) { - is RpcResponse.Success -> request.send(response.result) - is RpcResponse.Error -> request.cancel(CancellationException("SurrealDB responded with an error: '${response.error}'")) + if(response is RpcResponse.Notification) { + val action = response.result + val liveQuery = liveQueries[action.id] + if (liveQuery != null) { + liveQuery.send(response.result) + } + else { + println("Couldn't find live query with id ${action.id}") + requests.forEach { + (_, r) -> r.cancel(CancellationException("Received a request with an unknown id: ${response.id} body: $response")) + } + } + } + if(response.id != null) { + val request = requests[response.id] + if (request != null) { + when(response) { + is RpcResponse.Success -> request.send(response.result) + is RpcResponse.Error -> request.cancel(CancellationException("SurrealDB responded with an error: '${response.error}'")) + else -> TODO() + } + requests.remove(response.id) + } + else { + if (response.id == null) println("SurrealDB: Received a response with no id: $response") + else requests.forEach { + (_, r) -> r.cancel(CancellationException("Received a request with an unknown id: ${response.id} body: $response")) + } + } } - requests.remove(response.id) } } } @@ -68,4 +93,15 @@ class Surreal(private val host: String, private val port: Int = 8000) { return channel.receive() } + internal fun subscribe(liveQueryId: String): Channel { + val channel = Channel() + println("Live query $liveQueryId created") + liveQueries[liveQueryId] = channel.apply { + invokeOnClose { + println("Live query $liveQueryId closed") + liveQueries.remove(liveQueryId) + } + } + return channel + } } \ No newline at end of file diff --git a/src/commonMain/kotlin/uk/gibby/driver/rpc/functions/LiveQuery.kt b/src/commonMain/kotlin/uk/gibby/driver/rpc/functions/LiveQuery.kt new file mode 100644 index 0000000..fc2306b --- /dev/null +++ b/src/commonMain/kotlin/uk/gibby/driver/rpc/functions/LiveQuery.kt @@ -0,0 +1,18 @@ +package uk.gibby.driver.rpc.functions + +import kotlinx.coroutines.channels.consumeEach +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.consumeAsFlow +import kotlinx.serialization.json.JsonElement +import kotlinx.serialization.json.add +import kotlinx.serialization.json.buildJsonArray +import kotlinx.serialization.json.decodeFromJsonElement +import uk.gibby.driver.Surreal +import uk.gibby.driver.rpc.model.LiveQueryAction +import uk.gibby.driver.surrealJson + +suspend fun Surreal.live(table: String): Flow { + val response = sendRequest("live", buildJsonArray{ add(table) }) + val id: String = surrealJson.decodeFromJsonElement(response) + return subscribe(id).consumeAsFlow() +} \ No newline at end of file diff --git a/src/commonMain/kotlin/uk/gibby/driver/rpc/model/LiveQueryAction.kt b/src/commonMain/kotlin/uk/gibby/driver/rpc/model/LiveQueryAction.kt new file mode 100644 index 0000000..3c186e1 --- /dev/null +++ b/src/commonMain/kotlin/uk/gibby/driver/rpc/model/LiveQueryAction.kt @@ -0,0 +1,7 @@ +package uk.gibby.driver.rpc.model + +import kotlinx.serialization.Serializable +import kotlinx.serialization.json.JsonObject + +@Serializable +data class LiveQueryAction(val action: String, val id: String, val result: JsonObject? = null) \ No newline at end of file diff --git a/src/commonMain/kotlin/uk/gibby/driver/rpc/model/RpcResponse.kt b/src/commonMain/kotlin/uk/gibby/driver/rpc/model/RpcResponse.kt index 2b8deae..6e4bc3a 100644 --- a/src/commonMain/kotlin/uk/gibby/driver/rpc/model/RpcResponse.kt +++ b/src/commonMain/kotlin/uk/gibby/driver/rpc/model/RpcResponse.kt @@ -13,5 +13,10 @@ sealed class RpcResponse { @Serializable data class Error(override val id: String, val error: JsonElement): RpcResponse() + + @Serializable + data class Notification(val result: LiveQueryAction): RpcResponse() { + override val id: String? = null + } } diff --git a/src/commonMain/kotlin/uk/gibby/driver/rpc/model/RpcResponseSerializer.kt b/src/commonMain/kotlin/uk/gibby/driver/rpc/model/RpcResponseSerializer.kt index 3b7872b..4f44719 100644 --- a/src/commonMain/kotlin/uk/gibby/driver/rpc/model/RpcResponseSerializer.kt +++ b/src/commonMain/kotlin/uk/gibby/driver/rpc/model/RpcResponseSerializer.kt @@ -11,6 +11,7 @@ import kotlinx.serialization.encoding.Decoder import kotlinx.serialization.encoding.Encoder import kotlinx.serialization.encoding.decodeStructure import kotlinx.serialization.json.JsonElement +import kotlinx.serialization.json.decodeFromJsonElement import kotlinx.serialization.json.encodeToJsonElement object RpcResponseSerializer: KSerializer { @@ -35,7 +36,12 @@ object RpcResponseSerializer: KSerializer { } return if(error != null) { RpcResponse.Error(id!!, surrealJson.encodeToJsonElement(error)) - } else RpcResponse.Success(id!!, result ?: surrealJson.encodeToJsonElement(String.serializer().nullable, null)) + } else if(id != null) { + RpcResponse.Success(id!!, result ?: surrealJson.encodeToJsonElement(String.serializer().nullable, null)) + } + else { + RpcResponse.Notification(surrealJson.decodeFromJsonElement(result!!)) + } } override fun serialize(encoder: Encoder, value: RpcResponse) { diff --git a/src/commonTest/kotlin/LiveQueryTest.kt b/src/commonTest/kotlin/LiveQueryTest.kt new file mode 100644 index 0000000..c815b47 --- /dev/null +++ b/src/commonTest/kotlin/LiveQueryTest.kt @@ -0,0 +1,27 @@ +import kotlinx.coroutines.delay +import kotlinx.coroutines.flow.filter +import kotlinx.coroutines.flow.first +import kotlinx.coroutines.flow.map +import kotlinx.coroutines.launch +import kotlinx.coroutines.test.runTest +import kotlinx.serialization.json.buildJsonObject +import kotlinx.serialization.json.put +import uk.gibby.driver.Surreal +import uk.gibby.driver.rpc.functions.* +import uk.gibby.driver.rpc.model.bind +import utils.cleanDatabase +import kotlin.test.Test + +class LiveQueryTest { + @Test + fun testLiveQuery() = runTest { + cleanDatabase() + val connection = Surreal("localhost", 8000) + connection.connect() + connection.signin("root", "root") + connection.use("test", "test") + val incoming = connection.live("test") + connection.create("test").content(buildJsonObject { put("thing", "thing") }) + println(incoming.first()) + } +} \ No newline at end of file