Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Spring Cloud Gateway - TimeOut Limiter for some reason slowing down / interacting with Rate Limiter #3581

Open
dreamstar-enterprises opened this issue Nov 1, 2024 · 0 comments

Comments

@dreamstar-enterprises
Copy link

dreamstar-enterprises commented Nov 1, 2024

My Rate Limiter is a Global One shown below:

Rate Limiter

@Configuration
internal class RequestRateLimiterConfig(
    private val redisRateLimiter: RedisRateLimiter,
    private val defaultKeyResolver: KeyResolver
) {
    companion object {
        const val RATE_LIMITER_ID = "redis-rate-limiter"  // default key used by Spring Cloud Gateway
    }

    private val logger = LoggerFactory.getLogger(RequestRateLimiterConfig::class.java)

    @Bean
    fun requestRateLimiter(): GlobalFilter {

        return GlobalFilter { exchange, chain ->
            val keyMono = defaultKeyResolver.resolve(exchange)
            keyMono
                .flatMap { key ->
                    if (key.isNullOrEmpty()) {
                        // if key is null or empty, return error
                        logger.warn("Empty session ID detected. Sending error response.")
                        return@flatMap LocalExceptionHandlers.missingKey(exchange)
                    } else {
                        // if key is present, continue with rate limiting
                        logger.info("Resolved key: $key")
                        redisRateLimiter.isAllowed(RATE_LIMITER_ID, key)
                            .flatMap { response ->
                                if (!response.isAllowed) {
                                    logger.warn("Rate limit exceeded for key: $key")
                                    LocalExceptionHandlers.rateLimitExceeded(exchange)
                                } else {
                                    chain.filter(exchange)
                                }
                            }
                    }
                }.then()
        }
    }
}
/**
 * More Rate Limiter configuration
 */
@Configuration
internal class RedisRateLimiterConfig(
    private val sessionProperties: SessionProperties
) {

    private val logger = LoggerFactory.getLogger(RedisRateLimiterConfig::class.java)

    /**
     * Redis Rate Limited
     */
    @Bean
    fun redisRateLimiter(): RedisRateLimiter {
        return RedisRateLimiter(10, 20, 1)
    }

    /**
     * Default Key Resolver
     */
    @Bean
    fun defaultKeyResolver(): KeyResolver {
        return KeyResolver { exchange: ServerWebExchange ->
            val sessionId = exchange.request.cookies[sessionProperties.SESSION_COOKIE_NAME]?.first()?.value
            if (sessionId.isNullOrBlank()) {
                logger.warn("No session ID found in cookie.")
                Mono.just("")
            } else {
                logger.info("Resolved session ID for Rate Limiting: $sessionId")
                Mono.justOrEmpty(sessionId)
            }
        }
    }

}**

Load Test 1

When I do an Apache Benchmark Load test, I get the following back (as expected)

ab -v 4 -n 25 -c 1 -H 'Cookie: BFF-SESSIONID=BFF-3f3400dd34c22190cebfc18ff8eda96e7d248ea3ef10e52e8a95c4df41a37e14-162166622840154-47453-c244a0f4-9fcc-44ad-b6d8-fdd7aa965d5c' http://127.0.0.1:9090/bff/api/v1/resource/contracts/hello

