Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package com.inngest.springbootdemo.testfunctions;

import com.inngest.FunctionContext;
import com.inngest.InngestFunction;
import com.inngest.InngestFunctionConfigBuilder;
import com.inngest.Step;
import org.jetbrains.annotations.NotNull;

public class CancelOnEventFunction extends InngestFunction {

@NotNull
@Override
public InngestFunctionConfigBuilder config(InngestFunctionConfigBuilder builder) {
return builder
.id("cancelable-fn")
.name("Cancelable Function")
.cancelOn("cancel/cancelable")
.triggerEvent("test/cancelable");
}

@Override
public String execute(FunctionContext ctx, Step step) {
step.waitForEvent("wait-forever",
"test/waiting-for-godot",
"10m",
null);

return "I didn't get canceled";
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package com.inngest.springbootdemo.testfunctions;

import com.inngest.FunctionContext;
import com.inngest.InngestFunction;
import com.inngest.InngestFunctionConfigBuilder;
import com.inngest.Step;
import org.jetbrains.annotations.NotNull;


public class CancelOnMatchFunction extends InngestFunction {

@NotNull
@Override
public InngestFunctionConfigBuilder config(InngestFunctionConfigBuilder builder) {
return builder
.id("cancel-on-match-fn")
.name("Cancel On Match Function")
.cancelOn("cancel/cancel-on-match", "event.data.userId == async.data.userId")
.triggerEvent("test/cancel-on-match");
}

@Override
public String execute(FunctionContext ctx, Step step) {
step.waitForEvent("wait-forever",
"test/waiting-for-godot",
"10m",
null);

return "I didn't get canceled";
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package com.inngest.springbootdemo;

import com.inngest.Inngest;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.parallel.Execution;
import org.junit.jupiter.api.parallel.ExecutionMode;
import org.springframework.beans.factory.annotation.Autowired;

import java.util.Collections;

import static org.junit.jupiter.api.Assertions.assertEquals;

@IntegrationTest
@Execution(ExecutionMode.CONCURRENT)
class CancellationIntegrationTest {
@Autowired
private DevServerComponent devServer;

@Autowired
private Inngest client;

@Test
void testCancelOnEventOnly() throws Exception {
String event = InngestFunctionTestHelpers.sendEvent(client, "test/cancelable").getIds()[0];
Thread.sleep(1000);
InngestFunctionTestHelpers.sendEvent(client, "cancel/cancelable");
Thread.sleep(1000);

RunEntry<Object> run = devServer.runsByEvent(event).first();

assertEquals("Cancelled", run.getStatus());
}

@Test
void testCancelOnIf() throws Exception {
String user23Event = InngestFunctionTestHelpers.sendEvent(client, "test/cancel-on-match", Collections.singletonMap("userId", "23")).getIds()[0];
String user42Event = InngestFunctionTestHelpers.sendEvent(client, "test/cancel-on-match", Collections.singletonMap("userId", "42")).getIds()[0];
Thread.sleep(1000);
InngestFunctionTestHelpers.sendEvent(client, "cancel/cancel-on-match", Collections.singletonMap("userId", "42"));
Thread.sleep(1000);

// Only the event matching the if expression is canceled
assertEquals("Running", devServer.runsByEvent(user23Event).first().getStatus());
assertEquals("Cancelled", devServer.runsByEvent(user42Event).first().getStatus());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ protected HashMap<String, InngestFunction> functions() {
addInngestFunction(functions, new MultiplyMatrixFunction());
addInngestFunction(functions, new WithOnFailureFunction());
addInngestFunction(functions, new LoopFunction());
addInngestFunction(functions, new CancelOnEventFunction());
addInngestFunction(functions, new CancelOnMatchFunction());

return functions;
}
Expand Down
2 changes: 2 additions & 0 deletions inngest/src/main/kotlin/com/inngest/Function.kt
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,8 @@ internal class InternalFunctionConfig
@Json(serializeNull = false)
val idempotency: String? = null,
@Json(serializeNull = false)
val cancel: MutableList<Cancellation>? = null,
@Json(serializeNull = false)
val batchEvents: BatchEvents? = null,
val steps: Map<String, StepConfig>,
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import com.beust.klaxon.Json
import com.beust.klaxon.JsonValue
import com.beust.klaxon.KlaxonException
import java.time.Duration
import java.time.Instant

// TODO: Throw illegal argument exception
class InngestFunctionConfigBuilder {
Expand All @@ -18,6 +19,7 @@ class InngestFunctionConfigBuilder {
private var debounce: Debounce? = null
private var priority: Priority? = null
private var idempotency: String? = null
private var cancel: MutableList<Cancellation>? = null
private var batchEvents: BatchEvents? = null

/**
Expand Down Expand Up @@ -84,6 +86,68 @@ class InngestFunctionConfigBuilder {
return this
}

/**
* Define events that can be used to cancel a running or sleeping function
*
* @param event The name of the event that should cancel the function run.
* @param if The CEL expression that must evaluate to true in order to cancel the function run. There
* are two variables available in this expression:
* - event, referencing the original function's event trigger
* - async, referencing the new cancel event.
* @param timeout An optional timeout specified as a Duration that the cancel is valid for. If this isn't
* specified, cancellation triggers are valid for up to a year or until the
* function ends.
*/
@JvmOverloads // Can only overload one of the cancelOn signatures because they would clash and not compile otherwise
fun cancelOn(
event: String,
`if`: String? = null,
timeout: Duration? = null,
): InngestFunctionConfigBuilder {
return cancelOn(
Cancellation(
event,
`if`,
timeout?.let { durationConverter.toJson(it) },
),
)
}

/**
* Define events that can be used to cancel a running or sleeping function
*
* @param event The name of the event that should cancel the function run.
* @param if The CEL expression that must evaluate to true in order to cancel the function run. There
* are two variables available in this expression:
* - event, referencing the original function's event trigger
* - async, referencing the new cancel event.
* @param timeout An optional timeout specified as an Instant that the cancel is valid until. If this isn't
* specified, cancellation triggers are valid for up to a year or until the
* function ends.
*/
fun cancelOn(
event: String,
`if`: String? = null,
timeout: Instant? = null,
): InngestFunctionConfigBuilder {
return cancelOn(
Cancellation(
event,
`if`,
timeout?.let { timeout.toString() },
),
)
}

internal fun cancelOn(cancellation: Cancellation): InngestFunctionConfigBuilder =
apply {
if (this.cancel == null) {
this.cancel = mutableListOf(cancellation)
} else {
this.cancel!!.add(cancellation)
}
}

/**
* Configure the function to be executed with batches of events (1 to n).
* Events will be added into a batch until the maxSize has been reached or
Expand Down Expand Up @@ -261,6 +325,7 @@ class InngestFunctionConfigBuilder {
debounce,
priority,
idempotency,
cancel,
batchEvents,
steps = buildSteps(serverUrl),
)
Expand Down Expand Up @@ -357,6 +422,16 @@ internal data class Priority
val run: String,
)

internal data class Cancellation
@JvmOverloads
constructor(
val event: String,
@Json(serializeNull = false)
val `if`: String? = null,
@Json(serializeNull = false)
val timeout: String? = null,
)

internal data class BatchEvents
@JvmOverloads
constructor(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ abstract class InngestFunctionTrigger // or interface or data class
@Json(serializeNull = false) val event: String? = null,
@Json(serializeNull = false, name = "expression") val `if`: String? = null,
@Json(serializeNull = false) val cron: String? = null,
// IDEA - Add timeout and re-use for cancelOn?
)

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package com.inngest

import java.time.Duration
import java.time.Instant
import kotlin.test.Test
import kotlin.test.assertEquals
import kotlin.test.assertFailsWith
Expand Down Expand Up @@ -85,4 +87,23 @@ class InngestFunctionConfigBuilderTest {
.build("app-id", "https://mysite.com/api/inngest")
}
}

@Test
fun testCancelOnTimeout() {
val durationConfig =
InngestFunctionConfigBuilder()
.id("test-id")
.cancelOn("cancel", null, Duration.ofSeconds(6000))
.build("app-id", "https://mysite.com/api/inngest")

assertEquals<List<Cancellation>?>(listOf(Cancellation("cancel", null, "\"6000s\"")), durationConfig.cancel)
Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The klaxon serializer for this adds inner quotes to the value

override fun toJson(value: Any): String = """"${(value as Duration).seconds}s""""
. I tried removing them and several tests broke, so I went with escaping them here.


val instantConfig =
InngestFunctionConfigBuilder()
.id("test-id")
.cancelOn("cancel", null, Instant.ofEpochSecond(1726056053))
.build("app-id", "https://mysite.com/api/inngest")

assertEquals<List<Cancellation>?>(listOf(Cancellation("cancel", null, "2024-09-11T12:00:53Z")), instantConfig.cancel)
}
}