Skip to content

Commit

Permalink
WIP: Cassandra 4
Browse files Browse the repository at this point in the history
  • Loading branch information
mattyb149 committed Mar 15, 2024
1 parent 7db1664 commit 88f721f
Show file tree
Hide file tree
Showing 32 changed files with 1,659 additions and 235 deletions.
6 changes: 6 additions & 0 deletions nifi-assembly/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -548,6 +548,12 @@ language governing permissions and limitations under the License. -->
<version>2.0.0-SNAPSHOT</version>
<type>nar</type>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-cassandra4-services-nar</artifactId>
<version>2.0.0-SNAPSHOT</version>
<type>nar</type>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-registry-nar</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,20 +16,6 @@
*/
package org.apache.nifi.processors.cassandra;

import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.CodecRegistry;
import com.datastax.driver.core.ConsistencyLevel;
import com.datastax.driver.core.DataType;
import com.datastax.driver.core.Metadata;
import com.datastax.driver.core.ProtocolOptions;
import com.datastax.driver.core.RemoteEndpointAwareJdkSSLOptions;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.SSLOptions;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.TypeCodec;
import com.datastax.driver.core.exceptions.AuthenticationException;
import com.datastax.driver.core.exceptions.NoHostAvailableException;
import com.datastax.driver.extras.codecs.arrays.ObjectArrayCodec;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collection;
Expand All @@ -38,11 +24,17 @@
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import javax.net.ssl.SSLContext;

