diff --git a/inngest-spring-boot-demo/src/main/java/com/inngest/springbootdemo/testfunctions/CancelOnEventFunction.java b/inngest-spring-boot-demo/src/main/java/com/inngest/springbootdemo/testfunctions/CancelOnEventFunction.java new file mode 100644 index 00000000..794645f9 --- /dev/null +++ b/inngest-spring-boot-demo/src/main/java/com/inngest/springbootdemo/testfunctions/CancelOnEventFunction.java @@ -0,0 +1,30 @@ +package com.inngest.springbootdemo.testfunctions; + +import com.inngest.FunctionContext; +import com.inngest.InngestFunction; +import com.inngest.InngestFunctionConfigBuilder; +import com.inngest.Step; +import org.jetbrains.annotations.NotNull; + +public class CancelOnEventFunction extends InngestFunction { + + @NotNull + @Override + public InngestFunctionConfigBuilder config(InngestFunctionConfigBuilder builder) { + return builder + .id("cancelable-fn") + .name("Cancelable Function") + .cancelOn("cancel/cancelable") + .triggerEvent("test/cancelable"); + } + + @Override + public String execute(FunctionContext ctx, Step step) { + step.waitForEvent("wait-forever", + "test/waiting-for-godot", + "10m", + null); + + return "I didn't get canceled"; + } +} diff --git a/inngest-spring-boot-demo/src/main/java/com/inngest/springbootdemo/testfunctions/CancelOnMatchFunction.java b/inngest-spring-boot-demo/src/main/java/com/inngest/springbootdemo/testfunctions/CancelOnMatchFunction.java new file mode 100644 index 00000000..4b7ac2e2 --- /dev/null +++ b/inngest-spring-boot-demo/src/main/java/com/inngest/springbootdemo/testfunctions/CancelOnMatchFunction.java @@ -0,0 +1,31 @@ +package com.inngest.springbootdemo.testfunctions; + +import com.inngest.FunctionContext; +import com.inngest.InngestFunction; +import com.inngest.InngestFunctionConfigBuilder; +import com.inngest.Step; +import org.jetbrains.annotations.NotNull; + + +public class CancelOnMatchFunction extends InngestFunction { + + @NotNull + @Override + public InngestFunctionConfigBuilder config(InngestFunctionConfigBuilder builder) { + return builder + .id("cancel-on-match-fn") + .name("Cancel On Match Function") + .cancelOn("cancel/cancel-on-match", "event.data.userId == async.data.userId") + .triggerEvent("test/cancel-on-match"); + } + + @Override + public String execute(FunctionContext ctx, Step step) { + step.waitForEvent("wait-forever", + "test/waiting-for-godot", + "10m", + null); + + return "I didn't get canceled"; + } +} diff --git a/inngest-spring-boot-demo/src/test/java/com/inngest/springbootdemo/CancellationIntegrationTest.java b/inngest-spring-boot-demo/src/test/java/com/inngest/springbootdemo/CancellationIntegrationTest.java new file mode 100644 index 00000000..bc733e05 --- /dev/null +++ b/inngest-spring-boot-demo/src/test/java/com/inngest/springbootdemo/CancellationIntegrationTest.java @@ -0,0 +1,46 @@ +package com.inngest.springbootdemo; + +import com.inngest.Inngest; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.parallel.Execution; +import org.junit.jupiter.api.parallel.ExecutionMode; +import org.springframework.beans.factory.annotation.Autowired; + +import java.util.Collections; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +@IntegrationTest +@Execution(ExecutionMode.CONCURRENT) +class CancellationIntegrationTest { + @Autowired + private DevServerComponent devServer; + + @Autowired + private Inngest client; + + @Test + void testCancelOnEventOnly() throws Exception { + String event = InngestFunctionTestHelpers.sendEvent(client, "test/cancelable").getIds()[0]; + Thread.sleep(1000); + InngestFunctionTestHelpers.sendEvent(client, "cancel/cancelable"); + Thread.sleep(1000); + + RunEntry run = devServer.runsByEvent(event).first(); + + assertEquals("Cancelled", run.getStatus()); + } + + @Test + void testCancelOnIf() throws Exception { + String user23Event = InngestFunctionTestHelpers.sendEvent(client, "test/cancel-on-match", Collections.singletonMap("userId", "23")).getIds()[0]; + String user42Event = InngestFunctionTestHelpers.sendEvent(client, "test/cancel-on-match", Collections.singletonMap("userId", "42")).getIds()[0]; + Thread.sleep(1000); + InngestFunctionTestHelpers.sendEvent(client, "cancel/cancel-on-match", Collections.singletonMap("userId", "42")); + Thread.sleep(1000); + + // Only the event matching the if expression is canceled + assertEquals("Running", devServer.runsByEvent(user23Event).first().getStatus()); + assertEquals("Cancelled", devServer.runsByEvent(user42Event).first().getStatus()); + } +} diff --git a/inngest-spring-boot-demo/src/test/java/com/inngest/springbootdemo/DemoTestConfiguration.java b/inngest-spring-boot-demo/src/test/java/com/inngest/springbootdemo/DemoTestConfiguration.java index 707a05c8..046606ac 100644 --- a/inngest-spring-boot-demo/src/test/java/com/inngest/springbootdemo/DemoTestConfiguration.java +++ b/inngest-spring-boot-demo/src/test/java/com/inngest/springbootdemo/DemoTestConfiguration.java @@ -32,6 +32,8 @@ protected HashMap functions() { addInngestFunction(functions, new MultiplyMatrixFunction()); addInngestFunction(functions, new WithOnFailureFunction()); addInngestFunction(functions, new LoopFunction()); + addInngestFunction(functions, new CancelOnEventFunction()); + addInngestFunction(functions, new CancelOnMatchFunction()); return functions; } diff --git a/inngest/src/main/kotlin/com/inngest/Function.kt b/inngest/src/main/kotlin/com/inngest/Function.kt index 0a1a01a8..40791afd 100644 --- a/inngest/src/main/kotlin/com/inngest/Function.kt +++ b/inngest/src/main/kotlin/com/inngest/Function.kt @@ -89,6 +89,8 @@ internal class InternalFunctionConfig @Json(serializeNull = false) val idempotency: String? = null, @Json(serializeNull = false) + val cancel: MutableList? = null, + @Json(serializeNull = false) val batchEvents: BatchEvents? = null, val steps: Map, ) diff --git a/inngest/src/main/kotlin/com/inngest/InngestFunctionConfigBuilder.kt b/inngest/src/main/kotlin/com/inngest/InngestFunctionConfigBuilder.kt index 5c8c0d09..ff979e37 100644 --- a/inngest/src/main/kotlin/com/inngest/InngestFunctionConfigBuilder.kt +++ b/inngest/src/main/kotlin/com/inngest/InngestFunctionConfigBuilder.kt @@ -5,6 +5,7 @@ import com.beust.klaxon.Json import com.beust.klaxon.JsonValue import com.beust.klaxon.KlaxonException import java.time.Duration +import java.time.Instant // TODO: Throw illegal argument exception class InngestFunctionConfigBuilder { @@ -18,6 +19,7 @@ class InngestFunctionConfigBuilder { private var debounce: Debounce? = null private var priority: Priority? = null private var idempotency: String? = null + private var cancel: MutableList? = null private var batchEvents: BatchEvents? = null /** @@ -84,6 +86,68 @@ class InngestFunctionConfigBuilder { return this } + /** + * Define events that can be used to cancel a running or sleeping function + * + * @param event The name of the event that should cancel the function run. + * @param if The CEL expression that must evaluate to true in order to cancel the function run. There + * are two variables available in this expression: + * - event, referencing the original function's event trigger + * - async, referencing the new cancel event. + * @param timeout An optional timeout specified as a Duration that the cancel is valid for. If this isn't + * specified, cancellation triggers are valid for up to a year or until the + * function ends. + */ + @JvmOverloads // Can only overload one of the cancelOn signatures because they would clash and not compile otherwise + fun cancelOn( + event: String, + `if`: String? = null, + timeout: Duration? = null, + ): InngestFunctionConfigBuilder { + return cancelOn( + Cancellation( + event, + `if`, + timeout?.let { durationConverter.toJson(it) }, + ), + ) + } + + /** + * Define events that can be used to cancel a running or sleeping function + * + * @param event The name of the event that should cancel the function run. + * @param if The CEL expression that must evaluate to true in order to cancel the function run. There + * are two variables available in this expression: + * - event, referencing the original function's event trigger + * - async, referencing the new cancel event. + * @param timeout An optional timeout specified as an Instant that the cancel is valid until. If this isn't + * specified, cancellation triggers are valid for up to a year or until the + * function ends. + */ + fun cancelOn( + event: String, + `if`: String? = null, + timeout: Instant? = null, + ): InngestFunctionConfigBuilder { + return cancelOn( + Cancellation( + event, + `if`, + timeout?.let { timeout.toString() }, + ), + ) + } + + internal fun cancelOn(cancellation: Cancellation): InngestFunctionConfigBuilder = + apply { + if (this.cancel == null) { + this.cancel = mutableListOf(cancellation) + } else { + this.cancel!!.add(cancellation) + } + } + /** * Configure the function to be executed with batches of events (1 to n). * Events will be added into a batch until the maxSize has been reached or @@ -261,6 +325,7 @@ class InngestFunctionConfigBuilder { debounce, priority, idempotency, + cancel, batchEvents, steps = buildSteps(serverUrl), ) @@ -357,6 +422,16 @@ internal data class Priority val run: String, ) +internal data class Cancellation + @JvmOverloads + constructor( + val event: String, + @Json(serializeNull = false) + val `if`: String? = null, + @Json(serializeNull = false) + val timeout: String? = null, + ) + internal data class BatchEvents @JvmOverloads constructor( diff --git a/inngest/src/main/kotlin/com/inngest/InngestFunctionTriggers.kt b/inngest/src/main/kotlin/com/inngest/InngestFunctionTriggers.kt index fbe0913d..6dcd5a8e 100644 --- a/inngest/src/main/kotlin/com/inngest/InngestFunctionTriggers.kt +++ b/inngest/src/main/kotlin/com/inngest/InngestFunctionTriggers.kt @@ -11,7 +11,6 @@ abstract class InngestFunctionTrigger // or interface or data class @Json(serializeNull = false) val event: String? = null, @Json(serializeNull = false, name = "expression") val `if`: String? = null, @Json(serializeNull = false) val cron: String? = null, - // IDEA - Add timeout and re-use for cancelOn? ) /** diff --git a/inngest/src/test/kotlin/com/inngest/InngestFunctionConfigBuilderTest.kt b/inngest/src/test/kotlin/com/inngest/InngestFunctionConfigBuilderTest.kt index 05b4f37d..1bbbb561 100644 --- a/inngest/src/test/kotlin/com/inngest/InngestFunctionConfigBuilderTest.kt +++ b/inngest/src/test/kotlin/com/inngest/InngestFunctionConfigBuilderTest.kt @@ -1,5 +1,7 @@ package com.inngest +import java.time.Duration +import java.time.Instant import kotlin.test.Test import kotlin.test.assertEquals import kotlin.test.assertFailsWith @@ -85,4 +87,23 @@ class InngestFunctionConfigBuilderTest { .build("app-id", "https://mysite.com/api/inngest") } } + + @Test + fun testCancelOnTimeout() { + val durationConfig = + InngestFunctionConfigBuilder() + .id("test-id") + .cancelOn("cancel", null, Duration.ofSeconds(6000)) + .build("app-id", "https://mysite.com/api/inngest") + + assertEquals?>(listOf(Cancellation("cancel", null, "\"6000s\"")), durationConfig.cancel) + + val instantConfig = + InngestFunctionConfigBuilder() + .id("test-id") + .cancelOn("cancel", null, Instant.ofEpochSecond(1726056053)) + .build("app-id", "https://mysite.com/api/inngest") + + assertEquals?>(listOf(Cancellation("cancel", null, "2024-09-11T12:00:53Z")), instantConfig.cancel) + } }