[MINOR] feat(spark-client): Support set accessId by another config dynamically (#2250)

### What changes were proposed in this pull request?

Support resolving the access id dynamically from the value of another config.

### Why are the changes needed?

Without this PR, an access id has to be set explicitly for every user of a `spark.yarn.queue`. With this feature, if we set `spark.rss.access.id.providerKey=spark.yarn.queue`, then `spark.rss.access.id` is resolved dynamically from the value of `spark.yarn.queue`.
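Concretely, the mapping can then be expressed once in `spark-defaults.conf` instead of per user; a minimal sketch (the queue name `root.prod` is illustrative):

```
# Tell the RSS client which config key holds the access id
spark.rss.access.id.providerKey   spark.yarn.queue

# Jobs keep setting their queue as usual; its value doubles as the access id
spark.yarn.queue                  root.prod
```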

### Does this PR introduce _any_ user-facing change?

Introduces a new Spark config, `spark.rss.access.id.providerKey`.

### How was this patch tested?

Updated UT.
maobaolong authored and jerqi committed Dec 4, 2024
1 parent 6951e36 commit 7fd296a
Showing 5 changed files with 29 additions and 2 deletions.
@@ -322,6 +322,9 @@ public class RssSparkConfig {

  public static final ConfigEntry<String> RSS_ACCESS_ID =
      createStringBuilder(new ConfigBuilder("spark.rss.access.id")).createWithDefault("");

  public static final ConfigEntry<String> RSS_ACCESS_ID_PROVIDER_KEY =
      createStringBuilder(new ConfigBuilder("spark.rss.access.id.providerKey"))
          .createWithDefault("");

  public static final ConfigEntry<Integer> RSS_ACCESS_TIMEOUT_MS =
      createIntegerBuilder(
@@ -112,8 +112,11 @@ private ShuffleManager createShuffleManagerInDriver() throws RssException {

  private boolean tryAccessCluster() {
    String accessId = sparkConf.get(RssSparkConfig.RSS_ACCESS_ID.key(), "").trim();
    if (StringUtils.isEmpty(accessId)) {
      String providerKey = sparkConf.get(RssSparkConfig.RSS_ACCESS_ID_PROVIDER_KEY.key(), "");
      if (StringUtils.isNotEmpty(providerKey)) {
        accessId = sparkConf.get(providerKey, "");
        LOG.info("Get access id {} from provider key: {}", accessId, providerKey);
      }
    }
    if (StringUtils.isEmpty(accessId)) {
      LOG.warn("Access id key is empty");
      return false;
    }
    long retryInterval = sparkConf.get(RssSparkConfig.RSS_CLIENT_ACCESS_RETRY_INTERVAL_MS);
    int retryTimes = sparkConf.get(RssSparkConfig.RSS_CLIENT_ACCESS_RETRY_TIMES);
@@ -111,6 +111,13 @@ private ShuffleManager createShuffleManagerInDriver() throws RssException {

  private boolean tryAccessCluster() {
    String accessId = sparkConf.get(RssSparkConfig.RSS_ACCESS_ID.key(), "").trim();
    if (StringUtils.isEmpty(accessId)) {
      String providerKey = sparkConf.get(RssSparkConfig.RSS_ACCESS_ID_PROVIDER_KEY.key(), "");
      if (StringUtils.isNotEmpty(providerKey)) {
        accessId = sparkConf.get(providerKey, "");
        LOG.info("Get access id {} from provider key: {}", accessId, providerKey);
      }
    }
    if (StringUtils.isEmpty(accessId)) {
      LOG.warn("Access id key is empty");
      return false;
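The resolution order in `tryAccessCluster` can be sketched standalone; this is a simplification using a plain `Map` in place of `SparkConf` (class name `AccessIdResolver` and the sample values are illustrative, not part of the commit):

```java
import java.util.HashMap;
import java.util.Map;

public class AccessIdResolver {
    // Resolve the access id: use spark.rss.access.id directly if set;
    // otherwise look up the config key named by spark.rss.access.id.providerKey
    // and use that key's value as the access id.
    static String resolveAccessId(Map<String, String> conf) {
        String accessId = conf.getOrDefault("spark.rss.access.id", "").trim();
        if (accessId.isEmpty()) {
            String providerKey = conf.getOrDefault("spark.rss.access.id.providerKey", "");
            if (!providerKey.isEmpty()) {
                accessId = conf.getOrDefault(providerKey, "");
            }
        }
        return accessId;
    }

    public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>();
        conf.put("spark.rss.access.id.providerKey", "spark.yarn.queue");
        conf.put("spark.yarn.queue", "queue_a");
        System.out.println(resolveAccessId(conf)); // prints "queue_a"

        conf.put("spark.rss.access.id", "explicit_id");
        System.out.println(resolveAccessId(conf)); // prints "explicit_id": explicit id wins
    }
}
```

Note that the explicit `spark.rss.access.id` always takes precedence; the provider key is only consulted as a fallback when it is unset.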
@@ -47,15 +47,27 @@ public void testCreateInDriver() throws Exception {

    SparkConf conf = new SparkConf();
    assertCreateSortShuffleManager(conf);

    conf = new SparkConf();
    conf.set(RssSparkConfig.RSS_DYNAMIC_CLIENT_CONF_ENABLED.key(), "false");
    conf.set("spark.foo.bar.key", "mockId");
    conf.set(RssSparkConfig.RSS_ACCESS_ID_PROVIDER_KEY.key(), "spark.foo.bar.key");
    assertCreateSortShuffleManager(conf);

    conf = new SparkConf();
    conf.set(RssSparkConfig.RSS_DYNAMIC_CLIENT_CONF_ENABLED.key(), "false");
    conf.set(RssSparkConfig.RSS_ACCESS_ID.key(), "mockId");
    assertCreateSortShuffleManager(conf);

    conf = new SparkConf();
    conf.set(RssSparkConfig.RSS_DYNAMIC_CLIENT_CONF_ENABLED.key(), "false");
    conf.set(RssSparkConfig.RSS_ACCESS_ID.key(), "mockId");
    conf.set(RssSparkConfig.RSS_COORDINATOR_QUORUM.key(), "m1:8001,m2:8002");
    conf.set("spark.rss.storage.type", StorageType.LOCALFILE.name());
    conf.set(RssSparkConfig.RSS_TEST_MODE_ENABLE, true);
    assertCreateRssShuffleManager(conf);

    conf = new SparkConf();
    conf.set(RssSparkConfig.RSS_DYNAMIC_CLIENT_CONF_ENABLED.key(), "false");
    conf.set(RssSparkConfig.RSS_COORDINATOR_QUORUM.key(), "m1:8001,m2:8002");
    assertCreateSortShuffleManager(conf);
  }
docs/client_guide/spark_client_guide.md (2 additions, 0 deletions)
@@ -87,6 +87,8 @@ The important configuration is listed as following.

| spark.rss.client.off.heap.memory.enable | false | The client uses off-heap memory to process data |
| spark.rss.client.remote.storage.useLocalConfAsDefault | false | This option is only valid when the remote storage path is specified. If true, the remote storage conf will use the client-side Hadoop configuration loaded from the classpath |
| spark.rss.hadoop.* | - | The prefix key for Hadoop conf. For Spark, `spark.rss.hadoop.fs.defaultFS=hdfs://rbf-x1` will be passed to Hadoop storage as `fs.defaultFS=hdfs://rbf-x1` |
| spark.rss.access.id | - | The access id used to request access to the RSS cluster. Only used by DelegationRssShuffleManager |
| spark.rss.access.id.providerKey | - | Resolve the access id from the value of the config key named by this option. Only used by DelegationRssShuffleManager |

### Block id bits

