Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add instrumentation support for spark built on scala 2.11 #7268

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions dd-java-agent/instrumentation/spark/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ configurations.all {
resolutionStrategy.deactivateDependencyLocking()
}
dependencies {
compileOnly group: 'org.apache.spark', name: 'spark-core_2.12', version: '2.4.0'
compileOnly group: 'org.apache.spark', name: 'spark-sql_2.12', version: '2.4.0'
compileOnly group: 'org.apache.spark', name: 'spark-core_2.11', version: '2.4.0'
compileOnly group: 'org.apache.spark', name: 'spark-sql_2.11', version: '2.4.0'

testFixturesImplementation group: 'com.datadoghq', name: 'sketches-java', version: '0.8.2'
testFixturesImplementation group: 'com.google.protobuf', name: 'protobuf-java', version: '3.14.0'
Expand All @@ -17,7 +17,7 @@ dependencies {
testFixturesApi project(':dd-java-agent:instrumentation:trace-annotation')
testFixturesApi project(':dd-java-agent:testing')

testFixturesCompileOnly group: 'org.apache.spark', name: 'spark-core_2.12', version: '2.4.0'
testFixturesCompileOnly group: 'org.apache.spark', name: 'spark-sql_2.12', version: '2.4.0'
testFixturesCompileOnly group: 'org.apache.spark', name: 'spark-yarn_2.12', version: '2.4.0'
testFixturesCompileOnly group: 'org.apache.spark', name: 'spark-core_2.11', version: '2.4.0'
testFixturesCompileOnly group: 'org.apache.spark', name: 'spark-sql_2.11', version: '2.4.0'
testFixturesCompileOnly group: 'org.apache.spark', name: 'spark-yarn_2.11', version: '2.4.0'
}
54 changes: 54 additions & 0 deletions dd-java-agent/instrumentation/spark/spark_2.11/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
// Build script for the Spark instrumentation variant compiled against Scala 2.11 artifacts.
plugins {
id 'java-test-fixtures'
}

// Baseline Spark/Scala versions used for compileOnly and the default test suite.
def sparkVersion = '2.4.0'
def scalaVersion = '2.11'

// Muzzle validates at build time that the instrumentation links against this dependency range.
muzzle {
pass {
group = "org.apache.spark"
module = "spark-sql_$scalaVersion"
// Range tops out at 2.4.8 — presumably the last Spark release published for Scala 2.11; confirm if newer 2.11 artifacts appear.
versions = "[$sparkVersion,2.4.8]"
assertInverse = true
}
}
configurations.all {
resolutionStrategy.deactivateDependencyLocking()
}
apply from: "$rootDir/gradle/java.gradle"

// 'latestDepTest' runs the same sources against the newest resolvable Spark; 'test_spark24' pins 2.4.0.
addTestSuiteForDir('latestDepTest', 'test')
addTestSuite('test_spark24')

ext {
// Hadoop does not behave correctly with OpenJ9 https://issues.apache.org/jira/browse/HADOOP-18174
excludeJdk = ['SEMERU8', 'SEMERU11']

// Spark does not support Java > 11 until 3.3.0 https://issues.apache.org/jira/browse/SPARK-33772
maxJavaVersionForTests = JavaVersion.VERSION_11
}

dependencies {
implementation project(':dd-java-agent:instrumentation:spark')

// Spark is provided by the instrumented application at runtime, so compileOnly here.
compileOnly group: 'org.apache.spark', name: "spark-core_$scalaVersion", version: "$sparkVersion"
compileOnly group: 'org.apache.spark', name: "spark-sql_$scalaVersion", version: "$sparkVersion"

// Shared test fixtures from the parent spark instrumentation module.
testImplementation(testFixtures(project(":dd-java-agent:instrumentation:spark")))
testImplementation group: 'org.apache.spark', name: "spark-core_$scalaVersion", version: "$sparkVersion"
testImplementation group: 'org.apache.spark', name: "spark-sql_$scalaVersion", version: "$sparkVersion"
testImplementation group: 'org.apache.spark', name: "spark-yarn_$scalaVersion", version: "$sparkVersion"

// Pinned to the literal 2.4.0 (not $sparkVersion) so this suite keeps testing the oldest supported release.
test_spark24Implementation group: 'org.apache.spark', name: "spark-core_$scalaVersion", version: "2.4.0"
test_spark24Implementation group: 'org.apache.spark', name: "spark-sql_$scalaVersion", version: "2.4.0"
test_spark24Implementation group: 'org.apache.spark', name: "spark-yarn_$scalaVersion", version: "2.4.0"

// '+' resolves to the latest published Scala 2.11 Spark artifacts.
latestDepTestImplementation group: 'org.apache.spark', name: "spark-core_$scalaVersion", version: '+'
latestDepTestImplementation group: 'org.apache.spark', name: "spark-sql_$scalaVersion", version: '+'
latestDepTestImplementation group: 'org.apache.spark', name: "spark-yarn_$scalaVersion", version: '+'
}

// Running the default 'test' task also runs the pinned 2.4.0 suite.
tasks.named("test").configure {
dependsOn "test_spark24"
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
# This is a Gradle generated file for dependency locking.
# Manual edits can break the build and are not advised.
# This file is expected to be part of source control.
empty=spotbugsPlugins,testFixturesAnnotationProcessor
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package datadog.trace.instrumentation.spark;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import org.apache.spark.SparkConf;
import org.apache.spark.scheduler.SparkListenerJobStart;
import org.apache.spark.scheduler.StageInfo;
import org.apache.spark.sql.execution.SparkPlanInfo;
import org.apache.spark.sql.execution.metric.SQLMetricInfo;
import scala.collection.JavaConverters;

/**
 * DatadogSparkListener variant compiled against Scala 2.11.
 *
 * <p>The signature of scala.Seq differs between Scala versions, so every method consuming a
 * scala.Seq from the Spark API must be compiled with the matching Scala version.
 */
public class DatadogSpark211Listener extends AbstractDatadogSparkListener {
  public DatadogSpark211Listener(SparkConf sparkConf, String appId, String sparkVersion) {
    super(sparkConf, appId, sparkVersion);
  }

  /** Converts the Scala sequence of stage infos into a Java list of stage ids. */
  @Override
  protected ArrayList<Integer> getSparkJobStageIds(SparkListenerJobStart jobStart) {
    List<StageInfo> stages = JavaConverters.seqAsJavaListConverter(jobStart.stageInfos()).asJava();
    ArrayList<Integer> stageIds = new ArrayList<>(stages.size());
    for (StageInfo stage : stages) {
      stageIds.add(stage.stageId());
    }
    return stageIds;
  }

  /**
   * In the spark UI, the name of a job is the name of its last stage; returns null when the job
   * has no stages.
   */
  @Override
  protected String getSparkJobName(SparkListenerJobStart jobStart) {
    if (jobStart.stageInfos().isEmpty()) {
      return null;
    }
    return jobStart.stageInfos().last().name();
  }

  @Override
  protected int getStageCount(SparkListenerJobStart jobStart) {
    return jobStart.stageInfos().length();
  }

  @Override
  protected Collection<SparkPlanInfo> getPlanInfoChildren(SparkPlanInfo info) {
    return JavaConverters.asJavaCollectionConverter(info.children()).asJavaCollection();
  }

  @Override
  protected List<SQLMetricInfo> getPlanInfoMetrics(SparkPlanInfo info) {
    return JavaConverters.seqAsJavaListConverter(info.metrics()).asJava();
  }

  /** Copies the Scala sequence of parent stage ids into a primitive int array. */
  @Override
  protected int[] getStageParentIds(StageInfo info) {
    int count = info.parentIds().length();
    int[] parentIds = new int[count];
    for (int i = 0; i < count; i++) {
      // parentIds() is a Seq of boxed values in 2.11; the cast unboxes each element.
      parentIds[i] = (int) info.parentIds().apply(i);
    }
    return parentIds;
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package datadog.trace.instrumentation.spark;

import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.*;

import com.google.auto.service.AutoService;
import datadog.trace.agent.tooling.InstrumenterModule;
import net.bytebuddy.asm.Advice;
import org.apache.spark.SparkContext;

@AutoService(InstrumenterModule.class)
public class Spark211Instrumentation extends AbstractSparkInstrumentation {
// Helper classes injected into the instrumented application's classloader so the advice can
// reference them.
// NOTE(review): AbstractDatadogSparkListener is listed before its DatadogSpark211Listener
// subclass — confirm whether helper injection is order-sensitive before reordering this list.
@Override
public String[] helperClassNames() {
return new String[] {
packageName + ".AbstractDatadogSparkListener",
packageName + ".DatabricksParentContext",
packageName + ".DatadogSpark211Listener",
packageName + ".RemoveEldestHashMap",
packageName + ".SparkAggregatedTaskMetrics",
packageName + ".SparkConfAllowList",
packageName + ".SparkSQLUtils",
packageName + ".SparkSQLUtils$SparkPlanInfoForStage",
packageName + ".SparkSQLUtils$AccumulatorWithStage",
};
}

// Adds advice on the no-arg SparkContext.setupAndStartListenerBus() method, on top of whatever
// the abstract base instrumentation registers.
@Override
public void methodAdvice(MethodTransformer transformer) {
super.methodAdvice(transformer);

transformer.applyAdvice(
isMethod()
.and(named("setupAndStartListenerBus"))
.and(isDeclaredBy(named("org.apache.spark.SparkContext")))
.and(takesNoArguments()),
Spark211Instrumentation.class.getName() + "$InjectListener");
}

// Advice: on entry to setupAndStartListenerBus, builds the Scala-2.11-compiled listener,
// stores it on the shared AbstractDatadogSparkListener.listener field, and registers it with
// Spark's listener bus. Any throwable from the advice is suppressed rather than propagated to
// the application.
public static class InjectListener {
@Advice.OnMethodEnter(suppress = Throwable.class)
public static void enter(@Advice.This SparkContext sparkContext) {
AbstractDatadogSparkListener.listener =
new DatadogSpark211Listener(
sparkContext.getConf(), sparkContext.applicationId(), sparkContext.version());
sparkContext.listenerBus().addToSharedQueue(AbstractDatadogSparkListener.listener);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
import datadog.trace.instrumentation.spark.AbstractSparkListenerTest
import datadog.trace.instrumentation.spark.DatadogSpark211Listener
import org.apache.spark.SparkConf
import org.apache.spark.scheduler.SparkListener

// Binds the shared listener test suite to the Scala 2.11 listener implementation.
class SparkListenerTest extends AbstractSparkListenerTest {
  @Override
  protected SparkListener getTestDatadogSparkListener() {
    return new DatadogSpark211Listener(new SparkConf(), "some_app_id", "some_version")
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
import datadog.trace.instrumentation.spark.AbstractSparkStructuredStreamingTest

// Runs the shared structured-streaming test suite against the Scala 2.11 build of the instrumentation.
class SparkStructuredStreamingTest extends AbstractSparkStructuredStreamingTest {}
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
import datadog.trace.instrumentation.spark.AbstractSparkTest

// Runs the shared Spark test suite against the Scala 2.11 build of the instrumentation.
class SparkTest extends AbstractSparkTest {}
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
import datadog.trace.instrumentation.spark.AbstractSpark24SqlTest

// Runs the shared Spark 2.4 SQL test suite against the Scala 2.11 build of the instrumentation.
class Spark24SqlTest extends AbstractSpark24SqlTest {}
Original file line number Diff line number Diff line change
Expand Up @@ -496,7 +496,7 @@ public synchronized void onStageCompleted(SparkListenerStageCompleted stageCompl
}

for (AccumulableInfo info :
JavaConverters.asJavaCollection(stageInfo.accumulables().values())) {
JavaConverters.asJavaIterableConverter(stageInfo.accumulables().values()).asJava()) {
accumulators.put(info.id(), new SparkSQLUtils.AccumulatorWithStage(stageId, info));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ private void toJson(JsonGenerator generator, Map<Long, AccumulatorWithStage> acc
generator.writeFieldName("meta");
generator.writeStartObject();

for (Tuple2<String, String> metadata : JavaConverters.asJavaCollection(plan.metadata())) {
for (Tuple2<String, String> metadata : JavaConverters.asJavaCollectionConverter(plan.metadata()).asJavaCollection()) {
generator.writeStringField(metadata._1, metadata._2);
}

Expand Down
1 change: 1 addition & 0 deletions settings.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -412,6 +412,7 @@ include ':dd-java-agent:instrumentation:shutdown'
include ':dd-java-agent:instrumentation:slick'
include ':dd-java-agent:instrumentation:span-origin'
include ':dd-java-agent:instrumentation:spark'
include ':dd-java-agent:instrumentation:spark:spark_2.11'
include ':dd-java-agent:instrumentation:spark:spark_2.12'
include ':dd-java-agent:instrumentation:spark:spark_2.13'
include ':dd-java-agent:instrumentation:spark-executor'
Expand Down
Loading