diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java index 902593b7bf678..a7c4244899069 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java @@ -44,6 +44,7 @@ import org.eclipse.jetty.server.handler.RequestLogHandler; import org.eclipse.jetty.server.handler.ResourceHandler; import org.eclipse.jetty.server.handler.StatisticsHandler; +import org.eclipse.jetty.server.handler.gzip.GzipHandler; import org.eclipse.jetty.servlet.FilterHolder; import org.eclipse.jetty.servlet.ServletContextHandler; import org.eclipse.jetty.servlet.ServletHolder; @@ -258,15 +259,18 @@ public void addFilters(ServletContextHandler context, boolean requiresAuthentica public void addServlet(String path, ServletHolder servletHolder, boolean requiresAuthentication, Map attributeMap) { - ServletContextHandler context = new ServletContextHandler(ServletContextHandler.SESSIONS); + ServletContextHandler servletContextHandler = new ServletContextHandler(ServletContextHandler.SESSIONS); // Notice: each context path should be unique, but there's nothing here to verify that - context.setContextPath(path); - context.addServlet(servletHolder, MATCH_ALL); + servletContextHandler.setContextPath(path); + servletContextHandler.addServlet(servletHolder, MATCH_ALL); if (attributeMap != null) { - attributeMap.forEach(context::setAttribute); + attributeMap.forEach(servletContextHandler::setAttribute); } - filterInitializer.addFilters(context, requiresAuthentication); - handlers.add(context); + filterInitializer.addFilters(servletContextHandler, requiresAuthentication); + + GzipHandler gzipHandler = new GzipHandler(); + gzipHandler.setHandler(servletContextHandler); + handlers.add(gzipHandler); } public void addStaticResources(String basePath, String resourcePath) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/web/WebServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/web/WebServiceTest.java index 8fb95eed789d5..72437fe33743e 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/web/WebServiceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/web/WebServiceTest.java @@ -23,11 +23,14 @@ import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; + import com.google.common.collect.Multimap; import com.google.common.collect.Sets; import com.google.common.io.CharStreams; import com.google.common.io.Closeables; import io.netty.handler.ssl.util.InsecureTrustManagerFactory; + +import java.io.BufferedReader; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.InputStream; @@ -44,6 +47,8 @@ import java.util.Map; import java.util.Optional; import java.util.Set; +import java.util.zip.GZIPInputStream; +import java.util.zip.ZipException; import javax.net.ssl.HttpsURLConnection; import javax.net.ssl.KeyManager; import javax.net.ssl.KeyManagerFactory; @@ -354,6 +359,73 @@ public void testBrokerReady() throws Exception { assertEquals(res.getResponseBody(), "ok"); } + @Test + public void testCompressOutputMetricsInPrometheus() throws Exception { + + setupEnv(true, false, false, false, -1, false); + + String metricsUrl = pulsar.getWebServiceAddress() + "/metrics/"; + + String[] command = {"curl", "-H", "Accept-Encoding: gzip", metricsUrl}; + + ProcessBuilder processBuilder = new ProcessBuilder(command); + Process process = processBuilder.start(); + + InputStream inputStream = process.getInputStream(); + + try { + GZIPInputStream gzipInputStream = new GZIPInputStream(inputStream); + + // Process the decompressed content + StringBuilder content = new StringBuilder(); + int data; + while ((data = gzipInputStream.read()) != -1) { + content.append((char) data); + } + log.info("Response Content: {}", content); + + process.waitFor(); + assertTrue(content.toString().contains("process_cpu_seconds_total")); + } catch (IOException e) { + log.error("Failed to decompress the content, likely the content is not compressed ", e); + fail(); + } + } + + @Test + public void testUnCompressOutputMetricsInPrometheus() throws Exception { + + setupEnv(true, false, false, false, -1, false); + + String metricsUrl = pulsar.getWebServiceAddress() + "/metrics/"; + + String[] command = {"curl", metricsUrl}; + + ProcessBuilder processBuilder = new ProcessBuilder(command); + Process process = processBuilder.start(); + + InputStream inputStream = process.getInputStream(); + try { + GZIPInputStream gzipInputStream = new GZIPInputStream(inputStream); + fail(); + } catch (IOException e) { + log.error("Failed to decompress the content, likely the content is not compressed ", e); + assertTrue(e instanceof ZipException); + } + + BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream)); + StringBuilder content = new StringBuilder(); + String line; + while ((line = reader.readLine()) != null) { + content.append(line + "\n"); + } + + log.info("Response Content: {}", content); + + process.waitFor(); + assertTrue(content.toString().contains("process_cpu_seconds_total")); + } + private String makeHttpRequest(boolean useTls, boolean useAuth) throws Exception { InputStream response = null; try {