Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SPARKC-403: Add CLUSTERING ORDER in cql statement #981

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package com.datastax.spark.connector.cql

import java.io.IOException

import com.datastax.spark.connector._
import com.datastax.spark.connector.mapper.{DataFrameColumnMapper, ColumnMapper}
import org.apache.spark.Logging
Expand Down Expand Up @@ -92,7 +94,8 @@ case object RegularColumn extends ColumnRole
case class ColumnDef(
columnName: String,
columnRole: ColumnRole,
columnType: ColumnType[_]) extends FieldDef {
columnType: ColumnType[_],
clusteringOrder: ClusteringOrder = ClusteringOrder.ASC) extends FieldDef {

def ref: ColumnRef = ColumnName(columnName)
def isStatic = columnRole == StaticColumn
Expand Down Expand Up @@ -128,6 +131,15 @@ object ColumnDef {
val columnType = ColumnType.fromDriverType(column.getType)
ColumnDef(column.getName, columnRole, columnType)
}

/** Builds a [[ColumnDef]] from driver-level column metadata, carrying an
  * explicit clustering order for the column (used when reproducing the
  * CLUSTERING ORDER BY clause of the source table). */
def apply(
    column: ColumnMetadata,
    columnRole: ColumnRole,
    clusteringOrder: ClusteringOrder): ColumnDef =
  ColumnDef(
    columnName = column.getName,
    columnRole = columnRole,
    columnType = ColumnType.fromDriverType(column.getType),
    clusteringOrder = clusteringOrder)
}

/** A Cassandra table metadata that can be serialized. */
Expand All @@ -138,7 +150,8 @@ case class TableDef(
clusteringColumns: Seq[ColumnDef],
regularColumns: Seq[ColumnDef],
indexes: Seq[IndexDef] = Seq.empty,
isView: Boolean = false) extends StructDef {
isView: Boolean = false,
tableOptions: Map[String,String] = Map.empty) extends StructDef {

require(partitionKey.forall(_.isPartitionKeyColumn), "All partition key columns must have role PartitionKeyColumn")
require(clusteringColumns.forall(_.isClusteringColumn), "All clustering columns must have role ClusteringColumn")
Expand Down Expand Up @@ -185,10 +198,24 @@ case class TableDef(
val clusteringColumnNames = clusteringColumns.map(_.columnName).map(quote)
val primaryKeyClause = (partitionKeyClause +: clusteringColumnNames).mkString(", ")

s"""CREATE TABLE ${quote(keyspaceName)}.${quote(tableName)} (
val stmt = s"""CREATE TABLE ${quote(keyspaceName)}.${quote(tableName)} (
| $columnList,
| PRIMARY KEY ($primaryKeyClause)
|)""".stripMargin

val clusteringOrderingClause = clusteringColumns.map( col => s"${quote(col.columnName)} ${col.clusteringOrder}")
.mkString("CLUSTERING ORDER BY (", ", ",")")

val tableOptionsString:Seq[String] = tableOptions.map(option => s"${option._1} = ${option._2}").toSeq

val tableOptionsClause:Seq[String] = if (clusteringColumns.size > 0)
tableOptionsString.+:(clusteringOrderingClause)
else tableOptionsString

if (tableOptionsClause.size > 0)
s"""$stmt${Properties.lineSeparator}WITH ${tableOptionsClause.mkString(s"${Properties.lineSeparator} AND ")}"""
else
stmt
}

type ValueRepr = CassandraRow
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
package com.datastax.spark.connector.cql

import com.datastax.spark.connector.{PartitionKeyColumns, TTL, SomeColumns, AllColumns}
import com.datastax.driver.core.ClusteringOrder
import com.datastax.spark.connector.{AllColumns, PartitionKeyColumns, SomeColumns, TTL}
import com.datastax.spark.connector.types._
import org.scalatest.{WordSpec, Matchers}
import org.scalatest.{Matchers, WordSpec}

class TableDefSpec extends WordSpec with Matchers {

Expand Down Expand Up @@ -32,7 +33,28 @@ class TableDefSpec extends WordSpec with Matchers {
| "c2" varchar,
| "c3" varchar,
| PRIMARY KEY (("c1"), "c2")
|)""".stripMargin
|)
|WITH CLUSTERING ORDER BY ("c2" ASC)""".stripMargin
)
}

"it contains clustering columns with order" in {
val column1 = ColumnDef("c1", PartitionKeyColumn, IntType)
val column2 = ColumnDef("c2", PartitionKeyColumn, VarCharType)
val column3 = ColumnDef("c3", ClusteringColumn(0), VarCharType, ClusteringOrder.DESC)
val column4 = ColumnDef("c4", ClusteringColumn(1), VarCharType)
val column5 = ColumnDef("c5", RegularColumn, VarCharType)
val tableDef = TableDef("keyspace", "table", Seq(column1, column2), Seq(column3, column4), Seq(column5))
tableDef.cql should be(
"""CREATE TABLE "keyspace"."table" (
| "c1" int,
| "c2" varchar,
| "c3" varchar,
| "c4" varchar,
| "c5" varchar,
| PRIMARY KEY (("c1", "c2"), "c3", "c4")
|)
|WITH CLUSTERING ORDER BY ("c3" DESC, "c4" ASC)""".stripMargin
)
}

Expand All @@ -51,7 +73,8 @@ class TableDefSpec extends WordSpec with Matchers {
| "c4" varchar,
| "c5" varchar,
| PRIMARY KEY (("c1", "c2"), "c3", "c4")
|)""".stripMargin
|)
|WITH CLUSTERING ORDER BY ("c3" ASC, "c4" ASC)""".stripMargin
)
}

Expand All @@ -67,6 +90,40 @@ class TableDefSpec extends WordSpec with Matchers {
|)""".stripMargin
)
}

"it contains options" in {
val column1 = ColumnDef("c1", PartitionKeyColumn, IntType)
val column2 = ColumnDef("c2", RegularColumn, VarCharType)
val column3 = ColumnDef("c3", RegularColumn, VarCharType)
val tableDef = TableDef("keyspace", "table", Seq(column1), Seq.empty, Seq(column2,column3),
tableOptions=Map("bloom_filter_fp_chance" -> "0.01"))
tableDef.cql should be(
"""CREATE TABLE "keyspace"."table" (
| "c1" int,
| "c2" varchar,
| "c3" varchar,
| PRIMARY KEY (("c1"))
|)
|WITH bloom_filter_fp_chance = 0.01""".stripMargin
)
}

"it contains clustering column and options" in {
val column1 = ColumnDef("c1", PartitionKeyColumn, IntType)
val column2 = ColumnDef("c2", ClusteringColumn(0), VarCharType)
val column3 = ColumnDef("c3", RegularColumn, VarCharType)
val tableDef = TableDef("keyspace", "table", Seq(column1), Seq(column2), Seq(column3),tableOptions=Map("bloom_filter_fp_chance" -> "0.01"))
tableDef.cql should be(
"""CREATE TABLE "keyspace"."table" (
| "c1" int,
| "c2" varchar,
| "c3" varchar,
| PRIMARY KEY (("c1"), "c2")
|)
|WITH CLUSTERING ORDER BY ("c2" ASC)
| AND bloom_filter_fp_chance = 0.01""".stripMargin
)
}
}
}
}