From 69aba3edf3859f2926916e4b8535f5c914c52763 Mon Sep 17 00:00:00 2001 From: yyj8 <1012293987@qq.com> Date: Sun, 8 Dec 2024 11:58:31 +0800 Subject: [PATCH] [improvement][broker] If there is a deadlock in the service, the probe should return a failure because the service may be unavailable. Add unit testing code. --- .../common/configuration/VipStatus.java | 36 +- .../configuration/MockServletContext.java | 255 +++++++++++ .../common/configuration/VipStatusTest.java | 403 +----------------- 3 files changed, 300 insertions(+), 394 deletions(-) create mode 100644 pulsar-broker-common/src/test/java/org/apache/pulsar/common/configuration/MockServletContext.java diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/common/configuration/VipStatus.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/common/configuration/VipStatus.java index 21158359fc086..203d03e974062 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/common/configuration/VipStatus.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/common/configuration/VipStatus.java @@ -22,6 +22,7 @@ import java.lang.management.ManagementFactory; import java.lang.management.ThreadInfo; import java.lang.management.ThreadMXBean; +import java.time.Clock; import java.util.Arrays; import java.util.function.Supplier; import java.util.stream.Collectors; @@ -31,6 +32,7 @@ import javax.ws.rs.WebApplicationException; import javax.ws.rs.core.Context; import javax.ws.rs.core.Response.Status; +import com.google.common.annotations.VisibleForTesting; import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.common.util.ThreadDumpUtil; @@ -47,33 +49,47 @@ public class VipStatus { // log a full thread dump when a deadlock is detected in status check once every 10 minutes // to prevent excessive logging private static final long LOG_THREADDUMP_INTERVAL_WHEN_DEADLOCK_DETECTED = 600000L; - private static volatile long lastCheckStatusTimestamp; - private static volatile long lastPrintThreadDumpTimestamp; - // Rate limit status checks to every 500ms to prevent DoS private static final long CHECK_STATUS_INTERVAL = 500L; + + private static volatile long lastCheckStatusTimestamp; + private static volatile long lastPrintThreadDumpTimestamp; private static volatile boolean lastCheckStatusResult; + private long printThreadDumpIntervalMs; + private Clock clock; + @Context protected ServletContext servletContext; + public VipStatus() { + this.clock = Clock.systemUTC(); + this.printThreadDumpIntervalMs = LOG_THREADDUMP_INTERVAL_WHEN_DEADLOCK_DETECTED; + } + + @VisibleForTesting + public VipStatus(ServletContext servletContext, long printThreadDumpIntervalMs) { + this.servletContext = servletContext; + this.printThreadDumpIntervalMs = printThreadDumpIntervalMs; + this.clock = Clock.systemUTC(); + } + @GET public String checkStatus() { // Locking classes to avoid deadlock detection in multi-thread concurrent requests. synchronized (VipStatus.class) { - if (System.currentTimeMillis() - lastCheckStatusTimestamp < CHECK_STATUS_INTERVAL) { + if (clock.millis() - lastCheckStatusTimestamp < CHECK_STATUS_INTERVAL) { if (lastCheckStatusResult) { return "OK"; } else { throw new WebApplicationException(Status.SERVICE_UNAVAILABLE); } } - lastCheckStatusTimestamp = System.currentTimeMillis(); + lastCheckStatusTimestamp = clock.millis(); String statusFilePath = (String) servletContext.getAttribute(ATTRIBUTE_STATUS_FILE_PATH); @SuppressWarnings("unchecked") Supplier isReadyProbe = (Supplier) servletContext.getAttribute(ATTRIBUTE_IS_READY_PROBE); - boolean isReady = isReadyProbe != null ? isReadyProbe.get() : true; if (statusFilePath != null) { @@ -89,12 +105,11 @@ public String checkStatus() { .map(threadInfo -> threadInfo.getThreadName() + "(tid=" + threadInfo.getThreadId() + ")") .collect(Collectors.joining(", ")); - if (System.currentTimeMillis() - lastPrintThreadDumpTimestamp - > LOG_THREADDUMP_INTERVAL_WHEN_DEADLOCK_DETECTED) { + if (clock.millis() - lastPrintThreadDumpTimestamp > printThreadDumpIntervalMs) { String diagnosticResult = ThreadDumpUtil.buildThreadDiagnosticString(); log.error("Deadlock detected, service may be unavailable, " + "thread stack details are as follows: {}.", diagnosticResult); - lastPrintThreadDumpTimestamp = System.currentTimeMillis(); + lastPrintThreadDumpTimestamp = clock.millis(); } else { log.error("Deadlocked threads detected. {}", threadNames); } @@ -111,5 +126,4 @@ public String checkStatus() { throw new WebApplicationException(Status.NOT_FOUND); } } - -} +} \ No newline at end of file diff --git a/pulsar-broker-common/src/test/java/org/apache/pulsar/common/configuration/MockServletContext.java b/pulsar-broker-common/src/test/java/org/apache/pulsar/common/configuration/MockServletContext.java new file mode 100644 index 0000000000000..93df67c93d820 --- /dev/null +++ b/pulsar-broker-common/src/test/java/org/apache/pulsar/common/configuration/MockServletContext.java @@ -0,0 +1,255 @@ +package org.apache.pulsar.common.configuration; + +import org.eclipse.jetty.util.AttributesMap; + +import javax.servlet.*; +import javax.servlet.descriptor.JspConfigDescriptor; +import java.io.InputStream; +import java.net.MalformedURLException; +import java.net.URL; +import java.util.Enumeration; +import java.util.EventListener; +import java.util.Map; +import java.util.Set; + +public class MockServletContext extends AttributesMap implements ServletContext { + @Override + public String getContextPath() { + return null; + } + + @Override + public ServletContext getContext(String s) { + return null; + } + + @Override + public int getMajorVersion() { + return 0; + } + + @Override + public int getMinorVersion() { + return 0; + } + + @Override + public int getEffectiveMajorVersion() { + return 0; + } + + @Override + public int getEffectiveMinorVersion() { + return 0; + } + + @Override + public String getMimeType(String s) { + return null; + } + + @Override + public Set getResourcePaths(String s) { + return null; + } + + @Override + public URL getResource(String s) throws MalformedURLException { + return null; + } + + @Override + public InputStream getResourceAsStream(String s) { + return null; + } + + @Override + public RequestDispatcher getRequestDispatcher(String s) { + return null; + } + + @Override + public RequestDispatcher getNamedDispatcher(String s) { + return null; + } + + @Override + public Servlet getServlet(String s) throws ServletException { + return null; + } + + @Override + public Enumeration getServlets() { + return null; + } + + @Override + public Enumeration getServletNames() { + return null; + } + + @Override + public void log(String s) { + + } + + @Override + public void log(Exception e, String s) { + + } + + @Override + public void log(String s, Throwable throwable) { + + } + + @Override + public String getRealPath(String s) { + return null; + } + + @Override + public String getServerInfo() { + return null; + } + + @Override + public String getInitParameter(String s) { + return null; + } + + @Override + public Enumeration getInitParameterNames() { + return null; + } + + @Override + public boolean setInitParameter(String s, String s1) { + return false; + } + + @Override + public String getServletContextName() { + return null; + } + + @Override + public ServletRegistration.Dynamic addServlet(String s, String s1) { + return null; + } + + @Override + public ServletRegistration.Dynamic addServlet(String s, Servlet servlet) { + return null; + } + + @Override + public ServletRegistration.Dynamic addServlet(String s, Class aClass) { + return null; + } + + @Override + public T createServlet(Class aClass) throws ServletException { + return null; + } + + @Override + public ServletRegistration getServletRegistration(String s) { + return null; + } + + @Override + public Map getServletRegistrations() { + return null; + } + + @Override + public FilterRegistration.Dynamic addFilter(String s, String s1) { + return null; + } + + @Override + public FilterRegistration.Dynamic addFilter(String s, Filter filter) { + return null; + } + + @Override + public FilterRegistration.Dynamic addFilter(String s, Class aClass) { + return null; + } + + @Override + public T createFilter(Class aClass) throws ServletException { + return null; + } + + @Override + public FilterRegistration getFilterRegistration(String s) { + return null; + } + + @Override + public Map getFilterRegistrations() { + return null; + } + + @Override + public SessionCookieConfig getSessionCookieConfig() { + return null; + } + + @Override + public void setSessionTrackingModes(Set set) { + + } + + @Override + public Set getDefaultSessionTrackingModes() { + return null; + } + + @Override + public Set getEffectiveSessionTrackingModes() { + return null; + } + + @Override + public void addListener(String s) { + + } + + @Override + public void addListener(T t) { + + } + + @Override + public void addListener(Class aClass) { + + } + + @Override + public T createListener(Class aClass) throws ServletException { + return null; + } + + @Override + public JspConfigDescriptor getJspConfigDescriptor() { + return null; + } + + @Override + public ClassLoader getClassLoader() { + return null; + } + + @Override + public void declareRoles(String... strings) { + + } + + @Override + public String getVirtualServerName() { + return null; + } +} diff --git a/pulsar-broker-common/src/test/java/org/apache/pulsar/common/configuration/VipStatusTest.java b/pulsar-broker-common/src/test/java/org/apache/pulsar/common/configuration/VipStatusTest.java index 1cafbd26141af..c47336a794511 100644 --- a/pulsar-broker-common/src/test/java/org/apache/pulsar/common/configuration/VipStatusTest.java +++ b/pulsar-broker-common/src/test/java/org/apache/pulsar/common/configuration/VipStatusTest.java @@ -19,36 +19,13 @@ package org.apache.pulsar.common.configuration; import static org.testng.Assert.assertEquals; -import static org.testng.Assert.assertTrue; import java.io.File; import java.io.IOException; -import java.io.InputStream; -import java.lang.management.ManagementFactory; -import java.lang.management.ThreadInfo; -import java.lang.management.ThreadMXBean; -import java.net.MalformedURLException; -import java.net.URL; -import java.util.Arrays; -import java.util.Enumeration; -import java.util.EventListener; -import java.util.Map; -import java.util.Set; import java.util.function.Supplier; -import java.util.stream.Collectors; -import javax.servlet.Filter; -import javax.servlet.FilterRegistration; -import javax.servlet.RequestDispatcher; -import javax.servlet.Servlet; -import javax.servlet.ServletContext; -import javax.servlet.ServletException; -import javax.servlet.ServletRegistration; -import javax.servlet.SessionCookieConfig; -import javax.servlet.SessionTrackingMode; -import javax.servlet.descriptor.JspConfigDescriptor; +import javax.ws.rs.WebApplicationException; +import javax.ws.rs.core.Response; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; -import org.apache.pulsar.common.util.ThreadDumpUtil; -import org.eclipse.jetty.util.AttributesMap; import org.testng.annotations.AfterTest; import org.testng.annotations.BeforeTest; import org.testng.annotations.Test; @@ -58,35 +35,27 @@ public class VipStatusTest { public static final String ATTRIBUTE_STATUS_FILE_PATH = "statusFilePath"; public static final String ATTRIBUTE_IS_READY_PROBE = "isReadyProbe"; - - // log a full thread dump when a deadlock is detected in status check once every 10 minutes - // to prevent excessive logging private static final long LOG_THREADDUMP_INTERVAL_WHEN_DEADLOCK_DETECTED = 10000L; - private static volatile long lastCheckStatusTimestamp; - private static volatile long lastPrintThreadDumpTimestamp; - // Rate limit status checks to every 500ms to prevent DoS private static final long CHECK_STATUS_INTERVAL = 500L; - private static volatile boolean lastCheckStatusResult; - private static String CHECK_RESULT_OK = "OK"; - private static String CHECK_RESULT_NOT_OK = "NOT_OK"; - private int checkThrottlingCount; - private int checkNoThrottlingCount; - private int printDeadlockThreadDumpCount; private MockServletContext mockServletContext = new MockServletContext(); + private VipStatus vipStatus; + @BeforeTest public void setup() throws IOException { String statusFilePath = "/tmp/status.html"; File file = new File(statusFilePath); file.createNewFile(); + mockServletContext.setAttribute(ATTRIBUTE_STATUS_FILE_PATH, statusFilePath); Supplier isReadyProbe = () -> true; mockServletContext.setAttribute(ATTRIBUTE_IS_READY_PROBE, isReadyProbe); + vipStatus = new VipStatus(mockServletContext, LOG_THREADDUMP_INTERVAL_WHEN_DEADLOCK_DETECTED); } @Test - public void testVipStatusCheckStatus() throws InterruptedException { + public void testVipStatusCheckStatus() { // No deadlocks testVipStatusCheckStatusWithoutDeadlock(); // There is a deadlock @@ -100,355 +69,23 @@ public void release() throws IOException { file.deleteOnExit(); } - public void testVipStatusCheckStatusWithoutDeadlock() throws InterruptedException { - //1.No DOS attacks - for (int i = 0; i < 10; i++) { - assertEquals(checkStatus(), CHECK_RESULT_OK); - Thread.sleep(CHECK_STATUS_INTERVAL + 10); - } - assertTrue(checkNoThrottlingCount == 10); - assertTrue(checkThrottlingCount == 0); - checkNoThrottlingCount = 0; - checkThrottlingCount = 0; - - //2.There are DOS attacks - for (int i = 0; i < 10; i++) { - assertEquals(checkStatus(), CHECK_RESULT_OK); - } - assertTrue(checkNoThrottlingCount >= 1); - assertTrue(checkThrottlingCount >= 1); - checkNoThrottlingCount = 0; - checkThrottlingCount = 0; + public void testVipStatusCheckStatusWithoutDeadlock() { + assertEquals(vipStatus.checkStatus(), "OK"); } - public void testVipStatusCheckStatusWithDeadlock() throws InterruptedException { + public void testVipStatusCheckStatusWithDeadlock() { MockDeadlock.startDeadlock(); - //1.No DOS attacks - for (int i = 0; i < 10; i++) { - assertEquals(checkStatus(), CHECK_RESULT_NOT_OK); - Thread.sleep(CHECK_STATUS_INTERVAL + 10); - } - assertTrue(checkNoThrottlingCount == 10); - assertTrue(checkThrottlingCount == 0); - checkNoThrottlingCount = 0; - checkThrottlingCount = 0; - - //2.There are DOS attacks - for (int i = 0; i < 10; i++) { - assertEquals(checkStatus(), CHECK_RESULT_NOT_OK); - } - assertTrue(checkNoThrottlingCount >= 1); - assertTrue(checkThrottlingCount >= 1); - checkNoThrottlingCount = 0; - checkThrottlingCount = 0; - - //3.print deadlock info - for (int i = 0; i < 10; i++) { - assertEquals(checkStatus(), CHECK_RESULT_NOT_OK); - Thread.sleep(LOG_THREADDUMP_INTERVAL_WHEN_DEADLOCK_DETECTED / 2); - } - assertTrue(printDeadlockThreadDumpCount >= 5); - } - - public String checkStatus() { - // Locking classes to avoid deadlock detection in multi-thread concurrent requests. - synchronized (VipStatus.class) { - if (System.currentTimeMillis() - lastCheckStatusTimestamp < CHECK_STATUS_INTERVAL) { - checkThrottlingCount ++; - if (lastCheckStatusResult) { - return CHECK_RESULT_OK; - } else { - return CHECK_RESULT_NOT_OK; - } - } - lastCheckStatusTimestamp = System.currentTimeMillis(); - - String statusFilePath = (String) mockServletContext.getAttribute(ATTRIBUTE_STATUS_FILE_PATH); - @SuppressWarnings("unchecked") - Supplier isReadyProbe = (Supplier) mockServletContext.getAttribute(ATTRIBUTE_IS_READY_PROBE); - - boolean isReady = isReadyProbe != null ? isReadyProbe.get() : true; - - if (statusFilePath != null) { - File statusFile = new File(statusFilePath); - if (isReady && statusFile.exists() && statusFile.isFile()) { - // check deadlock - ThreadMXBean threadBean = ManagementFactory.getThreadMXBean(); - long[] threadIds = threadBean.findDeadlockedThreads(); - if (threadIds != null && threadIds.length > 0) { - ThreadInfo[] threadInfos = threadBean.getThreadInfo(threadIds, false, - false); - String threadNames = Arrays.stream(threadInfos) - .map(threadInfo -> threadInfo.getThreadName() - + "(tid=" + threadInfo.getThreadId() + ")") - .collect(Collectors.joining(", ")); - if (System.currentTimeMillis() - lastPrintThreadDumpTimestamp - > LOG_THREADDUMP_INTERVAL_WHEN_DEADLOCK_DETECTED) { - String diagnosticResult = ThreadDumpUtil.buildThreadDiagnosticString(); - log.error("Deadlock detected, service may be unavailable, " - + "thread stack details are as follows: {}.", diagnosticResult); - lastPrintThreadDumpTimestamp = System.currentTimeMillis(); - printDeadlockThreadDumpCount ++; - } else { - log.error("Deadlocked threads detected. {}", threadNames); - } - lastCheckStatusResult = false; - checkNoThrottlingCount ++; - return CHECK_RESULT_NOT_OK; - } else { - checkNoThrottlingCount ++; - lastCheckStatusResult = true; - return CHECK_RESULT_OK; - } - } - } - checkNoThrottlingCount ++; - lastCheckStatusResult = false; - log.warn("Failed to access \"status.html\". The service is not ready"); - return CHECK_RESULT_OK; - } - } - - public class MockServletContext extends AttributesMap implements ServletContext { - - @Override - public String getContextPath() { - return null; - } - - @Override - public ServletContext getContext(String s) { - return null; - } - - @Override - public int getMajorVersion() { - return 0; - } - - @Override - public int getMinorVersion() { - return 0; - } - - @Override - public int getEffectiveMajorVersion() { - return 0; - } - - @Override - public int getEffectiveMinorVersion() { - return 0; - } - - @Override - public String getMimeType(String s) { - return null; - } - - @Override - public Set getResourcePaths(String s) { - return null; - } - - @Override - public URL getResource(String s) throws MalformedURLException { - return null; - } - - @Override - public InputStream getResourceAsStream(String s) { - return null; - } - - @Override - public RequestDispatcher getRequestDispatcher(String s) { - return null; - } - - @Override - public RequestDispatcher getNamedDispatcher(String s) { - return null; - } - - @Override - public Servlet getServlet(String s) throws ServletException { - return null; - } - - @Override - public Enumeration getServlets() { - return null; - } - - @Override - public Enumeration getServletNames() { - return null; - } - - @Override - public void log(String s) { - - } - - @Override - public void log(Exception e, String s) { - - } - - @Override - public void log(String s, Throwable throwable) { - - } - - @Override - public String getRealPath(String s) { - return null; - } - - @Override - public String getServerInfo() { - return null; - } - - @Override - public String getInitParameter(String s) { - return null; - } - - @Override - public Enumeration getInitParameterNames() { - return null; - } - - @Override - public boolean setInitParameter(String s, String s1) { - return false; - } - - @Override - public String getServletContextName() { - return null; - } - - @Override - public ServletRegistration.Dynamic addServlet(String s, String s1) { - return null; - } - - @Override - public ServletRegistration.Dynamic addServlet(String s, Servlet servlet) { - return null; - } - - @Override - public ServletRegistration.Dynamic addServlet(String s, Class aClass) { - return null; - } - - @Override - public T createServlet(Class aClass) throws ServletException { - return null; - } - - @Override - public ServletRegistration getServletRegistration(String s) { - return null; - } - - @Override - public Map getServletRegistrations() { - return null; - } - - @Override - public FilterRegistration.Dynamic addFilter(String s, String s1) { - return null; - } - - @Override - public FilterRegistration.Dynamic addFilter(String s, Filter filter) { - return null; - } - - @Override - public FilterRegistration.Dynamic addFilter(String s, Class aClass) { - return null; - } - - @Override - public T createFilter(Class aClass) throws ServletException { - return null; - } - - @Override - public FilterRegistration getFilterRegistration(String s) { - return null; - } - - @Override - public Map getFilterRegistrations() { - return null; - } - - @Override - public SessionCookieConfig getSessionCookieConfig() { - return null; - } - - @Override - public void setSessionTrackingModes(Set set) { - - } - - @Override - public Set getDefaultSessionTrackingModes() { - return null; - } - - @Override - public Set getEffectiveSessionTrackingModes() { - return null; - } - - @Override - public void addListener(String s) { - - } - - @Override - public void addListener(T t) { - - } - - @Override - public void addListener(Class aClass) { - - } - - @Override - public T createListener(Class aClass) throws ServletException { - return null; - } - - @Override - public JspConfigDescriptor getJspConfigDescriptor() { - return null; - } - - @Override - public ClassLoader getClassLoader() { - return null; - } - - @Override - public void declareRoles(String... strings) { - + boolean asExpected = true; + try { + vipStatus.checkStatus(); + asExpected = false; + System.out.println("Simulated deadlock, no deadlock detected, not as expected."); + } catch (Exception wae) { + System.out.println("Simulated deadlock and detected it, as expected."); } - @Override - public String getVirtualServerName() { - return null; + if (!asExpected) { + throw new WebApplicationException(Response.Status.SERVICE_UNAVAILABLE); } } @@ -499,4 +136,4 @@ public void run() { } } } -} +} \ No newline at end of file