Concurrency Level:      1
Time taken for tests:   2.429 seconds
Complete requests:      25
Failed requests:        0
Total transferred:      20100 bytes
HTML transferred:       325 bytes
Requests per second:    10.29 [#/sec] (mean)
Time per request:       97.149 [ms] (mean)
Time per request:       97.149 [ms] (mean, across all concurrent requests)
Transfer rate:          8.08 [Kbytes/sec] received

Time Limiter (No. 4 in the below)

However when I add a TimeLimiter to my router, I get very different results (I've isolated this being due to the TimeLimiter)

@Configuration
internal class RoutingConfig(
    private val serverProperties: ServerProperties,
    private val ignoreFilter: IgnoreFilterConfig,
) {

    private val logger = LoggerFactory.getLogger(RoutingConfig::class.java)

    @Bean
    fun routeLocator(
        builder: RouteLocatorBuilder,
        tokenRelayGatewayFilterFactory: TokenRelayGatewayFilterFactory,
        timeLimiterRegistry: TimeLimiterRegistry,
    ): RouteLocator {
        return builder.routes()

            // routing for Resource Server
            .route("resource-server") { r ->
                r.path("${serverProperties.resourceServerPrefix}/**")
                    .filters { f ->

                        // 1. Token relay filter first for authentication
                        f.filter(tokenRelayGatewayFilterFactory.apply())

                        // 2. Circuit Breaker before retry
//                        f.circuitBreaker { circuitBreakerConfig ->
//                            circuitBreakerConfig.setName("resourceServerCircuitBreaker")
//                            circuitBreakerConfig.setFallbackUri("forward:/fallback")
//                            circuitBreakerConfig.setStatusCodes(
//                                setOf(
//                                    HttpStatus.INTERNAL_SERVER_ERROR.value().toString(),            // 500
//                                    HttpStatus.NOT_IMPLEMENTED.value().toString(),                  // 501
//                                    HttpStatus.BAD_GATEWAY.value().toString(),                      // 502
//                                    HttpStatus.SERVICE_UNAVAILABLE.value().toString(),              // 503
//                                    HttpStatus.GATEWAY_TIMEOUT.value().toString(),                  // 504
//                                    HttpStatus.HTTP_VERSION_NOT_SUPPORTED.value().toString(),       // 505
//                                    HttpStatus.VARIANT_ALSO_NEGOTIATES.value().toString(),          // 506
//                                    HttpStatus.INSUFFICIENT_STORAGE.value().toString(),             // 507
//                                    HttpStatus.LOOP_DETECTED.value().toString(),                    // 508
//                                    HttpStatus.BANDWIDTH_LIMIT_EXCEEDED.value().toString(),         // 509
//                                    HttpStatus.NOT_EXTENDED.value().toString(),                     // 510
//                                    HttpStatus.NETWORK_AUTHENTICATION_REQUIRED.value().toString()   // 511
//                                )
//                            )
//                        }

                        // 3. Retry filter before timeout to allow retries
                        f.retry { retryConfig ->
                            retryConfig.retries = 3
                            retryConfig.setMethods(HttpMethod.GET)
                            retryConfig.setBackoff(
                                Duration.ofMillis(100),
                                Duration.ofMillis(1000),
                                2,
                                true
                            )
                            // add status codes that should trigger retry
                            retryConfig.setStatuses(
                                HttpStatus.INTERNAL_SERVER_ERROR,           // 500
                                HttpStatus.NOT_IMPLEMENTED,                 // 501
                                HttpStatus.BAD_GATEWAY,                     // 502
                                HttpStatus.SERVICE_UNAVAILABLE,             // 503
                                // HttpStatus.GATEWAY_TIMEOUT,              // 504
                                HttpStatus.HTTP_VERSION_NOT_SUPPORTED,      // 505
                                HttpStatus.VARIANT_ALSO_NEGOTIATES,         // 506
                                HttpStatus.INSUFFICIENT_STORAGE,            // 507
                                HttpStatus.LOOP_DETECTED,                   // 508
                                HttpStatus.BANDWIDTH_LIMIT_EXCEEDED,        // 509
                                HttpStatus.NOT_EXTENDED,                    // 510
                                HttpStatus.NETWORK_AUTHENTICATION_REQUIRED  // 511
                            )
                            // add exception types that should trigger retry
                            retryConfig.setExceptions(
                                IOException::class.java,
//                                TimeoutException::class.java,
                                ConnectException::class.java
                            )

                            retryConfig.validate()
                        }

                        f.filter { exchange, chain ->
                            val retryCount = exchange.getAttribute<Int>("retry_count") ?: 0
                            exchange.attributes["retry_count"] = retryCount + 1
                            logger.warn("Request attempt ${retryCount + 1} for ${exchange.request.uri}")
                            chain.filter(exchange)
                        }

                        // 4. Time limiter after retry
                        f.filter { exchange, chain ->

                            // exclude specific static resources from time-out limiter
                            val requestPath = exchange.request.uri.path
                            if (ignoreFilter.shouldSkipRequestPath(requestPath)) {
                                // allow the request to proceed without the time limiter
                                return@filter chain.filter(exchange)
                            }

                            val timeLimiter = timeLimiterRegistry.timeLimiter("resourceServerTimeLimiter")
                            val startTime = System.currentTimeMillis()

                            val futureSupplier = Supplier {
                                chain.filter(exchange)
                                .doOnSuccess {
                                    val duration = System.currentTimeMillis() - startTime
                                    logger.debug("Request completed within time limit for path: $requestPath, duration: ${duration}ms")
                                }
                                .doOnCancel {
                                    // Mark exchange as completed on cancellation
                                    exchange.attributes["cancelled"] = true
                                }
                                .toFuture()
                            }

                            Mono.fromCallable(timeLimiter.decorateFutureSupplier(futureSupplier))
                                .onErrorResume { throwable ->
                                    when (throwable) {
                                        is TimeoutException -> {
                                            val duration = System.currentTimeMillis() - startTime
                                            logger.warn("Request timed out for path: $requestPath, duration: ${duration}ms")

                                            // Mark exchange as completed
                                            exchange.attributes["completed"] = true

                                            // First set the status code
                                            Mono.defer {
                                                LocalExceptionHandlers.timeout(exchange)
                                            }
                                        }
                                        is CancellationException -> {
                                            logger.warn("Request cancelled for path: $requestPath")

                                            // Mark exchange as completed
                                            exchange.attributes["completed"] = true

                                            // First set the status code
                                            Mono.defer {
                                                LocalExceptionHandlers.timeout(exchange)
                                            }
                                        }
                                        else -> {
                                            logger.error("Unexpected error for path: $requestPath", throwable)
                                            if (exchange.attributes["completed"] == true ||
                                                exchange.attributes["cancelled"] == true) {
                                                Mono.empty()
                                            } else {
                                                logger.error("Unexpected error for path: $requestPath", throwable)
                                                Mono.error(throwable)
                                            }
                                        }
                                    }
                                }
                                .doOnError { throwable ->
                                    if (throwable !is TimeoutException &&
                                        throwable !is CancellationException &&
                                        exchange.attributes["completed"] != true &&
                                        exchange.attributes["cancelled"] != true) {
                                        logger.error("Unhandled error for path: $requestPath", throwable)
                                    }
                                }
                        }

                        // 5. Basic request cleanup
                        f.removeRequestHeader("Cookie")
                    }
                    .uri(serverProperties.resourceServerUri)
            }
            .build()
    }

}

Load Test 2

Then when I run the Apache Benchmarking test (here concurrency is set to 10, but I get the same when it is set to 1)
ab -v 4 -n 25 -c 10 -H 'Cookie: BFF-SESSIONID=BFF-3f3400dd34c22190cebfc18ff8eda96e7d248ea3ef10e52e8a95c4df41a37e14-162166622840154-47453-c244a0f4-9fcc-44ad-b6d8-fdd7aa965d5c' http://127.0.0.1:9090/bff/api/v1/resource/contracts/hello

I get this:


Concurrency Level:      10
Time taken for tests:   21.616 seconds
Complete requests:      25
Failed requests:        23
   (Connect: 0, Receive: 0, Length: 23, Exceptions: 0)
Total transferred:      21134 bytes
HTML transferred:       8167 bytes
Requests per second:    1.16 [#/sec] (mean)
Time per request:       8646.540 [ms] (mean)
Time per request:       864.654 [ms] (mean, across all concurrent requests)
Transfer rate:          0.95 [Kbytes/sec] received

I can't figure out where in the TimeLimiter I am going wrong. Is it to do wit Kotlin, and not using a flow or Co-routine?

I spent 3 hours trying to debug this yesterday, but made no progress.

Commenting out the TimeLimiter code, I get back to the original, much faster benchmark results

Can someone please help?

LocalErrorHandlers

For reference here are my LocalExceptionHandlers (just a container for returning custom errors)

/**********************************************************************************************************************/
/*********************************************** LOCAL EXCEPTION HANDLERS *********************************************/
/**********************************************************************************************************************/

internal object LocalExceptionHandlers {

/**********************************************************************************************************************/
/* RATE LIMITER - MISSING KEY ERROR. */
/**********************************************************************************************************************/

    /**
     * Missing Key error response to handle missing key scenarios
     */
    internal fun missingKey(
        exchange: ServerWebExchange,
    ): Mono<Void> {

        val response = exchange.response
        response.statusCode = HttpStatus.BAD_REQUEST
        response.headers.contentType = MediaType.APPLICATION_JSON
        response.headers.add("Rate-Limiting-Exception", "Key not resolved")

        // create ThrottlingException
        val throttlingException = ThrottlingException(
            type = URI.create(URIErrorTypes.BAD_REQUEST.type),
            httpStatus = HttpStatus.BAD_REQUEST,
            title = "Rate Limiting Exception",
            message = "Missing key",
            cause = null,
            ErrorCodeTypes.THROTTLING_EXCEPTION.code,
            ErrorCategoryTypes.RATE_LIMITING_ERROR.type,
        )

        // convert to ProblemDetailsExtended
        val problemDetailsExtended = ProblemDetailsExtended(
            type = throttlingException.type,
            status = throttlingException.httpStatus.value(),
            title = throttlingException.title,
            detail = throttlingException.message,
            instance = null,
            code = throttlingException.code,
            errorCategory = throttlingException.errorCategory,
            errors = listOf(
                ErrorDetail(
                    detail = throttlingException.cause?.message ?: throttlingException.message,
                    code = throttlingException.code.toString()
                )
            )
        )

        // serialize to JSON
        val errorResponse = JSONUtilities.objectMapper.writeValueAsString(problemDetailsExtended)
        val responseBody = response.bufferFactory().wrap(errorResponse.toByteArray())

        return response.writeWith(Mono.just(responseBody))
    }

/**********************************************************************************************************************/
/* RATE LIMITER - TOO MANY REQUESTS. */
/**********************************************************************************************************************/

    /**
     * Missing Key error response to handle missing key scenarios
     */
    internal fun rateLimitExceeded(
        exchange: ServerWebExchange,
    ): Mono<Void> {

        val response = exchange.response
        response.statusCode = HttpStatus.TOO_MANY_REQUESTS
        response.headers.contentType = MediaType.APPLICATION_JSON
        response.headers.add("Rate-Limiting-Exception", "Too many requests")

        // create ThrottlingException
        val throttlingException = ThrottlingException(
            type = URI.create(URIErrorTypes.TOO_MANY_REQUESTS.type),
            httpStatus = HttpStatus.TOO_MANY_REQUESTS,
            title = "Rate Limiting Exception",
            message = "Too many requests",
            cause = null,
            ErrorCodeTypes.THROTTLING_EXCEPTION.code,
            ErrorCategoryTypes.RATE_LIMITING_ERROR.type,
        )

        // convert to ProblemDetailsExtended
        val problemDetailsExtended = ProblemDetailsExtended(
            type = throttlingException.type,
            status = throttlingException.httpStatus.value(),
            title = throttlingException.title,
            detail = throttlingException.message,
            instance = null,
            code = throttlingException.code,
            errorCategory = throttlingException.errorCategory,
            errors = listOf(
                ErrorDetail(
                    detail = throttlingException.cause?.message ?: throttlingException.message,
                    code = throttlingException.code.toString()
                )
            )
        )

        // serialize to JSON
        val errorResponse = JSONUtilities.objectMapper.writeValueAsString(problemDetailsExtended)
        val responseBody = response.bufferFactory().wrap(errorResponse.toByteArray())

        return response.writeWith(Mono.just(responseBody))
    }

/**********************************************************************************************************************/
/* TIMEOUT LIMITER - TIME LIMIT EXCEEDED. */
/**********************************************************************************************************************/

    /**
     * Timeout Limiter Error
     */
    internal fun timeout(
        exchange: ServerWebExchange
    ): Mono<Void> {

        val response = exchange.response
//        response.statusCode = HttpStatus.GATEWAY_TIMEOUT
        response.headers.contentType = MediaType.APPLICATION_JSON
        response.headers.add("Timeout-Exception", "Time limit exceeded")

        // create ThrottlingException
        val throttlingException = ThrottlingException(
            type = URI.create(URIErrorTypes.SERVICE_UNAVAILABLE.type),
            httpStatus = HttpStatus.GATEWAY_TIMEOUT,
            title = "Timeout Exception",
            message = "Time limit exceeded",
            cause = null,
            ErrorCodeTypes.GENERAL_EXCEPTION.code,
            ErrorCategoryTypes.TIME_LIMITING_ERROR.type,
        )

        // convert to ProblemDetailsExtended
        val problemDetailsExtended = ProblemDetailsExtended(
            type = throttlingException.type,
            status = throttlingException.httpStatus.value(),
            title = throttlingException.title,
            detail = throttlingException.message,
            instance = null,
            code = throttlingException.code,
            errorCategory = throttlingException.errorCategory,
            errors = listOf(
                ErrorDetail(
                    detail = throttlingException.cause?.message ?: throttlingException.message,
                    code = throttlingException.code.toString()
                )
            )
        )

        // serialize to JSON
        val errorResponse = JSONUtilities.objectMapper.writeValueAsString(problemDetailsExtended)
        val responseBody = response.bufferFactory().wrap(errorResponse.toByteArray())

        return response.writeWith(Mono.just(responseBody))

    }
}

/**********************************************************************************************************************/
/**************************************************** END OF KOTLIN ***************************************************/
/**********************************************************************************************************************/
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests

1 participant