Skip to content

Commit

Permalink
Merge pull request #302 from alexarchambault/develop
Browse files Browse the repository at this point in the history
Various enhancements
  • Loading branch information
alexarchambault authored May 15, 2023
2 parents 42bdd16 + be5077d commit ed58dae
Showing 1 changed file with 82 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -113,13 +113,29 @@ object AmmoniteSparkSessionBuilder {
Stream.empty
}

/** Resolves symbolic links in `uri` if it points at an existing local file.
 *
 * So that two URIs pointing at the same file have the same normalized URI.
 *
 * Normalization is best-effort. `uri` is returned unchanged when:
 *   - its scheme is not `file`,
 *   - it cannot be converted to a local path (e.g. an opaque URI such as `file:foo.jar`,
 *     or one carrying an authority, query or fragment),
 *   - the file it points at does not exist,
 *   - resolving the real path fails.
 *
 * @param uri URI to normalize
 * @return the URI of the real path for existing local files, `uri` itself otherwise
 */
private def normalize(uri: URI): URI =
  // URI schemes are case-insensitive; this form of the comparison also copes with a null scheme.
  if ("file".equalsIgnoreCase(uri.getScheme))
    // Both Paths.get(URI) and toRealPath() can throw. Fall back to the input rather than
    // failing the caller over a helper that is only used to de-duplicate JARs.
    scala.util.Try(Paths.get(uri))
      .filter(path => Files.exists(path))
      .map(_.toRealPath().toUri)
      .getOrElse(uri)
  else
    uri
}

