Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve DelayedHandler. #9051 #12077

Open
wants to merge 67 commits into
base: jetty-12.1.x
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
67 commits
Select commit Hold shift + click to select a range
5d1a9f6
Revert delayed dispatch handling to the connections.
gregw Jul 23, 2024
8c07278
Revert delayed dispatch handling to the connections.
gregw Jul 23, 2024
548770c
Cleanup of HttpConnection
gregw Aug 6, 2024
ec0d325
Delay until MultiPartFormData
gregw Aug 7, 2024
fb85057
Merge remote-tracking branch 'origin/jetty-12.1.x' into experiment/je…
gregw Aug 7, 2024
da5f09f
Wait for release
gregw Aug 7, 2024
5993364
fixed removal of deprecated methods
gregw Aug 7, 2024
3ef3d6e
fixed no mimeType
gregw Aug 7, 2024
5ede88a
PR #12077 - add test for DelayedHandler with multipart
lachlan-roberts Aug 8, 2024
e8ab483
Merge remote-tracking branch 'origin/experiment/jetty-12.1.x/delayedD…
lachlan-roberts Aug 8, 2024
9e1f431
updates from review
gregw Aug 22, 2024
e0f5b7e
fixed test
gregw Aug 22, 2024
b7c8bcb
Merge remote-tracking branch 'origin/jetty-12.1.x' into experiment/je…
gregw Sep 12, 2024
d4dcd6a
WIP updates from review
gregw Sep 17, 2024
9eb9e7d
WIP updates from review
gregw Sep 18, 2024
72ab339
WIP updates from review
gregw Sep 18, 2024
e45ecad
WIP updates from review
gregw Sep 18, 2024
cac0d62
Merge branch 'jetty-12.1.x' into experiment/jetty-12.1.x/delayedDispatch
gregw Sep 18, 2024
a585cfc
WIP updates from review
gregw Sep 18, 2024
8c33aa4
Use lowercase for charsets #11741
gregw Oct 3, 2024
ccec3e4
Use lowercase for charsets #11741
gregw Oct 4, 2024
1001c31
Use lowercase for charsets #11741
gregw Oct 4, 2024
8c25183
Use lowercase for charsets #11741
gregw Oct 4, 2024
3d7f5da
Use lowercase for charsets #11741
gregw Oct 6, 2024
6c0b6f9
Use lowercase for charsets #11741
gregw Oct 6, 2024
c87adb6
javadoc
gregw Oct 9, 2024
1172d59
updates from review
gregw Oct 14, 2024
7913cbb
updates from review
gregw Oct 14, 2024
33aaadb
Merge branch 'jetty-12.1.x' into experiment/jetty-12.1.x/delayedDispatch
gregw Oct 14, 2024
2cb7da6
Merge branch 'fix/jetty-12.1.x/11741/mimetypes' into experiment/jetty…
gregw Oct 14, 2024
4c5be88
WIP
gregw Oct 15, 2024
bb37636
Experiment to reuse buffer in HttpConnection to make retaining chunks…
gregw Oct 15, 2024
af81974
Experiment to reuse buffer in HttpConnection to make retaining chunks…
gregw Oct 15, 2024
907da62
Merge branch 'jetty-12.1.x' into experiment/jetty-12.1.x/delayedDispatch
gregw Oct 16, 2024
b48cfda
fixed mimetype lookup
gregw Oct 16, 2024
039e1c9
Merge remote-tracking branch 'origin/jetty-12.1.x' into experiment/je…
gregw Oct 21, 2024
ba98543
Delay content until 75% of an input buffer is read.
gregw Oct 22, 2024
472233b
Delay content until 75% of an input buffer is read.
gregw Oct 22, 2024
b32294e
Merge remote-tracking branch 'origin/jetty-12.1.x' into experiment/je…
gregw Oct 22, 2024
0dd3279
updates from review
gregw Oct 23, 2024
346adea
updates from review
gregw Oct 23, 2024
da8e70f
updates from review
gregw Oct 23, 2024
8ded76f
Merge remote-tracking branch 'origin/jetty-12.1.x' into experiment/je…
gregw Oct 24, 2024
847f06e
Merge branch 'jetty-12.1.x' into experiment/jetty-12.1.x/delayedDispatch
gregw Oct 24, 2024
a9b1578
updates from review
gregw Oct 24, 2024
353a350
Merge branch 'jetty-12.1.x' into experiment/jetty-12.1.x/delayedDispatch
gregw Oct 29, 2024
2fc4aa1
configurable space
gregw Oct 29, 2024
f400278
update javadoc and updates from review
gregw Oct 30, 2024
4692460
Merge remote-tracking branch 'origin/jetty-12.1.x' into experiment/je…
gregw Oct 30, 2024
775bef8
Merge branch 'jetty-12.1.x' into experiment/jetty-12.1.x/delayedDispatch
gregw Oct 31, 2024
ecbf249
Merge branch 'jetty-12.1.x' into experiment/jetty-12.1.x/delayedDispatch
gregw Oct 31, 2024
8fe31fe
Merge remote-tracking branch 'origin/jetty-12.1.x' into experiment/je…
gregw Nov 4, 2024
0543ae0
Implement non-compact algorithm in HTTP and HTTP/2
gregw Nov 4, 2024
687a7bc
Implement non-compact algorithm in HTTP and HTTP/2
gregw Nov 4, 2024
7cae30e
Merge remote-tracking branch 'origin/jetty-12.1.x' into experiment/je…
gregw Nov 4, 2024
1657145
fixed comment
gregw Nov 5, 2024
a9c709b
Merge remote-tracking branch 'origin/jetty-12.1.x' into experiment/je…
gregw Nov 5, 2024
acfba41
improved comments
gregw Nov 5, 2024
1e838f7
improved comments
gregw Nov 6, 2024
a23c8f4
updates from review
gregw Nov 7, 2024
0e9c9ec
Merge remote-tracking branch 'origin/jetty-12.1.x' into experiment/je…
gregw Nov 7, 2024
1f89ff5
Fix test leaks
gregw Nov 7, 2024
18f655c
Fix test leaks
gregw Nov 7, 2024
73e21e7
updates from review
gregw Nov 8, 2024
278e925
Implemented for H3
gregw Nov 9, 2024
4685621
Updates from review
gregw Nov 9, 2024
4a0e1c1
Configurable chunk overhead
gregw Nov 11, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -691,7 +691,12 @@ public static MimeTypes.Type getMimeTypeFromContentType(HttpField field)
if (field instanceof MimeTypes.ContentTypeField contentTypeField)
return contentTypeField.getMimeType();

