From 111ca8df33164c31f52a0c1b6fb35baeb47f2555 Mon Sep 17 00:00:00 2001
From: Alex Archambault
Date: Thu, 15 Jun 2023 15:39:38 +0200
Subject: [PATCH 1/5] Make output look nicer

Printing messages as HTML, with sections, when possible
---
 .../NotebookSparkSessionBuilder.scala         |  6 +++
 .../AmmoniteSparkSessionBuilder.scala         | 43 +++++++++++++------
 2 files changed, 36 insertions(+), 13 deletions(-)

diff --git a/modules/almond-spark/src/main/scala/org/apache/spark/sql/almondinternals/NotebookSparkSessionBuilder.scala b/modules/almond-spark/src/main/scala/org/apache/spark/sql/almondinternals/NotebookSparkSessionBuilder.scala
index 00cf47d..801c7c6 100644
--- a/modules/almond-spark/src/main/scala/org/apache/spark/sql/almondinternals/NotebookSparkSessionBuilder.scala
+++ b/modules/almond-spark/src/main/scala/org/apache/spark/sql/almondinternals/NotebookSparkSessionBuilder.scala
@@ -20,6 +20,12 @@ class NotebookSparkSessionBuilder(implicit
   commHandler: CommHandler
 ) extends AmmoniteSparkSessionBuilder {
 
+  override def printLine(line: String, htmlLine: String = null): Unit =
+    if (htmlLine == null)
+      publish.html(line + System.lineSeparator())
+    else
+      publish.html(htmlLine)
+
   private var progress0 = true
   private var keep0 = true
 
diff --git a/modules/core/src/main/scala/org/apache/spark/sql/ammonitesparkinternals/AmmoniteSparkSessionBuilder.scala b/modules/core/src/main/scala/org/apache/spark/sql/ammonitesparkinternals/AmmoniteSparkSessionBuilder.scala
index cb22ae6..185b46f 100644
--- a/modules/core/src/main/scala/org/apache/spark/sql/ammonitesparkinternals/AmmoniteSparkSessionBuilder.scala
+++ b/modules/core/src/main/scala/org/apache/spark/sql/ammonitesparkinternals/AmmoniteSparkSessionBuilder.scala
@@ -137,6 +137,9 @@ class AmmoniteSparkSessionBuilder(implicit
 
   import AmmoniteSparkSessionBuilder.normalize
 
+  def printLine(line: String, htmlLine: String = null): Unit =
+    println(line)
+
   private val options0: scala.collection.Map[String, String] = {
 
     def fieldVia(name: String): Option[scala.collection.mutable.HashMap[String, String]] =
@@ -153,7 +156,10 @@ class AmmoniteSparkSessionBuilder(implicit
     fieldVia("org$apache$spark$sql$SparkSession$Builder$$options")
       .orElse(fieldVia("options"))
       .getOrElse {
-        println("Warning: can't read SparkSession Builder options (options field not found)")
+        printLine(
+          "Warning: can't read SparkSession Builder options (options field not found)",
+          "Warning: can't read SparkSession Builder options (options field not found)"
+        )
         Map.empty[String, String]
       }
   }
@@ -164,7 +170,10 @@ class AmmoniteSparkSessionBuilder(implicit
       envVar <- AmmoniteSparkSessionBuilder.confEnvVars
       path <- sys.env.get(envVar)
     } {
-      println(s"Loading spark conf from ${AmmoniteSparkSessionBuilder.prettyDir(path)}")
+      printLine(
+        s"Loading spark conf from ${AmmoniteSparkSessionBuilder.prettyDir(path)}",
+        s"Loading spark conf from ${AmmoniteSparkSessionBuilder.prettyDir(path)}"
+      )
       loadConf(path)
     }
 
@@ -261,7 +270,10 @@ class AmmoniteSparkSessionBuilder(implicit
       deps = ("spark-yarn", SparkDependencies.sparkYarnDependency) :: deps
 
     if (deps.nonEmpty) {
-      println(s"Loading ${deps.map(_._1).mkString(", ")}")
+      printLine(
+        s"Loading ${deps.map(_._1).mkString(", ")}",
+        s"Loading ${deps.map("<code>" + _._1 + "</code>").mkString(", ")}"
+      )
       interpApi.load.ivy(deps.map(_._2): _*)
     }
   }
@@ -321,7 +333,7 @@ class AmmoniteSparkSessionBuilder(implicit
 
     val (sparkJars, sparkDistClassPath) = sys.env.get("SPARK_HOME") match {
       case None =>
-        println("Getting spark JARs")
+        printLine("Getting spark JARs")
         val sparkJars0 =
           SparkDependencies.sparkJars(interpApi.repositories(), interpApi.resolutionHooks, Nil)
         (sparkJars0, Nil)
@@ -416,13 +428,16 @@ class AmmoniteSparkSessionBuilder(implicit
 
       hadoopConfDirOpt match {
         case None =>
-          println(
+          printLine(
             "Warning: core-site.xml not found in the classpath, and no hadoop conf found via HADOOP_CONF_DIR, " +
-              "YARN_CONF_DIR, or at /etc/hadoop/conf"
+              "YARN_CONF_DIR, or at /etc/hadoop/conf",
+            "Warning: core-site.xml not found in the classpath, and no hadoop conf found via HADOOP_CONF_DIR, " +
+              "YARN_CONF_DIR, or at /etc/hadoop/conf"
           )
         case Some(dir) =>
-          println(
-            s"Adding Hadoop conf dir ${AmmoniteSparkSessionBuilder.prettyDir(dir)} to classpath"
+          printLine(
+            s"Adding Hadoop conf dir ${AmmoniteSparkSessionBuilder.prettyDir(dir)} to classpath",
+            s"Adding Hadoop conf dir ${AmmoniteSparkSessionBuilder.prettyDir(dir)} to classpath"
           )
           interpApi.load.cp(os.Path(dir))
       }
@@ -434,18 +449,20 @@ class AmmoniteSparkSessionBuilder(implicit
 
       hiveConfDirOpt match {
         case None =>
-          println(
-            "Warning: hive-site.xml not found in the classpath, and no Hive conf found via HIVE_CONF_DIR"
+          printLine(
+            "Warning: hive-site.xml not found in the classpath, and no Hive conf found via HIVE_CONF_DIR",
+            "Warning: hive-site.xml not found in the classpath, and no Hive conf found via HIVE_CONF_DIR"
           )
         case Some(dir) =>
-          println(
-            s"Adding Hive conf dir ${AmmoniteSparkSessionBuilder.prettyDir(dir)} to classpath"
+          printLine(
+            s"Adding Hive conf dir ${AmmoniteSparkSessionBuilder.prettyDir(dir)} to classpath",
+            s"Adding Hive conf dir ${AmmoniteSparkSessionBuilder.prettyDir(dir)} to classpath"
           )
           interpApi.load.cp(os.Path(dir))
       }
     }
 
-    println("Creating SparkSession")
+    printLine("Creating SparkSession")
     val session = super.getOrCreate()
 
     if (interpApi != null)

From d845aac168c0acba5051fe64ac3850970dcffd17 Mon Sep 17 00:00:00 2001
From: Alex Archambault
Date: Thu, 15 Jun 2023 15:40:49 +0200
Subject: [PATCH 2/5] Disable Spark's own console progress bars by default from notebooks

---
 .../NotebookSparkSessionBuilder.scala         |  2 +
 .../AmmoniteSparkSessionBuilder.scala         | 46 ++++++++++++++++---
 2 files changed, 42 insertions(+), 6 deletions(-)

diff --git a/modules/almond-spark/src/main/scala/org/apache/spark/sql/almondinternals/NotebookSparkSessionBuilder.scala b/modules/almond-spark/src/main/scala/org/apache/spark/sql/almondinternals/NotebookSparkSessionBuilder.scala
index 801c7c6..b591d52 100644
--- a/modules/almond-spark/src/main/scala/org/apache/spark/sql/almondinternals/NotebookSparkSessionBuilder.scala
+++ b/modules/almond-spark/src/main/scala/org/apache/spark/sql/almondinternals/NotebookSparkSessionBuilder.scala
@@ -26,6 +26,8 @@ class NotebookSparkSessionBuilder(implicit
     else
       publish.html(htmlLine)
 
+  disableProgressBars(true)
+
   private var progress0 = true
   private var keep0 = true
 
diff --git a/modules/core/src/main/scala/org/apache/spark/sql/ammonitesparkinternals/AmmoniteSparkSessionBuilder.scala b/modules/core/src/main/scala/org/apache/spark/sql/ammonitesparkinternals/AmmoniteSparkSessionBuilder.scala
index 185b46f..28376ac 100644
--- a/modules/core/src/main/scala/org/apache/spark/sql/ammonitesparkinternals/AmmoniteSparkSessionBuilder.scala
+++ b/modules/core/src/main/scala/org/apache/spark/sql/ammonitesparkinternals/AmmoniteSparkSessionBuilder.scala
@@ -86,23 +86,46 @@ object AmmoniteSparkSessionBuilder {
     javaDirs.exists(path.startsWith)
   }
 
-  def forceProgressBars(sc: SparkContext): Boolean =
-    sc.progressBar.nonEmpty || {
+  def trySetProgressBars(sc: SparkContext, value: Option[ConsoleProgressBar]): Boolean = {
+    def attempt(methodName: String): Boolean =
       try {
         val method = classOf[org.apache.spark.SparkContext]
           .getDeclaredMethod(
-            "org$apache$spark$SparkContext$$_progressBar_$eq",
+            methodName,
             classOf[Option[Any]]
           )
         method.setAccessible(true)
-        method.invoke(sc, Some(new ConsoleProgressBar(sc)))
+        method.invoke(sc, value)
         true
       }
       catch {
         case _: NoSuchMethodException =>
           false
       }
-    }
+
+    attempt("org$apache$spark$SparkContext$$_progressBar_$eq") || attempt("_progressBar_$eq")
+  }
+
+  def tryGetProgressBars(sc: SparkContext): Option[Option[ConsoleProgressBar]] = {
+    def attempt(methodName: String): Option[Option[ConsoleProgressBar]] =
+      try {
+        val method = classOf[org.apache.spark.SparkContext].getDeclaredMethod(methodName)
+        method.setAccessible(true)
+        Some(method.invoke(sc).asInstanceOf[Option[ConsoleProgressBar]])
+      }
+      catch {
+        case _: NoSuchMethodException =>
+          None
+      }
+
+    attempt("org$apache$spark$SparkContext$$_progressBar").orElse(attempt("_progressBar"))
+  }
+
+  def forceProgressBars(sc: SparkContext): Boolean =
+    sc.progressBar.nonEmpty || trySetProgressBars(sc, Some(new ConsoleProgressBar(sc)))
+
+  def disableProgressBars(sc: SparkContext): Boolean =
+    sc.progressBar.isEmpty || trySetProgressBars(sc, None)
 
   def userAddedClassPath(cl: ClassLoader): Stream[Seq[URL]] =
     if (cl == null) Stream.empty
@@ -202,13 +225,19 @@ class AmmoniteSparkSessionBuilder(implicit
     this
   }
 
-  private var forceProgressBars0 = false
+  private var forceProgressBars0   = false
+  private var disableProgressBars0 = false
 
   def progressBars(force: Boolean = true): this.type = {
     forceProgressBars0 = force
     this
   }
 
+  def disableProgressBars(disable: Boolean = true): this.type = {
+    disableProgressBars0 = disable
+    this
+  }
+
   private var sendSparkYarnJars0 = true
   private var sendSparkJars0 = true
   private var ignoreJars0 = Set.empty[URI]
@@ -462,6 +491,9 @@ class AmmoniteSparkSessionBuilder(implicit
       }
     }
 
+    if (disableProgressBars0)
+      config("spark.ui.showConsoleProgress", false)
+
     printLine("Creating SparkSession")
     val session = super.getOrCreate()
 
@@ -484,6 +516,8 @@ class AmmoniteSparkSessionBuilder(implicit
 
     if (forceProgressBars0)
      AmmoniteSparkSessionBuilder.forceProgressBars(session.sparkContext)
+    if (disableProgressBars0)
+      AmmoniteSparkSessionBuilder.disableProgressBars(session.sparkContext)
 
     for (api <- Option(replApi))
       api.sess.frames.head.addHook {

From 118d7c48b9a79362fe92a1201d5bb7b114e4f8d8 Mon Sep 17 00:00:00 2001
From: Alex Archambault
Date: Thu, 15 Jun 2023 15:41:22 +0200
Subject: [PATCH 3/5] Don't keep Spark progress bars on screen by default

---
 .../sql/almondinternals/NotebookSparkSessionBuilder.scala | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/modules/almond-spark/src/main/scala/org/apache/spark/sql/almondinternals/NotebookSparkSessionBuilder.scala b/modules/almond-spark/src/main/scala/org/apache/spark/sql/almondinternals/NotebookSparkSessionBuilder.scala
index b591d52..ec64f9a 100644
--- a/modules/almond-spark/src/main/scala/org/apache/spark/sql/almondinternals/NotebookSparkSessionBuilder.scala
+++ b/modules/almond-spark/src/main/scala/org/apache/spark/sql/almondinternals/NotebookSparkSessionBuilder.scala
@@ -29,11 +29,11 @@ class NotebookSparkSessionBuilder(implicit
   disableProgressBars(true)
 
   private var progress0 = true
-  private var keep0 = true
+  private var keep0 = false
 
   private var logsInDeveloperConsoleOpt = Option.empty[Boolean]
 
-  def progress(enable: Boolean = true, keep: Boolean = true): this.type = {
+  def progress(enable: Boolean = true, keep: Boolean = false): this.type = {
     progress0 = enable
     keep0 = keep
     this

From 08bde28eee427ed9f7733a24de50517e2514029c Mon Sep 17 00:00:00 2001
From: Alex Archambault
Date: Thu, 15 Jun 2023 15:42:47 +0200
Subject: [PATCH 4/5] Clear Spark progress bars from a scheduler

Rather than blocking at the end of the job
---
 .../spark/sql/almondinternals/StageElem.scala | 44 +++++++++++++++++--
 1 file changed, 40 insertions(+), 4 deletions(-)

diff --git a/modules/almond-spark/src/main/scala/org/apache/spark/sql/almondinternals/StageElem.scala b/modules/almond-spark/src/main/scala/org/apache/spark/sql/almondinternals/StageElem.scala
index c5cafef..b89d95a 100644
--- a/modules/almond-spark/src/main/scala/org/apache/spark/sql/almondinternals/StageElem.scala
+++ b/modules/almond-spark/src/main/scala/org/apache/spark/sql/almondinternals/StageElem.scala
@@ -1,9 +1,13 @@
 package org.apache.spark.sql.almondinternals
 
+import almond.interpreter.api.OutputHandler
+
+import java.util.concurrent.ThreadFactory
+import java.util.concurrent.ScheduledThreadPoolExecutor
 import java.util.UUID
 import java.util.concurrent.atomic.AtomicInteger
 
-import almond.interpreter.api.OutputHandler
+import scala.concurrent.duration.DurationInt
 
 final class StageElem(stageId: Int, numTasks: Int, keep: Boolean, name: String, details: String) {
 
@@ -99,10 +103,42 @@ final class StageElem(stageId: Int, numTasks: Int, keep: Boolean, name: String,
     )
 
     if (stageDone0 && !keep) {
-      Thread.sleep(3000) // Allow the user to see the completed bar before wiping it
-      publish.updateHtml("", id = titleDisplayId)
-      publish.updateHtml("", id = displayId)
+      // Allow the user to see the completed bar before wiping it
+      val delay = 3.seconds
+      val runnable: Runnable =
+        () =>
+          try {
+            publish.updateHtml("", id = titleDisplayId)
+            publish.updateHtml("", id = displayId)
+          }
+          catch {
+            case t: Throwable =>
+              System.err.println("Error while updating message")
+              t.printStackTrace(System.err)
+          }
+      StageElem.scheduler.schedule(runnable, delay.length, delay.unit)
     }
   }
 }
+
+object StageElem {
+  private def keepAlive = 30.seconds
+  lazy val scheduler = {
+    val executor = new ScheduledThreadPoolExecutor(
+      1,
+      new ThreadFactory {
+        val count = new AtomicInteger
+        override def newThread(r: Runnable): Thread = {
+          val name = s"almond-spark-progress-${count.getAndIncrement()}"
+          val t = new Thread(r, name)
+          t.setDaemon(true)
+          t
+        }
+      }
+    )
+    executor.setKeepAliveTime(keepAlive.length, keepAlive.unit)
+    executor
+  }
+
+}

From 9beda63b307fbd8550c353c82718bbf121d935e8 Mon Sep 17 00:00:00 2001
From: Alex Archambault
Date: Thu, 15 Jun 2023 16:06:40 +0200
Subject: [PATCH 5/5] Nicer looking Spark progress reporting by default

No more progress bars, but simple text (updated as jobs progress)

Call .progress(useBars = true) to keep the old progress bars
---
 .../NotebookSparkSessionBuilder.scala         |  10 +-
 .../ProgressSparkListener.scala               |   5 +-
 .../spark/sql/almondinternals/StageElem.scala | 121 +++++++++---------
 3 files changed, 75 insertions(+), 61 deletions(-)

diff --git a/modules/almond-spark/src/main/scala/org/apache/spark/sql/almondinternals/NotebookSparkSessionBuilder.scala b/modules/almond-spark/src/main/scala/org/apache/spark/sql/almondinternals/NotebookSparkSessionBuilder.scala
index ec64f9a..bc93ebd 100644
--- a/modules/almond-spark/src/main/scala/org/apache/spark/sql/almondinternals/NotebookSparkSessionBuilder.scala
+++ b/modules/almond-spark/src/main/scala/org/apache/spark/sql/almondinternals/NotebookSparkSessionBuilder.scala
@@ -30,12 +30,18 @@ class NotebookSparkSessionBuilder(implicit
 
   private var progress0 = true
   private var keep0 = false
+  private var useBars0 = false
 
   private var logsInDeveloperConsoleOpt = Option.empty[Boolean]
 
-  def progress(enable: Boolean = true, keep: Boolean = false): this.type = {
+  def progress(
+    enable: Boolean = true,
+    keep: Boolean = false,
+    useBars: Boolean = false
+  ): this.type = {
     progress0 = enable
     keep0 = keep
+    useBars0 = useBars
     this
   }
 
@@ -81,7 +87,7 @@ class NotebookSparkSessionBuilder(implicit
       html(s"""<a target="_blank" href="$url">Spark UI</a>""")
 
     session.sparkContext.addSparkListener(
-      new ProgressSparkListener(session, keep0, progress0)
+      new ProgressSparkListener(session, keep0, progress0, useBars0)
     )
 
     session
diff --git a/modules/almond-spark/src/main/scala/org/apache/spark/sql/almondinternals/ProgressSparkListener.scala b/modules/almond-spark/src/main/scala/org/apache/spark/sql/almondinternals/ProgressSparkListener.scala
index 130c036..f01ccae 100644
--- a/modules/almond-spark/src/main/scala/org/apache/spark/sql/almondinternals/ProgressSparkListener.scala
+++ b/modules/almond-spark/src/main/scala/org/apache/spark/sql/almondinternals/ProgressSparkListener.scala
@@ -20,7 +20,8 @@ import scala.util.Try
 final class ProgressSparkListener(
   session: SparkSession,
   keep: Boolean,
-  progress: Boolean
+  progress: Boolean,
+  useBars: Boolean
 )(implicit
   publish: OutputHandler,
   commHandler: CommHandler
@@ -58,7 +59,7 @@ final class ProgressSparkListener(
 
   def newStageElem(stageId: Int, numTasks: Int, name: String, details: String): StageElem = {
     if (!elems.contains(stageId))
-      elems.putIfAbsent(stageId, new StageElem(stageId, numTasks, keep, name, details))
+      elems.putIfAbsent(stageId, new StageElem(stageId, numTasks, keep, name, details, useBars))
     elems.get(stageId)
   }
 
diff --git a/modules/almond-spark/src/main/scala/org/apache/spark/sql/almondinternals/StageElem.scala b/modules/almond-spark/src/main/scala/org/apache/spark/sql/almondinternals/StageElem.scala
index b89d95a..37ecc59 100644
--- a/modules/almond-spark/src/main/scala/org/apache/spark/sql/almondinternals/StageElem.scala
+++ b/modules/almond-spark/src/main/scala/org/apache/spark/sql/almondinternals/StageElem.scala
@@ -9,7 +9,21 @@ import java.util.concurrent.atomic.AtomicInteger
 
 import scala.concurrent.duration.DurationInt
 
-final class StageElem(stageId: Int, numTasks: Int, keep: Boolean, name: String, details: String) {
+final class StageElem(
+  stageId: Int,
+  numTasks: Int,
+  keep: Boolean,
+  name: String,
+  details: String,
+  useBars: Boolean
+) {
+
+  private val htmlName = {
+    val sep = " at "
+    val idx = name.indexOf(sep)
+    if (idx < 0) s"<code>$name</code>"
+    else s"<code>${name.take(idx)}</code> at <code>${name.drop(idx + sep.length)}</code>"
+  }
 
   val displayId = s"stage-info-${UUID.randomUUID()}"
   val titleDisplayId = s"$displayId-title"
@@ -39,68 +53,60 @@ final class StageElem(stageId: Int, numTasks: Int, keep: Boolean, name: String,
 
   def init(cancelStageCommTargetName: String, sendInitCode: Boolean)(implicit
     publish: OutputHandler
-  ): Unit = {
-
-    if (sendInitCode)
-      publish.html(
-        s"""<script>
-           |</script>
-           |""".stripMargin
-      )
-
-    publish.html(
-      s"""<div>
-         |  $name
-         |  <a href="#" onclick="cancelStage($stageId)">(kill)</a>
-         |</div>
-         |<br>
-         |""".stripMargin,
-      id = titleDisplayId
-    )
-    // <br> above seems required put both divs on different lines in nteract
-    publish.html(
-      s"""<div class="progress">
-         |  <div class="progress-bar" role="progressbar" style="width: 0%">
-         |    0 / $numTasks
-         |  </div>
-         |</div>
-         |""".stripMargin,
-      id = displayId
-    )
-  }
+  ): Unit =
+    if (useBars)
+      publish.html(
+        s"""<div class="progress">
+           |  <div class="progress-bar" role="progressbar" style="width: 0%">
+           |    0 / $numTasks
+           |  </div>
+           |</div>
+           |""".stripMargin,
+        id = displayId
+      )
+    else
+      publish.html(
+        s"<div>$htmlName</div>",
+        id = displayId
+      )
 
   def update()(implicit publish: OutputHandler): Unit = {
     val doneTasks0 = doneTasks.get()
     val startedTasks0 = startedTasks.get()
-
-    val diff = startedTasks0 - doneTasks0
-
-    val donePct = math.round(100.0 * doneTasks0.toDouble / numTasks).toInt
-    val startedPct = math.round(100.0 * (startedTasks0 - doneTasks0).toDouble / numTasks).toInt
-
-    publish.updateHtml(
-      s"""<div class="progress">
-         |  <div class="progress-bar" role="progressbar" style="width: $donePct%">
-         |    $doneTasks0${if (diff == 0) "" else s" + $diff"} / $numTasks
-         |  </div>
-         |  <div class="progress-bar" role="progressbar" style="width: $startedPct%"></div>
-         |</div>
-         |""".stripMargin,
-      id = displayId
-    )
+    val onGoingCount = startedTasks0 - doneTasks0
+
+    val donePct = math.round(100.0 * doneTasks0.toDouble / numTasks).toInt
+
+    if (useBars) {
+      val diff = startedTasks0 - doneTasks0
+      val startedPct = math.round(100.0 * (startedTasks0 - doneTasks0).toDouble / numTasks).toInt
+
+      publish.updateHtml(
+        s"""<div class="progress">
+           |  <div class="progress-bar" role="progressbar" style="width: $donePct%">
+           |    $doneTasks0${if (diff == 0) "" else s" + $diff"} / $numTasks
+           |  </div>
+           |  <div class="progress-bar" role="progressbar" style="width: $startedPct%"></div>
+           |</div>
+           |""".stripMargin,
+        id = displayId
+      )
+    }
+    else {
+      val taskOrTasks = if (numTasks <= 1) "task" else "tasks"
+      val onGoingMessage =
+        if (onGoingCount <= 0) ""
+        else s", $onGoingCount on-going"
+      val message =
+        if (stageDone0) s"<div>$htmlName (done)</div>"
+        else s"<div>$htmlName ($donePct% of $numTasks $taskOrTasks$onGoingMessage)</div>"
+      publish.updateHtml(message, id = displayId)
+    }
 
     if (stageDone0 && !keep) {
       // Allow the user to see the completed bar before wiping it
       val delay = 3.seconds
       val runnable: Runnable =
         () =>
           try {
-            publish.updateHtml("", id = titleDisplayId)
             publish.updateHtml("", id = displayId)
+            if (useBars)
+              publish.updateHtml("", id = titleDisplayId)
           }
           catch {
             case t: Throwable =>