Merge pull request #8 from alexarchambault/develop
Try to run yarn tests using a spark distrib on CI (WIP)
alexarchambault committed Dec 7, 2018
2 parents ff9df48 + b713ca9 commit 5fbce26
Showing 14 changed files with 199 additions and 52 deletions.
4 changes: 4 additions & 0 deletions .travis.sh
@@ -4,10 +4,14 @@ set -e
case "${MASTER:-"local"}" in
local)
sbt ++$TRAVIS_SCALA_VERSION'!' publishLocal test ;;
local-distrib)
./with-spark-home.sh sbt ++$TRAVIS_SCALA_VERSION'!' publishLocal local-spark-distrib-tests/test ;;
standalone)
./sbt-with-standalone-cluster.sh ++$TRAVIS_SCALA_VERSION'!' publishLocal standalone-tests/test ;;
yarn)
./sbt-in-docker-with-yarn-cluster.sh -batch ++$TRAVIS_SCALA_VERSION'!' publishLocal yarn-tests/test ;;
yarn-distrib)
./with-spark-home.sh ./sbt-in-docker-with-yarn-cluster.sh -batch ++$TRAVIS_SCALA_VERSION'!' publishLocal yarn-spark-distrib-tests/test ;;
*)
echo "Unrecognized master type $MASTER"
exit 1
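The two new entries mirror the existing ones, with `with-spark-home.sh` wrapping the command so that a Spark distribution is available via `SPARK_HOME`. A hedged sketch of how these modes can presumably be driven outside Travis (the variables below are the ones the script reads; values are illustrative):
```
# Illustrative only -- .travis.sh reads MASTER (defaulting to "local") and
# TRAVIS_SCALA_VERSION from the environment, as it does on CI.
TRAVIS_SCALA_VERSION=2.11.12 MASTER=local-distrib bash .travis.sh
TRAVIS_SCALA_VERSION=2.11.12 MASTER=yarn-distrib  bash .travis.sh
```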
4 changes: 4 additions & 0 deletions .travis.yml
@@ -17,12 +17,16 @@ stages:
if: (branch = master AND type = push) OR (tag IS present)
jobs:
include:
- env: MASTER=local-distrib
scala: 2.11.12
- env: MASTER=local
scala: 2.11.12
- env: MASTER=local
scala: 2.12.7
- env: MASTER=standalone STANDALONE_CACHE=$HOME/standalone-stuff
scala: 2.11.12
- env: MASTER=yarn-distrib YARN_CACHE=$HOME/yarn-stuff STANDALONE_CACHE=$HOME/yarn-cache
scala: 2.11.12
- env: MASTER=yarn YARN_CACHE=$HOME/yarn-stuff
scala: 2.11.12
- env: MASTER=yarn YARN_CACHE=$HOME/yarn-stuff
7 changes: 6 additions & 1 deletion TESTS.md
@@ -13,4 +13,9 @@ Run the tests against a YARN cluster with
```
$ ./sbt-in-docker-with-yarn-cluster.sh publishLocal yarn-tests/test
```
Note that SBT is run inside a docker container in that case. This commands starts a dockerized single-node YARN cluster, and shuts it down upon exit.

Run the tests against a YARN cluster _using a provided spark distribution_ with
```
$ ./with-spark-home.sh ./sbt-in-docker-with-yarn-cluster.sh publishLocal yarn-spark-distrib-tests/test
```
Note that SBT is run inside a docker container in the last two cases. These commands start a dockerized single-node YARN cluster and shut it down upon exit.
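If a Spark distribution is already unpacked locally, the distribution-based suites can presumably be pointed at it directly rather than going through `with-spark-home.sh`; a hedged sketch, with placeholder path and version:
```
# Hedged sketch: the distribution-based suites read SPARK_HOME and
# SPARK_VERSION from the environment (placeholder values below).
export SPARK_HOME=/path/to/spark-2.4.0-bin-hadoop2.7
export SPARK_VERSION=2.4.0
./sbt-in-docker-with-yarn-cluster.sh publishLocal yarn-spark-distrib-tests/test
```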
18 changes: 18 additions & 0 deletions build.sbt
@@ -70,6 +70,15 @@ lazy val tests = project
)
)

lazy val `local-spark-distrib-tests` = project
.dependsOn(tests)
.underModules
.settings(
shared,
dontPublish,
testSettings
)

lazy val `standalone-tests` = project
.dependsOn(tests)
.underModules
@@ -88,6 +97,15 @@ lazy val `yarn-tests` = project
testSettings
)

lazy val `yarn-spark-distrib-tests` = project
.dependsOn(tests)
.underModules
.settings(
shared,
dontPublish,
testSettings
)

lazy val `ammonite-spark` = project
.in(file("."))
.aggregate(
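Both new projects reuse the shared `tests` sources; as in `.travis.sh`, they are presumably run with `with-spark-home.sh` providing the distribution, for example:
```
# How CI drives the new projects (see .travis.sh above); with-spark-home.sh is
# expected to expose SPARK_HOME/SPARK_VERSION to the wrapped command.
./with-spark-home.sh sbt ++2.11.12 publishLocal local-spark-distrib-tests/test
./with-spark-home.sh ./sbt-in-docker-with-yarn-cluster.sh publishLocal yarn-spark-distrib-tests/test
```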
AmmoniteSparkSessionBuilder.scala
@@ -12,6 +12,8 @@ import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd}
import org.apache.spark.sql.SparkSession
import org.apache.spark.ui.ConsoleProgressBar

import scala.collection.JavaConverters._

object AmmoniteSparkSessionBuilder {

private def prettyDir(dir: String): String = {
@@ -196,7 +198,7 @@ class AmmoniteSparkSessionBuilder
.frames
.flatMap(_.classpath)
.filter(AmmoniteSparkSessionBuilder.shouldPassToSpark)
.map(_.getAbsoluteFile.toURI.toASCIIString)
.map(_.getAbsoluteFile.toURI)

val baseJars = {
val cp = AmmoniteSparkSessionBuilder.classpath(
@@ -212,19 +214,42 @@
.map(_.toURI)
// assuming the JDK on the YARN machines already have those
.filter(u => !AmmoniteSparkSessionBuilder.isJdkJar(u))
.map(_.toASCIIString)
.toVector
}

val jars = (baseJars ++ sessionJars).distinct

println("Getting spark JARs")
val sparkJars = SparkDependencies.sparkJars(interpApi.repositories(), Nil) // interpApi.profiles().sorted)
val sparkJars = sys.env.get("SPARK_HOME") match {
case None =>
println("Getting spark JARs")
SparkDependencies.sparkJars(interpApi.repositories(), Nil)
case Some(sparkHome) =>
// Loose attempt at using the scala JARs already loaded in Ammonite,
// rather than ones from the spark distribution.
val fromBaseCp = jars.filter { f =>
f.toASCIIString.contains("/scala-library-") ||
f.toASCIIString.contains("/scala-reflect-") ||
f.toASCIIString.contains("/scala-compiler-")
}
val fromSparkDistrib = Files.list(Paths.get(sparkHome).resolve("jars"))
.iterator()
.asScala
.toSeq
.filter { p =>
val name = p.getFileName.toString
!name.startsWith("scala-library-") &&
!name.startsWith("scala-reflect-") &&
!name.startsWith("scala-compiler-")
}
.map(_.toAbsolutePath.toUri)

fromBaseCp ++ fromSparkDistrib
}

if (isYarn())
config("spark.yarn.jars", sparkJars.mkString(","))
config("spark.yarn.jars", sparkJars.map(_.toASCIIString).mkString(","))

config("spark.jars", jars.filterNot(sparkJars.toSet).mkString(","))
config("spark.jars", jars.filterNot(sparkJars.toSet).map(_.toASCIIString).mkString(","))

val classServer = new AmmoniteClassServer(
host(),
SparkDependencies.scala
@@ -1,5 +1,7 @@
package org.apache.spark.sql.ammonitesparkinternals

import java.net.URI

import coursier.util.Task
import coursier.{Cache, Dependency, Fetch, Module, Repository, Resolution}

@@ -127,7 +129,7 @@ object SparkDependencies
def sparkJars(
repositories: Seq[Repository],
profiles: Seq[String]
): Seq[String] = {
): Seq[URI] = {

val start = Resolution(
sparkBaseDependencies().toSet,
@@ -179,7 +181,7 @@

localArtifactsRes
.flatMap(_._2.right.toOption)
.map(_.getAbsoluteFile.toURI.toASCIIString)
.map(_.getAbsoluteFile.toURI)
}

}
LocalSparkHomeTests.scala
@@ -0,0 +1,9 @@
package ammonite.spark

object LocalSparkHomeTests extends SparkReplTests(
sys.env("SPARK_VERSION"),
Local.master
) {
override def sparkHomeBased =
true
}
34 changes: 31 additions & 3 deletions modules/tests/src/main/scala/ammonite/spark/Init.scala
@@ -10,10 +10,18 @@ object Init {
master: String,
sparkVersion: String,
conf: Seq[(String, String)],
prependBuilderCalls: Seq[String] = Nil
): String =
prependBuilderCalls: Seq[String] = Nil,
loadSparkSql: Boolean = true
): String = {

val optionalSparkSqlImport =
if (loadSparkSql)
Some(s"import $$ivy.`org.apache.spark::spark-sql:$sparkVersion`")
else
None

s"""
@ import $$ivy.`org.apache.spark::spark-sql:$sparkVersion`; import $$ivy.`sh.almond::ammonite-spark:$version`
@ ${optionalSparkSqlImport.fold("")(_ + "; ")}import $$ivy.`sh.almond::ammonite-spark:$version`

@ import org.apache.spark.sql._

@@ -22,6 +30,26 @@ object Init {
@ val spark = AmmoniteSparkSession.builder()${prependBuilderCalls.mkString}.appName("test-ammonite-spark").master("$master")${conf.map(t => s".config($q${t._1}$q, $q${t._2}$q)").mkString}.getOrCreate()

@ def sc = spark.sparkContext"""
}

def sparkHomeInit(
master: String,
sparkVersion: String,
conf: Seq[(String, String)],
prependBuilderCalls: Seq[String] = Nil
): String =
s"""
@ interp.load.cp {
@ import java.nio.file.{Files, Paths}, scala.collection.JavaConverters._
@ Files.list(Paths.get(s"$${sys.env("SPARK_HOME")}/jars"))
@ .iterator()
@ .asScala
@ .toVector
@ .filter(f => !f.getFileName.toString.startsWith("scala-compiler") && !f.getFileName.toString.startsWith("scala-reflect") && !f.getFileName.toString.startsWith("scala-library"))
@ .sortBy(_.getFileName.toString)
@ .map(ammonite.ops.Path(_))
@ }
""" ++ init(master, sparkVersion, conf, loadSparkSql = false)

def end = "@ spark.sparkContext.stop()"

17 changes: 15 additions & 2 deletions modules/tests/src/main/scala/ammonite/spark/SparkReplTests.scala
@@ -3,15 +3,28 @@ package ammonite.spark
import ammonite.spark.fromammonite.TestRepl
import utest._

class SparkReplTests(sparkVersion: String, master: String, conf: (String, String)*) extends TestSuite {
class SparkReplTests(
val sparkVersion: String,
val master: String,
val conf: (String, String)*
) extends TestSuite {

// Most of the tests here were adapted from https://github.com/apache/spark/blob/ab18b02e66fd04bc8f1a4fb7b6a7f2773902a494/repl/src/test/scala/org/apache/spark/repl/SingletonReplSuite.scala

Init.setupLog4j()

val check = new TestRepl

check.session(Init.init(master, sparkVersion, conf))
def sparkHomeBased: Boolean =
false

def init =
if (sparkHomeBased)
Init.sparkHomeInit(master, sparkVersion, conf)
else
Init.init(master, sparkVersion, conf)

check.session(init)

override def utestAfterAll() =
check.session(Init.end)
YarnSparkDistribTests.scala
@@ -0,0 +1,25 @@
package ammonite.spark

object YarnSparkDistribTests extends SparkReplTests(
sys.env("SPARK_VERSION"),
"yarn",
"spark.executor.instances" -> "1",
"spark.executor.memory" -> "2g",
"spark.yarn.executor.memoryOverhead" -> "1g",
"spark.yarn.am.memory" -> "2g"
) {

if (!sys.env.contains("SPARK_HOME"))
sys.error("SPARK_HOME not set")

override def sparkHomeBased =
true

override def inputUrlOpt =
Some(
sys.env.getOrElse(
"INPUT_TXT_URL",
sys.error("INPUT_TXT_URL not set")
)
)
}
25 changes: 15 additions & 10 deletions sbt-in-docker-with-yarn-cluster.sh
@@ -1,5 +1,5 @@
#!/usr/bin/env bash
set -e
set -eu

# when the tests are running, open the YARN UI at http://localhost:8088

@@ -98,7 +98,6 @@ if [ ! -d "$CACHE/hadoop-conf" ]; then
test "$TRANSIENT_DOCKER_YARN_CLUSTER" = 0 || rm -rf "$CACHE/docker-yarn-cluster"
fi

SPARK_VERSION="2.4.0"
SCALA_VERSION="${TRAVIS_SCALA_VERSION:-"2.11.12"}"
case "$SCALA_VERSION" in
2.11.*)
@@ -117,16 +116,20 @@ cat > "$CACHE/run.sh" << EOF
#!/usr/bin/env bash
set -e
# prefetch stuff
if [ "\$SPARK_HOME" = "" ]; then
# prefetch stuff
DEPS=()
DEPS+="org.apache.spark:spark-sql_$SBV:$SPARK_VERSION"
DEPS+="org.apache.spark:spark-yarn_$SBV:$SPARK_VERSION"
export SPARK_VERSION="2.4.0"
for d in "${DEPS[@]}"; do
echo "Pre-fetching \$d"
coursier fetch $(if [ "$INTERACTIVE" = 1 ]; then echo --progress; fi) "\$d" >/dev/null
done
DEPS=()
DEPS+=("org.apache.spark:spark-sql_$SBV:\$SPARK_VERSION")
DEPS+=("org.apache.spark:spark-yarn_$SBV:\$SPARK_VERSION")
for d in "\${DEPS[@]}"; do
echo "Pre-fetching \$d"
coursier fetch $(if [ "$INTERACTIVE" = 1 ]; then echo --progress; fi) "\$d" >/dev/null
done
fi
exec sbt -J-Xmx1g "\$@"
EOF
@@ -145,6 +148,8 @@ docker run -t $(if [ "$INTERACTIVE" = 1 ]; then echo -i; fi) --rm \
-v "$CACHE/ivy2-home:/root/.ivy2" \
-v "$CACHE/hadoop-conf:/etc/hadoop/conf" \
-v "$(pwd):/workspace" \
$(if [ ! -z ${SPARK_HOME+x} ]; then echo "" -e SPARK_HOME=/spark -v "$SPARK_HOME:/spark"; fi) \
$(if [ ! -z ${SPARK_VERSION+x} ]; then echo "" -e SPARK_VERSION; fi) \
-e INPUT_TXT_URL \
-w /workspace \
openjdk:8u151-jdk \
35 changes: 7 additions & 28 deletions sbt-with-standalone-cluster.sh
@@ -1,45 +1,24 @@
#!/usr/bin/env bash
set -e
set -eu

SPARK_VERSION="2.3.2"
HOST=localhost

cd "$(dirname "${BASH_SOURCE[0]}")"

CACHE="${STANDALONE_CACHE:-"$(pwd)/target/standalone"}"

mkdir -p "$CACHE"

# Fetch spark distrib
if [ ! -d "$CACHE/spark-$SPARK_VERSION-"* ]; then
TRANSIENT_SPARK_ARCHIVE=0
if [ ! -e "$CACHE/archive/spark-$SPARK_VERSION-"*.tgz ]; then
mkdir -p "$CACHE/archive"
TRANSIENT_SPARK_ARCHIVE=1
curl -Lo "$CACHE/archive/spark-$SPARK_VERSION-bin-hadoop2.7.tgz" "https://archive.apache.org/dist/spark/spark-$SPARK_VERSION/spark-$SPARK_VERSION-bin-hadoop2.7.tgz"
fi

( cd "$CACHE" && tar -zxvf "archive/spark-$SPARK_VERSION-"*.tgz )
test "$TRANSIENT_SPARK_ARCHIVE" = 0 || rm -f "$CACHE/archive/spark-$SPARK_VERSION-"*.tgz
rmdir -p "$CACHE/archive" 2>/dev/null || true
fi

SPARK_HOME="$(./with-spark-home.sh /bin/bash -c 'echo $SPARK_HOME')"
SPARK_VERSION="$(./with-spark-home.sh /bin/bash -c 'echo $SPARK_VERSION')"

cleanup() {
cd "$CACHE/spark-$SPARK_VERSION-"*
sbin/stop-slave.sh || true
sbin/stop-master.sh || true
"$SPARK_HOME/sbin/stop-slave.sh" || true
"$SPARK_HOME/sbin/stop-master.sh" || true
}

trap cleanup EXIT INT TERM


SPARK_MASTER="spark://$HOST:7077"

cd "$CACHE/spark-$SPARK_VERSION-"*
sbin/start-master.sh --host "$HOST"
sbin/start-slave.sh --host "$HOST" "$SPARK_MASTER" -c 4 -m 4g
cd -
"$SPARK_HOME/sbin/start-master.sh" --host "$HOST"
"$SPARK_HOME/sbin/start-slave.sh" --host "$HOST" "$SPARK_MASTER" -c 4 -m 4g

STANDALONE_SPARK_MASTER="$SPARK_MASTER" \
STANDALONE_SPARK_VERSION="$SPARK_VERSION" \
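The download-and-unpack logic removed here moves behind `with-spark-home.sh`, which is added by this commit but not visible in the rendered diff. Judging from how it is invoked (it runs an arbitrary command with `SPARK_HOME` and `SPARK_VERSION` set), a rough, purely hypothetical sketch of such a wrapper:
```
#!/usr/bin/env bash
# Hypothetical sketch of with-spark-home.sh -- not the actual script from this
# commit. Assumed behaviour: fetch and unpack a Spark distribution if needed,
# export SPARK_HOME / SPARK_VERSION, then exec the wrapped command.
set -eu

SPARK_VERSION="${SPARK_VERSION:-2.4.0}"                  # assumed default
CACHE="${SPARK_HOME_CACHE:-"$(pwd)/target/spark-home"}"  # hypothetical cache dir
DISTRIB="spark-$SPARK_VERSION-bin-hadoop2.7"

mkdir -p "$CACHE"
if [ ! -d "$CACHE/$DISTRIB" ]; then
  curl -Lo "$CACHE/$DISTRIB.tgz" \
    "https://archive.apache.org/dist/spark/spark-$SPARK_VERSION/$DISTRIB.tgz"
  ( cd "$CACHE" && tar -zxf "$DISTRIB.tgz" && rm -f "$DISTRIB.tgz" )
fi

export SPARK_VERSION
export SPARK_HOME="$CACHE/$DISTRIB"
exec "$@"
```
Keeping that logic in one place lets the standalone and YARN scripts stay agnostic about where the distribution comes from.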