diff --git a/ReadMe.md b/ReadMe.md index d0f5684..696ee94 100644 --- a/ReadMe.md +++ b/ReadMe.md @@ -334,7 +334,9 @@ to parent size. ### Batch Processing and Progress Dialog -Work in progress, see `ProgressStatusDemoApp` for example of use +Work in progress +* Helper UI for running batch processing tasks, see `BatchRunnerProgressHelperDemoApp` for example of use +* Component for display of progress of batch processing tasks, see `ProgressStatusDemoApp` for example of use Demos diff --git a/build.sbt b/build.sbt index d0a224d..a6382c8 100644 --- a/build.sbt +++ b/build.sbt @@ -9,7 +9,7 @@ import scala.xml.{Node => XmlNode, NodeSeq => XmlNodeSeq, _} // JAR_BUILT_BY - Name to be added to Jar metadata field "Built-By" (defaults to System.getProperty("user.name") // -val projectVersion = "0.9.0.3-SNAPSHOT" +val projectVersion = "0.9.0.5-SNAPSHOT" val versionTagDir = if (projectVersion.endsWith("SNAPSHOT")) "master" else "v." + projectVersion val _scalaVersions = Seq("3.3.3", "2.13.14", "2.12.19") val _scalaVersion = _scalaVersions.head diff --git a/scalafx-extras-demos/src/main/scala-3/org/scalafx/extras/batch/BatchRunnerProgressHelperDemoApp.scala b/scalafx-extras-demos/src/main/scala-3/org/scalafx/extras/batch/BatchRunnerProgressHelperDemoApp.scala new file mode 100644 index 0000000..211a581 --- /dev/null +++ b/scalafx-extras-demos/src/main/scala-3/org/scalafx/extras/batch/BatchRunnerProgressHelperDemoApp.scala @@ -0,0 +1,74 @@ +/* + * Copyright (c) 2000-2023 Jarek Sacha. All Rights Reserved. + * Author's e-mail: jpsacha at gmail.com + */ + +package org.scalafx.extras.batch + +import org.scalafx.extras.BusyWorker +import org.scalafx.extras.BusyWorker.SimpleTask +import scalafx.application.JFXApp3 +import scalafx.application.JFXApp3.PrimaryStage +import scalafx.geometry.Insets +import scalafx.scene.Scene +import scalafx.scene.control.{Button, Label} +import scalafx.scene.layout.VBox +import scalafx.stage.Window + +import scala.util.Random + +object BatchRunnerProgressHelperDemoApp extends JFXApp3: + + private lazy val busyWorker = new BusyWorker(Title, parentWindow) + private val Title = "Batch Processing / Progress Dialog Demo" + + private val nTasks = 100 + private val minTime = 500 + private val maxTime = 1000 + + override def start(): Unit = + stage = new PrimaryStage: + title = Title + scene = new Scene: + content = new VBox: + padding = Insets(21) + spacing = 14 + children = Seq( + new Label( + s"""Press "Run" to initiate processing. + |Wait till processing finished or press "Abort". + |There will be $nTasks processed. + |Tasks will have randomly generated time between $minTime and $maxTime ms. + |If task's time is divisible by 6, that task will fail. + |""".stripMargin + ), + new Button("Run Sequential"): + onAction = (_) => onStart(false) + prefWidth = 120 + , + new Button(" Run Parallel "): + onAction = (_) => onStart(false) + prefWidth = 120 + ) + + private def onStart(useParallelProcessing: Boolean): Unit = busyWorker.doTask("Start") { + new SimpleTask[Unit]: + override def call(): Unit = + val helper = + new BatchRunnerProgressHelper[String]("Sample batch processing", parentWindow, useParallelProcessing): + override def createSampleTasks(): Seq[ItemTask[String]] = + val r = new Random() + (1 to nTasks).map { i => + new ItemTask[String]: + override val name = s"Task #$i" + + override def run(): String = + val t = minTime + r.nextInt(maxTime - minTime) + Thread.sleep(t) + if t % 6 == 0 then throw new Exception(s"Do not like ${t}") + s"name t = $t" + } + helper.run() + } + + def parentWindow: Option[Window] = Option(stage) diff --git a/scalafx-extras-demos/src/main/scala-3/org/scalafx/extras/batch/ParallelBatchRunnerDemo.scala b/scalafx-extras-demos/src/main/scala-3/org/scalafx/extras/batch/ParallelBatchRunnerDemo.scala new file mode 100644 index 0000000..450b8eb --- /dev/null +++ b/scalafx-extras-demos/src/main/scala-3/org/scalafx/extras/batch/ParallelBatchRunnerDemo.scala @@ -0,0 +1,80 @@ +/* + * Copyright (c) 2011-2024, ScalaFX Project + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * * Neither the name of the ScalaFX Project nor the + * names of its contributors may be used to endorse or promote products + * derived from this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED + * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE + * DISCLAIMED. IN NO EVENT SHALL THE SCALAFX PROJECT OR ITS CONTRIBUTORS BE LIABLE + * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL + * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR + * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED + * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS + * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +package org.scalafx.extras.batch + +import scala.util.{Failure, Success} + + +object ParallelBatchRunnerDemo { + class DemoTaskItem(n: Int) extends ItemTask[Int] { + val name = s"Demo TaskItem $n" + + def run(): Int = { + // println(s"Item ${n} - start") + Thread.sleep(300) + if n == 7 then + // println(s"Item ${n} - error") + throw new IllegalArgumentException(s"Don't give me $n") + + // println(s"Item ${n} - end") + n + } + } + + + def main(args: Array[String]): Unit = { + val items: Seq[DemoTaskItem] = Range(0, 10).map { i => new DemoTaskItem(i) } + + val batchHelper = new ParallelBatchRunner(items, progressUpdate, useParallelProcessing = true) + + val results = batchHelper.execute() + + println() + println("Summarize processing") + results.foreach { + case (name, Success(r)) => println(s"$name: SUCCESS: $r") + case (name, Failure(e)) => println(s"$name: ERROR : ${e.getMessage}") + } + } + + def progressUpdate(running : Long, + successful: Long, + failed : Long, + canceled : Long, + executed : Long, + total : Long, + isCanceled : Boolean, + perc : Double, + message : String) : Unit = { + val m = + f"R:$running%2d, S:$successful%2d, F:$failed%2d, E:$executed%2d, C:$canceled, T:$total%d, " + + f"C:$isCanceled, perc:${perc.toInt}%3d, $message" + println(m) + } + +} diff --git a/scalafx-extras/src/main/scala-3/org/scalafx/extras/batch/BatchRunner.scala b/scalafx-extras/src/main/scala-3/org/scalafx/extras/batch/BatchRunner.scala new file mode 100644 index 0000000..4f40327 --- /dev/null +++ b/scalafx-extras/src/main/scala-3/org/scalafx/extras/batch/BatchRunner.scala @@ -0,0 +1,55 @@ +/* + * Copyright (c) 2011-2024, ScalaFX Project + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * * Neither the name of the ScalaFX Project nor the + * names of its contributors may be used to endorse or promote products + * derived from this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED + * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE + * DISCLAIMED. IN NO EVENT SHALL THE SCALAFX PROJECT OR ITS CONTRIBUTORS BE LIABLE + * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL + * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR + * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED + * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS + * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +package org.scalafx.extras.batch + +object BatchRunner { + + @FunctionalInterface + trait ProgressUpdater: + def update( + running: Long, + successful: Long, + failed: Long, + canceled: Long, + executed: Long, + total: Long, + isCanceled: Boolean, + perc: Double, + message: String + ): Unit +} +trait BatchRunner[T, I <: ItemTask[T]] { + + import BatchRunner.ProgressUpdater + + // TODO: Is this trait needed + + protected def itemTasks: Seq[I] + + protected def progressUpdater: ProgressUpdater +} diff --git a/scalafx-extras/src/main/scala-3/org/scalafx/extras/batch/BatchRunnerProgressHelper.scala b/scalafx-extras/src/main/scala-3/org/scalafx/extras/batch/BatchRunnerProgressHelper.scala new file mode 100644 index 0000000..7716767 --- /dev/null +++ b/scalafx-extras/src/main/scala-3/org/scalafx/extras/batch/BatchRunnerProgressHelper.scala @@ -0,0 +1,244 @@ +/* + * Copyright (c) 2011-2024, ScalaFX Project + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * * Neither the name of the ScalaFX Project nor the + * names of its contributors may be used to endorse or promote products + * derived from this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED + * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE + * DISCLAIMED. IN NO EVENT SHALL THE SCALAFX PROJECT OR ITS CONTRIBUTORS BE LIABLE + * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL + * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR + * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED + * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS + * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +package org.scalafx.extras.batch + +import org.scalafx.extras.progress_dialog.ProgressStatusDialog +import org.scalafx.extras.{offFX, onFX, onFXAndWait} +import scalafx.geometry.{Insets, Pos} +import scalafx.stage.Window + +import java.util.concurrent.atomic.AtomicBoolean +import scala.util.{Failure, Success} + +abstract class BatchRunnerProgressHelper[T]( + val title: String, + val parentWindow: Option[Window], + val useParallelProcessing: Boolean +): + + def run(): Unit = + // TODO: handle exceptions + val itemTasks: Seq[ItemTask[T]] = createSampleTasks() + processItems(itemTasks) + + private def processItems(items: Seq[ItemTask[T]]): Unit = + + val abort = new AtomicBoolean(false) + val abortingMessage = title + " - processing aborted by user. Waiting to complete..." + + var progressStatus: ProgressStatusDialog = null + + @FunctionalInterface + def progressUpdate( + running: Long, + successful: Long, + failed: Long, + canceled: Long, + executed: Long, + total: Long, + isCanceled: Boolean, + perc: Double, + message: String + ): Unit = { + // val m = + // f"R:$running%2d, S:$successful%2d, F:$failed%2d, E:$executed%2d, T:$total%d, C:$canceled%d" + + // f"C:$isCanceled, perc:${perc.toInt}%3d, $message" + // println(m) + + onFX { + progressStatus.progress.value = perc / 100d + progressStatus.statusText.value = + if abort.get then abortingMessage else s"Processed ${executed.toInt} of $total..." + progressStatus.totalCount.value = f"$total%d" + progressStatus.processedCount.value = f"$executed%d" + progressStatus.successfulCount.value = f"$successful%d" + progressStatus.failedCount.value = f"$failed%d" + progressStatus.cancelledCount.value = f"$canceled%d" + } + } + + try + + val runner = new ParallelBatchRunner(items, progressUpdate, useParallelProcessing) + + // Initialize status updates + { + progressStatus = onFXAndWait { + new ProgressStatusDialog(s"$title - Batch processing progress", parentWindow) + } + progressStatus.abortFlag.onChange { (_, _, newValue) => + // println(s"abortFlag changed to $newValue") + if newValue then { + offFX { + runner.cancel() + } + } + } + + onFX { + progressStatus.show() + progressStatus.progress.value = -0.01 + progressStatus.statusText.value = s"Processed 0 of ${items.length}..." + } + } + + // TODO deal with canceled execution + val results = runner.execute() + + // println() + // println("Summarize processing") + // results.foreach { + // case (name, Success(r)) => println(s"$name: SUCCESS: $r") + // case (name, Failure(e)) => println(s"$name: ERROR : ${Option(e.getMessage).getOrElse(e.getClass.getName)}") + // } + + val counts = CountSummary( + total = progressStatus.totalCount.value, + successful = progressStatus.successfulCount.value, + failed = progressStatus.failedCount.value, + cancelled = progressStatus.cancelledCount.value + ) + + val errorDetails: Seq[String] = + results.flatMap { + case (name, Success(r1)) => + r1 match + case Success(r2) => + r2 match + case Left(value) => + Option(s"$name: $value") + case Right(_) => + None + case Failure(e2) => + Option(s"$name: ERROR: ${Option(e2.getMessage).getOrElse(e2.getClass.getName)}") + + case (name, Failure(e)) => + Option(s"$name: ERROR: ${Option(e.getMessage).getOrElse(e.getClass.getName)}") + } + + showFinalSummary(counts, errorDetails, Option(progressStatus.window)) + + // + // results.foreach { + // case Success(r) => + // // TODO implement details + // println(r) + // case Failure(e) => + // println(s"ERROR : ${Option(e.getMessage).getOrElse(e.getClass.getName)}") + // e.printStackTrace() + // } + + catch + case t: Throwable => + t.printStackTrace() + throw t + finally + onFX { + Option(progressStatus).foreach(_.close()) + } + + end processItems + + private def showFinalSummary(counts: CountSummary, errorDetails: Seq[String], parentWindow: Option[Window]): Unit = { + + import scalafx.Includes.* + import scalafx.scene.control.Alert.AlertType + import scalafx.scene.control.{Alert, Label, TextArea} + import scalafx.scene.layout.{GridPane, Priority} + + // Rename `title` to avoid name clashes + val dialogTitle = title + + // Pane with count of each outcome type + val contentPane: GridPane = new GridPane: + private var rowCount = 0 + + def nLabel(s: String): Label = + new Label(s): + alignmentInParent = Pos.CenterRight + + def addRow(label: String, value: String): Unit = + add(Label(label), 0, rowCount) + add(nLabel(value), 1, rowCount) + rowCount += 1 + + padding = Insets(14, 14, 14, 28) + hgap = 14 + addRow("Total", counts.total) + addRow("Successful", counts.successful) + addRow("Failed", counts.failed) + addRow("Cancelled", counts.cancelled) + + val noErrors = counts.total == counts.successful + + // Optional pane showing list of errors + val errorDetailPane = + if noErrors then + None + else + val label = new Label("Processing errors:") + val textArea = new TextArea: + text = errorDetails.mkString("\n") + editable = false + wrapText = true + maxWidth = Double.MaxValue + maxHeight = Double.MaxValue + vgrow = Priority.Always + hgrow = Priority.Always + + val expContent = new GridPane: + maxWidth = Double.MaxValue + add(label, 0, 0) + add(textArea, 0, 1) + + Some(expContent) + + val alertType = if noErrors then AlertType.Information else AlertType.Warning + + // Create and show the dialog + onFXAndWait { + new Alert(alertType) { + initOwner(parentWindow.orNull) + this.title = dialogTitle + headerText = "Item processing summary" + dialogPane().content = contentPane + parentWindow.foreach { w => dialogPane().stylesheets = w.scene().stylesheets } + + // Set expandable Exception into the dialog pane, if there are errors top report + errorDetailPane.foreach(p => dialogPane().expandableContent = p) + + }.showAndWait() + } + + } + + def createSampleTasks(): Seq[ItemTask[T]] + + private case class CountSummary(total: String, successful: String, failed: String, cancelled: String) + +end BatchRunnerProgressHelper diff --git a/scalafx-extras/src/main/scala-3/org/scalafx/extras/batch/ItemTask.scala b/scalafx-extras/src/main/scala-3/org/scalafx/extras/batch/ItemTask.scala new file mode 100644 index 0000000..6881d97 --- /dev/null +++ b/scalafx-extras/src/main/scala-3/org/scalafx/extras/batch/ItemTask.scala @@ -0,0 +1,43 @@ +/* + * Copyright (c) 2011-2024, ScalaFX Project + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * * Neither the name of the ScalaFX Project nor the + * names of its contributors may be used to endorse or promote products + * derived from this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED + * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE + * DISCLAIMED. IN NO EVENT SHALL THE SCALAFX PROJECT OR ITS CONTRIBUTORS BE LIABLE + * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL + * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR + * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED + * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS + * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +package org.scalafx.extras.batch + +/** + * Processing task that can be run by a batch processor + * + * @tparam T type of the status result of processing an item, it can be `Unit` + */ +trait ItemTask[T]: + + // TODO add 'cancel' flag and listener callback + + /** Name of the item */ + def name: String + + /** Task performed by the item */ + def run(): T diff --git a/scalafx-extras/src/main/scala-3/org/scalafx/extras/batch/ParallelBatchRunner.scala b/scalafx-extras/src/main/scala-3/org/scalafx/extras/batch/ParallelBatchRunner.scala new file mode 100644 index 0000000..a554368 --- /dev/null +++ b/scalafx-extras/src/main/scala-3/org/scalafx/extras/batch/ParallelBatchRunner.scala @@ -0,0 +1,199 @@ +/* + * Copyright (c) 2011-2024, ScalaFX Project + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * * Neither the name of the ScalaFX Project nor the + * names of its contributors may be used to endorse or promote products + * derived from this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED + * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE + * DISCLAIMED. IN NO EVENT SHALL THE SCALAFX PROJECT OR ITS CONTRIBUTORS BE LIABLE + * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL + * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR + * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED + * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS + * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +package org.scalafx.extras.batch + +import java.util +import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong, AtomicReference} +import java.util.concurrent.{Callable, Executors, ThreadPoolExecutor, Future as JFuture} +import scala.jdk.FutureConverters.* +import scala.util.{Failure, Success, Try} + +class ParallelBatchRunner[T, I <: ItemTask[T]]( + protected val itemTasks: Seq[I], + protected val progressUpdater: BatchRunner.ProgressUpdater, + protected val useParallelProcessing: Boolean +) extends BatchRunner[T, I] { + private val executor = { + val processors: Int = Runtime.getRuntime.availableProcessors + val processorsToUse = if useParallelProcessing then Math.max(1, processors - 1) else 1 +// println(s"Using $processorsToUse processors") + Executors.newFixedThreadPool(processorsToUse).asInstanceOf[ThreadPoolExecutor] + } + + private val total = itemTasks.length + + enum TaskState: + case NotStarted, Running, Succeeded, Failed, Cancelled + + private class TaskHelper[T, I <: ItemTask[T]](val itemTask: I) extends Callable[Try[Either[String, T]]] { + + // TODO: redesign return type to be a 3-state `Try`: `Success`, `Failure`, `Cancelled` + + private val _state = new AtomicReference[TaskState](TaskState.NotStarted) + + def state: TaskState = _state.get() + + private def state_=(v: TaskState): Unit = _state.set(v) + + /** + * @return `Success` when finished without exception or `Failure` with the exception. + * If a task completes a run, the `Success` will contain `Either.Right` with the result. + * If a task was canceled and did not run, `Success` will contain `Either.Left`. + */ + override def call(): Try[Either[String, T]] = { + if isCanceled then +// println(s"Task Cancelled: ${itemTask.name}") + state = TaskState.Cancelled + incrementCancelled() + + Success(Left("Cancelled")) + else + try { + state = TaskState.Running + incrementRunning() + + val result = itemTask.run() + + state = TaskState.Succeeded + incrementSucceeded() + + Success(Right(result)) + } catch { + case t: Throwable => + state = TaskState.Failed + incrementFailed() + + Failure(t) + } + } + } + + private val _runningCount = new AtomicLong(0) + private val _successfulCount = new AtomicLong(0) + private val _failedCount = new AtomicLong(0) + private val _executedCount = new AtomicLong(0) + private val _canceledCount = new AtomicLong(0) + private val _cancelFlag = new AtomicBoolean(false) + + private def incrementRunning(): Unit = { + _runningCount.incrementAndGet() + updateState() + } + + private def incrementSucceeded(): Unit = { + _runningCount.decrementAndGet() + _successfulCount.incrementAndGet() + _executedCount.incrementAndGet() + updateState() + } + + private def incrementCancelled(): Unit = { + _canceledCount.incrementAndGet() + _executedCount.incrementAndGet() + updateState() + } + + private def incrementFailed(): Unit = { + _runningCount.decrementAndGet() + _failedCount.incrementAndGet() + _executedCount.incrementAndGet() + updateState() + } + + private def updateState(): Unit = { + + // TODO: Update state on a separate thread to avoid blocking by the callback `progressUpdate` of current thread + + val perc = _executedCount.get().toDouble / total.toDouble * 100 + progressUpdater.update( + running = runningCount, + successful = successfulCount, + failed = failedCount, + executed = executedCount, + canceled = canceledCount, + total = total, + isCanceled = isCanceled, + perc = perc, + message = s"${_executedCount.get()}/$total" + ) + } + + def runningCount: Long = _runningCount.get() + + def successfulCount: Long = _successfulCount.get() + + def failedCount: Long = _failedCount.get() + + def canceledCount: Long = _canceledCount.get() + + def executedCount: Long = _executedCount.get() + + def isCanceled: Boolean = _cancelFlag.get() + + /** + * Cancel execution. + */ + def cancel(): Unit = { + + // Prevent new tasks from tanning computations + _cancelFlag.set(true) + + // TODO: set cancel flag for each itemTask + // itemTasks.foreach(i => i.name) + // updateState() + } + + /** + * @return results returned by each task or error + */ + def execute(): Seq[(String, Try[Try[Either[String, T]]])] = { + val batchTasks: Seq[TaskHelper[T, I]] = itemTasks.map(item => new TaskHelper(item)) + + // val futures = batchTasks.map { t => executor.submit(t).asInstanceOf[JFuture[T]] } + + // Submit tasks + val namedFutures: Seq[(String, JFuture[Try[Either[String, T]]])] = + batchTasks.map(t => (t.itemTask.name, executor.submit(t))) + // _futures = Option(futures) + + // Mark for executor shutdown when all tasks finished + executor.shutdown() + + while executor.getActiveCount > 0 do { + Thread.sleep(100) + } + + // println(s"Executor getTaskCount : ${executor.getTaskCount}") + // println(s"Executor getActiveCount : ${executor.getActiveCount}") + // println(s"Executor getCompletedTaskCount: ${executor.getCompletedTaskCount}") + + val result = namedFutures.map((name, f) => (name, Try(f.get()))) +// println("Completed waiting for the futures") + result + } +}