diff --git a/README.md b/README.md index 4fedac6a9..37bb9c968 100644 --- a/README.md +++ b/README.md @@ -1,22 +1,20 @@ -# Spark Cassandra Connector [![Build Status](https://travis-ci.org/datastax/spark-cassandra-connector.svg)](http://travis-ci.org/datastax/spark-cassandra-connector) -### [Spark Cassandra Connector Spark Packages Website](http://spark-packages.org/package/datastax/spark-cassandra-connector) -Chat with us at [DataStax Academy #spark-cassandra-connector](#datastax-academy) +# Spark Cassandra Connector [![Build Status](https://travis-ci.org/datastax/spark-cassandra-connector.svg)](https://travis-ci.org/datastax/spark-cassandra-connector) -### Most Recent Release Scala Docs +## Quick Links -### 2.0.0 -* [Spark-Cassandra-Connector](http://datastax.github.io/spark-cassandra-connector/ApiDocs/2.0.0/spark-cassandra-connector/) -* [Embedded-Cassandra](http://datastax.github.io/spark-cassandra-connector/ApiDocs/2.0.0/spark-cassandra-connector-embedded/) +| What | Where | +| ---------- | ----- | +| Packages | [Spark Cassandra Connector Spark Packages Website](https://spark-packages.org/package/datastax/spark-cassandra-connector) | +| Community | Chat with us at [DataStax Academy's #spark-connector Slack channel](#slack) | +| Scala Docs | Most Recent Release (2.0.3): [Spark-Cassandra-Connector](https://datastax.github.io/spark-cassandra-connector/ApiDocs/2.0.3/spark-cassandra-connector/), [Embedded-Cassandra](https://datastax.github.io/spark-cassandra-connector/ApiDocs/2.0.3/spark-cassandra-connector-embedded/) | -[All Versions API Docs](#hosted-api-docs) +## Features -## Lightning-fast cluster computing with Apache Spark(TM) and Apache Cassandra(TM); +*Lightning-fast cluster computing with Apache Spark™ and Apache Cassandra®.* This library lets you expose Cassandra tables as Spark RDDs, write Spark RDDs to Cassandra tables, and execute arbitrary CQL queries in your Spark applications. -## Features - - Compatible with Apache Cassandra version 2.0 or higher (see table below) - Compatible with Apache Spark 1.0 through 2.0 (see table below) - Compatible with Scala 2.10 and 2.11 @@ -29,7 +27,7 @@ execute arbitrary CQL queries in your Spark applications. - Partition RDDs according to Cassandra replication using `repartitionByCassandraReplica` call - Converts data types between Cassandra and Scala - Supports all Cassandra data types including collections - - Filters rows on the server side via the CQL `WHERE` clause + - Filters rows on the server side via the CQL `WHERE` clause - Allows for execution of arbitrary CQL statements - Plays nice with Cassandra Virtual Nodes - Works with PySpark DataFrames @@ -58,13 +56,13 @@ development for the next connector release in progress. 
## Hosted API Docs API documentation for the Scala and Java interfaces are available online: -### 2.0.0 -* [Spark-Cassandra-Connector](http://datastax.github.io/spark-cassandra-connector/ApiDocs/2.0.0/spark-cassandra-connector/) -* [Embedded-Cassandra](http://datastax.github.io/spark-cassandra-connector/ApiDocs/2.0.0/spark-cassandra-connector-embedded/) +### 2.0.3 +* [Spark-Cassandra-Connector](http://datastax.github.io/spark-cassandra-connector/ApiDocs/2.0.3/spark-cassandra-connector/) +* [Embedded-Cassandra](http://datastax.github.io/spark-cassandra-connector/ApiDocs/2.0.3/spark-cassandra-connector-embedded/) -### 1.6.5 -* [Spark-Cassandra-Connector](http://datastax.github.io/spark-cassandra-connector/ApiDocs/1.6.5/spark-cassandra-connector/) -* [Embedded-Cassandra](http://datastax.github.io/spark-cassandra-connector/ApiDocs/1.6.5/spark-cassandra-connector-embedded/) +### 1.6.8 +* [Spark-Cassandra-Connector](http://datastax.github.io/spark-cassandra-connector/ApiDocs/1.6.8/spark-cassandra-connector/) +* [Embedded-Cassandra](http://datastax.github.io/spark-cassandra-connector/ApiDocs/1.6.8/spark-cassandra-connector-embedded/) ### 1.5.2 * [Spark-Cassandra-Connector](http://datastax.github.io/spark-cassandra-connector/ApiDocs/1.5.2/spark-cassandra-connector/) @@ -81,24 +79,27 @@ API documentation for the Scala and Java interfaces are available online: * [Spark-Cassandra-Connector-Java](http://datastax.github.io/spark-cassandra-connector/ApiDocs/1.3.1/spark-cassandra-connector-java/) * [Embedded-Cassandra](http://datastax.github.io/spark-cassandra-connector/ApiDocs/1.3.1/spark-cassandra-connector-embedded/) -### 1.2.0 +### 1.2.0 * [Spark-Cassandra-Connector](http://datastax.github.io/spark-cassandra-connector/ApiDocs/1.2.0/spark-cassandra-connector/) * [Spark-Cassandra-Connector-Java](http://datastax.github.io/spark-cassandra-connector/ApiDocs/1.2.0/spark-cassandra-connector-java/) * [Embedded-Cassandra](http://datastax.github.io/spark-cassandra-connector/ApiDocs/1.2.0/spark-cassandra-connector-embedded/) ## Download This project is available on Spark Packages; this is the easiest way to start using the connector: -http://spark-packages.org/package/datastax/spark-cassandra-connector +https://spark-packages.org/package/datastax/spark-cassandra-connector This project has also been published to the Maven Central Repository. -For SBT to download the connector binaries, sources and javadoc, put this in your project +For SBT to download the connector binaries, sources and javadoc, put this in your project SBT config: - - libraryDependencies += "com.datastax.spark" %% "spark-cassandra-connector" % "2.0.0" + + libraryDependencies += "com.datastax.spark" %% "spark-cassandra-connector" % "2.0.3" + +* The default Scala version for Spark 2.0+ is 2.11 please choose the appropriate build. See the +[FAQ](doc/FAQ.md) for more information ## Building See [Building And Artifacts](doc/12_building_and_artifacts.md) - + ## Documentation - [Quick-start guide](doc/0_quick_start.md) @@ -122,26 +123,26 @@ See [Building And Artifacts](doc/12_building_and_artifacts.md) - [Tips for Developing the Spark Cassandra Connector](doc/developers.md) ## Online Training + ### DataStax Academy + DataStax Academy provides free online training for Apache Cassandra and DataStax Enterprise. In [DS320: Analytics with Spark](https://academy.datastax.com/courses/ds320-analytics-with-apache-spark), you will learn how to effectively and efficiently solve analytical problems with Apache Spark, Apache Cassandra, and DataStax Enterprise. 
You will learn about Spark API, Spark-Cassandra Connector, Spark SQL, Spark Streaming, and crucial performance optimization techniques. ## Community + ### Reporting Bugs + New issues may be reported using [JIRA](https://datastax-oss.atlassian.net/browse/SPARKC/). Please include all relevant details including versions of Spark, Spark Cassandra Connector, Cassandra and/or DSE. A minimal reproducible case with sample code is ideal. ### Mailing List -Questions and requests for help may be submitted to the [user mailing list](http://groups.google.com/a/lists.datastax.com/forum/#!forum/spark-connector-user). -### Gitter -Datastax is consolidating our chat resources to Slack at [DataStax Academy](#datastax-academy) +Questions and requests for help may be submitted to the [user mailing list](https://groups.google.com/a/lists.datastax.com/forum/#!forum/spark-connector-user). -The gitter room will be shut down in the near future -[![Join the chat at https://gitter.im/datastax/spark-cassandra-connector](https://badges.gitter.im/datastax/spark-cassandra-connector.svg)](https://gitter.im/datastax/spark-cassandra-connector?utm_source=badge&utm_medium=badge&utm_campaign=pr-badge&utm_content=badge) +### Slack -### IRC -\#spark-cassandra-connector on irc.freenode.net. If you are new to IRC, you can use a [web-based client](http://webchat.freenode.net/?channels=#spark-cassandra-connector). +The project uses Slack to facilitate conversation in our community. Find us in the `#spark-connector` channel at [DataStax Academy Slack](https://academy.datastax.com/slack). ## Contributing @@ -171,25 +172,25 @@ To run unit and integration tests: By default, integration tests start up a separate, single Cassandra instance and run Spark in local mode. It is possible to run integration tests with your own Cassandra and/or Spark cluster. First, prepare a jar with testing code: - + ./sbt/sbt test:package - + Then copy the generated test jar to your Spark nodes and run: export IT_TEST_CASSANDRA_HOST= export IT_TEST_SPARK_MASTER= ./sbt/sbt it:test - + ## Generating Documents -To generate the Reference Document use +To generate the Reference Document use ./sbt/sbt spark-cassandra-connector-unshaded/run (outputLocation) - + outputLocation defaults to doc/reference.md ## License -Copyright 2014-2016, DataStax, Inc. +Copyright 2014-2017, DataStax, Inc. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at diff --git a/doc/0_quick_start.md b/doc/0_quick_start.md index 06bbfa601..505cd7f65 100644 --- a/doc/0_quick_start.md +++ b/doc/0_quick_start.md @@ -16,14 +16,14 @@ Configure a new Scala project with the Apache Spark and dependency. The dependencies are easily retrieved via the spark-packages.org website. For example, if you're using `sbt`, your build.sbt should include something like this: resolvers += "Spark Packages Repo" at "https://dl.bintray.com/spark-packages/maven" - libraryDependencies += "datastax" % "spark-cassandra-connector" % "2.0.0-s_2.11" + libraryDependencies += "datastax" % "spark-cassandra-connector" % "2.0.1-s_2.11" The spark-packages libraries can also be used with spark-submit and spark shell, these commands will place the connector and all of its dependencies on the path of the Spark Driver and all Spark Executors. 
- $SPARK_HOME/bin/spark-shell --packages datastax:spark-cassandra-connector:2.0.0-s_2.11 - $SPARK_HOME/bin/spark-submit --packages datastax:spark-cassandra-connector:2.0.0-s_2.11 + $SPARK_HOME/bin/spark-shell --packages datastax:spark-cassandra-connector:2.0.1-s_2.11 + $SPARK_HOME/bin/spark-submit --packages datastax:spark-cassandra-connector:2.0.1-s_2.11 For the list of available versions, see: - https://spark-packages.org/package/datastax/spark-cassandra-connector @@ -59,16 +59,17 @@ Run the `spark-shell` with the packages line for your version. To configure the default Spark Configuration pass key value pairs with `--conf` $SPARK_HOME/bin/spark-shell --conf spark.cassandra.connection.host=127.0.0.1 \ - --packages datastax:spark-cassandra-connector:2.0.0-s_2.11 + --packages datastax:spark-cassandra-connector:2.0.1-s_2.11 This command would set the Spark Cassandra Connector parameter `spark.cassandra.connection.host` to `127.0.0.1`. Change this to the address of one of the nodes in your Cassandra cluster. -Enable Cassandra-specific functions on the `SparkContext`, `RDD`, and `DataFrame`: +Enable Cassandra-specific functions on the `SparkContext`, `SparkSession`, `RDD`, and `DataFrame`: ```scala import com.datastax.spark.connector._ +import org.apache.spark.sql.cassandra._ ``` ### Loading and analyzing data from Cassandra diff --git a/doc/10_embedded.md b/doc/10_embedded.md index 7a9ee69cc..1907d5dcc 100644 --- a/doc/10_embedded.md +++ b/doc/10_embedded.md @@ -24,6 +24,6 @@ Simply add this to your SBT build, or in the appropriate format for a Maven buil "com.datastax.spark" %% "spark-cassandra-connector-embedded" % {latest.version} ## Examples -https://github.com/datastax/SparkBuildExamples +[Spark Build Examples](https://github.com/datastax/SparkBuildExamples) [Next - Performance Monitoring](11_metrics.md) diff --git a/doc/13_spark_shell.md b/doc/13_spark_shell.md index c5e881cfd..c893a5fb4 100644 --- a/doc/13_spark_shell.md +++ b/doc/13_spark_shell.md @@ -2,7 +2,7 @@ ## Using the Spark Cassandra Connector with the Spark Shell -These instructions were last confirmed with Cassandra 3.0.9, Spark 2.0.2 and Connector 2.0.0. +These instructions were last confirmed with Cassandra 3.0.9, Spark 2.0.2 and Connector 2.0.1. For this guide, we assume an existing Cassandra deployment, running either locally or on a cluster, a local installation of Spark and an optional Spark cluster. For detail setup instructions see [setup spark-shell](13_1_setup_spark_shell.md) @@ -18,7 +18,7 @@ Find additional versions at [Spark Packages](http://spark-packages.org/package/d ```bash cd spark/install/dir #Include the --master if you want to run against a spark cluster and not local mode -./bin/spark-shell [--master sparkMasterAddress] --jars yourAssemblyJar --packages datastax:spark-cassandra-connector:2.0.0-s_2.11 --conf spark.cassandra.connection.host=yourCassandraClusterIp +./bin/spark-shell [--master sparkMasterAddress] --jars yourAssemblyJar --packages datastax:spark-cassandra-connector:2.0.1-s_2.11 --conf spark.cassandra.connection.host=yourCassandraClusterIp ``` By default spark will log everything to the console and this may be a bit of an overload. 
To change this copy and modify the `log4j.properties` template file diff --git a/doc/14_data_frames.md b/doc/14_data_frames.md index e7a3f4e4e..8c290b975 100644 --- a/doc/14_data_frames.md +++ b/doc/14_data_frames.md @@ -1,61 +1,88 @@ # Documentation -## DataFrames +## Datasets (Previously DataFrames) -DataFrames provide a new API for manipulating data within Spark. These provide a more user +Datasets provide a new API for manipulating data within Spark. These provide a more user friendly experience than pure Scala for common queries. The Spark Cassandra Connector provides -an integrated DataSource to make creating Cassandra DataFrames easy. +an integrated Data Source to make creating Cassandra Datasets easy. -Spark Docs: -[Data Sources](https://spark.apache.org/docs/latest/sql-programming-guide.html#data-sources) -[Data Frames](https://spark.apache.org/docs/latest/sql-programming-guide.html#dataframes) +[What happened to DataFrames?](#what-happened-to-dataframes) +Spark Docs: +* [Data Sources](https://spark.apache.org/docs/latest/sql-programming-guide.html#data-sources) +* [Datasets and DataFrames](https://spark.apache.org/docs/latest/sql-programming-guide.html#datasets-and-dataframes) -### Options +### Datasource Specific Options DataSources in Spark take a map of Options which define how the source should act. The -Connector provides a CassandraSource which recognizes the following Key Value pairs. +Connector provides a CassandraSource which recognizes the following key/value pairs. Those followed with a default of N/A are required, all others are optional. -| Option Key | Controls | Values | Default | -|-------------|--------------------------------------------------------------|---------------|----------| -| table | The Cassandra table to connect to | String | N/A | -| keyspace | The keyspace where table is looked for | String | N/A | -| cluster | The group of the Cluster Level Settings to inherit | String | "default"| -| pushdown | Enables pushing down predicates to Cassandra when applicable | (true,false) | true | -| confirm.truncate | Confirm to truncate table when use Save.overwrite mode | (true,false) | false | +| Option Key | Controls | Values | Default | +|-------------|-------------------------------------------------------|---------------|----------| +| table | The Cassandra table to connect to | String | N/A | +| keyspace | The keyspace where table is looked for | String | N/A | +| cluster | The group of the Cluster Level Settings to inherit | String | "default"| +| pushdown | Enables pushing down predicates to C* when applicable | (true,false) | true | +| confirm.truncate | Confirm to truncate table when use Save.overwrite mode | (true,false) | false | -#### Read, Writing and CassandraConnector Options +#### General Read, Write and Connection Options Any normal Spark Connector configuration options for Connecting, Reading or Writing -can be passed through as DataFrame options as well. When using the `read` command below these -options should appear exactly the same as when set in the SparkConf. +can be passed through as Dataset options as well. When using the `read` command below these +options should appear exactly the same as when set in the SparkConf. See +[Config Helpers](#example-using-typesafe-parameter-configuration-options) for +typed helpers for setting these options. #### Setting Cluster and Keyspace Level Options The connector also provides a way to describe the options which should be applied to all -DataFrames within a cluster or within a keyspace. 
When a property has been specified at the +Datasets within a cluster or within a keyspace. When a property has been specified at the table level it will override the default keyspace or cluster property. -To add these properties add keys to your `SparkConf` in the format +To add these properties add keys to your `SparkConf` use the helpers explained + in the next section or by manually entering them in the format + + clusterName:keyspaceName/propertyName + +#### Example Using TypeSafe Parameter Configuration Options +There are also some helper methods which simplify setting Spark Cassandra +Connector related parameters. This makes it easier to set parameters without +remembering the above syntax: +```scala +import org.apache.spark.sql.cassandra._ + +import com.datastax.spark.connector.cql.CassandraConnectorConf +import com.datastax.spark.connector.rdd.ReadConf - clusterName:keyspaceName/propertyName. +// set params for all clusters and keyspaces +spark.setCassandraConf(CassandraConnectorConf.KeepAliveMillisParam.option(10000)) + +// set params for the particular cluster +spark.setCassandraConf("Cluster1", CassandraConnectorConf.ConnectionHostParam.option("127.0.0.1") ++ CassandraConnectorConf.ConnectionPortParam.option(12345)) +spark.setCassandraConf("Cluster2", CassandraConnectorConf.ConnectionHostParam.option("127.0.0.2")) + +// set params for the particular keyspace +spark.setCassandraConf("Cluster1", "ks1", ReadConf.SplitSizeInMBParam.option(128)) +spark.setCassandraConf("Cluster1", "ks2", ReadConf.SplitSizeInMBParam.option(64)) +spark.setCassandraConf("Cluster2", "ks3", ReadConf.SplitSizeInMBParam.option(80)) +``` #### Example Changing Cluster/Keyspace Level Properties ```scala -sqlContext.setConf("ClusterOne/spark.cassandra.input.split.size_in_mb", "32") -sqlContext.setConf("default:test/spark.cassandra.input.split.size_in_mb", "128") -... -val df = sqlContext +spark.setCassandraConf("ClusterOne", "ks1", ReadConf.SplitSizeInMBParam.option(32)) +spark.setCassandraConf("default", "test", ReadConf.SplitSizeInMBParam.option(128)) + +val df = spark .read .format("org.apache.spark.sql.cassandra") .options(Map( "table" -> "words", "keyspace" -> "test")) - .load() // This DataFrame will use a spark.cassandra.input.size of 32 + .load() // This Dataset will use a spark.cassandra.input.size of 128 -val otherdf = sqlContext +val otherdf = spark .read .format("org.apache.spark.sql.cassandra") .options(Map( "table" -> "words", "keyspace" -> "test" , "cluster" -> "ClusterOne")) - .load() // This DataFrame will use a spark.cassandra.input.size of 128 + .load() // This Dataset will use a spark.cassandra.input.size of 32 -val lastdf = sqlContext +val lastdf = spark .read .format("org.apache.spark.sql.cassandra") .options(Map( @@ -64,41 +91,29 @@ val lastdf = sqlContext "cluster" -> "ClusterOne", "spark.cassandra.input.split.size_in_mb" -> 48 ) - ).load() // This DataFrame will use a spark.cassandra.input.split.size of 48 + ).load() // This Dataset will use a spark.cassandra.input.split.size of 48 ``` +### Creating Datasets using Read Commands -#### Example Using TypeSafe Parameter Configuration Options -There are also some helper method which simplifies setting Spark Cassandra Connector related parameters. 
They are a part -of `CassandraSqlContext`: -```scala -// set params for all clusters and keyspaces -sqlContext.setConf(CassandraConnectorConf.KeepAliveMillisParam.option(10000)) - -// set params for the particular cluster -sqlContext.setConf("Cluster1", CassandraConnectorConf.ConnectionHostParam.option("127.0.0.1") ++ CassandraConnectorConf.ConnectionPortParam.option(12345)) -sqlContext.setConf("Cluster2", CassandraConnectorConf.ConnectionHostParam.option("127.0.0.2")) - -// set params for the particular keyspace -sqlContext.setConf("Cluster1", "ks1", ReadConf.SplitSizeInMBParam.option(128)) -sqlContext.setConf("Cluster1", "ks2", ReadConf.SplitSizeInMBParam.option(64)) -sqlContext.setConf("Cluster2", "ks3", ReadConf.SplitSizeInMBParam.option(80)) -``` - -###Creating DataFrames using Read Commands +The most programmatic way to create a Dataset is to invoke a `read` command on the SparkSession. This +will build a `DataFrameReader`. Specify `format` as `org.apache.spark.sql.cassandra`. +You can then use `options` to give a map of `Map[String,String]` of options as described above. +Then finish by calling `load` to actually get a `Dataset`. This code is all lazy +and will not actually load any data until an action is called. + +As well as specifying all these parameters manually, we offer a set of +[helper functions](#example-using-format-helper-functions) to make this easier as well. -The most programmatic way to create a data frame is to invoke a `read` command on the SQLContext. This - will build a `DataFrameReader`. Specify `format` as `org.apache.spark.sql.cassandra`. - You can then use `options` to give a map of `Map[String,String]` of options as described above. - Then finish by calling `load` to actually get a `DataFrame`. -#### Example Creating a DataFrame using a Read Command +#### Example Creating a Dataset using a Read Command ```scala -val df = sqlContext +val df = spark .read .format("org.apache.spark.sql.cassandra") .options(Map( "table" -> "words", "keyspace" -> "test" )) .load() + df.show ``` ``` @@ -107,92 +122,72 @@ cat 30 fox 40 ``` -There are also some helper methods which can make creating data frames easier. They can be accessed after importing -`org.apache.spark.sql.cassandra` package. In the following example, all the commands used to create a data frame are -equivalent: +There are also some helper methods which can make creating Datasets easier. They can +be accessed after importing `org.apache.spark.sql.cassandra` package. In the following +example, all the commands used to create the Dataset are equivalent: #### Example Using Format Helper Functions ```scala import org.apache.spark.sql.cassandra._ -val df1 = sqlContext +val df = spark .read - .format("org.apache.spark.sql.cassandra") - .options(Map("table" -> "words", "keyspace" -> "test", "cluster" -> "cluster_A")) + .cassandraFormat("words", "test") .load() - -val df2 = sqlContext + +//Loading an Dataset using a format helper and a option helper +val df = spark .read - .cassandraFormat("words", "test", "cluster_A") + .cassandraFormat("words", "test") + .options(ReadConf.SplitSizeInMBParam.option(32)) .load() + ``` -### Creating DataFrames using Spark SQL +### Creating Datasets using Spark SQL -Accessing data Frames using Spark SQL involves creating temporary tables and specifying the -source as `org.apache.spark.sql.cassandra`. The `OPTIONS` passed to this table are used to -establish a relation between the CassandraTable and the internally used DataSource. 
+Accessing Datasets using Spark SQL involves creating temporary views with the format + as `org.apache.spark.sql.cassandra`. The `OPTIONS` passed to this table are used to +establish a relation between the CassandraTable and the Spark catalog reference. #### Example Creating a Source Using Spark SQL: Create Relation with the Cassandra table test.words ```scala -scala> sqlContext.sql( - """CREATE TEMPORARY TABLE words - |USING org.apache.spark.sql.cassandra - |OPTIONS ( - | table "words", - | keyspace "test", - | cluster "Test Cluster", - | pushdown "true" - |)""".stripMargin) -scala> val df = sqlContext.sql("SELECT * FROM words") -scala> df.show() -``` -``` -word count -cat 30 -fox 40 -``` -```scala -scala> df.filter(df("count") > 30).show -``` -``` -word count -fox 40 -``` - -In addition you can use Spark SQL on the registered tables: +val createDDL = """CREATE TEMPORARY VIEW words + USING org.apache.spark.sql.cassandra + OPTIONS ( + table "words", + keyspace "test", + cluster "Test Cluster", + pushdown "true")""" +spark.sql(createDDL) // Creates Catalog Entry registering an existing Cassandra Table +spark.sql("SELECT * FROM words").show +spark.sql("SELECT * FROM words WHERE word = 'fox'").show +``` + +### Persisting a Dataset to Cassandra Using the Save Command +Datasets provide a save function which allows them to persist their data to another +DataSource. The connector supports using this feature to persist a Dataset to a Cassandra +table. + +#### Example Copying Between Two Tables Using Datasets ```scala -sqlContext.sql("SELECT * FROM words WHERE word = 'fox'").collect -``` -``` -Array[org.apache.spark.sql.Row] = Array([fox,40]) -``` - -###Persisting a DataFrame to Cassandra Using the Save Command -DataFrames provide a save function which allows them to persist their data to another -DataSource. The connector supports using this feature to persist a DataFrame a Cassandra -Table. -#### Example Copying Between Two Tables Using DataFrames -```scala -val df = sqlContext +val df = spark .read - .format("org.apache.spark.sql.cassandra") - .options(Map( "table" -> "words", "keyspace" -> "test" )) + .cassandraFormat("words", "test") .load() df.write - .format("org.apache.spark.sql.cassandra") - .options(Map( "table" -> "words_copy", "keyspace" -> "test")) + .cassandraFormat("words_copy", "test") .save() ``` -Similarly to reading Cassandra tables into data frames, we have some helper methods for the write path which are +Similarly to reading Cassandra tables into Datasets, we have some helper methods for the write path which are provided by `org.apache.spark.sql.cassandra` package. In the following example, all the commands are equivalent: -#### Example Using Helper Commands to Write DataFrames +#### Example Using Helper Commands to Write Datasets ```scala import org.apache.spark.sql.cassandra._ @@ -207,7 +202,7 @@ df.write ``` -### Setting Connector specific options on DataFrames +### Setting Connector Specific Options on Datasets Connector specific options can be set by invoking `options` method on either `DataFrameReader` or `DataFrameWriter`. There are several settings you may want to change in `ReadConf`, `WriteConf`, `CassandraConnectorConf`, `AuthConf` and others. 
Those settings are identified by instances of `ConfigParameter` case class which offers an easy way to apply @@ -230,24 +225,23 @@ multiple parameters can be chained: options(CassandraConnectorConf.ReadTimeoutParam.sqlOption("7000") ++ ReadConf.TaskMetricParam.sqlOption(true)) ``` -###Creating a New Cassandra Table From a DataFrame Schema -Spark Cassandra Connector adds a method to `DataFrame` that allows it to create a new Cassandra table from -the `StructType` schema of the DataFrame. This is convenient for persisting a DataFrame to a new table, especially -when the schema of the DataFrame is not known (fully or at all) ahead of time (at compile time of your application). -Once the new table is created, you can persist the DataFrame to the new table using the save function described above. +### Creating a New Cassandra Table From a Dataset Schema +Spark Cassandra Connector adds a method to `Dataset` that allows it to create a new Cassandra table from +the `StructType` schema of the Dataset. This is convenient for persisting a Dataset to a new table, especially +when the schema of the Dataset is not known (fully or at all) ahead of time (at compile time of your application). +Once the new table is created, you can persist the Dataset to the new table using the save function described above. The partition key and clustering key of the newly generated table can be set by passing in a list of names of columns which should be used as partition key and clustering key. -#### Example Creating a Cassandra Table from a DataFrame +#### Example Creating a Cassandra Table from a Dataset ```scala -// Add spark connector specific methods to DataFrame +// Add spark connector specific methods to Dataset import com.datastax.spark.connector._ -val df = sqlContext +val df = spark .read - .format("org.apache.spark.sql.cassandra") - .options(Map( "table" -> "words", "keyspace" -> "test" )) + .cassandraFormat("words", "test") .load() val renamed = df.withColumnRenamed("col1", "newcolumnname") @@ -258,14 +252,13 @@ renamed.createCassandraTable( clusteringKeyColumns = Some(Seq("newcolumnname"))) renamed.write - .format("org.apache.spark.sql.cassandra") - .options(Map( "table" -> "renamed", "keyspace" -> "test")) + .cassandraFormat("renamed", "test") .save() ``` -### Pushing down clauses to Cassandra -The DataFrame API will automatically pushdown valid where clauses to Cassandra as long as the -pushdown option is enabled (defaults to enabled.) +### Automatic Predicate Pushdown and Column Pruning +The Dataset API will automatically pushdown valid "where" clauses to Cassandra as long as the +pushdown option is enabled (default is enabled). Example Table ```sql @@ -283,16 +276,15 @@ INSERT INTO words (user, word, count ) VALUES ( 'Sam', 'alpha', 3 ); INSERT INTO words (user, word, count ) VALUES ( 'Zebra', 'zed', 100 ); ``` -First we can create a DataFrame and see that it has no `pushdown filters` set in the log. This +First we can create a Dataset and see that it has no `pushdown filters` set in the log. This means all requests will go directly to Cassandra and we will require reading all of the data to `show` -this DataFrame. +this Dataset. 
#### Example Catalyst Optimization with Cassandra Server Side Pushdowns ```scala -val df = sqlContext +val df = spark .read - .format("org.apache.spark.sql.cassandra") - .options(Map( "table" -> "words", "keyspace" -> "test")) + .cassandraFormat("words", "test") .load df.explain ``` @@ -320,7 +312,7 @@ df.show ``` The example schema has a clustering key of "word" so we can pushdown filters on that column to Cassandra. We -do this by applying a normal DataFrame filter. The connector will automatically determine that the +do this by applying a normal Dataset filter. The connector will automatically determine that the filter can be pushed down and will add it to `pushdown filters`. All of the elements of `pushdown filters` will be automatically added to the CQL requests made to Cassandra for the data from this table. The subsequent call will then only serialize data from Cassandra which passes the filter, @@ -368,13 +360,9 @@ CREATE TABLE pushdownexample ( ); ``` ```scala -val sqlContext = new org.apache.spark.sql.SQLContext(sc) -import sqlContext.implicits._ - -val df = sqlContext +val df = spark .read - .format("org.apache.spark.sql.cassandra") - .options(Map( "table" -> "pushdownexample", "keyspace" -> "pushdowns" )) + .cassandraFormat("pushdownexample", "pushdowns") .load() ``` To push down partition keys, all of them must be included, but not more than one predicate per partition key, otherwise nothing is pushed down. @@ -426,4 +414,11 @@ INFO 2015-08-26 00:56:37 org.apache.spark.sql.cassandra.CassandraSourceRelation INFO 2015-08-26 00:56:37 org.apache.spark.sql.cassandra.CassandraSourceRelation: pushdown filters: ArrayBuffer(EqualTo(clusterkey1,1), EqualTo(clusterkey2,1)) ``` +#### What Happened to DataFrames? + +In Spark 2.0 DataFrames are now just a specific case of the Dataset API. In particular +a DataFrame is just an alias for Dataset\[Row\]. This means everything you know about +DataFrames is also applicable to Datasets. A DataFrame is just a special Dataset that is +made up of Row objects. Many texts and resources still use the two terms interchangeably. + [Next - Python DataFrames](15_python.md) diff --git a/doc/15_python.md b/doc/15_python.md index a0fa71f79..d8fe897a3 100644 --- a/doc/15_python.md +++ b/doc/15_python.md @@ -16,7 +16,7 @@ http://spark-packages.org/package/datastax/spark-cassandra-connector ```bash ./bin/pyspark \ - --packages com.datastax.spark:spark-cassandra-connector_2.11:2.0.0-s_2.11 + --packages com.datastax.spark:spark-cassandra-connector_2.11:2.0.1 ``` ### Loading a DataFrame in Python @@ -26,7 +26,7 @@ source and by specifying keyword arguments for `keyspace` and `table`. #### Example Loading a Cassandra Table as a Pyspark DataFrame ```python - sqlContext.read\ + spark.read\ .format("org.apache.spark.sql.cassandra")\ .options(table="kv", keyspace="test")\ .load().show() @@ -60,4 +60,17 @@ A DataFrame can be saved to an *existing* Cassandra table by using the the `org. The options and parameters are identical to the Scala Data Frames Api so please see [Data Frames](14_data_frames.md) for more information. +### Passing options with periods to the DataFrameReader + +Python does not support using periods(".") in variable names. This makes it +slightly more difficult to pass SCC options to the DataFrameReader. The `options` +function takes `kwargs**` which means you can't directly pass in keys. There is a +workaround though. Python allows you to pass a dictionary as a representation of kwargs and dictionaries +can have keys with periods. 
+ +#### Example of using a dictionary as kwargs + + load_options = { "table": "kv", "keyspace": "test", "spark.cassandra.input.split.size_in_mb": "10"} + spark.read.format("org.apache.spark.sql.cassandra").options(**load_options).load().show() + [Next - Spark Partitioners](16_partitioning.md) diff --git a/doc/16_partitioning.md b/doc/16_partitioning.md index 589a2dbf5..63fdbd537 100644 --- a/doc/16_partitioning.md +++ b/doc/16_partitioning.md @@ -296,7 +296,7 @@ built for primitives which is why the above examples use `Tuple1` so frequently. This functionality does not currently function in DataFrames. -###Data Generator +### Data Generator ```scala import com.datastax.spark.connector._ diff --git a/doc/1_connecting.md b/doc/1_connecting.md index 4647babed..2e75a17aa 100644 --- a/doc/1_connecting.md +++ b/doc/1_connecting.md @@ -8,7 +8,7 @@ how to execute CQL statements from Spark applications. To connect your Spark application to Cassandra, set connection options in the `SparkConf` object. These are prefixed with `spark.` so that they can be recognized -from the spark-shell and set within the $SPARK_HOME/conf/spark-default.conf. +from `spark-shell` and set in `$SPARK_HOME/conf/spark-default.conf`. Example: @@ -21,12 +21,10 @@ val conf = new SparkConf(true) val sc = new SparkContext("spark://192.168.123.10:7077", "test", conf) ``` -Multiple hosts can be passed in using a comma separated list -("127.0.0.1,127.0.0.2"). These are the *initial contact points only*, all -nodes in the local DC will be used upon connecting. +Multiple hosts can be passed in using a comma-separated list in `spark.cassandra.connection.host` +(e.g. `"127.0.0.1,127.0.0.2"`). These are the *initial contact points only*, all nodes in the local DC will be used upon connecting. -See the reference section for a full list of options -[Cassandra Connection Parameters](reference.md#cassandra-connection-parameters) +See the reference section for [Cassandra Connection Parameters](reference.md#cassandra-connection-parameters). ### Connection management @@ -75,7 +73,7 @@ Eventually, when all the tasks needing Cassandra connectivity terminate, the connection to the Cassandra cluster will be closed shortly thereafter. The period of time for keeping unused connections open is controlled by the global `spark.cassandra.connection.keep_alive_ms` system property, -see [Cassandra Connection Parameters](reference.md#cassandra-connection-parameters) +see [Cassandra Connection Parameters](reference.md#cassandra-connection-parameters). ### Connecting manually to Cassandra @@ -101,13 +99,12 @@ CassandraConnector(conf).withSessionDo { session => ### Connecting to multiple Cassandra Clusters The Spark Cassandra Connector is able to connect to multiple Cassandra -Clusters for all of it's normal operations. The default `CassandraConnector` -object used by calls to `sc.cassandraTable` and `saveToCassandra` is -specified by the `SparkConfiguration`. If you would like to use multiple clusters, +Clusters for all of its normal operations. +The default `CassandraConnector` object used by calls to `sc.cassandraTable` and `saveToCassandra` is specified by the `SparkConfiguration`. If you would like to use multiple clusters, an implicit `CassandraConnector` can be used in a code block to specify the target cluster for all operations in that block. 
-####Example of reading from one cluster and writing to another +#### Example of reading from one cluster and writing to another ```scala import com.datastax.spark.connector._ @@ -115,7 +112,6 @@ import com.datastax.spark.connector.cql._ import org.apache.spark.SparkContext - def twoClusterExample ( sc: SparkContext) = { val connectorToClusterOne = CassandraConnector(sc.getConf.set("spark.cassandra.connection.host", "127.0.0.1")) val connectorToClusterTwo = CassandraConnector(sc.getConf.set("spark.cassandra.connection.host", "127.0.0.2")) diff --git a/doc/7_java_api.md b/doc/7_java_api.md index 3135580ec..000c49f34 100644 --- a/doc/7_java_api.md +++ b/doc/7_java_api.md @@ -253,6 +253,23 @@ of *RDD* elements and uses a default `JavaBeanColumnMapper` to map those element to attribute translations can be specified in order to override the default logic. If `JavaBeanColumnMapper` is not an option, a custom column mapper can be specified as well. +#### Example of Saving and RDD of Person object with Differently Named Fields +Say we have a table `people2` with columns `id INT`, `last_name TEXT`, `date_of_birth TIMESTAMP` and +we want to save RDD of `Person` class objects to this table. To do it we need to use overloaded `mapToRow(Class, Map)` method. +```java +Map fieldToColumnMapping = new HashMap<>(); +fieldToColumnMapping.put("name", "last_name"); +fieldToColumnMapping.put("birthDate", "date_of_birth"); +javaFunctions(rdd).writerBuilder("ks", "people2", mapToRow(Person.class, fieldToColumnMapping)).saveToCassandra(); +``` +Another version of method `mapToRow(Class, Pair[])` can be considered much more handy for inline invocations. +```java +javaFunctions(rdd).writerBuilder("ks", "people2", mapToRow( + Person.class, + Pair.of("name", "last_name"), + Pair.of("birthDate", "date_of_birth"))) + .saveToCassandra(); +``` ### Working with tuples Since 1.3 there new methods to work with Scala tuples. diff --git a/doc/9_demos.md b/doc/9_demos.md index dc9dfcfc7..a485c8e4f 100644 --- a/doc/9_demos.md +++ b/doc/9_demos.md @@ -2,4 +2,7 @@ Demos are not a part of Spark Cassandra Connector as of 2.0.0. +Build examples are provided here +[Spark Build Examples](https://github.com/datastax/SparkBuildExamples) + [Back to main page](../README.md) diff --git a/doc/FAQ.md b/doc/FAQ.md index 080b0928f..2df97f778 100644 --- a/doc/FAQ.md +++ b/doc/FAQ.md @@ -2,15 +2,21 @@ ## Frequently Asked Questions -### What does this mean "`NoClassDefFoundError: scala/collection/GenTraversableOnce$class?`" +### Why am I seeing (Scala Version Mismatch Error) + +#### Non Exclusive List of Scala Version Mismatch Errors +* `NoClassDefFoundError: scala/collection/GenTraversableOnce$class?` +* `NoSuchMethodError: scala.reflect.api.JavaUniverse.runtimeMirror(Ljava/lang/ClassLoader;)` +* `missing or invalid dependency detected while loading class file 'ScalaGettableByIndexData.class'` +* `java.lang.NoClassDefFoundError: scala/runtime/AbstractPartialFunction` This means that there is a mix of Scala versions in the libraries used in your -code. The collection api is different between Scala 2.10 and 2.11 and this the -most common error which occurs if a scala 2.10 library is attempted to be loaded +code. The collection API is different between Scala 2.10 and 2.11 and this the +most common error which occurs if a Scala 2.10 library is attempted to be loaded in a Scala 2.11 runtime. To fix this make sure that the name has the correct Scala version suffix to match your Scala version. 
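With sbt, the suffix can be kept in sync automatically by declaring the dependency with `%%`, which appends the Scala binary version to the artifact name; the artifact names in the snippets below show the explicit form. A minimal sketch, assuming Scala 2.11 and the 2.0.3 connector release referenced elsewhere in these docs:

```scala
// build.sbt sketch: assumes Scala 2.11 and connector 2.0.3
scalaVersion := "2.11.8"

// %% appends the Scala binary suffix ("_2.11") automatically
libraryDependencies += "com.datastax.spark" %% "spark-cassandra-connector" % "2.0.3"

// equivalent explicit form; the suffix must then be kept in sync by hand
// libraryDependencies += "com.datastax.spark" % "spark-cassandra-connector_2.11" % "2.0.3"
```

Changing `scalaVersion` to a 2.10.x release switches the `%%` dependency to the `_2.10` artifact without any further edits.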
-##### Spark Cassandra Connector built against Scala 2.10 +##### Spark Cassandra Connector built Against Scala 2.10 ```xml spark-cassandra-connector_2.10 ``` @@ -39,7 +45,7 @@ For reference the defaults of Spark as downloaded from the Apache Website are ### How do I Fix Guava Classpath Errors Guava errors come from a conflict between Guava brought in by some -dependency (like Hadoop 2.7) and the Cassandra java Driver. +dependency (like Hadoop 2.7) and the Cassandra Java Driver. The Cassandra Java Driver is unable to function correctly if an earlier version of Guava is preempting the required version. The Java Driver will throw errors if it determines @@ -90,7 +96,7 @@ determine how many tasks have been generated. To check this look at the UI for y job and see how many tasks are being run. In the current Shell a small progress bar is shown when running stages, the numbers represent (Completed Tasks + Running Tasks) / Total Tasks - [Stage 2:=============================================> (121 + 1) / 200]0 + [Stage 2:=============================================> (121 + 1) / 200] If you see that only a single task has been created this means that the Cassandra Token range has not been split into a enough tasks to be well parallelized on your cluster. The number of @@ -197,13 +203,27 @@ Input.split.size_in_mb uses a internal system table in Cassandra ( >= 2.1.5) to of the data in Cassandra. The table is called system.size_estimates is not meant to be absolutely accurate so there will be some inaccuracy with smaller tables and split sizes. +### java.lang.NoClassDefFoundError: com/twitter/jsr166e/LongAdder is getting thrown? + +This error is commonly thrown when the dependencies of the Spark Cassandra Connector are not +on the runtime classpath of the Spark Application. This is usually caused by not using the +prescribed `--packages` method of adding the Spark Cassandra Connector and it's dependencies +to the runtime classpath. Fix this by following the launch guidelines as shown in the +[quick start guide](0_quick_start.md). + + ### Can I contribute to the Spark Cassandra Connector? -YES! Feel free to start a Jira and detail the changes you would like to make or the feature you +YES! Feel free to start a JIRA and detail the changes you would like to make or the feature you would like to add. We would be happy to discuss it with you and see your work. Feel free to create - a Jira before you have started any work if you would like feedback on an idea. When you have a branch +a JIRA before you have started any work if you would like feedback on an idea. When you have a branch that you are satisfied with and passes all the tests (`/dev/run_tests.sh`) make a GitHub PR against -your target Connector Version and set your Jira to Reviewing. +your target Connector Version and set your JIRA to Reviewing. + +### Is there a CassandraRDDMock I can use in my tests? + +Yes. Please see CassandraRDDMock.scala for the class and CassandraRDDMockSpec.scala for example +usage. ### What should I do if I find a bug? 
diff --git a/generateDocs.sh b/generateDocs.sh index 2cf9d61b9..62a8d43c8 100755 --- a/generateDocs.sh +++ b/generateDocs.sh @@ -1,30 +1,25 @@ #!/bin/bash -SCC_HOME=~/repos/spark-cassandra-connector -OUTPUT=/tmp/SCC_DOC_TEMP +SCC_HOME=$HOME/repos/spark-cassandra-connector +OUTPUT="/tmp/SCC_DOC_TEMP" rm -r $OUTPUT mkdir -p $OUTPUT -for version in $@ ;do - cd $SCC_HOME - echo "Making docs for $version" - - git checkout "v$version" +echo "SPARK CASSANDRA CONNECTOR HOME IS $SCC_HOME" +for VERSION in $@ ;do + echo "Making docs for $VERSION" + git checkout "v$VERSION" if [ $? -ne 0 ]; then - echo "Unable to checkout version $version, skipping" + echo "Unable to checkout version $VERSION, skipping" continue fi - - cd $SCC_HOME sbt clean sbt doc - mkdir $OUTPUT/$version + mkdir $OUTPUT/$VERSION - cd $OUTPUT/$version - for folder in $SCC_HOME/spark*; do - module=$(basename $folder) - cp -vr $folder/target/scala-2.10/api $module + for MODULE in spark-cassandra-connector spark-cassandra-connector-embedded; do + FOLDER=$SCC_HOME/$MODULE + echo "COPYING $FOLDER to $OUTPUT/$VERSION/$MODULE" + cp -vr $FOLDER/target/scala-2.10/api $OUTPUT/$VERSION/$MODULE done done - -cd $SCC_HOME git checkout gh-pages cp -r $OUTPUT/* $SCC_HOME/ApiDocs diff --git a/spark-cassandra-connector/src/it/scala/com/datastax/spark/connector/rdd/CassandraRDDMockSpec.scala b/spark-cassandra-connector/src/it/scala/com/datastax/spark/connector/rdd/CassandraRDDMockSpec.scala new file mode 100644 index 000000000..974288a92 --- /dev/null +++ b/spark-cassandra-connector/src/it/scala/com/datastax/spark/connector/rdd/CassandraRDDMockSpec.scala @@ -0,0 +1,24 @@ +package com.datastax.spark.connector.rdd + +import com.datastax.spark.connector.{CassandraRow, SparkCassandraITFlatSpecBase} +import com.datastax.spark.connector.cql.CassandraConnector +import com.datastax.spark.connector.embedded.YamlTransformations + +class CassandraRDDMockSpec extends SparkCassandraITFlatSpecBase { + useCassandraConfig(Seq(YamlTransformations.Default)) + useSparkConf(defaultConf) + + override val conn = CassandraConnector(defaultConf) + + "A CassandraRDDMock" should "behave like a CassandraRDD without needing Cassandra" in { + val columns = Seq("key", "value") + //Create a fake CassandraRDD[CassandraRow] + val rdd = sc + .parallelize(1 to 10) + .map(num => CassandraRow.fromMap(columns.zip(Seq(num, num)).toMap)) + + val fakeCassandraRDD: CassandraRDD[CassandraRow] = new CassandraRDDMock(rdd) + + fakeCassandraRDD.cassandraCount() should be (10) + } +} diff --git a/spark-cassandra-connector/src/it/scala/com/datastax/spark/connector/sql/CassandraDataSourceSpec.scala b/spark-cassandra-connector/src/it/scala/com/datastax/spark/connector/sql/CassandraDataSourceSpec.scala index 456c37413..00e5f908e 100644 --- a/spark-cassandra-connector/src/it/scala/com/datastax/spark/connector/sql/CassandraDataSourceSpec.scala +++ b/spark-cassandra-connector/src/it/scala/com/datastax/spark/connector/sql/CassandraDataSourceSpec.scala @@ -2,12 +2,12 @@ package com.datastax.spark.connector.sql import scala.concurrent.Future -import com.datastax.spark.connector.util.Logging import org.apache.spark.sql.SaveMode._ import org.apache.spark.sql.cassandra.{AnalyzedPredicates, CassandraPredicateRules, CassandraSourceRelation, TableRef} import org.apache.spark.sql.sources.{EqualTo, Filter} import org.apache.spark.sql.{DataFrame, SparkSession} import org.scalatest.BeforeAndAfterEach +import com.datastax.spark.connector._ import com.datastax.spark.connector.SparkCassandraITFlatSpecBase import 
com.datastax.spark.connector.cql.{CassandraConnector, TableDef} import com.datastax.spark.connector.embedded.YamlTransformations @@ -35,6 +35,10 @@ class CassandraDataSourceSpec extends SparkCassandraITFlatSpecBase with Logging session.execute(s"""INSERT INTO $ks.test1 (a, b, c, d, e, f, g, h) VALUES (1, 2, 1, 2, 2, 2, 2, 2)""") }, + Future { + session.execute(s"CREATE TABLE $ks.test_rowwriter (a INT PRIMARY KEY, b INT)") + }, + Future { session.execute(s"CREATE TABLE $ks.test_insert (a INT PRIMARY KEY, b INT)") }, @@ -87,7 +91,7 @@ class CassandraDataSourceSpec extends SparkCassandraITFlatSpecBase with Logging } override def afterEach(): Unit ={ - sc.setLocalProperty(CassandraSourceRelation.AdditionalCassandraPushDownRulesParam.name, null) + sc.setLocalProperty(CassandraSourceRelation.AdditionalCassandraPushDownRulesParam.name, null) } def createTempTable(keyspace: String, table: String, tmpTable: String) = { @@ -166,6 +170,11 @@ class CassandraDataSourceSpec extends SparkCassandraITFlatSpecBase with Logging sparkSession.sql("DROP VIEW insertTable2") } + // This test is just to make sure at runtime the implicit for RDD[Row] can be found + it should "implicitly generate a rowWriter from it's RDD form" in { + sparkSession.sql("SELECT a, b from tmpTable").rdd.saveToCassandra(ks, "test_rowwriter") + } + it should "allow to filter a table" in { sparkSession.sql("SELECT a, b FROM tmpTable WHERE a=1 and b=2 and c=1 and e=1").collect() should have length 2 } @@ -212,9 +221,9 @@ class CassandraDataSourceSpec extends SparkCassandraITFlatSpecBase with Logging it should "throws exception during overwriting a table when confirm.truncate is false" in { val test_df = TestPartialColumns(1400820884, "Firefox", 123242) - val ss = sparkSession import ss.implicits._ + val df = sc.parallelize(Seq(test_df)).toDF val message = intercept[UnsupportedOperationException] { diff --git a/spark-cassandra-connector/src/main/java/com/datastax/spark/connector/japi/CassandraJavaUtil.java b/spark-cassandra-connector/src/main/java/com/datastax/spark/connector/japi/CassandraJavaUtil.java index 9fd1bbab0..3968d2f93 100644 --- a/spark-cassandra-connector/src/main/java/com/datastax/spark/connector/japi/CassandraJavaUtil.java +++ b/spark-cassandra-connector/src/main/java/com/datastax/spark/connector/japi/CassandraJavaUtil.java @@ -1196,11 +1196,12 @@ public static RowWriterFactory safeMapToRow(Class targetClass, ColumnM *
* The method uses {@link JavaBeanColumnMapper} as the column mapper. If another column mapper has to * be used, see {@link #mapToRow(Class, ColumnMapper)} method. + * @param fieldToColumnNameMap mapping of field name to column name. */ public static RowWriterFactory mapToRow( - Class sourceClass, Map columnNameMappings + Class sourceClass, Map fieldToColumnNameMap ) { - ColumnMapper mapper = javaBeanColumnMapper(safeClassTag(sourceClass), columnNameMappings); + ColumnMapper mapper = javaBeanColumnMapper(safeClassTag(sourceClass), fieldToColumnNameMap); return mapToRow(sourceClass, mapper); } @@ -1216,9 +1217,10 @@ public static RowWriterFactory mapToRow( *
* The method uses {@link JavaBeanColumnMapper} as the column mapper. If another column mapper has to * be used, see {@link #mapToRow(Class, ColumnMapper)} method. + * @param fieldToColumnNameMappings mapping of field name to column name. */ - public static RowWriterFactory mapToRow(Class sourceClass, Pair... columnNameMappings) { - return mapToRow(sourceClass, convertToMap(columnNameMappings)); + public static RowWriterFactory mapToRow(Class sourceClass, Pair... fieldToColumnNameMappings) { + return mapToRow(sourceClass, convertToMap(fieldToColumnNameMappings)); } // ------------------------------------------------------------------------- diff --git a/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/cql/SessionProxy.scala b/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/cql/SessionProxy.scala index e52e9f84b..4f9f03584 100644 --- a/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/cql/SessionProxy.scala +++ b/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/cql/SessionProxy.scala @@ -8,9 +8,6 @@ import org.apache.commons.lang3.ClassUtils import collection.JavaConverters._ -import org.apache.commons.lang3.ClassUtils - - /** Wraps a `Session` and intercepts: * - `close` method to invoke `afterClose` handler * - `prepare` methods to cache `PreparedStatement` objects. */ @@ -63,7 +60,6 @@ object SessionProxy extends Logging { /** Creates a new `SessionProxy` delegating to the given `Session`. * Additionally registers a callback on `Session#close` method. - * * @param afterClose code to be invoked after the session has been closed */ def wrapWithCloseAction(session: Session)(afterClose: Session => Any): Session = { val listInterfaces = ClassUtils.getAllInterfaces(session.getClass) diff --git a/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/mapper/ColumnMapper.scala b/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/mapper/ColumnMapper.scala index 72d141055..fd2ee47e3 100644 --- a/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/mapper/ColumnMapper.scala +++ b/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/mapper/ColumnMapper.scala @@ -26,6 +26,8 @@ import scala.reflect.runtime.universe._ */ trait ColumnMapper[T] { + val isTopLevel: Boolean = true + /** Provides a mapping between given table or UDT and properties of type `T`, * useful for creating objects of type `T`. Throws [[IllegalArgumentException]] if * `selectedColumns` does not provide some columns needed to instantiate object of type `T`*/ diff --git a/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/mapper/DefaultColumnMapper.scala b/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/mapper/DefaultColumnMapper.scala index 325b78f9f..d890077f8 100644 --- a/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/mapper/DefaultColumnMapper.scala +++ b/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/mapper/DefaultColumnMapper.scala @@ -102,11 +102,16 @@ class DefaultColumnMapper[T : TypeTag](columnNameOverride: Map[String, String] = } yield (getterName, columnRef) }.toMap - // Check if we have all the required columns: + /** if this is a column mapper for the top level row, then handle gracefully a set of unmapped columns at the end of + * the table columns. Otherwise, if this is a nested type, then require that there are be no unmapped columns. 
+ */ val mappedColumns = getterMap.values.toSet val unmappedColumns = selectedColumns.filterNot(mappedColumns) - require(unmappedColumns.isEmpty, s"Columns not found in $tpe: [${unmappedColumns.mkString(", ")}]") + if (!isTopLevel) + require(unmappedColumns.isEmpty, s"Columns not found in nested $tpe: [${unmappedColumns.mkString(", ")}]") + else + require( selectedColumns.endsWith(unmappedColumns), s"Unmapped columns nust be at end of table definition: [${unmappedColumns.mkString(", ")}]") SimpleColumnMapForWriting(getterMap) } diff --git a/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/rdd/AbstractCassandraJoin.scala b/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/rdd/AbstractCassandraJoin.scala index 550598f64..d74212c56 100644 --- a/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/rdd/AbstractCassandraJoin.scala +++ b/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/rdd/AbstractCassandraJoin.scala @@ -65,6 +65,7 @@ private[rdd] trait AbstractCassandraJoin[L, R] { // Initialize RowWriter and Query to be used for accessing Cassandra rowWriter.columnNames + rowWriter.isValidRowWriter singleKeyCqlQuery.length def checkSingleColumn(column: ColumnRef): Unit = { diff --git a/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/types/TypeConverter.scala b/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/types/TypeConverter.scala index 1fac00e62..fccfba5c9 100644 --- a/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/types/TypeConverter.scala +++ b/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/types/TypeConverter.scala @@ -45,6 +45,9 @@ trait TypeConverter[T] extends Serializable { throw new TypeConversionException(s"Cannot convert object $obj to $targetTypeName.") ) } + + def filteredColumns: Set[String] = Set() + } /** Handles nullable types and converts any null to null. 
*/ @@ -463,10 +466,12 @@ object TypeConverter { implicit object LocalDateConverter extends NullableTypeConverter[LocalDate] { def targetTypeTag = LocalDateTypeTag val dateRegx = """(\d\d\d\d)-(\d\d)-(\d\d)""".r + val yearRegx = """(\d\d\d\d)""".r def convertPF = { case x: LocalDate => x case dateRegx(y, m, d) => LocalDate.fromYearMonthDay(y.toInt, m.toInt, d.toInt) + case yearRegx(y) => LocalDate.fromYearMonthDay(y.toInt, 1, 1) case x: Int => LocalDate.fromDaysSinceEpoch(x) case x: JodaLocalDate => LocalDate.fromYearMonthDay(x.getYear, x.getMonthOfYear, x.getDayOfMonth) case x: DateTime => { diff --git a/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/writer/DefaultRowWriter.scala b/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/writer/DefaultRowWriter.scala index d8fd84840..bfadcc14c 100644 --- a/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/writer/DefaultRowWriter.scala +++ b/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/writer/DefaultRowWriter.scala @@ -19,13 +19,19 @@ class DefaultRowWriter[T : TypeTag : ColumnMapper]( override def readColumnValues(data: T, buffer: Array[Any]) = { val row = converter.convert(data) - for (i <- columnNames.indices) + for (i <- row.columnValues.indices) buffer(i) = row.getRaw(i) } + + override def isValidRowWriter(): Unit = { + val availableColumns = converter.filteredColumns + val unmatchedPartitionKeys = table.partitionKey.filter(pk => !availableColumns.contains(pk.columnName)) + require(unmatchedPartitionKeys.isEmpty, + s"The following partition keys are missing: [${unmatchedPartitionKeys.mkString(",")}]") + } } object DefaultRowWriter { - def factory[T : ColumnMapper : TypeTag] = new RowWriterFactory[T] { override def rowWriter(tableDef: TableDef, selectedColumns: IndexedSeq[ColumnRef]) = { new DefaultRowWriter[T](tableDef, selectedColumns) diff --git a/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/writer/MappedToGettableDataConverter.scala b/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/writer/MappedToGettableDataConverter.scala index f710e2cc8..8b990f599 100644 --- a/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/writer/MappedToGettableDataConverter.scala +++ b/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/writer/MappedToGettableDataConverter.scala @@ -43,7 +43,7 @@ private[connector] object MappedToGettableDataConverter extends Logging{ * [[com.datastax.spark.connector.mapper.JavaBeanColumnMapper JavaBeanColumnMapper]], * and for everything else uses * [[com.datastax.spark.connector.mapper.DefaultColumnMapper DefaultColumnMapper]] */ - private def columnMapper[U: TypeTag]: ColumnMapper[U] = { + private def columnMapper[U: TypeTag](topLevel: Boolean = true): ColumnMapper[U] = { logDebug(s"Finding a UDT ColumnMapper for typeTag ${typeTag[U]}") val tpe = SparkReflectionLock.synchronized(typeTag[U].tpe) if (ReflectionUtil.isScalaTuple(tpe)) @@ -51,7 +51,7 @@ private[connector] object MappedToGettableDataConverter extends Logging{ else if (isJavaBean) new JavaBeanColumnMapper[U]()(ReflectionUtil.classTag[U]) else - new DefaultColumnMapper[U] + new DefaultColumnMapper[U] { override val isTopLevel = topLevel } } /** Returns true for tuple types that provide full type information of their components */ @@ -112,13 +112,13 @@ private[connector] object MappedToGettableDataConverter extends Logging{ case (t: StructDef, TypeRef(_, _, List(argScalaType))) if scalaType <:< 
typeOf[Option[Any]] => type U2 = u2 forSome {type u2} implicit val tt = ReflectionUtil.typeToTypeTag[U2](argScalaType) - implicit val cm: ColumnMapper[U2] = columnMapper[U2] + implicit val cm: ColumnMapper[U2] = columnMapper[U2]() apply[U2](t, t.columnRefs, Some(childClassloader)) // UDTs mapped to case classes and tuples mapped to Scala tuples. // ColumnMappers support mapping Scala tuples, so we don't need a special case for them. case (t: StructDef, _) => - implicit val cm: ColumnMapper[U] = columnMapper[U] + implicit val cm: ColumnMapper[U] = columnMapper[U](false) apply[U](t, t.columnRefs, Some(childClassloader)) // Primitive types @@ -166,7 +166,7 @@ private[connector] object MappedToGettableDataConverter extends Logging{ } private val getters = - columnNames.map(getterByColumnName) + columnNames.flatMap(col => getterByColumnName.get(col)) @transient private val scalaTypes: IndexedSeq[Type] = @@ -179,7 +179,7 @@ private[connector] object MappedToGettableDataConverter extends Logging{ new PropertyExtractor(cls, getters) private val converters = { - for (i <- columnNames.indices) yield { + for (i <- getters.indices) yield { try { val ct = columnTypes(i) val st = scalaTypes(i) @@ -196,7 +196,8 @@ private[connector] object MappedToGettableDataConverter extends Logging{ } override def targetTypeTag = typeTag[struct.ValueRepr] - + override def filteredColumns = + columnNames.toSet.intersect(columnMap.getters.values.map(_.toString).toSet) override def convertPF = { case obj if cls.isInstance(obj) => diff --git a/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/writer/RowWriter.scala b/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/writer/RowWriter.scala index 9c5cd1110..730d73cb0 100644 --- a/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/writer/RowWriter.scala +++ b/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/writer/RowWriter.scala @@ -14,4 +14,6 @@ trait RowWriter[T] extends Serializable { * in the same order as they are listed in the columnNames sequence. */ def readColumnValues(data: T, buffer: Array[Any]) + def isValidRowWriter(): Unit = {} + } diff --git a/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/writer/RowWriterFactory.scala b/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/writer/RowWriterFactory.scala index 96b1ec862..e02d86cf3 100644 --- a/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/writer/RowWriterFactory.scala +++ b/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/writer/RowWriterFactory.scala @@ -1,10 +1,10 @@ package com.datastax.spark.connector.writer import scala.reflect.runtime.universe._ - import com.datastax.spark.connector.ColumnRef import com.datastax.spark.connector.cql.TableDef import com.datastax.spark.connector.mapper.ColumnMapper +import org.apache.spark.sql.Row /** Creates instances of [[RowWriter]] objects for the given row type `T`. * `RowWriterFactory` is the trait you need to implement if you want to support row representations @@ -21,6 +21,9 @@ trait RowWriterFactory[T] { /** Provides a low-priority implicit `RowWriterFactory` able to write objects of any class for which * a [[com.datastax.spark.connector.mapper.ColumnMapper ColumnMapper]] is defined. 
*/ trait LowPriorityRowWriterFactoryImplicits { + implicit def sqlRowWriterFactory[T <: Row : TypeTag]: RowWriterFactory[Row] = + SqlRowWriter.Factory + implicit def defaultRowWriterFactory[T : TypeTag : ColumnMapper]: RowWriterFactory[T] = DefaultRowWriter.factory } diff --git a/spark-cassandra-connector/src/test/scala/com/datastax/spark/connector/mapper/DefaultColumnMapperTest.scala b/spark-cassandra-connector/src/test/scala/com/datastax/spark/connector/mapper/DefaultColumnMapperTest.scala index 8376a465e..54c83f009 100644 --- a/spark-cassandra-connector/src/test/scala/com/datastax/spark/connector/mapper/DefaultColumnMapperTest.scala +++ b/spark-cassandra-connector/src/test/scala/com/datastax/spark/connector/mapper/DefaultColumnMapperTest.scala @@ -205,11 +205,4 @@ class DefaultColumnMapperTest { new DefaultColumnMapper[DefaultColumnMapperTestClass1]() .columnMapForReading(table1, table1.columnRefs.tail) } - - @Test(expected = classOf[IllegalArgumentException]) - def testNotEnoughPropertiesForWriting(): Unit = { - new DefaultColumnMapper[DefaultColumnMapperTestClass1]() - .columnMapForWriting(table1, table1.columnRefs :+ ColumnName("missingColumn")) - } - -} \ No newline at end of file +} diff --git a/spark-cassandra-connector/src/test/scala/com/datastax/spark/connector/rdd/CassandraRDDMock.scala b/spark-cassandra-connector/src/test/scala/com/datastax/spark/connector/rdd/CassandraRDDMock.scala new file mode 100644 index 000000000..ba204d09f --- /dev/null +++ b/spark-cassandra-connector/src/test/scala/com/datastax/spark/connector/rdd/CassandraRDDMock.scala @@ -0,0 +1,55 @@ +package com.datastax.spark.connector.rdd + +import com.datastax.spark.connector.cql.CassandraConnector +import com.datastax.spark.connector.{ColumnRef, ColumnSelector, SomeColumns} +import org.apache.spark.annotation.DeveloperApi +import org.apache.spark.rdd.RDD +import org.apache.spark.{Partition, TaskContext} + +import scala.reflect.ClassTag + +/** + * A fake CassandraRDD for mocking CassandraRDDs. Instead of reading data from Cassandra this + * class uses an RDD prev as a source for data. In essence this is a passthrough RDD which + * takes the contents of the previous RDD and pretends it is a CassandraRDD. + */ +class CassandraRDDMock[R : ClassTag](prev: RDD[R], keyspace: String = "fake", table: String = "fake") extends + CassandraRDD[R](prev.sparkContext, prev.dependencies){ + + /** This is slightly different from Scala this.type. + * this.type is the unique singleton type of an object which is not compatible with other + * instances of the same type, so returning anything other than `this` is not really possible + * without lying to the compiler by explicit casts. + * Here SelfType is used to return a copy of the object - a different instance of the same type */ + override type Self = this.type + + override protected[connector] def keyspaceName: String = keyspace + override protected[connector] def tableName: String = table + override protected def columnNames: ColumnSelector = SomeColumns() + override protected def where: CqlWhereClause = CqlWhereClause(Seq.empty, Seq.empty) + override protected def readConf: ReadConf = ReadConf() + override protected def limit: Option[CassandraLimit] = None + override protected def clusteringOrder: Option[ClusteringOrder] = None + override protected def connector: CassandraConnector = ??? 
+ override val selectedColumnRefs: Seq[ColumnRef] = Seq.empty + override protected def narrowColumnSelection(columns: Seq[ColumnRef]): Seq[ColumnRef] = Seq.empty + + override def toEmptyCassandraRDD: EmptyCassandraRDD[R] = new EmptyCassandraRDD[R](prev.sparkContext, keyspace, table) + + /** Counts the number of items in this RDD by counting the parent RDD instead of selecting count(*) on the Cassandra table */ + override def cassandraCount(): Long = prev.count + + /** Doesn't actually copy since we don't really use any of these parameters **/ + override protected def copy( + columnNames: ColumnSelector, + where: CqlWhereClause, + limit: Option[CassandraLimit], + clusteringOrder: Option[ClusteringOrder], + readConf: ReadConf, + connector: CassandraConnector) : CassandraRDDMock.this.type = this + + /*** Pass through the parent RDD's compute and partitions ***/ + @DeveloperApi + override def compute(split: Partition, context: TaskContext): Iterator[R] = prev.compute(split, context) + override protected def getPartitions: Array[Partition] = prev.partitions +} diff --git a/spark-cassandra-connector/src/test/scala/com/datastax/spark/connector/types/TypeConverterTest.scala b/spark-cassandra-connector/src/test/scala/com/datastax/spark/connector/types/TypeConverterTest.scala index f3e825c46..da08f9e19 100644 --- a/spark-cassandra-connector/src/test/scala/com/datastax/spark/connector/types/TypeConverterTest.scala +++ b/spark-cassandra-connector/src/test/scala/com/datastax/spark/connector/types/TypeConverterTest.scala @@ -235,6 +235,13 @@ class TypeConverterTest { assertEquals(testDate, c.convert(new org.joda.time.LocalDate(1985, 8, 3))) } + @Test + def testYearLocalDate(): Unit = { + val c = TypeConverter.forType[LocalDate] + val testDate = LocalDate.fromYearMonthDay(1985, 1, 1) + assertEquals(testDate, c.convert("1985")) + } + @Test def testTimeType(): Unit = { val c = TypeConverter.TimeTypeConverter diff --git a/spark-cassandra-connector/src/test/scala/com/datastax/spark/connector/writer/DefaultRowWriterTest.scala b/spark-cassandra-connector/src/test/scala/com/datastax/spark/connector/writer/DefaultRowWriterTest.scala index 053b82d37..0769f2d3d 100644 --- a/spark-cassandra-connector/src/test/scala/com/datastax/spark/connector/writer/DefaultRowWriterTest.scala +++ b/spark-cassandra-connector/src/test/scala/com/datastax/spark/connector/writer/DefaultRowWriterTest.scala @@ -99,4 +99,49 @@ class DefaultRowWriterTest { TypeConverter.unregisterConverter(StringWrapperConverter) } } + + case class UnderSpecified(c1: String, c2: Int) + case class UnderSpecifiedReversed(c2: Int, c1: String) + case class UnderSpecifiedOutOfOrder(c1: String, c3: Int) + + @Test + def testIgnoresRegularTableColumns(): Unit = { + val column = ColumnDef("c1", PartitionKeyColumn, TextType) + val column2 = ColumnDef("c2", RegularColumn, IntType) + val column3 = ColumnDef("c3", RegularColumn, IntType) + val table = TableDef("test", "table", Seq(column), Nil, Seq(column2, column3)) + val rowWriter = new DefaultRowWriter[UnderSpecified](table, table.columnRefs) + val obj = UnderSpecified("some text", 10) + val buf = Array.ofDim[Any](3) + rowWriter.readColumnValues(obj, buf) + assertEquals("some text", buf(0)) + assertEquals(10, buf(1)) + assertEquals(null, buf(2)) + } + @Test + def testIgnoresRegularTableColumnsReversedOrder(): Unit = { + val column = ColumnDef("c1", PartitionKeyColumn, TextType) + val column2 = ColumnDef("c2", RegularColumn, IntType) + val column3 = ColumnDef("c3", RegularColumn, IntType) + val table = TableDef("test", "table", Seq(column), Nil, 
Seq(column2, column3)) + val rowWriter = new DefaultRowWriter[UnderSpecifiedReversed](table, table.columnRefs) + val obj = UnderSpecifiedReversed(10, "some text") + val buf = Array.ofDim[Any](3) + rowWriter.readColumnValues(obj, buf) + assertEquals("some text", buf(0)) + assertEquals(10, buf(1)) + assertEquals(null, buf(2)) + } + + @Test(expected = classOf[IllegalArgumentException]) + def testWriterFailsForColumnsOutOfOrder(): Unit = { + val column = ColumnDef("c1", PartitionKeyColumn, TextType) + val column2 = ColumnDef("c2", RegularColumn, IntType) + val column3 = ColumnDef("c3", RegularColumn, IntType) + val table = TableDef("test", "table", Seq(column), Nil, Seq(column2, column3)) + val rowWriter = new DefaultRowWriter[UnderSpecifiedOutOfOrder](table, table.columnRefs) + val obj = UnderSpecifiedOutOfOrder("some text", 10) + val buf = Array.ofDim[Any](3) + rowWriter.readColumnValues(obj, buf) + } } diff --git a/spark-cassandra-connector/src/test/scala/com/datastax/spark/connector/writer/MappedToGettableDataConverterSpec.scala b/spark-cassandra-connector/src/test/scala/com/datastax/spark/connector/writer/MappedToGettableDataConverterSpec.scala index 1d6ee1c61..b0a7715dd 100644 --- a/spark-cassandra-connector/src/test/scala/com/datastax/spark/connector/writer/MappedToGettableDataConverterSpec.scala +++ b/spark-cassandra-connector/src/test/scala/com/datastax/spark/connector/writer/MappedToGettableDataConverterSpec.scala @@ -17,6 +17,7 @@ class MappedToGettableDataConverterSpec extends FlatSpec with Matchers { val addressType = UserDefinedType("address", IndexedSeq(streetColumn, numberColumn)) val loginColumn = ColumnDef("login", PartitionKeyColumn, VarCharType) + val logoutColumn = ColumnDef("logout", RegularColumn, VarCharType) val passwordColumn = ColumnDef("password", RegularColumn, VarCharType) val addressColumn = ColumnDef("address", RegularColumn, addressType) val addressesColumn = ColumnDef("addresses", RegularColumn, ListType(addressType)) @@ -27,9 +28,10 @@ class MappedToGettableDataConverterSpec extends FlatSpec with Matchers { new TableDef("test", "test", partitionKeyColumns, clusteringColumns, regularColumns) } - case class User(login: String, password: String) case class Address(street: String, number: Int) + case class Session(logout: String, login: String) + case class SessionWithPassword(logout: String, login: String, password: Option[String]) "MappedToGettableDataConverter" should "be Serializable" in { val userTable = newTable(loginColumn, passwordColumn) @@ -294,6 +296,31 @@ class MappedToGettableDataConverterSpec extends FlatSpec with Matchers { exception.getMessage should include("test") } + case class UnderSpecifiedAddress(street: String) + case class UnderSpecifiedUser1(login: String, address: UnderSpecifiedAddress) + case class UnderSpecifiedUser2(login: String) + + it should "throw a meaningful exception when a nested column is not fully specified" in { + val userTable = newTable(loginColumn, addressColumn) + val user = UnderSpecifiedUser1("foo", UnderSpecifiedAddress("main street")) + val exception = the[IllegalArgumentException] thrownBy { + val converter = MappedToGettableDataConverter[UnderSpecifiedUser1](userTable, userTable.columnRefs) + converter.convert(user) + } + exception.getMessage should include("UnderSpecifiedAddress") + exception.getMessage should include("address") + exception.getMessage should include("test") + } + + it should "work when the top level row is not fully specified" in { + val userTable = newTable(loginColumn, addressColumn) + val user 
= UnderSpecifiedUser2("foo") + val converter = MappedToGettableDataConverter[UnderSpecifiedUser2](userTable, userTable.columnRefs) + val row = converter.convert(user) + row.getString("login") shouldEqual "foo" + row.length shouldEqual 1 + } + it should "throw a meaningful exception when a tuple field has an incorrect number of components" in { val tupleType = TupleType( TupleFieldDef(0, IntType), @@ -323,4 +350,13 @@ class MappedToGettableDataConverterSpec extends FlatSpec with Matchers { row.getString("login") shouldEqual "login" row.getString("password") shouldEqual "password" } + + it should "work when fields are reversed" in { + val loginTable = newTable(loginColumn, logoutColumn) + val converter = MappedToGettableDataConverter[Session](loginTable, loginTable.columnRefs) + val value = Session("user_logout", "user_login") + val row = converter.convert(value) + row.getString("login") shouldEqual "user_login" + row.getString("logout") shouldEqual "user_logout" + } }
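The DefaultColumnMapper, DefaultRowWriter and MappedToGettableDataConverter changes above let a case class map only a leading subset of a table's columns when writing, provided every partition key column is mapped. Below is a minimal usage sketch, not part of the patch, assuming a hypothetical keyspace `test` with a table `kv (c1 text PRIMARY KEY, c2 int, c3 int)` and a cluster already configured in the SparkConf:

```scala
import com.datastax.spark.connector._
import org.apache.spark.{SparkConf, SparkContext}

// Maps c1 and c2 only; the trailing regular column c3 has no corresponding field.
case class UnderSpecified(c1: String, c2: Int)

object UnderSpecifiedWriteSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("under-specified-write"))
    // Previously the ColumnMapper rejected the unmapped column c3;
    // with this patch c3 is simply left as null in the write buffer.
    sc.parallelize(Seq(UnderSpecified("some text", 10)))
      .saveToCassandra("test", "kv")
    sc.stop()
  }
}
```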
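The LocalDateConverter change accepts a bare four-digit year string and maps it to January 1 of that year, mirroring the new testYearLocalDate test. A small self-contained sketch of the conversion, using the driver's LocalDate (the converter's target type):

```scala
import com.datastax.driver.core.LocalDate
import com.datastax.spark.connector.types.TypeConverter

object YearOnlyDateSketch extends App {
  val c = TypeConverter.forType[LocalDate]
  println(c.convert("1985-08-03")) // existing yyyy-MM-dd case: 1985-08-03
  println(c.convert("1985"))       // new year-only case: 1985-01-01
}
```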
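CassandraRDDMock is a pass-through stand-in for tests that need a CassandraRDD without a live cluster. A hedged sketch of how it could be exercised from the connector's test sources; the keyspace and table names here are placeholders:

```scala
import com.datastax.spark.connector.rdd.CassandraRDDMock
import org.apache.spark.{SparkConf, SparkContext}

object CassandraRDDMockSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("mock-sketch"))
    val source = sc.parallelize(Seq("a", "b", "c"))
    // Pretend the local RDD came from a Cassandra table "ks.tbl".
    val mocked = new CassandraRDDMock[String](source, keyspace = "ks", table = "tbl")
    assert(mocked.cassandraCount() == 3L)                 // delegates to source.count()
    assert(mocked.collect().toSeq == Seq("a", "b", "c"))  // compute/partitions pass through
    sc.stop()
  }
}
```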