Skip to content

Commit

Permalink
Nicer looking Spark progress reporting by default
Browse files Browse the repository at this point in the history
No more progress bars, but simple text (updated as jobs progress)

Call .progress(useBars = true) to keep the old progress bars
  • Loading branch information
alexarchambault committed Jun 15, 2023
1 parent 08bde28 commit 9beda63
Show file tree
Hide file tree
Showing 3 changed files with 75 additions and 61 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,18 @@ class NotebookSparkSessionBuilder(implicit

private var progress0 = true
private var keep0 = false
private var useBars0 = false

private var logsInDeveloperConsoleOpt = Option.empty[Boolean]

def progress(enable: Boolean = true, keep: Boolean = false): this.type = {
def progress(
enable: Boolean = true,
keep: Boolean = false,
useBars: Boolean = false
): this.type = {
progress0 = enable
keep0 = keep
useBars0 = useBars
this
}

Expand Down Expand Up @@ -81,7 +87,7 @@ class NotebookSparkSessionBuilder(implicit
html(s"""<a target="_blank" href="$url">Spark UI</a>""")

session.sparkContext.addSparkListener(
new ProgressSparkListener(session, keep0, progress0)
new ProgressSparkListener(session, keep0, progress0, useBars0)
)

session
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ import scala.util.Try
final class ProgressSparkListener(
session: SparkSession,
keep: Boolean,
progress: Boolean
progress: Boolean,
useBars: Boolean
)(implicit
publish: OutputHandler,
commHandler: CommHandler
Expand Down Expand Up @@ -58,7 +59,7 @@ final class ProgressSparkListener(
def newStageElem(stageId: Int, numTasks: Int, name: String, details: String): StageElem = {

if (!elems.contains(stageId))
elems.putIfAbsent(stageId, new StageElem(stageId, numTasks, keep, name, details))
elems.putIfAbsent(stageId, new StageElem(stageId, numTasks, keep, name, details, useBars))

elems.get(stageId)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,21 @@ import java.util.concurrent.atomic.AtomicInteger

import scala.concurrent.duration.DurationInt

final class StageElem(stageId: Int, numTasks: Int, keep: Boolean, name: String, details: String) {
final class StageElem(
stageId: Int,
numTasks: Int,
keep: Boolean,
name: String,
details: String,
useBars: Boolean
) {

private val htmlName = {
val sep = " at "
val idx = name.indexOf(sep)
if (idx < 0) s"<code>$name</code>"
else s"<code>${name.take(idx)}</code> at <code>${name.drop(idx + sep.length)}</code>"
}

val displayId = s"stage-info-${UUID.randomUUID()}"
val titleDisplayId = s"$displayId-title"
Expand Down Expand Up @@ -39,77 +53,70 @@ final class StageElem(stageId: Int, numTasks: Int, keep: Boolean, name: String,

def init(cancelStageCommTargetName: String, sendInitCode: Boolean)(implicit
publish: OutputHandler
): Unit = {

if (sendInitCode)
): Unit =
if (useBars)
publish.html(
s"""<script>
|var comm = Jupyter.notebook.kernel.comm_manager.new_comm('$cancelStageCommTargetName', {});
|
|function cancelStage(stageId) {
| console.log('Cancelling stage ' + stageId);
| comm.send({ 'stageId': stageId });
|}
|</script>
""".stripMargin
s"""<div class="progress">
| <div class="progress-bar bg-success" role="progressbar" style="width: 0%; ${extraStyle.mkString(
"; "
)}; color: white" aria-valuenow="0" aria-valuemin="0" aria-valuemax="100">
| 0 / $numTasks
| </div>
|</div>
|""".stripMargin,
id = displayId
)
else
publish.html(
s"<div>$htmlName</div>",
id = displayId
)

publish.html(
s"""<div>
| <span style="float: left; ${extraStyle.mkString("; ")}">$name</span>
| <span style="float: right; ${extraStyle.mkString(
"; "
)}"><a href="#" onclick="cancelStage($stageId);">(kill)</a></span>
|</div>
|<br>
|""".stripMargin,
id = titleDisplayId
)
// <br> above seems required put both divs on different lines in nteract
publish.html(
s"""<div class="progress">
| <div class="progress-bar bg-success" role="progressbar" style="width: 0%; ${extraStyle.mkString(
"; "
)}; color: white" aria-valuenow="0" aria-valuemin="0" aria-valuemax="100">
| 0 / $numTasks
| </div>
|</div>
|""".stripMargin,
id = displayId
)
}

def update()(implicit publish: OutputHandler): Unit = {

val doneTasks0 = doneTasks.get()
val startedTasks0 = startedTasks.get()

val diff = startedTasks0 - doneTasks0

val donePct = math.round(100.0 * doneTasks0.toDouble / numTasks).toInt
val startedPct = math.round(100.0 * (startedTasks0 - doneTasks0).toDouble / numTasks).toInt

publish.updateHtml(
s"""<div class="progress">
| <div class="progress-bar" role="progressbar" style="background-color: blue; width: $donePct%; ${extraStyle.mkString(
"; "
)}; color: white" aria-valuenow="$donePct" aria-valuemin="0" aria-valuemax="100">
| $doneTasks0${if (diff == 0) "" else s" + $diff"} / $numTasks
| </div>
| <div class="progress-bar" role="progressbar" style="background-color: red; width: $startedPct%" aria-valuenow="$startedPct" aria-valuemin="0" aria-valuemax="100"></div>
|</div>
|""".stripMargin,
id = displayId
)
val onGoingCount = startedTasks0 - doneTasks0

val donePct = math.round(100.0 * doneTasks0.toDouble / numTasks).toInt

if (useBars) {
val diff = startedTasks0 - doneTasks0
val startedPct = math.round(100.0 * (startedTasks0 - doneTasks0).toDouble / numTasks).toInt

publish.updateHtml(
s"""<div class="progress">
| <div class="progress-bar" role="progressbar" style="background-color: blue; width: $donePct%; ${extraStyle.mkString(
"; "
)}; color: white" aria-valuenow="$donePct" aria-valuemin="0" aria-valuemax="100">
| $doneTasks0${if (diff == 0) "" else s" + $diff"} / $numTasks
| </div>
| <div class="progress-bar" role="progressbar" style="background-color: red; width: $startedPct%" aria-valuenow="$startedPct" aria-valuemin="0" aria-valuemax="100"></div>
|</div>
|""".stripMargin,
id = displayId
)
}
else {
val taskOrTasks = if (numTasks <= 1) "task" else "tasks"
val onGoingMessage =
if (onGoingCount <= 0) ""
else s", $onGoingCount on-going"
val message =
if (stageDone0) s"<div>$htmlName (done)</div>"
else s"<div>$htmlName ($donePct% of $numTasks $taskOrTasks$onGoingMessage)</div>"
publish.updateHtml(message, id = displayId)
}

if (stageDone0 && !keep) {
// Allow the user to see the completed bar before wiping it
val delay = 3.seconds
val runnable: Runnable =
() =>
try {
publish.updateHtml("", id = titleDisplayId)
publish.updateHtml("", id = displayId)
if (useBars)
publish.updateHtml("", id = titleDisplayId)
}
catch {
case t: Throwable =>
Expand Down

0 comments on commit 9beda63

Please sign in to comment.