return MimeTypes.CACHE.get(field.getValue());
String contentType = field.getValue();
int semicolon = contentType.indexOf(';');
if (semicolon >= 0)
contentType = contentType.substring(0, semicolon).trim();

return MimeTypes.CACHE.get(contentType);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ private static class HTTP2ClientConnection extends HTTP2Connection implements Ca

private HTTP2ClientConnection(HTTP2Client client, EndPoint endpoint, HTTP2ClientSession session, Promise<Session> sessionPromise, Session.Listener listener)
{
super(client.getByteBufferPool(), client.getExecutor(), endpoint, session, client.getInputBufferSize());
super(client.getByteBufferPool(), client.getExecutor(), endpoint, session, client.getInputBufferSize(), -1);
this.client = client;
this.promise = sessionPromise;
this.listener = listener;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.util.concurrent.Executor;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

import org.eclipse.jetty.http2.api.Stream;
import org.eclipse.jetty.http2.frames.DataFrame;
Expand Down Expand Up @@ -58,16 +59,23 @@ public class HTTP2Connection extends AbstractConnection implements Parser.Listen
private final ByteBufferPool bufferPool;
private final HTTP2Session session;
private final int bufferSize;
private final int minBufferSpace;
private final ExecutionStrategy strategy;
private boolean useInputDirectByteBuffers;
private boolean useOutputDirectByteBuffers;

protected HTTP2Connection(ByteBufferPool bufferPool, Executor executor, EndPoint endPoint, HTTP2Session session, int bufferSize)
{
this(bufferPool, executor, endPoint, session, bufferSize, -1);
}

protected HTTP2Connection(ByteBufferPool bufferPool, Executor executor, EndPoint endPoint, HTTP2Session session, int bufferSize, int minBufferSpace)
{
super(endPoint, executor);
this.bufferPool = bufferPool;
this.session = session;
this.bufferSize = bufferSize;
this.minBufferSpace = minBufferSpace < 0 ? Math.min(1500, bufferSize) : minBufferSpace;
this.strategy = new AdaptiveExecutionStrategy(producer, executor);
LifeCycle.start(strategy);
}
Expand Down Expand Up @@ -147,6 +155,7 @@ public void onClose(Throwable cause)
super.onClose(cause);

LifeCycle.stop(strategy);
producer.stop();
}

