Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

Expand Down Expand Up @@ -83,6 +84,11 @@ public class QoSHandler extends ConditionalHandler.Abstract
private final AtomicInteger state = new AtomicInteger();
private final Map<Integer, Queue<Entry>> queues = new ConcurrentHashMap<>();
private final Set<Integer> priorities = new ConcurrentSkipListSet<>(Comparator.reverseOrder());
private final AtomicLong totalCount = new AtomicLong();
private final AtomicLong suspendedCount = new AtomicLong();
private final AtomicLong resumedCount = new AtomicLong();
private final AtomicLong expiredCount = new AtomicLong();
private final AtomicLong exceededCount = new AtomicLong();
private CyclicTimeouts<Entry> timeouts;
private int maxRequests;
private int maxSuspendedRequests = 1024;
Expand Down Expand Up @@ -135,7 +141,7 @@ public int getMaxSuspendedRequestCount()
/**
* <p>Sets the max number of suspended requests.</p>
* <p>Once the max suspended request limit is reached,
* the request is failed with a HTTP status of
* the request is failed with an HTTP status of
* {@code 503 Service unavailable}.</p>
* <p>A negative value indicate an unlimited number
* of suspended requests.</p>
Expand Down Expand Up @@ -173,13 +179,43 @@ public void setMaxSuspend(Duration maxSuspend)
this.maxSuspend = maxSuspend;
}

@ManagedAttribute("The number of suspended requests")
@ManagedAttribute("The current number of suspended requests")
public int getSuspendedRequestCount()
{
int permits = state.get();
return Math.max(0, -permits);
}

@ManagedAttribute("The total number of processed requests")
public long getTotalRequestCount()
{
return totalCount.get();
}

@ManagedAttribute("The total number of resumed requests")
public long getTotalSuspendedRequestCount()
{
return suspendedCount.get();
}

@ManagedAttribute("The total number of resumed requests")
public long getTotalResumedRequestCount()
{
return resumedCount.get();
}

@ManagedAttribute("The total number of requests that expired while suspended")
public long getTotalExpiredRequestCount()
{
return expiredCount.get();
}

@ManagedAttribute("The total number of requests that exceeded the maximum number of suspended requests")
public long getTotalExceededRequestCount()
{
return exceededCount.get();
}

@Override
protected void doStart() throws Exception
{
Expand Down Expand Up @@ -223,8 +259,10 @@ private boolean process(Request request, Response response, Callback callback) t
if (LOG.isDebugEnabled())
LOG.debug("{} processing {}", this, request);

boolean expired = false;
totalCount.incrementAndGet();

boolean tooManyRequests = false;
boolean expiredReHandled = false;

// The read lock allows concurrency with resume(),
// which is the common case, but not with expire().
Expand Down Expand Up @@ -258,7 +296,7 @@ else if (request.getAttribute(EXPIRED_ATTRIBUTE_NAME) == null)
// This is a request that was suspended, it expired, and was re-handled.
// Do not suspend it again, just complete it with 503 unavailable.
state.incrementAndGet();
expired = true;
expiredReHandled = true;
}
}
}
Expand All @@ -267,11 +305,10 @@ else if (request.getAttribute(EXPIRED_ATTRIBUTE_NAME) == null)
lock.readLock().unlock();
}

if (expired || tooManyRequests)
{
notAvailable(response, callback);
return true;
}
if (tooManyRequests)
return tooManyRequests(response, callback);
if (expiredReHandled)
return expiredReHandled(response, callback);

return handleWithPermit(request, response, callback);
}
Expand All @@ -282,7 +319,18 @@ protected boolean onConditionsNotMet(Request request, Response response, Callbac
return nextHandler(request, response, callback);
}

private void notAvailable(Response response, Callback callback)
private boolean tooManyRequests(Response response, Callback callback)
{
exceededCount.incrementAndGet();
return notAvailable(response, callback);
}

private boolean expiredReHandled(Response response, Callback callback)
{
return notAvailable(response, callback);
}

private boolean notAvailable(Response response, Callback callback)
{
if (LOG.isDebugEnabled())
LOG.debug("{} rejecting {}", this, response.getRequest());
Expand All @@ -291,6 +339,7 @@ private void notAvailable(Response response, Callback callback)
callback.failed(new IllegalStateException("Response already committed"));
else
response.write(true, null, callback);
return true;
}

/**
Expand All @@ -309,21 +358,37 @@ protected int getPriority(Request request)
}

/**
* <p>Fails the given suspended request/response with the given error code and failure.</p>
* <p>Fails the given suspended request/response with the given status code and failure.</p>
* <p>This method is called only for suspended requests, in case of timeout while suspended,
* or in case of failure when trying to handle a resumed request.</p>
* or in case a resumed request is not handled, or in case of failure when trying to handle
* a resumed request.</p>
*
* @param request the request to fail
* @param response the response to fail
* @param callback the callback to complete
* @param status the failure status code
* @param failure the failure
* @param failure the failure, which may be {@code null} if the resumed request was not handled
*/
protected void failSuspended(Request request, Response response, Callback callback, int status, Throwable failure)
{
Response.writeError(request, response, callback, status, null, failure);
}

