Chunked transmission lasts longer than timeout #4214
base: master
Conversation
Force-pushed from 2244b8c to aded4cc
...ty-server/cats/src/test/scala/sttp/tapir/server/netty/cats/NettyCatsRequestTimeoutTest.scala
There are two differences compared to the original report:
Force-pushed from 49a1e12 to ad42e13
Thanks for your precise comment. I've rewritten it to:
.streamBody(Fs2Streams[IO])(inputStream)
.send(backend)
.map { response =>
  response.code shouldBe StatusCode.Ok
hm well if this test passes, something is wrong - we set the timeout to 1s, so we should never receive a response if it takes 2s to send it? unless the request timeout is for something else?
anyway, this doesn't test the scenario from the test case - where the transmission is interrupted half-way because of connection problems; I don't know if we can simulate this in a test case, but using a timeout is a good approximation. But probably a good way to check if we can at all reproduce the bug is to run: a long-running client sender process; a server process; then kill -9 the client process when it's half-way through sending the data, and see on the server whether the incomplete data reaches the server logic
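For reference, a rough sketch of the kind of streaming request the test above sends - a chunk, a pause exceeding the 1s request timeout, then another chunk. The endpoint path, chunk size and exact durations are assumptions, not taken from this PR:

```scala
import cats.effect.IO
import fs2.Stream
import sttp.capabilities.fs2.Fs2Streams
import sttp.client3._
import sttp.model.StatusCode

import scala.concurrent.duration._

// Hypothetical helper: sends a chunk, pauses past the server's 1s request
// timeout, then sends another chunk, and returns the response code.
def sendInterruptedBody(backend: SttpBackend[IO, Fs2Streams[IO]], port: Int): IO[StatusCode] = {
  val chunk: Stream[IO, Byte] = Stream.emits(List.fill(100)('A'.toByte)).covary[IO]
  // the pause is longer than the server's request timeout
  val inputStream: Stream[IO, Byte] = chunk ++ Stream.sleep_[IO](2.seconds) ++ chunk

  basicRequest
    .post(uri"http://localhost:$port/chunked")
    .streamBody(Fs2Streams[IO])(inputStream)
    .send(backend)
    .map(_.code)
}
```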
I have given it another try as you suggested. There are playServer and longLastingClient but I don't know what's wrong with that approach. Suggestions are welcome 💡
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When you write "I don't know what's wrong with this approach", do you mean that it works as expected (that is: you run both, interrupt the client brutally after some time, and the server properly closes the connection), or is there something else that's wrong?
I'm trying with the following steps:
$ scala-cli run playServer.scala
[playServer-pekko.actor.default-dispatcher-4] INFO org.apache.pekko.event.slf4j.Slf4jLogger - Slf4jLogger started
[application-pekko.actor.default-dispatcher-6] INFO org.apache.pekko.event.slf4j.Slf4jLogger - Slf4jLogger started
[main] INFO play.api.Play - Application started (Dev) (no global state)
[main] INFO play.core.server.NettyServer - Listening for HTTP on /0:0:0:0:0:0:0:0:9000
$ scala-cli run longLastingClient.scala
ByteString(65, 65, 65, 65, 65, 65, 65, 65, 65, 65) 17:46:14.514330
ByteString(65, 65, 65, 65, 65, 65, 65, 65, 65, 65) 17:46:15.534314
ByteString(65, 65, 65, 65, 65, 65, 65, 65, 65, 65) 17:46:16.553033
ByteString(65, 65, 65, 65, 65, 65, 65, 65, 65, 65) 17:46:17.573135
....
- server side
Received 10000 bytes, Source(SourceShape(Map.out(1467551936))) bytes in total
- then killing the client:
$ ps aux | grep longLastingClient | awk '{print $2}' | head -n 1 | xargs kill -9
and nothing new (error/exception/whatever) on the server side 🤔
I think the lack of exception/error might be the problem here ;) but first there are two problems in the code:
- in the client code, you claim to send 10000 bytes in the content-length, but in fact you might send more?
- in the server code, you do `stream.map(_.length)`, which just creates a `Source[Long]`, that is, a description of a stream that produces lengths of received byte-strings (byte chunks). You never run (receive) the stream, and that's where you'd expect to see errors (when the stream is being run) - see the sketch below for one way to actually run it
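As an illustration of the second point, a minimal Pekko Streams sketch (the handler name and wiring are assumed, not taken from this PR) of actually running the received stream, so that a truncated upload surfaces as a failed Future:

```scala
import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.stream.scaladsl.Source
import org.apache.pekko.util.ByteString

import scala.concurrent.Future
import scala.util.{Failure, Success}

// Hypothetical server-side handler: run the stream instead of only mapping
// over it, so that a truncated body shows up as a failed Future.
def consumeBody(body: Source[ByteString, Any])(implicit system: ActorSystem): Future[Long] = {
  import system.dispatcher

  // runFold materializes (runs) the stream and sums the chunk lengths
  val total: Future[Long] = body.runFold(0L)((acc, chunk) => acc + chunk.length)

  total.onComplete {
    case Success(bytes) => println(s"Received $bytes bytes in total")
    case Failure(error) => println(s"Body stream failed: $error") // truncation / reset surfaces here
  }
  total
}
```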
Thanks for the reproduction test case :). Though if you are closing the connection & socket on the client side, the server side receives a "data complete" signal (even though it's less than the declared amount), and continues to process it. So `SimpleSubscriber.onComplete` is called, which gathers all buffers into a byte array and passes it for parsing. Hence I would say that the current behavior is at least somewhat correct. What would you expect instead?
As for the LEAK warning, that must be a bug somewhere, although unfortunately I can't reproduce it. I tried following your line of thought, but I can't see where the future is GCed before being fully handled?
I would expect received bytes to be skipped (not pushed down) and 400 Bad Request to be returned immediately.
Not sure if I can reproduce the LEAK with this sample code, but in my production code I can easily reproduce it by:
- repeating partial requests in a loop (see the sketch after the stack trace below)
- limiting the heap size for the server
After ~200-300 requests I get:
2025-04-22 09:32:38,452 ERROR[KQueueEventLoopGroup-2-12] i.n.u.ResourceLeakDetector - [ResourceLeakDetector.java:337] LEAK: ByteBuf.release() was not called before it's garbage-collected. See https://netty.io/wiki/reference-counted-objects.html for more information.
Recent access records:
#1:
Hint: 'HttpStreamsServerHandler#0-body-publisher' will handle the message from this point.
io.netty.handler.codec.http.DefaultHttpContent.touch(DefaultHttpContent.java:86)
io.netty.handler.codec.http.DefaultHttpContent.touch(DefaultHttpContent.java:25)
io.netty.channel.DefaultChannelPipeline.touch(DefaultChannelPipeline.java:115)
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:346)
org.playframework.netty.http.HttpStreamsHandler.handleReadHttpContent(HttpStreamsHandler.java:194)
org.playframework.netty.http.HttpStreamsHandler.channelRead(HttpStreamsHandler.java:165)
org.playframework.netty.http.HttpStreamsServerHandler.channelRead(HttpStreamsServerHandler.java:96)
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:107)
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:359)
io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:436)
io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:346)
io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:318)
io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:251)
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:289)
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1429)
io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:918)
io.netty.channel.kqueue.AbstractKQueueStreamChannel$KQueueStreamUnsafe.readReady(AbstractKQueueStreamChannel.java:543)
io.netty.channel.kqueue.AbstractKQueueChannel$AbstractKQueueUnsafe.readEOF(AbstractKQueueChannel.java:552)
io.netty.channel.kqueue.AbstractKQueueChannel$AbstractKQueueUnsafe.handle(AbstractKQueueChannel.java:435)
io.netty.channel.kqueue.KQueueIoHandler$DefaultKqueueIoRegistration.handle(KQueueIoHandler.java:396)
io.netty.channel.kqueue.KQueueIoHandler.processReady(KQueueIoHandler.java:181)
io.netty.channel.kqueue.KQueueIoHandler.run(KQueueIoHandler.java:237)
io.netty.channel.SingleThreadIoEventLoop.runIo(SingleThreadIoEventLoop.java:204)
io.netty.channel.SingleThreadIoEventLoop.run(SingleThreadIoEventLoop.java:175)
io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:1073)
io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
java.base/java.lang.Thread.run(Thread.java:1583)
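A rough sketch of such a partial-request loop (host, port, path and byte counts are assumptions): declare a Content-Length of 10000, write only part of the body, then close the socket abruptly, repeatedly:

```scala
import java.net.Socket

// Hypothetical reproduction loop: each iteration sends an incomplete body
// and closes the connection before the server receives the declared bytes.
object PartialRequestLoop extends App {
  val host = "localhost"
  val port = 9000

  (1 to 300).foreach { i =>
    val socket = new Socket(host, port)
    try {
      val out = socket.getOutputStream
      val headers =
        s"POST /chunked HTTP/1.1\r\n" +
          s"Host: $host:$port\r\n" +
          "Content-Length: 10000\r\n" +
          "\r\n"
      out.write(headers.getBytes("UTF-8"))
      out.write(Array.fill[Byte](1000)('A'.toByte)) // only a fraction of the declared body
      out.flush()
    } finally socket.close() // close before the server ever sees the full body

    if (i % 50 == 0) println(s"sent $i partial requests")
  }
}
```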
Thanks; could you maybe create two new issues: (1) return 400 if not all declared bytes are received, and (2) a leak when running many incomplete requests in Netty? I think these are separate from the one discussed in this PR.
For (1) I'd also like to verify other servers - what's the "living standard"?
I know akka/pekko is doing exactly this. When I call `toStrictEntity`, `HttpMessageParser` returns `EntityStreamError(ErrorInfo("Entity stream truncation. The HTTP parser was receiving an entity when the underlying connection was closed unexpectedly."))`. `toStrictEntity` then throws `IllegalRequestException(StatusCodes.BadRequest, info)`, which is handled inside `ExceptionHandler.default`.
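For reference, a minimal pekko-http sketch of that behaviour (port, path and timeout are assumptions): with `toStrictEntity`, a truncated entity stream fails and the default exception handling responds with 400 Bad Request:

```scala
import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.http.scaladsl.Http
import org.apache.pekko.http.scaladsl.server.Directives._

import scala.concurrent.duration._

object StrictEntityExample extends App {
  implicit val system: ActorSystem = ActorSystem("strict-entity-example")

  // toStrictEntity buffers the whole request body; if the connection is
  // closed before all declared bytes arrive, the entity stream fails and
  // the default exception handling turns it into a 400 Bad Request.
  val route =
    post {
      toStrictEntity(3.seconds) {
        entity(as[String]) { body =>
          complete(s"Received ${body.length} bytes")
        }
      }
    }

  Http().newServerAt("localhost", 9000).bind(route)
}
```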
- For the first problem, I updated the title of the previously created bug: [BUG] Server should return 400 Bad request if not all body bytes received #4169
- For the LEAK, I created a new bug: [BUG] LEAK: ByteBuf.release() was not called before it's garbage-collected #4539
Force-pushed from 4281e96 to f1aad13
*Why I did it?* In order to have a test which might confirm an issue with an interrupted request. *How I did it:* I prepared `NettyCatsRequestTimeoutTest` with the following test scenario: send first chunk (100 bytes), sleep, send second chunk (100 bytes).
- add PlayServerTest instead of NettyCatsServerTest - improve fs2 implementation
This reverts commit 02ba4b0.
Force-pushed from f1aad13 to c8f0be9
Why I did it?
In order to have a test which might confirm an issue with an interrupted request.
How I did it:
I prepared `NettyCatsRequestTimeoutTest` with the following test scenario:
- send first chunk (100 bytes)
- sleep
- send second chunk (100 bytes)

Closes #4169