From 64ac0ff46ebb67fe5c2ce80a672cdf9e5c09d7e6 Mon Sep 17 00:00:00 2001 From: JiaShuo Date: Thu, 4 Jun 2020 21:11:13 +0800 Subject: [PATCH 1/9] init --- .../xiaomi/infra/pegasus/metrics/Falcon.java | 100 +++++++++++++++++ .../infra/pegasus/metrics/MetricsPool.java | 103 +++--------------- .../pegasus/metrics/MetricsReporter.java | 8 +- .../pegasus/metrics/PegasusCollector.java | 8 ++ .../infra/pegasus/metrics/Prometheus.java | 94 ++++++++++++++++ 5 files changed, 219 insertions(+), 94 deletions(-) create mode 100644 src/main/java/com/xiaomi/infra/pegasus/metrics/Falcon.java create mode 100644 src/main/java/com/xiaomi/infra/pegasus/metrics/PegasusCollector.java create mode 100644 src/main/java/com/xiaomi/infra/pegasus/metrics/Prometheus.java diff --git a/src/main/java/com/xiaomi/infra/pegasus/metrics/Falcon.java b/src/main/java/com/xiaomi/infra/pegasus/metrics/Falcon.java new file mode 100644 index 00000000..c28844e7 --- /dev/null +++ b/src/main/java/com/xiaomi/infra/pegasus/metrics/Falcon.java @@ -0,0 +1,100 @@ +package com.xiaomi.infra.pegasus.metrics; + +import static com.xiaomi.infra.pegasus.metrics.MetricsPool.getTableTag; + +import com.codahale.metrics.Histogram; +import com.codahale.metrics.Meter; +import com.codahale.metrics.MetricRegistry; +import com.codahale.metrics.Snapshot; +import com.xiaomi.infra.pegasus.tools.Tools; +import java.util.Map; +import java.util.SortedMap; +import org.json.JSONException; +import org.json.JSONObject; + +public class Falcon implements PegasusCollector { + + private FalconMetric falconMetric = new FalconMetric(); + public final String defaultTags; + + public Falcon(String host, String tags, int reportStepSec) { + defaultTags = tags; + falconMetric.endpoint = host; + falconMetric.step = reportStepSec; + } + + public String addMetric(MetricRegistry registry) { + falconMetric.timestamp = Tools.unixEpochMills() / 1000; + + StringBuilder builder = new StringBuilder(); + builder.append('['); + SortedMap meters = registry.getMeters(); + for (Map.Entry entry : meters.entrySet()) { + genJsonsFromMeter(entry.getKey(), entry.getValue(), builder); + builder.append(','); + } + + for (Map.Entry entry : registry.getHistograms().entrySet()) { + genJsonsFromHistogram(entry.getKey(), entry.getValue(), builder); + builder.append(','); + } + + if (builder.charAt(builder.length() - 1) == ',') { + builder.deleteCharAt(builder.length() - 1); + } + + builder.append("]"); + + return builder.toString(); + } + + public void genJsonsFromMeter(String name, Meter meter, StringBuilder output) + throws JSONException { + falconMetric.counterType = "GAUGE"; + + falconMetric.metric = name + ".cps-1sec"; + falconMetric.tags = getTableTag(name, defaultTags); + falconMetric.value = meter.getMeanRate(); + oneMetricToJson(falconMetric, output); + } + + public void genJsonsFromHistogram(String name, Histogram hist, StringBuilder output) + throws JSONException { + falconMetric.counterType = "GAUGE"; + Snapshot s = hist.getSnapshot(); + + falconMetric.metric = name + ".p99"; + falconMetric.tags = getTableTag(name, defaultTags); + falconMetric.value = s.get99thPercentile(); + oneMetricToJson(falconMetric, output); + output.append(','); + + falconMetric.metric = name + ".p999"; + falconMetric.tags = getTableTag(name, defaultTags); + falconMetric.value = s.get999thPercentile(); + oneMetricToJson(falconMetric, output); + } + + public static void oneMetricToJson(FalconMetric metric, StringBuilder output) + throws JSONException { + JSONObject obj = new JSONObject(); + obj.put("endpoint", metric.endpoint); + obj.put("metric", metric.metric); + obj.put("timestamp", metric.timestamp); + obj.put("step", metric.step); + obj.put("value", metric.value); + obj.put("counterType", metric.counterType); + obj.put("tags", metric.tags); + output.append(obj.toString()); + } + + static final class FalconMetric { + public String endpoint; // metric host + public String metric; // metric name + public long timestamp; // report time in unix seconds + public int step; // report interval in seconds; + public double value; // metric value + public String counterType; // GAUGE or COUNTER + public String tags; // metrics description + } +} diff --git a/src/main/java/com/xiaomi/infra/pegasus/metrics/MetricsPool.java b/src/main/java/com/xiaomi/infra/pegasus/metrics/MetricsPool.java index 8ecb5508..8c151b0c 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/metrics/MetricsPool.java +++ b/src/main/java/com/xiaomi/infra/pegasus/metrics/MetricsPool.java @@ -3,27 +3,21 @@ // can be found in the LICENSE file in the root directory of this source tree. package com.xiaomi.infra.pegasus.metrics; -import com.codahale.metrics.Histogram; -import com.codahale.metrics.Meter; import com.codahale.metrics.MetricRegistry; -import com.codahale.metrics.Snapshot; -import com.xiaomi.infra.pegasus.tools.Tools; -import java.util.Map; -import java.util.SortedMap; -import org.json.JSONException; -import org.json.JSONObject; /** Created by weijiesun on 18-3-9. */ public final class MetricsPool { - private final String defaultTags; + public final String metricType; - public MetricsPool(String host, String tags, int reportStepSec) { - theMetric = new FalconMetric(); - theMetric.endpoint = host; - theMetric.step = reportStepSec; - theMetric.tags = tags; - defaultTags = tags; + public MetricsPool(String host, String tags, int reportStepSec, String type) { + metricType = type; + collector = new Falcon(host, tags, reportStepSec); + } + + public MetricsPool(String tags, String type) { + metricType = type; + collector = new Prometheus(tags); } public void setMeter(String counterName, long count) { @@ -34,72 +28,11 @@ public void setHistorgram(String counterName, long value) { registry.histogram(counterName).update(value); } - public void genJsonsFromMeter(String name, Meter meter, StringBuilder output) - throws JSONException { - theMetric.counterType = "GAUGE"; - - theMetric.metric = name + ".cps-1sec"; - theMetric.tags = getTableTag(name); - theMetric.value = meter.getMeanRate(); - oneMetricToJson(theMetric, output); + public String metricToCollector() { + return collector.addMetric(registry); } - public void genJsonsFromHistogram(String name, Histogram hist, StringBuilder output) - throws JSONException { - theMetric.counterType = "GAUGE"; - Snapshot s = hist.getSnapshot(); - - theMetric.metric = name + ".p99"; - theMetric.tags = getTableTag(name); - theMetric.value = s.get99thPercentile(); - oneMetricToJson(theMetric, output); - output.append(','); - - theMetric.metric = name + ".p999"; - theMetric.tags = getTableTag(name); - theMetric.value = s.get999thPercentile(); - oneMetricToJson(theMetric, output); - } - - public static void oneMetricToJson(FalconMetric metric, StringBuilder output) - throws JSONException { - JSONObject obj = new JSONObject(); - obj.put("endpoint", metric.endpoint); - obj.put("metric", metric.metric); - obj.put("timestamp", metric.timestamp); - obj.put("step", metric.step); - obj.put("value", metric.value); - obj.put("counterType", metric.counterType); - obj.put("tags", metric.tags); - output.append(obj.toString()); - } - - public String metricsToJson() throws JSONException { - theMetric.timestamp = Tools.unixEpochMills() / 1000; - - StringBuilder builder = new StringBuilder(); - builder.append('['); - SortedMap meters = registry.getMeters(); - for (Map.Entry entry : meters.entrySet()) { - genJsonsFromMeter(entry.getKey(), entry.getValue(), builder); - builder.append(','); - } - - for (Map.Entry entry : registry.getHistograms().entrySet()) { - genJsonsFromHistogram(entry.getKey(), entry.getValue(), builder); - builder.append(','); - } - - if (builder.charAt(builder.length() - 1) == ',') { - builder.deleteCharAt(builder.length() - 1); - } - - builder.append("]"); - - return builder.toString(); - } - - private String getTableTag(String counterName) { + public static String getTableTag(String counterName, String defaultTags) { if (defaultTags.contains("table=")) { return defaultTags; } @@ -112,16 +45,6 @@ private String getTableTag(String counterName) { return defaultTags; } - static final class FalconMetric { - public String endpoint; // metric host - public String metric; // metric name - public long timestamp; // report time in unix seconds - public int step; // report interval in seconds; - public double value; // metric value - public String counterType; // GAUGE or COUNTER - public String tags; // metrics description - } - - private FalconMetric theMetric; + private PegasusCollector collector; private final MetricRegistry registry = new MetricRegistry(); } diff --git a/src/main/java/com/xiaomi/infra/pegasus/metrics/MetricsReporter.java b/src/main/java/com/xiaomi/infra/pegasus/metrics/MetricsReporter.java index bb3df12c..f71ba627 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/metrics/MetricsReporter.java +++ b/src/main/java/com/xiaomi/infra/pegasus/metrics/MetricsReporter.java @@ -141,22 +141,22 @@ public void run() { } public void reportMetrics(final Channel channel) { - String json_metrics; + String result; try { - json_metrics = metrics.metricsToJson(); + result = metrics.metricToCollector(); } catch (JSONException ex) { logger.warn("encode metrics to json failed, skip current report, retry later: ", ex); scheduleNextReport(channel); return; } - logger.debug("generate metrics {} and try to report", json_metrics); + logger.debug("generate metrics {} and try to report", result); FullHttpRequest request = new DefaultFullHttpRequest( HttpVersion.HTTP_1_1, HttpMethod.POST, falconRequestPath, - Unpooled.copiedBuffer(json_metrics.getBytes())); + Unpooled.copiedBuffer(result.getBytes())); request.headers().add(HttpHeaders.Names.HOST, falconAgentSocket); request.headers().add(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.KEEP_ALIVE); request.headers().add(HttpHeaders.Names.CONTENT_LENGTH, request.content().readableBytes()); diff --git a/src/main/java/com/xiaomi/infra/pegasus/metrics/PegasusCollector.java b/src/main/java/com/xiaomi/infra/pegasus/metrics/PegasusCollector.java new file mode 100644 index 00000000..9cc55a13 --- /dev/null +++ b/src/main/java/com/xiaomi/infra/pegasus/metrics/PegasusCollector.java @@ -0,0 +1,8 @@ +package com.xiaomi.infra.pegasus.metrics; + +import com.codahale.metrics.MetricRegistry; + +public interface PegasusCollector { + + public String addMetric(MetricRegistry registry); +} diff --git a/src/main/java/com/xiaomi/infra/pegasus/metrics/Prometheus.java b/src/main/java/com/xiaomi/infra/pegasus/metrics/Prometheus.java new file mode 100644 index 00000000..f6f5d0f4 --- /dev/null +++ b/src/main/java/com/xiaomi/infra/pegasus/metrics/Prometheus.java @@ -0,0 +1,94 @@ +package com.xiaomi.infra.pegasus.metrics; + +import static com.xiaomi.infra.pegasus.metrics.MetricsPool.getTableTag; + +import com.codahale.metrics.Histogram; +import com.codahale.metrics.Meter; +import com.codahale.metrics.MetricRegistry; +import com.codahale.metrics.Snapshot; +import io.prometheus.client.Collector; +import io.prometheus.client.GaugeMetricFamily; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +public class Prometheus extends Collector implements PegasusCollector { + private final String defaultTags; + + private Map metrics = new ConcurrentHashMap<>(); + private Map> tableLabels = new HashMap<>(); + + public Prometheus(String defaultTags) { + this.defaultTags = defaultTags; + } + + public List collect() { + return new ArrayList<>(metrics.values()); + } + + public String addMetric(MetricRegistry registry) { + for (Map.Entry entry : registry.getMeters().entrySet()) { + String QPSName = format(entry.getKey()); + updateQPSMetric(entry, QPSName); + } + + for (Map.Entry entry : registry.getHistograms().entrySet()) { + String latencyName = format(entry.getKey()); + updateLatencyMetric(entry, latencyName + "-99th"); + updateLatencyMetric(entry, latencyName + "-999th"); + } + + return "OK"; + } + + private void updateQPSMetric(Map.Entry meter, String name) { + Map labels = getLabel(getTableTag(name, defaultTags)); + if (!metrics.containsKey(name)) { + // assert labels != null; + GaugeMetricFamily labeledGauge = + new GaugeMetricFamily(name, "help", new ArrayList<>(labels.keySet())); + labeledGauge.addMetric(new ArrayList<>(labels.values()), meter.getValue().getMeanRate()); + metrics.put(name, labeledGauge); + } + ((GaugeMetricFamily) metrics.get(name)) + .addMetric(new ArrayList<>(labels.values()), meter.getValue().getMeanRate()); + } + + private void updateLatencyMetric(Map.Entry meter, String name) { + Map labels = getLabel(getTableTag(name, defaultTags)); + ArrayList labelList = new ArrayList<>(labels.values()); + Snapshot snapshot = meter.getValue().getSnapshot(); + double value = + name.contains("99th") ? snapshot.get99thPercentile() : snapshot.get999thPercentile(); + if (!metrics.containsKey(name)) { + // assert labels != null; + GaugeMetricFamily labeledGauge = + new GaugeMetricFamily(name, "help", new ArrayList<>(labels.keySet())); + labeledGauge.addMetric(new ArrayList<>(labels.values()), value); + metrics.put(name, labeledGauge); + } + + ((GaugeMetricFamily) metrics.get(name)).addMetric(labelList, value); + } + + private Map getLabel(String labels) { + if (tableLabels.containsKey(labels)) { + return tableLabels.get(labels); + } + + HashMap labelMap = new HashMap<>(); + String[] labelsString = labels.split(","); + for (String label : labelsString) { + String[] labelPair = label.split("="); + labelMap.put(labelPair[0], labelPair[1]); + } + tableLabels.put(labels, labelMap); + return labelMap; + } + + private String format(String name) { + return name.replaceAll("\\.", "-"); + } +} From 21ef3877a56dbc6e32d52265ab74b0bd2577774c Mon Sep 17 00:00:00 2001 From: JiaShuo Date: Fri, 5 Jun 2020 19:51:44 +0800 Subject: [PATCH 2/9] init but not update test code --- .../xiaomi/infra/pegasus/metrics/Falcon.java | 8 +++-- .../infra/pegasus/metrics/MetricsManager.java | 17 ++++++----- .../infra/pegasus/metrics/MetricsPool.java | 30 ++++++++++++------- .../pegasus/metrics/MetricsReporter.java | 10 +++---- .../pegasus/metrics/PegasusCollector.java | 4 +-- .../infra/pegasus/metrics/Prometheus.java | 25 ++++++++++++++-- .../infra/pegasus/rpc/ClusterOptions.java | 15 +++++++++- .../pegasus/rpc/async/ClusterManager.java | 3 +- 8 files changed, 77 insertions(+), 35 deletions(-) diff --git a/src/main/java/com/xiaomi/infra/pegasus/metrics/Falcon.java b/src/main/java/com/xiaomi/infra/pegasus/metrics/Falcon.java index c28844e7..ce26e5b5 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/metrics/Falcon.java +++ b/src/main/java/com/xiaomi/infra/pegasus/metrics/Falcon.java @@ -15,15 +15,17 @@ public class Falcon implements PegasusCollector { private FalconMetric falconMetric = new FalconMetric(); + private final MetricRegistry registry; public final String defaultTags; - public Falcon(String host, String tags, int reportStepSec) { - defaultTags = tags; + public Falcon(String host, String tags, int reportStepSec, MetricRegistry registry) { + this.defaultTags = tags; + this.registry = registry; falconMetric.endpoint = host; falconMetric.step = reportStepSec; } - public String addMetric(MetricRegistry registry) { + public String updateMetric() { falconMetric.timestamp = Tools.unixEpochMills() / 1000; StringBuilder builder = new StringBuilder(); diff --git a/src/main/java/com/xiaomi/infra/pegasus/metrics/MetricsManager.java b/src/main/java/com/xiaomi/infra/pegasus/metrics/MetricsManager.java index ce1a5277..37360bad 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/metrics/MetricsManager.java +++ b/src/main/java/com/xiaomi/infra/pegasus/metrics/MetricsManager.java @@ -16,7 +16,8 @@ public static void setHistogramValue(String counterName, long value) { metrics.setHistorgram(counterName, value); } - public static final void initFromHost(String host, String tag, int reportIntervalSec) { + public static final void initFromHost( + String host, String tag, int reportIntervalSec, String perfCounterType) { synchronized (logger) { if (started) { logger.warn( @@ -37,21 +38,21 @@ public static final void initFromHost(String host, String tag, int reportInterva MetricsManager.host = host; MetricsManager.tag = tag; MetricsManager.reportIntervalSecs = reportIntervalSec; - metrics = new MetricsPool(host, tag, reportIntervalSec); - reporter = new MetricsReporter(reportIntervalSec, metrics); - reporter.start(); + metrics = new MetricsPool(host, tag, reportIntervalSec, perfCounterType); started = true; } } - public static final void detectHostAndInit(String tag, int reportIntervalSec) { - initFromHost(Tools.getLocalHostAddress().getHostName(), tag, reportIntervalSec); + public static final void detectHostAndInit( + String tag, int reportIntervalSec, String perfCounterType) { + initFromHost( + Tools.getLocalHostAddress().getHostName(), tag, reportIntervalSec, perfCounterType); } public static final void finish() { synchronized (logger) { if (started) { - reporter.stop(); + metrics.stop(); started = false; } } @@ -63,6 +64,6 @@ public static final void finish() { private static int reportIntervalSecs; private static MetricsPool metrics; - private static MetricsReporter reporter; + private static final Logger logger = org.slf4j.LoggerFactory.getLogger(MetricsManager.class); } diff --git a/src/main/java/com/xiaomi/infra/pegasus/metrics/MetricsPool.java b/src/main/java/com/xiaomi/infra/pegasus/metrics/MetricsPool.java index 8c151b0c..69b1c0f5 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/metrics/MetricsPool.java +++ b/src/main/java/com/xiaomi/infra/pegasus/metrics/MetricsPool.java @@ -10,14 +10,29 @@ public final class MetricsPool { public final String metricType; + private final MetricRegistry registry = new MetricRegistry(); + public static MetricsReporter reporter; + private static PegasusCollector collector; + public MetricsPool(String host, String tags, int reportStepSec, String type) { metricType = type; - collector = new Falcon(host, tags, reportStepSec); + + if (metricType.equals("falcon")) { + collector = new Falcon(host, tags, reportStepSec, registry); + reporter = new MetricsReporter(reportStepSec, collector); + reporter.start(); + } else if (type.equals("prometheus")) { + collector = new Prometheus(tags, reportStepSec, registry); + ((Prometheus) collector).start(); + } } - public MetricsPool(String tags, String type) { - metricType = type; - collector = new Prometheus(tags); + public void stop() { + if (metricType.equals("falcon")) { + reporter.stop(); + } else if (metricType.equals("prometheus")) { + ((Prometheus) collector).stop(); + } } public void setMeter(String counterName, long count) { @@ -28,10 +43,6 @@ public void setHistorgram(String counterName, long value) { registry.histogram(counterName).update(value); } - public String metricToCollector() { - return collector.addMetric(registry); - } - public static String getTableTag(String counterName, String defaultTags) { if (defaultTags.contains("table=")) { return defaultTags; @@ -44,7 +55,4 @@ public static String getTableTag(String counterName, String defaultTags) { } return defaultTags; } - - private PegasusCollector collector; - private final MetricRegistry registry = new MetricRegistry(); } diff --git a/src/main/java/com/xiaomi/infra/pegasus/metrics/MetricsReporter.java b/src/main/java/com/xiaomi/infra/pegasus/metrics/MetricsReporter.java index f71ba627..3285c5aa 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/metrics/MetricsReporter.java +++ b/src/main/java/com/xiaomi/infra/pegasus/metrics/MetricsReporter.java @@ -18,14 +18,14 @@ /** Created by weijiesun on 18-3-9. */ public class MetricsReporter { - public MetricsReporter(int reportSecs, MetricsPool pool) { + public MetricsReporter(int reportSecs, PegasusCollector collector) { falconAgentIP = "127.0.0.1"; falconAgentPort = 1988; - falconAgentSocket = falconAgentIP + ":" + String.valueOf(falconAgentPort); + falconAgentSocket = falconAgentIP + ":" + falconAgentPort; reportIntervalSecs = reportSecs; falconRequestPath = "/v1/push"; - metrics = pool; + pegasusCollector = collector; boot = new Bootstrap(); httpClientGroup = new NioEventLoopGroup(1); @@ -143,7 +143,7 @@ public void run() { public void reportMetrics(final Channel channel) { String result; try { - result = metrics.metricToCollector(); + result = pegasusCollector.updateMetric(); } catch (JSONException ex) { logger.warn("encode metrics to json failed, skip current report, retry later: ", ex); scheduleNextReport(channel); @@ -226,7 +226,7 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception { private int reportIntervalSecs; private String falconRequestPath; - private MetricsPool metrics; + private PegasusCollector pegasusCollector; private Bootstrap boot; private EventLoopGroup httpClientGroup; diff --git a/src/main/java/com/xiaomi/infra/pegasus/metrics/PegasusCollector.java b/src/main/java/com/xiaomi/infra/pegasus/metrics/PegasusCollector.java index 9cc55a13..1434d76f 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/metrics/PegasusCollector.java +++ b/src/main/java/com/xiaomi/infra/pegasus/metrics/PegasusCollector.java @@ -1,8 +1,6 @@ package com.xiaomi.infra.pegasus.metrics; -import com.codahale.metrics.MetricRegistry; - public interface PegasusCollector { - public String addMetric(MetricRegistry registry); + public String updateMetric(); } diff --git a/src/main/java/com/xiaomi/infra/pegasus/metrics/Prometheus.java b/src/main/java/com/xiaomi/infra/pegasus/metrics/Prometheus.java index f6f5d0f4..0afaa489 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/metrics/Prometheus.java +++ b/src/main/java/com/xiaomi/infra/pegasus/metrics/Prometheus.java @@ -13,22 +13,32 @@ import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; public class Prometheus extends Collector implements PegasusCollector { + + private final ScheduledExecutorService collectorTask; private final String defaultTags; + private final MetricRegistry registry; + private int reportStepSec; private Map metrics = new ConcurrentHashMap<>(); private Map> tableLabels = new HashMap<>(); - public Prometheus(String defaultTags) { + public Prometheus(String defaultTags, int reportStepSec, MetricRegistry registry) { this.defaultTags = defaultTags; + this.reportStepSec = reportStepSec; + this.registry = registry; + this.collectorTask = Executors.newScheduledThreadPool(1); } public List collect() { return new ArrayList<>(metrics.values()); } - public String addMetric(MetricRegistry registry) { + public String updateMetric() { for (Map.Entry entry : registry.getMeters().entrySet()) { String QPSName = format(entry.getKey()); updateQPSMetric(entry, QPSName); @@ -78,6 +88,7 @@ private Map getLabel(String labels) { return tableLabels.get(labels); } + // todo nullptr HashMap labelMap = new HashMap<>(); String[] labelsString = labels.split(","); for (String label : labelsString) { @@ -89,6 +100,14 @@ private Map getLabel(String labels) { } private String format(String name) { - return name.replaceAll("\\.", "-"); + return name.replaceAll("\\.", "_").replaceAll("@", ":"); + } + + public void start() { + collectorTask.scheduleAtFixedRate(() -> updateMetric(), 0, 10, TimeUnit.SECONDS); + } + + public void stop() { + collectorTask.shutdown(); } } diff --git a/src/main/java/com/xiaomi/infra/pegasus/rpc/ClusterOptions.java b/src/main/java/com/xiaomi/infra/pegasus/rpc/ClusterOptions.java index c3a6fb46..af14149e 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/rpc/ClusterOptions.java +++ b/src/main/java/com/xiaomi/infra/pegasus/rpc/ClusterOptions.java @@ -27,6 +27,9 @@ public class ClusterOptions { public static final String PEGASUS_PUSH_COUNTER_INTERVAL_SECS_KEY = "push_counter_interval_secs"; public static final String PEGASUS_PUSH_COUNTER_INTERVAL_SECS_DEF = "60"; + public static final String PEGASUS_PUSH_COUNTER_TYPE_KEY = "push_counter_type"; + public static final String PEGASUS_PUSH_COUNTER_TYPE_DEF = "falcon"; + public static final String PEGASUS_META_QUERY_TIMEOUT_KEY = "meta_query_timeout"; public static final String PEGASUS_META_QUERY_TIMEOUT_DEF = "5000"; @@ -47,6 +50,7 @@ public static String[] allKeys() { private final int asyncWorkers; private final boolean enablePerfCounter; private final String perfCounterTags; + private final String perfCounterType; private final int pushCounterIntervalSecs; private final int metaQueryTimeout; @@ -78,6 +82,10 @@ public int metaQueryTimeout() { return this.metaQueryTimeout; } + public String pushCounterType() { + return this.perfCounterType; + } + public static ClusterOptions create(Properties config) { int operationTimeout = Integer.parseInt( @@ -105,6 +113,8 @@ public static ClusterOptions create(Properties config) { Integer.parseInt( config.getProperty( PEGASUS_PUSH_COUNTER_INTERVAL_SECS_KEY, PEGASUS_PUSH_COUNTER_INTERVAL_SECS_DEF)); + String perfCounterType = + config.getProperty(PEGASUS_PUSH_COUNTER_TYPE_KEY, PEGASUS_PUSH_COUNTER_TYPE_DEF); int metaQueryTimeout = Integer.parseInt( config.getProperty(PEGASUS_META_QUERY_TIMEOUT_KEY, PEGASUS_META_QUERY_TIMEOUT_DEF)); @@ -115,12 +125,13 @@ public static ClusterOptions create(Properties config) { asyncWorkers, enablePerfCounter, perfCounterTags, + perfCounterType, pushIntervalSecs, metaQueryTimeout); } public static ClusterOptions forTest(String[] metaList) { - return new ClusterOptions(1000, metaList, 1, false, null, 60, 1000); + return new ClusterOptions(1000, metaList, 1, false, null, "falcon", 60, 1000); } private ClusterOptions( @@ -129,6 +140,7 @@ private ClusterOptions( int asyncWorkers, boolean enablePerfCounter, String perfCounterTags, + String perfCounterType, int pushCounterIntervalSecs, int metaQueryTimeout) { this.operationTimeout = operationTimeout; @@ -136,6 +148,7 @@ private ClusterOptions( this.asyncWorkers = asyncWorkers; this.enablePerfCounter = enablePerfCounter; this.perfCounterTags = perfCounterTags; + this.perfCounterType = perfCounterType; this.pushCounterIntervalSecs = pushCounterIntervalSecs; this.metaQueryTimeout = metaQueryTimeout; } diff --git a/src/main/java/com/xiaomi/infra/pegasus/rpc/async/ClusterManager.java b/src/main/java/com/xiaomi/infra/pegasus/rpc/async/ClusterManager.java index 3c1c6963..1e25d85d 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/rpc/async/ClusterManager.java +++ b/src/main/java/com/xiaomi/infra/pegasus/rpc/async/ClusterManager.java @@ -47,7 +47,8 @@ public ClusterManager(ClusterOptions opts) throws IllegalArgumentException { setTimeout(opts.operationTimeout()); this.enableCounter = opts.enablePerfCounter(); if (enableCounter) { - MetricsManager.detectHostAndInit(opts.perfCounterTags(), opts.pushCounterIntervalSecs()); + MetricsManager.detectHostAndInit( + opts.perfCounterTags(), opts.pushCounterIntervalSecs(), opts.pushCounterType()); } replicaSessions = new ConcurrentHashMap(); From 24216739de0dc0d39923f949458aee4146566171 Mon Sep 17 00:00:00 2001 From: JiaShuo Date: Fri, 5 Jun 2020 20:06:55 +0800 Subject: [PATCH 3/9] update test code --- .../infra/pegasus/metrics/MetricsPool.java | 2 +- .../infra/pegasus/metrics/MetricsPoolTest.java | 18 +++++++++--------- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/src/main/java/com/xiaomi/infra/pegasus/metrics/MetricsPool.java b/src/main/java/com/xiaomi/infra/pegasus/metrics/MetricsPool.java index 69b1c0f5..8c7c2601 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/metrics/MetricsPool.java +++ b/src/main/java/com/xiaomi/infra/pegasus/metrics/MetricsPool.java @@ -12,7 +12,7 @@ public final class MetricsPool { private final MetricRegistry registry = new MetricRegistry(); public static MetricsReporter reporter; - private static PegasusCollector collector; + public static PegasusCollector collector; public MetricsPool(String host, String tags, int reportStepSec, String type) { metricType = type; diff --git a/src/test/java/com/xiaomi/infra/pegasus/metrics/MetricsPoolTest.java b/src/test/java/com/xiaomi/infra/pegasus/metrics/MetricsPoolTest.java index 88c0676f..198ec26a 100644 --- a/src/test/java/com/xiaomi/infra/pegasus/metrics/MetricsPoolTest.java +++ b/src/test/java/com/xiaomi/infra/pegasus/metrics/MetricsPoolTest.java @@ -23,14 +23,14 @@ public void before() { public void genJsonsFromMeter() throws Exception { String host = "simple-test-host.bj"; String tags = "what=you,like=another"; - MetricsPool pool = new MetricsPool(host, tags, 20); + Falcon falcon = new Falcon(host, tags, 20, r); Meter m = r.meter("TestName"); m.mark(1); m.mark(1); StringBuilder builder = new StringBuilder(); - pool.genJsonsFromMeter("TestName", m, builder); + falcon.genJsonsFromMeter("TestName", m, builder); JSONArray array = new JSONArray("[" + builder.toString() + "]"); Assert.assertEquals(1, array.length()); @@ -54,12 +54,12 @@ public void genJsonsFromMeter() throws Exception { public void genJsonFromHistogram() throws Exception { String host = "simple-test-host.bj"; String tags = "what=you,like=another"; - MetricsPool pool = new MetricsPool(host, tags, 20); + Falcon falcon = new Falcon(host, tags, 20, r); Histogram h = r.histogram("TestHist"); for (int i = 0; i < 1000000; ++i) h.update((long) i); StringBuilder builder = new StringBuilder(); - pool.genJsonsFromHistogram("TestHist", h, builder); + falcon.genJsonsFromHistogram("TestHist", h, builder); JSONArray array = new JSONArray("[" + builder.toString() + "]"); Assert.assertEquals(2, array.length()); @@ -79,7 +79,7 @@ public void genJsonFromHistogram() throws Exception { @Test public void oneMetricToJson() throws Exception { - MetricsPool.FalconMetric metric = new MetricsPool.FalconMetric(); + Falcon.FalconMetric metric = new Falcon.FalconMetric(); metric.endpoint = "1.2.3.4"; metric.metric = "simple_metric"; metric.timestamp = 12343455L; @@ -89,7 +89,7 @@ public void oneMetricToJson() throws Exception { metric.tags = "cluster=onebox,app=new"; StringBuilder builder = new StringBuilder(); - MetricsPool.oneMetricToJson(metric, builder); + Falcon.oneMetricToJson(metric, builder); JSONObject obj = new JSONObject(builder.toString()); Assert.assertEquals(metric.endpoint, obj.getString("endpoint")); @@ -102,7 +102,7 @@ public void oneMetricToJson() throws Exception { builder.setLength(0); metric.tags = ""; - MetricsPool.oneMetricToJson(metric, builder); + Falcon.oneMetricToJson(metric, builder); obj = new JSONObject(builder.toString()); Assert.assertEquals(metric.endpoint, obj.getString("endpoint")); Assert.assertEquals(metric.metric, obj.getString("metric")); @@ -117,7 +117,7 @@ public void oneMetricToJson() throws Exception { public void metricsToJson() throws Exception { String host = "simple-test-host.bj"; String tags = "what=you,like=another"; - MetricsPool pool = new MetricsPool(host, tags, 20); + MetricsPool pool = new MetricsPool(host, tags, 20, "falcon"); pool.setMeter("aaa@temp", 1); pool.setMeter("aaa", 2); @@ -127,7 +127,7 @@ public void metricsToJson() throws Exception { pool.setHistorgram("ccc@temp", i); } - JSONArray array = new JSONArray(pool.metricsToJson()); + JSONArray array = new JSONArray(MetricsPool.collector.updateMetric()); Assert.assertEquals(6, array.length()); for (int i = 0; i < array.length(); ++i) { JSONObject j = array.getJSONObject(i); From 677d0452a781cf02a06f8268d24681a855bb60a2 Mon Sep 17 00:00:00 2001 From: JiaShuo Date: Fri, 5 Jun 2020 20:30:26 +0800 Subject: [PATCH 4/9] update test code --- .../xiaomi/infra/pegasus/metrics/Falcon.java | 6 +- .../infra/pegasus/metrics/MetricsPool.java | 4 +- .../infra/pegasus/metrics/Prometheus.java | 15 ++++- .../infra/pegasus/client/TestBasic.java | 61 +++++++++++-------- src/test/resource/pegasus.properties | 3 +- 5 files changed, 54 insertions(+), 35 deletions(-) diff --git a/src/main/java/com/xiaomi/infra/pegasus/metrics/Falcon.java b/src/main/java/com/xiaomi/infra/pegasus/metrics/Falcon.java index ce26e5b5..1dc14885 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/metrics/Falcon.java +++ b/src/main/java/com/xiaomi/infra/pegasus/metrics/Falcon.java @@ -55,7 +55,7 @@ public void genJsonsFromMeter(String name, Meter meter, StringBuilder output) falconMetric.counterType = "GAUGE"; falconMetric.metric = name + ".cps-1sec"; - falconMetric.tags = getTableTag(name, defaultTags); + falconMetric.tags = getTableTag(name, defaultTags, "@"); falconMetric.value = meter.getMeanRate(); oneMetricToJson(falconMetric, output); } @@ -66,13 +66,13 @@ public void genJsonsFromHistogram(String name, Histogram hist, StringBuilder out Snapshot s = hist.getSnapshot(); falconMetric.metric = name + ".p99"; - falconMetric.tags = getTableTag(name, defaultTags); + falconMetric.tags = getTableTag(name, defaultTags, "@"); falconMetric.value = s.get99thPercentile(); oneMetricToJson(falconMetric, output); output.append(','); falconMetric.metric = name + ".p999"; - falconMetric.tags = getTableTag(name, defaultTags); + falconMetric.tags = getTableTag(name, defaultTags, "@"); falconMetric.value = s.get999thPercentile(); oneMetricToJson(falconMetric, output); } diff --git a/src/main/java/com/xiaomi/infra/pegasus/metrics/MetricsPool.java b/src/main/java/com/xiaomi/infra/pegasus/metrics/MetricsPool.java index 8c7c2601..9623fb6f 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/metrics/MetricsPool.java +++ b/src/main/java/com/xiaomi/infra/pegasus/metrics/MetricsPool.java @@ -43,11 +43,11 @@ public void setHistorgram(String counterName, long value) { registry.histogram(counterName).update(value); } - public static String getTableTag(String counterName, String defaultTags) { + public static String getTableTag(String counterName, String defaultTags, String regex) { if (defaultTags.contains("table=")) { return defaultTags; } - String[] result = counterName.split("@"); + String[] result = counterName.split(regex); if (result.length >= 2) { return defaultTags.equals("") ? ("table=" + result[1]) diff --git a/src/main/java/com/xiaomi/infra/pegasus/metrics/Prometheus.java b/src/main/java/com/xiaomi/infra/pegasus/metrics/Prometheus.java index 0afaa489..1572012f 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/metrics/Prometheus.java +++ b/src/main/java/com/xiaomi/infra/pegasus/metrics/Prometheus.java @@ -8,6 +8,8 @@ import com.codahale.metrics.Snapshot; import io.prometheus.client.Collector; import io.prometheus.client.GaugeMetricFamily; +import io.prometheus.client.exporter.HTTPServer; +import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -54,7 +56,7 @@ public String updateMetric() { } private void updateQPSMetric(Map.Entry meter, String name) { - Map labels = getLabel(getTableTag(name, defaultTags)); + Map labels = getLabel(getTableTag(name, defaultTags, ":")); if (!metrics.containsKey(name)) { // assert labels != null; GaugeMetricFamily labeledGauge = @@ -67,7 +69,7 @@ private void updateQPSMetric(Map.Entry meter, String name) { } private void updateLatencyMetric(Map.Entry meter, String name) { - Map labels = getLabel(getTableTag(name, defaultTags)); + Map labels = getLabel(getTableTag(name, defaultTags, ":")); ArrayList labelList = new ArrayList<>(labels.values()); Snapshot snapshot = meter.getValue().getSnapshot(); double value = @@ -104,7 +106,14 @@ private String format(String name) { } public void start() { - collectorTask.scheduleAtFixedRate(() -> updateMetric(), 0, 10, TimeUnit.SECONDS); + try { + register(); + new HTTPServer(9091); + collectorTask.scheduleAtFixedRate(this::updateMetric, 0, 10, TimeUnit.SECONDS); + System.out.println("XXXX"); + } catch (IOException e) { + e.printStackTrace(); + } } public void stop() { diff --git a/src/test/java/com/xiaomi/infra/pegasus/client/TestBasic.java b/src/test/java/com/xiaomi/infra/pegasus/client/TestBasic.java index 86aa56aa..2ccebe42 100644 --- a/src/test/java/com/xiaomi/infra/pegasus/client/TestBasic.java +++ b/src/test/java/com/xiaomi/infra/pegasus/client/TestBasic.java @@ -136,44 +136,53 @@ public void testSetGetDel() throws PException { try { // set - client.set( - tableName, hashKey, "basic_test_sort_key_1".getBytes(), "basic_test_value_1".getBytes()); + while (true) { + Thread.sleep(1000); - // check exist - boolean exist = client.exist(tableName, hashKey, "basic_test_sort_key_1".getBytes()); - Assert.assertTrue(exist); + client.set( + tableName, + hashKey, + "basic_test_sort_key_1".getBytes(), + "basic_test_value_1".getBytes()); - exist = client.exist(tableName, hashKey, "basic_test_sort_key_2".getBytes()); - Assert.assertFalse(exist); + // check exist + boolean exist = client.exist(tableName, hashKey, "basic_test_sort_key_1".getBytes()); + Assert.assertTrue(exist); - // check sortkey count - long sortKeyCount = client.sortKeyCount(tableName, hashKey); - Assert.assertEquals(1, sortKeyCount); + exist = client.exist(tableName, hashKey, "basic_test_sort_key_2".getBytes()); + Assert.assertFalse(exist); - // get - byte[] value = client.get(tableName, hashKey, "basic_test_sort_key_1".getBytes()); - Assert.assertArrayEquals("basic_test_value_1".getBytes(), value); + // check sortkey count + long sortKeyCount = client.sortKeyCount(tableName, hashKey); + Assert.assertEquals(1, sortKeyCount); - value = client.get(tableName, hashKey, "basic_test_sort_key_2".getBytes()); - Assert.assertEquals(null, value); + // get + byte[] value = client.get(tableName, hashKey, "basic_test_sort_key_1".getBytes()); + Assert.assertArrayEquals("basic_test_value_1".getBytes(), value); - // del - client.del(tableName, hashKey, "basic_test_sort_key_1".getBytes()); + value = client.get(tableName, hashKey, "basic_test_sort_key_2".getBytes()); + Assert.assertEquals(null, value); - // check exist - exist = client.exist(tableName, hashKey, "basic_test_sort_key_1".getBytes()); - Assert.assertFalse(exist); + // del + client.del(tableName, hashKey, "basic_test_sort_key_1".getBytes()); - // check sortkey count - sortKeyCount = client.sortKeyCount(tableName, hashKey); - Assert.assertEquals(0, sortKeyCount); + // check exist + exist = client.exist(tableName, hashKey, "basic_test_sort_key_1".getBytes()); + Assert.assertFalse(exist); - // check deleted - value = client.get(tableName, hashKey, "basic_test_sort_key_1".getBytes()); - Assert.assertEquals(null, value); + // check sortkey count + sortKeyCount = client.sortKeyCount(tableName, hashKey); + Assert.assertEquals(0, sortKeyCount); + + // check deleted + value = client.get(tableName, hashKey, "basic_test_sort_key_1".getBytes()); + Assert.assertEquals(null, value); + } } catch (PException e) { e.printStackTrace(); Assert.assertTrue(false); + } catch (InterruptedException e) { + e.printStackTrace(); } PegasusClientFactory.closeSingletonClient(); diff --git a/src/test/resource/pegasus.properties b/src/test/resource/pegasus.properties index 6282eea7..ec4da28e 100644 --- a/src/test/resource/pegasus.properties +++ b/src/test/resource/pegasus.properties @@ -1,6 +1,7 @@ meta_servers = 127.0.0.1:34601,127.0.0.1:34602,127.0.0.1:34603 operation_timeout = 10000 async_workers = 2 -enable_perf_counter = false +enable_perf_counter = true +push_counter_type = prometheus perf_counter_tags = cluster=onebox,app=unit_test push_counter_interval_secs = 10 From a0abfd65da1623572af0fabd02e4203d38fe9b95 Mon Sep 17 00:00:00 2001 From: JiaShuo Date: Fri, 5 Jun 2020 20:53:01 +0800 Subject: [PATCH 5/9] update test code --- pom.xml | 11 ++ .../infra/pegasus/client/ClientOptions.java | 81 +++++++---- .../{Falcon.java => FalconCollector.java} | 19 ++- ...tricsReporter.java => FalconReporter.java} | 120 +++++++--------- .../infra/pegasus/metrics/MetricsManager.java | 15 +- .../infra/pegasus/metrics/MetricsPool.java | 34 ++--- .../pegasus/metrics/PegasusCollector.java | 6 - .../infra/pegasus/metrics/PegasusMonitor.java | 8 ++ .../infra/pegasus/metrics/Prometheus.java | 122 ----------------- .../pegasus/metrics/PrometheusCollector.java | 129 ++++++++++++++++++ .../pegasus/operator/client_operator.java | 14 +- .../infra/pegasus/rpc/ClusterOptions.java | 1 + .../infra/pegasus/client/TestBasic.java | 61 ++++----- .../pegasus/metrics/MetricsPoolTest.java | 20 +-- src/test/resource/pegasus.properties | 3 +- 15 files changed, 331 insertions(+), 313 deletions(-) rename src/main/java/com/xiaomi/infra/pegasus/metrics/{Falcon.java => FalconCollector.java} (85%) rename src/main/java/com/xiaomi/infra/pegasus/metrics/{MetricsReporter.java => FalconReporter.java} (67%) delete mode 100644 src/main/java/com/xiaomi/infra/pegasus/metrics/PegasusCollector.java create mode 100644 src/main/java/com/xiaomi/infra/pegasus/metrics/PegasusMonitor.java delete mode 100644 src/main/java/com/xiaomi/infra/pegasus/metrics/Prometheus.java create mode 100644 src/main/java/com/xiaomi/infra/pegasus/metrics/PrometheusCollector.java diff --git a/pom.xml b/pom.xml index e37ee80d..3f8a49bb 100644 --- a/pom.xml +++ b/pom.xml @@ -6,6 +6,7 @@ jar 1.12-thrift-0.11.0-inlined-SNAPSHOT Pegasus Java Client + org.junit.jupiter @@ -80,6 +81,16 @@ javax.annotation-api 1.3.2 + + io.prometheus + simpleclient + 0.4.0 + + + io.prometheus + simpleclient_httpserver + 0.4.0 + diff --git a/src/main/java/com/xiaomi/infra/pegasus/client/ClientOptions.java b/src/main/java/com/xiaomi/infra/pegasus/client/ClientOptions.java index 2552678c..5b1e9710 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/client/ClientOptions.java +++ b/src/main/java/com/xiaomi/infra/pegasus/client/ClientOptions.java @@ -23,7 +23,7 @@ * .operationTimeout(Duration.ofMillis(1000)) * .asyncWorkers(4) * .enablePerfCounter(false) - * .falconPerfCounterTags("") + * .perfCounterTags("") * .falconPushInterval(Duration.ofSeconds(10)) * .metaQueryTimeout(Duration.ofMillis(5000)) * .build(); @@ -36,7 +36,8 @@ public class ClientOptions { public static final Duration DEFAULT_OPERATION_TIMEOUT = Duration.ofMillis(1000); public static final int DEFAULT_ASYNC_WORKERS = Runtime.getRuntime().availableProcessors(); public static final boolean DEFAULT_ENABLE_PERF_COUNTER = true; - public static final String DEFAULT_FALCON_PERF_COUNTER_TAGS = ""; + public static final String DEFAULT_PERF_COUNTER_TYPE = "falcon"; + public static final String DEFAULT_PERF_COUNTER_TAGS = ""; public static final Duration DEFAULT_FALCON_PUSH_INTERVAL = Duration.ofSeconds(10); public static final boolean DEFAULT_ENABLE_WRITE_LIMIT = true; public static final Duration DEFAULT_META_QUERY_TIMEOUT = Duration.ofMillis(5000); @@ -45,7 +46,8 @@ public class ClientOptions { private final Duration operationTimeout; private final int asyncWorkers; private final boolean enablePerfCounter; - private final String falconPerfCounterTags; + private final String perfCounterType; + private final String perfCounterTags; private final Duration falconPushInterval; private final boolean enableWriteLimit; private final Duration metaQueryTimeout; @@ -55,7 +57,8 @@ protected ClientOptions(Builder builder) { this.operationTimeout = builder.operationTimeout; this.asyncWorkers = builder.asyncWorkers; this.enablePerfCounter = builder.enablePerfCounter; - this.falconPerfCounterTags = builder.falconPerfCounterTags; + this.perfCounterType = builder.perfCounterType; + this.perfCounterTags = builder.perfCounterTags; this.falconPushInterval = builder.falconPushInterval; this.enableWriteLimit = builder.enableWriteLimit; this.metaQueryTimeout = builder.metaQueryTimeout; @@ -66,7 +69,8 @@ protected ClientOptions(ClientOptions original) { this.operationTimeout = original.getOperationTimeout(); this.asyncWorkers = original.getAsyncWorkers(); this.enablePerfCounter = original.isEnablePerfCounter(); - this.falconPerfCounterTags = original.getFalconPerfCounterTags(); + this.perfCounterType = original.perfCounterType; + this.perfCounterTags = original.getPerfCounterTags(); this.falconPushInterval = original.getFalconPushInterval(); this.enableWriteLimit = original.isWriteLimitEnabled(); this.metaQueryTimeout = original.getMetaQueryTimeout(); @@ -111,7 +115,8 @@ public boolean equals(Object options) { && this.operationTimeout.toMillis() == clientOptions.operationTimeout.toMillis() && this.asyncWorkers == clientOptions.asyncWorkers && this.enablePerfCounter == clientOptions.enablePerfCounter - && this.falconPerfCounterTags.equals(clientOptions.falconPerfCounterTags) + && this.perfCounterType.equals(clientOptions.perfCounterType) + && this.perfCounterTags.equals(clientOptions.perfCounterTags) && this.falconPushInterval.toMillis() == clientOptions.falconPushInterval.toMillis() && this.enableWriteLimit == clientOptions.enableWriteLimit && this.metaQueryTimeout.toMillis() == clientOptions.metaQueryTimeout.toMillis(); @@ -129,10 +134,12 @@ public String toString() { + operationTimeout.toMillis() + ", asyncWorkers=" + asyncWorkers - + ", enablePerfCounter=" + + ", perfCounterType=" + enablePerfCounter - + ", falconPerfCounterTags='" - + falconPerfCounterTags + + ", perfCounterTags='" + + perfCounterType + + ", enablePerfCounter=" + + perfCounterTags + '\'' + ", falconPushInterval(s)=" + falconPushInterval.getSeconds() @@ -149,7 +156,8 @@ public static class Builder { private Duration operationTimeout = DEFAULT_OPERATION_TIMEOUT; private int asyncWorkers = DEFAULT_ASYNC_WORKERS; private boolean enablePerfCounter = DEFAULT_ENABLE_PERF_COUNTER; - private String falconPerfCounterTags = DEFAULT_FALCON_PERF_COUNTER_TAGS; + private String perfCounterType = DEFAULT_PERF_COUNTER_TYPE; + private String perfCounterTags = DEFAULT_PERF_COUNTER_TAGS; private Duration falconPushInterval = DEFAULT_FALCON_PUSH_INTERVAL; private boolean enableWriteLimit = DEFAULT_ENABLE_WRITE_LIMIT; private Duration metaQueryTimeout = DEFAULT_META_QUERY_TIMEOUT; @@ -194,8 +202,8 @@ public Builder asyncWorkers(int asyncWorkers) { /** * Whether to enable performance statistics. If true, the client will periodically report - * metrics to local falcon agent (currently we only support falcon as monitoring system). - * Defaults to {@literal true}, see {@link #DEFAULT_ENABLE_PERF_COUNTER}. + * metrics to local falcon agent (if set falcon as monitoring system) or open prometheus + * collector http server. Defaults to {@literal true}, see {@link #DEFAULT_ENABLE_PERF_COUNTER}. * * @param enablePerfCounter enablePerfCounter * @return {@code this} @@ -205,22 +213,34 @@ public Builder enablePerfCounter(boolean enablePerfCounter) { return this; } + /** + * set the perf-counter type, now only support falcon and prometheus, Defaults to {@literal + * falcon}, see {@link #DEFAULT_PERF_COUNTER_TYPE} + * + * @param perfCounterType perfCounterType + * @return this + */ + public Builder perfCounterType(String perfCounterType) { + this.perfCounterType = perfCounterType; + return this; + } + /** * Additional tags for falcon metrics. For example: * "cluster=c3srv-ad,job=recommend-service-history". Defaults to empty string, see {@link - * #DEFAULT_FALCON_PERF_COUNTER_TAGS}. + * #DEFAULT_PERF_COUNTER_TAGS}. * - * @param falconPerfCounterTags falconPerfCounterTags + * @param perfCounterTags perfCounterTags * @return {@code this} */ - public Builder falconPerfCounterTags(String falconPerfCounterTags) { - this.falconPerfCounterTags = falconPerfCounterTags; + public Builder perfCounterTags(String perfCounterTags) { + this.perfCounterTags = perfCounterTags; return this; } /** - * The interval to report metrics to local falcon agent. Defaults to {@literal 10s}, see {@link - * #DEFAULT_FALCON_PUSH_INTERVAL}. + * The interval to report metrics to local falcon agent(if set falcon as monitor system). + * Defaults to {@literal 10s}, see {@link #DEFAULT_FALCON_PUSH_INTERVAL}. * * @param falconPushInterval falconPushInterval * @return {@code this} @@ -279,7 +299,8 @@ public ClientOptions.Builder mutate() { .operationTimeout(getOperationTimeout()) .asyncWorkers(getAsyncWorkers()) .enablePerfCounter(isEnablePerfCounter()) - .falconPerfCounterTags(getFalconPerfCounterTags()) + .perfCounterType(getPerfCounterType()) + .perfCounterTags(getPerfCounterTags()) .falconPushInterval(getFalconPushInterval()) .enableWriteLimit(isWriteLimitEnabled()) .metaQueryTimeout(getMetaQueryTimeout()); @@ -316,8 +337,8 @@ public int getAsyncWorkers() { /** * Whether to enable performance statistics. If true, the client will periodically report metrics - * to local falcon agent (currently we only support falcon as monitoring system). Defaults to - * {@literal true}. + * to local falcon agent (if set falcon as monitoring system) or open prometheus collector http + * server. Defaults to {@literal true}. * * @return whether to enable performance statistics. */ @@ -326,16 +347,26 @@ public boolean isEnablePerfCounter() { } /** - * Additional tags for falcon metrics. Defaults to empty string. + * get perf-counter type, now only support falcon and prometheus + * + * @return perf-counter type + */ + public String getPerfCounterType() { + return perfCounterType; + } + + /** + * Additional tags for metrics. Defaults to empty string. * * @return additional tags for falcon metrics. */ - public String getFalconPerfCounterTags() { - return falconPerfCounterTags; + public String getPerfCounterTags() { + return perfCounterTags; } /** - * The interval to report metrics to local falcon agent. Defaults to {@literal 10s}. + * The interval to report metrics to local falcon agent(if set falcon as monitor system). Defaults + * to {@literal 10s}. * * @return the interval to report metrics to local falcon agent. */ diff --git a/src/main/java/com/xiaomi/infra/pegasus/metrics/Falcon.java b/src/main/java/com/xiaomi/infra/pegasus/metrics/FalconCollector.java similarity index 85% rename from src/main/java/com/xiaomi/infra/pegasus/metrics/Falcon.java rename to src/main/java/com/xiaomi/infra/pegasus/metrics/FalconCollector.java index 1dc14885..f8f1ee58 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/metrics/Falcon.java +++ b/src/main/java/com/xiaomi/infra/pegasus/metrics/FalconCollector.java @@ -12,20 +12,19 @@ import org.json.JSONException; import org.json.JSONObject; -public class Falcon implements PegasusCollector { - +public class FalconCollector { private FalconMetric falconMetric = new FalconMetric(); private final MetricRegistry registry; public final String defaultTags; - public Falcon(String host, String tags, int reportStepSec, MetricRegistry registry) { + public FalconCollector(String host, String tags, int reportStepSec, MetricRegistry registry) { this.defaultTags = tags; this.registry = registry; falconMetric.endpoint = host; falconMetric.step = reportStepSec; } - public String updateMetric() { + public String metricsToJson() { falconMetric.timestamp = Tools.unixEpochMills() / 1000; StringBuilder builder = new StringBuilder(); @@ -54,8 +53,8 @@ public void genJsonsFromMeter(String name, Meter meter, StringBuilder output) throws JSONException { falconMetric.counterType = "GAUGE"; - falconMetric.metric = name + ".cps-1sec"; - falconMetric.tags = getTableTag(name, defaultTags, "@"); + falconMetric.metric = name + "_cps_1sec"; + falconMetric.tags = getTableTag(name, defaultTags); falconMetric.value = meter.getMeanRate(); oneMetricToJson(falconMetric, output); } @@ -65,14 +64,14 @@ public void genJsonsFromHistogram(String name, Histogram hist, StringBuilder out falconMetric.counterType = "GAUGE"; Snapshot s = hist.getSnapshot(); - falconMetric.metric = name + ".p99"; - falconMetric.tags = getTableTag(name, defaultTags, "@"); + falconMetric.metric = name + "_p99"; + falconMetric.tags = getTableTag(name, defaultTags); falconMetric.value = s.get99thPercentile(); oneMetricToJson(falconMetric, output); output.append(','); - falconMetric.metric = name + ".p999"; - falconMetric.tags = getTableTag(name, defaultTags, "@"); + falconMetric.metric = name + "_p999"; + falconMetric.tags = getTableTag(name, defaultTags); falconMetric.value = s.get999thPercentile(); oneMetricToJson(falconMetric, output); } diff --git a/src/main/java/com/xiaomi/infra/pegasus/metrics/MetricsReporter.java b/src/main/java/com/xiaomi/infra/pegasus/metrics/FalconReporter.java similarity index 67% rename from src/main/java/com/xiaomi/infra/pegasus/metrics/MetricsReporter.java rename to src/main/java/com/xiaomi/infra/pegasus/metrics/FalconReporter.java index 3285c5aa..2be86666 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/metrics/MetricsReporter.java +++ b/src/main/java/com/xiaomi/infra/pegasus/metrics/FalconReporter.java @@ -17,15 +17,15 @@ import org.slf4j.Logger; /** Created by weijiesun on 18-3-9. */ -public class MetricsReporter { - public MetricsReporter(int reportSecs, PegasusCollector collector) { +public class FalconReporter implements PegasusMonitor { + public FalconReporter(int reportSecs, FalconCollector falconCollectorMetric) { falconAgentIP = "127.0.0.1"; falconAgentPort = 1988; falconAgentSocket = falconAgentIP + ":" + falconAgentPort; reportIntervalSecs = reportSecs; falconRequestPath = "/v1/push"; - pegasusCollector = collector; + falconCollector = falconCollectorMetric; boot = new Bootstrap(); httpClientGroup = new NioEventLoopGroup(1); @@ -51,36 +51,32 @@ public void initChannel(SocketChannel ch) { reportTarget = null; } + @Override public void start() { reportStopped = false; tryConnect(); } + @Override public void stop() { httpClientGroup.execute( - new Runnable() { - @Override - public void run() { - reportStopped = true; - if (actionLater != null) { - actionLater.cancel(false); - } - if (reportTarget != null) { - reportTarget - .close() - .addListener( - new ChannelFutureListener() { - @Override - public void operationComplete(ChannelFuture channelFuture) - throws Exception { + () -> { + reportStopped = true; + if (actionLater != null) { + actionLater.cancel(false); + } + if (reportTarget != null) { + reportTarget + .close() + .addListener( + (ChannelFutureListener) + channelFuture -> { if (channelFuture.isSuccess()) { logger.info("close channel to {} succeed", falconAgentSocket); } else { logger.warn("close channel to {} failed: ", channelFuture.cause()); } - } - }); - } + }); } }); @@ -95,68 +91,52 @@ public void operationComplete(ChannelFuture channelFuture) public void tryConnect() { boot.connect(falconAgentIP, falconAgentPort) .addListener( - new ChannelFutureListener() { - @Override - public void operationComplete(ChannelFuture channelFuture) throws Exception { - if (channelFuture.isSuccess()) { - reportTarget = channelFuture.channel(); - logger.info("create channel with {} succeed, wait it active", falconAgentSocket); - } else { - logger.error( - "create channel with {} failed, connect later: ", - falconAgentSocket, - channelFuture.cause()); - scheduleNextConnect(); - } - } - }); + (ChannelFutureListener) + channelFuture -> { + if (channelFuture.isSuccess()) { + reportTarget = channelFuture.channel(); + logger.info( + "create channel with {} succeed, wait it active", falconAgentSocket); + } else { + logger.error( + "create channel with {} failed, connect later: ", + falconAgentSocket, + channelFuture.cause()); + scheduleNextConnect(); + } + }); } public void scheduleNextConnect() { if (reportStopped) return; actionLater = - httpClientGroup.schedule( - new Runnable() { - @Override - public void run() { - tryConnect(); - } - }, - (long) reportIntervalSecs, - TimeUnit.SECONDS); + httpClientGroup.schedule(this::tryConnect, (long) reportIntervalSecs, TimeUnit.SECONDS); } public void scheduleNextReport(final Channel channel) { if (reportStopped) return; actionLater = httpClientGroup.schedule( - new Runnable() { - @Override - public void run() { - reportMetrics(channel); - } - }, - reportIntervalSecs, - TimeUnit.SECONDS); + () -> reportMetrics(channel), reportIntervalSecs, TimeUnit.SECONDS); } public void reportMetrics(final Channel channel) { - String result; + String json_metrics; try { - result = pegasusCollector.updateMetric(); + json_metrics = falconCollector.metricsToJson(); } catch (JSONException ex) { logger.warn("encode metrics to json failed, skip current report, retry later: ", ex); scheduleNextReport(channel); return; } - logger.debug("generate metrics {} and try to report", result); + logger.debug("generate metrics {} and try to report", json_metrics); FullHttpRequest request = new DefaultFullHttpRequest( HttpVersion.HTTP_1_1, HttpMethod.POST, falconRequestPath, - Unpooled.copiedBuffer(result.getBytes())); + Unpooled.copiedBuffer(json_metrics.getBytes())); request.headers().add(HttpHeaders.Names.HOST, falconAgentSocket); request.headers().add(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.KEEP_ALIVE); request.headers().add(HttpHeaders.Names.CONTENT_LENGTH, request.content().readableBytes()); @@ -165,18 +145,16 @@ public void reportMetrics(final Channel channel) { channel .writeAndFlush(request) .addListener( - new ChannelFutureListener() { - @Override - public void operationComplete(ChannelFuture channelFuture) throws Exception { - if (!channelFuture.isSuccess()) { - logger.warn( - "report to {} failed, skip current report, retry later: ", - channel.toString(), - channelFuture.cause()); - channel.close(); - } - } - }); + (ChannelFutureListener) + channelFuture -> { + if (!channelFuture.isSuccess()) { + logger.warn( + "report to {} failed, skip current report, retry later: ", + channel.toString(), + channelFuture.cause()); + channel.close(); + } + }); } class HttpClientHandler extends SimpleChannelInboundHandler { @@ -226,7 +204,7 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception { private int reportIntervalSecs; private String falconRequestPath; - private PegasusCollector pegasusCollector; + public FalconCollector falconCollector; private Bootstrap boot; private EventLoopGroup httpClientGroup; @@ -235,5 +213,5 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception { private boolean reportStopped; private Channel reportTarget; - private static final Logger logger = org.slf4j.LoggerFactory.getLogger(MetricsReporter.class); + private static final Logger logger = org.slf4j.LoggerFactory.getLogger(FalconReporter.class); } diff --git a/src/main/java/com/xiaomi/infra/pegasus/metrics/MetricsManager.java b/src/main/java/com/xiaomi/infra/pegasus/metrics/MetricsManager.java index 37360bad..89c0fc1b 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/metrics/MetricsManager.java +++ b/src/main/java/com/xiaomi/infra/pegasus/metrics/MetricsManager.java @@ -8,6 +8,7 @@ /** Created by weijiesun on 18-3-8. */ public final class MetricsManager { + public static void updateCount(String counterName, long count) { metrics.setMeter(counterName, count); } @@ -16,13 +17,14 @@ public static void setHistogramValue(String counterName, long value) { metrics.setHistorgram(counterName, value); } - public static final void initFromHost( + public static void initFromHost( String host, String tag, int reportIntervalSec, String perfCounterType) { synchronized (logger) { if (started) { logger.warn( - "perf counter system has started with host({}), tag({}), interval({}), " + "perf counter system({}) has started with host({}), tag({}), interval(if set falcon as system)({}), " + "skip this init with host({}), tag({}), interval(){}", + MetricsManager.perfCounterType, MetricsManager.host, MetricsManager.tag, MetricsManager.reportIntervalSecs, @@ -35,21 +37,22 @@ public static final void initFromHost( logger.info( "init metrics with host({}), tag({}), interval({})", host, tag, reportIntervalSec); + MetricsManager.perfCounterType = perfCounterType; MetricsManager.host = host; MetricsManager.tag = tag; MetricsManager.reportIntervalSecs = reportIntervalSec; metrics = new MetricsPool(host, tag, reportIntervalSec, perfCounterType); + metrics.start(); started = true; } } - public static final void detectHostAndInit( - String tag, int reportIntervalSec, String perfCounterType) { + public static void detectHostAndInit(String tag, int reportIntervalSec, String perfCounterType) { initFromHost( Tools.getLocalHostAddress().getHostName(), tag, reportIntervalSec, perfCounterType); } - public static final void finish() { + public static void finish() { synchronized (logger) { if (started) { metrics.stop(); @@ -62,7 +65,7 @@ public static final void finish() { private static String host; private static String tag; private static int reportIntervalSecs; - + private static String perfCounterType; private static MetricsPool metrics; private static final Logger logger = org.slf4j.LoggerFactory.getLogger(MetricsManager.class); diff --git a/src/main/java/com/xiaomi/infra/pegasus/metrics/MetricsPool.java b/src/main/java/com/xiaomi/infra/pegasus/metrics/MetricsPool.java index 9623fb6f..df7e386f 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/metrics/MetricsPool.java +++ b/src/main/java/com/xiaomi/infra/pegasus/metrics/MetricsPool.java @@ -8,31 +8,25 @@ /** Created by weijiesun on 18-3-9. */ public final class MetricsPool { - public final String metricType; - private final MetricRegistry registry = new MetricRegistry(); - public static MetricsReporter reporter; - public static PegasusCollector collector; - - public MetricsPool(String host, String tags, int reportStepSec, String type) { - metricType = type; + public static PegasusMonitor pegasusMonitor; + public MetricsPool(String host, String tags, int reportStepSec, String metricType) { if (metricType.equals("falcon")) { - collector = new Falcon(host, tags, reportStepSec, registry); - reporter = new MetricsReporter(reportStepSec, collector); - reporter.start(); - } else if (type.equals("prometheus")) { - collector = new Prometheus(tags, reportStepSec, registry); - ((Prometheus) collector).start(); + pegasusMonitor = + new FalconReporter( + reportStepSec, new FalconCollector(host, tags, reportStepSec, registry)); + } else if (metricType.equals("prometheus")) { + pegasusMonitor = new PrometheusCollector(tags, registry); } } + public void start() { + pegasusMonitor.start(); + } + public void stop() { - if (metricType.equals("falcon")) { - reporter.stop(); - } else if (metricType.equals("prometheus")) { - ((Prometheus) collector).stop(); - } + pegasusMonitor.stop(); } public void setMeter(String counterName, long count) { @@ -43,11 +37,11 @@ public void setHistorgram(String counterName, long value) { registry.histogram(counterName).update(value); } - public static String getTableTag(String counterName, String defaultTags, String regex) { + public static String getTableTag(String counterName, String defaultTags) { if (defaultTags.contains("table=")) { return defaultTags; } - String[] result = counterName.split(regex); + String[] result = counterName.split(":"); if (result.length >= 2) { return defaultTags.equals("") ? ("table=" + result[1]) diff --git a/src/main/java/com/xiaomi/infra/pegasus/metrics/PegasusCollector.java b/src/main/java/com/xiaomi/infra/pegasus/metrics/PegasusCollector.java deleted file mode 100644 index 1434d76f..00000000 --- a/src/main/java/com/xiaomi/infra/pegasus/metrics/PegasusCollector.java +++ /dev/null @@ -1,6 +0,0 @@ -package com.xiaomi.infra.pegasus.metrics; - -public interface PegasusCollector { - - public String updateMetric(); -} diff --git a/src/main/java/com/xiaomi/infra/pegasus/metrics/PegasusMonitor.java b/src/main/java/com/xiaomi/infra/pegasus/metrics/PegasusMonitor.java new file mode 100644 index 00000000..3c8a66a1 --- /dev/null +++ b/src/main/java/com/xiaomi/infra/pegasus/metrics/PegasusMonitor.java @@ -0,0 +1,8 @@ +package com.xiaomi.infra.pegasus.metrics; + +public interface PegasusMonitor { + + public void start(); + + public void stop(); +} diff --git a/src/main/java/com/xiaomi/infra/pegasus/metrics/Prometheus.java b/src/main/java/com/xiaomi/infra/pegasus/metrics/Prometheus.java deleted file mode 100644 index 1572012f..00000000 --- a/src/main/java/com/xiaomi/infra/pegasus/metrics/Prometheus.java +++ /dev/null @@ -1,122 +0,0 @@ -package com.xiaomi.infra.pegasus.metrics; - -import static com.xiaomi.infra.pegasus.metrics.MetricsPool.getTableTag; - -import com.codahale.metrics.Histogram; -import com.codahale.metrics.Meter; -import com.codahale.metrics.MetricRegistry; -import com.codahale.metrics.Snapshot; -import io.prometheus.client.Collector; -import io.prometheus.client.GaugeMetricFamily; -import io.prometheus.client.exporter.HTTPServer; -import java.io.IOException; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; - -public class Prometheus extends Collector implements PegasusCollector { - - private final ScheduledExecutorService collectorTask; - private final String defaultTags; - private final MetricRegistry registry; - private int reportStepSec; - - private Map metrics = new ConcurrentHashMap<>(); - private Map> tableLabels = new HashMap<>(); - - public Prometheus(String defaultTags, int reportStepSec, MetricRegistry registry) { - this.defaultTags = defaultTags; - this.reportStepSec = reportStepSec; - this.registry = registry; - this.collectorTask = Executors.newScheduledThreadPool(1); - } - - public List collect() { - return new ArrayList<>(metrics.values()); - } - - public String updateMetric() { - for (Map.Entry entry : registry.getMeters().entrySet()) { - String QPSName = format(entry.getKey()); - updateQPSMetric(entry, QPSName); - } - - for (Map.Entry entry : registry.getHistograms().entrySet()) { - String latencyName = format(entry.getKey()); - updateLatencyMetric(entry, latencyName + "-99th"); - updateLatencyMetric(entry, latencyName + "-999th"); - } - - return "OK"; - } - - private void updateQPSMetric(Map.Entry meter, String name) { - Map labels = getLabel(getTableTag(name, defaultTags, ":")); - if (!metrics.containsKey(name)) { - // assert labels != null; - GaugeMetricFamily labeledGauge = - new GaugeMetricFamily(name, "help", new ArrayList<>(labels.keySet())); - labeledGauge.addMetric(new ArrayList<>(labels.values()), meter.getValue().getMeanRate()); - metrics.put(name, labeledGauge); - } - ((GaugeMetricFamily) metrics.get(name)) - .addMetric(new ArrayList<>(labels.values()), meter.getValue().getMeanRate()); - } - - private void updateLatencyMetric(Map.Entry meter, String name) { - Map labels = getLabel(getTableTag(name, defaultTags, ":")); - ArrayList labelList = new ArrayList<>(labels.values()); - Snapshot snapshot = meter.getValue().getSnapshot(); - double value = - name.contains("99th") ? snapshot.get99thPercentile() : snapshot.get999thPercentile(); - if (!metrics.containsKey(name)) { - // assert labels != null; - GaugeMetricFamily labeledGauge = - new GaugeMetricFamily(name, "help", new ArrayList<>(labels.keySet())); - labeledGauge.addMetric(new ArrayList<>(labels.values()), value); - metrics.put(name, labeledGauge); - } - - ((GaugeMetricFamily) metrics.get(name)).addMetric(labelList, value); - } - - private Map getLabel(String labels) { - if (tableLabels.containsKey(labels)) { - return tableLabels.get(labels); - } - - // todo nullptr - HashMap labelMap = new HashMap<>(); - String[] labelsString = labels.split(","); - for (String label : labelsString) { - String[] labelPair = label.split("="); - labelMap.put(labelPair[0], labelPair[1]); - } - tableLabels.put(labels, labelMap); - return labelMap; - } - - private String format(String name) { - return name.replaceAll("\\.", "_").replaceAll("@", ":"); - } - - public void start() { - try { - register(); - new HTTPServer(9091); - collectorTask.scheduleAtFixedRate(this::updateMetric, 0, 10, TimeUnit.SECONDS); - System.out.println("XXXX"); - } catch (IOException e) { - e.printStackTrace(); - } - } - - public void stop() { - collectorTask.shutdown(); - } -} diff --git a/src/main/java/com/xiaomi/infra/pegasus/metrics/PrometheusCollector.java b/src/main/java/com/xiaomi/infra/pegasus/metrics/PrometheusCollector.java new file mode 100644 index 00000000..a2beafab --- /dev/null +++ b/src/main/java/com/xiaomi/infra/pegasus/metrics/PrometheusCollector.java @@ -0,0 +1,129 @@ +package com.xiaomi.infra.pegasus.metrics; + +import static com.xiaomi.infra.pegasus.metrics.MetricsPool.getTableTag; + +import com.codahale.metrics.Histogram; +import com.codahale.metrics.Meter; +import com.codahale.metrics.MetricRegistry; +import com.codahale.metrics.Snapshot; +import com.google.common.collect.ImmutableList; +import io.prometheus.client.Collector; +import io.prometheus.client.GaugeMetricFamily; +import io.prometheus.client.exporter.HTTPServer; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.slf4j.Logger; + +public class PrometheusCollector extends Collector implements PegasusMonitor { + private static final Logger logger = org.slf4j.LoggerFactory.getLogger(PrometheusCollector.class); + + private static final int PORT = 9091; + + private final String defaultTags; + private final MetricRegistry registry; + private Map> tableLabels = new HashMap<>(); + + public PrometheusCollector(String defaultTags, MetricRegistry registry) { + this.defaultTags = defaultTags; + this.registry = registry; + } + + public List collect() { + final ImmutableList.Builder metricFamilySamplesBuilder = + ImmutableList.builder(); + updateMetric(metricFamilySamplesBuilder); + return metricFamilySamplesBuilder.build(); + } + + public void updateMetric(final ImmutableList.Builder builder) { + for (Map.Entry entry : registry.getMeters().entrySet()) { + updateQPSMetric(entry.getKey(), entry, builder); + } + + for (Map.Entry entry : registry.getHistograms().entrySet()) { + updateLatencyMetric(entry.getKey(), entry, builder); + } + } + + private void updateQPSMetric( + String name, + Map.Entry meter, + final ImmutableList.Builder builder) { + Map labels = getLabel(getTableTag(name, defaultTags)); + GaugeMetricFamily labeledGauge = + new GaugeMetricFamily( + formatQPSMetricName(name), "pegasus operation qps", new ArrayList<>(labels.keySet())); + labeledGauge.addMetric(new ArrayList<>(labels.values()), meter.getValue().getMeanRate()); + builder.add(labeledGauge); + } + + private void updateLatencyMetric( + String name, + Map.Entry meter, + final ImmutableList.Builder builder) { + Map labels = getLabel(getTableTag(name, defaultTags)); + Snapshot snapshot = meter.getValue().getSnapshot(); + updateLatencyMetric( + formatLatencyMetricName(name, "p99"), snapshot.get99thPercentile(), labels, builder); + updateLatencyMetric( + formatLatencyMetricName(name, "p999"), snapshot.get999thPercentile(), labels, builder); + } + + private void updateLatencyMetric( + String key, + double value, + Map labels, + final ImmutableList.Builder builder) { + GaugeMetricFamily labeledGauge = + new GaugeMetricFamily(key, "pegasus operation latency", new ArrayList<>(labels.keySet())); + labeledGauge.addMetric(new ArrayList<>(labels.values()), value); + builder.add(labeledGauge); + } + + private Map getLabel(String labels) { + if (tableLabels.containsKey(labels)) { + return tableLabels.get(labels); + } + + HashMap labelMap = new HashMap<>(); + String[] labelsString = labels.split(","); + for (String label : labelsString) { + String[] labelPair = label.split("="); + assert (labelPair.length == 2); + labelMap.put(labelPair[0], labelPair[1]); + } + tableLabels.put(labels, labelMap); + return labelMap; + } + + private String formatLatencyMetricName(String name, String percentage) { + String[] metricName = name.split(":"); + assert (metricName.length == 2); + return metricName[0] + "_" + percentage; + } + + private String formatQPSMetricName(String name) { + String[] metricName = name.split(":"); + assert (metricName.length == 2); + return metricName[0]; + } + + @Override + public void start() { + try { + register(); + new HTTPServer(PORT); + logger.debug("start the prometheus collector server, port = {}", PORT); + } catch (IOException e) { + logger.debug("start the prometheus collector server failed, error = {}", e.getMessage()); + } + } + + @Override + public void stop() { + logger.debug("stop the prometheus collector server, port = {}", PORT); + } +} diff --git a/src/main/java/com/xiaomi/infra/pegasus/operator/client_operator.java b/src/main/java/com/xiaomi/infra/pegasus/operator/client_operator.java index 9be285ea..f5139493 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/operator/client_operator.java +++ b/src/main/java/com/xiaomi/infra/pegasus/operator/client_operator.java @@ -56,23 +56,23 @@ public String getQPSCounter() { break; } - // pegasus.client.put.succ.qps + // pegasus_client_put_succ_qps return new StringBuilder() - .append("pegasus.client.") + .append("pegasus_client_") .append(name()) - .append(".") + .append("_") .append(mark) - .append(".qps@") + .append("_qps:") .append(tableName) .toString(); } public String getLatencyCounter() { - // pegasus.client.put.latency + // pegasus_client_put_latency return new StringBuilder() - .append("pegasus.client.") + .append("pegasus_client_") .append(name()) - .append(".latency@") + .append("_latency:") .append(tableName) .toString(); } diff --git a/src/main/java/com/xiaomi/infra/pegasus/rpc/ClusterOptions.java b/src/main/java/com/xiaomi/infra/pegasus/rpc/ClusterOptions.java index af14149e..64cd67dd 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/rpc/ClusterOptions.java +++ b/src/main/java/com/xiaomi/infra/pegasus/rpc/ClusterOptions.java @@ -115,6 +115,7 @@ public static ClusterOptions create(Properties config) { PEGASUS_PUSH_COUNTER_INTERVAL_SECS_KEY, PEGASUS_PUSH_COUNTER_INTERVAL_SECS_DEF)); String perfCounterType = config.getProperty(PEGASUS_PUSH_COUNTER_TYPE_KEY, PEGASUS_PUSH_COUNTER_TYPE_DEF); + assert ((perfCounterType.equals("falcon") || perfCounterType.equals("prometheus"))); int metaQueryTimeout = Integer.parseInt( config.getProperty(PEGASUS_META_QUERY_TIMEOUT_KEY, PEGASUS_META_QUERY_TIMEOUT_DEF)); diff --git a/src/test/java/com/xiaomi/infra/pegasus/client/TestBasic.java b/src/test/java/com/xiaomi/infra/pegasus/client/TestBasic.java index 2ccebe42..86aa56aa 100644 --- a/src/test/java/com/xiaomi/infra/pegasus/client/TestBasic.java +++ b/src/test/java/com/xiaomi/infra/pegasus/client/TestBasic.java @@ -136,53 +136,44 @@ public void testSetGetDel() throws PException { try { // set - while (true) { - Thread.sleep(1000); - - client.set( - tableName, - hashKey, - "basic_test_sort_key_1".getBytes(), - "basic_test_value_1".getBytes()); + client.set( + tableName, hashKey, "basic_test_sort_key_1".getBytes(), "basic_test_value_1".getBytes()); - // check exist - boolean exist = client.exist(tableName, hashKey, "basic_test_sort_key_1".getBytes()); - Assert.assertTrue(exist); + // check exist + boolean exist = client.exist(tableName, hashKey, "basic_test_sort_key_1".getBytes()); + Assert.assertTrue(exist); - exist = client.exist(tableName, hashKey, "basic_test_sort_key_2".getBytes()); - Assert.assertFalse(exist); + exist = client.exist(tableName, hashKey, "basic_test_sort_key_2".getBytes()); + Assert.assertFalse(exist); - // check sortkey count - long sortKeyCount = client.sortKeyCount(tableName, hashKey); - Assert.assertEquals(1, sortKeyCount); + // check sortkey count + long sortKeyCount = client.sortKeyCount(tableName, hashKey); + Assert.assertEquals(1, sortKeyCount); - // get - byte[] value = client.get(tableName, hashKey, "basic_test_sort_key_1".getBytes()); - Assert.assertArrayEquals("basic_test_value_1".getBytes(), value); + // get + byte[] value = client.get(tableName, hashKey, "basic_test_sort_key_1".getBytes()); + Assert.assertArrayEquals("basic_test_value_1".getBytes(), value); - value = client.get(tableName, hashKey, "basic_test_sort_key_2".getBytes()); - Assert.assertEquals(null, value); + value = client.get(tableName, hashKey, "basic_test_sort_key_2".getBytes()); + Assert.assertEquals(null, value); - // del - client.del(tableName, hashKey, "basic_test_sort_key_1".getBytes()); + // del + client.del(tableName, hashKey, "basic_test_sort_key_1".getBytes()); - // check exist - exist = client.exist(tableName, hashKey, "basic_test_sort_key_1".getBytes()); - Assert.assertFalse(exist); + // check exist + exist = client.exist(tableName, hashKey, "basic_test_sort_key_1".getBytes()); + Assert.assertFalse(exist); - // check sortkey count - sortKeyCount = client.sortKeyCount(tableName, hashKey); - Assert.assertEquals(0, sortKeyCount); + // check sortkey count + sortKeyCount = client.sortKeyCount(tableName, hashKey); + Assert.assertEquals(0, sortKeyCount); - // check deleted - value = client.get(tableName, hashKey, "basic_test_sort_key_1".getBytes()); - Assert.assertEquals(null, value); - } + // check deleted + value = client.get(tableName, hashKey, "basic_test_sort_key_1".getBytes()); + Assert.assertEquals(null, value); } catch (PException e) { e.printStackTrace(); Assert.assertTrue(false); - } catch (InterruptedException e) { - e.printStackTrace(); } PegasusClientFactory.closeSingletonClient(); diff --git a/src/test/java/com/xiaomi/infra/pegasus/metrics/MetricsPoolTest.java b/src/test/java/com/xiaomi/infra/pegasus/metrics/MetricsPoolTest.java index 198ec26a..d6e582b5 100644 --- a/src/test/java/com/xiaomi/infra/pegasus/metrics/MetricsPoolTest.java +++ b/src/test/java/com/xiaomi/infra/pegasus/metrics/MetricsPoolTest.java @@ -23,14 +23,14 @@ public void before() { public void genJsonsFromMeter() throws Exception { String host = "simple-test-host.bj"; String tags = "what=you,like=another"; - Falcon falcon = new Falcon(host, tags, 20, r); + FalconCollector falconCollector = new FalconCollector(host, tags, 20, r); Meter m = r.meter("TestName"); m.mark(1); m.mark(1); StringBuilder builder = new StringBuilder(); - falcon.genJsonsFromMeter("TestName", m, builder); + falconCollector.genJsonsFromMeter("TestName", m, builder); JSONArray array = new JSONArray("[" + builder.toString() + "]"); Assert.assertEquals(1, array.length()); @@ -54,12 +54,12 @@ public void genJsonsFromMeter() throws Exception { public void genJsonFromHistogram() throws Exception { String host = "simple-test-host.bj"; String tags = "what=you,like=another"; - Falcon falcon = new Falcon(host, tags, 20, r); + FalconCollector falconCollector = new FalconCollector(host, tags, 20, r); Histogram h = r.histogram("TestHist"); for (int i = 0; i < 1000000; ++i) h.update((long) i); StringBuilder builder = new StringBuilder(); - falcon.genJsonsFromHistogram("TestHist", h, builder); + falconCollector.genJsonsFromHistogram("TestHist", h, builder); JSONArray array = new JSONArray("[" + builder.toString() + "]"); Assert.assertEquals(2, array.length()); @@ -79,7 +79,7 @@ public void genJsonFromHistogram() throws Exception { @Test public void oneMetricToJson() throws Exception { - Falcon.FalconMetric metric = new Falcon.FalconMetric(); + FalconCollector.FalconMetric metric = new FalconCollector.FalconMetric(); metric.endpoint = "1.2.3.4"; metric.metric = "simple_metric"; metric.timestamp = 12343455L; @@ -89,7 +89,7 @@ public void oneMetricToJson() throws Exception { metric.tags = "cluster=onebox,app=new"; StringBuilder builder = new StringBuilder(); - Falcon.oneMetricToJson(metric, builder); + FalconCollector.oneMetricToJson(metric, builder); JSONObject obj = new JSONObject(builder.toString()); Assert.assertEquals(metric.endpoint, obj.getString("endpoint")); @@ -102,7 +102,7 @@ public void oneMetricToJson() throws Exception { builder.setLength(0); metric.tags = ""; - Falcon.oneMetricToJson(metric, builder); + FalconCollector.oneMetricToJson(metric, builder); obj = new JSONObject(builder.toString()); Assert.assertEquals(metric.endpoint, obj.getString("endpoint")); Assert.assertEquals(metric.metric, obj.getString("metric")); @@ -117,7 +117,7 @@ public void oneMetricToJson() throws Exception { public void metricsToJson() throws Exception { String host = "simple-test-host.bj"; String tags = "what=you,like=another"; - MetricsPool pool = new MetricsPool(host, tags, 20, "falcon"); + MetricsPool pool = new MetricsPool(host, tags, 20, "falconCollector"); pool.setMeter("aaa@temp", 1); pool.setMeter("aaa", 2); @@ -127,7 +127,9 @@ public void metricsToJson() throws Exception { pool.setHistorgram("ccc@temp", i); } - JSONArray array = new JSONArray(MetricsPool.collector.updateMetric()); + JSONArray array = + new JSONArray( + ((FalconReporter) MetricsPool.pegasusMonitor).falconCollector.metricsToJson()); Assert.assertEquals(6, array.length()); for (int i = 0; i < array.length(); ++i) { JSONObject j = array.getJSONObject(i); diff --git a/src/test/resource/pegasus.properties b/src/test/resource/pegasus.properties index ec4da28e..6282eea7 100644 --- a/src/test/resource/pegasus.properties +++ b/src/test/resource/pegasus.properties @@ -1,7 +1,6 @@ meta_servers = 127.0.0.1:34601,127.0.0.1:34602,127.0.0.1:34603 operation_timeout = 10000 async_workers = 2 -enable_perf_counter = true -push_counter_type = prometheus +enable_perf_counter = false perf_counter_tags = cluster=onebox,app=unit_test push_counter_interval_secs = 10 From 12df7dd41113daa6fe1f885b3621710da6fba1a6 Mon Sep 17 00:00:00 2001 From: JiaShuo Date: Mon, 8 Jun 2020 19:32:59 +0800 Subject: [PATCH 6/9] refactor --- .../pegasus/metrics/MetricsPoolTest.java | 31 +++++++++++++++++++ 1 file changed, 31 insertions(+) diff --git a/src/test/java/com/xiaomi/infra/pegasus/metrics/MetricsPoolTest.java b/src/test/java/com/xiaomi/infra/pegasus/metrics/MetricsPoolTest.java index d6e582b5..e8f6fbff 100644 --- a/src/test/java/com/xiaomi/infra/pegasus/metrics/MetricsPoolTest.java +++ b/src/test/java/com/xiaomi/infra/pegasus/metrics/MetricsPoolTest.java @@ -6,11 +6,15 @@ import com.codahale.metrics.Histogram; import com.codahale.metrics.Meter; import com.codahale.metrics.MetricRegistry; +import io.prometheus.client.Collector.MetricFamilySamples; +import java.util.Arrays; +import java.util.List; import junit.framework.Assert; import org.json.JSONArray; import org.json.JSONObject; import org.junit.Before; import org.junit.Test; +import org.junit.jupiter.api.Assertions; /** Created by weijiesun on 18-3-9. */ public class MetricsPoolTest { @@ -145,5 +149,32 @@ public void metricsToJson() throws Exception { } } + @Test + public void testPrometheus() { + + String tags = "what=you,like=another"; + PrometheusCollector prometheusCollector = new PrometheusCollector(tags, r); + + Meter m = r.meter("TestQPSName:temp"); + for (int i = 0; i < 100; ++i) m.mark(1); + + Histogram h = r.histogram("testLatency:temp"); + for (int i = 0; i < 10000; ++i) h.update((long) i); + + List metricFamilySamples = prometheusCollector.collect(); + + Assertions.assertEquals(3, metricFamilySamples.size()); + + MetricFamilySamples QPSMetric = metricFamilySamples.get(0); + Assertions.assertEquals("TestQPSName", QPSMetric.name); + Assertions.assertArrayEquals( + QPSMetric.samples.get(0).labelNames.toArray(), + Arrays.asList("what", "like", "table").toArray()); + Assertions.assertArrayEquals( + QPSMetric.samples.get(0).labelValues.toArray(), + Arrays.asList("you", "another", "temp").toArray()); + Assertions.assertTrue(QPSMetric.samples.get(0).value != 0); + } + MetricRegistry r; } From 04f3e0655d45d9c7967c63371f4b9a5230a34369 Mon Sep 17 00:00:00 2001 From: JiaShuo Date: Mon, 8 Jun 2020 19:55:29 +0800 Subject: [PATCH 7/9] refactor --- .../infra/pegasus/metrics/MetricsPoolTest.java | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/src/test/java/com/xiaomi/infra/pegasus/metrics/MetricsPoolTest.java b/src/test/java/com/xiaomi/infra/pegasus/metrics/MetricsPoolTest.java index e8f6fbff..5c68045e 100644 --- a/src/test/java/com/xiaomi/infra/pegasus/metrics/MetricsPoolTest.java +++ b/src/test/java/com/xiaomi/infra/pegasus/metrics/MetricsPoolTest.java @@ -40,7 +40,7 @@ public void genJsonsFromMeter() throws Exception { Assert.assertEquals(1, array.length()); String[] metrics = { - "TestName.cps-1sec", "TestName.cps-1min", "TestName.cps-5min", "TestName.cps-15min" + "TestName_cps_1sec", "TestName_cps_1min", "TestName_cps_5min", "TestName_cps_15min" }; for (int i = 0; i < array.length(); ++i) { @@ -68,7 +68,7 @@ public void genJsonFromHistogram() throws Exception { JSONArray array = new JSONArray("[" + builder.toString() + "]"); Assert.assertEquals(2, array.length()); - String[] metrics = {"TestHist.p99", "TestHist.p999"}; + String[] metrics = {"TestHist_p99", "TestHist_p999"}; for (int i = 0; i < array.length(); ++i) { JSONObject j = array.getJSONObject(i); @@ -121,14 +121,14 @@ public void oneMetricToJson() throws Exception { public void metricsToJson() throws Exception { String host = "simple-test-host.bj"; String tags = "what=you,like=another"; - MetricsPool pool = new MetricsPool(host, tags, 20, "falconCollector"); + MetricsPool pool = new MetricsPool(host, tags, 20, "falcon"); - pool.setMeter("aaa@temp", 1); + pool.setMeter("aaa:temp", 1); pool.setMeter("aaa", 2); for (int i = 0; i < 10000; ++i) { pool.setHistorgram("ccc", i); - pool.setHistorgram("ccc@temp", i); + pool.setHistorgram("ccc:temp", i); } JSONArray array = @@ -138,7 +138,7 @@ public void metricsToJson() throws Exception { for (int i = 0; i < array.length(); ++i) { JSONObject j = array.getJSONObject(i); - if (j.getString("metric").contains("@")) { + if (j.getString("metric").contains(":")) { Assert.assertEquals(tags + ",table=temp", j.getString("tags")); } else { Assert.assertEquals(tags, j.getString("tags")); From 6c0c371633d24edd687a4bbf51a4b21e5718d9e8 Mon Sep 17 00:00:00 2001 From: JiaShuo Date: Fri, 12 Jun 2020 17:02:09 +0800 Subject: [PATCH 8/9] update log and client options --- .../pegasus/metrics/FalconCollector.java | 8 ++++---- .../infra/pegasus/metrics/MetricsPool.java | 8 +++++++- .../pegasus/metrics/PrometheusCollector.java | 20 ++++--------------- .../pegasus/operator/client_operator.java | 4 ++-- 4 files changed, 17 insertions(+), 23 deletions(-) diff --git a/src/main/java/com/xiaomi/infra/pegasus/metrics/FalconCollector.java b/src/main/java/com/xiaomi/infra/pegasus/metrics/FalconCollector.java index f8f1ee58..71ee7bb1 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/metrics/FalconCollector.java +++ b/src/main/java/com/xiaomi/infra/pegasus/metrics/FalconCollector.java @@ -1,5 +1,6 @@ package com.xiaomi.infra.pegasus.metrics; +import static com.xiaomi.infra.pegasus.metrics.MetricsPool.getMetricName; import static com.xiaomi.infra.pegasus.metrics.MetricsPool.getTableTag; import com.codahale.metrics.Histogram; @@ -52,8 +53,7 @@ public String metricsToJson() { public void genJsonsFromMeter(String name, Meter meter, StringBuilder output) throws JSONException { falconMetric.counterType = "GAUGE"; - - falconMetric.metric = name + "_cps_1sec"; + falconMetric.metric = getMetricName(name, ""); falconMetric.tags = getTableTag(name, defaultTags); falconMetric.value = meter.getMeanRate(); oneMetricToJson(falconMetric, output); @@ -64,13 +64,13 @@ public void genJsonsFromHistogram(String name, Histogram hist, StringBuilder out falconMetric.counterType = "GAUGE"; Snapshot s = hist.getSnapshot(); - falconMetric.metric = name + "_p99"; + falconMetric.metric = getMetricName(name, "_p99"); falconMetric.tags = getTableTag(name, defaultTags); falconMetric.value = s.get99thPercentile(); oneMetricToJson(falconMetric, output); output.append(','); - falconMetric.metric = name + "_p999"; + falconMetric.metric = getMetricName(name, "_p999"); falconMetric.tags = getTableTag(name, defaultTags); falconMetric.value = s.get999thPercentile(); oneMetricToJson(falconMetric, output); diff --git a/src/main/java/com/xiaomi/infra/pegasus/metrics/MetricsPool.java b/src/main/java/com/xiaomi/infra/pegasus/metrics/MetricsPool.java index df7e386f..1328342f 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/metrics/MetricsPool.java +++ b/src/main/java/com/xiaomi/infra/pegasus/metrics/MetricsPool.java @@ -41,7 +41,7 @@ public static String getTableTag(String counterName, String defaultTags) { if (defaultTags.contains("table=")) { return defaultTags; } - String[] result = counterName.split(":"); + String[] result = counterName.split("@"); if (result.length >= 2) { return defaultTags.equals("") ? ("table=" + result[1]) @@ -49,4 +49,10 @@ public static String getTableTag(String counterName, String defaultTags) { } return defaultTags; } + + public static String getMetricName(String name, String suffix) { + String[] result = name.split("@"); + assert (result.length == 2); + return result[0] + suffix; + } } diff --git a/src/main/java/com/xiaomi/infra/pegasus/metrics/PrometheusCollector.java b/src/main/java/com/xiaomi/infra/pegasus/metrics/PrometheusCollector.java index a2beafab..3a9d5b1d 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/metrics/PrometheusCollector.java +++ b/src/main/java/com/xiaomi/infra/pegasus/metrics/PrometheusCollector.java @@ -1,5 +1,6 @@ package com.xiaomi.infra.pegasus.metrics; +import static com.xiaomi.infra.pegasus.metrics.MetricsPool.getMetricName; import static com.xiaomi.infra.pegasus.metrics.MetricsPool.getTableTag; import com.codahale.metrics.Histogram; @@ -55,7 +56,7 @@ private void updateQPSMetric( Map labels = getLabel(getTableTag(name, defaultTags)); GaugeMetricFamily labeledGauge = new GaugeMetricFamily( - formatQPSMetricName(name), "pegasus operation qps", new ArrayList<>(labels.keySet())); + getMetricName(name, ""), "pegasus operation qps", new ArrayList<>(labels.keySet())); labeledGauge.addMetric(new ArrayList<>(labels.values()), meter.getValue().getMeanRate()); builder.add(labeledGauge); } @@ -66,10 +67,9 @@ private void updateLatencyMetric( final ImmutableList.Builder builder) { Map labels = getLabel(getTableTag(name, defaultTags)); Snapshot snapshot = meter.getValue().getSnapshot(); + updateLatencyMetric(getMetricName(name, "_p99"), snapshot.get99thPercentile(), labels, builder); updateLatencyMetric( - formatLatencyMetricName(name, "p99"), snapshot.get99thPercentile(), labels, builder); - updateLatencyMetric( - formatLatencyMetricName(name, "p999"), snapshot.get999thPercentile(), labels, builder); + getMetricName(name, "_p999"), snapshot.get999thPercentile(), labels, builder); } private void updateLatencyMetric( @@ -99,18 +99,6 @@ private Map getLabel(String labels) { return labelMap; } - private String formatLatencyMetricName(String name, String percentage) { - String[] metricName = name.split(":"); - assert (metricName.length == 2); - return metricName[0] + "_" + percentage; - } - - private String formatQPSMetricName(String name) { - String[] metricName = name.split(":"); - assert (metricName.length == 2); - return metricName[0]; - } - @Override public void start() { try { diff --git a/src/main/java/com/xiaomi/infra/pegasus/operator/client_operator.java b/src/main/java/com/xiaomi/infra/pegasus/operator/client_operator.java index f5139493..61959d2d 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/operator/client_operator.java +++ b/src/main/java/com/xiaomi/infra/pegasus/operator/client_operator.java @@ -62,7 +62,7 @@ public String getQPSCounter() { .append(name()) .append("_") .append(mark) - .append("_qps:") + .append("_qps@") .append(tableName) .toString(); } @@ -72,7 +72,7 @@ public String getLatencyCounter() { return new StringBuilder() .append("pegasus_client_") .append(name()) - .append("_latency:") + .append("_latency@") .append(tableName) .toString(); } From ad42d21ed73011c2a4f941041f32a308e90301cc Mon Sep 17 00:00:00 2001 From: JiaShuo Date: Wed, 17 Jun 2020 10:43:09 +0800 Subject: [PATCH 9/9] update test --- .../infra/pegasus/metrics/MetricsPool.java | 6 ++++-- .../infra/pegasus/metrics/MetricsPoolTest.java | 18 +++++++----------- 2 files changed, 11 insertions(+), 13 deletions(-) diff --git a/src/main/java/com/xiaomi/infra/pegasus/metrics/MetricsPool.java b/src/main/java/com/xiaomi/infra/pegasus/metrics/MetricsPool.java index 1328342f..905fd58d 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/metrics/MetricsPool.java +++ b/src/main/java/com/xiaomi/infra/pegasus/metrics/MetricsPool.java @@ -52,7 +52,9 @@ public static String getTableTag(String counterName, String defaultTags) { public static String getMetricName(String name, String suffix) { String[] result = name.split("@"); - assert (result.length == 2); - return result[0] + suffix; + if (result.length >= 2) { + return result[0] + suffix; + } + return name + suffix; } } diff --git a/src/test/java/com/xiaomi/infra/pegasus/metrics/MetricsPoolTest.java b/src/test/java/com/xiaomi/infra/pegasus/metrics/MetricsPoolTest.java index 5c68045e..c687ead2 100644 --- a/src/test/java/com/xiaomi/infra/pegasus/metrics/MetricsPoolTest.java +++ b/src/test/java/com/xiaomi/infra/pegasus/metrics/MetricsPoolTest.java @@ -39,15 +39,13 @@ public void genJsonsFromMeter() throws Exception { JSONArray array = new JSONArray("[" + builder.toString() + "]"); Assert.assertEquals(1, array.length()); - String[] metrics = { - "TestName_cps_1sec", "TestName_cps_1min", "TestName_cps_5min", "TestName_cps_15min" - }; + String metric = "TestName"; for (int i = 0; i < array.length(); ++i) { JSONObject j = array.getJSONObject(i); Assert.assertEquals(tags, j.getString("tags")); - Assert.assertEquals(metrics[i], j.getString("metric")); + Assert.assertEquals(metric, j.getString("metric")); Assert.assertEquals("GAUGE", j.getString("counterType")); Assert.assertEquals(20, j.getInt("step")); Assert.assertEquals(host, j.getString("endpoint")); @@ -123,22 +121,20 @@ public void metricsToJson() throws Exception { String tags = "what=you,like=another"; MetricsPool pool = new MetricsPool(host, tags, 20, "falcon"); - pool.setMeter("aaa:temp", 1); - pool.setMeter("aaa", 2); + pool.setMeter("aaa@temp", 1); for (int i = 0; i < 10000; ++i) { pool.setHistorgram("ccc", i); - pool.setHistorgram("ccc:temp", i); } JSONArray array = new JSONArray( ((FalconReporter) MetricsPool.pegasusMonitor).falconCollector.metricsToJson()); - Assert.assertEquals(6, array.length()); + Assert.assertEquals(3, array.length()); for (int i = 0; i < array.length(); ++i) { JSONObject j = array.getJSONObject(i); - if (j.getString("metric").contains(":")) { + if (j.getString("metric").contains("aaa")) { Assert.assertEquals(tags + ",table=temp", j.getString("tags")); } else { Assert.assertEquals(tags, j.getString("tags")); @@ -155,10 +151,10 @@ public void testPrometheus() { String tags = "what=you,like=another"; PrometheusCollector prometheusCollector = new PrometheusCollector(tags, r); - Meter m = r.meter("TestQPSName:temp"); + Meter m = r.meter("TestQPSName@temp"); for (int i = 0; i < 100; ++i) m.mark(1); - Histogram h = r.histogram("testLatency:temp"); + Histogram h = r.histogram("testLatency@temp"); for (int i = 0; i < 10000; ++i) h.update((long) i); List metricFamilySamples = prometheusCollector.collect();