Skip to content

Commit

Permalink
Merge pull request #7 from mnbjhu/live_queries
Browse files Browse the repository at this point in the history
Live queries
  • Loading branch information
mnbjhu committed Aug 23, 2023
2 parents 865e6ff + bc30507 commit f3a0eb7
Show file tree
Hide file tree
Showing 51 changed files with 579 additions and 95 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/gradle.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ jobs:
runs-on: ubuntu-latest
steps:
- name: Start Surreal
run: docker run -d -p 8000:8000 surrealdb/surrealdb:latest start --user root --pass root -- "memory"
run: docker run -d -p 8000:8000 surrealdb/surrealdb:nightly start --user root --pass root -- "memory"
- uses: actions/checkout@v3
- name: Set up JDK 11
uses: actions/setup-java@v3
Expand Down
83 changes: 71 additions & 12 deletions src/commonMain/kotlin/uk/gibby/driver/Surreal.kt
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,12 @@ import io.ktor.util.collections.*
import io.ktor.websocket.*
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.receiveAsFlow
import kotlinx.coroutines.flow.*
import kotlinx.serialization.json.*
import uk.gibby.driver.rpc.model.RpcRequest
import uk.gibby.driver.rpc.model.RpcResponse
import uk.gibby.driver.rpc.model.RpcResponseSerializer
import kotlinx.serialization.encodeToString
import uk.gibby.driver.annotation.SurrealDbNightlyOnlyApi
import uk.gibby.driver.exception.LiveQueryKilledException
import uk.gibby.driver.model.rpc.*
import uk.gibby.driver.rpc.kill

/**
* SurrealDB driver
Expand All @@ -25,6 +25,7 @@ class Surreal(private val host: String, private val port: Int = 8000) {
private var count = 0L
private var connection: DefaultClientWebSocketSession? = null
private val requests = ConcurrentMap<String, Channel<JsonElement>>()
private val liveQueries = ConcurrentMap<String, Channel<LiveQueryAction<JsonElement>>>()
private val context = CoroutineScope(Dispatchers.Default)

/**
Expand All @@ -41,31 +42,89 @@ class Surreal(private val host: String, private val port: Int = 8000) {
context.launch {
it.incoming.receiveAsFlow().collect {
it as Frame.Text
println(it.readText())
val response = try {
surrealJson.decodeFromString(RpcResponseSerializer, it.readText())
} catch (e: Exception) {
// In theory this could be an error for any request, so we cancel all of them
requests.forEach { (_, r) -> r.cancel(CancellationException("Failed to decode incoming response: ${it.readText()}\n${e.message}"))}
throw e
}
val request = requests[response.id]
if (request == null) requests.forEach { (_, r) -> r.cancel(CancellationException("Received a request with an unknown id: ${response.id} body: $response"))}
else when(response) {
is RpcResponse.Success -> request.send(response.result)
is RpcResponse.Error -> request.cancel(CancellationException("SurrealDB responded with an error: '${response.error}'"))
when(response) {
is RpcResponse.Success -> handleSuccess(response)
is RpcResponse.Error -> handleError(response)
is RpcResponse.Notification -> handleNotification(response)
}
requests.remove(response.id)
}
}
}
}

private suspend fun handleSuccess(response: RpcResponse.Success) {
val request = requests[response.id]
if (request != null) {
request.send(response.result)
} else {
requests.forEach {
// In theory this could be an error for any request, so we cancel all of them
(_, r) ->
r.cancel(CancellationException("Received a request with an unknown id: ${response.id} body: $response"))
}
}
}

private fun handleError(response: RpcResponse.Error) {
val request = requests[response.id]
if (request != null) {
request.cancel(CancellationException("SurrealDB responded with an error: '${response.error}'"))
} else {
requests.forEach {
// In theory this could be an error for any request, so we cancel all of them
(_, r) ->
r.cancel(CancellationException("Received a request with an unknown id: ${response.id} body: $response"))
}
}
requests.remove(response.id)
}

private suspend fun handleNotification(response: RpcResponse.Notification) {
val action = response.result
val liveQuery = liveQueries.getOrPut(action.id) { Channel() }
context.launch { liveQuery.send(response.result) }
}

internal suspend fun sendRequest(method: String, params: JsonArray): JsonElement {
val id = count++.toString()
val request = RpcRequest(id, method, params)
val channel = Channel<JsonElement>(1)
requests[id] = channel
println(request)
(connection ?: throw Exception("SurrealDB: Websocket not connected")).sendSerialized(request)
return channel.receive()
}

}
@SurrealDbNightlyOnlyApi
fun subscribeAsJson(liveQueryId: String): Flow<LiveQueryAction<JsonElement>> {
val channel = liveQueries.getOrPut(liveQueryId) { Channel() }
return channel.receiveAsFlow()
}

@SurrealDbNightlyOnlyApi
inline fun <reified T> subscribe(liveQueryId: String): Flow<LiveQueryAction<T>> {
return subscribeAsJson(liveQueryId).map { it.asType() }
}

@SurrealDbNightlyOnlyApi
fun unsubscribe(liveQueryId: String) {
val channel = liveQueries[liveQueryId]
channel?.cancel(LiveQueryKilledException)
liveQueries.remove(liveQueryId)
}

@SurrealDbNightlyOnlyApi
internal fun triggerKill(liveQueryId: String) {
context.launch { kill(liveQueryId) }
}

}

Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package uk.gibby.driver.annotation

@RequiresOptIn(message = "This function is only available while using nightly builds of SurrealDB. See https://surrealdb.com/docs/installation/nightly.")
@Target(AnnotationTarget.FUNCTION, AnnotationTarget.CLASS)
@Retention(AnnotationRetention.BINARY)
annotation class SurrealDbNightlyOnlyApi
56 changes: 56 additions & 0 deletions src/commonMain/kotlin/uk/gibby/driver/api/LiveQueryFlow.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package uk.gibby.driver.api

import io.ktor.utils.io.core.*
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.map
import uk.gibby.driver.Surreal
import uk.gibby.driver.annotation.SurrealDbNightlyOnlyApi
import uk.gibby.driver.model.rpc.LiveQueryAction

/**
* Live query flow
*
* This class is a wrapper around a flow of [LiveQueryAction]s making it [Closeable].
*
* @param T The type of the records
* @property flow The flow of [LiveQueryAction]s
* @property id The id of the live query
* @property connection The connection to the database
* @constructor Creates a new live query flow
*/
@SurrealDbNightlyOnlyApi
class LiveQueryFlow<T>(
private val flow: Flow<LiveQueryAction<T>>,
val id: String,
private val connection: Surreal
): Flow<LiveQueryAction<T>> by flow, Closeable {

/**
* Close
*
* This method will unsubscribe from the live query and trigger the kill RPC request.
*/

override fun close() {
connection.unsubscribe(id)
connection.triggerKill(id)
}

/**
* Map
*
* Used to map the records of the live query flow form [T] to [R].
*
* @param R The type of the records in the new flow
* @param transform The transform function from [T] to [R]
* @receiver The live query flow to map
* @return A new live query flow of [R]s
*/
fun <R>map(transform: (LiveQueryAction<T>) -> LiveQueryAction<R>): LiveQueryFlow<R> {
return LiveQueryFlow(
flow = flow.map { transform(it) },
id = id,
connection = connection
)
}
}
43 changes: 43 additions & 0 deletions src/commonMain/kotlin/uk/gibby/driver/api/Observe.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package uk.gibby.driver.api

