Skip to content

Commit

Permalink
Improve Netty server adapter
Browse files Browse the repository at this point in the history
  • Loading branch information
jaguililla committed Nov 4, 2023
1 parent 60700bf commit 72a71d0
Show file tree
Hide file tree
Showing 7 changed files with 99 additions and 28 deletions.
6 changes: 3 additions & 3 deletions http/http_server_netty/api/http_server_netty.api
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ public final class com/hexagonkt/http/server/netty/NettyKt {
}

public final class com/hexagonkt/http/server/netty/NettyRequestAdapter : com/hexagonkt/http/model/HttpRequestPort {
public fun <init> (Lio/netty/handler/codec/http/HttpMethod;Lio/netty/handler/codec/http/HttpRequest;Ljava/util/List;Ljava/net/InetSocketAddress;Lio/netty/handler/codec/http/HttpHeaders;)V
public fun <init> (Lio/netty/handler/codec/http/HttpMethod;Lio/netty/handler/codec/http/HttpRequest;Ljava/util/List;Lio/netty/channel/Channel;Lio/netty/handler/codec/http/HttpHeaders;)V
public fun authorization ()Lcom/hexagonkt/http/model/Authorization;
public fun bodyString ()Ljava/lang/String;
public fun certificate ()Ljava/security/cert/X509Certificate;
Expand Down Expand Up @@ -46,8 +46,8 @@ public final class com/hexagonkt/http/server/netty/NettyRequestAdapter : com/hex

public class com/hexagonkt/http/server/netty/NettyServerAdapter : com/hexagonkt/http/server/HttpServerPort {
public fun <init> ()V
public fun <init> (IIIIZZJJ)V
public synthetic fun <init> (IIIIZZJJILkotlin/jvm/internal/DefaultConstructorMarker;)V
public fun <init> (IIIIZZJJZZZZ)V
public synthetic fun <init> (IIIIZZJJZZZZILkotlin/jvm/internal/DefaultConstructorMarker;)V
public fun groupSupplier (I)Lio/netty/channel/MultithreadEventLoopGroup;
public fun options ()Ljava/util/Map;
public fun runtimePort ()I
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,20 +12,28 @@ internal class HttpChannelInitializer(
private val handlers: Map<HttpMethod, HttpHandler>,
private val executorGroup: EventExecutorGroup?,
private val settings: HttpServerSettings,
private val keepAliveHandler: Boolean = true,
private val httpAggregatorHandler: Boolean = true,
private val chunkedHandler: Boolean = true,
private val enableWebsockets: Boolean = true,
) : ChannelInitializer<SocketChannel>() {

override fun initChannel(channel: SocketChannel) {
val pipeline = channel.pipeline()

pipeline.addLast(HttpServerCodec())
// pipeline.addLast(HttpServerKeepAliveHandler())
pipeline.addLast(HttpObjectAggregator(Int.MAX_VALUE))
// pipeline.addLast(ChunkedWriteHandler())

if (keepAliveHandler)
pipeline.addLast(HttpServerKeepAliveHandler())
if (httpAggregatorHandler)
pipeline.addLast(HttpObjectAggregator(Int.MAX_VALUE))
if (chunkedHandler)
pipeline.addLast(ChunkedWriteHandler())
if (settings.zip)
pipeline.addLast(HttpContentCompressor())

val nettyServerHandler = NettyServerHandler(handlers, null)
val nettyServerHandler = NettyServerHandler(handlers, null, enableWebsockets)

if (executorGroup == null)
pipeline.addLast(nettyServerHandler)
else
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import io.netty.channel.ChannelInitializer
import io.netty.channel.socket.SocketChannel
import io.netty.handler.codec.http.*
import io.netty.handler.ssl.SslContext
import io.netty.handler.stream.ChunkedWriteHandler
import io.netty.util.concurrent.EventExecutorGroup

internal class HttpsChannelInitializer(
Expand All @@ -15,6 +16,10 @@ internal class HttpsChannelInitializer(
private val sslSettings: SslSettings,
private val executorGroup: EventExecutorGroup?,
private val settings: HttpServerSettings,
private val keepAliveHandler: Boolean = true,
private val httpAggregatorHandler: Boolean = true,
private val chunkedHandler: Boolean = true,
private val enableWebsockets: Boolean = true,
) : ChannelInitializer<SocketChannel>() {

override fun initChannel(channel: SocketChannel) {
Expand All @@ -24,14 +29,18 @@ internal class HttpsChannelInitializer(

pipeline.addLast(sslHandler)
pipeline.addLast(HttpServerCodec())
// pipeline.addLast(HttpServerKeepAliveHandler())
pipeline.addLast(HttpObjectAggregator(Int.MAX_VALUE))
// pipeline.addLast(ChunkedWriteHandler())

if (keepAliveHandler)
pipeline.addLast(HttpServerKeepAliveHandler())
if (httpAggregatorHandler)
pipeline.addLast(HttpObjectAggregator(Int.MAX_VALUE))
if (chunkedHandler)
pipeline.addLast(ChunkedWriteHandler())
if (settings.zip)
pipeline.addLast(HttpContentCompressor())

val serverHandler = NettyServerHandler(handlers, handlerSsl)
val serverHandler = NettyServerHandler(handlers, handlerSsl, enableWebsockets)

if (executorGroup == null)
pipeline.addLast(serverHandler)
else
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import com.hexagonkt.http.parseContentType
import io.netty.buffer.ByteBufHolder
import io.netty.buffer.ByteBufUtil
import io.netty.buffer.Unpooled
import io.netty.channel.Channel
import io.netty.handler.codec.http.HttpHeaderNames.*
import io.netty.handler.codec.http.HttpHeaders
import io.netty.handler.codec.http.QueryStringDecoder
Expand All @@ -26,10 +27,12 @@ class NettyRequestAdapter(
methodName: NettyHttpMethod,
req: HttpRequest,
override val certificateChain: List<X509Certificate>,
address: InetSocketAddress,
channel: Channel,
nettyHeaders: HttpHeaders,
) : HttpRequestPort {

private val address: InetSocketAddress by lazy { channel.remoteAddress() as InetSocketAddress }

override val accept: List<ContentType> by lazy {
nettyHeaders.getAll(ACCEPT).flatMap { it.split(",") }.map { parseContentType(it) }
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@ open class NettyServerAdapter(
private val soKeepAlive: Boolean = true,
private val shutdownQuietSeconds: Long = 0,
private val shutdownTimeoutSeconds: Long = 0,
private val keepAliveHandler: Boolean = true,
private val httpAggregatorHandler: Boolean = true,
private val chunkedHandler: Boolean = true,
private val enableWebsockets: Boolean = true,
) : HttpServerPort {

private var nettyChannel: Channel? = null
Expand Down Expand Up @@ -120,7 +124,15 @@ open class NettyServerAdapter(
) =
when {
sslSettings != null -> sslInitializer(sslSettings, handlers, group, settings)
else -> HttpChannelInitializer(handlers, group, settings)
else -> HttpChannelInitializer(
handlers,
group,
settings,
keepAliveHandler,
httpAggregatorHandler,
chunkedHandler,
enableWebsockets,
)
}

private fun sslInitializer(
Expand All @@ -129,7 +141,17 @@ open class NettyServerAdapter(
group: DefaultEventExecutorGroup?,
settings: HttpServerSettings
): HttpsChannelInitializer =
HttpsChannelInitializer(handlers, sslContext(sslSettings), sslSettings, group, settings)
HttpsChannelInitializer(
handlers,
sslContext(sslSettings),
sslSettings,
group,
settings,
keepAliveHandler,
httpAggregatorHandler,
chunkedHandler,
enableWebsockets,
)

private fun sslContext(sslSettings: SslSettings): SslContext {
val keyManager = createKeyManagerFactory(sslSettings)
Expand Down Expand Up @@ -191,5 +213,9 @@ open class NettyServerAdapter(
NettyServerAdapter::soKeepAlive to soKeepAlive,
NettyServerAdapter::shutdownQuietSeconds to shutdownQuietSeconds,
NettyServerAdapter::shutdownTimeoutSeconds to shutdownTimeoutSeconds,
NettyServerAdapter::keepAliveHandler to keepAliveHandler,
NettyServerAdapter::httpAggregatorHandler to httpAggregatorHandler,
NettyServerAdapter::chunkedHandler to chunkedHandler,
NettyServerAdapter::enableWebsockets to enableWebsockets,
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,14 @@ import io.netty.handler.codec.http.cookie.ServerCookieEncoder.STRICT as STRICT_E
import io.netty.handler.codec.http.websocketx.WebSocketServerHandshakerFactory
import io.netty.handler.ssl.SslHandler
import io.netty.handler.ssl.SslHandshakeCompletionEvent
import java.net.InetSocketAddress
import java.security.cert.X509Certificate
import java.util.concurrent.Flow.*
import com.hexagonkt.http.model.HttpRequest as HexagonHttpRequest

internal class NettyServerHandler(
private val handlers: Map<HttpMethod, HttpHandler>,
private val sslHandler: SslHandler?,
private val enableWebsockets: Boolean = true,
) : ChannelInboundHandlerAdapter() {

private var certificates: List<X509Certificate> = emptyList()
Expand All @@ -47,18 +47,17 @@ internal class NettyServerHandler(
}

private fun readHttpRequest(context: ChannelHandlerContext, nettyRequest: HttpRequest) {
// val result = nettyRequest.decoderResult()
//
// if (result.isFailure)
// throw IllegalStateException(result.cause())
val result = nettyRequest.decoderResult()

if (result.isFailure)
throw IllegalStateException(result.cause())

val channel = context.channel()
val address = channel.remoteAddress() as InetSocketAddress
val method = nettyRequest.method()
val pathHandler = handlers[method]

val headers = nettyRequest.headers()
val request = NettyRequestAdapter(method, nettyRequest, certificates, address, headers)
val request = NettyRequestAdapter(method, nettyRequest, certificates, channel, headers)

if (pathHandler == null) {
writeResponse(context, request, HttpResponse(), HttpUtil.isKeepAlive(nettyRequest))
Expand All @@ -68,15 +67,12 @@ internal class NettyServerHandler(
val resultContext = pathHandler.process(request)
val response = resultContext.event.response

val body = response.body
val connection = headers[CONNECTION]?.lowercase()
val upgrade = headers[UPGRADE]?.lowercase()
val isWebSocket =
if (enableWebsockets) isWebsocket(headers, method, response.status)
else false

val body = response.body
val isSse = body is Publisher<*>
val isWebSocket = connection == "upgrade"
&& upgrade == "websocket"
&& method == GET
&& response.status == ACCEPTED_202

when {
isSse -> handleSse(context, request, response, body)
Expand All @@ -85,6 +81,15 @@ internal class NettyServerHandler(
}
}

private fun isWebsocket(headers: HttpHeaders, method: HttpMethod, status: HttpStatus): Boolean {
val connection = headers[CONNECTION]?.lowercase()
val upgrade = headers[UPGRADE]?.lowercase()
return connection == "upgrade"
&& upgrade == "websocket"
&& method == GET
&& status == ACCEPTED_202
}

@Suppress("UNCHECKED_CAST") // Body not cast to Publisher<HttpServerEvent> due to type erasure
private fun handleSse(
context: ChannelHandlerContext,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,3 +31,23 @@ internal class AdapterBenchmarkIT : BenchmarkIT(clientAdapter, serverAdapter)
internal class AdapterSseTest : SseTest(clientAdapter, serverAdapter)
@DisabledInNativeImage // TODO Fix this
internal class AdapterWebSocketsTest : WebSocketsTest(clientAdapter, serverAdapter)

val liteServerAdapter: () -> NettyServerAdapter = {
NettyServerAdapter(
keepAliveHandler = false,
httpAggregatorHandler = false,
chunkedHandler = false,
enableWebsockets = false,
)
}

internal class LiteAdapterBooksTest : BooksTest(clientAdapter, liteServerAdapter)
internal class LiteAdapterErrorsTest : ErrorsTest(clientAdapter, liteServerAdapter)
internal class LiteAdapterFiltersTest : FiltersTest(clientAdapter, liteServerAdapter)
@DisabledOnOs(WINDOWS) // TODO Make this work on GitHub runners
internal class LiteAdapterHttpsTest : HttpsTest(clientAdapter, liteServerAdapter)
internal class LiteAdapterZipTest : ZipTest(clientAdapter, liteServerAdapter)
internal class LiteAdapterCookiesTest : CookiesTest(clientAdapter, liteServerAdapter)
internal class LiteAdapterCorsTest : CorsTest(clientAdapter, liteServerAdapter)
internal class LiteAdapterBenchmarkIT : BenchmarkIT(clientAdapter, liteServerAdapter)
internal class LiteAdapterSseTest : SseTest(clientAdapter, liteServerAdapter)

0 comments on commit 72a71d0

Please sign in to comment.