Skip to content

Commit 5da6c4b

Browse files
author
Marcelo Vanzin
committed
[SPARK-16671][CORE][SQL] Consolidate code to do variable substitution.
Both core and sql have slightly different code that does variable substitution of config values. This change refactors that code and encapsulates the logic of reading config values and expanding variables in a new helper class, which can be configured so that both core and sql can use it without losing existing functionality, and allows for easier testing and makes it easier to add more features in the future. Tested with existing and new unit tests, and by running spark-shell with some configs referencing variables and making sure it behaved as expected. Author: Marcelo Vanzin <[email protected]> Closes apache#14468 from vanzin/SPARK-16671.
1 parent 564fe61 commit 5da6c4b

File tree

9 files changed

+312
-228
lines changed

9 files changed

+312
-228
lines changed

core/src/main/scala/org/apache/spark/SparkConf.scala

+7-2
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ import scala.collection.mutable.LinkedHashSet
2525
import org.apache.avro.{Schema, SchemaNormalization}
2626

2727
import org.apache.spark.internal.Logging
28-
import org.apache.spark.internal.config.{ConfigEntry, OptionalConfigEntry}
28+
import org.apache.spark.internal.config._
2929
import org.apache.spark.serializer.KryoSerializer
3030
import org.apache.spark.util.Utils
3131

@@ -56,6 +56,11 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging with Seria
5656

5757
private val settings = new ConcurrentHashMap[String, String]()
5858

59+
private val reader = new ConfigReader(new SparkConfigProvider(settings))
60+
reader.bindEnv(new ConfigProvider {
61+
override def get(key: String): Option[String] = Option(getenv(key))
62+
})
63+
5964
if (loadDefaults) {
6065
loadFromSystemProperties(false)
6166
}
@@ -248,7 +253,7 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging with Seria
248253
* - This will throw an exception if the config is not optional and the value is not set.
249254
*/
250255
private[spark] def get[T](entry: ConfigEntry[T]): T = {
251-
entry.readFrom(settings, getenv)
256+
entry.readFrom(reader)
252257
}
253258

