Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

prefetch hangs on a canceled stream #3485

Open
enlait opened this issue Oct 1, 2024 · 2 comments
Open

prefetch hangs on a canceled stream #3485

enlait opened this issue Oct 1, 2024 · 2 comments
Labels

Comments

@enlait
Copy link

enlait commented Oct 1, 2024

fs2 version 3.10.2

this code does not terminate:

Stream.eval(IO.canceled).prefetch.compile.drain

expected behavior: prefetch should not change execution semantics, at least not so much as to prevent termination

@enlait enlait added the bug label Oct 1, 2024
@ValdemarGr
Copy link
Contributor

Since concurrently doesn't guarantee that the foreground stream is closed when the background reaches termination, then I think the channel should be closed once the this stream exits.

  def prefetchN[F2[x] >: F[x]: Concurrent](
      n: Int
  ): Stream[F2, O] =
    Stream.eval(Channel.bounded[F2, Chunk[O]](n)).flatMap { chan =>
      chan.stream.unchunks.concurrently {
-        chunks.through(chan.sendAll)
+        chunks.through(chan.sendAll).onFinalize(chan.close)
      }
    }

Here.

@ValdemarGr
Copy link
Contributor

Further note, this will cause the foreground stream to exit with Success which might not be correct. I think Canceled should be the right outcome in that case.

  def prefetchN[F2[x] >: F[x]: Concurrent](
      n: Int
  ): Stream[F2, O] =
-   Stream.eval(Channel.bounded[F2, Chunk[O]](n)).flatMap { chan =>
+   Stream.eval(Channel.bounded[F2, Either[ExitCase, Chunk[O]]](n)).flatMap { chan =>
      chan.stream.unchunks.concurrently {
-        chunks.through(chan.sendAll)
+        chunks.map(_.asRight[ExitCase]).through(chan.sendAll).onFinalizeCase(chan.send(_) >> chan.close)
      }
    }.flatMap{
      case Right(x) => Stream.chunk(x)
      case Left(ec) => Stream.exec(ec.toOutcome[F].embed(Concurrent[F].canceled))
    }

Maybe prefetch can be expressed via one of the other combinators that run two streams concurrently which also guarantees communication of cancellation and errors if any of the concurrent streams exits (merge? but it doesn't look like it handles cancellation properly).

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

No branches or pull requests

2 participants