diff --git a/INTERNALS.md b/INTERNALS.md index bae6c2e..a5e2433 100644 --- a/INTERNALS.md +++ b/INTERNALS.md @@ -11,8 +11,6 @@ The executor classpath depends on the cluster manager used: The code entered by the user during the session is compiled and results in extra classes. These are accessible via a small web server that `AmmoniteSparkSession.builder()` launches, and whose address is passed to Spark via `"spark.repl.class.uri"` in the SparkConf. -Lastly, if extra dependencies are loaded during the session after the SparkSession is created, users should call `AmmoniteSparkSession.sync()`. This passes to Spark any JAR added since the SparkSession creation, using the `addJar` method of `SparkContext`. - ## `AmmoniteSparkSession` vs `SparkSession` The builder created via `AmmoniteSparkSession.builder()` extends the one from `SparkSession.builder()`. It does a number of things more compared to it. diff --git a/README.md b/README.md index 9d79591..24c320c 100644 --- a/README.md +++ b/README.md @@ -72,10 +72,6 @@ You can then run Spark calculations, like @ val n = rdd.map(_ + 1).sum() ``` -### Syncing dependencies - -If extra dependencies are loaded, via ``import $ivy.`…` `` after the `SparkSession` has been created, one should call `AmmoniteSparkSession.sync()` for the newly added JARs to be passed to the Spark executors. - ## Using with standalone cluster Simply set the master to `spark://…` when building the session, e.g. diff --git a/build.sc b/build.sc index 8d9b1b6..cda2106 100644 --- a/build.sc +++ b/build.sc @@ -274,7 +274,9 @@ class AlmondSpark(val crossScalaVersion: String) extends CrossSbtModule with Amm Deps.ammoniteReplApi, Deps.jsoniterScalaMacros, Deps.log4j2, - Deps.scalaKernelApi, + Deps.scalaKernelApi + .exclude(("com.lihaoyi", s"ammonite-compiler_$crossScalaVersion")) + .exclude(("com.lihaoyi", s"ammonite-repl-api_$crossScalaVersion")), Deps.sparkSql(scalaVersion()) ) def repositoriesTask = T.task { diff --git a/modules/almond-spark/src/main/scala/org/apache/spark/sql/NotebookSparkSession.scala b/modules/almond-spark/src/main/scala/org/apache/spark/sql/NotebookSparkSession.scala index 138f7e6..eb3608a 100644 --- a/modules/almond-spark/src/main/scala/org/apache/spark/sql/NotebookSparkSession.scala +++ b/modules/almond-spark/src/main/scala/org/apache/spark/sql/NotebookSparkSession.scala @@ -15,6 +15,10 @@ object NotebookSparkSession { ): NotebookSparkSessionBuilder = new NotebookSparkSessionBuilder + @deprecated( + "Calling this method isn't needed any more, new dependencies are passed to Spark executors automatically", + "0.14.0-RC1" + ) def sync(session: SparkSession = null)(implicit replApi: ReplAPI): SparkSession = AmmoniteSparkSession.sync(session) diff --git a/modules/core/src/main/scala/org/apache/spark/sql/AmmoniteSparkSession.scala b/modules/core/src/main/scala/org/apache/spark/sql/AmmoniteSparkSession.scala index 5fd5c39..8002fe9 100644 --- a/modules/core/src/main/scala/org/apache/spark/sql/AmmoniteSparkSession.scala +++ b/modules/core/src/main/scala/org/apache/spark/sql/AmmoniteSparkSession.scala @@ -50,6 +50,10 @@ object AmmoniteSparkSession { * @param session: * [[SparkSession]] to add new JARs to */ + @deprecated( + "Calling this method isn't needed any more, new dependencies are passed to Spark executors automatically", + "0.14.0-RC1" + ) def sync(session: SparkSession = null)(implicit replApi: ReplAPI): SparkSession = { val session0 = Option(session).getOrElse { diff --git a/modules/core/src/main/scala/org/apache/spark/sql/ammonitesparkinternals/AmmoniteSparkSessionBuilder.scala b/modules/core/src/main/scala/org/apache/spark/sql/ammonitesparkinternals/AmmoniteSparkSessionBuilder.scala index fa8b74f..cb22ae6 100644 --- a/modules/core/src/main/scala/org/apache/spark/sql/ammonitesparkinternals/AmmoniteSparkSessionBuilder.scala +++ b/modules/core/src/main/scala/org/apache/spark/sql/ammonitesparkinternals/AmmoniteSparkSessionBuilder.scala @@ -355,19 +355,22 @@ class AmmoniteSparkSessionBuilder(implicit if (sendSparkYarnJars0 && isYarn()) config("spark.yarn.jars", sparkJars.map(_.toASCIIString).mkString(",")) - if (sendSparkJars0) { - val sparkJarFileSet = - (sparkJars.iterator ++ sparkDistClassPath.map(_.toUri).iterator ++ ignoreJars0.iterator) - .map(normalize) - .toSet + lazy val sparkJarFileSet = + (sparkJars.iterator ++ sparkDistClassPath.map(_.toUri).iterator ++ ignoreJars0.iterator) + .map(normalize) + .toSet + + def toBeSent(jars: Seq[URI]): Seq[URI] = { val nonSparkJars = jars.filter(uri => !sparkJarFileSet.contains(normalize(uri))) val nonSparkJars0 = if (sendSourceJars0) nonSparkJars else nonSparkJars.filter(uri => !uri.toASCIIString.endsWith("-sources.jar")) - val finalNonSparkJars = keepJars0.foldLeft(nonSparkJars0)(_.filter(_)) - config("spark.jars", finalNonSparkJars.map(_.toASCIIString).mkString(",")) + keepJars0.foldLeft(nonSparkJars0)(_.filter(_)) } + if (sendSparkJars0) + config("spark.jars", toBeSent(jars).map(_.toASCIIString).mkString(",")) + if (interpApi != null) interpApi._compilerManager.outputDir match { case None => @@ -465,6 +468,17 @@ class AmmoniteSparkSessionBuilder(implicit if (forceProgressBars0) AmmoniteSparkSessionBuilder.forceProgressBars(session.sparkContext) + for (api <- Option(replApi)) + api.sess.frames.head.addHook { + new ammonite.util.Frame.Hook { + def addClasspath(additional: Seq[URL]): Unit = { + val toBeSent0 = toBeSent(additional.map(_.toURI)) + for (uri <- toBeSent0) + session.sparkContext.addJar(uri.toASCIIString) + } + } + } + session } } diff --git a/modules/tests/src/main/scala/ammonite/spark/SparkReplTests.scala b/modules/tests/src/main/scala/ammonite/spark/SparkReplTests.scala index efb8985..0385c9e 100644 --- a/modules/tests/src/main/scala/ammonite/spark/SparkReplTests.scala +++ b/modules/tests/src/main/scala/ammonite/spark/SparkReplTests.scala @@ -549,8 +549,6 @@ class SparkReplTests( """ @ import $ivy.`com.twitter::algebird-spark:0.13.5` - @ AmmoniteSparkSession.sync() - @ import com.twitter.algebird.Semigroup import com.twitter.algebird.Semigroup diff --git a/project/deps.sc b/project/deps.sc index ea9f016..c3afac1 100644 --- a/project/deps.sc +++ b/project/deps.sc @@ -6,14 +6,14 @@ object Versions { def scala = Seq(scala213, scala212) - def ammonite = "3.0.0-M0-32-96e851cb" + def ammonite = "3.0.0-M0-38-dd60a0b5" def jsoniterScala = "2.13.5" } object Deps { - def ammoniteCompiler = ivy"com.lihaoyi:::ammonite-compiler:${Versions.ammonite}" - def ammoniteReplApi = ivy"com.lihaoyi:::ammonite-repl-api:${Versions.ammonite}" - def ammoniteRepl = ivy"com.lihaoyi:::ammonite-repl:${Versions.ammonite}" + def ammoniteCompiler = ivy"sh.almond.tmp.ammonite:::ammonite-compiler:${Versions.ammonite}" + def ammoniteReplApi = ivy"sh.almond.tmp.ammonite:::ammonite-repl-api:${Versions.ammonite}" + def ammoniteRepl = ivy"sh.almond.tmp.ammonite:::ammonite-repl:${Versions.ammonite}" def classPathUtil = ivy"io.get-coursier::class-path-util:0.1.4" def jettyServer = ivy"org.eclipse.jetty:jetty-server:9.4.51.v20230217"