Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Live queries #7

Merged
merged 6 commits into from
Aug 23, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/gradle.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ jobs:
runs-on: ubuntu-latest
steps:
- name: Start Surreal
run: docker run -d -p 8000:8000 surrealdb/surrealdb:latest start --user root --pass root -- "memory"
run: docker run -d -p 8000:8000 surrealdb/surrealdb:nightly start --user root --pass root -- "memory"
- uses: actions/checkout@v3
- name: Set up JDK 11
uses: actions/setup-java@v3
Expand Down
83 changes: 71 additions & 12 deletions src/commonMain/kotlin/uk/gibby/driver/Surreal.kt
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,12 @@ import io.ktor.util.collections.*
import io.ktor.websocket.*
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.receiveAsFlow
import kotlinx.coroutines.flow.*
import kotlinx.serialization.json.*
import uk.gibby.driver.rpc.model.RpcRequest
import uk.gibby.driver.rpc.model.RpcResponse
import uk.gibby.driver.rpc.model.RpcResponseSerializer
import kotlinx.serialization.encodeToString
import uk.gibby.driver.annotation.SurrealDbNightlyOnlyApi
import uk.gibby.driver.exception.LiveQueryKilledException
import uk.gibby.driver.model.rpc.*
import uk.gibby.driver.rpc.kill

/**
* SurrealDB driver
Expand All @@ -25,6 +25,7 @@ class Surreal(private val host: String, private val port: Int = 8000) {
private var count = 0L
private var connection: DefaultClientWebSocketSession? = null
private val requests = ConcurrentMap<String, Channel<JsonElement>>()
private val liveQueries = ConcurrentMap<String, Channel<LiveQueryAction<JsonElement>>>()
private val context = CoroutineScope(Dispatchers.Default)

/**
Expand All @@ -41,31 +42,89 @@ class Surreal(private val host: String, private val port: Int = 8000) {
context.launch {
it.incoming.receiveAsFlow().collect {
it as Frame.Text
println(it.readText())
val response = try {
surrealJson.decodeFromString(RpcResponseSerializer, it.readText())
} catch (e: Exception) {
// In theory this could be an error for any request, so we cancel all of them
requests.forEach { (_, r) -> r.cancel(CancellationException("Failed to decode incoming response: ${it.readText()}\n${e.message}"))}
throw e
}
val request = requests[response.id]
if (request == null) requests.forEach { (_, r) -> r.cancel(CancellationException("Received a request with an unknown id: ${response.id} body: $response"))}
else when(response) {
is RpcResponse.Success -> request.send(response.result)
is RpcResponse.Error -> request.cancel(CancellationException("SurrealDB responded with an error: '${response.error}'"))
when(response) {
is RpcResponse.Success -> handleSuccess(response)
is RpcResponse.Error -> handleError(response)
is RpcResponse.Notification -> handleNotification(response)
}
requests.remove(response.id)
}
}
}
}

private suspend fun handleSuccess(response: RpcResponse.Success) {
val request = requests[response.id]
if (request != null) {
request.send(response.result)
} else {
requests.forEach {
// In theory this could be an error for any request, so we cancel all of them
(_, r) ->
r.cancel(CancellationException("Received a request with an unknown id: ${response.id} body: $response"))
}
}
}

private fun handleError(response: RpcResponse.Error) {
val request = requests[response.id]
if (request != null) {
request.cancel(CancellationException("SurrealDB responded with an error: '${response.error}'"))
} else {
requests.forEach {
// In theory this could be an error for any request, so we cancel all of them
(_, r) ->
r.cancel(CancellationException("Received a request with an unknown id: ${response.id} body: $response"))
}
}
requests.remove(response.id)
}

private suspend fun handleNotification(response: RpcResponse.Notification) {
val action = response.result
val liveQuery = liveQueries.getOrPut(action.id) { Channel() }
context.launch { liveQuery.send(response.result) }
}

internal suspend fun sendRequest(method: String, params: JsonArray): JsonElement {
val id = count++.toString()
val request = RpcRequest(id, method, params)
val channel = Channel<JsonElement>(1)
requests[id] = channel
println(request)
(connection ?: throw Exception("SurrealDB: Websocket not connected")).sendSerialized(request)
return channel.receive()
}

}
@SurrealDbNightlyOnlyApi
fun subscribeAsJson(liveQueryId: String): Flow<LiveQueryAction<JsonElement>> {
val channel = liveQueries.getOrPut(liveQueryId) { Channel() }
return channel.receiveAsFlow()
}

@SurrealDbNightlyOnlyApi
inline fun <reified T> subscribe(liveQueryId: String): Flow<LiveQueryAction<T>> {
return subscribeAsJson(liveQueryId).map { it.asType() }
}

@SurrealDbNightlyOnlyApi
fun unsubscribe(liveQueryId: String) {
val channel = liveQueries[liveQueryId]
channel?.cancel(LiveQueryKilledException)
liveQueries.remove(liveQueryId)
}

@SurrealDbNightlyOnlyApi
internal fun triggerKill(liveQueryId: String) {
context.launch { kill(liveQueryId) }
}

}

Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package uk.gibby.driver.annotation

@RequiresOptIn(message = "This function is only available while using nightly builds of SurrealDB. See https://surrealdb.com/docs/installation/nightly.")
@Target(AnnotationTarget.FUNCTION, AnnotationTarget.CLASS)
@Retention(AnnotationRetention.BINARY)
annotation class SurrealDbNightlyOnlyApi
56 changes: 56 additions & 0 deletions src/commonMain/kotlin/uk/gibby/driver/api/LiveQueryFlow.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package uk.gibby.driver.api

import io.ktor.utils.io.core.*
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.map
import uk.gibby.driver.Surreal
import uk.gibby.driver.annotation.SurrealDbNightlyOnlyApi
import uk.gibby.driver.model.rpc.LiveQueryAction

/**
* Live query flow
*
* This class is a wrapper around a flow of [LiveQueryAction]s making it [Closeable].
*
* @param T The type of the records
* @property flow The flow of [LiveQueryAction]s
* @property id The id of the live query
* @property connection The connection to the database
* @constructor Creates a new live query flow
*/
@SurrealDbNightlyOnlyApi
class LiveQueryFlow<T>(
private val flow: Flow<LiveQueryAction<T>>,
val id: String,
private val connection: Surreal
): Flow<LiveQueryAction<T>> by flow, Closeable {

/**
* Close
*
* This method will unsubscribe from the live query and trigger the kill RPC request.
*/

override fun close() {
connection.unsubscribe(id)
connection.triggerKill(id)
}

/**
* Map
*
* Used to map the records of the live query flow form [T] to [R].
*
* @param R The type of the records in the new flow
* @param transform The transform function from [T] to [R]
* @receiver The live query flow to map
* @return A new live query flow of [R]s
*/
fun <R>map(transform: (LiveQueryAction<T>) -> LiveQueryAction<R>): LiveQueryFlow<R> {
return LiveQueryFlow(
flow = flow.map { transform(it) },
id = id,
connection = connection
)
}
}
43 changes: 43 additions & 0 deletions src/commonMain/kotlin/uk/gibby/driver/api/Observe.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package uk.gibby.driver.api