import com.datastax.driver.core.DataType;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.cassandra.AuthenticationException;
import org.apache.nifi.cassandra.CassandraRow;
import org.apache.nifi.cassandra.CassandraSession;
import org.apache.nifi.cassandra.CassandraSessionProviderService;
import org.apache.nifi.cassandra.NoHostAvailableException;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.components.ValidationContext;
Expand Down Expand Up @@ -134,15 +126,15 @@ public abstract class AbstractCassandraProcessor extends AbstractProcessor {
.name("Consistency Level")
.description("The strategy for how many replicas must respond before results are returned.")
.required(false)
.allowableValues(ConsistencyLevel.values())
.allowableValues("ANY", "ONE", "TWO", "THREE", "QUORUM", "ALL", "LOCAL_QUORUM", "EACH_QUORUM", "SERIAL", "LOCAL_SERIAL", "LOCAL_ONE")
.defaultValue("ONE")
.build();

static final PropertyDescriptor COMPRESSION_TYPE = new PropertyDescriptor.Builder()
.name("Compression Type")
.description("Enable compression at transport-level requests and responses")
.required(false)
.allowableValues(ProtocolOptions.Compression.values())
.allowableValues("NONE", "SNAPPY", "LZ4")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.defaultValue("NONE")
.build();
Expand Down Expand Up @@ -186,10 +178,7 @@ public abstract class AbstractCassandraProcessor extends AbstractProcessor {
descriptors.add(CHARSET);
}

protected final AtomicReference<Cluster> cluster = new AtomicReference<>(null);
protected final AtomicReference<Session> cassandraSession = new AtomicReference<>(null);

protected static final CodecRegistry codecRegistry = new CodecRegistry();
protected final AtomicReference<CassandraSession> cassandraSession = new AtomicReference<>(null);

@Override
protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
Expand Down Expand Up @@ -229,7 +218,6 @@ public void onScheduled(ProcessContext context) {

if (connectionProviderIsSet) {
CassandraSessionProviderService sessionProvider = context.getProperty(CONNECTION_PROVIDER_SERVICE).asControllerService(CassandraSessionProviderService.class);
cluster.set(sessionProvider.getCluster());
cassandraSession.set(sessionProvider.getCassandraSession());
return;
}
Expand All @@ -247,97 +235,53 @@ public void onScheduled(ProcessContext context) {
}

void connectToCassandra(ProcessContext context) {
if (cluster.get() == null) {
ComponentLog log = getLogger();
final String contactPointList = context.getProperty(CONTACT_POINTS).evaluateAttributeExpressions().getValue();
final String consistencyLevel = context.getProperty(CONSISTENCY_LEVEL).getValue();
final String compressionType = context.getProperty(COMPRESSION_TYPE).getValue();
List<InetSocketAddress> contactPoints = getContactPoints(contactPointList);

// Set up the client for secure (SSL/TLS communications) if configured to do so
final SSLContextService sslService = context.getProperty(PROP_SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
final SSLContext sslContext;

if (sslService != null) {
sslContext = sslService.createContext();
} else {
sslContext = null;
}

final String username, password;
PropertyValue usernameProperty = context.getProperty(USERNAME).evaluateAttributeExpressions();
PropertyValue passwordProperty = context.getProperty(PASSWORD).evaluateAttributeExpressions();
ComponentLog log = getLogger();
final String contactPointList = context.getProperty(CONTACT_POINTS).evaluateAttributeExpressions().getValue();
final String consistencyLevel = context.getProperty(CONSISTENCY_LEVEL).getValue();
final String compressionType = context.getProperty(COMPRESSION_TYPE).getValue();
List<InetSocketAddress> contactPoints = getContactPoints(contactPointList);

// Set up the client for secure (SSL/TLS communications) if configured to do so
final SSLContextService sslService = context.getProperty(PROP_SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
final SSLContext sslContext;

if (sslService != null) {
sslContext = sslService.createContext();
} else {
sslContext = null;
}

if (usernameProperty != null && passwordProperty != null) {
username = usernameProperty.getValue();
password = passwordProperty.getValue();
} else {
username = null;
password = null;
}
final String username, password;
PropertyValue usernameProperty = context.getProperty(USERNAME).evaluateAttributeExpressions();
PropertyValue passwordProperty = context.getProperty(PASSWORD).evaluateAttributeExpressions();

// Create the cluster and connect to it
Cluster newCluster = createCluster(contactPoints, sslContext, username, password, compressionType);
PropertyValue keyspaceProperty = context.getProperty(KEYSPACE).evaluateAttributeExpressions();
if (usernameProperty != null && passwordProperty != null) {
username = usernameProperty.getValue();
password = passwordProperty.getValue();
} else {
username = null;
password = null;
}

final Session newSession;
// For Java 11, the getValue() call was added so the test could pass
if (keyspaceProperty != null && keyspaceProperty.getValue() != null) {
newSession = newCluster.connect(keyspaceProperty.getValue());
} else {
newSession = newCluster.connect();
}
// Create the cluster and connect to it
CassandraSession cassandraSession = createSession(contactPoints, sslContext, username, password, compressionType);
PropertyValue keyspaceProperty = context.getProperty(KEYSPACE).evaluateAttributeExpressions();

newCluster.getConfiguration().getQueryOptions().setConsistencyLevel(ConsistencyLevel.valueOf(consistencyLevel));
Metadata metadata = newCluster.getMetadata();
if (keyspaceProperty != null && keyspaceProperty.getValue() != null) {
newSession = cassandraSession.connect(keyspaceProperty.getValue());
} else {
newSession = cassandraSession.connect(null);
}

log.info("Connected to Cassandra cluster: {}", new Object[]{metadata.getClusterName()});
cassandraSession.setConsistencyLevel(consistencyLevel);

cluster.set(newCluster);
cassandraSession.set(newSession);
}
}
log.info("Connected to Cassandra cluster: {}", cassandraSession.getClusterName());

protected void registerAdditionalCodecs() {
// Conversion between a String[] and a list of varchar
CodecRegistry.DEFAULT_INSTANCE.register(new ObjectArrayCodec<>(
DataType.list(DataType.varchar()),
String[].class,
TypeCodec.varchar()));
this.cassandraSession.set(newSession);
}

/**
* Uses a Cluster.Builder to create a Cassandra cluster reference using the given parameters
*
* @param contactPoints The contact points (hostname:port list of Cassandra nodes)
* @param sslContext The SSL context (used for secure connections)
* @param username The username for connection authentication
* @param password The password for connection authentication
* @param compressionType Enable compression at transport-level requests and responses.
* @return A reference to the Cluster object associated with the given Cassandra configuration
*/
protected Cluster createCluster(List<InetSocketAddress> contactPoints, SSLContext sslContext,
String username, String password, String compressionType) {
Cluster.Builder builder = Cluster.builder().addContactPointsWithPorts(contactPoints);
if (sslContext != null) {
final SSLOptions sslOptions = RemoteEndpointAwareJdkSSLOptions.builder()
.withSSLContext(sslContext)
.build();

builder = builder.withSSL(sslOptions);
if (ProtocolOptions.Compression.SNAPPY.name().equals(compressionType)) {
builder = builder.withCompression(ProtocolOptions.Compression.SNAPPY);
} else if (ProtocolOptions.Compression.LZ4.name().equals(compressionType)) {
builder = builder.withCompression(ProtocolOptions.Compression.LZ4);
}
}

if (username != null && password != null) {
builder = builder.withCredentials(username, password);
}
protected abstract void registerAdditionalCodecs();

return builder.build();
}

public void stop(ProcessContext context) {
// We don't want to close the connection when using 'Cassandra Connection Provider'
Expand All @@ -349,15 +293,11 @@ public void stop(ProcessContext context) {
cassandraSession.get().close();
cassandraSession.set(null);
}
if (cluster.get() != null) {
cluster.get().close();
cluster.set(null);
}
}
}


protected static Object getCassandraObject(Row row, int i, DataType dataType) {
protected static Object getCassandraObject(CassandraRow row, int i, DataType dataType) {
if (dataType.equals(DataType.blob())) {
return row.getBytes(i);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,7 @@
*/
package org.apache.nifi.processors.cassandra;

import com.datastax.driver.core.BoundStatement;
import com.datastax.driver.core.DataType;
import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.ResultSetFuture;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.TypeCodec;
import com.datastax.driver.core.exceptions.InvalidTypeException;
import com.datastax.driver.core.exceptions.NoHostAvailableException;
Expand All @@ -38,6 +34,10 @@
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.cassandra.CassandraBoundStatement;
import org.apache.nifi.cassandra.CassandraPreparedStatement;
import org.apache.nifi.cassandra.CassandraResultSetFuture;
import org.apache.nifi.cassandra.CassandraSession;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
Expand Down Expand Up @@ -128,7 +128,7 @@ public class PutCassandraQL extends AbstractCassandraProcessor {
* LRU cache for the prepared statements, keyed by CQL string. The size of the cache is determined by the value of the Statement Cache Size property
*/
@VisibleForTesting
private ConcurrentMap<String, PreparedStatement> statementCache;
private ConcurrentMap<String, CassandraPreparedStatement> statementCache;

/*
* Will ensure that the list of property descriptors is built only once.
Expand Down Expand Up @@ -167,7 +167,7 @@ public void onScheduled(final ProcessContext context) {
int statementCacheSize = context.getProperty(STATEMENT_CACHE_SIZE).evaluateAttributeExpressions().asInteger();
statementCache = CacheBuilder.newBuilder()
.maximumSize(statementCacheSize)
.<String, PreparedStatement>build()
.<String, CassandraPreparedStatement>build()
.asMap();
}

Expand All @@ -185,16 +185,16 @@ public void onTrigger(final ProcessContext context, final ProcessSession session

// The documentation for the driver recommends the session remain open the entire time the processor is running
// and states that it is thread-safe. This is why connectionSession is not in a try-with-resources.
final Session connectionSession = cassandraSession.get();
final CassandraSession connectionSession = cassandraSession.get();

String cql = getCQL(session, flowFile, charset);
try {
PreparedStatement statement = statementCache.get(cql);
CassandraPreparedStatement statement = statementCache.get(cql);
if(statement == null) {
statement = connectionSession.prepare(cql);
statementCache.put(cql, statement);
}
BoundStatement boundStatement = statement.bind();
CassandraBoundStatement boundStatement = statement.bind();

Map<String, String> attributes = flowFile.getAttributes();
for (final Map.Entry<String, String> entry : attributes.entrySet()) {
Expand All @@ -221,7 +221,7 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
}

try {
ResultSetFuture future = connectionSession.executeAsync(boundStatement);
CassandraResultSetFuture future = connectionSession.executeAsync(boundStatement);
if (statementTimeout > 0) {
future.getUninterruptibly(statementTimeout, TimeUnit.MILLISECONDS);
} else {
Expand All @@ -231,7 +231,7 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
final long transmissionMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);

// This isn't a real URI but since Cassandra is distributed we just use the cluster name
String transitUri = "cassandra://" + connectionSession.getCluster().getMetadata().getClusterName();
String transitUri = "cassandra://" + connectionSession.getClusterName();
session.getProvenanceReporter().send(flowFile, transitUri, transmissionMillis, true);
session.transfer(flowFile, REL_SUCCESS);

Expand Down Expand Up @@ -295,14 +295,14 @@ public void process(final InputStream in) throws IOException {
* Determines how to map the given value to the appropriate Cassandra data type and returns the object as
* represented by the given type. This can be used in a Prepared/BoundStatement.
*
* @param statement the BoundStatement for setting objects on
* @param statement the CassandraBoundStatement for setting objects on
* @param paramIndex the index of the parameter at which to set the object
* @param attrName the name of the attribute that the parameter is coming from - for logging purposes
* @param paramValue the value of the CQL parameter to set
* @param paramType the Cassandra data type of the CQL parameter to set
* @throws IllegalArgumentException if the PreparedStatement throws a CQLException when calling the appropriate setter
*/
protected void setStatementObject(final BoundStatement statement, final int paramIndex, final String attrName,
protected void setStatementObject(final CassandraBoundStatement statement, final int paramIndex, final String attrName,
final String paramValue, final String paramType) throws IllegalArgumentException {
if (paramValue == null) {
statement.setToNull(paramIndex);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,7 @@ public void onTrigger(ProcessContext context, ProcessSession session) throws Pro
}

@Override
protected Cluster createCluster(List<InetSocketAddress> contactPoints, SSLContext sslContext,
protected Cluster createSession(List<InetSocketAddress> contactPoints, SSLContext sslContext,
String username, String password, String compressionType) {
Cluster mockCluster = mock(Cluster.class);
Metadata mockMetadata = mock(Metadata.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -386,7 +386,7 @@ private static class MockPutCassandraQL extends PutCassandraQL {
private Session mockSession = mock(Session.class);

@Override
protected Cluster createCluster(List<InetSocketAddress> contactPoints, SSLContext sslContext,
protected Cluster createSession(List<InetSocketAddress> contactPoints, SSLContext sslContext,
String username, String password, String compressionType) {
Cluster mockCluster = mock(Cluster.class);
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -511,7 +511,7 @@ private static class MockPutCassandraRecord extends PutCassandraRecord {
private Session mockSession = mock(Session.class);

@Override
protected Cluster createCluster(List<InetSocketAddress> contactPoints, SSLContext sslContext,
protected Cluster createSession(List<InetSocketAddress> contactPoints, SSLContext sslContext,
String username, String password, String compressionType) {
Cluster mockCluster = mock(Cluster.class);
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -505,7 +505,7 @@ private static class MockQueryCassandra extends QueryCassandra {
private Exception exceptionToThrow = null;

@Override
protected Cluster createCluster(List<InetSocketAddress> contactPoints, SSLContext sslContext,
protected Cluster createSession(List<InetSocketAddress> contactPoints, SSLContext sslContext,
String username, String password, String compressionType) {
Cluster mockCluster = mock(Cluster.class);
try {
Expand Down Expand Up @@ -550,7 +550,7 @@ private static class MockQueryCassandraTwoRounds extends MockQueryCassandra {
private Exception exceptionToThrow = null;

@Override
protected Cluster createCluster(List<InetSocketAddress> contactPoints, SSLContext sslContext,
protected Cluster createSession(List<InetSocketAddress> contactPoints, SSLContext sslContext,
String username, String password, String compressionType) {
Cluster mockCluster = mock(Cluster.class);
try {
Expand Down
Loading

0 comments on commit 88f721f

Please sign in to comment.