254259
/**

core/src/main/scala/org/apache/spark/internal/config/ConfigEntry.scala

+14-78
Original file line numberDiff line numberDiff line change
@@ -26,22 +26,9 @@ import org.apache.spark.SparkConf
2626
/**
2727
* An entry contains all meta information for a configuration.
2828
*
29-
* Config options created using this feature support variable expansion. If the config value
30-
* contains variable references of the form "${prefix:variableName}", the reference will be replaced
31-
* with the value of the variable depending on the prefix. The prefix can be one of:
32-
*
33-
* - no prefix: if the config key starts with "spark", looks for the value in the Spark config
34-
* - system: looks for the value in the system properties
35-
* - env: looks for the value in the environment
36-
*
37-
* So referencing "${spark.master}" will look for the value of "spark.master" in the Spark
38-
* configuration, while referencing "${env:MASTER}" will read the value from the "MASTER"
39-
* environment variable.
40-
*
41-
* For known Spark configuration keys (i.e. those created using `ConfigBuilder`), references
42-
* will also consider the default value when it exists.
43-
*
44-
* If the reference cannot be resolved, the original string will be retained.
29+
* When applying variable substitution to config values, only references starting with "spark." are
30+
* considered in the default namespace. For known Spark configuration keys (i.e. those created using
31+
* `ConfigBuilder`), references will also consider the default value when it exists.
4532
*
4633
* Variable expansion is also applied to the default values of config entries that have a default
4734
* value declared as a string.
@@ -72,21 +59,14 @@ private[spark] abstract class ConfigEntry[T] (
7259

7360
def defaultValueString: String
7461

75-
def readFrom(conf: JMap[String, String], getenv: String => String): T
62+
def readFrom(reader: ConfigReader): T
7663

7764
def defaultValue: Option[T] = None
7865

7966
override def toString: String = {
8067
s"ConfigEntry(key=$key, defaultValue=$defaultValueString, doc=$doc, public=$isPublic)"
8168
}
8269

83-
protected def readAndExpand(
84-
conf: JMap[String, String],
85-
getenv: String => String,
86-
usedRefs: Set[String] = Set()): Option[String] = {
87-
Option(conf.get(key)).map(expand(_, conf, getenv, usedRefs))
88-
}
89-
9070
}
9171

9272
private class ConfigEntryWithDefault[T] (
@@ -102,8 +82,8 @@ private class ConfigEntryWithDefault[T] (
10282

10383
override def defaultValueString: String = stringConverter(_defaultValue)
10484

105-
def readFrom(conf: JMap[String, String], getenv: String => String): T = {
106-
readAndExpand(conf, getenv).map(valueConverter).getOrElse(_defaultValue)
85+
def readFrom(reader: ConfigReader): T = {
86+
reader.get(key).map(valueConverter).getOrElse(_defaultValue)
10787
}
10888

10989
}
@@ -121,12 +101,9 @@ private class ConfigEntryWithDefaultString[T] (
121101

122102
override def defaultValueString: String = _defaultValue
123103

124-
def readFrom(conf: JMap[String, String], getenv: String => String): T = {
125-
Option(conf.get(key))
126-
.orElse(Some(_defaultValue))
127-
.map(ConfigEntry.expand(_, conf, getenv, Set()))
128-
.map(valueConverter)
129-
.get
104+
def readFrom(reader: ConfigReader): T = {
105+
val value = reader.get(key).getOrElse(reader.substitute(_defaultValue))
106+
valueConverter(value)
130107
}
131108

132109
}
@@ -146,8 +123,8 @@ private[spark] class OptionalConfigEntry[T](
146123

147124
override def defaultValueString: String = "<undefined>"
148125

149-
override def readFrom(conf: JMap[String, String], getenv: String => String): Option[T] = {
150-
readAndExpand(conf, getenv).map(rawValueConverter)
126+
override def readFrom(reader: ConfigReader): Option[T] = {
127+
reader.get(key).map(rawValueConverter)
151128
}
152129

153130
}
@@ -164,62 +141,21 @@ private class FallbackConfigEntry[T] (
164141

165142
override def defaultValueString: String = s"<value of ${fallback.key}>"
166143

167-
override def readFrom(conf: JMap[String, String], getenv: String => String): T = {
168-
Option(conf.get(key)).map(valueConverter).getOrElse(fallback.readFrom(conf, getenv))
144+
override def readFrom(reader: ConfigReader): T = {
145+
reader.get(key).map(valueConverter).getOrElse(fallback.readFrom(reader))
169146
}
170147

171148
}
172149

173-
private object ConfigEntry {
150+
private[spark] object ConfigEntry {
174151

175152
private val knownConfigs = new java.util.concurrent.ConcurrentHashMap[String, ConfigEntry[_]]()
176153

177-
private val REF_RE = "\\$\\{(?:(\\w+?):)?(\\S+?)\\}".r
178-
179154
def registerEntry(entry: ConfigEntry[_]): Unit = {
180155
val existing = knownConfigs.putIfAbsent(entry.key, entry)
181156
require(existing == null, s"Config entry ${entry.key} already registered!")
182157
}
183158

184159
def findEntry(key: String): ConfigEntry[_] = knownConfigs.get(key)
185160

186-
/**
187-
* Expand the `value` according to the rules explained in ConfigEntry.
188-
*/
189-
def expand(
190-
value: String,
191-
conf: JMap[String, String],
192-
getenv: String => String,
193-
usedRefs: Set[String]): String = {
194-
REF_RE.replaceAllIn(value, { m =>
195-
val prefix = m.group(1)
196-
val name = m.group(2)
197-
val replacement = prefix match {
198-
case null =>
199-
require(!usedRefs.contains(name), s"Circular reference in $value: $name")
200-
if (name.startsWith("spark.")) {
201-
Option(findEntry(name))
202-
.flatMap(_.readAndExpand(conf, getenv, usedRefs = usedRefs + name))
203-
.orElse(Option(conf.get(name)))
204-
.orElse(defaultValueString(name))
205-
} else {
206-
None
207-
}
208-
case "system" => sys.props.get(name)
209-
case "env" => Option(getenv(name))
210-
case _ => None
211-
}
212-
Regex.quoteReplacement(replacement.getOrElse(m.matched))
213-
})
214-
}
215-
216-
private def defaultValueString(key: String): Option[String] = {
217-
findEntry(key) match {
218-
case e: ConfigEntryWithDefault[_] => Some(e.defaultValueString)
219-
case e: ConfigEntryWithDefaultString[_] => Some(e.defaultValueString)
220-
case e: FallbackConfigEntry[_] => defaultValueString(e.fallback.key)
221-
case _ => None
222-
}
223-
}
224-
225161
}
Original file line numberDiff line numberDiff line change
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.internal.config

import java.util.{Map => JMap}

/**
 * A source of configuration values.
 */
private[spark] trait ConfigProvider {

  def get(key: String): Option[String]

}

/** A provider backed by the process environment. */
private[spark] class EnvProvider extends ConfigProvider {

  override def get(key: String): Option[String] = sys.env.get(key)

}

