SPARKC-706: Add basic support for Cassandra vectors #1366

Merged: 6 commits, Jun 21, 2024
3 changes: 3 additions & 0 deletions CHANGES.txt
@@ -1,3 +1,6 @@
3.5.1
* Support for Vector type available in Cassandra 5.0 (SPARKC-706)
* Upgrade Cassandra Java Driver to 4.18.1, support Cassandra 5.0 in test framework (SPARKC-710)

3.5.0
* Support for Apache Spark 3.5 (SPARKC-704)
21 changes: 14 additions & 7 deletions README.md
@@ -6,11 +6,15 @@

## Quick Links

| What | Where |
| ---------- |---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| Community | Chat with us at [Apache Cassandra](https://cassandra.apache.org/_/community.html#discussions) |
| Scala Docs | Most Recent Release (3.5.0): [Connector API docs](https://datastax.github.io/spark-cassandra-connector/ApiDocs/3.5.0/connector/com/datastax/spark/connector/index.html), [Connector Driver docs](https://datastax.github.io/spark-cassandra-connector/ApiDocs/3.5.0/driver/com/datastax/spark/connector/index.html) |
| Latest Production Release | [3.5.0](https://search.maven.org/artifact/com.datastax.spark/spark-cassandra-connector_2.12/3.5.0/jar) |
| What | Where |
| ---------- |---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| Community | Chat with us at [Apache Cassandra](https://cassandra.apache.org/_/community.html#discussions) |
| Scala Docs | Most Recent Release (3.5.1): [Connector API docs](https://datastax.github.io/spark-cassandra-connector/ApiDocs/3.5.1/connector/com/datastax/spark/connector/index.html), [Connector Driver docs](https://datastax.github.io/spark-cassandra-connector/ApiDocs/3.5.1/driver/com/datastax/spark/connector/index.html) |
| Latest Production Release | [3.5.1](https://search.maven.org/artifact/com.datastax.spark/spark-cassandra-connector_2.12/3.5.1/jar) |

## News
### 3.5.1
- The latest release of the Spark-Cassandra-Connector adds support for vector types, allowing developers to read and write Cassandra 5.0 and Astra vector columns from within the Spark ecosystem. This brings the data behind AI and Retrieval-Augmented Generation (RAG) workloads directly into Spark for processing and analysis.
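
  As an illustration, a minimal read sketch. The keyspace, table name, and the mapping of a CQL `vector<int, 3>` column to a Scala `Seq[Int]` are assumptions for this example, not details confirmed by this PR; `sc` is an existing `SparkContext`.

    import com.datastax.spark.connector._

    // Hypothetical schema: CREATE TABLE ks.embeddings (id int PRIMARY KEY, embedding vector<int, 3>)
    sc.cassandraTable[(Int, Seq[Int])]("ks", "embeddings")
      .select("id", "embedding")
      .collect()
      .foreach { case (id, embedding) => println(s"$id -> ${embedding.mkString(",")}") }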

## Features

@@ -55,7 +59,7 @@ Currently, the following branches are actively supported:

| Connector | Spark | Cassandra | Cassandra Java Driver | Minimum Java Version | Supported Scala Versions |
|-----------|---------------|----------------------------|-----------------------|----------------------|--------------------------|
| 3.5.1 | 3.5 | 2.1.5*, 2.2, 3.x, 4.x, 5.0 | 4.18 | 8 | 2.12, 2.13 |
| 3.5.1 | 3.5 | 2.1.5*, 2.2, 3.x, 4.x, 5.0 | 4.18.1 | 8 | 2.12, 2.13 |
| 3.5 | 3.5 | 2.1.5*, 2.2, 3.x, 4.x | 4.13 | 8 | 2.12, 2.13 |
| 3.4 | 3.4 | 2.1.5*, 2.2, 3.x, 4.x | 4.13 | 8 | 2.12, 2.13 |
| 3.3 | 3.3 | 2.1.5*, 2.2, 3.x, 4.x | 4.13 | 8 | 2.12 |
@@ -80,6 +84,9 @@ Currently, the following branches are actively supported:
## Hosted API Docs
API documentation for the Scala and Java interfaces is available online:

### 3.5.1
* [Spark-Cassandra-Connector](https://datastax.github.io/spark-cassandra-connector/ApiDocs/3.5.1/connector/com/datastax/spark/connector/index.html)

### 3.5.0
* [Spark-Cassandra-Connector](https://datastax.github.io/spark-cassandra-connector/ApiDocs/3.5.0/connector/com/datastax/spark/connector/index.html)

@@ -111,7 +118,7 @@ This project is available on the Maven Central Repository.
For SBT to download the connector binaries, sources and javadoc, put this in your project
SBT config:

libraryDependencies += "com.datastax.spark" %% "spark-cassandra-connector" % "3.5.0"
libraryDependencies += "com.datastax.spark" %% "spark-cassandra-connector" % "3.5.1"

* The default Scala version for Spark 3.0+ is 2.12; please choose the appropriate build. See the
[FAQ](doc/FAQ.md) for more information.
@@ -98,7 +98,7 @@ trait SparkCassandraITSpecBase
}

override def withFixture(test: NoArgTest): Outcome = wrapUnserializableExceptions {
super.withFixture(test)
super.withFixture(test)
}

def getKsName = {
@@ -145,18 +145,32 @@ trait SparkCassandraITSpecBase
else report(s"Skipped Because ProtocolVersion $pv < $protocolVersion")
}

/** Skips the given test if the Cluster Version is lower or equal to the given `cassandra` Version or `dse` Version
* (if this is a DSE cluster) */
def from(cassandra: Version, dse: Version)(f: => Unit): Unit = {
/** Runs the given test only if the cluster type and version matches.
*
* @param cassandra run the test if the cluster is Cassandra >= the given version;
* if `None`, never run the test for Cassandra clusters
* @param dse run the test if the cluster is DSE >= the given version;
* if `None`, never run the test for DSE clusters
* @param f the test to run
*/
def from(cassandra: Version, dse: Version)(f: => Unit): Unit = from(Option(cassandra), Option(dse))(f)

def from(cassandra: Option[Version] = None, dse: Option[Version] = None)(f: => Unit): Unit = {
if (isDse(conn)) {
from(dse)(f)
dse match {
case Some(dseVersion) => from(dseVersion)(f)
case None => report(s"Skipped because not DSE")
}
} else {
from(cassandra)(f)
cassandra match {
case Some(cassandraVersion) => from(cassandraVersion)(f)
case None => report(s"Skipped because not Cassandra")
}
}
}
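
  A hedged usage sketch of these gates, mirroring how this PR itself calls them in the specs below (the version constants come from CcmConfig, and `from` is in scope via this trait):

    // Run only on Cassandra >= 5.0; never run on DSE clusters.
    from(Some(CcmConfig.V5_0_0), None) {
      // test body
    }

    // Run on Cassandra >= 3.6 or DSE >= 5.1 (two-arg convenience form).
    from(cassandra = V3_6_0, dse = DSE_V5_1_0) {
      // test body
    }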

/** Skips the given test if the Cluster Version is lower or equal to the given version */
def from(version: Version)(f: => Unit): Unit = {
/** Skips the given test if the Cluster Version is lower than the given version */
private def from(version: Version)(f: => Unit): Unit = {
[Collaborator review comment] This doc is not correct, right? It skips only when the version is lower.

skip(cluster.getCassandraVersion, version) { f }
}

@@ -172,7 +186,7 @@ trait SparkCassandraITSpecBase
else f
}

/** Skips the given test if the Cluster Version is lower or equal to the given version or the cluster is not DSE */
/** Skips the given test if the Cluster Version is lower than the given version or the cluster is not DSE */
def dseFrom(version: Version)(f: => Any): Unit = {
dseOnly {
skip(cluster.getDseVersion.get, version) { f }
@@ -1,6 +1,7 @@
package com.datastax.spark.connector.cql

import com.datastax.spark.connector.SparkCassandraITWordSpecBase
import com.datastax.spark.connector.ccm.CcmConfig
import com.datastax.spark.connector.cluster.DefaultCluster
import com.datastax.spark.connector.types._
import com.datastax.spark.connector.util.schemaFromCassandra
@@ -49,6 +50,9 @@ class SchemaSpec extends SparkCassandraITWordSpecBase with DefaultCluster {
s"""CREATE INDEX test_d9_m23423ap_idx ON $ks.test (full(d10_set))""")
session.execute(
s"""CREATE INDEX test_d7_int_idx ON $ks.test (d7_int)""")
from(Some(CcmConfig.V5_0_0), None) {
session.execute(s"ALTER TABLE $ks.test ADD d17_vector frozen<vector<int,3>>")
}

for (i <- 0 to 9) {
session.execute(s"insert into $ks.test (k1,k2,k3,c1,c2,c3,d10_set) " +
@@ -111,8 +115,8 @@ class SchemaSpec extends SparkCassandraITWordSpecBase with DefaultCluster {

"allow to read regular column definitions" in {
val columns = table.regularColumns
columns.size shouldBe 16
columns.map(_.columnName).toSet shouldBe Set(
columns.size should be >= 16
columns.map(_.columnName).toSet should contain allElementsOf Set(
"d1_blob", "d2_boolean", "d3_decimal", "d4_double", "d5_float",
"d6_inet", "d7_int", "d8_list", "d9_map", "d10_set",
"d11_timestamp", "d12_uuid", "d13_timeuuid", "d14_varchar",
@@ -136,6 +140,9 @@ class SchemaSpec extends SparkCassandraITWordSpecBase with DefaultCluster {
table.columnByName("d14_varchar").columnType shouldBe VarCharType
table.columnByName("d15_varint").columnType shouldBe VarIntType
table.columnByName("d16_address").columnType shouldBe a [UserDefinedType]
from(Some(CcmConfig.V5_0_0), None) {
table.columnByName("d17_vector").columnType shouldBe VectorType(IntType, 3)
}
}

"allow to list fields of a user defined type" in {
connector/src/it/scala/com/datastax/spark/connector/rdd/CassandraRDDSpec.scala
@@ -9,7 +9,7 @@
import com.datastax.oss.driver.api.core.cql.SimpleStatement
import com.datastax.oss.driver.api.core.cql.SimpleStatement._
import com.datastax.spark.connector._
import com.datastax.spark.connector.ccm.CcmConfig.{DSE_V6_7_0, V3_6_0}
import com.datastax.spark.connector.ccm.CcmConfig.{DSE_V5_1_0, DSE_V6_7_0, V3_6_0}
import com.datastax.spark.connector.cluster.DefaultCluster
import com.datastax.spark.connector.cql.{CassandraConnector, CassandraConnectorConf}
import com.datastax.spark.connector.mapper.{DefaultColumnMapper, JavaBeanColumnMapper, JavaTestBean, JavaTestUDTBean}
@@ -279,7 +279,7 @@
executor.execute(newInstance( s"""CREATE TABLE $ks.big_table (key INT PRIMARY KEY, value INT)"""))
val insert = session.prepare( s"""INSERT INTO $ks.big_table(key, value) VALUES (?, ?)""")
awaitAll {
for (k <- (0 until bigTableRowCount).grouped(100); i <- k) yield {

[CI annotations on line 282 of CassandraRDDSpec.scala: the GitHub Actions builds (2.13.13, 5.0-beta1) and (2.13.13, dse-6.8.44) reported CassandraRDDSpec failures with com.datastax.oss.driver.api.core.AllNodesFailedException: no connection was available to Node(endPoint=localhost/127.0.0.1:9042).]
executor.executeAsync(insert.bind(i.asInstanceOf[AnyRef], i.asInstanceOf[AnyRef]))
}
}
@@ -794,7 +794,7 @@
results should contain ((KeyGroup(3, 300), (3, 300, "0003")))
}

it should "allow the use of PER PARTITION LIMITs " in from(V3_6_0) {
it should "allow the use of PER PARTITION LIMITs " in from(cassandra = V3_6_0, dse = DSE_V5_1_0) {
val result = sc.cassandraTable(ks, "clustering_time").perPartitionLimit(1).collect
result.length should be (1)
}
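
For context, PER PARTITION LIMIT is a CQL clause first available in Cassandra 3.6 and DSE 5.1, which is exactly what the added `dse = DSE_V5_1_0` gate encodes: it caps the rows returned from each partition rather than from the whole result set. A minimal sketch of the underlying clause, assuming a `session` and `$ks` as used elsewhere in these specs:

    // Returns at most one row from each partition of the test table, not one row overall.
    session.execute(s"SELECT * FROM $ks.clustering_time PER PARTITION LIMIT 1")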
@@ -5,7 +5,7 @@ import com.datastax.oss.driver.api.core.config.DefaultDriverOption._
import com.datastax.oss.driver.api.core.cql.{AsyncResultSet, BoundStatement}
import com.datastax.oss.driver.api.core.{DefaultConsistencyLevel, DefaultProtocolVersion}
import com.datastax.spark.connector._
import com.datastax.spark.connector.ccm.CcmConfig.V3_6_0
import com.datastax.spark.connector.ccm.CcmConfig.{DSE_V5_1_0, V3_6_0}
import com.datastax.spark.connector.cluster.DefaultCluster
import com.datastax.spark.connector.cql.CassandraConnector
import com.datastax.spark.connector.embedded.SparkTemplate._
Expand Down Expand Up @@ -425,7 +425,7 @@ class RDDSpec extends SparkCassandraITFlatSpecBase with DefaultCluster {

}

it should "should be joinable with a PER PARTITION LIMIT limit" in from(V3_6_0){
it should "should be joinable with a PER PARTITION LIMIT limit" in from(cassandra = V3_6_0, dse = DSE_V5_1_0){
val source = sc.parallelize(keys).map(x => (x, x * 100))
val someCass = source
.joinWithCassandraTable(ks, wideTable, joinColumns = SomeColumns("key", "group"))