Skip to content

Commit

Permalink
WIP: Rerunnable: Sync + Clock + MonadError
Browse files Browse the repository at this point in the history
  • Loading branch information
felixbr committed Oct 24, 2020
1 parent 440bbfb commit 056bfa4
Show file tree
Hide file tree
Showing 14 changed files with 159 additions and 395 deletions.
7 changes: 4 additions & 3 deletions build.sbt
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
val catsVersion = "2.0.0"
val catsEffectVersion = "2.0.0"
val catsVersion = "2.2.0"
val catsEffectVersion = "3.0.0-M2"
val utilVersion = "20.9.0"
val finagleVersion = "20.9.0"

Expand Down Expand Up @@ -92,7 +92,8 @@ lazy val effect = project
.settings(
libraryDependencies ++= Seq(
"org.typelevel" %% "cats-effect" % catsEffectVersion,
"org.typelevel" %% "cats-effect-laws" % catsEffectVersion % Test
"org.typelevel" %% "cats-effect-laws" % catsEffectVersion % Test,
"org.typelevel" %% "cats-effect-testkit" % catsEffectVersion % Test
),
scalacOptions in Test ~= {
_.filterNot(Set("-Yno-imports", "-Yno-predef"))
Expand Down
Original file line number Diff line number Diff line change
@@ -1,16 +1,18 @@
package io.catbird.util.effect

import cats.effect.{ Bracket, ExitCase }
import com.twitter.util.{ Future, Monitor }
import com.twitter.util.{Future, Monitor}
import io.catbird.util.FutureMonadError
import java.lang.Throwable

import cats.effect.kernel.Resource.{Bracket, ExitCase}

import scala.Unit

trait FutureInstances {
implicit final val futureBracketInstance: Bracket[Future, Throwable] =
new FutureMonadError with Bracket[Future, Throwable] {
implicit final val futureBracketInstance: Bracket[Future] =
new FutureMonadError with Bracket[Future] {
final def bracketCase[A, B](acquire: Future[A])(use: A => Future[B])(
release: (A, ExitCase[Throwable]) => Future[Unit]
release: (A, ExitCase) => Future[Unit]
): Future[B] =
acquire
.flatMap(a =>
Expand Down
24 changes: 0 additions & 24 deletions effect/src/main/scala/io/catbird/util/effect/RerunnableClock.scala

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,18 +1,52 @@
package io.catbird.util.effect

import cats.effect.{ Effect, ExitCase, IO, SyncIO }
import cats.effect.{ Clock, IO, SyncIO }
import com.twitter.util.{ Future, Monitor, Promise }
import io.catbird.util.{ Rerunnable, RerunnableMonadError }
import java.lang.Throwable
import java.util.concurrent.TimeUnit
import java.lang.System

import cats.Applicative
import cats.effect.kernel.{ MonadCancel, Poll, Sync }
import cats.effect.kernel.Resource.{ Bracket, ExitCase }

import scala.Unit
import scala.concurrent.duration.FiniteDuration
import scala.util.{ Either, Left, Right }

trait RerunnableInstances {
implicit final val rerunnableInstance: Sync[Rerunnable] with Clock[Rerunnable] with Bracket[Rerunnable] =
new RerunnableMonadError with Sync[Rerunnable] with Clock[Rerunnable] with Bracket[Rerunnable] {

final override def suspend[A](hint: Sync.Type)(thunk: => A): Rerunnable[A] =
Rerunnable(thunk)

final override def realTime: Rerunnable[FiniteDuration] =
Rerunnable(FiniteDuration(System.currentTimeMillis(), TimeUnit.MILLISECONDS))

final override def monotonic: Rerunnable[FiniteDuration] =
Rerunnable(FiniteDuration(System.nanoTime(), TimeUnit.NANOSECONDS))

final override def bracketCase[A, B](acquire: Rerunnable[A])(use: A => Rerunnable[B])(
release: (A, ExitCase) => Rerunnable[Unit]
): Rerunnable[B] = new Rerunnable[B] {
final def run: Future[B] =
acquire.run.flatMap { a =>
val future = use(a).run
future.transform(result =>
release(a, tryToExitCase(result)).run.handle(Monitor.catcher).flatMap(_ => future)
)
}
}
}

/*
implicit final val rerunnableEffectInstance: Effect[Rerunnable] =
new RerunnableMonadError with Effect[Rerunnable] {
new RerunnableMonadError { // with Effect[Rerunnable] {
final def suspend[A](thunk: => Rerunnable[A]): Rerunnable[A] = Rerunnable.suspend[A](thunk)
override final def delay[A](thunk: => A): Rerunnable[A] = Rerunnable[A](thunk)
//override final def delay[A](thunk: => A): Rerunnable[A] = Rerunnable[A](thunk)
final def async[A](k: (Either[Throwable, A] => Unit) => Unit): Rerunnable[A] =
new Rerunnable[A] {
Expand Down Expand Up @@ -50,11 +84,11 @@ trait RerunnableInstances {
}
}
final def runAsync[A](fa: Rerunnable[A])(cb: Either[Throwable, A] => IO[Unit]): SyncIO[Unit] =
rerunnableToIO[A](fa).runAsync(cb)
// final def runAsync[A](fa: Rerunnable[A])(cb: Either[Throwable, A] => IO[Unit]): SyncIO[Unit] =
// rerunnableToIO[A](fa).runAsync(cb)
final def bracketCase[A, B](acquire: Rerunnable[A])(use: A => Rerunnable[B])(
release: (A, ExitCase[Throwable]) => Rerunnable[Unit]
release: (A, ExitCase) => Rerunnable[Unit]
): Rerunnable[B] = new Rerunnable[B] {
final def run: Future[B] =
acquire.run.flatMap { a =>
Expand All @@ -65,4 +99,5 @@ trait RerunnableInstances {
}
}
}
*/
}
46 changes: 0 additions & 46 deletions effect/src/main/scala/io/catbird/util/effect/RerunnableTimer.scala

This file was deleted.

42 changes: 20 additions & 22 deletions effect/src/main/scala/io/catbird/util/effect/package.scala
Original file line number Diff line number Diff line change
@@ -1,53 +1,51 @@
package io.catbird.util

import cats.effect.{ Async, ContextShift, ExitCase, IO }
import cats.effect.{ Async, IO }
import com.twitter.util.{ Future, Return, Throw, Try }

import scala.None
import java.lang.Throwable

import cats.effect.Outcome
import cats.effect.kernel.Resource.ExitCase

import scala.util.{ Left, Right }

package object effect extends FutureInstances with RerunnableInstances {

/**
* Converts the `Future` to `F` without changing the underlying execution (same thread pool!).
* Converts the `Future` to `F`.
*/
def futureToAsync[F[_], A](fa: => Future[A])(implicit F: Async[F]): F[A] = F.async { k =>
fa.respond {
case Return(a) => k(Right[Throwable, A](a))
case Throw(err) => k(Left[Throwable, A](err))
}
}

/**
* The same as `futureToAsync` but doesn't stay on the thread pool of the `Future` and instead shifts execution
* back to the one provided by `ContextShift[F]` (which is usually the default one).
*
* This is likely what you want when you interact with libraries that return a `Future` like `finagle-http` where
* the `Future` is running on a thread pool controlled by the library (e.g. the underlying Netty pool).
* It also is closer to the behavior of `IO.fromFuture` for Scala futures which also shifts back.
*/
def futureToAsyncAndShift[F[_], A](fa: => Future[A])(implicit F: Async[F], CS: ContextShift[F]): F[A] =
F.guarantee(futureToAsync[F, A](fa))(CS.shift)
F.pure(None) // No cancel token
}

/**
* Converts the `Rerunnable` to `F` without changing the underlying execution (same thread pool!).
* Converts the `Rerunnable` to `IO`.
*/
final def rerunnableToIO[A](fa: Rerunnable[A]): IO[A] =
futureToAsync[IO, A](fa.run)

/**
* The same as `rerunnableToIO` but doesn't stay on the thread pool of the `Rerunnable` and instead shifts execution
* back to the one provided by `ContextShift[F]` (which is usually the default one).
* Convert a twitter-util Try to cats-effect ExitCase
*/
final def rerunnableToIOAndShift[A](fa: Rerunnable[A])(implicit CS: ContextShift[IO]): IO[A] =
futureToAsyncAndShift[IO, A](fa.run)
final def tryToExitCase[A](ta: Try[A]): ExitCase =
ta match {
case Return(_) => ExitCase.Succeeded
case Throw(e) => ExitCase.Errored(e)
}

/**
* Convert a twitter-util Try to cats-effect ExitCase
* Convert a twitter-util Try to cats-effect Outcome
*/
final def tryToExitCase[A](ta: Try[A]): ExitCase[Throwable] =
final def tryToRerunnableOutcome[A](ta: Try[A]): Outcome[Rerunnable, Throwable, A] =
ta match {
case Return(_) => ExitCase.complete
case Throw(e) => ExitCase.error(e)
case Return(a) => Outcome.Succeeded(Rerunnable.const(a))
case Throw(e) => Outcome.Errored(e)
}
}
Loading

0 comments on commit 056bfa4

Please sign in to comment.