Skip to content

Commit

Permalink
NIFI-12382: Renamed to DatabaseTableSchemaRegistry, fixed tests and r…
Browse files Browse the repository at this point in the history
…efactored complexity into methods
  • Loading branch information
mattyb149 committed Nov 17, 2023
1 parent b4b8a6d commit 49bc2e7
Show file tree
Hide file tree
Showing 4 changed files with 100 additions and 74 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,21 +29,6 @@
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-dbcp-service-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-dbcp-base</artifactId>
<version>2.0.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-kerberos-credentials-service-api</artifactId>
<version>2.0.0-SNAPSHOT</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-kerberos-user-service-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.derby</groupId>
<artifactId>derby</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,20 +51,20 @@
@CapabilityDescription("Provides a service for generating a record schema from a database table definition. The service is configured "
+ "to use a table name and a database connection fetches the table metadata (i.e. table definition) such as column names, data types, "
+ "nullability, etc.")
public class DatabaseSchemaRegistry extends AbstractControllerService implements SchemaRegistry {
public class DatabaseTableSchemaRegistry extends AbstractControllerService implements SchemaRegistry {

private static final Set<SchemaField> schemaFields = EnumSet.of(SchemaField.SCHEMA_NAME);

static final PropertyDescriptor DBCP_SERVICE = new PropertyDescriptor.Builder()
.name("dbcp-service")
.name("Database Connection Pooling Service")
.displayName("Database Connection Pooling Service")
.description("The Controller Service that is used to obtain a connection to the database for retrieving table information.")
.required(true)
.identifiesControllerService(DBCPService.class)
.build();

static final PropertyDescriptor CATALOG_NAME = new PropertyDescriptor.Builder()
.name("catalog-name")
.name("Catalog Name")
.displayName("Catalog Name")
.description("The name of the catalog used to locate the desired table. This may not apply for the database that you are querying. In this case, leave the field empty. Note that if the "
+ "property is set and the database is case-sensitive, the catalog name must match the database's catalog name exactly.")
Expand All @@ -74,7 +74,7 @@ public class DatabaseSchemaRegistry extends AbstractControllerService implements
.build();

static final PropertyDescriptor SCHEMA_NAME = new PropertyDescriptor.Builder()
.name("schema-name")
.name("Schema Name")
.displayName("Schema Name")
.description("The name of the schema that the table belongs to. This may not apply for the database that you are updating. In this case, leave the field empty. Note that if the "
+ "property is set and the database is case-sensitive, the schema name must match the database's schema name exactly. Also notice that if the same table name exists in multiple "
Expand Down Expand Up @@ -129,51 +129,64 @@ RecordSchema retrieveSchemaByName(final SchemaIdentifier schemaIdentifier) throw
final String tableName = schemaName.get();
try {
try (final Connection conn = dbcpService.getConnection()) {
final DatabaseMetaData dmd = conn.getMetaData();
try (final ResultSet colrs = dmd.getColumns(dbCatalogName, dbSchemaName, tableName, "%")) {
final List<RecordField> recordFields = new ArrayList<>();
while (colrs.next()) {
// COLUMN_DEF must be read first to work around Oracle bug, see NIFI-4279 for details
final String defaultValue = colrs.getString("COLUMN_DEF");
final String columnName = colrs.getString("COLUMN_NAME");
final int dataType = colrs.getInt("DATA_TYPE");
final String nullableValue = colrs.getString("IS_NULLABLE");
final boolean isNullable = "YES".equalsIgnoreCase(nullableValue) || nullableValue.isEmpty();
recordFields.add(new RecordField(
columnName,
DataTypeUtils.getDataTypeFromSQLTypeValue(dataType),
defaultValue,
isNullable));
}

// If no columns are found, check that the table exists
if (recordFields.isEmpty()) {
try (final ResultSet tblrs = dmd.getTables(dbCatalogName, dbSchemaName, tableName, null)) {
List<String> qualifiedNameSegments = new ArrayList<>();
if (dbCatalogName != null) {
qualifiedNameSegments.add(dbCatalogName);
}
if (dbSchemaName != null) {
qualifiedNameSegments.add(dbSchemaName);
}
qualifiedNameSegments.add(tableName);

if (!tblrs.next()) {
throw new SchemaNotFoundException("Table "
+ String.join(".", qualifiedNameSegments)
+ " not found");
} else {
getLogger().warn("Table "
+ String.join(".", qualifiedNameSegments)
+ " found but no columns were found, if this is not expected then check the user permissions for getting table metadata from the database");
}
}
}
return new SimpleRecordSchema(recordFields);
final DatabaseMetaData databaseMetaData = conn.getMetaData();
return getRecordSchemaFromMetadata(databaseMetaData, tableName);
}
}
} catch (SQLException sqle) {
throw new IOException("Error retrieving schema for table " + schemaName.get(), sqle);
}
}

private RecordSchema getRecordSchemaFromMetadata(final DatabaseMetaData databaseMetaData, final String tableName) throws SQLException, SchemaNotFoundException {
try (final ResultSet columnResultSet = databaseMetaData.getColumns(dbCatalogName, dbSchemaName, tableName, "%")) {

final List<RecordField> recordFields = new ArrayList<>();
while (columnResultSet.next()) {
recordFields.add(createRecordFieldFromColumn(columnResultSet));
}

// If no columns are found, check that the table exists
if (recordFields.isEmpty()) {
checkTableExists(databaseMetaData, tableName);
}
return new SimpleRecordSchema(recordFields);
}
}

private RecordField createRecordFieldFromColumn(final ResultSet columnResultSet) throws SQLException {
// COLUMN_DEF must be read first to work around Oracle bug, see NIFI-4279 for details
final String defaultValue = columnResultSet.getString("COLUMN_DEF");
final String columnName = columnResultSet.getString("COLUMN_NAME");
final int dataType = columnResultSet.getInt("DATA_TYPE");
final String nullableValue = columnResultSet.getString("IS_NULLABLE");
final boolean isNullable = "YES".equalsIgnoreCase(nullableValue) || nullableValue.isEmpty();
return new RecordField(
columnName,
DataTypeUtils.getDataTypeFromSQLTypeValue(dataType),
defaultValue,
isNullable);
}

private void checkTableExists(final DatabaseMetaData databaseMetaData, final String tableName) throws SchemaNotFoundException, SQLException {
try (final ResultSet tblrs = databaseMetaData.getTables(dbCatalogName, dbSchemaName, tableName, null)) {
List<String> qualifiedNameSegments = new ArrayList<>();
if (dbCatalogName != null) {
qualifiedNameSegments.add(dbCatalogName);
}
if (dbSchemaName != null) {
qualifiedNameSegments.add(dbSchemaName);
}
qualifiedNameSegments.add(tableName);

if (!tblrs.next()) {
throw new SchemaNotFoundException("Table "
+ String.join(".", qualifiedNameSegments)
+ " not found");
} else {
getLogger().warn("Table "
+ String.join(".", qualifiedNameSegments)
+ " found but no columns were found, if this is not expected then check the user permissions for getting table metadata from the database");
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,4 @@
# See the License for the specific language governing permissions and
# limitations under the License.

org.apache.nifi.db.schemaregistry.DatabaseSchemaRegistry
org.apache.nifi.db.schemaregistry.DatabaseTableSchemaRegistry
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,14 @@
*/
package org.apache.nifi.db.schemaregistry;

import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.dbcp.ConnectionUrlValidator;
import org.apache.nifi.dbcp.DBCPService;
import org.apache.nifi.dbcp.utils.DBCPProperties;
import org.apache.nifi.dbcp.DriverClassValidator;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.record.RecordField;
Expand Down Expand Up @@ -52,7 +56,7 @@
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.spy;

public class DatabaseSchemaRegistryTest {
public class DatabaseTableSchemaRegistryTest {

private static final List<String> CREATE_TABLE_STATEMENTS = Arrays.asList(
"CREATE TABLE PERSONS (id integer primary key, name varchar(100)," +
Expand All @@ -69,6 +73,30 @@ public class DatabaseSchemaRegistryTest {

private final static String DB_LOCATION = "target/db_schema_reg";

// This is to mimic those in DBCPProperties to avoid adding the dependency to nifi-dbcp-base
private static final PropertyDescriptor DATABASE_URL = new PropertyDescriptor.Builder()
.name("Database Connection URL")
.addValidator(new ConnectionUrlValidator())
.expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
.build();

private static final PropertyDescriptor DB_USER = new PropertyDescriptor.Builder()
.name("Database User")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
.build();

private static final PropertyDescriptor DB_PASSWORD = new PropertyDescriptor.Builder()
.name("Password")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
.build();
private static final PropertyDescriptor DB_DRIVERNAME = new PropertyDescriptor.Builder()
.name("Database Driver Class Name")
.addValidator(new DriverClassValidator())
.expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
.build();

private TestRunner runner;

@BeforeAll
Expand Down Expand Up @@ -120,19 +148,19 @@ public void setService() throws InitializationException {
runner.addControllerService(SERVICE_ID, dbcp);

final String url = String.format("jdbc:derby:%s;create=false", DB_LOCATION);
runner.setProperty(dbcp, DBCPProperties.DATABASE_URL, url);
runner.setProperty(dbcp, DBCPProperties.DB_USER, String.class.getSimpleName());
runner.setProperty(dbcp, DBCPProperties.DB_PASSWORD, String.class.getName());
runner.setProperty(dbcp, DBCPProperties.DB_DRIVERNAME, "org.apache.derby.jdbc.EmbeddedDriver");
runner.setProperty(dbcp, DATABASE_URL, url);
runner.setProperty(dbcp, DB_USER, String.class.getSimpleName());
runner.setProperty(dbcp, DB_PASSWORD, String.class.getName());
runner.setProperty(dbcp, DB_DRIVERNAME, "org.apache.derby.jdbc.EmbeddedDriver");
runner.enableControllerService(dbcp);
}

@Test
public void testGetSchemaExists() throws Exception {
DatabaseSchemaRegistry dbSchemaRegistry = new DatabaseSchemaRegistry();
DatabaseTableSchemaRegistry dbSchemaRegistry = new DatabaseTableSchemaRegistry();
runner.addControllerService("schemaRegistry", dbSchemaRegistry);
runner.setProperty(dbSchemaRegistry, DatabaseSchemaRegistry.DBCP_SERVICE, SERVICE_ID);
runner.setProperty(dbSchemaRegistry, DatabaseSchemaRegistry.SCHEMA_NAME, "SCHEMA1");
runner.setProperty(dbSchemaRegistry, DatabaseTableSchemaRegistry.DBCP_SERVICE, SERVICE_ID);
runner.setProperty(dbSchemaRegistry, DatabaseTableSchemaRegistry.SCHEMA_NAME, "SCHEMA1");
runner.enableControllerService(dbSchemaRegistry);
SchemaIdentifier schemaIdentifier = new StandardSchemaIdentifier.Builder()
.name("PERSONS")
Expand All @@ -158,10 +186,10 @@ public void testGetSchemaExists() throws Exception {

@Test
public void testGetSchemaNotExists() throws Exception {
DatabaseSchemaRegistry dbSchemaRegistry = new DatabaseSchemaRegistry();
DatabaseTableSchemaRegistry dbSchemaRegistry = new DatabaseTableSchemaRegistry();
runner.addControllerService("schemaRegistry", dbSchemaRegistry);
runner.setProperty(dbSchemaRegistry, DatabaseSchemaRegistry.DBCP_SERVICE, SERVICE_ID);
runner.setProperty(dbSchemaRegistry, DatabaseSchemaRegistry.SCHEMA_NAME, "SCHEMA1");
runner.setProperty(dbSchemaRegistry, DatabaseTableSchemaRegistry.DBCP_SERVICE, SERVICE_ID);
runner.setProperty(dbSchemaRegistry, DatabaseTableSchemaRegistry.SCHEMA_NAME, "SCHEMA1");
runner.enableControllerService(dbSchemaRegistry);
SchemaIdentifier schemaIdentifier = new StandardSchemaIdentifier.Builder()
.name("NOT_A_TABLE")
Expand Down

0 comments on commit 49bc2e7

Please sign in to comment.