diff --git a/plugin/trino-cassandra/src/main/java/io/trino/plugin/cassandra/CassandraClusteringPredicatesExtractor.java b/plugin/trino-cassandra/src/main/java/io/trino/plugin/cassandra/CassandraClusteringPredicatesExtractor.java index 000a49b478df..bf886c5518ec 100644 --- a/plugin/trino-cassandra/src/main/java/io/trino/plugin/cassandra/CassandraClusteringPredicatesExtractor.java +++ b/plugin/trino-cassandra/src/main/java/io/trino/plugin/cassandra/CassandraClusteringPredicatesExtractor.java @@ -53,11 +53,20 @@ public TupleDomain getUnenforcedConstraints() return predicates.filter((columnHandle, domain) -> !clusteringPushDownResult.hasBeenFullyPushed(columnHandle)); } + public boolean includesAllClusteringColumnsAndHasAllEqPredicates() + { + return clusteringPushDownResult.includesAllClusteringColumnsAndHasAllEqPredicates; + } + private ClusteringPushDownResult getClusteringKeysSet(List clusteringColumns, TupleDomain predicates) { ImmutableSet.Builder fullyPushedColumnPredicates = ImmutableSet.builder(); ImmutableList.Builder clusteringColumnSql = ImmutableList.builder(); + int eqPredicatesCount = 0; for (CassandraColumnHandle columnHandle : clusteringColumns) { + if (predicates.getDomains().isEmpty()) { + break; + } Domain domain = predicates.getDomains().get().get(columnHandle); if (domain == null) { break; @@ -109,10 +118,13 @@ private ClusteringPushDownResult getClusteringKeysSet(List") || predicateString.contains("<")) { break; } + else { + eqPredicatesCount++; + } } List clusteringColumnPredicates = clusteringColumnSql.build(); - return new ClusteringPushDownResult(fullyPushedColumnPredicates.build(), Joiner.on(" AND ").join(clusteringColumnPredicates)); + return new ClusteringPushDownResult(fullyPushedColumnPredicates.build(), Joiner.on(" AND ").join(clusteringColumnPredicates), eqPredicatesCount == clusteringColumns.size()); } private String toCqlLiteral(CassandraColumnHandle columnHandle, Object value) @@ -163,7 +175,7 @@ private String 
translateRangeIntoCql(CassandraColumnHandle columnHandle, Range r return upperBoundPredicate; } - private record ClusteringPushDownResult(Set fullyPushedColumnPredicates, String domainQuery) + private record ClusteringPushDownResult(Set fullyPushedColumnPredicates, String domainQuery, boolean includesAllClusteringColumnsAndHasAllEqPredicates) { private ClusteringPushDownResult { diff --git a/plugin/trino-cassandra/src/main/java/io/trino/plugin/cassandra/CassandraMetadata.java b/plugin/trino-cassandra/src/main/java/io/trino/plugin/cassandra/CassandraMetadata.java index 2422e0783d02..6decd6ad09b1 100644 --- a/plugin/trino-cassandra/src/main/java/io/trino/plugin/cassandra/CassandraMetadata.java +++ b/plugin/trino-cassandra/src/main/java/io/trino/plugin/cassandra/CassandraMetadata.java @@ -42,11 +42,13 @@ import io.trino.spi.connector.RelationColumnsMetadata; import io.trino.spi.connector.RelationCommentMetadata; import io.trino.spi.connector.RetryMode; +import io.trino.spi.connector.RowChangeParadigm; import io.trino.spi.connector.SaveMode; import io.trino.spi.connector.SchemaNotFoundException; import io.trino.spi.connector.SchemaTableName; import io.trino.spi.connector.TableFunctionApplicationResult; import io.trino.spi.connector.TableNotFoundException; +import io.trino.spi.expression.Constant; import io.trino.spi.function.table.ConnectorTableFunctionHandle; import io.trino.spi.predicate.TupleDomain; import io.trino.spi.statistics.ComputedStatistics; @@ -78,6 +80,7 @@ import static io.trino.spi.connector.RelationColumnsMetadata.forTable; import static io.trino.spi.connector.RelationCommentMetadata.forRelation; import static io.trino.spi.connector.RetryMode.NO_RETRIES; +import static io.trino.spi.connector.RowChangeParadigm.CHANGE_ONLY_UPDATED_COLUMNS; import static io.trino.spi.connector.SaveMode.REPLACE; import static java.lang.String.format; import static java.util.Locale.ENGLISH; @@ -285,6 +288,7 @@ public Optional> applyFilter(C String clusteringKeyPredicates = 
""; TupleDomain unenforcedConstraint; + Boolean includesAllClusteringColumnsAndHasAllEqPredicates = null; if (partitionResult.unpartitioned() || partitionResult.indexedColumnPredicatePushdown()) { // When the filter is missing at least one of the partition keys or when the table is not partitioned, // use the raw unenforced constraint of the partitionResult @@ -297,6 +301,7 @@ public Optional> applyFilter(C partitionResult.unenforcedConstraint()); clusteringKeyPredicates = clusteringPredicatesExtractor.getClusteringKeyPredicates(); unenforcedConstraint = clusteringPredicatesExtractor.getUnenforcedConstraints(); + includesAllClusteringColumnsAndHasAllEqPredicates = clusteringPredicatesExtractor.includesAllClusteringColumnsAndHasAllEqPredicates(); } Optional> currentPartitions = handle.getPartitions(); @@ -313,7 +318,8 @@ public Optional> applyFilter(C handle.getTableName(), Optional.of(partitionResult.partitions()), // TODO this should probably be AND-ed with handle.getClusteringKeyPredicates() - clusteringKeyPredicates)), + clusteringKeyPredicates, + includesAllClusteringColumnsAndHasAllEqPredicates)), unenforcedConstraint, constraint.getExpression(), false)); @@ -470,39 +476,103 @@ private static boolean isHiddenIdColumn(CassandraColumnHandle columnHandle) } @Override - public ConnectorMergeTableHandle beginMerge(ConnectorSession session, ConnectorTableHandle tableHandle, Map> updateCaseColumns, RetryMode retryMode) + public RowChangeParadigm getRowChangeParadigm(ConnectorSession session, ConnectorTableHandle tableHandle) { - throw new TrinoException(NOT_SUPPORTED, "Delete without primary key or partition key is not supported"); + return CHANGE_ONLY_UPDATED_COLUMNS; } + /** + * Attempt to push down an update operation into the connector. If a connector + * can execute an update for the table handle on its own, it should return a + * table handle, which will be passed back to {@link #executeUpdate} during + * query executing to actually execute the update. 
+ */ @Override - public ColumnHandle getMergeRowIdColumnHandle(ConnectorSession session, ConnectorTableHandle tableHandle) + public Optional applyUpdate(ConnectorSession session, ConnectorTableHandle tableHandle, Map assignments) { - return new CassandraColumnHandle("$update_row_id", 0, CassandraTypes.TEXT, false, false, false, true); + CassandraNamedRelationHandle table = ((CassandraTableHandle) tableHandle).getRequiredNamedRelation(); + if (cassandraSession.isMaterializedView(table.getSchemaTableName())) { + throw new TrinoException(NOT_SUPPORTED, "Updating materialized views not yet supported"); + } + + CassandraNamedRelationHandle handle = ((CassandraTableHandle) tableHandle).getRequiredNamedRelation(); + List partitions = handle.getPartitions() + .orElseThrow(() -> new TrinoException(NOT_SUPPORTED, "Updating without partition key is not supported")); + if (partitions.isEmpty()) { + // there are no records of a given partition key + throw new TrinoException(NOT_SUPPORTED, "Updating without partition key is not supported"); + } + if (!Boolean.TRUE.equals(handle.getIncludesAllClusteringColumnsAndHasAllEqPredicates())) { + throw new TrinoException(NOT_SUPPORTED, "Updating without all clustering keys or with non-eq predicates is not supported"); + } + + Map assignmentsMap = new HashMap<>(); + for (Map.Entry entry : assignments.entrySet()) { + CassandraColumnHandle column = (CassandraColumnHandle) entry.getKey(); + if (isHiddenIdColumn(column)) { + throw new TrinoException(NOT_SUPPORTED, "Updating the hidden id column is not supported"); + } + Object value = entry.getValue().getValue(); + if (value == null) { + throw new TrinoException(NOT_SUPPORTED, "Updating columns to null is not supported"); + } + String cqlLiteral = cassandraTypeManager.toCqlLiteral(column.cassandraType(), value); // validate that the value can be converted to the cassandra type + assignmentsMap.put(column.name(), cqlLiteral); + } + + return Optional.of(new CassandraUpdateTableHandle(tableHandle, assignmentsMap)); 
} + /** + * Execute the update operation on the handle returned from {@link #applyUpdate}. + */ @Override - public Optional applyDelete(ConnectorSession session, ConnectorTableHandle handle) + public OptionalLong executeUpdate(ConnectorSession session, ConnectorTableHandle tableHandle) { - return Optional.of(handle); + CassandraUpdateTableHandle updateTableHandle = ((CassandraUpdateTableHandle) tableHandle); + CassandraTableHandle cassandraTableHandle = (CassandraTableHandle) updateTableHandle.tableHandle(); + CassandraNamedRelationHandle namedRelationHandle = cassandraTableHandle.getRequiredNamedRelation(); + for (String cql : CassandraCqlUtils.getUpdateQueries(namedRelationHandle, updateTableHandle.assignments())) { + cassandraSession.execute(cql); + } + return OptionalLong.empty(); } @Override - public OptionalLong executeDelete(ConnectorSession session, ConnectorTableHandle deleteHandle) + public ColumnHandle getMergeRowIdColumnHandle(ConnectorSession session, ConnectorTableHandle tableHandle) + { + return new CassandraColumnHandle("$update_row_id", 0, CassandraTypes.TEXT, false, false, false, true); + } + + @Override + public Optional applyDelete(ConnectorSession session, ConnectorTableHandle deleteHandle) { CassandraNamedRelationHandle handle = ((CassandraTableHandle) deleteHandle).getRequiredNamedRelation(); List partitions = handle.getPartitions() .orElseThrow(() -> new TrinoException(NOT_SUPPORTED, "Deleting without partition key is not supported")); if (partitions.isEmpty()) { // there are no records of a given partition key - return OptionalLong.empty(); + return Optional.empty(); } + return Optional.of(deleteHandle); + } + + @Override + public OptionalLong executeDelete(ConnectorSession session, ConnectorTableHandle deleteHandle) + { + CassandraNamedRelationHandle handle = ((CassandraTableHandle) deleteHandle).getRequiredNamedRelation(); for (String cql : CassandraCqlUtils.getDeleteQueries(handle)) { cassandraSession.execute(cql); } return 
OptionalLong.empty(); } + @Override + public ConnectorMergeTableHandle beginMerge(ConnectorSession session, ConnectorTableHandle tableHandle, Map> updateCaseColumns, RetryMode retryMode) + { + throw new TrinoException(NOT_SUPPORTED, "This connector does not support modifying table rows"); + } + @Override public Optional> applyTableFunction(ConnectorSession session, ConnectorTableFunctionHandle handle) { diff --git a/plugin/trino-cassandra/src/main/java/io/trino/plugin/cassandra/CassandraNamedRelationHandle.java b/plugin/trino-cassandra/src/main/java/io/trino/plugin/cassandra/CassandraNamedRelationHandle.java index 943621d4572a..c63d2d3a497c 100644 --- a/plugin/trino-cassandra/src/main/java/io/trino/plugin/cassandra/CassandraNamedRelationHandle.java +++ b/plugin/trino-cassandra/src/main/java/io/trino/plugin/cassandra/CassandraNamedRelationHandle.java @@ -34,10 +34,11 @@ public class CassandraNamedRelationHandle private final String tableName; private final Optional> partitions; private final String clusteringKeyPredicates; + private final Boolean includesAllClusteringColumnsAndHasAllEqPredicates; public CassandraNamedRelationHandle(String schemaName, String tableName) { - this(schemaName, tableName, Optional.empty(), ""); + this(schemaName, tableName, Optional.empty(), "", null); } @JsonCreator @@ -45,12 +46,14 @@ public CassandraNamedRelationHandle( @JsonProperty("schemaName") String schemaName, @JsonProperty("tableName") String tableName, @JsonProperty("partitions") Optional> partitions, - @JsonProperty("clusteringKeyPredicates") String clusteringKeyPredicates) + @JsonProperty("clusteringKeyPredicates") String clusteringKeyPredicates, + @JsonProperty("includesAllClusteringColumnsAndHasAllEqPredicates") Boolean includesAllClusteringColumnsAndHasAllEqPredicates) { this.schemaName = requireNonNull(schemaName, "schemaName is null"); this.tableName = requireNonNull(tableName, "tableName is null"); this.partitions = partitions.map(ImmutableList::copyOf); 
this.clusteringKeyPredicates = requireNonNull(clusteringKeyPredicates, "clusteringKeyPredicates is null"); + this.includesAllClusteringColumnsAndHasAllEqPredicates = includesAllClusteringColumnsAndHasAllEqPredicates; } @JsonProperty @@ -77,6 +80,12 @@ public String getClusteringKeyPredicates() return clusteringKeyPredicates; } + @JsonProperty + public Boolean getIncludesAllClusteringColumnsAndHasAllEqPredicates() + { + return includesAllClusteringColumnsAndHasAllEqPredicates; + } + public SchemaTableName getSchemaTableName() { return new SchemaTableName(schemaName, tableName); @@ -101,7 +110,8 @@ public boolean equals(Object obj) return Objects.equals(this.schemaName, other.schemaName) && Objects.equals(this.tableName, other.tableName) && Objects.equals(this.partitions, other.partitions) && - Objects.equals(this.clusteringKeyPredicates, other.clusteringKeyPredicates); + Objects.equals(this.clusteringKeyPredicates, other.clusteringKeyPredicates) && + Objects.equals(this.includesAllClusteringColumnsAndHasAllEqPredicates, other.includesAllClusteringColumnsAndHasAllEqPredicates); } @Override @@ -122,6 +132,9 @@ public String toString() if (!clusteringKeyPredicates.isEmpty()) { result += format(" constraint(%s)", clusteringKeyPredicates); } + if (includesAllClusteringColumnsAndHasAllEqPredicates != null) { + result += format(" includesAllClusteringColumnsAndHasAllEqPredicates(%s)", includesAllClusteringColumnsAndHasAllEqPredicates); + } return result; } } diff --git a/plugin/trino-cassandra/src/main/java/io/trino/plugin/cassandra/CassandraUpdateTableHandle.java b/plugin/trino-cassandra/src/main/java/io/trino/plugin/cassandra/CassandraUpdateTableHandle.java new file mode 100644 index 000000000000..f27d17a6b05d --- /dev/null +++ b/plugin/trino-cassandra/src/main/java/io/trino/plugin/cassandra/CassandraUpdateTableHandle.java @@ -0,0 +1,38 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the 
License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.cassandra; + +import io.trino.spi.connector.ConnectorTableHandle; + +import java.util.Map; + +import static java.util.Objects.requireNonNull; + +public record CassandraUpdateTableHandle( + ConnectorTableHandle tableHandle, + Map assignments) + implements ConnectorTableHandle +{ + public CassandraUpdateTableHandle + { + requireNonNull(tableHandle, "tableHandle is null"); + requireNonNull(assignments, "assignments is null"); + } + + @Override + public String toString() + { + return "cassandra:" + tableHandle; + } +} diff --git a/plugin/trino-cassandra/src/main/java/io/trino/plugin/cassandra/util/CassandraCqlUtils.java b/plugin/trino-cassandra/src/main/java/io/trino/plugin/cassandra/util/CassandraCqlUtils.java index beb55fbfd0ee..026a112a77ab 100644 --- a/plugin/trino-cassandra/src/main/java/io/trino/plugin/cassandra/util/CassandraCqlUtils.java +++ b/plugin/trino-cassandra/src/main/java/io/trino/plugin/cassandra/util/CassandraCqlUtils.java @@ -24,6 +24,7 @@ import java.util.ArrayList; import java.util.List; +import java.util.Map; import static com.datastax.oss.driver.internal.core.util.Strings.doubleQuote; import static com.google.common.collect.ImmutableList.toImmutableList; @@ -113,6 +114,33 @@ private static String getWhereCondition(String partition, String clusteringKeyPr return String.join(" AND ", conditions); } + private static String getSetStatements(Map assignments) + { + if (!assignments.isEmpty()) { + List setStatements = new ArrayList<>(); + for (Map.Entry entry : assignments.entrySet()) { + 
setStatements.add(format("%s = %s", validColumnName(entry.getKey()), entry.getValue())); + } + return String.join(", ", setStatements); + } + return ""; + } + + private static String update(String schemaName, String tableName, CassandraPartition partition, String clusteringKeyPredicates, Map assignments) + { + return format("UPDATE \"%s\".\"%s\" SET %s WHERE %s", + schemaName, tableName, getSetStatements(assignments), getWhereCondition(partition.getPartitionId(), clusteringKeyPredicates)); + } + + public static List getUpdateQueries(CassandraNamedRelationHandle handle, Map assignments) + { + ImmutableList.Builder queries = ImmutableList.builder(); + for (CassandraPartition partition : handle.getPartitions().orElse(ImmutableList.of())) { + queries.add(update(handle.getSchemaName(), handle.getTableName(), partition, handle.getClusteringKeyPredicates(), assignments)); + } + return queries.build(); + } + private static String deleteFrom(String schemaName, String tableName, CassandraPartition partition, String clusteringKeyPredicates) { return format("DELETE FROM \"%s\".\"%s\" WHERE %s", diff --git a/plugin/trino-cassandra/src/test/java/io/trino/plugin/cassandra/TestCassandraConnector.java b/plugin/trino-cassandra/src/test/java/io/trino/plugin/cassandra/TestCassandraConnector.java index d1a05ba43934..0264abe1bbda 100644 --- a/plugin/trino-cassandra/src/test/java/io/trino/plugin/cassandra/TestCassandraConnector.java +++ b/plugin/trino-cassandra/src/test/java/io/trino/plugin/cassandra/TestCassandraConnector.java @@ -462,7 +462,7 @@ private static Map indexColumns(List columnHandle private CassandraTableHandle getTableHandle(Optional> partitions, String clusteringKeyPredicates) { CassandraNamedRelationHandle handle = ((CassandraTableHandle) getTableHandle(tableForDelete)).getRequiredNamedRelation(); - return new CassandraTableHandle(new CassandraNamedRelationHandle(handle.getSchemaName(), handle.getTableName(), partitions, clusteringKeyPredicates)); + return new 
CassandraTableHandle(new CassandraNamedRelationHandle(handle.getSchemaName(), handle.getTableName(), partitions, clusteringKeyPredicates, null)); } private CassandraPartition createPartition(long value1, long value2) diff --git a/plugin/trino-cassandra/src/test/java/io/trino/plugin/cassandra/TestCassandraConnectorTest.java b/plugin/trino-cassandra/src/test/java/io/trino/plugin/cassandra/TestCassandraConnectorTest.java index 9b20f64933e3..466dbac5e8e7 100644 --- a/plugin/trino-cassandra/src/test/java/io/trino/plugin/cassandra/TestCassandraConnectorTest.java +++ b/plugin/trino-cassandra/src/test/java/io/trino/plugin/cassandra/TestCassandraConnectorTest.java @@ -102,8 +102,7 @@ protected boolean hasBehavior(TestingConnectorBehavior connectorBehavior) SUPPORTS_RENAME_TABLE, SUPPORTS_ROW_TYPE, SUPPORTS_SET_COLUMN_TYPE, - SUPPORTS_TOPN_PUSHDOWN, - SUPPORTS_UPDATE -> false; + SUPPORTS_TOPN_PUSHDOWN -> false; default -> super.hasBehavior(connectorBehavior); }; } @@ -1523,6 +1522,143 @@ public void testAllTypesInsert() } } + @Test + @Override + public void testUpdate() + { + try (TestCassandraTable testCassandraTable = testTable( + "table_update_data", + ImmutableList.of( + partitionColumn("partition_one", "bigint"), + partitionColumn("partition_two", "int"), + clusterColumn("clust_one", "text"), + clusterColumn("clust_two", "text"), + generalColumn("data", "text")), + ImmutableList.of( + "1, 1, 'clust_one_1', 'clust_two_1', null", + "2, 2, 'clust_one_2', 'clust_two_2', null", + "3, 3, 'clust_one_3', 'clust_two_3', null", + "4, 4, 'clust_one_4', 'clust_two_4', null", + "5, 5, 'clust_one_5', 'clust_two_5', null", + "6, 6, 'clust_one_6', 'clust_two_6', null", + "7, 7, 'clust_one_7', 'clust_two_7', null", + "8, 8, 'clust_one_8', 'clust_two_8', null", + "9, 9, 'clust_one_9', 'clust_two_9', null", + "1, 1, 'clust_one_1', 'clust_two_2', null", + "1, 1, 'clust_one_2', 'clust_two_1', null", + "1, 1, 'clust_one_2', 'clust_two_2', null", + "1, 2, 'clust_one_2', 'clust_two_1', null", 
+ "1, 2, 'clust_one_2', 'clust_two_2', null", + "2, 2, 'clust_one_1', 'clust_two_1', null"))) { + String keyspaceAndTable = testCassandraTable.getTableName(); + assertThat(computeActual("SELECT * FROM " + keyspaceAndTable).getRowCount()).isEqualTo(15); + + // error + assertThat(query("UPDATE " + keyspaceAndTable + " SET data='new_data'")) + .failure().hasMessage("Updating without partition key is not supported"); + assertThat(computeActual("SELECT * FROM " + keyspaceAndTable + " WHERE data IS NULL").getRowCount()).isEqualTo(15); + + String wherePartialPartitionKey = " WHERE partition_one=1 AND clust_one='clust_one_1' AND clust_two>='clust_two_1'"; + assertThat(query("UPDATE " + keyspaceAndTable + " SET data='new_data'" + wherePartialPartitionKey)) + .failure().hasMessage("This connector does not support modifying table rows"); + assertThat(computeActual("SELECT * FROM " + keyspaceAndTable + " WHERE data IS NULL").getRowCount()).isEqualTo(15); + + String whereInvalidRangePartitionKey = " WHERE partition_one=3 AND partition_two>=3 AND clust_one='clust_one_2' AND clust_two='clust_two_3'"; + assertThat(query("UPDATE " + keyspaceAndTable + " SET data='new_data'" + whereInvalidRangePartitionKey)) + .failure().hasMessage("This connector does not support modifying table rows"); + assertThat(computeActual("SELECT * FROM " + keyspaceAndTable + " WHERE data IS NULL").getRowCount()).isEqualTo(15); + + String whereClusteringKeyOnly = " WHERE clust_one='clust_one_2'"; + assertThat(query("UPDATE " + keyspaceAndTable + " SET data='new_data'" + whereClusteringKeyOnly)) + .failure().hasMessage("This connector does not support modifying table rows"); + assertThat(computeActual("SELECT * FROM " + keyspaceAndTable + " WHERE data IS NULL").getRowCount()).isEqualTo(15); + + String whereIncompleteClusteringKey = " WHERE partition_one=3 AND partition_two=3 AND clust_one='clust_one_2'"; + assertThat(query("UPDATE " + keyspaceAndTable + " SET data='new_data'" + whereIncompleteClusteringKey)) 
+ .failure().hasMessage("Updating without all clustering keys or with non-eq predicates is not supported"); + assertThat(computeActual("SELECT * FROM " + keyspaceAndTable + " WHERE data IS NULL").getRowCount()).isEqualTo(15); + + String whereInvalidRangeClusteringKey = " WHERE partition_one=3 AND partition_two=3 AND clust_one>='clust_one_2' AND clust_two='clust_two_3'"; + assertThat(query("UPDATE " + keyspaceAndTable + " SET data='new_data'" + whereInvalidRangeClusteringKey)) + .failure().hasMessage("This connector does not support modifying table rows"); + assertThat(computeActual("SELECT * FROM " + keyspaceAndTable + " WHERE data IS NULL").getRowCount()).isEqualTo(15); + + String whereMultiplePartitionKeyWithClusteringKey = " WHERE " + + " (partition_one=1 AND partition_two=1 AND clust_one='clust_one_1') OR " + + " (partition_one=1 AND partition_two=2 AND clust_one='clust_one_2') "; + assertThat(query("UPDATE " + keyspaceAndTable + " SET data='new_data'" + whereMultiplePartitionKeyWithClusteringKey)) + .failure().hasMessage("This connector does not support modifying table rows"); + assertThat(computeActual("SELECT * FROM " + keyspaceAndTable).getRowCount()).isEqualTo(15); + + // success + String wherePrimaryKey = " WHERE partition_one=3 AND partition_two=3 AND clust_one='clust_one_3' AND clust_two='clust_two_3'"; + assertUpdate("UPDATE " + keyspaceAndTable + " SET data='new_data'" + wherePrimaryKey); + assertThat(computeActual("SELECT * FROM " + keyspaceAndTable + " WHERE data IS NULL").getRowCount()).isEqualTo(14); + assertThat(computeActual("SELECT * FROM " + keyspaceAndTable + " WHERE data='new_data'").getRowCount()).isEqualTo(1); + + String whereClusteringKeysUsingIn = " WHERE partition_one=1 AND partition_two=1 AND clust_one IN ('clust_one_1', 'clust_one_2') AND clust_two IN ('clust_two_1', 'clust_two_2')"; + assertUpdate("UPDATE " + keyspaceAndTable + " SET data='new_data2'" + whereClusteringKeysUsingIn); + assertThat(computeActual("SELECT * FROM " + 
keyspaceAndTable + " WHERE data IS NULL").getRowCount()).isEqualTo(10); + assertThat(computeActual("SELECT * FROM " + keyspaceAndTable + " WHERE data='new_data2'").getRowCount()).isEqualTo(4); + } + } + + @Test + @Override + public void testUpdateWithPredicates() + { + assertThatThrownBy(super::testUpdateWithPredicates) + .hasStackTraceContaining("This connector does not support modifying table rows"); + } + + @Test + @Override + public void testUpdateWithNullValues() + { + assertThatThrownBy(super::testUpdateWithNullValues) + .hasStackTraceContaining("This connector does not support modifying table rows"); + } + + @Test + @Override + public void testRowLevelUpdate() + { + assertThatThrownBy(super::testRowLevelUpdate) + .hasStackTraceContaining("This connector does not support modifying table rows"); + } + + @Test + @Override + public void testUpdateAllValues() + { + assertThatThrownBy(super::testUpdateAllValues) + .hasStackTraceContaining("This connector does not support modifying table rows"); + } + + @Test + @Override + public void testUpdateCaseSensitivity() + { + assertThatThrownBy(super::testUpdateCaseSensitivity) + .hasStackTraceContaining("This connector does not support modifying table rows"); + } + + @Test + @Override + public void testUpdateMultipleCondition() + { + assertThatThrownBy(super::testUpdateMultipleCondition) + .hasStackTraceContaining("This connector does not support modifying table rows"); + } + + @Test + @Override + public void testUpdateRowConcurrently() + { + assertThatThrownBy(super::testUpdateRowConcurrently) + .hasStackTraceContaining("This connector does not support modifying table rows"); + } + @Test @Override public void testDelete() @@ -1533,23 +1669,24 @@ public void testDelete() partitionColumn("partition_one", "bigint"), partitionColumn("partition_two", "int"), clusterColumn("clust_one", "text"), + clusterColumn("clust_two", "text"), generalColumn("data", "text")), ImmutableList.of( - "1, 1, 'clust_one_1', null", - "2, 2, 
'clust_one_2', null", - "3, 3, 'clust_one_3', null", - "4, 4, 'clust_one_4', null", - "5, 5, 'clust_one_5', null", - "6, 6, 'clust_one_6', null", - "7, 7, 'clust_one_7', null", - "8, 8, 'clust_one_8', null", - "9, 9, 'clust_one_9', null", - "1, 1, 'clust_one_2', null", - "1, 1, 'clust_one_3', null", - "1, 2, 'clust_one_1', null", - "1, 2, 'clust_one_2', null", - "1, 2, 'clust_one_3', null", - "2, 2, 'clust_one_1', null"))) { + "1, 1, 'clust_one_1', 'clust_two_1', null", + "2, 2, 'clust_one_2', 'clust_two_2', null", + "3, 3, 'clust_one_3', 'clust_two_3', null", + "4, 4, 'clust_one_4', 'clust_two_4', null", + "5, 5, 'clust_one_5', 'clust_two_5', null", + "6, 6, 'clust_one_6', 'clust_two_6', null", + "7, 7, 'clust_one_7', 'clust_two_7', null", + "8, 8, 'clust_one_8', 'clust_two_8', null", + "9, 9, 'clust_one_9', 'clust_two_9', null", + "1, 1, 'clust_one_2', 'clust_two_2', null", + "1, 1, 'clust_one_3', 'clust_two_1', null", + "1, 2, 'clust_one_1', 'clust_two_2', null", + "1, 2, 'clust_one_2', 'clust_two_1', null", + "1, 2, 'clust_one_3', 'clust_two_2', null", + "2, 2, 'clust_one_1', 'clust_two_1', null"))) { String keyspaceAndTable = testCassandraTable.getTableName(); assertThat(computeActual("SELECT * FROM " + keyspaceAndTable).getRowCount()).isEqualTo(15); @@ -1558,20 +1695,35 @@ public void testDelete() .failure().hasMessage("Deleting without partition key is not supported"); assertThat(computeActual("SELECT * FROM " + keyspaceAndTable).getRowCount()).isEqualTo(15); + String wherePartialPartitionKey = " WHERE partition_one=1 AND clust_one='clust_one_1' AND clust_two='clust_two_1'"; + assertThat(query("DELETE FROM " + keyspaceAndTable + wherePartialPartitionKey)) + .failure().hasMessage("This connector does not support modifying table rows"); + assertThat(computeActual("SELECT * FROM " + keyspaceAndTable).getRowCount()).isEqualTo(15); + + String whereInvalidRangePartitionKey = " WHERE partition_one=3 AND partition_two>=3 AND clust_one='clust_one_2' AND 
clust_two='clust_two_3'"; + assertThat(query("DELETE FROM " + keyspaceAndTable + whereInvalidRangePartitionKey)) + .failure().hasMessage("This connector does not support modifying table rows"); + assertThat(computeActual("SELECT * FROM " + keyspaceAndTable + " WHERE data IS NULL").getRowCount()).isEqualTo(15); + + String whereInvalidClusteringKey = " WHERE partition_one=1 AND partition_two=1 AND clust_two='clust_two_1'"; + assertThat(query("DELETE FROM " + keyspaceAndTable + whereInvalidClusteringKey)) + .failure().hasMessage("This connector does not support modifying table rows"); + assertThat(computeActual("SELECT * FROM " + keyspaceAndTable).getRowCount()).isEqualTo(15); + String whereClusteringKeyOnly = " WHERE clust_one='clust_one_2'"; assertThat(query("DELETE FROM " + keyspaceAndTable + whereClusteringKeyOnly)) - .failure().hasMessage("Delete without primary key or partition key is not supported"); + .failure().hasMessage("This connector does not support modifying table rows"); assertThat(computeActual("SELECT * FROM " + keyspaceAndTable).getRowCount()).isEqualTo(15); String whereMultiplePartitionKeyWithClusteringKey = " WHERE " + " (partition_one=1 AND partition_two=1 AND clust_one='clust_one_1') OR " + " (partition_one=1 AND partition_two=2 AND clust_one='clust_one_2') "; assertThat(query("DELETE FROM " + keyspaceAndTable + whereMultiplePartitionKeyWithClusteringKey)) - .failure().hasMessage("Delete without primary key or partition key is not supported"); + .failure().hasMessage("This connector does not support modifying table rows"); assertThat(computeActual("SELECT * FROM " + keyspaceAndTable).getRowCount()).isEqualTo(15); // success - String wherePrimaryKey = " WHERE partition_one=3 AND partition_two=3 AND clust_one='clust_one_3'"; + String wherePrimaryKey = " WHERE partition_one=3 AND partition_two=3 AND clust_one='clust_one_3' AND clust_two='clust_two_3'"; assertUpdate("DELETE FROM " + keyspaceAndTable + wherePrimaryKey); 
assertThat(computeActual("SELECT * FROM " + keyspaceAndTable).getRowCount()).isEqualTo(14); assertThat(computeActual("SELECT * FROM " + keyspaceAndTable + wherePrimaryKey).getRowCount()).isEqualTo(0); @@ -1585,6 +1737,11 @@ public void testDelete() assertUpdate("DELETE FROM " + keyspaceAndTable + whereMultiplePartitionKey); assertThat(computeActual("SELECT * FROM " + keyspaceAndTable).getRowCount()).isEqualTo(9); assertThat(computeActual("SELECT * FROM " + keyspaceAndTable + whereMultiplePartitionKey).getRowCount()).isEqualTo(0); + + String whereClusteringKeysUsingIn = " WHERE partition_one=4 AND partition_two=4 AND clust_one IN ('clust_one_4', 'clust_one_5') AND clust_two IN ('clust_two_4', 'clust_two_5')"; + assertUpdate("DELETE FROM " + keyspaceAndTable + whereClusteringKeysUsingIn); + assertThat(computeActual("SELECT * FROM " + keyspaceAndTable).getRowCount()).isEqualTo(8); + assertThat(computeActual("SELECT * FROM " + keyspaceAndTable + whereClusteringKeysUsingIn).getRowCount()).isEqualTo(0); } } @@ -1593,7 +1750,7 @@ public void testDelete() public void testDeleteWithLike() { assertThatThrownBy(super::testDeleteWithLike) - .hasStackTraceContaining("Delete without primary key or partition key is not supported"); + .hasStackTraceContaining("This connector does not support modifying table rows"); } @Test @@ -1601,7 +1758,7 @@ public void testDeleteWithLike() public void testDeleteWithComplexPredicate() { assertThatThrownBy(super::testDeleteWithComplexPredicate) - .hasStackTraceContaining("Delete without primary key or partition key is not supported"); + .hasStackTraceContaining("This connector does not support modifying table rows"); } @Test @@ -1609,7 +1766,7 @@ public void testDeleteWithComplexPredicate() public void testDeleteWithSemiJoin() { assertThatThrownBy(super::testDeleteWithSemiJoin) - .hasStackTraceContaining("Delete without primary key or partition key is not supported"); + .hasStackTraceContaining("This connector does not support modifying table 
rows"); } @Test @@ -1617,7 +1774,7 @@ public void testDeleteWithSemiJoin() public void testDeleteWithSubquery() { assertThatThrownBy(super::testDeleteWithSubquery) - .hasStackTraceContaining("Delete without primary key or partition key is not supported"); + .hasStackTraceContaining("This connector does not support modifying table rows"); } @Test @@ -1625,7 +1782,7 @@ public void testDeleteWithSubquery() public void testExplainAnalyzeWithDeleteWithSubquery() { assertThatThrownBy(super::testExplainAnalyzeWithDeleteWithSubquery) - .hasStackTraceContaining("Delete without primary key or partition key is not supported"); + .hasStackTraceContaining("This connector does not support modifying table rows"); } @Test @@ -1633,7 +1790,7 @@ public void testExplainAnalyzeWithDeleteWithSubquery() public void testDeleteWithVarcharPredicate() { assertThatThrownBy(super::testDeleteWithVarcharPredicate) - .hasStackTraceContaining("Delete without primary key or partition key is not supported"); + .hasStackTraceContaining("This connector does not support modifying table rows"); } @Test @@ -1649,7 +1806,7 @@ public void testDeleteAllDataFromTable() public void testRowLevelDelete() { assertThatThrownBy(super::testRowLevelDelete) - .hasStackTraceContaining("Delete without primary key or partition key is not supported"); + .hasStackTraceContaining("This connector does not support modifying table rows"); } // test polymorphic table function diff --git a/plugin/trino-cassandra/src/test/java/io/trino/plugin/cassandra/TestCassandraSplitManager.java b/plugin/trino-cassandra/src/test/java/io/trino/plugin/cassandra/TestCassandraSplitManager.java index 7378fd0eb251..438c9b7740b4 100644 --- a/plugin/trino-cassandra/src/test/java/io/trino/plugin/cassandra/TestCassandraSplitManager.java +++ b/plugin/trino-cassandra/src/test/java/io/trino/plugin/cassandra/TestCassandraSplitManager.java @@ -94,7 +94,7 @@ PRIMARY KEY(partition_key, clustering_key)) CassandraSplitManager splitManager = new 
CassandraSplitManager(config, session, null, partitionManager, CASSANDRA_TYPE_MANAGER); CassandraTableHandle tableHandle = new CassandraTableHandle( - new CassandraNamedRelationHandle(KEYSPACE, tableName, Optional.of(partitions.build()), "")); + new CassandraNamedRelationHandle(KEYSPACE, tableName, Optional.of(partitions.build()), "", null)); try (ConnectorSplitSource splitSource = splitManager.getSplits(null, null, tableHandle, null, null)) { List splits = splitSource.getNextBatch(100).get().getSplits(); assertThat(splits).hasSize(2); diff --git a/plugin/trino-cassandra/src/test/java/io/trino/plugin/cassandra/TestJsonCassandraHandles.java b/plugin/trino-cassandra/src/test/java/io/trino/plugin/cassandra/TestJsonCassandraHandles.java index cdf85a119d2f..ced865a2497b 100644 --- a/plugin/trino-cassandra/src/test/java/io/trino/plugin/cassandra/TestJsonCassandraHandles.java +++ b/plugin/trino-cassandra/src/test/java/io/trino/plugin/cassandra/TestJsonCassandraHandles.java @@ -109,7 +109,7 @@ public void testTableHandleSerialize() public void testTable2HandleSerialize() throws Exception { - CassandraTableHandle tableHandle = new CassandraTableHandle(new CassandraNamedRelationHandle("cassandra_schema", "cassandra_table", PARTITIONS, "clusteringKey1 = 33")); + CassandraTableHandle tableHandle = new CassandraTableHandle(new CassandraNamedRelationHandle("cassandra_schema", "cassandra_table", PARTITIONS, "clusteringKey1 = 33", null)); String json = OBJECT_MAPPER.writeValueAsString(tableHandle); testJsonEquals(json, TABLE2_HANDLE_AS_MAP); }