diff --git a/INTERNALS.md b/INTERNALS.md
index bae6c2e..a5e2433 100644
--- a/INTERNALS.md
+++ b/INTERNALS.md
@@ -11,8 +11,6 @@ The executor classpath depends on the cluster manager used:
 
 The code entered by the user during the session is compiled and results in extra classes. These are accessible via a small web server that `AmmoniteSparkSession.builder()` launches, and whose address is passed to Spark via `"spark.repl.class.uri"` in the SparkConf.
 
-Lastly, if extra dependencies are loaded during the session after the SparkSession is created, users should call `AmmoniteSparkSession.sync()`. This passes to Spark any JAR added since the SparkSession creation, using the `addJar` method of `SparkContext`.
-
 ## `AmmoniteSparkSession` vs `SparkSession`
 
 The builder created via `AmmoniteSparkSession.builder()` extends the one from `SparkSession.builder()`. It does a number of things more compared to it.
diff --git a/README.md b/README.md
index 9d79591..24c320c 100644
--- a/README.md
+++ b/README.md
@@ -72,10 +72,6 @@ You can then run Spark calculations, like
 @ val n = rdd.map(_ + 1).sum()
 ```
 
-### Syncing dependencies
-
-If extra dependencies are loaded, via ``import $ivy.`…` `` after the `SparkSession` has been created, one should call `AmmoniteSparkSession.sync()` for the newly added JARs to be passed to the Spark executors.
-
 ## Using with standalone cluster
 
 Simply set the master to `spark://…` when building the session, e.g.
diff --git a/modules/almond-spark/src/main/scala/org/apache/spark/sql/NotebookSparkSession.scala b/modules/almond-spark/src/main/scala/org/apache/spark/sql/NotebookSparkSession.scala
index 138f7e6..eb3608a 100644
--- a/modules/almond-spark/src/main/scala/org/apache/spark/sql/NotebookSparkSession.scala
+++ b/modules/almond-spark/src/main/scala/org/apache/spark/sql/NotebookSparkSession.scala
@@ -15,6 +15,10 @@ object NotebookSparkSession {
   ): NotebookSparkSessionBuilder =
     new NotebookSparkSessionBuilder
 
+  @deprecated(
+    "Calling this method isn't needed any more, new dependencies are passed to Spark executors automatically",
+    "0.14.0-RC1"
+  )
   def sync(session: SparkSession = null)(implicit replApi: ReplAPI): SparkSession =
     AmmoniteSparkSession.sync(session)
 
diff --git a/modules/core/src/main/scala/org/apache/spark/sql/AmmoniteSparkSession.scala b/modules/core/src/main/scala/org/apache/spark/sql/AmmoniteSparkSession.scala
index 5fd5c39..8002fe9 100644
--- a/modules/core/src/main/scala/org/apache/spark/sql/AmmoniteSparkSession.scala
+++ b/modules/core/src/main/scala/org/apache/spark/sql/AmmoniteSparkSession.scala
@@ -50,6 +50,10 @@ object AmmoniteSparkSession {
     * @param session:
     *   [[SparkSession]] to add new JARs to
     */
+  @deprecated(
+    "Calling this method isn't needed any more, new dependencies are passed to Spark executors automatically",
+    "0.14.0-RC1"
+  )
   def sync(session: SparkSession = null)(implicit replApi: ReplAPI): SparkSession = {
 
     val session0 = Option(session).getOrElse {
diff --git a/modules/core/src/main/scala/org/apache/spark/sql/ammonitesparkinternals/AmmoniteSparkSessionBuilder.scala b/modules/core/src/main/scala/org/apache/spark/sql/ammonitesparkinternals/AmmoniteSparkSessionBuilder.scala
index fa8b74f..cb22ae6 100644
--- a/modules/core/src/main/scala/org/apache/spark/sql/ammonitesparkinternals/AmmoniteSparkSessionBuilder.scala
+++ b/modules/core/src/main/scala/org/apache/spark/sql/ammonitesparkinternals/AmmoniteSparkSessionBuilder.scala
@@ -355,19 +355,22 @@ class AmmoniteSparkSessionBuilder(implicit
     if (sendSparkYarnJars0 && isYarn())
       config("spark.yarn.jars", sparkJars.map(_.toASCIIString).mkString(","))
 
-    if (sendSparkJars0) {
-      val sparkJarFileSet =
-        (sparkJars.iterator ++ sparkDistClassPath.map(_.toUri).iterator ++ ignoreJars0.iterator)
-          .map(normalize)
-          .toSet
+    lazy val sparkJarFileSet =
+      (sparkJars.iterator ++ sparkDistClassPath.map(_.toUri).iterator ++ ignoreJars0.iterator)
+        .map(normalize)
+        .toSet
+
+    def toBeSent(jars: Seq[URI]): Seq[URI] = {
       val nonSparkJars = jars.filter(uri => !sparkJarFileSet.contains(normalize(uri)))
       val nonSparkJars0 =
         if (sendSourceJars0) nonSparkJars
         else nonSparkJars.filter(uri => !uri.toASCIIString.endsWith("-sources.jar"))
-      val finalNonSparkJars = keepJars0.foldLeft(nonSparkJars0)(_.filter(_))
-      config("spark.jars", finalNonSparkJars.map(_.toASCIIString).mkString(","))
+      keepJars0.foldLeft(nonSparkJars0)(_.filter(_))
     }
 
+    if (sendSparkJars0)
+      config("spark.jars", toBeSent(jars).map(_.toASCIIString).mkString(","))
+
     if (interpApi != null)
       interpApi._compilerManager.outputDir match {
         case None =>
@@ -465,6 +468,17 @@ class AmmoniteSparkSessionBuilder(implicit
     if (forceProgressBars0)
       AmmoniteSparkSessionBuilder.forceProgressBars(session.sparkContext)
 
+    for (api <- Option(replApi))
+      api.sess.frames.head.addHook {
+        new ammonite.util.Frame.Hook {
+          def addClasspath(additional: Seq[URL]): Unit = {
+            val toBeSent0 = toBeSent(additional.map(_.toURI))
+            for (uri <- toBeSent0)
+              session.sparkContext.addJar(uri.toASCIIString)
+          }
+        }
+      }
+
     session
   }
 }
diff --git a/modules/tests/src/main/scala/ammonite/spark/SparkReplTests.scala b/modules/tests/src/main/scala/ammonite/spark/SparkReplTests.scala
index efb8985..0385c9e 100644
--- a/modules/tests/src/main/scala/ammonite/spark/SparkReplTests.scala
+++ b/modules/tests/src/main/scala/ammonite/spark/SparkReplTests.scala
@@ -549,8 +549,6 @@ class SparkReplTests(
        """
        @ import $ivy.`com.twitter::algebird-spark:0.13.5`
 
-       @ AmmoniteSparkSession.sync()
-
        @ import com.twitter.algebird.Semigroup
        import com.twitter.algebird.Semigroup
 