Skip to content

Commit

Permalink
Merge pull request #321 from alexarchambault/add-dependency-hook
Browse files Browse the repository at this point in the history
Automatically sync dependencies with executors
  • Loading branch information
alexarchambault committed Jun 12, 2023
2 parents 2fdf947 + 00af6a0 commit 8f99209
Show file tree
Hide file tree
Showing 8 changed files with 36 additions and 20 deletions.
2 changes: 0 additions & 2 deletions INTERNALS.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,6 @@ The executor classpath depends on the cluster manager used:

The code entered by the user during the session is compiled and results in extra classes. These are accessible via a small web server that `AmmoniteSparkSession.builder()` launches, and whose address is passed to Spark via `"spark.repl.class.uri"` in the SparkConf.

Lastly, if extra dependencies are loaded during the session after the SparkSession is created, users should call `AmmoniteSparkSession.sync()`. This passes to Spark any JAR added since the SparkSession creation, using the `addJar` method of `SparkContext`.

## `AmmoniteSparkSession` vs `SparkSession`

The builder created via `AmmoniteSparkSession.builder()` extends the one from `SparkSession.builder()`. It does a number of things more compared to it.
Expand Down
4 changes: 0 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -72,10 +72,6 @@ You can then run Spark calculations, like
@ val n = rdd.map(_ + 1).sum()
```

### Syncing dependencies

If extra dependencies are loaded via ``import $ivy.`…` `` after the `SparkSession` has been created, one should call `AmmoniteSparkSession.sync()` so that the newly added JARs are passed to the Spark executors.

## Using with standalone cluster

Simply set the master to `spark://…` when building the session, e.g.
Expand Down
4 changes: 3 additions & 1 deletion build.sc
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,9 @@ class AlmondSpark(val crossScalaVersion: String) extends CrossSbtModule with Amm
Deps.ammoniteReplApi,
Deps.jsoniterScalaMacros,
Deps.log4j2,
Deps.scalaKernelApi,
Deps.scalaKernelApi
.exclude(("com.lihaoyi", s"ammonite-compiler_$crossScalaVersion"))
.exclude(("com.lihaoyi", s"ammonite-repl-api_$crossScalaVersion")),
Deps.sparkSql(scalaVersion())
)
def repositoriesTask = T.task {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@ object NotebookSparkSession {
): NotebookSparkSessionBuilder =
new NotebookSparkSessionBuilder

// Deprecated forwarder kept for source compatibility: per this commit, newly
// added dependencies are now passed to Spark executors automatically via a
// classpath hook, so callers no longer need to invoke sync() manually.
// NOTE(review): this simply delegates to AmmoniteSparkSession.sync — the
// actual deprecation/removal of that implementation happens there.
@deprecated(
"Calling this method isn't needed any more, new dependencies are passed to Spark executors automatically",
"0.14.0-RC1"
)
def sync(session: SparkSession = null)(implicit replApi: ReplAPI): SparkSession =
AmmoniteSparkSession.sync(session)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,10 @@ object AmmoniteSparkSession {
* @param session:
* [[SparkSession]] to add new JARs to
*/
@deprecated(
"Calling this method isn't needed any more, new dependencies are passed to Spark executors automatically",
"0.14.0-RC1"
)
def sync(session: SparkSession = null)(implicit replApi: ReplAPI): SparkSession = {

val session0 = Option(session).getOrElse {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -355,19 +355,22 @@ class AmmoniteSparkSessionBuilder(implicit
if (sendSparkYarnJars0 && isYarn())
config("spark.yarn.jars", sparkJars.map(_.toASCIIString).mkString(","))

if (sendSparkJars0) {
val sparkJarFileSet =
(sparkJars.iterator ++ sparkDistClassPath.map(_.toUri).iterator ++ ignoreJars0.iterator)
.map(normalize)
.toSet
lazy val sparkJarFileSet =
(sparkJars.iterator ++ sparkDistClassPath.map(_.toUri).iterator ++ ignoreJars0.iterator)
.map(normalize)
.toSet

def toBeSent(jars: Seq[URI]): Seq[URI] = {
val nonSparkJars = jars.filter(uri => !sparkJarFileSet.contains(normalize(uri)))
val nonSparkJars0 =
if (sendSourceJars0) nonSparkJars
else nonSparkJars.filter(uri => !uri.toASCIIString.endsWith("-sources.jar"))
val finalNonSparkJars = keepJars0.foldLeft(nonSparkJars0)(_.filter(_))
config("spark.jars", finalNonSparkJars.map(_.toASCIIString).mkString(","))
keepJars0.foldLeft(nonSparkJars0)(_.filter(_))
}

if (sendSparkJars0)
config("spark.jars", toBeSent(jars).map(_.toASCIIString).mkString(","))

if (interpApi != null)
interpApi._compilerManager.outputDir match {
case None =>
Expand Down Expand Up @@ -465,6 +468,17 @@ class AmmoniteSparkSessionBuilder(implicit
if (forceProgressBars0)
AmmoniteSparkSessionBuilder.forceProgressBars(session.sparkContext)

for (api <- Option(replApi))
api.sess.frames.head.addHook {
new ammonite.util.Frame.Hook {
def addClasspath(additional: Seq[URL]): Unit = {
val toBeSent0 = toBeSent(additional.map(_.toURI))
for (uri <- toBeSent0)
session.sparkContext.addJar(uri.toASCIIString)
}
}
}

session
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -549,8 +549,6 @@ class SparkReplTests(
"""
@ import $ivy.`com.twitter::algebird-spark:0.13.5`
@ AmmoniteSparkSession.sync()
@ import com.twitter.algebird.Semigroup
import com.twitter.algebird.Semigroup
Expand Down
8 changes: 4 additions & 4 deletions project/deps.sc
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,14 @@ object Versions {

def scala = Seq(scala213, scala212)

def ammonite = "3.0.0-M0-32-96e851cb"
def ammonite = "3.0.0-M0-38-dd60a0b5"
def jsoniterScala = "2.13.5"
}

object Deps {
def ammoniteCompiler = ivy"com.lihaoyi:::ammonite-compiler:${Versions.ammonite}"
def ammoniteReplApi = ivy"com.lihaoyi:::ammonite-repl-api:${Versions.ammonite}"
def ammoniteRepl = ivy"com.lihaoyi:::ammonite-repl:${Versions.ammonite}"
def ammoniteCompiler = ivy"sh.almond.tmp.ammonite:::ammonite-compiler:${Versions.ammonite}"
def ammoniteReplApi = ivy"sh.almond.tmp.ammonite:::ammonite-repl-api:${Versions.ammonite}"
def ammoniteRepl = ivy"sh.almond.tmp.ammonite:::ammonite-repl:${Versions.ammonite}"

def classPathUtil = ivy"io.get-coursier::class-path-util:0.1.4"
def jettyServer = ivy"org.eclipse.jetty:jetty-server:9.4.51.v20230217"
Expand Down

0 comments on commit 8f99209

Please sign in to comment.