
Commit f0a48cf

Added Logger and fixes
1 parent 5be7510 commit f0a48cf

7 files changed: +33 −30 lines changed

v2/spanner-to-sourcedb/src/main/java/com/google/cloud/teleport/v2/templates/SpannerToSourceDb.java

+9 −5

@@ -451,7 +451,7 @@ public static void main(String[] args) {
    * @return The result of the pipeline execution.
    */
   public static PipelineResult run(Options options) {
-
+    LOG.info("Going to Start The Pipeline");
     Pipeline pipeline = Pipeline.create(options);
     pipeline
         .getOptions()
@@ -481,7 +481,7 @@ public static PipelineResult run(Options options) {
              + " database connections than desired. Either reduce the max allowed workers or"
              + " incease the max shard connections");
     }
-
+    LOG.info("Going to read schema for the source " + options.getSourceType());
     // Read the session file for Mysql Only
     Schema schema =
         MYSQL_SOURCE_TYPE.equals(options.getSourceType())
@@ -495,7 +495,7 @@ public static PipelineResult run(Options options) {
             .withProjectId(ValueProvider.StaticValueProvider.of(options.getSpannerProjectId()))
             .withInstanceId(ValueProvider.StaticValueProvider.of(options.getInstanceId()))
             .withDatabaseId(ValueProvider.StaticValueProvider.of(options.getDatabaseId()));
-
+    LOG.info("Spanner Config Generation Completed Successfully");
     // Create shadow tables
     // Note that there is a limit on the number of tables that can be created per DB: 5000.
     // If we create shadow tables per shard, there will be an explosion of tables.
@@ -509,6 +509,8 @@ public static PipelineResult run(Options options) {
             .withInstanceId(ValueProvider.StaticValueProvider.of(options.getMetadataInstance()))
             .withDatabaseId(ValueProvider.StaticValueProvider.of(options.getMetadataDatabase()));

+    LOG.info("Spanner Config Metadata Generation Completed Successfully");
+
     ShadowTableCreator shadowTableCreator =
         new ShadowTableCreator(
             spannerConfig,
@@ -520,7 +522,7 @@ public static PipelineResult run(Options options) {
                     spannerMetadataConfig.getDatabaseId().get())
                 .getDialect(),
             options.getShadowTablePrefix());
-
+    LOG.info("Shadow Table Generation Completed Successfully");
     DataflowPipelineDebugOptions debugOptions = options.as(DataflowPipelineDebugOptions.class);

     shadowTableCreator.createShadowTablesInSpanner();
@@ -547,8 +549,9 @@ public static PipelineResult run(Options options) {
             "Logical shard id was not found, hence setting it to : " + Constants.DEFAULT_SHARD_ID);
       }
     }
-
+    LOG.info("Shards Generation Completed Successfully");
     if (options.getSourceType().equals(CASSANDRA_SOURCE_TYPE)) {
+      LOG.info("Going to Generate Schema for Cassandra");
       Map<String, SpannerTable> spannerTableMap =
           SpannerSchema.convertDDLTableToSpannerTable(ddl.allTables());
       Map<String, NameAndCols> spannerTableNameColsMap =
@@ -568,6 +571,7 @@ public static PipelineResult run(Options options) {
               spannerTableNameColsMap,
               cassandraSourceMetadata.getNameAndColsMap());
     } catch (Exception e) {
+      LOG.error("Exception while Generate Schema for Cassandra");
       throw new IllegalArgumentException(e);
     }
   }
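The new LOG.info calls in run(Options) rely on a class-level SLF4J logger named LOG. A minimal sketch of the declaration these calls assume (the field name matches the diff; the rest is standard SLF4J boilerplate, not taken from the commit):

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SpannerToSourceDb {
  // Class-level logger assumed by the LOG.info(...) calls added in run(Options).
  private static final Logger LOG = LoggerFactory.getLogger(SpannerToSourceDb.class);
}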

v2/spanner-to-sourcedb/src/main/java/com/google/cloud/teleport/v2/templates/dbutils/dml/CassandraDMLGenerator.java

+10 −10

@@ -48,45 +48,45 @@ public class CassandraDMLGenerator implements IDMLGenerator {
   @Override
   public DMLGeneratorResponse getDMLStatement(DMLGeneratorRequest dmlGeneratorRequest) {
     if (dmlGeneratorRequest == null) {
-      LOG.warn("DMLGeneratorRequest is null. Cannot process the request.");
+      LOG.info("DMLGeneratorRequest is null. Cannot process the request.");
       return new DMLGeneratorResponse("");
     }
-
+    LOG.info("DMLGeneratorRequest is not null, process the request.");
     String spannerTableName = dmlGeneratorRequest.getSpannerTableName();
     Schema schema = dmlGeneratorRequest.getSchema();

     if (schema == null
         || schema.getSpannerToID() == null
         || schema.getSpSchema() == null
         || schema.getSrcSchema() == null) {
-      LOG.warn("Schema is invalid or incomplete for table: {}", spannerTableName);
+      LOG.info("Schema is invalid or incomplete for table: {}", spannerTableName);
       return new DMLGeneratorResponse("");
     }

     NameAndCols tableMapping = schema.getSpannerToID().get(spannerTableName);
     if (tableMapping == null) {
-      LOG.warn(
+      LOG.info(
          "Spanner table {} not found in session file. Dropping the record.", spannerTableName);
       return new DMLGeneratorResponse("");
     }

     String spannerTableId = tableMapping.getName();
     SpannerTable spannerTable = schema.getSpSchema().get(spannerTableId);
     if (spannerTable == null) {
-      LOG.warn(
+      LOG.info(
          "Spanner table {} not found in session file. Dropping the record.", spannerTableName);
       return new DMLGeneratorResponse("");
     }

     SourceTable sourceTable = schema.getSrcSchema().get(spannerTableId);
     if (sourceTable == null) {
-      LOG.warn(
+      LOG.info(
          "Source table {} not found for Spanner table ID: {}", spannerTableName, spannerTableId);
       return new DMLGeneratorResponse("");
     }

     if (sourceTable.getPrimaryKeys() == null || sourceTable.getPrimaryKeys().length == 0) {
-      LOG.warn(
+      LOG.info(
          "Cannot reverse replicate table {} without primary key. Skipping the record.",
          sourceTable.getName());
       return new DMLGeneratorResponse("");
@@ -100,7 +100,7 @@ public DMLGeneratorResponse getDMLStatement(DMLGeneratorRequest dmlGeneratorRequ
            dmlGeneratorRequest.getKeyValuesJson(),
            dmlGeneratorRequest.getSourceDbTimezoneOffset());
     if (pkColumnNameValues == null) {
-      LOG.warn(
+      LOG.info(
          "Failed to generate primary key values for table {}. Skipping the record.",
          sourceTable.getName());
       return new DMLGeneratorResponse("");
@@ -312,7 +312,7 @@ private static Map<String, PreparedStatementValueObject<?>> getPkColumnValues(
       SourceColumnDefinition sourceColDef = sourceTable.getColDefs().get(colId);
       SpannerColumnDefinition spannerColDef = spannerTable.getColDefs().get(colId);
       if (spannerColDef == null) {
-        LOG.warn(
+        LOG.info(
            "The corresponding primary key column {} was not found in Spanner",
            sourceColDef.getName());
         return null;
@@ -336,7 +336,7 @@ private static Map<String, PreparedStatementValueObject<?>> getPkColumnValues(
            getMappedColumnValue(
                spannerColDef, sourceColDef, newValuesJson, sourceDbTimezoneOffset);
       } else {
-        LOG.warn("The column {} was not found in input record", spannerColumnName);
+        LOG.info("The column {} was not found in input record", spannerColumnName);
         return null;
       }
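The guard clauses above switch from LOG.warn to LOG.info but keep SLF4J's parameterized {} style, while the logging added elsewhere in this commit uses string concatenation. A small standalone sketch contrasting the two forms (class and method names are illustrative only, not from the repository):

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class LoggingStyleSketch {
  private static final Logger LOG = LoggerFactory.getLogger(LoggingStyleSketch.class);

  void logExamples(String spannerTableName) {
    // Parameterized form, as used in CassandraDMLGenerator: the placeholder is
    // only substituted when INFO is enabled for this logger.
    LOG.info("Schema is invalid or incomplete for table: {}", spannerTableName);

    // Concatenation form, as used in SpannerToSourceDb: the message string is
    // built eagerly, regardless of the configured log level.
    LOG.info("Going to read schema for the source " + spannerTableName);
  }
}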

v2/spanner-to-sourcedb/src/main/java/com/google/cloud/teleport/v2/templates/dbutils/dml/CassandraTypeHandler.java

+1 −1

@@ -1323,7 +1323,7 @@ public static PreparedStatementValueObject<?> getColumnValueByType(
     Object columnValue = handleSpannerColumnType(spannerType, columnName, valuesJson);

     if (columnValue == null) {
-      LOG.warn("Column value is null for column: {}, type: {}", columnName, spannerType);
+      LOG.info("Column value is null for column: {}, type: {}", columnName, spannerType);
       return PreparedStatementValueObject.create(cassandraType, null);
     }
     return PreparedStatementValueObject.create(cassandraType, columnValue);

v2/spanner-to-sourcedb/src/main/java/com/google/cloud/teleport/v2/templates/dbutils/processor/InputRecordProcessor.java

+2 −2

@@ -60,15 +60,15 @@ public static boolean processRecord(
       throws Exception {

     try {
-
+      LOG.info("Input Recodr is going to Process");
       String tableName = spannerRecord.getTableName();
       String modType = spannerRecord.getModType().name();
       String keysJsonStr = spannerRecord.getMod().getKeysJson();
       String newValueJsonStr = spannerRecord.getMod().getNewValuesJson();
       JSONObject newValuesJson = new JSONObject(newValueJsonStr);
       JSONObject keysJson = new JSONObject(keysJsonStr);
       Map<String, Object> customTransformationResponse = null;
-
+      LOG.info("Input Recodr is going to Process for " + tableName);
       if (spannerToSourceTransformer != null) {
         org.joda.time.Instant startTimestamp = org.joda.time.Instant.now();
         Map<String, Object> mapRequest =
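The two new log lines bracket the point where the change record's key and new-value payloads are parsed with org.json. A small self-contained sketch of that parsing step (the payload strings are invented for illustration):

import org.json.JSONObject;

public class ModJsonSketch {
  public static void main(String[] args) {
    // Hypothetical payloads shaped like a change record's keysJson / newValuesJson.
    String keysJsonStr = "{\"SingerId\": 7}";
    String newValueJsonStr = "{\"FirstName\": \"Marc\"}";

    // Mirrors processRecord(): both payloads are parsed into JSONObject trees
    // before any transformation or DML generation happens.
    JSONObject keysJson = new JSONObject(keysJsonStr);
    JSONObject newValuesJson = new JSONObject(newValueJsonStr);

    System.out.println(keysJson.getLong("SingerId") + " " + newValuesJson.getString("FirstName"));
  }
}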

v2/spanner-to-sourcedb/src/main/java/com/google/cloud/teleport/v2/templates/dbutils/processor/SourceProcessorFactory.java

+2 −12

@@ -24,12 +24,11 @@
 import com.google.cloud.teleport.v2.templates.dbutils.dao.source.CassandraDao;
 import com.google.cloud.teleport.v2.templates.dbutils.dao.source.IDao;
 import com.google.cloud.teleport.v2.templates.dbutils.dao.source.JdbcDao;
+import com.google.cloud.teleport.v2.templates.dbutils.dml.CassandraDMLGenerator;
 import com.google.cloud.teleport.v2.templates.dbutils.dml.IDMLGenerator;
 import com.google.cloud.teleport.v2.templates.dbutils.dml.MySQLDMLGenerator;
 import com.google.cloud.teleport.v2.templates.exceptions.UnsupportedSourceException;
 import com.google.cloud.teleport.v2.templates.models.ConnectionHelperRequest;
-import com.google.cloud.teleport.v2.templates.models.DMLGeneratorRequest;
-import com.google.cloud.teleport.v2.templates.models.DMLGeneratorResponse;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -54,16 +53,7 @@ public class SourceProcessorFactory {

   static {
     dmlGeneratorMap.put(Constants.SOURCE_MYSQL, new MySQLDMLGenerator());
-    dmlGeneratorMap.put(
-        Constants.SOURCE_CASSANDRA,
-        new IDMLGenerator() {
-          // TODO It will get removed in DML PR added Now for Test case eg: new
-          // CassandraDMLGenerator()
-          @Override
-          public DMLGeneratorResponse getDMLStatement(DMLGeneratorRequest dmlGeneratorRequest) {
-            return new DMLGeneratorResponse("");
-          }
-        });
+    dmlGeneratorMap.put(Constants.SOURCE_CASSANDRA, new CassandraDMLGenerator());

     connectionHelperMap.put(Constants.SOURCE_MYSQL, new JdbcConnectionHelper());
     connectionHelperMap.put(Constants.SOURCE_CASSANDRA, new CassandraConnectionHelper());
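This change swaps the temporary anonymous IDMLGenerator stub for the real CassandraDMLGenerator in the factory's static registry. A stripped-down sketch of that registry pattern follows; the types and string keys are simplified stand-ins, not the real classes or the actual Constants.SOURCE_MYSQL / Constants.SOURCE_CASSANDRA values.

import java.util.HashMap;
import java.util.Map;

class DmlGeneratorRegistrySketch {
  interface IDMLGenerator {}

  // Stand-ins for the real generator classes in the repository.
  static class MySqlGeneratorStub implements IDMLGenerator {}

  static class CassandraGeneratorStub implements IDMLGenerator {}

  private static final Map<String, IDMLGenerator> dmlGeneratorMap = new HashMap<>();

  static {
    dmlGeneratorMap.put("mysql", new MySqlGeneratorStub());
    // After this commit, the Cassandra entry points at the real CassandraDMLGenerator
    // instead of an anonymous stub that always returned an empty DML response.
    dmlGeneratorMap.put("cassandra", new CassandraGeneratorStub());
  }

  static IDMLGenerator forSource(String sourceType) {
    // Lookup by source type; callers handle a null result for unsupported sources.
    return dmlGeneratorMap.get(sourceType);
  }
}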

v2/spanner-to-sourcedb/src/main/java/com/google/cloud/teleport/v2/templates/transforms/AssignShardIdFn.java

+2 −0

@@ -177,6 +177,7 @@ public void teardown() {
    */
   @ProcessElement
   public void processElement(ProcessContext c) throws Exception {
+    LOG.info("Going to Process Element for Assign Shard ID");
     TrimmedShardedDataChangeRecord record = new TrimmedShardedDataChangeRecord(c.element());
     String qualifiedShard = "";
     String tableName = record.getTableName();
@@ -233,6 +234,7 @@ public void processElement(ProcessContext c) throws Exception {
         record.setShard(qualifiedShard);
         String finalKeyString = tableName + "_" + keysJsonStr + "_" + qualifiedShard;
         finalKey = finalKeyString.hashCode() % maxConnectionsAcrossAllShards;
+        LOG.info(" Generated Final Key for Assign Shard Id " + finalKey + " " + finalKeyString);
         c.output(KV.of(finalKey, record));

       } catch (Exception e) {
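The new log line prints the bucketing key computed one line earlier. As a point of reference, finalKeyString.hashCode() % maxConnectionsAcrossAllShards uses Java's signed remainder, so the logged finalKey can be negative whenever the hash code is negative. A small standalone sketch of that computation with made-up values:

public class FinalKeySketch {
  public static void main(String[] args) {
    long maxConnectionsAcrossAllShards = 100;  // hypothetical limit
    String tableName = "Orders";               // hypothetical record fields
    String keysJsonStr = "{\"id\":42}";
    String qualifiedShard = "shard-01";

    // Same shape as AssignShardIdFn: hash the composite key, then take the
    // remainder to spread records across the allowed connection buckets.
    String finalKeyString = tableName + "_" + keysJsonStr + "_" + qualifiedShard;
    long finalKey = finalKeyString.hashCode() % maxConnectionsAcrossAllShards;

    // String.hashCode() may be negative, so finalKey may be negative too.
    System.out.println("Generated Final Key for Assign Shard Id " + finalKey + " " + finalKeyString);
  }
}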

v2/spanner-to-sourcedb/src/main/java/com/google/cloud/teleport/v2/templates/transforms/SourceWriterFn.java

+7 −0

@@ -144,6 +144,7 @@ public void setSpannerToSourceTransformer(
   /** Setup function connects to Cloud Spanner. */
   @Setup
   public void setup() throws UnsupportedSourceException {
+    LOG.info("Applying Source Written Function");
     mapper = new ObjectMapper();
     mapper.enable(DeserializationFeature.USE_BIG_DECIMAL_FOR_FLOATS);
     sourceProcessor =
@@ -162,20 +163,25 @@ public void teardown() throws Exception {

   @ProcessElement
   public void processElement(ProcessContext c) {
+    LOG.info("Process Element for Source Written Function");
     KV<Long, TrimmedShardedDataChangeRecord> element = c.element();
     TrimmedShardedDataChangeRecord spannerRec = element.getValue();
     String shardId = spannerRec.getShard();
+    LOG.info("shardId " + shardId);
     if (shardId == null) {
+      LOG.info("Skipping becasue of Shard is null");
       // no shard found, move to permanent error
       outputWithTag(
          c, Constants.PERMANENT_ERROR_TAG, Constants.SHARD_NOT_PRESENT_ERROR_MESSAGE, spannerRec);
     } else if (shardId.equals(skipDirName)) {
+      LOG.info("Skipping because It is under Skip Dir Name");
       // the record is skipped
       skippedRecordCountMetric.inc();
       outputWithTag(c, Constants.SKIPPED_TAG, Constants.SKIPPED_TAG_MESSAGE, spannerRec);
     } else {
       // Get the latest commit timestamp processed at source
       try {
+        LOG.info("Executing Record");
         JsonNode keysJson = mapper.readTree(spannerRec.getMod().getKeysJson());
         String tableName = spannerRec.getTableName();
         com.google.cloud.spanner.Key primaryKey =
@@ -199,6 +205,7 @@ public void processElement(ProcessContext c) {
                     == 0
                 && shadowTableRecord.getRecordSequence()
                     > Long.parseLong(spannerRec.getRecordSequence())));
+        LOG.info("isSourceAhead " + isSourceAhead);

         if (!isSourceAhead) {
           IDao sourceDao = sourceProcessor.getSourceDao(shardId);
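The logs added here trace the Jackson parsing path shown in the hunk: setup() configures an ObjectMapper with USE_BIG_DECIMAL_FOR_FLOATS and processElement() reads the change record's keys JSON via readTree. A self-contained sketch of that Jackson usage (the JSON payload is a made-up example, not from the commit):

import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

public class KeysJsonSketch {
  public static void main(String[] args) throws Exception {
    // Mirrors setup(): floating-point values are deserialized as BigDecimal
    // to avoid precision loss.
    ObjectMapper mapper = new ObjectMapper();
    mapper.enable(DeserializationFeature.USE_BIG_DECIMAL_FOR_FLOATS);

    // Mirrors processElement(): parse the mod's keys JSON into a tree.
    JsonNode keysJson = mapper.readTree("{\"SingerId\": 42}"); // hypothetical payload
    System.out.println(keysJson.get("SingerId").asLong());
  }
}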