class AmmoniteSparkSessionBuilder(implicit
interpApi: InterpAPI,
replApi: ReplAPI
) extends SparkSession.Builder {

import AmmoniteSparkSessionBuilder.normalize

private val options0: scala.collection.Map[String, String] = {

def fieldVia(name: String): Option[scala.collection.mutable.HashMap[String, String]] =
Expand Down Expand Up @@ -183,6 +199,25 @@ class AmmoniteSparkSessionBuilder(implicit
this
}

// Whether "spark.yarn.jars" should be set from the Spark JARs when running on YARN
// (see sendSparkYarnJars). Enabled by default.
private var sendSparkYarnJars0 = true
// Whether "spark.jars" should be set from the session JARs (see sendSparkJars).
// Enabled by default.
private var sendSparkJars0 = true
// Extra JAR URIs to leave out of "spark.jars"; grows with each call to ignoreJars.
private var ignoreJars0 = Set.empty[URI]

/** Enables or disables setting `spark.yarn.jars` when the session runs on YARN.
 *
 * @param force whether the Spark JARs should be passed via `spark.yarn.jars`
 *              (defaults to `true`)
 * @return this builder, to allow chaining
 */
def sendSparkYarnJars(force: Boolean = true): this.type = {
  this.sendSparkYarnJars0 = force
  this
}

/** Enables or disables setting `spark.jars` from the JARs loaded in the session.
 *
 * @param force whether the non-Spark session JARs should be passed via `spark.jars`
 *              (defaults to `true`)
 * @return this builder, to allow chaining
 */
def sendSparkJars(force: Boolean = true): this.type = {
  this.sendSparkJars0 = force
  this
}

/** Registers JARs that must not be listed in `spark.jars`.
 *
 * Calls accumulate: the given URIs are added to those passed in earlier calls.
 *
 * @param ignore URIs of the JARs to leave out
 * @return this builder, to allow chaining
 */
def ignoreJars(ignore: Set[URI]): this.type = {
  // `++=` on a var holding an immutable Set re-assigns the union, exactly like `x = x ++ y`.
  ignoreJars0 ++= ignore
  this
}

// Class server started for this session, if any. Stays empty until one is created
// while the session is being set up.
var classServerOpt: Option[AmmoniteClassServer] = None

private def isYarn(): Boolean =
Expand Down Expand Up @@ -271,10 +306,12 @@ class AmmoniteSparkSessionBuilder(implicit

val jars = (baseJars ++ sessionJars).distinct

val sparkJars = sys.env.get("SPARK_HOME") match {
val (sparkJars, sparkDistClassPath) = sys.env.get("SPARK_HOME") match {
case None =>
println("Getting spark JARs")
SparkDependencies.sparkJars(interpApi.repositories(), interpApi.resolutionHooks, Nil)
val sparkJars0 =
SparkDependencies.sparkJars(interpApi.repositories(), interpApi.resolutionHooks, Nil)
(sparkJars0, Nil)
case Some(sparkHome) =>
// Loose attempt at using the scala JARs already loaded in Ammonite,
// rather than ones from the spark distribution.
Expand All @@ -295,34 +332,49 @@ class AmmoniteSparkSessionBuilder(implicit
}
.map(_.toAbsolutePath.toUri)

fromBaseCp ++ fromSparkDistrib
val sparkDistClassPath = sys.env.get("SPARK_DIST_CLASSPATH")
.toList
.flatMap(_.split(File.pathSeparator).toList)
.map(Paths.get(_))

(fromBaseCp ++ fromSparkDistrib, sparkDistClassPath)
}

if (isYarn())
if (sendSparkYarnJars0 && isYarn())
config("spark.yarn.jars", sparkJars.map(_.toASCIIString).mkString(","))

config("spark.jars", jars.filterNot(sparkJars.toSet).map(_.toASCIIString).mkString(","))

interpApi._compilerManager.outputDir match {
case None =>
val classServer = new AmmoniteClassServer(
host(),
bindAddress(),
options0.get("spark.repl.class.port").fold(AmmoniteClassServer.randomPort())(_.toInt),
replApi.sess.frames
)
classServerOpt = Some(classServer)

config("spark.repl.class.uri", classServer.uri.toString)
if (sendSparkJars0) {
val sparkJarFileSet =
(sparkJars.iterator ++ sparkDistClassPath.map(_.toUri).iterator ++ ignoreJars0.iterator)
.map(normalize)
.toSet
val nonSparkJars = jars.filter(uri => !sparkJarFileSet.contains(normalize(uri)))
config("spark.jars", nonSparkJars.map(_.toASCIIString).mkString(","))
}

System.err.println(
"Warning: Ammonite output directory not specified upon launch. " +
"Relying on the spark.repl.class.uri property, which might have issues in tight network environments."
)
if (interpApi != null)
interpApi._compilerManager.outputDir match {
case None =>
if (replApi != null) {
val classServer = new AmmoniteClassServer(
host(),
bindAddress(),
options0.get("spark.repl.class.port").fold(AmmoniteClassServer.randomPort())(_.toInt),
replApi.sess.frames
)
classServerOpt = Some(classServer)

config("spark.repl.class.uri", classServer.uri.toString)

System.err.println(
"Warning: Ammonite output directory not specified upon launch. " +
"Relying on the spark.repl.class.uri property, which might have issues in tight network environments."
)
}

case Some(outputDir) =>
config("spark.repl.class.outputDir", outputDir.toAbsolutePath.toString)
}
case Some(outputDir) =>
config("spark.repl.class.outputDir", outputDir.toAbsolutePath.toString)
}

if (!options0.contains("spark.ui.port"))
config("spark.ui.port", AmmoniteClassServer.availablePortFrom(4040).toString)
Expand Down Expand Up @@ -377,11 +429,12 @@ class AmmoniteSparkSessionBuilder(implicit
println("Creating SparkSession")
val session = super.getOrCreate()

interpApi.beforeExitHooks += { v =>
if (!session.sparkContext.isStopped)
session.sparkContext.stop()
v
}
if (interpApi != null)
interpApi.beforeExitHooks += { v =>
if (!session.sparkContext.isStopped)
session.sparkContext.stop()
v
}

session.sparkContext.addSparkListener(
new SparkListener {
Expand Down

0 comments on commit ed58dae

Please sign in to comment.