Merge pull request #6 from alexarchambault/develop
Add Spark 2.4 and Scala 2.12 support
alexarchambault committed Nov 7, 2018
2 parents 155a27f + ae91107 commit ff9df48
Showing 23 changed files with 447 additions and 68 deletions.
8 changes: 7 additions & 1 deletion .travis.yml
@@ -1,5 +1,4 @@
 language: scala
-scala: 2.11.12
 jdk: oraclejdk8
 script: ./.travis.sh
 sudo: required
@@ -19,7 +18,14 @@ stages:
 jobs:
   include:
     - env: MASTER=local
+      scala: 2.11.12
+    - env: MASTER=local
+      scala: 2.12.7
     - env: MASTER=standalone STANDALONE_CACHE=$HOME/standalone-stuff
+      scala: 2.11.12
     - env: MASTER=yarn YARN_CACHE=$HOME/yarn-stuff
+      scala: 2.11.12
+    - env: MASTER=yarn YARN_CACHE=$HOME/yarn-stuff
+      scala: 2.12.7
     - stage: release
       script: sbt ci-release
31 changes: 26 additions & 5 deletions build.sbt
@@ -15,23 +15,43 @@ inThisBuild(List(
   )
 ))
 
-lazy val `spark-stubs` = project
+lazy val `spark-stubs_20` = project
   .underModules
   .settings(
     shared,
-    libraryDependencies += Deps.sparkSql % "provided"
+    baseDirectory := {
+      val baseDir = baseDirectory.value
+
+      if (Settings.isAtLeast212.value)
+        baseDir / "target" / "dummy"
+      else
+        baseDir
+    },
+    libraryDependencies ++= {
+      if (Settings.isAtLeast212.value)
+        Nil
+      else
+        Seq(Deps.sparkSql20 % "provided")
+    },
+    publishArtifact := !Settings.isAtLeast212.value
+  )
+
+lazy val `spark-stubs_24` = project
+  .underModules
+  .settings(
+    shared,
+    libraryDependencies += Deps.sparkSql24 % "provided"
   )
 
 lazy val core = project
   .in(file("modules/core"))
-  .dependsOn(`spark-stubs`)
   .settings(
     shared,
     name := "ammonite-spark",
     generatePropertyFile("org/apache/spark/sql/ammonitesparkinternals/ammonite-spark.properties"),
     libraryDependencies ++= Seq(
       Deps.ammoniteRepl % "provided",
-      Deps.sparkSql % "provided",
+      Deps.sparkSql.value % "provided",
       Deps.jettyServer
     )
   )
@@ -72,7 +92,8 @@ lazy val `ammonite-spark` = project
   .in(file("."))
   .aggregate(
     core,
-    `spark-stubs`,
+    `spark-stubs_20`,
+    `spark-stubs_24`,
     tests
   )
   .settings(
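The build references Settings.isAtLeast212, a helper defined under the repo's project/ directory rather than in this diff. A minimal sketch of what such a setting can look like, assuming it only needs to tell Scala 2.12+ apart from 2.11 (hypothetical code, not the actual helper):

// Hypothetical sketch of the Settings.isAtLeast212 helper referenced above.
import sbt._
import sbt.Keys._

object Settings {
  // True when the module is built against Scala 2.12 or newer.
  lazy val isAtLeast212: Def.Initialize[Boolean] = Def.setting {
    CrossVersion.partialVersion(scalaVersion.value) match {
      case Some((2, minor)) => minor >= 12
      case Some((major, _)) => major > 2
      case None             => false
    }
  }
}

With such a flag, spark-stubs_20 stays a real module on Scala 2.11 but is effectively switched off on 2.12: its baseDirectory is pointed at a dummy target directory, it gets no spark-sql dependency, and publishArtifact := !Settings.isAtLeast212.value keeps it out of the published artifacts.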
modules/core/src/main/scala/org/apache/spark/sql/ammonitesparkinternals/AmmoniteSparkSessionBuilder.scala
@@ -89,16 +89,25 @@ class AmmoniteSparkSessionBuilder
     replApi: ReplAPI
   ) extends SparkSession.Builder {
 
-  private val options0: scala.collection.Map[String, String] =
-    try {
-      val f = classOf[SparkSession.Builder].getDeclaredField("org$apache$spark$sql$SparkSession$Builder$$options")
-      f.setAccessible(true)
-      f.get(this).asInstanceOf[scala.collection.mutable.HashMap[String, String]]
-    } catch {
-      case t: Throwable =>
-        println(s"Warning: can't read SparkSession Builder options, caught $t")
+  private val options0: scala.collection.Map[String, String] = {
+
+    def fieldVia(name: String): Option[scala.collection.mutable.HashMap[String, String]] =
+      try {
+        val f = classOf[SparkSession.Builder].getDeclaredField(name)
+        f.setAccessible(true)
+        Some(f.get(this).asInstanceOf[scala.collection.mutable.HashMap[String, String]])
+      } catch {
+        case _: NoSuchFieldException =>
+          None
+      }
+
+    fieldVia("org$apache$spark$sql$SparkSession$Builder$$options")
+      .orElse(fieldVia("options"))
+      .getOrElse {
+        println("Warning: can't read SparkSession Builder options (options field not found)")
         Map.empty[String, String]
-    }
+      }
+  }
 
   private def init(): Unit = {
 
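The fieldVia fallback is needed because the JVM-level name of the Builder's private options field depends on the Scala compiler that built Spark: artifacts built with Scala 2.11 expose it under the expanded name org$apache$spark$sql$SparkSession$Builder$$options, while the Scala 2.12 builds of Spark 2.4 appear to expose it as plain options (inferred here from the two names being tried). The same lookup-with-fallback pattern, isolated on a toy class with hypothetical names:

// Standalone sketch of reflective field access with a name fallback;
// Box and its field names are made up for illustration.
import scala.collection.mutable

class Box {
  private val items = mutable.HashMap("a" -> "1")
  override def toString = items.toString
}

def fieldVia(obj: AnyRef, name: String): Option[AnyRef] =
  try {
    val f = obj.getClass.getDeclaredField(name)
    f.setAccessible(true) // private fields must be made accessible before get()
    Some(f.get(obj))
  } catch {
    case _: NoSuchFieldException => None // wrong name for this compiler, try the next candidate
  }

val box = new Box
// Try the mangled candidate first, then the plain one.
val items = fieldVia(box, "Box$$items").orElse(fieldVia(box, "items"))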
@@ -158,17 +167,28 @@ class AmmoniteSparkSessionBuilder
   private def bindAddress(): String =
     options0.getOrElse("spark.driver.bindAddress", host())
 
-  override def getOrCreate(): SparkSession = {
+  private def loadExtraDependencies(): Unit = {
 
-    if (isYarn() && !SparkDependencies.sparkYarnFound()) {
-      println("Loading spark-yarn")
-      interpApi.load.ivy(SparkDependencies.sparkYarnDependency)
-    }
+    var deps = List.empty[(String, coursier.Dependency)]
+
+    if (hiveSupport() && !SparkDependencies.sparkHiveFound())
+      deps = ("spark-hive", SparkDependencies.sparkHiveDependency) :: deps
+
+    if (!SparkDependencies.sparkExecutorClassLoaderFound())
+      deps = ("spark-stubs", SparkDependencies.stubsDependency) :: deps
+
+    if (isYarn() && !SparkDependencies.sparkYarnFound())
+      deps = ("spark-yarn", SparkDependencies.sparkYarnDependency) :: deps
 
-    if (hiveSupport() && !SparkDependencies.sparkHiveFound()) {
-      println("Loading spark-hive")
-      interpApi.load.ivy(SparkDependencies.sparkHiveDependency)
+    if (deps.nonEmpty) {
+      println(s"Loading ${deps.map(_._1).mkString(", ")}")
+      interpApi.load.ivy(deps.map(_._2): _*)
     }
+  }
+
+  override def getOrCreate(): SparkSession = {
+
+    loadExtraDependencies()
 
     val sessionJars =
       replApi
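For context, this class backs AmmoniteSparkSession.builder() in an Ammonite session; a typical use, following the project README (version placeholder left as-is), looks like:

// In an Ammonite REPL; <version> is a placeholder.
import $ivy.`sh.almond::ammonite-spark:<version>`
import org.apache.spark.sql._

val spark = AmmoniteSparkSession.builder()
  .master("local[*]")
  .getOrCreate() // with this change, any missing spark-hive, spark-stubs or
                 // spark-yarn modules are fetched in one load.ivy call

Collecting the missing modules into deps and loading them through a single interpApi.load.ivy call means coursier resolves them together, one resolution instead of up to three, and the new stubs check (sparkExecutorClassLoaderFound) slots in alongside the existing yarn and hive checks.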
modules/core/src/main/scala/org/apache/spark/sql/ammonitesparkinternals/SparkDependencies.scala
@@ -7,6 +7,7 @@ import scala.annotation.tailrec
 import scala.collection.mutable
 import scala.concurrent.ExecutionContext
 import scala.util.Properties.{versionNumberString => scalaVersion}
+import scala.util.Try
 
 object SparkDependencies {
 
@@ -24,6 +25,7 @@
   )
 
   private def sparkYarnClass = "org.apache.spark.deploy.yarn.Client"
+  private def sparkExecutorClassLoaderClass = "org.apache.spark.repl.ExecutorClassLoader"
 
   def sparkHiveFound(): Boolean =
     sparkHiveClasses.exists { className =>
@@ -45,6 +47,15 @@
       false
     }
 
+  def sparkExecutorClassLoaderFound(): Boolean =
+    try {
+      Thread.currentThread().getContextClassLoader.loadClass(sparkExecutorClassLoaderClass)
+      true
+    } catch {
+      case _: ClassNotFoundException =>
+        false
+    }
+
   private def sparkModules(): Seq[String] = {
 
     val b = new mutable.ListBuffer[String]
@@ -79,10 +90,17 @@
     b.result()
   }
 
-  def stubsDependency =
+  def stubsDependency = {
+    val suffix = org.apache.spark.SPARK_VERSION.split('.').take(2) match {
+      case Array("2", n) if Try(n.toInt).toOption.exists(_ <= 3) =>
+        "20"
+      case _ =>
+        "24"
+    }
     coursier.Dependency(
-      coursier.Module("sh.almond", s"spark-stubs_$sbv"), Properties.version
+      coursier.Module("sh.almond", s"spark-stubs_${suffix}_$sbv"), Properties.version
     )
+  }
 
   def sparkYarnDependency =
     coursier.Dependency(
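A worked example of the suffix selection in stubsDependency: SPARK_VERSION "2.3.2" splits to Array("2", "3"), 3 <= 3, so the suffix is "20"; "2.4.0" falls through to "24". Combined with the Scala binary version sbv (as used elsewhere in this file), that yields module names such as spark-stubs_20_2.11 or spark-stubs_24_2.12. The same logic, self-contained:

import scala.util.Try

// The version-to-suffix mapping from stubsDependency above, isolated for illustration.
def stubsSuffix(sparkVersion: String): String =
  sparkVersion.split('.').take(2) match {
    case Array("2", n) if Try(n.toInt).toOption.exists(_ <= 3) => "20" // Spark 2.0 to 2.3
    case _ => "24" // Spark 2.4 and anything unrecognized
  }

assert(stubsSuffix("2.3.2") == "20")
assert(stubsSuffix("2.4.0") == "24")
assert(stubsSuffix("3.0.0") == "24")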