Skip to content

Commit

Permalink
Add minSize to pool to maintain a minimum # of connections
Browse files Browse the repository at this point in the history
  • Loading branch information
kdubb committed Apr 26, 2023
1 parent cc83994 commit 7021dce
Show file tree
Hide file tree
Showing 4 changed files with 69 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,11 @@ public static void fromJson(Iterable<java.util.Map.Entry<String, Object>> json,
obj.setMaxWaitQueueSize(((Number)member.getValue()).intValue());
}
break;
case "minSize":
if (member.getValue() instanceof Number) {
obj.setMinSize(((Number)member.getValue()).intValue());
}
break;
case "name":
if (member.getValue() instanceof String) {
obj.setName((String)member.getValue());
Expand Down Expand Up @@ -104,6 +109,7 @@ public static void toJson(PoolOptions obj, java.util.Map<String, Object> json) {
}
json.put("maxSize", obj.getMaxSize());
json.put("maxWaitQueueSize", obj.getMaxWaitQueueSize());
json.put("minSize", obj.getMinSize());
if (obj.getName() != null) {
json.put("name", obj.getName());
}
Expand Down
31 changes: 31 additions & 0 deletions vertx-sql-client/src/main/java/io/vertx/sqlclient/PoolOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,11 @@
@DataObject(generateConverter = true)
public class PoolOptions {

/**
* The default minimum number of connections a client will keep open in the pool = 0
*/
public static final int DEFAULT_MIN_SIZE = 0;

/**
* The default maximum number of connections a client will pool = 4
*/
Expand Down Expand Up @@ -92,6 +97,7 @@ public class PoolOptions {
*/
public static final int DEFAULT_EVENT_LOOP_SIZE = 0;

private int minSize = DEFAULT_MIN_SIZE;
private int maxSize = DEFAULT_MAX_SIZE;
private int maxWaitQueueSize = DEFAULT_MAX_WAIT_QUEUE_SIZE;
private int idleTimeout = DEFAULT_IDLE_TIMEOUT;
Expand All @@ -113,6 +119,7 @@ public PoolOptions(JsonObject json) {
}

public PoolOptions(PoolOptions other) {
minSize = other.minSize;
maxSize = other.maxSize;
maxWaitQueueSize = other.maxWaitQueueSize;
idleTimeout = other.idleTimeout;
Expand All @@ -122,6 +129,30 @@ public PoolOptions(PoolOptions other) {
eventLoopSize = other.eventLoopSize;
}

/**
* @return the minimum pool size
*/
public int getMinSize() {
return minSize;
}

/**
* Set the minimum pool size
*
* @param minSize the minimum pool size
* @return a reference to this, so the API can be used fluently
*/
public PoolOptions setMinSize(int minSize) {
if (minSize < 0) {
throw new IllegalArgumentException("Min size cannot be negative");
}
if (minSize > maxSize) {
throw new IllegalArgumentException("Min size cannot be greater than max size");
}
this.minSize = minSize;
return this;
}

/**
* @return the maximum pool size
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,9 @@ public PoolImpl(VertxInternal vertx,
this.timerID = -1L;
this.pipelined = pipelined;
this.vertx = vertx;
this.pool = new SqlConnectionPool(ctx -> connectionProvider.apply(ctx), () -> connectionInitializer, afterAcquire, beforeRecycle, vertx, idleTimeout, maxLifetime, poolOptions.getMaxSize(), pipelined, poolOptions.getMaxWaitQueueSize(), poolOptions.getEventLoopSize());
this.pool = new SqlConnectionPool(ctx -> connectionProvider.apply(ctx), () -> connectionInitializer,
afterAcquire, beforeRecycle, vertx, idleTimeout, maxLifetime, poolOptions.getMinSize(), poolOptions.getMaxSize(),
pipelined, poolOptions.getMaxWaitQueueSize(), poolOptions.getEventLoopSize());
this.closeFuture = closeFuture;
}

Expand All @@ -77,10 +79,11 @@ public Pool init() {
if ((idleTimeout > 0 || maxLifetime > 0) && cleanerPeriod > 0) {
synchronized (this) {
timerID = vertx.setTimer(cleanerPeriod, id -> {
runEviction();
runInvariantsCheck();
});
}
}
pool.checkMin(connectionTimeout);
return this;
}

Expand All @@ -92,17 +95,17 @@ public Pool connectionProvider(Function<Context, Future<SqlConnection>> connecti
return this;
}

private void runEviction() {
private void runInvariantsCheck() {
synchronized (this) {
if (timerID == -1) {
// Cancelled
return;
}
timerID = vertx.setTimer(cleanerPeriod, id -> {
runEviction();
runInvariantsCheck();
});
}
pool.evict();
pool.checkInvariants(connectionTimeout);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ public class SqlConnectionPool {
private final boolean pipelined;
private final long idleTimeout;
private final long maxLifetime;
private final int minSize;
private final int maxSize;

public SqlConnectionPool(Function<Context, Future<SqlConnection>> connectionProvider,
Expand All @@ -63,10 +64,17 @@ public SqlConnectionPool(Function<Context, Future<SqlConnection>> connectionProv
VertxInternal vertx,
long idleTimeout,
long maxLifetime,
int minSize,
int maxSize,
boolean pipelined,
int maxWaitQueueSize,
int eventLoopSize) {
if (minSize < 0) {
throw new IllegalArgumentException("Pool min size must be > 0");
}
if (minSize > maxSize) {
throw new IllegalArgumentException("Pool min size must be <= max size");
}
if (maxSize < 1) {
throw new IllegalArgumentException("Pool max size must be > 0");
}
Expand All @@ -78,6 +86,7 @@ public SqlConnectionPool(Function<Context, Future<SqlConnection>> connectionProv
this.pipelined = pipelined;
this.idleTimeout = idleTimeout;
this.maxLifetime = maxLifetime;
this.minSize = minSize;
this.maxSize = maxSize;
this.hook = hook;
this.connectionProvider = connectionProvider;
Expand Down Expand Up @@ -145,18 +154,32 @@ public int size() {
return pool.size();
}

public void evict() {
public void checkInvariants(long connectionTimeout) {
long now = System.currentTimeMillis();
pool.evict(conn -> conn.shouldEvict(now), ar -> {
if (ar.succeeded()) {
List<PooledConnection> res = ar.result();
for (PooledConnection conn : res) {
conn.close(Promise.promise());
}
checkMin(connectionTimeout);
}
});
}

public void checkMin(long connectionTimeout) {
if (pool.size() < minSize) {
ContextInternal context = vertx.getOrCreateContext();
for (int i = 0; i < minSize; ++i) {
acquire(context, connectionTimeout, (ar) -> {
if (ar.succeeded()) {
ar.result().cleanup(Promise.promise());
}
});
}
}
}

public <R> Future<R> execute(ContextInternal context, CommandBase<R> cmd) {
Promise<Lease<PooledConnection>> p = context.promise();
pool.acquire(context, 0, p);
Expand Down

0 comments on commit 7021dce

Please sign in to comment.