@@ -647,6 +647,7 @@ class BeamModulePlugin implements Plugin<Project> {
def solace_version = "10.21.0"
def spark2_version = "2.4.8"
def spark3_version = "3.5.0"
def spark4_version = "4.0.1"
def spotbugs_version = "4.8.3"
def testcontainers_version = "1.19.7"
// [bomupgrader] determined by: org.apache.arrow:arrow-memory-core, consistent with: google_cloud_platform_libraries_bom
@@ -656,6 +657,7 @@ class BeamModulePlugin implements Plugin<Project> {

// Export Spark versions, so they are defined in a single place only
project.ext.spark3_version = spark3_version
project.ext.spark4_version = spark4_version
// version for BigQueryMetastore catalog (used by sdks:java:io:iceberg:bqms)
// TODO: remove this and download the jar normally when the catalog gets
// open-sourced (https://github.com/apache/iceberg/pull/11039)
runners/spark/4/build.gradle (new file: 69 additions, 0 deletions)
@@ -0,0 +1,69 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

def basePath = '..'
/* All properties required for loading the Spark build script */
project.ext {
  // Spark 4 version as defined in BeamModulePlugin
  spark_version = spark4_version
  println "Spark Version: $spark_version"
  spark_scala_version = '2.13'
  copySourceBase = true
  archives_base_name = 'beam-runners-spark-4'
}

// Load the main build script which contains all build logic.
apply from: "$basePath/spark_runner.gradle"

sourceSets.main.java.srcDirs += "src/main/java"

// Generates runQuickstartJavaSpark task (can only support 1 version of Spark)
// createJavaExamplesArchetypeValidationTask(type: 'Quickstart', runner: 'Spark')

// Additional supported Spark versions (used in compatibility tests)
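// No additional Spark versions are pinned for Spark 4 yet. A hypothetical entry would map a
// short version key to a full Spark version, e.g. "401": "4.0.1" (illustrative assumption only,
// not a version validated here); each entry generates a sparkVersion<key>Test task below.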
def sparkVersions = [
]

sparkVersions.each { kv ->
  configurations.create("sparkVersion$kv.key")
  configurations."sparkVersion$kv.key" {
    resolutionStrategy {
      spark.components.each { component -> force "$component:$kv.value" }
    }
  }

  dependencies {
    spark.components.each { component -> "sparkVersion$kv.key" "$component:$kv.value" }
  }

  tasks.register("sparkVersion${kv.key}Test", Test) {
    group = "Verification"
    description = "Verifies code compatibility with Spark $kv.value"
    classpath = configurations."sparkVersion$kv.key" + sourceSets.test.runtimeClasspath
    systemProperties test.systemProperties

    include "**/*.class"
    maxParallelForks 4
  }
}

tasks.register("sparkVersionsTest") {
group = "Verification"
dependsOn sparkVersions.collect{k,v -> "sparkVersion${k}Test"}
}
@@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.spark.structuredstreaming;

import org.apache.beam.runners.spark.SparkCommonPipelineOptions;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;

/**
* Spark runner {@link PipelineOptions} handles Spark execution-related configurations, such as the
* master address, and other user-related knobs.
*/
public interface SparkStructuredStreamingPipelineOptions extends SparkCommonPipelineOptions {

  /** Set to true to run the job in test mode. */
  @Default.Boolean(false)
  boolean getTestMode();

  void setTestMode(boolean testMode);

  @Description("Enable if the runner should use the currently active Spark session.")
  @Default.Boolean(false)
  boolean getUseActiveSparkSession();

  void setUseActiveSparkSession(boolean value);
}
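
For context, a minimal usage sketch of these options (not part of this PR; the runner class SparkStructuredStreamingRunner and the example class name are assumptions for illustration):

import org.apache.beam.runners.spark.structuredstreaming.SparkStructuredStreamingPipelineOptions;
import org.apache.beam.runners.spark.structuredstreaming.SparkStructuredStreamingRunner; // assumed runner class for this module
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class SparkStructuredStreamingOptionsExample {
  public static void main(String[] args) {
    // Parse flags such as --testMode=true or --useActiveSparkSession=true from the command line.
    SparkStructuredStreamingPipelineOptions options =
        PipelineOptionsFactory.fromArgs(args)
            .withValidation()
            .as(SparkStructuredStreamingPipelineOptions.class);
    options.setRunner(SparkStructuredStreamingRunner.class);
    Pipeline pipeline = Pipeline.create(options);
    // Transforms would be applied to the pipeline here.
    pipeline.run().waitUntilFinish();
  }
}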
@@ -0,0 +1,132 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.spark.structuredstreaming;

import static org.apache.beam.runners.core.metrics.MetricsContainerStepMap.asAttemptedOnlyMetricResults;
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects.firstNonNull;

import java.io.IOException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import javax.annotation.Nullable;
import org.apache.beam.runners.spark.structuredstreaming.metrics.MetricsAccumulator;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.metrics.MetricResults;
import org.apache.beam.sdk.util.UserCodeException;
import org.apache.spark.SparkException;
import org.joda.time.Duration;

public class SparkStructuredStreamingPipelineResult implements PipelineResult {

  private final Future<?> pipelineExecution;
  private final MetricsAccumulator metrics;
  private @Nullable final Runnable onTerminalState;
  private PipelineResult.State state;

  SparkStructuredStreamingPipelineResult(
      Future<?> pipelineExecution,
      MetricsAccumulator metrics,
      @Nullable final Runnable onTerminalState) {
    this.pipelineExecution = pipelineExecution;
    this.metrics = metrics;
    this.onTerminalState = onTerminalState;
    // pipelineExecution is expected to have started executing eagerly.
    this.state = State.RUNNING;
  }

  private static RuntimeException runtimeExceptionFrom(final Throwable e) {
    return (e instanceof RuntimeException) ? (RuntimeException) e : new RuntimeException(e);
  }

  /**
   * Unwrap cause of SparkException or UserCodeException as PipelineExecutionException. Otherwise,
   * return {@code exception} as RuntimeException.
   */
  private static RuntimeException unwrapCause(Throwable exception) {
    Throwable next = exception;
    while (next != null && (next instanceof SparkException || next instanceof UserCodeException)) {
      exception = next;
      next = next.getCause();
    }
    return exception == next
        ? runtimeExceptionFrom(exception)
        : new Pipeline.PipelineExecutionException(firstNonNull(next, exception));
  }

  private State awaitTermination(Duration duration)
      throws TimeoutException, ExecutionException, InterruptedException {
    pipelineExecution.get(duration.getMillis(), TimeUnit.MILLISECONDS);
    // Throws an exception if the job is not finished successfully in the given time.
    return PipelineResult.State.DONE;
  }

  @Override
  public PipelineResult.State getState() {
    return state;
  }

  @Override
  public PipelineResult.State waitUntilFinish() {
    return waitUntilFinish(Duration.millis(Long.MAX_VALUE));
  }

  @Override
  public State waitUntilFinish(final Duration duration) {
    try {
      State finishState = awaitTermination(duration);
      offerNewState(finishState);
    } catch (final TimeoutException e) {
      // ignore.
    } catch (final ExecutionException e) {
      offerNewState(PipelineResult.State.FAILED);
      throw unwrapCause(firstNonNull(e.getCause(), e));
    } catch (final Exception e) {
      offerNewState(PipelineResult.State.FAILED);
      throw unwrapCause(e);
    }

    return state;
  }

  @Override
  public MetricResults metrics() {
    return asAttemptedOnlyMetricResults(metrics.value());
  }

  @Override
  public PipelineResult.State cancel() throws IOException {
    offerNewState(PipelineResult.State.CANCELLED);
    return state;
  }

  private void offerNewState(State newState) {
    State oldState = this.state;
    this.state = newState;
    if (!oldState.isTerminal() && newState.isTerminal() && onTerminalState != null) {
      try {
        onTerminalState.run();
      } catch (Exception e) {
        throw unwrapCause(e);
      }
    }
  }
}
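
A sketch of how this result is typically consumed once a pipeline has been launched on this runner (illustrative only, not part of this PR; the wrapper class and method names below are made up):

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.metrics.MetricQueryResults;
import org.apache.beam.sdk.metrics.MetricsFilter;
import org.joda.time.Duration;

public class PipelineResultUsageExample {
  static void runAndReport(Pipeline pipeline) {
    // run() on this runner is expected to hand back a SparkStructuredStreamingPipelineResult.
    PipelineResult result = pipeline.run();

    // Bounded wait: a TimeoutException is swallowed by waitUntilFinish above, so the
    // returned state can still be RUNNING if the job missed the deadline.
    PipelineResult.State state = result.waitUntilFinish(Duration.standardMinutes(10));

    if (state == PipelineResult.State.DONE) {
      // metrics() exposes attempted-only metric results, as implemented above.
      MetricQueryResults metrics =
          result.metrics().queryMetrics(MetricsFilter.builder().build());
      System.out.println("Counters: " + metrics.getCounters());
    }
  }
}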