-
Notifications
You must be signed in to change notification settings - Fork 2
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
refactor(BREAKING): Back [Tasks] with coroutines instead of javafx Ta…
…sks. Really just meant for coroutine compatibility with Java. In most cases in Kotlin code, it is sufficient to use coroutines/async/await directly
- Loading branch information
Showing
5 changed files
with
106 additions
and
312 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,264 +1,96 @@ | ||
package org.janelia.saalfeldlab.fx | ||
|
||
import com.google.common.util.concurrent.ThreadFactoryBuilder | ||
import io.github.oshai.kotlinlogging.KotlinLogging | ||
import javafx.beans.value.ChangeListener | ||
import javafx.concurrent.Task | ||
import javafx.concurrent.Worker | ||
import javafx.concurrent.Worker.State.* | ||
import javafx.concurrent.WorkerStateEvent | ||
import javafx.event.EventHandler | ||
import org.janelia.saalfeldlab.fx.util.InvokeOnJavaFXApplicationThread | ||
import java.util.concurrent.ExecutorService | ||
import java.util.concurrent.Executors | ||
import java.util.concurrent.ThreadFactory | ||
import kotlinx.coroutines.* | ||
import java.util.function.BiConsumer | ||
import java.util.function.Consumer | ||
import java.util.function.Function | ||
import java.util.function.Supplier | ||
|
||
/** | ||
* Utility class for workign with [UtilityTask] | ||
*/ | ||
class Tasks private constructor() { | ||
|
||
companion object { | ||
@JvmSynthetic | ||
fun <T> createTask(call: (UtilityTask<T>) -> T): UtilityTask<T> { | ||
return UtilityTask(call) | ||
fun <T> createTask(call: suspend () -> T): UtilityTask<T> { | ||
return UtilityTask(CoroutineScope(Dispatchers.Default)) { call() } | ||
} | ||
|
||
@JvmStatic | ||
fun <T> createTask(call: Function<UtilityTask<T>, T>): UtilityTask<T> { | ||
return createTask { call.apply(it) } | ||
fun <T> createTask(call: Supplier<T>): UtilityTask<T> { | ||
return createTask { call.get() } | ||
} | ||
|
||
@JvmStatic | ||
fun createTask(call: Consumer<UtilityTask<Unit>>): UtilityTask<Unit> { | ||
return createTask { call.accept(it) } | ||
fun createTask(call: Runnable): UtilityTask<Unit> { | ||
return createTask { call.run() } | ||
} | ||
} | ||
} | ||
|
||
private val THREAD_FACTORY: ThreadFactory = ThreadFactoryBuilder() | ||
.setDaemon(true) | ||
.setNameFormat("task-thread-%d") | ||
.build() | ||
|
||
private val TASK_SERVICE = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() - 1, THREAD_FACTORY) | ||
|
||
/** | ||
* Convenience wrapper class around [Task] | ||
* | ||
* @param V type of super class Task<V> | ||
* @property onCall called during [Task.call], but wrapped with exception handling | ||
* @constructor Create empty Utility task | ||
*/ | ||
class UtilityTask<V>(private val onCall: (UtilityTask<V>) -> V) : Task<V>() { | ||
|
||
private var executorService : ExecutorService = TASK_SERVICE | ||
private var onFailedSet = false | ||
@Suppress("OPT_IN_USAGE") | ||
class UtilityTask<V> internal constructor( | ||
private val scope: CoroutineScope = CoroutineScope(Dispatchers.Default), | ||
private val block: suspend CoroutineScope.() -> V | ||
) : Deferred<V> by scope.async(block = block) { | ||
|
||
companion object { | ||
private val LOG = KotlinLogging.logger { } | ||
} | ||
|
||
override fun call(): V? { | ||
try { | ||
/* If no `onEnd/onFail` has been set, then we should listen for thrown exceptions and throw them */ | ||
if (!onFailedSet) setDefaultOnFailed() | ||
return onCall(this) | ||
} catch (e: Exception) { | ||
if (isCancelled) { | ||
LOG.trace(e) { "Task Cancelled (cancelled=$isCancelled)" } | ||
return null | ||
} | ||
throw RuntimeException(e) | ||
} | ||
private val LOG = KotlinLogging.logger { } | ||
} | ||
|
||
public override fun updateValue(value: V) { | ||
super.updateValue(value) | ||
} | ||
|
||
private fun setDefaultOnFailed() { | ||
InvokeOnJavaFXApplicationThread { | ||
this.onFailed { _, task -> LOG.error(task.exception) {"Task Failed"} } | ||
@JvmSynthetic | ||
fun onSuccess(onSuccess: (V) -> Unit) = apply { | ||
invokeOnCompletion { cause -> | ||
cause ?: onSuccess(getCompleted()) | ||
} | ||
} | ||
|
||
/** | ||
* Builder-style function to set [SUCCEEDED] callback. | ||
* | ||
* @param append flag to determine behavior if an existing `onSuccess` callback is present: | ||
* - if `true`, the current callback will be called prior to this `consumer` being called | ||
* - if `false`, the prior callback will be removed and never called. | ||
* - if `null`, this will throw a runtime exception if an existing callback is present. | ||
* - This is meant to help unintended overrides of existing callbacks when `append` is not explicitly specified | ||
* @param consumer to be called when [SUCCEEDED] | ||
* @return this | ||
*/ | ||
@JvmSynthetic | ||
fun onSuccess(append: Boolean? = null, consumer: (WorkerStateEvent, UtilityTask<V>) -> Unit): UtilityTask<V> { | ||
val appendCallbacks = onSucceeded?.appendCallbacks(append, consumer) | ||
val consumerEvent = EventHandler<WorkerStateEvent> { event -> consumer(event, this) } | ||
setOnSucceeded(appendCallbacks ?: consumerEvent) | ||
return this | ||
fun onSuccess(onSuccess: Consumer<V>) = apply { | ||
onSuccess { onSuccess.accept(it) } | ||
} | ||
|
||
/** | ||
* Builder-style function to set [CANCELLED] callback. | ||
* | ||
* @param append flag to determine behavior if an existing `onCancelled` callback is present: | ||
* - if `true`, the current callback will be called prior to this `consumer` being called | ||
* - if `false`, the prior callback will be removed and never called. | ||
* - if `null`, this will throw a runtime exception if an existing callback is present. | ||
* - This is meant to help unintended overrides of existing callbacks when `append` is not explicitly specified | ||
* @param consumer to be called when [CANCELLED] | ||
* @return this | ||
*/ | ||
@JvmSynthetic | ||
fun onCancelled(append: Boolean? = null, consumer: (WorkerStateEvent, UtilityTask<V>) -> Unit): UtilityTask<V> { | ||
val appendCallbacks = onCancelled?.appendCallbacks(append, consumer) | ||
val consumerEvent = EventHandler<WorkerStateEvent> { event -> consumer(event, this) } | ||
setOnCancelled(appendCallbacks ?: consumerEvent) | ||
return this | ||
fun onCancelled(onCancelled: (CancellationException) -> Unit) = apply { | ||
invokeOnCompletion { cause -> | ||
(cause as? CancellationException)?.let { onCancelled(it) } | ||
} | ||
} | ||
|
||
/** | ||
* Builder-style function to set [FAILED] callback. | ||
* | ||
* @param append flag to determine behavior if an existing `onFailed` callback is present: | ||
* - if `true`, the current callback will be called prior to this `consumer` being called | ||
* - if `false`, the prior callback will be removed and never called. | ||
* - if `null`, this will throw a runtime exception if an existing callback is present. | ||
* - This is meant to help unintended overrides of existing callbacks when `append` is not explicitly specified | ||
* @param consumer to be called when [FAILED] | ||
* @return this | ||
*/ | ||
@JvmSynthetic | ||
fun onFailed(append: Boolean? = null, consumer: (WorkerStateEvent, UtilityTask<V>) -> Unit): UtilityTask<V> { | ||
this.onFailedSet = true | ||
val eventHandler = onFailed?.appendCallbacks(append, consumer) ?: EventHandler { event -> consumer(event, this) } | ||
this.setOnFailed(eventHandler) | ||
return this | ||
fun onCancelled(onCancelled: Consumer<CancellationException>) = apply { | ||
onCancelled { onCancelled.accept(it) } | ||
} | ||
|
||
|
||
private var onEndListener: ChangeListener<Worker.State>? = null | ||
|
||
/** | ||
* Builder-style function to set when the task ends, either by [SUCCEEDED], [CANCELLED], or [FAILED]. | ||
* | ||
* @param append flag to determine behavior if an existing `onEnd` callback is present: | ||
* - if `true`, the current callback will be called prior to this `consumer` being called | ||
* - if `false`, the prior callback will be removed and never called. | ||
* - if `null`, this will throw a runtime exception if an existing callback is present. | ||
* - This is meant to help unintended overrides of existing callbacks when `append` is not explicitly specified | ||
* @param consumer to be called when task ends | ||
* @return this | ||
*/ | ||
@JvmSynthetic | ||
fun onEnd(append: Boolean? = null, consumer: (UtilityTask<V>) -> Unit): UtilityTask<V> { | ||
//TODO Caleb: Consider renaming `onEnd` to `finally` since this is trigger on end for ANY reason, even if an | ||
// Exception was thrown. Or Maybe a separate `finally` which does what this currently does, and then change `onEnd` | ||
// to NOT trigger if an excpetion occures (that isn't handled by the exception handler) | ||
onEndListener = onEndListener?.let { oldListener -> | ||
stateProperty().removeListener(oldListener) | ||
if (append == null) | ||
throw TaskStateCallbackOverrideException("Overriding existing handler; If intentional, pass `false` for `append`") | ||
if (append) { | ||
ChangeListener { obs, oldv, newv -> | ||
when (newv) { | ||
SUCCEEDED, CANCELLED, FAILED -> { | ||
oldListener.changed(obs, oldv, newv) | ||
consumer(this) | ||
} | ||
|
||
else -> Unit | ||
} | ||
} | ||
} else null | ||
} ?: ChangeListener { _, _, newv -> | ||
when (newv) { | ||
SUCCEEDED, CANCELLED, FAILED -> consumer(this) | ||
else -> Unit | ||
fun onFailed(onFailed: (Throwable) -> Unit) = apply { | ||
invokeOnCompletion { cause -> | ||
when (cause) { | ||
null, is CancellationException -> Unit | ||
else -> onFailed(cause) | ||
} | ||
} | ||
this.stateProperty().addListener(onEndListener) | ||
return this | ||
} | ||
|
||
|
||
/** | ||
* | ||
* @see [onSuccess] | ||
*/ | ||
@JvmOverloads | ||
fun onSuccess(append: Boolean? = null, consumer: BiConsumer<WorkerStateEvent, UtilityTask<V>>): UtilityTask<V> { | ||
return onSuccess(append) { e, t -> consumer.accept(e, t) } | ||
} | ||
|
||
/** | ||
* | ||
* @see [onCancelled] | ||
*/ | ||
@JvmOverloads | ||
fun onCancelled(append: Boolean? = null, consumer: BiConsumer<WorkerStateEvent, UtilityTask<V>>): UtilityTask<V> { | ||
return onCancelled(append) { e, t -> consumer.accept(e, t) } | ||
fun onFailed(onFailed: Consumer<Throwable>) = apply { | ||
onFailed { onFailed.accept(it) } | ||
} | ||
|
||
/** | ||
* | ||
* @see [onFailed] | ||
*/@JvmOverloads | ||
fun onFailed(append: Boolean? = null, consumer: BiConsumer<WorkerStateEvent, UtilityTask<V>>): UtilityTask<V> { | ||
return onFailed(append) { e, t -> consumer.accept(e, t) } | ||
} | ||
|
||
/** | ||
* | ||
* @see [onEnd] | ||
*/ | ||
@JvmOverloads | ||
fun onEnd(append: Boolean? = null, consumer: Consumer<UtilityTask<V>>): UtilityTask<V> { | ||
return onEnd(append) { t -> consumer.accept(t) } | ||
@JvmSynthetic | ||
fun onEnd(onEnd: (V?, Throwable?) -> Unit) = apply { | ||
invokeOnCompletion { cause -> | ||
val (value, error) = cause?.let { null to it } ?: (getCompleted() to null) | ||
onEnd(value, error) | ||
} | ||
} | ||
|
||
/** | ||
* Submit this task to the [executorService]. | ||
* | ||
* @param executorService to execute this task on. | ||
* @return this task | ||
*/ | ||
@JvmOverloads | ||
fun submit(executorService: ExecutorService = this.executorService) : UtilityTask<V> { | ||
this.executorService = executorService | ||
this.executorService.submit(this) | ||
return this | ||
fun onEnd(onEnd: BiConsumer<V?, Throwable?>) = apply { | ||
onEnd { result, cause -> onEnd.accept(result, cause) } | ||
} | ||
|
||
/** | ||
* Submit this task to the [executorService], and block while waiting for it to return. | ||
* This will return after the task completes, but possibbly BEFORE the [onSuccess]/[onEnd] call finish. | ||
* | ||
* @param executorService to execute this task on. | ||
* @return the result of this task, blocking if not yet done. | ||
*/ | ||
@JvmOverloads | ||
fun submitAndWait(executorService: ExecutorService = this.executorService): V { | ||
this.executorService = executorService | ||
this.executorService.submit(this) | ||
return this.get() | ||
} | ||
fun get() = runBlocking { await() } | ||
|
||
private fun EventHandler<WorkerStateEvent>.appendCallbacks(append: Boolean? = false, consumer: (WorkerStateEvent, UtilityTask<V>) -> Unit): EventHandler<WorkerStateEvent>? { | ||
if (append == null) throw TaskStateCallbackOverrideException("Overriding existing handler; If intentional, pass `false` for `append`") | ||
if (!append) return null | ||
return EventHandler { event -> | ||
this.handle(event) | ||
consumer(event, this@UtilityTask) | ||
} | ||
fun wait() = apply { | ||
runBlocking { join() } | ||
} | ||
|
||
private class TaskStateCallbackOverrideException(override val message: String?) : RuntimeException(message) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.