Skip to content

Commit

Permalink
Refactor metrics system to reduce periodic reporting load
Browse files Browse the repository at this point in the history
  • Loading branch information
kuszz committed Aug 14, 2024
1 parent 3673b55 commit f556887
Show file tree
Hide file tree
Showing 6 changed files with 130 additions and 108 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import java.util.Arrays;
import java.util.Map;
import java.util.function.Supplier;

import com.google.common.collect.Maps;
import io.prometheus.client.CollectorRegistry;
Expand All @@ -27,12 +28,15 @@
import io.prometheus.client.Histogram;
import io.prometheus.client.Summary;

import org.apache.uniffle.common.util.JavaUtils;

public class MetricsManager {
private final CollectorRegistry collectorRegistry;
private final String[] defaultLabelNames;
private final String[] defaultLabelValues;
private static final double[] QUANTILES = {0.50, 0.75, 0.90, 0.95, 0.99};
private static final double QUANTILE_ERROR = 0.01;
private Map<String, SupplierGauge> supplierGaugeMap;

public MetricsManager() {
this(null, Maps.newHashMap());
Expand All @@ -47,6 +51,7 @@ public MetricsManager(CollectorRegistry collectorRegistry, Map<String, String> d
this.defaultLabelNames = defaultLabels.keySet().toArray(new String[0]);
this.defaultLabelValues =
Arrays.stream(defaultLabelNames).map(defaultLabels::get).toArray(String[]::new);
this.supplierGaugeMap = JavaUtils.newConcurrentMap();
}

public CollectorRegistry getCollectorRegistry() {
Expand Down Expand Up @@ -79,6 +84,19 @@ public Gauge.Child addLabeledGauge(String name) {
return c.labels(this.defaultLabelValues);
}

/**
 * Registers a gauge whose value is pulled from {@code supplier} when the registry is collected.
 *
 * <p>The gauge is created and registered only if no supplier gauge is already tracked under
 * {@code name}; otherwise the call is a no-op and the given supplier is ignored. The gauge
 * carries this manager's default label names and values.
 *
 * @param name metric name, also used as the key for later unregistration
 * @param supplier source of the gauge value
 */
public void addLabeledGauge(String name, Supplier<Double> supplier) {
  supplierGaugeMap.computeIfAbsent(
      name,
      metricName -> {
        SupplierGauge gauge =
            new SupplierGauge(
                name, "Gauge " + name, supplier, this.defaultLabelNames, this.defaultLabelValues);
        // register() returns the collector itself, which becomes the map value.
        return gauge.register(collectorRegistry);
      });
}

/**
 * Convenience overload that uses {@code "Histogram " + name} as the help text and delegates to
 * the overload taking an explicit help string.
 *
 * @param name metric name
 * @param buckets bucket boundaries passed through to the delegate
 * @param labels label names passed through to the delegate
 * @return the histogram returned by the delegate overload
 */
public Histogram addHistogram(String name, double[] buckets, String... labels) {
  final String help = "Histogram " + name;
  return addHistogram(name, help, buckets, labels);
}
Expand Down Expand Up @@ -112,4 +130,18 @@ public Summary.Child addLabeledSummary(String name) {
}
return builder.register(collectorRegistry).labels(defaultLabelValues);
}

/**
 * Unregisters every tracked supplier gauge from the collector registry and then empties the
 * tracking map.
 */
public void unregisterAllSupplierGauge() {
  supplierGaugeMap.values().forEach(gauge -> collectorRegistry.unregister(gauge));
  supplierGaugeMap.clear();
}

/**
 * Unregisters the supplier gauge tracked under {@code name}, if there is one.
 *
 * <p>The entry is taken out of the map with a single {@code remove} call, so when several
 * threads race to unregister the same name only the one that actually removed the entry goes on
 * to call the registry. The previous containsKey/get/remove sequence could hand {@code null} (or
 * an already-unregistered collector) to the registry under such a race.
 *
 * @param name metric name the gauge was registered with
 */
public void unregisterSupplierGauge(String name) {
  SupplierGauge removed = supplierGaugeMap.remove(name);
  if (removed != null) {
    collectorRegistry.unregister(removed);
  }
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.uniffle.common.metrics;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.function.Supplier;

import io.prometheus.client.Collector;
import io.prometheus.client.GaugeMetricFamily;

/**
 * A {@link Collector} exposing one gauge sample whose value is obtained from a {@link Supplier}
 * every time {@link #collect()} is invoked, instead of being pushed by the application.
 *
 * <p>All state is fixed at construction time. The label lists are {@link Arrays#asList} views of
 * the arrays passed to the constructor, so callers should not modify those arrays afterwards.
 */
class SupplierGauge extends Collector implements Collector.Describable {
  private final String name;
  private final String help;
  private final Supplier<Double> supplier;
  private final List<String> labelNames;
  private final List<String> labelValues;

  /**
   * @param name metric name reported for the sample and the metric family
   * @param help help text reported for the metric family
   * @param supplier source of the gauge value; must not be {@code null}
   * @param labelNames label names attached to the sample
   * @param labelValues label values attached to the sample, in the same order as the names
   * @throws NullPointerException if {@code supplier}, {@code labelNames} or {@code labelValues}
   *     is {@code null}
   */
  SupplierGauge(
      String name,
      String help,
      Supplier<Double> supplier,
      String[] labelNames,
      String[] labelValues) {
    // Fail at registration time rather than on the first collect() of the registry.
    if (supplier == null) {
      throw new NullPointerException("supplier must not be null for gauge " + name);
    }
    this.name = name;
    this.help = help;
    this.supplier = supplier;
    this.labelNames = Arrays.asList(labelNames);
    this.labelValues = Arrays.asList(labelValues);
  }

  /**
   * Reads the supplier once and wraps the value in a single-sample gauge metric family.
   *
   * @return a one-element list holding the gauge family
   */
  @Override
  public List<MetricFamilySamples> collect() {
    // The Double is unboxed here; a supplier returning null makes this call throw.
    double value = this.supplier.get();
    List<MetricFamilySamples.Sample> samples = new ArrayList<>(1);
    samples.add(
        new MetricFamilySamples.Sample(this.name, this.labelNames, this.labelValues, value));
    List<MetricFamilySamples> families = new ArrayList<>(1);
    families.add(new MetricFamilySamples(this.name, Type.GAUGE, this.help, samples));
    return families;
  }

  /**
   * Describes the metric family without reading the supplier.
   *
   * @return a one-element list holding an empty gauge family with this gauge's name, help and
   *     label names
   */
  @Override
  public List<MetricFamilySamples> describe() {
    return Collections.<MetricFamilySamples>singletonList(
        new GaugeMetricFamily(this.name, this.help, this.labelNames));
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@
import org.apache.uniffle.storage.common.Storage;
import org.apache.uniffle.storage.util.StorageType;

import static org.apache.uniffle.server.ShuffleServerMetrics.EVENT_QUEUE_SIZE;

public class DefaultFlushEventHandler implements FlushEventHandler {
private static final Logger LOG = LoggerFactory.getLogger(DefaultFlushEventHandler.class);

Expand Down Expand Up @@ -77,8 +79,6 @@ public void handle(ShuffleDataFlushEvent event) {
// We need to release the memory when discarding the event
event.doCleanup();
ShuffleServerMetrics.counterTotalDroppedEventNum.inc();
} else {
ShuffleServerMetrics.gaugeEventQueueSize.inc();
}
}

Expand Down Expand Up @@ -160,8 +160,6 @@ private void handleEventAndUpdateMetrics(ShuffleDataFlushEvent event, Storage st
} else {
ShuffleServerMetrics.gaugeFallbackFlushThreadPoolQueueSize.dec();
}

ShuffleServerMetrics.gaugeEventQueueSize.dec();
}
}

Expand All @@ -178,6 +176,7 @@ protected void initFlushEventExecutor() {
hadoopThreadPoolExecutor = createFlushEventExecutor(poolSize, "HadoopFlushEventThreadPool");
}
fallbackThreadPoolExecutor = createFlushEventExecutor(5, "FallBackFlushEventThreadPool");
ShuffleServerMetrics.addLabeledGauge(EVENT_QUEUE_SIZE, () -> (double) flushQueue.size());
startEventProcessor();
}

Expand Down Expand Up @@ -248,7 +247,7 @@ protected Executor createFlushEventExecutor(int poolSize, String threadFactoryNa

@Override
public int getEventNumInFlush() {
return (int) ShuffleServerMetrics.gaugeEventQueueSize.get();
return flushQueue.size();
}

@Override
Expand Down

This file was deleted.

25 changes: 18 additions & 7 deletions server/src/main/java/org/apache/uniffle/server/ShuffleServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import io.netty.util.internal.PlatformDependent;
import io.prometheus.client.CollectorRegistry;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
Expand Down Expand Up @@ -73,13 +74,15 @@
import static org.apache.uniffle.common.config.RssBaseConf.RSS_TEST_MODE_ENABLE;
import static org.apache.uniffle.server.ShuffleServerConf.SERVER_DECOMMISSION_CHECK_INTERVAL;
import static org.apache.uniffle.server.ShuffleServerConf.SERVER_DECOMMISSION_SHUTDOWN;
import static org.apache.uniffle.server.ShuffleServerMetrics.USED_DIRECT_MEMORY_SIZE;
import static org.apache.uniffle.server.ShuffleServerMetrics.USED_DIRECT_MEMORY_SIZE_BY_GRPC_NETTY;
import static org.apache.uniffle.server.ShuffleServerMetrics.USED_DIRECT_MEMORY_SIZE_BY_NETTY;

/** Manages the startup and shutdown of a Uniffle shuffle server. */
public class ShuffleServer {

private static final Logger LOG = LoggerFactory.getLogger(ShuffleServer.class);
private RegisterHeartBeat registerHeartBeat;
private NettyDirectMemoryTracker directMemoryUsageReporter;
private String id;
private String ip;
private int grpcPort;
Expand Down Expand Up @@ -156,7 +159,6 @@ public void start() throws Exception {
initMetricsReporter();

registerHeartBeat.startHeartBeat();
directMemoryUsageReporter.start();
Runtime.getRuntime()
.addShutdownHook(
new Thread() {
Expand Down Expand Up @@ -184,10 +186,6 @@ public void stopServer() throws Exception {
registerHeartBeat.shutdown();
LOG.info("HeartBeat Stopped!");
}
if (directMemoryUsageReporter != null) {
directMemoryUsageReporter.stop();
LOG.info("Direct memory usage tracker Stopped!");
}
if (storageManager != null) {
storageManager.stop();
LOG.info("MultiStorage Stopped!");
Expand Down Expand Up @@ -304,7 +302,6 @@ private void initialization() throws Exception {
}

registerHeartBeat = new RegisterHeartBeat(this);
directMemoryUsageReporter = new NettyDirectMemoryTracker(shuffleServerConf);
shuffleFlushManager = new ShuffleFlushManager(shuffleServerConf, this, storageManager);
shuffleBufferManager =
new ShuffleBufferManager(shuffleServerConf, shuffleFlushManager, nettyServerEnabled);
Expand All @@ -320,6 +317,20 @@ private void initialization() throws Exception {
storageManager,
shuffleMergeManager);
shuffleTaskManager.start();
ShuffleServerMetrics.addLabeledGauge(
USED_DIRECT_MEMORY_SIZE_BY_NETTY, () -> (double) PlatformDependent.usedDirectMemory());
ShuffleServerMetrics.addLabeledGauge(
USED_DIRECT_MEMORY_SIZE_BY_GRPC_NETTY,
() ->
(double)
io.grpc.netty.shaded.io.netty.util.internal.PlatformDependent.usedDirectMemory());
ShuffleServerMetrics.addLabeledGauge(
USED_DIRECT_MEMORY_SIZE,
() ->
(double)
(PlatformDependent.usedDirectMemory()
+ io.grpc.netty.shaded.io.netty.util.internal.PlatformDependent
.usedDirectMemory()));

setServer();
}
Expand Down
Loading

0 comments on commit f556887

Please sign in to comment.