diff --git a/core/jvm-native/src/main/scala/cats/effect/unsafe/PollingSystem.scala b/core/jvm-native/src/main/scala/cats/effect/unsafe/PollingSystem.scala index cc36ef2b8d..1587799ec1 100644 --- a/core/jvm-native/src/main/scala/cats/effect/unsafe/PollingSystem.scala +++ b/core/jvm-native/src/main/scala/cats/effect/unsafe/PollingSystem.scala @@ -91,6 +91,23 @@ abstract class PollingSystem { */ def poll(poller: Poller, nanos: Long, reportFailure: Throwable => Unit): Boolean + /** + * Makes a best-effort to steal completed I/O events. Not all polling systems support this. + * + * This method is safe to call concurrently from threads that do not own the poller. + * + * @param poller + * the thread-local [[Poller]] used to poll events. + * + * @param reportFailure + * callback that handles any failures that occur during stealing. + * + * @return + * whether any events were stolen. e.g. if the method returned due to timeout, this should + * be `false`. + */ + def steal(poller: Poller, reportFailure: Throwable => Unit): Boolean + /** * @return * whether poll should be called again (i.e., there are more events to be polled) diff --git a/core/jvm/src/main/scala/cats/effect/unsafe/SelectorSystem.scala b/core/jvm/src/main/scala/cats/effect/unsafe/SelectorSystem.scala index bbb853b947..2bdc432ff4 100644 --- a/core/jvm/src/main/scala/cats/effect/unsafe/SelectorSystem.scala +++ b/core/jvm/src/main/scala/cats/effect/unsafe/SelectorSystem.scala @@ -63,7 +63,7 @@ final class SelectorSystem private (provider: SelectorProvider) extends PollingS } catch { case ex if NonFatal(ex) => error = ex - readyOps = -1 // interest all waiters + readyOps = -1 // notify all waiters } val value = if (error ne null) Left(error) else Right(readyOps) @@ -98,6 +98,8 @@ final class SelectorSystem private (provider: SelectorProvider) extends PollingS } else false } + def steal(poller: Poller, reportFailure: Throwable => Unit): Boolean = false + def needsPoll(poller: Poller): Boolean = !poller.selector.keys().isEmpty() diff --git a/core/jvm/src/main/scala/cats/effect/unsafe/SleepSystem.scala b/core/jvm/src/main/scala/cats/effect/unsafe/SleepSystem.scala index 46ffa909e3..af0cfc1869 100644 --- a/core/jvm/src/main/scala/cats/effect/unsafe/SleepSystem.scala +++ b/core/jvm/src/main/scala/cats/effect/unsafe/SleepSystem.scala @@ -42,6 +42,8 @@ object SleepSystem extends PollingSystem { false } + def steal(poller: Poller, reportFailure: Throwable => Unit): Boolean = false + def needsPoll(poller: Poller): Boolean = false def interrupt(targetThread: Thread, targetPoller: Poller): Unit = diff --git a/core/native/src/main/scala/cats/effect/unsafe/EpollSystem.scala b/core/native/src/main/scala/cats/effect/unsafe/EpollSystem.scala index 9874edb58e..c415c6b9f5 100644 --- a/core/native/src/main/scala/cats/effect/unsafe/EpollSystem.scala +++ b/core/native/src/main/scala/cats/effect/unsafe/EpollSystem.scala @@ -62,6 +62,8 @@ object EpollSystem extends PollingSystem { def poll(poller: Poller, nanos: Long, reportFailure: Throwable => Unit): Boolean = poller.poll(nanos) + def steal(poller: Poller, reportFailure: Throwable => Unit): Boolean = false + def needsPoll(poller: Poller): Boolean = poller.needsPoll() def interrupt(targetThread: Thread, targetPoller: Poller): Unit = () diff --git a/core/native/src/main/scala/cats/effect/unsafe/KqueueSystem.scala b/core/native/src/main/scala/cats/effect/unsafe/KqueueSystem.scala index 3a26a4eb6d..54cca6f15c 100644 --- a/core/native/src/main/scala/cats/effect/unsafe/KqueueSystem.scala +++ b/core/native/src/main/scala/cats/effect/unsafe/KqueueSystem.scala @@ -61,6 +61,8 @@ object KqueueSystem extends PollingSystem { def poll(poller: Poller, nanos: Long, reportFailure: Throwable => Unit): Boolean = poller.poll(nanos) + def steal(poller: Poller, reportFailure: Throwable => Unit): Boolean = false + def needsPoll(poller: Poller): Boolean = poller.needsPoll() diff --git a/core/native/src/main/scala/cats/effect/unsafe/PollingExecutorScheduler.scala b/core/native/src/main/scala/cats/effect/unsafe/PollingExecutorScheduler.scala index c37a16677f..6f3ac8fb0e 100644 --- a/core/native/src/main/scala/cats/effect/unsafe/PollingExecutorScheduler.scala +++ b/core/native/src/main/scala/cats/effect/unsafe/PollingExecutorScheduler.scala @@ -43,6 +43,7 @@ abstract class PollingExecutorScheduler(pollEvery: Int) poller.poll(nanos.nanos) true } + def steal(poller: Poller, reportFailure: Throwable => Unit): Boolean = false def needsPoll(poller: Poller) = needsPoll def interrupt(targetThread: Thread, targetPoller: Poller): Unit = () } diff --git a/core/native/src/main/scala/cats/effect/unsafe/SleepSystem.scala b/core/native/src/main/scala/cats/effect/unsafe/SleepSystem.scala index cea4bca406..6a3ead7c36 100644 --- a/core/native/src/main/scala/cats/effect/unsafe/SleepSystem.scala +++ b/core/native/src/main/scala/cats/effect/unsafe/SleepSystem.scala @@ -36,6 +36,8 @@ object SleepSystem extends PollingSystem { false } + def steal(poller: Poller, reportFailure: Throwable => Unit): Boolean = false + def needsPoll(poller: Poller): Boolean = false def interrupt(targetThread: Thread, targetPoller: Poller): Unit = () diff --git a/tests/jvm/src/test/scala/cats/effect/IOPlatformSpecification.scala b/tests/jvm/src/test/scala/cats/effect/IOPlatformSpecification.scala index 63ad3e78ca..cbd448c127 100644 --- a/tests/jvm/src/test/scala/cats/effect/IOPlatformSpecification.scala +++ b/tests/jvm/src/test/scala/cats/effect/IOPlatformSpecification.scala @@ -513,6 +513,8 @@ trait IOPlatformSpecification extends DetectPlatform { self: BaseSpec with Scala } } + def steal(poller: Poller, reportFailure: Throwable => Unit) = false + def makeApi(access: (Poller => Unit) => Unit): DummySystem.Api = new DummyPoller { def poll = IO.async_[Unit] { cb =>