@Override
Expand All @@ -157,12 +166,20 @@ public void onFillable()
produce();
}

private int fill(EndPoint endPoint, ByteBuffer buffer)
private int fill(EndPoint endPoint, ByteBuffer buffer, boolean compact)
{
int padding = 0;
try
{
if (endPoint.isInputShutdown())
return -1;

if (!compact)
{
// Add padding content to avoid compaction
padding = buffer.limit();
buffer.position(0);
gregw marked this conversation as resolved.
Show resolved Hide resolved
}
return endPoint.fill(buffer);
}
catch (IOException x)
Expand All @@ -171,6 +188,11 @@ private int fill(EndPoint endPoint, ByteBuffer buffer)
LOG.debug("Could not read from {}", endPoint, x);
return -1;
}
finally
{
if (!compact && padding > 0)
buffer.position(padding);
}
}

@Override
Expand Down Expand Up @@ -303,16 +325,19 @@ public void onConnectionFailure(int error, String reason)

protected class HTTP2Producer implements ExecutionStrategy.Producer
{
private static final RetainableByteBuffer.Mutable STOPPED = new RetainableByteBuffer.NonRetainableByteBuffer(BufferUtil.EMPTY_BUFFER);
private final Callback fillableCallback = new FillableCallback();
private final AtomicReference<RetainableByteBuffer.Mutable> heldBuffer = new AtomicReference<>();
private RetainableByteBuffer.Mutable networkBuffer;
private boolean shutdown;
private boolean failed;

private void setInputBuffer(ByteBuffer byteBuffer)
{
acquireNetworkBuffer();
RetainableByteBuffer.Mutable networkBuffer = acquireBuffer();
if (!networkBuffer.append(byteBuffer))
LOG.warn("overflow");
throw new IllegalStateException("overflow");
holdBuffer(networkBuffer);
}

@Override
Expand All @@ -328,13 +353,14 @@ public Runnable produce()
return null;

boolean interested = false;
acquireNetworkBuffer();
networkBuffer = acquireBuffer();
try
{
boolean parse = networkBuffer.hasRemaining();

while (true)
{
boolean compact = true;
if (parse)
{
while (networkBuffer.hasRemaining())
Expand All @@ -349,17 +375,30 @@ public Runnable produce()
LOG.debug("Dequeued new task {}", task);
if (task != null)
return task;
}

// If more references than 1 (ie not just us), don't refill into buffer and risk compaction.
if (networkBuffer.isRetained())
reacquireNetworkBuffer();
// If the application has retained the content chunks then we must not overwrite content.
if (networkBuffer.isRetained())
{
// If there is sufficient space available, we can top up the buffer rather than allocate a new one
if (minBufferSpace > 0 && BufferUtil.space(networkBuffer.getByteBuffer()) >= minBufferSpace)
{
// do not compact the buffer
compact = false;
}
else
{
// otherwise reacquire the buffer and fill into the new buffer.
if (LOG.isDebugEnabled())
LOG.debug("Released retained {}", networkBuffer);
networkBuffer.release();
networkBuffer = acquireBuffer();
}
}

// Here we know that this.networkBuffer is not retained by
// application code: either it has been released, or it's a new one.
int filled = fill(getEndPoint(), networkBuffer.getByteBuffer());
int filled = fill(getEndPoint(), networkBuffer.getByteBuffer(), compact);
gregw marked this conversation as resolved.
Show resolved Hide resolved
if (LOG.isDebugEnabled())
LOG.debug("Filled {} bytes in {}", filled, networkBuffer);
LOG.debug("Filled {} bytes compacted {} in {}", filled, compact, networkBuffer);

if (filled > 0)
{
Expand All @@ -381,50 +420,63 @@ else if (filled == 0)
}
finally
{
releaseNetworkBuffer();
if (networkBuffer.isRetained() && !shutdown)
{
holdBuffer(networkBuffer);
}
else
{
if (LOG.isDebugEnabled())
LOG.debug("Released after process {}", networkBuffer);
networkBuffer.release();
}
networkBuffer = null;
if (interested)
getEndPoint().fillInterested(fillableCallback);
}
}

private void acquireNetworkBuffer()
private RetainableByteBuffer.Mutable acquireBuffer()
{
if (networkBuffer == null)
{
networkBuffer = bufferPool.acquire(bufferSize, isUseInputDirectByteBuffers()).asMutable();
if (LOG.isDebugEnabled())
LOG.debug("Acquired {}", networkBuffer);
}
RetainableByteBuffer.Mutable buffer = heldBuffer.getAndSet(null);
if (buffer == null)
buffer = bufferPool.acquire(bufferSize, isUseInputDirectByteBuffers()).asMutable();
if (LOG.isDebugEnabled())
LOG.debug("Acquired {}", buffer);
return buffer;
}

private void reacquireNetworkBuffer()
private void holdBuffer(RetainableByteBuffer.Mutable buffer)
{
RetainableByteBuffer.Mutable currentBuffer = networkBuffer;
if (currentBuffer == null)
throw new IllegalStateException();

if (currentBuffer.hasRemaining())
throw new IllegalStateException();

currentBuffer.release();
networkBuffer = bufferPool.acquire(bufferSize, isUseInputDirectByteBuffers());
if (LOG.isDebugEnabled())
LOG.debug("Reacquired {}<-{}", currentBuffer, networkBuffer);
if (heldBuffer.compareAndSet(null, buffer))
{
if (LOG.isDebugEnabled())
LOG.debug("Held {}", buffer);
}
else
{
if (heldBuffer.get() == STOPPED)
{
if (LOG.isDebugEnabled())
LOG.debug("Released instead of holding {}", buffer);
buffer.release();
}
else
{
throw new IllegalStateException("Buffer already saved");
}
}
}

private void releaseNetworkBuffer()
private void stop()
{
RetainableByteBuffer.Mutable currentBuffer = networkBuffer;
if (currentBuffer == null)
throw new IllegalStateException();

if (currentBuffer.hasRemaining() && !shutdown && !failed)
throw new IllegalStateException();

currentBuffer.release();
networkBuffer = null;
if (LOG.isDebugEnabled())
LOG.debug("Released {}", currentBuffer);
RetainableByteBuffer.Mutable buffer = heldBuffer.getAndSet(STOPPED);
if (buffer != null)
{
if (LOG.isDebugEnabled())
LOG.debug("Released in stop {}", buffer);
buffer.release();
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ public class HTTP2ServerConnection extends HTTP2Connection implements Connection

public HTTP2ServerConnection(Connector connector, EndPoint endPoint, HttpConfiguration httpConfig, HTTP2ServerSession session, ServerSessionListener listener)
{
super(connector.getByteBufferPool(), connector.getExecutor(), endPoint, session, httpConfig.getInputBufferSize());
super(connector.getByteBufferPool(), connector.getExecutor(), endPoint, session, httpConfig.getInputBufferSize(), httpConfig.getMinInputBufferSpace());
this.connector = connector;
this.listener = listener;
this.httpConfig = httpConfig;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,8 @@ protected MetaData.Request newRequest(String method, String path, HttpFields fie
@AfterEach
public void dispose() throws Exception
{
// Stop the client so that all connections are closed and any saved buffers are released
LifeCycle.stop(httpClient);
try
{
if (serverBufferPool != null)
Expand All @@ -152,7 +154,6 @@ public void dispose() throws Exception
}
finally
{
LifeCycle.stop(httpClient);
LifeCycle.stop(server);
}
}
Expand Down
Loading
Loading