Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cassandra Connection #5

Draft
wants to merge 17 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,209 @@
package com.google.cloud.teleport.v2.spanner.migrations.cassandra;

import java.io.Serializable;
import java.util.Objects;

public class CassandraConfig implements Serializable {

private String host;
private String port;
private String username;
private String password;
private String keyspace;
private String consistencyLevel = "LOCAL_QUORUM";
private boolean sslOptions = false;
private String protocolVersion = "v5";
private String dataCenter = "datacenter1";
private int localPoolSize = 1024;
private int remotePoolSize = 256;

public CassandraConfig() {}

public CassandraConfig(
String host,
String port,
String username,
String password,
String keyspace,
String consistencyLevel,
boolean sslOptions,
String protocolVersion,
String dataCenter,
int localPoolSize,
int remotePoolSize) {
this.host = host;
this.port = port;
this.username = username;
this.password = password;
this.keyspace = keyspace;
this.consistencyLevel = consistencyLevel;
this.sslOptions = sslOptions;
this.protocolVersion = protocolVersion;
this.dataCenter = dataCenter;
this.localPoolSize = localPoolSize;
this.remotePoolSize = remotePoolSize;
}

// Getter and Setter methods for all fields

public String getHost() {
return host;
}

public void setHost(String host) {
this.host = host;
}

public String getPort() {
return port;
}

public void setPort(String port) {
this.port = port;
}

public String getUsername() {
return username;
}

public void setUsername(String username) {
this.username = username;
}

public String getPassword() {
return password;
}

public void setPassword(String password) {
this.password = password;
}

public String getKeyspace() {
return keyspace;
}

public void setKeyspace(String keyspace) {
this.keyspace = keyspace;
}

public String getConsistencyLevel() {
return consistencyLevel;
}

public void setConsistencyLevel(String consistencyLevel) {
this.consistencyLevel = consistencyLevel;
}

public boolean isSslOptions() {
return sslOptions;
}

public void setSslOptions(boolean sslOptions) {
this.sslOptions = sslOptions;
}

public String getProtocolVersion() {
return protocolVersion;
}

public void setProtocolVersion(String protocolVersion) {
this.protocolVersion = protocolVersion;
}

public String getDataCenter() {
return dataCenter;
}

public void setDataCenter(String dataCenter) {
this.dataCenter = dataCenter;
}

public int getLocalPoolSize() {
return localPoolSize;
}

public void setLocalPoolSize(int localPoolSize) {
this.localPoolSize = localPoolSize;
}

public int getRemotePoolSize() {
return remotePoolSize;
}

public void setRemotePoolSize(int remotePoolSize) {
this.remotePoolSize = remotePoolSize;
}

public void validate() throws IllegalArgumentException {
if (host == null || host.isEmpty()) {
throw new IllegalArgumentException("Host is required");
}
if (port == null || port.isEmpty()) {
throw new IllegalArgumentException("Port is required");
}
if (username == null || username.isEmpty()) {
throw new IllegalArgumentException("Username is required");
}
if (password == null || password.isEmpty()) {
throw new IllegalArgumentException("Password is required");
}
if (keyspace == null || keyspace.isEmpty()) {
throw new IllegalArgumentException("Keyspace is required");
}
}

@Override
public String toString() {
return "CassandraConfig{"
+ "host='" + host + '\''
+ ", port='" + port + '\''
+ ", username='" + username + '\''
+ ", password='" + password + '\''
+ ", keyspace='" + keyspace + '\''
+ ", consistencyLevel='" + consistencyLevel + '\''
+ ", sslOptions=" + sslOptions
+ ", protocolVersion='" + protocolVersion + '\''
+ ", dataCenter='" + dataCenter + '\''
+ ", localPoolSize=" + localPoolSize
+ ", remotePoolSize=" + remotePoolSize
+ '}';
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (!(o instanceof CassandraConfig)) {
return false;
}
CassandraConfig cassandraConfig = (CassandraConfig) o;
return sslOptions == cassandraConfig.sslOptions
&& localPoolSize == cassandraConfig.localPoolSize
&& remotePoolSize == cassandraConfig.remotePoolSize
&& Objects.equals(host, cassandraConfig.host)
&& Objects.equals(port, cassandraConfig.port)
&& Objects.equals(username, cassandraConfig.username)
&& Objects.equals(password, cassandraConfig.password)
&& Objects.equals(keyspace, cassandraConfig.keyspace)
&& Objects.equals(consistencyLevel, cassandraConfig.consistencyLevel)
&& Objects.equals(protocolVersion, cassandraConfig.protocolVersion)
&& Objects.equals(dataCenter, cassandraConfig.dataCenter);
}

@Override
public int hashCode() {
return Objects.hash(
host,
port,
username,
password,
keyspace,
consistencyLevel,
sslOptions,
protocolVersion,
dataCenter,
localPoolSize,
remotePoolSize);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
//package com.google.cloud.teleport.v2.spanner.migrations.cassandra;
6 changes: 6 additions & 0 deletions v2/spanner-to-sourcedb/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,12 @@
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.datastax.oss</groupId>
<artifactId>java-driver-core</artifactId>
<version>4.17.0</version>
<scope>compile</scope>
</dependency>
</dependencies>

</project>
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (C) 2023 Google LLC
* Copyright (C) 2024 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package com.google.cloud.teleport.v2.templates.dbutils.connection;

import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.CqlSessionBuilder;
import com.datastax.oss.driver.api.core.config.DriverConfigLoader;
import com.datastax.oss.driver.api.core.config.DriverOption;
import com.datastax.oss.driver.api.core.config.ProgrammaticDriverConfigLoaderBuilder;
import com.datastax.oss.driver.api.core.config.TypedDriverOption;
import com.google.cloud.teleport.v2.spanner.migrations.cassandra.CassandraConfig;
import com.google.cloud.teleport.v2.templates.exceptions.ConnectionException;
import com.google.cloud.teleport.v2.templates.models.ConnectionHelperRequest;
import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.Map;
import com.google.cloud.teleport.v2.templates.dbutils.connection.IConnectionHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CassandraConnectionHelper implements IConnectionHelper<CqlSession> {

private static final Logger LOG = LoggerFactory.getLogger(CassandraConnectionHelper.class);
private static Map<String, CqlSession> connectionPoolMap = null;

@Override
public synchronized void init(ConnectionHelperRequest connectionHelperRequest) {
if (connectionPoolMap != null) {
return;
}
LOG.info("Initializing Cassandra connection pool with size: {}", connectionHelperRequest.getMaxConnections());
connectionPoolMap = new HashMap<>();
CassandraConfig cassandraConfig = connectionHelperRequest.getCassandraConfig();
cassandraConfig.validate();

CqlSessionBuilder builder = CqlSession.builder()
.addContactPoint(new InetSocketAddress(cassandraConfig.getHost(), Integer.parseInt(cassandraConfig.getPort())))
.withAuthCredentials(cassandraConfig.getUsername(), cassandraConfig.getPassword())
.withKeyspace(cassandraConfig.getKeyspace());

ProgrammaticDriverConfigLoaderBuilder configLoaderBuilder = DriverConfigLoader.programmaticBuilder();
configLoaderBuilder.withInt((DriverOption) TypedDriverOption.CONNECTION_POOL_LOCAL_SIZE, cassandraConfig.getLocalPoolSize());
configLoaderBuilder.withInt((DriverOption) TypedDriverOption.CONNECTION_POOL_REMOTE_SIZE, cassandraConfig.getRemotePoolSize());
builder.withConfigLoader(configLoaderBuilder.build());

CqlSession session = builder.build();
String connectionKey = cassandraConfig.getHost() + ":" + cassandraConfig.getPort() + "/" + cassandraConfig.getUsername();
connectionPoolMap.put(connectionKey, session);
}

@Override
public CqlSession getConnection(String connectionRequestKey) throws ConnectionException {
try {
if (connectionPoolMap == null) {
LOG.warn("Connection pool not initialized");
return null;
}
CqlSession session = connectionPoolMap.get(connectionRequestKey);
if (session == null) {
LOG.warn("Connection pool not found for source connection: {}", connectionRequestKey);
return null;
}
return session;
} catch (Exception e) {
throw new ConnectionException(e);
}
}

@Override
public boolean isConnectionPoolInitialized() {
return false;
}

// for unit testing
public void setConnectionPoolMap(Map<String, CqlSession> inputMap) {
connectionPoolMap = inputMap;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package com.google.cloud.teleport.v2.templates.dbutils.dao.source;

import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.cql.SimpleStatement;
import com.google.cloud.teleport.v2.templates.dao.source.IDao;
import com.google.cloud.teleport.v2.templates.exceptions.ConnectionException;
import com.google.cloud.teleport.v2.templates.utils.connection.IConnectionHelper;

public class CassandraDao implements IDao<String> {
private final String cassandraUrl;
private final String cassandraUser;
private final IConnectionHelper<CqlSession> connectionHelper;

public CassandraDao(String cassandraUrl, String cassandraUser, IConnectionHelper<CqlSession> connectionHelper) {
this.cassandraUrl = cassandraUrl;
this.cassandraUser = cassandraUser;
this.connectionHelper = connectionHelper;
}

@Override
public void write(String cqlStatement) throws Exception {
CqlSession session = null;

try {
session = connectionHelper.getConnection(this.cassandraUrl + "/" + this.cassandraUser);
if (session == null) {
throw new ConnectionException("Connection is null");
}
SimpleStatement statement = SimpleStatement.newInstance(cqlStatement);
session.execute(statement);
} finally {
if (session != null) {
session.close();
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package com.google.cloud.teleport.v2.templates.models;

import com.google.cloud.teleport.v2.spanner.migrations.cassandra.CassandraConfig;
import com.google.cloud.teleport.v2.spanner.migrations.shard.Shard;
import java.util.List;

Expand All @@ -38,6 +39,7 @@ public class ConnectionHelperRequest {
private int maxConnections;
private String driver;
private String connectionInitQuery;
private CassandraConfig cassandraConfig;

public List<Shard> getShards() {
return shards;
Expand All @@ -51,6 +53,10 @@ public int getMaxConnections() {
return maxConnections;
}

public CassandraConfig getCassandraConfig() {
return cassandraConfig;
}

public String getDriver() {
return driver;
}
Expand All @@ -71,4 +77,9 @@ public ConnectionHelperRequest(
this.driver = driver;
this.connectionInitQuery = connectionInitQuery;
}

public ConnectionHelperRequest(CassandraConfig cassandraConfig, int maxConnections) {
this.cassandraConfig = cassandraConfig;
this.maxConnections = maxConnections;
}
}
Loading