From ccddba014a8d9246762e94a81deb7fe8bb31a1e4 Mon Sep 17 00:00:00 2001 From: albertchae <217050+albertchae@users.noreply.github.com> Date: Thu, 12 Sep 2024 18:51:02 -0700 Subject: [PATCH] Add cancelOn configuration (#82) - followed similar pattern to `InngestFunctionTriggers` but didn't reuse that class for a few reasons - `if` concept is similar but for triggers it gets serialized as `expression` but is `if` for cancel - `timeout` is only used by cancel and not for triggers and vice versa with `cron`, so it seems the `InngestFunctionTrigger` class's fields would be sparsely populated for all the concrete cases and not provide too much value in sharing code - While I like the idea of calling the "thing" that sets off Cancel as "CancelTrigger" or "CancellationTrigger", I thought this could be confusing since [triggers are their own key under configuration](https://github.com/inngest/inngest/blame/0ac11f2c312c066e517e53749052dd89fd2926ba/docs/SDK_SPEC.md#L478) - `Cancellation` name avoided "trigger" for reason above and followed pattern in https://github.com/inngest/inngest-js/blob/0e51903d5968a7287dd7e518bd5cc8acec3e6f3e/packages/inngest/src/types.ts#L901 - I originally was going to implement with `match` as well per https://www.inngest.com/docs/reference/typescript/functions/cancel-on, but I saw later that [it's currently deprecated so I stuck to `if` only](https://github.com/inngest/inngest-js/blob/0e51903d5968a7287dd7e518bd5cc8acec3e6f3e/packages/inngest/src/types.ts#L946). - Per the flexibility of timeout in https://github.com/inngest/inngest/blob/0ac11f2c312c066e517e53749052dd89fd2926ba/docs/SDK_SPEC.md?plain=1#L580-L583 The type for timeout is either `Duration` or `Instant`. --- .../testfunctions/CancelOnEventFunction.java | 30 ++++++++ .../testfunctions/CancelOnMatchFunction.java | 31 ++++++++ .../CancellationIntegrationTest.java | 46 ++++++++++++ .../springbootdemo/DemoTestConfiguration.java | 2 + .../src/main/kotlin/com/inngest/Function.kt | 2 + .../inngest/InngestFunctionConfigBuilder.kt | 75 +++++++++++++++++++ .../com/inngest/InngestFunctionTriggers.kt | 1 - .../InngestFunctionConfigBuilderTest.kt | 21 ++++++ 8 files changed, 207 insertions(+), 1 deletion(-) create mode 100644 inngest-spring-boot-demo/src/main/java/com/inngest/springbootdemo/testfunctions/CancelOnEventFunction.java create mode 100644 inngest-spring-boot-demo/src/main/java/com/inngest/springbootdemo/testfunctions/CancelOnMatchFunction.java create mode 100644 inngest-spring-boot-demo/src/test/java/com/inngest/springbootdemo/CancellationIntegrationTest.java diff --git a/inngest-spring-boot-demo/src/main/java/com/inngest/springbootdemo/testfunctions/CancelOnEventFunction.java b/inngest-spring-boot-demo/src/main/java/com/inngest/springbootdemo/testfunctions/CancelOnEventFunction.java new file mode 100644 index 00000000..794645f9 --- /dev/null +++ b/inngest-spring-boot-demo/src/main/java/com/inngest/springbootdemo/testfunctions/CancelOnEventFunction.java @@ -0,0 +1,30 @@ +package com.inngest.springbootdemo.testfunctions; + +import com.inngest.FunctionContext; +import com.inngest.InngestFunction; +import com.inngest.InngestFunctionConfigBuilder; +import com.inngest.Step; +import org.jetbrains.annotations.NotNull; + +public class CancelOnEventFunction extends InngestFunction { + + @NotNull + @Override + public InngestFunctionConfigBuilder config(InngestFunctionConfigBuilder builder) { + return builder + .id("cancelable-fn") + .name("Cancelable Function") + .cancelOn("cancel/cancelable") + .triggerEvent("test/cancelable"); + } + + @Override + public String execute(FunctionContext ctx, Step step) { + step.waitForEvent("wait-forever", + "test/waiting-for-godot", + "10m", + null); + + return "I didn't get canceled"; + } +} diff --git a/inngest-spring-boot-demo/src/main/java/com/inngest/springbootdemo/testfunctions/CancelOnMatchFunction.java b/inngest-spring-boot-demo/src/main/java/com/inngest/springbootdemo/testfunctions/CancelOnMatchFunction.java new file mode 100644 index 00000000..4b7ac2e2 --- /dev/null +++ b/inngest-spring-boot-demo/src/main/java/com/inngest/springbootdemo/testfunctions/CancelOnMatchFunction.java @@ -0,0 +1,31 @@ +package com.inngest.springbootdemo.testfunctions; + +import com.inngest.FunctionContext; +import com.inngest.InngestFunction; +import com.inngest.InngestFunctionConfigBuilder; +import com.inngest.Step; +import org.jetbrains.annotations.NotNull; + + +public class CancelOnMatchFunction extends InngestFunction { + + @NotNull + @Override + public InngestFunctionConfigBuilder config(InngestFunctionConfigBuilder builder) { + return builder + .id("cancel-on-match-fn") + .name("Cancel On Match Function") + .cancelOn("cancel/cancel-on-match", "event.data.userId == async.data.userId") + .triggerEvent("test/cancel-on-match"); + } + + @Override + public String execute(FunctionContext ctx, Step step) { + step.waitForEvent("wait-forever", + "test/waiting-for-godot", + "10m", + null); + + return "I didn't get canceled"; + } +} diff --git a/inngest-spring-boot-demo/src/test/java/com/inngest/springbootdemo/CancellationIntegrationTest.java b/inngest-spring-boot-demo/src/test/java/com/inngest/springbootdemo/CancellationIntegrationTest.java new file mode 100644 index 00000000..bc733e05 --- /dev/null +++ b/inngest-spring-boot-demo/src/test/java/com/inngest/springbootdemo/CancellationIntegrationTest.java @@ -0,0 +1,46 @@ +package com.inngest.springbootdemo; + +import com.inngest.Inngest; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.parallel.Execution; +import org.junit.jupiter.api.parallel.ExecutionMode; +import org.springframework.beans.factory.annotation.Autowired; + +import java.util.Collections; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +@IntegrationTest +@Execution(ExecutionMode.CONCURRENT) +class CancellationIntegrationTest { + @Autowired + private DevServerComponent devServer; + + @Autowired + private Inngest client; + + @Test + void testCancelOnEventOnly() throws Exception { + String event = InngestFunctionTestHelpers.sendEvent(client, "test/cancelable").getIds()[0]; + Thread.sleep(1000); + InngestFunctionTestHelpers.sendEvent(client, "cancel/cancelable"); + Thread.sleep(1000); + + RunEntry run = devServer.runsByEvent(event).first(); + + assertEquals("Cancelled", run.getStatus()); + } + + @Test + void testCancelOnIf() throws Exception { + String user23Event = InngestFunctionTestHelpers.sendEvent(client, "test/cancel-on-match", Collections.singletonMap("userId", "23")).getIds()[0]; + String user42Event = InngestFunctionTestHelpers.sendEvent(client, "test/cancel-on-match", Collections.singletonMap("userId", "42")).getIds()[0]; + Thread.sleep(1000); + InngestFunctionTestHelpers.sendEvent(client, "cancel/cancel-on-match", Collections.singletonMap("userId", "42")); + Thread.sleep(1000); + + // Only the event matching the if expression is canceled + assertEquals("Running", devServer.runsByEvent(user23Event).first().getStatus()); + assertEquals("Cancelled", devServer.runsByEvent(user42Event).first().getStatus()); + } +} diff --git a/inngest-spring-boot-demo/src/test/java/com/inngest/springbootdemo/DemoTestConfiguration.java b/inngest-spring-boot-demo/src/test/java/com/inngest/springbootdemo/DemoTestConfiguration.java index 707a05c8..046606ac 100644 --- a/inngest-spring-boot-demo/src/test/java/com/inngest/springbootdemo/DemoTestConfiguration.java +++ b/inngest-spring-boot-demo/src/test/java/com/inngest/springbootdemo/DemoTestConfiguration.java @@ -32,6 +32,8 @@ protected HashMap functions() { addInngestFunction(functions, new MultiplyMatrixFunction()); addInngestFunction(functions, new WithOnFailureFunction()); addInngestFunction(functions, new LoopFunction()); + addInngestFunction(functions, new CancelOnEventFunction()); + addInngestFunction(functions, new CancelOnMatchFunction()); return functions; } diff --git a/inngest/src/main/kotlin/com/inngest/Function.kt b/inngest/src/main/kotlin/com/inngest/Function.kt index 0a1a01a8..40791afd 100644 --- a/inngest/src/main/kotlin/com/inngest/Function.kt +++ b/inngest/src/main/kotlin/com/inngest/Function.kt @@ -89,6 +89,8 @@ internal class InternalFunctionConfig @Json(serializeNull = false) val idempotency: String? = null, @Json(serializeNull = false) + val cancel: MutableList? = null, + @Json(serializeNull = false) val batchEvents: BatchEvents? = null, val steps: Map, ) diff --git a/inngest/src/main/kotlin/com/inngest/InngestFunctionConfigBuilder.kt b/inngest/src/main/kotlin/com/inngest/InngestFunctionConfigBuilder.kt index 5c8c0d09..ff979e37 100644 --- a/inngest/src/main/kotlin/com/inngest/InngestFunctionConfigBuilder.kt +++ b/inngest/src/main/kotlin/com/inngest/InngestFunctionConfigBuilder.kt @@ -5,6 +5,7 @@ import com.beust.klaxon.Json import com.beust.klaxon.JsonValue import com.beust.klaxon.KlaxonException import java.time.Duration +import java.time.Instant // TODO: Throw illegal argument exception class InngestFunctionConfigBuilder { @@ -18,6 +19,7 @@ class InngestFunctionConfigBuilder { private var debounce: Debounce? = null private var priority: Priority? = null private var idempotency: String? = null + private var cancel: MutableList? = null private var batchEvents: BatchEvents? = null /** @@ -84,6 +86,68 @@ class InngestFunctionConfigBuilder { return this } + /** + * Define events that can be used to cancel a running or sleeping function + * + * @param event The name of the event that should cancel the function run. + * @param if The CEL expression that must evaluate to true in order to cancel the function run. There + * are two variables available in this expression: + * - event, referencing the original function's event trigger + * - async, referencing the new cancel event. + * @param timeout An optional timeout specified as a Duration that the cancel is valid for. If this isn't + * specified, cancellation triggers are valid for up to a year or until the + * function ends. + */ + @JvmOverloads // Can only overload one of the cancelOn signatures because they would clash and not compile otherwise + fun cancelOn( + event: String, + `if`: String? = null, + timeout: Duration? = null, + ): InngestFunctionConfigBuilder { + return cancelOn( + Cancellation( + event, + `if`, + timeout?.let { durationConverter.toJson(it) }, + ), + ) + } + + /** + * Define events that can be used to cancel a running or sleeping function + * + * @param event The name of the event that should cancel the function run. + * @param if The CEL expression that must evaluate to true in order to cancel the function run. There + * are two variables available in this expression: + * - event, referencing the original function's event trigger + * - async, referencing the new cancel event. + * @param timeout An optional timeout specified as an Instant that the cancel is valid until. If this isn't + * specified, cancellation triggers are valid for up to a year or until the + * function ends. + */ + fun cancelOn( + event: String, + `if`: String? = null, + timeout: Instant? = null, + ): InngestFunctionConfigBuilder { + return cancelOn( + Cancellation( + event, + `if`, + timeout?.let { timeout.toString() }, + ), + ) + } + + internal fun cancelOn(cancellation: Cancellation): InngestFunctionConfigBuilder = + apply { + if (this.cancel == null) { + this.cancel = mutableListOf(cancellation) + } else { + this.cancel!!.add(cancellation) + } + } + /** * Configure the function to be executed with batches of events (1 to n). * Events will be added into a batch until the maxSize has been reached or @@ -261,6 +325,7 @@ class InngestFunctionConfigBuilder { debounce, priority, idempotency, + cancel, batchEvents, steps = buildSteps(serverUrl), ) @@ -357,6 +422,16 @@ internal data class Priority val run: String, ) +internal data class Cancellation + @JvmOverloads + constructor( + val event: String, + @Json(serializeNull = false) + val `if`: String? = null, + @Json(serializeNull = false) + val timeout: String? = null, + ) + internal data class BatchEvents @JvmOverloads constructor( diff --git a/inngest/src/main/kotlin/com/inngest/InngestFunctionTriggers.kt b/inngest/src/main/kotlin/com/inngest/InngestFunctionTriggers.kt index fbe0913d..6dcd5a8e 100644 --- a/inngest/src/main/kotlin/com/inngest/InngestFunctionTriggers.kt +++ b/inngest/src/main/kotlin/com/inngest/InngestFunctionTriggers.kt @@ -11,7 +11,6 @@ abstract class InngestFunctionTrigger // or interface or data class @Json(serializeNull = false) val event: String? = null, @Json(serializeNull = false, name = "expression") val `if`: String? = null, @Json(serializeNull = false) val cron: String? = null, - // IDEA - Add timeout and re-use for cancelOn? ) /** diff --git a/inngest/src/test/kotlin/com/inngest/InngestFunctionConfigBuilderTest.kt b/inngest/src/test/kotlin/com/inngest/InngestFunctionConfigBuilderTest.kt index 05b4f37d..1bbbb561 100644 --- a/inngest/src/test/kotlin/com/inngest/InngestFunctionConfigBuilderTest.kt +++ b/inngest/src/test/kotlin/com/inngest/InngestFunctionConfigBuilderTest.kt @@ -1,5 +1,7 @@ package com.inngest +import java.time.Duration +import java.time.Instant import kotlin.test.Test import kotlin.test.assertEquals import kotlin.test.assertFailsWith @@ -85,4 +87,23 @@ class InngestFunctionConfigBuilderTest { .build("app-id", "https://mysite.com/api/inngest") } } + + @Test + fun testCancelOnTimeout() { + val durationConfig = + InngestFunctionConfigBuilder() + .id("test-id") + .cancelOn("cancel", null, Duration.ofSeconds(6000)) + .build("app-id", "https://mysite.com/api/inngest") + + assertEquals?>(listOf(Cancellation("cancel", null, "\"6000s\"")), durationConfig.cancel) + + val instantConfig = + InngestFunctionConfigBuilder() + .id("test-id") + .cancelOn("cancel", null, Instant.ofEpochSecond(1726056053)) + .build("app-id", "https://mysite.com/api/inngest") + + assertEquals?>(listOf(Cancellation("cancel", null, "2024-09-11T12:00:53Z")), instantConfig.cancel) + } }