Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4060,26 +4060,35 @@ public MutationState dropCDC(DropCDCStatement statement) throws SQLException {
String schemaName = statement.getTableName().getSchemaName();
String cdcTableName = statement.getCdcObjName().getName();
String parentTableName = statement.getTableName().getTableName();
String indexName = CDCUtil.getCDCIndexName(statement.getCdcObjName().getName());
// Mark CDC Stream as Disabled
long cdcIndexTimestamp =
connection.getTable(SchemaUtil.getTableName(schemaName, indexName)).getTimeStamp();
String fullParentTableName = SchemaUtil.getTableName(schemaName, parentTableName);
String streamName = String.format(CDC_STREAM_NAME_FORMAT, fullParentTableName, cdcTableName,
cdcIndexTimestamp, CDCUtil.getCDCCreationUTCDateTime(cdcIndexTimestamp));
markCDCStreamStatus(fullParentTableName, streamName, CDCUtil.CdcStreamStatus.DISABLED);
// Dropping the virtual CDC Table
dropTable(schemaName, cdcTableName, parentTableName, PTableType.CDC, statement.ifExists(),
false, false);
// Dropping the uncovered index associated with the CDC Table
boolean ifExists = statement.ifExists();
MutationState mutationState = new MutationState(0, 0, connection);
try {
return dropTable(schemaName, indexName, parentTableName, PTableType.INDEX,
statement.ifExists(), false, false);
} catch (SQLException e) {
throw new SQLExceptionInfo.Builder(SQLExceptionCode.fromErrorCode(e.getErrorCode()))
.setTableName(statement.getCdcObjName().getName()).setRootCause(e.getCause()).build()
.buildException();
String indexName = CDCUtil.getCDCIndexName(statement.getCdcObjName().getName());
// Mark CDC Stream as Disabled
long cdcIndexTimestamp =
connection.getTable(SchemaUtil.getTableName(schemaName, indexName)).getTimeStamp();
String fullParentTableName = SchemaUtil.getTableName(schemaName, parentTableName);
String streamName = String.format(CDC_STREAM_NAME_FORMAT, fullParentTableName, cdcTableName,
cdcIndexTimestamp, CDCUtil.getCDCCreationUTCDateTime(cdcIndexTimestamp));
markCDCStreamStatus(fullParentTableName, streamName, CDCUtil.CdcStreamStatus.DISABLED);
// Dropping the virtual CDC Table
dropTable(schemaName, cdcTableName, parentTableName, PTableType.CDC, statement.ifExists(),
false, false);
// Dropping the uncovered index associated with the CDC Table
try {
mutationState = dropTable(schemaName, indexName, parentTableName, PTableType.INDEX,
statement.ifExists(), false, false);
} catch (SQLException e) {
throw new SQLExceptionInfo.Builder(SQLExceptionCode.fromErrorCode(e.getErrorCode()))
.setTableName(statement.getCdcObjName().getName()).setRootCause(e.getCause()).build()
.buildException();
}
} catch (MetaDataEntityNotFoundException e) {
if (!ifExists) {
throw e;
}
}
return mutationState;
}

private void markCDCStreamStatus(String tableName, String streamName,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,22 @@ public void testIndexNameAfterCreateCDC() throws Exception {
}
}

@Test
public void testDropCDCIfExists() throws SQLException {
Properties props = new Properties();
Connection conn = DriverManager.getConnection(getUrl(), props);
String tableName = generateUniqueName();
String schemaName = null;
String viewName = forView ? generateUniqueName() : null;
String fullTableName = SchemaUtil.getTableName(schemaName, tableName);
if (forView) {
fullTableName = SchemaUtil.getTableName(schemaName, viewName);
}
String cdcName = generateUniqueName();
String drop_cdc_sql = "DROP CDC IF EXISTS " + cdcName + " ON " + fullTableName;
conn.createStatement().execute(drop_cdc_sql);
}

@Test
public void testDropCDC() throws SQLException {
Properties props = new Properties();
Expand Down