Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SPARKC-403: Add CLUSTERING ORDER in cql statement #981

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package com.datastax.spark.connector.cql

import java.io.IOException

import com.datastax.spark.connector._
import com.datastax.spark.connector.mapper.{DataFrameColumnMapper, ColumnMapper}
import org.apache.spark.Logging
Expand Down Expand Up @@ -92,7 +94,8 @@ case object RegularColumn extends ColumnRole
case class ColumnDef(
columnName: String,
columnRole: ColumnRole,
columnType: ColumnType[_]) extends FieldDef {
columnType: ColumnType[_],
clusteringOrder: ClusteringOrder = ClusteringOrder.ASC) extends FieldDef {

def ref: ColumnRef = ColumnName(columnName)
def isStatic = columnRole == StaticColumn
Expand Down Expand Up @@ -128,6 +131,15 @@ object ColumnDef {
val columnType = ColumnType.fromDriverType(column.getType)
ColumnDef(column.getName, columnRole, columnType)
}

def apply(
column: ColumnMetadata,
columnRole: ColumnRole,
clusteringOrder: ClusteringOrder): ColumnDef = {

val columnType = ColumnType.fromDriverType(column.getType)
ColumnDef(column.getName, columnRole, columnType, clusteringOrder)
}
}

/** A Cassandra table metadata that can be serialized. */
Expand All @@ -138,7 +150,8 @@ case class TableDef(
clusteringColumns: Seq[ColumnDef],
regularColumns: Seq[ColumnDef],
indexes: Seq[IndexDef] = Seq.empty,
isView: Boolean = false) extends StructDef {
isView: Boolean = false,
options: String = "") extends StructDef {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I still don't like being able to just pass a string here, If you really think we need this I think it should at least be Seq[String] and we should just require that they not contain "AND" or "WITH". Then we can convert the append code below into a string join instead of having the more complicated logic.

This removes the need for the "appendOptions" function and replaces it with

require(options.forAll( option => !(option.toLowerCase.contains("and") && !(option.toLowerCase.contains("WITH")), "Table options must not contain "WITH OR AND"
(options +: clusteringOptions).mkString("WITH", "AND")```


require(partitionKey.forall(_.isPartitionKeyColumn), "All partition key columns must have role PartitionKeyColumn")
require(clusteringColumns.forall(_.isClusteringColumn), "All clustering columns must have role ClusteringColumn")
Expand Down Expand Up @@ -185,11 +198,25 @@ case class TableDef(
val clusteringColumnNames = clusteringColumns.map(_.columnName).map(quote)
val primaryKeyClause = (partitionKeyClause +: clusteringColumnNames).mkString(", ")

s"""CREATE TABLE ${quote(keyspaceName)}.${quote(tableName)} (
val stmt = s"""CREATE TABLE ${quote(keyspaceName)}.${quote(tableName)} (
| $columnList,
| PRIMARY KEY ($primaryKeyClause)
|)""".stripMargin
val ordered = if (clusteringColumns.size > 0)
Copy link
Contributor

@RussellSpitzer RussellSpitzer Aug 9, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this could be

orderOptions = clusteringColumns.map( col => 
           s"${quote(col.columnName)} ${col.clusteringOrder}")
        .mkString("WITH CLUSTERING ORDER BY", ",")

To remove the If Statements and separate clusteringColumnOrder function

s"$stmt${Properties.lineSeparator}WITH CLUSTERING ORDER BY (${clusteringColumnOrder(clusteringColumns)})"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

also when including line seps I think is fine to just use """ quotes

else stmt
appendOptions(ordered, options)
}
private[this] def clusteringColumnOrder(clusteringColumns: Seq[ColumnDef]): String =
clusteringColumns.map { col => s"${quote(col.columnName)} ${col.clusteringOrder}"}.mkString(", ")

def clusterOrder: Seq[ClusteringOrder] = clusteringColumns.map(_.clusteringOrder)

private[this] def appendOptions(stmt: String, opts: String) =
if (stmt.contains("WITH") && opts.startsWith("WITH")) s"$stmt${Properties.lineSeparator}AND ${opts.substring(4)}"
else if (!stmt.contains("WITH") && opts.startsWith("AND")) s"WITH ${opts.substring(3)}"
else if (opts == "") s"$stmt"
else s"$stmt${Properties.lineSeparator}$opts"

type ValueRepr = CassandraRow

Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
package com.datastax.spark.connector.cql

import com.datastax.spark.connector.{PartitionKeyColumns, TTL, SomeColumns, AllColumns}
import com.datastax.driver.core.ClusteringOrder
import com.datastax.spark.connector.{AllColumns, PartitionKeyColumns, SomeColumns, TTL}
import com.datastax.spark.connector.types._
import org.scalatest.{WordSpec, Matchers}
import org.scalatest.{Matchers, WordSpec}

class TableDefSpec extends WordSpec with Matchers {

Expand Down Expand Up @@ -32,7 +33,28 @@ class TableDefSpec extends WordSpec with Matchers {
| "c2" varchar,
| "c3" varchar,
| PRIMARY KEY (("c1"), "c2")
|)""".stripMargin
|)
|WITH CLUSTERING ORDER BY ("c2" ASC)""".stripMargin
)
}

"it contains clustering columns with order" in {
val column1 = ColumnDef("c1", PartitionKeyColumn, IntType)
val column2 = ColumnDef("c2", PartitionKeyColumn, VarCharType)
val column3 = ColumnDef("c3", ClusteringColumn(0), VarCharType, ClusteringOrder.DESC)
val column4 = ColumnDef("c4", ClusteringColumn(1), VarCharType)
val column5 = ColumnDef("c5", RegularColumn, VarCharType)
val tableDef = TableDef("keyspace", "table", Seq(column1, column2), Seq(column3, column4), Seq(column5))
tableDef.cql should be(
"""CREATE TABLE "keyspace"."table" (
| "c1" int,
| "c2" varchar,
| "c3" varchar,
| "c4" varchar,
| "c5" varchar,
| PRIMARY KEY (("c1", "c2"), "c3", "c4")
|)
|WITH CLUSTERING ORDER BY ("c3" DESC, "c4" ASC)""".stripMargin
)
}

Expand All @@ -51,7 +73,8 @@ class TableDefSpec extends WordSpec with Matchers {
| "c4" varchar,
| "c5" varchar,
| PRIMARY KEY (("c1", "c2"), "c3", "c4")
|)""".stripMargin
|)
|WITH CLUSTERING ORDER BY ("c3" ASC, "c4" ASC)""".stripMargin
)
}

Expand Down