Skip to content

Commit

Permalink
Added Docs blocks and @SurrealDbOnlyApi to 'kill'
Browse files Browse the repository at this point in the history
  • Loading branch information
mnbjhu committed Aug 23, 2023
1 parent ce4ee27 commit bc30507
Show file tree
Hide file tree
Showing 10 changed files with 146 additions and 76 deletions.
11 changes: 8 additions & 3 deletions src/commonMain/kotlin/uk/gibby/driver/Surreal.kt
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.*
import kotlinx.serialization.json.*
import uk.gibby.driver.annotation.SurrealDbNightlyOnlyApi
import uk.gibby.driver.exception.LiveQueryKilledException
import uk.gibby.driver.model.rpc.*
import uk.gibby.driver.rpc.kill
Expand Down Expand Up @@ -102,21 +103,25 @@ class Surreal(private val host: String, private val port: Int = 8000) {
return channel.receive()
}

fun subscribeToTableAsJson(liveQueryId: String): Flow<LiveQueryAction<JsonElement>> {
@SurrealDbNightlyOnlyApi
fun subscribeAsJson(liveQueryId: String): Flow<LiveQueryAction<JsonElement>> {
val channel = liveQueries.getOrPut(liveQueryId) { Channel() }
return channel.receiveAsFlow()
}

inline fun <reified T> subscribeToTable(liveQueryId: String): Flow<LiveQueryAction<T>> {
return subscribeToTableAsJson(liveQueryId).map { it.asType() }
@SurrealDbNightlyOnlyApi
inline fun <reified T> subscribe(liveQueryId: String): Flow<LiveQueryAction<T>> {
return subscribeAsJson(liveQueryId).map { it.asType() }
}

@SurrealDbNightlyOnlyApi
fun unsubscribe(liveQueryId: String) {
val channel = liveQueries[liveQueryId]
channel?.cancel(LiveQueryKilledException)
liveQueries.remove(liveQueryId)
}

@SurrealDbNightlyOnlyApi
internal fun triggerKill(liveQueryId: String) {
context.launch { kill(liveQueryId) }
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package uk.gibby.driver.annotation

@RequiresOptIn(message = "This function is only available while using nightly builds of SurrealDB. See https://surrealdb.com/docs/installation/nightly.")
@Target(AnnotationTarget.FUNCTION)
@Target(AnnotationTarget.FUNCTION, AnnotationTarget.CLASS)
@Retention(AnnotationRetention.BINARY)
annotation class SurrealDbNightlyOnlyApi
30 changes: 30 additions & 0 deletions src/commonMain/kotlin/uk/gibby/driver/api/LiveQueryFlow.kt
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,48 @@ import io.ktor.utils.io.core.*
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.map
import uk.gibby.driver.Surreal
import uk.gibby.driver.annotation.SurrealDbNightlyOnlyApi
import uk.gibby.driver.model.rpc.LiveQueryAction

/**
* Live query flow
*
* This class is a wrapper around a flow of [LiveQueryAction]s making it [Closeable].
*
* @param T The type of the records
* @property flow The flow of [LiveQueryAction]s
* @property id The id of the live query
* @property connection The connection to the database
* @constructor Creates a new live query flow
*/
@SurrealDbNightlyOnlyApi
class LiveQueryFlow<T>(
private val flow: Flow<LiveQueryAction<T>>,
val id: String,
private val connection: Surreal
): Flow<LiveQueryAction<T>> by flow, Closeable {

/**
* Close
*
* This method will unsubscribe from the live query and trigger the kill RPC request.
*/

override fun close() {
connection.unsubscribe(id)
connection.triggerKill(id)
}

/**
* Map
*
* Used to map the records of the live query flow form [T] to [R].
*
* @param R The type of the records in the new flow
* @param transform The transform function from [T] to [R]
* @receiver The live query flow to map
* @return A new live query flow of [R]s
*/
fun <R>map(transform: (LiveQueryAction<T>) -> LiveQueryAction<R>): LiveQueryFlow<R> {
return LiveQueryFlow(
flow = flow.map { transform(it) },
Expand Down
22 changes: 18 additions & 4 deletions src/commonMain/kotlin/uk/gibby/driver/api/Observe.kt
Original file line number Diff line number Diff line change
Expand Up @@ -3,25 +3,39 @@ package uk.gibby.driver.api
import kotlinx.serialization.json.JsonElement
import uk.gibby.driver.Surreal
import uk.gibby.driver.annotation.SurrealDbNightlyOnlyApi
import uk.gibby.driver.model.query.data
import uk.gibby.driver.model.rpc.LiveQueryAction
import uk.gibby.driver.rpc.live
import uk.gibby.driver.model.rpc.asType
import uk.gibby.driver.rpc.query
import kotlin.jvm.JvmName

/**
* Observe live query as json
*
* Creates a [LiveQueryFlow] for the given table.
*
* @param table Name of the table to 'LIVE SELECT' from
* @return A [LiveQueryFlow] of [JsonElement]s
*/
@JvmName("observeJson")
@SurrealDbNightlyOnlyApi
suspend fun Surreal.observeLiveQueryAsJson(table: String): LiveQueryFlow<JsonElement> {
val liveQueryId = live(table)
return LiveQueryFlow(
flow = subscribeToTableAsJson(liveQueryId),
flow = subscribeAsJson(liveQueryId),
id = liveQueryId,
connection = this
)
}


/**
* Observe live query
*
* Creates a [LiveQueryFlow] for the given table.
*
* @param T The type of the records
* @param table Name of the table to 'LIVE SELECT' from
* @return A [LiveQueryFlow] of [T]s
*/
@SurrealDbNightlyOnlyApi
suspend inline fun <reified T>Surreal.observeLiveQuery(table: String): LiveQueryFlow<T> {
val jsonFlow = observeLiveQueryAsJson(table)
Expand Down
71 changes: 20 additions & 51 deletions src/commonMain/kotlin/uk/gibby/driver/model/rpc/LiveQueryAction.kt
Original file line number Diff line number Diff line change
@@ -1,22 +1,20 @@
package uk.gibby.driver.model.rpc

import kotlinx.serialization.KSerializer
import kotlinx.serialization.Serializable
import kotlinx.serialization.SerializationException
import kotlinx.serialization.builtins.serializer
import kotlinx.serialization.descriptors.SerialDescriptor
import kotlinx.serialization.descriptors.buildClassSerialDescriptor
import kotlinx.serialization.encoding.CompositeDecoder
import kotlinx.serialization.encoding.Decoder
import kotlinx.serialization.encoding.Encoder
import kotlinx.serialization.json.JsonElement
import kotlinx.serialization.json.decodeFromJsonElement
import uk.gibby.driver.api.LiveQueryFlow
import uk.gibby.driver.model.Thing
import uk.gibby.driver.model.ThingSerializer
import uk.gibby.driver.model.rpc.LiveQueryAction.*
import uk.gibby.driver.surrealJson


/**
* LiveQueryAction
*
* A live query action. Can be a [Create], [Update] or [Delete].
* Holds the id of the record and the result if relevant.
*
* @param T The type of the result
*/
@Serializable(with = LiveQueryActionSerializer::class)
sealed class LiveQueryAction<out T> {
abstract val id: String
Expand All @@ -26,48 +24,19 @@ sealed class LiveQueryAction<out T> {
}


/**
* As type
*
* Maps the live query to type [T] using [decodeFromJsonElement]
*
* @param T The new type
* @return The new live query action
*/
inline fun <reified T> LiveQueryAction<JsonElement>.asType(): LiveQueryAction<T> {
return when(this) {
is LiveQueryAction.Delete -> this
is LiveQueryAction.Create -> LiveQueryAction.Create(id, surrealJson.decodeFromJsonElement(result))
is LiveQueryAction.Update -> LiveQueryAction.Update(id, surrealJson.decodeFromJsonElement(result))
is Delete -> this
is Create -> Create(id, surrealJson.decodeFromJsonElement(result))
is Update -> Update(id, surrealJson.decodeFromJsonElement(result))
}
}

class LiveQueryActionSerializer<T: Any>(private val resultSerializer: KSerializer<T>): KSerializer<LiveQueryAction<T>> {
override val descriptor: SerialDescriptor = buildClassSerialDescriptor("LiveQueryAction") {
element("action", String.serializer().descriptor)
element("id", String.serializer().descriptor)
element("result", resultSerializer.descriptor)
}

override fun deserialize(decoder: Decoder): LiveQueryAction<T> {
val input = decoder.beginStructure(descriptor)
var action: String? = null
var id: String? = null
var result: Thing<T>? = null
loop@ while (true) {
when (val i = input.decodeElementIndex(descriptor)) {
CompositeDecoder.DECODE_DONE -> break@loop
0 -> action = input.decodeStringElement(descriptor, i)
1 -> id = input.decodeStringElement(descriptor, i)
2 -> result = input.decodeSerializableElement(descriptor, i, ThingSerializer(resultSerializer))
else -> throw SerializationException("Unknown index $i")
}
}
input.endStructure(descriptor)
if (action == null || id == null || result == null) throw SerializationException("Missing fields")
return when(action) {
"CREATE" -> LiveQueryAction.Create(id, (result as Thing.Record<T>).result)
"UPDATE" -> LiveQueryAction.Update(id, (result as Thing.Record<T>).result)
"DELETE" -> LiveQueryAction.Delete(id, (result as Thing.Reference).id)
else -> throw SerializationException("Unknown action $action")
}
}

override fun serialize(encoder: Encoder, value: LiveQueryAction<T>) {
TODO("Not yet implemented")
}

}

Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package uk.gibby.driver.model.rpc

import kotlinx.serialization.KSerializer
import kotlinx.serialization.SerializationException
import kotlinx.serialization.builtins.serializer
import kotlinx.serialization.descriptors.SerialDescriptor
import kotlinx.serialization.descriptors.buildClassSerialDescriptor
import kotlinx.serialization.encoding.CompositeDecoder
import kotlinx.serialization.encoding.Decoder
import kotlinx.serialization.encoding.Encoder
import uk.gibby.driver.model.Thing
import uk.gibby.driver.model.ThingSerializer

class LiveQueryActionSerializer<T: Any>(private val resultSerializer: KSerializer<T>): KSerializer<LiveQueryAction<T>> {
override val descriptor: SerialDescriptor = buildClassSerialDescriptor("LiveQueryAction") {
element("action", String.serializer().descriptor)
element("id", String.serializer().descriptor)
element("result", resultSerializer.descriptor)
}

override fun deserialize(decoder: Decoder): LiveQueryAction<T> {
val input = decoder.beginStructure(descriptor)
var action: String? = null
var id: String? = null
var result: Thing<T>? = null
loop@ while (true) {
when (val i = input.decodeElementIndex(descriptor)) {
CompositeDecoder.DECODE_DONE -> break@loop
0 -> action = input.decodeStringElement(descriptor, i)
1 -> id = input.decodeStringElement(descriptor, i)
2 -> result = input.decodeSerializableElement(descriptor, i, ThingSerializer(resultSerializer))
else -> throw SerializationException("Unknown index $i")
}
}
input.endStructure(descriptor)
if (action == null || id == null || result == null) throw SerializationException("Missing fields")
return when(action) {
"CREATE" -> LiveQueryAction.Create(id, (result as Thing.Record<T>).result)
"UPDATE" -> LiveQueryAction.Update(id, (result as Thing.Record<T>).result)
"DELETE" -> LiveQueryAction.Delete(id, (result as Thing.Reference).id)
else -> throw SerializationException("Unknown action $action")
}
}

override fun serialize(encoder: Encoder, value: LiveQueryAction<T>) {
TODO("Not yet implemented")
}

}
9 changes: 9 additions & 0 deletions src/commonMain/kotlin/uk/gibby/driver/rpc/Kill.kt
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,16 @@ package uk.gibby.driver.rpc
import kotlinx.serialization.json.add
import kotlinx.serialization.json.buildJsonArray
import uk.gibby.driver.Surreal
import uk.gibby.driver.annotation.SurrealDbNightlyOnlyApi

/**
* Kill
*
* This method will kill a live query
*
* @param liveQueryId The id of the live query to kill
*/
@SurrealDbNightlyOnlyApi
suspend fun Surreal.kill(liveQueryId: String) {
sendRequest("kill", buildJsonArray { add(liveQueryId) })
}
Expand Down
21 changes: 8 additions & 13 deletions src/commonMain/kotlin/uk/gibby/driver/rpc/Live.kt
Original file line number Diff line number Diff line change
Expand Up @@ -5,24 +5,19 @@ import kotlinx.serialization.json.buildJsonArray
import kotlinx.serialization.json.decodeFromJsonElement
import uk.gibby.driver.Surreal
import uk.gibby.driver.annotation.SurrealDbNightlyOnlyApi
import uk.gibby.driver.model.Thing
import uk.gibby.driver.surrealJson


/**
* Live
*
* This method will create a live query
*
* @param table Name of the table to 'LIVE SELECT' from
* @return The id of the live query
*/
@SurrealDbNightlyOnlyApi
suspend fun Surreal.live(table: String): String {
val response = sendRequest("live", buildJsonArray { add(table) })
return surrealJson.decodeFromJsonElement(response)
}

@SurrealDbNightlyOnlyApi
suspend fun Surreal.live(table: String, id: String): String {
val response = sendRequest("live", buildJsonArray { add("$table:$id") })
return surrealJson.decodeFromJsonElement(response)
}

@SurrealDbNightlyOnlyApi
suspend fun Surreal.live(id: Thing<*>): String {
val response = sendRequest("live", buildJsonArray { add(id.id) })
return surrealJson.decodeFromJsonElement(response)
}
3 changes: 1 addition & 2 deletions src/commonTest/kotlin/KillTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,10 @@ import uk.gibby.driver.rpc.live
import uk.gibby.driver.rpc.signin
import uk.gibby.driver.rpc.use
import utils.cleanDatabase
import kotlin.test.Ignore
import kotlin.test.Test

@OptIn(SurrealDbNightlyOnlyApi::class)
class KillTest {
@OptIn(SurrealDbNightlyOnlyApi::class)
@Test
fun testKill() = runTest {
cleanDatabase()
Expand Down
4 changes: 2 additions & 2 deletions src/commonTest/kotlin/LiveQueryTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ class LiveQueryTest {
connection.use("test", "test")
val result = connection.query("LIVE SELECT * FROM test;")
val liveQueryId = result.first().data<String>()
val incoming = connection.subscribeToTable<TestClass>(liveQueryId)
val incoming = connection.subscribe<TestClass>(liveQueryId)

connection.create("test", "first").content(TestClass("thing", 1))
connection.create("test", "second").content(TestClass("thing", 2))
Expand Down Expand Up @@ -88,7 +88,7 @@ class LiveQueryTest {
connection.signin("root", "root")
connection.use("test", "test")
val liveQueryId = connection.live("test")
val incoming = connection.subscribeToTable<TestClass>(liveQueryId)
val incoming = connection.subscribe<TestClass>(liveQueryId)

connection.create("test", "first").content(TestClass("thing", 1))
connection.create("test", "second").content(TestClass("thing", 2))
Expand Down

0 comments on commit bc30507

Please sign in to comment.