
Ashwin/patch spark3 jars #17 (Open)

wants to merge 5 commits into base: affirm-3.1.2

Changes from 3 commits
23 changes: 12 additions & 11 deletions core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -308,21 +308,22 @@ private[spark] class SparkSubmit extends Logging {
args.ivySettingsPath)

if (!StringUtils.isBlank(resolvedMavenCoordinates)) {
// In K8s client mode, when in the driver, add resolved jars early as we might need
// them at the submit time for artifact downloading.
// For example we might use the dependencies for downloading
// files from a Hadoop Compatible fs e.g. S3. In this case the user might pass:
// --packages com.amazonaws:aws-java-sdk:1.7.4:org.apache.hadoop:hadoop-aws:2.7.6
if (isKubernetesClusterModeDriver) {
val loader = getSubmitClassLoader(sparkConf)
for (jar <- resolvedMavenCoordinates.split(",")) {
addJarToClasspath(jar, loader)
}
} else if (isKubernetesCluster) {
if (isKubernetesCluster) {
// We need this in K8s cluster mode so that we can upload local deps
// via the k8s application, like in cluster mode driver
childClasspath ++= resolvedMavenCoordinates.split(",")
} else {
// In K8s client mode, when in the driver, add resolved jars early as we might need
// them at the submit time for artifact downloading.
// For example we might use the dependencies for downloading
// files from a Hadoop Compatible fs e.g. S3. In this case the user might pass:
// --packages com.amazonaws:aws-java-sdk:1.7.4:org.apache.hadoop:hadoop-aws:2.7.6
if (isKubernetesClusterModeDriver) {
val loader = getSubmitClassLoader(sparkConf)
for (jar <- resolvedMavenCoordinates.split(",")) {
addJarToClasspath(jar, loader)
}
}

Reviewer (on the comment block above): nit: unindent
Reviewer: I'm not sure why you added if (isKubernetesClusterModeDriver) to the else branch, but I presume it's for a good reason.

Author: It's because when spark-submit runs inside the k8s driver, deployMode == client:

    val isKubernetesCluster = clusterManager == KUBERNETES && deployMode == CLUSTER
    val isKubernetesClient = clusterManager == KUBERNETES && deployMode == CLIENT
    val isKubernetesClusterModeDriver = isKubernetesClient &&
      sparkConf.getBoolean("spark.kubernetes.submitInDriver", false)
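(For context: in K8s cluster mode, Spark re-invokes spark-submit inside the driver pod with deployMode = client and spark.kubernetes.submitInDriver = true, which is exactly the combination isKubernetesClusterModeDriver detects.)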

args.jars = mergeFileLists(args.jars, resolvedMavenCoordinates)
if (args.isPython || isInternal(args.primaryResource)) {
args.pyFiles = mergeFileLists(args.pyFiles, resolvedMavenCoordinates)
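For readers skimming the hunk above, here is a minimal sketch (with illustrative, assumed jar paths, not values from this PR) of the shape of resolvedMavenCoordinates that the branches split on: a comma-separated list of local jar paths produced by the Ivy/Maven resolution step.

    // Sketch: what the branches above iterate over after --packages resolution.
    val resolvedMavenCoordinates =
      "/root/.ivy2/jars/com.amazonaws_aws-java-sdk-1.7.4.jar," +
      "/root/.ivy2/jars/org.apache.hadoop_hadoop-aws-2.7.6.jar"
    resolvedMavenCoordinates.split(",").foreach { jar =>
      println(jar) // each entry is added to a classpath or uploaded as a local dep
    }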
@@ -104,7 +104,7 @@ private[spark] class MatrixUDT extends UserDefinedType[Matrix] {

override def typeName: String = "matrix"

override def pyUDT: String = "pyspark.ml.linalg.MatrixUDT"
override def pyUDT: String = "pyspark3.ml.linalg.MatrixUDT"

private[spark] override def asNullable: MatrixUDT = this
}
@@ -69,7 +69,7 @@ private[spark] class VectorUDT extends UserDefinedType[Vector] {
}
}

override def pyUDT: String = "pyspark.ml.linalg.VectorUDT"
override def pyUDT: String = "pyspark3.ml.linalg.VectorUDT"

override def userClass: Class[Vector] = classOf[Vector]

@@ -31,7 +31,7 @@ import org.apache.spark.mllib.api.python.SerDeBase
*/
private[spark] object MLSerDe extends SerDeBase with Serializable {

override val PYSPARK_PACKAGE = "pyspark.ml"
override val PYSPARK_PACKAGE = "pyspark3.ml"

// Pickler for DenseVector
private[python] class DenseVectorPickler extends BasePickler[DenseVector] {
@@ -256,7 +256,7 @@ private[spark] class MatrixUDT extends UserDefinedType[Matrix] {

override def typeName: String = "matrix"

override def pyUDT: String = "pyspark.mllib.linalg.MatrixUDT"
override def pyUDT: String = "pyspark3.mllib.linalg.MatrixUDT"

private[spark] override def asNullable: MatrixUDT = this
}
@@ -313,7 +313,7 @@ class VectorUDT extends UserDefinedType[Vector] {
}
}

override def pyUDT: String = "pyspark.mllib.linalg.VectorUDT"
override def pyUDT: String = "pyspark3.mllib.linalg.VectorUDT"

override def userClass: Class[Vector] = classOf[Vector]

4 changes: 2 additions & 2 deletions python/pyspark3/ml/linalg/__init__.py
@@ -134,7 +134,7 @@ def sqlType(cls):

@classmethod
def module(cls):
return "pyspark.ml.linalg"
return "pyspark3.ml.linalg"

@classmethod
def scalaUDT(cls):
@@ -184,7 +184,7 @@ def sqlType(cls):

@classmethod
def module(cls):
return "pyspark.ml.linalg"
return "pyspark3.ml.linalg"

@classmethod
def scalaUDT(cls):
2 changes: 1 addition & 1 deletion python/pyspark3/ml/wrapper.py
@@ -244,7 +244,7 @@ def __get_class(clazz):
for comp in parts[1:]:
m = getattr(m, comp)
return m
stage_name = java_stage.getClass().getName().replace("org.apache.spark", "pyspark")
stage_name = java_stage.getClass().getName().replace("org.apache.spark", "pyspark3")
# Generate a default new instance from the stage_name class.
py_type = __get_class(stage_name)
if issubclass(py_type, JavaParams):
4 changes: 2 additions & 2 deletions python/pyspark3/mllib/linalg/__init__.py
@@ -137,7 +137,7 @@ def sqlType(cls):

@classmethod
def module(cls):
return "pyspark.mllib.linalg"
return "pyspark3.mllib.linalg"

@classmethod
def scalaUDT(cls):
@@ -187,7 +187,7 @@ def sqlType(cls):

@classmethod
def module(cls):
return "pyspark.mllib.linalg"
return "pyspark3.mllib.linalg"

@classmethod
def scalaUDT(cls):
2 changes: 1 addition & 1 deletion python/pyspark3/mllib/util.py
@@ -511,7 +511,7 @@ def _java_loader_class(cls):
implementation replaces "pyspark" by "org.apache.spark" in
the Python full class name.
"""
java_package = cls.__module__.replace("pyspark", "org.apache.spark")
java_package = cls.__module__.replace("pyspark3", "org.apache.spark")
return ".".join([java_package, cls.__name__])

@classmethod
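The wrapper.py and util.py hunks above rely on a purely mechanical prefix swap between JVM and Python class names. A minimal round-trip sketch of that convention (assuming the pyspark3 package rename this PR makes; the names below are illustrative):

    // JVM <-> Python class-name mapping used by wrapper.py and mllib/util.py.
    object PackageRenameCheck {
      private val JvmPrefix = "org.apache.spark"
      private val PyPrefix = "pyspark3"

      def toPython(javaClass: String): String = javaClass.replace(JvmPrefix, PyPrefix)
      def toJava(pyModule: String): String = pyModule.replace(PyPrefix, JvmPrefix)

      def main(args: Array[String]): Unit = {
        val java = "org.apache.spark.ml.linalg.MatrixUDT"
        val py = toPython(java)      // pyspark3.ml.linalg.MatrixUDT
        assert(toJava(py) == java)   // the swap must round-trip exactly
      }
    }

If any pyUDT or module string kept the old "pyspark" prefix, this round trip would break, which is why the Scala-side constants are updated in lockstep.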
2 changes: 1 addition & 1 deletion python/pyspark3/version.py
@@ -16,4 +16,4 @@
# See the License for the specific language governing permissions and
# limitations under the License.

__version__ = "3.1.2+affirm4"
__version__ = "3.1.2+affirm8"
Reviewer: nit: what happened to 5 through 7?

16 changes: 16 additions & 0 deletions resource-managers/kubernetes/core/pom.xml
@@ -79,6 +79,22 @@
</exclusions>
</dependency>

<dependency>
<groupId>io.kubernetes</groupId>
<artifactId>client-java</artifactId>
<version>8.0.0</version>
<exclusions>
<exclusion>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>*</artifactId>
</exclusion>
<exclusion>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-yaml</artifactId>
</exclusion>
</exclusions>
</dependency>
Reviewer: Is this ported from the mainline?

<!-- Required by kubernetes-client but we exclude it -->
<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
@@ -17,12 +17,17 @@
package org.apache.spark.deploy.k8s

import java.io.File
import java.io.FileReader
import java.util.Locale

import com.google.common.base.Charsets
import com.google.common.io.Files
import io.fabric8.kubernetes.client.{ConfigBuilder, DefaultKubernetesClient, KubernetesClient}
import io.fabric8.kubernetes.client.{ConfigBuilder, DefaultKubernetesClient, KubernetesClient, OAuthTokenProvider}
import io.fabric8.kubernetes.client.Config.KUBERNETES_KUBECONFIG_FILE
import io.fabric8.kubernetes.client.Config.autoConfigure
import io.fabric8.kubernetes.client.utils.HttpClientUtils
import io.fabric8.kubernetes.client.utils.{HttpClientUtils, Utils}
import io.kubernetes.client.util.FilePersister
import io.kubernetes.client.util.KubeConfig
import okhttp3.Dispatcher

import org.apache.spark.SparkConf
@@ -31,13 +36,64 @@ import org.apache.spark.internal.Logging
import org.apache.spark.internal.config.ConfigEntry
import org.apache.spark.util.ThreadUtils

private[spark] class SparkOAuthTokenProvider(config: File) extends OAuthTokenProvider {
val kubeConfig = KubeConfig.loadKubeConfig(new FileReader(config))
val persister = new FilePersister(config)
kubeConfig.setPersistConfig(persister)
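// fabric8 consults the OAuthTokenProvider for a bearer token on every request, so
// delegating to KubeConfig re-reads short-lived credentials each time, and the
// FilePersister set above writes refreshed tokens back to the kubeconfig file.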

def getToken(): String = {
return kubeConfig.getAccessToken()
}
}

/**
* Spark-opinionated builder for Kubernetes clients. It uses a prefix plus common suffixes to
* parse configuration keys, similar to the manner in which Spark's SecurityManager parses SSL
* options for different components.
*/
private[spark] object SparkKubernetesClientFactory extends Logging {

/**
 * Check if the code is being run from within Kubernetes.
 * @return true if the KUBERNETES_SERVICE_HOST environment variable (injected by
 *         Kubernetes into every pod container) is set and non-empty.
 */
def isOnKubernetes(): Boolean = {
val serviceHost = System.getenv("KUBERNETES_SERVICE_HOST")
return serviceHost != null && serviceHost.length > 0
}

def getHomeDir(): String = {
Reviewer: Where's this coming from?

Author: Not sure what sets it, but it's always present on k8s containers. We have something similar in ml_pipelines.

Reviewer: I meant more: are you writing this code from scratch, or are you porting it from somewhere?

Author: Oh, it's coming from @daggertheog's diff for the Spark 2 k8s auth issue.

val osName = System.getProperty("os.name").toLowerCase(Locale.ROOT)
if (osName.startsWith("win")) {
val homeDrive = System.getenv("HOMEDRIVE")
val homePath = System.getenv("HOMEPATH")
if (homeDrive != null && !homeDrive.isEmpty() && homePath != null && !homePath.isEmpty()) {
val homeDir = homeDrive + homePath
val f = new File(homeDir)
if (f.exists() && f.isDirectory()) {
return homeDir
}
}
val userProfile = System.getenv("USERPROFILE")
if (userProfile != null && !userProfile.isEmpty()) {
val f = new File(userProfile)
if (f.exists() && f.isDirectory()) {
return userProfile
}
}
}
val home = System.getenv("HOME")
if (home != null && !home.isEmpty()) {
val f = new File(home)
if (f.exists() && f.isDirectory()) {
return home
}
}

// Fall back to user.home; we should never really get here.
return System.getProperty("user.home", ".")
}
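// (This home-directory resolution mirrors the logic fabric8's Config uses when
// locating ~/.kube/config, ported here per the review thread above.)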

def createKubernetesClient(
master: String,
namespace: Option[String],
@@ -77,7 +133,23 @@ private[spark] object SparkKubernetesClientFactory extends Logging {
// Start from an auto-configured config with the desired context
// Fabric8 uses null to indicate that the user's current context should be used, so if
// there is no explicit setting, pass null.
val config = new ConfigBuilder(autoConfigure(kubeContext.getOrElse(null)))
var builder: ConfigBuilder = new ConfigBuilder()
if (!isOnKubernetes()) {
// Get the kubeconfig file.
var fileName = Utils.getSystemPropertyOrEnvVar(KUBERNETES_KUBECONFIG_FILE,
new File(getHomeDir(), ".kube" + File.separator + "config").toString())
// If the system property/env var contains multiple files, take the first one based on
// the environment we are running in (e.g. ':' as the path separator on Linux, ';' on Windows).
val fileNames = fileName.split(File.pathSeparator)
if (fileNames.length > 1) {
fileName = fileNames(0)
}
val kubeConfigFile = new File(fileName)

builder = new ConfigBuilder(autoConfigure(null))
.withOauthTokenProvider(new SparkOAuthTokenProvider(kubeConfigFile))
}

val config = builder
.withApiVersion("v1")
.withMasterUrl(master)
.withWebsocketPingInterval(0)
@@ -213,7 +213,7 @@ object EvaluatePython {
}
}

private val module = "pyspark.sql.types"
private val module = "pyspark3.sql.types"

/**
* Pickler for StructType
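As with the ml/mllib pyUDT strings earlier in this diff, this module constant is what the JVM-side picklers bake into the pickle stream, so it has to name the renamed pyspark3 package for Python-side unpickling to import the right classes.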