Skip to content

Commit

Permalink
Added Logger and fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
pawankashyapollion committed Jan 10, 2025
1 parent d5cc433 commit d5ed2aa
Show file tree
Hide file tree
Showing 6 changed files with 32 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -451,7 +451,7 @@ public static void main(String[] args) {
* @return The result of the pipeline execution.
*/
public static PipelineResult run(Options options) {

LOG.info("Going to Start The Pipeline");
Pipeline pipeline = Pipeline.create(options);
pipeline
.getOptions()
Expand Down Expand Up @@ -481,7 +481,7 @@ public static PipelineResult run(Options options) {
+ " database connections than desired. Either reduce the max allowed workers or"
+ " incease the max shard connections");
}

LOG.info("Going to read schema for the source " + options.getSourceType());
// Read the session file for Mysql Only
Schema schema =
MYSQL_SOURCE_TYPE.equals(options.getSourceType())
Expand All @@ -495,7 +495,7 @@ public static PipelineResult run(Options options) {
.withProjectId(ValueProvider.StaticValueProvider.of(options.getSpannerProjectId()))
.withInstanceId(ValueProvider.StaticValueProvider.of(options.getInstanceId()))
.withDatabaseId(ValueProvider.StaticValueProvider.of(options.getDatabaseId()));

LOG.info("Spanner Config Generation Completed Successfully");
// Create shadow tables
// Note that there is a limit on the number of tables that can be created per DB: 5000.
// If we create shadow tables per shard, there will be an explosion of tables.
Expand All @@ -509,6 +509,8 @@ public static PipelineResult run(Options options) {
.withInstanceId(ValueProvider.StaticValueProvider.of(options.getMetadataInstance()))
.withDatabaseId(ValueProvider.StaticValueProvider.of(options.getMetadataDatabase()));

LOG.info("Spanner Config Metadata Generation Completed Successfully");

ShadowTableCreator shadowTableCreator =
new ShadowTableCreator(
spannerConfig,
Expand All @@ -520,7 +522,7 @@ public static PipelineResult run(Options options) {
spannerMetadataConfig.getDatabaseId().get())
.getDialect(),
options.getShadowTablePrefix());

LOG.info("Shadow Table Generation Completed Successfully");
DataflowPipelineDebugOptions debugOptions = options.as(DataflowPipelineDebugOptions.class);

shadowTableCreator.createShadowTablesInSpanner();
Expand All @@ -547,8 +549,9 @@ public static PipelineResult run(Options options) {
"Logical shard id was not found, hence setting it to : " + Constants.DEFAULT_SHARD_ID);
}
}

LOG.info("Shards Generation Completed Successfully");
if (options.getSourceType().equals(CASSANDRA_SOURCE_TYPE)) {
LOG.info("Going to Generate Schema for Cassandra");
Map<String, SpannerTable> spannerTableMap =
SpannerSchema.convertDDLTableToSpannerTable(ddl.allTables());
Map<String, NameAndCols> spannerTableNameColsMap =
Expand All @@ -568,6 +571,7 @@ public static PipelineResult run(Options options) {
spannerTableNameColsMap,
cassandraSourceMetadata.getNameAndColsMap());
} catch (Exception e) {
LOG.error("Exception while Generate Schema for Cassandra");
throw new IllegalArgumentException(e);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,45 +74,45 @@ public class CassandraDMLGenerator implements IDMLGenerator {
@Override
public DMLGeneratorResponse getDMLStatement(DMLGeneratorRequest dmlGeneratorRequest) {
if (dmlGeneratorRequest == null) {
LOG.warn("DMLGeneratorRequest is null. Cannot process the request.");
LOG.info("DMLGeneratorRequest is null. Cannot process the request.");
return new DMLGeneratorResponse("");
}

LOG.info("DMLGeneratorRequest is not null, process the request.");
String spannerTableName = dmlGeneratorRequest.getSpannerTableName();
Schema schema = dmlGeneratorRequest.getSchema();

if (schema == null
|| schema.getSpannerToID() == null
|| schema.getSpSchema() == null
|| schema.getSrcSchema() == null) {
LOG.warn("Schema is invalid or incomplete for table: {}", spannerTableName);
LOG.info("Schema is invalid or incomplete for table: {}", spannerTableName);
return new DMLGeneratorResponse("");
}

NameAndCols tableMapping = schema.getSpannerToID().get(spannerTableName);
if (tableMapping == null) {
LOG.warn(
LOG.info(
"Spanner table {} not found in session file. Dropping the record.", spannerTableName);
return new DMLGeneratorResponse("");
}

String spannerTableId = tableMapping.getName();
SpannerTable spannerTable = schema.getSpSchema().get(spannerTableId);
if (spannerTable == null) {
LOG.warn(
LOG.info(
"Spanner table {} not found in session file. Dropping the record.", spannerTableName);
return new DMLGeneratorResponse("");
}

SourceTable sourceTable = schema.getSrcSchema().get(spannerTableId);
if (sourceTable == null) {
LOG.warn(
LOG.info(
"Source table {} not found for Spanner table ID: {}", spannerTableName, spannerTableId);
return new DMLGeneratorResponse("");
}

if (sourceTable.getPrimaryKeys() == null || sourceTable.getPrimaryKeys().length == 0) {
LOG.warn(
LOG.info(
"Cannot reverse replicate table {} without primary key. Skipping the record.",
sourceTable.getName());
return new DMLGeneratorResponse("");
Expand All @@ -126,7 +126,7 @@ public DMLGeneratorResponse getDMLStatement(DMLGeneratorRequest dmlGeneratorRequ
dmlGeneratorRequest.getKeyValuesJson(),
dmlGeneratorRequest.getSourceDbTimezoneOffset());
if (pkColumnNameValues == null) {
LOG.warn(
LOG.info(
"Failed to generate primary key values for table {}. Skipping the record.",
sourceTable.getName());
return new DMLGeneratorResponse("");
Expand Down Expand Up @@ -402,7 +402,7 @@ private static Map<String, PreparedStatementValueObject<?>> getPkColumnValues(
SourceColumnDefinition sourceColDef = sourceTable.getColDefs().get(colId);
SpannerColumnDefinition spannerColDef = spannerTable.getColDefs().get(colId);
if (spannerColDef == null) {
LOG.warn(
LOG.info(
"The corresponding primary key column {} was not found in Spanner",
sourceColDef.getName());
return null;
Expand All @@ -426,7 +426,7 @@ private static Map<String, PreparedStatementValueObject<?>> getPkColumnValues(
getMappedColumnValue(
spannerColDef, sourceColDef, newValuesJson, sourceDbTimezoneOffset);
} else {
LOG.warn("The column {} was not found in input record", spannerColumnName);
LOG.info("The column {} was not found in input record", spannerColumnName);
return null;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,15 +60,15 @@ public static boolean processRecord(
throws Exception {

try {

LOG.info("Input Recodr is going to Process");
String tableName = spannerRecord.getTableName();
String modType = spannerRecord.getModType().name();
String keysJsonStr = spannerRecord.getMod().getKeysJson();
String newValueJsonStr = spannerRecord.getMod().getNewValuesJson();
JSONObject newValuesJson = new JSONObject(newValueJsonStr);
JSONObject keysJson = new JSONObject(keysJsonStr);
Map<String, Object> customTransformationResponse = null;

LOG.info("Input Recodr is going to Process for " + tableName);
if (spannerToSourceTransformer != null) {
org.joda.time.Instant startTimestamp = org.joda.time.Instant.now();
Map<String, Object> mapRequest =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,11 @@
import com.google.cloud.teleport.v2.templates.dbutils.dao.source.CassandraDao;
import com.google.cloud.teleport.v2.templates.dbutils.dao.source.IDao;
import com.google.cloud.teleport.v2.templates.dbutils.dao.source.JdbcDao;
import com.google.cloud.teleport.v2.templates.dbutils.dml.CassandraDMLGenerator;
import com.google.cloud.teleport.v2.templates.dbutils.dml.IDMLGenerator;
import com.google.cloud.teleport.v2.templates.dbutils.dml.MySQLDMLGenerator;
import com.google.cloud.teleport.v2.templates.exceptions.UnsupportedSourceException;
import com.google.cloud.teleport.v2.templates.models.ConnectionHelperRequest;
import com.google.cloud.teleport.v2.templates.models.DMLGeneratorRequest;
import com.google.cloud.teleport.v2.templates.models.DMLGeneratorResponse;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand All @@ -54,16 +53,7 @@ public class SourceProcessorFactory {

static {
dmlGeneratorMap.put(Constants.SOURCE_MYSQL, new MySQLDMLGenerator());
dmlGeneratorMap.put(
Constants.SOURCE_CASSANDRA,
new IDMLGenerator() {
// TODO It will get removed in DML PR added Now for Test case eg: new
// CassandraDMLGenerator()
@Override
public DMLGeneratorResponse getDMLStatement(DMLGeneratorRequest dmlGeneratorRequest) {
return new DMLGeneratorResponse("");
}
});
dmlGeneratorMap.put(Constants.SOURCE_CASSANDRA, new CassandraDMLGenerator());

connectionHelperMap.put(Constants.SOURCE_MYSQL, new JdbcConnectionHelper());
connectionHelperMap.put(Constants.SOURCE_CASSANDRA, new CassandraConnectionHelper());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,7 @@ public void teardown() {
*/
@ProcessElement
public void processElement(ProcessContext c) throws Exception {
LOG.info("Going to Process Element for Assign Shard ID");
TrimmedShardedDataChangeRecord record = new TrimmedShardedDataChangeRecord(c.element());
String qualifiedShard = "";
String tableName = record.getTableName();
Expand Down Expand Up @@ -233,6 +234,7 @@ public void processElement(ProcessContext c) throws Exception {
record.setShard(qualifiedShard);
String finalKeyString = tableName + "_" + keysJsonStr + "_" + qualifiedShard;
finalKey = finalKeyString.hashCode() % maxConnectionsAcrossAllShards;
LOG.info(" Generated Final Key for Assign Shard Id " + finalKey + " " + finalKeyString);
c.output(KV.of(finalKey, record));

} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ public void setSpannerToSourceTransformer(
/** Setup function connects to Cloud Spanner. */
@Setup
public void setup() throws UnsupportedSourceException {
LOG.info("Applying Source Written Function");
mapper = new ObjectMapper();
mapper.enable(DeserializationFeature.USE_BIG_DECIMAL_FOR_FLOATS);
sourceProcessor =
Expand All @@ -162,20 +163,25 @@ public void teardown() throws Exception {

@ProcessElement
public void processElement(ProcessContext c) {
LOG.info("Process Element for Source Written Function");
KV<Long, TrimmedShardedDataChangeRecord> element = c.element();
TrimmedShardedDataChangeRecord spannerRec = element.getValue();
String shardId = spannerRec.getShard();
LOG.info("shardId " + shardId);
if (shardId == null) {
LOG.info("Skipping becasue of Shard is null");
// no shard found, move to permanent error
outputWithTag(
c, Constants.PERMANENT_ERROR_TAG, Constants.SHARD_NOT_PRESENT_ERROR_MESSAGE, spannerRec);
} else if (shardId.equals(skipDirName)) {
LOG.info("Skipping because It is under Skip Dir Name");
// the record is skipped
skippedRecordCountMetric.inc();
outputWithTag(c, Constants.SKIPPED_TAG, Constants.SKIPPED_TAG_MESSAGE, spannerRec);
} else {
// Get the latest commit timestamp processed at source
try {
LOG.info("Executing Record");
JsonNode keysJson = mapper.readTree(spannerRec.getMod().getKeysJson());
String tableName = spannerRec.getTableName();
com.google.cloud.spanner.Key primaryKey =
Expand All @@ -199,6 +205,7 @@ public void processElement(ProcessContext c) {
== 0
&& shadowTableRecord.getRecordSequence()
> Long.parseLong(spannerRec.getRecordSequence())));
LOG.info("isSourceAhead " + isSourceAhead);

if (!isSourceAhead) {
IDao sourceDao = sourceProcessor.getSourceDao(shardId);
Expand Down

0 comments on commit d5ed2aa

Please sign in to comment.