Skip to content

Commit

Permalink
STARCH-730 final close setup, coverage progress
Browse files Browse the repository at this point in the history
  • Loading branch information
mwfyffeiu committed Jan 9, 2024
1 parent 569b615 commit 347c160
Show file tree
Hide file tree
Showing 5 changed files with 256 additions and 68 deletions.
105 changes: 96 additions & 9 deletions jdbc/pool/src/main/java/edu/iu/jdbc/pool/IuCommonDataSource.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,13 @@
import edu.iu.IuObject;
import edu.iu.IuUtilityTaskController;
import edu.iu.UnsafeFunction;
import edu.iu.UnsafeRunnable;
import edu.iu.UnsafeSupplier;

/**
* Abstract generic database connection pool.
*/
public abstract class IuCommonDataSource implements CommonDataSource, ConnectionEventListener {
public abstract class IuCommonDataSource implements CommonDataSource, ConnectionEventListener, AutoCloseable {

static {
Logger.getLogger(IuCommonDataSource.class.getPackageName());
Expand All @@ -48,11 +49,14 @@ public abstract class IuCommonDataSource implements CommonDataSource, Connection
private long maxConnectionReuseCount = 100;
private Duration maxConnectionReuseTime = Duration.ofMinutes(15L);
private Duration abandonedConnectionTimeout = Duration.ofMinutes(30L);
private Duration shutdownTimeout = Duration.ofSeconds(30L);

private String validationQuery;
private Duration validationInterval = Duration.ofSeconds(15L);
private UnsafeFunction<Connection, Connection> connectionInitializer;
private UnsafeRunnable onClose;

private boolean closed;
private volatile int pendingConnections;

/**
Expand Down Expand Up @@ -89,16 +93,22 @@ public IuPooledConnection getPooledConnection() throws SQLException {

var attempt = 0;
Throwable error = null;
while (attempt <= maxRetry && timeout.isAfter(Instant.now()))
while (!closed //
&& attempt <= maxRetry //
&& timeout.isAfter(Instant.now()))
try {
attempt++;

synchronized (this) {
IuObject.waitFor(this, () -> !reusableConnections.isEmpty() || !this.isExhausted(), timeout);
IuObject.waitFor(this, () -> closed //
|| !reusableConnections.isEmpty() //
|| !this.isExhausted(),
timeout);

pendingConnections++;
}

try {
attempt++;

while (!reusableConnections.isEmpty()) {
final var reusableConnection = reusableConnections.poll();

Expand Down Expand Up @@ -169,10 +179,12 @@ public void connectionClosed(ConnectionEvent event) {
return;
}

LOG.finer(() -> "jdbc-pool-reusable; " + reusableConnection);
reusableConnections.offer(reusableConnection);
synchronized (this) {
this.notifyAll();
if (!closed) {
LOG.finer(() -> "jdbc-pool-reusable; " + reusableConnection);
reusableConnections.offer(reusableConnection);
synchronized (this) {
this.notifyAll();
}
}
}

Expand Down Expand Up @@ -369,6 +381,28 @@ public void setAbandonedConnectionTimeout(Duration abandonedConnectionTimeout) {
this.abandonedConnectionTimeout = abandonedConnectionTimeout;
}

/**
* Gets the maximum length of time to wait for all connections to close on
* shutdown.
*
* @return Maximum length of time to wait for all connections to close
* gracefully
*/
public Duration getShutdownTimeout() {
return shutdownTimeout;
}

/**
* Sets the maximum length of time to wait for all connections to close on
* shutdown.
*
* @param shutdownTimeout Maximum length of time to wait for all connections to
* close gracefully
*/
protected void setShutdownTimeout(Duration shutdownTimeout) {
this.shutdownTimeout = shutdownTimeout;
}

/**
* Gets the query to use for validating connections on creation, and
* intermittently before checking out from the pool.
Expand Down Expand Up @@ -430,10 +464,60 @@ public void setConnectionInitializer(UnsafeFunction<Connection, Connection> conn
this.connectionInitializer = connectionInitializer;
}

/**
* Sets an optional shutdown hook to be invoked from {@link #close()} after all
* physical connections managed by the pool have been closed.
*
* @param onClose {@link UnsafeRunnable}
*/
public void setOnClose(UnsafeRunnable onClose) {
this.onClose = onClose;
}

/**
* Waits for completion and closes all open connections.
*/
@Override
public synchronized void close() throws SQLException {
if (!closed) {
closed = true;

class CloseStatus {
Throwable error = null;
}
final var closeStatus = new CloseStatus();

while (!reusableConnections.isEmpty())
closeStatus.error = IuException.suppress(closeStatus.error, () -> reusableConnections.poll().close());

IuException.suppress(closeStatus.error, () -> IuObject.waitFor(this, () -> {
for (final var c : openConnections)
if (c.logicalConnectionOpened() == null)
closeStatus.error = IuException.suppress(closeStatus.error, () -> c.close());

return openConnections.isEmpty();
}, shutdownTimeout));

if (onClose != null)
closeStatus.error = IuException.suppress(closeStatus.error, onClose);

closeStatus.error = IuException.suppress(closeStatus.error, () -> {
final var size = openConnections.size();
if (size > 0)
throw new SQLException(
size + " connections remaining in the pool after graceful shutdown " + shutdownTimeout);
});

if (closeStatus.error != null)
throw IuException.checked(closeStatus.error, SQLException.class);
}
}

@Override
public String toString() {
return getClass().getSimpleName() + " [" //
+ "loginTimeout=" + getLoginTimeout() //
+ ", closed=" + closed //
+ ", url=" + getUrl() //
+ ", username=" + getUsername() //
+ ", schema=" + getSchema() //
Expand All @@ -454,6 +538,9 @@ private synchronized boolean isExhausted() {
}

private IuPooledConnection openConnection(Instant timeout) throws SQLException {
if (closed)
throw new SQLException("closed");

final var initTime = Instant.now();
final var pooledConnection = IuException.checked(SQLException.class, () -> {
try {
Expand Down
3 changes: 1 addition & 2 deletions jdbc/pool/src/main/java/module-info.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,7 @@
module iu.util.jdbc.pool {
exports edu.iu.jdbc.pool;

requires iu.util;

requires transitive iu.util;
requires transitive java.sql;
requires transitive java.logging;
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public void testSetAndToStringMethods() throws SQLException {
final var ds = new IuCommonDataSource(null) {
};
assertEquals(
" [loginTimeout=15, url=null, username=null, schema=null, available=0, open=0, maxSize=16, maxRetry=1, maxConnectionReuseCount=100, maxConnectionReuseTime=PT15M, abandonedConnectionTimeout=PT30M, validationQuery=null, validationInterval=PT15S]",
" [loginTimeout=15, closed=false, url=null, username=null, schema=null, available=0, open=0, maxSize=16, maxRetry=1, maxConnectionReuseCount=100, maxConnectionReuseTime=PT15M, abandonedConnectionTimeout=PT30M, validationQuery=null, validationInterval=PT15S]",
ds.toString());
ds.setLoginTimeout(12);
ds.setAbandonedConnectionTimeout(Duration.ofMillis(34));
Expand All @@ -73,7 +73,7 @@ public void testSetAndToStringMethods() throws SQLException {
ds.setMaxRetry(45);
ds.setMaxSize(67);
assertEquals(
" [loginTimeout=12, url=jdbc:foo:bar, username=foo, schema=bar, available=0, open=0, maxSize=67, maxRetry=45, maxConnectionReuseCount=56, maxConnectionReuseTime=PT89H, abandonedConnectionTimeout=PT0.034S, validationQuery=select foo from bar, validationInterval=PT0.000000072S]",
" [loginTimeout=12, closed=false, url=jdbc:foo:bar, username=foo, schema=bar, available=0, open=0, maxSize=67, maxRetry=45, maxConnectionReuseCount=56, maxConnectionReuseTime=PT89H, abandonedConnectionTimeout=PT0.034S, validationQuery=select foo from bar, validationInterval=PT0.000000072S]",
ds.toString());
}

Expand Down Expand Up @@ -110,4 +110,15 @@ public void testConnectionInitializer() throws SQLException {

}

@Test
public void testClose() throws SQLException {
final var pc1 = mock(PooledConnection.class);
final var c1 = mock(Connection.class);
when(c1.unwrap(Connection.class)).thenReturn(c1);
when(pc1.getConnection()).thenReturn(c1);

final var ds = new IuCommonDataSource(() -> pc1) {
};
ds.close();
}
}
Loading

0 comments on commit 347c160

Please sign in to comment.