Skip to content

Commit

Permalink
Merge pull request #322 from alexarchambault/tweak-progress-reporting
Browse files Browse the repository at this point in the history
Tweak progress reporting
  • Loading branch information
alexarchambault committed Jun 15, 2023
2 parents 2200cbe + 9beda63 commit effd405
Show file tree
Hide file tree
Showing 4 changed files with 194 additions and 85 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,28 @@ class NotebookSparkSessionBuilder(implicit
commHandler: CommHandler
) extends AmmoniteSparkSessionBuilder {

/** Publishes one line of builder output through the notebook's HTML channel.
  *
  * @param line     plain text; used only when `htmlLine` is `null`, in which case it is published
  *                 followed by the platform line separator
  * @param htmlLine markup to publish as-is; `null` (the default) means "fall back to `line`"
  */
override def printLine(line: String, htmlLine: String = null): Unit = {
  // Option(...) maps the null default to None; the fallback is only evaluated in that case.
  val markup = Option(htmlLine).getOrElse(line + System.lineSeparator())
  publish.html(markup)
}

disableProgressBars(true)

private var progress0 = true
private var keep0 = true
private var keep0 = false
private var useBars0 = false

private var logsInDeveloperConsoleOpt = Option.empty[Boolean]

// NOTE(review): this span is a rendered diff. The one-line signature directly below appears to
// be the pre-change declaration, and the multi-line signature after it its replacement.
/** Stores the progress-reporting options on this builder and returns the builder itself.
  *
  * @param enable  copied to `progress0`
  * @param keep    copied to `keep0`
  * @param useBars copied to `useBars0` (only present in the multi-line signature)
  * @return this builder
  */
def progress(enable: Boolean = true, keep: Boolean = true): this.type = {
def progress(
enable: Boolean = true,
keep: Boolean = false,
useBars: Boolean = false
): this.type = {
progress0 = enable
keep0 = keep
useBars0 = useBars
this
}

Expand Down Expand Up @@ -73,7 +87,7 @@ class NotebookSparkSessionBuilder(implicit
html(s"""<a target="_blank" href="$url">Spark UI</a>""")

session.sparkContext.addSparkListener(
new ProgressSparkListener(session, keep0, progress0)
new ProgressSparkListener(session, keep0, progress0, useBars0)
)

session
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ import scala.util.Try
final class ProgressSparkListener(
session: SparkSession,
keep: Boolean,
progress: Boolean
progress: Boolean,
useBars: Boolean
)(implicit
publish: OutputHandler,
commHandler: CommHandler
Expand Down Expand Up @@ -58,7 +59,7 @@ final class ProgressSparkListener(
/** Returns the `StageElem` stored in `elems` for `stageId`, registering a new one first when the
  * membership check below finds none.
  *
  * NOTE(review): this span is a rendered diff; the two `putIfAbsent` lines appear to be the
  * pre-change call (five constructor arguments) and its replacement (adds `useBars`).
  */
def newStageElem(stageId: Int, numTasks: Int, name: String, details: String): StageElem = {

// NOTE(review): if `elems` is a java.util.concurrent.ConcurrentHashMap, `contains` tests
// values rather than keys — confirm whether `containsKey` was intended. `putIfAbsent` keeps
// the result correct either way, at the cost of constructing a throwaway StageElem.
if (!elems.contains(stageId))
elems.putIfAbsent(stageId, new StageElem(stageId, numTasks, keep, name, details))
elems.putIfAbsent(stageId, new StageElem(stageId, numTasks, keep, name, details, useBars))

elems.get(stageId)
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,29 @@
package org.apache.spark.sql.almondinternals

import almond.interpreter.api.OutputHandler

import java.util.concurrent.ThreadFactory
import java.util.concurrent.ScheduledThreadPoolExecutor
import java.util.UUID
import java.util.concurrent.atomic.AtomicInteger

import almond.interpreter.api.OutputHandler

final class StageElem(stageId: Int, numTasks: Int, keep: Boolean, name: String, details: String) {
import scala.concurrent.duration.DurationInt

final class StageElem(
stageId: Int,
numTasks: Int,
keep: Boolean,
name: String,
details: String,
useBars: Boolean
) {

/** Stage name rendered as an HTML fragment.
  *
  * A name of the form `"<operation> at <location>"` is split on the first `" at "` and each half
  * is wrapped in its own `<code>` element; any other name is wrapped in a single `<code>`.
  * The text is HTML-escaped first, so names containing markup characters (for instance
  * `collect at <console>:12`) are displayed literally instead of being parsed as tags.
  */
private val htmlName = {
  // Minimal HTML text escaping; '&' is handled like any other character, in a single pass,
  // so already-escaped output is never escaped twice.
  def escape(text: String): String = {
    val sb = new StringBuilder(text.length)
    text.foreach {
      case '&' => sb ++= "&amp;"
      case '<' => sb ++= "&lt;"
      case '>' => sb ++= "&gt;"
      case '"' => sb ++= "&quot;"
      case c   => sb += c
    }
    sb.result()
  }

  val sep = " at "
  val idx = name.indexOf(sep)
  if (idx < 0) s"<code>${escape(name)}</code>"
  else
    s"<code>${escape(name.take(idx))}</code> at <code>${escape(name.drop(idx + sep.length))}</code>"
}

val displayId = s"stage-info-${UUID.randomUUID()}"
val titleDisplayId = s"$displayId-title"
Expand Down Expand Up @@ -35,74 +53,99 @@ final class StageElem(stageId: Int, numTasks: Int, keep: Boolean, name: String,

// NOTE(review): this span is a rendered diff in which the pre-change and post-change bodies of
// `init` are interleaved, so it does not read as a single valid method.
//  - The `if (useBars)` / `else` lines publish, under `displayId`, either a progress bar
//    initialised to "0 / numTasks" or a plain `<div>` holding `htmlName`.
//  - The `if (sendInitCode)` lines publish a <script> that opens a comm on
//    `cancelStageCommTargetName` and defines `cancelStage(stageId)`; the lines after it publish
//    a title row (stage name plus a "(kill)" link) under `titleDisplayId`, followed by the
//    progress bar under `displayId`. These appear to be the pre-change body — confirm against
//    the actual file before relying on them.
def init(cancelStageCommTargetName: String, sendInitCode: Boolean)(implicit
publish: OutputHandler
): Unit = {

if (sendInitCode)
): Unit =
if (useBars)
publish.html(
s"""<script>
|var comm = Jupyter.notebook.kernel.comm_manager.new_comm('$cancelStageCommTargetName', {});
|
|function cancelStage(stageId) {
| console.log('Cancelling stage ' + stageId);
| comm.send({ 'stageId': stageId });
|}
|</script>
""".stripMargin
s"""<div class="progress">
| <div class="progress-bar bg-success" role="progressbar" style="width: 0%; ${extraStyle.mkString(
"; "
)}; color: white" aria-valuenow="0" aria-valuemin="0" aria-valuemax="100">
| 0 / $numTasks
| </div>
|</div>
|""".stripMargin,
id = displayId
)
else
publish.html(
s"<div>$htmlName</div>",
id = displayId
)

publish.html(
s"""<div>
| <span style="float: left; ${extraStyle.mkString("; ")}">$name</span>
| <span style="float: right; ${extraStyle.mkString(
"; "
)}"><a href="#" onclick="cancelStage($stageId);">(kill)</a></span>
|</div>
|<br>
|""".stripMargin,
id = titleDisplayId
)
// The <br> above seems to be required to put both divs on different lines in nteract
publish.html(
s"""<div class="progress">
| <div class="progress-bar bg-success" role="progressbar" style="width: 0%; ${extraStyle.mkString(
"; "
)}; color: white" aria-valuenow="0" aria-valuemin="0" aria-valuemax="100">
| 0 / $numTasks
| </div>
|</div>
|""".stripMargin,
id = displayId
)
}

// NOTE(review): this span is a rendered diff in which the pre-change and post-change bodies of
// `update` are interleaved (e.g. `donePct` is declared twice), so it does not read as a single
// valid method.
//
// What the visible lines do:
//  - read the current `doneTasks` / `startedTasks` counters and derive the done percentage;
//  - with `useBars`, re-render the progress bar at `displayId` (blue segment = done tasks,
//    red segment = started but not yet done);
//  - otherwise re-render a one-line text status at `displayId`: "(done)" once `stageDone0` is
//    set, else "(<pct>% of <numTasks> task(s), <n> on-going)";
//  - when the stage is done and `keep` is false, blank the output: the `Thread.sleep(3000)`
//    lines do so on the calling thread, while the `StageElem.scheduler.schedule` lines defer it
//    by 3 seconds to the scheduler thread and print any failure to stderr.
// NOTE(review): the scheduled task catches `Throwable`; consider `scala.util.control.NonFatal`
// so fatal errors are not swallowed — confirm with the owners.
def update()(implicit publish: OutputHandler): Unit = {

val doneTasks0 = doneTasks.get()
val startedTasks0 = startedTasks.get()

val diff = startedTasks0 - doneTasks0

val donePct = math.round(100.0 * doneTasks0.toDouble / numTasks).toInt
val startedPct = math.round(100.0 * (startedTasks0 - doneTasks0).toDouble / numTasks).toInt

publish.updateHtml(
s"""<div class="progress">
| <div class="progress-bar" role="progressbar" style="background-color: blue; width: $donePct%; ${extraStyle.mkString(
"; "
)}; color: white" aria-valuenow="$donePct" aria-valuemin="0" aria-valuemax="100">
| $doneTasks0${if (diff == 0) "" else s" + $diff"} / $numTasks
| </div>
| <div class="progress-bar" role="progressbar" style="background-color: red; width: $startedPct%" aria-valuenow="$startedPct" aria-valuemin="0" aria-valuemax="100"></div>
|</div>
|""".stripMargin,
id = displayId
)
val onGoingCount = startedTasks0 - doneTasks0

val donePct = math.round(100.0 * doneTasks0.toDouble / numTasks).toInt

if (useBars) {
val diff = startedTasks0 - doneTasks0
val startedPct = math.round(100.0 * (startedTasks0 - doneTasks0).toDouble / numTasks).toInt

publish.updateHtml(
s"""<div class="progress">
| <div class="progress-bar" role="progressbar" style="background-color: blue; width: $donePct%; ${extraStyle.mkString(
"; "
)}; color: white" aria-valuenow="$donePct" aria-valuemin="0" aria-valuemax="100">
| $doneTasks0${if (diff == 0) "" else s" + $diff"} / $numTasks
| </div>
| <div class="progress-bar" role="progressbar" style="background-color: red; width: $startedPct%" aria-valuenow="$startedPct" aria-valuemin="0" aria-valuemax="100"></div>
|</div>
|""".stripMargin,
id = displayId
)
}
else {
val taskOrTasks = if (numTasks <= 1) "task" else "tasks"
val onGoingMessage =
if (onGoingCount <= 0) ""
else s", $onGoingCount on-going"
val message =
if (stageDone0) s"<div>$htmlName (done)</div>"
else s"<div>$htmlName ($donePct% of $numTasks $taskOrTasks$onGoingMessage)</div>"
publish.updateHtml(message, id = displayId)
}

if (stageDone0 && !keep) {
Thread.sleep(3000) // Allow the user to see the completed bar before wiping it
publish.updateHtml("", id = titleDisplayId)
publish.updateHtml("", id = displayId)
// Allow the user to see the completed bar before wiping it
val delay = 3.seconds
val runnable: Runnable =
() =>
try {
publish.updateHtml("", id = displayId)
if (useBars)
publish.updateHtml("", id = titleDisplayId)
}
catch {
case t: Throwable =>
System.err.println("Error while updating message")
t.printStackTrace(System.err)
}
StageElem.scheduler.schedule(runnable, delay.length, delay.unit)
}
}

}

/** Shared resources for [[StageElem]] instances. */
object StageElem {

  /** Idle time after which the scheduler's worker thread is allowed to terminate. */
  private def keepAlive = 30.seconds

  /** Single-threaded scheduler used to run delayed display updates off the caller's thread.
    *
    * Created on first use. Its worker is a daemon thread named `almond-spark-progress-<n>`, so it
    * never prevents JVM shutdown, and it is released after [[keepAlive]] of inactivity (a new one
    * is started on the next scheduled task).
    */
  lazy val scheduler: ScheduledThreadPoolExecutor = {
    val threadFactory = new ThreadFactory {
      private val count = new AtomicInteger
      override def newThread(r: Runnable): Thread = {
        val name = s"almond-spark-progress-${count.getAndIncrement()}"
        val t    = new Thread(r, name)
        t.setDaemon(true)
        t
      }
    }
    val executor = new ScheduledThreadPoolExecutor(1, threadFactory)
    executor.setKeepAliveTime(keepAlive.length, keepAlive.unit)
    // The pool's only thread is a core thread, and keep-alive applies to core threads only when
    // core time-out is enabled. Must come after setKeepAliveTime: enabling it with a zero
    // keep-alive throws IllegalArgumentException.
    executor.allowCoreThreadTimeOut(true)
    executor
  }

}
Loading

0 comments on commit effd405

Please sign in to comment.