Skip to content

Commit

Permalink
[MINOR] improvement(spark-client): put sparkConf as extra properties …
Browse files Browse the repository at this point in the history
…while client request accessCluster (#2254)

### What changes were proposed in this pull request?

put sparkConf as extra properties while client request accessCluster

### Why are the changes needed?

The Coordinator can decide whether to let a Spark application access the RSS cluster via a customized access checker that leverages some of the Spark configs.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

No need
  • Loading branch information
maobaolong authored Nov 26, 2024
1 parent b7d391c commit 204e4e4
Show file tree
Hide file tree
Showing 5 changed files with 42 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.spark.shuffle;

import java.util.HashMap;
import java.util.Map;
import java.util.Set;

import scala.Tuple2;
Expand Down Expand Up @@ -515,4 +517,15 @@ public static RssConf toRssConf(SparkConf sparkConf) {
}
return rssConf;
}

/**
 * Converts all entries of the given {@link SparkConf} into a plain mutable map.
 *
 * @param sparkConf the Spark configuration to convert
 * @return a new {@link HashMap} containing every key/value pair from the conf
 */
public static Map<String, String> sparkConfToMap(SparkConf sparkConf) {
  Map<String, String> result = new HashMap<>();
  for (Tuple2<String, String> kv : sparkConf.getAll()) {
    result.put(kv._1, kv._2);
  }
  return result;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,6 @@
import java.util.function.Supplier;
import java.util.stream.Collectors;

import scala.Tuple2;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
Expand Down Expand Up @@ -1064,7 +1062,7 @@ protected void registerShuffleServers(
}
LOG.info("Start to register shuffleId {}", shuffleId);
long start = System.currentTimeMillis();
Map<String, String> sparkConfMap = sparkConfToMap(getSparkConf());
Map<String, String> sparkConfMap = RssSparkConfig.sparkConfToMap(getSparkConf());
serverToPartitionRanges.entrySet().stream()
.forEach(
entry -> {
Expand Down Expand Up @@ -1095,7 +1093,7 @@ protected void registerShuffleServers(
}
LOG.info("Start to register shuffleId[{}]", shuffleId);
long start = System.currentTimeMillis();
Map<String, String> sparkConfMap = sparkConfToMap(getSparkConf());
Map<String, String> sparkConfMap = RssSparkConfig.sparkConfToMap(getSparkConf());
Set<Map.Entry<ShuffleServerInfo, List<PartitionRange>>> entries =
serverToPartitionRanges.entrySet();
entries.stream()
Expand Down Expand Up @@ -1141,15 +1139,4 @@ public boolean isRssStageRetryForFetchFailureEnabled() {
/** Returns the {@link SparkConf} instance held by this object. */
public SparkConf getSparkConf() {
  return sparkConf;
}

/**
 * Copies every key/value pair of the supplied {@link SparkConf} into a new map.
 *
 * <p>NOTE(review): this duplicates the static utility of the same name in
 * RssSparkConfig; consider delegating to it to avoid drift.
 *
 * @param sparkConf the Spark configuration to convert
 * @return a freshly allocated map with all configuration entries
 */
public Map<String, String> sparkConfToMap(SparkConf sparkConf) {
  Map<String, String> confEntries = new HashMap<>();
  Tuple2<String, String>[] allEntries = sparkConf.getAll();
  for (int i = 0; i < allEntries.length; i++) {
    confEntries.put(allEntries[i]._1, allEntries[i]._2);
  }
  return confEntries;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.spark.shuffle;

import java.util.List;
import java.util.Map;
import java.util.Set;

Expand All @@ -32,6 +33,8 @@
import org.apache.uniffle.client.impl.grpc.CoordinatorGrpcRetryableClient;
import org.apache.uniffle.client.request.RssAccessClusterRequest;
import org.apache.uniffle.client.response.RssAccessClusterResponse;
import org.apache.uniffle.common.config.RssClientConf;
import org.apache.uniffle.common.config.RssConf;
import org.apache.uniffle.common.exception.RssException;
import org.apache.uniffle.common.rpc.StatusCode;
import org.apache.uniffle.common.util.Constants;
Expand Down Expand Up @@ -127,6 +130,13 @@ private boolean tryAccessCluster() {
extraProperties.put(
ACCESS_INFO_REQUIRED_SHUFFLE_NODES_NUM, String.valueOf(assignmentShuffleNodesNum));

RssConf rssConf = RssSparkConfig.toRssConf(sparkConf);
List<String> excludeProperties =
rssConf.get(RssClientConf.RSS_CLIENT_REPORT_EXCLUDE_PROPERTIES);
rssConf.getAll().stream()
.filter(entry -> !excludeProperties.contains(entry.getKey()))
.forEach(entry -> extraProperties.put(entry.getKey(), (String) entry.getValue()));

Set<String> assignmentTags = RssSparkShuffleUtils.getAssignmentTags(sparkConf);
try {
if (coordinatorClient != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.spark.shuffle;

import java.util.List;
import java.util.Map;
import java.util.Set;

Expand All @@ -32,6 +33,8 @@
import org.apache.uniffle.client.impl.grpc.CoordinatorGrpcRetryableClient;
import org.apache.uniffle.client.request.RssAccessClusterRequest;
import org.apache.uniffle.client.response.RssAccessClusterResponse;
import org.apache.uniffle.common.config.RssClientConf;
import org.apache.uniffle.common.config.RssConf;
import org.apache.uniffle.common.exception.RssException;
import org.apache.uniffle.common.rpc.StatusCode;
import org.apache.uniffle.common.util.Constants;
Expand Down Expand Up @@ -131,6 +134,13 @@ private boolean tryAccessCluster() {
extraProperties.put(
ACCESS_INFO_REQUIRED_SHUFFLE_NODES_NUM, String.valueOf(assignmentShuffleNodesNum));

RssConf rssConf = RssSparkConfig.toRssConf(sparkConf);
List<String> excludeProperties =
rssConf.get(RssClientConf.RSS_CLIENT_REPORT_EXCLUDE_PROPERTIES);
rssConf.getAll().stream()
.filter(entry -> !excludeProperties.contains(entry.getKey()))
.forEach(entry -> extraProperties.put(entry.getKey(), (String) entry.getValue()));

Set<String> assignmentTags = RssSparkShuffleUtils.getAssignmentTags(sparkConf);
try {
if (coordinatorClient != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -303,4 +303,11 @@ public class RssClientConf {
.withDescription(
"The block id manager class of server for this application, "
+ "the implementation of this interface to manage the shuffle block ids");

public static final ConfigOption<List<String>> RSS_CLIENT_REPORT_EXCLUDE_PROPERTIES =
ConfigOptions.key("rss.client.reportExcludeProperties")
.stringType()
.asList()
.defaultValues()
.withDescription("the report exclude properties could be configured by this option");
}

0 comments on commit 204e4e4

Please sign in to comment.