Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

EagerContentHandler. #9051 #12077

Open
wants to merge 79 commits into
base: jetty-12.1.x
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 63 commits
Commits
Show all changes
79 commits
Select commit Hold shift + click to select a range
5d1a9f6
Revert delayed dispatch handling to the connections.
gregw Jul 23, 2024
8c07278
Revert delayed dispatch handling to the connections.
gregw Jul 23, 2024
548770c
Cleanup of HttpConnection
gregw Aug 6, 2024
ec0d325
Delay until MultiPartFormData
gregw Aug 7, 2024
fb85057
Merge remote-tracking branch 'origin/jetty-12.1.x' into experiment/je…
gregw Aug 7, 2024
da5f09f
Wait for release
gregw Aug 7, 2024
5993364
fixed removal of deprecated methods
gregw Aug 7, 2024
3ef3d6e
fixed no mimeType
gregw Aug 7, 2024
5ede88a
PR #12077 - add test for DelayedHandler with multipart
lachlan-roberts Aug 8, 2024
e8ab483
Merge remote-tracking branch 'origin/experiment/jetty-12.1.x/delayedD…
lachlan-roberts Aug 8, 2024
9e1f431
updates from review
gregw Aug 22, 2024
e0f5b7e
fixed test
gregw Aug 22, 2024
b7c8bcb
Merge remote-tracking branch 'origin/jetty-12.1.x' into experiment/je…
gregw Sep 12, 2024
d4dcd6a
WIP updates from review
gregw Sep 17, 2024
9eb9e7d
WIP updates from review
gregw Sep 18, 2024
72ab339
WIP updates from review
gregw Sep 18, 2024
e45ecad
WIP updates from review
gregw Sep 18, 2024
cac0d62
Merge branch 'jetty-12.1.x' into experiment/jetty-12.1.x/delayedDispatch
gregw Sep 18, 2024
a585cfc
WIP updates from review
gregw Sep 18, 2024
8c33aa4
Use lowercase for charsets #11741
gregw Oct 3, 2024
ccec3e4
Use lowercase for charsets #11741
gregw Oct 4, 2024
1001c31
Use lowercase for charsets #11741
gregw Oct 4, 2024
8c25183
Use lowercase for charsets #11741
gregw Oct 4, 2024
3d7f5da
Use lowercase for charsets #11741
gregw Oct 6, 2024
6c0b6f9
Use lowercase for charsets #11741
gregw Oct 6, 2024
c87adb6
javadoc
gregw Oct 9, 2024
1172d59
updates from review
gregw Oct 14, 2024
7913cbb
updates from review
gregw Oct 14, 2024
33aaadb
Merge branch 'jetty-12.1.x' into experiment/jetty-12.1.x/delayedDispatch
gregw Oct 14, 2024
2cb7da6
Merge branch 'fix/jetty-12.1.x/11741/mimetypes' into experiment/jetty…
gregw Oct 14, 2024
4c5be88
WIP
gregw Oct 15, 2024
bb37636
Experiment to reuse buffer in HttpConnection to make retaining chunks…
gregw Oct 15, 2024
af81974
Experiment to reuse buffer in HttpConnection to make retaining chunks…
gregw Oct 15, 2024
907da62
Merge branch 'jetty-12.1.x' into experiment/jetty-12.1.x/delayedDispatch
gregw Oct 16, 2024
b48cfda
fixed mimetype lookup
gregw Oct 16, 2024
039e1c9
Merge remote-tracking branch 'origin/jetty-12.1.x' into experiment/je…
gregw Oct 21, 2024
ba98543
Delay content until 75% of an input buffer is read.
gregw Oct 22, 2024
472233b
Delay content until 75% of an input buffer is read.
gregw Oct 22, 2024
b32294e
Merge remote-tracking branch 'origin/jetty-12.1.x' into experiment/je…
gregw Oct 22, 2024
0dd3279
updates from review
gregw Oct 23, 2024
346adea
updates from review
gregw Oct 23, 2024
da8e70f
updates from review
gregw Oct 23, 2024
8ded76f
Merge remote-tracking branch 'origin/jetty-12.1.x' into experiment/je…
gregw Oct 24, 2024
847f06e
Merge branch 'jetty-12.1.x' into experiment/jetty-12.1.x/delayedDispatch
gregw Oct 24, 2024
a9b1578
updates from review
gregw Oct 24, 2024
353a350
Merge branch 'jetty-12.1.x' into experiment/jetty-12.1.x/delayedDispatch
gregw Oct 29, 2024
2fc4aa1
configurable space
gregw Oct 29, 2024
f400278
update javadoc and updates from review
gregw Oct 30, 2024
4692460
Merge remote-tracking branch 'origin/jetty-12.1.x' into experiment/je…
gregw Oct 30, 2024
775bef8
Merge branch 'jetty-12.1.x' into experiment/jetty-12.1.x/delayedDispatch
gregw Oct 31, 2024
ecbf249
Merge branch 'jetty-12.1.x' into experiment/jetty-12.1.x/delayedDispatch
gregw Oct 31, 2024
8fe31fe
Merge remote-tracking branch 'origin/jetty-12.1.x' into experiment/je…
gregw Nov 4, 2024
0543ae0
Implement non-compact algorithm in HTTP and HTTP/2
gregw Nov 4, 2024
687a7bc
Implement non-compact algorithm in HTTP and HTTP/2
gregw Nov 4, 2024
7cae30e
Merge remote-tracking branch 'origin/jetty-12.1.x' into experiment/je…
gregw Nov 4, 2024
1657145
fixed comment
gregw Nov 5, 2024
a9c709b
Merge remote-tracking branch 'origin/jetty-12.1.x' into experiment/je…
gregw Nov 5, 2024
acfba41
improved comments
gregw Nov 5, 2024
1e838f7
improved comments
gregw Nov 6, 2024
a23c8f4
updates from review
gregw Nov 7, 2024
0e9c9ec
Merge remote-tracking branch 'origin/jetty-12.1.x' into experiment/je…
gregw Nov 7, 2024
1f89ff5
Fix test leaks
gregw Nov 7, 2024
18f655c
Fix test leaks
gregw Nov 7, 2024
73e21e7
updates from review
gregw Nov 8, 2024
278e925
Implemented for H3
gregw Nov 9, 2024
4685621
Updates from review
gregw Nov 9, 2024
4a0e1c1
Configurable chunk overhead
gregw Nov 11, 2024
9a23ecc
Updates after review
gregw Nov 12, 2024
2f06d2f
deprecated DelayedHandler
gregw Nov 12, 2024
db9f4d6
Merge remote-tracking branch 'origin/jetty-12.1.x' into experiment/je…
gregw Nov 12, 2024
50265b4
Fixed several XmlConfiguration issues so that a Builder pattern can b…
gregw Nov 12, 2024
2b0fe1b
Fixed several XmlConfiguration issues so that a Builder pattern can b…
gregw Nov 13, 2024
e391c0b
Updates from review
gregw Nov 13, 2024
da36c86
Updates from review
gregw Nov 13, 2024
6bfc885
Added distribution test cases for EagerContentHandler modules.
sbordet Nov 13, 2024
4997eed
Merged eager modules to one module
gregw Nov 14, 2024
a68e9b3
Removed the check on inputState.
gregw Nov 14, 2024
494fe3a
Merge remote-tracking branch 'origin/jetty-12.1.x' into experiment/je…
gregw Nov 14, 2024
e8ce082
Fixed bad content type test
gregw Nov 14, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -691,7 +691,12 @@ public static MimeTypes.Type getMimeTypeFromContentType(HttpField field)
if (field instanceof MimeTypes.ContentTypeField contentTypeField)
return contentTypeField.getMimeType();

