From 4ae3ff959e282ebc6957aaa5adaa80599f08fb42 Mon Sep 17 00:00:00 2001
From: kaushal
Date: Tue, 17 May 2016 14:57:16 +0530
Subject: [PATCH 1/7] Add CLUSTERING ORDER in cql statement

---
 .../datastax/spark/connector/cql/Schema.scala | 50 +++++++++++++++++--
 .../spark/connector/cql/TableDefSpec.scala    | 26 +++++++++-
 2 files changed, 71 insertions(+), 5 deletions(-)

diff --git a/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/cql/Schema.scala b/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/cql/Schema.scala
index 311596d03..29bfb7946 100644
--- a/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/cql/Schema.scala
+++ b/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/cql/Schema.scala
@@ -88,11 +88,30 @@ case class ClusteringColumn(index: Int) extends ColumnRole
 case object StaticColumn extends ColumnRole
 case object RegularColumn extends ColumnRole
 
+/**
+ * Cluster Column Order Implementation */
+object ClusteringOrder {
+  abstract sealed class Order(val ordinal: Int) extends Ordered[Order]
+    with Serializable {
+    def compare(that: Order) = that.ordinal compare this.ordinal
+
+    def toInt: Int = this.ordinal
+  }
+
+  case object Ascending extends Order(0)
+  case object Descending extends Order(1)
+
+  def fromInt(i: Int): Order = values.find(_.ordinal == i).get
+
+  val values = Set(Ascending, Descending)
+}
+
 /** A Cassandra column metadata that can be serialized. */
 case class ColumnDef(
   columnName: String,
   columnRole: ColumnRole,
-  columnType: ColumnType[_]) extends FieldDef {
+  columnType: ColumnType[_],
+  clusteringOrder: ClusteringOrder.Order = ClusteringOrder.Ascending) extends FieldDef {
 
   def ref: ColumnRef = ColumnName(columnName)
   def isStatic = columnRole == StaticColumn
@@ -128,6 +147,15 @@ object ColumnDef {
     val columnType = ColumnType.fromDriverType(column.getType)
     ColumnDef(column.getName, columnRole, columnType)
   }
+
+  def apply(
+    column: ColumnMetadata,
+    columnRole: ColumnRole,
+    clusteringOrder: ClusteringOrder.Order): ColumnDef = {
+
+    val columnType = ColumnType.fromDriverType(column.getType)
+    ColumnDef(column.getName, columnRole, columnType, clusteringOrder)
+  }
 }
 
 /** A Cassandra table metadata that can be serialized. */
@@ -138,7 +166,8 @@ case class TableDef(
   clusteringColumns: Seq[ColumnDef],
   regularColumns: Seq[ColumnDef],
   indexes: Seq[IndexDef] = Seq.empty,
-  isView: Boolean = false) extends StructDef {
+  isView: Boolean = false,
+  options: String = "") extends StructDef {
 
   require(partitionKey.forall(_.isPartitionKeyColumn), "All partition key columns must have role PartitionKeyColumn")
   require(clusteringColumns.forall(_.isClusteringColumn), "All clustering columns must have role ClusteringColumn")
@@ -185,11 +214,26 @@ case class TableDef(
     val clusteringColumnNames = clusteringColumns.map(_.columnName).map(quote)
     val primaryKeyClause = (partitionKeyClause +: clusteringColumnNames).mkString(", ")
 
-    s"""CREATE TABLE ${quote(keyspaceName)}.${quote(tableName)} (
+    val stmt = s"""CREATE TABLE ${quote(keyspaceName)}.${quote(tableName)} (
        |  $columnList,
        |  PRIMARY KEY ($primaryKeyClause)
        |)""".stripMargin
+    val ordered = if (clusteringColumns.size > 0)
+      s"$stmt${Properties.lineSeparator}WITH CLUSTERING ORDER BY (${clusteringColumnOrder(clusteringColumns)})"
+    else stmt
+    appendOptions(ordered, options)
   }
 
+  private[this] def clusteringColumnOrder(clusteringColumns: Seq[ColumnDef]): String =
+    clusteringColumns.map { col =>
+      if (col.clusteringOrder == ClusteringOrder.Descending)
+        s"${quote(col.columnName)} DESC" else s"${quote(col.columnName)} ASC"
+      }.toList.mkString(", ")
+
+  private[this] def appendOptions(stmt: String, opts: String) =
+    if (stmt.contains("WITH") && opts.startsWith("WITH")) s"$stmt${Properties.lineSeparator}AND ${opts.substring(4)}"
+    else if (!stmt.contains("WITH") && opts.startsWith("AND")) s"WITH ${opts.substring(3)}"
+    else if (opts == "") s"$stmt"
+    else s"$stmt${Properties.lineSeparator}$opts"
+
   type ValueRepr = CassandraRow
 
diff --git a/spark-cassandra-connector/src/test/scala/com/datastax/spark/connector/cql/TableDefSpec.scala b/spark-cassandra-connector/src/test/scala/com/datastax/spark/connector/cql/TableDefSpec.scala
index 01c289d6a..f5a614781 100644
--- a/spark-cassandra-connector/src/test/scala/com/datastax/spark/connector/cql/TableDefSpec.scala
+++ b/spark-cassandra-connector/src/test/scala/com/datastax/spark/connector/cql/TableDefSpec.scala
@@ -32,7 +32,28 @@ class TableDefSpec extends WordSpec with Matchers {
           |  "c2" varchar,
           |  "c3" varchar,
           |  PRIMARY KEY (("c1"), "c2")
-          |)""".stripMargin
+          |)
+          |WITH CLUSTERING ORDER BY ("c2" ASC)""".stripMargin
       )
     }
 
+    "it contains clustering columns with order" in {
+      val column1 = ColumnDef("c1", PartitionKeyColumn, IntType)
+      val column2 = ColumnDef("c2", PartitionKeyColumn, VarCharType)
+      val column3 = ColumnDef("c3", ClusteringColumn(0), VarCharType, ClusteringOrder.Descending)
+      val column4 = ColumnDef("c4", ClusteringColumn(1), VarCharType)
+      val column5 = ColumnDef("c5", RegularColumn, VarCharType)
+      val tableDef = TableDef("keyspace", "table", Seq(column1, column2), Seq(column3, column4), Seq(column5))
+      tableDef.cql should be(
+        """CREATE TABLE "keyspace"."table" (
+          |  "c1" int,
+          |  "c2" varchar,
+          |  "c3" varchar,
+          |  "c4" varchar,
+          |  "c5" varchar,
+          |  PRIMARY KEY (("c1", "c2"), "c3", "c4")
+          |)
+          |WITH CLUSTERING ORDER BY ("c3" DESC, "c4" ASC)""".stripMargin
+      )
+    }
+
@@ -51,7 +72,8 @@ class TableDefSpec extends WordSpec with Matchers {
           |  "c4" varchar,
           |  "c5" varchar,
           |  PRIMARY KEY (("c1", "c2"), "c3", "c4")
-          |)""".stripMargin
+          |)
+          |WITH CLUSTERING ORDER BY ("c3" ASC, "c4" ASC)""".stripMargin
       )
     }
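A minimal usage sketch of the API as it stands after this first patch (not part of the patch itself; the keyspace, table, and column names are invented, and the ClusteringOrder object is the custom one added to Schema.scala above):

```scala
// Sketch only: exercising the clusteringOrder column parameter from PATCH 1.
import com.datastax.spark.connector.cql._
import com.datastax.spark.connector.types.{IntType, TimestampType, VarCharType}

object ClusteringOrderDemo extends App {
  val tableDef = TableDef(
    "ks", "events",
    partitionKey = Seq(ColumnDef("id", PartitionKeyColumn, IntType)),
    clusteringColumns = Seq(ColumnDef("time", ClusteringColumn(0), TimestampType, ClusteringOrder.Descending)),
    regularColumns = Seq(ColumnDef("payload", RegularColumn, VarCharType)))

  // With the new options parameter left at its default (""), appendOptions is
  // a no-op and cql prints:
  //
  // CREATE TABLE "ks"."events" (
  //   "id" int,
  //   "time" timestamp,
  //   "payload" varchar,
  //   PRIMARY KEY (("id"), "time")
  // )
  // WITH CLUSTERING ORDER BY ("time" DESC)
  println(tableDef.cql)
}
```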
From 5bc7cc6e7396515276b7f9cc6b219f7755ce85b4 Mon Sep 17 00:00:00 2001
From: kaushal
Date: Tue, 17 May 2016 19:06:07 +0530
Subject: [PATCH 2/7] Add CLUSTERING ORDER in cql statement

---
 .../main/scala/com/datastax/spark/connector/cql/Schema.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/cql/Schema.scala b/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/cql/Schema.scala
index 29bfb7946..1f9d23b52 100644
--- a/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/cql/Schema.scala
+++ b/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/cql/Schema.scala
@@ -227,7 +227,7 @@ case class TableDef(
     clusteringColumns.map { col =>
       if (col.clusteringOrder == ClusteringOrder.Descending)
         s"${quote(col.columnName)} DESC" else s"${quote(col.columnName)} ASC"
-      }.toList.mkString(", ")
+    }.toList.mkString(", ")
 
   private[this] def appendOptions(stmt: String, opts: String) =
     if (stmt.contains("WITH") && opts.startsWith("WITH")) s"$stmt${Properties.lineSeparator}AND ${opts.substring(4)}"

From 4dd085c4dcdc4e5509c383e550b84abd683897be Mon Sep 17 00:00:00 2001
From: kaushal
Date: Wed, 27 Jul 2016 18:03:18 +0530
Subject: [PATCH 3/7] Changed custom ClusteringOrder class to
 com.datastax.driver.core.ClusteringOrder and Added list of Clustering order
 to TableDef

---
 .../datastax/spark/connector/cql/Schema.scala | 49 ++++++++-----------
 .../spark/connector/cql/TableDefSpec.scala    | 28 +++++++++--
 2 files changed, 46 insertions(+), 31 deletions(-)

diff --git a/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/cql/Schema.scala b/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/cql/Schema.scala
index 1f9d23b52..7c1871649 100644
--- a/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/cql/Schema.scala
+++ b/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/cql/Schema.scala
@@ -1,5 +1,7 @@
 package com.datastax.spark.connector.cql
 
+import java.io.IOException
+
 import com.datastax.spark.connector._
 import com.datastax.spark.connector.mapper.{DataFrameColumnMapper, ColumnMapper}
 import org.apache.spark.Logging
@@ -8,7 +10,6 @@ import org.apache.spark.sql.DataFrame
 import scala.collection.JavaConversions._
 import scala.language.existentials
 import scala.util.{Properties, Try}
-
 import com.datastax.driver.core._
 import com.datastax.spark.connector.types.{CounterType, ColumnType}
 import com.datastax.spark.connector.util.Quote._
@@ -88,30 +89,12 @@ case class ClusteringColumn(index: Int) extends ColumnRole
 case object StaticColumn extends ColumnRole
 case object RegularColumn extends ColumnRole
 
-/**
- * Cluster Column Order Implementation */
-object ClusteringOrder {
-  abstract sealed class Order(val ordinal: Int) extends Ordered[Order]
-    with Serializable {
-    def compare(that: Order) = that.ordinal compare this.ordinal
-
-    def toInt: Int = this.ordinal
-  }
-
-  case object Ascending extends Order(0)
-  case object Descending extends Order(1)
-
-  def fromInt(i: Int): Order = values.find(_.ordinal == i).get
-
-  val values = Set(Ascending, Descending)
-}
-
 /** A Cassandra column metadata that can be serialized. */
 case class ColumnDef(
   columnName: String,
   columnRole: ColumnRole,
   columnType: ColumnType[_],
-  clusteringOrder: ClusteringOrder.Order = ClusteringOrder.Ascending) extends FieldDef {
+  clusteringOrder: ClusteringOrder = ClusteringOrder.ASC) extends FieldDef {
 
   def ref: ColumnRef = ColumnName(columnName)
   def isStatic = columnRole == StaticColumn
@@ -151,7 +134,7 @@ object ColumnDef {
   def apply(
     column: ColumnMetadata,
     columnRole: ColumnRole,
-    clusteringOrder: ClusteringOrder.Order): ColumnDef = {
+    clusteringOrder: ClusteringOrder): ColumnDef = {
 
     val columnType = ColumnType.fromDriverType(column.getType)
     ColumnDef(column.getName, columnRole, columnType, clusteringOrder)
@@ -166,7 +149,8 @@ case class TableDef(
   clusteringColumns: Seq[ColumnDef],
   regularColumns: Seq[ColumnDef],
   indexes: Seq[IndexDef] = Seq.empty,
-  isView: Boolean = false,
+  isView: Boolean = false,
+  clusteringOrder: Option[Seq[ClusteringOrder]] = None,
   options: String = "") extends StructDef {
 
   require(partitionKey.forall(_.isPartitionKeyColumn), "All partition key columns must have role PartitionKeyColumn")
@@ -218,16 +202,25 @@ case class TableDef(
        |  $columnList,
        |  PRIMARY KEY ($primaryKeyClause)
        |)""".stripMargin
-    val ordered = if (clusteringColumns.size > 0)
-      s"$stmt${Properties.lineSeparator}WITH CLUSTERING ORDER BY (${clusteringColumnOrder(clusteringColumns)})"
+    val ordered = if (clusteringColumns.nonEmpty)
+      s"$stmt${Properties.lineSeparator}WITH CLUSTERING ORDER BY (${clusteringColumnOrder(clusteringColumns,clusteringOrder)})"
     else stmt
     appendOptions(ordered, options)
   }
 
-  private[this] def clusteringColumnOrder(clusteringColumns: Seq[ColumnDef]): String =
-    clusteringColumns.map { col =>
-      if (col.clusteringOrder == ClusteringOrder.Descending)
+  private[this] def clusteringColumnOrder(clusteringColumns: Seq[ColumnDef], clusteringOrder: Option[Seq[ClusteringOrder]]): String = {
+
+    val clusteringCols = clusteringOrder match {
+      case Some(orders) if orders.size == clusteringColumns.size =>
+        for ( (col, order) <- clusteringColumns zip orders) yield col.copy(clusteringOrder = order)
+      case Some(e) => throw new IOException("clusteringOrder size is not matching with Clustering Columns")
+      case None => clusteringColumns
+    }
+
+    clusteringCols.map { col =>
+      if (col.clusteringOrder == ClusteringOrder.DESC)
         s"${quote(col.columnName)} DESC" else s"${quote(col.columnName)} ASC"
-    }.toList.mkString(", ")
+    }.toList.mkString(", ")
+  }
 
   private[this] def appendOptions(stmt: String, opts: String) =
     if (stmt.contains("WITH") && opts.startsWith("WITH")) s"$stmt${Properties.lineSeparator}AND ${opts.substring(4)}"

diff --git a/spark-cassandra-connector/src/test/scala/com/datastax/spark/connector/cql/TableDefSpec.scala b/spark-cassandra-connector/src/test/scala/com/datastax/spark/connector/cql/TableDefSpec.scala
index f5a614781..43078aa8d 100644
--- a/spark-cassandra-connector/src/test/scala/com/datastax/spark/connector/cql/TableDefSpec.scala
+++ b/spark-cassandra-connector/src/test/scala/com/datastax/spark/connector/cql/TableDefSpec.scala
@@ -1,8 +1,9 @@
 package com.datastax.spark.connector.cql
 
-import com.datastax.spark.connector.{PartitionKeyColumns, TTL, SomeColumns, AllColumns}
+import com.datastax.driver.core.ClusteringOrder
+import com.datastax.spark.connector.{AllColumns, PartitionKeyColumns, SomeColumns, TTL}
 import com.datastax.spark.connector.types._
-import org.scalatest.{WordSpec, Matchers}
+import org.scalatest.{Matchers, WordSpec}
 
 class TableDefSpec extends WordSpec with Matchers {
 
@@ -40,7 +41,7 @@ class TableDefSpec extends WordSpec with Matchers {
     "it contains clustering columns with order" in {
       val column1 = ColumnDef("c1", PartitionKeyColumn, IntType)
       val column2 = ColumnDef("c2", PartitionKeyColumn, VarCharType)
-      val column3 = ColumnDef("c3", ClusteringColumn(0), VarCharType, ClusteringOrder.Descending)
+      val column3 = ColumnDef("c3", ClusteringColumn(0), VarCharType, ClusteringOrder.DESC)
       val column4 = ColumnDef("c4", ClusteringColumn(1), VarCharType)
       val column5 = ColumnDef("c5", RegularColumn, VarCharType)
       val tableDef = TableDef("keyspace", "table", Seq(column1, column2), Seq(column3, column4), Seq(column5))
@@ -57,6 +58,27 @@ class TableDefSpec extends WordSpec with Matchers {
       )
     }
 
+    "it contains clustering columns with order and override order using TableDef" in {
+      val column1 = ColumnDef("c1", PartitionKeyColumn, IntType)
+      val column2 = ColumnDef("c2", PartitionKeyColumn, VarCharType)
+      val column3 = ColumnDef("c3", ClusteringColumn(0), VarCharType, ClusteringOrder.DESC)
+      val column4 = ColumnDef("c4", ClusteringColumn(1), VarCharType)
+      val column5 = ColumnDef("c5", RegularColumn, VarCharType)
+      val tableDef = TableDef("keyspace", "table", Seq(column1, column2), Seq(column3, column4), Seq(column5)
+        ,clusteringOrder=Option(List(ClusteringOrder.ASC,ClusteringOrder.DESC)))
+      tableDef.cql should be(
+        """CREATE TABLE "keyspace"."table" (
+          |  "c1" int,
+          |  "c2" varchar,
+          |  "c3" varchar,
+          |  "c4" varchar,
+          |  "c5" varchar,
+          |  PRIMARY KEY (("c1", "c2"), "c3", "c4")
+          |)
+          |WITH CLUSTERING ORDER BY ("c3" ASC, "c4" DESC)""".stripMargin
+      )
+    }
+
     "it contains compound partition key and multiple clustering columns" in {
       val column1 = ColumnDef("c1", PartitionKeyColumn, IntType)
       val column2 = ColumnDef("c2", PartitionKeyColumn, VarCharType)
|)""".stripMargin - val ordered = if (clusteringColumns.nonEmpty) - s"$stmt${Properties.lineSeparator}WITH CLUSTERING ORDER BY (${clusteringColumnOrder(clusteringColumns,clusteringOrder)})" + val ordered = if (clusteringColumns.size > 0) + s"$stmt${Properties.lineSeparator}WITH CLUSTERING ORDER BY (${clusteringColumnOrder(clusteringColumns)})" else stmt appendOptions(ordered, options) } - private[this] def clusteringColumnOrder(clusteringColumns: Seq[ColumnDef], clusteringOrder: Option[Seq[ClusteringOrder]]): String = { + private[this] def clusteringColumnOrder(clusteringColumns: Seq[ColumnDef]): String = + clusteringColumns.map { col => s"${quote(col.columnName)} ${col.clusteringOrder}"}.mkString(", ") - val clusteringCols = clusteringOrder match { - case Some(orders) if orders.size == clusteringColumns.size => - for ( (col, order) <- clusteringColumns zip orders) yield col.copy(clusteringOrder = order) - case Some(e) => throw new IOException("clusteringOrder size is not matching with Clustering Columns") - case None => clusteringColumns - } - - clusteringCols.map { col => - if (col.clusteringOrder == ClusteringOrder.DESC) - s"${quote(col.columnName)} DESC" else s"${quote(col.columnName)} ASC" - }.toList.mkString(", ") - } + def clusterOrder: Seq[ClusteringOrder] = clusteringColumns.map(_.clusteringOrder) private[this] def appendOptions(stmt: String, opts: String) = if (stmt.contains("WITH") && opts.startsWith("WITH")) s"$stmt${Properties.lineSeparator}AND ${opts.substring(4)}" diff --git a/spark-cassandra-connector/src/test/scala/com/datastax/spark/connector/cql/TableDefSpec.scala b/spark-cassandra-connector/src/test/scala/com/datastax/spark/connector/cql/TableDefSpec.scala index 43078aa8d..6e8a38121 100644 --- a/spark-cassandra-connector/src/test/scala/com/datastax/spark/connector/cql/TableDefSpec.scala +++ b/spark-cassandra-connector/src/test/scala/com/datastax/spark/connector/cql/TableDefSpec.scala @@ -58,27 +58,6 @@ class TableDefSpec extends WordSpec with Matchers { ) } - "it contains clustering columns with order and override order using TableDef" in { - val column1 = ColumnDef("c1", PartitionKeyColumn, IntType) - val column2 = ColumnDef("c2", PartitionKeyColumn, VarCharType) - val column3 = ColumnDef("c3", ClusteringColumn(0), VarCharType, ClusteringOrder.DESC) - val column4 = ColumnDef("c4", ClusteringColumn(1), VarCharType) - val column5 = ColumnDef("c5", RegularColumn, VarCharType) - val tableDef = TableDef("keyspace", "table", Seq(column1, column2), Seq(column3, column4), Seq(column5) - ,clusteringOrder=Option(List(ClusteringOrder.ASC,ClusteringOrder.DESC))) - tableDef.cql should be( - """CREATE TABLE "keyspace"."table" ( - | "c1" int, - | "c2" varchar, - | "c3" varchar, - | "c4" varchar, - | "c5" varchar, - | PRIMARY KEY (("c1", "c2"), "c3", "c4") - |) - |WITH CLUSTERING ORDER BY ("c3" ASC, "c4" DESC)""".stripMargin - ) - } - "it contains compound partition key and multiple clustering columns" in { val column1 = ColumnDef("c1", PartitionKeyColumn, IntType) val column2 = ColumnDef("c2", PartitionKeyColumn, VarCharType) From 697dd0e9cd8ec36c69e19c6eb3156dbddb350c5f Mon Sep 17 00:00:00 2001 From: kaushal Date: Sun, 21 Aug 2016 20:20:25 +0530 Subject: [PATCH 5/7] Changed Options String to Seq[String] --- .../datastax/spark/connector/cql/Schema.scala | 22 ++++--------- .../spark/connector/cql/TableDefSpec.scala | 33 +++++++++++++++++++ 2 files changed, 40 insertions(+), 15 deletions(-) diff --git 
a/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/cql/Schema.scala b/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/cql/Schema.scala index 82b3089ec..9217f5aec 100644 --- a/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/cql/Schema.scala +++ b/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/cql/Schema.scala @@ -151,11 +151,12 @@ case class TableDef( regularColumns: Seq[ColumnDef], indexes: Seq[IndexDef] = Seq.empty, isView: Boolean = false, - options: String = "") extends StructDef { + options: Seq[String] = Seq.empty) extends StructDef { require(partitionKey.forall(_.isPartitionKeyColumn), "All partition key columns must have role PartitionKeyColumn") require(clusteringColumns.forall(_.isClusteringColumn), "All clustering columns must have role ClusteringColumn") require(regularColumns.forall(!_.isPrimaryKeyColumn), "Regular columns cannot have role PrimaryKeyColumn") + require(options.forall( option => !(option.toLowerCase.contains("and") && !(option.toLowerCase.contains("with")))), "Table options must not contain WITH OR AND") val allColumns = regularColumns ++ clusteringColumns ++ partitionKey @@ -202,21 +203,12 @@ case class TableDef( | $columnList, | PRIMARY KEY ($primaryKeyClause) |)""".stripMargin - val ordered = if (clusteringColumns.size > 0) - s"$stmt${Properties.lineSeparator}WITH CLUSTERING ORDER BY (${clusteringColumnOrder(clusteringColumns)})" - else stmt - appendOptions(ordered, options) - } - private[this] def clusteringColumnOrder(clusteringColumns: Seq[ColumnDef]): String = - clusteringColumns.map { col => s"${quote(col.columnName)} ${col.clusteringOrder}"}.mkString(", ") - - def clusterOrder: Seq[ClusteringOrder] = clusteringColumns.map(_.clusteringOrder) + val ordered = clusteringColumns.map( col => s"${quote(col.columnName)} ${col.clusteringOrder}") + .mkString("CLUSTERING ORDER BY (", ", ",")") - private[this] def appendOptions(stmt: String, opts: String) = - if (stmt.contains("WITH") && opts.startsWith("WITH")) s"$stmt${Properties.lineSeparator}AND ${opts.substring(4)}" - else if (!stmt.contains("WITH") && opts.startsWith("AND")) s"WITH ${opts.substring(3)}" - else if (opts == "") s"$stmt" - else s"$stmt${Properties.lineSeparator}$opts" + val orderWithOptions:Seq[String] = if (clusteringColumns.size > 0) options.+:(ordered) else options + if (orderWithOptions.size > 0) s"""$stmt${Properties.lineSeparator}WITH ${orderWithOptions.mkString(s"${Properties.lineSeparator} AND ")}""" else stmt + } type ValueRepr = CassandraRow diff --git a/spark-cassandra-connector/src/test/scala/com/datastax/spark/connector/cql/TableDefSpec.scala b/spark-cassandra-connector/src/test/scala/com/datastax/spark/connector/cql/TableDefSpec.scala index 6e8a38121..ef326ebb0 100644 --- a/spark-cassandra-connector/src/test/scala/com/datastax/spark/connector/cql/TableDefSpec.scala +++ b/spark-cassandra-connector/src/test/scala/com/datastax/spark/connector/cql/TableDefSpec.scala @@ -90,6 +90,39 @@ class TableDefSpec extends WordSpec with Matchers { |)""".stripMargin ) } + + "it contains options" in { + val column1 = ColumnDef("c1", PartitionKeyColumn, IntType) + val column2 = ColumnDef("c2", RegularColumn, VarCharType) + val column3 = ColumnDef("c3", RegularColumn, VarCharType) + val tableDef = TableDef("keyspace", "table", Seq(column1), Seq.empty, Seq(column2,column3),options=Seq("bloom_filter_fp_chance = 0.01")) + tableDef.cql should be( + """CREATE TABLE "keyspace"."table" ( + | "c1" int, + | "c2" varchar, + | 
"c3" varchar, + | PRIMARY KEY (("c1")) + |) + |WITH bloom_filter_fp_chance = 0.01""".stripMargin + ) + } + + "it contains clustering column and options" in { + val column1 = ColumnDef("c1", PartitionKeyColumn, IntType) + val column2 = ColumnDef("c2", ClusteringColumn(0), VarCharType) + val column3 = ColumnDef("c3", RegularColumn, VarCharType) + val tableDef = TableDef("keyspace", "table", Seq(column1), Seq(column2), Seq(column3),options=Seq("bloom_filter_fp_chance = 0.01")) + tableDef.cql should be( + """CREATE TABLE "keyspace"."table" ( + | "c1" int, + | "c2" varchar, + | "c3" varchar, + | PRIMARY KEY (("c1"), "c2") + |) + |WITH CLUSTERING ORDER BY ("c2" ASC) + | AND bloom_filter_fp_chance = 0.01""".stripMargin + ) + } } } } From 0aa10be0e658d06011698eeead23bc74879392f6 Mon Sep 17 00:00:00 2001 From: kaushal Date: Wed, 19 Oct 2016 20:02:24 +0530 Subject: [PATCH 6/7] Changed Options to tableOptions and its type to Map[String,String]. --- .../datastax/spark/connector/cql/Schema.scala | 18 +++++++++++++----- .../spark/connector/cql/TableDefSpec.scala | 4 ++-- 2 files changed, 15 insertions(+), 7 deletions(-) diff --git a/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/cql/Schema.scala b/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/cql/Schema.scala index 9217f5aec..251d66b94 100644 --- a/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/cql/Schema.scala +++ b/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/cql/Schema.scala @@ -151,12 +151,11 @@ case class TableDef( regularColumns: Seq[ColumnDef], indexes: Seq[IndexDef] = Seq.empty, isView: Boolean = false, - options: Seq[String] = Seq.empty) extends StructDef { + tableOptions: Map[String,String] = Map.empty) extends StructDef { require(partitionKey.forall(_.isPartitionKeyColumn), "All partition key columns must have role PartitionKeyColumn") require(clusteringColumns.forall(_.isClusteringColumn), "All clustering columns must have role ClusteringColumn") require(regularColumns.forall(!_.isPrimaryKeyColumn), "Regular columns cannot have role PrimaryKeyColumn") - require(options.forall( option => !(option.toLowerCase.contains("and") && !(option.toLowerCase.contains("with")))), "Table options must not contain WITH OR AND") val allColumns = regularColumns ++ clusteringColumns ++ partitionKey @@ -203,11 +202,20 @@ case class TableDef( | $columnList, | PRIMARY KEY ($primaryKeyClause) |)""".stripMargin - val ordered = clusteringColumns.map( col => s"${quote(col.columnName)} ${col.clusteringOrder}") + + val clusteringOrderingClause = clusteringColumns.map( col => s"${quote(col.columnName)} ${col.clusteringOrder}") .mkString("CLUSTERING ORDER BY (", ", ",")") - val orderWithOptions:Seq[String] = if (clusteringColumns.size > 0) options.+:(ordered) else options - if (orderWithOptions.size > 0) s"""$stmt${Properties.lineSeparator}WITH ${orderWithOptions.mkString(s"${Properties.lineSeparator} AND ")}""" else stmt + val tableOptionsString:Seq[String] = tableOptions.map(option => s"${option._1} = ${option._2}").toSeq + + val tableOptionsClause:Seq[String] = if (clusteringColumns.size > 0) + tableOptionsString.+:(clusteringOrderingClause) + else tableOptionsString + + if (tableOptionsClause.size > 0) + s"""$stmt${Properties.lineSeparator}WITH ${tableOptionsClause.mkString(s"${Properties.lineSeparator} AND ")}""" + else + stmt } type ValueRepr = CassandraRow diff --git a/spark-cassandra-connector/src/test/scala/com/datastax/spark/connector/cql/TableDefSpec.scala 
From 0aa10be0e658d06011698eeead23bc74879392f6 Mon Sep 17 00:00:00 2001
From: kaushal
Date: Wed, 19 Oct 2016 20:02:24 +0530
Subject: [PATCH 6/7] Changed Options to tableOptions and its type to
 Map[String,String].

---
 .../datastax/spark/connector/cql/Schema.scala | 18 +++++++++++++-----
 .../spark/connector/cql/TableDefSpec.scala    |  4 ++--
 2 files changed, 15 insertions(+), 7 deletions(-)

diff --git a/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/cql/Schema.scala b/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/cql/Schema.scala
index 9217f5aec..251d66b94 100644
--- a/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/cql/Schema.scala
+++ b/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/cql/Schema.scala
@@ -151,12 +151,11 @@ case class TableDef(
   regularColumns: Seq[ColumnDef],
   indexes: Seq[IndexDef] = Seq.empty,
   isView: Boolean = false,
-  options: Seq[String] = Seq.empty) extends StructDef {
+  tableOptions: Map[String,String] = Map.empty) extends StructDef {
 
   require(partitionKey.forall(_.isPartitionKeyColumn), "All partition key columns must have role PartitionKeyColumn")
   require(clusteringColumns.forall(_.isClusteringColumn), "All clustering columns must have role ClusteringColumn")
   require(regularColumns.forall(!_.isPrimaryKeyColumn), "Regular columns cannot have role PrimaryKeyColumn")
-  require(options.forall( option => !(option.toLowerCase.contains("and") && !(option.toLowerCase.contains("with")))), "Table options must not contain WITH OR AND")
 
   val allColumns = regularColumns ++ clusteringColumns ++ partitionKey
 
@@ -202,11 +202,20 @@ case class TableDef(
        |  $columnList,
        |  PRIMARY KEY ($primaryKeyClause)
        |)""".stripMargin
-    val ordered = clusteringColumns.map( col => s"${quote(col.columnName)} ${col.clusteringOrder}")
+
+    val clusteringOrderingClause = clusteringColumns.map( col => s"${quote(col.columnName)} ${col.clusteringOrder}")
       .mkString("CLUSTERING ORDER BY (", ", ",")")
 
-    val orderWithOptions:Seq[String] = if (clusteringColumns.size > 0) options.+:(ordered) else options
-    if (orderWithOptions.size > 0) s"""$stmt${Properties.lineSeparator}WITH ${orderWithOptions.mkString(s"${Properties.lineSeparator} AND ")}""" else stmt
+    val tableOptionsString:Seq[String] = tableOptions.map(option => s"${option._1} = ${option._2}").toSeq
+
+    val tableOptionsClause:Seq[String] = if (clusteringColumns.size > 0)
+      tableOptionsString.+:(clusteringOrderingClause)
+    else tableOptionsString
+
+    if (tableOptionsClause.size > 0)
+      s"""$stmt${Properties.lineSeparator}WITH ${tableOptionsClause.mkString(s"${Properties.lineSeparator} AND ")}"""
+    else
+      stmt
   }
 
   type ValueRepr = CassandraRow

diff --git a/spark-cassandra-connector/src/test/scala/com/datastax/spark/connector/cql/TableDefSpec.scala b/spark-cassandra-connector/src/test/scala/com/datastax/spark/connector/cql/TableDefSpec.scala
index ef326ebb0..4110c851e 100644
--- a/spark-cassandra-connector/src/test/scala/com/datastax/spark/connector/cql/TableDefSpec.scala
+++ b/spark-cassandra-connector/src/test/scala/com/datastax/spark/connector/cql/TableDefSpec.scala
@@ -95,7 +95,7 @@ class TableDefSpec extends WordSpec with Matchers {
       val column1 = ColumnDef("c1", PartitionKeyColumn, IntType)
       val column2 = ColumnDef("c2", RegularColumn, VarCharType)
      val column3 = ColumnDef("c3", RegularColumn, VarCharType)
-      val tableDef = TableDef("keyspace", "table", Seq(column1), Seq.empty, Seq(column2,column3),options=Seq("bloom_filter_fp_chance = 0.01"))
+      val tableDef = TableDef("keyspace", "table", Seq(column1), Seq.empty, Seq(column2,column3),tableOptions=Map("bloom_filter_fp_chance" -> "0.01"))
       tableDef.cql should be(
         """CREATE TABLE "keyspace"."table" (
           |  "c1" int,
@@ -111,7 +111,7 @@ class TableDefSpec extends WordSpec with Matchers {
       val column1 = ColumnDef("c1", PartitionKeyColumn, IntType)
       val column2 = ColumnDef("c2", ClusteringColumn(0), VarCharType)
       val column3 = ColumnDef("c3", RegularColumn, VarCharType)
-      val tableDef = TableDef("keyspace", "table", Seq(column1), Seq(column2), Seq(column3),options=Seq("bloom_filter_fp_chance = 0.01"))
+      val tableDef = TableDef("keyspace", "table", Seq(column1), Seq(column2), Seq(column3),tableOptions=Map("bloom_filter_fp_chance" -> "0.01"))
       tableDef.cql should be(
         """CREATE TABLE "keyspace"."table" (
           |  "c1" int,

From dfa03829771f003a0e9c6eef18418676b17b95bb Mon Sep 17 00:00:00 2001
From: kaushal
Date: Thu, 27 Oct 2016 11:40:49 +0530
Subject: [PATCH 7/7] Wrap long TableDef construction line in TableDefSpec

---
 .../scala/com/datastax/spark/connector/cql/TableDefSpec.scala | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/spark-cassandra-connector/src/test/scala/com/datastax/spark/connector/cql/TableDefSpec.scala b/spark-cassandra-connector/src/test/scala/com/datastax/spark/connector/cql/TableDefSpec.scala
index 4110c851e..f644392d2 100644
--- a/spark-cassandra-connector/src/test/scala/com/datastax/spark/connector/cql/TableDefSpec.scala
+++ b/spark-cassandra-connector/src/test/scala/com/datastax/spark/connector/cql/TableDefSpec.scala
@@ -95,7 +95,8 @@ class TableDefSpec extends WordSpec with Matchers {
       val column1 = ColumnDef("c1", PartitionKeyColumn, IntType)
       val column2 = ColumnDef("c2", RegularColumn, VarCharType)
       val column3 = ColumnDef("c3", RegularColumn, VarCharType)
-      val tableDef = TableDef("keyspace", "table", Seq(column1), Seq.empty, Seq(column2,column3),tableOptions=Map("bloom_filter_fp_chance" -> "0.01"))
+      val tableDef = TableDef("keyspace", "table", Seq(column1), Seq.empty, Seq(column2,column3),
+        tableOptions=Map("bloom_filter_fp_chance" -> "0.01"))
       tableDef.cql should be(
         """CREATE TABLE "keyspace"."table" (
           |  "c1" int,
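Taken together, the series ends with per-column clustering order plus a tableOptions map. A minimal usage sketch of that final API (not part of the patches; keyspace, table, and column names are invented, and it assumes the connector's types and the Java driver's ClusteringOrder enum on the classpath):

```scala
// Sketch only: the TableDef API after PATCH 7.
import com.datastax.driver.core.ClusteringOrder
import com.datastax.spark.connector.cql._
import com.datastax.spark.connector.types.{IntType, VarCharType}

object TableDefCqlDemo extends App {
  val tableDef = TableDef(
    "keyspace", "table",
    partitionKey = Seq(ColumnDef("c1", PartitionKeyColumn, IntType)),
    clusteringColumns = Seq(ColumnDef("c2", ClusteringColumn(0), VarCharType, ClusteringOrder.DESC)),
    regularColumns = Seq(ColumnDef("c3", RegularColumn, VarCharType)),
    tableOptions = Map("bloom_filter_fp_chance" -> "0.01"))

  // Expected, following the TableDefSpec assertions above (note the single
  // leading space before AND, produced by the s"${Properties.lineSeparator} AND "
  // separator in TableDef.cql):
  //
  // CREATE TABLE "keyspace"."table" (
  //   "c1" int,
  //   "c2" varchar,
  //   "c3" varchar,
  //   PRIMARY KEY (("c1"), "c2")
  // )
  // WITH CLUSTERING ORDER BY ("c2" DESC)
  //  AND bloom_filter_fp_chance = 0.01
  println(tableDef.cql)
}
```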