-
Notifications
You must be signed in to change notification settings - Fork 23
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add module 'catbird-effect3' for cats-effect 3.x
- Loading branch information
Showing
8 changed files
with
353 additions
and
1 deletion.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
35 changes: 35 additions & 0 deletions
35
effect3/src/main/scala/io/catbird/util/effect/FutureInstances.scala
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,35 @@ | ||
package io.catbird.util.effect | ||
|
||
import cats.effect.kernel.{ MonadCancel, Outcome } | ||
import com.twitter.util.{ Future, Monitor } | ||
import io.catbird.util.FutureMonadError | ||
|
||
import java.lang.Throwable | ||
|
||
import scala.Unit | ||
|
||
trait FutureInstances { | ||
implicit final val futureMonadCancelInstance | ||
: MonadCancel[Future, Throwable] with MonadCancel.Uncancelable[Future, Throwable] = | ||
new FutureMonadError with MonadCancel[Future, Throwable] with MonadCancel.Uncancelable[Future, Throwable] { | ||
|
||
final override def forceR[A, B](fa: Future[A])(fb: Future[B]): Future[B] = | ||
fa.liftToTry.flatMap { resultA => | ||
resultA.handle(Monitor.catcher) | ||
fb | ||
} | ||
|
||
/** | ||
* Special implementation so exceptions in release are cought by the `Monitor`. | ||
*/ | ||
final override def bracketCase[A, B](acquire: Future[A])(use: A => Future[B])( | ||
release: (A, Outcome[Future, Throwable, B]) => Future[Unit] | ||
): Future[B] = | ||
acquire | ||
.flatMap(a => | ||
use(a).liftToTry | ||
.flatMap(result => release(a, tryToFutureOutcome(result)).handle(Monitor.catcher).map(_ => result)) | ||
) | ||
.lowerFromTry | ||
} | ||
} |
53 changes: 53 additions & 0 deletions
53
effect3/src/main/scala/io/catbird/util/effect/RerunnableInstances.scala
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,53 @@ | ||
package io.catbird.util.effect | ||
|
||
import cats.effect.Clock | ||
import cats.effect.kernel.{ MonadCancel, Outcome, Sync } | ||
import com.twitter.util.{ Future, Monitor } | ||
import io.catbird.util.{ Rerunnable, RerunnableMonadError } | ||
|
||
import java.lang.Throwable | ||
import java.util.concurrent.TimeUnit | ||
import java.lang.System | ||
|
||
import scala.Unit | ||
import scala.concurrent.duration.FiniteDuration | ||
|
||
trait RerunnableInstances { | ||
implicit final val rerunnableInstance | ||
: Sync[Rerunnable] with Clock[Rerunnable] with MonadCancel.Uncancelable[Rerunnable, Throwable] = | ||
new RerunnableMonadError | ||
with Sync[Rerunnable] | ||
with Clock[Rerunnable] | ||
with MonadCancel.Uncancelable[Rerunnable, Throwable] { | ||
|
||
final override def suspend[A](hint: Sync.Type)(thunk: => A): Rerunnable[A] = | ||
Rerunnable(thunk) | ||
|
||
final override def realTime: Rerunnable[FiniteDuration] = | ||
Rerunnable(FiniteDuration(System.currentTimeMillis(), TimeUnit.MILLISECONDS)) | ||
|
||
final override def monotonic: Rerunnable[FiniteDuration] = | ||
Rerunnable(FiniteDuration(System.nanoTime(), TimeUnit.NANOSECONDS)) | ||
|
||
final override def forceR[A, B](fa: Rerunnable[A])(fb: Rerunnable[B]): Rerunnable[B] = | ||
fa.liftToTry.flatMap { resultA => | ||
resultA.handle(Monitor.catcher) | ||
fb | ||
} | ||
|
||
/** | ||
* Special implementation so exceptions in release are cought by the `Monitor`. | ||
*/ | ||
final override def bracketCase[A, B](acquire: Rerunnable[A])(use: A => Rerunnable[B])( | ||
release: (A, Outcome[Rerunnable, Throwable, B]) => Rerunnable[Unit] | ||
): Rerunnable[B] = new Rerunnable[B] { | ||
final def run: Future[B] = | ||
acquire.run.flatMap { a => | ||
val future = use(a).run | ||
future.transform(result => | ||
release(a, tryToRerunnableOutcome(result)).run.handle(Monitor.catcher).flatMap(_ => future) | ||
) | ||
} | ||
} | ||
} | ||
} |
57 changes: 57 additions & 0 deletions
57
effect3/src/main/scala/io/catbird/util/effect/package.scala
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,57 @@ | ||
package io.catbird.util | ||
|
||
import cats.effect.{ Async, IO } | ||
import com.twitter.util.{ Future, Return, Throw, Try } | ||
|
||
import java.lang.Throwable | ||
|
||
import cats.effect.Outcome | ||
import cats.effect.kernel.Resource.ExitCase | ||
|
||
import scala.util.{ Left, Right } | ||
|
||
package object effect extends FutureInstances with RerunnableInstances { | ||
|
||
/** | ||
* Converts the `Future` to `F`. | ||
*/ | ||
def futureToAsync[F[_], A](fa: => Future[A])(implicit F: Async[F]): F[A] = F.async_ { k => | ||
fa.respond { | ||
case Return(a) => k(Right(a)) | ||
case Throw(err) => k(Left(err)) | ||
} | ||
} | ||
|
||
/** | ||
* Converts the `Rerunnable` to `IO`. | ||
*/ | ||
final def rerunnableToIO[A](fa: Rerunnable[A]): IO[A] = | ||
futureToAsync[IO, A](fa.run) | ||
|
||
/** | ||
* Convert a twitter-util Try to cats-effect ExitCase | ||
*/ | ||
final def tryToExitCase[A](ta: Try[A]): ExitCase = | ||
ta match { | ||
case Return(_) => ExitCase.Succeeded | ||
case Throw(e) => ExitCase.Errored(e) | ||
} | ||
|
||
/** | ||
* Convert a twitter-util Try to cats-effect Outcome for Rerunnable | ||
*/ | ||
final def tryToRerunnableOutcome[A](ta: Try[A]): Outcome[Rerunnable, Throwable, A] = | ||
ta match { | ||
case Return(a) => Outcome.Succeeded(Rerunnable.const(a)) | ||
case Throw(e) => Outcome.Errored(e) | ||
} | ||
|
||
/** | ||
* Convert a twitter-util Try to cats-effect Outcome for Future | ||
*/ | ||
final def tryToFutureOutcome[A](ta: Try[A]): Outcome[Future, Throwable, A] = | ||
ta match { | ||
case Return(a) => Outcome.Succeeded(Future.value(a)) | ||
case Throw(e) => Outcome.Errored(e) | ||
} | ||
} |
27 changes: 27 additions & 0 deletions
27
effect3/src/test/scala/io/catbird/util/effect/FutureSuite.scala
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,27 @@ | ||
package io.catbird.util.effect | ||
|
||
import cats.Eq | ||
import cats.effect.laws.MonadCancelTests | ||
import cats.instances.all._ | ||
import cats.laws.discipline.MonadErrorTests | ||
import cats.laws.discipline.arbitrary._ | ||
import com.twitter.conversions.DurationOps._ | ||
import com.twitter.util.Future | ||
import io.catbird.util.{ ArbitraryInstances, EqInstances, futureEqWithFailure } | ||
import org.scalatest.funsuite.AnyFunSuite | ||
import org.scalatest.prop.Configuration | ||
import org.typelevel.discipline.scalatest.FunSuiteDiscipline | ||
|
||
class FutureSuite | ||
extends AnyFunSuite | ||
with FunSuiteDiscipline | ||
with Configuration | ||
with ArbitraryInstances | ||
with EqInstances { | ||
|
||
implicit def futureEq[A](implicit A: Eq[A]): Eq[Future[A]] = | ||
futureEqWithFailure(1.seconds) | ||
|
||
checkAll("Future[Int]", MonadErrorTests[Future, Throwable].monadError[Int, Int, Int]) | ||
checkAll("Future[Int]", MonadCancelTests[Future, Throwable].monadCancel[Int, Int, Int]) | ||
} |
48 changes: 48 additions & 0 deletions
48
effect3/src/test/scala/io/catbird/util/effect/RerunnableClockSuite.scala
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,48 @@ | ||
package io.catbird.util.effect | ||
|
||
import java.time.Instant | ||
import java.util.concurrent.TimeUnit | ||
|
||
import cats.effect.Clock | ||
import com.twitter.util.Await | ||
import io.catbird.util.Rerunnable | ||
import org.scalatest.{ Outcome } | ||
import org.scalatest.concurrent.{Eventually, IntegrationPatience} | ||
import org.scalatest.funsuite.FixtureAnyFunSuite | ||
|
||
/** | ||
* We'll use `eventually` and a reasonably big tolerance here to prevent CI from failing if it is a bit slow. | ||
* | ||
* Technically the implementation is just an extremely thin wrapper around `System.currentTimeMillis()` | ||
* and `System.nanoTime()` so as long as the result is the same order of magnitude (and therefore the | ||
* unit-conversion is correct) we should be fine. | ||
*/ | ||
class RerunnableClockSuite extends FixtureAnyFunSuite with Eventually with IntegrationPatience { | ||
|
||
protected final class FixtureParam { | ||
def now: Instant = Instant.now() | ||
} | ||
|
||
test("Retrieval of real time") { f => | ||
eventually { | ||
val result = Await.result( | ||
Clock[Rerunnable].realTime.map(duration => Instant.ofEpochMilli(duration.toMillis)).run | ||
) | ||
|
||
assert(java.time.Duration.between(result, f.now).abs().toMillis < 50) | ||
} | ||
} | ||
|
||
test("Retrieval of monotonic time") { f => | ||
eventually { | ||
val result = Await.result( | ||
Clock[Rerunnable].monotonic.map(duration => duration.toNanos).run | ||
) | ||
|
||
val durationBetween = Math.abs(System.nanoTime() - result) | ||
assert(TimeUnit.MILLISECONDS.convert(durationBetween, TimeUnit.NANOSECONDS) < 5) | ||
} | ||
} | ||
|
||
override protected def withFixture(test: OneArgTest): Outcome = withFixture(test.toNoArgTest(new FixtureParam)) | ||
} |
48 changes: 48 additions & 0 deletions
48
effect3/src/test/scala/io/catbird/util/effect/RerunnableSuite.scala
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,48 @@ | ||
package io.catbird.util.effect | ||
|
||
import cats.effect.MonadCancel | ||
import cats.effect.kernel.testkit.SyncTypeGenerators | ||
import cats.effect.laws.{ ClockTests, MonadCancelTests, SyncTests } | ||
import cats.instances.either._ | ||
import cats.instances.int._ | ||
import cats.instances.tuple._ | ||
import cats.instances.unit._ | ||
import cats.laws.discipline.MonadErrorTests | ||
import cats.laws.discipline.arbitrary._ | ||
import com.twitter.util.{ Await, Monitor, Throw } | ||
import io.catbird.util.{ ArbitraryInstances, EqInstances, Rerunnable } | ||
import org.scalatest.funsuite.AnyFunSuite | ||
import org.scalatest.prop.Configuration | ||
import org.typelevel.discipline.scalatest.FunSuiteDiscipline | ||
|
||
class RerunnableSuite | ||
extends AnyFunSuite | ||
with FunSuiteDiscipline | ||
with Configuration | ||
with ArbitraryInstances | ||
with SyncTypeGenerators | ||
with EqInstances | ||
with Runners { | ||
|
||
checkAll("Rerunnable[Int]", ClockTests[Rerunnable].clock) | ||
checkAll("Rerunnable[Int]", MonadErrorTests[Rerunnable, Throwable].monadError[Int, Int, Int]) | ||
checkAll("Rerunnable[Int]", MonadCancelTests[Rerunnable, Throwable].monadCancel[Int, Int, Int]) | ||
checkAll("Rerunnable[Int]", SyncTests[Rerunnable].sync[Int, Int, Int]) | ||
|
||
test("Exceptions thrown by release are handled by Monitor") { | ||
val useException = new Exception("thrown by use") | ||
val releaseException = new Exception("thrown by release") | ||
|
||
var monitoredException: Throwable = null | ||
val monitor = Monitor.mk { case e => monitoredException = e; true; } | ||
|
||
val rerunnable = MonadCancel[Rerunnable, Throwable] | ||
.bracket(Rerunnable.Unit)(_ => Rerunnable.raiseError(useException))(_ => Rerunnable.raiseError(releaseException)) | ||
.liftToTry | ||
|
||
val result = Await.result(Monitor.using(monitor)(rerunnable.run)) | ||
|
||
assert(result == Throw(useException)) | ||
assert(monitoredException == releaseException) | ||
} | ||
} |
64 changes: 64 additions & 0 deletions
64
effect3/src/test/scala/io/catbird/util/effect/Runners.scala
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,64 @@ | ||
package io.catbird.util.effect | ||
|
||
import cats.Eq | ||
import cats.effect.{ IO, Outcome, unsafe } | ||
import cats.effect.testkit.TestContext | ||
import cats.effect.unsafe.IORuntimeConfig | ||
import io.catbird.util.{ EqInstances, Rerunnable } | ||
import org.scalacheck.Prop | ||
|
||
import scala.annotation.implicitNotFound | ||
import scala.concurrent.duration.FiniteDuration | ||
import scala.concurrent.duration._ | ||
import scala.language.implicitConversions | ||
|
||
/** | ||
* Test helpers mostly taken from the cats-effect IOSpec. | ||
*/ | ||
trait Runners { self: EqInstances => | ||
|
||
implicit val ticker: Ticker = Ticker(TestContext()) | ||
|
||
implicit def eqIOA[A: Eq](implicit ticker: Ticker): Eq[IO[A]] = | ||
Eq.by(unsafeRun(_)) | ||
|
||
implicit def rerunnableEq[A](implicit A: Eq[A]): Eq[Rerunnable[A]] = | ||
Eq.by[Rerunnable[A], IO[A]](rerunnableToIO) | ||
|
||
implicit def boolRunnings(rerunnableB: Rerunnable[Boolean])(implicit ticker: Ticker): Prop = | ||
Prop(unsafeRun(rerunnableToIO(rerunnableB)).fold(false, _ => false, _.getOrElse(false))) | ||
|
||
def unsafeRun[A](ioa: IO[A])(implicit ticker: Ticker): Outcome[Option, Throwable, A] = | ||
try { | ||
var results: Outcome[Option, Throwable, A] = Outcome.Succeeded(None) | ||
|
||
ioa.unsafeRunAsync { | ||
case Left(t) => results = Outcome.Errored(t) | ||
case Right(a) => results = Outcome.Succeeded(Some(a)) | ||
}(unsafe.IORuntime(ticker.ctx, ticker.ctx, scheduler, () => (), IORuntimeConfig())) | ||
|
||
ticker.ctx.tickAll(1.days) | ||
|
||
results | ||
} catch { | ||
case t: Throwable => | ||
t.printStackTrace() | ||
throw t | ||
} | ||
|
||
def scheduler(implicit ticker: Ticker): unsafe.Scheduler = | ||
new unsafe.Scheduler { | ||
import ticker.ctx | ||
|
||
def sleep(delay: FiniteDuration, action: Runnable): Runnable = { | ||
val cancel = ctx.schedule(delay, action) | ||
new Runnable { def run() = cancel() } | ||
} | ||
|
||
def nowMillis() = ctx.now().toMillis | ||
def monotonicNanos() = ctx.now().toNanos | ||
} | ||
|
||
@implicitNotFound("could not find an instance of Ticker; try using `in ticked { implicit ticker =>`") | ||
case class Ticker(ctx: TestContext) | ||
} |