Skip to content

Commit

Permalink
Implement rateLimit configuration
Browse files Browse the repository at this point in the history
Very similar to throttle implementation in #66
  • Loading branch information
albertchae committed Sep 11, 2024
1 parent bf5da4f commit c925bd6
Show file tree
Hide file tree
Showing 5 changed files with 100 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package com.inngest.springbootdemo.testfunctions;

import com.inngest.FunctionContext;
import com.inngest.InngestFunction;
import com.inngest.InngestFunctionConfigBuilder;
import com.inngest.Step;
import org.jetbrains.annotations.NotNull;

import java.time.Duration;

public class RateLimitedFunction extends InngestFunction {

@NotNull
@Override
public InngestFunctionConfigBuilder config(InngestFunctionConfigBuilder builder) {
return builder
.id("RateLimitedFunction")
.name("RateLimited Function")
.triggerEvent("test/rateLimit")
.rateLimit(3, Duration.ofSeconds(2));
}

@Override
public Integer execute(FunctionContext ctx, Step step) {
return step.run("result", () -> 42, Integer.class);
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ protected HashMap<String, InngestFunction> functions() {
addInngestFunction(functions, new InvokeFailureFunction());
addInngestFunction(functions, new TryCatchRunFunction());
addInngestFunction(functions, new ThrottledFunction());
addInngestFunction(functions, new RateLimitedFunction());
addInngestFunction(functions, new DebouncedFunction());
addInngestFunction(functions, new PriorityFunction());
addInngestFunction(functions, new IdempotentFunction());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package com.inngest.springbootdemo;

import com.inngest.Inngest;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.parallel.Execution;
import org.junit.jupiter.api.parallel.ExecutionMode;
import org.springframework.beans.factory.annotation.Autowired;

import static org.junit.jupiter.api.Assertions.assertEquals;

@IntegrationTest
@Execution(ExecutionMode.CONCURRENT)
class RateLimitedFunctionIntegrationTest {
@Autowired
private DevServerComponent devServer;

@Autowired
private Inngest client;

@Test
void testFunctionIsRateLimited() throws Exception {
String event1 = InngestFunctionTestHelpers.sendEvent(client, "test/rateLimit").getIds()[0];
Thread.sleep(500);
String event2 = InngestFunctionTestHelpers.sendEvent(client, "test/rateLimit").getIds()[0];
Thread.sleep(500);
String event3 = InngestFunctionTestHelpers.sendEvent(client, "test/rateLimit").getIds()[0];

Thread.sleep(2000);

// Rate limit should only allow the first 2 events to run
assertEquals("Completed", devServer.runsByEvent(event1).first().getStatus());
assertEquals("Completed", devServer.runsByEvent(event2).first().getStatus());
assertEquals(0, devServer.runsByEvent(event3).data.length);

// new event after the rate limit period will run, but the previously skipped event will stay skipped
String event4 = InngestFunctionTestHelpers.sendEvent(client, "test/rateLimit").getIds()[0];
Thread.sleep(2000);

assertEquals(0, devServer.runsByEvent(event3).data.length);
assertEquals("Completed", devServer.runsByEvent(event4).first().getStatus());
}
}
2 changes: 2 additions & 0 deletions inngest/src/main/kotlin/com/inngest/Function.kt
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ internal class InternalFunctionConfig
@Json(serializeNull = false)
val throttle: Throttle? = null,
@Json(serializeNull = false)
val rateLimit: RateLimit? = null,
@Json(serializeNull = false)
val debounce: Debounce? = null,
@Json(serializeNull = false)
val priority: Priority? = null,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ class InngestFunctionConfigBuilder {
private var concurrency: MutableList<Concurrency>? = null
private var retries = 3
private var throttle: Throttle? = null
private var rateLimit: RateLimit? = null
private var debounce: Debounce? = null
private var priority: Priority? = null
private var idempotency: String? = null
Expand Down Expand Up @@ -158,6 +159,21 @@ class InngestFunctionConfigBuilder {
burst: Int? = null,
): InngestFunctionConfigBuilder = apply { this.throttle = Throttle(limit, period, key, burst) }

/**
* Configure function rate limit
*
* @param limit The number of times to allow the function to run per the given `period`.
* @param period The period of time to allow the function to run `limit` times. The period begins when the first matching event
* is received
* @param key An optional expression to use for rate limiting, similar to idempotency.
*/
@JvmOverloads
fun rateLimit(
limit: Int,
period: Duration,
key: String? = null,
): InngestFunctionConfigBuilder = apply { this.rateLimit = RateLimit(limit, period, key) }

/**
* Debounce delays functions for the `period` specified. If an event is sent,
* the function will not run until at least `period` has elapsed.
Expand Down Expand Up @@ -235,6 +251,7 @@ class InngestFunctionConfigBuilder {
triggers,
concurrency,
throttle,
rateLimit,
debounce,
priority,
idempotency,
Expand Down Expand Up @@ -307,6 +324,16 @@ internal data class Throttle
val burst: Int? = null,
)

internal data class RateLimit
@JvmOverloads
constructor(
val limit: Int,
@KlaxonDuration
val period: Duration,
@Json(serializeNull = false)
val key: String? = null,
)

internal data class Debounce
@JvmOverloads
constructor(
Expand Down

0 comments on commit c925bd6

Please sign in to comment.