-
Notifications
You must be signed in to change notification settings - Fork 3k
Create connections concurrently in separate threads #2317
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: dev
Are you sure you want to change the base?
Changes from all commits
2c9dd89
ddee141
2074427
d8a4706
491c9fb
538ec4e
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -38,6 +38,7 @@ | |
| import java.sql.SQLTransientConnectionException; | ||
| import java.util.Optional; | ||
| import java.util.concurrent.*; | ||
| import java.util.concurrent.atomic.AtomicInteger; | ||
| import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; | ||
|
|
||
| import static com.zaxxer.hikari.util.ClockSource.*; | ||
|
|
@@ -73,6 +74,8 @@ public final class HikariPool extends PoolBase implements HikariPoolMXBean, IBag | |
|
|
||
| private final PoolEntryCreator poolEntryCreator = new PoolEntryCreator(); | ||
| private final PoolEntryCreator postFillPoolEntryCreator = new PoolEntryCreator("After adding "); | ||
|
|
||
| private final AtomicInteger connectionsInProgress = new AtomicInteger(0); | ||
| private final ThreadPoolExecutor addConnectionExecutor; | ||
| private final ThreadPoolExecutor closeConnectionExecutor; | ||
|
|
||
|
|
@@ -114,24 +117,24 @@ public HikariPool(final HikariConfig config) | |
| ThreadFactory threadFactory = config.getThreadFactory(); | ||
|
|
||
| final int maxPoolSize = config.getMaximumPoolSize(); | ||
| this.addConnectionExecutor = createThreadPoolExecutor(maxPoolSize, poolName + ":connection-adder", threadFactory, new CustomDiscardPolicy()); | ||
| this.addConnectionExecutor = createCreatorThreadPoolExecutor( | ||
| Math.min(Runtime.getRuntime().availableProcessors() * 2, config.getMinimumIdle()), | ||
| Math.max(Runtime.getRuntime().availableProcessors() * 2, config.getMinimumIdle()), | ||
| config.getMaximumPoolSize(), | ||
| ":connection-adder", | ||
| threadFactory, | ||
| new CustomDiscardPolicy()); | ||
| this.closeConnectionExecutor = createThreadPoolExecutor(maxPoolSize, poolName + ":connection-closer", threadFactory, new ThreadPoolExecutor.CallerRunsPolicy()); | ||
|
|
||
| this.leakTaskFactory = new ProxyLeakTaskFactory(config.getLeakDetectionThreshold(), houseKeepingExecutorService); | ||
|
|
||
| this.houseKeeperTask = houseKeepingExecutorService.scheduleWithFixedDelay(new HouseKeeper(), 100L, housekeepingPeriodMs, MILLISECONDS); | ||
|
|
||
| if (Boolean.getBoolean("com.zaxxer.hikari.blockUntilFilled") && config.getInitializationFailTimeout() > 1) { | ||
| addConnectionExecutor.setMaximumPoolSize(Math.min(16, Runtime.getRuntime().availableProcessors())); | ||
| addConnectionExecutor.setCorePoolSize(Math.min(16, Runtime.getRuntime().availableProcessors())); | ||
|
|
||
| final long startTime = currentTime(); | ||
| while (elapsedMillis(startTime) < config.getInitializationFailTimeout() && getTotalConnections() < config.getMinimumIdle()) { | ||
| quietlySleep(MILLISECONDS.toMillis(100)); | ||
| } | ||
|
|
||
| addConnectionExecutor.setCorePoolSize(1); | ||
| addConnectionExecutor.setMaximumPoolSize(1); | ||
| } | ||
| } | ||
|
|
||
|
|
@@ -335,7 +338,7 @@ public void setHealthCheckRegistry(Object healthCheckRegistry) | |
| @Override | ||
| public void addBagItem(final int waiting) | ||
| { | ||
| if (waiting > addConnectionExecutor.getQueue().size()) | ||
| if (waiting > connectionsInProgress.get()) | ||
| addConnectionExecutor.submit(poolEntryCreator); | ||
| } | ||
|
|
||
|
|
@@ -520,10 +523,11 @@ private PoolEntry createPoolEntry() | |
| private synchronized void fillPool(final boolean isAfterAdd) | ||
| { | ||
| final var idle = getIdleConnections(); | ||
| final var shouldAdd = getTotalConnections() < config.getMaximumPoolSize() && idle < config.getMinimumIdle(); | ||
| final var connectionsInProgress = this.connectionsInProgress.get(); | ||
| final var shouldAdd = getTotalConnections() < config.getMaximumPoolSize() - connectionsInProgress && idle < config.getMinimumIdle(); | ||
|
|
||
| if (shouldAdd) { | ||
| final var countToAdd = config.getMinimumIdle() - idle; | ||
| final var countToAdd = config.getMinimumIdle() - idle - connectionsInProgress; | ||
| for (int i = 0; i < countToAdd; i++) | ||
| addConnectionExecutor.submit(isAfterAdd ? postFillPoolEntryCreator : poolEntryCreator); | ||
| } | ||
|
|
@@ -740,26 +744,26 @@ private final class PoolEntryCreator implements Callable<Boolean> | |
| @Override | ||
| public Boolean call() | ||
| { | ||
| var backoffMs = 10L; | ||
| long jitterMs = ThreadLocalRandom.current().nextLong(Math.max(10, config.getConnectionTimeout() / 10)); | ||
| if (connectionsInProgress.get() >= Runtime.getRuntime().availableProcessors()) { | ||
| quietlySleep(jitterMs); | ||
|
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Added sleeping for some jittered time to avoid potential CPU throttling in CPU-limited environments like Kubernetes, when multiple connections are created at once. |
||
| } | ||
| var added = false; | ||
| try { | ||
| while (shouldContinueCreating()) { | ||
| if (!Thread.interrupted() && shouldContinueCreating()) { | ||
| final var poolEntry = createPoolEntry(); | ||
| if (poolEntry != null) { | ||
| added = true; | ||
| connectionBag.add(poolEntry); | ||
| logger.debug("{} - Added connection {}", poolName, poolEntry.connection); | ||
| quietlySleep(30L); | ||
| break; | ||
| } else { // failed to get connection from db, sleep and retry | ||
| if (loggingPrefix != null && backoffMs % 50 == 0) | ||
| logger.debug("{} - Connection add failed, sleeping with backoff: {}ms", poolName, backoffMs); | ||
| quietlySleep(backoffMs); | ||
| backoffMs = Math.min(SECONDS.toMillis(5), backoffMs * 2); | ||
| if (loggingPrefix != null) | ||
| logger.debug("{} - Connection add failed", poolName); | ||
| } | ||
| } | ||
| } | ||
| finally { | ||
| connectionsInProgress.decrementAndGet(); | ||
| if (added && loggingPrefix != null) | ||
| logPoolState(loggingPrefix); | ||
| else if (!added) | ||
|
|
@@ -777,8 +781,10 @@ else if (!added) | |
| * @return true if we should create a connection, false if the need has disappeared | ||
| */ | ||
| private synchronized boolean shouldContinueCreating() { | ||
| return poolState == POOL_NORMAL && getTotalConnections() < config.getMaximumPoolSize() && | ||
| (getIdleConnections() < config.getMinimumIdle() || connectionBag.getWaitingThreadCount() > getIdleConnections()); | ||
| final int idleConnections = getIdleConnections(); | ||
| final int connectionsInProgress = HikariPool.this.connectionsInProgress.incrementAndGet(); | ||
| return poolState == POOL_NORMAL && getTotalConnections() + connectionsInProgress < config.getMaximumPoolSize() && | ||
| (idleConnections < config.getMinimumIdle() || connectionBag.getWaitingThreadCount() > idleConnections + connectionsInProgress); | ||
| } | ||
| } | ||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -18,6 +18,7 @@ | |
|
|
||
| import java.util.Locale; | ||
| import java.util.concurrent.*; | ||
| import java.util.concurrent.atomic.AtomicInteger; | ||
| import java.util.regex.Pattern; | ||
|
|
||
| import static java.lang.Thread.currentThread; | ||
|
|
@@ -131,6 +132,11 @@ public static ThreadPoolExecutor createThreadPoolExecutor(final int queueSize, f | |
| return createThreadPoolExecutor(new LinkedBlockingQueue<>(queueSize), threadName, threadFactory, policy); | ||
| } | ||
|
|
||
| public static ThreadPoolExecutor createCreatorThreadPoolExecutor(final int corePoolSize, final int maxPoolSize, final int queueSize, final String threadName, ThreadFactory threadFactory, final RejectedExecutionHandler policy) | ||
| { | ||
| return createCreatorThreadPoolExecutor(corePoolSize, maxPoolSize, new LinkedBlockingQueue<>(queueSize), threadName, threadFactory, policy); | ||
| } | ||
|
|
||
| /** | ||
| * Create a ThreadPoolExecutor. | ||
| * | ||
|
|
@@ -151,6 +157,18 @@ public static ThreadPoolExecutor createThreadPoolExecutor(final BlockingQueue<Ru | |
| return executor; | ||
| } | ||
|
|
||
| public static ThreadPoolExecutor createCreatorThreadPoolExecutor( | ||
| int corePoolSize, int maxPoolSize, final BlockingQueue<Runnable> queue, final String threadName, ThreadFactory threadFactory, final RejectedExecutionHandler policy) | ||
| { | ||
| if (threadFactory == null) { | ||
| threadFactory = new DefaultThreadFactory(threadName); | ||
| } | ||
|
|
||
| var executor = new ThreadPoolExecutor(corePoolSize /*core*/, maxPoolSize /*max*/, 5 /*keepalive*/, SECONDS, queue, threadFactory, policy); | ||
| executor.allowCoreThreadTimeOut(true); | ||
| return executor; | ||
| } | ||
|
|
||
| // *********************************************************************** | ||
| // Misc. public methods | ||
| // *********************************************************************** | ||
|
|
@@ -198,6 +216,7 @@ public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { | |
|
|
||
| public static final class DefaultThreadFactory implements ThreadFactory | ||
| { | ||
| private static final AtomicInteger counter = new AtomicInteger(0); | ||
| private final String threadName; | ||
| private final boolean daemon; | ||
|
|
||
|
|
@@ -209,7 +228,7 @@ public DefaultThreadFactory(String threadName) { | |
| @Override | ||
| @SuppressWarnings("NullableProblems") | ||
| public Thread newThread(Runnable r) { | ||
| var thread = new Thread(r, threadName); | ||
| var thread = new Thread(r, threadName + "_" + counter.incrementAndGet()); | ||
|
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. P.S. If the overall idea is good, gotta change this part so that thread naming doesn't interfere with other existing executors. |
||
| thread.setDaemon(daemon); | ||
| return thread; | ||
| } | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also, core pool size will have to be the same as max pool size (not Hikari's), so that executor doesn't hold the task in the queue and executes immediately. Something to fix