From e8b2591e6f82ced943af3ac96e291a9e752813bf Mon Sep 17 00:00:00 2001 From: Alex Archambault Date: Thu, 29 Jun 2023 13:30:22 +0200 Subject: [PATCH 1/2] Fix send-log-in-developer-console stuff Logs were sent only when the session was being created, not after that --- .../NotebookSparkSessionBuilder.scala | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) diff --git a/modules/almond-spark/src/main/scala/org/apache/spark/sql/almondinternals/NotebookSparkSessionBuilder.scala b/modules/almond-spark/src/main/scala/org/apache/spark/sql/almondinternals/NotebookSparkSessionBuilder.scala index dee223c..ecc182a 100644 --- a/modules/almond-spark/src/main/scala/org/apache/spark/sql/almondinternals/NotebookSparkSessionBuilder.scala +++ b/modules/almond-spark/src/main/scala/org/apache/spark/sql/almondinternals/NotebookSparkSessionBuilder.scala @@ -8,10 +8,12 @@ import almond.display.Display.html import ammonite.interp.api.InterpAPI import ammonite.repl.api.ReplAPI import org.apache.log4j.{Category, Logger, RollingFileAppender} +import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd} import org.apache.spark.sql.SparkSession import org.apache.spark.sql.ammonitesparkinternals.AmmoniteSparkSessionBuilder import scala.collection.JavaConverters._ +import scala.util.control.NonFatal class NotebookSparkSessionBuilder(implicit interpApi: InterpAPI, @@ -102,10 +104,20 @@ class NotebookSparkSessionBuilder(implicit new ProgressSparkListener(session, keep0, progress0, useBars0) ) + session.sparkContext.addSparkListener( + new SparkListener { + override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd) = + sendLogOpt.foreach(_.stop()) + } + ) + session } - finally - sendLogOpt.foreach(_.stop()) + catch { + case NonFatal(e) => + sendLogOpt.foreach(_.stop()) + throw e + } } } From 7607185ab9bcc03740d17bea0c80c472719aded4 Mon Sep 17 00:00:00 2001 From: Alex Archambault Date: Thu, 29 Jun 2023 13:31:57 +0200 Subject: [PATCH 2/2] Allow to send Spark logs to the Almond kernel console output Can be useful for debugging --- .../spark/sql/NotebookSparkSession.scala | 4 +- .../NotebookSparkSessionBuilder.scala | 31 +++++- .../almondinternals/SendLogToConsole.scala | 95 +++++++++++++++++++ project/deps.sc | 2 +- 4 files changed, 127 insertions(+), 5 deletions(-) create mode 100644 modules/almond-spark/src/main/scala/org/apache/spark/sql/almondinternals/SendLogToConsole.scala diff --git a/modules/almond-spark/src/main/scala/org/apache/spark/sql/NotebookSparkSession.scala b/modules/almond-spark/src/main/scala/org/apache/spark/sql/NotebookSparkSession.scala index eb3608a..e12b08c 100644 --- a/modules/almond-spark/src/main/scala/org/apache/spark/sql/NotebookSparkSession.scala +++ b/modules/almond-spark/src/main/scala/org/apache/spark/sql/NotebookSparkSession.scala @@ -1,5 +1,6 @@ package org.apache.spark.sql +import almond.api.JupyterApi import almond.interpreter.api.{CommHandler, OutputHandler} import ammonite.repl.api.ReplAPI import ammonite.interp.api.InterpAPI @@ -11,7 +12,8 @@ object NotebookSparkSession { interpApi: InterpAPI, replApi: ReplAPI, publish: OutputHandler, - commHandler: CommHandler + commHandler: CommHandler, + jupyterApi: JupyterApi ): NotebookSparkSessionBuilder = new NotebookSparkSessionBuilder diff --git a/modules/almond-spark/src/main/scala/org/apache/spark/sql/almondinternals/NotebookSparkSessionBuilder.scala b/modules/almond-spark/src/main/scala/org/apache/spark/sql/almondinternals/NotebookSparkSessionBuilder.scala index ecc182a..758fac6 100644 --- a/modules/almond-spark/src/main/scala/org/apache/spark/sql/almondinternals/NotebookSparkSessionBuilder.scala +++ b/modules/almond-spark/src/main/scala/org/apache/spark/sql/almondinternals/NotebookSparkSessionBuilder.scala @@ -3,6 +3,7 @@ package org.apache.spark.sql.almondinternals import java.io.File import java.lang.{Boolean => JBoolean} +import almond.api.JupyterApi import almond.interpreter.api.{CommHandler, OutputHandler} import almond.display.Display.html import ammonite.interp.api.InterpAPI @@ -19,7 +20,8 @@ class NotebookSparkSessionBuilder(implicit interpApi: InterpAPI, replApi: ReplAPI, publish: OutputHandler, - commHandler: CommHandler + commHandler: CommHandler, + jupyterApi: JupyterApi ) extends AmmoniteSparkSessionBuilder { override def toString = "NotebookSparkSessionBuilder" @@ -37,6 +39,11 @@ class NotebookSparkSessionBuilder(implicit private var useBars0 = false private var logsInDeveloperConsoleOpt = Option.empty[Boolean] + private var logsInKernelOutputOpt = Option.empty[Boolean] + + private def defaultLogsInKernelOutput() = + Option(System.getenv("ALMOND_SPARK_LOGS_IN_KERNEL_OUTPUT")) + .exists(v => v == "1" || v == "true") def progress( enable: Boolean = true, @@ -54,6 +61,11 @@ class NotebookSparkSessionBuilder(implicit this } + def logsInKernelOutput(enable: JBoolean = null): this.type = { + logsInKernelOutputOpt = Option[JBoolean](enable).map[Boolean](x => x) + this + } + override def getOrCreate(): SparkSession = { def defaultLogFileOpt = { @@ -78,13 +90,23 @@ class NotebookSparkSessionBuilder(implicit defaultLogFileOpt } - var sendLogOpt = Option.empty[SendLog] + var sendLogOpt = Option.empty[SendLog] + var sendLogToConsoleOpt = Option.empty[SendLogToConsole] try { sendLogOpt = logFileOpt.map { f => SendLog.start(f) } + if (logsInKernelOutputOpt.contains(true) && defaultLogFileOpt.isEmpty) + Console.err.println( + "Warning: cannot determine log file, logs won't be sent to the kernel console." + ) + if (logsInKernelOutputOpt.getOrElse(defaultLogsInKernelOutput())) + sendLogToConsoleOpt = defaultLogFileOpt.map { f => + SendLogToConsole.start(f) + } + val session = super.getOrCreate() val reverseProxyUrlOpt = session.sparkContext.getConf.getOption("spark.ui.reverseProxyUrl") @@ -106,8 +128,10 @@ class NotebookSparkSessionBuilder(implicit session.sparkContext.addSparkListener( new SparkListener { - override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd) = + override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd) = { sendLogOpt.foreach(_.stop()) + sendLogToConsoleOpt.foreach(_.stop()) + } } ) @@ -116,6 +140,7 @@ class NotebookSparkSessionBuilder(implicit catch { case NonFatal(e) => sendLogOpt.foreach(_.stop()) + sendLogToConsoleOpt.foreach(_.stop()) throw e } } diff --git a/modules/almond-spark/src/main/scala/org/apache/spark/sql/almondinternals/SendLogToConsole.scala b/modules/almond-spark/src/main/scala/org/apache/spark/sql/almondinternals/SendLogToConsole.scala new file mode 100644 index 0000000..a69aba5 --- /dev/null +++ b/modules/almond-spark/src/main/scala/org/apache/spark/sql/almondinternals/SendLogToConsole.scala @@ -0,0 +1,95 @@ +package org.apache.spark.sql.almondinternals + +import almond.api.JupyterApi + +import java.io.{BufferedReader, File, FileReader, PrintStream} +import java.nio.file.{Files, StandardOpenOption} + +import scala.collection.mutable.ListBuffer +import scala.concurrent.duration.{DurationInt, FiniteDuration} + +final class SendLogToConsole( + f: File, + out: PrintStream, + threadName: String = "send-log-console", + lineBufferSize: Int = 10, + delay: FiniteDuration = 2.seconds +) { + + @volatile private var keepReading = true + + val thread: Thread = + new Thread(threadName) { + override def run(): Unit = { + + var r: FileReader = null + + try { + // https://stackoverflow.com/questions/557844/java-io-implementation-of-unix-linux-tail-f/558262#558262 + + r = new FileReader(f) + val br = new BufferedReader(r) + + var lines = new ListBuffer[String] + while (keepReading) { + val line = br.readLine() + if (line == null) + // wait until there is more in the file + Thread.sleep(delay.toMillis) + else { + lines += line + + while (keepReading && lines.length <= lineBufferSize && br.ready()) + lines += br.readLine() + + val l = lines.result() + lines.clear() + + for (line <- l) + out.println(line) + } + } + } + catch { + case _: InterruptedException => + // normal exit + case e: Throwable => + System.err.println(s"Thread $threadName crashed") + e.printStackTrace(System.err) + } + finally + if (r != null) + r.close() + } + } + + def start(): Unit = { + assert(keepReading, "Already stopped") + if (!thread.isAlive) + synchronized { + if (!thread.isAlive) + thread.start() + } + } + + def stop(): Unit = { + keepReading = false + thread.interrupt() + } + +} + +object SendLogToConsole { + + def start(f: File)(implicit jupyterApi: JupyterApi): SendLogToConsole = { + + // It seems the file must exist for the reader above to get content appended to it. + if (!f.exists()) + Files.write(f.toPath, Array.emptyByteArray, StandardOpenOption.CREATE) + + val sendLog = new SendLogToConsole(f, jupyterApi.consoleErr) + sendLog.start() + sendLog + } + +} diff --git a/project/deps.sc b/project/deps.sc index f073c55..c53c42d 100644 --- a/project/deps.sc +++ b/project/deps.sc @@ -6,7 +6,7 @@ object Versions { def scala = Seq(scala213, scala212) - def almond = "0.14.0-RC8" + def almond = "0.14.0-RC9" def ammonite = "3.0.0-M0-41-26a93d9c" def jsoniterScala = "2.13.5" }