Skip to content

Commit

Permalink
Merge pull request #334 from alexarchambault/spark-logs-in-almond-output
Browse files Browse the repository at this point in the history
Allow to send Spark logs to the Almond kernel console output
  • Loading branch information
alexarchambault committed Jun 30, 2023
2 parents bc97612 + 7607185 commit 834339d
Show file tree
Hide file tree
Showing 4 changed files with 140 additions and 6 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package org.apache.spark.sql

import almond.api.JupyterApi
import almond.interpreter.api.{CommHandler, OutputHandler}
import ammonite.repl.api.ReplAPI
import ammonite.interp.api.InterpAPI
Expand All @@ -11,7 +12,8 @@ object NotebookSparkSession {
interpApi: InterpAPI,
replApi: ReplAPI,
publish: OutputHandler,
commHandler: CommHandler
commHandler: CommHandler,
jupyterApi: JupyterApi
): NotebookSparkSessionBuilder =
new NotebookSparkSessionBuilder

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,21 +3,25 @@ package org.apache.spark.sql.almondinternals
import java.io.File
import java.lang.{Boolean => JBoolean}

import almond.api.JupyterApi
import almond.interpreter.api.{CommHandler, OutputHandler}
import almond.display.Display.html
import ammonite.interp.api.InterpAPI
import ammonite.repl.api.ReplAPI
import org.apache.log4j.{Category, Logger, RollingFileAppender}
import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.ammonitesparkinternals.AmmoniteSparkSessionBuilder

import scala.collection.JavaConverters._
import scala.util.control.NonFatal

class NotebookSparkSessionBuilder(implicit
interpApi: InterpAPI,
replApi: ReplAPI,
publish: OutputHandler,
commHandler: CommHandler
commHandler: CommHandler,
jupyterApi: JupyterApi
) extends AmmoniteSparkSessionBuilder {

override def toString = "NotebookSparkSessionBuilder"
Expand All @@ -35,6 +39,11 @@ class NotebookSparkSessionBuilder(implicit
private var useBars0 = false

private var logsInDeveloperConsoleOpt = Option.empty[Boolean]
private var logsInKernelOutputOpt = Option.empty[Boolean]

private def defaultLogsInKernelOutput() =
Option(System.getenv("ALMOND_SPARK_LOGS_IN_KERNEL_OUTPUT"))
.exists(v => v == "1" || v == "true")

def progress(
enable: Boolean = true,
Expand All @@ -52,6 +61,11 @@ class NotebookSparkSessionBuilder(implicit
this
}

def logsInKernelOutput(enable: JBoolean = null): this.type = {
logsInKernelOutputOpt = Option[JBoolean](enable).map[Boolean](x => x)
this
}

override def getOrCreate(): SparkSession = {

def defaultLogFileOpt = {
Expand All @@ -76,13 +90,23 @@ class NotebookSparkSessionBuilder(implicit
defaultLogFileOpt
}

var sendLogOpt = Option.empty[SendLog]
var sendLogOpt = Option.empty[SendLog]
var sendLogToConsoleOpt = Option.empty[SendLogToConsole]

try {
sendLogOpt = logFileOpt.map { f =>
SendLog.start(f)
}

if (logsInKernelOutputOpt.contains(true) && defaultLogFileOpt.isEmpty)
Console.err.println(
"Warning: cannot determine log file, logs won't be sent to the kernel console."
)
if (logsInKernelOutputOpt.getOrElse(defaultLogsInKernelOutput()))
sendLogToConsoleOpt = defaultLogFileOpt.map { f =>
SendLogToConsole.start(f)
}

val session = super.getOrCreate()

val reverseProxyUrlOpt = session.sparkContext.getConf.getOption("spark.ui.reverseProxyUrl")
Expand All @@ -102,10 +126,23 @@ class NotebookSparkSessionBuilder(implicit
new ProgressSparkListener(session, keep0, progress0, useBars0)
)

session.sparkContext.addSparkListener(
new SparkListener {
override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd) = {
sendLogOpt.foreach(_.stop())
sendLogToConsoleOpt.foreach(_.stop())
}
}
)

session
}
finally
sendLogOpt.foreach(_.stop())
catch {
case NonFatal(e) =>
sendLogOpt.foreach(_.stop())
sendLogToConsoleOpt.foreach(_.stop())
throw e
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
package org.apache.spark.sql.almondinternals

import almond.api.JupyterApi

import java.io.{BufferedReader, File, FileReader, PrintStream}
import java.nio.file.{Files, StandardOpenOption}

import scala.collection.mutable.ListBuffer
import scala.concurrent.duration.{DurationInt, FiniteDuration}

final class SendLogToConsole(
f: File,
out: PrintStream,
threadName: String = "send-log-console",
lineBufferSize: Int = 10,
delay: FiniteDuration = 2.seconds
) {

@volatile private var keepReading = true

val thread: Thread =
new Thread(threadName) {
override def run(): Unit = {

var r: FileReader = null

try {
// https://stackoverflow.com/questions/557844/java-io-implementation-of-unix-linux-tail-f/558262#558262

r = new FileReader(f)
val br = new BufferedReader(r)

var lines = new ListBuffer[String]
while (keepReading) {
val line = br.readLine()
if (line == null)
// wait until there is more in the file
Thread.sleep(delay.toMillis)
else {
lines += line

while (keepReading && lines.length <= lineBufferSize && br.ready())
lines += br.readLine()

val l = lines.result()
lines.clear()

for (line <- l)
out.println(line)
}
}
}
catch {
case _: InterruptedException =>
// normal exit
case e: Throwable =>
System.err.println(s"Thread $threadName crashed")
e.printStackTrace(System.err)
}
finally
if (r != null)
r.close()
}
}

def start(): Unit = {
assert(keepReading, "Already stopped")
if (!thread.isAlive)
synchronized {
if (!thread.isAlive)
thread.start()
}
}

def stop(): Unit = {
keepReading = false
thread.interrupt()
}

}

object SendLogToConsole {

def start(f: File)(implicit jupyterApi: JupyterApi): SendLogToConsole = {

// It seems the file must exist for the reader above to get content appended to it.
if (!f.exists())
Files.write(f.toPath, Array.emptyByteArray, StandardOpenOption.CREATE)

val sendLog = new SendLogToConsole(f, jupyterApi.consoleErr)
sendLog.start()
sendLog
}

}
2 changes: 1 addition & 1 deletion project/deps.sc
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ object Versions {

def scala = Seq(scala213, scala212)

def almond = "0.14.0-RC8"
def almond = "0.14.0-RC9"
def ammonite = "3.0.0-M0-41-26a93d9c"
def jsoniterScala = "2.13.5"
}
Expand Down

0 comments on commit 834339d

Please sign in to comment.