Skip to content

Commit 712daf1

Browse files
chore(implementation)!: use Jetty-12 core without servlets
Port the invoker to upgrade to Eclipse Jetty-12 version 12. Specifically using the new core APIs of Eclipse Jetty-12 that allow the overhead of a Servlet container to be avoided. BREAKING CHANGE: use Java 17 or above, as required by Eclipse Jetty-12. Signed-off-by: Lachlan Roberts <[email protected]>
1 parent cff88d1 commit 712daf1

File tree

5 files changed

+81
-74
lines changed

5 files changed

+81
-74
lines changed

invoker/core/src/main/java/com/google/cloud/functions/invoker/BackgroundFunctionExecutor.java

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -185,9 +185,12 @@ static Optional<Type> backgroundFunctionTypeArgument(
185185
}
186186

187187
private static Event parseLegacyEvent(Request req) throws IOException {
188-
try (BufferedReader bodyReader = new BufferedReader(
189-
new InputStreamReader(Content.Source.asInputStream(req),
190-
Objects.requireNonNullElse(Request.getCharset(req), StandardCharsets.ISO_8859_1)))) {
188+
try (BufferedReader bodyReader =
189+
new BufferedReader(
190+
new InputStreamReader(
191+
Content.Source.asInputStream(req),
192+
Objects.requireNonNullElse(
193+
Request.getCharset(req), StandardCharsets.ISO_8859_1)))) {
191194
return parseLegacyEvent(bodyReader);
192195
}
193196
}
@@ -367,6 +370,9 @@ private <CloudEventT> void serviceCloudEvent(Request req) throws Exception {
367370
@SuppressWarnings("unchecked")
368371
FunctionExecutor<CloudEventT> executor = (FunctionExecutor<CloudEventT>) functionExecutor;
369372

373+
// Read the entire request body into a byte array.
374+
// TODO: this method is deprecated for removal, use the method introduced by
375+
// https://github.com/jetty/jetty.project/pull/13939 when it is released.
370376
byte[] body = Content.Source.asByteArrayAsync(req, -1).get();
371377
MessageReader reader = HttpMessageFactory.createReaderFromMultimap(headerMap(req), body);
372378
// It's important not to set the context ClassLoader earlier, because MessageUtils will use
@@ -384,7 +390,8 @@ private <CloudEventT> void serviceCloudEvent(Request req) throws Exception {
384390
private static Map<String, List<String>> headerMap(Request req) {
385391
Map<String, List<String>> headerMap = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
386392
for (HttpField field : req.getHeaders()) {
387-
headerMap.computeIfAbsent(field.getName(), unused -> new ArrayList<>())
393+
headerMap
394+
.computeIfAbsent(field.getName(), unused -> new ArrayList<>())
388395
.addAll(field.getValueList());
389396
}
390397
return headerMap;

invoker/core/src/main/java/com/google/cloud/functions/invoker/http/HttpRequestImpl.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -80,13 +80,14 @@ public Map<String, List<String>> getQueryParameters() {
8080
@Override
8181
public Map<String, HttpPart> getParts() {
8282
String contentType = request.getHeaders().get(HttpHeader.CONTENT_TYPE);
83-
if (contentType == null || !contentType.startsWith(MimeTypes.Type.MULTIPART_FORM_DATA.asString())) {
83+
if (contentType == null
84+
|| !contentType.startsWith(MimeTypes.Type.MULTIPART_FORM_DATA.asString())) {
8485
throw new IllegalStateException("Content-Type must be multipart/form-data: " + contentType);
8586
}
8687

8788
// The multipart parsing is done by the EagerContentHandler, so we just call getParts.
8889
MultiPartFormData.Parts parts = MultiPartFormData.getParts(request);
89-
if (parts == null){
90+
if (parts == null) {
9091
throw new IllegalStateException();
9192
}
9293

@@ -178,8 +179,8 @@ public Optional<String> getCharacterEncoding() {
178179

179180
@Override
180181
public InputStream getInputStream() throws IOException {
181-
Content.Source contentSource = part.createContentSource();
182-
return Content.Source.asInputStream(contentSource);
182+
Content.Source contentSource = part.createContentSource();
183+
return Content.Source.asInputStream(contentSource);
183184
}
184185

185186
@Override

invoker/core/src/main/java/com/google/cloud/functions/invoker/http/HttpResponseImpl.java

Lines changed: 18 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -87,11 +87,15 @@ public OutputStream getOutputStream() {
8787
}
8888
if (outputStream == null) {
8989
Request request = response.getRequest();
90-
int outputBufferSize = request.getConnectionMetaData().getHttpConfiguration()
91-
.getOutputBufferSize();
92-
BufferedContentSink bufferedContentSink = new BufferedContentSink(response,
93-
request.getComponents().getByteBufferPool(),
94-
false, outputBufferSize / 2, outputBufferSize);
90+
int outputBufferSize =
91+
request.getConnectionMetaData().getHttpConfiguration().getOutputBufferSize();
92+
BufferedContentSink bufferedContentSink =
93+
new BufferedContentSink(
94+
response,
95+
request.getComponents().getByteBufferPool(),
96+
false,
97+
outputBufferSize / 2,
98+
outputBufferSize);
9599
outputStream = new ContentSinkOutputStream(bufferedContentSink);
96100
}
97101
return outputStream;
@@ -104,8 +108,10 @@ public synchronized BufferedWriter getWriter() throws IOException {
104108
throw new IllegalStateException("getOutputStream called");
105109
}
106110

107-
writer = new NonBufferedWriter(WriteThroughWriter.newWriter(getOutputStream(),
108-
Objects.requireNonNullElse(charset, StandardCharsets.UTF_8)));
111+
writer =
112+
new NonBufferedWriter(
113+
WriteThroughWriter.newWriter(
114+
getOutputStream(), Objects.requireNonNullElse(charset, StandardCharsets.UTF_8)));
109115
}
110116
return writer;
111117
}
@@ -130,13 +136,11 @@ public void close(Callback callback) {
130136
}
131137

132138
/**
133-
* A {@link BufferedWriter} that does not buffer.
134-
* It is generally more efficient to buffer at the {@link Content.Sink} level,
135-
* since frequently total content is smaller than a single buffer and
136-
* the {@link Content.Sink} can turn a close into a last write that will avoid
137-
* chunking the response if at all possible. However, {@link BufferedWriter}
138-
* is in the API for {@link HttpResponse}, so we must return a writer of
139-
* that type.
139+
* A {@link BufferedWriter} that does not buffer. It is generally more efficient to buffer at the
140+
* {@link Content.Sink} level, since frequently total content is smaller than a single buffer and
141+
* the {@link Content.Sink} can turn a close into a last write that will avoid chunking the
142+
* response if at all possible. However, {@link BufferedWriter} is in the API for {@link
143+
* HttpResponse}, so we must return a writer of that type.
140144
*/
141145
private static class NonBufferedWriter extends BufferedWriter {
142146
private final Writer writer;

invoker/core/src/main/java/com/google/cloud/functions/invoker/http/TimeoutHandler.java

Lines changed: 40 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -14,76 +14,65 @@
1414

1515
package com.google.cloud.functions.invoker.http;
1616

17-
import java.util.Timer;
18-
import java.util.TimerTask;
17+
import java.time.Duration;
18+
import java.util.concurrent.TimeoutException;
1919
import java.util.concurrent.atomic.AtomicBoolean;
20-
import org.eclipse.jetty.http.HttpStatus;
2120
import org.eclipse.jetty.server.Handler;
2221
import org.eclipse.jetty.server.Request;
2322
import org.eclipse.jetty.server.Response;
2423
import org.eclipse.jetty.util.Callback;
24+
import org.eclipse.jetty.util.thread.Scheduler;
2525

2626
public class TimeoutHandler extends Handler.Wrapper {
27-
private final int timeoutMs;
27+
private final Duration timeout;
2828

2929
public TimeoutHandler(int timeoutSeconds, Handler handler) {
3030
setHandler(handler);
31-
this.timeoutMs = timeoutSeconds * 1000; // Convert seconds to milliseconds
31+
timeout = Duration.ofSeconds(timeoutSeconds);
3232
}
3333

3434
@Override
3535
public boolean handle(Request request, Response response, Callback callback) throws Exception {
36-
// Wrap the callback to ensure it is only called once between the handler and the timeout task.
37-
AtomicBoolean completed = new AtomicBoolean(false);
38-
Callback wrappedCallback = new Callback() {
39-
@Override
40-
public void succeeded() {
41-
if (completed.compareAndSet(false, true)) {
42-
callback.succeeded();
43-
}
44-
}
36+
// Wrap the callback to ensure it is only completed once between the
37+
// handler and the timeout task.
38+
Callback wrappedCallback = new ProtectedCallback(callback);
39+
Scheduler.Task timeoutTask =
40+
request
41+
.getComponents()
42+
.getScheduler()
43+
.schedule(
44+
() -> wrappedCallback.failed(new TimeoutException("Function execution timed out")),
45+
timeout);
4546

46-
@Override
47-
public void failed(Throwable x) {
48-
if (completed.compareAndSet(false, true)) {
49-
callback.failed(x);
50-
}
51-
}
47+
// Cancel the timeout if the request completes the callback first.
48+
return super.handle(request, response, Callback.from(timeoutTask::cancel, wrappedCallback));
49+
}
5250

53-
@Override
54-
public InvocationType getInvocationType() {
55-
return callback.getInvocationType();
56-
}
57-
};
51+
private static class ProtectedCallback implements Callback {
52+
private final Callback callback;
53+
private final AtomicBoolean completed = new AtomicBoolean(false);
5854

59-
// TODO: consider wrapping the request/response to throw if they are used after timeout.
60-
// TODO: Use org.eclipse.jetty.io.CyclicTimeouts which is optimized for timeouts which are almost always cancelled.
61-
Timer timer = new Timer(true);
62-
TimerTask timeoutTask =
63-
new TimerTask() {
64-
@Override
65-
public void run() {
66-
// TODO: there is a race between the handler writing response and timeout firing.
67-
// This timeout firing doesn't stop the thread handling the request / response it just writes an error to the response.
68-
Response.writeError(
69-
request,
70-
response,
71-
callback,
72-
HttpStatus.REQUEST_TIMEOUT_408,
73-
"Function execution timed out");
74-
}
75-
};
55+
public ProtectedCallback(Callback callback) {
56+
this.callback = callback;
57+
}
7658

77-
timer.schedule(timeoutTask, timeoutMs);
59+
@Override
60+
public void succeeded() {
61+
if (completed.compareAndSet(false, true)) {
62+
callback.succeeded();
63+
}
64+
}
7865

79-
boolean handle;
80-
try {
81-
handle = super.handle(request, response, wrappedCallback);
82-
timeoutTask.cancel();
83-
} finally {
84-
timer.purge();
66+
@Override
67+
public void failed(Throwable x) {
68+
if (completed.compareAndSet(false, true)) {
69+
callback.failed(x);
70+
}
8571
}
8672

87-
return handle;
73+
@Override
74+
public InvocationType getInvocationType() {
75+
return callback.getInvocationType();
76+
}
8877
}
89-
}
78+
}

invoker/core/src/main/java/com/google/cloud/functions/invoker/runner/Invoker.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -283,7 +283,13 @@ private void startServer(boolean join) throws Exception {
283283
server.setErrorHandler(
284284
new ErrorHandler() {
285285
@Override
286-
protected void generateResponse(Request request, Response response, int code, String message, Throwable cause, Callback callback) {
286+
protected void generateResponse(
287+
Request request,
288+
Response response,
289+
int code,
290+
String message,
291+
Throwable cause,
292+
Callback callback) {
287293
// Suppress error body
288294
callback.succeeded();
289295
}

0 commit comments

Comments
 (0)