Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SPARKC-403: Add CLUSTERING ORDER in cql statement #981

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package com.datastax.spark.connector.cql

import java.io.IOException

import com.datastax.spark.connector._
import com.datastax.spark.connector.mapper.{DataFrameColumnMapper, ColumnMapper}
import org.apache.spark.Logging
Expand Down Expand Up @@ -92,7 +94,8 @@ case object RegularColumn extends ColumnRole
case class ColumnDef(
columnName: String,
columnRole: ColumnRole,
columnType: ColumnType[_]) extends FieldDef {
columnType: ColumnType[_],
clusteringOrder: ClusteringOrder = ClusteringOrder.ASC) extends FieldDef {

def ref: ColumnRef = ColumnName(columnName)
def isStatic = columnRole == StaticColumn
Expand Down Expand Up @@ -128,6 +131,15 @@ object ColumnDef {
val columnType = ColumnType.fromDriverType(column.getType)
ColumnDef(column.getName, columnRole, columnType)
}

/** Builds a [[ColumnDef]] from driver column metadata, carrying the requested
  * clustering order through to the resulting definition.
  *
  * @param column          driver-side column metadata (name and CQL type)
  * @param columnRole      role of the column in the table (partition key, clustering, …)
  * @param clusteringOrder ASC or DESC ordering for a clustering column
  */
def apply(
    column: ColumnMetadata,
    columnRole: ColumnRole,
    clusteringOrder: ClusteringOrder): ColumnDef =
  ColumnDef(
    column.getName,
    columnRole,
    ColumnType.fromDriverType(column.getType),
    clusteringOrder)
}

/** A Cassandra table metadata that can be serialized. */
Expand All @@ -138,11 +150,13 @@ case class TableDef(
clusteringColumns: Seq[ColumnDef],
regularColumns: Seq[ColumnDef],
indexes: Seq[IndexDef] = Seq.empty,
isView: Boolean = false) extends StructDef {
isView: Boolean = false,
options: Seq[String] = Seq.empty) extends StructDef {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps this should be tableOptions (matching the CQL docs) — and now that I think about it, perhaps it fits a Map better than a sequence? That would make it much clearer that we are looking for a set of key-value pairs.


require(partitionKey.forall(_.isPartitionKeyColumn), "All partition key columns must have role PartitionKeyColumn")
require(clusteringColumns.forall(_.isClusteringColumn), "All clustering columns must have role ClusteringColumn")
require(regularColumns.forall(!_.isPrimaryKeyColumn), "Regular columns cannot have role PrimaryKeyColumn")
require(options.forall( option => !(option.toLowerCase.contains("and") && !(option.toLowerCase.contains("with")))), "Table options must not contain WITH OR AND")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What if I want to set my comment on a table to Sand Castles with Judge's Rankings. Basically I think we should let the driver validate the tableOptions. If we want to test them here we should probably only test keys (once we change the tableOptions to a map). I think we are probably best off without the requires here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I changed it accordingly. What would be the best way to validate the tableOptions keys?


val allColumns = regularColumns ++ clusteringColumns ++ partitionKey

Expand Down Expand Up @@ -185,10 +199,15 @@ case class TableDef(
val clusteringColumnNames = clusteringColumns.map(_.columnName).map(quote)
val primaryKeyClause = (partitionKeyClause +: clusteringColumnNames).mkString(", ")

s"""CREATE TABLE ${quote(keyspaceName)}.${quote(tableName)} (
val stmt = s"""CREATE TABLE ${quote(keyspaceName)}.${quote(tableName)} (
| $columnList,
| PRIMARY KEY ($primaryKeyClause)
|)""".stripMargin
val ordered = clusteringColumns.map( col => s"${quote(col.columnName)} ${col.clusteringOrder}")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a bit of an overloaded variable name. Perhaps clusterOrderingClause ?

.mkString("CLUSTERING ORDER BY (", ", ",")")

val orderWithOptions:Seq[String] = if (clusteringColumns.size > 0) options.+:(ordered) else options
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's just go straight to the triple quote here; I think it may be a bit clearer, like:

val tableOptionsClause = s"WITH $clusteringOrderingClause ${(tableOptions...).mkString(AND)}"
"""$stmt $tableOptionsClause"""

if (orderWithOptions.size > 0) s"""$stmt${Properties.lineSeparator}WITH ${orderWithOptions.mkString(s"${Properties.lineSeparator} AND ")}""" else stmt
}

type ValueRepr = CassandraRow
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
package com.datastax.spark.connector.cql

import com.datastax.spark.connector.{PartitionKeyColumns, TTL, SomeColumns, AllColumns}
import com.datastax.driver.core.ClusteringOrder
import com.datastax.spark.connector.{AllColumns, PartitionKeyColumns, SomeColumns, TTL}
import com.datastax.spark.connector.types._
import org.scalatest.{WordSpec, Matchers}
import org.scalatest.{Matchers, WordSpec}

class TableDefSpec extends WordSpec with Matchers {

Expand Down Expand Up @@ -32,7 +33,28 @@ class TableDefSpec extends WordSpec with Matchers {
| "c2" varchar,
| "c3" varchar,
| PRIMARY KEY (("c1"), "c2")
|)""".stripMargin
|)
|WITH CLUSTERING ORDER BY ("c2" ASC)""".stripMargin
)
}

"it contains clustering columns with order" in {
  val firstPartitionCol = ColumnDef("c1", PartitionKeyColumn, IntType)
  val secondPartitionCol = ColumnDef("c2", PartitionKeyColumn, VarCharType)
  // First clustering column is explicitly DESC; the second keeps the default ASC.
  val descClusteringCol = ColumnDef("c3", ClusteringColumn(0), VarCharType, ClusteringOrder.DESC)
  val ascClusteringCol = ColumnDef("c4", ClusteringColumn(1), VarCharType)
  val dataCol = ColumnDef("c5", RegularColumn, VarCharType)
  val tableDef = TableDef(
    "keyspace",
    "table",
    Seq(firstPartitionCol, secondPartitionCol),
    Seq(descClusteringCol, ascClusteringCol),
    Seq(dataCol))
  // The generated CQL must reflect both orders in the CLUSTERING ORDER BY clause.
  tableDef.cql should be(
    """CREATE TABLE "keyspace"."table" (
      | "c1" int,
      | "c2" varchar,
      | "c3" varchar,
      | "c4" varchar,
      | "c5" varchar,
      | PRIMARY KEY (("c1", "c2"), "c3", "c4")
      |)
      |WITH CLUSTERING ORDER BY ("c3" DESC, "c4" ASC)""".stripMargin
  )
}

Expand All @@ -51,7 +73,8 @@ class TableDefSpec extends WordSpec with Matchers {
| "c4" varchar,
| "c5" varchar,
| PRIMARY KEY (("c1", "c2"), "c3", "c4")
|)""".stripMargin
|)
|WITH CLUSTERING ORDER BY ("c3" ASC, "c4" ASC)""".stripMargin
)
}

Expand All @@ -67,6 +90,39 @@ class TableDefSpec extends WordSpec with Matchers {
|)""".stripMargin
)
}

