Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixes #12289 - Improve ConcurrentPool concurrency. #12290

Merged
merged 3 commits into from
Sep 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -653,11 +653,11 @@ protected void exitScope(Request request, Context lastContext, ClassLoader lastL
*/
protected void notifyExitScope(Request request)
{
for (int i = _contextListeners.size(); i-- > 0; )
for (ContextScopeListener listener : TypeUtil.reverse(_contextListeners))
{
try
{
_contextListeners.get(i).exitScope(_context, request);
listener.exitScope(_context, request);
}
catch (Throwable e)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ public ConcurrentPool(StrategyType strategyType, int maxSize)
* @param cache whether a {@link ThreadLocal} cache should be used for the most recently released entry
* @deprecated cache is no longer supported. Use {@link StrategyType#THREAD_ID}
*/
@Deprecated
@Deprecated(since = "12.0.4", forRemoval = true)
public ConcurrentPool(StrategyType strategyType, int maxSize, boolean cache)
{
this(strategyType, maxSize, pooled -> 1);
Expand All @@ -103,7 +103,7 @@ public ConcurrentPool(StrategyType strategyType, int maxSize, boolean cache)
* @param maxMultiplex a function that given the pooled object returns the max multiplex count
* @deprecated cache is no longer supported. Use {@link StrategyType#THREAD_ID}
*/
@Deprecated
@Deprecated(since = "12.0.4", forRemoval = true)
public ConcurrentPool(StrategyType strategyType, int maxSize, boolean cache, ToIntFunction<P> maxMultiplex)
{
this(strategyType, maxSize, maxMultiplex);
Expand Down Expand Up @@ -148,7 +148,7 @@ private void leaked(Holder<P> holder)
{
leaked.increment();
if (LOG.isDebugEnabled())
LOG.debug("Leaked " + holder);
LOG.debug("Leaked {}", holder);
leaked();
}

Expand Down Expand Up @@ -195,15 +195,14 @@ public Entry<P> reserve()

void sweep()
{
for (int i = 0; i < entries.size(); i++)
// Remove entries atomically with respect to remove(Entry).
entries.removeIf(holder ->
{
Holder<P> holder = entries.get(i);
if (holder.getEntry() == null)
{
entries.remove(i--);
boolean remove = holder.getEntry() == null;
if (remove)
leaked(holder);
}
}
return remove;
});
}

@Override
Expand Down Expand Up @@ -285,8 +284,7 @@ private boolean remove(Entry<P> entry)
if (!removed)
return false;

// No need to lock, no race with reserve()
// and the race with terminate() is harmless.
// In a harmless race with reserve()/sweep()/terminate().
Holder<P> holder = ((ConcurrentEntry<P>)entry).getHolder();
boolean evicted = entries.remove(holder);
if (LOG.isDebugEnabled())
Expand All @@ -313,10 +311,7 @@ public Collection<Entry<P>> terminate()
// Field this.terminated must be modified with the lock held
// because the list of entries is modified, see reserve().
terminated = true;
copy = entries.stream()
.map(Holder::getEntry)
.filter(Objects::nonNull)
.toList();
copy = stream().toList();
entries.clear();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,21 @@ public static <T> List<T> asList(T[] a)
return Arrays.asList(a);
}

/**
* <p>Returns a new list with the elements of the specified list in reverse order.</p>
* <p>The specified list is not modified, differently from {@link Collections#reverse(List)}.</p>
*
* @param list the list whose elements are to be reversed
* @return a new list with the elements in reverse order
* @param <T> the element type
*/
public static <T> List<T> reverse(List<T> list)
{
List<T> result = new ArrayList<>(list);
Collections.reverse(result);
return result;
}

/**
* Class from a canonical name for a type.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@
import org.eclipse.jetty.util.ExceptionUtil;
import org.eclipse.jetty.util.Loader;
import org.eclipse.jetty.util.StringUtil;
import org.eclipse.jetty.util.TypeUtil;
import org.eclipse.jetty.util.URIUtil;
import org.eclipse.jetty.util.annotation.ManagedAttribute;
import org.eclipse.jetty.util.annotation.ManagedObject;
Expand Down Expand Up @@ -518,8 +519,7 @@ public void contextDestroyed() throws Exception
//Call context listeners
Throwable multiException = null;
ServletContextEvent event = new ServletContextEvent(getServletContext());
Collections.reverse(_destroyServletContextListeners);
for (ServletContextListener listener : _destroyServletContextListeners)
for (ServletContextListener listener : TypeUtil.reverse(_destroyServletContextListeners))
{
try
{
Expand Down Expand Up @@ -574,17 +574,17 @@ protected void requestDestroyed(Request baseRequest, HttpServletRequest request)
if (!_servletRequestListeners.isEmpty())
{
final ServletRequestEvent sre = new ServletRequestEvent(getServletContext(), request);
for (int i = _servletRequestListeners.size(); i-- > 0; )
for (ServletRequestListener listener : TypeUtil.reverse(_servletRequestListeners))
{
_servletRequestListeners.get(i).requestDestroyed(sre);
listener.requestDestroyed(sre);
}
}

if (!_servletRequestAttributeListeners.isEmpty())
{
for (int i = _servletRequestAttributeListeners.size(); i-- > 0; )
for (ServletRequestAttributeListener listener : TypeUtil.reverse(_servletRequestAttributeListeners))
{
scopedRequest.removeEventListener(_servletRequestAttributeListeners.get(i));
scopedRequest.removeEventListener(listener);
}
}
}
Expand Down Expand Up @@ -1223,11 +1223,11 @@ protected void notifyExitScope(Request request)
ServletContextRequest scopedRequest = Request.as(request, ServletContextRequest.class);
if (!_contextListeners.isEmpty())
{
for (int i = _contextListeners.size(); i-- > 0; )
for (ServletContextScopeListener listener : TypeUtil.reverse(_contextListeners))
{
try
{
_contextListeners.get(i).exitScope(getContext(), scopedRequest);
listener.exitScope(getContext(), scopedRequest);
}
catch (Throwable e)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import org.eclipse.jetty.session.ManagedSession;
import org.eclipse.jetty.session.SessionConfig;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.TypeUtil;

public class SessionHandler extends AbstractSessionManager implements Handler.Singleton
{
Expand Down Expand Up @@ -569,9 +570,9 @@ public void onSessionDestroyed(Session session)
getSessionContext().run(() ->
{
HttpSessionEvent event = new HttpSessionEvent(session.getApi());
for (int i = _sessionListeners.size() - 1; i >= 0; i--)
for (HttpSessionListener listener : TypeUtil.reverse(_sessionListeners))
{
_sessionListeners.get(i).sessionDestroyed(event);
listener.sessionDestroyed(event);
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1000,17 +1000,17 @@ protected void requestDestroyed(Request baseRequest, HttpServletRequest request)
if (!_servletRequestListeners.isEmpty())
{
final ServletRequestEvent sre = new ServletRequestEvent(_apiContext, request);
for (int i = _servletRequestListeners.size(); i-- > 0; )
for (ServletRequestListener listener : TypeUtil.reverse(_servletRequestListeners))
sbordet marked this conversation as resolved.
Show resolved Hide resolved
{
_servletRequestListeners.get(i).requestDestroyed(sre);
listener.requestDestroyed(sre);
}
}

if (!_servletRequestAttributeListeners.isEmpty())
{
for (int i = _servletRequestAttributeListeners.size(); i-- > 0; )
for (ServletRequestAttributeListener listener : TypeUtil.reverse(_servletRequestAttributeListeners))
{
baseRequest.removeEventListener(_servletRequestAttributeListeners.get(i));
baseRequest.removeEventListener(listener);
}
}
}
Expand Down Expand Up @@ -1070,11 +1070,11 @@ protected void exitScope(Request request)
{
if (!_contextListeners.isEmpty())
{
for (int i = _contextListeners.size(); i-- > 0; )
for (ContextScopeListener listener : TypeUtil.reverse(_contextListeners))
{
try
{
_contextListeners.get(i).exitScope(_apiContext, request);
listener.exitScope(_apiContext, request);
}
catch (Throwable e)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import org.eclipse.jetty.session.SessionManager;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.StringUtil;
import org.eclipse.jetty.util.TypeUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -834,9 +835,9 @@ public void onSessionDestroyed(Session session)
Runnable r = () ->
{
HttpSessionEvent event = new HttpSessionEvent(session.getApi());
for (int i = _sessionListeners.size() - 1; i >= 0; i--)
for (HttpSessionListener listener : TypeUtil.reverse(_sessionListeners))
{
_sessionListeners.get(i).sessionDestroyed(event);
listener.sessionDestroyed(event);
}
};
_contextHandler.getCoreContextHandler().getContext().run(r);
Expand Down
Loading