Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import io.avaje.jex.http3.flupke.core.H3ServerProvider;
import io.avaje.jex.http3.flupke.webtransport.WebTransportEntry;
import io.avaje.jex.http3.flupke.webtransport.WebTransportHandler;
import io.avaje.jex.security.Role;
import io.avaje.jex.spi.JexPlugin;
import io.avaje.spi.ServiceProvider;
import tech.kwik.core.server.ServerConnectionConfig;
Expand All @@ -33,8 +34,8 @@ public final class FlupkeJexPlugin implements JexPlugin {
private String certAlias;

/**
* Constructor for automatic registration with default configuration.
* Use {@link #create()} when registering manually.
* Constructor for automatic registration with default configuration. Use {@link #create()} when
* registering manually.
*/
public FlupkeJexPlugin() {}

Expand Down Expand Up @@ -115,9 +116,10 @@ public FlupkeJexPlugin connectionConfig(Consumer<ServerConnectionConfig.Builder>
*
* @param path The URL path (e.g., "/my-webtransport-endpoint").
* @param handler The fully configured WebTransportHandler.
* @param roles The roles assigned to the registered CONNECT endpoint.
* @return This plugin instance for chaining.
*/
public FlupkeJexPlugin webTransport(String path, WebTransportHandler handler) {
public FlupkeJexPlugin webTransport(String path, WebTransportHandler handler, Role... roles) {
this.wts.add(new WebTransportEntry(path, handler));
return this;
}
Expand All @@ -130,10 +132,11 @@ public FlupkeJexPlugin webTransport(String path, WebTransportHandler handler) {
*
* @param path The URL path (e.g., "/my-webtransport-endpoint").
* @param consumer A consumer to configure the {@code WebTransportHandler.Builder}.
* @param roles The roles assigned to the registered CONNECT endpoint.
* @return This plugin instance for chaining.
* @see WebTransportHandler.Builder
*/
public FlupkeJexPlugin webTransport(String path, Consumer<WebTransportHandler.Builder> consumer) {
public FlupkeJexPlugin webTransport(String path, Consumer<WebTransportHandler.Builder> consumer, Role... roles) {
var b = WebTransportHandler.builder();
consumer.accept(b);
this.wts.add(new WebTransportEntry(path, b.build()));
Expand All @@ -145,5 +148,13 @@ public void apply(Jex jex) {
jex.config()
.serverProvider(
new H3ServerProvider(consumer, connection, certAlias, wts, extensions, socket));

wts.forEach(
entry ->
jex.routing()
.connect(
entry.path(),
ctx -> ctx.exchange().setAttribute("webtransport-handler", true),
entry.roles()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ class FlupkeExchange extends HttpExchange {
private int statusCode = 0;
private InputStream is;
private final PlaceholderOutputStream os = new PlaceholderOutputStream();
private boolean webtransportHandler;

public FlupkeExchange(HttpServerRequest request, HttpServerResponse response, HttpContext ctx) {
this.request = request;
Expand Down Expand Up @@ -70,6 +71,9 @@ public HttpContext getHttpContext() {

@Override
public void close() {
if (webtransportHandler) {
return;
}
try (var __ = is;
var ___ = response.getOutputStream(); ) {
} catch (IOException e) {
Expand All @@ -89,6 +93,10 @@ public OutputStream getResponseBody() {

@Override
public void sendResponseHeaders(int status, long responseLength) throws IOException {
if (webtransportHandler) {
os.wrapped = response.getOutputStream();
return;
}
statusCode = status;
if (responseLength > 0) {
responseHeaders.add("Content-length", Long.toString(responseLength));
Expand Down Expand Up @@ -126,6 +134,11 @@ public Object getAttribute(String name) {
@Override
public void setAttribute(String name, Object value) {
attributes.put(name, value);
if ("webtransport-handler".equals(name)) {
this.webtransportHandler = true;
statusCode = 200;
response.setStatus(statusCode);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,11 @@
import io.avaje.jex.http3.flupke.webtransport.WebTransportEvent.Close;
import io.avaje.jex.http3.flupke.webtransport.WebTransportEvent.Open;
import io.avaje.jex.http3.flupke.webtransport.WebTransportEvent.UniStream;
import io.avaje.jex.security.Role;
import tech.kwik.flupke.webtransport.Session;

/** Entry for webtransport */
public final record WebTransportEntry(String path, WebTransportHandler handler)
public final record WebTransportEntry(String path, WebTransportHandler handler, Role... roles)
implements Consumer<Session> {

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;

Expand Down Expand Up @@ -30,6 +31,7 @@
import io.avaje.jex.http3.flupke.webtransport.WebTransportEvent.BiStream;
import io.avaje.jex.ssl.SslPlugin;
import tech.kwik.flupke.Http3Client;
import tech.kwik.flupke.HttpError;
import tech.kwik.flupke.webtransport.ClientSessionFactory;
import tech.kwik.flupke.webtransport.Session;
import tech.kwik.flupke.webtransport.WebTransportStream;
Expand Down Expand Up @@ -154,6 +156,32 @@ private final void startServer(FlupkeJexPlugin webTransport) {
localhost = URI.create("https://localhost:%s/".formatted(jex.port()));
}

@Test
void testFilterClose() throws Exception {
jex =
Jex.create()
.plugin(ssl)
.filter(
(ctx, chain) -> {
ctx.status(403).write("Unauthorized");
})
.plugin(FlupkeJexPlugin.create().webTransport("/filter", b -> {}))
.port(0)
.start();
localhost = URI.create("https://localhost:%s/".formatted(jex.port()));

var clientSessionFactory =
ClientSessionFactory.newBuilder()
.serverUri(localhost.resolve("/filter"))
.httpClient(client)
.build();
var ex =
assertThrows(
HttpError.class,
() -> clientSessionFactory.createSession(localhost.resolve("/filter")));
assertEquals(403, ex.getStatusCode());
}

@Test
void testServerInitiatedUnidirectionalStream() throws Exception {
CountDownLatch clientLatch = new CountDownLatch(1);
Expand Down
6 changes: 6 additions & 0 deletions avaje-jex/src/main/java/io/avaje/jex/DefaultRouting.java
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,12 @@ private void add(Type verb, String path, ExchangeHandler handler, Role... roles)
// HTTP verbs
// ********************************************************************************************

@Override
public Routing connect(String path, ExchangeHandler handler, Role... roles) {
add(Type.CONNECT, path, handler, roles);
return this;
}

@Override
public Routing get(String path, ExchangeHandler handler, Role... roles) {
add(Type.GET, path, handler, roles);
Expand Down
11 changes: 11 additions & 0 deletions avaje-jex/src/main/java/io/avaje/jex/Routing.java
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,15 @@ public interface Routing {
*/
Routing head(String path, ExchangeHandler handler, Role... roles);

/**
* Adds a CONNECT handler to the route configuration.
*
* @param path The path pattern to match the request URI.
* @param handler The handler to invoke when a CONNECT request matches the path.
* @param roles roles that are associated with this endpoint.
*/
Routing connect(String path, ExchangeHandler handler, Role... roles);

/**
* Adds a GET handler to the route configuration.
*
Expand Down Expand Up @@ -200,6 +209,7 @@ interface Entry {

/** The type of route entry. */
enum Type {
CONNECT,
GET,
POST,
PUT,
Expand All @@ -209,4 +219,5 @@ enum Type {
TRACE,
OPTIONS;
}

}
Loading