"it contains options" in {
  val keyCol = ColumnDef("c1", PartitionKeyColumn, IntType)
  val textCols = Seq(
    ColumnDef("c2", RegularColumn, VarCharType),
    ColumnDef("c3", RegularColumn, VarCharType))
  // With no clustering columns, the WITH clause carries only the table option.
  val tableDef = TableDef(
    "keyspace",
    "table",
    Seq(keyCol),
    Seq.empty,
    textCols,
    options = Seq("bloom_filter_fp_chance = 0.01"))
  tableDef.cql should be(
    """CREATE TABLE "keyspace"."table" (
      | "c1" int,
      | "c2" varchar,
      | "c3" varchar,
      | PRIMARY KEY (("c1"))
      |)
      |WITH bloom_filter_fp_chance = 0.01""".stripMargin
  )
}

"it contains clustering column and options" in {
  val keyCol = ColumnDef("c1", PartitionKeyColumn, IntType)
  val orderCol = ColumnDef("c2", ClusteringColumn(0), VarCharType)
  val dataCol = ColumnDef("c3", RegularColumn, VarCharType)
  // The clustering-order clause leads the WITH section; further options join via AND.
  val tableDef = TableDef(
    "keyspace",
    "table",
    Seq(keyCol),
    Seq(orderCol),
    Seq(dataCol),
    options = Seq("bloom_filter_fp_chance = 0.01"))
  tableDef.cql should be(
    """CREATE TABLE "keyspace"."table" (
      | "c1" int,
      | "c2" varchar,
      | "c3" varchar,
      | PRIMARY KEY (("c1"), "c2")
      |)
      |WITH CLUSTERING ORDER BY ("c2" ASC)
      | AND bloom_filter_fp_chance = 0.01""".stripMargin
  )
}
}
}
}