Skip to content

Commit

Permalink
Implement rateLimit configuration (#84)
Browse files Browse the repository at this point in the history
Very similar to throttle implementation in #66

The test timings need to account for GCRA buckets https://www.inngest.com/docs/guides/rate-limiting#how-rate-limiting-works
  • Loading branch information
albertchae authored Sep 13, 2024
1 parent befe29b commit 2e01336
Show file tree
Hide file tree
Showing 5 changed files with 104 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package com.inngest.springbootdemo.testfunctions;

import com.inngest.FunctionContext;
import com.inngest.InngestFunction;
import com.inngest.InngestFunctionConfigBuilder;
import com.inngest.Step;
import org.jetbrains.annotations.NotNull;

import java.time.Duration;

public class RateLimitedFunction extends InngestFunction {

@NotNull
@Override
public InngestFunctionConfigBuilder config(InngestFunctionConfigBuilder builder) {
return builder
.id("RateLimitedFunction")
.name("RateLimited Function")
.triggerEvent("test/rateLimit")
.rateLimit(2, Duration.ofSeconds(6));
}

@Override
public Integer execute(FunctionContext ctx, Step step) {
return step.run("result", () -> 42, Integer.class);
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ protected HashMap<String, InngestFunction> functions() {
addInngestFunction(functions, new InvokeFailureFunction());
addInngestFunction(functions, new TryCatchRunFunction());
addInngestFunction(functions, new ThrottledFunction());
addInngestFunction(functions, new RateLimitedFunction());
addInngestFunction(functions, new DebouncedFunction());
addInngestFunction(functions, new PriorityFunction());
addInngestFunction(functions, new IdempotentFunction());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package com.inngest.springbootdemo;

import com.inngest.Inngest;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.parallel.Execution;
import org.junit.jupiter.api.parallel.ExecutionMode;
import org.springframework.beans.factory.annotation.Autowired;

import static org.junit.jupiter.api.Assertions.assertEquals;

@IntegrationTest
@Execution(ExecutionMode.CONCURRENT)
class RateLimitedFunctionIntegrationTest {
@Autowired
private DevServerComponent devServer;

@Autowired
private Inngest client;

@Test
void testFunctionIsRateLimited() throws Exception {
String event1 = InngestFunctionTestHelpers.sendEvent(client, "test/rateLimit").getIds()[0];
// Rate limit test function is limited to 2 over 6 seconds. Based on the simplistic description of GCRA in
// https://www.inngest.com/docs/guides/rate-limiting#how-rate-limiting-works
// we need to sleep at least 3 seconds here for the second event not to get rate limited
Thread.sleep(3500);
String event2 = InngestFunctionTestHelpers.sendEvent(client, "test/rateLimit").getIds()[0];
Thread.sleep(1000);
String event3 = InngestFunctionTestHelpers.sendEvent(client, "test/rateLimit").getIds()[0];

// Sleep at least 6 seconds for the rate limit bucket to be completely cleared
Thread.sleep(6000);

// Rate limit should only allow the first 2 events to run
assertEquals("Completed", devServer.runsByEvent(event1).first().getStatus());
assertEquals("Completed", devServer.runsByEvent(event2).first().getStatus());
assertEquals(0, devServer.runsByEvent(event3).data.length);

// new event after the rate limit period will run, but the previously skipped event will stay skipped
String event4 = InngestFunctionTestHelpers.sendEvent(client, "test/rateLimit").getIds()[0];
Thread.sleep(4000);

assertEquals(0, devServer.runsByEvent(event3).data.length);
assertEquals("Completed", devServer.runsByEvent(event4).first().getStatus());
}
}
2 changes: 2 additions & 0 deletions inngest/src/main/kotlin/com/inngest/Function.kt
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ internal class InternalFunctionConfig
@Json(serializeNull = false)
val throttle: Throttle? = null,
@Json(serializeNull = false)
val rateLimit: RateLimit? = null,
@Json(serializeNull = false)
val debounce: Debounce? = null,
@Json(serializeNull = false)
val priority: Priority? = null,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ class InngestFunctionConfigBuilder {
private var concurrency: MutableList<Concurrency>? = null
private var retries = 3
private var throttle: Throttle? = null
private var rateLimit: RateLimit? = null
private var debounce: Debounce? = null
private var priority: Priority? = null
private var idempotency: String? = null
Expand Down Expand Up @@ -164,6 +165,21 @@ class InngestFunctionConfigBuilder {
burst: Int? = null,
): InngestFunctionConfigBuilder = apply { this.throttle = Throttle(limit, period, key, burst) }

/**
* Configure function rate limit
*
* @param limit The number of times to allow the function to run per the given `period`.
* @param period The period of time to allow the function to run `limit` times. The period begins when the first matching event
* is received
* @param key An optional expression to use for rate limiting, similar to idempotency.
*/
@JvmOverloads
fun rateLimit(
limit: Int,
period: Duration,
key: String? = null,
): InngestFunctionConfigBuilder = apply { this.rateLimit = RateLimit(limit, period, key) }

/**
* Debounce delays functions for the `period` specified. If an event is sent,
* the function will not run until at least `period` has elapsed.
Expand Down Expand Up @@ -241,6 +257,7 @@ class InngestFunctionConfigBuilder {
triggers,
concurrency,
throttle,
rateLimit,
debounce,
priority,
idempotency,
Expand Down Expand Up @@ -313,6 +330,16 @@ internal data class Throttle
val burst: Int? = null,
)

internal data class RateLimit
@JvmOverloads
constructor(
val limit: Int,
@KlaxonDuration
val period: Duration,
@Json(serializeNull = false)
val key: String? = null,
)

internal data class Debounce
@JvmOverloads
constructor(
Expand Down

0 comments on commit 2e01336

Please sign in to comment.