diff --git a/ReadMe.md b/ReadMe.md index 696ee94..bd25a82 100644 --- a/ReadMe.md +++ b/ReadMe.md @@ -335,7 +335,7 @@ to parent size. ### Batch Processing and Progress Dialog Work in progress -* Helper UI for running batch processing tasks, see `BatchRunnerProgressHelperDemoApp` for example of use +* Helper UI for running batch processing tasks, see `BatchRunnerProgressDemoApp` for example of use * Component for display of progress of batch processing tasks, see `ProgressStatusDemoApp` for example of use diff --git a/build.sbt b/build.sbt index a6382c8..cff21c4 100644 --- a/build.sbt +++ b/build.sbt @@ -9,7 +9,7 @@ import scala.xml.{Node => XmlNode, NodeSeq => XmlNodeSeq, _} // JAR_BUILT_BY - Name to be added to Jar metadata field "Built-By" (defaults to System.getProperty("user.name") // -val projectVersion = "0.9.0.5-SNAPSHOT" +val projectVersion = "0.9.0.6-SNAPSHOT" val versionTagDir = if (projectVersion.endsWith("SNAPSHOT")) "master" else "v." + projectVersion val _scalaVersions = Seq("3.3.3", "2.13.14", "2.12.19") val _scalaVersion = _scalaVersions.head diff --git a/scalafx-extras-demos/src/main/scala-3/org/scalafx/extras/batch/BatchRunnerProgressHelperDemoApp.scala b/scalafx-extras-demos/src/main/scala-3/org/scalafx/extras/batch/BatchRunnerProgressHelperDemoApp.scala deleted file mode 100644 index 211a581..0000000 --- a/scalafx-extras-demos/src/main/scala-3/org/scalafx/extras/batch/BatchRunnerProgressHelperDemoApp.scala +++ /dev/null @@ -1,74 +0,0 @@ -/* - * Copyright (c) 2000-2023 Jarek Sacha. All Rights Reserved. - * Author's e-mail: jpsacha at gmail.com - */ - -package org.scalafx.extras.batch - -import org.scalafx.extras.BusyWorker -import org.scalafx.extras.BusyWorker.SimpleTask -import scalafx.application.JFXApp3 -import scalafx.application.JFXApp3.PrimaryStage -import scalafx.geometry.Insets -import scalafx.scene.Scene -import scalafx.scene.control.{Button, Label} -import scalafx.scene.layout.VBox -import scalafx.stage.Window - -import scala.util.Random - -object BatchRunnerProgressHelperDemoApp extends JFXApp3: - - private lazy val busyWorker = new BusyWorker(Title, parentWindow) - private val Title = "Batch Processing / Progress Dialog Demo" - - private val nTasks = 100 - private val minTime = 500 - private val maxTime = 1000 - - override def start(): Unit = - stage = new PrimaryStage: - title = Title - scene = new Scene: - content = new VBox: - padding = Insets(21) - spacing = 14 - children = Seq( - new Label( - s"""Press "Run" to initiate processing. - |Wait till processing finished or press "Abort". - |There will be $nTasks processed. - |Tasks will have randomly generated time between $minTime and $maxTime ms. - |If task's time is divisible by 6, that task will fail. - |""".stripMargin - ), - new Button("Run Sequential"): - onAction = (_) => onStart(false) - prefWidth = 120 - , - new Button(" Run Parallel "): - onAction = (_) => onStart(false) - prefWidth = 120 - ) - - private def onStart(useParallelProcessing: Boolean): Unit = busyWorker.doTask("Start") { - new SimpleTask[Unit]: - override def call(): Unit = - val helper = - new BatchRunnerProgressHelper[String]("Sample batch processing", parentWindow, useParallelProcessing): - override def createSampleTasks(): Seq[ItemTask[String]] = - val r = new Random() - (1 to nTasks).map { i => - new ItemTask[String]: - override val name = s"Task #$i" - - override def run(): String = - val t = minTime + r.nextInt(maxTime - minTime) - Thread.sleep(t) - if t % 6 == 0 then throw new Exception(s"Do not like ${t}") - s"name t = $t" - } - helper.run() - } - - def parentWindow: Option[Window] = Option(stage) diff --git a/scalafx-extras-demos/src/main/scala-3/org/scalafx/extras/batch/BatchRunnerWithProgressDemoApp.scala b/scalafx-extras-demos/src/main/scala-3/org/scalafx/extras/batch/BatchRunnerWithProgressDemoApp.scala new file mode 100644 index 0000000..309e78a --- /dev/null +++ b/scalafx-extras-demos/src/main/scala-3/org/scalafx/extras/batch/BatchRunnerWithProgressDemoApp.scala @@ -0,0 +1,90 @@ +/* + * Copyright (c) 2000-2023 Jarek Sacha. All Rights Reserved. + * Author's e-mail: jpsacha at gmail.com + */ + +package org.scalafx.extras.batch + +import org.scalafx.extras.BusyWorker +import org.scalafx.extras.BusyWorker.SimpleTask +import scalafx.application.JFXApp3 +import scalafx.application.JFXApp3.PrimaryStage +import scalafx.geometry.Insets +import scalafx.scene.Scene +import scalafx.scene.control.{Button, Label} +import scalafx.scene.layout.VBox +import scalafx.stage.Window + +import scala.util.Random + +/** + * Example of a task that for batch execution. + */ +private class MyTask(val i: Int) extends ItemTask[String] { + + import MyTask.* + + val name = s"Task #$i" + + def run(): String = + val t = minTime + new Random().nextInt(maxTime - minTime) + Thread.sleep(t) + if t % 6 != 0 then + s"name t = $t" + else + throw new Exception(s"Do not like $t") +} + +object MyTask { + val minTime = 500 + val maxTime = 1000 +} + +/** + * Demo of `BatchRunnerWithProgress` GUI + * @author Jarek Sacha + */ +object BatchRunnerProgressDemoApp extends JFXApp3: + + private lazy val busyWorker = new BusyWorker(Title, parentWindow) + private val Title = "Batch Processing / Progress Dialog Demo" + private val nTasks = 10 + + override def start(): Unit = + stage = new PrimaryStage: + title = Title + scene = new Scene: + content = new VBox: + padding = Insets(21) + spacing = 14 + children = Seq( + new Label( + s"""Press "Run x" to initiate processing. + |Wait till processing finished or press "Abort". + |There will be $nTasks processed. + |Tasks will have randomly generated execution time, + |between ${MyTask.minTime} and ${MyTask.maxTime} ms. + |If task's time is divisible by 6, that task will fail. + |""".stripMargin + ), + new Button("Run as Sequence"): + onAction = _ => onStart(false) + prefWidth = 120 + , + new Button("Run in Parallel"): + onAction = _ => onStart(true) + prefWidth = 120 + ) + + private def onStart(runInParallel: Boolean): Unit = busyWorker.doTask("Start") { () => + type ResultType = String + val helper = + new BatchRunnerWithProgress[ResultType]("Sample batch processing", Option(stage), runInParallel): + def createTasks(): Seq[ItemTask[ResultType]] = (1 to nTasks).map { i => new MyTask(i) } + + helper.run() + } + + private def parentWindow: Option[Window] = Option(stage) + +end BatchRunnerProgressDemoApp diff --git a/scalafx-extras-demos/src/main/scala-3/org/scalafx/extras/batch/ParallelBatchRunnerDemo.scala b/scalafx-extras-demos/src/main/scala-3/org/scalafx/extras/batch/ParallelBatchRunnerDemo.scala index 450b8eb..6f233f7 100644 --- a/scalafx-extras-demos/src/main/scala-3/org/scalafx/extras/batch/ParallelBatchRunnerDemo.scala +++ b/scalafx-extras-demos/src/main/scala-3/org/scalafx/extras/batch/ParallelBatchRunnerDemo.scala @@ -29,9 +29,16 @@ package org.scalafx.extras.batch import scala.util.{Failure, Success} +import scala.util.{Failure, Success, Try} +/** + * Example of using `ParallelBatchRunner` and `ItemTask`. + * Results are printed to standard output. + * + * @author Jarek Sacha + */ object ParallelBatchRunnerDemo { - class DemoTaskItem(n: Int) extends ItemTask[Int] { + private class DemoTaskItem(n: Int) extends ItemTask[Int] { val name = s"Demo TaskItem $n" def run(): Int = { @@ -46,13 +53,12 @@ object ParallelBatchRunnerDemo { } } - def main(args: Array[String]): Unit = { val items: Seq[DemoTaskItem] = Range(0, 10).map { i => new DemoTaskItem(i) } val batchHelper = new ParallelBatchRunner(items, progressUpdate, useParallelProcessing = true) - val results = batchHelper.execute() + val results: Seq[(String, Try[Option[Int]])] = batchHelper.execute() println() println("Summarize processing") @@ -62,19 +68,21 @@ object ParallelBatchRunnerDemo { } } - def progressUpdate(running : Long, - successful: Long, - failed : Long, - canceled : Long, - executed : Long, - total : Long, - isCanceled : Boolean, - perc : Double, - message : String) : Unit = { + @FunctionalInterface + private def progressUpdate( + running: Long, + successful: Long, + failed: Long, + canceled: Long, + executed: Long, + total: Long, + isCanceled: Boolean, + perc: Double, + message: String + ): Unit = { val m = f"R:$running%2d, S:$successful%2d, F:$failed%2d, E:$executed%2d, C:$canceled, T:$total%d, " + f"C:$isCanceled, perc:${perc.toInt}%3d, $message" println(m) } - } diff --git a/scalafx-extras-demos/src/main/scala-3/org/scalafx/extras/progress_dialog/ProgressStatusDemoApp.scala b/scalafx-extras-demos/src/main/scala-3/org/scalafx/extras/progress_dialog/ProgressStatusDialogDemoApp.scala similarity index 82% rename from scalafx-extras-demos/src/main/scala-3/org/scalafx/extras/progress_dialog/ProgressStatusDemoApp.scala rename to scalafx-extras-demos/src/main/scala-3/org/scalafx/extras/progress_dialog/ProgressStatusDialogDemoApp.scala index bf10532..9aa9e68 100644 --- a/scalafx-extras-demos/src/main/scala-3/org/scalafx/extras/progress_dialog/ProgressStatusDemoApp.scala +++ b/scalafx-extras-demos/src/main/scala-3/org/scalafx/extras/progress_dialog/ProgressStatusDialogDemoApp.scala @@ -32,51 +32,44 @@ import org.scalafx.extras.{initFX, offFX, onFX, onFXAndWait} import scalafx.application.Platform /** - * Example showing use of ProgressStatusDialog - */ -object ProgressStatusDemoApp { + * Example showing use of ProgressStatusDialog + */ +object ProgressStatusDialogDemoApp: // TODO implement simulated processing using batch processing backend - def main(args: Array[String]): Unit = { + def main(args: Array[String]): Unit = initFX() Platform.implicitExit = true - val progressStatusDialog = onFXAndWait { - new ProgressStatusDialog("Processing sample tasks", None) - } + val progressStatusDialog = onFXAndWait: + new ProgressStatusDialog("Progress Status Dialog Demo", None) progressStatusDialog.abortFlag.onChange { (_, _, newValue) => - if newValue then { + if newValue then // Do not block UI, but wait till shutdown completed - offFX { - // Simulate delay due to shut down + offFX: + // Simulate delay due to shutdown Thread.sleep(3000) - onFX { + onFX: progressStatusDialog.close() - } - } - } } - onFXAndWait { + onFXAndWait: progressStatusDialog.show() - } + val n = 500 - for i <- 1 to n if !progressStatusDialog.abortFlag.value do { - onFX { + for i <- 1 to n if !progressStatusDialog.abortFlag.value do + onFX: progressStatusDialog.statusText.value = s"Processing item $i / $n" progressStatusDialog.progress.value = i / n.toDouble - } + Thread.sleep(250) - } // In case of abort leave to abort handler o close the dialog when shutdown actions are complete if !progressStatusDialog.abortFlag.value then - onFX { + onFX: progressStatusDialog.close() - } - } -} +end ProgressStatusDialogDemoApp diff --git a/scalafx-extras/src/main/scala-3/org/scalafx/extras/batch/BatchRunner.scala b/scalafx-extras/src/main/scala-3/org/scalafx/extras/batch/BatchRunner.scala index 4f40327..dd99dad 100644 --- a/scalafx-extras/src/main/scala-3/org/scalafx/extras/batch/BatchRunner.scala +++ b/scalafx-extras/src/main/scala-3/org/scalafx/extras/batch/BatchRunner.scala @@ -27,7 +27,7 @@ package org.scalafx.extras.batch -object BatchRunner { +object BatchRunner: @FunctionalInterface trait ProgressUpdater: @@ -42,14 +42,13 @@ object BatchRunner { perc: Double, message: String ): Unit -} -trait BatchRunner[T, I <: ItemTask[T]] { - import BatchRunner.ProgressUpdater +trait BatchRunner[T, I <: ItemTask[T]]: + + // TODO Is BatchRunner trait needed? It is not used on its own? - // TODO: Is this trait needed + import BatchRunner.ProgressUpdater protected def itemTasks: Seq[I] protected def progressUpdater: ProgressUpdater -} diff --git a/scalafx-extras/src/main/scala-3/org/scalafx/extras/batch/BatchRunnerProgressHelper.scala b/scalafx-extras/src/main/scala-3/org/scalafx/extras/batch/BatchRunnerWithProgress.scala similarity index 87% rename from scalafx-extras/src/main/scala-3/org/scalafx/extras/batch/BatchRunnerProgressHelper.scala rename to scalafx-extras/src/main/scala-3/org/scalafx/extras/batch/BatchRunnerWithProgress.scala index 7716767..39851a5 100644 --- a/scalafx-extras/src/main/scala-3/org/scalafx/extras/batch/BatchRunnerProgressHelper.scala +++ b/scalafx-extras/src/main/scala-3/org/scalafx/extras/batch/BatchRunnerWithProgress.scala @@ -33,20 +33,24 @@ import scalafx.geometry.{Insets, Pos} import scalafx.stage.Window import java.util.concurrent.atomic.AtomicBoolean -import scala.util.{Failure, Success} +import scala.util.{Failure, Success, Try} -abstract class BatchRunnerProgressHelper[T]( +abstract class BatchRunnerWithProgress[T]( val title: String, val parentWindow: Option[Window], val useParallelProcessing: Boolean ): - def run(): Unit = + import BatchRunnerWithProgress.TaskResult + + def run(): Seq[TaskResult[T]] = // TODO: handle exceptions - val itemTasks: Seq[ItemTask[T]] = createSampleTasks() + + val itemTasks: Seq[ItemTask[T]] = createTasks() + processItems(itemTasks) - private def processItems(items: Seq[ItemTask[T]]): Unit = + private def processItems(items: Seq[ItemTask[T]]): Seq[TaskResult[T]] = val abort = new AtomicBoolean(false) val abortingMessage = title + " - processing aborted by user. Waiting to complete..." @@ -64,7 +68,7 @@ abstract class BatchRunnerProgressHelper[T]( isCanceled: Boolean, perc: Double, message: String - ): Unit = { + ): Unit = // val m = // f"R:$running%2d, S:$successful%2d, F:$failed%2d, E:$executed%2d, T:$total%d, C:$canceled%d" + // f"C:$isCanceled, perc:${perc.toInt}%3d, $message" @@ -73,14 +77,13 @@ abstract class BatchRunnerProgressHelper[T]( onFX { progressStatus.progress.value = perc / 100d progressStatus.statusText.value = - if abort.get then abortingMessage else s"Processed ${executed.toInt} of $total..." + if abort.get then abortingMessage else s"Processed ${executed.toInt} of $total - $message" progressStatus.totalCount.value = f"$total%d" progressStatus.processedCount.value = f"$executed%d" progressStatus.successfulCount.value = f"$successful%d" progressStatus.failedCount.value = f"$failed%d" progressStatus.cancelledCount.value = f"$canceled%d" } - } try @@ -93,11 +96,10 @@ abstract class BatchRunnerProgressHelper[T]( } progressStatus.abortFlag.onChange { (_, _, newValue) => // println(s"abortFlag changed to $newValue") - if newValue then { + if newValue then offFX { runner.cancel() } - } } onFX { @@ -108,7 +110,7 @@ abstract class BatchRunnerProgressHelper[T]( } // TODO deal with canceled execution - val results = runner.execute() + val results: Seq[(String, Try[Option[T]])] = runner.execute() // println() // println("Summarize processing") @@ -126,23 +128,22 @@ abstract class BatchRunnerProgressHelper[T]( val errorDetails: Seq[String] = results.flatMap { - case (name, Success(r1)) => - r1 match - case Success(r2) => - r2 match - case Left(value) => - Option(s"$name: $value") - case Right(_) => - None - case Failure(e2) => - Option(s"$name: ERROR: ${Option(e2.getMessage).getOrElse(e2.getClass.getName)}") - + case (name, Success(r)) => + r match + case None => Option(s"$name: Cancelled") + case Some(_) => None case (name, Failure(e)) => Option(s"$name: ERROR: ${Option(e.getMessage).getOrElse(e.getClass.getName)}") } - showFinalSummary(counts, errorDetails, Option(progressStatus.window)) + // Flatten the results, keep task name + val completedResults: Seq[TaskResult[T]] = results.flatMap { (name, t) => + for ov <- t.toOption; v <- ov yield TaskResult(name, v) + } + assert(completedResults.length == counts.successful.toInt) + + showFinalSummary(counts, errorDetails, Option(progressStatus.window)) // // results.foreach { // case Success(r) => @@ -152,7 +153,7 @@ abstract class BatchRunnerProgressHelper[T]( // println(s"ERROR : ${Option(e.getMessage).getOrElse(e.getClass.getName)}") // e.printStackTrace() // } - + completedResults catch case t: Throwable => t.printStackTrace() @@ -161,10 +162,10 @@ abstract class BatchRunnerProgressHelper[T]( onFX { Option(progressStatus).foreach(_.close()) } - + end try end processItems - private def showFinalSummary(counts: CountSummary, errorDetails: Seq[String], parentWindow: Option[Window]): Unit = { + private def showFinalSummary(counts: CountSummary, errorDetails: Seq[String], parentWindow: Option[Window]): Unit = import scalafx.Includes.* import scalafx.scene.control.Alert.AlertType @@ -210,12 +211,10 @@ abstract class BatchRunnerProgressHelper[T]( maxHeight = Double.MaxValue vgrow = Priority.Always hgrow = Priority.Always - val expContent = new GridPane: maxWidth = Double.MaxValue add(label, 0, 0) add(textArea, 0, 1) - Some(expContent) val alertType = if noErrors then AlertType.Information else AlertType.Warning @@ -234,11 +233,12 @@ abstract class BatchRunnerProgressHelper[T]( }.showAndWait() } + end showFinalSummary - } - - def createSampleTasks(): Seq[ItemTask[T]] + def createTasks(): Seq[ItemTask[T]] private case class CountSummary(total: String, successful: String, failed: String, cancelled: String) +end BatchRunnerWithProgress -end BatchRunnerProgressHelper +object BatchRunnerWithProgress: + case class TaskResult[T](taskName:String, result:T) diff --git a/scalafx-extras/src/main/scala-3/org/scalafx/extras/batch/ParallelBatchRunner.scala b/scalafx-extras/src/main/scala-3/org/scalafx/extras/batch/ParallelBatchRunner.scala index a554368..6911d73 100644 --- a/scalafx-extras/src/main/scala-3/org/scalafx/extras/batch/ParallelBatchRunner.scala +++ b/scalafx-extras/src/main/scala-3/org/scalafx/extras/batch/ParallelBatchRunner.scala @@ -37,43 +37,49 @@ class ParallelBatchRunner[T, I <: ItemTask[T]]( protected val itemTasks: Seq[I], protected val progressUpdater: BatchRunner.ProgressUpdater, protected val useParallelProcessing: Boolean -) extends BatchRunner[T, I] { - private val executor = { +) extends BatchRunner[T, I]: + + // TODO: should it be just `BatchRunner` since parallel execution is an optional? + + private val executor = val processors: Int = Runtime.getRuntime.availableProcessors val processorsToUse = if useParallelProcessing then Math.max(1, processors - 1) else 1 -// println(s"Using $processorsToUse processors") + // println(s"Using $processorsToUse processors") Executors.newFixedThreadPool(processorsToUse).asInstanceOf[ThreadPoolExecutor] - } private val total = itemTasks.length - enum TaskState: + private enum TaskState: case NotStarted, Running, Succeeded, Failed, Cancelled - private class TaskHelper[T, I <: ItemTask[T]](val itemTask: I) extends Callable[Try[Either[String, T]]] { + private class TaskHelper(val itemTask: I) extends Callable[Try[Option[T]]]: - // TODO: redesign return type to be a 3-state `Try`: `Success`, `Failure`, `Cancelled` + // TODO: redesign return type to be a 3-state `Try`: + // `Success`, `Failure`, `Cancelled` or `Result`, `Error`, `Cancelled` private val _state = new AtomicReference[TaskState](TaskState.NotStarted) - def state: TaskState = _state.get() + private def state: TaskState = _state.get() private def state_=(v: TaskState): Unit = _state.set(v) /** - * @return `Success` when finished without exception or `Failure` with the exception. - * If a task completes a run, the `Success` will contain `Either.Right` with the result. - * If a task was canceled and did not run, `Success` will contain `Either.Left`. + * @return `Success` when ended without exception or `Failure` with the exception. + * The value wrapped by `Success` will be the result computed by the task. + * If the task is able to complete and compute a result, it will return a computed value (non-empty Option). + * If the task is canceled, it will return `None` (`Success(None)`) + * If the task failed, it will return `Failure`. */ - override def call(): Try[Either[String, T]] = { + override def call(): Try[Option[T]] = if isCanceled then -// println(s"Task Cancelled: ${itemTask.name}") + // println(s"Task Cancelled: ${itemTask.name}") state = TaskState.Cancelled incrementCancelled() - Success(Left("Cancelled")) + // Mark cancellation with `None` + Success(None) else - try { + try state = TaskState.Running incrementRunning() @@ -82,16 +88,14 @@ class ParallelBatchRunner[T, I <: ItemTask[T]]( state = TaskState.Succeeded incrementSucceeded() - Success(Right(result)) - } catch { + Success(Option(result)) + catch case t: Throwable => state = TaskState.Failed incrementFailed() Failure(t) - } - } - } + end TaskHelper private val _runningCount = new AtomicLong(0) private val _successfulCount = new AtomicLong(0) @@ -100,34 +104,31 @@ class ParallelBatchRunner[T, I <: ItemTask[T]]( private val _canceledCount = new AtomicLong(0) private val _cancelFlag = new AtomicBoolean(false) - private def incrementRunning(): Unit = { + private def incrementRunning(): Unit = _runningCount.incrementAndGet() updateState() - } - private def incrementSucceeded(): Unit = { + private def incrementSucceeded(): Unit = _runningCount.decrementAndGet() _successfulCount.incrementAndGet() _executedCount.incrementAndGet() updateState() - } - private def incrementCancelled(): Unit = { + private def incrementCancelled(): Unit = _canceledCount.incrementAndGet() _executedCount.incrementAndGet() updateState() - } - private def incrementFailed(): Unit = { + private def incrementFailed(): Unit = _runningCount.decrementAndGet() _failedCount.incrementAndGet() _executedCount.incrementAndGet() updateState() - } - private def updateState(): Unit = { + private def updateState(): Unit = // TODO: Update state on a separate thread to avoid blocking by the callback `progressUpdate` of current thread + // TODO: `perc` and `message` are simple derived values, do we need pass them explicitly? val perc = _executedCount.get().toDouble / total.toDouble * 100 progressUpdater.update( @@ -141,7 +142,6 @@ class ParallelBatchRunner[T, I <: ItemTask[T]]( perc = perc, message = s"${_executedCount.get()}/$total" ) - } def runningCount: Long = _runningCount.get() @@ -158,42 +158,39 @@ class ParallelBatchRunner[T, I <: ItemTask[T]]( /** * Cancel execution. */ - def cancel(): Unit = { - + def cancel(): Unit = // Prevent new tasks from tanning computations _cancelFlag.set(true) // TODO: set cancel flag for each itemTask // itemTasks.foreach(i => i.name) // updateState() - } /** - * @return results returned by each task or error + * @return results returned by each task or error. + * The first part of a tuple is task name, the second part is the result of processing. */ - def execute(): Seq[(String, Try[Try[Either[String, T]]])] = { - val batchTasks: Seq[TaskHelper[T, I]] = itemTasks.map(item => new TaskHelper(item)) + def execute(): Seq[(String, Try[Option[T]])] = + val batchTasks: Seq[TaskHelper] = itemTasks.map(item => new TaskHelper(item)) // val futures = batchTasks.map { t => executor.submit(t).asInstanceOf[JFuture[T]] } // Submit tasks - val namedFutures: Seq[(String, JFuture[Try[Either[String, T]]])] = + val namedFutures: Seq[(String, JFuture[Try[Option[T]]])] = batchTasks.map(t => (t.itemTask.name, executor.submit(t))) // _futures = Option(futures) // Mark for executor shutdown when all tasks finished executor.shutdown() - while executor.getActiveCount > 0 do { + while executor.getActiveCount > 0 do Thread.sleep(100) - } // println(s"Executor getTaskCount : ${executor.getTaskCount}") // println(s"Executor getActiveCount : ${executor.getActiveCount}") // println(s"Executor getCompletedTaskCount: ${executor.getCompletedTaskCount}") - val result = namedFutures.map((name, f) => (name, Try(f.get()))) -// println("Completed waiting for the futures") + val result = namedFutures.map((name, f) => (name, Try(f.get()).flatten)) + // println("Completed waiting for the futures") result - } -} +end ParallelBatchRunner diff --git a/scalafx-extras/src/main/scala-3/org/scalafx/extras/progress_dialog/ProgressStatusDialog.scala b/scalafx-extras/src/main/scala-3/org/scalafx/extras/progress_dialog/ProgressStatusDialog.scala index d1dcde9..63b483d 100644 --- a/scalafx-extras/src/main/scala-3/org/scalafx/extras/progress_dialog/ProgressStatusDialog.scala +++ b/scalafx-extras/src/main/scala-3/org/scalafx/extras/progress_dialog/ProgressStatusDialog.scala @@ -41,7 +41,7 @@ import scalafx.stage.{Stage, Window} import java.time.Duration -class ProgressStatusDialog(dialogTitle: String, parentWindow: Option[Window]) { +class ProgressStatusDialog(dialogTitle: String, parentWindow: Option[Window]): private val elapsedTimeService = new ElapsedTimeService() private val progressStatus = new ProgressStatus() @@ -51,28 +51,25 @@ class ProgressStatusDialog(dialogTitle: String, parentWindow: Option[Window]) { val abortFlag: BooleanProperty = BooleanProperty(false) - def updateETA(): Unit = { - val strVal = { + private def updateETA(): Unit = + val strVal = val progress = progressStatus.model.progress.value - if progress <= 0 then { + if progress <= 0 then "?" - } else { - // TODO: prevent jittering of estimate when progress value changes, + else + // TODO: prevent jitter of estimate when progress value changes, // compute running average of last predictions or something... val et = elapsedTimeService.elapsedTime.value val eta: Long = (et * (1 - progress) / progress).ceil.toLong formatDuration(Duration.ofMillis(eta)) - } - } progressStatus.model.etaTimeText.value = strVal - } elapsedTimeService.elapsedTime.onChange { (_, _, newValue) => progressStatus.model.elapsedTimeText.value = formatDuration(Duration.ofMillis(newValue.longValue())) updateETA() } - private val abortButton = new Button { + private val abortButton = new Button: text = "Abort batch processing" padding = Insets(7) margin = Insets(7) @@ -88,49 +85,43 @@ class ProgressStatusDialog(dialogTitle: String, parentWindow: Option[Window]) { } disable <== abortFlag alignmentInParent = Pos.Center - } - private val dialog: Stage = new Stage { + private val dialog: Stage = new Stage: initOwner(parentWindow.orNull) parentWindow.foreach { w => - w.delegate match { + w.delegate match case s: javafx.stage.Stage => icons ++= s.icons case x => throw new Exception(s"Invalid parent window delegate: $x") - } } title = dialogTitle resizable = false - scene = new Scene { - root = new BorderPane { + scene = new Scene: + root = new BorderPane: padding = Insets(14) center = progressStatus.view bottom = abortButton - } parentWindow.foreach(w => stylesheets = w.scene().stylesheets) - } - onShown = _ => { + onShown = _ => // TODO: prevent double initialization elapsedTimeService.doStart() - } - onCloseRequest = e => { + onCloseRequest = e => abortFlag.value = true - // Do not allow to close the window + // Do not allow closing the window e.consume() - } - } - private def formatDuration(duration: Duration): String = { + private def formatDuration(duration: Duration): String = val seconds = duration.getSeconds val absSeconds = Math.abs(seconds) - val positive = String.format("%d:%02d:%02d", absSeconds / 3600, (absSeconds % 3600) / 60, absSeconds % 60) - if seconds < 0 then "-" + positive - else positive - } + val positive = f"${absSeconds / 3600}%d:${absSeconds % 3600 / 60}%02d:${absSeconds % 60}%02d" + if seconds < 0 then + "-" + positive + else + positive - private class ElapsedTimeService extends jfxc.ScheduledService[Long] { + private class ElapsedTimeService extends jfxc.ScheduledService[Long]: private var startTime: Long = _ @@ -141,21 +132,18 @@ class ProgressStatusDialog(dialogTitle: String, parentWindow: Option[Window]) { this.period = 250.ms - override def createTask(): jfxc.Task[Long] = () => { + override def createTask(): jfxc.Task[Long] = () => val ct = System.currentTimeMillis() val et = ct - startTime onFX { _elapsedTime.value = et } et - } - def doStart(): Unit = { + def doStart(): Unit = this.restart() startTime = System.currentTimeMillis() onFX { _elapsedTime.value = 0 } - } - } def window: Window = dialog @@ -173,12 +161,10 @@ class ProgressStatusDialog(dialogTitle: String, parentWindow: Option[Window]) { def cancelledCount: StringProperty = progressStatus.model.cancelledCountText - def close(): Unit = { + def close(): Unit = elapsedTimeService.cancel() dialog.close() - } - def show(): Unit = { + def show(): Unit = dialog.show() - } -} +end ProgressStatusDialog diff --git a/scalafx-extras/src/main/scala-3/org/scalafx/extras/progress_dialog/impl/ProgressStatusModel.scala b/scalafx-extras/src/main/scala-3/org/scalafx/extras/progress_dialog/impl/ProgressStatusModel.scala index 1d0cc52..f7fbc94 100644 --- a/scalafx-extras/src/main/scala-3/org/scalafx/extras/progress_dialog/impl/ProgressStatusModel.scala +++ b/scalafx-extras/src/main/scala-3/org/scalafx/extras/progress_dialog/impl/ProgressStatusModel.scala @@ -30,7 +30,7 @@ package org.scalafx.extras.progress_dialog.impl import org.scalafx.extras.mvcfx.ModelFX import scalafx.beans.property.{DoubleProperty, StringProperty} -class ProgressStatusModel extends ModelFX { +class ProgressStatusModel extends ModelFX: val statusText = new StringProperty() val progress = new DoubleProperty() @@ -42,4 +42,3 @@ class ProgressStatusModel extends ModelFX { val successfulCountText = new StringProperty() val failedCountText = new StringProperty() val cancelledCountText = new StringProperty() -}