return MimeTypes.CACHE.get(field.getValue());
String contentType = field.getValue();
int semicolon = contentType.indexOf(';');
if (semicolon >= 0)
contentType = contentType.substring(0, semicolon).trim();

return MimeTypes.CACHE.get(contentType);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ private static class HTTP2ClientConnection extends HTTP2Connection implements Ca

private HTTP2ClientConnection(HTTP2Client client, EndPoint endpoint, HTTP2ClientSession session, Promise<Session> sessionPromise, Session.Listener listener)
{
super(client.getByteBufferPool(), client.getExecutor(), endpoint, session, client.getInputBufferSize());
super(client.getByteBufferPool(), client.getExecutor(), endpoint, session, client.getInputBufferSize(), 0);
gregw marked this conversation as resolved.
Show resolved Hide resolved
this.client = client;
this.promise = sessionPromise;
this.listener = listener;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.util.concurrent.Executor;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

import org.eclipse.jetty.http2.api.Stream;
import org.eclipse.jetty.http2.frames.DataFrame;
Expand Down Expand Up @@ -58,16 +59,23 @@ public class HTTP2Connection extends AbstractConnection implements Parser.Listen
private final ByteBufferPool bufferPool;
private final HTTP2Session session;
private final int bufferSize;
private final int minBufferSpace;
private final ExecutionStrategy strategy;
private boolean useInputDirectByteBuffers;
private boolean useOutputDirectByteBuffers;

protected HTTP2Connection(ByteBufferPool bufferPool, Executor executor, EndPoint endPoint, HTTP2Session session, int bufferSize)
{
this(bufferPool, executor, endPoint, session, bufferSize, -1);
}

protected HTTP2Connection(ByteBufferPool bufferPool, Executor executor, EndPoint endPoint, HTTP2Session session, int bufferSize, int minBufferSpace)
{
super(endPoint, executor);
this.bufferPool = bufferPool;
this.session = session;
this.bufferSize = bufferSize;
this.minBufferSpace = minBufferSpace < 0 ? Math.min(1024, bufferSize) : minBufferSpace;
gregw marked this conversation as resolved.
Show resolved Hide resolved
this.strategy = new AdaptiveExecutionStrategy(producer, executor);
LifeCycle.start(strategy);
}
Expand Down Expand Up @@ -147,6 +155,7 @@ public void onClose(Throwable cause)
super.onClose(cause);

LifeCycle.stop(strategy);
producer.stop();
}

