parEvalMap messes up unmasking of cancellation #3311

Open
Jasper-M opened this issue Sep 25, 2023 · 0 comments
Jasper-M commented Sep 25, 2023

Reproduction (scastie):

import cats.effect.std.Queue
import cats.effect.IO
import cats.effect.testkit.TestControl
import cats.effect.unsafe.implicits.global
import fs2.Stream
import cats.syntax.all._
import scala.concurrent.duration._

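// Mutable on purpose: `run` reads it each time it is executed,
// so the same program can be re-run with a different value.
var parallelism = 1

val queue = Queue.unbounded[IO, Int].unsafeRunSync()

// Finalizer: report whatever is still in the queue once the stream has stopped.
val drainQueue = queue.tryTakeN(None).flatMap(_.traverse_(i => IO.println(s"cleaning up leftover $i")))

// Everything is masked except the two `poll`ed regions:
// taking from the queue and processing an element.
val run = IO.uncancelable( poll =>
  Stream.repeatEval(poll(queue.take))
    .parEvalMap(parallelism){ i =>
      poll(IO.println(s"processing: $i"))
        .onCancel(IO.println(s"cleaning up: $i"))
    }
    .compile
    .drain
    .guarantee(drainQueue)
)

// Run the stream in the background, offer 1 to 5 (one every 500 ms),
// then cancel the background fiber.
val program = TestControl.executeEmbed(run.background.surround(List(1,2,3,4,5).traverse_(i => queue.offer(i).delayBy(500.millis))))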


program.unsafeRunSync()
println()

parallelism = 2
program.unsafeRunSync()

Output (first run with parallelism = 1, second run with parallelism = 2):

processing: 1
processing: 2
processing: 3
processing: 4
cleaning up leftover 5

processing: 1
processing: 2
processing: 3
processing: 4
processing: 5
cats.effect.testkit.TestControl$NonTerminationException: Program under test failed produce a result ...

I'm not sure whether this is related to other parEvalMap issues such as #3076.
It's probably a more general IO <-> Stream interop problem.
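
For comparison, here is a minimal sketch of how `poll` unmasks cancellation in plain `IO`, with no fs2 involved. It is not part of the reproduction above; the names `PollBaseline`, `demo`, `started`, `canceled` and `finalized` are made up for illustration, and it assumes cats-effect 3. A fiber blocked inside a `poll`ed region should be cancelable, and both its `onCancel` handler and its `guarantee` finalizer should run before `cancel` returns.

```scala
import cats.effect.{Deferred, IO, IOApp, Ref}

// Hypothetical baseline, not taken from the issue: plain IO, no Stream.
object PollBaseline extends IOApp.Simple {

  // Returns (onCancel ran, guarantee ran).
  val demo: IO[(Boolean, Boolean)] =
    for {
      started   <- Deferred[IO, Unit]
      canceled  <- Ref[IO].of(false)
      finalized <- Ref[IO].of(false)
      fiber <- IO.uncancelable { poll =>
        // Only the polled region can observe cancellation.
        poll(started.complete(()) *> IO.never[Unit])
          .onCancel(canceled.set(true))
          .guarantee(finalized.set(true))
      }.start
      _ <- started.get   // wait until the fiber is inside the polled region
      _ <- fiber.cancel  // waits for the fiber's finalizers to complete
      c <- canceled.get
      f <- finalized.get
    } yield (c, f)

  def run: IO[Unit] =
    demo.flatMap { case (c, f) =>
      IO.println(s"onCancel ran: $c, guarantee ran: $f")
    }
}
```

With plain `IO`, both flags should come back `true`. That is the behaviour the reproduction seems to expect from the `poll`ed effects inside `parEvalMap` as well.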
