Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -1104,7 +1104,10 @@ lazy val std = crossProject(JSPlatform, JVMPlatform, NativePlatform)
ProblemFilters.exclude[FinalMethodProblem](
"cats.effect.std.Dispatcher#RegState#Unstarted.toString"),
ProblemFilters.exclude[DirectMissingMethodProblem](
"cats.effect.std.Dispatcher#Registration#Primary.*")
"cats.effect.std.Dispatcher#Registration#Primary.*"),
// #4500, private class:
ProblemFilters.exclude[ReversedMissingMethodProblem](
"cats.effect.std.Supervisor#State.numberOfFibers")
)
)
.jsSettings(
Expand Down
273 changes: 145 additions & 128 deletions std/shared/src/main/scala/cats/effect/std/Supervisor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ object Supervisor {
def apply[F[_]: Concurrent]: Resource[F, Supervisor[F]] =
apply[F](false)

private sealed abstract class State[F[_]] {
private[std] sealed abstract class State[F[_]] {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Most of the changes in Supervisor are just to make it testable (its internal state inspectable).


def remove(token: Unique.Token): F[Unit]

Expand All @@ -150,6 +150,8 @@ object Supervisor {
*/
def add(token: Unique.Token, fiber: Fiber[F, Throwable, ?]): F[Boolean]

private[std] def numberOfFibers: F[Int] // for testing

// these are allowed to destroy the state, since they're only called during closing:
val joinAll: F[Unit]
val cancelAll: F[Unit]
Expand All @@ -169,152 +171,161 @@ object Supervisor {
case (st, _) =>
doneR.set(true) *> st.cancelAll
}
} yield new Supervisor[F] {

def supervise[A](fa: F[A]): F[Fiber[F, Throwable, A]] =
F.uncancelable { _ =>
val monitor: (F[A], F[Unit]) => F[Fiber[F, Throwable, A]] = checkRestart match {
case Some(restart) => { (fa, fin) =>
F.deferred[Outcome[F, Throwable, A]] flatMap { resultR =>
F.ref(false) flatMap { canceledR =>
F.deferred[Fiber[F, Throwable, A]].flatMap { firstCurrent =>
// `currentR` holds (a `Deferred` to) the current
// incarnation of the fiber executing `fa`:
F.ref(firstCurrent).flatMap { currentR =>
def action(current: Deferred[F, Fiber[F, Throwable, A]]): F[Unit] = {
F uncancelable { _ =>
val started = F start {
fa guaranteeCase { oc =>
F.deferred[Fiber[F, Throwable, A]].flatMap { newCurrent =>
// we're replacing the `Deferred` holding
// the current fiber with a new one before
// the current fiber finishes, and even
// before we check for the cancel signal;
// this guarantees, that the fiber reachable
// through `currentR` is the last one (or
// null, see below):
currentR.set(newCurrent) *> {
canceledR.get flatMap { canceled =>
doneR.get flatMap { done =>
if (!canceled && !done && restart(oc)) {
action(newCurrent)
} else {
// we must complete `newCurrent`,
// because `cancel` below may wait
// for it; we signal that it is not
// restarted with `null`:
newCurrent.complete(null) *> fin.guarantee(
resultR.complete(oc).void)
}
} yield new SupervisorImpl[F](checkRestart, doneR, state)
}

private[std] final class SupervisorImpl[F[_]](
checkRestart: Option[Outcome[F, Throwable, ?] => Boolean],
doneR: Ref[F, Boolean],
private[std] val state: State[F]
)(implicit F: Concurrent[F])
extends Supervisor[F] {

def supervise[A](fa: F[A]): F[Fiber[F, Throwable, A]] =
F.uncancelable { _ =>
val monitor: (F[A], F[Unit]) => F[Fiber[F, Throwable, A]] = checkRestart match {
case Some(restart) => { (fa, fin) =>
F.deferred[Outcome[F, Throwable, A]] flatMap { resultR =>
F.ref(false) flatMap { canceledR =>
F.deferred[Fiber[F, Throwable, A]].flatMap { firstCurrent =>
// `currentR` holds (a `Deferred` to) the current
// incarnation of the fiber executing `fa`:
F.ref(firstCurrent).flatMap { currentR =>
def action(current: Deferred[F, Fiber[F, Throwable, A]]): F[Unit] = {
F uncancelable { _ =>
val started = F start {
fa guaranteeCase { oc =>
F.deferred[Fiber[F, Throwable, A]].flatMap { newCurrent =>
// we're replacing the `Deferred` holding
// the current fiber with a new one before
// the current fiber finishes, and even
// before we check for the cancel signal;
// this guarantees, that the fiber reachable
// through `currentR` is the last one (or
// null, see below):
currentR.set(newCurrent) *> {
canceledR.get flatMap { canceled =>
doneR.get flatMap { done =>
if (!canceled && !done && restart(oc)) {
action(newCurrent)
} else {
// we must complete `newCurrent`,
// because `cancel` below may wait
// for it; we signal that it is not
// restarted with `null`:
newCurrent.complete(null) *> fin.guarantee(
resultR.complete(oc).void)
}
}
}
}
}
}

started flatMap { f => current.complete(f).void }
}

started flatMap { f => current.complete(f).void }
}
}

action(firstCurrent).as(
new Fiber[F, Throwable, A] {

private[this] val delegateF = currentR.get.flatMap(_.get)

val cancel: F[Unit] = F uncancelable { _ =>
// after setting `canceledR`, at
// most one restart happens, and
// the fiber we get through `delegateF`
// is the final one:
canceledR.set(true) *> delegateF flatMap {
case null =>
// ok, task wasn't restarted, but we
// wait for the result to be completed
// (and the finalizer to run):
resultR.get.void
case fiber =>
fiber.cancel *> fiber.join flatMap {
case Outcome.Canceled() =>
// cancel successful (or self-canceled),
// but we don't know if the `guaranteeCase`
// above ran so we need to double check:
delegateF.flatMap {
case null =>
// ok, the `guaranteeCase`
// certainly executed/ing:
resultR.get.void
case fiber2 =>
// we cancelled the fiber before it did
// anything, so the finalizer didn't run,
// we need to do it now:
val cleanup = fin.guarantee(
resultR.complete(Outcome.Canceled()).void
)
if (fiber2 eq fiber) {
cleanup
} else {
// this should never happen
cleanup *> F.raiseError(new AssertionError(
"unexpected fiber (this is a bug in Supervisor)"))
}
}
case _ =>
// finished in error/success,
// the outcome will certainly
// be completed:
resultR.get.void
}
}
action(firstCurrent).as(
new Fiber[F, Throwable, A] {

private[this] val delegateF = currentR.get.flatMap(_.get)

val cancel: F[Unit] = F uncancelable { _ =>
// after setting `canceledR`, at
// most one restart happens, and
// the fiber we get through `delegateF`
// is the final one:
canceledR.set(true) *> delegateF flatMap {
case null =>
// ok, task wasn't restarted, but we
// wait for the result to be completed
// (and the finalizer to run):
resultR.get.void
case fiber =>
fiber.cancel *> fiber.join flatMap {
case Outcome.Canceled() =>
// cancel successful (or self-canceled),
// but we don't know if the `guaranteeCase`
// above ran so we need to double check:
delegateF.flatMap {
case null =>
// ok, the `guaranteeCase`
// certainly executed/ing:
resultR.get.void
case fiber2 =>
// we cancelled the fiber before it did
// anything, so the finalizer didn't run,
// we need to do it now:
val cleanup = fin.guarantee(
resultR.complete(Outcome.Canceled()).void
)
if (fiber2 eq fiber) {
cleanup
} else {
// this should never happen
cleanup *> F.raiseError(new AssertionError(
"unexpected fiber (this is a bug in Supervisor)"))
}
}
case _ =>
// finished in error/success,
// the outcome will certainly
// be completed:
resultR.get.void
}
}

def join = resultR.get
}
)
}

def join = resultR.get
}
)
}
}
}
}
}

case None => (fa, fin) => F.start(fa.guarantee(fin))
case None => { (fa, fin) =>
F.start(fa).flatMap { fib => F.start(fib.join.guarantee(fin)).as(fib) }
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the actual fix.

(I believe the other case is already correct, because cancel joins the underlying fiber.)

}
}

for {
done <- F.ref(false)
insertResult <- F.deferred[Boolean]
token <- F.unique
cleanup = state.remove(token)
fiber <- monitor(
// if the supervisor have been (or is now)
// shutting down, inserting into state will
// fail; so we need to wait for the positive result
// of inserting, before actually doing the task:
insertResult
.get
.ifM(
fa,
F.canceled *> F.raiseError[A](new AssertionError(
"supervised fiber couldn't cancel (this is a bug in Supervisor)"))
),
done.set(true) *> cleanup
)
insertOk <- state.add(token, fiber)
_ <- insertResult.complete(insertOk)
// `cleanup` could run BEFORE the `state.add`
// (if `fa` is very fast), in which case it doesn't
// remove the fiber from the state, so we re-check:
_ <- done.get.ifM(cleanup, F.unit)
_ <- {
if (!insertOk) {
F.raiseError(new IllegalStateException("supervisor already shutdown"))
} else {
F.unit
}
for {
done <- F.ref(false)
insertResult <- F.deferred[Boolean]
token <- F.unique
cleanup = state.remove(token)
fiber <- monitor(
// if the supervisor have been (or is now)
// shutting down, inserting into state will
// fail; so we need to wait for the positive result
// of inserting, before actually doing the task:
insertResult
.get
.ifM(
fa,
F.canceled *> F.raiseError[A](new AssertionError(
"supervised fiber couldn't cancel (this is a bug in Supervisor)"))
),
done.set(true) *> cleanup
)
insertOk <- state.add(token, fiber)
_ <- insertResult.complete(insertOk)
// `cleanup` could run BEFORE the `state.add`
// (if `fa` is very fast), in which case it doesn't
// remove the fiber from the state, so we re-check:
_ <- done.get.ifM(cleanup, F.unit)
_ <- {
if (!insertOk) {
F.raiseError(new IllegalStateException("supervisor already shutdown"))
} else {
F.unit
}
} yield fiber
}
}
}
} yield fiber
}
}

private[effect] def applyForConcurrent[F[_]](
Expand All @@ -335,6 +346,9 @@ object Supervisor {
case map => (map.updated(token, fiber), true)
}

private[std] final override def numberOfFibers: F[Int] = // for testing
stateRef.get.map(_.size)

private[this] val allFibers: F[List[Fiber[F, Throwable, ?]]] = {
// we're closing, so we won't need the state any more,
// so we're using `null` as a sentinel to reject later
Expand Down Expand Up @@ -371,6 +385,9 @@ object Supervisor {
F.delay(state.put(token, fiber)) *> doneR.get.map(!_)
}

private[std] final override def numberOfFibers: F[Int] = // for testing
F.delay { state.size() }

private[this] val allFibers: F[List[Fiber[F, Throwable, ?]]] =
F delay {
val fibers = ListBuffer.empty[Fiber[F, Throwable, ?]]
Expand Down
29 changes: 29 additions & 0 deletions tests/shared/src/test/scala/cats/effect/std/SupervisorSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -294,5 +294,34 @@ class SupervisorSpec extends BaseSpec with DetectPlatform {

tsk.parReplicateA_(if (isJVM) 1000 else 1).as(ok)
}

def superviseCancelRace(mkSupervisor: Resource[IO, Supervisor[IO]]) = {
val N = if (isJVM) 1000 else 5
val M = if (isJVM) 20 else 2
val tsk = mkSupervisor.use { supervisor =>
supervisor
.supervise(IO.unit)
.flatMap(_.cancel)
.replicateA_(N)
.parReplicateA_(M)
.flatMap { _ =>
// let's wait a bit (for cleanup to happen):
IO.sleep(0.2.second) *> {
val st = supervisor.asInstanceOf[Supervisor.SupervisorImpl[IO]].state
// the supervised fibers must've been cleaned up from the internal state:
st.numberOfFibers.flatMap { numFibs => IO(numFibs mustEqual 0) }
}
}
}
tsk.as(ok)
}

"supervise / cancel race cleanup" in real {
superviseCancelRace(constructor(false, None))
}

"supervise / cancel race cleanup (with restart)" in real {
superviseCancelRace(constructor(false, Some(_ => true)))
}
}
}
Loading