Skip to content

Commit

Permalink
NIFI-13555: Add verification to HikariDBCPConnectionPool
Browse files Browse the repository at this point in the history
  • Loading branch information
mattyb149 committed Jul 16, 2024
1 parent 1ff5ebd commit 7505b5d
Show file tree
Hide file tree
Showing 3 changed files with 211 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,19 +27,23 @@
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnDisabled;
import org.apache.nifi.annotation.lifecycle.OnEnabled;
import org.apache.nifi.components.ConfigVerificationResult;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.components.RequiredPermission;
import org.apache.nifi.components.resource.ResourceCardinality;
import org.apache.nifi.components.resource.ResourceType;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.controller.VerifiableControllerService;
import org.apache.nifi.expression.AttributeExpression;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.kerberos.KerberosUserService;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.security.krb.KerberosAction;
import org.apache.nifi.security.krb.KerberosLoginException;
import org.apache.nifi.security.krb.KerberosUser;

import javax.security.auth.login.LoginException;
Expand All @@ -48,10 +52,14 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import static org.apache.nifi.components.ConfigVerificationResult.Outcome.FAILED;
import static org.apache.nifi.components.ConfigVerificationResult.Outcome.SUCCESSFUL;

/**
* Implementation of Database Connection Pooling Service. HikariCP is used for connection pooling functionality.
*/
Expand All @@ -71,7 +79,7 @@
)
}
)
public class HikariCPConnectionPool extends AbstractControllerService implements DBCPService {
public class HikariCPConnectionPool extends AbstractControllerService implements DBCPService, VerifiableControllerService {
/**
* Property Name Prefix for Sensitive Dynamic Properties
*/
Expand All @@ -81,6 +89,8 @@ public class HikariCPConnectionPool extends AbstractControllerService implements
private static final String DEFAULT_TOTAL_CONNECTIONS = "10";
private static final String DEFAULT_MAX_CONN_LIFETIME = "-1";

private static final int DEFAULT_MIN_VALIDATION_TIMEOUT = 250;

public static final PropertyDescriptor DATABASE_URL = new PropertyDescriptor.Builder()
.name("hikaricp-connection-url")
.displayName("Database Connection URL")
Expand Down Expand Up @@ -254,7 +264,132 @@ protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String
*/
@OnEnabled
public void onConfigured(final ConfigurationContext context) {
dataSource = new HikariDataSource();
configureDataSource(context, dataSource);
}

private long extractMillisWithInfinite(PropertyValue prop) {
return "-1".equals(prop.getValue()) ? INFINITE_MILLISECONDS : prop.asTimePeriod(TimeUnit.MILLISECONDS);
}

/**
* Shutdown pool, close all open connections.
* If a principal is authenticated with a KDC, that principal is logged out.
* <p>
* If a @{@link LoginException} occurs while attempting to log out the @{@link org.apache.nifi.security.krb.KerberosUser},
* an attempt will still be made to shut down the pool and close open connections.
*
*/
@OnDisabled
public void shutdown() {
try {
if (kerberosUser != null) {
kerberosUser.logout();
}
} finally {
kerberosUser = null;
try {
if (dataSource != null) {
dataSource.close();
}
} finally {
dataSource = null;
}
}
}

@Override
public Connection getConnection() throws ProcessException {
try {
final Connection con;
if (kerberosUser != null) {
KerberosAction<Connection> kerberosAction = new KerberosAction<>(kerberosUser, () -> dataSource.getConnection(), getLogger());
con = kerberosAction.execute();
} else {
con = dataSource.getConnection();
}
return con;
} catch (final SQLException e) {
// If using Kerberos, attempt to re-login
if (kerberosUser != null) {
getLogger().info("Error getting connection, performing Kerberos re-login");
kerberosUser.login();
}
throw new ProcessException("Connection retrieval failed", e);
}
}

@Override
public List<ConfigVerificationResult> verify(final ConfigurationContext context, final ComponentLog verificationLogger, final Map<String, String> variables) {
List<ConfigVerificationResult> results = new ArrayList<>();
final KerberosUserService kerberosUserService = context.getProperty(KERBEROS_USER_SERVICE).asControllerService(KerberosUserService.class);
KerberosUser kerberosUser = null;
try {
if (kerberosUserService != null) {
kerberosUser = kerberosUserService.createKerberosUser();
if (kerberosUser != null) {
results.add(new ConfigVerificationResult.Builder()
.verificationStepName("Configure Kerberos User")
.outcome(SUCCESSFUL)
.explanation("Successfully configured Kerberos user")
.build());
}
}
} catch (final Exception e) {
verificationLogger.error("Failed to configure Kerberos user", e);
results.add(new ConfigVerificationResult.Builder()
.verificationStepName("Configure Kerberos User")
.outcome(FAILED)
.explanation("Failed to configure Kerberos user: " + e.getMessage())
.build());
}

final HikariDataSource hikariDataSource = new HikariDataSource();
try {
configureDataSource(context, hikariDataSource);
results.add(new ConfigVerificationResult.Builder()
.verificationStepName("Configure Data Source")
.outcome(SUCCESSFUL)
.explanation("Successfully configured data source")
.build());

try (final Connection conn = getConnection(hikariDataSource, kerberosUser)) {
results.add(new ConfigVerificationResult.Builder()
.verificationStepName("Establish Connection")
.outcome(SUCCESSFUL)
.explanation("Successfully established Database Connection")
.build());
} catch (final Exception e) {
verificationLogger.error("Failed to establish Database Connection", e);
results.add(new ConfigVerificationResult.Builder()
.verificationStepName("Establish Connection")
.outcome(FAILED)
.explanation("Failed to establish Database Connection: " + e.getMessage())
.build());
}
} catch (final Exception e) {
String message = "Failed to configure Data Source.";
if (e.getCause() instanceof ClassNotFoundException) {
message += String.format(" Ensure changes to the '%s' property are applied before verifying",
DB_DRIVER_LOCATION.getDisplayName());
}
verificationLogger.error(message, e);
results.add(new ConfigVerificationResult.Builder()
.verificationStepName("Configure Data Source")
.outcome(FAILED)
.explanation(message + ": " + e.getMessage())
.build());
} finally {
try {
shutdown(dataSource, kerberosUser);
} catch (final SQLException e) {
verificationLogger.error("Failed to shut down data source", e);
}
}
return results;
}

protected void configureDataSource(final ConfigurationContext context, final HikariDataSource dataSource) {
final String driverName = context.getProperty(DB_DRIVERNAME).evaluateAttributeExpressions().getValue();
final String user = context.getProperty(DB_USER).evaluateAttributeExpressions().getValue();
final String passw = context.getProperty(DB_PASSWORD).evaluateAttributeExpressions().getValue();
Expand All @@ -264,6 +399,7 @@ public void onConfigured(final ConfigurationContext context) {
final long maxWaitMillis = extractMillisWithInfinite(context.getProperty(MAX_WAIT_TIME).evaluateAttributeExpressions());
final int minIdle = context.getProperty(MIN_IDLE).evaluateAttributeExpressions().asInteger();
final long maxConnLifetimeMillis = extractMillisWithInfinite(context.getProperty(MAX_CONN_LIFETIME).evaluateAttributeExpressions());

final KerberosUserService kerberosUserService = context.getProperty(KERBEROS_USER_SERVICE).asControllerService(KerberosUserService.class);

if (kerberosUserService != null) {
Expand All @@ -273,9 +409,8 @@ public void onConfigured(final ConfigurationContext context) {
}
}

dataSource = new HikariDataSource();
dataSource.setDriverClassName(driverName);
dataSource.setConnectionTimeout(maxWaitMillis);
dataSource.setValidationTimeout(Math.max(maxWaitMillis, DEFAULT_MIN_VALIDATION_TIMEOUT));
dataSource.setMaximumPoolSize(maxTotal);
dataSource.setMinimumIdle(minIdle);
dataSource.setMaxLifetime(maxConnLifetimeMillis);
Expand All @@ -284,6 +419,7 @@ public void onConfigured(final ConfigurationContext context) {
dataSource.setConnectionTestQuery(validationQuery);
}

dataSource.setDriverClassName(driverName);
dataSource.setJdbcUrl(dburl);
dataSource.setUsername(user);
dataSource.setPassword(passw);
Expand All @@ -308,42 +444,11 @@ public void onConfigured(final ConfigurationContext context) {
dataSource.setPoolName(toString());
}

private long extractMillisWithInfinite(PropertyValue prop) {
return "-1".equals(prop.getValue()) ? INFINITE_MILLISECONDS : prop.asTimePeriod(TimeUnit.MILLISECONDS);
}

/**
* Shutdown pool, close all open connections.
* If a principal is authenticated with a KDC, that principal is logged out.
* <p>
* If a @{@link LoginException} occurs while attempting to log out the @{@link org.apache.nifi.security.krb.KerberosUser},
* an attempt will still be made to shut down the pool and close open connections.
*
*/
@OnDisabled
public void shutdown() {
try {
if (kerberosUser != null) {
kerberosUser.logout();
}
} finally {
kerberosUser = null;
try {
if (dataSource != null) {
dataSource.close();
}
} finally {
dataSource = null;
}
}
}

@Override
public Connection getConnection() throws ProcessException {
private Connection getConnection(final HikariDataSource dataSource, final KerberosUser kerberosUser) {
try {
final Connection con;
if (kerberosUser != null) {
KerberosAction<Connection> kerberosAction = new KerberosAction<>(kerberosUser, () -> dataSource.getConnection(), getLogger());
KerberosAction<Connection> kerberosAction = new KerberosAction<>(kerberosUser, dataSource::getConnection, getLogger());
con = kerberosAction.execute();
} else {
con = dataSource.getConnection();
Expand All @@ -352,14 +457,30 @@ public Connection getConnection() throws ProcessException {
} catch (final SQLException e) {
// If using Kerberos, attempt to re-login
if (kerberosUser != null) {
getLogger().info("Error getting connection, performing Kerberos re-login");
kerberosUser.login();
try {
getLogger().info("Error getting connection, performing Kerberos re-login");
kerberosUser.login();
} catch (KerberosLoginException le) {
throw new ProcessException("Unable to authenticate Kerberos principal", le);
}
}
throw new ProcessException("Connection retrieval failed", e);
throw new ProcessException(e);
}
}

@Override
private void shutdown(final HikariDataSource dataSource, final KerberosUser kerberosUser) throws SQLException {
try {
if (kerberosUser != null) {
kerberosUser.logout();
}
} finally {
if (dataSource != null) {
dataSource.close();
}
}
}

@Override
public String toString() {
return String.format("%s[id=%s]", getClass().getSimpleName(), getIdentifier());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,11 @@
*/
package org.apache.nifi.dbcp;

import org.apache.nifi.components.ConfigVerificationResult;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.util.ControllerServiceConfiguration;
import org.apache.nifi.util.MockConfigurationContext;
import org.apache.nifi.util.MockProcessContext;
import org.apache.nifi.util.NoOpProcessor;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
Expand All @@ -26,15 +30,25 @@

import java.sql.Connection;
import java.sql.SQLException;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.mock;

public class HikariCPConnectionPoolTest {
private final static String SERVICE_ID = HikariCPConnectionPoolTest.class.getSimpleName();

private static final String INVALID_CONNECTION_URL = "jdbc:h2";

private static final String DB_DRIVERNAME_VALUE = "jdbc:mock";

private static final String MAX_WAIT_TIME_VALUE = "5 s";

private TestRunner runner;

@BeforeEach
Expand Down Expand Up @@ -134,11 +148,44 @@ public void testGetConnection() throws SQLException, InitializationException {
}
}

@Test
void testVerifySuccessful() throws Exception {
final HikariCPConnectionPool service = new HikariCPConnectionPool();
runner.addControllerService(SERVICE_ID, service);
final Connection mockConnection = mock(Connection.class);
MockDriver.setConnection(mockConnection);
setDatabaseProperties(service);
runner.setProperty(service, HikariCPConnectionPool.MAX_TOTAL_CONNECTIONS, "2");
runner.enableControllerService(service);
runner.assertValid(service);
MockProcessContext processContext = (MockProcessContext) runner.getProcessContext();
final ControllerServiceConfiguration configuration = processContext.getConfiguration(service.getIdentifier());
final MockConfigurationContext configContext = new MockConfigurationContext(service, configuration.getProperties(), processContext, Collections.emptyMap());
final List<ConfigVerificationResult> results = service.verify(configContext, runner.getLogger(), configContext.getAllProperties());

assertOutcomeSuccessful(results);
}

private void setDatabaseProperties(final HikariCPConnectionPool service) {
runner.setProperty(service, HikariCPConnectionPool.DATABASE_URL, "jdbc:mock");
runner.setProperty(service, HikariCPConnectionPool.DATABASE_URL, DB_DRIVERNAME_VALUE);
runner.setProperty(service, HikariCPConnectionPool.DB_DRIVERNAME, MockDriver.class.getName());
runner.setProperty(service, HikariCPConnectionPool.MAX_WAIT_TIME, "5 s");
runner.setProperty(service, HikariCPConnectionPool.MAX_WAIT_TIME, MAX_WAIT_TIME_VALUE);
runner.setProperty(service, HikariCPConnectionPool.DB_USER, String.class.getSimpleName());
runner.setProperty(service, HikariCPConnectionPool.DB_PASSWORD, String.class.getName());
}

private void assertOutcomeSuccessful(final List<ConfigVerificationResult> results) {
assertNotNull(results);
final Iterator<ConfigVerificationResult> resultsFound = results.iterator();

assertTrue(resultsFound.hasNext());
final ConfigVerificationResult firstResult = resultsFound.next();
assertEquals(ConfigVerificationResult.Outcome.SUCCESSFUL, firstResult.getOutcome(), firstResult.getExplanation());

assertTrue(resultsFound.hasNext());
final ConfigVerificationResult secondResult = resultsFound.next();
assertEquals(ConfigVerificationResult.Outcome.SUCCESSFUL, secondResult.getOutcome(), secondResult.getExplanation());

assertFalse(resultsFound.hasNext());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ protected void addControllerServices(final MockControllerServiceLookup other) {
this.controllerServiceMap.putAll(other.controllerServiceMap);
}

protected ControllerServiceConfiguration getConfiguration(final String identifier) {
public ControllerServiceConfiguration getConfiguration(final String identifier) {
return controllerServiceMap.get(identifier);
}

Expand Down

0 comments on commit 7505b5d

Please sign in to comment.