Capture all Spark conf parameters
Use Spark's redact method to avoid capturing any secrets.
paul-laffon-dd committed Jun 26, 2024
1 parent 2d36ca7 commit 3c56d7d
Showing 4 changed files with 51 additions and 47 deletions.
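
The redaction relies on org.apache.spark.util.Utils.redact(conf, kvs), which masks the value of every entry whose key matches the spark.redaction.regex pattern (by default, keys containing "secret", "password", and similar). Below is a minimal standalone sketch of the same Java-to-Scala conversion the Scala 2.12 listener performs in this commit; the RedactionSketch class and its configuration keys are illustrative and not part of the change.

import java.util.Arrays;
import java.util.List;
import org.apache.spark.SparkConf;
import org.apache.spark.util.Utils;
import scala.Tuple2;
import scala.collection.JavaConverters;
import scala.collection.Seq;

public class RedactionSketch {
  public static void main(String[] args) {
    // Illustrative keys only: the second one matches Spark's default redaction regex.
    SparkConf conf =
        new SparkConf()
            .set("spark.app.name", "example")
            .set("spark.hadoop.fs.s3a.secret.key", "super-secret");

    // Same Java list -> Scala Seq conversion as the Scala 2.12 listener below.
    Seq<Tuple2<String, String>> allConf =
        JavaConverters.asScalaBuffer(Arrays.asList(conf.getAll())).toSeq();

    // Values whose keys match spark.redaction.regex come back masked rather than in clear text.
    List<Tuple2<String, String>> redacted =
        JavaConverters.seqAsJavaList(Utils.redact(conf, allConf));

    for (Tuple2<String, String> kv : redacted) {
      System.out.println(kv._1() + " = " + kv._2());
    }
  }
}
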
@@ -1,14 +1,18 @@
package datadog.trace.instrumentation.spark;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import org.apache.spark.SparkConf;
import org.apache.spark.scheduler.SparkListenerJobStart;
import org.apache.spark.scheduler.StageInfo;
import org.apache.spark.sql.execution.SparkPlanInfo;
import org.apache.spark.sql.execution.metric.SQLMetricInfo;
import org.apache.spark.util.Utils;
import scala.Tuple2;
import scala.collection.JavaConverters;
import scala.collection.Seq;

/**
* DatadogSparkListener compiled for Scala 2.12
@@ -62,4 +66,11 @@ protected int[] getStageParentIds(StageInfo info) {

return parentIds;
}

@Override
protected List<Tuple2<String, String>> getRedactedSparkConf(SparkConf conf) {
Seq<Tuple2<String, String>> allConf =
JavaConverters.asScalaBuffer(Arrays.asList(conf.getAll())).toSeq();
return JavaConverters.seqAsJavaList(Utils.redact(conf, allConf));
}
}
@@ -1,13 +1,17 @@
package datadog.trace.instrumentation.spark;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import org.apache.spark.SparkConf;
import org.apache.spark.scheduler.SparkListenerJobStart;
import org.apache.spark.scheduler.StageInfo;
import org.apache.spark.sql.execution.SparkPlanInfo;
import org.apache.spark.sql.execution.metric.SQLMetricInfo;
import org.apache.spark.util.Utils;
import scala.Tuple2;
import scala.collection.immutable.Seq;
import scala.jdk.javaapi.CollectionConverters;

/**
@@ -62,4 +66,11 @@ protected int[] getStageParentIds(StageInfo info) {

return parentIds;
}

@Override
protected List<Tuple2<String, String>> getRedactedSparkConf(SparkConf conf) {
Seq<Tuple2<String, String>> allConf =
CollectionConverters.asScala(Arrays.asList(conf.getAll())).toSeq();
return CollectionConverters.asJava(Utils.redact(conf, allConf));
}
}
@@ -146,6 +146,8 @@ public AbstractDatadogSparkListener(SparkConf sparkConf, String appId, String sp
/** Parent Ids of a Stage. Provide an implementation based on a specific scala version */
protected abstract int[] getStageParentIds(StageInfo info);

protected abstract List<Tuple2<String, String>> getRedactedSparkConf(SparkConf conf);

@Override
public synchronized void onApplicationStart(SparkListenerApplicationStart applicationStart) {
this.applicationStart = applicationStart;
@@ -242,8 +244,7 @@ private AgentSpan getOrCreateStreamingBatchSpan(
AgentTracer.SpanBuilder builder =
buildSparkSpan("spark.streaming_batch", jobProperties).withStartTimestamp(timeMs * 1000);

// Streaming spans will always be the root span, capturing all parameters on those
captureApplicationParameters(builder);
// Streaming spans will always be the root span, capturing job parameters on those
captureJobParameters(builder, jobProperties);

if (isRunningOnDatabricks) {
@@ -258,9 +259,8 @@ private AgentSpan getOrCreateStreamingBatchSpan(

private void addDatabricksSpecificTags(
AgentTracer.SpanBuilder builder, Properties properties, boolean withParentContext) {
// In databricks, there is no application span. Adding the spark conf parameters to the top
// level spark span
captureApplicationParameters(builder);
// In databricks, there is no application span. Adding the parameters to the top level spark
// span
captureJobParameters(builder, properties);

if (properties != null) {
@@ -369,9 +369,6 @@ public synchronized void onJobStart(SparkListenerJobStart jobStart) {

jobSpanBuilder.withTag(DDTags.RESOURCE_NAME, getSparkJobName(jobStart));

// Some properties can change at runtime, so capturing properties of all jobs
captureJobParameters(jobSpanBuilder, jobStart.properties());

AgentSpan jobSpan = jobSpanBuilder.start();
setDataJobsSamplingPriority(jobSpan);
jobSpan.setMeasured(true);
@@ -1029,10 +1026,8 @@ private long computeCurrentAvailableExecutorTime(long time) {
}

private void captureApplicationParameters(AgentTracer.SpanBuilder builder) {
for (Tuple2<String, String> conf : sparkConf.getAll()) {
if (SparkConfAllowList.canCaptureApplicationParameter(conf._1)) {
builder.withTag("config." + conf._1.replace(".", "_"), conf._2);
}
for (Tuple2<String, String> conf : getRedactedSparkConf(sparkConf)) {
builder.withTag("config." + conf._1.replace(".", "_"), conf._2);
}
builder.withTag("config.spark_version", sparkVersion);
}
@@ -10,13 +10,29 @@
* @see <a href="https://spark.apache.org/docs/latest/configuration.html">Spark Configuration</a>
*/
class SparkConfAllowList {

/**
* Application parameters defined at the start of the application, with the spark-submit command
* Job-specific parameters that can be used to control job execution or provide metadata about the
* job being executed
*/
private static final Set<String> allowedApplicationParams =
private static final Set<String> allowedJobParams =
new HashSet<>(
Arrays.asList(
"spark.app.id",
"spark.app.name",
"spark.app.startTime",
"spark.databricks.clusterSource",
"spark.databricks.clusterUsageTags.clusterId",
"spark.databricks.clusterUsageTags.clusterName",
"spark.databricks.clusterUsageTags.clusterNodeType",
"spark.databricks.clusterUsageTags.clusterWorkers",
"spark.databricks.clusterUsageTags.driverContainerId",
"spark.databricks.clusterUsageTags.sparkVersion",
"spark.databricks.clusterUsageTags.workerEnvironmentId",
"spark.databricks.env",
"spark.databricks.job.parentRunId",
"spark.databricks.job.type",
"spark.databricks.sparkContextId",
"spark.databricks.workload.name",
"spark.default.parallelism",
"spark.dynamicAllocation.enabled",
"spark.dynamicAllocation.executorIdleTimeout",
@@ -37,6 +53,8 @@ class SparkConfAllowList {
"spark.executor.memoryOverhead",
"spark.executor.memoryOverheadFactor",
"spark.files.maxPartitionBytes",
"spark.job.description",
"spark.jobGroup.id",
"spark.master",
"spark.memory.fraction",
"spark.memory.storageFraction",
@@ -45,43 +63,12 @@ class SparkConfAllowList {
"spark.submit.deployMode",
"spark.sql.autoBroadcastJoinThreshold",
"spark.sql.files.maxPartitionBytes",
"spark.sql.shuffle.partitions"));

/**
* Job-specific parameters that can be used to control job execution or provide metadata about the
* job being executed
*/
private static final Set<String> allowedJobParams =
new HashSet<>(
Arrays.asList(
"spark.app.id",
"spark.app.name",
"spark.app.startTime",
"spark.databricks.clusterSource",
"spark.databricks.clusterUsageTags.clusterId",
"spark.databricks.clusterUsageTags.clusterName",
"spark.databricks.clusterUsageTags.clusterNodeType",
"spark.databricks.clusterUsageTags.clusterWorkers",
"spark.databricks.clusterUsageTags.driverContainerId",
"spark.databricks.clusterUsageTags.sparkVersion",
"spark.databricks.clusterUsageTags.workerEnvironmentId",
"spark.databricks.env",
"spark.databricks.job.parentRunId",
"spark.databricks.job.type",
"spark.databricks.sparkContextId",
"spark.databricks.workload.name",
"spark.job.description",
"spark.jobGroup.id",
"spark.sql.shuffle.partitions",
"spark.sql.execution.id",
"sql.streaming.queryId",
"streaming.sql.batchId",
"user"));

public static boolean canCaptureApplicationParameter(String parameterName) {
return allowedApplicationParams.contains(parameterName)
|| allowedJobParams.contains(parameterName);
}

public static boolean canCaptureJobParameter(String parameterName) {
return allowedJobParams.contains(parameterName);
}
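
The captureJobParameters helper referenced in the listener changes above is not part of this diff. As a rough, hypothetical illustration (not the actual dd-trace-java implementation), the allow-list acts as the gate between a job's properties and the span tags:

// Hypothetical sketch: only allow-listed job properties become span tags,
// mirroring the "config." prefix used for the redacted Spark conf.
static void tagAllowedJobParameters(AgentTracer.SpanBuilder builder, java.util.Properties props) {
  for (java.util.Map.Entry<Object, Object> entry : props.entrySet()) {
    String key = entry.getKey().toString();
    if (SparkConfAllowList.canCaptureJobParameter(key)) {
      builder.withTag("config." + key.replace(".", "_"), entry.getValue().toString());
    }
  }
}
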