/**
* <p>Fails the given suspended request/response with status code {@value HttpStatus#SERVICE_UNAVAILABLE_503}
* and a {@link TimeoutException} failure.</p>
* <p>By default, calls {@link #failSuspended(Request, Response, Callback, int, Throwable)}.</p>
*
* @param request the request to fail
* @param response the response to fail
* @param callback the callback to complete
*/
protected void expireSuspended(Request request, Response response, Callback callback)
{
expiredCount.incrementAndGet();
failSuspended(request, response, callback, HttpStatus.SERVICE_UNAVAILABLE_503, new TimeoutException());
}

private boolean handleWithPermit(Request request, Response response, Callback callback) throws Exception
{
if (LOG.isDebugEnabled())
Expand All @@ -334,6 +399,7 @@ private boolean handleWithPermit(Request request, Response response, Callback ca

private void suspend(Request request, Response response, Callback callback)
{
suspendedCount.incrementAndGet();
int priority = Math.max(0, getPriority(request));
if (LOG.isDebugEnabled())
LOG.debug("{} suspending priority={} {}", this, priority, request);
Expand Down Expand Up @@ -408,6 +474,22 @@ private void execute(Request request, Runnable task)
request.getComponents().getExecutor().execute(task);
}

@Override
public String toString()
{
return "%s[maxReq=%d,maxSus=%d,sus/res/tot/exp/exc=(%d,%d)/%d/%d/%d/%d]".formatted(
super.toString(),
getMaxRequestCount(),
getMaxSuspendedRequestCount(),
getSuspendedRequestCount(),
getTotalSuspendedRequestCount(),
getTotalResumedRequestCount(),
getTotalRequestCount(),
getTotalExpiredRequestCount(),
getTotalExceededRequestCount()
);
}

private class Entry implements CyclicTimeouts.Expirable, Runnable
{
private final Request request;
Expand Down Expand Up @@ -458,14 +540,15 @@ private void expire()
}

if (removed)
execute(request, () -> failSuspended(request, response, callback, HttpStatus.SERVICE_UNAVAILABLE_503, new TimeoutException()));
execute(request, () -> expireSuspended(request, response, callback));
}

@Override
public void run()
{
try
{
resumedCount.incrementAndGet();
boolean handled = handleWithPermit(request, response, callback);
if (LOG.isDebugEnabled())
LOG.debug("{} handled={} {}", QoSHandler.this, handled, request);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@

import static org.awaitility.Awaitility.await;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.is;
import static org.junit.jupiter.api.Assertions.assertEquals;
Expand Down Expand Up @@ -115,6 +116,10 @@ public boolean handle(Request request, Response response, Callback callback)
await().atMost(5, TimeUnit.SECONDS).until(callbacks::size, is(i + 1));
}

await().atMost(5, TimeUnit.SECONDS).until(qosHandler::getTotalRequestCount, equalTo((long)maxRequests));
assertEquals(0, qosHandler.getTotalSuspendedRequestCount());
assertEquals(0, qosHandler.getTotalResumedRequestCount());

// Send one more request, it should be suspended by QoSHandler.
LocalConnector.LocalEndPoint endPoint = connector.executeRequest("""
GET /%d HTTP/1.1
Expand All @@ -123,8 +128,10 @@ public boolean handle(Request request, Response response, Callback callback)
""".formatted(maxRequests));
endPoints.add(endPoint);

assertEquals(maxRequests, callbacks.size());
await().atMost(5, TimeUnit.SECONDS).until(qosHandler::getTotalRequestCount, equalTo(maxRequests + 1L));
await().atMost(5, TimeUnit.SECONDS).until(qosHandler::getTotalSuspendedRequestCount, equalTo(1L));
await().atMost(5, TimeUnit.SECONDS).until(qosHandler::getSuspendedRequestCount, is(1));
await().atMost(5, TimeUnit.SECONDS).until(callbacks::size, is(maxRequests));

// Finish and verify the waiting requests.
List<Callback> copy = List.copyOf(callbacks);
Expand All @@ -140,6 +147,8 @@ public boolean handle(Request request, Response response, Callback callback)

// The suspended request should have been resumed.
await().atMost(5, TimeUnit.SECONDS).until(qosHandler::getSuspendedRequestCount, is(0));
await().atMost(5, TimeUnit.SECONDS).until(qosHandler::getTotalSuspendedRequestCount, equalTo(1L));
await().atMost(5, TimeUnit.SECONDS).until(qosHandler::getTotalResumedRequestCount, equalTo(1L));
await().atMost(5, TimeUnit.SECONDS).until(callbacks::size, is(1));

// Finish the resumed request that is now waiting.
Expand Down Expand Up @@ -189,6 +198,7 @@ public boolean handle(Request request, Response response, Callback callback)
// Do not succeed the callback of the first request.
// Wait for the second request to time out.
await().atMost(2 * timeout, TimeUnit.MILLISECONDS).until(qosHandler::getSuspendedRequestCount, is(0));
await().atMost(5, TimeUnit.SECONDS).until(qosHandler::getTotalExpiredRequestCount, is(1L));

String text = endPoint1.getResponse(false, 5, TimeUnit.SECONDS);
HttpTester.Response response = HttpTester.parseResponse(text);
Expand Down Expand Up @@ -478,6 +488,8 @@ public boolean handle(Request request, Response response, Callback callback)
""".formatted(i)));
assertEquals(HttpStatus.SERVICE_UNAVAILABLE_503, response.getStatus());
}
await().atMost(5, TimeUnit.SECONDS).until(qosHandler::getTotalExceededRequestCount, is(2L));

// Wait for the other requests to finish normally.
endPoints.forEach(endPoint ->
{
Expand Down