import kotlinx.serialization.json.JsonElement
import uk.gibby.driver.Surreal
import uk.gibby.driver.annotation.SurrealDbNightlyOnlyApi
import uk.gibby.driver.rpc.live
import uk.gibby.driver.model.rpc.asType
import kotlin.jvm.JvmName

/**
* Observe live query as json
*
* Creates a [LiveQueryFlow] for the given table.
*
* @param table Name of the table to 'LIVE SELECT' from
* @return A [LiveQueryFlow] of [JsonElement]s
*/
@JvmName("observeJson")
@SurrealDbNightlyOnlyApi
suspend fun Surreal.observeLiveQueryAsJson(table: String): LiveQueryFlow<JsonElement> {
val liveQueryId = live(table)
return LiveQueryFlow(
flow = subscribeAsJson(liveQueryId),
id = liveQueryId,
connection = this
)
}


/**
* Observe live query
*
* Creates a [LiveQueryFlow] for the given table.
*
* @param T The type of the records
* @param table Name of the table to 'LIVE SELECT' from
* @return A [LiveQueryFlow] of [T]s
*/
@SurrealDbNightlyOnlyApi
suspend inline fun <reified T>Surreal.observeLiveQuery(table: String): LiveQueryFlow<T> {
val jsonFlow = observeLiveQueryAsJson(table)
return jsonFlow.map { it.asType<T>() }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package uk.gibby.driver.exception

import kotlinx.coroutines.CancellationException

object LiveQueryKilledException: CancellationException("Live query has been killed")
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
package uk.gibby.driver.rpc.exception
package uk.gibby.driver.exception

data class QueryException(val detail: String): Exception("SurrealDB responded with an error: '$detail'")
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package uk.gibby.driver.rpc.model
package uk.gibby.driver.model

import kotlinx.serialization.SerialName
import kotlinx.serialization.Serializable
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package uk.gibby.driver.rpc.model
package uk.gibby.driver.model

import kotlinx.serialization.Serializable
import kotlinx.serialization.json.JsonElement
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package uk.gibby.driver.rpc.model
package uk.gibby.driver.model

import kotlinx.serialization.ExperimentalSerializationApi
import kotlinx.serialization.KSerializer
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package uk.gibby.driver.rpc.model
package uk.gibby.driver.model.query

import kotlinx.serialization.json.JsonElement
import kotlinx.serialization.json.encodeToJsonElement
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
package uk.gibby.driver.rpc.model
package uk.gibby.driver.model.query

import kotlinx.serialization.SerialName
import kotlinx.serialization.Serializable
import kotlinx.serialization.json.JsonElement
import kotlinx.serialization.json.decodeFromJsonElement
import uk.gibby.driver.rpc.exception.QueryException
import uk.gibby.driver.exception.QueryException
import uk.gibby.driver.surrealJson

@Serializable
Expand Down
42 changes: 42 additions & 0 deletions src/commonMain/kotlin/uk/gibby/driver/model/rpc/LiveQueryAction.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package uk.gibby.driver.model.rpc

import kotlinx.serialization.Serializable
import kotlinx.serialization.json.JsonElement
import kotlinx.serialization.json.decodeFromJsonElement
import uk.gibby.driver.model.rpc.LiveQueryAction.*
import uk.gibby.driver.surrealJson


/**
* LiveQueryAction
*
* A live query action. Can be a [Create], [Update] or [Delete].
* Holds the id of the record and the result if relevant.
*
* @param T The type of the result
*/
@Serializable(with = LiveQueryActionSerializer::class)
sealed class LiveQueryAction<out T> {
abstract val id: String
data class Create<T>(override val id: String, val result: T): LiveQueryAction<T>()
data class Update<T>(override val id: String, val result: T): LiveQueryAction<T>()
data class Delete(override val id: String, val deletedId: String): LiveQueryAction<Nothing>()
}


/**
* As type
*
* Maps the live query to type [T] using [decodeFromJsonElement]
*
* @param T The new type
* @return The new live query action
*/
inline fun <reified T> LiveQueryAction<JsonElement>.asType(): LiveQueryAction<T> {
return when(this) {
is Delete -> this
is Create -> Create(id, surrealJson.decodeFromJsonElement(result))
is Update -> Update(id, surrealJson.decodeFromJsonElement(result))
}
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package uk.gibby.driver.model.rpc

import kotlinx.serialization.KSerializer
import kotlinx.serialization.SerializationException
import kotlinx.serialization.builtins.serializer
import kotlinx.serialization.descriptors.SerialDescriptor
import kotlinx.serialization.descriptors.buildClassSerialDescriptor
import kotlinx.serialization.encoding.CompositeDecoder
import kotlinx.serialization.encoding.Decoder
import kotlinx.serialization.encoding.Encoder
import uk.gibby.driver.model.Thing
import uk.gibby.driver.model.ThingSerializer

class LiveQueryActionSerializer<T: Any>(private val resultSerializer: KSerializer<T>): KSerializer<LiveQueryAction<T>> {
override val descriptor: SerialDescriptor = buildClassSerialDescriptor("LiveQueryAction") {
element("action", String.serializer().descriptor)
element("id", String.serializer().descriptor)
element("result", resultSerializer.descriptor)
}

override fun deserialize(decoder: Decoder): LiveQueryAction<T> {
val input = decoder.beginStructure(descriptor)
var action: String? = null
var id: String? = null
var result: Thing<T>? = null
loop@ while (true) {
when (val i = input.decodeElementIndex(descriptor)) {
CompositeDecoder.DECODE_DONE -> break@loop
0 -> action = input.decodeStringElement(descriptor, i)
1 -> id = input.decodeStringElement(descriptor, i)
2 -> result = input.decodeSerializableElement(descriptor, i, ThingSerializer(resultSerializer))
else -> throw SerializationException("Unknown index $i")
}
}
input.endStructure(descriptor)
if (action == null || id == null || result == null) throw SerializationException("Missing fields")
return when(action) {
"CREATE" -> LiveQueryAction.Create(id, (result as Thing.Record<T>).result)
"UPDATE" -> LiveQueryAction.Update(id, (result as Thing.Record<T>).result)
"DELETE" -> LiveQueryAction.Delete(id, (result as Thing.Reference).id)
else -> throw SerializationException("Unknown action $action")
}
}

override fun serialize(encoder: Encoder, value: LiveQueryAction<T>) {
TODO("Not yet implemented")
}

}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package uk.gibby.driver.rpc.model
package uk.gibby.driver.model.rpc

import kotlinx.serialization.Serializable
import kotlinx.serialization.json.JsonArray
Expand Down
Loading