Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion buffer-netty/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ dependencies {

annotationProcessor(projects.micronautInjectJava)

testRuntimeOnly(projects.micronautHttpTck) // leak detection module
testRuntimeOnly(libs.micronaut.test.netty.leak)
}

tasks {
Expand Down
3 changes: 2 additions & 1 deletion gradle/libs.versions.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ micronaut-build-plugins="7.6.4"
micronaut-groovy = "4.7.0"
micronaut-session = "4.6.0"
micronaut-sql = "5.3.0"
micronaut-test = "4.6.2"
micronaut-test = "4.10.2"
micronaut-validation = "4.9.0"
micronaut-rxjava2 = "2.7.0"
micronaut-rxjava3 = "3.7.0"
Expand Down Expand Up @@ -264,6 +264,7 @@ micronaut-test-junit5 = { module = "io.micronaut.test:micronaut-test-junit5", ve
micronaut-test-kotest5 = { module = "io.micronaut.test:micronaut-test-kotest5", version.ref = "micronaut-test" }
micronaut-test-spock = { module = "io.micronaut.test:micronaut-test-spock", version.ref = "micronaut-test" }
micronaut-test-type-pollution = { module = "io.micronaut.test:micronaut-test-type-pollution", version.ref = "micronaut-test" }
micronaut-test-netty-leak = { module = "io.micronaut.test:micronaut-test-netty-leak", version.ref = "micronaut-test" }

micronaut-sql-jdbc = { module = "io.micronaut.sql:micronaut-jdbc", version.ref = "micronaut-sql" }
micronaut-sql-jdbc-tomcat = { module = "io.micronaut.sql:micronaut-jdbc-tomcat", version.ref = "micronaut-sql" }
Expand Down
3 changes: 2 additions & 1 deletion http-server-netty/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@ dependencies {
testImplementation(libs.bcpkix)
testImplementation(libs.managed.netty.pkitesting)
testImplementation(projects.micronautJacksonDatabind)
testImplementation(projects.micronautHttpTck)
// Add Micronaut Jackson XML after v4 Migration
// testImplementation(libs.managed.micronaut.xml) {
// exclude module:'micronaut-inject'
Expand Down Expand Up @@ -125,12 +124,14 @@ dependencies {
exclude(group = "io.micronaut")
}
testImplementation(libs.junit.jupiter.api)
testImplementation(libs.micronaut.test.netty.leak)
}

tasks.withType<Test>().configureEach {
forkEvery = 100
maxParallelForks = 4
useJUnitPlatform()
systemProperty("junit.jupiter.extensions.autodetection.enabled", "true")
}

//tasks.withType(Test).configureEach {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import io.micronaut.http.server.ResponseLifecycle;
import io.netty.buffer.ByteBufAllocator;
import io.netty.handler.codec.http.HttpContent;
import io.netty.util.LeakPresenceDetector;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;

Expand Down Expand Up @@ -97,7 +98,7 @@ private NettyByteBodyFactory byteBodyFactory() {
}

private static class NettyConcatenatingSubscriber extends ConcatenatingSubscriber implements BufferConsumer {
static final Separators JSON_NETTY = Separators.jsonSeparators(NettyReadBufferFactory.of(ByteBufAllocator.DEFAULT));
static final Separators JSON_NETTY = LeakPresenceDetector.staticInitializer(() -> Separators.jsonSeparators(NettyReadBufferFactory.of(ByteBufAllocator.DEFAULT)));

private final EventLoopFlow flow;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -257,8 +257,8 @@ public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exc
if (idle.state() == IdleState.ALL_IDLE) {
ctx.close();
}
super.userEventTriggered(ctx, evt);
}
super.userEventTriggered(ctx, evt);
}

/**
Expand All @@ -276,6 +276,8 @@ public void handleFakeRequest(io.netty.handler.codec.http2.Http2Stream onStream,
stream.onHeadersRead(fhr, empty);
if (!empty) {
stream.onDataRead(fhr.content(), true);
} else {
fhr.content().release();
}
}
}
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,16 @@ import io.micronaut.http.hateoas.JsonError
import io.micronaut.http.hateoas.Link
import io.micronaut.http.server.netty.AbstractMicronautSpec
import io.micronaut.json.JsonSyntaxException
import io.micronaut.scheduling.TaskExecutors
import jakarta.inject.Inject
import jakarta.inject.Named
import org.reactivestreams.Publisher
import reactor.core.publisher.Flux
import reactor.core.scheduler.Schedulers
import spock.lang.Issue

import java.util.concurrent.CompletableFuture
import java.util.concurrent.Executor

class JsonBodyBindingSpec extends AbstractMicronautSpec {

Expand Down Expand Up @@ -388,6 +392,9 @@ class JsonBodyBindingSpec extends AbstractMicronautSpec {
@Controller(value = "/json", produces = io.micronaut.http.MediaType.APPLICATION_JSON)
@Requires(property = "test.controller", value = "JsonController")
static class JsonController {
@Inject
@Named(TaskExecutors.BLOCKING)
Executor blocking

@Post("/params")
String params(String name, int age) {
Expand Down Expand Up @@ -472,7 +479,7 @@ class JsonBodyBindingSpec extends AbstractMicronautSpec {
@Post("/publisher-object")
Publisher<String> publisherObject(@Body Publisher<Foo> publisher) {
return Flux.from(publisher)
.subscribeOn(Schedulers.boundedElastic())
.subscribeOn(Schedulers.fromExecutor(blocking))
.map({ Foo foo ->
foo.toString()
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ class ServerRequestContextSpec extends Specification {
Mono.fromCallable({ ->
def request = ServerRequestContext.currentRequest().orElseThrow { -> new RuntimeException("no request") }
request.uri.toString()
}).subscribeOn(Schedulers.boundedElastic())
}).subscribeOn(Schedulers.fromExecutor(executorService))
}

@Get("/reactor-context")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,15 @@ import io.micronaut.context.ApplicationContext
import io.micronaut.context.BeanProvider
import io.micronaut.context.annotation.Replaces
import io.micronaut.context.annotation.Requires
import io.micronaut.http.annotation.Body
import io.micronaut.core.annotation.Nullable
import io.micronaut.http.annotation.Body
import io.micronaut.http.annotation.Controller
import io.micronaut.http.annotation.Get
import io.micronaut.http.annotation.Post
import io.micronaut.http.netty.channel.EventLoopGroupConfiguration
import io.micronaut.http.netty.channel.EventLoopGroupRegistry
import io.micronaut.http.server.HttpServerConfiguration
import io.micronaut.http.server.netty.NettyHttpServer
import io.micronaut.http.tck.netty.TestLeakDetector
import io.micronaut.http.server.util.DefaultHttpHostResolver
import io.micronaut.runtime.server.EmbeddedServer
import io.netty.bootstrap.Bootstrap
Expand All @@ -36,8 +35,6 @@ class FuzzyInputSpec extends Specification {

def 'http1 cleartext buffer leaks'() {
given:
TestLeakDetector.startTracking("")

ApplicationContext ctx = ApplicationContext.run([
'spec.name': 'FuzzyInputSpec',
"micronaut.server.port": "-1",
Expand Down Expand Up @@ -67,7 +64,7 @@ class FuzzyInputSpec extends Specification {
channel.closeFuture().sync()

then:
TestLeakDetector.stopTrackingAndReportLeaks()
noExceptionThrown()

cleanup:
embeddedServer.stop()
Expand All @@ -82,7 +79,6 @@ class FuzzyInputSpec extends Specification {
def 'http1 cleartext embedded channel'() {
given:
FlagAppender.clear()
TestLeakDetector.startTracking("")

ApplicationContext ctx = ApplicationContext.run([
'spec.name': 'FuzzyInputSpec',
Expand All @@ -107,7 +103,6 @@ class FuzzyInputSpec extends Specification {
then:
embeddedChannel.checkException()

TestLeakDetector.stopTrackingAndReportLeaks()
FlagAppender.checkTriggered()

where:
Expand All @@ -131,7 +126,6 @@ class FuzzyInputSpec extends Specification {
def 'http2 cleartext embedded channel'() {
given:
FlagAppender.clear()
TestLeakDetector.startTracking("")

ApplicationContext ctx = ApplicationContext.run([
'spec.name': 'FuzzyInputSpec',
Expand All @@ -157,7 +151,6 @@ class FuzzyInputSpec extends Specification {
then:
embeddedChannel.checkException()

TestLeakDetector.stopTrackingAndReportLeaks()
FlagAppender.checkTriggered()

where:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ import io.netty.handler.ssl.ApplicationProtocolNegotiationHandler
import io.netty.handler.ssl.SslContextBuilder
import io.netty.handler.ssl.SupportedCipherSuiteFilter
import io.netty.handler.ssl.util.InsecureTrustManagerFactory
import io.netty.util.ReferenceCountUtil
import jakarta.inject.Singleton
import org.slf4j.LoggerFactory
import reactor.core.publisher.Mono
Expand Down Expand Up @@ -77,8 +78,10 @@ class AccessLogSpec extends Specification {
server.start()

def responses = new CopyOnWriteArrayList<FullHttpResponse>()

def group = new NioEventLoopGroup(1)
Bootstrap bootstrap = new Bootstrap()
.group(new NioEventLoopGroup(1))
.group(group)
.channel(NioSocketChannel)
.option(ChannelOption.AUTO_READ, true)
.handler(new ChannelInitializer<Channel>() {
Expand Down Expand Up @@ -128,7 +131,8 @@ class AccessLogSpec extends Specification {
responses*.content().forEach(ByteBuf::release)
server.close()
channel.close()
bootstrap.config().group().shutdownGracefully()
ctx.close()
group.shutdownGracefully()
}

@Issue('https://github.com/micronaut-projects/micronaut-core/issues/6782')
Expand All @@ -144,8 +148,10 @@ class AccessLogSpec extends Specification {
server.start()

def responses = new CopyOnWriteArrayList<FullHttpResponse>()

def group = new NioEventLoopGroup(1)
Bootstrap bootstrap = new Bootstrap()
.group(new NioEventLoopGroup(1))
.group(group)
.channel(NioSocketChannel)
.option(ChannelOption.AUTO_READ, true)
.handler(new ChannelInitializer<Channel>() {
Expand Down Expand Up @@ -194,7 +200,8 @@ class AccessLogSpec extends Specification {
responses*.content().forEach(ByteBuf::release)
server.close()
channel.close()
bootstrap.config().group().shutdownGracefully()
ctx.close()
group.shutdownGracefully()
}

@Issue('https://github.com/micronaut-projects/micronaut-core/issues/6782')
Expand All @@ -209,8 +216,10 @@ class AccessLogSpec extends Specification {
server.start()

def responses = new CopyOnWriteArrayList<FullHttpResponse>()

def group = new NioEventLoopGroup(1)
Bootstrap bootstrap = new Bootstrap()
.group(new NioEventLoopGroup(1))
.group(group)
.channel(NioSocketChannel)
.option(ChannelOption.AUTO_READ, true)
.handler(new ChannelInitializer<Channel>() {
Expand Down Expand Up @@ -272,7 +281,7 @@ class AccessLogSpec extends Specification {
responses*.content().forEach(ByteBuf::release)
server.close()
channel.close()
bootstrap.config().group().shutdownGracefully()
group.shutdownGracefully()
}

@Issue('https://github.com/micronaut-projects/micronaut-core/issues/6782')
Expand Down Expand Up @@ -314,8 +323,10 @@ class AccessLogSpec extends Specification {
ApplicationProtocolConfig.SelectedListenerFailureBehavior.ACCEPT,
ApplicationProtocolNames.HTTP_2))
.build()

def group = new NioEventLoopGroup(1)
Bootstrap bootstrap = new Bootstrap()
.group(new NioEventLoopGroup(1))
.group(group)
.channel(NioSocketChannel)
.option(ChannelOption.AUTO_READ, true)
.handler(new ChannelInitializer<Channel>() {
Expand Down Expand Up @@ -386,7 +397,9 @@ class AccessLogSpec extends Specification {
responses*.content().forEach(ByteBuf::release)
server.close()
channel.close()
bootstrap.config().group().shutdownGracefully()
ctx.close()
ReferenceCountUtil.release(ctx)
group.shutdownGracefully()
}

@Issue('https://github.com/micronaut-projects/micronaut-core/issues/6782')
Expand Down Expand Up @@ -415,8 +428,9 @@ class AccessLogSpec extends Specification {
request4.headers().add(HttpConversionUtil.ExtensionHeaderNames.SCHEME.text(), ':https')

def responses = new CopyOnWriteArrayList<FullHttpResponse>()
def group = new NioEventLoopGroup(1)
Bootstrap bootstrap = new Bootstrap()
.group(new NioEventLoopGroup(1))
.group(group)
.channel(NioSocketChannel)
.option(ChannelOption.AUTO_READ, true)
.handler(new ChannelInitializer<Channel>() {
Expand Down Expand Up @@ -484,7 +498,8 @@ class AccessLogSpec extends Specification {
responses*.content().forEach(ByteBuf::release)
server.close()
channel.close()
bootstrap.config().group().shutdownGracefully()
ctx.close()
group.shutdownGracefully()
}

@Issue('https://github.com/micronaut-projects/micronaut-core/issues/6782')
Expand Down Expand Up @@ -514,8 +529,10 @@ class AccessLogSpec extends Specification {
request4.headers().add(HttpConversionUtil.ExtensionHeaderNames.SCHEME.text(), ':https')

def responses = new CopyOnWriteArrayList<FullHttpResponse>()

def group = new NioEventLoopGroup(1)
Bootstrap bootstrap = new Bootstrap()
.group(new NioEventLoopGroup(1))
.group(group)
.channel(NioSocketChannel)
.option(ChannelOption.AUTO_READ, true)
.handler(new ChannelInitializer<Channel>() {
Expand Down Expand Up @@ -580,7 +597,8 @@ class AccessLogSpec extends Specification {
responses*.content().forEach(ByteBuf::release)
server.close()
channel.close()
bootstrap.config().group().shutdownGracefully()
ctx.close()
group.shutdownGracefully()
}

@Issue('https://github.com/micronaut-projects/micronaut-core/issues/6782')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,10 @@ class H2cSpec extends Specification {

private CompletableFuture requestUpgrade(DefaultFullHttpRequest initialRequest) {
def responseFuture = new CompletableFuture()
def group = new NioEventLoopGroup(1)
def bootstrap = new Bootstrap()
.remoteAddress(embeddedServer.host, embeddedServer.port)
.group(new NioEventLoopGroup())
.group(group)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
Expand Down Expand Up @@ -120,7 +121,7 @@ class H2cSpec extends Specification {
channel.writeAndFlush(initialRequest)
channel.read()

return responseFuture
return responseFuture.whenComplete((r, e) -> group.shutdownGracefully())
}

void 'test using direct netty http2 client'() {
Expand All @@ -129,6 +130,9 @@ class H2cSpec extends Specification {

expect:
responseFuture.get(10, TimeUnit.SECONDS) != null

cleanup:
ReferenceCountUtil.release(responseFuture.getNow(null))
}

void 'test using micronaut http client: retrieve'() {
Expand Down
Loading
Loading