Skip to content

Commit

Permalink
Basic lq working
Browse files Browse the repository at this point in the history
  • Loading branch information
mnbjhu committed Aug 22, 2023
1 parent bebd3ae commit b844dfb
Show file tree
Hide file tree
Showing 9 changed files with 355 additions and 80 deletions.
110 changes: 63 additions & 47 deletions src/commonMain/kotlin/uk/gibby/driver/Surreal.kt
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,11 @@ import io.ktor.util.collections.*
import io.ktor.websocket.*
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.receiveAsFlow
import kotlinx.coroutines.flow.*
import kotlinx.serialization.json.*
import uk.gibby.driver.rpc.model.RpcRequest
import uk.gibby.driver.rpc.model.RpcResponse
import uk.gibby.driver.rpc.model.RpcResponseSerializer
import kotlinx.serialization.encodeToString
import uk.gibby.driver.rpc.model.LiveQueryAction
import uk.gibby.driver.rpc.exception.LiveQueryKilledException
import uk.gibby.driver.rpc.functions.kill
import uk.gibby.driver.rpc.model.*

/**
* SurrealDB driver
Expand All @@ -26,7 +24,7 @@ class Surreal(private val host: String, private val port: Int = 8000) {
private var count = 0L
private var connection: DefaultClientWebSocketSession? = null
private val requests = ConcurrentMap<String, Channel<JsonElement>>()
private val liveQueries = ConcurrentMap<String, Channel<LiveQueryAction>>()
private val liveQueries = ConcurrentMap<String, Channel<LiveQueryAction<JsonElement>>>()
private val context = CoroutineScope(Dispatchers.Default)

/**
Expand All @@ -46,44 +44,53 @@ class Surreal(private val host: String, private val port: Int = 8000) {
val response = try {
surrealJson.decodeFromString(RpcResponseSerializer, it.readText())
} catch (e: Exception) {
// In theory this could be an error for any request, so we cancel all of them
requests.forEach { (_, r) -> r.cancel(CancellationException("Failed to decode incoming response: ${it.readText()}\n${e.message}"))}
throw e
}
if(response is RpcResponse.Notification) {
val action = response.result
val liveQuery = liveQueries[action.id]
if (liveQuery != null) {
liveQuery.send(response.result)
}
else {
println("Couldn't find live query with id ${action.id}")
requests.forEach {
(_, r) -> r.cancel(CancellationException("Received a request with an unknown id: ${response.id} body: $response"))
}
}
}
if(response.id != null) {
val request = requests[response.id]
if (request != null) {
when(response) {
is RpcResponse.Success -> request.send(response.result)
is RpcResponse.Error -> request.cancel(CancellationException("SurrealDB responded with an error: '${response.error}'"))
else -> TODO()
}
requests.remove(response.id)
}
else {
if (response.id == null) println("SurrealDB: Received a response with no id: $response")
else requests.forEach {
(_, r) -> r.cancel(CancellationException("Received a request with an unknown id: ${response.id} body: $response"))
}
}
when(response) {
is RpcResponse.Success -> handleSuccess(response)
is RpcResponse.Error -> handleError(response)
is RpcResponse.Notification -> handleNotification(response)
}
}
}
}
}

private suspend fun handleSuccess(response: RpcResponse.Success) {
val request = requests[response.id]
if (request != null) {
request.send(response.result)
} else {
requests.forEach {
// In theory this could be an error for any request, so we cancel all of them
(_, r) ->
r.cancel(CancellationException("Received a request with an unknown id: ${response.id} body: $response"))
}
}
}

private fun handleError(response: RpcResponse.Error) {
val request = requests[response.id]
if (request != null) {
request.cancel(CancellationException("SurrealDB responded with an error: '${response.error}'"))
} else {
requests.forEach {
// In theory this could be an error for any request, so we cancel all of them
(_, r) ->
r.cancel(CancellationException("Received a request with an unknown id: ${response.id} body: $response"))
}
}
requests.remove(response.id)
}

private suspend fun handleNotification(response: RpcResponse.Notification) {
val action = response.result
val liveQuery = liveQueries.getOrPut(action.id) { Channel() }
context.launch { liveQuery.send(response.result) }
}

internal suspend fun sendRequest(method: String, params: JsonArray): JsonElement {
val id = count++.toString()
val request = RpcRequest(id, method, params)
Expand All @@ -93,15 +100,24 @@ class Surreal(private val host: String, private val port: Int = 8000) {
return channel.receive()
}

internal fun subscribe(liveQueryId: String): Channel<LiveQueryAction> {
val channel = Channel<LiveQueryAction>()
println("Live query $liveQueryId created")
liveQueries[liveQueryId] = channel.apply {
invokeOnClose {
println("Live query $liveQueryId closed")
liveQueries.remove(liveQueryId)
}
}
return channel
fun subscribeAsJson(liveQueryId: String): Flow<LiveQueryAction<JsonElement>> {
val channel = liveQueries.getOrPut(liveQueryId) { Channel() }
return channel.receiveAsFlow()
}

inline fun <reified T> subscribe(liveQueryId: String): Flow<LiveQueryAction<T>> {
return subscribeAsJson(liveQueryId).map { it.asType() }
}

fun unsubscribe(liveQueryId: String) {
val channel = liveQueries[liveQueryId]
channel?.cancel(LiveQueryKilledException)
liveQueries.remove(liveQueryId)
}

internal fun triggerKill(liveQueryId: String) {
context.launch { kill(liveQueryId) }
}
}

}

Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package uk.gibby.driver.rpc.exception

import kotlinx.coroutines.CancellationException

object LiveQueryKilledException: CancellationException("Live query has been killed")
10 changes: 10 additions & 0 deletions src/commonMain/kotlin/uk/gibby/driver/rpc/functions/Kill.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package uk.gibby.driver.rpc.functions

import kotlinx.serialization.json.add
import kotlinx.serialization.json.buildJsonArray
import uk.gibby.driver.Surreal

suspend fun Surreal.kill(liveQueryId: String) {
sendRequest("kill", buildJsonArray { add(liveQueryId) })
}

56 changes: 56 additions & 0 deletions src/commonMain/kotlin/uk/gibby/driver/rpc/functions/Live.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package uk.gibby.driver.rpc.functions

import io.ktor.utils.io.core.*
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.receiveAsFlow
import kotlinx.serialization.json.JsonElement
import kotlinx.serialization.json.add
import kotlinx.serialization.json.buildJsonArray
import kotlinx.serialization.json.decodeFromJsonElement
import uk.gibby.driver.Surreal
import uk.gibby.driver.rpc.model.LiveQueryAction
import uk.gibby.driver.rpc.model.asType
import uk.gibby.driver.surrealJson
import kotlin.jvm.JvmName


suspend fun Surreal.live(table: String): String {
val response = sendRequest("live", buildJsonArray { add(table) })
return surrealJson.decodeFromJsonElement(response)
}


@JvmName("observeJson")
suspend fun Surreal.observeAsJson(liveQueryId: String): LiveQueryFlow<JsonElement> {
val id = live(liveQueryId)
return LiveQueryFlow(
flow = subscribe(id),
id = id,
connection = this
)
}

suspend inline fun <reified T>Surreal.observe(table: String): LiveQueryFlow<T> {
val jsonFlow = observeAsJson(table)
return jsonFlow.map { it.asType<T>() }
}

class LiveQueryFlow<T>(
private val flow: Flow<LiveQueryAction<T>>,
val id: String,
private val connection: Surreal
): Flow<LiveQueryAction<T>> by flow, Closeable {
override fun close() {
connection.unsubscribe(id)
connection.triggerKill(id)
}

fun <R>map(transform: (LiveQueryAction<T>) -> LiveQueryAction<R>): LiveQueryFlow<R> {
return LiveQueryFlow(
flow = flow.map { transform(it) },
id = id,
connection = connection
)
}
}
18 changes: 0 additions & 18 deletions src/commonMain/kotlin/uk/gibby/driver/rpc/functions/LiveQuery.kt

This file was deleted.

67 changes: 63 additions & 4 deletions src/commonMain/kotlin/uk/gibby/driver/rpc/model/LiveQueryAction.kt
Original file line number Diff line number Diff line change
@@ -1,7 +1,66 @@
package uk.gibby.driver.rpc.model

import kotlinx.serialization.Serializable
import kotlinx.serialization.json.JsonObject
import kotlinx.serialization.*
import kotlinx.serialization.builtins.serializer
import kotlinx.serialization.descriptors.SerialDescriptor
import kotlinx.serialization.descriptors.buildClassSerialDescriptor
import kotlinx.serialization.encoding.CompositeDecoder
import kotlinx.serialization.encoding.Decoder
import kotlinx.serialization.encoding.Encoder
import kotlinx.serialization.json.JsonElement
import kotlinx.serialization.json.decodeFromJsonElement
import uk.gibby.driver.surrealJson

@Serializable
data class LiveQueryAction(val action: String, val id: String, val result: JsonObject? = null)
@Serializable(with = LiveQueryActionSerializer::class)
sealed class LiveQueryAction<out T>(val id: String) {
class Create<T>(id: String, val result: T): LiveQueryAction<T>(id)
class Update<T>(id: String, val result: T): LiveQueryAction<T>(id)
class Delete(id: String, val deletedId: String): LiveQueryAction<Nothing>(id)

}


inline fun <reified T>LiveQueryAction<JsonElement>.asType(): LiveQueryAction<T> {
return when(this) {
is LiveQueryAction.Delete -> this
is LiveQueryAction.Create -> LiveQueryAction.Create(id, surrealJson.decodeFromJsonElement(result))
is LiveQueryAction.Update -> LiveQueryAction.Update(id, surrealJson.decodeFromJsonElement(result))
}
}

class LiveQueryActionSerializer<T: Any>(private val resultSerializer: KSerializer<T>): KSerializer<LiveQueryAction<T>> {
override val descriptor: SerialDescriptor = buildClassSerialDescriptor("LiveQueryAction") {
element("action", String.serializer().descriptor)
element("id", String.serializer().descriptor)
element("result", resultSerializer.descriptor)
}

override fun deserialize(decoder: Decoder): LiveQueryAction<T> {
val input = decoder.beginStructure(descriptor)
var action: String? = null
var id: String? = null
var result: Thing<T>? = null
loop@ while (true) {
when (val i = input.decodeElementIndex(descriptor)) {
CompositeDecoder.DECODE_DONE -> break@loop
0 -> action = input.decodeStringElement(descriptor, i)
1 -> id = input.decodeStringElement(descriptor, i)
2 -> result = input.decodeSerializableElement(descriptor, i, ThingSerializer(resultSerializer))
else -> throw SerializationException("Unknown index $i")
}
}
input.endStructure(descriptor)
if (action == null || id == null || result == null) throw SerializationException("Missing fields")
return when(action) {
"CREATE" -> LiveQueryAction.Create(id, (result as Thing.Record<T>).result)
"UPDATE" -> LiveQueryAction.Update(id, (result as Thing.Record<T>).result)
"DELETE" -> LiveQueryAction.Delete(id, (result as Thing.Reference).id)
else -> throw SerializationException("Unknown action $action")
}
}

override fun serialize(encoder: Encoder, value: LiveQueryAction<T>) {
TODO("Not yet implemented")
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ sealed class RpcResponse {
data class Error(override val id: String, val error: JsonElement): RpcResponse()

@Serializable
data class Notification(val result: LiveQueryAction): RpcResponse() {
data class Notification(val result: LiveQueryAction<JsonElement>): RpcResponse() {
override val id: String? = null
}
}
Expand Down
21 changes: 21 additions & 0 deletions src/commonTest/kotlin/KillTest.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
import kotlinx.coroutines.test.runTest
import uk.gibby.driver.Surreal
import uk.gibby.driver.rpc.functions.kill
import uk.gibby.driver.rpc.functions.live
import uk.gibby.driver.rpc.functions.signin
import uk.gibby.driver.rpc.functions.use
import utils.cleanDatabase
import kotlin.test.Test

class KillTest {
@Test
fun testKill() = runTest {
cleanDatabase()
val connection = Surreal("localhost", 8000)
connection.connect()
connection.signin("root", "root")
connection.use("test", "test")
val liveQueryId = connection.live("test")
connection.kill(liveQueryId)
}
}
Loading

0 comments on commit b844dfb

Please sign in to comment.