Skip to content

Commit

Permalink
[improvement][broker] If there is a deadlock in the service, the prob…
Browse files Browse the repository at this point in the history
…e should return a failure because the service may be unavailable. Add unit testing code.
  • Loading branch information
yyj8 committed Dec 8, 2024
1 parent e1a1dd5 commit 69aba3e
Show file tree
Hide file tree
Showing 3 changed files with 300 additions and 394 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;
import java.time.Clock;
import java.util.Arrays;
import java.util.function.Supplier;
import java.util.stream.Collectors;
Expand All @@ -31,6 +32,7 @@
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.Response.Status;
import com.google.common.annotations.VisibleForTesting;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.common.util.ThreadDumpUtil;

Expand All @@ -47,33 +49,47 @@ public class VipStatus {
// log a full thread dump when a deadlock is detected in status check once every 10 minutes
// to prevent excessive logging
private static final long LOG_THREADDUMP_INTERVAL_WHEN_DEADLOCK_DETECTED = 600000L;
private static volatile long lastCheckStatusTimestamp;
private static volatile long lastPrintThreadDumpTimestamp;

// Rate limit status checks to every 500ms to prevent DoS
private static final long CHECK_STATUS_INTERVAL = 500L;

private static volatile long lastCheckStatusTimestamp;
private static volatile long lastPrintThreadDumpTimestamp;
private static volatile boolean lastCheckStatusResult;

private long printThreadDumpIntervalMs;
private Clock clock;

@Context
protected ServletContext servletContext;

public VipStatus() {
this.clock = Clock.systemUTC();
this.printThreadDumpIntervalMs = LOG_THREADDUMP_INTERVAL_WHEN_DEADLOCK_DETECTED;
}

@VisibleForTesting
public VipStatus(ServletContext servletContext, long printThreadDumpIntervalMs) {
this.servletContext = servletContext;
this.printThreadDumpIntervalMs = printThreadDumpIntervalMs;
this.clock = Clock.systemUTC();
}

@GET
public String checkStatus() {
// Locking classes to avoid deadlock detection in multi-thread concurrent requests.
synchronized (VipStatus.class) {
if (System.currentTimeMillis() - lastCheckStatusTimestamp < CHECK_STATUS_INTERVAL) {
if (clock.millis() - lastCheckStatusTimestamp < CHECK_STATUS_INTERVAL) {
if (lastCheckStatusResult) {
return "OK";
} else {
throw new WebApplicationException(Status.SERVICE_UNAVAILABLE);
}
}
lastCheckStatusTimestamp = System.currentTimeMillis();
lastCheckStatusTimestamp = clock.millis();

String statusFilePath = (String) servletContext.getAttribute(ATTRIBUTE_STATUS_FILE_PATH);
@SuppressWarnings("unchecked")
Supplier<Boolean> isReadyProbe = (Supplier<Boolean>) servletContext.getAttribute(ATTRIBUTE_IS_READY_PROBE);

boolean isReady = isReadyProbe != null ? isReadyProbe.get() : true;

if (statusFilePath != null) {
Expand All @@ -89,12 +105,11 @@ public String checkStatus() {
.map(threadInfo -> threadInfo.getThreadName()
+ "(tid=" + threadInfo.getThreadId() + ")")
.collect(Collectors.joining(", "));
if (System.currentTimeMillis() - lastPrintThreadDumpTimestamp
> LOG_THREADDUMP_INTERVAL_WHEN_DEADLOCK_DETECTED) {
if (clock.millis() - lastPrintThreadDumpTimestamp > printThreadDumpIntervalMs) {
String diagnosticResult = ThreadDumpUtil.buildThreadDiagnosticString();
log.error("Deadlock detected, service may be unavailable, "
+ "thread stack details are as follows: {}.", diagnosticResult);
lastPrintThreadDumpTimestamp = System.currentTimeMillis();
lastPrintThreadDumpTimestamp = clock.millis();
} else {
log.error("Deadlocked threads detected. {}", threadNames);
}
Expand All @@ -111,5 +126,4 @@ public String checkStatus() {
throw new WebApplicationException(Status.NOT_FOUND);
}
}

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,255 @@
package org.apache.pulsar.common.configuration;

import org.eclipse.jetty.util.AttributesMap;

import javax.servlet.*;
import javax.servlet.descriptor.JspConfigDescriptor;
import java.io.InputStream;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.Enumeration;
import java.util.EventListener;
import java.util.Map;
import java.util.Set;

public class MockServletContext extends AttributesMap implements ServletContext {
@Override
public String getContextPath() {
return null;
}

@Override
public ServletContext getContext(String s) {
return null;
}

@Override
public int getMajorVersion() {
return 0;
}

@Override
public int getMinorVersion() {
return 0;
}

@Override
public int getEffectiveMajorVersion() {
return 0;
}

@Override
public int getEffectiveMinorVersion() {
return 0;
}

@Override
public String getMimeType(String s) {
return null;
}

@Override
public Set<String> getResourcePaths(String s) {
return null;
}

@Override
public URL getResource(String s) throws MalformedURLException {
return null;
}

@Override
public InputStream getResourceAsStream(String s) {
return null;
}

@Override
public RequestDispatcher getRequestDispatcher(String s) {
return null;
}

@Override
public RequestDispatcher getNamedDispatcher(String s) {
return null;
}

@Override
public Servlet getServlet(String s) throws ServletException {
return null;
}

@Override
public Enumeration<Servlet> getServlets() {
return null;
}

@Override
public Enumeration<String> getServletNames() {
return null;
}

@Override
public void log(String s) {

}

@Override
public void log(Exception e, String s) {

}

@Override
public void log(String s, Throwable throwable) {

}

@Override
public String getRealPath(String s) {
return null;
}

@Override
public String getServerInfo() {
return null;
}

@Override
public String getInitParameter(String s) {
return null;
}

@Override
public Enumeration<String> getInitParameterNames() {
return null;
}

@Override
public boolean setInitParameter(String s, String s1) {
return false;
}

@Override
public String getServletContextName() {
return null;
}

@Override
public ServletRegistration.Dynamic addServlet(String s, String s1) {
return null;
}

@Override
public ServletRegistration.Dynamic addServlet(String s, Servlet servlet) {
return null;
}

@Override
public ServletRegistration.Dynamic addServlet(String s, Class<? extends Servlet> aClass) {
return null;
}

@Override
public <T extends Servlet> T createServlet(Class<T> aClass) throws ServletException {
return null;
}

@Override
public ServletRegistration getServletRegistration(String s) {
return null;
}

@Override
public Map<String, ? extends ServletRegistration> getServletRegistrations() {
return null;
}

@Override
public FilterRegistration.Dynamic addFilter(String s, String s1) {
return null;
}

@Override
public FilterRegistration.Dynamic addFilter(String s, Filter filter) {
return null;
}

@Override
public FilterRegistration.Dynamic addFilter(String s, Class<? extends Filter> aClass) {
return null;
}

@Override
public <T extends Filter> T createFilter(Class<T> aClass) throws ServletException {
return null;
}

@Override
public FilterRegistration getFilterRegistration(String s) {
return null;
}

@Override
public Map<String, ? extends FilterRegistration> getFilterRegistrations() {
return null;
}

@Override
public SessionCookieConfig getSessionCookieConfig() {
return null;
}

@Override
public void setSessionTrackingModes(Set<SessionTrackingMode> set) {

}

@Override
public Set<SessionTrackingMode> getDefaultSessionTrackingModes() {
return null;
}

@Override
public Set<SessionTrackingMode> getEffectiveSessionTrackingModes() {
return null;
}

@Override
public void addListener(String s) {

}

@Override
public <T extends EventListener> void addListener(T t) {

}

@Override
public void addListener(Class<? extends EventListener> aClass) {

}

@Override
public <T extends EventListener> T createListener(Class<T> aClass) throws ServletException {
return null;
}

@Override
public JspConfigDescriptor getJspConfigDescriptor() {
return null;
}

@Override
public ClassLoader getClassLoader() {
return null;
}

@Override
public void declareRoles(String... strings) {

}

@Override
public String getVirtualServerName() {
return null;
}
}
Loading

0 comments on commit 69aba3e

Please sign in to comment.