import kotlinx.serialization.json.JsonElement
import uk.gibby.driver.Surreal
import uk.gibby.driver.annotation.SurrealDbNightlyOnlyApi
import uk.gibby.driver.rpc.live
import uk.gibby.driver.model.rpc.asType
import kotlin.jvm.JvmName

/**
* Observe live query as json
*
* Creates a [LiveQueryFlow] for the given table.
*
* @param table Name of the table to 'LIVE SELECT' from
* @return A [LiveQueryFlow] of [JsonElement]s
*/
@JvmName("observeJson")
@SurrealDbNightlyOnlyApi
suspend fun Surreal.observeLiveQueryAsJson(table: String): LiveQueryFlow<JsonElement> {
val liveQueryId = live(table)
return LiveQueryFlow(
flow = subscribeAsJson(liveQueryId),
id = liveQueryId,
connection = this
)
}


/**
* Observe live query
*
* Creates a [LiveQueryFlow] for the given table.
*
* @param T The type of the records
* @param table Name of the table to 'LIVE SELECT' from
* @return A [LiveQueryFlow] of [T]s
*/
@SurrealDbNightlyOnlyApi
suspend inline fun <reified T>Surreal.observeLiveQuery(table: String): LiveQueryFlow<T> {
val jsonFlow = observeLiveQueryAsJson(table)
return jsonFlow.map { it.asType<T>() }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package uk.gibby.driver.exception

import kotlinx.coroutines.CancellationException

object LiveQueryKilledException: CancellationException("Live query has been killed")
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
package uk.gibby.driver.exception

data class QueryException(val detail: String): Exception("SurrealDB responded with an error: '$detail'")
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package uk.gibby.driver.rpc.model
package uk.gibby.driver.model

import kotlinx.serialization.SerialName
import kotlinx.serialization.Serializable
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package uk.gibby.driver.rpc.model
package uk.gibby.driver.model

import kotlinx.serialization.Serializable
import kotlinx.serialization.json.JsonElement
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package uk.gibby.driver.rpc.model
package uk.gibby.driver.model

import kotlinx.serialization.ExperimentalSerializationApi
import kotlinx.serialization.KSerializer
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package uk.gibby.driver.rpc.model
package uk.gibby.driver.model.query

import kotlinx.serialization.json.JsonElement
import kotlinx.serialization.json.encodeToJsonElement
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
package uk.gibby.driver.rpc.model
package uk.gibby.driver.model.query

import kotlinx.serialization.SerialName
import kotlinx.serialization.Serializable
import kotlinx.serialization.json.JsonElement
import kotlinx.serialization.json.decodeFromJsonElement
import uk.gibby.driver.rpc.exception.QueryException
import uk.gibby.driver.exception.QueryException
import uk.gibby.driver.surrealJson

@Serializable
Expand Down
42 changes: 42 additions & 0 deletions src/commonMain/kotlin/uk/gibby/driver/model/rpc/LiveQueryAction.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package uk.gibby.driver.model.rpc

import kotlinx.serialization.Serializable
import kotlinx.serialization.json.JsonElement
import kotlinx.serialization.json.decodeFromJsonElement
import uk.gibby.driver.model.rpc.LiveQueryAction.*
import uk.gibby.driver.surrealJson


/**
* LiveQueryAction
*
* A live query action. Can be a [Create], [Update] or [Delete].
* Holds the id of the record and the result if relevant.
*
* @param T The type of the result
*/
@Serializable(with = LiveQueryActionSerializer::class)
sealed class LiveQueryAction<out T> {
abstract val id: String
data class Create<T>(override val id: String, val result: T): LiveQueryAction<T>()
data class Update<T>(override val id: String, val result: T): LiveQueryAction<T>()
data class Delete(override val id: String, val deletedId: String): LiveQueryAction<Nothing>()
}


/**
* As type
*
* Maps the live query to type [T] using [decodeFromJsonElement]
*
* @param T The new type
* @return The new live query action
*/
inline fun <reified T> LiveQueryAction<JsonElement>.asType(): LiveQueryAction<T> {
return when(this) {
is Delete -> this
is Create -> Create(id, surrealJson.decodeFromJsonElement(result))
is Update -> Update(id, surrealJson.decodeFromJsonElement(result))
}
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package uk.gibby.driver.model.rpc

import kotlinx.serialization.KSerializer
import kotlinx.serialization.SerializationException
import kotlinx.serialization.builtins.serializer
import kotlinx.serialization.descriptors.SerialDescriptor
import kotlinx.serialization.descriptors.buildClassSerialDescriptor
import kotlinx.serialization.encoding.CompositeDecoder
import kotlinx.serialization.encoding.Decoder
import kotlinx.serialization.encoding.Encoder
import uk.gibby.driver.model.Thing
import uk.gibby.driver.model.ThingSerializer

class LiveQueryActionSerializer<T: Any>(private val resultSerializer: KSerializer<T>): KSerializer<LiveQueryAction<T>> {
override val descriptor: SerialDescriptor = buildClassSerialDescriptor("LiveQueryAction") {
element("action", String.serializer().descriptor)
element("id", String.serializer().descriptor)
element("result", resultSerializer.descriptor)
}

override fun deserialize(decoder: Decoder): LiveQueryAction<T> {
val input = decoder.beginStructure(descriptor)
var action: String? = null
var id: String? = null
var result: Thing<T>? = null
loop@ while (true) {
when (val i = input.decodeElementIndex(descriptor)) {
CompositeDecoder.DECODE_DONE -> break@loop
0 -> action = input.decodeStringElement(descriptor, i)
1 -> id = input.decodeStringElement(descriptor, i)
2 -> result = input.decodeSerializableElement(descriptor, i, ThingSerializer(resultSerializer))
else -> throw SerializationException("Unknown index $i")
}
}
input.endStructure(descriptor)
if (action == null || id == null || result == null) throw SerializationException("Missing fields")
return when(action) {
"CREATE" -> LiveQueryAction.Create(id, (result as Thing.Record<T>).result)
"UPDATE" -> LiveQueryAction.Update(id, (result as Thing.Record<T>).result)
"DELETE" -> LiveQueryAction.Delete(id, (result as Thing.Reference).id)
else -> throw SerializationException("Unknown action $action")
}
}

override fun serialize(encoder: Encoder, value: LiveQueryAction<T>) {
TODO("Not yet implemented")
}

}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package uk.gibby.driver.rpc.model
package uk.gibby.driver.model.rpc

import kotlinx.serialization.Serializable
import kotlinx.serialization.json.JsonArray
Expand Down
Loading

0 comments on commit f3a0eb7

Please sign in to comment.