Skip to content

Commit

Permalink
Adding lq
Browse files Browse the repository at this point in the history
  • Loading branch information
mnbjhu committed Aug 14, 2023
1 parent eb80e1a commit bebd3ae
Show file tree
Hide file tree
Showing 6 changed files with 106 additions and 7 deletions.
48 changes: 42 additions & 6 deletions src/commonMain/kotlin/uk/gibby/driver/Surreal.kt
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import uk.gibby.driver.rpc.model.RpcRequest
import uk.gibby.driver.rpc.model.RpcResponse
import uk.gibby.driver.rpc.model.RpcResponseSerializer
import kotlinx.serialization.encodeToString
import uk.gibby.driver.rpc.model.LiveQueryAction

/**
* SurrealDB driver
Expand All @@ -25,6 +26,7 @@ class Surreal(private val host: String, private val port: Int = 8000) {
private var count = 0L
private var connection: DefaultClientWebSocketSession? = null
private val requests = ConcurrentMap<String, Channel<JsonElement>>()
private val liveQueries = ConcurrentMap<String, Channel<LiveQueryAction>>()
private val context = CoroutineScope(Dispatchers.Default)

/**
Expand All @@ -47,13 +49,36 @@ class Surreal(private val host: String, private val port: Int = 8000) {
requests.forEach { (_, r) -> r.cancel(CancellationException("Failed to decode incoming response: ${it.readText()}\n${e.message}"))}
throw e
}
val request = requests[response.id]
if (request == null) requests.forEach { (_, r) -> r.cancel(CancellationException("Received a request with an unknown id: ${response.id} body: $response"))}
else when(response) {
is RpcResponse.Success -> request.send(response.result)
is RpcResponse.Error -> request.cancel(CancellationException("SurrealDB responded with an error: '${response.error}'"))
if(response is RpcResponse.Notification) {
val action = response.result
val liveQuery = liveQueries[action.id]
if (liveQuery != null) {
liveQuery.send(response.result)
}
else {
println("Couldn't find live query with id ${action.id}")
requests.forEach {
(_, r) -> r.cancel(CancellationException("Received a request with an unknown id: ${response.id} body: $response"))
}
}
}
if(response.id != null) {
val request = requests[response.id]
if (request != null) {
when(response) {
is RpcResponse.Success -> request.send(response.result)
is RpcResponse.Error -> request.cancel(CancellationException("SurrealDB responded with an error: '${response.error}'"))
else -> TODO()
}
requests.remove(response.id)
}
else {
if (response.id == null) println("SurrealDB: Received a response with no id: $response")
else requests.forEach {
(_, r) -> r.cancel(CancellationException("Received a request with an unknown id: ${response.id} body: $response"))
}
}
}
requests.remove(response.id)
}
}
}
Expand All @@ -68,4 +93,15 @@ class Surreal(private val host: String, private val port: Int = 8000) {
return channel.receive()
}

internal fun subscribe(liveQueryId: String): Channel<LiveQueryAction> {
val channel = Channel<LiveQueryAction>()
println("Live query $liveQueryId created")
liveQueries[liveQueryId] = channel.apply {
invokeOnClose {
println("Live query $liveQueryId closed")
liveQueries.remove(liveQueryId)
}
}
return channel
}
}
18 changes: 18 additions & 0 deletions src/commonMain/kotlin/uk/gibby/driver/rpc/functions/LiveQuery.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package uk.gibby.driver.rpc.functions

import kotlinx.coroutines.channels.consumeEach
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.consumeAsFlow
import kotlinx.serialization.json.JsonElement
import kotlinx.serialization.json.add
import kotlinx.serialization.json.buildJsonArray
import kotlinx.serialization.json.decodeFromJsonElement
import uk.gibby.driver.Surreal
import uk.gibby.driver.rpc.model.LiveQueryAction
import uk.gibby.driver.surrealJson

suspend fun Surreal.live(table: String): Flow<LiveQueryAction> {
val response = sendRequest("live", buildJsonArray{ add(table) })
val id: String = surrealJson.decodeFromJsonElement(response)
return subscribe(id).consumeAsFlow()
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package uk.gibby.driver.rpc.model

import kotlinx.serialization.Serializable
import kotlinx.serialization.json.JsonObject

@Serializable
data class LiveQueryAction(val action: String, val id: String, val result: JsonObject? = null)
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,10 @@ sealed class RpcResponse {

@Serializable
data class Error(override val id: String, val error: JsonElement): RpcResponse()

@Serializable
data class Notification(val result: LiveQueryAction): RpcResponse() {
override val id: String? = null
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import kotlinx.serialization.encoding.Decoder
import kotlinx.serialization.encoding.Encoder
import kotlinx.serialization.encoding.decodeStructure
import kotlinx.serialization.json.JsonElement
import kotlinx.serialization.json.decodeFromJsonElement
import kotlinx.serialization.json.encodeToJsonElement

object RpcResponseSerializer: KSerializer<RpcResponse> {
Expand All @@ -35,7 +36,12 @@ object RpcResponseSerializer: KSerializer<RpcResponse> {
}
return if(error != null) {
RpcResponse.Error(id!!, surrealJson.encodeToJsonElement(error))
} else RpcResponse.Success(id!!, result ?: surrealJson.encodeToJsonElement(String.serializer().nullable, null))
} else if(id != null) {
RpcResponse.Success(id!!, result ?: surrealJson.encodeToJsonElement(String.serializer().nullable, null))
}
else {
RpcResponse.Notification(surrealJson.decodeFromJsonElement(result!!))
}
}

override fun serialize(encoder: Encoder, value: RpcResponse) {
Expand Down
27 changes: 27 additions & 0 deletions src/commonTest/kotlin/LiveQueryTest.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.filter
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.launch
import kotlinx.coroutines.test.runTest
import kotlinx.serialization.json.buildJsonObject
import kotlinx.serialization.json.put
import uk.gibby.driver.Surreal
import uk.gibby.driver.rpc.functions.*
import uk.gibby.driver.rpc.model.bind
import utils.cleanDatabase
import kotlin.test.Test

class LiveQueryTest {
@Test
fun testLiveQuery() = runTest {
cleanDatabase()
val connection = Surreal("localhost", 8000)
connection.connect()
connection.signin("root", "root")
connection.use("test", "test")
val incoming = connection.live("test")
connection.create("test").content(buildJsonObject { put("thing", "thing") })
println(incoming.first())
}
}

0 comments on commit bebd3ae

Please sign in to comment.