From 05ca11a533428daa631421ec77726245a10af9a3 Mon Sep 17 00:00:00 2001 From: Jack Richard Buggins Date: Fri, 2 Feb 2024 08:05:25 +0000 Subject: [PATCH] SPARKC-704 - spark 3.5.0 support (#1363) * chore(deps) - upgrade spark and jetty base versions * chore(deps) - add spark staging resolver to build.sbt * doc(spark-3.5) - prepare docs for spark 3.5 but not publish * substitute RowEncode as ExpressionEncoder and reduce BatchScanExec params * ensure useCommitCoordinator is set correctly for StreamingWrite * explicitly declare inherited value from BatchWrite for StreamingWrite * api changes in scan and structtype apis * doc(3.5) - add 3.5 to documentation as head version * deps(spark 3.5.0 rc-3) - bump spark 3.5.0 to RC-3 * chore(deps) - remove spark rc registry --- CHANGES.txt | 4 ++++ README.md | 10 ++++++---- .../spark/connector/SparkCassandraITFlatSpecBase.scala | 2 +- .../datastax/spark/connector/cql/sai/SaiBaseSpec.scala | 2 +- .../spark/connector/sql/CassandraDataSourceSpec.scala | 6 +++--- .../datastax/spark/connector/util/CatalystUtil.scala | 2 +- .../sql/datastax/test/empty/EmptyStreamSource.scala | 3 ++- .../sql/datastax/test/monotonic/FakeStreamSource.scala | 3 ++- .../connector/datasource/CassandraWriteBuilder.scala | 2 ++ .../scala/com/datastax/spark/connector/package.scala | 4 ++-- .../execution/CassandraDirectJoinStrategy.scala | 6 +++--- doc/0_quick_start.md | 10 +++++----- doc/13_spark_shell.md | 2 +- doc/15_python.md | 2 +- project/Versions.scala | 4 ++-- 15 files changed, 36 insertions(+), 26 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index 653db5e35..c36988be4 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,3 +1,7 @@ + +3.5.0 + * Support for Apache Spark 3.5 (SPARKC-704) + 3.4.1 * Scala 2.13 support (SPARKC-686) diff --git a/README.md b/README.md index 14f38049e..084862e53 100644 --- a/README.md +++ b/README.md @@ -19,7 +19,7 @@ Spark RDDs and Datasets/DataFrames to Cassandra tables, and execute arbitrary CQ in your Spark applications. - Compatible with Apache Cassandra version 2.1 or higher (see table below) - - Compatible with Apache Spark 1.0 through 3.4 ([see table below](#version-compatibility)) + - Compatible with Apache Spark 1.0 through 3.5 ([see table below](#version-compatibility)) - Compatible with Scala 2.11, 2.12 and 2.13 - Exposes Cassandra tables as Spark RDDs and Datasets/DataFrames - Maps table rows to CassandraRow objects or tuples @@ -45,7 +45,8 @@ corresponds to the 1.6 release. The "master" branch will normally contain development for the next connector release in progress. Currently, the following branches are actively supported: -3.4.x ([master](https://github.com/datastax/spark-cassandra-connector/tree/master)), +3.5.x ([master](https://github.com/datastax/spark-cassandra-connector/tree/master)), +3.4.x ([b3.4](https://github.com/datastax/spark-cassandra-connector/tree/b3.4)), 3.3.x ([b3.2](https://github.com/datastax/spark-cassandra-connector/tree/b3.3)), 3.2.x ([b3.2](https://github.com/datastax/spark-cassandra-connector/tree/b3.2)), 3.1.x ([b3.1](https://github.com/datastax/spark-cassandra-connector/tree/b3.1)), @@ -53,7 +54,8 @@ Currently, the following branches are actively supported: 2.5.x ([b2.5](https://github.com/datastax/spark-cassandra-connector/tree/b2.5)). 
| Connector | Spark | Cassandra | Cassandra Java Driver | Minimum Java Version | Supported Scala Versions | -|-----------|---------------|-----------------------| --------------------- | -------------------- |--------------------------| +|-----------|---------------|-----------------------| --------------------- | -------------------- | ----------------------- | +| 3.5 | 3.5 | 2.1.5*, 2.2, 3.x, 4.x | 4.13 | 8 | 2.12, 2.13 | | 3.4 | 3.4 | 2.1.5*, 2.2, 3.x, 4.x | 4.13 | 8 | 2.12, 2.13 | | 3.3 | 3.3 | 2.1.5*, 2.2, 3.x, 4.x | 4.13 | 8 | 2.12 | | 3.2 | 3.2 | 2.1.5*, 2.2, 3.x, 4.0 | 4.13 | 8 | 2.12 | @@ -105,7 +107,7 @@ This project is available on the Maven Central Repository. For SBT to download the connector binaries, sources and javadoc, put this in your project SBT config: - libraryDependencies += "com.datastax.spark" %% "spark-cassandra-connector" % "3.4.1" + libraryDependencies += "com.datastax.spark" %% "spark-cassandra-connector" % "3.5.0" * The default Scala version for Spark 3.0+ is 2.12 please choose the appropriate build. See the [FAQ](doc/FAQ.md) for more information. diff --git a/connector/src/it/scala/com/datastax/spark/connector/SparkCassandraITFlatSpecBase.scala b/connector/src/it/scala/com/datastax/spark/connector/SparkCassandraITFlatSpecBase.scala index bdcc8a7fc..b1c77a0ba 100644 --- a/connector/src/it/scala/com/datastax/spark/connector/SparkCassandraITFlatSpecBase.scala +++ b/connector/src/it/scala/com/datastax/spark/connector/SparkCassandraITFlatSpecBase.scala @@ -248,7 +248,7 @@ trait SparkCassandraITSpecBase def getCassandraScan(plan: SparkPlan): CassandraScan = { plan.collectLeaves.collectFirst{ - case BatchScanExec(_, cassandraScan: CassandraScan, _, _, _, _, _, _, _) => cassandraScan + case BatchScanExec(_, cassandraScan: CassandraScan, _, _, _, _) => cassandraScan }.getOrElse(throw new IllegalArgumentException("No Cassandra Scan Found")) } diff --git a/connector/src/it/scala/com/datastax/spark/connector/cql/sai/SaiBaseSpec.scala b/connector/src/it/scala/com/datastax/spark/connector/cql/sai/SaiBaseSpec.scala index 4e2150522..7dc617b17 100644 --- a/connector/src/it/scala/com/datastax/spark/connector/cql/sai/SaiBaseSpec.scala +++ b/connector/src/it/scala/com/datastax/spark/connector/cql/sai/SaiBaseSpec.scala @@ -47,7 +47,7 @@ trait SaiBaseSpec extends Matchers with SparkCassandraITSpecBase { def findCassandraScan(plan: SparkPlan): CassandraScan = { plan match { - case BatchScanExec(_, scan: CassandraScan, _, _, _, _, _, _, _) => scan + case BatchScanExec(_, scan: CassandraScan, _, _, _, _) => scan case filter: FilterExec => findCassandraScan(filter.child) case project: ProjectExec => findCassandraScan(project.child) case _ => throw new NoSuchElementException("RowDataSourceScanExec was not found in the given plan") diff --git a/connector/src/it/scala/com/datastax/spark/connector/sql/CassandraDataSourceSpec.scala b/connector/src/it/scala/com/datastax/spark/connector/sql/CassandraDataSourceSpec.scala index 640d444a9..1064d63e8 100644 --- a/connector/src/it/scala/com/datastax/spark/connector/sql/CassandraDataSourceSpec.scala +++ b/connector/src/it/scala/com/datastax/spark/connector/sql/CassandraDataSourceSpec.scala @@ -274,10 +274,10 @@ class CassandraDataSourceSpec extends SparkCassandraITFlatSpecBase with DefaultC if (pushDown) withClue(s"Given Dataframe plan does not contain CassandraInJoin in its predecessors.\n${df.queryExecution.sparkPlan.toString()}") { df.queryExecution.executedPlan.collectLeaves().collectFirst{ - case a@BatchScanExec(_, _: CassandraInJoin, _, _, _, 
_, _, _, _) => a + case a@BatchScanExec(_, _: CassandraInJoin, _, _, _, _) => a case b@AdaptiveSparkPlanExec(_, _, _, _, _) => b.executedPlan.collectLeaves().collectFirst{ - case a@BatchScanExec(_, _: CassandraInJoin, _, _, _, _, _, _, _) => a + case a@BatchScanExec(_, _: CassandraInJoin, _, _, _, _) => a } } shouldBe defined } @@ -288,7 +288,7 @@ class CassandraDataSourceSpec extends SparkCassandraITFlatSpecBase with DefaultC private def assertOnAbsenceOfCassandraInJoin(df: DataFrame): Unit = withClue(s"Given Dataframe plan contains CassandraInJoin in its predecessors.\n${df.queryExecution.sparkPlan.toString()}") { df.queryExecution.executedPlan.collectLeaves().collectFirst{ - case a@BatchScanExec(_, _: CassandraInJoin, _, _, _, _, _, _, _) => a + case a@BatchScanExec(_, _: CassandraInJoin, _, _, _, _) => a } shouldBe empty } diff --git a/connector/src/it/scala/com/datastax/spark/connector/util/CatalystUtil.scala b/connector/src/it/scala/com/datastax/spark/connector/util/CatalystUtil.scala index 65a623cb0..bb7d88934 100644 --- a/connector/src/it/scala/com/datastax/spark/connector/util/CatalystUtil.scala +++ b/connector/src/it/scala/com/datastax/spark/connector/util/CatalystUtil.scala @@ -7,6 +7,6 @@ import org.apache.spark.sql.execution.datasources.v2.BatchScanExec object CatalystUtil { def findCassandraScan(sparkPlan: SparkPlan): Option[CassandraScan] = { - sparkPlan.collectFirst{ case BatchScanExec(_, scan: CassandraScan, _, _, _, _, _, _, _) => scan} + sparkPlan.collectFirst{ case BatchScanExec(_, scan: CassandraScan, _, _, _, _) => scan} } } diff --git a/connector/src/it/scala/org/apache/spark/sql/datastax/test/empty/EmptyStreamSource.scala b/connector/src/it/scala/org/apache/spark/sql/datastax/test/empty/EmptyStreamSource.scala index 3b29a801e..60ff8102f 100644 --- a/connector/src/it/scala/org/apache/spark/sql/datastax/test/empty/EmptyStreamSource.scala +++ b/connector/src/it/scala/org/apache/spark/sql/datastax/test/empty/EmptyStreamSource.scala @@ -1,6 +1,7 @@ package org.apache.spark.sql.datastax.test.empty import org.apache.spark.sql.catalyst.plans.logical.LocalRelation +import org.apache.spark.sql.catalyst.types.DataTypeUtils import org.apache.spark.sql.execution.streaming.{LongOffset, Offset, Source} import org.apache.spark.sql.sources.StreamSourceProvider import org.apache.spark.sql.types.{IntegerType, StructField, StructType} @@ -33,7 +34,7 @@ class DefaultSource extends StreamSourceProvider { } override def getBatch(start: Option[Offset], end: Offset): DataFrame = { - Dataset.ofRows(sqlContext.sparkSession, LocalRelation(schema.toAttributes, isStreaming = true)) + Dataset.ofRows(sqlContext.sparkSession, LocalRelation(DataTypeUtils.toAttributes(schema), isStreaming = true)) } override def stop() {} diff --git a/connector/src/it/scala/org/apache/spark/sql/datastax/test/monotonic/FakeStreamSource.scala b/connector/src/it/scala/org/apache/spark/sql/datastax/test/monotonic/FakeStreamSource.scala index 54ad3c12a..1995718d6 100644 --- a/connector/src/it/scala/org/apache/spark/sql/datastax/test/monotonic/FakeStreamSource.scala +++ b/connector/src/it/scala/org/apache/spark/sql/datastax/test/monotonic/FakeStreamSource.scala @@ -1,6 +1,7 @@ package org.apache.spark.sql.datastax.test.monotonic import org.apache.spark.sql.{DataFrame, Dataset, SQLContext} +import org.apache.spark.sql.catalyst.types.DataTypeUtils import org.apache.spark.sql.catalyst.expressions.GenericInternalRow import org.apache.spark.sql.catalyst.plans.logical.LocalRelation import 
org.apache.spark.sql.execution.streaming.{LongOffset, Offset, SerializedOffset, Source} @@ -43,7 +44,7 @@ class DefaultSource extends StreamSourceProvider { } val rows = (startValue.toInt to endValue.toInt).map( value => new GenericInternalRow(values = Array(value))) - Dataset.ofRows(spark.sparkSession, LocalRelation(schema.toAttributes, rows, isStreaming = true)) + Dataset.ofRows(spark.sparkSession, LocalRelation(DataTypeUtils.toAttributes(schema), rows, isStreaming = true)) } override def stop() {} diff --git a/connector/src/main/scala/com/datastax/spark/connector/datasource/CassandraWriteBuilder.scala b/connector/src/main/scala/com/datastax/spark/connector/datasource/CassandraWriteBuilder.scala index 9e8a92686..2f39606ff 100644 --- a/connector/src/main/scala/com/datastax/spark/connector/datasource/CassandraWriteBuilder.scala +++ b/connector/src/main/scala/com/datastax/spark/connector/datasource/CassandraWriteBuilder.scala @@ -113,6 +113,8 @@ case class CassandraBulkWrite( override def createStreamingWriterFactory(info: PhysicalWriteInfo): StreamingDataWriterFactory = getWriterFactory() + override def useCommitCoordinator(): Boolean = super.useCommitCoordinator() + private def getWriterFactory(): CassandraDriverDataWriterFactory = { CassandraDriverDataWriterFactory( connector, diff --git a/connector/src/main/scala/com/datastax/spark/connector/package.scala b/connector/src/main/scala/com/datastax/spark/connector/package.scala index 3c17f06ad..1e6cfb1c0 100644 --- a/connector/src/main/scala/com/datastax/spark/connector/package.scala +++ b/connector/src/main/scala/com/datastax/spark/connector/package.scala @@ -3,7 +3,7 @@ package com.datastax.spark import com.datastax.spark.connector.rdd.{CassandraTableScanRDD, SparkPartitionLimit} import org.apache.spark.SparkContext import org.apache.spark.rdd.RDD -import org.apache.spark.sql.catalyst.encoders.RowEncoder +import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder import org.apache.spark.sql.{DataFrame, Dataset, Encoder, Row} import scala.language.implicitConversions @@ -65,7 +65,7 @@ package object connector { new CassandraTableScanRDDFunctions(rdd) implicit def toDataFrameFunctions(dataFrame: DataFrame): DatasetFunctions[Row] = - new DatasetFunctions[Row](dataFrame)(RowEncoder(dataFrame.schema)) + new DatasetFunctions[Row](dataFrame)(ExpressionEncoder(dataFrame.schema)) implicit def toDatasetFunctions[K: Encoder](dataset: Dataset[K]): DatasetFunctions[K] = new DatasetFunctions[K](dataset) diff --git a/connector/src/main/scala/org/apache/spark/sql/cassandra/execution/CassandraDirectJoinStrategy.scala b/connector/src/main/scala/org/apache/spark/sql/cassandra/execution/CassandraDirectJoinStrategy.scala index e0ee2d322..b4085e664 100644 --- a/connector/src/main/scala/org/apache/spark/sql/cassandra/execution/CassandraDirectJoinStrategy.scala +++ b/connector/src/main/scala/org/apache/spark/sql/cassandra/execution/CassandraDirectJoinStrategy.scala @@ -147,7 +147,7 @@ object CassandraDirectJoinStrategy extends Logging { */ def getScanExec(plan: SparkPlan): Option[BatchScanExec] = { plan.collectFirst { - case exec @ BatchScanExec(_, _: CassandraScan, _, _, _, _, _, _, _) => exec + case exec @ BatchScanExec(_, _: CassandraScan, _, _, _, _) => exec } } @@ -205,7 +205,7 @@ object CassandraDirectJoinStrategy extends Logging { def hasCassandraChild[T <: QueryPlan[T]](plan: T): Boolean = { plan.children.size == 1 && plan.children.exists { case DataSourceV2ScanRelation(DataSourceV2Relation(_: CassandraTable, _, _, _, _), _, _, _, _) => true - case 
BatchScanExec(_, _: CassandraScan, _, _, _, _, _, _, _) => true + case BatchScanExec(_, _: CassandraScan, _, _, _, _) => true case _ => false } } @@ -238,7 +238,7 @@ object CassandraDirectJoinStrategy extends Logging { originalOutput: Seq[Attribute]): SparkPlan = { val reordered = plan match { //This may be the only node in the Plan - case BatchScanExec(_, _: CassandraScan, _, _, _, _, _, _, _) => directJoin + case BatchScanExec(_, _: CassandraScan, _, _, _, _) => directJoin // Plan has children case normalPlan => normalPlan.transform { case penultimate if hasCassandraChild(penultimate) => diff --git a/doc/0_quick_start.md b/doc/0_quick_start.md index f046bc73b..8172568b6 100644 --- a/doc/0_quick_start.md +++ b/doc/0_quick_start.md @@ -15,15 +15,15 @@ Configure a new Scala project with the Apache Spark and dependency. The dependencies are easily retrieved via Maven Central - libraryDependencies += "com.datastax.spark" % "spark-cassandra-connector_2.12" % "3.4.1" + libraryDependencies += "com.datastax.spark" % "spark-cassandra-connector_2.12" % "3.5.0" The spark-packages libraries can also be used with spark-submit and spark shell, these commands will place the connector and all of its dependencies on the path of the Spark Driver and all Spark Executors. - $SPARK_HOME/bin/spark-shell --packages com.datastax.spark:spark-cassandra-connector_2.12:3.4.1 - $SPARK_HOME/bin/spark-submit --packages com.datastax.spark:spark-cassandra-connector_2.12:3.4.1 - + $SPARK_HOME/bin/spark-shell --packages com.datastax.spark:spark-cassandra-connector_2.12:3.5.0 + $SPARK_HOME/bin/spark-submit --packages com.datastax.spark:spark-cassandra-connector_2.12:3.5.0 + For the list of available versions, see: - https://repo1.maven.org/maven2/com/datastax/spark/spark-cassandra-connector_2.12/ @@ -42,7 +42,7 @@ and *all* of its dependencies on the Spark Class PathTo configure the default Spark Configuration pass key value pairs with `--conf` $SPARK_HOME/bin/spark-shell --conf spark.cassandra.connection.host=127.0.0.1 \ - --packages com.datastax.spark:spark-cassandra-connector_2.12:3.4.1 + --packages com.datastax.spark:spark-cassandra-connector_2.12:3.5.0 --conf spark.sql.extensions=com.datastax.spark.connector.CassandraSparkExtensions This command would set the Spark Cassandra Connector parameter diff --git a/doc/13_spark_shell.md b/doc/13_spark_shell.md index 49303d24c..e7962306c 100644 --- a/doc/13_spark_shell.md +++ b/doc/13_spark_shell.md @@ -18,7 +18,7 @@ Find additional versions at [Spark Packages](https://repo1.maven.org/maven2/com/ ```bash cd spark/install/dir #Include the --master if you want to run against a spark cluster and not local mode -./bin/spark-shell [--master sparkMasterAddress] --jars yourAssemblyJar --packages com.datastax.spark:spark-cassandra-connector_2.12:3.4.1 --conf spark.cassandra.connection.host=yourCassandraClusterIp +./bin/spark-shell [--master sparkMasterAddress] --jars yourAssemblyJar --packages com.datastax.spark:spark-cassandra-connector_2.12:3.5.0 --conf spark.cassandra.connection.host=yourCassandraClusterIp ``` By default spark will log everything to the console and this may be a bit of an overload. To change this copy and modify the `log4j.properties` template file diff --git a/doc/15_python.md b/doc/15_python.md index e1016c080..7abf97e51 100644 --- a/doc/15_python.md +++ b/doc/15_python.md @@ -14,7 +14,7 @@ shell similarly to how the spark shell is started. 
The preferred method is now t ```bash ./bin/pyspark \ - --packages com.datastax.spark:spark-cassandra-connector_2.12:3.4.1 \ + --packages com.datastax.spark:spark-cassandra-connector_2.12:3.5.0 \ --conf spark.sql.extensions=com.datastax.spark.connector.CassandraSparkExtensions ``` diff --git a/project/Versions.scala b/project/Versions.scala index a9d8b7e44..7ac8abab5 100644 --- a/project/Versions.scala +++ b/project/Versions.scala @@ -13,8 +13,8 @@ object Versions { val JUnitInterface = "0.11" val Mockito = "1.10.19" - val ApacheSpark = "3.4.1" - val SparkJetty = "9.4.50.v20221201" + val ApacheSpark = "3.5.0" + val SparkJetty = "9.4.51.v20230217" val SolrJ = "8.3.0" val ScalaCompat = "2.11.0"
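
For anyone porting a similar DataSource V2 integration to Spark 3.5, the sketch below gathers the three API adjustments this patch applies throughout the connector: the `BatchScanExec` extractor dropping from nine bound fields to six, `ExpressionEncoder(schema)` taking over from the removed `RowEncoder(schema)`, and `DataTypeUtils.toAttributes(schema)` replacing `StructType.toAttributes`. This is a minimal sketch targeting Spark 3.5.0 only; the object and method names (`Spark35MigrationSketch`, `firstScan`, `rowEncoderFor`, `attributesFor`) are illustrative helpers, not part of the connector.

```scala
import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.types.DataTypeUtils
import org.apache.spark.sql.connector.read.Scan
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.datasources.v2.BatchScanExec

// Illustrative helper names only; targets Spark 3.5.0 APIs used by this patch.
object Spark35MigrationSketch {

  // Spark 3.5 trimmed the BatchScanExec constructor, so extractor patterns
  // bind six positions instead of nine (as in the connector's test utilities).
  def firstScan(plan: SparkPlan): Option[Scan] =
    plan.collectFirst { case BatchScanExec(_, scan, _, _, _, _) => scan }

  // RowEncoder(schema) is gone; ExpressionEncoder(schema) now builds the
  // untyped Row encoder that DatasetFunctions[Row] requires.
  def rowEncoderFor(df: DataFrame): ExpressionEncoder[Row] =
    ExpressionEncoder(df.schema)

  // StructType.toAttributes moved to DataTypeUtils.toAttributes, as used by
  // the streaming test sources when building LocalRelation plans.
  def attributesFor(df: DataFrame): Seq[Attribute] =
    DataTypeUtils.toAttributes(df.schema)
}
```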