Skip to content

Commit 88580c9

Browse files
committed
add prototype StripedQueuedThreadPool
Signed-off-by: Ludovic Orban <[email protected]>
1 parent 4e6a835 commit 88580c9

File tree

1 file changed

+247
-0
lines changed

1 file changed

+247
-0
lines changed
Lines changed: 247 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,247 @@
1+
//
2+
// ========================================================================
3+
// Copyright (c) 1995 Mort Bay Consulting Pty Ltd and others.
4+
//
5+
// This program and the accompanying materials are made available under the
6+
// terms of the Eclipse Public License v. 2.0 which is available at
7+
// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
8+
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
9+
//
10+
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
11+
// ========================================================================
12+
//
13+
14+
package org.eclipse.jetty.util.thread;
15+
16+
import java.io.IOException;
17+
import java.util.Arrays;
18+
import java.util.concurrent.ThreadFactory;
19+
import java.util.concurrent.ThreadLocalRandom;
20+
21+
import org.eclipse.jetty.util.MathUtils;
22+
import org.eclipse.jetty.util.ProcessorUtils;
23+
import org.eclipse.jetty.util.VirtualThreads;
24+
import org.eclipse.jetty.util.annotation.ManagedObject;
25+
import org.eclipse.jetty.util.annotation.Name;
26+
import org.eclipse.jetty.util.component.ContainerLifeCycle;
27+
import org.eclipse.jetty.util.component.Dumpable;
28+
import org.eclipse.jetty.util.component.DumpableCollection;
29+
import org.eclipse.jetty.util.thread.ThreadPool.SizedThreadPool;
30+
31+
/**
32+
* A striped thread pool with queues of jobs to execute.
33+
*/
34+
@ManagedObject("A striped thread pool")
35+
public class StripedQueuedThreadPool extends ContainerLifeCycle implements ThreadFactory, SizedThreadPool, Dumpable, TryExecutor, VirtualThreads.Configurable
36+
{
37+
private static final int STRIPES = 16;
38+
39+
private final QueuedThreadPool[] queuedThreadPools = new QueuedThreadPool[STRIPES];
40+
41+
private QueuedThreadPool pickQTP()
42+
{
43+
int idx = ThreadLocalRandom.current().nextInt(STRIPES);
44+
return queuedThreadPools[idx];
45+
}
46+
47+
public StripedQueuedThreadPool()
48+
{
49+
this(200);
50+
}
51+
52+
public StripedQueuedThreadPool(@Name("maxThreads") int maxThreads)
53+
{
54+
this(maxThreads, Math.min(8, maxThreads));
55+
}
56+
57+
public StripedQueuedThreadPool(@Name("maxThreads") int maxThreads, @Name("minThreads") int minThreads)
58+
{
59+
if (maxThreads < minThreads)
60+
throw new IllegalArgumentException("max threads (" + maxThreads + ") less than min threads (" + minThreads + ")");
61+
for (int i = 0; i < queuedThreadPools.length; i++)
62+
{
63+
queuedThreadPools[i] = new QueuedThreadPool();
64+
}
65+
setMinThreads(minThreads);
66+
setMaxThreads(maxThreads);
67+
setIdleTimeout(60000);
68+
setStopTimeout(5000);
69+
setReservedThreads(-1);
70+
// setThreadPoolBudget(new ThreadPoolBudget(this));
71+
}
72+
73+
public void setThreadPoolBudget(ThreadPoolBudget budget)
74+
{
75+
for (QueuedThreadPool queuedThreadPool : queuedThreadPools)
76+
{
77+
queuedThreadPool.setThreadPoolBudget(budget);
78+
}
79+
}
80+
81+
public void setReservedThreads(int reservedThreads)
82+
{
83+
if (isRunning())
84+
throw new IllegalStateException(getState());
85+
86+
int reserved;
87+
if (reservedThreads >= 0)
88+
{
89+
reserved = reservedThreads / STRIPES;
90+
}
91+
else
92+
{
93+
int cpus = ProcessorUtils.availableProcessors();
94+
int threads = getMaxThreads() / STRIPES;
95+
reserved = Math.max(1, MathUtils.ceilToNextPowerOfTwo(Math.min(cpus, threads / 8)));
96+
}
97+
98+
for (QueuedThreadPool queuedThreadPool : queuedThreadPools)
99+
{
100+
queuedThreadPool.setReservedThreads(reserved);
101+
}
102+
}
103+
104+
public void setStopTimeout(long stopTimeout)
105+
{
106+
for (QueuedThreadPool queuedThreadPool : queuedThreadPools)
107+
{
108+
queuedThreadPool.setStopTimeout(stopTimeout);
109+
}
110+
}
111+
112+
public void setIdleTimeout(int idleTimeout)
113+
{
114+
for (QueuedThreadPool queuedThreadPool : queuedThreadPools)
115+
{
116+
queuedThreadPool.setIdleTimeout(idleTimeout);
117+
}
118+
}
119+
120+
public void setName(String name)
121+
{
122+
if (isRunning())
123+
throw new IllegalStateException(getState());
124+
for (int i = 0; i < queuedThreadPools.length; i++)
125+
{
126+
QueuedThreadPool queuedThreadPool = queuedThreadPools[i];
127+
queuedThreadPool.setName(name + '|' + i);
128+
}
129+
}
130+
131+
@Override
132+
protected void doStart() throws Exception
133+
{
134+
super.doStart();
135+
for (QueuedThreadPool queuedThreadPool : queuedThreadPools)
136+
{
137+
queuedThreadPool.start();
138+
}
139+
}
140+
141+
@Override
142+
protected void doStop() throws Exception
143+
{
144+
super.doStop();
145+
for (QueuedThreadPool queuedThreadPool : queuedThreadPools)
146+
{
147+
queuedThreadPool.stop();
148+
}
149+
}
150+
151+
@Override
152+
public int getMinThreads()
153+
{
154+
return queuedThreadPools[0].getMinThreads();
155+
}
156+
157+
@Override
158+
public int getMaxThreads()
159+
{
160+
return queuedThreadPools[0].getMaxThreads() * STRIPES;
161+
}
162+
163+
@Override
164+
public void join() throws InterruptedException
165+
{
166+
for (QueuedThreadPool queuedThreadPool : queuedThreadPools)
167+
{
168+
queuedThreadPool.join();
169+
}
170+
}
171+
172+
@Override
173+
public int getThreads()
174+
{
175+
int total = 0;
176+
for (QueuedThreadPool queuedThreadPool : queuedThreadPools)
177+
{
178+
total += queuedThreadPool.getThreads();
179+
}
180+
return total;
181+
}
182+
183+
@Override
184+
public int getIdleThreads()
185+
{
186+
int total = 0;
187+
for (QueuedThreadPool queuedThreadPool : queuedThreadPools)
188+
{
189+
total += queuedThreadPool.getIdleThreads();
190+
}
191+
return total;
192+
}
193+
194+
@Override
195+
public boolean isLowOnThreads()
196+
{
197+
boolean low = false;
198+
for (QueuedThreadPool queuedThreadPool : queuedThreadPools)
199+
{
200+
low |= queuedThreadPool.isLowOnThreads();
201+
}
202+
return low;
203+
}
204+
205+
@Override
206+
public void setMinThreads(int threads)
207+
{
208+
for (QueuedThreadPool queuedThreadPool : queuedThreadPools)
209+
{
210+
queuedThreadPool.setMinThreads(threads);
211+
}
212+
}
213+
214+
@Override
215+
public void setMaxThreads(int threads)
216+
{
217+
for (QueuedThreadPool queuedThreadPool : queuedThreadPools)
218+
{
219+
queuedThreadPool.setMaxThreads(threads / STRIPES);
220+
}
221+
}
222+
223+
@Override
224+
public Thread newThread(Runnable r)
225+
{
226+
return pickQTP().newThread(r);
227+
}
228+
229+
@Override
230+
public boolean tryExecute(Runnable task)
231+
{
232+
return pickQTP().tryExecute(task);
233+
}
234+
235+
@Override
236+
public void execute(Runnable task)
237+
{
238+
pickQTP().execute(task);
239+
}
240+
241+
@Override
242+
public void dump(Appendable out, String indent) throws IOException
243+
{
244+
DumpableCollection pools = new DumpableCollection("threadpools", Arrays.asList(queuedThreadPools));
245+
dumpObjects(out, indent, pools);
246+
}
247+
}

0 commit comments

Comments
 (0)