From abcf9d7b8de8de858e273c5c8737fdc056721743 Mon Sep 17 00:00:00 2001
From: Liangzhou Yi <1934063+yiliangzhou@users.noreply.github.com>
Date: Tue, 2 Jul 2024 11:11:04 -0400
Subject: [PATCH] add instrumentation support for spark built on scala 2.11
---
.../instrumentation/spark/build.gradle | 10 +--
.../spark/spark_2.11/build.gradle | 54 +++++++++++++++
.../spark/spark_2.11/gradle.lockfile | 4 ++
.../spark/DatadogSpark211Listener.java | 65 +++++++++++++++++++
.../spark/Spark211Instrumentation.java | 49 ++++++++++++++
.../src/test/groovy/SparkListenerTest.groovy | 12 ++++
.../SparkStructuredStreamingTest.groovy | 3 +
.../src/test/groovy/SparkTest.groovy | 3 +
.../test_spark24/groovy/Spark24SqlTest.groovy | 3 +
.../spark/AbstractDatadogSparkListener.java | 2 +-
.../instrumentation/spark/SparkSQLUtils.java | 2 +-
settings.gradle | 1 +
12 files changed, 201 insertions(+), 7 deletions(-)
create mode 100644 dd-java-agent/instrumentation/spark/spark_2.11/build.gradle
create mode 100644 dd-java-agent/instrumentation/spark/spark_2.11/gradle.lockfile
create mode 100644 dd-java-agent/instrumentation/spark/spark_2.11/src/main/java/datadog/trace/instrumentation/spark/DatadogSpark211Listener.java
create mode 100644 dd-java-agent/instrumentation/spark/spark_2.11/src/main/java/datadog/trace/instrumentation/spark/Spark211Instrumentation.java
create mode 100644 dd-java-agent/instrumentation/spark/spark_2.11/src/test/groovy/SparkListenerTest.groovy
create mode 100644 dd-java-agent/instrumentation/spark/spark_2.11/src/test/groovy/SparkStructuredStreamingTest.groovy
create mode 100644 dd-java-agent/instrumentation/spark/spark_2.11/src/test/groovy/SparkTest.groovy
create mode 100644 dd-java-agent/instrumentation/spark/spark_2.11/src/test_spark24/groovy/Spark24SqlTest.groovy
diff --git a/dd-java-agent/instrumentation/spark/build.gradle b/dd-java-agent/instrumentation/spark/build.gradle
index 90e58f3977e..19940c47e2e 100644
--- a/dd-java-agent/instrumentation/spark/build.gradle
+++ b/dd-java-agent/instrumentation/spark/build.gradle
@@ -7,8 +7,8 @@ configurations.all {
resolutionStrategy.deactivateDependencyLocking()
}
dependencies {
- compileOnly group: 'org.apache.spark', name: 'spark-core_2.12', version: '2.4.0'
- compileOnly group: 'org.apache.spark', name: 'spark-sql_2.12', version: '2.4.0'
+ compileOnly group: 'org.apache.spark', name: 'spark-core_2.11', version: '2.4.0'
+ compileOnly group: 'org.apache.spark', name: 'spark-sql_2.11', version: '2.4.0'
testFixturesImplementation group: 'com.datadoghq', name: 'sketches-java', version: '0.8.2'
testFixturesImplementation group: 'com.google.protobuf', name: 'protobuf-java', version: '3.14.0'
@@ -17,7 +17,7 @@ dependencies {
testFixturesApi project(':dd-java-agent:instrumentation:trace-annotation')
testFixturesApi project(':dd-java-agent:testing')
- testFixturesCompileOnly group: 'org.apache.spark', name: 'spark-core_2.12', version: '2.4.0'
- testFixturesCompileOnly group: 'org.apache.spark', name: 'spark-sql_2.12', version: '2.4.0'
- testFixturesCompileOnly group: 'org.apache.spark', name: 'spark-yarn_2.12', version: '2.4.0'
+ testFixturesCompileOnly group: 'org.apache.spark', name: 'spark-core_2.11', version: '2.4.0'
+ testFixturesCompileOnly group: 'org.apache.spark', name: 'spark-sql_2.11', version: '2.4.0'
+ testFixturesCompileOnly group: 'org.apache.spark', name: 'spark-yarn_2.11', version: '2.4.0'
}
diff --git a/dd-java-agent/instrumentation/spark/spark_2.11/build.gradle b/dd-java-agent/instrumentation/spark/spark_2.11/build.gradle
new file mode 100644
index 00000000000..eb2e2354a0e
--- /dev/null
+++ b/dd-java-agent/instrumentation/spark/spark_2.11/build.gradle
@@ -0,0 +1,54 @@
+plugins {
+ id 'java-test-fixtures'
+}
+
+def sparkVersion = '2.4.0'
+def scalaVersion = '2.11'
+
+muzzle {
+ pass {
+ group = "org.apache.spark"
+ module = "spark-sql_$scalaVersion"
+ versions = "[$sparkVersion,2.4.8]"
+ assertInverse = true
+ }
+}
+configurations.all {
+ resolutionStrategy.deactivateDependencyLocking()
+}
+apply from: "$rootDir/gradle/java.gradle"
+
+addTestSuiteForDir('latestDepTest', 'test')
+addTestSuite('test_spark24')
+
+ext {
+ // Hadoop does not behave correctly with OpenJ9 https://issues.apache.org/jira/browse/HADOOP-18174
+ excludeJdk = ['SEMERU8', 'SEMERU11']
+
+ // Spark does not support Java > 11 until 3.3.0 https://issues.apache.org/jira/browse/SPARK-33772
+ maxJavaVersionForTests = JavaVersion.VERSION_11
+}
+
+dependencies {
+ implementation project(':dd-java-agent:instrumentation:spark')
+
+ compileOnly group: 'org.apache.spark', name: "spark-core_$scalaVersion", version: "$sparkVersion"
+ compileOnly group: 'org.apache.spark', name: "spark-sql_$scalaVersion", version: "$sparkVersion"
+
+ testImplementation(testFixtures(project(":dd-java-agent:instrumentation:spark")))
+ testImplementation group: 'org.apache.spark', name: "spark-core_$scalaVersion", version: "$sparkVersion"
+ testImplementation group: 'org.apache.spark', name: "spark-sql_$scalaVersion", version: "$sparkVersion"
+ testImplementation group: 'org.apache.spark', name: "spark-yarn_$scalaVersion", version: "$sparkVersion"
+
+ test_spark24Implementation group: 'org.apache.spark', name: "spark-core_$scalaVersion", version: "2.4.0"
+ test_spark24Implementation group: 'org.apache.spark', name: "spark-sql_$scalaVersion", version: "2.4.0"
+ test_spark24Implementation group: 'org.apache.spark', name: "spark-yarn_$scalaVersion", version: "2.4.0"
+
+ latestDepTestImplementation group: 'org.apache.spark', name: "spark-core_$scalaVersion", version: '+'
+ latestDepTestImplementation group: 'org.apache.spark', name: "spark-sql_$scalaVersion", version: '+'
+ latestDepTestImplementation group: 'org.apache.spark', name: "spark-yarn_$scalaVersion", version: '+'
+}
+
+tasks.named("test").configure {
+ dependsOn "test_spark24"
+}
diff --git a/dd-java-agent/instrumentation/spark/spark_2.11/gradle.lockfile b/dd-java-agent/instrumentation/spark/spark_2.11/gradle.lockfile
new file mode 100644
index 00000000000..5a8ecba31ff
--- /dev/null
+++ b/dd-java-agent/instrumentation/spark/spark_2.11/gradle.lockfile
@@ -0,0 +1,4 @@
+# This is a Gradle generated file for dependency locking.
+# Manual edits can break the build and are not advised.
+# This file is expected to be part of source control.
+empty=spotbugsPlugins,testFixturesAnnotationProcessor
diff --git a/dd-java-agent/instrumentation/spark/spark_2.11/src/main/java/datadog/trace/instrumentation/spark/DatadogSpark211Listener.java b/dd-java-agent/instrumentation/spark/spark_2.11/src/main/java/datadog/trace/instrumentation/spark/DatadogSpark211Listener.java
new file mode 100644
index 00000000000..e080d246326
--- /dev/null
+++ b/dd-java-agent/instrumentation/spark/spark_2.11/src/main/java/datadog/trace/instrumentation/spark/DatadogSpark211Listener.java
@@ -0,0 +1,65 @@
+package datadog.trace.instrumentation.spark;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import org.apache.spark.SparkConf;
+import org.apache.spark.scheduler.SparkListenerJobStart;
+import org.apache.spark.scheduler.StageInfo;
+import org.apache.spark.sql.execution.SparkPlanInfo;
+import org.apache.spark.sql.execution.metric.SQLMetricInfo;
+import scala.collection.JavaConverters;
+
+/**
+ * DatadogSparkListener compiled for Scala 2.11
+ *
+ * The signature of scala.Seq changed between 2.12 and 2.13. Methods using scala.Seq need to be
+ * compiled with the specific scala version
+ */
+public class DatadogSpark211Listener extends AbstractDatadogSparkListener {
+ public DatadogSpark211Listener(SparkConf sparkConf, String appId, String sparkVersion) {
+ super(sparkConf, appId, sparkVersion);
+ }
+
+ @Override
+ protected ArrayList getSparkJobStageIds(SparkListenerJobStart jobStart) {
+ ArrayList javaIds = new ArrayList<>(jobStart.stageInfos().length());
+ JavaConverters.seqAsJavaListConverter(jobStart.stageInfos()).asJava().forEach(stage -> javaIds.add(stage.stageId()));
+ return javaIds;
+ }
+
+ @Override
+ protected String getSparkJobName(SparkListenerJobStart jobStart) {
+ if (jobStart.stageInfos().nonEmpty()) {
+ // In the spark UI, the name of a job is the name of its last stage
+ return jobStart.stageInfos().last().name();
+ }
+
+ return null;
+ }
+
+ @Override
+ protected int getStageCount(SparkListenerJobStart jobStart) {
+ return jobStart.stageInfos().length();
+ }
+
+ @Override
+ protected Collection getPlanInfoChildren(SparkPlanInfo info) {
+ return JavaConverters.asJavaCollectionConverter(info.children()).asJavaCollection();
+ }
+
+ @Override
+ protected List getPlanInfoMetrics(SparkPlanInfo info) {
+ return JavaConverters.seqAsJavaListConverter(info.metrics()).asJava();
+ }
+
+ @Override
+ protected int[] getStageParentIds(StageInfo info) {
+ int[] parentIds = new int[info.parentIds().length()];
+ for (int i = 0; i < parentIds.length; i++) {
+ parentIds[i] = (int) info.parentIds().apply(i);
+ }
+
+ return parentIds;
+ }
+}
diff --git a/dd-java-agent/instrumentation/spark/spark_2.11/src/main/java/datadog/trace/instrumentation/spark/Spark211Instrumentation.java b/dd-java-agent/instrumentation/spark/spark_2.11/src/main/java/datadog/trace/instrumentation/spark/Spark211Instrumentation.java
new file mode 100644
index 00000000000..50d2cbb1426
--- /dev/null
+++ b/dd-java-agent/instrumentation/spark/spark_2.11/src/main/java/datadog/trace/instrumentation/spark/Spark211Instrumentation.java
@@ -0,0 +1,49 @@
+package datadog.trace.instrumentation.spark;
+
+import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named;
+import static net.bytebuddy.matcher.ElementMatchers.*;
+
+import com.google.auto.service.AutoService;
+import datadog.trace.agent.tooling.InstrumenterModule;
+import net.bytebuddy.asm.Advice;
+import org.apache.spark.SparkContext;
+
+@AutoService(InstrumenterModule.class)
+public class Spark211Instrumentation extends AbstractSparkInstrumentation {
+ @Override
+ public String[] helperClassNames() {
+ return new String[] {
+ packageName + ".AbstractDatadogSparkListener",
+ packageName + ".DatabricksParentContext",
+ packageName + ".DatadogSpark211Listener",
+ packageName + ".RemoveEldestHashMap",
+ packageName + ".SparkAggregatedTaskMetrics",
+ packageName + ".SparkConfAllowList",
+ packageName + ".SparkSQLUtils",
+ packageName + ".SparkSQLUtils$SparkPlanInfoForStage",
+ packageName + ".SparkSQLUtils$AccumulatorWithStage",
+ };
+ }
+
+ @Override
+ public void methodAdvice(MethodTransformer transformer) {
+ super.methodAdvice(transformer);
+
+ transformer.applyAdvice(
+ isMethod()
+ .and(named("setupAndStartListenerBus"))
+ .and(isDeclaredBy(named("org.apache.spark.SparkContext")))
+ .and(takesNoArguments()),
+ Spark211Instrumentation.class.getName() + "$InjectListener");
+ }
+
+ public static class InjectListener {
+ @Advice.OnMethodEnter(suppress = Throwable.class)
+ public static void enter(@Advice.This SparkContext sparkContext) {
+ AbstractDatadogSparkListener.listener =
+ new DatadogSpark211Listener(
+ sparkContext.getConf(), sparkContext.applicationId(), sparkContext.version());
+ sparkContext.listenerBus().addToSharedQueue(AbstractDatadogSparkListener.listener);
+ }
+ }
+}
diff --git a/dd-java-agent/instrumentation/spark/spark_2.11/src/test/groovy/SparkListenerTest.groovy b/dd-java-agent/instrumentation/spark/spark_2.11/src/test/groovy/SparkListenerTest.groovy
new file mode 100644
index 00000000000..b4c7f6410de
--- /dev/null
+++ b/dd-java-agent/instrumentation/spark/spark_2.11/src/test/groovy/SparkListenerTest.groovy
@@ -0,0 +1,12 @@
+import datadog.trace.instrumentation.spark.AbstractSparkListenerTest
+import datadog.trace.instrumentation.spark.DatadogSpark211Listener
+import org.apache.spark.SparkConf
+import org.apache.spark.scheduler.SparkListener
+
+class SparkListenerTest extends AbstractSparkListenerTest {
+ @Override
+ protected SparkListener getTestDatadogSparkListener() {
+ def conf = new SparkConf()
+ return new DatadogSpark211Listener(conf, "some_app_id", "some_version")
+ }
+}
diff --git a/dd-java-agent/instrumentation/spark/spark_2.11/src/test/groovy/SparkStructuredStreamingTest.groovy b/dd-java-agent/instrumentation/spark/spark_2.11/src/test/groovy/SparkStructuredStreamingTest.groovy
new file mode 100644
index 00000000000..c004e152502
--- /dev/null
+++ b/dd-java-agent/instrumentation/spark/spark_2.11/src/test/groovy/SparkStructuredStreamingTest.groovy
@@ -0,0 +1,3 @@
+import datadog.trace.instrumentation.spark.AbstractSparkStructuredStreamingTest
+
+class SparkStructuredStreamingTest extends AbstractSparkStructuredStreamingTest {}
diff --git a/dd-java-agent/instrumentation/spark/spark_2.11/src/test/groovy/SparkTest.groovy b/dd-java-agent/instrumentation/spark/spark_2.11/src/test/groovy/SparkTest.groovy
new file mode 100644
index 00000000000..054fb96c2c1
--- /dev/null
+++ b/dd-java-agent/instrumentation/spark/spark_2.11/src/test/groovy/SparkTest.groovy
@@ -0,0 +1,3 @@
+import datadog.trace.instrumentation.spark.AbstractSparkTest
+
+class SparkTest extends AbstractSparkTest {}
diff --git a/dd-java-agent/instrumentation/spark/spark_2.11/src/test_spark24/groovy/Spark24SqlTest.groovy b/dd-java-agent/instrumentation/spark/spark_2.11/src/test_spark24/groovy/Spark24SqlTest.groovy
new file mode 100644
index 00000000000..466406055d0
--- /dev/null
+++ b/dd-java-agent/instrumentation/spark/spark_2.11/src/test_spark24/groovy/Spark24SqlTest.groovy
@@ -0,0 +1,3 @@
+import datadog.trace.instrumentation.spark.AbstractSpark24SqlTest
+
+class Spark24SqlTest extends AbstractSpark24SqlTest {}
diff --git a/dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/AbstractDatadogSparkListener.java b/dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/AbstractDatadogSparkListener.java
index 000287f121b..87806b38418 100644
--- a/dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/AbstractDatadogSparkListener.java
+++ b/dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/AbstractDatadogSparkListener.java
@@ -496,7 +496,7 @@ public synchronized void onStageCompleted(SparkListenerStageCompleted stageCompl
}
for (AccumulableInfo info :
- JavaConverters.asJavaCollection(stageInfo.accumulables().values())) {
+ JavaConverters.asJavaIterableConverter(stageInfo.accumulables().values()).asJava()) {
accumulators.put(info.id(), new SparkSQLUtils.AccumulatorWithStage(stageId, info));
}
diff --git a/dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/SparkSQLUtils.java b/dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/SparkSQLUtils.java
index f3f1536c42a..3846f6d7f39 100644
--- a/dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/SparkSQLUtils.java
+++ b/dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/SparkSQLUtils.java
@@ -170,7 +170,7 @@ private void toJson(JsonGenerator generator, Map acc
generator.writeFieldName("meta");
generator.writeStartObject();
- for (Tuple2 metadata : JavaConverters.asJavaCollection(plan.metadata())) {
+ for (Tuple2 metadata : JavaConverters.asJavaCollectionConverter(plan.metadata()).asJavaCollection()) {
generator.writeStringField(metadata._1, metadata._2);
}
diff --git a/settings.gradle b/settings.gradle
index e906f7859bd..3852cad680e 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -412,6 +412,7 @@ include ':dd-java-agent:instrumentation:shutdown'
include ':dd-java-agent:instrumentation:slick'
include ':dd-java-agent:instrumentation:span-origin'
include ':dd-java-agent:instrumentation:spark'
+include ':dd-java-agent:instrumentation:spark:spark_2.11'
include ':dd-java-agent:instrumentation:spark:spark_2.12'
include ':dd-java-agent:instrumentation:spark:spark_2.13'
include ':dd-java-agent:instrumentation:spark-executor'