@Override
Expand All @@ -157,12 +166,20 @@ public void onFillable()
produce();
}

private int fill(EndPoint endPoint, ByteBuffer buffer)
private int fill(EndPoint endPoint, ByteBuffer buffer, boolean compact)
{
int padding = 0;
try
{
if (endPoint.isInputShutdown())
return -1;

if (!compact)
{
// Add padding content to avoid compaction
padding = buffer.limit();
buffer.position(0);
gregw marked this conversation as resolved.
Show resolved Hide resolved
}
return endPoint.fill(buffer);
}
catch (IOException x)
Expand All @@ -171,6 +188,11 @@ private int fill(EndPoint endPoint, ByteBuffer buffer)
LOG.debug("Could not read from {}", endPoint, x);
return -1;
}
finally
{
if (!compact && padding > 0)
buffer.position(padding);
}
}

@Override
Expand Down Expand Up @@ -303,16 +325,19 @@ public void onConnectionFailure(int error, String reason)

protected class HTTP2Producer implements ExecutionStrategy.Producer
{
private static final RetainableByteBuffer.Mutable STOPPED = new RetainableByteBuffer.NonRetainableByteBuffer(BufferUtil.EMPTY_BUFFER);
private final Callback fillableCallback = new FillableCallback();
private final AtomicReference<RetainableByteBuffer.Mutable> savedBuffer = new AtomicReference<>();
gregw marked this conversation as resolved.
Show resolved Hide resolved
private RetainableByteBuffer.Mutable networkBuffer;
private boolean shutdown;
private boolean failed;

private void setInputBuffer(ByteBuffer byteBuffer)
{
acquireNetworkBuffer();
RetainableByteBuffer.Mutable networkBuffer = acquireBuffer();
if (!networkBuffer.append(byteBuffer))
LOG.warn("overflow");
gregw marked this conversation as resolved.
Show resolved Hide resolved
saveBuffer(networkBuffer);
gregw marked this conversation as resolved.
Show resolved Hide resolved
}

@Override
Expand All @@ -328,13 +353,15 @@ public Runnable produce()
return null;

boolean interested = false;
acquireNetworkBuffer();
int filled = 0;
gregw marked this conversation as resolved.
Show resolved Hide resolved
networkBuffer = acquireBuffer();
try
{
boolean parse = networkBuffer.hasRemaining();

while (true)
{
boolean compact = true;
if (parse)
{
while (networkBuffer.hasRemaining())
Expand All @@ -349,17 +376,30 @@ public Runnable produce()
LOG.debug("Dequeued new task {}", task);
if (task != null)
return task;
}

// If more references than 1 (ie not just us), don't refill into buffer and risk compaction.
if (networkBuffer.isRetained())
reacquireNetworkBuffer();
// If the application has retained the content chunks then we must not overwrite content.
if (networkBuffer.isRetained())
{
// If there is sufficient space available, we can top up the buffer rather than allocate a new one
if (minBufferSpace > 0 && BufferUtil.space(networkBuffer.getByteBuffer()) >= minBufferSpace)
{
// do not compact the buffer
compact = false;
}
else
{
// otherwise reacquire the buffer and fill into the new buffer.
if (LOG.isDebugEnabled())
LOG.debug("Released retained {}", networkBuffer);
networkBuffer.release();
networkBuffer = acquireBuffer();
}
}

// Here we know that this.networkBuffer is not retained by
// application code: either it has been released, or it's a new one.
int filled = fill(getEndPoint(), networkBuffer.getByteBuffer());
filled = fill(getEndPoint(), networkBuffer.getByteBuffer(), compact);
if (LOG.isDebugEnabled())
LOG.debug("Filled {} bytes in {}", filled, networkBuffer);
LOG.debug("Filled {} bytes compacted {} in {}", filled, compact, networkBuffer);

if (filled > 0)
{
Expand All @@ -381,50 +421,63 @@ else if (filled == 0)
}
finally
{
releaseNetworkBuffer();
if (networkBuffer.isRetained() && !shutdown)
{
saveBuffer(networkBuffer);
}
else
{
if (LOG.isDebugEnabled())
LOG.debug("Released after process {}", networkBuffer);
networkBuffer.release();
}
networkBuffer = null;
if (interested)
getEndPoint().fillInterested(fillableCallback);
}
}

private void acquireNetworkBuffer()
private RetainableByteBuffer.Mutable acquireBuffer()
{
if (networkBuffer == null)
{
networkBuffer = bufferPool.acquire(bufferSize, isUseInputDirectByteBuffers()).asMutable();
if (LOG.isDebugEnabled())
LOG.debug("Acquired {}", networkBuffer);
}
RetainableByteBuffer.Mutable buffer = savedBuffer.getAndSet(null);
if (buffer == null)
buffer = bufferPool.acquire(bufferSize, isUseInputDirectByteBuffers()).asMutable();
if (LOG.isDebugEnabled())
LOG.debug("Acquired {}", buffer);
return buffer;
}

private void reacquireNetworkBuffer()
private void saveBuffer(RetainableByteBuffer.Mutable buffer)
{
RetainableByteBuffer.Mutable currentBuffer = networkBuffer;
if (currentBuffer == null)
throw new IllegalStateException();

if (currentBuffer.hasRemaining())
throw new IllegalStateException();

currentBuffer.release();
networkBuffer = bufferPool.acquire(bufferSize, isUseInputDirectByteBuffers());
if (LOG.isDebugEnabled())
LOG.debug("Reacquired {}<-{}", currentBuffer, networkBuffer);
if (savedBuffer.compareAndSet(null, buffer))
{
if (LOG.isDebugEnabled())
LOG.debug("Saved {}", buffer);
gregw marked this conversation as resolved.
Show resolved Hide resolved
}
else
{
if (savedBuffer.get() == STOPPED)
{
if (LOG.isDebugEnabled())
LOG.debug("Released in save {}", buffer);
gregw marked this conversation as resolved.
Show resolved Hide resolved
buffer.release();
}
else
{
throw new IllegalStateException("Buffer already saved");
}
}
}

private void releaseNetworkBuffer()
private void stop()
{
RetainableByteBuffer.Mutable currentBuffer = networkBuffer;
if (currentBuffer == null)
throw new IllegalStateException();

if (currentBuffer.hasRemaining() && !shutdown && !failed)
throw new IllegalStateException();

currentBuffer.release();
networkBuffer = null;
if (LOG.isDebugEnabled())
LOG.debug("Released {}", currentBuffer);
RetainableByteBuffer.Mutable buffer = savedBuffer.getAndSet(STOPPED);
if (buffer != null)
{
if (LOG.isDebugEnabled())
LOG.debug("Released in stop {}", buffer);
buffer.release();
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ public class HTTP2ServerConnection extends HTTP2Connection implements Connection

public HTTP2ServerConnection(Connector connector, EndPoint endPoint, HttpConfiguration httpConfig, HTTP2ServerSession session, ServerSessionListener listener)
{
super(connector.getByteBufferPool(), connector.getExecutor(), endPoint, session, httpConfig.getInputBufferSize());
super(connector.getByteBufferPool(), connector.getExecutor(), endPoint, session, httpConfig.getInputBufferSize(), httpConfig.getMinInputBufferSpace());
this.connector = connector;
this.listener = listener;
this.httpConfig = httpConfig;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,8 @@ protected MetaData.Request newRequest(String method, String path, HttpFields fie
@AfterEach
public void dispose() throws Exception
{
// Stop the client so that all connections are closed and any saved buffers are released
LifeCycle.stop(httpClient);
try
{
if (serverBufferPool != null)
Expand All @@ -152,7 +154,6 @@ public void dispose() throws Exception
}
finally
{
LifeCycle.stop(httpClient);
LifeCycle.stop(server);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ public boolean isFillInterested()
protected void onFillInterestedFailed(Throwable cause)
{
if (LOG.isDebugEnabled())
LOG.debug("{} onFillInterestedFailed {}", this, cause);
LOG.debug("onFillInterestedFailed {}", this, cause);
if (_endPoint.isOpen())
{
boolean close = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.IntUnaryOperator;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -66,6 +67,7 @@ public class ArrayByteBufferPool implements ByteBufferPool, Dumpable
private final long _maxDirectMemory;
private final IntUnaryOperator _bucketIndexFor;
private final AtomicBoolean _evictor = new AtomicBoolean(false);
private final AtomicLong _reserved = new AtomicLong();
private boolean _statisticsEnabled;

/**
Expand Down Expand Up @@ -175,6 +177,12 @@ private long maxMemory(long maxMemory)
return maxMemory;
}

@ManagedAttribute("The current number of allocated bytes reserved to be added to the pool once released")
public long getReserved()
{
return _reserved.get();
}

@ManagedAttribute("Whether statistics are enabled")
public boolean isStatisticsEnabled()
{
Expand Down Expand Up @@ -214,6 +222,7 @@ public RetainableByteBuffer.Mutable acquire(int size, boolean direct)
if (entry == null)
{
ByteBuffer buffer = BufferUtil.allocate(bucket.getCapacity(), direct);
_reserved.addAndGet(buffer.capacity());
return new ReservedBuffer(buffer, bucket);
}

Expand Down Expand Up @@ -249,6 +258,7 @@ public boolean releaseAndRemove(RetainableByteBuffer buffer)

private void reserve(RetainedBucket bucket, ByteBuffer byteBuffer)
{
_reserved.addAndGet(-byteBuffer.capacity());
bucket.recordRelease();

// Try to reserve an entry to put the buffer into the pool.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
<Call name="insertHandler">
<Arg>
<New id="DelayedHandler" class="org.eclipse.jetty.server.handler.DelayedHandler">
<Set name="maxRetainedContent" property="jetty.delayed.maxRetainedContent"/>
gregw marked this conversation as resolved.
Show resolved Hide resolved
</New>
</Arg>
</Call>
Expand Down
1 change: 1 addition & 0 deletions jetty-core/jetty-server/src/main/config/etc/jetty.xml
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@
<Set name="generateRedirectBody" property="jetty.httpConfig.generateRedirectBody"/>
<Set name="useInputDirectByteBuffers" property="jetty.httpConfig.useInputDirectByteBuffers"/>
<Set name="useOutputDirectByteBuffers" property="jetty.httpConfig.useOutputDirectByteBuffers"/>
<Set name="minInputBufferSpace" property="jetty.httpConfig.minInputBufferSpace"/>
</New>

<!-- =========================================================== -->
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@

[description]
Applies DelayedHandler to entire server.
Delays request handling until any body content has arrived, to minimize blocking.
Delays request handling until body content has arrived, to minimize blocking.
For form data and multipart, the handling is delayed until the entire request body has
been asynchronously read. For all other content types, the delay is until the first byte
has arrived.
been asynchronously read. For all other content types, the delay is for up to a limited size,
which by default is one input buffer.
gregw marked this conversation as resolved.
Show resolved Hide resolved

[tags]
server
Expand All @@ -18,3 +18,8 @@ threadlimit
[xml]
etc/jetty-delayed.xml


[ini-template]
#tag::documentation[]
## The maximum bytes to retain whilst delaying content; or 0 for no delay; or -1 (default) for a default value.
gregw marked this conversation as resolved.
Show resolved Hide resolved
# jetty.delayed.maxRetainedContent=-1
Loading
Loading