Skip to content

Commit

Permalink
Allow sending Spark logs to the Almond kernel console output
Browse files Browse the repository at this point in the history
Can be useful for debugging
  • Loading branch information
alexarchambault committed Jun 29, 2023
1 parent ccaf049 commit 302f870
Show file tree
Hide file tree
Showing 4 changed files with 127 additions and 5 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package org.apache.spark.sql

import almond.api.JupyterApi
import almond.interpreter.api.{CommHandler, OutputHandler}
import ammonite.repl.api.ReplAPI
import ammonite.interp.api.InterpAPI
Expand All @@ -11,7 +12,8 @@ object NotebookSparkSession {
interpApi: InterpAPI,
replApi: ReplAPI,
publish: OutputHandler,
commHandler: CommHandler
commHandler: CommHandler,
jupyterApi: JupyterApi
): NotebookSparkSessionBuilder =
new NotebookSparkSessionBuilder

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package org.apache.spark.sql.almondinternals
import java.io.File
import java.lang.{Boolean => JBoolean}

import almond.api.JupyterApi
import almond.interpreter.api.{CommHandler, OutputHandler}
import almond.display.Display.html
import ammonite.interp.api.InterpAPI
Expand All @@ -19,7 +20,8 @@ class NotebookSparkSessionBuilder(implicit
interpApi: InterpAPI,
replApi: ReplAPI,
publish: OutputHandler,
commHandler: CommHandler
commHandler: CommHandler,
jupyterApi: JupyterApi
) extends AmmoniteSparkSessionBuilder {

override def toString = "NotebookSparkSessionBuilder"
Expand All @@ -37,6 +39,11 @@ class NotebookSparkSessionBuilder(implicit
private var useBars0 = false

private var logsInDeveloperConsoleOpt = Option.empty[Boolean]
private var logsInKernelOutputOpt = Option.empty[Boolean]

/** Whether Spark logs should be forwarded to the kernel console by default:
  * enabled iff the ALMOND_SPARK_LOGS_IN_KERNEL_OUTPUT environment variable
  * is set to "1" or "true".
  */
private def defaultLogsInKernelOutput() = {
  val fromEnv = Option(System.getenv("ALMOND_SPARK_LOGS_IN_KERNEL_OUTPUT"))
  fromEnv.contains("1") || fromEnv.contains("true")
}

def progress(
enable: Boolean = true,
Expand All @@ -54,6 +61,11 @@ class NotebookSparkSessionBuilder(implicit
this
}

/** Forces forwarding of Spark logs to the kernel console output on or off.
  *
  * @param enable `true` or `false` to force the behavior; `null` (the default)
  *               leaves the automatic default in place
  */
def logsInKernelOutput(enable: JBoolean = null): this.type = {
  logsInKernelOutputOpt =
    if (enable == null) None
    else Some(enable.booleanValue())
  this
}

override def getOrCreate(): SparkSession = {

def defaultLogFileOpt = {
Expand All @@ -78,13 +90,23 @@ class NotebookSparkSessionBuilder(implicit
defaultLogFileOpt
}

var sendLogOpt = Option.empty[SendLog]
var sendLogOpt = Option.empty[SendLog]
var sendLogToConsoleOpt = Option.empty[SendLogToConsole]

try {
sendLogOpt = logFileOpt.map { f =>
SendLog.start(f)
}

if (logsInKernelOutputOpt.contains(true) && defaultLogFileOpt.isEmpty)
Console.err.println(
"Warning: cannot determine log file, logs won't be sent to the kernel console."
)
if (logsInKernelOutputOpt.getOrElse(defaultLogsInKernelOutput()))
sendLogToConsoleOpt = defaultLogFileOpt.map { f =>
SendLogToConsole.start(f)
}

val session = super.getOrCreate()

val reverseProxyUrlOpt = session.sparkContext.getConf.getOption("spark.ui.reverseProxyUrl")
Expand All @@ -106,8 +128,10 @@ class NotebookSparkSessionBuilder(implicit

session.sparkContext.addSparkListener(
new SparkListener {
override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd) =
override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd) = {
sendLogOpt.foreach(_.stop())
sendLogToConsoleOpt.foreach(_.stop())
}
}
)

Expand All @@ -116,6 +140,7 @@ class NotebookSparkSessionBuilder(implicit
catch {
case NonFatal(e) =>
sendLogOpt.foreach(_.stop())
sendLogToConsoleOpt.foreach(_.stop())
throw e
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
package org.apache.spark.sql.almondinternals

import almond.api.JupyterApi

import java.io.{BufferedReader, File, FileReader, PrintStream}
import java.nio.file.{Files, StandardOpenOption}

import scala.collection.mutable.ListBuffer
import scala.concurrent.duration.{DurationInt, FiniteDuration}
import scala.util.control.NonFatal

/** Tails a log file and forwards its lines to the given [[java.io.PrintStream]].
  *
  * A background thread polls `f` for new content, drains up to roughly
  * `lineBufferSize` lines at a time, and prints them to `out`. Call [[start]]
  * to launch the thread and [[stop]] to terminate it; a stopped instance
  * cannot be restarted.
  *
  * @param f              log file to tail (must exist for appended content to be seen)
  * @param out            stream the log lines are written to
  * @param threadName     name of the background reader thread
  * @param lineBufferSize soft cap on the number of lines drained per iteration
  * @param delay          pause between polls when no new content is available
  */
final class SendLogToConsole(
  f: File,
  out: PrintStream,
  threadName: String = "send-log-console",
  lineBufferSize: Int = 10,
  delay: FiniteDuration = 2.seconds
) {

  // Written from stop(), read from the reader thread — hence @volatile.
  @volatile private var keepReading = true

  val thread: Thread =
    new Thread(threadName) {
      override def run(): Unit = {

        var r: FileReader = null

        try {
          // https://stackoverflow.com/questions/557844/java-io-implementation-of-unix-linux-tail-f/558262#558262

          r = new FileReader(f)
          val br = new BufferedReader(r)

          // fix: was `var` — the buffer is mutated in place, never reassigned
          val lines = new ListBuffer[String]
          while (keepReading) {
            val line = br.readLine()
            if (line == null)
              // wait until there is more in the file
              Thread.sleep(delay.toMillis)
            else {
              lines += line

              // drain whatever is immediately available, up to the buffer size
              // (readLine cannot return null here: ready() guarantees input)
              while (keepReading && lines.length <= lineBufferSize && br.ready())
                lines += br.readLine()

              val chunk = lines.result()
              lines.clear()

              // fix: loop variable renamed — it shadowed the outer `line`
              for (l <- chunk)
                out.println(l)
            }
          }
        }
        catch {
          case _: InterruptedException =>
          // normal exit, triggered by stop()
          // fix: was `case e: Throwable` — let fatal errors (OOM, ThreadDeath, …) propagate
          case NonFatal(e) =>
            System.err.println(s"Thread $threadName crashed")
            e.printStackTrace(System.err)
        }
        finally
          // closing the FileReader also releases the BufferedReader wrapped around it
          if (r != null)
            r.close()
      }
    }

  /** Launches the reader thread (idempotent; must not be called after [[stop]]). */
  def start(): Unit = {
    assert(keepReading, "Already stopped")
    if (!thread.isAlive)
      synchronized {
        if (!thread.isAlive)
          thread.start()
      }
  }

  /** Asks the reader thread to exit, interrupting an in-progress sleep or read. */
  def stop(): Unit = {
    keepReading = false
    thread.interrupt()
  }

}

object SendLogToConsole {

  /** Builds a [[SendLogToConsole]] forwarding the content of `f` to the kernel
    * stderr console, and starts its background reader thread.
    *
    * @param f log file to tail; created empty if it does not exist yet
    */
  def start(f: File)(implicit jupyterApi: JupyterApi): SendLogToConsole = {

    val path = f.toPath
    // The tailing reader only sees content appended to an existing file,
    // so make sure the file is there before we start reading it.
    if (!Files.exists(path))
      Files.write(path, Array.emptyByteArray, StandardOpenOption.CREATE)

    val consoleLog = new SendLogToConsole(f, jupyterApi.consoleErr)
    consoleLog.start()
    consoleLog
  }

}
2 changes: 1 addition & 1 deletion project/deps.sc
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ object Versions {

def scala = Seq(scala213, scala212)

def almond = "0.14.0-RC6"
def almond = "0.14.0-RC9"
def ammonite = "3.0.0-M0-40-d95c3b3d"
def jsoniterScala = "2.13.5"
}
Expand Down

0 comments on commit 302f870

Please sign in to comment.