diff --git a/modules/almond-spark/src/main/scala/org/apache/spark/sql/almondinternals/NotebookSparkSessionBuilder.scala b/modules/almond-spark/src/main/scala/org/apache/spark/sql/almondinternals/NotebookSparkSessionBuilder.scala
index 00cf47d..bc93ebd 100644
--- a/modules/almond-spark/src/main/scala/org/apache/spark/sql/almondinternals/NotebookSparkSessionBuilder.scala
+++ b/modules/almond-spark/src/main/scala/org/apache/spark/sql/almondinternals/NotebookSparkSessionBuilder.scala
@@ -20,14 +20,28 @@ class NotebookSparkSessionBuilder(implicit
commHandler: CommHandler
) extends AmmoniteSparkSessionBuilder {
+ override def printLine(line: String, htmlLine: String = null): Unit =
+ if (htmlLine == null)
+ publish.html(line + System.lineSeparator())
+ else
+ publish.html(htmlLine)
+
+ disableProgressBars(true)
+
private var progress0 = true
- private var keep0 = true
+ private var keep0 = false
+ private var useBars0 = false
private var logsInDeveloperConsoleOpt = Option.empty[Boolean]
- def progress(enable: Boolean = true, keep: Boolean = true): this.type = {
+ def progress(
+ enable: Boolean = true,
+ keep: Boolean = false,
+ useBars: Boolean = false
+ ): this.type = {
progress0 = enable
keep0 = keep
+ useBars0 = useBars
this
}
@@ -73,7 +87,7 @@ class NotebookSparkSessionBuilder(implicit
html(s"""Spark UI""")
session.sparkContext.addSparkListener(
- new ProgressSparkListener(session, keep0, progress0)
+ new ProgressSparkListener(session, keep0, progress0, useBars0)
)
session
diff --git a/modules/almond-spark/src/main/scala/org/apache/spark/sql/almondinternals/ProgressSparkListener.scala b/modules/almond-spark/src/main/scala/org/apache/spark/sql/almondinternals/ProgressSparkListener.scala
index 130c036..f01ccae 100644
--- a/modules/almond-spark/src/main/scala/org/apache/spark/sql/almondinternals/ProgressSparkListener.scala
+++ b/modules/almond-spark/src/main/scala/org/apache/spark/sql/almondinternals/ProgressSparkListener.scala
@@ -20,7 +20,8 @@ import scala.util.Try
final class ProgressSparkListener(
session: SparkSession,
keep: Boolean,
- progress: Boolean
+ progress: Boolean,
+ useBars: Boolean
)(implicit
publish: OutputHandler,
commHandler: CommHandler
@@ -58,7 +59,7 @@ final class ProgressSparkListener(
def newStageElem(stageId: Int, numTasks: Int, name: String, details: String): StageElem = {
if (!elems.contains(stageId))
- elems.putIfAbsent(stageId, new StageElem(stageId, numTasks, keep, name, details))
+ elems.putIfAbsent(stageId, new StageElem(stageId, numTasks, keep, name, details, useBars))
elems.get(stageId)
}
diff --git a/modules/almond-spark/src/main/scala/org/apache/spark/sql/almondinternals/StageElem.scala b/modules/almond-spark/src/main/scala/org/apache/spark/sql/almondinternals/StageElem.scala
index c5cafef..37ecc59 100644
--- a/modules/almond-spark/src/main/scala/org/apache/spark/sql/almondinternals/StageElem.scala
+++ b/modules/almond-spark/src/main/scala/org/apache/spark/sql/almondinternals/StageElem.scala
@@ -1,11 +1,29 @@
package org.apache.spark.sql.almondinternals
+import almond.interpreter.api.OutputHandler
+
+import java.util.concurrent.ThreadFactory
+import java.util.concurrent.ScheduledThreadPoolExecutor
import java.util.UUID
import java.util.concurrent.atomic.AtomicInteger
-import almond.interpreter.api.OutputHandler
-
-final class StageElem(stageId: Int, numTasks: Int, keep: Boolean, name: String, details: String) {
+import scala.concurrent.duration.DurationInt
+
+final class StageElem(
+ stageId: Int,
+ numTasks: Int,
+ keep: Boolean,
+ name: String,
+ details: String,
+ useBars: Boolean
+) {
+
+ private val htmlName = {
+ val sep = " at "
+ val idx = name.indexOf(sep)
+ if (idx < 0) s"<code>$name</code>"
+ else s"<code>${name.take(idx)}</code> at <code>${name.drop(idx + sep.length)}</code>"
+ }
val displayId = s"stage-info-${UUID.randomUUID()}"
val titleDisplayId = s"$displayId-title"
@@ -35,74 +53,99 @@ final class StageElem(stageId: Int, numTasks: Int, keep: Boolean, name: String,
def init(cancelStageCommTargetName: String, sendInitCode: Boolean)(implicit
publish: OutputHandler
- ): Unit = {
-
- if (sendInitCode)
+ ): Unit =
+ if (useBars)
publish.html(
- s"""
- """.stripMargin
+ s"""
SparkSession
Builder
options (options
field not found)"
+ )
Map.empty[String, String]
}
}
@@ -164,7 +193,10 @@ class AmmoniteSparkSessionBuilder(implicit
envVar <- AmmoniteSparkSessionBuilder.confEnvVars
path <- sys.env.get(envVar)
} {
- println(s"Loading spark conf from ${AmmoniteSparkSessionBuilder.prettyDir(path)}")
+ printLine(
+ s"Loading spark conf from ${AmmoniteSparkSessionBuilder.prettyDir(path)}",
+ s"Loading spark conf from <code>${AmmoniteSparkSessionBuilder.prettyDir(path)}</code>"
+ )
loadConf(path)
}
@@ -193,13 +225,19 @@ class AmmoniteSparkSessionBuilder(implicit
this
}
- private var forceProgressBars0 = false
+ private var forceProgressBars0 = false
+ private var disableProgressBars0 = false
def progressBars(force: Boolean = true): this.type = {
forceProgressBars0 = force
this
}
+ def disableProgressBars(disable: Boolean = true): this.type = {
+ disableProgressBars0 = disable
+ this
+ }
+
private var sendSparkYarnJars0 = true
private var sendSparkJars0 = true
private var ignoreJars0 = Set.empty[URI]
@@ -261,7 +299,10 @@ class AmmoniteSparkSessionBuilder(implicit
deps = ("spark-yarn", SparkDependencies.sparkYarnDependency) :: deps
if (deps.nonEmpty) {
- println(s"Loading ${deps.map(_._1).mkString(", ")}")
+ printLine(
+ s"Loading ${deps.map(_._1).mkString(", ")}",
+ s"Loading ${deps.map("<code>" + _._1 + "</code>").mkString(", ")}"
+ )
interpApi.load.ivy(deps.map(_._2): _*)
}
}
@@ -321,7 +362,7 @@ class AmmoniteSparkSessionBuilder(implicit
val (sparkJars, sparkDistClassPath) = sys.env.get("SPARK_HOME") match {
case None =>
- println("Getting spark JARs")
+ printLine("Getting spark JARs")
val sparkJars0 =
SparkDependencies.sparkJars(interpApi.repositories(), interpApi.resolutionHooks, Nil)
(sparkJars0, Nil)
@@ -416,13 +457,16 @@ class AmmoniteSparkSessionBuilder(implicit
hadoopConfDirOpt match {
case None =>
- println(
+ printLine(
"Warning: core-site.xml not found in the classpath, and no hadoop conf found via HADOOP_CONF_DIR, " +
- "YARN_CONF_DIR, or at /etc/hadoop/conf"
+ "YARN_CONF_DIR, or at /etc/hadoop/conf",
+ "Warning: <code>core-site.xml</code> not found in the classpath, and no hadoop conf found via <code>HADOOP_CONF_DIR</code>, " +
+ "<code>YARN_CONF_DIR</code>, or at <code>/etc/hadoop/conf</code>"
)
case Some(dir) =>
- println(
- s"Adding Hadoop conf dir ${AmmoniteSparkSessionBuilder.prettyDir(dir)} to classpath"
+ printLine(
+ s"Adding Hadoop conf dir ${AmmoniteSparkSessionBuilder.prettyDir(dir)} to classpath",
+ s"Adding Hadoop conf dir <code>${AmmoniteSparkSessionBuilder.prettyDir(dir)}</code> to classpath"
)
interpApi.load.cp(os.Path(dir))
}
@@ -434,18 +478,23 @@ class AmmoniteSparkSessionBuilder(implicit
hiveConfDirOpt match {
case None =>
- println(
- "Warning: hive-site.xml not found in the classpath, and no Hive conf found via HIVE_CONF_DIR"
+ printLine(
+ "Warning: hive-site.xml not found in the classpath, and no Hive conf found via HIVE_CONF_DIR",
+ "Warning: <code>hive-site.xml</code> not found in the classpath, and no Hive conf found via <code>HIVE_CONF_DIR</code>"
)
case Some(dir) =>
- println(
- s"Adding Hive conf dir ${AmmoniteSparkSessionBuilder.prettyDir(dir)} to classpath"
+ printLine(
+ s"Adding Hive conf dir ${AmmoniteSparkSessionBuilder.prettyDir(dir)} to classpath",
+ s"Adding Hive conf dir <code>${AmmoniteSparkSessionBuilder.prettyDir(dir)}</code> to classpath"
)
interpApi.load.cp(os.Path(dir))
}
}
- println("Creating SparkSession")
+ if (disableProgressBars0)
+ config("spark.ui.showConsoleProgress", false)
+
+ printLine("Creating SparkSession")
val session = super.getOrCreate()
if (interpApi != null)
@@ -467,6 +516,8 @@ class AmmoniteSparkSessionBuilder(implicit
if (forceProgressBars0)
AmmoniteSparkSessionBuilder.forceProgressBars(session.sparkContext)
+ if (disableProgressBars0)
+ AmmoniteSparkSessionBuilder.disableProgressBars(session.sparkContext)
for (api <- Option(replApi))
api.sess.frames.head.addHook {