/** A provider backed by the JVM system properties. */
private[spark] class SystemProvider extends ConfigProvider {

  override def get(key: String): Option[String] = sys.props.get(key)

}

/** A provider backed by a Java map; a `null` value in the map maps to `None`. */
private[spark] class MapProvider(conf: JMap[String, String]) extends ConfigProvider {

  override def get(key: String): Option[String] = Option(conf.get(key))

}

/**
 * A config provider that only reads Spark config keys (those under the "spark." namespace),
 * and considers default values for known configs when fetching configuration values.
 */
private[spark] class SparkConfigProvider(conf: JMap[String, String]) extends ConfigProvider {

  import ConfigEntry._

  override def get(key: String): Option[String] = {
    // Anything outside the "spark." namespace is never resolved by this provider.
    if (!key.startsWith("spark.")) {
      None
    } else {
      Option(conf.get(key)).orElse(defaultValueString(key))
    }
  }

  // Returns the stringified default value for a registered config entry, following
  // fallback entries through to the entry they delegate to.
  private def defaultValueString(key: String): Option[String] = {
    findEntry(key) match {
      case e: ConfigEntryWithDefault[_] => Option(e.defaultValueString)
      case e: ConfigEntryWithDefaultString[_] => Option(e.defaultValueString)
      case e: FallbackConfigEntry[_] => defaultValueString(e.fallback.key)
      case _ => None
    }
  }

}
Original file line numberDiff line numberDiff line change
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.internal.config

import java.util.{Map => JMap}
import java.util.regex.Pattern

import scala.collection.mutable.HashMap
import scala.util.matching.Regex

private object ConfigReader {

  // Matches variable references of the form "${prefix:name}"; the prefix capture
  // group is null when the reference has no prefix ("${name}").
  private val REF_RE = "\\$\\{(?:(\\w+?):)?(\\S+?)\\}".r

}

/**
 * A helper class for reading config entries and performing variable substitution.
 *
 * If a config value contains variable references of the form "${prefix:variableName}", the
 * reference will be replaced with the value of the variable depending on the prefix. By default,
 * the following prefixes are handled:
 *
 * - no prefix: use the default config provider
 * - system: looks for the value in the system properties
 * - env: looks for the value in the environment
 *
 * Different prefixes can be bound to a `ConfigProvider`, which is used to read configuration
 * values from the data source for the prefix, and both the system and env providers can be
 * overridden.
 *
 * If the reference cannot be resolved, the original string will be retained.
 *
 * @param conf The config provider for the default namespace (no prefix).
 */
private[spark] class ConfigReader(conf: ConfigProvider) {

  def this(conf: JMap[String, String]) = this(new MapProvider(conf))

  // Maps a reference prefix to its provider; the default (prefix-less) namespace
  // is stored under the null key.
  private val bindings = new HashMap[String, ConfigProvider]()
  bind(null, conf)
  bindEnv(new EnvProvider())
  bindSystem(new SystemProvider())

  /**
   * Binds a prefix to a provider. This method is not thread-safe and should be called
   * before the instance is used to expand values.
   */
  def bind(prefix: String, provider: ConfigProvider): ConfigReader = {
    bindings(prefix) = provider
    this
  }

  def bind(prefix: String, values: JMap[String, String]): ConfigReader = {
    bind(prefix, new MapProvider(values))
  }

  def bindEnv(provider: ConfigProvider): ConfigReader = bind("env", provider)

  def bindSystem(provider: ConfigProvider): ConfigReader = bind("system", provider)

  /**
   * Reads a configuration key from the default provider, and apply variable substitution.
   */
  def get(key: String): Option[String] = conf.get(key).map(substitute)

  /**
   * Perform variable substitution on the given input string.
   */
  def substitute(input: String): String = substitute(input, Set())

  // Recursive worker: expands every reference in `input`, tracking the references
  // already being expanded so circular definitions fail fast instead of looping.
  private def substitute(input: String, usedRefs: Set[String]): String = {
    if (input == null) {
      input
    } else {
      ConfigReader.REF_RE.replaceAllIn(input, { m =>
        val prefix = m.group(1)
        val name = m.group(2)
        val ref = if (prefix == null) name else s"$prefix:$name"
        require(!usedRefs.contains(ref), s"Circular reference in $input: $ref")

        // Resolve through the bound provider for this prefix; an unresolvable
        // reference is kept verbatim in the output.
        val resolved = bindings.get(prefix).flatMap(_.get(name))
        val replacement = resolved match {
          case Some(v) => substitute(v, usedRefs + ref)
          case None => m.matched
        }
        Regex.quoteReplacement(replacement)
      })
    }
  }

}

0 commit comments

Comments
 (0)