Automatically sync dependencies with executors
alexarchambault committed Jun 12, 2023
1 parent 9fbf1aa commit 00af6a0
Showing 6 changed files with 29 additions and 15 deletions.
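What changes for users: once a `SparkSession` exists, dependencies pulled in with `import $ivy` now reach the executors without a manual `AmmoniteSparkSession.sync()` call. A sketch of a session under this commit (the Algebird coordinates come from the test updated at the bottom of this diff):

```scala
@ import $ivy.`com.twitter::algebird-spark:0.13.5`

@ // No AmmoniteSparkSession.sync() needed any more: a hook on the current
@ // Ammonite frame now passes each newly added JAR to the executors
@ // via sparkContext.addJar (see AmmoniteSparkSessionBuilder below).

@ import com.twitter.algebird.Semigroup
import com.twitter.algebird.Semigroup
```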
INTERNALS.md (2 changes: 0 additions & 2 deletions)

@@ -11,8 +11,6 @@ The executor classpath depends on the cluster manager used:

The code entered by the user during the session is compiled and results in extra classes. These are accessible via a small web server that `AmmoniteSparkSession.builder()` launches, and whose address is passed to Spark via `"spark.repl.class.uri"` in the SparkConf.

-Lastly, if extra dependencies are loaded during the session after the SparkSession is created, users should call `AmmoniteSparkSession.sync()`. This passes to Spark any JAR added since the SparkSession creation, using the `addJar` method of `SparkContext`.
-

## `AmmoniteSparkSession` vs `SparkSession`

The builder created via `AmmoniteSparkSession.builder()` extends the one from `SparkSession.builder()`. It does a number of extra things compared to it.
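The removed paragraph's mechanism survives inside the builder (see the hook added below); in isolation it amounts to a few lines. A minimal sketch of what `sync()` used to do, assuming a live session in `spark` and a hypothetical `newJarUris` sequence of JAR URIs added since the session was created:

```scala
// Sketch only: hand every newly added JAR to the Spark executors.
// newJarUris: Seq[java.net.URI] is hypothetical, standing in for whatever
// was added to the session classpath after the SparkSession was created.
for (uri <- newJarUris)
  spark.sparkContext.addJar(uri.toASCIIString)
```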
README.md (4 changes: 0 additions & 4 deletions)

@@ -72,10 +72,6 @@ You can then run Spark calculations, like
@ val n = rdd.map(_ + 1).sum()
```

-### Syncing dependencies
-
-If extra dependencies are loaded, via ``import $ivy.`…` `` after the `SparkSession` has been created, one should call `AmmoniteSparkSession.sync()` for the newly added JARs to be passed to the Spark executors.
-

## Using with standalone cluster

Simply set the master to `spark://…` when building the session, e.g.
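The session-building example that follows in the README is collapsed in this view. For illustration, a minimal standalone-cluster builder call might look as follows (the `master-host:7077` address is a placeholder, not taken from this repository):

```scala
@ val spark = {
    AmmoniteSparkSession.builder()
      .master("spark://master-host:7077") // placeholder address
      .getOrCreate()
  }
```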
NotebookSparkSession.scala

@@ -15,6 +15,10 @@ object NotebookSparkSession {
): NotebookSparkSessionBuilder =
new NotebookSparkSessionBuilder

+@deprecated(
+  "Calling this method isn't needed any more, new dependencies are passed to Spark executors automatically",
+  "0.14.0-RC1"
+)
def sync(session: SparkSession = null)(implicit replApi: ReplAPI): SparkSession =
AmmoniteSparkSession.sync(session)

AmmoniteSparkSession.scala

@@ -50,6 +50,10 @@ object AmmoniteSparkSession {
* @param session:
* [[SparkSession]] to add new JARs to
*/
+@deprecated(
+  "Calling this method isn't needed any more, new dependencies are passed to Spark executors automatically",
+  "0.14.0-RC1"
+)
def sync(session: SparkSession = null)(implicit replApi: ReplAPI): SparkSession = {

val session0 = Option(session).getOrElse {
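The annotation changes nothing at runtime; it only makes the compiler flag remaining call sites. Roughly what a caller now sees (warning wording approximated from the annotation arguments):

```scala
@ AmmoniteSparkSession.sync()
// warning: method sync in object AmmoniteSparkSession is deprecated (since 0.14.0-RC1):
// Calling this method isn't needed any more, new dependencies are passed
// to Spark executors automatically
```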
AmmoniteSparkSessionBuilder.scala

@@ -355,19 +355,22 @@ class AmmoniteSparkSessionBuilder(implicit
if (sendSparkYarnJars0 && isYarn())
config("spark.yarn.jars", sparkJars.map(_.toASCIIString).mkString(","))

-if (sendSparkJars0) {
-  val sparkJarFileSet =
-    (sparkJars.iterator ++ sparkDistClassPath.map(_.toUri).iterator ++ ignoreJars0.iterator)
-      .map(normalize)
-      .toSet
+lazy val sparkJarFileSet =
+  (sparkJars.iterator ++ sparkDistClassPath.map(_.toUri).iterator ++ ignoreJars0.iterator)
+    .map(normalize)
+    .toSet
+
+def toBeSent(jars: Seq[URI]): Seq[URI] = {
   val nonSparkJars = jars.filter(uri => !sparkJarFileSet.contains(normalize(uri)))
   val nonSparkJars0 =
     if (sendSourceJars0) nonSparkJars
     else nonSparkJars.filter(uri => !uri.toASCIIString.endsWith("-sources.jar"))
-  val finalNonSparkJars = keepJars0.foldLeft(nonSparkJars0)(_.filter(_))
-  config("spark.jars", finalNonSparkJars.map(_.toASCIIString).mkString(","))
+  keepJars0.foldLeft(nonSparkJars0)(_.filter(_))
 }
+
+if (sendSparkJars0)
+  config("spark.jars", toBeSent(jars).map(_.toASCIIString).mkString(","))

if (interpApi != null)
interpApi._compilerManager.outputDir match {
case None =>
@@ -465,6 +468,17 @@ class AmmoniteSparkSessionBuilder(implicit
if (forceProgressBars0)
AmmoniteSparkSessionBuilder.forceProgressBars(session.sparkContext)

+for (api <- Option(replApi))
+  api.sess.frames.head.addHook {
+    new ammonite.util.Frame.Hook {
+      def addClasspath(additional: Seq[URL]): Unit = {
+        val toBeSent0 = toBeSent(additional.map(_.toURI))
+        for (uri <- toBeSent0)
+          session.sparkContext.addJar(uri.toASCIIString)
+      }
+    }
+  }

session
}
}
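The added block above is the core of the commit: the current Ammonite frame accepts hooks that fire whenever the session classpath grows, and the hook reuses `toBeSent` so Spark's own JARs are not re-shipped. A standalone sketch of that hook contract, assuming only the `addClasspath` signature visible in the diff (the logging body is illustrative, not the commit's behavior):

```scala
import java.net.URL

// Hypothetical hook that just logs new classpath entries; the commit's
// real hook instead filters them through toBeSent and calls
// session.sparkContext.addJar on each remaining JAR.
val loggingHook = new ammonite.util.Frame.Hook {
  def addClasspath(additional: Seq[URL]): Unit =
    additional.foreach(url => println(s"new classpath entry: $url"))
}
```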
SparkReplTests.scala

@@ -549,8 +549,6 @@ class SparkReplTests(
"""
@ import $ivy.`com.twitter::algebird-spark:0.13.5`
-@ AmmoniteSparkSession.sync()
-
@ import com.twitter.algebird.Semigroup
import com.twitter.algebird.Semigroup
