
Commit ab648c0

jerryshao authored and Marcelo Vanzin committed
[SPARK-14743][YARN] Add a configurable credential manager for Spark running on YARN
## What changes were proposed in this pull request?

Add a configurable credential manager for Spark running on YARN.

### Current Problems ###

1. The set of supported token providers is hard-coded: currently only HDFS, HBase and Hive are supported, and it is impossible for users to add a new token provider without code changes.
2. The same problem exists in the periodic token renewer and updater.

### Changes In This Proposal ###

To address the problems mentioned above and make the current code cleaner and easier to understand, this proposal mainly makes 3 changes:

1. Abstract a `ServiceTokenProvider` interface, as well as a `ServiceTokenRenewable` interface, for token providers. Each service that wants to communicate with Spark through tokens needs to implement this interface.
2. Provide a `ConfigurableTokenManager` to manage all the registered token providers, as well as the token renewer and updater. This class also offers APIs for other modules to obtain tokens, get renewal intervals, and so on.
3. Implement 3 built-in token providers, `HDFSTokenProvider`, `HiveTokenProvider` and `HBaseTokenProvider`, to keep the same semantics as supported today. Whether these built-in token providers are loaded is controlled by the configuration `spark.yarn.security.tokens.${service}.enabled`; by default all the built-in token providers are loaded.

### Behavior Changes ###

For the end user there is no behavior change: we still use the same configuration `spark.yarn.security.tokens.${service}.enabled` to decide which token providers are enabled (hbase or hive).

A user-implemented token provider (assume the name of the token provider is "test") requires two configurations to be added, as sketched below:

1. Set `spark.yarn.security.tokens.test.enabled` to `true`.
2. Set `spark.yarn.security.tokens.test.class` to the fully qualified class name.

So we keep the same semantics as the current code while adding one new configuration.

### Current Status ###

- [x] Token provider interface and management framework.
- [x] Implement built-in token providers (hdfs, hbase, hive).
- [x] Unit test coverage.
- [x] Integration test with a secure cluster.

## How was this patch tested?

Unit tests and integration tests. Please suggest and review; any comment is greatly appreciated.

Author: jerryshao <[email protected]>

Closes apache#14065 from jerryshao/SPARK-16342.
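To make the plug-in contract concrete, here is a minimal sketch of a user-implemented provider for a service named "test". In the final patch the interface is called `ServiceCredentialProvider` (see the diffs below); the method names and signatures here are an assumption based on the description above, not a verbatim copy of the patch:

```scala
package com.example.security // hypothetical user package

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.security.Credentials

import org.apache.spark.SparkConf
import org.apache.spark.deploy.yarn.security.ServiceCredentialProvider

class TestCredentialProvider extends ServiceCredentialProvider {

  // Matches ${service} in spark.yarn.security.tokens.${service}.enabled
  override def serviceName: String = "test"

  override def obtainCredentials(
      hadoopConf: Configuration,
      sparkConf: SparkConf,
      creds: Credentials): Option[Long] = {
    // Obtain a delegation token for the external service, add it to creds,
    // and return the time (in ms) at which the credentials should next be
    // renewed, or None if they never need renewal.
    None
  }
}
```

The provider is then wired in either through the `spark.yarn.security.tokens.test.class` configuration described above or, as the docs change below shows, through a `META-INF/services` entry.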
1 parent bd2c12f commit ab648c0

24 files changed: +985 -573 lines

core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala (+6 -32)

@@ -17,21 +17,19 @@
 package org.apache.spark.deploy

-import java.io.{ByteArrayInputStream, DataInputStream, IOException}
+import java.io.IOException
 import java.lang.reflect.Method
 import java.security.PrivilegedExceptionAction
 import java.text.DateFormat
 import java.util.{Arrays, Comparator, Date}

 import scala.collection.JavaConverters._
-import scala.concurrent.duration._
 import scala.util.control.NonFatal

 import com.google.common.primitives.Longs
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileStatus, FileSystem, Path, PathFilter}
 import org.apache.hadoop.fs.FileSystem.Statistics
-import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier
 import org.apache.hadoop.mapred.JobConf
 import org.apache.hadoop.security.{Credentials, UserGroupInformation}
 import org.apache.hadoop.security.token.{Token, TokenIdentifier}
@@ -40,7 +38,6 @@ import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdenti
 import org.apache.spark.{SparkConf, SparkException}
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.internal.Logging
-import org.apache.spark.internal.config._
 import org.apache.spark.util.Utils

 /**
@@ -277,29 +274,6 @@ class SparkHadoopUtil extends Logging {
     }
   }

-  /**
-   * How much time is remaining (in millis) from now to (fraction * renewal time for the token that
-   * is valid the latest)?
-   * This will return -ve (or 0) value if the fraction of validity has already expired.
-   */
-  def getTimeFromNowToRenewal(
-      sparkConf: SparkConf,
-      fraction: Double,
-      credentials: Credentials): Long = {
-    val now = System.currentTimeMillis()
-
-    val renewalInterval = sparkConf.get(TOKEN_RENEWAL_INTERVAL).get
-
-    credentials.getAllTokens.asScala
-      .filter(_.getKind == DelegationTokenIdentifier.HDFS_DELEGATION_KIND)
-      .map { t =>
-        val identifier = new DelegationTokenIdentifier()
-        identifier.readFields(new DataInputStream(new ByteArrayInputStream(t.getIdentifier)))
-        (identifier.getIssueDate + fraction * renewalInterval).toLong - now
-      }.foldLeft(0L)(math.max)
-  }
-
-
   private[spark] def getSuffixForCredentialsPath(credentialsPath: Path): Int = {
     val fileName = credentialsPath.getName
     fileName.substring(
@@ -337,15 +311,15 @@
   }

   /**
-   * Start a thread to periodically update the current user's credentials with new delegation
-   * tokens so that writes to HDFS do not fail.
+   * Start a thread to periodically update the current user's credentials with new credentials so
+   * that access to secured service does not fail.
    */
-  private[spark] def startExecutorDelegationTokenRenewer(conf: SparkConf) {}
+  private[spark] def startCredentialUpdater(conf: SparkConf) {}

   /**
-   * Stop the thread that does the delegation token updates.
+   * Stop the thread that does the credential updates.
    */
-  private[spark] def stopExecutorDelegationTokenRenewer() {}
+  private[spark] def stopCredentialUpdater() {}

   /**
    * Return a fresh Hadoop configuration, bypassing the HDFS cache mechanism.

core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala (+2 -2)

@@ -203,7 +203,7 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
       if (driverConf.contains("spark.yarn.credentials.file")) {
         logInfo("Will periodically update credentials from: " +
           driverConf.get("spark.yarn.credentials.file"))
-        SparkHadoopUtil.get.startExecutorDelegationTokenRenewer(driverConf)
+        SparkHadoopUtil.get.startCredentialUpdater(driverConf)
       }

       val env = SparkEnv.createExecutorEnv(
@@ -215,7 +215,7 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
         env.rpcEnv.setupEndpoint("WorkerWatcher", new WorkerWatcher(env.rpcEnv, url))
       }
       env.rpcEnv.awaitTermination()
-      SparkHadoopUtil.get.stopExecutorDelegationTokenRenewer()
+      SparkHadoopUtil.get.stopCredentialUpdater()
     }
   }

core/src/main/scala/org/apache/spark/internal/config/package.scala (-7)

@@ -17,8 +17,6 @@
 package org.apache.spark.internal

-import java.util.concurrent.TimeUnit
-
 import org.apache.spark.launcher.SparkLauncher
 import org.apache.spark.network.util.ByteUnit

@@ -82,11 +80,6 @@ package object config {
     .doc("Name of the Kerberos principal.")
     .stringConf.createOptional

-  private[spark] val TOKEN_RENEWAL_INTERVAL = ConfigBuilder("spark.yarn.token.renewal.interval")
-    .internal()
-    .timeConf(TimeUnit.MILLISECONDS)
-    .createOptional
-
   private[spark] val EXECUTOR_INSTANCES = ConfigBuilder("spark.executor.instances")
     .intConf
     .createOptional

dev/.rat-excludes (+1)

@@ -100,3 +100,4 @@ spark-deps-.*
 org.apache.spark.scheduler.ExternalClusterManager
 .*\.sql
 .Rbuildignore
+org.apache.spark.deploy.yarn.security.ServiceCredentialProvider

docs/running-on-yarn.md (+14 -8)

@@ -461,15 +461,14 @@ To use a custom metrics.properties for the application master and executors, upd
   </td>
 </tr>
 <tr>
-  <td><code>spark.yarn.security.tokens.${service}.enabled</code></td>
+  <td><code>spark.yarn.security.credentials.${service}.enabled</code></td>
   <td><code>true</code></td>
   <td>
-  Controls whether to retrieve delegation tokens for non-HDFS services when security is enabled.
-  By default, delegation tokens for all supported services are retrieved when those services are
+  Controls whether to obtain credentials for services when security is enabled.
+  By default, credentials for all supported services are retrieved when those services are
   configured, but it's possible to disable that behavior if it somehow conflicts with the
-  application being run.
-  <p/>
-  Currently supported services are: <code>hive</code>, <code>hbase</code>
+  application being run. For further details please see
+  [Running in a Secure Cluster](running-on-yarn.html#running-in-a-secure-cluster)
   </td>
 </tr>
 <tr>
@@ -525,11 +524,11 @@ token for the cluster's HDFS filesystem, and potentially for HBase and Hive.

 An HBase token will be obtained if HBase is in on classpath, the HBase configuration declares
 the application is secure (i.e. `hbase-site.xml` sets `hbase.security.authentication` to `kerberos`),
-and `spark.yarn.security.tokens.hbase.enabled` is not set to `false`.
+and `spark.yarn.security.credentials.hbase.enabled` is not set to `false`.

 Similarly, a Hive token will be obtained if Hive is on the classpath, its configuration
 includes a URI of the metadata store in `"hive.metastore.uris`, and
-`spark.yarn.security.tokens.hive.enabled` is not set to `false`.
+`spark.yarn.security.credentials.hive.enabled` is not set to `false`.

 If an application needs to interact with other secure HDFS clusters, then
 the tokens needed to access these clusters must be explicitly requested at
@@ -539,6 +538,13 @@ launch time. This is done by listing them in the `spark.yarn.access.namenodes` p
 spark.yarn.access.namenodes hdfs://ireland.example.org:8020/,hdfs://frankfurt.example.org:8020/
 ```

+Spark supports integrating with other security-aware services through Java Services mechanism (see
+`java.util.ServiceLoader`). To do that, implementations of `org.apache.spark.deploy.yarn.security.ServiceCredentialProvider`
+should be available to Spark by listing their names in the corresponding file in the jar's
+`META-INF/services` directory. These plug-ins can be disabled by setting
+`spark.yarn.security.tokens.{service}.enabled` to `false`, where `{service}` is the name of
+credential provider.
+
 ## Configuring the External Shuffle Service

 To start the Spark Shuffle Service on each `NodeManager` in your YARN cluster, follow these
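To make the registration step above concrete: a plug-in jar carrying the hypothetical provider sketched after the commit message would ship a services file like the following (the provider class name is illustrative), and the plug-in could then be switched off for a given submission with `--conf spark.yarn.security.tokens.test.enabled=false`:

```
# contents of META-INF/services/org.apache.spark.deploy.yarn.security.ServiceCredentialProvider
com.example.security.TestCredentialProvider
```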

project/MimaExcludes.scala (+4 -1)

@@ -784,7 +784,10 @@ object MimaExcludes {
       ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.sql.SQLContext.jdbc"),
       ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.sql.SQLContext.parquetFile"),
       ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.sql.SQLContext.applySchema")
-    )
+    ) ++ Seq(
+      // [SPARK-14743] Improve delegation token handling in secure cluster
+      ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.deploy.SparkHadoopUtil.getTimeFromNowToRenewal")
+    )
   }

   def excludes(version: String) = version match {
yarn/src/main/resources/META-INF/services/org.apache.spark.deploy.yarn.security.ServiceCredentialProvider (new file, +3)

@@ -0,0 +1,3 @@
+org.apache.spark.deploy.yarn.security.HDFSCredentialProvider
+org.apache.spark.deploy.yarn.security.HBaseCredentialProvider
+org.apache.spark.deploy.yarn.security.HiveCredentialProvider

yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala (+9 -4)

@@ -35,6 +35,7 @@ import org.apache.spark._
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.deploy.history.HistoryServer
 import org.apache.spark.deploy.yarn.config._
+import org.apache.spark.deploy.yarn.security.{AMCredentialRenewer, ConfigurableCredentialManager}
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config._
 import org.apache.spark.rpc._
@@ -112,7 +113,7 @@ private[spark] class ApplicationMaster(
   // Fields used in cluster mode.
   private val sparkContextRef = new AtomicReference[SparkContext](null)

-  private var delegationTokenRenewerOption: Option[AMDelegationTokenRenewer] = None
+  private var credentialRenewer: AMCredentialRenewer = _

   // Load the list of localized files set by the client. This is used when launching executors,
   // and is loaded here so that these configs don't pollute the Web UI's environment page in
@@ -235,10 +236,11 @@ private[spark] class ApplicationMaster(
     // If the credentials file config is present, we must periodically renew tokens. So create
     // a new AMDelegationTokenRenewer
     if (sparkConf.contains(CREDENTIALS_FILE_PATH.key)) {
-      delegationTokenRenewerOption = Some(new AMDelegationTokenRenewer(sparkConf, yarnConf))
       // If a principal and keytab have been set, use that to create new credentials for executors
       // periodically
-      delegationTokenRenewerOption.foreach(_.scheduleLoginFromKeytab())
+      credentialRenewer =
+        new ConfigurableCredentialManager(sparkConf, yarnConf).credentialRenewer()
+      credentialRenewer.scheduleLoginFromKeytab()
     }

     if (isClusterMode) {
@@ -305,7 +307,10 @@ private[spark] class ApplicationMaster(
       logDebug("shutting down user thread")
       userClassThread.interrupt()
     }
-    if (!inShutdown) delegationTokenRenewerOption.foreach(_.stop())
+    if (!inShutdown && credentialRenewer != null) {
+      credentialRenewer.stop()
+      credentialRenewer = null
+    }
   }
 }
yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala (+29 -34)

@@ -17,8 +17,7 @@
 package org.apache.spark.deploy.yarn

-import java.io.{ByteArrayInputStream, DataInputStream, File, FileOutputStream, IOException,
-  OutputStreamWriter}
+import java.io.{File, FileOutputStream, IOException, OutputStreamWriter}
 import java.net.{InetAddress, UnknownHostException, URI}
 import java.nio.ByteBuffer
 import java.nio.charset.StandardCharsets
@@ -35,7 +34,6 @@ import com.google.common.io.Files
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs._
 import org.apache.hadoop.fs.permission.FsPermission
-import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier
 import org.apache.hadoop.io.DataOutputBuffer
 import org.apache.hadoop.mapreduce.MRJobConfig
 import org.apache.hadoop.security.{Credentials, UserGroupInformation}
@@ -52,6 +50,7 @@ import org.apache.hadoop.yarn.util.Records
 import org.apache.spark.{SecurityManager, SparkConf, SparkContext, SparkException}
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.deploy.yarn.config._
+import org.apache.spark.deploy.yarn.security.ConfigurableCredentialManager
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config._
 import org.apache.spark.launcher.{LauncherBackend, SparkAppHandle, YarnCommandBuilderUtils}
@@ -122,6 +121,8 @@ private[spark] class Client(
   private val appStagingBaseDir = sparkConf.get(STAGING_DIR).map { new Path(_) }
     .getOrElse(FileSystem.get(hadoopConf).getHomeDirectory())

+  private val credentialManager = new ConfigurableCredentialManager(sparkConf, hadoopConf)
+
   def reportLauncherState(state: SparkAppHandle.State): Unit = {
     launcherBackend.setState(state)
   }
@@ -390,8 +391,31 @@
     // Upload Spark and the application JAR to the remote file system if necessary,
     // and add them as local resources to the application master.
     val fs = destDir.getFileSystem(hadoopConf)
-    val nns = YarnSparkHadoopUtil.get.getNameNodesToAccess(sparkConf) + destDir
-    YarnSparkHadoopUtil.get.obtainTokensForNamenodes(nns, hadoopConf, credentials)
+
+    // Merge credentials obtained from registered providers
+    val nearestTimeOfNextRenewal = credentialManager.obtainCredentials(hadoopConf, credentials)
+
+    if (credentials != null) {
+      logDebug(YarnSparkHadoopUtil.get.dumpTokens(credentials).mkString("\n"))
+    }
+
+    // If we use principal and keytab to login, also credentials can be renewed some time
+    // after current time, we should pass the next renewal and updating time to credential
+    // renewer and updater.
+    if (loginFromKeytab && nearestTimeOfNextRenewal > System.currentTimeMillis() &&
+      nearestTimeOfNextRenewal != Long.MaxValue) {
+
+      // Valid renewal time is 75% of next renewal time, and the valid update time will be
+      // slightly later than renewal time (80% of next renewal time). This is to make sure
+      // credentials are renewed and updated before expired.
+      val currTime = System.currentTimeMillis()
+      val renewalTime = (nearestTimeOfNextRenewal - currTime) * 0.75 + currTime
+      val updateTime = (nearestTimeOfNextRenewal - currTime) * 0.8 + currTime
+
+      sparkConf.set(CREDENTIALS_RENEWAL_TIME, renewalTime.toLong)
+      sparkConf.set(CREDENTIALS_UPDATE_TIME, updateTime.toLong)
+    }
+
     // Used to keep track of URIs added to the distributed cache. If the same URI is added
     // multiple times, YARN will fail to launch containers for the app with an internal
     // error.
@@ -400,11 +424,6 @@
     // same name but different path files are added multiple time, YARN will fail to launch
     // containers for the app with an internal error.
     val distributedNames = new HashSet[String]
-    YarnSparkHadoopUtil.get.obtainTokenForHiveMetastore(sparkConf, hadoopConf, credentials)
-    YarnSparkHadoopUtil.get.obtainTokenForHBase(sparkConf, hadoopConf, credentials)
-    if (credentials != null) {
-      logDebug(YarnSparkHadoopUtil.get.dumpTokens(credentials).mkString("\n"))
-    }

     val replication = sparkConf.get(STAGING_FILE_REPLICATION).map(_.toShort)
       .getOrElse(fs.getDefaultReplication(destDir))
@@ -716,28 +735,6 @@
     confArchive
   }

-  /**
-   * Get the renewal interval for tokens.
-   */
-  private def getTokenRenewalInterval(stagingDirPath: Path): Long = {
-    // We cannot use the tokens generated above since those have renewer yarn. Trying to renew
-    // those will fail with an access control issue. So create new tokens with the logged in
-    // user as renewer.
-    val creds = new Credentials()
-    val nns = YarnSparkHadoopUtil.get.getNameNodesToAccess(sparkConf) + stagingDirPath
-    YarnSparkHadoopUtil.get.obtainTokensForNamenodes(
-      nns, hadoopConf, creds, sparkConf.get(PRINCIPAL))
-    val t = creds.getAllTokens.asScala
-      .filter(_.getKind == DelegationTokenIdentifier.HDFS_DELEGATION_KIND)
-      .head
-    val newExpiration = t.renew(hadoopConf)
-    val identifier = new DelegationTokenIdentifier()
-    identifier.readFields(new DataInputStream(new ByteArrayInputStream(t.getIdentifier)))
-    val interval = newExpiration - identifier.getIssueDate
-    logInfo(s"Renewal Interval set to $interval")
-    interval
-  }
-
   /**
    * Set up the environment for launching our ApplicationMaster container.
    */
@@ -754,8 +751,6 @@
       val credentialsFile = "credentials-" + UUID.randomUUID().toString
       sparkConf.set(CREDENTIALS_FILE_PATH, new Path(stagingDirPath, credentialsFile).toString)
       logInfo(s"Credentials file set to: $credentialsFile")
-      val renewalInterval = getTokenRenewalInterval(stagingDirPath)
-      sparkConf.set(TOKEN_RENEWAL_INTERVAL, renewalInterval)
     }

     // Pick up any environment variables for the AM provided through spark.yarn.appMasterEnv.*
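The 75%/80% comment in the hunk above is the heart of the new scheduling logic. A tiny standalone sketch of the same arithmetic, with made-up timestamps, shows how the renewal and update points both land before expiry:

```scala
object RenewalTimeExample {
  def main(args: Array[String]): Unit = {
    val currTime = System.currentTimeMillis()
    // Pretend the nearest credential renewal is 10 hours away (36,000,000 ms).
    val nearestTimeOfNextRenewal = currTime + 36000000L

    // Renew at 75% of the remaining lifetime and update slightly later at 80%,
    // so fresh credentials are always written out before the old ones expire.
    val renewalTime = ((nearestTimeOfNextRenewal - currTime) * 0.75 + currTime).toLong
    val updateTime = ((nearestTimeOfNextRenewal - currTime) * 0.8 + currTime).toLong

    println(s"renew in ${(renewalTime - currTime) / 3600000.0} hours")  // 7.5
    println(s"update in ${(updateTime - currTime) / 3600000.0} hours") // 8.0
  }
}
```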
