
Server streams request bodies forever when not completed normally #2297

Closed
jgulotta opened this issue Jul 14, 2023 · 26 comments · Fixed by #3060
Labels
💎 Bounty · bug (Something isn't working) · 💰 Rewarded

Comments

@jgulotta
Contributor

jgulotta commented Jul 14, 2023

Describe the bug
When a request is not completed successfully, e.g. the connection drops, the streaming body does not terminate

To Reproduce

  1. Run this app
import zio.*
import zio.http.*

object StreamForever extends ZIOAppDefault {
  def run = {
    val forever = Http.collectZIO[Request] {
      case req @ Method.POST -> Root / "forever" =>
        req.body.asStream.runForeachChunk(c => ZIO.logInfo(s"read chunk size ${c.size}"))
          .foldCauseZIO(
            c => ZIO.logWarningCause("test stream failed", c).as(Response.status(Status.InternalServerError)),
            _ => ZIO.logInfo("test stream done").as(Response.status(Status.NoContent))
          )
    }

    val server = Server.defaultWith(_.enableRequestStreaming)

    Server.serve(forever).provide(server)
  }
}
  2. Stream any large request, e.g. using HTTPie
yes hello | http --chunked POST localhost:8080/forever

The output read chunk size 5 will print continually

  3. Terminate the stream before it finishes, e.g. Ctrl+C

The output read chunk size 5 will still print continually, until the buffer is cleared, and then nothing.

Expected behaviour
The stream from req.body.asStream interrupts or dies, printing the final messages in foldCauseZIO

Curiously, I have to make one request, cancel it, and then make a second to actually get any output at all. I would also expect the initial request to trigger the streaming.

jgulotta added the bug label on Jul 14, 2023
@jdegoes
Member

jdegoes commented Jul 27, 2023

/bounty $250

@algora-pbc

algora-pbc bot commented Jul 27, 2023

💎 $250 bounty • ZIO

Steps to solve:

  1. Start working: Comment /attempt #2297 with your implementation plan
  2. Submit work: Create a pull request including /claim #2297 in the PR body to claim the bounty
  3. Receive payment: 100% of the bounty is received 2-5 days post-reward. Make sure you are eligible for payouts

Thank you for contributing to zio/zio-http!


Attempt Started (GMT+0) Solution
🔴 @FVelasquezM Aug 15, 2023, 3:49:52 AM WIP
🔴 @lackhoa Sep 23, 2023, 1:19:13 PM WIP
🟢 @kyri-petrou #3060

@FVelasquezM

FVelasquezM commented Aug 15, 2023

Hi, I've done some debugging and I think I've located the place where this behavior is being caused. I'll try to fix it /attempt #2297


@algora-pbc

algora-pbc bot commented Aug 22, 2023

@FVelasquezM: Reminder that in 7 days the bounty will become up for grabs, so please submit a pull request before then 🙏

@FVelasquezM

FVelasquezM commented Aug 23, 2023

I've been working on this issue for the last couple of days, but unfortunately I've been unable to solve it. I'll leave some of my findings here so that, hopefully, someone more knowledgeable can pick it up from here and solve the issue:

The gist of the issue seems to be that neither AsyncBodyReader's nor ServerInboundHandler's exceptionCaught method is triggered when the client drops the connection. I've occasionally managed to trigger the methods by pumping AsyncBodyReader and ServerInboundHandler with printlns, but I've been unable to do so reliably, so maybe exceptionCaught is not being called because of some sort of race condition between threads? I'm frankly at a loss here.

There could be a workaround: add a boolean var foundLast to AsyncBodyReader and set it to true when the last chunk is found. If the connection is dropped by the client, ChannelInboundHandler's channelInactive method should be invoked, so overriding it in AsyncBodyReader with something like this could work:

override def channelInactive(ctx: ChannelHandlerContext): Unit = {
  // The connection closed; if we never saw the last content chunk, fail the
  // downstream callback so the body stream terminates instead of hanging.
  if (!foundLast) {
    state match {
      case State.Buffering        => // no callback registered yet, nothing to fail
      case State.Direct(callback) =>
        callback.fail(new IOException("Unexpected end of stream"))
    }
  }
  ctx.fireChannelInactive()
}

There's an issue with this, however, as NettyRuntime adds a listener to the channel's close future:

ctx.channel().closeFuture.addListener(close)

When a client drops the connection, the listener interrupts the fiber before AsyncBodyReader's channelInactive method has fully completed and, as such, the stream is left hanging.

As a temporary workaround, I've introduced a sleep before the fiber is interrupted. This seems to work, but only twice: the first two requests dropped by the client fail as expected, generating the "test stream failed" message described in the issue, but any further drops hang (although small requests made with echo hello | http --chunked POST localhost:8080/forever work as expected). I don't know what's causing this behavior.

@algora-pbc

algora-pbc bot commented Aug 29, 2023

The bounty is up for grabs! Everyone is welcome to /attempt #2297 🙌

@lackhoa
Contributor

lackhoa commented Sep 23, 2023

/attempt #2297 I'll try for a few days to do this.


@lackhoa
Contributor

lackhoa commented Sep 27, 2023

Hm, the crux of the issue is that the request handler fiber is interrupted (by something, which is probably correct anyway). But the mystery that remains is why foldCauseZIO doesn't catch that. Here's a comment from the ZIO code:

  /**
   * A more powerful version of `foldZIO` that allows recovering from any kind
   * of failure except external interruption.
   */
  override final def foldCauseZIO[R, E2, B](

This is in conflict with the doc, which says: "They can recover from any error, even fiber interruptions."

Any ZIO guru out there can explain to me how "fiber interruption" is different from "external interruption"? Thanks. And here's the actual cause of failure:

 scala.MatchError: Then(Then(Then(Then(Interrupt(Runtime(109460,1695811465858,),Stack trace for thread "zio-fiber-109460":
 ),Interrupt(Runtime(109460,1695811465858,),Stack trace for thread "zio-fiber-109460":
 )),Interrupt(Runtime(109460,1695811465858,),Stack trace for thread "zio-fiber-109460":
 )),Interrupt(Runtime(109460,1695811465858,),Stack trace for thread "zio-fiber-109460":
 )),Interrupt(Runtime(109460,1695811465858,),Stack trace for thread "zio-fiber-109460":
 )) (of class zio.Cause$Then)

@adamgfraser
Contributor

@lackhoa External interruption is when a fiber is interrupted by another fiber using Fiber#interrupt. External interruption can never be recovered from because someone else is telling you that you are interrupted, so you don't have the right to say "no I'm not interrupted". Internal interruption is when you interrupt yourself using ZIO.interrupt. Since you are the one who interrupted yourself, you can say "no I'm not interrupted" and recover from it like any normal cause.
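
A minimal sketch of this distinction, assuming a plain ZIO 2 application (nothing zio-http specific): even though the effect is wrapped in foldCauseZIO, a fiber that is interrupted externally still ends with an interrupted exit; only its finalizers, such as onInterrupt, get to observe the interruption.

import zio._

object ExternalInterruptionDemo extends ZIOAppDefault {
  def run =
    for {
      fiber <- ZIO.never
                 .foldCauseZIO(
                   cause => ZIO.logWarningCause("attempted recovery", cause).as("recovered"),
                   _ => ZIO.succeed("done"),
                 )
                 .onInterrupt(ZIO.logWarning("onInterrupt finalizer ran"))
                 .fork
      _    <- ZIO.sleep(100.millis) // give the child fiber a chance to start
      exit <- fiber.interrupt       // external interruption from the parent fiber
      interrupted = exit match {
                      case Exit.Failure(cause) => cause.isInterrupted
                      case _                   => false
                    }
      _    <- ZIO.logInfo(s"child fiber ended interrupted: $interrupted") // true: recovery does not stick
    } yield ()
}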

@lackhoa
Contributor

lackhoa commented Sep 28, 2023

So this is my summary of the issue so far (possibly conclusive?):

Starting from the original description of the issue: from the test it appears that when the client disconnects after having sent a large request body, the server continues to iterate on the body stream and does not get interrupted. Two points:

  1. The server does in fact get interrupted by some other fiber, and the "continually printing" behavior can be explained by the console being slow and lagging behind the server output.

  2. The server hangs on the first request: that can be explained by the Body.asStream call not being able to emit chunks into the output stream. This is resolved by #2402 (Invoke NettyBody callback in a separate fiber to avoid deadlocks).

There is a way to run code upon external interruption: ZIO#onInterrupt. This is a sample response (incorporating the fix from #2402):

        // counter and threshold are mutable counters (e.g. AtomicLongs) defined
        // elsewhere in this sample; they only rate-limit the println below.
        _ <- bodyStream
          .runForeach { _ =>
            ZIO.attempt {
              val iteration = counter.incrementAndGet
              if (iteration == threshold.get) {
                println(s"Iteration: ${iteration}")
                threshold.set(threshold.get * 1000)
              }
            }
          }
          .foldCauseZIO(
            failure = cause =>
              cause match {
                // Cause.Interrupt carries the id of the interrupting fiber
                case Cause.Interrupt(fiberId, _) =>
                  ZIO.logWarningCause(s"!!!!!! ${fiberId.threadName} interrupted", cause).as(Response.status(Status.InternalServerError))
                case _ =>
                  ZIO.logWarningCause("!!!!!! test stream failed", cause).as(Response.status(Status.InternalServerError))
              },
            success = _ => ZIO.logInfo("test stream done").as(Response.status(Status.NoContent)),
          )
          .onInterrupt(interruptors =>
            ZIO.fiberIdWith { fiberId =>
              ZIO.succeed(interruptors.foreach(interruptor => println(s"${fiberId} interrupted by ${interruptor}")))
            }
          )

Output:

[info] Iteration: 1
[info] Iteration: 1000
[info] Runtime(39,1695890331561,) interrupted by Runtime(6799,1695890331871,)

Question remains: should the request handler fiber be interrupted when the client disconnects like that? I don't know; I would like input from maintainers to proceed. (UPDATE: on second thought, we obviously can't return a response when the client disconnects, so the API is correct unless I'm missing something.)

So tl;dr: the only bug found is the one addressed by #2402.

Tag: @jdegoes, @jgulotta

@jgulotta
Contributor Author

It seems indeed the stream is interrupted externally rather than running forever, and I was also unclear on the fact that such interruption is not caught by foldCauseZIO.

The server hangs on first request: that can be explained by the "Body.asStream" call not being able to emit chunks into the output stream

I would expect that behavior to be random, no? For me it reliably does nothing on the first call and reliably does something on subsequent calls.

Maybe classloading on the initial call is slow enough that the buffer gets filled, but on subsequent calls the classes are already loaded so it's fast enough.

What that suggests is that the channel is marked active and read from too early. Demand should probably be signaled from downstream before data is read. So maybe ZStream.async as the push-pull adapter is not the best tool for a streaming body, or it should be done in a way that reads from the channel only when there is space in the buffer.
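
For illustration, a rough sketch of what that demand-driven reading could look like. This is not zio-http's actual AsyncBodyReader; the class and the queue wiring are hypothetical, and the queue is assumed to be a bounded, back-pressuring Queue[Chunk[Byte]] created elsewhere. The point is that autoRead is disabled and another channel read is requested only after the consumer has accepted the previous chunk.

import io.netty.buffer.ByteBufUtil
import io.netty.channel.{ChannelHandlerContext, SimpleChannelInboundHandler}
import io.netty.handler.codec.http.HttpContent
import zio._
import zio.stream.ZStream

final class PullDrivenBodyReader(queue: Queue[Chunk[Byte]], runtime: Runtime[Any])
    extends SimpleChannelInboundHandler[HttpContent] {

  override def channelActive(ctx: ChannelHandlerContext): Unit = {
    ctx.channel().config().setAutoRead(false) // no reads unless explicitly requested
    ctx.fireChannelActive()
    ctx.read()                                // request the first chunk
  }

  override def channelRead0(ctx: ChannelHandlerContext, msg: HttpContent): Unit = {
    // Copy the bytes before SimpleChannelInboundHandler releases the buffer.
    val bytes = Chunk.fromArray(ByteBufUtil.getBytes(msg.content()))
    Unsafe.unsafe { implicit unsafe =>
      // offer suspends while the bounded queue is full; only once the consumer
      // has taken the chunk do we ask Netty for more data. End-of-stream and
      // error handling are elided in this sketch.
      runtime.unsafe.fork(queue.offer(bytes) *> ZIO.succeed(ctx.read()))
    }
    ()
  }
}

// Downstream side: a stream backed by the same bounded queue, so every element
// the consumer pulls frees capacity and thereby allows another channel read.
def bodyStream(queue: Queue[Chunk[Byte]]): ZStream[Any, Nothing, Byte] =
  ZStream.fromQueue(queue).flattenChunks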

obviously we can't return a response when the client disconnects, so the API is correct unless I'm missing something

A response certainly cannot be sent over the wire, but I was expecting the streaming body to fail upon disconnect. That scenario might build up an error response, and that response would never be sent, but all other expected error handling could occur, like metrics and logging.

I think it's reasonable to fail the streaming body instead of interrupting it, on the grounds that client disconnects are expected cases, e.g. TCP FIN/RST packets. I expect interrupts in situations like the server shutting down, not in normal per-connection handling. The programming model for that is a little friendlier, I think. I already have to translate all expected error cases into a response in order to serve the app; it would be nice if I didn't have to think about every handler getting interrupted too.

Put another way, it's one thing to have silent interruption when I opt into it with something like ZIO.race, but it's another to have silent interruption as an "expected" behavior over which I have no control. In almost no case do I want my HTTP service to just stop processing a request without telling me what happened. I'd also rather not have to add onInterrupt to every handler.
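
For what it's worth, the kind of single wrapper I have in mind might look like this (the name logInterruptions is hypothetical): applied once around a handler effect, so interruption is at least logged without hand-writing .onInterrupt in every handler.

import zio._
import zio.http.Response

def logInterruptions[R, E](label: String)(handler: ZIO[R, E, Response]): ZIO[R, E, Response] =
  handler.onInterrupt(interruptors =>
    ZIO.logWarning(s"handler '$label' interrupted by ${interruptors.mkString(", ")}")
  )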

Also, interruptions not due to client disconnections could still send an error response over an active channel.

That's not to say that interrupting in this case is wrong in any sense. It's a valid way to handle the scenario. It's just not what I expected and not what I personally desire.

@lackhoa
Contributor

lackhoa commented Sep 29, 2023

@jgulotta

I would expect that behavior to be random, no?

The buffer is 16 chunks. In fact I think it must be filled every time.

Tbh I don't know why it sometimes works either. I could look into it... but hey! It's broken code, and there's little incentive to investigate why broken code breaks the way it does. I'd advise you to test with myazinn's PR and tell me if you can find a bug with it (I find that it works 100% of the time).

In almost no case do I want my HTTP service to just stop processing a request without telling me what happened. I'd also rather not have to add onInterrupt to every handler.

Personally I agree, allowing the handler fiber to run to completion and then discarding the response would be fair game.

@jgulotta
Contributor Author

I'm certain that PR is functionally correct, inasmuch as forking the fiber will always allow the stream to process chunks that get emitted to it during callback registration. The fact that callback registration can and does emit chunks to the stream at all is what feels off to me. I don't really know - perhaps the buffering that's happening is inherently necessary for some reason.

In any case, I guess this issue can be closed as not a bug. Perhaps a new issue is warranted to decide/design how client disconnections should be surfaced, if the maintainers are inclined to reconsider the current interruption approach.

@algora-pbc

algora-pbc bot commented Sep 30, 2023

@lackhoa: Reminder that in 7 days the bounty will become up for grabs, so please submit a pull request before then 🙏

@jgulotta
Contributor Author

jgulotta commented Oct 5, 2023

Actually, it seems I'm still having the issue which motivated this report, even incorporating the forked fiber from the other PR, and it turns out I had not isolated the cause as well as I thought.

It seems related to ZStream.toInputStream (my real code uses that) and its behavior during interruption, which I think is resulting in a deadlock. I still haven't quite minimized things down to something I'd want to file as a new issue for ZStream, but figured I'd post here just to get the information out there.

I'm now trying to diagnose from this request handler

    ZIO.scoped {
      req.body
        .asStream
// using .toInputStream.flatMap deadlocks or something
// it never reaches the "interrupted" message
// after enough non-interruptions, new requests are never handled
        .toInputStream
        .flatMap { in =>
          ZIO.logInfo("transferring") *>
            ZIO.attemptBlocking(in.transferTo(new java.io.OutputStream() {
              def write(byte: Int): Unit = {}
            })) <*
            ZIO.logInfo("transferred")
        }
// not using .toInputStream.flatMap and just using .runDrain reliably completes and prints "interrupted"
//      .runDrain
        .foldCauseZIO(
          c => ZIO.logWarningCause("test stream failed", Cause.fail(c)).as(Response.status(Status.InternalServerError)),
          _ => ZIO.logInfo("test stream done").as(Response.status(Status.NoContent))
        )
        .onInterrupt(ZIO.logWarning("interrupted"))
    }

@lackhoa
Contributor

lackhoa commented Oct 6, 2023

@jgulotta The reason why onInterrupt isn't called is that what you're doing can't be interrupted. This is from the docs of transferTo:

     * This method may block indefinitely reading from the input stream, or
     * writing to the output stream. The behavior for the case where the input
     * and/or output stream is <i>asynchronously closed</i>, or the thread
     * interrupted during the transfer, is highly input and output stream
     * specific, and therefore not specified.

However, even then, I think there's still a bug in the body stream (it should stop the stream when the client disconnects, but I can't figure out how to do that yet). I think you should file a different issue with that program. We're getting quite far afield from this issue already :) It should be put to rest.

@jgulotta
Contributor Author

jgulotta commented Oct 6, 2023

In the general case where one can pass in an arbitrary input and/or output stream, you're right, the behavior through the interface is unspecified. In this specific case that is not the reason, because the classes involved are well defined: ZInputStream and a do-nothing OutputStream. The only thing that will happen is that it drains the input stream until it hits the end or a failure. That means continually executing the effect from .toPull, which does (or should) respect interruption, and which would propagate as an exception.

The trouble seems to be that the effect from .toPull never completes when it's executed, i.e. pulling hangs the process. The same behavior happens even if you stay in pure-ZIO world with e.g. ZIO.whileLoop pulling from the iterator instead of using .transferTo.
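
For reference, this is roughly the shape of the pure-ZIO drain I mean (a sketch only, using .toPull directly, with minimal error handling):

import zio._
import zio.http.Body

def drainWithPull(body: Body): ZIO[Any, Throwable, Unit] =
  ZIO.scoped {
    body.asStream.toPull.flatMap { pull =>
      // pull fails with None at end-of-stream and Some(e) on a real error;
      // each pull is an ordinary ZIO effect, so interruption between pulls
      // should be observable by the runtime.
      pull
        .flatMap(chunk => ZIO.logDebug(s"pulled chunk of size ${chunk.size}"))
        .forever
        .catchAll {
          case None    => ZIO.unit
          case Some(e) => ZIO.fail(e)
        }
    }
  }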

I do intend to file another issue when I feel I've actually isolated it enough. But I also think we're still in the spirit of this one, as the only difference is that instead of .runForeachChunk there is .toInputStream.flatMap, which matches my real code and hangs forever. I incorrectly thought it was unimportant in the original report.

@lackhoa
Contributor

lackhoa commented Oct 7, 2023

@jgulotta Ok you’re right. The doc I quoted above did say that the behavior is implementation-specific. In that case I have no idea why it can’t be interrupted. I’ll have to look into the implementation a bit more…

@algora-pbc

algora-pbc bot commented Oct 7, 2023

The bounty is up for grabs! Everyone is welcome to /attempt #2297 🙌

@lackhoa
Contributor

lackhoa commented Oct 9, 2023

@jdegoes Hi, could you add another bounty for this issue? I believe the original issue has been solved by @myazinn and me; another bounty would motivate further effort to resolve the remaining problem.

@pwharned

pwharned commented Nov 6, 2023

Is this issue still open? I'd like to take a look.

@lackhoa
Contributor

lackhoa commented Nov 6, 2023

Is this issue still open? I'd like to take a look.

Trust me, you don't :)

@pwharned

pwharned commented Nov 6, 2023

Don't underestimate my willingness to humiliate myself. :)

@lackhoa
Contributor

lackhoa commented Nov 7, 2023

Don't underestimate my willingness to humiliate myself. :)

Well, if what you're after is the bounty, you're never gonna get it no matter how good you are. This issue is like a hydra: you cut off one head, and two others grow back.

So it's worse if you manage to do something, because you won't get any reward for it. Read the previous comments if you don't believe me.


algora-pbc bot commented Aug 28, 2024

💡 @kyri-petrou submitted a pull request that claims the bounty. You can visit your bounty board to reward.


algora-pbc bot commented Aug 28, 2024

🎉🎈 @kyri-petrou has been awarded $250! 🎈🎊
