From dd6cae09b66bf8681cd935876f7b2c4aafb1f64a Mon Sep 17 00:00:00 2001 From: Russell Spitzer Date: Tue, 21 Mar 2017 17:00:53 -0700 Subject: [PATCH 01/21] SPARKC-475: Add implicit RowWriterFactory for RDD[Row] Previously when a DataFrame was turned into an RDD by it's `rdd` method saveToCassandra and joinWithCassandraTable would fail because of the lack of an implicit RowWriterFactory. To fix this we add an implicit for a RowWriterFactory for RDD[T <: Row] which ends up mapping to the SqlRowWriterFactory which we already have written. --- .../spark/connector/sql/CassandraDataSourceSpec.scala | 11 ++++++++++- .../spark/connector/writer/RowWriterFactory.scala | 5 ++++- 2 files changed, 14 insertions(+), 2 deletions(-) diff --git a/spark-cassandra-connector/src/it/scala/com/datastax/spark/connector/sql/CassandraDataSourceSpec.scala b/spark-cassandra-connector/src/it/scala/com/datastax/spark/connector/sql/CassandraDataSourceSpec.scala index a8e3ee414..5d2db7362 100644 --- a/spark-cassandra-connector/src/it/scala/com/datastax/spark/connector/sql/CassandraDataSourceSpec.scala +++ b/spark-cassandra-connector/src/it/scala/com/datastax/spark/connector/sql/CassandraDataSourceSpec.scala @@ -2,12 +2,12 @@ package com.datastax.spark.connector.sql import scala.concurrent.Future -import com.datastax.spark.connector.util.Logging import org.apache.spark.sql.SaveMode._ import org.apache.spark.sql.cassandra.{AnalyzedPredicates, CassandraPredicateRules, CassandraSourceRelation, TableRef} import org.apache.spark.sql.sources.{EqualTo, Filter} import org.apache.spark.sql.{DataFrame, SparkSession} import org.scalatest.BeforeAndAfterEach +import com.datastax.spark.connector._ import com.datastax.spark.connector.SparkCassandraITFlatSpecBase import com.datastax.spark.connector.cql.{CassandraConnector, TableDef} import com.datastax.spark.connector.embedded.YamlTransformations @@ -35,6 +35,10 @@ class CassandraDataSourceSpec extends SparkCassandraITFlatSpecBase with Logging session.execute(s"""INSERT INTO $ks.test1 (a, b, c, d, e, f, g, h) VALUES (1, 2, 1, 2, 2, 2, 2, 2)""") }, + Future { + session.execute(s"CREATE TABLE $ks.test_rowwriter (a INT PRIMARY KEY, b INT)") + }, + Future { session.execute(s"CREATE TABLE $ks.test_insert (a INT PRIMARY KEY, b INT)") }, @@ -165,6 +169,11 @@ class CassandraDataSourceSpec extends SparkCassandraITFlatSpecBase with Logging sparkSession.sql("DROP VIEW insertTable2") } + // This test is just to make sure at runtime the implicit for RDD[Row] can be found + it should "implicitly generate a rowWriter from it's RDD form" in { + sparkSession.sql("SELECT a, b from tmpTable").rdd.saveToCassandra(ks, "test_rowwriter") + } + it should "allow to filter a table" in { sparkSession.sql("SELECT a, b FROM tmpTable WHERE a=1 and b=2 and c=1 and e=1").collect() should have length 2 } diff --git a/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/writer/RowWriterFactory.scala b/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/writer/RowWriterFactory.scala index 96b1ec862..e02d86cf3 100644 --- a/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/writer/RowWriterFactory.scala +++ b/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/writer/RowWriterFactory.scala @@ -1,10 +1,10 @@ package com.datastax.spark.connector.writer import scala.reflect.runtime.universe._ - import com.datastax.spark.connector.ColumnRef import com.datastax.spark.connector.cql.TableDef import com.datastax.spark.connector.mapper.ColumnMapper +import 
org.apache.spark.sql.Row /** Creates instances of [[RowWriter]] objects for the given row type `T`. * `RowWriterFactory` is the trait you need to implement if you want to support row representations @@ -21,6 +21,9 @@ trait RowWriterFactory[T] { /** Provides a low-priority implicit `RowWriterFactory` able to write objects of any class for which * a [[com.datastax.spark.connector.mapper.ColumnMapper ColumnMapper]] is defined. */ trait LowPriorityRowWriterFactoryImplicits { + implicit def sqlRowWriterFactory[T <: Row : TypeTag]: RowWriterFactory[Row] = + SqlRowWriter.Factory + implicit def defaultRowWriterFactory[T : TypeTag : ColumnMapper]: RowWriterFactory[T] = DefaultRowWriter.factory } From bdbd6585deba39210356c8f460a5388ad4c5a0af Mon Sep 17 00:00:00 2001 From: Russell Spitzer Date: Fri, 24 Mar 2017 16:05:18 -0700 Subject: [PATCH 02/21] SPARKC-466: Add a CassandraRDDMock for end users to use in Unit Testing The CassandraRDDMock just passes through another RDD and pretends it is a CassandraRDD. --- doc/FAQ.md | 5 ++ .../connector/rdd/CassandraRDDMockSpec.scala | 24 ++++++++ .../connector/rdd/CassandraRDDMock.scala | 55 +++++++++++++++++++ 3 files changed, 84 insertions(+) create mode 100644 spark-cassandra-connector/src/it/scala/com/datastax/spark/connector/rdd/CassandraRDDMockSpec.scala create mode 100644 spark-cassandra-connector/src/test/scala/com/datastax/spark/connector/rdd/CassandraRDDMock.scala diff --git a/doc/FAQ.md b/doc/FAQ.md index 080b0928f..9cc956153 100644 --- a/doc/FAQ.md +++ b/doc/FAQ.md @@ -205,6 +205,11 @@ would like to add. We would be happy to discuss it with you and see your work. F that you are satisfied with and passes all the tests (`/dev/run_tests.sh`) make a GitHub PR against your target Connector Version and set your Jira to Reviewing. +### Is there a CassandraRDDMock I can use in my tests? + +Yes. Please see CassandraRDDMock.scala for the class and CassandraRDDMockSpec.scala for example +usage. + ### What should I do if I find a bug? 
Feel free to post a repo on the Mailing List or if you are feeling ambitious file a Jira with diff --git a/spark-cassandra-connector/src/it/scala/com/datastax/spark/connector/rdd/CassandraRDDMockSpec.scala b/spark-cassandra-connector/src/it/scala/com/datastax/spark/connector/rdd/CassandraRDDMockSpec.scala new file mode 100644 index 000000000..974288a92 --- /dev/null +++ b/spark-cassandra-connector/src/it/scala/com/datastax/spark/connector/rdd/CassandraRDDMockSpec.scala @@ -0,0 +1,24 @@ +package com.datastax.spark.connector.rdd + +import com.datastax.spark.connector.{CassandraRow, SparkCassandraITFlatSpecBase} +import com.datastax.spark.connector.cql.CassandraConnector +import com.datastax.spark.connector.embedded.YamlTransformations + +class CassandraRDDMockSpec extends SparkCassandraITFlatSpecBase { + useCassandraConfig(Seq(YamlTransformations.Default)) + useSparkConf(defaultConf) + + override val conn = CassandraConnector(defaultConf) + + "A CassandraRDDMock" should "behave like a CassandraRDD without needing Cassandra" in { + val columns = Seq("key", "value") + //Create a fake CassandraRDD[CassandraRow] + val rdd = sc + .parallelize(1 to 10) + .map(num => CassandraRow.fromMap(columns.zip(Seq(num, num)).toMap)) + + val fakeCassandraRDD: CassandraRDD[CassandraRow] = new CassandraRDDMock(rdd) + + fakeCassandraRDD.cassandraCount() should be (10) + } +} diff --git a/spark-cassandra-connector/src/test/scala/com/datastax/spark/connector/rdd/CassandraRDDMock.scala b/spark-cassandra-connector/src/test/scala/com/datastax/spark/connector/rdd/CassandraRDDMock.scala new file mode 100644 index 000000000..ba204d09f --- /dev/null +++ b/spark-cassandra-connector/src/test/scala/com/datastax/spark/connector/rdd/CassandraRDDMock.scala @@ -0,0 +1,55 @@ +package com.datastax.spark.connector.rdd + +import com.datastax.spark.connector.cql.CassandraConnector +import com.datastax.spark.connector.{ColumnRef, ColumnSelector, SomeColumns} +import org.apache.spark.annotation.DeveloperApi +import org.apache.spark.rdd.RDD +import org.apache.spark.{Partition, TaskContext} + +import scala.reflect.ClassTag + +/** + * A Fake CassandraRDD for mocking CassandraRDDs. Instead of reading data from cassandra this + * call uses an RDD prev as a source for data. In essenece this is a passthrough RDD which + * takes the contents of the previous RDD and pretends it is a CassandraRDD. + */ +class CassandraRDDMock[R : ClassTag](prev: RDD[R], keyspace: String = "fake", table: String = "fake") extends + CassandraRDD[R](prev.sparkContext, prev.dependencies){ + + /** This is slightly different than Scala this.type. + * this.type is the unique singleton type of an object which is not compatible with other + * instances of the same type, so returning anything other than `this` is not really possible + * without lying to the compiler by explicit casts. + * Here SelfType is used to return a copy of the object - a different instance of the same type */ + override type Self = this.type + + override protected[connector] def keyspaceName: String = keyspace + override protected[connector] def tableName: String = table + override protected def columnNames: ColumnSelector = SomeColumns() + override protected def where: CqlWhereClause = CqlWhereClause(Seq.empty, Seq.empty) + override protected def readConf: ReadConf = ReadConf() + override protected def limit: Option[CassandraLimit] = None + override protected def clusteringOrder: Option[ClusteringOrder] = None + override protected def connector: CassandraConnector = ??? 
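+  // The `connector` above is left unimplemented (`???`): the mock is intended for tests that never
+  // open a live Cassandra connection, so invoking it would throw scala.NotImplementedError at runtime.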
+ override val selectedColumnRefs: Seq[ColumnRef] = Seq.empty + override protected def narrowColumnSelection(columns: Seq[ColumnRef]): Seq[ColumnRef] = Seq.empty + + override def toEmptyCassandraRDD: EmptyCassandraRDD[R] = new EmptyCassandraRDD[R](prev.sparkContext, keyspace, table) + + /** Counts the number of items in this RDD by selecting count(*) on Cassandra table */ + override def cassandraCount(): Long = prev.count + + /** Doesn't actually copy since we don't really use any of these parameters **/ + override protected def copy( + columnNames: ColumnSelector, + where: CqlWhereClause, + limit: Option[CassandraLimit], + clusteringOrder: Option[ClusteringOrder], + readConf: ReadConf, + connector: CassandraConnector) : CassandraRDDMock.this.type = this + + /*** Pass through the parent RDD's Compute and partitions ***/ + @DeveloperApi + override def compute(split: Partition, context: TaskContext): Iterator[R] = prev.compute(split, context) + override protected def getPartitions: Array[Partition] = prev.partitions +} From 16a6154884a02255d514f105f170b52ddbee96f4 Mon Sep 17 00:00:00 2001 From: Russell Spitzer Date: Thu, 30 Mar 2017 09:57:03 -0700 Subject: [PATCH 03/21] Update Doc References to Latest Version --- README.md | 8 ++++---- doc/0_quick_start.md | 8 ++++---- doc/13_spark_shell.md | 4 ++-- doc/15_python.md | 2 +- 4 files changed, 11 insertions(+), 11 deletions(-) diff --git a/README.md b/README.md index 4fedac6a9..857d3df3e 100644 --- a/README.md +++ b/README.md @@ -4,9 +4,9 @@ Chat with us at [DataStax Academy #spark-cassandra-connector](#datastax-academy) ### Most Recent Release Scala Docs -### 2.0.0 -* [Spark-Cassandra-Connector](http://datastax.github.io/spark-cassandra-connector/ApiDocs/2.0.0/spark-cassandra-connector/) -* [Embedded-Cassandra](http://datastax.github.io/spark-cassandra-connector/ApiDocs/2.0.0/spark-cassandra-connector-embedded/) +### 2.0.1 +* [Spark-Cassandra-Connector](http://datastax.github.io/spark-cassandra-connector/ApiDocs/2.0.1/spark-cassandra-connector/) +* [Embedded-Cassandra](http://datastax.github.io/spark-cassandra-connector/ApiDocs/2.0.1/spark-cassandra-connector-embedded/) [All Versions API Docs](#hosted-api-docs) @@ -58,7 +58,7 @@ development for the next connector release in progress. ## Hosted API Docs API documentation for the Scala and Java interfaces are available online: -### 2.0.0 +### 2.0.1 * [Spark-Cassandra-Connector](http://datastax.github.io/spark-cassandra-connector/ApiDocs/2.0.0/spark-cassandra-connector/) * [Embedded-Cassandra](http://datastax.github.io/spark-cassandra-connector/ApiDocs/2.0.0/spark-cassandra-connector-embedded/) diff --git a/doc/0_quick_start.md b/doc/0_quick_start.md index 06bbfa601..48f2ad5ca 100644 --- a/doc/0_quick_start.md +++ b/doc/0_quick_start.md @@ -16,14 +16,14 @@ Configure a new Scala project with the Apache Spark and dependency. The dependencies are easily retrieved via the spark-packages.org website. For example, if you're using `sbt`, your build.sbt should include something like this: resolvers += "Spark Packages Repo" at "https://dl.bintray.com/spark-packages/maven" - libraryDependencies += "datastax" % "spark-cassandra-connector" % "2.0.0-s_2.11" + libraryDependencies += "datastax" % "spark-cassandra-connector" % "2.0.1-s_2.11" The spark-packages libraries can also be used with spark-submit and spark shell, these commands will place the connector and all of its dependencies on the path of the Spark Driver and all Spark Executors. 
- $SPARK_HOME/bin/spark-shell --packages datastax:spark-cassandra-connector:2.0.0-s_2.11 - $SPARK_HOME/bin/spark-submit --packages datastax:spark-cassandra-connector:2.0.0-s_2.11 + $SPARK_HOME/bin/spark-shell --packages datastax:spark-cassandra-connector:2.0.1-s_2.11 + $SPARK_HOME/bin/spark-submit --packages datastax:spark-cassandra-connector:2.0.1-s_2.11 For the list of available versions, see: - https://spark-packages.org/package/datastax/spark-cassandra-connector @@ -59,7 +59,7 @@ Run the `spark-shell` with the packages line for your version. To configure the default Spark Configuration pass key value pairs with `--conf` $SPARK_HOME/bin/spark-shell --conf spark.cassandra.connection.host=127.0.0.1 \ - --packages datastax:spark-cassandra-connector:2.0.0-s_2.11 + --packages datastax:spark-cassandra-connector:2.0.1-s_2.11 This command would set the Spark Cassandra Connector parameter `spark.cassandra.connection.host` to `127.0.0.1`. Change this diff --git a/doc/13_spark_shell.md b/doc/13_spark_shell.md index c5e881cfd..c893a5fb4 100644 --- a/doc/13_spark_shell.md +++ b/doc/13_spark_shell.md @@ -2,7 +2,7 @@ ## Using the Spark Cassandra Connector with the Spark Shell -These instructions were last confirmed with Cassandra 3.0.9, Spark 2.0.2 and Connector 2.0.0. +These instructions were last confirmed with Cassandra 3.0.9, Spark 2.0.2 and Connector 2.0.1. For this guide, we assume an existing Cassandra deployment, running either locally or on a cluster, a local installation of Spark and an optional Spark cluster. For detail setup instructions see [setup spark-shell](13_1_setup_spark_shell.md) @@ -18,7 +18,7 @@ Find additional versions at [Spark Packages](http://spark-packages.org/package/d ```bash cd spark/install/dir #Include the --master if you want to run against a spark cluster and not local mode -./bin/spark-shell [--master sparkMasterAddress] --jars yourAssemblyJar --packages datastax:spark-cassandra-connector:2.0.0-s_2.11 --conf spark.cassandra.connection.host=yourCassandraClusterIp +./bin/spark-shell [--master sparkMasterAddress] --jars yourAssemblyJar --packages datastax:spark-cassandra-connector:2.0.1-s_2.11 --conf spark.cassandra.connection.host=yourCassandraClusterIp ``` By default spark will log everything to the console and this may be a bit of an overload. 
To change this copy and modify the `log4j.properties` template file diff --git a/doc/15_python.md b/doc/15_python.md index a0fa71f79..d7f5d4e6d 100644 --- a/doc/15_python.md +++ b/doc/15_python.md @@ -16,7 +16,7 @@ http://spark-packages.org/package/datastax/spark-cassandra-connector ```bash ./bin/pyspark \ - --packages com.datastax.spark:spark-cassandra-connector_2.11:2.0.0-s_2.11 + --packages com.datastax.spark:spark-cassandra-connector_2.11:2.0.1-s_2.11 ``` ### Loading a DataFrame in Python From 85d59259774b541670f576281cd4f2c73bbe3df5 Mon Sep 17 00:00:00 2001 From: Russell Spitzer Date: Thu, 23 Mar 2017 16:05:49 -0700 Subject: [PATCH 04/21] Refresh Documentation to Use Spark 2.X concepts Fix doc links Adds Python Dictionary as Kwargs Example More examples, Common issues Minor edits for consistency Text and link edits --- README.md | 7 +- doc/0_quick_start.md | 3 +- doc/10_embedded.md | 2 +- doc/14_data_frames.md | 261 +++++++++++++++++++++--------------------- doc/15_python.md | 15 ++- doc/9_demos.md | 3 + doc/FAQ.md | 33 ++++-- 7 files changed, 177 insertions(+), 147 deletions(-) diff --git a/README.md b/README.md index 857d3df3e..fff72c631 100644 --- a/README.md +++ b/README.md @@ -59,8 +59,8 @@ development for the next connector release in progress. API documentation for the Scala and Java interfaces are available online: ### 2.0.1 -* [Spark-Cassandra-Connector](http://datastax.github.io/spark-cassandra-connector/ApiDocs/2.0.0/spark-cassandra-connector/) -* [Embedded-Cassandra](http://datastax.github.io/spark-cassandra-connector/ApiDocs/2.0.0/spark-cassandra-connector-embedded/) +* [Spark-Cassandra-Connector](http://datastax.github.io/spark-cassandra-connector/ApiDocs/2.0.1/spark-cassandra-connector/) +* [Embedded-Cassandra](http://datastax.github.io/spark-cassandra-connector/ApiDocs/2.0.1/spark-cassandra-connector-embedded/) ### 1.6.5 * [Spark-Cassandra-Connector](http://datastax.github.io/spark-cassandra-connector/ApiDocs/1.6.5/spark-cassandra-connector/) @@ -95,6 +95,9 @@ For SBT to download the connector binaries, sources and javadoc, put this in you SBT config: libraryDependencies += "com.datastax.spark" %% "spark-cassandra-connector" % "2.0.0" + +* The default Scala version for Spark 2.0+ is 2.11 please choose the appropriate build. See the +[FAQ](doc/FAQ.md) for more information ## Building See [Building And Artifacts](doc/12_building_and_artifacts.md) diff --git a/doc/0_quick_start.md b/doc/0_quick_start.md index 48f2ad5ca..505cd7f65 100644 --- a/doc/0_quick_start.md +++ b/doc/0_quick_start.md @@ -65,10 +65,11 @@ This command would set the Spark Cassandra Connector parameter `spark.cassandra.connection.host` to `127.0.0.1`. Change this to the address of one of the nodes in your Cassandra cluster. 
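The same option can also be set programmatically when an application builds its own session. A minimal sketch, assuming Spark 2.x and a single Cassandra node reachable at `127.0.0.1` (the application name is arbitrary):

```scala
import org.apache.spark.sql.SparkSession

// Build a SparkSession with the Cassandra contact point configured up front.
// Other spark.cassandra.* options can be supplied the same way.
val spark = SparkSession.builder()
  .appName("cassandra-quick-start")
  .config("spark.cassandra.connection.host", "127.0.0.1")
  .getOrCreate()
```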
-Enable Cassandra-specific functions on the `SparkContext`, `RDD`, and `DataFrame`: +Enable Cassandra-specific functions on the `SparkContext`, `SparkSession`, `RDD`, and `DataFrame`: ```scala import com.datastax.spark.connector._ +import org.apache.spark.sql.cassandra._ ``` ### Loading and analyzing data from Cassandra diff --git a/doc/10_embedded.md b/doc/10_embedded.md index 7a9ee69cc..1907d5dcc 100644 --- a/doc/10_embedded.md +++ b/doc/10_embedded.md @@ -24,6 +24,6 @@ Simply add this to your SBT build, or in the appropriate format for a Maven buil "com.datastax.spark" %% "spark-cassandra-connector-embedded" % {latest.version} ## Examples -https://github.com/datastax/SparkBuildExamples +[Spark Build Examples](https://github.com/datastax/SparkBuildExamples) [Next - Performance Monitoring](11_metrics.md) diff --git a/doc/14_data_frames.md b/doc/14_data_frames.md index 76d7276a1..0d22a1c0f 100644 --- a/doc/14_data_frames.md +++ b/doc/14_data_frames.md @@ -1,19 +1,20 @@ # Documentation -## DataFrames +## Datasets (Previously DataFrames) -DataFrames provide a new API for manipulating data within Spark. These provide a more user +Datasets provide a new API for manipulating data within Spark. These provide a more user friendly experience than pure Scala for common queries. The Spark Cassandra Connector provides -an integrated DataSource to make creating Cassandra DataFrames easy. +an integrated Data Source to make creating Cassandra Datasets easy. -Spark Docs: -[Data Sources](https://spark.apache.org/docs/latest/sql-programming-guide.html#data-sources) -[Data Frames](https://spark.apache.org/docs/latest/sql-programming-guide.html#dataframes) +[What happened to DataFrames?](#what-happened-to-dataframes) +Spark Docs: +* [Data Sources](https://spark.apache.org/docs/latest/sql-programming-guide.html#data-sources) +* [Datasets and DataFrames](https://spark.apache.org/docs/latest/sql-programming-guide.html#datasets-and-dataframes) -### Options +### Datasource Specific Options DataSources in Spark take a map of Options which define how the source should act. The -Connector provides a CassandraSource which recognizes the following Key Value pairs. +Connector provides a CassandraSource which recognizes the following key/value pairs. Those followed with a default of N/A are required, all others are optional. | Option Key | Controls | Values | Default | @@ -23,38 +24,64 @@ Those followed with a default of N/A are required, all others are optional. | cluster | The group of the Cluster Level Settings to inherit | String | "default"| | pushdown | Enables pushing down predicates to Cassandra when applicable | (true,false) | true | -#### Read, Writing and CassandraConnector Options +#### General Read, Write and Connection Options Any normal Spark Connector configuration options for Connecting, Reading or Writing -can be passed through as DataFrame options as well. When using the `read` command below these -options should appear exactly the same as when set in the SparkConf. +can be passed through as Dataset options as well. When using the `read` command below these +options should appear exactly the same as when set in the SparkConf. See +[Config Helpers](#example-using-typesafe-parameter-configuration-options) for +typed helpers for setting these options. #### Setting Cluster and Keyspace Level Options The connector also provides a way to describe the options which should be applied to all -DataFrames within a cluster or within a keyspace. 
When a property has been specified at the +Datasets within a cluster or within a keyspace. When a property has been specified at the table level it will override the default keyspace or cluster property. -To add these properties add keys to your `SparkConf` in the format +To add these properties add keys to your `SparkConf` use the helpers explained + in the next section or by manually entering them in the format + + clusterName:keyspaceName/propertyName + +#### Example Using TypeSafe Parameter Configuration Options +There are also some helper methods which simplify setting Spark Cassandra +Connector related parameters. This makes it easier to set parameters without +remembering the above syntax: +```scala +import org.apache.spark.sql.cassandra._ + +import com.datastax.spark.connector.cql.CassandraConnectorConf +import com.datastax.spark.connector.rdd.ReadConf - clusterName:keyspaceName/propertyName. +// set params for all clusters and keyspaces +spark.setCassandraConf(CassandraConnectorConf.KeepAliveMillisParam.option(10000)) + +// set params for the particular cluster +spark.setCassandraConf("Cluster1", CassandraConnectorConf.ConnectionHostParam.option("127.0.0.1") ++ CassandraConnectorConf.ConnectionPortParam.option(12345)) +spark.setCassandraConf("Cluster2", CassandraConnectorConf.ConnectionHostParam.option("127.0.0.2")) + +// set params for the particular keyspace +spark.setCassandraConf("Cluster1", "ks1", ReadConf.SplitSizeInMBParam.option(128)) +spark.setCassandraConf("Cluster1", "ks2", ReadConf.SplitSizeInMBParam.option(64)) +spark.setCassandraConf("Cluster2", "ks3", ReadConf.SplitSizeInMBParam.option(80)) +``` #### Example Changing Cluster/Keyspace Level Properties ```scala -sqlContext.setConf("ClusterOne/spark.cassandra.input.split.size_in_mb", "32") -sqlContext.setConf("default:test/spark.cassandra.input.split.size_in_mb", "128") -... -val df = sqlContext +spark.setCassandraConf("ClusterOne", "ks1", ReadConf.SplitSizeInMBParam.option(32)) +spark.setCassandraConf("default", "test", ReadConf.SplitSizeInMBParam.option(128)) + +val df = spark .read .format("org.apache.spark.sql.cassandra") .options(Map( "table" -> "words", "keyspace" -> "test")) - .load() // This DataFrame will use a spark.cassandra.input.size of 32 + .load() // This Dataset will use a spark.cassandra.input.size of 128 -val otherdf = sqlContext +val otherdf = spark .read .format("org.apache.spark.sql.cassandra") .options(Map( "table" -> "words", "keyspace" -> "test" , "cluster" -> "ClusterOne")) - .load() // This DataFrame will use a spark.cassandra.input.size of 128 + .load() // This Dataset will use a spark.cassandra.input.size of 32 -val lastdf = sqlContext +val lastdf = spark .read .format("org.apache.spark.sql.cassandra") .options(Map( @@ -63,41 +90,29 @@ val lastdf = sqlContext "cluster" -> "ClusterOne", "spark.cassandra.input.split.size_in_mb" -> 48 ) - ).load() // This DataFrame will use a spark.cassandra.input.split.size of 48 + ).load() // This Dataset will use a spark.cassandra.input.split.size of 48 ``` +### Creating Datasets using Read Commands -#### Example Using TypeSafe Parameter Configuration Options -There are also some helper method which simplifies setting Spark Cassandra Connector related parameters. 
They are a part -of `CassandraSqlContext`: -```scala -// set params for all clusters and keyspaces -sqlContext.setConf(CassandraConnectorConf.KeepAliveMillisParam.option(10000)) - -// set params for the particular cluster -sqlContext.setConf("Cluster1", CassandraConnectorConf.ConnectionHostParam.option("127.0.0.1") ++ CassandraConnectorConf.ConnectionPortParam.option(12345)) -sqlContext.setConf("Cluster2", CassandraConnectorConf.ConnectionHostParam.option("127.0.0.2")) - -// set params for the particular keyspace -sqlContext.setConf("Cluster1", "ks1", ReadConf.SplitSizeInMBParam.option(128)) -sqlContext.setConf("Cluster1", "ks2", ReadConf.SplitSizeInMBParam.option(64)) -sqlContext.setConf("Cluster2", "ks3", ReadConf.SplitSizeInMBParam.option(80)) -``` - -###Creating DataFrames using Read Commands +The most programmatic way to create a Dataset is to invoke a `read` command on the SparkSession. This +will build a `DataFrameReader`. Specify `format` as `org.apache.spark.sql.cassandra`. +You can then use `options` to give a map of `Map[String,String]` of options as described above. +Then finish by calling `load` to actually get a `Dataset`. This code is all lazy +and will not actually load any data until an action is called. + +As well as specifying all these parameters manually, we offer a set of +[helper functions](#example-using-format-helper-functions) to make this easier as well. -The most programmatic way to create a data frame is to invoke a `read` command on the SQLContext. This - will build a `DataFrameReader`. Specify `format` as `org.apache.spark.sql.cassandra`. - You can then use `options` to give a map of `Map[String,String]` of options as described above. - Then finish by calling `load` to actually get a `DataFrame`. -#### Example Creating a DataFrame using a Read Command +#### Example Creating a Dataset using a Read Command ```scala -val df = sqlContext +val df = spark .read .format("org.apache.spark.sql.cassandra") .options(Map( "table" -> "words", "keyspace" -> "test" )) .load() + df.show ``` ``` @@ -106,92 +121,72 @@ cat 30 fox 40 ``` -There are also some helper methods which can make creating data frames easier. They can be accessed after importing -`org.apache.spark.sql.cassandra` package. In the following example, all the commands used to create a data frame are -equivalent: +There are also some helper methods which can make creating Datasets easier. They can +be accessed after importing `org.apache.spark.sql.cassandra` package. In the following +example, all the commands used to create the Dataset are equivalent: #### Example Using Format Helper Functions ```scala import org.apache.spark.sql.cassandra._ -val df1 = sqlContext +val df = spark .read - .format("org.apache.spark.sql.cassandra") - .options(Map("table" -> "words", "keyspace" -> "test", "cluster" -> "cluster_A")) + .cassandraFormat("words", "test") .load() - -val df2 = sqlContext + +//Loading an Dataset using a format helper and a option helper +val df = spark .read - .cassandraFormat("words", "test", "cluster_A") + .cassandraFormat("words", "test") + .options(ReadConf.SplitSizeInMBParam.option(32)) .load() + ``` -### Creating DataFrames using Spark SQL +### Creating Datasets using Spark SQL -Accessing data Frames using Spark SQL involves creating temporary tables and specifying the -source as `org.apache.spark.sql.cassandra`. The `OPTIONS` passed to this table are used to -establish a relation between the CassandraTable and the internally used DataSource. 
+Accessing Datasets using Spark SQL involves creating temporary views with the format + as `org.apache.spark.sql.cassandra`. The `OPTIONS` passed to this table are used to +establish a relation between the CassandraTable and the Spark catalog reference. #### Example Creating a Source Using Spark SQL: Create Relation with the Cassandra table test.words ```scala -scala> sqlContext.sql( - """CREATE TEMPORARY TABLE words - |USING org.apache.spark.sql.cassandra - |OPTIONS ( - | table "words", - | keyspace "test", - | cluster "Test Cluster", - | pushdown "true" - |)""".stripMargin) -scala> val df = sqlContext.sql("SELECT * FROM words") -scala> df.show() -``` -``` -word count -cat 30 -fox 40 -``` -```scala -scala> df.filter(df("count") > 30).show -``` -``` -word count -fox 40 -``` - -In addition you can use Spark SQL on the registered tables: +val createDDL = """CREATE TEMPORARY VIEW words + USING org.apache.spark.sql.cassandra + OPTIONS ( + table "words", + keyspace "test", + cluster "Test Cluster", + pushdown "true")""" +spark.sql(createDDL) // Creates Catalog Entry registering an existing Cassandra Table +spark.sql("SELECT * FROM words").show +spark.sql("SELECT * FROM words WHERE word = 'fox'").show +``` + +### Persisting a Dataset to Cassandra Using the Save Command +Datasets provide a save function which allows them to persist their data to another +DataSource. The connector supports using this feature to persist a Dataset to a Cassandra +table. + +#### Example Copying Between Two Tables Using Datasets ```scala -sqlContext.sql("SELECT * FROM words WHERE word = 'fox'").collect -``` -``` -Array[org.apache.spark.sql.Row] = Array([fox,40]) -``` - -###Persisting a DataFrame to Cassandra Using the Save Command -DataFrames provide a save function which allows them to persist their data to another -DataSource. The connector supports using this feature to persist a DataFrame a Cassandra -Table. -#### Example Copying Between Two Tables Using DataFrames -```scala -val df = sqlContext +val df = spark .read - .format("org.apache.spark.sql.cassandra") - .options(Map( "table" -> "words", "keyspace" -> "test" )) + .cassandraFormat("words", "test") .load() df.write - .format("org.apache.spark.sql.cassandra") - .options(Map( "table" -> "words_copy", "keyspace" -> "test")) + .cassandraFormat("words_copy", "test") .save() ``` -Similarly to reading Cassandra tables into data frames, we have some helper methods for the write path which are +Similarly to reading Cassandra tables into Datasets, we have some helper methods for the write path which are provided by `org.apache.spark.sql.cassandra` package. In the following example, all the commands are equivalent: -#### Example Using Helper Commands to Write DataFrames +#### Example Using Helper Commands to Write Datasets ```scala import org.apache.spark.sql.cassandra._ @@ -206,7 +201,7 @@ df.write ``` -### Setting Connector specific options on DataFrames +### Setting Connector Specific Options on Datasets Connector specific options can be set by invoking `options` method on either `DataFrameReader` or `DataFrameWriter`. There are several settings you may want to change in `ReadConf`, `WriteConf`, `CassandraConnectorConf`, `AuthConf` and others. 
Those settings are identified by instances of `ConfigParameter` case class which offers an easy way to apply @@ -229,24 +224,23 @@ multiple parameters can be chained: options(CassandraConnectorConf.ReadTimeoutParam.sqlOption("7000") ++ ReadConf.TaskMetricParam.sqlOption(true)) ``` -###Creating a New Cassandra Table From a DataFrame Schema -Spark Cassandra Connector adds a method to `DataFrame` that allows it to create a new Cassandra table from -the `StructType` schema of the DataFrame. This is convenient for persisting a DataFrame to a new table, especially -when the schema of the DataFrame is not known (fully or at all) ahead of time (at compile time of your application). -Once the new table is created, you can persist the DataFrame to the new table using the save function described above. +### Creating a New Cassandra Table From a Dataset Schema +Spark Cassandra Connector adds a method to `Dataset` that allows it to create a new Cassandra table from +the `StructType` schema of the Dataset. This is convenient for persisting a Dataset to a new table, especially +when the schema of the Dataset is not known (fully or at all) ahead of time (at compile time of your application). +Once the new table is created, you can persist the Dataset to the new table using the save function described above. The partition key and clustering key of the newly generated table can be set by passing in a list of names of columns which should be used as partition key and clustering key. -#### Example Creating a Cassandra Table from a DataFrame +#### Example Creating a Cassandra Table from a Dataset ```scala -// Add spark connector specific methods to DataFrame +// Add spark connector specific methods to Dataset import com.datastax.spark.connector._ -val df = sqlContext +val df = spark .read - .format("org.apache.spark.sql.cassandra") - .options(Map( "table" -> "words", "keyspace" -> "test" )) + .cassandraFormat("words", "test") .load() val renamed = df.withColumnRenamed("col1", "newcolumnname") @@ -257,14 +251,13 @@ renamed.createCassandraTable( clusteringKeyColumns = Some(Seq("newcolumnname"))) renamed.write - .format("org.apache.spark.sql.cassandra") - .options(Map( "table" -> "renamed", "keyspace" -> "test")) + .cassandraFormat("renamed", "test") .save() ``` -### Pushing down clauses to Cassandra -The DataFrame API will automatically pushdown valid where clauses to Cassandra as long as the -pushdown option is enabled (defaults to enabled.) +### Automatic Predicate Pushdown and Column Pruning +The Dataset API will automatically pushdown valid "where" clauses to Cassandra as long as the +pushdown option is enabled (default is enabled). Example Table ```sql @@ -282,16 +275,15 @@ INSERT INTO words (user, word, count ) VALUES ( 'Sam', 'alpha', 3 ); INSERT INTO words (user, word, count ) VALUES ( 'Zebra', 'zed', 100 ); ``` -First we can create a DataFrame and see that it has no `pushdown filters` set in the log. This +First we can create a Dataset and see that it has no `pushdown filters` set in the log. This means all requests will go directly to Cassandra and we will require reading all of the data to `show` -this DataFrame. +this Dataset. 
#### Example Catalyst Optimization with Cassandra Server Side Pushdowns ```scala -val df = sqlContext +val df = spark .read - .format("org.apache.spark.sql.cassandra") - .options(Map( "table" -> "words", "keyspace" -> "test")) + .cassandraFormat("words", "test") .load df.explain ``` @@ -319,7 +311,7 @@ df.show ``` The example schema has a clustering key of "word" so we can pushdown filters on that column to Cassandra. We -do this by applying a normal DataFrame filter. The connector will automatically determine that the +do this by applying a normal Dataset filter. The connector will automatically determine that the filter can be pushed down and will add it to `pushdown filters`. All of the elements of `pushdown filters` will be automatically added to the CQL requests made to Cassandra for the data from this table. The subsequent call will then only serialize data from Cassandra which passes the filter, @@ -367,13 +359,9 @@ CREATE TABLE pushdownexample ( ); ``` ```scala -val sqlContext = new org.apache.spark.sql.SQLContext(sc) -import sqlContext.implicits._ - -val df = sqlContext +val df = spark .read - .format("org.apache.spark.sql.cassandra") - .options(Map( "table" -> "pushdownexample", "keyspace" -> "pushdowns" )) + .cassandraFormat("pushdownexample", "pushdowns") .load() ``` To push down partition keys, all of them must be included, but not more than one predicate per partition key, otherwise nothing is pushed down. @@ -425,4 +413,11 @@ INFO 2015-08-26 00:56:37 org.apache.spark.sql.cassandra.CassandraSourceRelation INFO 2015-08-26 00:56:37 org.apache.spark.sql.cassandra.CassandraSourceRelation: pushdown filters: ArrayBuffer(EqualTo(clusterkey1,1), EqualTo(clusterkey2,1)) ``` +#### What Happened to DataFrames? + +In Spark 2.0 DataFrames are now just a specific case of the Dataset API. In particular +a DataFrame is just an alias for Dataset\[Row\]. This means everything you know about +DataFrames is also applicable to Datasets. A DataFrame is just a special Dataset that is +made up of Row objects. Many texts and resources still use the two terms interchangeably. + [Next - Python DataFrames](15_python.md) diff --git a/doc/15_python.md b/doc/15_python.md index d7f5d4e6d..d0d31071f 100644 --- a/doc/15_python.md +++ b/doc/15_python.md @@ -26,7 +26,7 @@ source and by specifying keyword arguments for `keyspace` and `table`. #### Example Loading a Cassandra Table as a Pyspark DataFrame ```python - sqlContext.read\ + spark.read\ .format("org.apache.spark.sql.cassandra")\ .options(table="kv", keyspace="test")\ .load().show() @@ -60,4 +60,17 @@ A DataFrame can be saved to an *existing* Cassandra table by using the the `org. The options and parameters are identical to the Scala Data Frames Api so please see [Data Frames](14_data_frames.md) for more information. +### Passing options with periods to the DataFrameReader + +Python does not support using periods(".") in variable names. This makes it +slightly more difficult to pass SCC options to the DataFrameReader. The `options` +function takes `kwargs**` which means you can't directly pass in keys. There is a +workaround though. Python allows you to pass a dictionary as a representation of kwargs and dictionaries +can have keys with periods. 
+ +#### Example of using a dictionary as kwargs + + load_options = { "table": "kv", "keyspace": "test", "spark.cassandra.input.split.size_in_mb": "10"} + spark.read.format("org.apache.spark.sql.cassandra").options(**load_options).load().show() + [Next - Spark Partitioners](16_partitioning.md) diff --git a/doc/9_demos.md b/doc/9_demos.md index dc9dfcfc7..a485c8e4f 100644 --- a/doc/9_demos.md +++ b/doc/9_demos.md @@ -2,4 +2,7 @@ Demos are not a part of Spark Cassandra Connector as of 2.0.0. +Build examples are provided here +[Spark Build Examples](https://github.com/datastax/SparkBuildExamples) + [Back to main page](../README.md) diff --git a/doc/FAQ.md b/doc/FAQ.md index 9cc956153..2df97f778 100644 --- a/doc/FAQ.md +++ b/doc/FAQ.md @@ -2,15 +2,21 @@ ## Frequently Asked Questions -### What does this mean "`NoClassDefFoundError: scala/collection/GenTraversableOnce$class?`" +### Why am I seeing (Scala Version Mismatch Error) + +#### Non Exclusive List of Scala Version Mismatch Errors +* `NoClassDefFoundError: scala/collection/GenTraversableOnce$class?` +* `NoSuchMethodError: scala.reflect.api.JavaUniverse.runtimeMirror(Ljava/lang/ClassLoader;)` +* `missing or invalid dependency detected while loading class file 'ScalaGettableByIndexData.class'` +* `java.lang.NoClassDefFoundError: scala/runtime/AbstractPartialFunction` This means that there is a mix of Scala versions in the libraries used in your -code. The collection api is different between Scala 2.10 and 2.11 and this the -most common error which occurs if a scala 2.10 library is attempted to be loaded +code. The collection API is different between Scala 2.10 and 2.11 and this the +most common error which occurs if a Scala 2.10 library is attempted to be loaded in a Scala 2.11 runtime. To fix this make sure that the name has the correct Scala version suffix to match your Scala version. -##### Spark Cassandra Connector built against Scala 2.10 +##### Spark Cassandra Connector built Against Scala 2.10 ```xml spark-cassandra-connector_2.10 ``` @@ -39,7 +45,7 @@ For reference the defaults of Spark as downloaded from the Apache Website are ### How do I Fix Guava Classpath Errors Guava errors come from a conflict between Guava brought in by some -dependency (like Hadoop 2.7) and the Cassandra java Driver. +dependency (like Hadoop 2.7) and the Cassandra Java Driver. The Cassandra Java Driver is unable to function correctly if an earlier version of Guava is preempting the required version. The Java Driver will throw errors if it determines @@ -90,7 +96,7 @@ determine how many tasks have been generated. To check this look at the UI for y job and see how many tasks are being run. In the current Shell a small progress bar is shown when running stages, the numbers represent (Completed Tasks + Running Tasks) / Total Tasks - [Stage 2:=============================================> (121 + 1) / 200]0 + [Stage 2:=============================================> (121 + 1) / 200] If you see that only a single task has been created this means that the Cassandra Token range has not been split into a enough tasks to be well parallelized on your cluster. The number of @@ -197,13 +203,22 @@ Input.split.size_in_mb uses a internal system table in Cassandra ( >= 2.1.5) to of the data in Cassandra. The table is called system.size_estimates is not meant to be absolutely accurate so there will be some inaccuracy with smaller tables and split sizes. +### java.lang.NoClassDefFoundError: com/twitter/jsr166e/LongAdder is getting thrown? 
+ +This error is commonly thrown when the dependencies of the Spark Cassandra Connector are not +on the runtime classpath of the Spark Application. This is usually caused by not using the +prescribed `--packages` method of adding the Spark Cassandra Connector and it's dependencies +to the runtime classpath. Fix this by following the launch guidelines as shown in the +[quick start guide](0_quick_start.md). + + ### Can I contribute to the Spark Cassandra Connector? -YES! Feel free to start a Jira and detail the changes you would like to make or the feature you +YES! Feel free to start a JIRA and detail the changes you would like to make or the feature you would like to add. We would be happy to discuss it with you and see your work. Feel free to create - a Jira before you have started any work if you would like feedback on an idea. When you have a branch +a JIRA before you have started any work if you would like feedback on an idea. When you have a branch that you are satisfied with and passes all the tests (`/dev/run_tests.sh`) make a GitHub PR against -your target Connector Version and set your Jira to Reviewing. +your target Connector Version and set your JIRA to Reviewing. ### Is there a CassandraRDDMock I can use in my tests? From e19883d59ea5ef8227fe14666ad04231c67b569d Mon Sep 17 00:00:00 2001 From: Russell Spitzer Date: Tue, 4 Apr 2017 15:02:29 -0700 Subject: [PATCH 05/21] Add 1.6.6 Api Doc Links --- README.md | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/README.md b/README.md index fff72c631..f46b2c52b 100644 --- a/README.md +++ b/README.md @@ -62,9 +62,9 @@ API documentation for the Scala and Java interfaces are available online: * [Spark-Cassandra-Connector](http://datastax.github.io/spark-cassandra-connector/ApiDocs/2.0.1/spark-cassandra-connector/) * [Embedded-Cassandra](http://datastax.github.io/spark-cassandra-connector/ApiDocs/2.0.1/spark-cassandra-connector-embedded/) -### 1.6.5 -* [Spark-Cassandra-Connector](http://datastax.github.io/spark-cassandra-connector/ApiDocs/1.6.5/spark-cassandra-connector/) -* [Embedded-Cassandra](http://datastax.github.io/spark-cassandra-connector/ApiDocs/1.6.5/spark-cassandra-connector-embedded/) +### 1.6.6 +* [Spark-Cassandra-Connector](http://datastax.github.io/spark-cassandra-connector/ApiDocs/1.6.6/spark-cassandra-connector/) +* [Embedded-Cassandra](http://datastax.github.io/spark-cassandra-connector/ApiDocs/1.6.6/spark-cassandra-connector-embedded/) ### 1.5.2 * [Spark-Cassandra-Connector](http://datastax.github.io/spark-cassandra-connector/ApiDocs/1.5.2/spark-cassandra-connector/) From e63b771b2a00cdd771e0fa15eb0ce9056fc5f25d Mon Sep 17 00:00:00 2001 From: Russell Spitzer Date: Wed, 17 May 2017 16:24:31 -0700 Subject: [PATCH 06/21] Preparing 2.0.2 Release MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * SPARKC 492: Protect against Size Estimate Overflows
 * SPARKC-491: Add java.time classes support to converters and sparkSQL
 * SPARKC-470: Allow Writes to Static Columnns and Partition Keys --- CHANGES.txt | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/CHANGES.txt b/CHANGES.txt index c8df78691..7193290d5 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,3 +1,8 @@ +2.0.2 + * Protect against Size Estimate Overflows
(SPARKC-492) + * Add java.time classes support to converters and sparkSQL
(SPARKC-491) + * Allow Writes to Static Columnns and Partition Keys (SPARKC-470) + 2.0.1 * Refactor Custom Scan Method (SPARKC-481) From 75eef872c6234e07dc6c4aff0d7cdfb8bdb51d2e Mon Sep 17 00:00:00 2001 From: Russell Spitzer Date: Wed, 17 May 2017 17:13:28 -0700 Subject: [PATCH 07/21] Fix GenerateDocs --- generateDocs.sh | 29 ++++++++++++----------------- 1 file changed, 12 insertions(+), 17 deletions(-) diff --git a/generateDocs.sh b/generateDocs.sh index 2cf9d61b9..4e13add6b 100755 --- a/generateDocs.sh +++ b/generateDocs.sh @@ -1,30 +1,25 @@ #!/bin/bash -SCC_HOME=~/repos/spark-cassandra-connector -OUTPUT=/tmp/SCC_DOC_TEMP +SCC_HOME=$HOME/repos/spark-cassandra-connector +OUTPUT="/tmp/SCC_DOC_TEMP" rm -r $OUTPUT mkdir -p $OUTPUT -for version in $@ ;do - cd $SCC_HOME - echo "Making docs for $version" - - git checkout "v$version" +echo "SPARK CASSANDRA CONNECTOR HOME IS $SCC_HOME" +for VERSION in $@ ;do + echo "Making docs for $VERSION" + git checkout "v$VERSION" if [ $? -ne 0 ]; then - echo "Unable to checkout version $version, skipping" + echo "Unable to checkout version $VERSION, skipping" continue fi - - cd $SCC_HOME sbt clean sbt doc - mkdir $OUTPUT/$version + mkdir $OUTPUT/$VERSION - cd $OUTPUT/$version - for folder in $SCC_HOME/spark*; do - module=$(basename $folder) - cp -vr $folder/target/scala-2.10/api $module + for FOLDER in $SCC_HOME/spark*; do + echo "COPYING $FOLDER to $OUTPUT/$VERSION/$MODULE" + MODULE=$(basename $folder) + cp -vr $FOLDER/target/scala-2.10/api $OUTPUT/$VERSION/$MODULE done done - -cd $SCC_HOME git checkout gh-pages cp -r $OUTPUT/* $SCC_HOME/ApiDocs From 952a36da16b523f11c7780bf9d329cab2a98d794 Mon Sep 17 00:00:00 2001 From: Russell Spitzer Date: Wed, 17 May 2017 17:18:59 -0700 Subject: [PATCH 08/21] Updated Api Doc Links --- README.md | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/README.md b/README.md index f46b2c52b..ec92df2d5 100644 --- a/README.md +++ b/README.md @@ -4,9 +4,9 @@ Chat with us at [DataStax Academy #spark-cassandra-connector](#datastax-academy) ### Most Recent Release Scala Docs -### 2.0.1 -* [Spark-Cassandra-Connector](http://datastax.github.io/spark-cassandra-connector/ApiDocs/2.0.1/spark-cassandra-connector/) -* [Embedded-Cassandra](http://datastax.github.io/spark-cassandra-connector/ApiDocs/2.0.1/spark-cassandra-connector-embedded/) +### 2.0.2 +* [Spark-Cassandra-Connector](http://datastax.github.io/spark-cassandra-connector/ApiDocs/2.0.2/spark-cassandra-connector/) +* [Embedded-Cassandra](http://datastax.github.io/spark-cassandra-connector/ApiDocs/2.0.2/spark-cassandra-connector-embedded/) [All Versions API Docs](#hosted-api-docs) @@ -58,9 +58,9 @@ development for the next connector release in progress. 
## Hosted API Docs API documentation for the Scala and Java interfaces are available online: -### 2.0.1 -* [Spark-Cassandra-Connector](http://datastax.github.io/spark-cassandra-connector/ApiDocs/2.0.1/spark-cassandra-connector/) -* [Embedded-Cassandra](http://datastax.github.io/spark-cassandra-connector/ApiDocs/2.0.1/spark-cassandra-connector-embedded/) +### 2.0.2 +* [Spark-Cassandra-Connector](http://datastax.github.io/spark-cassandra-connector/ApiDocs/2.0.2/spark-cassandra-connector/) +* [Embedded-Cassandra](http://datastax.github.io/spark-cassandra-connector/ApiDocs/2.0.2/spark-cassandra-connector-embedded/) ### 1.6.6 * [Spark-Cassandra-Connector](http://datastax.github.io/spark-cassandra-connector/ApiDocs/1.6.6/spark-cassandra-connector/) From 8e7ee2f4c0101c86b192346df65621e9dc529ee0 Mon Sep 17 00:00:00 2001 From: Jacek Laskowski Date: Thu, 18 May 2017 10:53:16 +0200 Subject: [PATCH 09/21] Add link to spark-connector Slack channel at DataStax Academy Slack --- README.md | 37 +++++++++++++++++++++---------------- 1 file changed, 21 insertions(+), 16 deletions(-) diff --git a/README.md b/README.md index ec92df2d5..6587892e0 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,7 @@ # Spark Cassandra Connector [![Build Status](https://travis-ci.org/datastax/spark-cassandra-connector.svg)](http://travis-ci.org/datastax/spark-cassandra-connector) ### [Spark Cassandra Connector Spark Packages Website](http://spark-packages.org/package/datastax/spark-cassandra-connector) -Chat with us at [DataStax Academy #spark-cassandra-connector](#datastax-academy) + +Chat with us at [DataStax Academy's #spark-connector Slack channel](#Slack) ### Most Recent Release Scala Docs @@ -29,7 +30,7 @@ execute arbitrary CQL queries in your Spark applications. - Partition RDDs according to Cassandra replication using `repartitionByCassandraReplica` call - Converts data types between Cassandra and Scala - Supports all Cassandra data types including collections - - Filters rows on the server side via the CQL `WHERE` clause + - Filters rows on the server side via the CQL `WHERE` clause - Allows for execution of arbitrary CQL statements - Plays nice with Cassandra Virtual Nodes - Works with PySpark DataFrames @@ -81,7 +82,7 @@ API documentation for the Scala and Java interfaces are available online: * [Spark-Cassandra-Connector-Java](http://datastax.github.io/spark-cassandra-connector/ApiDocs/1.3.1/spark-cassandra-connector-java/) * [Embedded-Cassandra](http://datastax.github.io/spark-cassandra-connector/ApiDocs/1.3.1/spark-cassandra-connector-embedded/) -### 1.2.0 +### 1.2.0 * [Spark-Cassandra-Connector](http://datastax.github.io/spark-cassandra-connector/ApiDocs/1.2.0/spark-cassandra-connector/) * [Spark-Cassandra-Connector-Java](http://datastax.github.io/spark-cassandra-connector/ApiDocs/1.2.0/spark-cassandra-connector-java/) * [Embedded-Cassandra](http://datastax.github.io/spark-cassandra-connector/ApiDocs/1.2.0/spark-cassandra-connector-embedded/) @@ -91,17 +92,17 @@ This project is available on Spark Packages; this is the easiest way to start us http://spark-packages.org/package/datastax/spark-cassandra-connector This project has also been published to the Maven Central Repository. 
-For SBT to download the connector binaries, sources and javadoc, put this in your project +For SBT to download the connector binaries, sources and javadoc, put this in your project SBT config: - + libraryDependencies += "com.datastax.spark" %% "spark-cassandra-connector" % "2.0.0" - + * The default Scala version for Spark 2.0+ is 2.11 please choose the appropriate build. See the [FAQ](doc/FAQ.md) for more information ## Building See [Building And Artifacts](doc/12_building_and_artifacts.md) - + ## Documentation - [Quick-start guide](doc/0_quick_start.md) @@ -125,25 +126,29 @@ See [Building And Artifacts](doc/12_building_and_artifacts.md) - [Tips for Developing the Spark Cassandra Connector](doc/developers.md) ## Online Training + ### DataStax Academy + DataStax Academy provides free online training for Apache Cassandra and DataStax Enterprise. In [DS320: Analytics with Spark](https://academy.datastax.com/courses/ds320-analytics-with-apache-spark), you will learn how to effectively and efficiently solve analytical problems with Apache Spark, Apache Cassandra, and DataStax Enterprise. You will learn about Spark API, Spark-Cassandra Connector, Spark SQL, Spark Streaming, and crucial performance optimization techniques. ## Community + ### Reporting Bugs + New issues may be reported using [JIRA](https://datastax-oss.atlassian.net/browse/SPARKC/). Please include all relevant details including versions of Spark, Spark Cassandra Connector, Cassandra and/or DSE. A minimal reproducible case with sample code is ideal. ### Mailing List + Questions and requests for help may be submitted to the [user mailing list](http://groups.google.com/a/lists.datastax.com/forum/#!forum/spark-connector-user). -### Gitter -Datastax is consolidating our chat resources to Slack at [DataStax Academy](#datastax-academy) +### Slack -The gitter room will be shut down in the near future -[![Join the chat at https://gitter.im/datastax/spark-cassandra-connector](https://badges.gitter.im/datastax/spark-cassandra-connector.svg)](https://gitter.im/datastax/spark-cassandra-connector?utm_source=badge&utm_medium=badge&utm_campaign=pr-badge&utm_content=badge) +The project uses Slack to facilitate conversation in our community. Find us in `#spark-connector` channel at [DataStax Academy Slack](https://academy.datastax.com/slack). ### IRC + \#spark-cassandra-connector on irc.freenode.net. If you are new to IRC, you can use a [web-based client](http://webchat.freenode.net/?channels=#spark-cassandra-connector). ## Contributing @@ -174,20 +179,20 @@ To run unit and integration tests: By default, integration tests start up a separate, single Cassandra instance and run Spark in local mode. It is possible to run integration tests with your own Cassandra and/or Spark cluster. 
First, prepare a jar with testing code: - + ./sbt/sbt test:package - + Then copy the generated test jar to your Spark nodes and run: export IT_TEST_CASSANDRA_HOST= export IT_TEST_SPARK_MASTER= ./sbt/sbt it:test - + ## Generating Documents -To generate the Reference Document use +To generate the Reference Document use ./sbt/sbt spark-cassandra-connector-unshaded/run (outputLocation) - + outputLocation defaults to doc/reference.md ## License From 3a33912345cbda09cdea80100d1a79dfd6f4cf83 Mon Sep 17 00:00:00 2001 From: Jacek Laskowski Date: Mon, 22 May 2017 20:08:44 +0200 Subject: [PATCH 10/21] [DOCS][MINOR] Formatting --- README.md | 2 +- doc/1_connecting.md | 20 ++++++++------------ 2 files changed, 9 insertions(+), 13 deletions(-) diff --git a/README.md b/README.md index 6587892e0..eef07c41a 100644 --- a/README.md +++ b/README.md @@ -1,7 +1,7 @@ # Spark Cassandra Connector [![Build Status](https://travis-ci.org/datastax/spark-cassandra-connector.svg)](http://travis-ci.org/datastax/spark-cassandra-connector) ### [Spark Cassandra Connector Spark Packages Website](http://spark-packages.org/package/datastax/spark-cassandra-connector) -Chat with us at [DataStax Academy's #spark-connector Slack channel](#Slack) +Chat with us at [DataStax Academy's #spark-connector Slack channel](#slack) ### Most Recent Release Scala Docs diff --git a/doc/1_connecting.md b/doc/1_connecting.md index 4647babed..2e75a17aa 100644 --- a/doc/1_connecting.md +++ b/doc/1_connecting.md @@ -8,7 +8,7 @@ how to execute CQL statements from Spark applications. To connect your Spark application to Cassandra, set connection options in the `SparkConf` object. These are prefixed with `spark.` so that they can be recognized -from the spark-shell and set within the $SPARK_HOME/conf/spark-default.conf. +from `spark-shell` and set in `$SPARK_HOME/conf/spark-default.conf`. Example: @@ -21,12 +21,10 @@ val conf = new SparkConf(true) val sc = new SparkContext("spark://192.168.123.10:7077", "test", conf) ``` -Multiple hosts can be passed in using a comma separated list -("127.0.0.1,127.0.0.2"). These are the *initial contact points only*, all -nodes in the local DC will be used upon connecting. +Multiple hosts can be passed in using a comma-separated list in `spark.cassandra.connection.host` +(e.g. `"127.0.0.1,127.0.0.2"`). These are the *initial contact points only*, all nodes in the local DC will be used upon connecting. -See the reference section for a full list of options -[Cassandra Connection Parameters](reference.md#cassandra-connection-parameters) +See the reference section for [Cassandra Connection Parameters](reference.md#cassandra-connection-parameters). ### Connection management @@ -75,7 +73,7 @@ Eventually, when all the tasks needing Cassandra connectivity terminate, the connection to the Cassandra cluster will be closed shortly thereafter. The period of time for keeping unused connections open is controlled by the global `spark.cassandra.connection.keep_alive_ms` system property, -see [Cassandra Connection Parameters](reference.md#cassandra-connection-parameters) +see [Cassandra Connection Parameters](reference.md#cassandra-connection-parameters). ### Connecting manually to Cassandra @@ -101,13 +99,12 @@ CassandraConnector(conf).withSessionDo { session => ### Connecting to multiple Cassandra Clusters The Spark Cassandra Connector is able to connect to multiple Cassandra -Clusters for all of it's normal operations. 
The default `CassandraConnector` -object used by calls to `sc.cassandraTable` and `saveToCassandra` is -specified by the `SparkConfiguration`. If you would like to use multiple clusters, +Clusters for all of its normal operations. +The default `CassandraConnector` object used by calls to `sc.cassandraTable` and `saveToCassandra` is specified by the `SparkConfiguration`. If you would like to use multiple clusters, an implicit `CassandraConnector` can be used in a code block to specify the target cluster for all operations in that block. -####Example of reading from one cluster and writing to another +#### Example of reading from one cluster and writing to another ```scala import com.datastax.spark.connector._ @@ -115,7 +112,6 @@ import com.datastax.spark.connector.cql._ import org.apache.spark.SparkContext - def twoClusterExample ( sc: SparkContext) = { val connectorToClusterOne = CassandraConnector(sc.getConf.set("spark.cassandra.connection.host", "127.0.0.1")) val connectorToClusterTwo = CassandraConnector(sc.getConf.set("spark.cassandra.connection.host", "127.0.0.2")) From fe9911b22dfde16b0c9b5544c0abadd71f3e9b07 Mon Sep 17 00:00:00 2001 From: Brian Cantoni Date: Tue, 30 May 2017 09:32:29 -0700 Subject: [PATCH 11/21] SPARKC-493: Fix generate docs script For some reason the logic for finding the module names was not working (at least on my Mac). Here I simplify it by just iterating over the two folders we know about, therefore making sure the output files exist in their own separate folders. --- generateDocs.sh | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/generateDocs.sh b/generateDocs.sh index 4e13add6b..62a8d43c8 100755 --- a/generateDocs.sh +++ b/generateDocs.sh @@ -15,9 +15,9 @@ for VERSION in $@ ;do sbt doc mkdir $OUTPUT/$VERSION - for FOLDER in $SCC_HOME/spark*; do + for MODULE in spark-cassandra-connector spark-cassandra-connector-embedded; do + FOLDER=$SCC_HOME/$MODULE echo "COPYING $FOLDER to $OUTPUT/$VERSION/$MODULE" - MODULE=$(basename $folder) cp -vr $FOLDER/target/scala-2.10/api $OUTPUT/$VERSION/$MODULE done done From 6ac59def30f730023c5e7325527da40439d1be40 Mon Sep 17 00:00:00 2001 From: Russell Spitzer Date: Mon, 12 Jun 2017 16:04:45 -0700 Subject: [PATCH 12/21] 1.6.7 API Docs --- README.md | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/README.md b/README.md index eef07c41a..116cb9b73 100644 --- a/README.md +++ b/README.md @@ -63,9 +63,9 @@ API documentation for the Scala and Java interfaces are available online: * [Spark-Cassandra-Connector](http://datastax.github.io/spark-cassandra-connector/ApiDocs/2.0.2/spark-cassandra-connector/) * [Embedded-Cassandra](http://datastax.github.io/spark-cassandra-connector/ApiDocs/2.0.2/spark-cassandra-connector-embedded/) -### 1.6.6 -* [Spark-Cassandra-Connector](http://datastax.github.io/spark-cassandra-connector/ApiDocs/1.6.6/spark-cassandra-connector/) -* [Embedded-Cassandra](http://datastax.github.io/spark-cassandra-connector/ApiDocs/1.6.6/spark-cassandra-connector-embedded/) +### 1.6.7 +* [Spark-Cassandra-Connector](http://datastax.github.io/spark-cassandra-connector/ApiDocs/1.6.7/spark-cassandra-connector/) +* [Embedded-Cassandra](http://datastax.github.io/spark-cassandra-connector/ApiDocs/1.6.7/spark-cassandra-connector-embedded/) ### 1.5.2 * [Spark-Cassandra-Connector](http://datastax.github.io/spark-cassandra-connector/ApiDocs/1.5.2/spark-cassandra-connector/) From c3ef9e10c8de29031c5ff19d6b99241d2a683855 Mon Sep 17 00:00:00 2001 From: Russell Spitzer Date: Wed, 28 Jun 2017 
09:04:01 -0700 Subject: [PATCH 13/21] Update 15_python.md Fix Python package url --- doc/15_python.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/doc/15_python.md b/doc/15_python.md index d0d31071f..d8fe897a3 100644 --- a/doc/15_python.md +++ b/doc/15_python.md @@ -16,7 +16,7 @@ http://spark-packages.org/package/datastax/spark-cassandra-connector ```bash ./bin/pyspark \ - --packages com.datastax.spark:spark-cassandra-connector_2.11:2.0.1-s_2.11 + --packages com.datastax.spark:spark-cassandra-connector_2.11:2.0.1 ``` ### Loading a DataFrame in Python From efc65c7601a5bdace9563279145290b179c81867 Mon Sep 17 00:00:00 2001 From: artemaliev Date: Thu, 29 Jun 2017 22:19:15 +0300 Subject: [PATCH 14/21] Add support for LcalDate year only parsing --- .../com/datastax/spark/connector/types/TypeConverter.scala | 2 ++ .../datastax/spark/connector/types/TypeConverterTest.scala | 7 +++++++ 2 files changed, 9 insertions(+) diff --git a/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/types/TypeConverter.scala b/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/types/TypeConverter.scala index 1fac00e62..423832469 100644 --- a/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/types/TypeConverter.scala +++ b/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/types/TypeConverter.scala @@ -463,10 +463,12 @@ object TypeConverter { implicit object LocalDateConverter extends NullableTypeConverter[LocalDate] { def targetTypeTag = LocalDateTypeTag val dateRegx = """(\d\d\d\d)-(\d\d)-(\d\d)""".r + val yearRegx = """(\d\d\d\d)""".r def convertPF = { case x: LocalDate => x case dateRegx(y, m, d) => LocalDate.fromYearMonthDay(y.toInt, m.toInt, d.toInt) + case yearRegx(y) => LocalDate.fromYearMonthDay(y.toInt, 1, 1) case x: Int => LocalDate.fromDaysSinceEpoch(x) case x: JodaLocalDate => LocalDate.fromYearMonthDay(x.getYear, x.getMonthOfYear, x.getDayOfMonth) case x: DateTime => { diff --git a/spark-cassandra-connector/src/test/scala/com/datastax/spark/connector/types/TypeConverterTest.scala b/spark-cassandra-connector/src/test/scala/com/datastax/spark/connector/types/TypeConverterTest.scala index f3e825c46..da08f9e19 100644 --- a/spark-cassandra-connector/src/test/scala/com/datastax/spark/connector/types/TypeConverterTest.scala +++ b/spark-cassandra-connector/src/test/scala/com/datastax/spark/connector/types/TypeConverterTest.scala @@ -235,6 +235,13 @@ class TypeConverterTest { assertEquals(testDate, c.convert(new org.joda.time.LocalDate(1985, 8, 3))) } + @Test + def testYearLocalDate(): Unit = { + val c = TypeConverter.forType[LocalDate] + val testDate = LocalDate.fromYearMonthDay(1985, 1, 1) + assertEquals(testDate, c.convert("1985")) + } + @Test def testTimeType(): Unit = { val c = TypeConverter.TimeTypeConverter From 5825d92fb0da93004d9334108a970f753e4e41c6 Mon Sep 17 00:00:00 2001 From: Daniil Kudryavtsev Date: Fri, 30 Jun 2017 13:31:04 +0300 Subject: [PATCH 15/21] Update Java API Documentation. Detailed Java API user guide with code examples. --- doc/7_java_api.md | 17 +++++++++++++++++ .../spark/connector/japi/CassandraJavaUtil.java | 10 ++++++---- 2 files changed, 23 insertions(+), 4 deletions(-) diff --git a/doc/7_java_api.md b/doc/7_java_api.md index 3135580ec..000c49f34 100644 --- a/doc/7_java_api.md +++ b/doc/7_java_api.md @@ -253,6 +253,23 @@ of *RDD* elements and uses a default `JavaBeanColumnMapper` to map those element to attribute translations can be specified in order to override the default logic. 
If `JavaBeanColumnMapper` is not an option, a custom column mapper can be specified as well.
+#### Example of Saving an RDD of Person Objects with Differently Named Fields
+Say we have a table `people2` with columns `id INT`, `last_name TEXT`, `date_of_birth TIMESTAMP` and
+we want to save an RDD of `Person` class objects to this table. To do this we need to use the overloaded `mapToRow(Class, Map)` method.
+```java
+Map fieldToColumnMapping = new HashMap<>();
+fieldToColumnMapping.put("name", "last_name");
+fieldToColumnMapping.put("birthDate", "date_of_birth");
+javaFunctions(rdd).writerBuilder("ks", "people2", mapToRow(Person.class, fieldToColumnMapping)).saveToCassandra();
+```
+Another version of the method, `mapToRow(Class, Pair[])`, is handier for inline invocations.
+```java
+javaFunctions(rdd).writerBuilder("ks", "people2", mapToRow(
+    Person.class,
+    Pair.of("name", "last_name"),
+    Pair.of("birthDate", "date_of_birth")))
+    .saveToCassandra();
+```

### Working with tuples

Since 1.3 there are new methods to work with Scala tuples.
diff --git a/spark-cassandra-connector/src/main/java/com/datastax/spark/connector/japi/CassandraJavaUtil.java b/spark-cassandra-connector/src/main/java/com/datastax/spark/connector/japi/CassandraJavaUtil.java
index 9fd1bbab0..3968d2f93 100644
--- a/spark-cassandra-connector/src/main/java/com/datastax/spark/connector/japi/CassandraJavaUtil.java
+++ b/spark-cassandra-connector/src/main/java/com/datastax/spark/connector/japi/CassandraJavaUtil.java
@@ -1196,11 +1196,12 @@ public static RowWriterFactory safeMapToRow(Class targetClass, ColumnM
 *

* The method uses {@link JavaBeanColumnMapper} as the column mapper. If another column mapper has to * be used, see {@link #mapToRow(Class, ColumnMapper)} method. + * @param fieldToColumnNameMap mapping of field name to column name. */ public static RowWriterFactory mapToRow( - Class sourceClass, Map columnNameMappings + Class sourceClass, Map fieldToColumnNameMap ) { - ColumnMapper mapper = javaBeanColumnMapper(safeClassTag(sourceClass), columnNameMappings); + ColumnMapper mapper = javaBeanColumnMapper(safeClassTag(sourceClass), fieldToColumnNameMap); return mapToRow(sourceClass, mapper); } @@ -1216,9 +1217,10 @@ public static RowWriterFactory mapToRow( *

* The method uses {@link JavaBeanColumnMapper} as the column mapper. If another column mapper has to * be used, see {@link #mapToRow(Class, ColumnMapper)} method. + * @param fieldToColumnNameMappings mapping of field name to column name. */ - public static RowWriterFactory mapToRow(Class sourceClass, Pair... columnNameMappings) { - return mapToRow(sourceClass, convertToMap(columnNameMappings)); + public static RowWriterFactory mapToRow(Class sourceClass, Pair... fieldToColumnNameMappings) { + return mapToRow(sourceClass, convertToMap(fieldToColumnNameMappings)); } // ------------------------------------------------------------------------- From 9a50162dade448c386edafa382b949838ca27c96 Mon Sep 17 00:00:00 2001 From: Russell Spitzer Date: Fri, 7 Jul 2017 17:45:57 -0700 Subject: [PATCH 16/21] Update Readme Doc Links --- README.md | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/README.md b/README.md index 116cb9b73..a166787a6 100644 --- a/README.md +++ b/README.md @@ -5,9 +5,9 @@ Chat with us at [DataStax Academy's #spark-connector Slack channel](#slack) ### Most Recent Release Scala Docs -### 2.0.2 -* [Spark-Cassandra-Connector](http://datastax.github.io/spark-cassandra-connector/ApiDocs/2.0.2/spark-cassandra-connector/) -* [Embedded-Cassandra](http://datastax.github.io/spark-cassandra-connector/ApiDocs/2.0.2/spark-cassandra-connector-embedded/) +### 2.0.3 +* [Spark-Cassandra-Connector](http://datastax.github.io/spark-cassandra-connector/ApiDocs/2.0.3/spark-cassandra-connector/) +* [Embedded-Cassandra](http://datastax.github.io/spark-cassandra-connector/ApiDocs/2.0.3/spark-cassandra-connector-embedded/) [All Versions API Docs](#hosted-api-docs) @@ -59,13 +59,13 @@ development for the next connector release in progress. ## Hosted API Docs API documentation for the Scala and Java interfaces are available online: -### 2.0.2 -* [Spark-Cassandra-Connector](http://datastax.github.io/spark-cassandra-connector/ApiDocs/2.0.2/spark-cassandra-connector/) -* [Embedded-Cassandra](http://datastax.github.io/spark-cassandra-connector/ApiDocs/2.0.2/spark-cassandra-connector-embedded/) +### 2.0.3 +* [Spark-Cassandra-Connector](http://datastax.github.io/spark-cassandra-connector/ApiDocs/2.0.3/spark-cassandra-connector/) +* [Embedded-Cassandra](http://datastax.github.io/spark-cassandra-connector/ApiDocs/2.0.3/spark-cassandra-connector-embedded/) -### 1.6.7 -* [Spark-Cassandra-Connector](http://datastax.github.io/spark-cassandra-connector/ApiDocs/1.6.7/spark-cassandra-connector/) -* [Embedded-Cassandra](http://datastax.github.io/spark-cassandra-connector/ApiDocs/1.6.7/spark-cassandra-connector-embedded/) +### 1.6.8 +* [Spark-Cassandra-Connector](http://datastax.github.io/spark-cassandra-connector/ApiDocs/1.6.8/spark-cassandra-connector/) +* [Embedded-Cassandra](http://datastax.github.io/spark-cassandra-connector/ApiDocs/1.6.8/spark-cassandra-connector-embedded/) ### 1.5.2 * [Spark-Cassandra-Connector](http://datastax.github.io/spark-cassandra-connector/ApiDocs/1.5.2/spark-cassandra-connector/) @@ -95,7 +95,7 @@ This project has also been published to the Maven Central Repository. For SBT to download the connector binaries, sources and javadoc, put this in your project SBT config: - libraryDependencies += "com.datastax.spark" %% "spark-cassandra-connector" % "2.0.0" + libraryDependencies += "com.datastax.spark" %% "spark-cassandra-connector" % "2.0.3" * The default Scala version for Spark 2.0+ is 2.11 please choose the appropriate build. 
See the [FAQ](doc/FAQ.md) for more information From 582c98a5457866a79f67b3161cbb6c568e64afd1 Mon Sep 17 00:00:00 2001 From: Paul Leclercq Date: Tue, 18 Jul 2017 16:00:28 +0200 Subject: [PATCH 17/21] doc: markdown typo - best commit in history --- doc/16_partitioning.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/doc/16_partitioning.md b/doc/16_partitioning.md index 589a2dbf5..63fdbd537 100644 --- a/doc/16_partitioning.md +++ b/doc/16_partitioning.md @@ -296,7 +296,7 @@ built for primitives which is why the above examples use `Tuple1` so frequently. This functionality does not currently function in DataFrames. -###Data Generator +### Data Generator ```scala import com.datastax.spark.connector._ From 1bb71d168459b791a05799e1115f9f695288a5ac Mon Sep 17 00:00:00 2001 From: Brian Cantoni Date: Fri, 28 Jul 2017 10:33:23 -0700 Subject: [PATCH 18/21] README updates - minor restructure for the introduction sections - extend copyright year - fix a few external links for https - drop IRC reference (no longer used) --- README.md | 33 +++++++++++++-------------------- 1 file changed, 13 insertions(+), 20 deletions(-) diff --git a/README.md b/README.md index a166787a6..37bb9c968 100644 --- a/README.md +++ b/README.md @@ -1,23 +1,20 @@ -# Spark Cassandra Connector [![Build Status](https://travis-ci.org/datastax/spark-cassandra-connector.svg)](http://travis-ci.org/datastax/spark-cassandra-connector) -### [Spark Cassandra Connector Spark Packages Website](http://spark-packages.org/package/datastax/spark-cassandra-connector) +# Spark Cassandra Connector [![Build Status](https://travis-ci.org/datastax/spark-cassandra-connector.svg)](https://travis-ci.org/datastax/spark-cassandra-connector) -Chat with us at [DataStax Academy's #spark-connector Slack channel](#slack) +## Quick Links -### Most Recent Release Scala Docs +| What | Where | +| ---------- | ----- | +| Packages | [Spark Cassandra Connector Spark Packages Website](https://spark-packages.org/package/datastax/spark-cassandra-connector) | +| Community | Chat with us at [DataStax Academy's #spark-connector Slack channel](#slack) | +| Scala Docs | Most Recent Release (2.0.3): [Spark-Cassandra-Connector](https://datastax.github.io/spark-cassandra-connector/ApiDocs/2.0.3/spark-cassandra-connector/), [Embedded-Cassandra](https://datastax.github.io/spark-cassandra-connector/ApiDocs/2.0.3/spark-cassandra-connector-embedded/) | -### 2.0.3 -* [Spark-Cassandra-Connector](http://datastax.github.io/spark-cassandra-connector/ApiDocs/2.0.3/spark-cassandra-connector/) -* [Embedded-Cassandra](http://datastax.github.io/spark-cassandra-connector/ApiDocs/2.0.3/spark-cassandra-connector-embedded/) - -[All Versions API Docs](#hosted-api-docs) +## Features -## Lightning-fast cluster computing with Apache Spark(TM) and Apache Cassandra(TM); +*Lightning-fast cluster computing with Apache Spark™ and Apache Cassandra®.* This library lets you expose Cassandra tables as Spark RDDs, write Spark RDDs to Cassandra tables, and execute arbitrary CQL queries in your Spark applications. 
-## Features - - Compatible with Apache Cassandra version 2.0 or higher (see table below) - Compatible with Apache Spark 1.0 through 2.0 (see table below) - Compatible with Scala 2.10 and 2.11 @@ -89,7 +86,7 @@ API documentation for the Scala and Java interfaces are available online: ## Download This project is available on Spark Packages; this is the easiest way to start using the connector: -http://spark-packages.org/package/datastax/spark-cassandra-connector +https://spark-packages.org/package/datastax/spark-cassandra-connector This project has also been published to the Maven Central Repository. For SBT to download the connector binaries, sources and javadoc, put this in your project @@ -141,15 +138,11 @@ reproducible case with sample code is ideal. ### Mailing List -Questions and requests for help may be submitted to the [user mailing list](http://groups.google.com/a/lists.datastax.com/forum/#!forum/spark-connector-user). +Questions and requests for help may be submitted to the [user mailing list](https://groups.google.com/a/lists.datastax.com/forum/#!forum/spark-connector-user). ### Slack -The project uses Slack to facilitate conversation in our community. Find us in `#spark-connector` channel at [DataStax Academy Slack](https://academy.datastax.com/slack). - -### IRC - -\#spark-cassandra-connector on irc.freenode.net. If you are new to IRC, you can use a [web-based client](http://webchat.freenode.net/?channels=#spark-cassandra-connector). +The project uses Slack to facilitate conversation in our community. Find us in the `#spark-connector` channel at [DataStax Academy Slack](https://academy.datastax.com/slack). ## Contributing @@ -197,7 +190,7 @@ outputLocation defaults to doc/reference.md ## License -Copyright 2014-2016, DataStax, Inc. +Copyright 2014-2017, DataStax, Inc. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. 
You may obtain a copy of the License at From 4ceaa205863ad858e65c7f8e65ce44a7aea7fc4a Mon Sep 17 00:00:00 2001 From: Aashish Srinivas Date: Mon, 21 Aug 2017 18:06:44 -0400 Subject: [PATCH 19/21] support partially specified writes from case classes --- .../mapper/DefaultColumnMapper.scala | 4 +- .../connector/writer/DefaultRowWriter.scala | 3 +- .../MappedToGettableDataConverter.scala | 4 +- .../mapper/DefaultColumnMapperTest.scala | 9 +--- .../writer/DefaultRowWriterTest.scala | 45 +++++++++++++++++++ .../MappedToGettableDataConverterSpec.scala | 19 +++++--- 6 files changed, 65 insertions(+), 19 deletions(-) diff --git a/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/mapper/DefaultColumnMapper.scala b/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/mapper/DefaultColumnMapper.scala index 325b78f9f..6e360d181 100644 --- a/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/mapper/DefaultColumnMapper.scala +++ b/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/mapper/DefaultColumnMapper.scala @@ -102,10 +102,10 @@ class DefaultColumnMapper[T : TypeTag](columnNameOverride: Map[String, String] = } yield (getterName, columnRef) }.toMap - // Check if we have all the required columns: + // Check if all columns at start of table description are present in the case class: val mappedColumns = getterMap.values.toSet val unmappedColumns = selectedColumns.filterNot(mappedColumns) - require(unmappedColumns.isEmpty, s"Columns not found in $tpe: [${unmappedColumns.mkString(", ")}]") + require(selectedColumns.endsWith(unmappedColumns), s"Unmapped columns nust be at end of table definition: [${unmappedColumns.mkString(", ")}]") SimpleColumnMapForWriting(getterMap) } diff --git a/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/writer/DefaultRowWriter.scala b/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/writer/DefaultRowWriter.scala index d8fd84840..ba1844ca8 100644 --- a/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/writer/DefaultRowWriter.scala +++ b/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/writer/DefaultRowWriter.scala @@ -19,13 +19,12 @@ class DefaultRowWriter[T : TypeTag : ColumnMapper]( override def readColumnValues(data: T, buffer: Array[Any]) = { val row = converter.convert(data) - for (i <- columnNames.indices) + for (i <- row.columnValues.indices) buffer(i) = row.getRaw(i) } } object DefaultRowWriter { - def factory[T : ColumnMapper : TypeTag] = new RowWriterFactory[T] { override def rowWriter(tableDef: TableDef, selectedColumns: IndexedSeq[ColumnRef]) = { new DefaultRowWriter[T](tableDef, selectedColumns) diff --git a/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/writer/MappedToGettableDataConverter.scala b/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/writer/MappedToGettableDataConverter.scala index f710e2cc8..4eed8c934 100644 --- a/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/writer/MappedToGettableDataConverter.scala +++ b/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/writer/MappedToGettableDataConverter.scala @@ -166,7 +166,7 @@ private[connector] object MappedToGettableDataConverter extends Logging{ } private val getters = - columnNames.map(getterByColumnName) + columnNames.flatMap(col => getterByColumnName.get(col)) @transient private val scalaTypes: IndexedSeq[Type] = @@ -179,7 +179,7 @@ private[connector] object 
MappedToGettableDataConverter extends Logging{ new PropertyExtractor(cls, getters) private val converters = { - for (i <- columnNames.indices) yield { + for (i <- getters.indices) yield { try { val ct = columnTypes(i) val st = scalaTypes(i) diff --git a/spark-cassandra-connector/src/test/scala/com/datastax/spark/connector/mapper/DefaultColumnMapperTest.scala b/spark-cassandra-connector/src/test/scala/com/datastax/spark/connector/mapper/DefaultColumnMapperTest.scala index 8376a465e..54c83f009 100644 --- a/spark-cassandra-connector/src/test/scala/com/datastax/spark/connector/mapper/DefaultColumnMapperTest.scala +++ b/spark-cassandra-connector/src/test/scala/com/datastax/spark/connector/mapper/DefaultColumnMapperTest.scala @@ -205,11 +205,4 @@ class DefaultColumnMapperTest { new DefaultColumnMapper[DefaultColumnMapperTestClass1]() .columnMapForReading(table1, table1.columnRefs.tail) } - - @Test(expected = classOf[IllegalArgumentException]) - def testNotEnoughPropertiesForWriting(): Unit = { - new DefaultColumnMapper[DefaultColumnMapperTestClass1]() - .columnMapForWriting(table1, table1.columnRefs :+ ColumnName("missingColumn")) - } - -} \ No newline at end of file +} diff --git a/spark-cassandra-connector/src/test/scala/com/datastax/spark/connector/writer/DefaultRowWriterTest.scala b/spark-cassandra-connector/src/test/scala/com/datastax/spark/connector/writer/DefaultRowWriterTest.scala index 053b82d37..0769f2d3d 100644 --- a/spark-cassandra-connector/src/test/scala/com/datastax/spark/connector/writer/DefaultRowWriterTest.scala +++ b/spark-cassandra-connector/src/test/scala/com/datastax/spark/connector/writer/DefaultRowWriterTest.scala @@ -99,4 +99,49 @@ class DefaultRowWriterTest { TypeConverter.unregisterConverter(StringWrapperConverter) } } + + case class UnderSpecified(c1: String, c2: Int) + case class UnderSpecifiedReversed(c2: Int, c1: String) + case class UnderSpecifiedOutOfOrder(c1: String, c3: Int) + + @Test + def testIgnoresRegularTableColumns(): Unit = { + val column = ColumnDef("c1", PartitionKeyColumn, TextType) + val column2 = ColumnDef("c2", RegularColumn, IntType) + val column3 = ColumnDef("c3", RegularColumn, IntType) + val table = TableDef("test", "table", Seq(column), Nil, Seq(column2, column3)) + val rowWriter = new DefaultRowWriter[UnderSpecified](table, table.columnRefs) + val obj = UnderSpecified("some text", 10) + val buf = Array.ofDim[Any](3) + rowWriter.readColumnValues(obj, buf) + assertEquals("some text", buf(0)) + assertEquals(10, buf(1)) + assertEquals(null, buf(2)) + } + @Test + def testIgnoresReglarTableColumnsReversedOrder(): Unit = { + val column = ColumnDef("c1", PartitionKeyColumn, TextType) + val column2 = ColumnDef("c2", RegularColumn, IntType) + val column3 = ColumnDef("c3", RegularColumn, IntType) + val table = TableDef("test", "table", Seq(column), Nil, Seq(column2, column3)) + val rowWriter = new DefaultRowWriter[UnderSpecifiedReversed](table, table.columnRefs) + val obj = UnderSpecifiedReversed(10, "some text") + val buf = Array.ofDim[Any](3) + rowWriter.readColumnValues(obj, buf) + assertEquals("some text", buf(0)) + assertEquals(10, buf(1)) + assertEquals(null, buf(2)) + } + + @Test(expected = classOf[IllegalArgumentException]) + def testWriterFailsForColumnsOutOfOrder(): Unit = { + val column = ColumnDef("c1", PartitionKeyColumn, TextType) + val column2 = ColumnDef("c2", RegularColumn, IntType) + val column3 = ColumnDef("c3", RegularColumn, IntType) + val table = TableDef("test", "table", Seq(column), Nil, Seq(column2, column3)) + val rowWriter = 
new DefaultRowWriter[UnderSpecifiedOutOfOrder](table, table.columnRefs) + val obj = UnderSpecifiedOutOfOrder("some text", 10) + val buf = Array.ofDim[Any](3) + rowWriter.readColumnValues(obj, buf) + } } diff --git a/spark-cassandra-connector/src/test/scala/com/datastax/spark/connector/writer/MappedToGettableDataConverterSpec.scala b/spark-cassandra-connector/src/test/scala/com/datastax/spark/connector/writer/MappedToGettableDataConverterSpec.scala index 1d6ee1c61..e882da544 100644 --- a/spark-cassandra-connector/src/test/scala/com/datastax/spark/connector/writer/MappedToGettableDataConverterSpec.scala +++ b/spark-cassandra-connector/src/test/scala/com/datastax/spark/connector/writer/MappedToGettableDataConverterSpec.scala @@ -17,6 +17,7 @@ class MappedToGettableDataConverterSpec extends FlatSpec with Matchers { val addressType = UserDefinedType("address", IndexedSeq(streetColumn, numberColumn)) val loginColumn = ColumnDef("login", PartitionKeyColumn, VarCharType) + val logoutColumn = ColumnDef("logout", RegularColumn, VarCharType) val passwordColumn = ColumnDef("password", RegularColumn, VarCharType) val addressColumn = ColumnDef("address", RegularColumn, addressType) val addressesColumn = ColumnDef("addresses", RegularColumn, ListType(addressType)) @@ -27,9 +28,9 @@ class MappedToGettableDataConverterSpec extends FlatSpec with Matchers { new TableDef("test", "test", partitionKeyColumns, clusteringColumns, regularColumns) } - case class User(login: String, password: String) case class Address(street: String, number: Int) + case class Session(logout: String, login: String) "MappedToGettableDataConverter" should "be Serializable" in { val userTable = newTable(loginColumn, passwordColumn) @@ -278,18 +279,17 @@ class MappedToGettableDataConverterSpec extends FlatSpec with Matchers { row.getTupleValue(1).getUDTValue(2).getInt("number") shouldEqual 3 } - class UnknownType - case class UserWithUnknownType(login: String, address: UnknownType) + case class UserWithUnknownType(login: String, address: String) it should "throw a meaningful exception when a column has an incorrect type" in { val userTable = newTable(loginColumn, addressColumn) - val user = UserWithUnknownType("foo", new UnknownType) + val user = UserWithUnknownType("foo", "bar") val exception = the[IllegalArgumentException] thrownBy { val converter = MappedToGettableDataConverter[UserWithUnknownType](userTable, userTable.columnRefs) converter.convert(user) } exception.getMessage should include("UserWithUnknownType") - exception.getMessage should include("UnknownType") + exception.getMessage should include("String") exception.getMessage should include("address") exception.getMessage should include("test") } @@ -323,4 +323,13 @@ class MappedToGettableDataConverterSpec extends FlatSpec with Matchers { row.getString("login") shouldEqual "login" row.getString("password") shouldEqual "password" } + + it should "work when fields are reversed" in { + val loginTable = newTable(loginColumn, logoutColumn) + val converter = MappedToGettableDataConverter[Session](loginTable, loginTable.columnRefs) + val value = Session("user_logout", "user_login") + val row = converter.convert(value) + row.getString("login") shouldEqual "user_login" + row.getString("logout") shouldEqual "user_logout" + } } From 96ab9c411737d8436ccbe58020c1803ffed91356 Mon Sep 17 00:00:00 2001 From: Aashish Srinivas Date: Thu, 24 Aug 2017 13:45:42 -0400 Subject: [PATCH 20/21] isTopLevel flag for nested UDFs should require no unmapped columns --- 
.../spark/connector/mapper/ColumnMapper.scala | 2 ++ .../mapper/DefaultColumnMapper.scala | 9 +++-- .../MappedToGettableDataConverter.scala | 8 ++--- .../MappedToGettableDataConverterSpec.scala | 33 +++++++++++++++++-- 4 files changed, 43 insertions(+), 9 deletions(-) diff --git a/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/mapper/ColumnMapper.scala b/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/mapper/ColumnMapper.scala index 72d141055..fd2ee47e3 100644 --- a/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/mapper/ColumnMapper.scala +++ b/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/mapper/ColumnMapper.scala @@ -26,6 +26,8 @@ import scala.reflect.runtime.universe._ */ trait ColumnMapper[T] { + val isTopLevel: Boolean = true + /** Provides a mapping between given table or UDT and properties of type `T`, * useful for creating objects of type `T`. Throws [[IllegalArgumentException]] if * `selectedColumns` does not provide some columns needed to instantiate object of type `T`*/ diff --git a/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/mapper/DefaultColumnMapper.scala b/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/mapper/DefaultColumnMapper.scala index 6e360d181..d890077f8 100644 --- a/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/mapper/DefaultColumnMapper.scala +++ b/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/mapper/DefaultColumnMapper.scala @@ -102,11 +102,16 @@ class DefaultColumnMapper[T : TypeTag](columnNameOverride: Map[String, String] = } yield (getterName, columnRef) }.toMap - // Check if all columns at start of table description are present in the case class: + /** if this is a column mapper for the top level row, then handle gracefully a set of unmapped columns at the end of + * the table columns. Otherwise, if this is a nested type, then require that there are be no unmapped columns. 
+ */ val mappedColumns = getterMap.values.toSet val unmappedColumns = selectedColumns.filterNot(mappedColumns) - require(selectedColumns.endsWith(unmappedColumns), s"Unmapped columns nust be at end of table definition: [${unmappedColumns.mkString(", ")}]") + if (!isTopLevel) + require(unmappedColumns.isEmpty, s"Columns not found in nested $tpe: [${unmappedColumns.mkString(", ")}]") + else + require( selectedColumns.endsWith(unmappedColumns), s"Unmapped columns nust be at end of table definition: [${unmappedColumns.mkString(", ")}]") SimpleColumnMapForWriting(getterMap) } diff --git a/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/writer/MappedToGettableDataConverter.scala b/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/writer/MappedToGettableDataConverter.scala index 4eed8c934..bce982b3a 100644 --- a/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/writer/MappedToGettableDataConverter.scala +++ b/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/writer/MappedToGettableDataConverter.scala @@ -43,7 +43,7 @@ private[connector] object MappedToGettableDataConverter extends Logging{ * [[com.datastax.spark.connector.mapper.JavaBeanColumnMapper JavaBeanColumnMapper]], * and for everything else uses * [[com.datastax.spark.connector.mapper.DefaultColumnMapper DefaultColumnMapper]] */ - private def columnMapper[U: TypeTag]: ColumnMapper[U] = { + private def columnMapper[U: TypeTag](topLevel: Boolean = true): ColumnMapper[U] = { logDebug(s"Finding a UDT ColumnMapper for typeTag ${typeTag[U]}") val tpe = SparkReflectionLock.synchronized(typeTag[U].tpe) if (ReflectionUtil.isScalaTuple(tpe)) @@ -51,7 +51,7 @@ private[connector] object MappedToGettableDataConverter extends Logging{ else if (isJavaBean) new JavaBeanColumnMapper[U]()(ReflectionUtil.classTag[U]) else - new DefaultColumnMapper[U] + new DefaultColumnMapper[U] { override val isTopLevel = topLevel } } /** Returns true for tuple types that provide full type information of their components */ @@ -112,13 +112,13 @@ private[connector] object MappedToGettableDataConverter extends Logging{ case (t: StructDef, TypeRef(_, _, List(argScalaType))) if scalaType <:< typeOf[Option[Any]] => type U2 = u2 forSome {type u2} implicit val tt = ReflectionUtil.typeToTypeTag[U2](argScalaType) - implicit val cm: ColumnMapper[U2] = columnMapper[U2] + implicit val cm: ColumnMapper[U2] = columnMapper[U2]() apply[U2](t, t.columnRefs, Some(childClassloader)) // UDTs mapped to case classes and tuples mapped to Scala tuples. // ColumnMappers support mapping Scala tuples, so we don't need a special case for them. 
case (t: StructDef, _) => - implicit val cm: ColumnMapper[U] = columnMapper[U] + implicit val cm: ColumnMapper[U] = columnMapper[U](false) apply[U](t, t.columnRefs, Some(childClassloader)) // Primitive types diff --git a/spark-cassandra-connector/src/test/scala/com/datastax/spark/connector/writer/MappedToGettableDataConverterSpec.scala b/spark-cassandra-connector/src/test/scala/com/datastax/spark/connector/writer/MappedToGettableDataConverterSpec.scala index e882da544..b0a7715dd 100644 --- a/spark-cassandra-connector/src/test/scala/com/datastax/spark/connector/writer/MappedToGettableDataConverterSpec.scala +++ b/spark-cassandra-connector/src/test/scala/com/datastax/spark/connector/writer/MappedToGettableDataConverterSpec.scala @@ -31,6 +31,7 @@ class MappedToGettableDataConverterSpec extends FlatSpec with Matchers { case class User(login: String, password: String) case class Address(street: String, number: Int) case class Session(logout: String, login: String) + case class SessionWithPassword(logout: String, login: String, password: Option[String]) "MappedToGettableDataConverter" should "be Serializable" in { val userTable = newTable(loginColumn, passwordColumn) @@ -279,21 +280,47 @@ class MappedToGettableDataConverterSpec extends FlatSpec with Matchers { row.getTupleValue(1).getUDTValue(2).getInt("number") shouldEqual 3 } - case class UserWithUnknownType(login: String, address: String) + class UnknownType + case class UserWithUnknownType(login: String, address: UnknownType) it should "throw a meaningful exception when a column has an incorrect type" in { val userTable = newTable(loginColumn, addressColumn) - val user = UserWithUnknownType("foo", "bar") + val user = UserWithUnknownType("foo", new UnknownType) val exception = the[IllegalArgumentException] thrownBy { val converter = MappedToGettableDataConverter[UserWithUnknownType](userTable, userTable.columnRefs) converter.convert(user) } exception.getMessage should include("UserWithUnknownType") - exception.getMessage should include("String") + exception.getMessage should include("UnknownType") exception.getMessage should include("address") exception.getMessage should include("test") } + case class UnderSpecifiedAddress(street: String) + case class UnderSpecifiedUser1(login: String, address: UnderSpecifiedAddress) + case class UnderSpecifiedUser2(login: String) + + it should "throw a meaningful exception when a nested column is not fully specified" in { + val userTable = newTable(loginColumn, addressColumn) + val user = UnderSpecifiedUser1("foo", UnderSpecifiedAddress("main street")) + val exception = the[IllegalArgumentException] thrownBy { + val converter = MappedToGettableDataConverter[UnderSpecifiedUser1](userTable, userTable.columnRefs) + converter.convert(user) + } + exception.getMessage should include("UnderSpecifiedAddress") + exception.getMessage should include("address") + exception.getMessage should include("test") + } + + it should "work when the top level row is not fully specified" in { + val userTable = newTable(loginColumn, addressColumn) + val user = UnderSpecifiedUser2("foo") + val converter = MappedToGettableDataConverter[UnderSpecifiedUser2](userTable, userTable.columnRefs) + val row = converter.convert(user) + row.getString("login") shouldEqual "foo" + row.length shouldEqual 1 + } + it should "throw a meaningful exception when a tuple field has an incorrect number of components" in { val tupleType = TupleType( TupleFieldDef(0, IntType), From 727a7e4e7fb6d25bc96fde504ab88600cbb48b04 Mon Sep 17 00:00:00 2001 From: 
Aashish Srinivas Date: Thu, 31 Aug 2017 12:20:17 -0400 Subject: [PATCH 21/21] fix integration test --- .../spark/connector/rdd/AbstractCassandraJoin.scala | 1 + .../com/datastax/spark/connector/types/TypeConverter.scala | 3 +++ .../datastax/spark/connector/writer/DefaultRowWriter.scala | 7 +++++++ .../connector/writer/MappedToGettableDataConverter.scala | 3 ++- .../com/datastax/spark/connector/writer/RowWriter.scala | 2 ++ 5 files changed, 15 insertions(+), 1 deletion(-) diff --git a/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/rdd/AbstractCassandraJoin.scala b/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/rdd/AbstractCassandraJoin.scala index 550598f64..d74212c56 100644 --- a/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/rdd/AbstractCassandraJoin.scala +++ b/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/rdd/AbstractCassandraJoin.scala @@ -65,6 +65,7 @@ private[rdd] trait AbstractCassandraJoin[L, R] { // Initialize RowWriter and Query to be used for accessing Cassandra rowWriter.columnNames + rowWriter.isValidRowWriter singleKeyCqlQuery.length def checkSingleColumn(column: ColumnRef): Unit = { diff --git a/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/types/TypeConverter.scala b/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/types/TypeConverter.scala index 423832469..fccfba5c9 100644 --- a/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/types/TypeConverter.scala +++ b/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/types/TypeConverter.scala @@ -45,6 +45,9 @@ trait TypeConverter[T] extends Serializable { throw new TypeConversionException(s"Cannot convert object $obj to $targetTypeName.") ) } + + def filteredColumns: Set[String] = Set() + } /** Handles nullable types and converts any null to null. 
*/ diff --git a/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/writer/DefaultRowWriter.scala b/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/writer/DefaultRowWriter.scala index ba1844ca8..bfadcc14c 100644 --- a/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/writer/DefaultRowWriter.scala +++ b/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/writer/DefaultRowWriter.scala @@ -22,6 +22,13 @@ class DefaultRowWriter[T : TypeTag : ColumnMapper]( for (i <- row.columnValues.indices) buffer(i) = row.getRaw(i) } + + override def isValidRowWriter(): Unit = { + val availableColumns = converter.filteredColumns + val unmatchedPartitionKeys = table.partitionKey.filter(pk => !availableColumns.contains(pk.columnName)) + require(unmatchedPartitionKeys.isEmpty, + s"The following partition keys are missing: [${unmatchedPartitionKeys.mkString(",")}]") + } } object DefaultRowWriter { diff --git a/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/writer/MappedToGettableDataConverter.scala b/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/writer/MappedToGettableDataConverter.scala index bce982b3a..8b990f599 100644 --- a/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/writer/MappedToGettableDataConverter.scala +++ b/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/writer/MappedToGettableDataConverter.scala @@ -196,7 +196,8 @@ private[connector] object MappedToGettableDataConverter extends Logging{ } override def targetTypeTag = typeTag[struct.ValueRepr] - + override def filteredColumns = + columnNames.toSet.intersect(columnMap.getters.values.map(_.toString).toSet) override def convertPF = { case obj if cls.isInstance(obj) => diff --git a/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/writer/RowWriter.scala b/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/writer/RowWriter.scala index 9c5cd1110..730d73cb0 100644 --- a/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/writer/RowWriter.scala +++ b/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/writer/RowWriter.scala @@ -14,4 +14,6 @@ trait RowWriter[T] extends Serializable { * in the same order as they are listed in the columnNames sequence. */ def readColumnValues(data: T, buffer: Array[Any]) + def isValidRowWriter(): Unit = {} + }
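The year-only `LocalDate` parsing introduced in the patches above can be exercised directly through `TypeConverter`. The sketch below is illustrative only: it assumes the Java driver's `com.datastax.driver.core.LocalDate` as the target type and uses made-up sample values. A bare four-digit year now converts to January 1st of that year, alongside the existing `yyyy-mm-dd` form.

```scala
import com.datastax.driver.core.LocalDate
import com.datastax.spark.connector.types.TypeConverter

object LocalDateConversionSketch {
  def main(args: Array[String]): Unit = {
    val converter = TypeConverter.forType[LocalDate]

    // Existing behaviour: full date strings are parsed as before.
    val exact = converter.convert("1985-08-03") // fromYearMonthDay(1985, 8, 3)

    // New behaviour: a year-only string maps to the first day of that year.
    val yearOnly = converter.convert("1985")    // fromYearMonthDay(1985, 1, 1)

    println(s"exact = $exact, yearOnly = $yearOnly")
  }
}
```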
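With the partially specified writes added above, a case class no longer has to cover every column of the target table, as long as the columns it omits come after all of the mapped columns in the table definition. The following is a minimal sketch under an assumed schema: the keyspace `ks`, the table `users (c1 TEXT PRIMARY KEY, c2 INT, c3 INT)`, the contact point and the sample data are all invented for illustration.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import com.datastax.spark.connector._

// Omits the trailing column c3 of the assumed ks.users table.
case class UnderSpecified(c1: String, c2: Int)

object PartialWriteSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf(true)
      .setMaster("local[*]")                               // assumption: local test run
      .setAppName("partial-write-sketch")
      .set("spark.cassandra.connection.host", "127.0.0.1") // assumption: local Cassandra node
    val sc = new SparkContext(conf)

    // c1 and c2 are mapped by name; the writer passes null for the trailing, unmapped column c3.
    sc.parallelize(Seq(UnderSpecified("some text", 10)))
      .saveToCassandra("ks", "users")

    sc.stop()
  }
}
```

A class that skips a column in the middle of the table definition (for example one mapping `c1` and `c3` but not `c2`) is still rejected with an `IllegalArgumentException`.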
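Nested types are handled more strictly than the top-level row: only the row class may drop trailing table columns, while a class mapped to a user defined type must cover every field of that UDT. Another hedged sketch; the `address` type, the `people` table and the values are assumptions made up for illustration, and `sc` is the `SparkContext` from the previous sketch.

```scala
import com.datastax.spark.connector._

// Assumed schema:
//   CREATE TYPE  ks.address (street TEXT, number INT)
//   CREATE TABLE ks.people (login TEXT PRIMARY KEY, address frozen<address>, last_seen TIMESTAMP)

case class Address(street: String, number: Int)     // must cover every field of the UDT
case class Person(login: String, address: Address)  // may omit the trailing last_seen column

sc.parallelize(Seq(Person("jdoe", Address("Main St", 7))))
  .saveToCassandra("ks", "people")

// By contrast, mapping the UDT with a class like `case class PartialAddress(street: String)`
// fails with an IllegalArgumentException, because nested types must be fully specified.
```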
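The `isValidRowWriter` hook added in the final patch guards the join path: with the relaxed column mapping, a key class could otherwise omit a partition key column and only fail later with a malformed query. The sketch below shows the failure mode; it reuses `sc` and the assumed `ks.users` table from the earlier sketch, and the class name is hypothetical.

```scala
import com.datastax.spark.connector._

// c1 is the partition key of the assumed ks.users table; this class does not map it.
case class ByValueOnly(c2: Int)

// DefaultRowWriter now checks that every partition key column is mapped,
// so the join is rejected with an IllegalArgumentException naming the missing key.
sc.parallelize(Seq(ByValueOnly(10)))
  .joinWithCassandraTable("ks", "users")
  .collect()
```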