From cc6ec71e31330bece75cb5b9b24c3e28f4be5b77 Mon Sep 17 00:00:00 2001 From: Andrew Tolbert Date: Fri, 24 Feb 2017 14:39:49 -0600 Subject: [PATCH] [#186] Add Serial Consistency Level and Timestamp to Activity Log (#187) * [#186] Add Serial Consistency Level and Timestamp to Activity Log Updates recorded queries in the ActivityLog to include Serial Consistency Level and Timestamp if present in queries. If the values are not present, all the client implementations return null to indicate that they weren't set. Also fixes some bugs in the codec module: - Use of Notations.consistency in favor of Consistency.codec caused NullPointerExceptions due to order of loading. Use Consistency.codec instead in both Query and Batch. - Bitmasking in BatchFlags was offset incorrectly. * Fix version (1.1.2 was never released, so reuse) --- .../scala/org/scassandra/codec/Message.scala | 2 +- .../org/scassandra/codec/messages/Batch.scala | 5 +- .../org/scassandra/codec/messages/Query.scala | 2 +- gradle.properties | 2 +- .../http/client/BatchExecution.java | 44 ++++++++-- .../client/PreparedStatementExecution.java | 61 ++++++++++---- .../org/scassandra/http/client/Query.java | 80 ++++++++++++++----- .../http/client/ActivityClientTest.java | 66 +++++++++++++++ .../java/cassandra/CassandraExecutor21.java | 1 + .../java/cassandra/CassandraExecutor30.java | 1 + .../server/actors/BatchHandler.scala | 2 +- .../server/actors/ExecuteHandler.scala | 9 ++- .../server/actors/QueryHandler.scala | 6 +- .../server/priming/ActivityLog.scala | 21 +++-- .../priming/json/PrimingJsonImplicits.scala | 6 +- .../server/actors/BatchHandlerTest.scala | 8 +- .../server/actors/ExecuteHandlerTest.scala | 14 ++-- .../server/actors/QueryHandlerTest.scala | 4 +- .../QueryWithParametersTest.scala | 12 ++- ...edStatementExecutionVerificationTest.scala | 4 +- .../verification/QueryVerificationTest.scala | 21 ++++- .../server/priming/ActivityLogTest.scala | 19 ++--- .../priming/batch/PrimeBatchStoreTest.scala | 22 ++--- .../ActivityVerificationRouteTest.scala | 8 +- 24 files changed, 317 insertions(+), 103 deletions(-) diff --git a/codec/src/main/scala/org/scassandra/codec/Message.scala b/codec/src/main/scala/org/scassandra/codec/Message.scala index 2e437ce2..fcfffb87 100644 --- a/codec/src/main/scala/org/scassandra/codec/Message.scala +++ b/codec/src/main/scala/org/scassandra/codec/Message.scala @@ -543,7 +543,7 @@ object Batch { ("consistency" | Consistency.codec) :: // only parse flags for protocol version 2+ since that's when they were added. ("flags" | withDefaultValue(conditional(protocolVersion.version > 2, Codec[BatchFlags]), DefaultBatchFlags)).consume { flags => - ("serialConsistency" | conditional(flags.withSerialConsistency, consistency)) :: + ("serialConsistency" | conditional(flags.withSerialConsistency, Consistency.codec)) :: ("timestamp" | conditional(flags.withDefaultTimestamp, clong)) } { data => // derive flags from presence of serialConsistency and timestamp. val serialConsistency = data.head diff --git a/codec/src/main/scala/org/scassandra/codec/messages/Batch.scala b/codec/src/main/scala/org/scassandra/codec/messages/Batch.scala index 1532e8bb..600a46d2 100644 --- a/codec/src/main/scala/org/scassandra/codec/messages/Batch.scala +++ b/codec/src/main/scala/org/scassandra/codec/messages/Batch.scala @@ -50,10 +50,11 @@ private[codec] case class BatchFlags( private [codec] object BatchFlags { implicit val codec: Codec[BatchFlags] = { - ("reserved" | ignore(5)) :: + ("reserved" | ignore(1)) :: ("namesForValues" | bool) :: // Note that namesForValues is not currently used, see CASSANDRA-10246 ("defaultTimestamp" | bool) :: - ("serialConsistency" | bool) + ("serialConsistency" | bool) :: + ("reserved" | ignore(4)) // the first 4 bits are unused. }.as[BatchFlags] } diff --git a/codec/src/main/scala/org/scassandra/codec/messages/Query.scala b/codec/src/main/scala/org/scassandra/codec/messages/Query.scala index c6055829..f25eee79 100644 --- a/codec/src/main/scala/org/scassandra/codec/messages/Query.scala +++ b/codec/src/main/scala/org/scassandra/codec/messages/Query.scala @@ -70,7 +70,7 @@ object QueryParameters { ("skipMetadata" | provide[Boolean](flags.skipMetadata)) :: ("pageSize" | conditional(flags.pageSize, cint)) :: ("pagingState" | conditional(flags.withPagingState, cbytes)) :: - ("serialConsistency " | conditional(flags.withSerialConsistency, consistency)) :: + ("serialConsistency " | conditional(flags.withSerialConsistency, Consistency.codec)) :: ("timestamp" | conditional(flags.withDefaultTimestamp, clong)) } { data => // This is admittedly contrived, but allows us to forgo making QueryFlags part of QueryParameters as diff --git a/gradle.properties b/gradle.properties index d39a860f..e4f19d6a 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,2 +1,2 @@ #Tue, 17 Jan 2017 21:09:46 +0000 -version=1.1.3 +version=1.1.2-SNAPSHOT diff --git a/java-client/src/main/java/org/scassandra/http/client/BatchExecution.java b/java-client/src/main/java/org/scassandra/http/client/BatchExecution.java index bdaacc0c..a0a9e2b2 100644 --- a/java-client/src/main/java/org/scassandra/http/client/BatchExecution.java +++ b/java-client/src/main/java/org/scassandra/http/client/BatchExecution.java @@ -25,12 +25,18 @@ public final class BatchExecution { private final List batchQueries; private final String consistency; + private final String serialConsistency; private final BatchType batchType; + private final Long timestamp; - private BatchExecution(List batchQueries, String consistency, BatchType batchType) { + + private BatchExecution(List batchQueries, String consistency, String serialConsistency, + BatchType batchType, Long timestamp) { this.batchQueries = batchQueries; this.consistency = consistency; + this.serialConsistency = serialConsistency; this.batchType = batchType; + this.timestamp = timestamp; } public List getBatchQueries() { @@ -41,30 +47,40 @@ public String getConsistency() { return consistency; } + public String getSerialConsistency() { + return serialConsistency; + } + public BatchType getBatchType() { return batchType; } + public Long getTimestamp() { + return timestamp; + } + @Override public boolean equals(Object o) { - if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; BatchExecution that = (BatchExecution) o; - if (batchQueries != null ? !batchQueries.equals(that.batchQueries) : that.batchQueries != null) - return false; + if (batchQueries != null ? !batchQueries.equals(that.batchQueries) : that.batchQueries != null) return false; if (consistency != null ? !consistency.equals(that.consistency) : that.consistency != null) return false; - return batchType == that.batchType; - + if (serialConsistency != null ? !serialConsistency.equals(that.serialConsistency) : that.serialConsistency != null) + return false; + if (batchType != that.batchType) return false; + return timestamp != null ? timestamp.equals(that.timestamp) : that.timestamp == null; } @Override public int hashCode() { int result = batchQueries != null ? batchQueries.hashCode() : 0; result = 31 * result + (consistency != null ? consistency.hashCode() : 0); + result = 31 * result + (serialConsistency != null ? serialConsistency.hashCode() : 0); result = 31 * result + (batchType != null ? batchType.hashCode() : 0); + result = 31 * result + (timestamp != null ? timestamp.hashCode() : 0); return result; } @@ -73,7 +89,9 @@ public String toString() { return "BatchExecution{" + "batchQueries=" + batchQueries + ", consistency='" + consistency + '\'' + + ", serialConsistency='" + serialConsistency + '\'' + ", batchType=" + batchType + + ", timestamp=" + timestamp + '}'; } @@ -85,7 +103,9 @@ public static BatchExecutionBuilder builder() { public static class BatchExecutionBuilder { private List batchQueries; private String consistency = "ONE"; + private String serialConsistency; private BatchType batchType = LOGGED; + private Long timestamp; private BatchExecutionBuilder() { } @@ -119,6 +139,11 @@ public BatchExecutionBuilder withConsistency(String consistency) { return this; } + public BatchExecutionBuilder withSerialConsistency(String serialConsistency) { + this.serialConsistency = serialConsistency; + return this; + } + /** * Defaults to LOGGED if not set. * @@ -130,8 +155,13 @@ public BatchExecutionBuilder withBatchType(BatchType batchType) { return this; } + public BatchExecutionBuilder withTimestamp(Long timestamp) { + this.timestamp = timestamp; + return this; + } + public BatchExecution build() { - return new BatchExecution(batchQueries, consistency, batchType); + return new BatchExecution(batchQueries, consistency, serialConsistency, batchType, timestamp); } } } diff --git a/java-client/src/main/java/org/scassandra/http/client/PreparedStatementExecution.java b/java-client/src/main/java/org/scassandra/http/client/PreparedStatementExecution.java index 4995faaa..f290bb3d 100644 --- a/java-client/src/main/java/org/scassandra/http/client/PreparedStatementExecution.java +++ b/java-client/src/main/java/org/scassandra/http/client/PreparedStatementExecution.java @@ -26,14 +26,19 @@ public final class PreparedStatementExecution { private final String preparedStatementText; private final String consistency; + private final String serialConsistency; private final List variables; private final List variableTypes; + private final Long timestamp; - private PreparedStatementExecution(String preparedStatementText, String consistency, List variables, List variableTypes) { + private PreparedStatementExecution(String preparedStatementText, String consistency, String serialConsistency, + List variables, List variableTypes, Long timestamp) { this.preparedStatementText = preparedStatementText; this.consistency = consistency; + this.serialConsistency = serialConsistency; this.variables = variables; this.variableTypes = variableTypes; + this.timestamp = timestamp; } public String getPreparedStatementText() { @@ -44,17 +49,16 @@ public String getConsistency() { return consistency; } - public List getVariables() { - return Collections.unmodifiableList(variables); + public String getSerialConsistency() { + return serialConsistency; } - @Override - public String toString() { - return "PreparedStatementExecution{" + - "preparedStatementText='" + preparedStatementText + '\'' + - ", consistency='" + consistency + '\'' + - ", variables=" + variables + - '}'; + public Long getTimestamp() { + return timestamp; + } + + public List getVariables() { + return Collections.unmodifiableList(variables); } @Override @@ -64,22 +68,38 @@ public boolean equals(Object o) { PreparedStatementExecution that = (PreparedStatementExecution) o; - if (consistency != null ? !consistency.equals(that.consistency) : that.consistency != null) return false; if (preparedStatementText != null ? !preparedStatementText.equals(that.preparedStatementText) : that.preparedStatementText != null) return false; + if (consistency != null ? !consistency.equals(that.consistency) : that.consistency != null) return false; + if (serialConsistency != null ? !serialConsistency.equals(that.serialConsistency) : that.serialConsistency != null) + return false; if (variables != null ? !variables.equals(that.variables) : that.variables != null) return false; - - return true; + if (variableTypes != null ? !variableTypes.equals(that.variableTypes) : that.variableTypes != null) + return false; + return timestamp != null ? timestamp.equals(that.timestamp) : that.timestamp == null; } @Override public int hashCode() { int result = preparedStatementText != null ? preparedStatementText.hashCode() : 0; result = 31 * result + (consistency != null ? consistency.hashCode() : 0); + result = 31 * result + (serialConsistency != null ? serialConsistency.hashCode() : 0); result = 31 * result + (variables != null ? variables.hashCode() : 0); + result = 31 * result + (variableTypes != null ? variableTypes.hashCode() : 0); + result = 31 * result + (timestamp != null ? timestamp.hashCode() : 0); return result; } + public String toString() { + return "PreparedStatementExecution{" + + "preparedStatementText='" + preparedStatementText + '\'' + + ", consistency='" + consistency + '\'' + + ", serialConsistency='" + serialConsistency + "\'" + + ", variables=" + variables + + ", timestamp=" + timestamp + + '}'; + } + public static PreparedStatementExecutionBuilder builder() { return new PreparedStatementExecutionBuilder(); } @@ -113,7 +133,9 @@ public static class PreparedStatementExecutionBuilder { private List variableTypes = Collections.emptyList(); private String preparedStatementText; private String consistency = "ONE"; + private String serialConsistency; private List variables = Collections.emptyList(); + private Long timestamp; private PreparedStatementExecutionBuilder() { } @@ -146,6 +168,11 @@ public PreparedStatementExecutionBuilder withConsistency(String consistency) { return this; } + public PreparedStatementExecutionBuilder withSerialConsistency(String serialConsistency) { + this.serialConsistency = serialConsistency; + return this; + } + public PreparedStatementExecutionBuilder withPreparedStatementText(String preparedStatementText) { this.preparedStatementText = preparedStatementText; return this; @@ -161,11 +188,17 @@ public PreparedStatementExecutionBuilder withVariables(List variables) { return this; } + public PreparedStatementExecutionBuilder withTimestamp(Long timestamp) { + this.timestamp = timestamp; + return this; + } + public PreparedStatementExecution build() { if (preparedStatementText == null) { throw new IllegalStateException("Must set preparedStatementText in PreparedStatementExecutionBuilder"); } - return new PreparedStatementExecution(this.preparedStatementText, this.consistency, this.variables, this.variableTypes); + return new PreparedStatementExecution(this.preparedStatementText, this.consistency, this.serialConsistency, + this.variables, this.variableTypes, this.timestamp); } } } diff --git a/java-client/src/main/java/org/scassandra/http/client/Query.java b/java-client/src/main/java/org/scassandra/http/client/Query.java index 334850c5..722d9ea6 100644 --- a/java-client/src/main/java/org/scassandra/http/client/Query.java +++ b/java-client/src/main/java/org/scassandra/http/client/Query.java @@ -25,14 +25,18 @@ public final class Query { private final String query; private final String consistency; + private final String serialConsistency; private final List variables; private final List variableTypes; + private final Long timestamp; - private Query(String query, String consistency, List variables, List variableTypes) { + private Query(String query, String consistency, String serialConsistency, List variables, List variableTypes, Long timestamp) { this.query = query; this.consistency = consistency; + this.serialConsistency = serialConsistency; this.variables = variables; this.variableTypes = variableTypes; + this.timestamp = timestamp; } public String getQuery() { @@ -43,6 +47,14 @@ public String getConsistency() { return consistency; } + public String getSerialConsistency() { + return serialConsistency; + } + + public Long getTimestamp() { + return timestamp; + } + public List getVariables() { return variables; } @@ -51,6 +63,26 @@ public List getVariableTypes() { return variableTypes; } + @Override + public String toString() { + return "Query{" + + "query='" + query + '\'' + + ", consistency='" + consistency + '\'' + + ", serialConsistency='" + serialConsistency + '\'' + + ", variables=" + variables + + ", variableTypes=" + variableTypes + + ", timestamp=" + timestamp + + '}'; + } + + public static QueryBuilder builder() { + return new QueryBuilder(); + } + + public static QueryBuilder builder(CqlType... variableTypes) { + return new QueryBuilder(Arrays.asList(variableTypes)); + } + @Override public boolean equals(Object o) { if (this == o) return true; @@ -60,43 +92,32 @@ public boolean equals(Object o) { if (query != null ? !query.equals(query1.query) : query1.query != null) return false; if (consistency != null ? !consistency.equals(query1.consistency) : query1.consistency != null) return false; + if (serialConsistency != null ? !serialConsistency.equals(query1.serialConsistency) : query1.serialConsistency != null) + return false; if (variables != null ? !variables.equals(query1.variables) : query1.variables != null) return false; - return !(variableTypes != null ? !variableTypes.equals(query1.variableTypes) : query1.variableTypes != null); - + if (variableTypes != null ? !variableTypes.equals(query1.variableTypes) : query1.variableTypes != null) + return false; + return timestamp != null ? timestamp.equals(query1.timestamp) : query1.timestamp == null; } @Override public int hashCode() { int result = query != null ? query.hashCode() : 0; result = 31 * result + (consistency != null ? consistency.hashCode() : 0); + result = 31 * result + (serialConsistency != null ? serialConsistency.hashCode() : 0); result = 31 * result + (variables != null ? variables.hashCode() : 0); result = 31 * result + (variableTypes != null ? variableTypes.hashCode() : 0); + result = 31 * result + (timestamp != null ? timestamp.hashCode() : 0); return result; } - @Override - public String toString() { - return "Query{" + - "query='" + query + '\'' + - ", consistency='" + consistency + '\'' + - ", variables=" + variables + - ", variableTypes=" + variableTypes + - '}'; - } - - public static QueryBuilder builder() { - return new QueryBuilder(); - } - - public static QueryBuilder builder(CqlType... variableTypes) { - return new QueryBuilder(Arrays.asList(variableTypes)); - } - public static class QueryBuilder { private String query; private String consistency = "ONE"; + private String serialConsistency = null; + private Long timestamp = null; private List variables = Collections.emptyList(); private List variableTypes = Collections.emptyList(); @@ -121,6 +142,21 @@ public QueryBuilder withConsistency(String consistency){ return this; } + /** + * Defaults to null if not set. + * @param consistency Serial query consistency + * @return this builder + */ + public QueryBuilder withSerialConsistency(String consistency){ + this.serialConsistency = consistency; + return this; + } + + public QueryBuilder withTimestamp(Long timestamp) { + this.timestamp = timestamp; + return this; + } + public QueryBuilder withVariables(List variables) { this.variables = variables; return this; @@ -135,7 +171,7 @@ public Query build(){ if (query == null) { throw new IllegalStateException("Must set query"); } - return new Query(query, consistency, variables, variableTypes); + return new Query(query, consistency, serialConsistency, variables, variableTypes, timestamp); } } } diff --git a/java-client/src/test/java/org/scassandra/http/client/ActivityClientTest.java b/java-client/src/test/java/org/scassandra/http/client/ActivityClientTest.java index 29b16361..d47c64e9 100644 --- a/java-client/src/test/java/org/scassandra/http/client/ActivityClientTest.java +++ b/java-client/src/test/java/org/scassandra/http/client/ActivityClientTest.java @@ -68,6 +68,22 @@ public void testRetrievalOfASingleQuery() { assertEquals(1, queries.size()); assertEquals("select * from people", queries.get(0).getQuery()); assertEquals("TWO", queries.get(0).getConsistency()); + assertEquals(null, queries.get(0).getSerialConsistency()); + assertEquals(null, queries.get(0).getTimestamp()); + } + + @Test + public void testRetrievalOfASingleQueryWithSerialCLAndTimestamp() { + //given + stubFor(get(urlEqualTo(queryUrl)).willReturn(aResponse().withBody("[{\"query\":\"select * from people\",\"consistency\":\"TWO\", \"serialConsistency\":\"LOCAL_SERIAL\", \"timestamp\": 8675309}]"))); + //when + List queries = underTest.retrieveQueries(); + //then + assertEquals(1, queries.size()); + assertEquals("select * from people", queries.get(0).getQuery()); + assertEquals("TWO", queries.get(0).getConsistency()); + assertEquals("LOCAL_SERIAL", queries.get(0).getSerialConsistency()); + assertEquals(new Long(8675309), queries.get(0).getTimestamp()); } @Test(expected = ActivityRequestFailed.class) @@ -183,6 +199,35 @@ public void retrievingPreparedStatementExecutions() throws Exception { assertEquals("select * from people where name = ?", singleExecution.getPreparedStatementText()); assertEquals(Arrays.asList("Chris"), singleExecution.getVariables()); assertEquals(Arrays.asList(TEXT), singleExecution.getVariableTypes()); + assertEquals(null, singleExecution.getSerialConsistency()); + assertEquals(null, singleExecution.getTimestamp()); + } + + @Test + public void retrievingPreparedStatementExecutionsWithSerialCLAndTimestamp() throws Exception { + //given + stubFor(get(urlEqualTo(preparedStatementExecutionUrl)) + .willReturn(aResponse().withStatus(200).withBody( + "[{\n" + + " \"preparedStatementText\": \"select * from people where name = ?\",\n" + + " \"consistency\": \"ONE\",\n" + + " \"variables\": [\"Chris\"],\n" + + " \"variableTypes\": [\"text\"],\n" + + " \"serialConsistency\": \"SERIAL\",\n" + + " \"timestamp\": 1234567890\n" + + "}]" + ))); + //when + List executions = underTest.retrievePreparedStatementExecutions(); + //then + assertEquals(1, executions.size()); + PreparedStatementExecution singleExecution = executions.get(0); + assertEquals("ONE", singleExecution.getConsistency()); + assertEquals("select * from people where name = ?", singleExecution.getPreparedStatementText()); + assertEquals(Arrays.asList("Chris"), singleExecution.getVariables()); + assertEquals(Arrays.asList(TEXT), singleExecution.getVariableTypes()); + assertEquals("SERIAL", singleExecution.getSerialConsistency()); + assertEquals(new Long(1234567890), singleExecution.getTimestamp()); } @Test(expected = ActivityRequestFailed.class) @@ -327,6 +372,27 @@ public void testRetrievalOfBatchExecutions() { assertEquals(expectedBatchExecution, batchExecution); } + @Test + public void testRetrievalOfBatchExecutionsWithSerialCLAndTimestamp() { + //given + stubFor(get(urlEqualTo(batchUrl)).willReturn(aResponse() + .withBody("[{\"batchQueries\":[{\"query\":\"select * from people\", \"batchQueryKind\":\"query\", \"variables\": [], \"variableTypes\": []}],\"consistency\":\"TWO\", \"batchType\":\"COUNTER\", \"serialConsistency\":\"LOCAL_SERIAL\", \"timestamp\": 8675309}]"))); + //when + List batchExecutions = underTest.retrieveBatches(); + //then + assertEquals(1, batchExecutions.size()); + final BatchExecution batchExecution = batchExecutions.get(0); + final BatchExecution expectedBatchExecution = BatchExecution.builder() + .withBatchQueries(BatchQuery.builder() + .withQuery("select * from people").build()) + .withConsistency("TWO") + .withSerialConsistency("LOCAL_SERIAL") + .withBatchType(BatchType.COUNTER) + .withTimestamp(8675309L) + .build(); + assertEquals(expectedBatchExecution, batchExecution); + } + @Test public void testDeletingOfRecordedBatches() { //given diff --git a/java-it-tests/driver21/src/test/java/cassandra/CassandraExecutor21.java b/java-it-tests/driver21/src/test/java/cassandra/CassandraExecutor21.java index 5c279ff0..1a2f6af4 100644 --- a/java-it-tests/driver21/src/test/java/cassandra/CassandraExecutor21.java +++ b/java-it-tests/driver21/src/test/java/cassandra/CassandraExecutor21.java @@ -51,6 +51,7 @@ public void onClusterClose(EventLoopGroup eventLoopGroup) { cluster = Cluster.builder().addContactPoint(Config.NATIVE_HOST) .withPort(binaryPort) .withNettyOptions(closeQuickly) + .withTimestampGenerator(ServerSideTimestampGenerator.INSTANCE) .build(); session = cluster.connect(KEYSPACE); } diff --git a/java-it-tests/driver30/src/test/java/cassandra/CassandraExecutor30.java b/java-it-tests/driver30/src/test/java/cassandra/CassandraExecutor30.java index 524f863a..7af6a092 100644 --- a/java-it-tests/driver30/src/test/java/cassandra/CassandraExecutor30.java +++ b/java-it-tests/driver30/src/test/java/cassandra/CassandraExecutor30.java @@ -51,6 +51,7 @@ public void onClusterClose(EventLoopGroup eventLoopGroup) { .withPort(binaryPort) .withQueryOptions(new QueryOptions().setConsistencyLevel(ConsistencyLevel.ONE)) .withNettyOptions(closeQuickly) + .withTimestampGenerator(ServerSideTimestampGenerator.INSTANCE) .build(); session = cluster.connect(Config.KEYSPACE); } diff --git a/server/src/main/scala/org/scassandra/server/actors/BatchHandler.scala b/server/src/main/scala/org/scassandra/server/actors/BatchHandler.scala index c8cbabdc..7a728055 100644 --- a/server/src/main/scala/org/scassandra/server/actors/BatchHandler.scala +++ b/server/src/main/scala/org/scassandra/server/actors/BatchHandler.scala @@ -84,7 +84,7 @@ class BatchHandler(activityLog: ActivityLog, } def processBatch(header: FrameHeader, batch: Batch, batchQueries: Seq[BatchQuery], recipient: ActorRef) = { - val execution = BatchExecution(batchQueries, batch.consistency, batch.batchType) + val execution = BatchExecution(batchQueries, batch.consistency, batch.serialConsistency, batch.batchType, batch.timestamp) activityLog.recordBatchExecution(execution) val prime = batchPrimeStore(execution) prime.foreach(p => log.info("Found prime {} for batch execution {}", p, execution)) diff --git a/server/src/main/scala/org/scassandra/server/actors/ExecuteHandler.scala b/server/src/main/scala/org/scassandra/server/actors/ExecuteHandler.scala index d27a0ac0..51bbdb2a 100644 --- a/server/src/main/scala/org/scassandra/server/actors/ExecuteHandler.scala +++ b/server/src/main/scala/org/scassandra/server/actors/ExecuteHandler.scala @@ -60,16 +60,19 @@ class ExecuteHandler(primePreparedStore: PreparedStoreLookup, activityLog: Activ val values = extractQueryVariables(queryText, execute.parameters.values.map(_.map(_.value)), dataTypes) values match { case Some(v) => - activityLog.recordPreparedStatementExecution(queryText, execute.parameters.consistency, v, dataTypes) + activityLog.recordPreparedStatementExecution(queryText, execute.parameters.consistency, + execute.parameters.serialConsistency, v, dataTypes, execute.parameters.timestamp) case None => - activityLog.recordPreparedStatementExecution(queryText, execute.parameters.consistency, Nil, Nil) + activityLog.recordPreparedStatementExecution(queryText, execute.parameters.consistency, + execute.parameters.serialConsistency, Nil, Nil, execute.parameters.timestamp) } writePrime(execute, prime, header, Some(connection), alternative=Some(Reply(VoidResult)), consistency = Some(execute.parameters.consistency)) case None => val errMsg = s"Could not find prepared statement with id: 0x${execute.id.toHex}" - activityLog.recordPreparedStatementExecution(errMsg, execute.parameters.consistency, Nil, Nil) + activityLog.recordPreparedStatementExecution(errMsg, execute.parameters.consistency, + execute.parameters.serialConsistency, Nil, Nil, execute.parameters.timestamp) val unprepared = Unprepared(errMsg, execute.id) write(unprepared, header, Some(connection)) } diff --git a/server/src/main/scala/org/scassandra/server/actors/QueryHandler.scala b/server/src/main/scala/org/scassandra/server/actors/QueryHandler.scala index eec31397..58d955fe 100644 --- a/server/src/main/scala/org/scassandra/server/actors/QueryHandler.scala +++ b/server/src/main/scala/org/scassandra/server/actors/QueryHandler.scala @@ -38,9 +38,11 @@ class QueryHandler(primeQueryStore: PrimeQueryStore, activityLog: ActivityLog) e typesAndValues match { case Some((variableTypes, values)) => - activityLog.recordQuery(query.query, query.parameters.consistency, values, variableTypes) + activityLog.recordQuery(query.query, query.parameters.consistency, query.parameters.serialConsistency, values, + variableTypes, query.parameters.timestamp) case None => - activityLog.recordQuery(query.query, query.parameters.consistency) + activityLog.recordQuery(query.query, query.parameters.consistency, query.parameters.serialConsistency, + timestamp = query.parameters.timestamp) } writePrime(query, prime, header, alternative = noRows, consistency = Some(query.parameters.consistency)) diff --git a/server/src/main/scala/org/scassandra/server/priming/ActivityLog.scala b/server/src/main/scala/org/scassandra/server/priming/ActivityLog.scala index 29702f9c..5b1a7122 100644 --- a/server/src/main/scala/org/scassandra/server/priming/ActivityLog.scala +++ b/server/src/main/scala/org/scassandra/server/priming/ActivityLog.scala @@ -36,9 +36,9 @@ class ActivityLog extends LazyLogging { } def retrieveQueries() : List[Query] = queries - - def recordQuery(query: String, consistency: Consistency, variables: List[Any] = List(), variableTypes: List[DataType] = List()) = { - queries = queries ::: Query(query, consistency, variables, variableTypes) :: Nil + + def recordQuery(query: String, consistency: Consistency, serialConsistency: Option[Consistency] = None, variables: List[Any] = List(), variableTypes: List[DataType] = List(), timestamp: Option[Long] = None) = { + queries = queries ::: Query(query, consistency, serialConsistency, variables, variableTypes, timestamp) :: Nil } /* Connection activity logging */ @@ -71,8 +71,9 @@ class ActivityLog extends LazyLogging { /* PreparedStatementExecution activity logging */ - def recordPreparedStatementExecution(preparedStatementText: String, consistency: Consistency, variables: List[Any], variableTypes: List[DataType]): Unit = { - val execution: PreparedStatementExecution = PreparedStatementExecution(preparedStatementText, consistency, variables, variableTypes) + def recordPreparedStatementExecution(preparedStatementText: String, consistency: Consistency, serialConsistency: Option[Consistency], variables: List[Any], variableTypes: List[DataType], timestamp: Option[Long]): Unit = { + val execution: PreparedStatementExecution = PreparedStatementExecution(preparedStatementText, consistency, + serialConsistency, variables, variableTypes, timestamp) logger.info("Recording {}",execution) preparedStatementExecutions = preparedStatementExecutions ::: execution :: Nil } @@ -107,9 +108,13 @@ class ActivityLog extends LazyLogging { } } -case class Query(query: String, consistency: Consistency, variables: List[Any] = List(), variableTypes: List[DataType] = List()) +case class Query(query: String, consistency: Consistency, serialConsistency: Option[Consistency], + variables: List[Any] = List(), variableTypes: List[DataType] = List(), timestamp: Option[Long] = None) case class Connection(result: String = "success") -case class PreparedStatementExecution(preparedStatementText: String, consistency: Consistency, variables: List[Any], variableTypes: List[DataType]) +case class PreparedStatementExecution(preparedStatementText: String, consistency: Consistency, + serialConsistency: Option[Consistency], variables: List[Any], + variableTypes: List[DataType], timestamp: Option[Long]) case class BatchQuery(query: String, batchQueryKind: BatchQueryKind, variables: List[Any] = List(), variableTypes: List[DataType] = List()) -case class BatchExecution(batchQueries: Seq[BatchQuery], consistency: Consistency, batchType: BatchType) +case class BatchExecution(batchQueries: Seq[BatchQuery], consistency: Consistency, + serialConsistency: Option[Consistency], batchType: BatchType, timestamp: Option[Long]) case class PreparedStatementPreparation(preparedStatementText: String) diff --git a/server/src/main/scala/org/scassandra/server/priming/json/PrimingJsonImplicits.scala b/server/src/main/scala/org/scassandra/server/priming/json/PrimingJsonImplicits.scala index f4462c46..f4f151f6 100644 --- a/server/src/main/scala/org/scassandra/server/priming/json/PrimingJsonImplicits.scala +++ b/server/src/main/scala/org/scassandra/server/priming/json/PrimingJsonImplicits.scala @@ -199,7 +199,7 @@ object PrimingJsonImplicits extends DefaultJsonProtocol with SprayJsonSupport wi implicit val impWhen = jsonFormat5(When) implicit val impPrimeQueryResult = jsonFormat(PrimeQuerySingle, "when", "then") implicit val impConnection = jsonFormat1(Connection) - implicit val impQuery = jsonFormat4(Query) + implicit val impQuery = jsonFormat6(Query) implicit val impPrimeCriteria = jsonFormat3(PrimeCriteria) implicit val impConflictingPrimes = jsonFormat1(ConflictingPrimes) implicit val impTypeMismatch = jsonFormat3(TypeMismatch) @@ -207,11 +207,11 @@ object PrimingJsonImplicits extends DefaultJsonProtocol with SprayJsonSupport wi implicit val impWhenPreparedSingle = jsonFormat3(WhenPrepared) implicit val impThenPreparedSingle = jsonFormat6(ThenPreparedSingle) implicit val impPrimePreparedSingle = jsonFormat(PrimePreparedSingle, "when", "then") - implicit val impPreparedStatementExecution = jsonFormat4(PreparedStatementExecution) + implicit val impPreparedStatementExecution = jsonFormat6(PreparedStatementExecution) implicit val impPreparedStatementPreparation = jsonFormat1(PreparedStatementPreparation) implicit val impVersion = jsonFormat1(Version) implicit val impBatchQuery = jsonFormat4(BatchQuery) - implicit val impBatchExecution = jsonFormat3(BatchExecution) + implicit val impBatchExecution = jsonFormat5(BatchExecution) implicit val impBatchQueryPrime = jsonFormat2(BatchQueryPrime) implicit val impBatchWhen = jsonFormat3(BatchWhen) implicit val impBatchPrimeSingle = jsonFormat(BatchPrimeSingle, "when", "then") diff --git a/server/src/test/scala/org/scassandra/server/actors/BatchHandlerTest.scala b/server/src/test/scala/org/scassandra/server/actors/BatchHandlerTest.scala index afbb530d..ab1da3ba 100644 --- a/server/src/test/scala/org/scassandra/server/actors/BatchHandlerTest.scala +++ b/server/src/test/scala/org/scassandra/server/actors/BatchHandlerTest.scala @@ -65,7 +65,7 @@ class BatchHandlerTest extends FunSuite with ProtocolActorTest with TestKitBase test("Records batch statement with ActivityLog") { val batch = Batch(BatchType.LOGGED, List( SimpleBatchQuery("insert into something"), SimpleBatchQuery("insert into something else") - )) + ), serialConsistency = Some(Consistency.LOCAL_SERIAL), timestamp = Some(8675309)) underTest ! protocolMessage(batch) @@ -77,7 +77,7 @@ class BatchHandlerTest extends FunSuite with ProtocolActorTest with TestKitBase BatchExecution(List( BatchQuery("insert into something", BatchQueryKind.Simple), BatchQuery("insert into something else", BatchQueryKind.Simple) - ), Consistency.ONE, BatchType.LOGGED)) + ), Consistency.ONE, Some(Consistency.LOCAL_SERIAL), BatchType.LOGGED, Some(8675309))) ) } @@ -108,7 +108,7 @@ class BatchHandlerTest extends FunSuite with ProtocolActorTest with TestKitBase activityLog.retrieveBatchExecutions() should equal(List( BatchExecution(List( BatchQuery("insert into something", BatchQueryKind.Prepared, List(1), List(DataType.Int)) - ), Consistency.ONE, BatchType.LOGGED)) + ), Consistency.ONE, None, BatchType.LOGGED, None)) ) } @@ -133,7 +133,7 @@ class BatchHandlerTest extends FunSuite with ProtocolActorTest with TestKitBase activityLog.retrieveBatchExecutions() should equal(List( BatchExecution(List( BatchQuery("A prepared statement was in the batch but couldn't be found - did you prepare against a different session?", BatchQueryKind.Prepared) - ), Consistency.ONE, BatchType.LOGGED)) + ), Consistency.ONE, None, BatchType.LOGGED, None)) ) } } diff --git a/server/src/test/scala/org/scassandra/server/actors/ExecuteHandlerTest.scala b/server/src/test/scala/org/scassandra/server/actors/ExecuteHandlerTest.scala index a575258a..2386c238 100644 --- a/server/src/test/scala/org/scassandra/server/actors/ExecuteHandlerTest.scala +++ b/server/src/test/scala/org/scassandra/server/actors/ExecuteHandlerTest.scala @@ -129,7 +129,7 @@ class ExecuteHandlerTest extends FunSuite with ProtocolActorTest with Matchers w columnSpec = ColumnSpecWithoutTable("0", DataType.Bigint) :: Nil)))))) activityLog.retrievePreparedStatementExecutions().size should equal(1) - activityLog.retrievePreparedStatementExecutions().head should equal(PreparedStatementExecution(query, consistency, values, variableTypes)) + activityLog.retrievePreparedStatementExecutions().head should equal(PreparedStatementExecution(query, consistency, None, values, variableTypes, None)) } test("Should record execution in activity log without variables when variables don't match prime") { @@ -144,7 +144,10 @@ class ExecuteHandlerTest extends FunSuite with ProtocolActorTest with Matchers w // Execute statement with two BigInt variables. val variables = List(10, 20) val execute = Execute(preparedIdBytes, parameters = QueryParameters(consistency = consistency, - values = Some(variables.map(v => QueryValue(None, Bytes(DataType.Bigint.codec.encode(v).require.bytes)))))) + serialConsistency = Some(Consistency.SERIAL), + values = Some(variables.map(v => QueryValue(None, Bytes(DataType.Bigint.codec.encode(v).require.bytes)))), + timestamp = Some(1000) + )) underTest ! protocolMessage(execute) @@ -156,7 +159,8 @@ class ExecuteHandlerTest extends FunSuite with ProtocolActorTest with Matchers w // The execution should still be recorded, but the variables not included. activityLog.retrievePreparedStatementExecutions().size should equal(1) - activityLog.retrievePreparedStatementExecutions().head should equal(PreparedStatementExecution(query, consistency, List(), List())) + activityLog.retrievePreparedStatementExecutions().head should equal(PreparedStatementExecution(query, consistency, + Some(Consistency.SERIAL), List(), List(), Some(1000))) } test("Should record execution in activity log event if not primed") { @@ -175,7 +179,7 @@ class ExecuteHandlerTest extends FunSuite with ProtocolActorTest with Matchers w // then - activity log should record an execution even though there was no prime. activityLog.retrievePreparedStatementExecutions().size should equal(1) - activityLog.retrievePreparedStatementExecutions().head should equal(PreparedStatementExecution(query, consistency, List(), List())) + activityLog.retrievePreparedStatementExecutions().head should equal(PreparedStatementExecution(query, consistency, None, List(), List(), None)) } test("Should return unprepared response if not prepared statement not found") { @@ -193,7 +197,7 @@ class ExecuteHandlerTest extends FunSuite with ProtocolActorTest with Matchers w // then - recorded execution indicating no prepared statement was found and an Unprepared response emitted. activityLog.retrievePreparedStatementExecutions().size should equal(1) - activityLog.retrievePreparedStatementExecutions().head should equal(PreparedStatementExecution(errMsg, consistency, List(), List())) + activityLog.retrievePreparedStatementExecutions().head should equal(PreparedStatementExecution(errMsg, consistency, None, List(), List(), None)) expectMsgPF() { case ProtocolResponse(_, Unprepared(`errMsg`, `preparedIdBytes`)) => true diff --git a/server/src/test/scala/org/scassandra/server/actors/QueryHandlerTest.scala b/server/src/test/scala/org/scassandra/server/actors/QueryHandlerTest.scala index b727b82d..13f90cfc 100644 --- a/server/src/test/scala/org/scassandra/server/actors/QueryHandlerTest.scala +++ b/server/src/test/scala/org/scassandra/server/actors/QueryHandlerTest.scala @@ -102,7 +102,7 @@ class QueryHandlerTest extends FunSuite with ImplicitSender with ProtocolActorTe val recordedQueries = activityLog.retrieveQueries() recordedQueries.size should equal(1) val recordedQuery = recordedQueries.head - recordedQuery should equal(RQuery(queryText, consistency)) + recordedQuery should equal(RQuery(queryText, consistency, None)) } test("Should record query parameter values from request in QueryLog if Prime contains variable types") { @@ -131,6 +131,6 @@ class QueryHandlerTest extends FunSuite with ImplicitSender with ProtocolActorTe val recordedQueries = activityLog.retrieveQueries() recordedQueries.size should equal(1) val recordedQuery = recordedQueries.head - recordedQuery should equal(RQuery(query.query, consistency, values, variableTypes)) + recordedQuery should equal(RQuery(query.query, consistency, None, values, variableTypes)) } } \ No newline at end of file diff --git a/server/src/test/scala/org/scassandra/server/e2e/querybuilder/QueryWithParametersTest.scala b/server/src/test/scala/org/scassandra/server/e2e/querybuilder/QueryWithParametersTest.scala index 90d0d6fa..815c5c69 100644 --- a/server/src/test/scala/org/scassandra/server/e2e/querybuilder/QueryWithParametersTest.scala +++ b/server/src/test/scala/org/scassandra/server/e2e/querybuilder/QueryWithParametersTest.scala @@ -41,7 +41,11 @@ class QueryWithParametersTest extends AbstractIntegrationTest { val result = session.execute(query, "chris") - getRecordedQueries() should contain(Query(query, ONE, List("chris"), List(DataType.Text))) + val queries = getRecordedQueries() + queries.size shouldEqual 2 // 1 for use keyspace, another for the actual query + queries(1) should matchPattern { + case Query(`query`, ONE, None, List("chris"), List(DataType.Text), Some(_)) => + } } test("Prime using int parameter") { @@ -56,6 +60,10 @@ class QueryWithParametersTest extends AbstractIntegrationTest { val result = session.execute(query, new Integer(15)) - getRecordedQueries() should contain(Query(query, ONE, List(15), List(DataType.Int))) + val queries = getRecordedQueries() + queries.size shouldEqual 2 + queries(1) should matchPattern { + case Query(`query`, ONE, None, List(15), List(DataType.Int), Some(_)) => + } } } diff --git a/server/src/test/scala/org/scassandra/server/e2e/verification/PreparedStatementExecutionVerificationTest.scala b/server/src/test/scala/org/scassandra/server/e2e/verification/PreparedStatementExecutionVerificationTest.scala index 3e1102b5..9f34322a 100644 --- a/server/src/test/scala/org/scassandra/server/e2e/verification/PreparedStatementExecutionVerificationTest.scala +++ b/server/src/test/scala/org/scassandra/server/e2e/verification/PreparedStatementExecutionVerificationTest.scala @@ -67,7 +67,9 @@ class PreparedStatementExecutionVerificationTest extends AbstractIntegrationTest println(result) val preparedStatementExecutions = JsonParser(result).convertTo[List[PreparedStatementExecution]] preparedStatementExecutions.size should equal(1) - preparedStatementExecutions.head should equal(PreparedStatementExecution(queryString, Consistency.ONE, List("Chris"), List(DataType.Varchar))) + preparedStatementExecutions.head should matchPattern { + case PreparedStatementExecution(`queryString`, Consistency.ONE, None, List("Chris"), List(DataType.Varchar), _) => + } } } } diff --git a/server/src/test/scala/org/scassandra/server/e2e/verification/QueryVerificationTest.scala b/server/src/test/scala/org/scassandra/server/e2e/verification/QueryVerificationTest.scala index eb33e2d3..4f4ca0df 100644 --- a/server/src/test/scala/org/scassandra/server/e2e/verification/QueryVerificationTest.scala +++ b/server/src/test/scala/org/scassandra/server/e2e/verification/QueryVerificationTest.scala @@ -26,6 +26,7 @@ class QueryVerificationTest extends AbstractIntegrationTest with ScalaFutures { before { clearQueryPrimes() + clearRecordedQueries() } test("Test clearing of query results") { @@ -44,6 +45,24 @@ class QueryVerificationTest extends AbstractIntegrationTest with ScalaFutures { session.execute(statement) val queryList = getRecordedQueries() - queryList should contain(Query(queryString, Consistency.TWO)) + queryList.size shouldEqual 2 // 1 for use keyspace, another for actual query + queryList(1) should matchPattern { + case Query(`queryString`, Consistency.TWO, None, List(), List(), Some(_)) => + } + } + + test("Test verification of a single query - with serial CL") { + val queryString: String = "select * from people" + val statement = new SimpleStatement(queryString) + statement.setConsistencyLevel(ConsistencyLevel.TWO) + statement.setSerialConsistencyLevel(ConsistencyLevel.LOCAL_SERIAL) + + session.execute(statement) + + val queryList = getRecordedQueries() + queryList.size shouldEqual 2 // 1 for use keyspace, another for actual query + queryList(1) should matchPattern { + case Query(`queryString`, Consistency.TWO, Some(Consistency.LOCAL_SERIAL), List(), List(), Some(_)) => + } } } diff --git a/server/src/test/scala/org/scassandra/server/priming/ActivityLogTest.scala b/server/src/test/scala/org/scassandra/server/priming/ActivityLogTest.scala index e3a7b320..f12fcf6a 100644 --- a/server/src/test/scala/org/scassandra/server/priming/ActivityLogTest.scala +++ b/server/src/test/scala/org/scassandra/server/priming/ActivityLogTest.scala @@ -16,6 +16,7 @@ package org.scassandra.server.priming import org.scalatest.{BeforeAndAfter, FunSuite, Matchers} +import org.scassandra.codec.Consistency import org.scassandra.codec.Consistency._ import org.scassandra.codec.datatype.DataType import org.scassandra.codec.messages.BatchQueryKind.Simple @@ -36,7 +37,7 @@ class ActivityLogTest extends FunSuite with Matchers with BeforeAndAfter { } test("Clear query activity log") { - underTest.recordQuery("select * from people", ONE) + underTest.recordQuery("select * from people", ONE, None) underTest.clearQueries() underTest.retrieveQueries().size should equal(0) } @@ -52,7 +53,7 @@ class ActivityLogTest extends FunSuite with Matchers with BeforeAndAfter { test("Store query and retrieve connection") { val query: String = "select * from people" - underTest.recordQuery(query, ONE) + underTest.recordQuery(query, ONE, None) underTest.retrieveQueries().size should equal(1) underTest.retrieveQueries().head.query should equal(query) underTest.retrieveQueries().head.consistency should equal(ONE) @@ -65,15 +66,15 @@ class ActivityLogTest extends FunSuite with Matchers with BeforeAndAfter { val consistency = ONE val variableTypes = List(DataType.Ascii, DataType.Bigint) - underTest.recordPreparedStatementExecution(preparedStatementText, consistency, variables, variableTypes) + underTest.recordPreparedStatementExecution(preparedStatementText, consistency, Some(Consistency.LOCAL_SERIAL), variables, variableTypes, Some(1)) val preparedStatementRecord = underTest.retrievePreparedStatementExecutions() preparedStatementRecord.size should equal(1) - preparedStatementRecord.head should equal(PreparedStatementExecution(preparedStatementText, consistency, variables, variableTypes)) + preparedStatementRecord.head should equal(PreparedStatementExecution(preparedStatementText, consistency, Some(Consistency.LOCAL_SERIAL), variables, variableTypes, Some(1))) } test("Clear prepared statement execution log") { - underTest.recordPreparedStatementExecution("anything", TWO, List(), List()) + underTest.recordPreparedStatementExecution("anything", TWO, None, List(), List(), None) underTest.clearPreparedStatementExecutions() underTest.retrievePreparedStatementExecutions().size should equal(0) } @@ -97,15 +98,15 @@ class ActivityLogTest extends FunSuite with Matchers with BeforeAndAfter { } test("Records query parameters") { - underTest.recordQuery("query", ONE, List("Hello"), List(DataType.Text)) + underTest.recordQuery("query", ONE, None, List("Hello"), List(DataType.Text)) - underTest.retrieveQueries() should equal(List(Query("query", ONE, List("Hello"), List(DataType.Text)))) + underTest.retrieveQueries() should equal(List(Query("query", ONE, None, List("Hello"), List(DataType.Text)))) } test("Record batch execution") { val consistency: Consistency = ONE val statements: List[BatchQuery] = List(BatchQuery("select * from hello", Simple)) - val execution: BatchExecution = BatchExecution(statements, consistency, LOGGED) + val execution: BatchExecution = BatchExecution(statements, consistency, None, LOGGED, None) underTest.recordBatchExecution(execution) val executions: List[BatchExecution] = underTest.retrieveBatchExecutions() @@ -114,7 +115,7 @@ class ActivityLogTest extends FunSuite with Matchers with BeforeAndAfter { } test("Clear batch execution") { - underTest.recordBatchExecution(BatchExecution(List(BatchQuery("select * from hello", Simple)), ONE, LOGGED)) + underTest.recordBatchExecution(BatchExecution(List(BatchQuery("select * from hello", Simple)), ONE, None, LOGGED, None)) underTest.clearBatchExecutions() diff --git a/server/src/test/scala/org/scassandra/server/priming/batch/PrimeBatchStoreTest.scala b/server/src/test/scala/org/scassandra/server/priming/batch/PrimeBatchStoreTest.scala index 8f475bfe..418b094e 100644 --- a/server/src/test/scala/org/scassandra/server/priming/batch/PrimeBatchStoreTest.scala +++ b/server/src/test/scala/org/scassandra/server/priming/batch/PrimeBatchStoreTest.scala @@ -16,6 +16,7 @@ package org.scassandra.server.priming.batch import org.scalatest.{FunSpec, Matchers} +import org.scassandra.codec.Consistency import org.scassandra.codec.Consistency._ import org.scassandra.codec.messages.BatchQueryKind._ import org.scassandra.codec.messages.BatchType._ @@ -33,7 +34,8 @@ class PrimeBatchStoreTest extends FunSpec with Matchers { underTest.record(primeRequest) - val prime = underTest(BatchExecution(Seq(BatchQuery("select * blah", Simple)), ONE, LOGGED)) + val ts = System.currentTimeMillis() + val prime = underTest(BatchExecution(Seq(BatchQuery("select * blah", Simple)), ONE, Some(Consistency.SERIAL), LOGGED, Some(ts))) prime should equal(Some(primeRequest.prime)) } @@ -43,7 +45,7 @@ class PrimeBatchStoreTest extends FunSpec with Matchers { underTest.record(primeRequest) - val prime = underTest(BatchExecution(Seq(BatchQuery("I am do different", Simple)), ONE, LOGGED)) + val prime = underTest(BatchExecution(Seq(BatchQuery("I am do different", Simple)), ONE, None, LOGGED, None)) prime should equal(None) } @@ -56,7 +58,7 @@ class PrimeBatchStoreTest extends FunSpec with Matchers { BatchQueryPrime("select * wah", Prepared))), Then(result = Some(Success)))) - val prime = underTest(BatchExecution(Seq(BatchQuery("select * blah", Simple)), ONE, LOGGED)) + val prime = underTest(BatchExecution(Seq(BatchQuery("select * blah", Simple)), ONE, None, LOGGED, None)) prime should equal(None) } @@ -73,7 +75,7 @@ class PrimeBatchStoreTest extends FunSpec with Matchers { val prime = underTest(BatchExecution(Seq( BatchQuery("select * blah", Simple), - BatchQuery("select * wah", Prepared)), ONE, LOGGED)) + BatchQuery("select * wah", Prepared)), ONE, None, LOGGED, None)) prime should equal(Some(batchRequest.prime)) } @@ -89,7 +91,7 @@ class PrimeBatchStoreTest extends FunSpec with Matchers { val prime = underTest.apply(BatchExecution(Seq( BatchQuery("select * blah", Simple), - BatchQuery("select * wah", Prepared)), ONE, LOGGED)) + BatchQuery("select * wah", Prepared)), ONE, None, LOGGED, None)) prime should equal(None) } @@ -108,7 +110,7 @@ class PrimeBatchStoreTest extends FunSpec with Matchers { val prime = underTest.apply(BatchExecution(Seq( BatchQuery("select * blah", Simple), - BatchQuery("select * wah", Prepared)), TWO, LOGGED)) + BatchQuery("select * wah", Prepared)), TWO, None, LOGGED, None)) prime should equal(Some(batchRequest.prime)) } @@ -120,7 +122,7 @@ class PrimeBatchStoreTest extends FunSpec with Matchers { BatchWhen(List(BatchQueryPrime("select * blah", Simple)), batchType = Some(COUNTER)), Then(result = Some(Success)))) - val prime = underTest(BatchExecution(Seq(BatchQuery("select * blah", Simple)), ONE, LOGGED)) + val prime = underTest(BatchExecution(Seq(BatchQuery("select * blah", Simple)), ONE, None, LOGGED, None)) prime should equal(None) } @@ -134,7 +136,7 @@ class PrimeBatchStoreTest extends FunSpec with Matchers { underTest.record(batchRequest) - val prime = underTest(BatchExecution(Seq(BatchQuery("select * blah", Simple)), ONE, COUNTER)) + val prime = underTest(BatchExecution(Seq(BatchQuery("select * blah", Simple)), ONE, None, COUNTER, None)) prime should equal(Some(batchRequest.prime)) } @@ -150,7 +152,7 @@ class PrimeBatchStoreTest extends FunSpec with Matchers { BatchWhen(List(BatchQueryPrime("select * blah", Simple))), Then(result = Some(WriteTimeout)))) - val prime = underTest(BatchExecution(Seq(BatchQuery("select * blah", Simple)), ONE, LOGGED)) + val prime = underTest(BatchExecution(Seq(BatchQuery("select * blah", Simple)), ONE, None, LOGGED, None)) prime should equal(Some(batchRequest.prime)) } @@ -165,7 +167,7 @@ class PrimeBatchStoreTest extends FunSpec with Matchers { underTest.record(batchRequest) - val prime = underTest(BatchExecution(Seq(BatchQuery("select * blah", Simple)), ONE, LOGGED)) + val prime = underTest(BatchExecution(Seq(BatchQuery("select * blah", Simple)), ONE, None, LOGGED, None)) prime should equal(Some(batchRequest.prime)) } diff --git a/server/src/test/scala/org/scassandra/server/priming/routes/ActivityVerificationRouteTest.scala b/server/src/test/scala/org/scassandra/server/priming/routes/ActivityVerificationRouteTest.scala index 4cce053e..3b70545a 100644 --- a/server/src/test/scala/org/scassandra/server/priming/routes/ActivityVerificationRouteTest.scala +++ b/server/src/test/scala/org/scassandra/server/priming/routes/ActivityVerificationRouteTest.scala @@ -145,7 +145,7 @@ class ActivityVerificationRouteTest extends FunSpec with BeforeAndAfter with Mat it("Should return prepared statement executions from ActivityLog - single") { activityLog.clearPreparedStatementExecutions() val preparedStatementText: String = "" - activityLog.recordPreparedStatementExecution(preparedStatementText, ONE, List(), List()) + activityLog.recordPreparedStatementExecution(preparedStatementText, ONE, None, List(), List(), None) Get("/prepared-statement-execution") ~> activityVerificationRoute ~> check { val response = responseAs[List[PreparedStatementExecution]] @@ -156,7 +156,7 @@ class ActivityVerificationRouteTest extends FunSpec with BeforeAndAfter with Mat } it("Should clear prepared statement executions for a delete") { - activityLog.recordPreparedStatementExecution("", ONE, List(), List()) + activityLog.recordPreparedStatementExecution("", ONE, None, List(), List(), None) Delete("/prepared-statement-execution") ~> activityVerificationRoute ~> check { activityLog.retrievePreparedStatementExecutions().size should equal(0) @@ -167,7 +167,7 @@ class ActivityVerificationRouteTest extends FunSpec with BeforeAndAfter with Mat describe("Batch execution") { it("Should return executions from ActivityLog") { activityLog.clearBatchExecutions() - activityLog.recordBatchExecution(BatchExecution(List(BatchQuery("Query", Simple)), ONE, LOGGED)) + activityLog.recordBatchExecution(BatchExecution(List(BatchQuery("Query", Simple)), ONE, None, LOGGED, None)) Get("/batch-execution") ~> activityVerificationRoute ~> check { val response = responseAs[List[BatchExecution]] @@ -180,7 +180,7 @@ class ActivityVerificationRouteTest extends FunSpec with BeforeAndAfter with Mat } it("Should clear batch executions for a delete") { - activityLog.recordBatchExecution(BatchExecution(List(), ONE, UNLOGGED)) + activityLog.recordBatchExecution(BatchExecution(List(), ONE, None, UNLOGGED, None)) Delete("/batch-execution") ~> activityVerificationRoute ~> check { activityLog.retrieveBatchExecutions().size should equal(0)