From a0abfd65da1623572af0fabd02e4203d38fe9b95 Mon Sep 17 00:00:00 2001 From: JiaShuo Date: Fri, 5 Jun 2020 20:53:01 +0800 Subject: [PATCH] update test code --- pom.xml | 11 ++ .../infra/pegasus/client/ClientOptions.java | 81 +++++++---- .../{Falcon.java => FalconCollector.java} | 19 ++- ...tricsReporter.java => FalconReporter.java} | 120 +++++++--------- .../infra/pegasus/metrics/MetricsManager.java | 15 +- .../infra/pegasus/metrics/MetricsPool.java | 34 ++--- .../pegasus/metrics/PegasusCollector.java | 6 - .../infra/pegasus/metrics/PegasusMonitor.java | 8 ++ .../infra/pegasus/metrics/Prometheus.java | 122 ----------------- .../pegasus/metrics/PrometheusCollector.java | 129 ++++++++++++++++++ .../pegasus/operator/client_operator.java | 14 +- .../infra/pegasus/rpc/ClusterOptions.java | 1 + .../infra/pegasus/client/TestBasic.java | 61 ++++----- .../pegasus/metrics/MetricsPoolTest.java | 20 +-- src/test/resource/pegasus.properties | 3 +- 15 files changed, 331 insertions(+), 313 deletions(-) rename src/main/java/com/xiaomi/infra/pegasus/metrics/{Falcon.java => FalconCollector.java} (85%) rename src/main/java/com/xiaomi/infra/pegasus/metrics/{MetricsReporter.java => FalconReporter.java} (67%) delete mode 100644 src/main/java/com/xiaomi/infra/pegasus/metrics/PegasusCollector.java create mode 100644 src/main/java/com/xiaomi/infra/pegasus/metrics/PegasusMonitor.java delete mode 100644 src/main/java/com/xiaomi/infra/pegasus/metrics/Prometheus.java create mode 100644 src/main/java/com/xiaomi/infra/pegasus/metrics/PrometheusCollector.java diff --git a/pom.xml b/pom.xml index e37ee80d..3f8a49bb 100644 --- a/pom.xml +++ b/pom.xml @@ -6,6 +6,7 @@ jar 1.12-thrift-0.11.0-inlined-SNAPSHOT Pegasus Java Client + org.junit.jupiter @@ -80,6 +81,16 @@ javax.annotation-api 1.3.2 + + io.prometheus + simpleclient + 0.4.0 + + + io.prometheus + simpleclient_httpserver + 0.4.0 + diff --git a/src/main/java/com/xiaomi/infra/pegasus/client/ClientOptions.java b/src/main/java/com/xiaomi/infra/pegasus/client/ClientOptions.java index 2552678c..5b1e9710 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/client/ClientOptions.java +++ b/src/main/java/com/xiaomi/infra/pegasus/client/ClientOptions.java @@ -23,7 +23,7 @@ * .operationTimeout(Duration.ofMillis(1000)) * .asyncWorkers(4) * .enablePerfCounter(false) - * .falconPerfCounterTags("") + * .perfCounterTags("") * .falconPushInterval(Duration.ofSeconds(10)) * .metaQueryTimeout(Duration.ofMillis(5000)) * .build(); @@ -36,7 +36,8 @@ public class ClientOptions { public static final Duration DEFAULT_OPERATION_TIMEOUT = Duration.ofMillis(1000); public static final int DEFAULT_ASYNC_WORKERS = Runtime.getRuntime().availableProcessors(); public static final boolean DEFAULT_ENABLE_PERF_COUNTER = true; - public static final String DEFAULT_FALCON_PERF_COUNTER_TAGS = ""; + public static final String DEFAULT_PERF_COUNTER_TYPE = "falcon"; + public static final String DEFAULT_PERF_COUNTER_TAGS = ""; public static final Duration DEFAULT_FALCON_PUSH_INTERVAL = Duration.ofSeconds(10); public static final boolean DEFAULT_ENABLE_WRITE_LIMIT = true; public static final Duration DEFAULT_META_QUERY_TIMEOUT = Duration.ofMillis(5000); @@ -45,7 +46,8 @@ public class ClientOptions { private final Duration operationTimeout; private final int asyncWorkers; private final boolean enablePerfCounter; - private final String falconPerfCounterTags; + private final String perfCounterType; + private final String perfCounterTags; private final Duration falconPushInterval; private final boolean enableWriteLimit; private final Duration metaQueryTimeout; @@ -55,7 +57,8 @@ protected ClientOptions(Builder builder) { this.operationTimeout = builder.operationTimeout; this.asyncWorkers = builder.asyncWorkers; this.enablePerfCounter = builder.enablePerfCounter; - this.falconPerfCounterTags = builder.falconPerfCounterTags; + this.perfCounterType = builder.perfCounterType; + this.perfCounterTags = builder.perfCounterTags; this.falconPushInterval = builder.falconPushInterval; this.enableWriteLimit = builder.enableWriteLimit; this.metaQueryTimeout = builder.metaQueryTimeout; @@ -66,7 +69,8 @@ protected ClientOptions(ClientOptions original) { this.operationTimeout = original.getOperationTimeout(); this.asyncWorkers = original.getAsyncWorkers(); this.enablePerfCounter = original.isEnablePerfCounter(); - this.falconPerfCounterTags = original.getFalconPerfCounterTags(); + this.perfCounterType = original.perfCounterType; + this.perfCounterTags = original.getPerfCounterTags(); this.falconPushInterval = original.getFalconPushInterval(); this.enableWriteLimit = original.isWriteLimitEnabled(); this.metaQueryTimeout = original.getMetaQueryTimeout(); @@ -111,7 +115,8 @@ public boolean equals(Object options) { && this.operationTimeout.toMillis() == clientOptions.operationTimeout.toMillis() && this.asyncWorkers == clientOptions.asyncWorkers && this.enablePerfCounter == clientOptions.enablePerfCounter - && this.falconPerfCounterTags.equals(clientOptions.falconPerfCounterTags) + && this.perfCounterType.equals(clientOptions.perfCounterType) + && this.perfCounterTags.equals(clientOptions.perfCounterTags) && this.falconPushInterval.toMillis() == clientOptions.falconPushInterval.toMillis() && this.enableWriteLimit == clientOptions.enableWriteLimit && this.metaQueryTimeout.toMillis() == clientOptions.metaQueryTimeout.toMillis(); @@ -129,10 +134,12 @@ public String toString() { + operationTimeout.toMillis() + ", asyncWorkers=" + asyncWorkers - + ", enablePerfCounter=" + + ", perfCounterType=" + enablePerfCounter - + ", falconPerfCounterTags='" - + falconPerfCounterTags + + ", perfCounterTags='" + + perfCounterType + + ", enablePerfCounter=" + + perfCounterTags + '\'' + ", falconPushInterval(s)=" + falconPushInterval.getSeconds() @@ -149,7 +156,8 @@ public static class Builder { private Duration operationTimeout = DEFAULT_OPERATION_TIMEOUT; private int asyncWorkers = DEFAULT_ASYNC_WORKERS; private boolean enablePerfCounter = DEFAULT_ENABLE_PERF_COUNTER; - private String falconPerfCounterTags = DEFAULT_FALCON_PERF_COUNTER_TAGS; + private String perfCounterType = DEFAULT_PERF_COUNTER_TYPE; + private String perfCounterTags = DEFAULT_PERF_COUNTER_TAGS; private Duration falconPushInterval = DEFAULT_FALCON_PUSH_INTERVAL; private boolean enableWriteLimit = DEFAULT_ENABLE_WRITE_LIMIT; private Duration metaQueryTimeout = DEFAULT_META_QUERY_TIMEOUT; @@ -194,8 +202,8 @@ public Builder asyncWorkers(int asyncWorkers) { /** * Whether to enable performance statistics. If true, the client will periodically report - * metrics to local falcon agent (currently we only support falcon as monitoring system). - * Defaults to {@literal true}, see {@link #DEFAULT_ENABLE_PERF_COUNTER}. + * metrics to local falcon agent (if set falcon as monitoring system) or open prometheus + * collector http server. Defaults to {@literal true}, see {@link #DEFAULT_ENABLE_PERF_COUNTER}. * * @param enablePerfCounter enablePerfCounter * @return {@code this} @@ -205,22 +213,34 @@ public Builder enablePerfCounter(boolean enablePerfCounter) { return this; } + /** + * set the perf-counter type, now only support falcon and prometheus, Defaults to {@literal + * falcon}, see {@link #DEFAULT_PERF_COUNTER_TYPE} + * + * @param perfCounterType perfCounterType + * @return this + */ + public Builder perfCounterType(String perfCounterType) { + this.perfCounterType = perfCounterType; + return this; + } + /** * Additional tags for falcon metrics. For example: * "cluster=c3srv-ad,job=recommend-service-history". Defaults to empty string, see {@link - * #DEFAULT_FALCON_PERF_COUNTER_TAGS}. + * #DEFAULT_PERF_COUNTER_TAGS}. * - * @param falconPerfCounterTags falconPerfCounterTags + * @param perfCounterTags perfCounterTags * @return {@code this} */ - public Builder falconPerfCounterTags(String falconPerfCounterTags) { - this.falconPerfCounterTags = falconPerfCounterTags; + public Builder perfCounterTags(String perfCounterTags) { + this.perfCounterTags = perfCounterTags; return this; } /** - * The interval to report metrics to local falcon agent. Defaults to {@literal 10s}, see {@link - * #DEFAULT_FALCON_PUSH_INTERVAL}. + * The interval to report metrics to local falcon agent(if set falcon as monitor system). + * Defaults to {@literal 10s}, see {@link #DEFAULT_FALCON_PUSH_INTERVAL}. * * @param falconPushInterval falconPushInterval * @return {@code this} @@ -279,7 +299,8 @@ public ClientOptions.Builder mutate() { .operationTimeout(getOperationTimeout()) .asyncWorkers(getAsyncWorkers()) .enablePerfCounter(isEnablePerfCounter()) - .falconPerfCounterTags(getFalconPerfCounterTags()) + .perfCounterType(getPerfCounterType()) + .perfCounterTags(getPerfCounterTags()) .falconPushInterval(getFalconPushInterval()) .enableWriteLimit(isWriteLimitEnabled()) .metaQueryTimeout(getMetaQueryTimeout()); @@ -316,8 +337,8 @@ public int getAsyncWorkers() { /** * Whether to enable performance statistics. If true, the client will periodically report metrics - * to local falcon agent (currently we only support falcon as monitoring system). Defaults to - * {@literal true}. + * to local falcon agent (if set falcon as monitoring system) or open prometheus collector http + * server. Defaults to {@literal true}. * * @return whether to enable performance statistics. */ @@ -326,16 +347,26 @@ public boolean isEnablePerfCounter() { } /** - * Additional tags for falcon metrics. Defaults to empty string. + * get perf-counter type, now only support falcon and prometheus + * + * @return perf-counter type + */ + public String getPerfCounterType() { + return perfCounterType; + } + + /** + * Additional tags for metrics. Defaults to empty string. * * @return additional tags for falcon metrics. */ - public String getFalconPerfCounterTags() { - return falconPerfCounterTags; + public String getPerfCounterTags() { + return perfCounterTags; } /** - * The interval to report metrics to local falcon agent. Defaults to {@literal 10s}. + * The interval to report metrics to local falcon agent(if set falcon as monitor system). Defaults + * to {@literal 10s}. * * @return the interval to report metrics to local falcon agent. */ diff --git a/src/main/java/com/xiaomi/infra/pegasus/metrics/Falcon.java b/src/main/java/com/xiaomi/infra/pegasus/metrics/FalconCollector.java similarity index 85% rename from src/main/java/com/xiaomi/infra/pegasus/metrics/Falcon.java rename to src/main/java/com/xiaomi/infra/pegasus/metrics/FalconCollector.java index 1dc14885..f8f1ee58 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/metrics/Falcon.java +++ b/src/main/java/com/xiaomi/infra/pegasus/metrics/FalconCollector.java @@ -12,20 +12,19 @@ import org.json.JSONException; import org.json.JSONObject; -public class Falcon implements PegasusCollector { - +public class FalconCollector { private FalconMetric falconMetric = new FalconMetric(); private final MetricRegistry registry; public final String defaultTags; - public Falcon(String host, String tags, int reportStepSec, MetricRegistry registry) { + public FalconCollector(String host, String tags, int reportStepSec, MetricRegistry registry) { this.defaultTags = tags; this.registry = registry; falconMetric.endpoint = host; falconMetric.step = reportStepSec; } - public String updateMetric() { + public String metricsToJson() { falconMetric.timestamp = Tools.unixEpochMills() / 1000; StringBuilder builder = new StringBuilder(); @@ -54,8 +53,8 @@ public void genJsonsFromMeter(String name, Meter meter, StringBuilder output) throws JSONException { falconMetric.counterType = "GAUGE"; - falconMetric.metric = name + ".cps-1sec"; - falconMetric.tags = getTableTag(name, defaultTags, "@"); + falconMetric.metric = name + "_cps_1sec"; + falconMetric.tags = getTableTag(name, defaultTags); falconMetric.value = meter.getMeanRate(); oneMetricToJson(falconMetric, output); } @@ -65,14 +64,14 @@ public void genJsonsFromHistogram(String name, Histogram hist, StringBuilder out falconMetric.counterType = "GAUGE"; Snapshot s = hist.getSnapshot(); - falconMetric.metric = name + ".p99"; - falconMetric.tags = getTableTag(name, defaultTags, "@"); + falconMetric.metric = name + "_p99"; + falconMetric.tags = getTableTag(name, defaultTags); falconMetric.value = s.get99thPercentile(); oneMetricToJson(falconMetric, output); output.append(','); - falconMetric.metric = name + ".p999"; - falconMetric.tags = getTableTag(name, defaultTags, "@"); + falconMetric.metric = name + "_p999"; + falconMetric.tags = getTableTag(name, defaultTags); falconMetric.value = s.get999thPercentile(); oneMetricToJson(falconMetric, output); } diff --git a/src/main/java/com/xiaomi/infra/pegasus/metrics/MetricsReporter.java b/src/main/java/com/xiaomi/infra/pegasus/metrics/FalconReporter.java similarity index 67% rename from src/main/java/com/xiaomi/infra/pegasus/metrics/MetricsReporter.java rename to src/main/java/com/xiaomi/infra/pegasus/metrics/FalconReporter.java index 3285c5aa..2be86666 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/metrics/MetricsReporter.java +++ b/src/main/java/com/xiaomi/infra/pegasus/metrics/FalconReporter.java @@ -17,15 +17,15 @@ import org.slf4j.Logger; /** Created by weijiesun on 18-3-9. */ -public class MetricsReporter { - public MetricsReporter(int reportSecs, PegasusCollector collector) { +public class FalconReporter implements PegasusMonitor { + public FalconReporter(int reportSecs, FalconCollector falconCollectorMetric) { falconAgentIP = "127.0.0.1"; falconAgentPort = 1988; falconAgentSocket = falconAgentIP + ":" + falconAgentPort; reportIntervalSecs = reportSecs; falconRequestPath = "/v1/push"; - pegasusCollector = collector; + falconCollector = falconCollectorMetric; boot = new Bootstrap(); httpClientGroup = new NioEventLoopGroup(1); @@ -51,36 +51,32 @@ public void initChannel(SocketChannel ch) { reportTarget = null; } + @Override public void start() { reportStopped = false; tryConnect(); } + @Override public void stop() { httpClientGroup.execute( - new Runnable() { - @Override - public void run() { - reportStopped = true; - if (actionLater != null) { - actionLater.cancel(false); - } - if (reportTarget != null) { - reportTarget - .close() - .addListener( - new ChannelFutureListener() { - @Override - public void operationComplete(ChannelFuture channelFuture) - throws Exception { + () -> { + reportStopped = true; + if (actionLater != null) { + actionLater.cancel(false); + } + if (reportTarget != null) { + reportTarget + .close() + .addListener( + (ChannelFutureListener) + channelFuture -> { if (channelFuture.isSuccess()) { logger.info("close channel to {} succeed", falconAgentSocket); } else { logger.warn("close channel to {} failed: ", channelFuture.cause()); } - } - }); - } + }); } }); @@ -95,68 +91,52 @@ public void operationComplete(ChannelFuture channelFuture) public void tryConnect() { boot.connect(falconAgentIP, falconAgentPort) .addListener( - new ChannelFutureListener() { - @Override - public void operationComplete(ChannelFuture channelFuture) throws Exception { - if (channelFuture.isSuccess()) { - reportTarget = channelFuture.channel(); - logger.info("create channel with {} succeed, wait it active", falconAgentSocket); - } else { - logger.error( - "create channel with {} failed, connect later: ", - falconAgentSocket, - channelFuture.cause()); - scheduleNextConnect(); - } - } - }); + (ChannelFutureListener) + channelFuture -> { + if (channelFuture.isSuccess()) { + reportTarget = channelFuture.channel(); + logger.info( + "create channel with {} succeed, wait it active", falconAgentSocket); + } else { + logger.error( + "create channel with {} failed, connect later: ", + falconAgentSocket, + channelFuture.cause()); + scheduleNextConnect(); + } + }); } public void scheduleNextConnect() { if (reportStopped) return; actionLater = - httpClientGroup.schedule( - new Runnable() { - @Override - public void run() { - tryConnect(); - } - }, - (long) reportIntervalSecs, - TimeUnit.SECONDS); + httpClientGroup.schedule(this::tryConnect, (long) reportIntervalSecs, TimeUnit.SECONDS); } public void scheduleNextReport(final Channel channel) { if (reportStopped) return; actionLater = httpClientGroup.schedule( - new Runnable() { - @Override - public void run() { - reportMetrics(channel); - } - }, - reportIntervalSecs, - TimeUnit.SECONDS); + () -> reportMetrics(channel), reportIntervalSecs, TimeUnit.SECONDS); } public void reportMetrics(final Channel channel) { - String result; + String json_metrics; try { - result = pegasusCollector.updateMetric(); + json_metrics = falconCollector.metricsToJson(); } catch (JSONException ex) { logger.warn("encode metrics to json failed, skip current report, retry later: ", ex); scheduleNextReport(channel); return; } - logger.debug("generate metrics {} and try to report", result); + logger.debug("generate metrics {} and try to report", json_metrics); FullHttpRequest request = new DefaultFullHttpRequest( HttpVersion.HTTP_1_1, HttpMethod.POST, falconRequestPath, - Unpooled.copiedBuffer(result.getBytes())); + Unpooled.copiedBuffer(json_metrics.getBytes())); request.headers().add(HttpHeaders.Names.HOST, falconAgentSocket); request.headers().add(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.KEEP_ALIVE); request.headers().add(HttpHeaders.Names.CONTENT_LENGTH, request.content().readableBytes()); @@ -165,18 +145,16 @@ public void reportMetrics(final Channel channel) { channel .writeAndFlush(request) .addListener( - new ChannelFutureListener() { - @Override - public void operationComplete(ChannelFuture channelFuture) throws Exception { - if (!channelFuture.isSuccess()) { - logger.warn( - "report to {} failed, skip current report, retry later: ", - channel.toString(), - channelFuture.cause()); - channel.close(); - } - } - }); + (ChannelFutureListener) + channelFuture -> { + if (!channelFuture.isSuccess()) { + logger.warn( + "report to {} failed, skip current report, retry later: ", + channel.toString(), + channelFuture.cause()); + channel.close(); + } + }); } class HttpClientHandler extends SimpleChannelInboundHandler { @@ -226,7 +204,7 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception { private int reportIntervalSecs; private String falconRequestPath; - private PegasusCollector pegasusCollector; + public FalconCollector falconCollector; private Bootstrap boot; private EventLoopGroup httpClientGroup; @@ -235,5 +213,5 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception { private boolean reportStopped; private Channel reportTarget; - private static final Logger logger = org.slf4j.LoggerFactory.getLogger(MetricsReporter.class); + private static final Logger logger = org.slf4j.LoggerFactory.getLogger(FalconReporter.class); } diff --git a/src/main/java/com/xiaomi/infra/pegasus/metrics/MetricsManager.java b/src/main/java/com/xiaomi/infra/pegasus/metrics/MetricsManager.java index 37360bad..89c0fc1b 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/metrics/MetricsManager.java +++ b/src/main/java/com/xiaomi/infra/pegasus/metrics/MetricsManager.java @@ -8,6 +8,7 @@ /** Created by weijiesun on 18-3-8. */ public final class MetricsManager { + public static void updateCount(String counterName, long count) { metrics.setMeter(counterName, count); } @@ -16,13 +17,14 @@ public static void setHistogramValue(String counterName, long value) { metrics.setHistorgram(counterName, value); } - public static final void initFromHost( + public static void initFromHost( String host, String tag, int reportIntervalSec, String perfCounterType) { synchronized (logger) { if (started) { logger.warn( - "perf counter system has started with host({}), tag({}), interval({}), " + "perf counter system({}) has started with host({}), tag({}), interval(if set falcon as system)({}), " + "skip this init with host({}), tag({}), interval(){}", + MetricsManager.perfCounterType, MetricsManager.host, MetricsManager.tag, MetricsManager.reportIntervalSecs, @@ -35,21 +37,22 @@ public static final void initFromHost( logger.info( "init metrics with host({}), tag({}), interval({})", host, tag, reportIntervalSec); + MetricsManager.perfCounterType = perfCounterType; MetricsManager.host = host; MetricsManager.tag = tag; MetricsManager.reportIntervalSecs = reportIntervalSec; metrics = new MetricsPool(host, tag, reportIntervalSec, perfCounterType); + metrics.start(); started = true; } } - public static final void detectHostAndInit( - String tag, int reportIntervalSec, String perfCounterType) { + public static void detectHostAndInit(String tag, int reportIntervalSec, String perfCounterType) { initFromHost( Tools.getLocalHostAddress().getHostName(), tag, reportIntervalSec, perfCounterType); } - public static final void finish() { + public static void finish() { synchronized (logger) { if (started) { metrics.stop(); @@ -62,7 +65,7 @@ public static final void finish() { private static String host; private static String tag; private static int reportIntervalSecs; - + private static String perfCounterType; private static MetricsPool metrics; private static final Logger logger = org.slf4j.LoggerFactory.getLogger(MetricsManager.class); diff --git a/src/main/java/com/xiaomi/infra/pegasus/metrics/MetricsPool.java b/src/main/java/com/xiaomi/infra/pegasus/metrics/MetricsPool.java index 9623fb6f..df7e386f 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/metrics/MetricsPool.java +++ b/src/main/java/com/xiaomi/infra/pegasus/metrics/MetricsPool.java @@ -8,31 +8,25 @@ /** Created by weijiesun on 18-3-9. */ public final class MetricsPool { - public final String metricType; - private final MetricRegistry registry = new MetricRegistry(); - public static MetricsReporter reporter; - public static PegasusCollector collector; - - public MetricsPool(String host, String tags, int reportStepSec, String type) { - metricType = type; + public static PegasusMonitor pegasusMonitor; + public MetricsPool(String host, String tags, int reportStepSec, String metricType) { if (metricType.equals("falcon")) { - collector = new Falcon(host, tags, reportStepSec, registry); - reporter = new MetricsReporter(reportStepSec, collector); - reporter.start(); - } else if (type.equals("prometheus")) { - collector = new Prometheus(tags, reportStepSec, registry); - ((Prometheus) collector).start(); + pegasusMonitor = + new FalconReporter( + reportStepSec, new FalconCollector(host, tags, reportStepSec, registry)); + } else if (metricType.equals("prometheus")) { + pegasusMonitor = new PrometheusCollector(tags, registry); } } + public void start() { + pegasusMonitor.start(); + } + public void stop() { - if (metricType.equals("falcon")) { - reporter.stop(); - } else if (metricType.equals("prometheus")) { - ((Prometheus) collector).stop(); - } + pegasusMonitor.stop(); } public void setMeter(String counterName, long count) { @@ -43,11 +37,11 @@ public void setHistorgram(String counterName, long value) { registry.histogram(counterName).update(value); } - public static String getTableTag(String counterName, String defaultTags, String regex) { + public static String getTableTag(String counterName, String defaultTags) { if (defaultTags.contains("table=")) { return defaultTags; } - String[] result = counterName.split(regex); + String[] result = counterName.split(":"); if (result.length >= 2) { return defaultTags.equals("") ? ("table=" + result[1]) diff --git a/src/main/java/com/xiaomi/infra/pegasus/metrics/PegasusCollector.java b/src/main/java/com/xiaomi/infra/pegasus/metrics/PegasusCollector.java deleted file mode 100644 index 1434d76f..00000000 --- a/src/main/java/com/xiaomi/infra/pegasus/metrics/PegasusCollector.java +++ /dev/null @@ -1,6 +0,0 @@ -package com.xiaomi.infra.pegasus.metrics; - -public interface PegasusCollector { - - public String updateMetric(); -} diff --git a/src/main/java/com/xiaomi/infra/pegasus/metrics/PegasusMonitor.java b/src/main/java/com/xiaomi/infra/pegasus/metrics/PegasusMonitor.java new file mode 100644 index 00000000..3c8a66a1 --- /dev/null +++ b/src/main/java/com/xiaomi/infra/pegasus/metrics/PegasusMonitor.java @@ -0,0 +1,8 @@ +package com.xiaomi.infra.pegasus.metrics; + +public interface PegasusMonitor { + + public void start(); + + public void stop(); +} diff --git a/src/main/java/com/xiaomi/infra/pegasus/metrics/Prometheus.java b/src/main/java/com/xiaomi/infra/pegasus/metrics/Prometheus.java deleted file mode 100644 index 1572012f..00000000 --- a/src/main/java/com/xiaomi/infra/pegasus/metrics/Prometheus.java +++ /dev/null @@ -1,122 +0,0 @@ -package com.xiaomi.infra.pegasus.metrics; - -import static com.xiaomi.infra.pegasus.metrics.MetricsPool.getTableTag; - -import com.codahale.metrics.Histogram; -import com.codahale.metrics.Meter; -import com.codahale.metrics.MetricRegistry; -import com.codahale.metrics.Snapshot; -import io.prometheus.client.Collector; -import io.prometheus.client.GaugeMetricFamily; -import io.prometheus.client.exporter.HTTPServer; -import java.io.IOException; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; - -public class Prometheus extends Collector implements PegasusCollector { - - private final ScheduledExecutorService collectorTask; - private final String defaultTags; - private final MetricRegistry registry; - private int reportStepSec; - - private Map metrics = new ConcurrentHashMap<>(); - private Map> tableLabels = new HashMap<>(); - - public Prometheus(String defaultTags, int reportStepSec, MetricRegistry registry) { - this.defaultTags = defaultTags; - this.reportStepSec = reportStepSec; - this.registry = registry; - this.collectorTask = Executors.newScheduledThreadPool(1); - } - - public List collect() { - return new ArrayList<>(metrics.values()); - } - - public String updateMetric() { - for (Map.Entry entry : registry.getMeters().entrySet()) { - String QPSName = format(entry.getKey()); - updateQPSMetric(entry, QPSName); - } - - for (Map.Entry entry : registry.getHistograms().entrySet()) { - String latencyName = format(entry.getKey()); - updateLatencyMetric(entry, latencyName + "-99th"); - updateLatencyMetric(entry, latencyName + "-999th"); - } - - return "OK"; - } - - private void updateQPSMetric(Map.Entry meter, String name) { - Map labels = getLabel(getTableTag(name, defaultTags, ":")); - if (!metrics.containsKey(name)) { - // assert labels != null; - GaugeMetricFamily labeledGauge = - new GaugeMetricFamily(name, "help", new ArrayList<>(labels.keySet())); - labeledGauge.addMetric(new ArrayList<>(labels.values()), meter.getValue().getMeanRate()); - metrics.put(name, labeledGauge); - } - ((GaugeMetricFamily) metrics.get(name)) - .addMetric(new ArrayList<>(labels.values()), meter.getValue().getMeanRate()); - } - - private void updateLatencyMetric(Map.Entry meter, String name) { - Map labels = getLabel(getTableTag(name, defaultTags, ":")); - ArrayList labelList = new ArrayList<>(labels.values()); - Snapshot snapshot = meter.getValue().getSnapshot(); - double value = - name.contains("99th") ? snapshot.get99thPercentile() : snapshot.get999thPercentile(); - if (!metrics.containsKey(name)) { - // assert labels != null; - GaugeMetricFamily labeledGauge = - new GaugeMetricFamily(name, "help", new ArrayList<>(labels.keySet())); - labeledGauge.addMetric(new ArrayList<>(labels.values()), value); - metrics.put(name, labeledGauge); - } - - ((GaugeMetricFamily) metrics.get(name)).addMetric(labelList, value); - } - - private Map getLabel(String labels) { - if (tableLabels.containsKey(labels)) { - return tableLabels.get(labels); - } - - // todo nullptr - HashMap labelMap = new HashMap<>(); - String[] labelsString = labels.split(","); - for (String label : labelsString) { - String[] labelPair = label.split("="); - labelMap.put(labelPair[0], labelPair[1]); - } - tableLabels.put(labels, labelMap); - return labelMap; - } - - private String format(String name) { - return name.replaceAll("\\.", "_").replaceAll("@", ":"); - } - - public void start() { - try { - register(); - new HTTPServer(9091); - collectorTask.scheduleAtFixedRate(this::updateMetric, 0, 10, TimeUnit.SECONDS); - System.out.println("XXXX"); - } catch (IOException e) { - e.printStackTrace(); - } - } - - public void stop() { - collectorTask.shutdown(); - } -} diff --git a/src/main/java/com/xiaomi/infra/pegasus/metrics/PrometheusCollector.java b/src/main/java/com/xiaomi/infra/pegasus/metrics/PrometheusCollector.java new file mode 100644 index 00000000..a2beafab --- /dev/null +++ b/src/main/java/com/xiaomi/infra/pegasus/metrics/PrometheusCollector.java @@ -0,0 +1,129 @@ +package com.xiaomi.infra.pegasus.metrics; + +import static com.xiaomi.infra.pegasus.metrics.MetricsPool.getTableTag; + +import com.codahale.metrics.Histogram; +import com.codahale.metrics.Meter; +import com.codahale.metrics.MetricRegistry; +import com.codahale.metrics.Snapshot; +import com.google.common.collect.ImmutableList; +import io.prometheus.client.Collector; +import io.prometheus.client.GaugeMetricFamily; +import io.prometheus.client.exporter.HTTPServer; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.slf4j.Logger; + +public class PrometheusCollector extends Collector implements PegasusMonitor { + private static final Logger logger = org.slf4j.LoggerFactory.getLogger(PrometheusCollector.class); + + private static final int PORT = 9091; + + private final String defaultTags; + private final MetricRegistry registry; + private Map> tableLabels = new HashMap<>(); + + public PrometheusCollector(String defaultTags, MetricRegistry registry) { + this.defaultTags = defaultTags; + this.registry = registry; + } + + public List collect() { + final ImmutableList.Builder metricFamilySamplesBuilder = + ImmutableList.builder(); + updateMetric(metricFamilySamplesBuilder); + return metricFamilySamplesBuilder.build(); + } + + public void updateMetric(final ImmutableList.Builder builder) { + for (Map.Entry entry : registry.getMeters().entrySet()) { + updateQPSMetric(entry.getKey(), entry, builder); + } + + for (Map.Entry entry : registry.getHistograms().entrySet()) { + updateLatencyMetric(entry.getKey(), entry, builder); + } + } + + private void updateQPSMetric( + String name, + Map.Entry meter, + final ImmutableList.Builder builder) { + Map labels = getLabel(getTableTag(name, defaultTags)); + GaugeMetricFamily labeledGauge = + new GaugeMetricFamily( + formatQPSMetricName(name), "pegasus operation qps", new ArrayList<>(labels.keySet())); + labeledGauge.addMetric(new ArrayList<>(labels.values()), meter.getValue().getMeanRate()); + builder.add(labeledGauge); + } + + private void updateLatencyMetric( + String name, + Map.Entry meter, + final ImmutableList.Builder builder) { + Map labels = getLabel(getTableTag(name, defaultTags)); + Snapshot snapshot = meter.getValue().getSnapshot(); + updateLatencyMetric( + formatLatencyMetricName(name, "p99"), snapshot.get99thPercentile(), labels, builder); + updateLatencyMetric( + formatLatencyMetricName(name, "p999"), snapshot.get999thPercentile(), labels, builder); + } + + private void updateLatencyMetric( + String key, + double value, + Map labels, + final ImmutableList.Builder builder) { + GaugeMetricFamily labeledGauge = + new GaugeMetricFamily(key, "pegasus operation latency", new ArrayList<>(labels.keySet())); + labeledGauge.addMetric(new ArrayList<>(labels.values()), value); + builder.add(labeledGauge); + } + + private Map getLabel(String labels) { + if (tableLabels.containsKey(labels)) { + return tableLabels.get(labels); + } + + HashMap labelMap = new HashMap<>(); + String[] labelsString = labels.split(","); + for (String label : labelsString) { + String[] labelPair = label.split("="); + assert (labelPair.length == 2); + labelMap.put(labelPair[0], labelPair[1]); + } + tableLabels.put(labels, labelMap); + return labelMap; + } + + private String formatLatencyMetricName(String name, String percentage) { + String[] metricName = name.split(":"); + assert (metricName.length == 2); + return metricName[0] + "_" + percentage; + } + + private String formatQPSMetricName(String name) { + String[] metricName = name.split(":"); + assert (metricName.length == 2); + return metricName[0]; + } + + @Override + public void start() { + try { + register(); + new HTTPServer(PORT); + logger.debug("start the prometheus collector server, port = {}", PORT); + } catch (IOException e) { + logger.debug("start the prometheus collector server failed, error = {}", e.getMessage()); + } + } + + @Override + public void stop() { + logger.debug("stop the prometheus collector server, port = {}", PORT); + } +} diff --git a/src/main/java/com/xiaomi/infra/pegasus/operator/client_operator.java b/src/main/java/com/xiaomi/infra/pegasus/operator/client_operator.java index 9be285ea..f5139493 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/operator/client_operator.java +++ b/src/main/java/com/xiaomi/infra/pegasus/operator/client_operator.java @@ -56,23 +56,23 @@ public String getQPSCounter() { break; } - // pegasus.client.put.succ.qps + // pegasus_client_put_succ_qps return new StringBuilder() - .append("pegasus.client.") + .append("pegasus_client_") .append(name()) - .append(".") + .append("_") .append(mark) - .append(".qps@") + .append("_qps:") .append(tableName) .toString(); } public String getLatencyCounter() { - // pegasus.client.put.latency + // pegasus_client_put_latency return new StringBuilder() - .append("pegasus.client.") + .append("pegasus_client_") .append(name()) - .append(".latency@") + .append("_latency:") .append(tableName) .toString(); } diff --git a/src/main/java/com/xiaomi/infra/pegasus/rpc/ClusterOptions.java b/src/main/java/com/xiaomi/infra/pegasus/rpc/ClusterOptions.java index af14149e..64cd67dd 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/rpc/ClusterOptions.java +++ b/src/main/java/com/xiaomi/infra/pegasus/rpc/ClusterOptions.java @@ -115,6 +115,7 @@ public static ClusterOptions create(Properties config) { PEGASUS_PUSH_COUNTER_INTERVAL_SECS_KEY, PEGASUS_PUSH_COUNTER_INTERVAL_SECS_DEF)); String perfCounterType = config.getProperty(PEGASUS_PUSH_COUNTER_TYPE_KEY, PEGASUS_PUSH_COUNTER_TYPE_DEF); + assert ((perfCounterType.equals("falcon") || perfCounterType.equals("prometheus"))); int metaQueryTimeout = Integer.parseInt( config.getProperty(PEGASUS_META_QUERY_TIMEOUT_KEY, PEGASUS_META_QUERY_TIMEOUT_DEF)); diff --git a/src/test/java/com/xiaomi/infra/pegasus/client/TestBasic.java b/src/test/java/com/xiaomi/infra/pegasus/client/TestBasic.java index 2ccebe42..86aa56aa 100644 --- a/src/test/java/com/xiaomi/infra/pegasus/client/TestBasic.java +++ b/src/test/java/com/xiaomi/infra/pegasus/client/TestBasic.java @@ -136,53 +136,44 @@ public void testSetGetDel() throws PException { try { // set - while (true) { - Thread.sleep(1000); - - client.set( - tableName, - hashKey, - "basic_test_sort_key_1".getBytes(), - "basic_test_value_1".getBytes()); + client.set( + tableName, hashKey, "basic_test_sort_key_1".getBytes(), "basic_test_value_1".getBytes()); - // check exist - boolean exist = client.exist(tableName, hashKey, "basic_test_sort_key_1".getBytes()); - Assert.assertTrue(exist); + // check exist + boolean exist = client.exist(tableName, hashKey, "basic_test_sort_key_1".getBytes()); + Assert.assertTrue(exist); - exist = client.exist(tableName, hashKey, "basic_test_sort_key_2".getBytes()); - Assert.assertFalse(exist); + exist = client.exist(tableName, hashKey, "basic_test_sort_key_2".getBytes()); + Assert.assertFalse(exist); - // check sortkey count - long sortKeyCount = client.sortKeyCount(tableName, hashKey); - Assert.assertEquals(1, sortKeyCount); + // check sortkey count + long sortKeyCount = client.sortKeyCount(tableName, hashKey); + Assert.assertEquals(1, sortKeyCount); - // get - byte[] value = client.get(tableName, hashKey, "basic_test_sort_key_1".getBytes()); - Assert.assertArrayEquals("basic_test_value_1".getBytes(), value); + // get + byte[] value = client.get(tableName, hashKey, "basic_test_sort_key_1".getBytes()); + Assert.assertArrayEquals("basic_test_value_1".getBytes(), value); - value = client.get(tableName, hashKey, "basic_test_sort_key_2".getBytes()); - Assert.assertEquals(null, value); + value = client.get(tableName, hashKey, "basic_test_sort_key_2".getBytes()); + Assert.assertEquals(null, value); - // del - client.del(tableName, hashKey, "basic_test_sort_key_1".getBytes()); + // del + client.del(tableName, hashKey, "basic_test_sort_key_1".getBytes()); - // check exist - exist = client.exist(tableName, hashKey, "basic_test_sort_key_1".getBytes()); - Assert.assertFalse(exist); + // check exist + exist = client.exist(tableName, hashKey, "basic_test_sort_key_1".getBytes()); + Assert.assertFalse(exist); - // check sortkey count - sortKeyCount = client.sortKeyCount(tableName, hashKey); - Assert.assertEquals(0, sortKeyCount); + // check sortkey count + sortKeyCount = client.sortKeyCount(tableName, hashKey); + Assert.assertEquals(0, sortKeyCount); - // check deleted - value = client.get(tableName, hashKey, "basic_test_sort_key_1".getBytes()); - Assert.assertEquals(null, value); - } + // check deleted + value = client.get(tableName, hashKey, "basic_test_sort_key_1".getBytes()); + Assert.assertEquals(null, value); } catch (PException e) { e.printStackTrace(); Assert.assertTrue(false); - } catch (InterruptedException e) { - e.printStackTrace(); } PegasusClientFactory.closeSingletonClient(); diff --git a/src/test/java/com/xiaomi/infra/pegasus/metrics/MetricsPoolTest.java b/src/test/java/com/xiaomi/infra/pegasus/metrics/MetricsPoolTest.java index 198ec26a..d6e582b5 100644 --- a/src/test/java/com/xiaomi/infra/pegasus/metrics/MetricsPoolTest.java +++ b/src/test/java/com/xiaomi/infra/pegasus/metrics/MetricsPoolTest.java @@ -23,14 +23,14 @@ public void before() { public void genJsonsFromMeter() throws Exception { String host = "simple-test-host.bj"; String tags = "what=you,like=another"; - Falcon falcon = new Falcon(host, tags, 20, r); + FalconCollector falconCollector = new FalconCollector(host, tags, 20, r); Meter m = r.meter("TestName"); m.mark(1); m.mark(1); StringBuilder builder = new StringBuilder(); - falcon.genJsonsFromMeter("TestName", m, builder); + falconCollector.genJsonsFromMeter("TestName", m, builder); JSONArray array = new JSONArray("[" + builder.toString() + "]"); Assert.assertEquals(1, array.length()); @@ -54,12 +54,12 @@ public void genJsonsFromMeter() throws Exception { public void genJsonFromHistogram() throws Exception { String host = "simple-test-host.bj"; String tags = "what=you,like=another"; - Falcon falcon = new Falcon(host, tags, 20, r); + FalconCollector falconCollector = new FalconCollector(host, tags, 20, r); Histogram h = r.histogram("TestHist"); for (int i = 0; i < 1000000; ++i) h.update((long) i); StringBuilder builder = new StringBuilder(); - falcon.genJsonsFromHistogram("TestHist", h, builder); + falconCollector.genJsonsFromHistogram("TestHist", h, builder); JSONArray array = new JSONArray("[" + builder.toString() + "]"); Assert.assertEquals(2, array.length()); @@ -79,7 +79,7 @@ public void genJsonFromHistogram() throws Exception { @Test public void oneMetricToJson() throws Exception { - Falcon.FalconMetric metric = new Falcon.FalconMetric(); + FalconCollector.FalconMetric metric = new FalconCollector.FalconMetric(); metric.endpoint = "1.2.3.4"; metric.metric = "simple_metric"; metric.timestamp = 12343455L; @@ -89,7 +89,7 @@ public void oneMetricToJson() throws Exception { metric.tags = "cluster=onebox,app=new"; StringBuilder builder = new StringBuilder(); - Falcon.oneMetricToJson(metric, builder); + FalconCollector.oneMetricToJson(metric, builder); JSONObject obj = new JSONObject(builder.toString()); Assert.assertEquals(metric.endpoint, obj.getString("endpoint")); @@ -102,7 +102,7 @@ public void oneMetricToJson() throws Exception { builder.setLength(0); metric.tags = ""; - Falcon.oneMetricToJson(metric, builder); + FalconCollector.oneMetricToJson(metric, builder); obj = new JSONObject(builder.toString()); Assert.assertEquals(metric.endpoint, obj.getString("endpoint")); Assert.assertEquals(metric.metric, obj.getString("metric")); @@ -117,7 +117,7 @@ public void oneMetricToJson() throws Exception { public void metricsToJson() throws Exception { String host = "simple-test-host.bj"; String tags = "what=you,like=another"; - MetricsPool pool = new MetricsPool(host, tags, 20, "falcon"); + MetricsPool pool = new MetricsPool(host, tags, 20, "falconCollector"); pool.setMeter("aaa@temp", 1); pool.setMeter("aaa", 2); @@ -127,7 +127,9 @@ public void metricsToJson() throws Exception { pool.setHistorgram("ccc@temp", i); } - JSONArray array = new JSONArray(MetricsPool.collector.updateMetric()); + JSONArray array = + new JSONArray( + ((FalconReporter) MetricsPool.pegasusMonitor).falconCollector.metricsToJson()); Assert.assertEquals(6, array.length()); for (int i = 0; i < array.length(); ++i) { JSONObject j = array.getJSONObject(i); diff --git a/src/test/resource/pegasus.properties b/src/test/resource/pegasus.properties index ec4da28e..6282eea7 100644 --- a/src/test/resource/pegasus.properties +++ b/src/test/resource/pegasus.properties @@ -1,7 +1,6 @@ meta_servers = 127.0.0.1:34601,127.0.0.1:34602,127.0.0.1:34603 operation_timeout = 10000 async_workers = 2 -enable_perf_counter = true -push_counter_type = prometheus +enable_perf_counter = false perf_counter_tags = cluster=onebox,app=unit_test push_counter_interval_secs = 10