add instrumentation support for spark built on scala 2.11
yiliangzhou committed Jul 2, 2024
1 parent cdecb42 commit abcf9d7
Showing 12 changed files with 201 additions and 7 deletions.
10 changes: 5 additions & 5 deletions dd-java-agent/instrumentation/spark/build.gradle
@@ -7,8 +7,8 @@ configurations.all {
resolutionStrategy.deactivateDependencyLocking()
}
dependencies {
- compileOnly group: 'org.apache.spark', name: 'spark-core_2.12', version: '2.4.0'
- compileOnly group: 'org.apache.spark', name: 'spark-sql_2.12', version: '2.4.0'
+ compileOnly group: 'org.apache.spark', name: 'spark-core_2.11', version: '2.4.0'
+ compileOnly group: 'org.apache.spark', name: 'spark-sql_2.11', version: '2.4.0'

testFixturesImplementation group: 'com.datadoghq', name: 'sketches-java', version: '0.8.2'
testFixturesImplementation group: 'com.google.protobuf', name: 'protobuf-java', version: '3.14.0'
@@ -17,7 +17,7 @@ dependencies {
testFixturesApi project(':dd-java-agent:instrumentation:trace-annotation')
testFixturesApi project(':dd-java-agent:testing')

- testFixturesCompileOnly group: 'org.apache.spark', name: 'spark-core_2.12', version: '2.4.0'
- testFixturesCompileOnly group: 'org.apache.spark', name: 'spark-sql_2.12', version: '2.4.0'
- testFixturesCompileOnly group: 'org.apache.spark', name: 'spark-yarn_2.12', version: '2.4.0'
+ testFixturesCompileOnly group: 'org.apache.spark', name: 'spark-core_2.11', version: '2.4.0'
+ testFixturesCompileOnly group: 'org.apache.spark', name: 'spark-sql_2.11', version: '2.4.0'
+ testFixturesCompileOnly group: 'org.apache.spark', name: 'spark-yarn_2.11', version: '2.4.0'
}
54 changes: 54 additions & 0 deletions dd-java-agent/instrumentation/spark/spark_2.11/build.gradle
@@ -0,0 +1,54 @@
plugins {
id 'java-test-fixtures'
}

def sparkVersion = '2.4.0'
def scalaVersion = '2.11'

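// Muzzle (assumption: standard dd-trace-java muzzle semantics) asserts below that this
// instrumentation applies to spark-sql_2.11 versions 2.4.0 through 2.4.8 and, because
// assertInverse is set, that it does not apply to versions outside that range.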
muzzle {
pass {
group = "org.apache.spark"
module = "spark-sql_$scalaVersion"
versions = "[$sparkVersion,2.4.8]"
assertInverse = true
}
}
configurations.all {
resolutionStrategy.deactivateDependencyLocking()
}
apply from: "$rootDir/gradle/java.gradle"

addTestSuiteForDir('latestDepTest', 'test')
addTestSuite('test_spark24')

ext {
// Hadoop does not behave correctly with OpenJ9 https://issues.apache.org/jira/browse/HADOOP-18174
excludeJdk = ['SEMERU8', 'SEMERU11']

// Spark does not support Java > 11 until 3.3.0 https://issues.apache.org/jira/browse/SPARK-33772
maxJavaVersionForTests = JavaVersion.VERSION_11
}

dependencies {
implementation project(':dd-java-agent:instrumentation:spark')

compileOnly group: 'org.apache.spark', name: "spark-core_$scalaVersion", version: "$sparkVersion"
compileOnly group: 'org.apache.spark', name: "spark-sql_$scalaVersion", version: "$sparkVersion"

testImplementation(testFixtures(project(":dd-java-agent:instrumentation:spark")))
testImplementation group: 'org.apache.spark', name: "spark-core_$scalaVersion", version: "$sparkVersion"
testImplementation group: 'org.apache.spark', name: "spark-sql_$scalaVersion", version: "$sparkVersion"
testImplementation group: 'org.apache.spark', name: "spark-yarn_$scalaVersion", version: "$sparkVersion"

test_spark24Implementation group: 'org.apache.spark', name: "spark-core_$scalaVersion", version: "2.4.0"
test_spark24Implementation group: 'org.apache.spark', name: "spark-sql_$scalaVersion", version: "2.4.0"
test_spark24Implementation group: 'org.apache.spark', name: "spark-yarn_$scalaVersion", version: "2.4.0"

latestDepTestImplementation group: 'org.apache.spark', name: "spark-core_$scalaVersion", version: '+'
latestDepTestImplementation group: 'org.apache.spark', name: "spark-sql_$scalaVersion", version: '+'
latestDepTestImplementation group: 'org.apache.spark', name: "spark-yarn_$scalaVersion", version: '+'
}

tasks.named("test").configure {
dependsOn "test_spark24"
}
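Because the default test task depends on test_spark24, running it also exercises the suite pinned to Spark 2.4.0. Assuming the repository's usual Gradle wiring (project path taken from the settings.gradle change below), the suites can be run per module:

./gradlew :dd-java-agent:instrumentation:spark:spark_2.11:test
./gradlew :dd-java-agent:instrumentation:spark:spark_2.11:latestDepTest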
@@ -0,0 +1,4 @@
# This is a Gradle generated file for dependency locking.
# Manual edits can break the build and are not advised.
# This file is expected to be part of source control.
empty=spotbugsPlugins,testFixturesAnnotationProcessor
@@ -0,0 +1,65 @@
package datadog.trace.instrumentation.spark;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import org.apache.spark.SparkConf;
import org.apache.spark.scheduler.SparkListenerJobStart;
import org.apache.spark.scheduler.StageInfo;
import org.apache.spark.sql.execution.SparkPlanInfo;
import org.apache.spark.sql.execution.metric.SQLMetricInfo;
import scala.collection.JavaConverters;

/**
* DatadogSparkListener compiled for Scala 2.11
*
* <p>The signature of scala.Seq changed between 2.12 and 2.13, so methods using scala.Seq need
* to be compiled against the specific Scala version.
*/
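// Illustration (assumption, not part of this commit): Spark declares
// SparkListenerJobStart.stageInfos() as scala.Seq[StageInfo]. That type erases to
// scala.collection.Seq in Scala 2.11/2.12 but to scala.collection.immutable.Seq in
// 2.13, so a Java call site compiled against one binary version does not link
// against the other.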
public class DatadogSpark211Listener extends AbstractDatadogSparkListener {
public DatadogSpark211Listener(SparkConf sparkConf, String appId, String sparkVersion) {
super(sparkConf, appId, sparkVersion);
}

@Override
protected ArrayList<Integer> getSparkJobStageIds(SparkListenerJobStart jobStart) {
ArrayList<Integer> javaIds = new ArrayList<>(jobStart.stageInfos().length());
JavaConverters.seqAsJavaListConverter(jobStart.stageInfos()).asJava().forEach(stage -> javaIds.add(stage.stageId()));
return javaIds;
}

@Override
protected String getSparkJobName(SparkListenerJobStart jobStart) {
if (jobStart.stageInfos().nonEmpty()) {
// In the spark UI, the name of a job is the name of its last stage
return jobStart.stageInfos().last().name();
}

return null;
}

@Override
protected int getStageCount(SparkListenerJobStart jobStart) {
return jobStart.stageInfos().length();
}

@Override
protected Collection<SparkPlanInfo> getPlanInfoChildren(SparkPlanInfo info) {
return JavaConverters.asJavaCollectionConverter(info.children()).asJavaCollection();
}

@Override
protected List<SQLMetricInfo> getPlanInfoMetrics(SparkPlanInfo info) {
return JavaConverters.seqAsJavaListConverter(info.metrics()).asJava();
}

@Override
protected int[] getStageParentIds(StageInfo info) {
int[] parentIds = new int[info.parentIds().length()];
for (int i = 0; i < parentIds.length; i++) {
parentIds[i] = (int) info.parentIds().apply(i);
}

return parentIds;
}
}
@@ -0,0 +1,49 @@
package datadog.trace.instrumentation.spark;

import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.*;

import com.google.auto.service.AutoService;
import datadog.trace.agent.tooling.InstrumenterModule;
import net.bytebuddy.asm.Advice;
import org.apache.spark.SparkContext;

@AutoService(InstrumenterModule.class)
public class Spark211Instrumentation extends AbstractSparkInstrumentation {
@Override
public String[] helperClassNames() {
return new String[] {
packageName + ".AbstractDatadogSparkListener",
packageName + ".DatabricksParentContext",
packageName + ".DatadogSpark211Listener",
packageName + ".RemoveEldestHashMap",
packageName + ".SparkAggregatedTaskMetrics",
packageName + ".SparkConfAllowList",
packageName + ".SparkSQLUtils",
packageName + ".SparkSQLUtils$SparkPlanInfoForStage",
packageName + ".SparkSQLUtils$AccumulatorWithStage",
};
}

@Override
public void methodAdvice(MethodTransformer transformer) {
super.methodAdvice(transformer);

transformer.applyAdvice(
isMethod()
.and(named("setupAndStartListenerBus"))
.and(isDeclaredBy(named("org.apache.spark.SparkContext")))
.and(takesNoArguments()),
Spark211Instrumentation.class.getName() + "$InjectListener");
}

public static class InjectListener {
@Advice.OnMethodEnter(suppress = Throwable.class)
public static void enter(@Advice.This SparkContext sparkContext) {
AbstractDatadogSparkListener.listener =
new DatadogSpark211Listener(
sparkContext.getConf(), sparkContext.applicationId(), sparkContext.version());
sparkContext.listenerBus().addToSharedQueue(AbstractDatadogSparkListener.listener);
}
}
}
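As a rough sketch of what the injected advice achieves at runtime (illustration only; the class name and local-mode configuration are assumptions, not part of this commit), the equivalent manual registration in the same package would be:

import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;

class ManualListenerRegistrationSketch {
  public static void main(String[] args) {
    SparkConf conf = new SparkConf().setAppName("demo").setMaster("local[*]");
    SparkContext sc = new SparkContext(conf);
    // Same effect as the advice above, but requires touching user code:
    sc.addSparkListener(
        new DatadogSpark211Listener(sc.getConf(), sc.applicationId(), sc.version()));
    sc.stop();
  }
}

The bytecode approach instead hooks setupAndStartListenerBus, so the listener is in place before the SparkContext finishes starting and no user-code change is needed.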
@@ -0,0 +1,12 @@
import datadog.trace.instrumentation.spark.AbstractSparkListenerTest
import datadog.trace.instrumentation.spark.DatadogSpark211Listener
import org.apache.spark.SparkConf
import org.apache.spark.scheduler.SparkListener

class SparkListenerTest extends AbstractSparkListenerTest {
@Override
protected SparkListener getTestDatadogSparkListener() {
def conf = new SparkConf()
return new DatadogSpark211Listener(conf, "some_app_id", "some_version")
}
}
@@ -0,0 +1,3 @@
import datadog.trace.instrumentation.spark.AbstractSparkStructuredStreamingTest

class SparkStructuredStreamingTest extends AbstractSparkStructuredStreamingTest {}
@@ -0,0 +1,3 @@
import datadog.trace.instrumentation.spark.AbstractSparkTest

class SparkTest extends AbstractSparkTest {}
@@ -0,0 +1,3 @@
import datadog.trace.instrumentation.spark.AbstractSpark24SqlTest

class Spark24SqlTest extends AbstractSpark24SqlTest {}
@@ -496,7 +496,7 @@ public synchronized void onStageCompleted(SparkListenerStageCompleted stageCompl
}

for (AccumulableInfo info :
- JavaConverters.asJavaCollection(stageInfo.accumulables().values())) {
+ JavaConverters.asJavaIterableConverter(stageInfo.accumulables().values()).asJava()) {
accumulators.put(info.id(), new SparkSQLUtils.AccumulatorWithStage(stageId, info));
}

@@ -170,7 +170,7 @@ private void toJson(JsonGenerator generator, Map<Long, AccumulatorWithStage> acc
generator.writeFieldName("meta");
generator.writeStartObject();

- for (Tuple2<String, String> metadata : JavaConverters.asJavaCollection(plan.metadata())) {
+ for (Tuple2<String, String> metadata : JavaConverters.asJavaCollectionConverter(plan.metadata()).asJavaCollection()) {
generator.writeStringField(metadata._1, metadata._2);
}

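Both one-line changes above replace the direct JavaConverters helpers, which only exist from Scala 2.12 on, with the decorator-style converters that are also present in 2.11. A minimal sketch of the portable pattern, assuming scala-library 2.11.x on the classpath (the class and method names are illustrative):

import java.util.Collection;
import java.util.List;
import scala.collection.JavaConverters;
import scala.collection.Seq;

class ScalaConvertersSketch {
  // Decorator style: links against both Scala 2.11 and 2.12, unlike the
  // one-shot JavaConverters.asJavaCollection(...) overloads added in 2.12.
  static <T> Collection<T> toJavaCollection(Seq<T> seq) {
    return JavaConverters.asJavaCollectionConverter(seq).asJavaCollection();
  }

  static <T> List<T> toJavaList(Seq<T> seq) {
    return JavaConverters.seqAsJavaListConverter(seq).asJava();
  }
}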
1 change: 1 addition & 0 deletions settings.gradle
@@ -412,6 +412,7 @@ include ':dd-java-agent:instrumentation:shutdown'
include ':dd-java-agent:instrumentation:slick'
include ':dd-java-agent:instrumentation:span-origin'
include ':dd-java-agent:instrumentation:spark'
+ include ':dd-java-agent:instrumentation:spark:spark_2.11'
include ':dd-java-agent:instrumentation:spark:spark_2.12'
include ':dd-java-agent:instrumentation:spark:spark_2.13'
include ':dd-java-agent:instrumentation:spark-executor'
