[WIP] DML Statement PR #3

Open · wants to merge 52 commits into base: dev-repackaged

Commits (52)
77e4f5a  minor changes (shreyakhajanchi, Oct 29, 2024)
6d88d89  minor changes (shreyakhajanchi, Nov 4, 2024)
171d33c  refactoring code (shreyakhajanchi, Nov 10, 2024)
a33f481  merge to master (shreyakhajanchi, Nov 10, 2024)
a00046e  addressing some comments (shreyakhajanchi, Nov 11, 2024)
18c8751  minor changes (shreyakhajanchi, Nov 15, 2024)
5409069  addressing comments (shreyakhajanchi, Nov 20, 2024)
c99a254  adding comments (shreyakhajanchi, Nov 20, 2024)
f561fd8  addressing some comments (shreyakhajanchi, Nov 20, 2024)
3e375ac  Merge branch 'GoogleCloudPlatform:main' into pr/2000 (akashthawaitcc, Nov 20, 2024)
d557dcd  Merge branch 'GoogleCloudPlatform:main' into pr/2000 (taherkl, Nov 22, 2024)
d94b665  cassandra dml generator added with typehandler (defector221, Nov 25, 2024)
242ea3b  updated TypeHandler (defector221, Nov 26, 2024)
a63b04b  updated TypeHandler (defector221, Nov 26, 2024)
270633c  updated TypeHandler (defector221, Nov 26, 2024)
b5c105b  updated TypeHandler (defector221, Nov 26, 2024)
5c2a0aa  Added pipeline options for cassandra. Made changes to the required cl… (akashthawaitcc, Nov 27, 2024)
e379796  Added connection class for cassandra and created connection helper (akashthawaitcc, Nov 27, 2024)
d163bf6  Merge branch 'feature/cassandra-connection' into feature/modify-pipel… (akashthawaitcc, Nov 27, 2024)
e94c911  conflicts resolved (akashthawaitcc, Nov 28, 2024)
f27a2ae  fix conflict issues on JdbcConnectionHelper (akashthawaitcc, Nov 28, 2024)
d8004f0  removed blank spaces (akashthawaitcc, Nov 28, 2024)
a657469  Fixed import statement (akashthawaitcc, Nov 28, 2024)
4c0c4ff  Fixed import statement (akashthawaitcc, Nov 28, 2024)
32c1f52  Fixed import statement (akashthawaitcc, Nov 28, 2024)
5e39575  Removed unused file (akashthawaitcc, Nov 28, 2024)
b6a934b  removed blank spaces (akashthawaitcc, Nov 28, 2024)
ae1fd22  removed blank spaces and fix imports (akashthawaitcc, Nov 28, 2024)
ebb7988  refactored CassandraConnectionHelper class (akashthawaitcc, Nov 28, 2024)
aa0483c  fixed conflicts issues (akashthawaitcc, Nov 28, 2024)
4ae0b69  removed white spaces (akashthawaitcc, Nov 28, 2024)
e03ca36  removed white spaces (akashthawaitcc, Nov 28, 2024)
1c870c3  removed white spaces (akashthawaitcc, Nov 28, 2024)
fa33b48  removed conflict changes (akashthawaitcc, Nov 28, 2024)
7017a4f  resolved conflict changes (akashthawaitcc, Nov 28, 2024)
4b61fea  resolved conflict changes (akashthawaitcc, Nov 28, 2024)
d03c116  resolved conflict changes (akashthawaitcc, Nov 28, 2024)
779a168  resolved conflict changes (akashthawaitcc, Nov 28, 2024)
f18ea4d  reverse nerge (defector221, Nov 28, 2024)
e1ed039  reverse nerge (defector221, Nov 28, 2024)
c2714a4  Refector Shard (defector221, Dec 2, 2024)
3cb1e41  Refector Shard (defector221, Dec 2, 2024)
d679b2d  Refector Shard (defector221, Dec 2, 2024)
51a8066  Refector Shard (defector221, Dec 2, 2024)
de5faff  Refector Shard (defector221, Dec 2, 2024)
cbedb19  Refector Shard (defector221, Dec 2, 2024)
8b80f0f  Refector Shard (defector221, Dec 2, 2024)
9489ce5  Refector Shard (defector221, Dec 2, 2024)
c524c94  Refector Shard (defector221, Dec 2, 2024)
8504e36  Refector Shard (defector221, Dec 2, 2024)
33ef2b5  Refector Shard (defector221, Dec 2, 2024)
63f13c9  Refector Shard (defector221, Dec 2, 2024)
@@ -52,12 +52,12 @@
@Category({TemplateIntegrationTest.class, SkipDirectRunnerTest.class})
@TemplateIntegrationTest(DataStreamToSpanner.class)
@RunWith(JUnit4.class)
-public class DataStreamToSpannerShardedMigrationWithMigrationShardIdColumnIT
+public class DataStreamToSpannerShardedMigrationWithMigrationMySqlShardIdColumnIT
extends DataStreamToSpannerITBase {

private static final Logger LOG =
LoggerFactory.getLogger(
-        DataStreamToSpannerShardedMigrationWithMigrationShardIdColumnIT.class);
+        DataStreamToSpannerShardedMigrationWithMigrationMySqlShardIdColumnIT.class);

private static final String TABLE = "Users";
private static final String MOVIE_TABLE = "Movie";
@@ -75,7 +75,7 @@ public class DataStreamToSpannerShardedMigrationWithMigrationShardIdColumnIT
private static final String SPANNER_DDL_RESOURCE =
"DataStreamToSpannerShardedMigrationWithMigrationShardIdColumnIT/spanner-schema.sql";

-private static HashSet<DataStreamToSpannerShardedMigrationWithMigrationShardIdColumnIT>
+private static HashSet<DataStreamToSpannerShardedMigrationWithMigrationMySqlShardIdColumnIT>
testInstances = new HashSet<>();
private static PipelineLauncher.LaunchInfo jobInfo1;
private static PipelineLauncher.LaunchInfo jobInfo2;
@@ -92,7 +92,7 @@ public class DataStreamToSpannerShardedMigrationWithMigrationShardIdColumnIT
public void setUp() throws IOException, InterruptedException {
// Prevent cleaning up of dataflow job after a test method is executed.
skipBaseCleanup = true;
-synchronized (DataStreamToSpannerShardedMigrationWithMigrationShardIdColumnIT.class) {
+synchronized (DataStreamToSpannerShardedMigrationWithMigrationMySqlShardIdColumnIT.class) {
testInstances.add(this);
if (spannerResourceManager == null) {
spannerResourceManager = setUpSpannerResourceManager();
@@ -150,7 +150,7 @@ public void setUp() throws IOException, InterruptedException {
*/
@AfterClass
public static void cleanUp() throws IOException {
-for (DataStreamToSpannerShardedMigrationWithMigrationShardIdColumnIT instance : testInstances) {
+for (DataStreamToSpannerShardedMigrationWithMigrationMySqlShardIdColumnIT instance : testInstances) {
instance.tearDownBase();
}
ResourceManagerUtils.cleanResources(spannerResourceManager, pubsubResourceManager);
@@ -51,17 +51,17 @@
@Category({TemplateIntegrationTest.class, SkipDirectRunnerTest.class})
@TemplateIntegrationTest(DataStreamToSpanner.class)
@RunWith(JUnit4.class)
-public class DataStreamToSpannerShardedMigrationWithoutMigrationShardIdColumnIT
+public class DataStreamToSpannerShardedMigrationWithoutMigrationMySqlShardIdColumnIT
extends DataStreamToSpannerITBase {
private static final Logger LOG =
LoggerFactory.getLogger(
-        DataStreamToSpannerShardedMigrationWithoutMigrationShardIdColumnIT.class);
+        DataStreamToSpannerShardedMigrationWithoutMigrationMySqlShardIdColumnIT.class);

private static final String TABLE = "Users";
private static final String SPANNER_DDL_RESOURCE =
"DataStreamToSpannerShardedMigrationWithoutMigrationShardIdColumnIT/spanner-schema.sql";

-private static HashSet<DataStreamToSpannerShardedMigrationWithoutMigrationShardIdColumnIT>
+private static HashSet<DataStreamToSpannerShardedMigrationWithoutMigrationMySqlShardIdColumnIT>
testInstances = new HashSet<>();
private static PipelineLauncher.LaunchInfo jobInfo1;
private static PipelineLauncher.LaunchInfo jobInfo2;
@@ -78,7 +78,7 @@ public class DataStreamToSpannerShardedMigrationWithoutMigrationShardIdColumnIT
public void setUp() throws IOException {
// Prevent cleaning up of dataflow job after a test method is executed.
skipBaseCleanup = true;
-synchronized (DataStreamToSpannerShardedMigrationWithoutMigrationShardIdColumnIT.class) {
+synchronized (DataStreamToSpannerShardedMigrationWithoutMigrationMySqlShardIdColumnIT.class) {
testInstances.add(this);
if (spannerResourceManager == null) {
spannerResourceManager = setUpSpannerResourceManager();
@@ -131,7 +131,7 @@ public void setUp() throws IOException {
*/
@AfterClass
public static void cleanUp() throws IOException {
-for (DataStreamToSpannerShardedMigrationWithoutMigrationShardIdColumnIT instance :
+for (DataStreamToSpannerShardedMigrationWithoutMigrationMySqlShardIdColumnIT instance :
testInstances) {
instance.tearDownBase();
}
@@ -87,7 +87,7 @@ public class DatastreamToSpannerSingleDFShardedMigrationIT extends DataStreamToS
public void setUp() throws IOException, InterruptedException {
// Prevent cleaning up of dataflow job after a test method is executed.
skipBaseCleanup = true;
-synchronized (DataStreamToSpannerShardedMigrationWithMigrationShardIdColumnIT.class) {
+synchronized (DataStreamToSpannerShardedMigrationWithMigrationMySqlShardIdColumnIT.class) {
testInstances.add(this);
if (spannerResourceManager == null) {
spannerResourceManager = setUpSpannerResourceManager();
@@ -16,15 +16,16 @@
package com.google.cloud.teleport.v2.templates.common;

import com.google.cloud.teleport.v2.spanner.migrations.schema.Schema;
-import com.google.cloud.teleport.v2.spanner.migrations.shard.Shard;
+import com.google.cloud.teleport.v2.spanner.migrations.shard.IShard;

import java.io.Serializable;
import java.util.Objects;
import org.joda.time.Duration;

/** Each worker task context. */
public class ProcessingContext implements Serializable {

-private Shard shard;
+private IShard iShard;
private Schema schema;

private String sourceDbTimezoneOffset;
@@ -34,14 +35,14 @@ public class ProcessingContext implements Serializable {
private String runId;

public ProcessingContext(
-Shard shard,
+IShard iShard,
Schema schema,
String sourceDbTimezoneOffset,
String startTimestamp,
Duration windowDuration,
String gcsPath,
String runId) {
-this.shard = shard;
+this.iShard = iShard;
this.schema = schema;
this.sourceDbTimezoneOffset = sourceDbTimezoneOffset;
this.startTimestamp = startTimestamp;
@@ -50,8 +51,8 @@ public ProcessingContext(
this.runId = runId;
}

-public Shard getShard() {
-  return shard;
+public IShard getShard() {
+  return iShard;
}

public Schema getSchema() {
@@ -85,8 +86,8 @@ public String getRunId() {
@Override
public String toString() {

return "{ Shard details :"
+ shard.toString()
return "{ IShard details :"
+ iShard.toString()
+ " sourceDbTimezoneOffset: "
+ sourceDbTimezoneOffset
+ " startTimestamp: "
@@ -33,12 +33,12 @@ public class Constants {
/** Run mode - resumeAll. */
public static final String RUN_MODE_RESUME_ALL = "resumeAll";

-/** Shard progress status - success. */
+/** IShard progress status - success. */
public static final String SHARD_PROGRESS_STATUS_SUCCESS = "SUCCESS";

-/** Shard progress status - error. */
+/** IShard progress status - error. */
public static final String SHARD_PROGRESS_STATUS_ERROR = "ERROR";

-/** Shard progress status - reprocess. */
+/** IShard progress status - reprocess. */
public static final String SHARD_PROGRESS_STATUS_REPROCESS = "REPROCESS";
}
@@ -57,7 +57,7 @@ public static String process(
List<TrimmedShardedDataChangeRecord> records = inputFileReader.getRecords();
Instant readEndTime = Instant.now();
LOG.info(
"Shard "
"IShard "
+ shardId
+ ": read "
+ records.size()
@@ -82,7 +82,7 @@ public static String process(
MySqlDao dao =
new DaoFactory(
connectString,
-taskContext.getShard().getUserName(),
+taskContext.getShard().getUser(),
taskContext.getShard().getPassword())
.getMySqlDao(shardId);

@@ -101,7 +101,7 @@ public static String process(
markShardSuccess(taskContext, spannerDao, fileProcessedStartInterval);
dao.cleanup();
LOG.info(
"Shard " + shardId + ": Successfully processed batch of " + records.size() + " records.");
"IShard " + shardId + ": Successfully processed batch of " + records.size() + " records.");
} catch (Exception e) {
Metrics.counter(GCSToSourceStreamingHandler.class, "shard_failed_" + shardId).inc();
markShardFailure(taskContext, spannerDao, fileProcessedStartInterval);
@@ -138,7 +138,7 @@ public static void processRecords(
dao.batchWrite(dmlBatch);
Instant daoEndTime = Instant.now();
LOG.info(
"Shard "
"MySqlShard "
+ shardId
+ ": Write to mysql for "
+ recordList.size()
@@ -182,7 +182,7 @@ public void onExpiry(
@StateId("stopProcessing") ValueState<Boolean> stopProcessing) {
String shardId = keyString.read();
LOG.info(
"Shard " + shardId + ": started timer processing for expiry time: " + context.timestamp());
"IShard " + shardId + ": started timer processing for expiry time: " + context.timestamp());
ProcessingContext taskContext = processingContext.read();
Boolean failedShard = stopProcessing.read();
if (failedShard != null && failedShard) {
@@ -17,7 +17,7 @@

import com.google.cloud.teleport.v2.spanner.migrations.metadata.SpannerToGcsJobMetadata;
import com.google.cloud.teleport.v2.spanner.migrations.schema.Schema;
-import com.google.cloud.teleport.v2.spanner.migrations.shard.Shard;
+import com.google.cloud.teleport.v2.spanner.migrations.shard.IShard;
import com.google.cloud.teleport.v2.spanner.migrations.utils.SecretManagerAccessorImpl;
import com.google.cloud.teleport.v2.spanner.migrations.utils.SessionFileReader;
import com.google.cloud.teleport.v2.spanner.migrations.utils.ShardFileReader;
@@ -66,7 +66,7 @@ public static Map<String, ProcessingContext> getProcessingContextForGCS(

Schema schema = SessionFileReader.read(sessionFilePath);
ShardFileReader shardFileReader = new ShardFileReader(new SecretManagerAccessorImpl());
-List<Shard> shards = shardFileReader.getOrderedShardDetails(sourceShardsFilePath);
+List<IShard> iShards = shardFileReader.getOrderedShardDetails(sourceShardsFilePath);

ShardProgressTracker shardProgressTracker =
new ShardProgressTracker(
@@ -92,7 +92,7 @@ public static Map<String, ProcessingContext> getProcessingContextForGCS(
metadataDatabase,
tableSuffix,
runId,
-shards,
+iShards,
schema,
isMetadataDbPostgres);
} else {
@@ -105,7 +105,7 @@ public static Map<String, ProcessingContext> getProcessingContextForGCS(
metadataDatabase,
tableSuffix,
runId,
-shards,
+iShards,
schema,
shardProgressTracker,
runMode,
@@ -126,7 +126,7 @@ private static Map<String, ProcessingContext> getProcessingContextForRegularMode
String metadataDatabase,
String tableSuffix,
String runId,
-List<Shard> shards,
+List<IShard> iShards,
Schema schema,
boolean isMetadataDbPostgres) {

@@ -157,18 +157,18 @@ private static Map<String, ProcessingContext> getProcessingContextForRegularMode

Duration duration = DurationUtils.parseDuration(windowDuration);

-for (Shard shard : shards) {
-  LOG.info(" The sorted shard is: {}", shard);
+for (IShard iShard : iShards) {
+  LOG.info(" The sorted IShards is: {}", iShard);
ProcessingContext taskContext =
new ProcessingContext(
-shard,
+iShard,
schema,
sourceDbTimezoneOffset,
startTimestamp,
duration,
gcsInputDirectoryPath,
runId);
-response.put(shard.getLogicalShardId(), taskContext);
+response.put(iShard.getLogicalShardId(), taskContext);
}

return response;
@@ -182,7 +182,7 @@ private static Map<String, ProcessingContext> getProcessingContextForReprocessOr
String metadataDatabase,
String tableSuffix,
String runId,
-List<Shard> shards,
+List<IShard> iShards,
Schema schema,
ShardProgressTracker shardProgressTracker,
String runMode,
@@ -228,9 +228,9 @@ private static Map<String, ProcessingContext> getProcessingContextForReprocessOr

Duration duration = DurationUtils.parseDuration(windowDuration);

-for (Shard shard : shards) {
-  LOG.info(" The sorted shard is: {}", shard);
-  ShardProgress shardProgress = shardProgressList.get(shard.getLogicalShardId());
+for (IShard iShard : iShards) {
+  LOG.info(" The sorted shards is: {}", iShard);
+  ShardProgress shardProgress = shardProgressList.get(iShard.getLogicalShardId());

String shardStartTime = null;
if (shardProgress != null) {
Expand All @@ -239,27 +239,27 @@ private static Map<String, ProcessingContext> getProcessingContextForReprocessOr
if ((runMode.equals(Constants.RUN_MODE_RESUME_SUCCESS)
|| runMode.equals(Constants.RUN_MODE_RESUME_ALL))
&& shardProgress.getStatus().equals(Constants.SHARD_PROGRESS_STATUS_SUCCESS)) {
-// Advance the start time by window duration for successful shards
+// Advance the start time by window duration for successful Shards
shardStartTimeInst = shardStartTimeInst.plus(duration);
}
shardStartTime = shardStartTimeInst.toString();

LOG.info(" The startTime for shard {} is : {}", shard, shardStartTime);
LOG.info(" The startTime for shards {} is : {}", iShard, shardStartTime);
} else {
LOG.info(
" Skipping shard: {} as it does not qualify for given runMode {}.", shard, runMode);
" Skipping shards: {} as it does not qualify for given runMode {}.", iShard, runMode);
continue;
}
ProcessingContext taskContext =
new ProcessingContext(
-shard,
+iShard,
schema,
sourceDbTimezoneOffset,
shardStartTime,
duration,
gcsInputDirectoryPath,
runId);
-response.put(shard.getLogicalShardId(), taskContext);
+response.put(iShard.getLogicalShardId(), taskContext);
}
return response;
}
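One behavioral detail worth calling out in the resume-mode loop above: a shard whose previous window finished with SHARD_PROGRESS_STATUS_SUCCESS has its start time advanced by one window duration so the already-processed interval is not replayed, while ERROR/REPROCESS shards restart from the recorded start time. A minimal, self-contained sketch of that arithmetic using the same Joda-Time types (the values here are illustrative; the template reads them from ShardProgress and the windowDuration option):

```java
import org.joda.time.Duration;
import org.joda.time.Instant;

public class ResumeWindowSketch {
  public static void main(String[] args) {
    // Illustrative values; in the template these come from ShardProgress
    // and DurationUtils.parseDuration(windowDuration).
    Instant lastWindowStart = Instant.parse("2024-12-02T10:00:00.000Z");
    Duration window = Duration.standardMinutes(10);

    // A shard whose last window ended in SHARD_PROGRESS_STATUS_SUCCESS
    // resumes at the next window boundary...
    Instant resumeAt = lastWindowStart.plus(window);
    // ...while ERROR/REPROCESS shards reprocess from the recorded start.
    System.out.println("resume successful shard at:  " + resumeAt);
    System.out.println("reprocess failed shard from: " + lastWindowStart);
  }
}
```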
@@ -24,7 +24,7 @@
import com.google.cloud.spanner.Mutation;
import com.google.cloud.teleport.metadata.SkipDirectRunnerTest;
import com.google.cloud.teleport.metadata.TemplateIntegrationTest;
-import com.google.cloud.teleport.v2.spanner.migrations.shard.Shard;
+import com.google.cloud.teleport.v2.spanner.migrations.shard.MySqlShard;
import com.google.common.io.Resources;
import com.google.gson.Gson;
import com.google.gson.JsonArray;
@@ -410,19 +410,19 @@ private void launchReaderDataflowJob() throws IOException {
private void createAndUploadShardConfigToGcs(
GcsResourceManager gcsResourceManager, MySQLResourceManager jdbcResourceManager)
throws IOException {
-Shard shard = new Shard();
-shard.setLogicalShardId("Shard1");
-shard.setUser(jdbcResourceManager.getUsername());
-shard.setHost(jdbcResourceManager.getHost());
-shard.setPassword(jdbcResourceManager.getPassword());
-shard.setPort(String.valueOf(jdbcResourceManager.getPort()));
-shard.setDbName(jdbcResourceManager.getDatabaseName());
-JsonObject jsObj = (JsonObject) new Gson().toJsonTree(shard).getAsJsonObject();
+MySqlShard mySqlShard = new MySqlShard();
+mySqlShard.setLogicalShardId("Shard1");
+mySqlShard.setUser(jdbcResourceManager.getUsername());
+mySqlShard.setHost(jdbcResourceManager.getHost());
+mySqlShard.setPassword(jdbcResourceManager.getPassword());
+mySqlShard.setPort(String.valueOf(jdbcResourceManager.getPort()));
+mySqlShard.setDbName(jdbcResourceManager.getDatabaseName());
+JsonObject jsObj = (JsonObject) new Gson().toJsonTree(mySqlShard).getAsJsonObject();
jsObj.remove("secretManagerUri"); // remove field secretManagerUri
JsonArray ja = new JsonArray();
ja.add(jsObj);
String shardFileContents = ja.toString();
LOG.info("Shard file contents: {}", shardFileContents);
gcsResourceManager.createArtifact("input/shard.json", shardFileContents);
LOG.info("MySqlShard file contents: {}", shardFileContents);
gcsResourceManager.createArtifact("input/mySqlShard.json", shardFileContents);
}
}
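For orientation, the shard-config artifact written by createAndUploadShardConfigToGcs above would look roughly like the following. The field names are inferred from the MySqlShard setters and Gson's default field-name serialization, all values are placeholders, and secretManagerUri is removed before upload exactly as in the code:

```json
[
  {
    "logicalShardId": "Shard1",
    "host": "127.0.0.1",
    "user": "mysql-user",
    "password": "mysql-password",
    "port": "3306",
    "dbName": "testdb"
  }
]
```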