diff --git a/jetty-core/jetty-util/src/main/java/org/eclipse/jetty/util/thread/StripedMonitoredQueuedThreadPool.java b/jetty-core/jetty-util/src/main/java/org/eclipse/jetty/util/thread/StripedMonitoredQueuedThreadPool.java new file mode 100644 index 000000000000..c6e732883bda --- /dev/null +++ b/jetty-core/jetty-util/src/main/java/org/eclipse/jetty/util/thread/StripedMonitoredQueuedThreadPool.java @@ -0,0 +1,162 @@ +// +// ======================================================================== +// Copyright (c) 1995 Mort Bay Consulting Pty Ltd and others. +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License v. 2.0 which is available at +// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// ======================================================================== +// + +package org.eclipse.jetty.util.thread; + +import org.eclipse.jetty.util.NanoTime; +import org.eclipse.jetty.util.annotation.ManagedAttribute; +import org.eclipse.jetty.util.annotation.ManagedObject; +import org.eclipse.jetty.util.annotation.ManagedOperation; +import org.eclipse.jetty.util.statistic.CounterStatistic; +import org.eclipse.jetty.util.statistic.SampleStatistic; + +/** + *

A {@link StripedQueuedThreadPool} subclass that monitors its own activity by recording queue and task statistics.

+ */ +@ManagedObject +public class StripedMonitoredQueuedThreadPool extends StripedQueuedThreadPool +{ + private final CounterStatistic queueStats = new CounterStatistic(); + private final SampleStatistic queueLatencyStats = new SampleStatistic(); + private final SampleStatistic taskLatencyStats = new SampleStatistic(); + private final CounterStatistic threadStats = new CounterStatistic(); + + public StripedMonitoredQueuedThreadPool() + { + this(256); + } + + public StripedMonitoredQueuedThreadPool(int maxThreads) + { + this(maxThreads, maxThreads); + } + + public StripedMonitoredQueuedThreadPool(int maxThreads, int minThreads) + { + super(maxThreads, minThreads); + installBean(queueStats); + installBean(queueLatencyStats); + installBean(taskLatencyStats); + installBean(threadStats); + } + + @Override + public void execute(final Runnable job) + { + queueStats.increment(); + long begin = NanoTime.now(); + super.execute(new Runnable() + { + @Override + public void run() + { + long queueLatency = NanoTime.since(begin); + queueStats.decrement(); + threadStats.increment(); + queueLatencyStats.record(queueLatency); + long start = NanoTime.now(); + try + { + job.run(); + } + finally + { + long taskLatency = NanoTime.since(start); + threadStats.decrement(); + taskLatencyStats.record(taskLatency); + } + } + + @Override + public String toString() + { + return job.toString(); + } + }); + } + + /** + * Resets the statistics. + */ + @ManagedOperation(value = "resets the statistics", impact = "ACTION") + public void reset() + { + queueStats.reset(); + queueLatencyStats.reset(); + taskLatencyStats.reset(); + threadStats.reset(0); + } + + /** + * @return the number of tasks executed + */ + @ManagedAttribute("the number of tasks executed") + public long getTasks() + { + return taskLatencyStats.getCount(); + } + + /** + * @return the maximum number of busy threads + */ + @ManagedAttribute("the maximum number of busy threads") + public int getMaxBusyThreads() + { + return (int)threadStats.getMax(); + } + + /** + * @return the maximum task queue size + */ + @ManagedAttribute("the maximum task queue size") + public int getMaxQueueSize() + { + return (int)queueStats.getMax(); + } + + /** + * @return the average time a task remains in the queue, in nanoseconds + */ + @ManagedAttribute("the average time a task remains in the queue, in nanoseconds") + public long getAverageQueueLatency() + { + return (long)queueLatencyStats.getMean(); + } + + /** + * @return the maximum time a task remains in the queue, in nanoseconds + */ + @ManagedAttribute("the maximum time a task remains in the queue, in nanoseconds") + public long getMaxQueueLatency() + { + return queueLatencyStats.getMax(); + } + + /** + * @return the average task execution time, in nanoseconds + */ + @ManagedAttribute("the average task execution time, in nanoseconds") + public long getAverageTaskLatency() + { + return (long)taskLatencyStats.getMean(); + } + + /** + * @return the maximum task execution time, in nanoseconds + */ + @ManagedAttribute("the maximum task execution time, in nanoseconds") + public long getMaxTaskLatency() + { + return taskLatencyStats.getMax(); + } +} diff --git a/jetty-core/jetty-util/src/main/java/org/eclipse/jetty/util/thread/StripedQueuedThreadPool.java b/jetty-core/jetty-util/src/main/java/org/eclipse/jetty/util/thread/StripedQueuedThreadPool.java new file mode 100644 index 000000000000..3c64be8d949a --- /dev/null +++ b/jetty-core/jetty-util/src/main/java/org/eclipse/jetty/util/thread/StripedQueuedThreadPool.java @@ -0,0 +1,247 @@ +// +// ======================================================================== +// Copyright (c) 1995 Mort Bay Consulting Pty Ltd and others. +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License v. 2.0 which is available at +// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// ======================================================================== +// + +package org.eclipse.jetty.util.thread; + +import java.io.IOException; +import java.util.Arrays; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadLocalRandom; + +import org.eclipse.jetty.util.MathUtils; +import org.eclipse.jetty.util.ProcessorUtils; +import org.eclipse.jetty.util.VirtualThreads; +import org.eclipse.jetty.util.annotation.ManagedObject; +import org.eclipse.jetty.util.annotation.Name; +import org.eclipse.jetty.util.component.ContainerLifeCycle; +import org.eclipse.jetty.util.component.Dumpable; +import org.eclipse.jetty.util.component.DumpableCollection; +import org.eclipse.jetty.util.thread.ThreadPool.SizedThreadPool; + +/** + * A striped thread pool with queues of jobs to execute. + */ +@ManagedObject("A striped thread pool") +public class StripedQueuedThreadPool extends ContainerLifeCycle implements ThreadFactory, SizedThreadPool, Dumpable, TryExecutor, VirtualThreads.Configurable +{ + private static final int STRIPES = 16; + + private final QueuedThreadPool[] queuedThreadPools = new QueuedThreadPool[STRIPES]; + + private QueuedThreadPool pickQTP() + { + int idx = ThreadLocalRandom.current().nextInt(STRIPES); + return queuedThreadPools[idx]; + } + + public StripedQueuedThreadPool() + { + this(200); + } + + public StripedQueuedThreadPool(@Name("maxThreads") int maxThreads) + { + this(maxThreads, Math.min(8, maxThreads)); + } + + public StripedQueuedThreadPool(@Name("maxThreads") int maxThreads, @Name("minThreads") int minThreads) + { + if (maxThreads < minThreads) + throw new IllegalArgumentException("max threads (" + maxThreads + ") less than min threads (" + minThreads + ")"); + for (int i = 0; i < queuedThreadPools.length; i++) + { + queuedThreadPools[i] = new QueuedThreadPool(); + } + setMinThreads(minThreads); + setMaxThreads(maxThreads); + setIdleTimeout(60000); + setStopTimeout(5000); + setReservedThreads(-1); +// setThreadPoolBudget(new ThreadPoolBudget(this)); + } + + public void setThreadPoolBudget(ThreadPoolBudget budget) + { + for (QueuedThreadPool queuedThreadPool : queuedThreadPools) + { + queuedThreadPool.setThreadPoolBudget(budget); + } + } + + public void setReservedThreads(int reservedThreads) + { + if (isRunning()) + throw new IllegalStateException(getState()); + + int reserved; + if (reservedThreads >= 0) + { + reserved = reservedThreads / STRIPES; + } + else + { + int cpus = ProcessorUtils.availableProcessors(); + int threads = getMaxThreads() / STRIPES; + reserved = Math.max(1, MathUtils.ceilToNextPowerOfTwo(Math.min(cpus, threads / 8))); + } + + for (QueuedThreadPool queuedThreadPool : queuedThreadPools) + { + queuedThreadPool.setReservedThreads(reserved); + } + } + + public void setStopTimeout(long stopTimeout) + { + for (QueuedThreadPool queuedThreadPool : queuedThreadPools) + { + queuedThreadPool.setStopTimeout(stopTimeout); + } + } + + public void setIdleTimeout(int idleTimeout) + { + for (QueuedThreadPool queuedThreadPool : queuedThreadPools) + { + queuedThreadPool.setIdleTimeout(idleTimeout); + } + } + + public void setName(String name) + { + if (isRunning()) + throw new IllegalStateException(getState()); + for (int i = 0; i < queuedThreadPools.length; i++) + { + QueuedThreadPool queuedThreadPool = queuedThreadPools[i]; + queuedThreadPool.setName(name + '|' + i); + } + } + + @Override + protected void doStart() throws Exception + { + super.doStart(); + for (QueuedThreadPool queuedThreadPool : queuedThreadPools) + { + queuedThreadPool.start(); + } + } + + @Override + protected void doStop() throws Exception + { + super.doStop(); + for (QueuedThreadPool queuedThreadPool : queuedThreadPools) + { + queuedThreadPool.stop(); + } + } + + @Override + public int getMinThreads() + { + return queuedThreadPools[0].getMinThreads(); + } + + @Override + public int getMaxThreads() + { + return queuedThreadPools[0].getMaxThreads() * STRIPES; + } + + @Override + public void join() throws InterruptedException + { + for (QueuedThreadPool queuedThreadPool : queuedThreadPools) + { + queuedThreadPool.join(); + } + } + + @Override + public int getThreads() + { + int total = 0; + for (QueuedThreadPool queuedThreadPool : queuedThreadPools) + { + total += queuedThreadPool.getThreads(); + } + return total; + } + + @Override + public int getIdleThreads() + { + int total = 0; + for (QueuedThreadPool queuedThreadPool : queuedThreadPools) + { + total += queuedThreadPool.getIdleThreads(); + } + return total; + } + + @Override + public boolean isLowOnThreads() + { + boolean low = false; + for (QueuedThreadPool queuedThreadPool : queuedThreadPools) + { + low |= queuedThreadPool.isLowOnThreads(); + } + return low; + } + + @Override + public void setMinThreads(int threads) + { + for (QueuedThreadPool queuedThreadPool : queuedThreadPools) + { + queuedThreadPool.setMinThreads(threads); + } + } + + @Override + public void setMaxThreads(int threads) + { + for (QueuedThreadPool queuedThreadPool : queuedThreadPools) + { + queuedThreadPool.setMaxThreads(threads / STRIPES); + } + } + + @Override + public Thread newThread(Runnable r) + { + return pickQTP().newThread(r); + } + + @Override + public boolean tryExecute(Runnable task) + { + return pickQTP().tryExecute(task); + } + + @Override + public void execute(Runnable task) + { + pickQTP().execute(task); + } + + @Override + public void dump(Appendable out, String indent) throws IOException + { + DumpableCollection pools = new DumpableCollection("threadpools", Arrays.asList(queuedThreadPools)); + dumpObjects(out, indent, pools); + } +} diff --git a/jetty-core/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/AdaptiveExecutionStrategy.java b/jetty-core/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/AdaptiveExecutionStrategy.java index de06f65679de..65de9a10ee78 100644 --- a/jetty-core/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/AdaptiveExecutionStrategy.java +++ b/jetty-core/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/AdaptiveExecutionStrategy.java @@ -130,6 +130,7 @@ private enum SubStrategy private final LongAdder _picMode = new LongAdder(); private final LongAdder _pecMode = new LongAdder(); private final LongAdder _epcMode = new LongAdder(); + private final LongAdder _epcProduce = new LongAdder(); private final Producer _producer; private final Executor _executor; private final TryExecutor _tryExecutor; @@ -460,6 +461,7 @@ private boolean consumeTask(Runnable task, SubStrategy subStrategy) // or it may take over if we subsequently do another EPC consumption. if (!_state.compareAndSet(biState, pending, PRODUCING)) continue; + _epcProduce.increment(); return true; } @@ -601,6 +603,12 @@ public long getEPCTasksConsumed() return _epcMode.longValue(); } + @ManagedAttribute(value = "number of times a EPC thread produces again", readonly = true) + public long getEPCProduceCount() + { + return _epcProduce.longValue(); + } + @ManagedAttribute(value = "whether this execution strategy is idle", readonly = true) public boolean isIdle() { @@ -663,6 +671,8 @@ private void getState(StringBuilder builder, long biState) builder.append(",pec="); builder.append(getPECTasksExecuted()); builder.append(",epc="); + builder.append(getEPCProduceCount()); + builder.append("/"); builder.append(getEPCTasksConsumed()); builder.append("]"); builder.append("@"); diff --git a/tests/jetty-jmh/src/main/java/org/eclipse/jetty/util/thread/jmh/QueuedThreadPoolScheduledExecutorSchedulerBenchmark.java b/tests/jetty-jmh/src/main/java/org/eclipse/jetty/util/thread/jmh/QueuedThreadPoolScheduledExecutorSchedulerBenchmark.java new file mode 100644 index 000000000000..bb077d51e347 --- /dev/null +++ b/tests/jetty-jmh/src/main/java/org/eclipse/jetty/util/thread/jmh/QueuedThreadPoolScheduledExecutorSchedulerBenchmark.java @@ -0,0 +1,116 @@ +// +// ======================================================================== +// Copyright (c) 1995 Mort Bay Consulting Pty Ltd and others. +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License v. 2.0 which is available at +// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// ======================================================================== +// + +package org.eclipse.jetty.util.thread.jmh; + +import java.io.Closeable; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.TimeUnit; +import org.eclipse.jetty.util.BlockingArrayQueue; +import org.eclipse.jetty.util.component.LifeCycle; +import org.eclipse.jetty.util.thread.QueuedThreadPool; +import org.eclipse.jetty.util.thread.ScheduledExecutorScheduler; +import org.eclipse.jetty.util.thread.Scheduler; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.TearDown; +import org.openjdk.jmh.annotations.Threads; +import org.openjdk.jmh.annotations.Warmup; +import org.openjdk.jmh.runner.Runner; +import org.openjdk.jmh.runner.RunnerException; +import org.openjdk.jmh.runner.options.Options; +import org.openjdk.jmh.runner.options.OptionsBuilder; + +@State(Scope.Benchmark) +@Warmup(iterations = 10, time = 500, timeUnit = TimeUnit.MILLISECONDS) +@Measurement(iterations = 10, time = 500, timeUnit = TimeUnit.MILLISECONDS) +public class QueuedThreadPoolScheduledExecutorSchedulerBenchmark +{ + @Param({ + "BAQ", + "ABQ", + }) + public static String QUEUE_TYPE; + + QueuedThreadPool pool; + ScheduledExecutorScheduler scheduler; + + @Setup(Level.Iteration) + public void buildPool() + { + BlockingQueue q = switch (QUEUE_TYPE) + { + case "BAQ" -> new BlockingArrayQueue<>(1024 * 1204, 4 * 1024); + case "ABQ" -> new ArrayBlockingQueue<>(16 * 1024 * 1204); + default -> throw new IllegalArgumentException(); + }; + pool = new QueuedThreadPool(200, 200, q); + pool.setStopTimeout(30000); + pool.setReservedThreads(0); + LifeCycle.start(pool); + + scheduler = new ScheduledExecutorScheduler(); + LifeCycle.start(scheduler); + } + + @TearDown(Level.Iteration) + public void shutdownPool() + { + LifeCycle.stop(pool); + LifeCycle.stop(scheduler); + } + + @Benchmark + @BenchmarkMode(Mode.Throughput) + @Threads(8) + public void test() + { + pool.execute((CloseableRunnable)() -> + { + Scheduler.Task task = scheduler.schedule(() -> {}, 1, TimeUnit.SECONDS); + task.cancel(); + }); + } + + public static void main(String[] args) throws RunnerException + { + // String asyncProfilerPath = "/home/lorban/work/tools/async-profiler/4.0/lib/libasyncProfiler.so"; + Options opt = new OptionsBuilder() + .include(QueuedThreadPoolScheduledExecutorSchedulerBenchmark.class.getSimpleName()) + .forks(1) + // .addProfiler(CompilerProfiler.class) + // .addProfiler(LinuxPerfProfiler.class) + // .addProfiler(LinuxPerfNormProfiler.class) + // .addProfiler(LinuxPerfAsmProfiler.class, "hotThreshold=0.05") + // .addProfiler(AsyncProfiler.class, "dir=/tmp/QTP;output=jfr;event=cpu;libPath=" + asyncProfilerPath) + .build(); + + new Runner(opt).run(); + } + + private interface CloseableRunnable extends Closeable, Runnable + { + @Override + default void close() + { + } + } +}