10 changes: 10 additions & 0 deletions assembly/pom.xml
@@ -148,6 +148,16 @@
</dependency>
</dependencies>
</profile>
<profile>
<id>kubernetes</id>
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-kubernetes_${scala.binary.version}</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</profile>
<profile>
<id>hive</id>
<dependencies>
27 changes: 25 additions & 2 deletions core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -70,7 +70,8 @@ object SparkSubmit {
private val STANDALONE = 2
private val MESOS = 4
private val LOCAL = 8
private val ALL_CLUSTER_MGRS = YARN | STANDALONE | MESOS | LOCAL
private val KUBERNETES = 16
private val ALL_CLUSTER_MGRS = YARN | STANDALONE | MESOS | KUBERNETES | LOCAL

// Deploy modes
private val CLIENT = 1
@@ -240,8 +241,9 @@ object SparkSubmit {
case m if m.startsWith("spark") => STANDALONE
case m if m.startsWith("mesos") => MESOS
case m if m.startsWith("local") => LOCAL
case m if m.startsWith("k8s") => KUBERNETES
case _ =>
printErrorAndExit("Master must either be yarn or start with spark, mesos, local")
printErrorAndExit("Master must either be yarn or start with spark, mesos, k8s, local")
-1
}

@@ -284,6 +286,7 @@
}
val isYarnCluster = clusterManager == YARN && deployMode == CLUSTER
val isMesosCluster = clusterManager == MESOS && deployMode == CLUSTER
val isKubernetesCluster = clusterManager == KUBERNETES && deployMode == CLUSTER

// Resolve maven dependencies if there are any and add classpath to jars. Add them to py-files
// too for packages that include Python code
@@ -606,6 +609,26 @@
}
}

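// In Kubernetes cluster mode, run the application through the Kubernetes
// submission client, forwarding the primary resource, main class and
// user arguments to it.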
if (isKubernetesCluster) {
childMainClass = "org.apache.spark.deploy.kubernetes.Client"
if (args.isPython) {
childArgs += ("--primary-py-file", args.primaryResource)
childArgs += ("--class", "org.apache.spark.deploy.PythonRunner")
} else if (args.isR) {
val mainFile = new Path(args.primaryResource).getName
childArgs += ("--primary-r-file", mainFile)
childArgs += ("--class", "org.apache.spark.deploy.RRunner")
} else {
if (args.primaryResource != SparkLauncher.NO_RESOURCE) {
childArgs += ("--jar", args.primaryResource)
}
childArgs += ("--class", args.mainClass)
}
if (args.childArgs != null) {
args.childArgs.foreach { arg => childArgs += ("--arg", arg) }
}
}

// Load any properties specified through --conf and the default properties file
for ((k, v) <- args.sparkProperties) {
sysProps.getOrElseUpdate(k, v)
2 changes: 1 addition & 1 deletion dev/deps/spark-deps-hadoop-2.4
@@ -98,7 +98,7 @@ jersey-client-2.22.2.jar
jersey-common-2.22.2.jar
jersey-container-servlet-2.22.2.jar
jersey-container-servlet-core-2.22.2.jar
jersey-guava-2.22.2.jar
jersey-guava-2.22.2.jarshaded-proto
jersey-media-jaxb-2.22.2.jar
jersey-server-2.22.2.jar
jets3t-0.9.3.jar
4 changes: 3 additions & 1 deletion dev/make-distribution.sh
@@ -150,7 +150,9 @@ export MAVEN_OPTS="${MAVEN_OPTS:--Xmx2g -XX:MaxPermSize=512M -XX:ReservedCodeCac
# Store the command as an array because $MVN variable might have spaces in it.
# Normal quoting tricks don't work.
# See: http://mywiki.wooledge.org/BashFAQ/050
BUILD_COMMAND=("$MVN" -T 1C clean package -DskipTests $@)
# BUILD_COMMAND=("$MVN" -T 1C clean package -DskipTests $@)

BUILD_COMMAND=("$MVN" -T 2C package -DskipTests $@)

# Actually build the jar
echo -e "\nBuilding with..."
47 changes: 47 additions & 0 deletions examples/src/main/scala/org/apache/spark/examples/DelayedMap.scala
@@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

// scalastyle:off println
package org.apache.spark.examples

import org.apache.spark.sql.SparkSession
import org.apache.spark.internal.Logging

/** Simulates a long-running computation with a delay inserted into a map */
object DelayedMap extends Logging {
def main(args: Array[String]) {
val spark = SparkSession
.builder
.appName("Delayed Map")
.getOrCreate()
// delay in milliseconds
val delay = args(0).toInt
// remaining args are numbers of partitions to use
val plist = args.drop(1).map(_.toInt)

plist.foreach { np =>
logInfo(s"DELAYED MAP: np= $np")
spark.sparkContext.parallelize(1 to 1000, np).map { x =>
Thread.sleep(delay)
x
}.count
}

spark.stop()
}
}
// scalastyle:on println
21 changes: 21 additions & 0 deletions kubernetes/README.md
@@ -0,0 +1,21 @@
# Prerequisites
* Maven, a JDK, and all the other prerequisites for building Spark.

# Steps to compile

* Clone the fork of Spark at https://github.com/foxish/spark/ and switch to the `k8s-support` branch.
* Build the project:
  * `./build/mvn -Pkubernetes -Phadoop-2.4 -Dhadoop.version=2.4.0 -DskipTests package`
* Ensure that `kubectl` is pointing to the Kubernetes cluster you want to use with Spark (`kubectl config current-context`).
* Launch a spark-submit job:
  * `./bin/spark-submit --deploy-mode cluster --class org.apache.spark.examples.SparkPi --master k8s://default --conf spark.executor.instances=5 --conf spark.kubernetes.sparkImage=manyangled/kube-spark:dynamic http://storage.googleapis.com/foxish-spark-distro/original-spark-examples_2.11-2.1.0-SNAPSHOT.jar 10000`
* Optionally, the following config parameters can be supplied to spark-submit with additional `--conf` arguments (or a configuration file); a full example appears after this list.
  * `spark.kubernetes.serviceAccountName` (defaults to `default`)
  * `spark.kubernetes.namespace` (defaults to `default`). The namespace must exist before launching spark-submit.
* The image is built from https://github.com/erikerlandson/openshift-spark.
* `--master k8s://default` ensures that spark-submit picks up the correct API server, namely the default one from the current kubectl context.
* Check that the pods are being created. Watch the driver logs with `kubectl logs -f <driver-pod>`.
* If the cloud/infrastructure provider allows external load balancers to be provisioned, an external IP is allocated to the service associated with the driver, and the Spark driver UI can be reached at that IP address on port 4040.
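
For reference, a submission that also sets the optional parameters could look like the sketch below. The namespace and service account values are illustrative, not part of this change; substitute your own.

```bash
# Illustrative sketch: adjust the namespace, service account, image, and
# application jar to your own setup. The namespace must already exist.
./bin/spark-submit \
  --deploy-mode cluster \
  --class org.apache.spark.examples.SparkPi \
  --master k8s://default \
  --conf spark.executor.instances=5 \
  --conf spark.kubernetes.namespace=spark-jobs \
  --conf spark.kubernetes.serviceAccountName=spark \
  --conf spark.kubernetes.sparkImage=manyangled/kube-spark:dynamic \
  http://storage.googleapis.com/foxish-spark-distro/original-spark-examples_2.11-2.1.0-SNAPSHOT.jar 10000
```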


![spark-submit](spark-submit.png)
54 changes: 54 additions & 0 deletions kubernetes/pom.xml
@@ -0,0 +1,54 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_2.11</artifactId>
<version>2.1.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

<artifactId>spark-kubernetes_2.11</artifactId>
<packaging>jar</packaging>
<name>Spark Project Kubernetes</name>
<properties>
<sbt.project.name>kubernetes</sbt.project.name>
</properties>

<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.binary.version}</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>io.fabric8</groupId>
<artifactId>kubernetes-client</artifactId>
<version>1.4.8</version>
</dependency>

<!-- Explicit listing of transitive deps that are shaded. Otherwise, odd compiler crashes. -->
</dependencies>

<build>
<outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
<testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
</build>

</project>
Binary file added kubernetes/spark-submit.png
3 changes: 3 additions & 0 deletions kubernetes/src/main/resources/META-INF/MANIFEST.MF
@@ -0,0 +1,3 @@
Manifest-Version: 1.0
Main-Class: org.apache.spark.deploy.kubernetes.Client

@@ -0,0 +1 @@
org.apache.spark.scheduler.cluster.kubernetes.KubernetesClusterManager
@@ -0,0 +1,70 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.deploy.kubernetes

import java.util.concurrent.CountDownLatch

import org.apache.hadoop.conf.Configuration
import org.apache.spark.SparkConf
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.internal.Logging
import org.apache.spark.scheduler.cluster.kubernetes.KubernetesClusterScheduler
import org.apache.spark.util.ShutdownHookManager

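/**
 * Cluster-mode submission client for Kubernetes. It starts a
 * KubernetesClusterScheduler with the parsed client arguments and blocks
 * until the shutdown hook stops it.
 */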
private[spark] class Client(val args: ClientArguments,
val hadoopConf: Configuration,
val sparkConf: SparkConf)
extends Logging {
private val scheduler = new KubernetesClusterScheduler(sparkConf)
private val shutdownLatch = new CountDownLatch(1)

def this(clientArgs: ClientArguments, spConf: SparkConf) =
this(clientArgs, SparkHadoopUtil.get.newConfiguration(spConf), spConf)

def start(): Unit = {
scheduler.start(args)
}

def stop(): Unit = {
scheduler.stop()
shutdownLatch.countDown()
System.clearProperty("SPARK_KUBERNETES_MODE")
}

def awaitShutdown(): Unit = {
shutdownLatch.await()
}
}

private object Client extends Logging {
def main(argStrings: Array[String]) {
val sparkConf = new SparkConf
System.setProperty("SPARK_KUBERNETES_MODE", "true")
val args = new ClientArguments(argStrings)
val client = new Client(args, sparkConf)
client.start()

logDebug("Adding shutdown hook")
ShutdownHookManager.addShutdownHook { () =>
logInfo("Shutdown hook is shutting down client")
client.stop()
client.awaitShutdown()
}
client.awaitShutdown()
}
}
@@ -0,0 +1,85 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.deploy.kubernetes

import scala.collection.mutable.ArrayBuffer

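/**
 * Parses the options (--jar, --class, --primary-py-file, --primary-r-file,
 * --arg) that SparkSubmit forwards to the Kubernetes submission client in
 * cluster deploy mode.
 */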
private[spark] class ClientArguments(args: Array[String]) {

var userJar: String = null
var userClass: String = null
var primaryPyFile: String = null
var primaryRFile: String = null
var userArgs: ArrayBuffer[String] = new ArrayBuffer[String]()

parseArgs(args.toList)

private def parseArgs(inputArgs: List[String]): Unit = {
var args = inputArgs

while (!args.isEmpty) {
args match {
case ("--jar") :: value :: tail =>
userJar = value
args = tail

case ("--class") :: value :: tail =>
userClass = value
args = tail

case ("--primary-py-file") :: value :: tail =>
primaryPyFile = value
args = tail

case ("--primary-r-file") :: value :: tail =>
primaryRFile = value
args = tail

case ("--arg") :: value :: tail =>
userArgs += value
args = tail

case Nil =>

case _ =>
throw new IllegalArgumentException(getUsageMessage(args))
}
}

if (primaryPyFile != null && primaryRFile != null) {
throw new IllegalArgumentException("Cannot have primary-py-file and primary-r-file" +
" at the same time")
}
}

private def getUsageMessage(unknownParam: List[String] = null): String = {
val message = if (unknownParam != null) s"Unknown/unsupported param $unknownParam\n" else ""
message +
s"""
|Usage: org.apache.spark.deploy.kubernetes.Client [options]
|Options:
| --jar JAR_PATH Path to your application's JAR file (required in kubernetes-cluster
| mode)
| --class CLASS_NAME Name of your application's main class (required)
| --primary-py-file A main Python file
| --primary-r-file A main R file
| --arg ARG Argument to be passed to your application's main class.
| Multiple invocations are possible, each will be passed in order.
""".stripMargin
}
}