From bf5da4fe94b802426c8f9c2cba7c19908f32c767 Mon Sep 17 00:00:00 2001 From: albertchae <217050+albertchae@users.noreply.github.com> Date: Mon, 9 Sep 2024 21:47:38 -0700 Subject: [PATCH] Add idempotency configuration (#80) Copied doc from https://github.com/inngest/inngest/blob/6594ccb692121bfe62b11676fe002416df13300b/docs/SDK_SPEC.md?plain=1#L534-L538 https://www.inngest.com/docs/guides/handling-idempotency#at-the-function-level-the-consumer The integration test validates idempotency by - checking an observable side effect, in this case incrementing a static counter variable - check that the second run of the same idempotency was ignored by the dev server --- .../testfunctions/IdempotentFunction.java | 25 +++++++++ .../springbootdemo/DemoTestConfiguration.java | 1 + .../IdempotentFunctionIntegrationTest.java | 51 +++++++++++++++++++ .../InngestFunctionTestHelpers.java | 7 ++- .../src/main/kotlin/com/inngest/Function.kt | 3 ++ .../inngest/InngestFunctionConfigBuilder.kt | 12 ++++- 6 files changed, 97 insertions(+), 2 deletions(-) create mode 100644 inngest-spring-boot-demo/src/main/java/com/inngest/springbootdemo/testfunctions/IdempotentFunction.java create mode 100644 inngest-spring-boot-demo/src/test/java/com/inngest/springbootdemo/IdempotentFunctionIntegrationTest.java diff --git a/inngest-spring-boot-demo/src/main/java/com/inngest/springbootdemo/testfunctions/IdempotentFunction.java b/inngest-spring-boot-demo/src/main/java/com/inngest/springbootdemo/testfunctions/IdempotentFunction.java new file mode 100644 index 00000000..ece2d147 --- /dev/null +++ b/inngest-spring-boot-demo/src/main/java/com/inngest/springbootdemo/testfunctions/IdempotentFunction.java @@ -0,0 +1,25 @@ +package com.inngest.springbootdemo.testfunctions; + +import com.inngest.*; +import lombok.Getter; +import org.jetbrains.annotations.NotNull; + +public class IdempotentFunction extends InngestFunction { + + @NotNull + @Override + public InngestFunctionConfigBuilder config(InngestFunctionConfigBuilder builder) { + return builder + .id("idempotent-fn") + .name("Idempotent Function") + .triggerEvent("test/idempotent") + .idempotency("event.data.companyId"); + } + + @Getter + private static int counter = 0; + @Override + public Integer execute(FunctionContext ctx, Step step) { + return step.run("increment-count", () -> counter++, Integer.class); + } +} diff --git a/inngest-spring-boot-demo/src/test/java/com/inngest/springbootdemo/DemoTestConfiguration.java b/inngest-spring-boot-demo/src/test/java/com/inngest/springbootdemo/DemoTestConfiguration.java index c782dc31..d4614f0f 100644 --- a/inngest-spring-boot-demo/src/test/java/com/inngest/springbootdemo/DemoTestConfiguration.java +++ b/inngest-spring-boot-demo/src/test/java/com/inngest/springbootdemo/DemoTestConfiguration.java @@ -26,6 +26,7 @@ protected HashMap functions() { addInngestFunction(functions, new ThrottledFunction()); addInngestFunction(functions, new DebouncedFunction()); addInngestFunction(functions, new PriorityFunction()); + addInngestFunction(functions, new IdempotentFunction()); return functions; } diff --git a/inngest-spring-boot-demo/src/test/java/com/inngest/springbootdemo/IdempotentFunctionIntegrationTest.java b/inngest-spring-boot-demo/src/test/java/com/inngest/springbootdemo/IdempotentFunctionIntegrationTest.java new file mode 100644 index 00000000..21caa47c --- /dev/null +++ b/inngest-spring-boot-demo/src/test/java/com/inngest/springbootdemo/IdempotentFunctionIntegrationTest.java @@ -0,0 +1,51 @@ +package com.inngest.springbootdemo; + +import com.inngest.Inngest; +import com.inngest.springbootdemo.testfunctions.IdempotentFunction; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.parallel.Execution; +import org.junit.jupiter.api.parallel.ExecutionMode; +import org.springframework.beans.factory.annotation.Autowired; + +import java.util.Collections; +import java.util.Map; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +@IntegrationTest +@Execution(ExecutionMode.CONCURRENT) +class IdempotentFunctionIntegrationTest { + @Autowired + private DevServerComponent devServer; + + @Autowired + private Inngest client; + + @Test + void testIdempotencyKey() throws Exception { + Map dataPayload = Collections.singletonMap("companyId", 42); + String eventWithIdempotencyKey = InngestFunctionTestHelpers.sendEvent(client, "test/idempotent", dataPayload).getIds()[0]; + String eventWithSameIdempotencyKey = InngestFunctionTestHelpers.sendEvent(client, "test/idempotent", dataPayload).getIds()[0]; + + Thread.sleep(2000); + + // With the same idempotency key, only one of the events should have run + RunEntry firstRun = devServer.runsByEvent(eventWithIdempotencyKey).first(); + assertEquals(0, devServer.runsByEvent(eventWithSameIdempotencyKey).data.length); + assertEquals("Completed", firstRun.getStatus()); + + // This would be 2 if the function was not idempotent + assertEquals(1, IdempotentFunction.getCounter()); + + + Map differentDataPayload = Collections.singletonMap("companyId", 43); + String eventWithDifferentIdempotencyKey = InngestFunctionTestHelpers.sendEvent(client, "test/idempotent", differentDataPayload).getIds()[0]; + + Thread.sleep(2000); + + // Event with a different idempotency key will run once + RunEntry otherRun = devServer.runsByEvent(eventWithDifferentIdempotencyKey).first(); + assertEquals("Completed", otherRun.getStatus()); + assertEquals(2, IdempotentFunction.getCounter()); + } +} diff --git a/inngest-spring-boot-demo/src/test/java/com/inngest/springbootdemo/InngestFunctionTestHelpers.java b/inngest-spring-boot-demo/src/test/java/com/inngest/springbootdemo/InngestFunctionTestHelpers.java index 8f0f3b6d..cce75134 100644 --- a/inngest-spring-boot-demo/src/test/java/com/inngest/springbootdemo/InngestFunctionTestHelpers.java +++ b/inngest-spring-boot-demo/src/test/java/com/inngest/springbootdemo/InngestFunctionTestHelpers.java @@ -3,12 +3,17 @@ import com.inngest.*; import java.util.HashMap; +import java.util.Map; import java.util.Objects; public class InngestFunctionTestHelpers { static SendEventsResponse sendEvent(Inngest inngest, String eventName) { - InngestEvent event = new InngestEvent(eventName, new HashMap()); + return sendEvent(inngest, eventName, new HashMap()); + } + + static SendEventsResponse sendEvent(Inngest inngest, String eventName, Map data) { + InngestEvent event = new InngestEvent(eventName, data); SendEventsResponse response = inngest.send(event); assert Objects.requireNonNull(response).getIds().length > 0; diff --git a/inngest/src/main/kotlin/com/inngest/Function.kt b/inngest/src/main/kotlin/com/inngest/Function.kt index 92c1538a..f1a83171 100644 --- a/inngest/src/main/kotlin/com/inngest/Function.kt +++ b/inngest/src/main/kotlin/com/inngest/Function.kt @@ -82,8 +82,11 @@ internal class InternalFunctionConfig val throttle: Throttle? = null, @Json(serializeNull = false) val debounce: Debounce? = null, + @Json(serializeNull = false) val priority: Priority? = null, @Json(serializeNull = false) + val idempotency: String? = null, + @Json(serializeNull = false) val batchEvents: BatchEvents? = null, val steps: Map, ) diff --git a/inngest/src/main/kotlin/com/inngest/InngestFunctionConfigBuilder.kt b/inngest/src/main/kotlin/com/inngest/InngestFunctionConfigBuilder.kt index 30e5bf3b..04b480f6 100644 --- a/inngest/src/main/kotlin/com/inngest/InngestFunctionConfigBuilder.kt +++ b/inngest/src/main/kotlin/com/inngest/InngestFunctionConfigBuilder.kt @@ -16,6 +16,7 @@ class InngestFunctionConfigBuilder { private var throttle: Throttle? = null private var debounce: Debounce? = null private var priority: Priority? = null + private var idempotency: String? = null private var batchEvents: BatchEvents? = null /** @@ -182,7 +183,6 @@ class InngestFunctionConfigBuilder { ): InngestFunctionConfigBuilder = apply { this.debounce = Debounce(period, key, timeout) } /** - * * Configure how the priority of a function run is decided when multiple * functions are triggered at the same time. * @@ -194,6 +194,15 @@ class InngestFunctionConfigBuilder { */ fun priority(run: String): InngestFunctionConfigBuilder = apply { this.priority = Priority(run) } + /** + * Allow the specification of an idempotency key using event data. If + * specified, this overrides the `rateLimit` object. + * + * @param idempotencyKey An expression using event payload data for a + * unique string key for idempotency. + */ + fun idempotency(idempotencyKey: String): InngestFunctionConfigBuilder = apply { this.idempotency = idempotencyKey } + private fun buildSteps(serveUrl: String): Map { val scheme = serveUrl.split("://")[0] return mapOf( @@ -228,6 +237,7 @@ class InngestFunctionConfigBuilder { throttle, debounce, priority, + idempotency, batchEvents, steps = buildSteps(serverUrl), )