From 056bfa44587aac643f9f07468086a5739aec8bad Mon Sep 17 00:00:00 2001 From: Felix Bruckmeier Date: Sat, 24 Oct 2020 00:01:29 +0200 Subject: [PATCH] WIP: Rerunnable: Sync + Clock + MonadError --- build.sbt | 7 +- .../catbird/util/effect/FutureInstances.scala | 12 +-- .../catbird/util/effect/RerunnableClock.scala | 24 ----- .../util/effect/RerunnableContextShift.scala | 73 --------------- .../util/effect/RerunnableInstances.scala | 47 ++++++++-- .../catbird/util/effect/RerunnableTimer.scala | 46 ---------- .../io/catbird/util/effect/package.scala | 42 +++++---- .../util/effect/ContextShiftingSuite.scala | 64 -------------- .../io/catbird/util/effect/FutureSuite.scala | 16 ++-- .../util/effect/RerunnableClockSuite.scala | 8 +- .../effect/RerunnableContextShiftSuite.scala | 88 ------------------- .../catbird/util/effect/RerunnableSuite.scala | 28 +++--- .../util/effect/RerunnableTimerSuite.scala | 37 -------- .../io/catbird/util/effect/Runners.scala | 62 +++++++++++++ 14 files changed, 159 insertions(+), 395 deletions(-) delete mode 100644 effect/src/main/scala/io/catbird/util/effect/RerunnableClock.scala delete mode 100644 effect/src/main/scala/io/catbird/util/effect/RerunnableContextShift.scala delete mode 100644 effect/src/main/scala/io/catbird/util/effect/RerunnableTimer.scala delete mode 100644 effect/src/test/scala/io/catbird/util/effect/ContextShiftingSuite.scala delete mode 100644 effect/src/test/scala/io/catbird/util/effect/RerunnableContextShiftSuite.scala delete mode 100644 effect/src/test/scala/io/catbird/util/effect/RerunnableTimerSuite.scala create mode 100644 effect/src/test/scala/io/catbird/util/effect/Runners.scala diff --git a/build.sbt b/build.sbt index ba3aee0b..1bdbc403 100644 --- a/build.sbt +++ b/build.sbt @@ -1,5 +1,5 @@ -val catsVersion = "2.0.0" -val catsEffectVersion = "2.0.0" +val catsVersion = "2.2.0" +val catsEffectVersion = "3.0.0-M2" val utilVersion = "20.9.0" val finagleVersion = "20.9.0" @@ -92,7 +92,8 @@ lazy val effect = project .settings( libraryDependencies ++= Seq( "org.typelevel" %% "cats-effect" % catsEffectVersion, - "org.typelevel" %% "cats-effect-laws" % catsEffectVersion % Test + "org.typelevel" %% "cats-effect-laws" % catsEffectVersion % Test, + "org.typelevel" %% "cats-effect-testkit" % catsEffectVersion % Test ), scalacOptions in Test ~= { _.filterNot(Set("-Yno-imports", "-Yno-predef")) diff --git a/effect/src/main/scala/io/catbird/util/effect/FutureInstances.scala b/effect/src/main/scala/io/catbird/util/effect/FutureInstances.scala index 89e4eb58..fbb79ce4 100644 --- a/effect/src/main/scala/io/catbird/util/effect/FutureInstances.scala +++ b/effect/src/main/scala/io/catbird/util/effect/FutureInstances.scala @@ -1,16 +1,18 @@ package io.catbird.util.effect -import cats.effect.{ Bracket, ExitCase } -import com.twitter.util.{ Future, Monitor } +import com.twitter.util.{Future, Monitor} import io.catbird.util.FutureMonadError import java.lang.Throwable + +import cats.effect.kernel.Resource.{Bracket, ExitCase} + import scala.Unit trait FutureInstances { - implicit final val futureBracketInstance: Bracket[Future, Throwable] = - new FutureMonadError with Bracket[Future, Throwable] { + implicit final val futureBracketInstance: Bracket[Future] = + new FutureMonadError with Bracket[Future] { final def bracketCase[A, B](acquire: Future[A])(use: A => Future[B])( - release: (A, ExitCase[Throwable]) => Future[Unit] + release: (A, ExitCase) => Future[Unit] ): Future[B] = acquire .flatMap(a => diff --git a/effect/src/main/scala/io/catbird/util/effect/RerunnableClock.scala b/effect/src/main/scala/io/catbird/util/effect/RerunnableClock.scala deleted file mode 100644 index 3750e960..00000000 --- a/effect/src/main/scala/io/catbird/util/effect/RerunnableClock.scala +++ /dev/null @@ -1,24 +0,0 @@ -package io.catbird.util.effect - -import java.util.concurrent.TimeUnit - -import cats.effect.Clock -import io.catbird.util.Rerunnable -import scala.Long -import java.lang.System - -import scala.concurrent.duration.TimeUnit - -object RerunnableClock { - - def apply(): RerunnableClock = new RerunnableClock -} - -final private[effect] class RerunnableClock extends Clock[Rerunnable] { - - override def realTime(unit: TimeUnit): Rerunnable[Long] = - Rerunnable(unit.convert(System.currentTimeMillis(), TimeUnit.MILLISECONDS)) - - override def monotonic(unit: TimeUnit): Rerunnable[Long] = - Rerunnable(unit.convert(System.nanoTime(), TimeUnit.NANOSECONDS)) -} diff --git a/effect/src/main/scala/io/catbird/util/effect/RerunnableContextShift.scala b/effect/src/main/scala/io/catbird/util/effect/RerunnableContextShift.scala deleted file mode 100644 index 761d2b98..00000000 --- a/effect/src/main/scala/io/catbird/util/effect/RerunnableContextShift.scala +++ /dev/null @@ -1,73 +0,0 @@ -package io.catbird.util.effect - -import cats.effect.ContextShift -import com.twitter.util.{ Future, FuturePool, Promise } -import io.catbird.util.Rerunnable - -import scala.Unit -import java.lang.Runnable -import java.util.concurrent.ExecutorService - -import scala.concurrent.{ ExecutionContext, ExecutionContextExecutorService } - -/** - * The goal here is to provide an implicit instance for `ContextShift[Rerunnable]`, so you can use libraries like - * `fs2` in a finagle-based application without converting between `Future` and `IO` everywhere. - * - * Usage: - * {{{ - * implicit val rerunnableCS: ContextShift[Rerunnable] = RerunnableContextShift.global - * }}} - */ -object RerunnableContextShift { - - final def fromExecutionContext(ec: ExecutionContext): ContextShift[Rerunnable] = - new RerunnableContextShift(ec) - - final def fromExecutorService(es: ExecutorService): ContextShift[Rerunnable] = - fromExecutionContext(ExecutionContext.fromExecutorService(es)) - - final def fromExecutionContextExecutorService(eces: ExecutionContextExecutorService): ContextShift[Rerunnable] = - fromExecutorService(eces) - - final lazy val global: ContextShift[Rerunnable] = - fromExecutionContext(scala.concurrent.ExecutionContext.global) - - /** - * Mirrors the api of `scala.concurrent.ExecutionContext.Implicit.global`. - * - * Usage: - * {{{ - * import io.catbird.util.effect.RerunnableContextShift.Implicits.global - * }}} - */ - object Implicits { - final implicit def global: ContextShift[Rerunnable] = RerunnableContextShift.global - } -} - -final private[effect] class RerunnableContextShift private (ec: ExecutionContext) extends ContextShift[Rerunnable] { - private final lazy val futurePool = FuturePool.interruptible(ec.asInstanceOf[ExecutionContextExecutorService]) - - override def shift: Rerunnable[Unit] = - Rerunnable.withFuturePool(futurePool)(()) // This is a bit of a hack, but it will have to do - - override def evalOn[A](targetEc: ExecutionContext)(fa: Rerunnable[A]): Rerunnable[A] = - for { - r <- executeOn(targetEc)(fa).liftToTry - _ <- shift - a <- Rerunnable.fromFuture(Future.value(r).lowerFromTry) - } yield a - - private def executeOn[A](targetEc: ExecutionContext)(fa: Rerunnable[A]): Rerunnable[A] = - Rerunnable.fromFuture { - val p = Promise[A]() - - targetEc.execute(new Runnable { - override def run(): Unit = - fa.run.proxyTo[A](p) - }) - - p - } -} diff --git a/effect/src/main/scala/io/catbird/util/effect/RerunnableInstances.scala b/effect/src/main/scala/io/catbird/util/effect/RerunnableInstances.scala index ad95766b..f39ff21b 100644 --- a/effect/src/main/scala/io/catbird/util/effect/RerunnableInstances.scala +++ b/effect/src/main/scala/io/catbird/util/effect/RerunnableInstances.scala @@ -1,18 +1,52 @@ package io.catbird.util.effect -import cats.effect.{ Effect, ExitCase, IO, SyncIO } +import cats.effect.{ Clock, IO, SyncIO } import com.twitter.util.{ Future, Monitor, Promise } import io.catbird.util.{ Rerunnable, RerunnableMonadError } import java.lang.Throwable +import java.util.concurrent.TimeUnit +import java.lang.System + +import cats.Applicative +import cats.effect.kernel.{ MonadCancel, Poll, Sync } +import cats.effect.kernel.Resource.{ Bracket, ExitCase } + import scala.Unit +import scala.concurrent.duration.FiniteDuration import scala.util.{ Either, Left, Right } trait RerunnableInstances { + implicit final val rerunnableInstance: Sync[Rerunnable] with Clock[Rerunnable] with Bracket[Rerunnable] = + new RerunnableMonadError with Sync[Rerunnable] with Clock[Rerunnable] with Bracket[Rerunnable] { + + final override def suspend[A](hint: Sync.Type)(thunk: => A): Rerunnable[A] = + Rerunnable(thunk) + + final override def realTime: Rerunnable[FiniteDuration] = + Rerunnable(FiniteDuration(System.currentTimeMillis(), TimeUnit.MILLISECONDS)) + + final override def monotonic: Rerunnable[FiniteDuration] = + Rerunnable(FiniteDuration(System.nanoTime(), TimeUnit.NANOSECONDS)) + + final override def bracketCase[A, B](acquire: Rerunnable[A])(use: A => Rerunnable[B])( + release: (A, ExitCase) => Rerunnable[Unit] + ): Rerunnable[B] = new Rerunnable[B] { + final def run: Future[B] = + acquire.run.flatMap { a => + val future = use(a).run + future.transform(result => + release(a, tryToExitCase(result)).run.handle(Monitor.catcher).flatMap(_ => future) + ) + } + } + } + + /* implicit final val rerunnableEffectInstance: Effect[Rerunnable] = - new RerunnableMonadError with Effect[Rerunnable] { + new RerunnableMonadError { // with Effect[Rerunnable] { final def suspend[A](thunk: => Rerunnable[A]): Rerunnable[A] = Rerunnable.suspend[A](thunk) - override final def delay[A](thunk: => A): Rerunnable[A] = Rerunnable[A](thunk) + //override final def delay[A](thunk: => A): Rerunnable[A] = Rerunnable[A](thunk) final def async[A](k: (Either[Throwable, A] => Unit) => Unit): Rerunnable[A] = new Rerunnable[A] { @@ -50,11 +84,11 @@ trait RerunnableInstances { } } - final def runAsync[A](fa: Rerunnable[A])(cb: Either[Throwable, A] => IO[Unit]): SyncIO[Unit] = - rerunnableToIO[A](fa).runAsync(cb) +// final def runAsync[A](fa: Rerunnable[A])(cb: Either[Throwable, A] => IO[Unit]): SyncIO[Unit] = +// rerunnableToIO[A](fa).runAsync(cb) final def bracketCase[A, B](acquire: Rerunnable[A])(use: A => Rerunnable[B])( - release: (A, ExitCase[Throwable]) => Rerunnable[Unit] + release: (A, ExitCase) => Rerunnable[Unit] ): Rerunnable[B] = new Rerunnable[B] { final def run: Future[B] = acquire.run.flatMap { a => @@ -65,4 +99,5 @@ trait RerunnableInstances { } } } + */ } diff --git a/effect/src/main/scala/io/catbird/util/effect/RerunnableTimer.scala b/effect/src/main/scala/io/catbird/util/effect/RerunnableTimer.scala deleted file mode 100644 index 67a510f5..00000000 --- a/effect/src/main/scala/io/catbird/util/effect/RerunnableTimer.scala +++ /dev/null @@ -1,46 +0,0 @@ -package io.catbird.util.effect - -import cats.effect.{ Clock, Timer } -import io.catbird.util.Rerunnable -import com.twitter.util.Future -import com.twitter.util -import scala.Unit - -import scala.concurrent.duration.FiniteDuration - -/** - * Can be used to construct a `cats.effect.Timer` instance for `Rerunnable` which let's you delay execution or - * retrieve the current time via `RerunnableClock`. - * - * Usage: - * {{{ - * // In a Finagle application - * implicit val timer: Timer[Rerunnable] = RerunnableTimer(com.twitter.finagle.util.DefaultTimer) - * - * // In tests (for instant execution of delays) - * implicit val timer: Timer[Rerunnable] = RerunnableTimer(com.twitter.util.Timer.Nil) - * - * // A dedicated `JavaTimer` - * implicit val timer: Timer[Rerunnable] = RerunnableTimer() - * }}} - */ -object RerunnableTimer { - - def apply(implicit twitterTimer: util.Timer): RerunnableTimer = new RerunnableTimer - - def apply(): RerunnableTimer = { - implicit val twitterTimer: util.Timer = new util.JavaTimer - - new RerunnableTimer - } -} - -final private[effect] class RerunnableTimer private (implicit underlyingTimer: util.Timer) extends Timer[Rerunnable] { - - override val clock: Clock[Rerunnable] = RerunnableClock() - - override def sleep(duration: FiniteDuration): Rerunnable[Unit] = - Rerunnable.fromFuture( - Future.Unit.delayed(util.Duration.fromNanoseconds(duration.toNanos)) - ) -} diff --git a/effect/src/main/scala/io/catbird/util/effect/package.scala b/effect/src/main/scala/io/catbird/util/effect/package.scala index 539005e7..c809e076 100644 --- a/effect/src/main/scala/io/catbird/util/effect/package.scala +++ b/effect/src/main/scala/io/catbird/util/effect/package.scala @@ -1,53 +1,51 @@ package io.catbird.util -import cats.effect.{ Async, ContextShift, ExitCase, IO } +import cats.effect.{ Async, IO } import com.twitter.util.{ Future, Return, Throw, Try } + +import scala.None import java.lang.Throwable +import cats.effect.Outcome +import cats.effect.kernel.Resource.ExitCase + import scala.util.{ Left, Right } package object effect extends FutureInstances with RerunnableInstances { /** - * Converts the `Future` to `F` without changing the underlying execution (same thread pool!). + * Converts the `Future` to `F`. */ def futureToAsync[F[_], A](fa: => Future[A])(implicit F: Async[F]): F[A] = F.async { k => fa.respond { case Return(a) => k(Right[Throwable, A](a)) case Throw(err) => k(Left[Throwable, A](err)) } - } - /** - * The same as `futureToAsync` but doesn't stay on the thread pool of the `Future` and instead shifts execution - * back to the one provided by `ContextShift[F]` (which is usually the default one). - * - * This is likely what you want when you interact with libraries that return a `Future` like `finagle-http` where - * the `Future` is running on a thread pool controlled by the library (e.g. the underlying Netty pool). - * It also is closer to the behavior of `IO.fromFuture` for Scala futures which also shifts back. - */ - def futureToAsyncAndShift[F[_], A](fa: => Future[A])(implicit F: Async[F], CS: ContextShift[F]): F[A] = - F.guarantee(futureToAsync[F, A](fa))(CS.shift) + F.pure(None) // No cancel token + } /** - * Converts the `Rerunnable` to `F` without changing the underlying execution (same thread pool!). + * Converts the `Rerunnable` to `IO`. */ final def rerunnableToIO[A](fa: Rerunnable[A]): IO[A] = futureToAsync[IO, A](fa.run) /** - * The same as `rerunnableToIO` but doesn't stay on the thread pool of the `Rerunnable` and instead shifts execution - * back to the one provided by `ContextShift[F]` (which is usually the default one). + * Convert a twitter-util Try to cats-effect ExitCase */ - final def rerunnableToIOAndShift[A](fa: Rerunnable[A])(implicit CS: ContextShift[IO]): IO[A] = - futureToAsyncAndShift[IO, A](fa.run) + final def tryToExitCase[A](ta: Try[A]): ExitCase = + ta match { + case Return(_) => ExitCase.Succeeded + case Throw(e) => ExitCase.Errored(e) + } /** - * Convert a twitter-util Try to cats-effect ExitCase + * Convert a twitter-util Try to cats-effect Outcome */ - final def tryToExitCase[A](ta: Try[A]): ExitCase[Throwable] = + final def tryToRerunnableOutcome[A](ta: Try[A]): Outcome[Rerunnable, Throwable, A] = ta match { - case Return(_) => ExitCase.complete - case Throw(e) => ExitCase.error(e) + case Return(a) => Outcome.Succeeded(Rerunnable.const(a)) + case Throw(e) => Outcome.Errored(e) } } diff --git a/effect/src/test/scala/io/catbird/util/effect/ContextShiftingSuite.scala b/effect/src/test/scala/io/catbird/util/effect/ContextShiftingSuite.scala deleted file mode 100644 index f96b013a..00000000 --- a/effect/src/test/scala/io/catbird/util/effect/ContextShiftingSuite.scala +++ /dev/null @@ -1,64 +0,0 @@ -package io.catbird.util.effect - -import cats.effect.{ ContextShift, IO } -import com.twitter.util.{ ExecutorServiceFuturePool, Future, FuturePool } -import org.scalatest.Outcome -import org.scalatest.funsuite.FixtureAnyFunSuite - -import scala.concurrent.ExecutionContext - -class ContextShiftingSuite extends FixtureAnyFunSuite with ThreadPoolNamingSupport { - - protected final class FixtureParam { - val ioPoolName = "io-pool" - val futurePoolName = "future-pool" - - val ioPool = newNamedThreadPool(ioPoolName) - - val futurePool: ExecutorServiceFuturePool = // threadpool of Future (often managed by a library like finagle-http) - FuturePool(newNamedThreadPool(futurePoolName)) - - def newIO: IO[String] = IO(currentThreadName()) - - def newFuture: Future[String] = futurePool.apply { - // Not 100% sure why but this sleep is needed to reproduce the error. There might be an optimization if the - // Future is already resolved at some point - Thread.sleep(200) - currentThreadName() - } - } - - test("After resolving the Future with futureToAsync stay on the Future threadpool") { f => - implicit val contextShift: ContextShift[IO] = // threadpool of IO (often provided by IOApp) - IO.contextShift(ExecutionContext.fromExecutor(f.ioPool)) - - val (futurePoolName, ioPoolName) = (for { - futurePoolName <- futureToAsync[IO, String](f.newFuture) - - ioPoolName <- f.newIO - } yield (futurePoolName, ioPoolName)).start(contextShift).flatMap(_.join).unsafeRunSync() - - assert(futurePoolName == f.futurePoolName) - assert(ioPoolName == f.futurePoolName) // Uh oh, this is likely not what the user wants - } - - test("After resolving the Future with futureToAsyncAndShift shift back to the threadpool of ContextShift[F]") { f => - implicit val contextShift: ContextShift[IO] = // threadpool of IO (often provided by IOApp) - IO.contextShift(ExecutionContext.fromExecutor(f.ioPool)) - - // If you'd use `futureToAsync` here instead, this whole thing would sometimes stay on the future-pool - val (futurePoolName, ioPoolName) = (for { - futurePoolName <- futureToAsyncAndShift[IO, String](f.newFuture) - - ioPoolName <- f.newIO - } yield (futurePoolName, ioPoolName)) - .start(contextShift) // start the computation on the default threadpool... - .flatMap(_.join) // ...then block until we have the results - .unsafeRunSync() - - assert(futurePoolName == f.futurePoolName) - assert(ioPoolName == f.ioPoolName) - } - - override protected def withFixture(test: OneArgTest): Outcome = withFixture(test.toNoArgTest(new FixtureParam)) -} diff --git a/effect/src/test/scala/io/catbird/util/effect/FutureSuite.scala b/effect/src/test/scala/io/catbird/util/effect/FutureSuite.scala index 424ab90c..c74f7494 100644 --- a/effect/src/test/scala/io/catbird/util/effect/FutureSuite.scala +++ b/effect/src/test/scala/io/catbird/util/effect/FutureSuite.scala @@ -1,16 +1,12 @@ package io.catbird.util.effect import cats.Eq -import cats.effect.laws.discipline.BracketTests -import cats.effect.laws.util.{ TestContext, TestInstances } -import cats.instances.either._ -import cats.instances.int._ -import cats.instances.tuple._ -import cats.instances.unit._ +import cats.laws.discipline.MonadErrorTests +import cats.instances.all._ import cats.laws.discipline.arbitrary._ import com.twitter.conversions.DurationOps._ import com.twitter.util.Future -import io.catbird.util.{ ArbitraryInstances, futureEqWithFailure } +import io.catbird.util.{ ArbitraryInstances, EqInstances, futureEqWithFailure } import org.scalatest.funsuite.AnyFunSuite import org.scalatest.prop.Configuration import org.typelevel.discipline.scalatest.FunSuiteDiscipline @@ -20,10 +16,10 @@ class FutureSuite with FunSuiteDiscipline with Configuration with ArbitraryInstances - with TestInstances { - implicit val context: TestContext = TestContext() + with EqInstances { + implicit def futureEq[A](implicit A: Eq[A]): Eq[Future[A]] = futureEqWithFailure(1.seconds) - checkAll("Future[Int]", BracketTests[Future, Throwable].bracket[Int, Int, Int]) + checkAll("Future[Int]", MonadErrorTests[Future, Throwable].monadError[Int, Int, Int]) } diff --git a/effect/src/test/scala/io/catbird/util/effect/RerunnableClockSuite.scala b/effect/src/test/scala/io/catbird/util/effect/RerunnableClockSuite.scala index 067baa4f..671f28d5 100644 --- a/effect/src/test/scala/io/catbird/util/effect/RerunnableClockSuite.scala +++ b/effect/src/test/scala/io/catbird/util/effect/RerunnableClockSuite.scala @@ -6,7 +6,7 @@ import java.util.concurrent.TimeUnit import cats.effect.Clock import com.twitter.util.Await import io.catbird.util.Rerunnable -import org.scalatest.Outcome +import org.scalatest.{ Outcome } import org.scalatest.concurrent.Eventually import org.scalatest.funsuite.FixtureAnyFunSuite @@ -21,14 +21,12 @@ class RerunnableClockSuite extends FixtureAnyFunSuite with Eventually { protected final class FixtureParam { def now: Instant = Instant.now() - - val clock: Clock[Rerunnable] = RerunnableClock() } test("Retrieval of real time") { f => eventually { val result = Await.result( - f.clock.realTime(TimeUnit.MILLISECONDS).map(Instant.ofEpochMilli).run + Clock[Rerunnable].realTime.map(duration => Instant.ofEpochMilli(duration.toMillis)).run ) assert(java.time.Duration.between(result, f.now).abs().toMillis < 50) @@ -38,7 +36,7 @@ class RerunnableClockSuite extends FixtureAnyFunSuite with Eventually { test("Retrieval of monotonic time") { f => eventually { val result = Await.result( - f.clock.monotonic(TimeUnit.NANOSECONDS).run + Clock[Rerunnable].monotonic.map(duration => duration.toNanos).run ) val durationBetween = Math.abs(System.nanoTime() - result) diff --git a/effect/src/test/scala/io/catbird/util/effect/RerunnableContextShiftSuite.scala b/effect/src/test/scala/io/catbird/util/effect/RerunnableContextShiftSuite.scala deleted file mode 100644 index b58bba20..00000000 --- a/effect/src/test/scala/io/catbird/util/effect/RerunnableContextShiftSuite.scala +++ /dev/null @@ -1,88 +0,0 @@ -package io.catbird.util.effect - -import cats.effect.{ ContextShift, IO, Sync } -import com.twitter.util.{ Await, Future, FuturePool } -import io.catbird.util.Rerunnable -import org.scalatest.Outcome -import org.scalatest.funsuite.FixtureAnyFunSuite - -class RerunnableContextShiftSuite extends FixtureAnyFunSuite with ThreadPoolNamingSupport { - - protected final class FixtureParam { - val futurePoolName = "future-pool" - val otherPoolName = "other-pool" - val ioPoolName = "io-pool" - - val futurePool = FuturePool.interruptible(newNamedThreadPool(futurePoolName)) - val otherPool = newNamedThreadPool(otherPoolName) - val ioPool = newNamedThreadPool(ioPoolName) - - def newIO: IO[String] = IO(currentThreadName()) - - def newFuture: Future[String] = futurePool(currentThreadName()) - - def newRerunnable: Rerunnable[String] = Rerunnable(currentThreadName()) - } - - test("ContextShift[Rerunnable].shift shifts to the pool of the instance") { f => - implicit val cs: ContextShift[Rerunnable] = - RerunnableContextShift.fromExecutionContext(f.ioPool) - - val (poolName1, poolName2, poolName3) = - (for { - poolName1 <- Rerunnable.fromFuture(f.newFuture) - - _ <- ContextShift[Rerunnable](cs).shift - - poolName2 <- Sync[Rerunnable].delay(currentThreadName()) - - poolName3 <- Rerunnable.fromFuture(f.newFuture) - } yield (poolName1, poolName2, poolName3)).run.await - - assert(poolName1 == f.futurePoolName) - assert(poolName2 == f.ioPoolName) - assert(poolName2 == f.ioPoolName) - } - - test("ContextShift[Rerunnable].evalOn executes on correct pool and shifts back to previous pool") { f => - implicit val cs: ContextShift[Rerunnable] = - RerunnableContextShift.fromExecutionContext(f.ioPool) - - val (poolName1, poolName2, poolName3) = - (for { - poolName1 <- f.newRerunnable - - poolName2 <- ContextShift[Rerunnable].evalOn(f.otherPool)(f.newRerunnable) - - poolName3 <- f.newRerunnable - } yield (poolName1, poolName2, poolName3)).run.await - - assert(poolName1 == currentThreadName()) // The first rerunnable is not explicitly evaluated on a dedicated pool - assert(poolName2 == f.otherPoolName) - assert(poolName3 == f.ioPoolName) - } - - test("ContextShift[Rerunnable].evalOn executes on correct pool and shifts back to future pool") { f => - implicit val cs: ContextShift[Rerunnable] = - RerunnableContextShift.fromExecutionContext(f.ioPool) - - val (poolName1, poolName2, poolName3) = - (for { - poolName1 <- Rerunnable.fromFuture(f.newFuture) // The future was started on a dedicated pool (e.g. netty) - - poolName2 <- ContextShift[Rerunnable].evalOn(f.otherPool)(f.newRerunnable) - - poolName3 <- Rerunnable.fromFuture(f.newFuture) - } yield (poolName1, poolName2, poolName3)).run.await - - assert(poolName1 == f.futurePoolName) - assert(poolName2 == f.otherPoolName) - assert(poolName3 == f.futurePoolName) - } - - implicit private class FutureAwaitOps[A](future: Future[A]) { - def await: A = Await.result(future) - } - - override protected def withFixture(test: OneArgTest): Outcome = withFixture(test.toNoArgTest(new FixtureParam)) -} diff --git a/effect/src/test/scala/io/catbird/util/effect/RerunnableSuite.scala b/effect/src/test/scala/io/catbird/util/effect/RerunnableSuite.scala index ccc2d765..5bbdb8b9 100644 --- a/effect/src/test/scala/io/catbird/util/effect/RerunnableSuite.scala +++ b/effect/src/test/scala/io/catbird/util/effect/RerunnableSuite.scala @@ -1,32 +1,36 @@ package io.catbird.util.effect -import cats.effect.laws.discipline.EffectTests -import cats.effect.laws.discipline.arbitrary.catsEffectLawsArbitraryForIO -import cats.effect.laws.util.{ TestContext, TestInstances } -import cats.effect.{ Bracket, IO } +import cats.Eq +import cats.effect.laws.{ ClockTests, MonadCancelTests, SyncTests } +import cats.effect.testkit.TestContext +import cats.effect.{ IO, Outcome, unsafe } +import cats.effect.Resource.Bracket import cats.instances.either._ import cats.instances.int._ import cats.instances.tuple._ import cats.instances.unit._ -import cats.kernel.Eq +import cats.laws.discipline.MonadErrorTests import cats.laws.discipline.arbitrary._ import com.twitter.util.{ Await, Monitor, Throw } -import io.catbird.util.{ ArbitraryInstances, Rerunnable } +import io.catbird.util.{ ArbitraryInstances, EqInstances, Rerunnable } +import org.scalacheck.Prop import org.scalatest.funsuite.AnyFunSuite import org.scalatest.prop.Configuration import org.typelevel.discipline.scalatest.FunSuiteDiscipline +import cats.effect.testkit.SyncTypeGenerators.arbitrarySyncType class RerunnableSuite extends AnyFunSuite with FunSuiteDiscipline with Configuration with ArbitraryInstances - with TestInstances { - implicit val context: TestContext = TestContext() - implicit def rerunnableEq[A](implicit A: Eq[A]): Eq[Rerunnable[A]] = - Eq.by[Rerunnable[A], IO[A]](rerunnableToIO) + with EqInstances + with Runners { - checkAll("Rerunnable[Int]", EffectTests[Rerunnable].effect[Int, Int, Int]) + checkAll("Rerunnable[Int]", MonadErrorTests[Rerunnable, Throwable].monadError[Int, Int, Int]) + checkAll("Rerunnable[Int]", ClockTests[Rerunnable].clock[Int, Int, Int]) + checkAll("Rerunnable[Int]", SyncTests[Rerunnable].sync[Int, Int, Int]) + //checkAll("Rerunnable[Int]", MonadCancelTests[Rerunnable, Throwable].monadCancel[Int, Int, Int]) test("Exceptions thrown by release are handled by Monitor") { val useException = new Exception("thrown by use") @@ -35,7 +39,7 @@ class RerunnableSuite var monitoredException: Throwable = null val monitor = Monitor.mk { case e => monitoredException = e; true; } - val rerunnable = Bracket[Rerunnable, Throwable] + val rerunnable = Bracket[Rerunnable] .bracket(Rerunnable.Unit)(_ => Rerunnable.raiseError(useException))(_ => Rerunnable.raiseError(releaseException)) .liftToTry diff --git a/effect/src/test/scala/io/catbird/util/effect/RerunnableTimerSuite.scala b/effect/src/test/scala/io/catbird/util/effect/RerunnableTimerSuite.scala deleted file mode 100644 index 24bd2a64..00000000 --- a/effect/src/test/scala/io/catbird/util/effect/RerunnableTimerSuite.scala +++ /dev/null @@ -1,37 +0,0 @@ -package io.catbird.util.effect - -import cats.effect.Timer -import org.scalatest.Outcome -import org.scalatest.funsuite.FixtureAnyFunSuite -import com.twitter.util -import com.twitter.util.{ Await, Future } -import io.catbird.util.Rerunnable - -import scala.concurrent.duration._ - -class RerunnableTimerSuite extends FixtureAnyFunSuite { - - protected final class FixtureParam { - val twitterTimer: util.Timer = new util.JavaTimer() - } - - test("A timer can be used to delay execution") { f => - implicit val timer: Timer[Rerunnable] = RerunnableTimer(f.twitterTimer) - - val result = Await.result( - Future.selectIndex( - Vector( - for { - _ <- Timer[Rerunnable].sleep(100.milliseconds).run - r <- Future.value("slow") - } yield r, - Future.value("fast").delayed(util.Duration.fromMilliseconds(50))(f.twitterTimer) - ) - ) - ) - - assert(result == 1) // The first future is delayed for longer, so we expect the second one to win - } - - override protected def withFixture(test: OneArgTest): Outcome = withFixture(test.toNoArgTest(new FixtureParam)) -} diff --git a/effect/src/test/scala/io/catbird/util/effect/Runners.scala b/effect/src/test/scala/io/catbird/util/effect/Runners.scala new file mode 100644 index 00000000..e9451d30 --- /dev/null +++ b/effect/src/test/scala/io/catbird/util/effect/Runners.scala @@ -0,0 +1,62 @@ +package io.catbird.util.effect + +import cats.Eq +import cats.effect.{ IO, Outcome, unsafe } +import cats.effect.testkit.TestContext +import io.catbird.util.{ EqInstances, Rerunnable } +import org.scalacheck.Prop + +import scala.annotation.implicitNotFound +import scala.concurrent.duration.FiniteDuration +import scala.concurrent.duration._ + +/** + * Test helpers mostly taken from the cats-effect IOSpec. + */ +trait Runners { self: EqInstances => + + implicit val ticker: Ticker = Ticker(TestContext()) + + implicit def eqIOA[A: Eq](implicit ticker: Ticker): Eq[IO[A]] = + Eq.by(unsafeRun(_)) + + implicit def rerunnableEq[A](implicit A: Eq[A]): Eq[Rerunnable[A]] = + Eq.by[Rerunnable[A], IO[A]](rerunnableToIO) + + implicit def boolRunnings(rerunnableB: Rerunnable[Boolean])(implicit ticker: Ticker): Prop = + Prop(unsafeRun(rerunnableToIO(rerunnableB)).fold(false, _ => false, _.getOrElse(false))) + + def unsafeRun[A](ioa: IO[A])(implicit ticker: Ticker): Outcome[Option, Throwable, A] = + try { + var results: Outcome[Option, Throwable, A] = Outcome.Succeeded(None) + + ioa.unsafeRunAsync { + case Left(t) => results = Outcome.Errored(t) + case Right(a) => results = Outcome.Succeeded(Some(a)) + }(unsafe.IORuntime(ticker.ctx, ticker.ctx, scheduler, () => ())) + + ticker.ctx.tickAll(1.days) + + results + } catch { + case t: Throwable => + t.printStackTrace() + throw t + } + + def scheduler(implicit ticker: Ticker): unsafe.Scheduler = + new unsafe.Scheduler { + import ticker.ctx + + def sleep(delay: FiniteDuration, action: Runnable): Runnable = { + val cancel = ctx.schedule(delay, action) + new Runnable { def run() = cancel() } + } + + def nowMillis() = ctx.now().toMillis + def monotonicNanos() = ctx.now().toNanos + } + + @implicitNotFound("could not find an instance of Ticker; try using `in ticked { implicit ticker =>`") + case class Ticker(ctx: TestContext) +}