Skip to content

Commit 37def7a

Browse files
Cassandra pr bug fixes (#57)
1 parent 4929b58 commit 37def7a

File tree

4 files changed

+5
-66
lines changed

4 files changed

+5
-66
lines changed

v2/spanner-common/src/main/java/com/google/cloud/teleport/v2/spanner/migrations/schema/Schema.java

-7
Original file line numberDiff line numberDiff line change
@@ -66,13 +66,6 @@ public Schema() {
6666
this.empty = true;
6767
}
6868

69-
public Schema(Map<String, SpannerTable> spSchema, Map<String, SourceTable> srcSchema) {
70-
this.spSchema = spSchema;
71-
this.srcSchema = srcSchema;
72-
this.syntheticPKeys = new HashMap<String, SyntheticPKey>();
73-
this.empty = (spSchema == null || srcSchema == null);
74-
}
75-
7669
public Schema(
7770
Map<String, SpannerTable> spSchema,
7871
Map<String, SyntheticPKey> syntheticPKeys,

v2/spanner-to-sourcedb/src/main/java/com/google/cloud/teleport/v2/templates/dbutils/dao/source/CassandraDao.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,10 @@
1919
import com.datastax.oss.driver.api.core.cql.BoundStatement;
2020
import com.datastax.oss.driver.api.core.cql.PreparedStatement;
2121
import com.google.cloud.teleport.v2.templates.dbutils.connection.IConnectionHelper;
22+
import com.google.cloud.teleport.v2.templates.dbutils.dml.CassandraTypeHandler;
2223
import com.google.cloud.teleport.v2.templates.exceptions.ConnectionException;
2324
import com.google.cloud.teleport.v2.templates.models.DMLGeneratorResponse;
2425
import com.google.cloud.teleport.v2.templates.models.PreparedStatementGeneratedResponse;
25-
import com.google.cloud.teleport.v2.templates.models.PreparedStatementValueObject;
2626

2727
public class CassandraDao implements IDao<DMLGeneratorResponse> {
2828
private final String cassandraUrl;
@@ -51,7 +51,7 @@ public void write(DMLGeneratorResponse dmlGeneratorResponse) throws Exception {
5151
BoundStatement boundStatement =
5252
preparedStatement.bind(
5353
preparedStatementGeneratedResponse.getValues().stream()
54-
.map(PreparedStatementValueObject::value)
54+
.map(v -> CassandraTypeHandler.castToExpectedType(v.dataType(), v.value()))
5555
.toArray());
5656
session.execute(boundStatement);
5757
}

v2/spanner-to-sourcedb/src/main/java/com/google/cloud/teleport/v2/templates/dbutils/dml/CassandraTypeHandler.java

+1-54
Original file line numberDiff line numberDiff line change
@@ -85,59 +85,6 @@ public String toString() {
8585
}
8686
}
8787

88-
/**
89-
* Functional interface for parsing an object value to a specific type.
90-
*
91-
* <p>This interface provides a contract to implement type conversion logic where an input object
92-
* is parsed and transformed into the desired target type.
93-
*
94-
* <p>Example usage:
95-
*
96-
* <pre>{@code
97-
* TypeParser<Integer> intParser = value -> Integer.parseInt(value.toString());
98-
* Integer parsedValue = intParser.parse("123");
99-
* }</pre>
100-
*
101-
* @param <T> The target type to which the value will be parsed.
102-
*/
103-
@FunctionalInterface
104-
public interface TypeParser<T> {
105-
106-
/**
107-
* Parses the given value and converts it into the target type {@code T}.
108-
*
109-
* @param value The input value to be parsed.
110-
* @return The parsed value of type {@code T}.
111-
*/
112-
T parse(Object value);
113-
}
114-
115-
/**
116-
* Functional interface for supplying a value with exception handling.
117-
*
118-
* <p>This interface provides a mechanism to execute logic that may throw a checked exception,
119-
* making it useful for methods where exception handling is required.
120-
*
121-
* <p>Example usage:
122-
*
123-
* <pre>{@code
124-
* HandlerSupplier<String> supplier = () -> {
125-
* if (someCondition) {
126-
* throw new IOException("Error occurred");
127-
* }
128-
* return "Success";
129-
* };
130-
*
131-
* try {
132-
* String result = supplier.get();
133-
* System.out.println(result);
134-
* } catch (Exception e) {
135-
* e.printStackTrace();
136-
* }
137-
* }</pre>
138-
*
139-
* @param <T> The type of value supplied by the supplier.
140-
*/
14188
@FunctionalInterface
14289
private interface HandlerSupplier<T> {
14390

@@ -702,7 +649,7 @@ private static Object handleSpannerColumnType(
702649

703650
default:
704651
LOG.warn("Unsupported Spanner column type: {}", spannerType);
705-
return null;
652+
throw new IllegalArgumentException("Unsupported Spanner column type: " + spannerType);
706653
}
707654
}
708655

v2/spanner-to-sourcedb/src/main/java/com/google/cloud/teleport/v2/templates/transforms/AssignShardIdFn.java

+2-3
Original file line numberDiff line numberDiff line change
@@ -181,7 +181,6 @@ public void processElement(ProcessContext c) throws Exception {
181181
String qualifiedShard = "";
182182
String tableName = record.getTableName();
183183
String keysJsonStr = record.getMod().getKeysJson();
184-
long finalKey;
185184

186185
try {
187186
if (shardingMode.equals(Constants.SHARDING_MODE_SINGLE_SHARD)) {
@@ -232,7 +231,7 @@ public void processElement(ProcessContext c) throws Exception {
232231

233232
record.setShard(qualifiedShard);
234233
String finalKeyString = tableName + "_" + keysJsonStr + "_" + qualifiedShard;
235-
finalKey = finalKeyString.hashCode() % maxConnectionsAcrossAllShards;
234+
Long finalKey = finalKeyString.hashCode() % maxConnectionsAcrossAllShards;
236235
c.output(KV.of(finalKey, record));
237236

238237
} catch (Exception e) {
@@ -241,7 +240,7 @@ public void processElement(ProcessContext c) throws Exception {
241240
LOG.error("Error fetching shard Id column: " + e.getMessage() + ": " + errors.toString());
242241
// The record has no shard hence will be sent to DLQ in subsequent steps
243242
String finalKeyString = record.getTableName() + "_" + keysJsonStr + "_" + skipDirName;
244-
finalKey = finalKeyString.hashCode() % maxConnectionsAcrossAllShards;
243+
Long finalKey = finalKeyString.hashCode() % maxConnectionsAcrossAllShards;
245244
c.output(KV.of(finalKey, record));
246245
}
247246
}

0 commit comments

Comments
 (0)