SPARKC-704 - spark 3.5.0 support (#1363)
* chore(deps) - upgrade spark and jetty base versions

* chore(deps) - add spark staging resolver to build.sbt

* doc(spark-3.5) - prepare docs for spark 3.5 but not publish

* substitute RowEncoder with ExpressionEncoder and reduce BatchScanExec params

* ensure useCommitCoordinator is set correctly for StreamingWrite

* explicitly declare inherited value from BatchWrite for StreamingWrite

* API changes in the Scan and StructType APIs

* doc(3.5) - add 3.5 to documentation as head version

* deps(spark 3.5.0 rc-3) - bump spark 3.5.0 to RC-3

* chore(deps) - remove spark rc registry
JackBuggins committed Feb 2, 2024
1 parent 932abd0 commit 05ca11a
Showing 15 changed files with 36 additions and 26 deletions.
4 changes: 4 additions & 0 deletions CHANGES.txt
@@ -1,3 +1,7 @@

3.5.0
* Support for Apache Spark 3.5 (SPARKC-704)

3.4.1
* Scala 2.13 support (SPARKC-686)

10 changes: 6 additions & 4 deletions README.md
@@ -19,7 +19,7 @@ Spark RDDs and Datasets/DataFrames to Cassandra tables, and execute arbitrary CQ
in your Spark applications.

- Compatible with Apache Cassandra version 2.1 or higher (see table below)
- Compatible with Apache Spark 1.0 through 3.4 ([see table below](#version-compatibility))
- Compatible with Apache Spark 1.0 through 3.5 ([see table below](#version-compatibility))
- Compatible with Scala 2.11, 2.12 and 2.13
- Exposes Cassandra tables as Spark RDDs and Datasets/DataFrames
- Maps table rows to CassandraRow objects or tuples
@@ -45,15 +45,17 @@ corresponds to the 1.6 release. The "master" branch will normally contain
development for the next connector release in progress.

Currently, the following branches are actively supported:
3.4.x ([master](https://github.com/datastax/spark-cassandra-connector/tree/master)),
3.5.x ([master](https://github.com/datastax/spark-cassandra-connector/tree/master)),
3.4.x ([b3.4](https://github.com/datastax/spark-cassandra-connector/tree/b3.4)),
3.3.x ([b3.3](https://github.com/datastax/spark-cassandra-connector/tree/b3.3)),
3.2.x ([b3.2](https://github.com/datastax/spark-cassandra-connector/tree/b3.2)),
3.1.x ([b3.1](https://github.com/datastax/spark-cassandra-connector/tree/b3.1)),
3.0.x ([b3.0](https://github.com/datastax/spark-cassandra-connector/tree/b3.0)) and
2.5.x ([b2.5](https://github.com/datastax/spark-cassandra-connector/tree/b2.5)).

| Connector | Spark | Cassandra | Cassandra Java Driver | Minimum Java Version | Supported Scala Versions |
|-----------|---------------|-----------------------| --------------------- | -------------------- |--------------------------|
|-----------|---------------|-----------------------| --------------------- | -------------------- | ----------------------- |
| 3.5 | 3.5 | 2.1.5*, 2.2, 3.x, 4.x | 4.13 | 8 | 2.12, 2.13 |
| 3.4 | 3.4 | 2.1.5*, 2.2, 3.x, 4.x | 4.13 | 8 | 2.12, 2.13 |
| 3.3 | 3.3 | 2.1.5*, 2.2, 3.x, 4.x | 4.13 | 8 | 2.12 |
| 3.2 | 3.2 | 2.1.5*, 2.2, 3.x, 4.0 | 4.13 | 8 | 2.12 |
@@ -105,7 +107,7 @@ This project is available on the Maven Central Repository.
For SBT to download the connector binaries, sources and javadoc, put this in your project
SBT config:

libraryDependencies += "com.datastax.spark" %% "spark-cassandra-connector" % "3.4.1"
libraryDependencies += "com.datastax.spark" %% "spark-cassandra-connector" % "3.5.0"

* The default Scala version for Spark 3.0+ is 2.12; please choose the appropriate build. See the
[FAQ](doc/FAQ.md) for more information.
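
For orientation, a minimal `build.sbt` wiring this coordinate into a Spark 3.5 application might look like the sketch below; the project name, Scala patch version, and the provided `spark-sql` dependency are illustrative assumptions, not part of this commit.

```scala
// Minimal build.sbt sketch (illustrative names and versions, not from this commit).
name := "spark-cassandra-app"
scalaVersion := "2.12.18" // use a 2.12 or 2.13 build matching your Spark distribution

libraryDependencies ++= Seq(
  "org.apache.spark"   %% "spark-sql"                 % "3.5.0" % "provided",
  "com.datastax.spark" %% "spark-cassandra-connector" % "3.5.0"
)
```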
@@ -248,7 +248,7 @@ trait SparkCassandraITSpecBase

def getCassandraScan(plan: SparkPlan): CassandraScan = {
plan.collectLeaves.collectFirst{
case BatchScanExec(_, cassandraScan: CassandraScan, _, _, _, _, _, _, _) => cassandraScan
case BatchScanExec(_, cassandraScan: CassandraScan, _, _, _, _) => cassandraScan
}.getOrElse(throw new IllegalArgumentException("No Cassandra Scan Found"))
}
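
As a hedged alternative to the positional extractor above, a test helper can match on the node type and its `scan` field, which stays stable when Spark changes `BatchScanExec`'s constructor arity. The sketch assumes the same imports as the surrounding spec (notably `CassandraScan`) plus the Spark classes shown; the helper name is illustrative.

```scala
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.datasources.v2.BatchScanExec

// Arity-independent variant of getCassandraScan above.
def getCassandraScanByType(plan: SparkPlan): CassandraScan =
  plan.collectLeaves().collectFirst {
    case b: BatchScanExec if b.scan.isInstanceOf[CassandraScan] =>
      b.scan.asInstanceOf[CassandraScan]
  }.getOrElse(throw new IllegalArgumentException("No Cassandra Scan Found"))
```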

@@ -47,7 +47,7 @@ trait SaiBaseSpec extends Matchers with SparkCassandraITSpecBase {

def findCassandraScan(plan: SparkPlan): CassandraScan = {
plan match {
case BatchScanExec(_, scan: CassandraScan, _, _, _, _, _, _, _) => scan
case BatchScanExec(_, scan: CassandraScan, _, _, _, _) => scan
case filter: FilterExec => findCassandraScan(filter.child)
case project: ProjectExec => findCassandraScan(project.child)
case _ => throw new NoSuchElementException("RowDataSourceScanExec was not found in the given plan")
@@ -274,10 +274,10 @@ class CassandraDataSourceSpec extends SparkCassandraITFlatSpecBase with DefaultC
if (pushDown)
withClue(s"Given Dataframe plan does not contain CassandraInJoin in its predecessors.\n${df.queryExecution.sparkPlan.toString()}") {
df.queryExecution.executedPlan.collectLeaves().collectFirst{
case a@BatchScanExec(_, _: CassandraInJoin, _, _, _, _, _, _, _) => a
case a@BatchScanExec(_, _: CassandraInJoin, _, _, _, _) => a
case b@AdaptiveSparkPlanExec(_, _, _, _, _) =>
b.executedPlan.collectLeaves().collectFirst{
case a@BatchScanExec(_, _: CassandraInJoin, _, _, _, _, _, _, _) => a
case a@BatchScanExec(_, _: CassandraInJoin, _, _, _, _) => a
}
} shouldBe defined
}
@@ -288,7 +288,7 @@ class CassandraDataSourceSpec extends SparkCassandraITFlatSpecBase with DefaultC
private def assertOnAbsenceOfCassandraInJoin(df: DataFrame): Unit =
withClue(s"Given Dataframe plan contains CassandraInJoin in its predecessors.\n${df.queryExecution.sparkPlan.toString()}") {
df.queryExecution.executedPlan.collectLeaves().collectFirst{
case a@BatchScanExec(_, _: CassandraInJoin, _, _, _, _, _, _, _) => a
case a@BatchScanExec(_, _: CassandraInJoin, _, _, _, _) => a
} shouldBe empty
}

@@ -7,6 +7,6 @@ import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
object CatalystUtil {

def findCassandraScan(sparkPlan: SparkPlan): Option[CassandraScan] = {
sparkPlan.collectFirst{ case BatchScanExec(_, scan: CassandraScan, _, _, _, _, _, _, _) => scan}
sparkPlan.collectFirst{ case BatchScanExec(_, scan: CassandraScan, _, _, _, _) => scan}
}
}
@@ -1,6 +1,7 @@
package org.apache.spark.sql.datastax.test.empty

import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
import org.apache.spark.sql.catalyst.types.DataTypeUtils
import org.apache.spark.sql.execution.streaming.{LongOffset, Offset, Source}
import org.apache.spark.sql.sources.StreamSourceProvider
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
@@ -33,7 +34,7 @@ class DefaultSource extends StreamSourceProvider {
}

override def getBatch(start: Option[Offset], end: Offset): DataFrame = {
Dataset.ofRows(sqlContext.sparkSession, LocalRelation(schema.toAttributes, isStreaming = true))
Dataset.ofRows(sqlContext.sparkSession, LocalRelation(DataTypeUtils.toAttributes(schema), isStreaming = true))
}

override def stop() {}
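
A small, self-contained sketch of the migration this hunk applies: `StructType.toAttributes` is replaced by `DataTypeUtils.toAttributes` in Spark 3.5. The object and field names below are illustrative.

```scala
import org.apache.spark.sql.catalyst.types.DataTypeUtils
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

object ToAttributesSketch {
  val schema: StructType = StructType(Seq(StructField("value", IntegerType)))

  // Spark <= 3.4 exposed this as schema.toAttributes; in Spark 3.5 the helper
  // lives on DataTypeUtils, as in the hunk above.
  val attributes = DataTypeUtils.toAttributes(schema)
}
```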
@@ -1,6 +1,7 @@
package org.apache.spark.sql.datastax.test.monotonic

import org.apache.spark.sql.{DataFrame, Dataset, SQLContext}
import org.apache.spark.sql.catalyst.types.DataTypeUtils
import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
import org.apache.spark.sql.execution.streaming.{LongOffset, Offset, SerializedOffset, Source}
@@ -43,7 +44,7 @@ class DefaultSource extends StreamSourceProvider {
}
val rows = (startValue.toInt to endValue.toInt).map( value =>
new GenericInternalRow(values = Array(value)))
Dataset.ofRows(spark.sparkSession, LocalRelation(schema.toAttributes, rows, isStreaming = true))
Dataset.ofRows(spark.sparkSession, LocalRelation(DataTypeUtils.toAttributes(schema), rows, isStreaming = true))
}

override def stop() {}
@@ -113,6 +113,8 @@ case class CassandraBulkWrite(

override def createStreamingWriterFactory(info: PhysicalWriteInfo): StreamingDataWriterFactory = getWriterFactory()

override def useCommitCoordinator(): Boolean = super.useCommitCoordinator()

private def getWriterFactory(): CassandraDriverDataWriterFactory = {
CassandraDriverDataWriterFactory(
connector,
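One reading of the explicit override above, sketched with stand-in traits: `CassandraBulkWrite` implements both `BatchWrite` and `StreamingWrite`, and when two parent interfaces each supply a concrete `useCommitCoordinator`, Scala requires the subclass to resolve the conflict explicitly. The trait and class names below are placeholders, not the Spark interfaces.

```scala
// Stand-in traits that mirror the shape of the conflict (not the real Spark API).
trait BatchLikeWrite     { def useCommitCoordinator(): Boolean = true }
trait StreamingLikeWrite { def useCommitCoordinator(): Boolean = true }

// Without the explicit override the compiler rejects this class for inheriting
// conflicting concrete members; delegating to super keeps the inherited default.
class BulkWriteSketch extends BatchLikeWrite with StreamingLikeWrite {
  override def useCommitCoordinator(): Boolean = super.useCommitCoordinator()
}
```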
@@ -3,7 +3,7 @@ package com.datastax.spark
import com.datastax.spark.connector.rdd.{CassandraTableScanRDD, SparkPartitionLimit}
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.{DataFrame, Dataset, Encoder, Row}

import scala.language.implicitConversions
@@ -65,7 +65,7 @@ package object connector {
new CassandraTableScanRDDFunctions(rdd)

implicit def toDataFrameFunctions(dataFrame: DataFrame): DatasetFunctions[Row] =
new DatasetFunctions[Row](dataFrame)(RowEncoder(dataFrame.schema))
new DatasetFunctions[Row](dataFrame)(ExpressionEncoder(dataFrame.schema))

implicit def toDatasetFunctions[K: Encoder](dataset: Dataset[K]): DatasetFunctions[K] =
new DatasetFunctions[K](dataset)
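
Usage of the implicit conversion changed above is unaffected; a sketch like the following (host, keyspace, and table names are placeholders) still picks up the Row encoder, now built via `ExpressionEncoder(dataFrame.schema)`.

```scala
import com.datastax.spark.connector._ // brings toDataFrameFunctions into scope
import org.apache.spark.sql.SparkSession

object DatasetFunctionsSketch extends App {
  val spark = SparkSession.builder()
    .appName("sketch")
    .config("spark.cassandra.connection.host", "127.0.0.1") // placeholder host
    .getOrCreate()
  import spark.implicits._

  val df = Seq((1, "a"), (2, "b")).toDF("id", "value")

  // Creates a Cassandra table from the DataFrame schema; names are placeholders.
  df.createCassandraTable("demo_ks", "demo_table")
}
```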
@@ -147,7 +147,7 @@ object CassandraDirectJoinStrategy extends Logging {
*/
def getScanExec(plan: SparkPlan): Option[BatchScanExec] = {
plan.collectFirst {
case exec @ BatchScanExec(_, _: CassandraScan, _, _, _, _, _, _, _) => exec
case exec @ BatchScanExec(_, _: CassandraScan, _, _, _, _) => exec
}
}

@@ -205,7 +205,7 @@
def hasCassandraChild[T <: QueryPlan[T]](plan: T): Boolean = {
plan.children.size == 1 && plan.children.exists {
case DataSourceV2ScanRelation(DataSourceV2Relation(_: CassandraTable, _, _, _, _), _, _, _, _) => true
case BatchScanExec(_, _: CassandraScan, _, _, _, _, _, _, _) => true
case BatchScanExec(_, _: CassandraScan, _, _, _, _) => true
case _ => false
}
}
@@ -238,7 +238,7 @@
originalOutput: Seq[Attribute]): SparkPlan = {
val reordered = plan match {
//This may be the only node in the Plan
case BatchScanExec(_, _: CassandraScan, _, _, _, _, _, _, _) => directJoin
case BatchScanExec(_, _: CassandraScan, _, _, _, _) => directJoin
// Plan has children
case normalPlan => normalPlan.transform {
case penultimate if hasCassandraChild(penultimate) =>
10 changes: 5 additions & 5 deletions doc/0_quick_start.md
@@ -15,15 +15,15 @@ Configure a new Scala project with the Apache Spark dependency.

The dependencies are easily retrieved via Maven Central

libraryDependencies += "com.datastax.spark" % "spark-cassandra-connector_2.12" % "3.4.1"
libraryDependencies += "com.datastax.spark" % "spark-cassandra-connector_2.12" % "3.5.0"

The spark-packages libraries can also be used with spark-submit and spark-shell; these
commands will place the connector and all of its dependencies on the path of the
Spark Driver and all Spark Executors.

$SPARK_HOME/bin/spark-shell --packages com.datastax.spark:spark-cassandra-connector_2.12:3.4.1
$SPARK_HOME/bin/spark-submit --packages com.datastax.spark:spark-cassandra-connector_2.12:3.4.1
$SPARK_HOME/bin/spark-shell --packages com.datastax.spark:spark-cassandra-connector_2.12:3.5.0
$SPARK_HOME/bin/spark-submit --packages com.datastax.spark:spark-cassandra-connector_2.12:3.5.0

For the list of available versions, see:
- https://repo1.maven.org/maven2/com/datastax/spark/spark-cassandra-connector_2.12/
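
For a standalone application, the same settings passed via `--conf` in this guide can be supplied on the session builder instead; a minimal sketch, with a placeholder host:

```scala
import org.apache.spark.sql.SparkSession

// Programmatic equivalent of the --conf flags used elsewhere in this guide.
val spark = SparkSession.builder()
  .appName("quick-start-sketch")
  .config("spark.cassandra.connection.host", "127.0.0.1") // placeholder
  .config("spark.sql.extensions", "com.datastax.spark.connector.CassandraSparkExtensions")
  .getOrCreate()
```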

@@ -42,7 +42,7 @@ and *all* of its dependencies on the Spark Class Path. To configure
the default Spark Configuration pass key value pairs with `--conf`

$SPARK_HOME/bin/spark-shell --conf spark.cassandra.connection.host=127.0.0.1 \
--packages com.datastax.spark:spark-cassandra-connector_2.12:3.4.1
--packages com.datastax.spark:spark-cassandra-connector_2.12:3.5.0
--conf spark.sql.extensions=com.datastax.spark.connector.CassandraSparkExtensions

This command would set the Spark Cassandra Connector parameter
2 changes: 1 addition & 1 deletion doc/13_spark_shell.md
@@ -18,7 +18,7 @@ Find additional versions at [Spark Packages](https://repo1.maven.org/maven2/com/
```bash
cd spark/install/dir
#Include the --master if you want to run against a spark cluster and not local mode
./bin/spark-shell [--master sparkMasterAddress] --jars yourAssemblyJar --packages com.datastax.spark:spark-cassandra-connector_2.12:3.4.1 --conf spark.cassandra.connection.host=yourCassandraClusterIp
./bin/spark-shell [--master sparkMasterAddress] --jars yourAssemblyJar --packages com.datastax.spark:spark-cassandra-connector_2.12:3.5.0 --conf spark.cassandra.connection.host=yourCassandraClusterIp
```
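
Once the shell is up, a minimal smoke test might look like the following; the keyspace and table names are placeholders.

```scala
// Run inside the spark-shell started above.
val df = spark.read
  .format("org.apache.spark.sql.cassandra")
  .options(Map("keyspace" -> "my_ks", "table" -> "my_table")) // placeholders
  .load()

df.show(5)
```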

By default Spark will log everything to the console, which may be a bit of an overload. To change this, copy and modify the `log4j.properties` template file
2 changes: 1 addition & 1 deletion doc/15_python.md
@@ -14,7 +14,7 @@ shell similarly to how the spark shell is started. The preferred method is now t

```bash
./bin/pyspark \
--packages com.datastax.spark:spark-cassandra-connector_2.12:3.4.1 \
--packages com.datastax.spark:spark-cassandra-connector_2.12:3.5.0 \
--conf spark.sql.extensions=com.datastax.spark.connector.CassandraSparkExtensions
```

4 changes: 2 additions & 2 deletions project/Versions.scala
@@ -13,8 +13,8 @@ object Versions {
val JUnitInterface = "0.11"
val Mockito = "1.10.19"

val ApacheSpark = "3.4.1"
val SparkJetty = "9.4.50.v20221201"
val ApacheSpark = "3.5.0"
val SparkJetty = "9.4.51.v20230217"
val SolrJ = "8.3.0"

val ScalaCompat = "2.11.0"