Skip to content

Commit

Permalink
[#1271] improvement(server): change transportTime and processTime sum…
Browse files Browse the repository at this point in the history
…mary to Thread Pool Instead of block (#1272)

Move the transportTime and processTime summary updates to a thread pool instead of blocking the calling thread

Closes #1271


### What changes were proposed in this pull request?

Move the transportTime and processTime summary updates to a thread pool instead of blocking the calling thread.

### Why are the changes needed?

To avoid blocking the gRPC thread while the summary metrics are updated.

Fix: #1271 

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?
No
  • Loading branch information
lifeSo authored Nov 1, 2023
1 parent d992c38 commit 5a89609
Show file tree
Hide file tree
Showing 14 changed files with 105 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public GrpcServer getServer(ShuffleManagerGrpcService service) {
}
return GrpcServer.Builder.newBuilder()
.conf(conf)
.grpcMetrics(GRPCMetrics.getEmptyGRPCMetrics())
.grpcMetrics(GRPCMetrics.getEmptyGRPCMetrics(conf))
.addService(service)
.build();
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,12 @@

package org.apache.uniffle.common.metrics;

import org.apache.uniffle.common.config.RssConf;

public class EmptyGRPCMetrics extends GRPCMetrics {

public EmptyGRPCMetrics(String tags) {
super(tags);
public EmptyGRPCMetrics(RssConf rssConf, String tags) {
super(rssConf, tags);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import io.prometheus.client.Counter;
import io.prometheus.client.Gauge;

import org.apache.uniffle.common.config.RssConf;
import org.apache.uniffle.common.util.Constants;

public abstract class GRPCMetrics extends RPCMetrics {
Expand All @@ -40,10 +41,11 @@ public abstract class GRPCMetrics extends RPCMetrics {
protected Gauge.Child gaugeGrpcOpen;
protected Counter.Child counterGrpcTotal;

public GRPCMetrics(String tags) {
super(tags);
public GRPCMetrics(RssConf rssConf, String tags) {
super(rssConf, tags);
}

@Override
public abstract void registerMetrics();

@Override
Expand All @@ -61,6 +63,7 @@ public void registerGeneralMetrics() {
metricsManager.addLabeledGauge(GRPC_SERVER_CONNECTION_NUMBER));
}

@Override
public void incCounter(String methodName) {
if (isRegistered) {
super.incCounter(methodName);
Expand All @@ -69,6 +72,7 @@ public void incCounter(String methodName) {
}
}

@Override
public void decCounter(String methodName) {
if (isRegistered) {
super.decCounter(methodName);
Expand All @@ -84,7 +88,7 @@ public Counter.Child getCounterGrpcTotal() {
return counterGrpcTotal;
}

public static GRPCMetrics getEmptyGRPCMetrics() {
return new EmptyGRPCMetrics(Constants.SHUFFLE_SERVER_VERSION);
public static GRPCMetrics getEmptyGRPCMetrics(RssConf rssConf) {
return new EmptyGRPCMetrics(rssConf, Constants.SHUFFLE_SERVER_VERSION);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import io.prometheus.client.Counter;
import io.prometheus.client.Gauge;

import org.apache.uniffle.common.config.RssConf;

public abstract class NettyMetrics extends RPCMetrics {

private static final String NETTY_ACTIVE_CONNECTION = "netty_active_connection";
Expand All @@ -28,8 +30,8 @@ public abstract class NettyMetrics extends RPCMetrics {
protected Gauge.Child gaugeNettyActiveConn;
protected Counter.Child counterNettyException;

public NettyMetrics(String tags) {
super(tags);
public NettyMetrics(RssConf rssConf, String tags) {
super(rssConf, tags);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,27 +18,63 @@
package org.apache.uniffle.common.metrics;

import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import com.google.common.collect.Maps;
import com.google.common.collect.Queues;
import io.prometheus.client.CollectorRegistry;
import io.prometheus.client.Counter;
import io.prometheus.client.Gauge;
import io.prometheus.client.Summary;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.uniffle.common.config.RssConf;
import org.apache.uniffle.common.util.Constants;
import org.apache.uniffle.common.util.JavaUtils;
import org.apache.uniffle.common.util.ThreadUtils;

public abstract class RPCMetrics {
private static final Logger LOG = LoggerFactory.getLogger(RPCMetrics.class);

protected boolean isRegistered = false;
protected Map<String, Counter.Child> counterMap = JavaUtils.newConcurrentMap();
protected Map<String, Gauge.Child> gaugeMap = JavaUtils.newConcurrentMap();
protected Map<String, Summary.Child> transportTimeSummaryMap = JavaUtils.newConcurrentMap();
protected Map<String, Summary.Child> processTimeSummaryMap = JavaUtils.newConcurrentMap();
private static final String THREAD_POOL_CORE_SIZE =
"rss.server.summary.metric.thread.pool.core.size";
private static final int THREAD_POOL_CORE_SIZE_DEFAULT_VALUE = 2;
private static final String THREAD_POOL_MAX_SIZE =
"rss.server.summary.metric.thread.pool.max.size";
private static final int THREAD_POOL_MAX_SIZE_DEFAULT_VALUE = 20;
private static final String KEEP_ALIVE_TIME =
"rss.server.summary.metric.thread.pool.keep.alive.time";
private static final int KEEP_ALIVE_TIME_DEFAULT_VALUE = 60;
private final ExecutorService summaryObservePool;
protected MetricsManager metricsManager;
protected String tags;

public RPCMetrics(String tags) {
public RPCMetrics(RssConf rssConf, String tags) {
// Builds the executor on which recordTransportTime/recordProcessTime run their
// Summary.observe(...) calls, so the observation does not run on the caller's thread.
this.tags = tags;
// Pool sizing is looked up by raw string key, falling back to the *_DEFAULT_VALUE constants
// declared on this class when the key is absent from rssConf.
int coreSize = rssConf.getInteger(THREAD_POOL_CORE_SIZE, THREAD_POOL_CORE_SIZE_DEFAULT_VALUE);
int maxSize = rssConf.getInteger(THREAD_POOL_MAX_SIZE, THREAD_POOL_MAX_SIZE_DEFAULT_VALUE);
// Interpreted as seconds (see TimeUnit.SECONDS below).
int keepAliveTime = rssConf.getInteger(KEEP_ALIVE_TIME, KEEP_ALIVE_TIME_DEFAULT_VALUE);
// NOTE(review): the work queue is created without a capacity argument. A ThreadPoolExecutor
// only starts threads beyond the core size once its queue rejects an offer, so with an
// uncapped queue maxSize is unlikely to ever take effect and pending tasks can accumulate
// without limit -- confirm whether a bounded queue was intended.
// NOTE(review): ThreadPoolExecutor rejects maxSize < coreSize with IllegalArgumentException;
// the configured values are not validated here before being passed in.
this.summaryObservePool =
new ThreadPoolExecutor(
coreSize,
maxSize,
keepAliveTime,
TimeUnit.SECONDS,
Queues.newLinkedBlockingQueue(),
ThreadUtils.getThreadFactory("SummaryObserveThreadPool"));
LOG.info(
"Init summary observe thread pool, core size:{}, max size:{}, keep alive time:{}",
coreSize,
maxSize,
keepAliveTime);
}

public abstract void registerMetrics();
Expand Down Expand Up @@ -116,14 +152,17 @@ public void decCounter(String metricKey) {
public void recordTransportTime(String methodName, long transportTimeInMillionSecond) {
Summary.Child summary = transportTimeSummaryMap.get(methodName);
if (summary != null) {
summary.observe(transportTimeInMillionSecond / Constants.MILLION_SECONDS_PER_SECOND);
summaryObservePool.execute(
() ->
summary.observe(transportTimeInMillionSecond / Constants.MILLION_SECONDS_PER_SECOND));
}
}

public void recordProcessTime(String methodName, long processTimeInMillionSecond) {
Summary.Child summary = processTimeSummaryMap.get(methodName);
if (summary != null) {
summary.observe(processTimeInMillionSecond / Constants.MILLION_SECONDS_PER_SECOND);
summaryObservePool.execute(
() -> summary.observe(processTimeInMillionSecond / Constants.MILLION_SECONDS_PER_SECOND));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.slf4j.LoggerFactory;

import org.apache.uniffle.common.config.RssBaseConf;
import org.apache.uniffle.common.config.RssConf;
import org.apache.uniffle.common.metrics.GRPCMetrics;
import org.apache.uniffle.common.util.ThreadUtils;
import org.apache.uniffle.proto.ShuffleManagerGrpc;
Expand All @@ -43,8 +44,9 @@ public class GrpcServerTest {
@Test
public void testGrpcExecutorPool() throws Exception {
// Explicitly setting the synchronizing variable as false at the beginning of test run
RssConf rssConf = new RssConf();
GrpcServer.reset();
GRPCMetrics grpcMetrics = GRPCMetrics.getEmptyGRPCMetrics();
GRPCMetrics grpcMetrics = GRPCMetrics.getEmptyGRPCMetrics(rssConf);
grpcMetrics.register(new CollectorRegistry(true));
GrpcServer.GrpcThreadPoolExecutor executor =
new GrpcServer.GrpcThreadPoolExecutor(
Expand Down Expand Up @@ -96,7 +98,8 @@ public void testGrpcExecutorPool() throws Exception {

@Test
public void testRandomPort() throws Exception {
GRPCMetrics grpcMetrics = GRPCMetrics.getEmptyGRPCMetrics();
RssConf rssConf = new RssConf();
GRPCMetrics grpcMetrics = GRPCMetrics.getEmptyGRPCMetrics(rssConf);
grpcMetrics.register(new CollectorRegistry(true));
RssBaseConf conf = new RssBaseConf();
conf.set(RPC_SERVER_PORT, 0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ private void registerMetrics() throws Exception {
LOG.info("Register metrics");
CollectorRegistry coordinatorCollectorRegistry = new CollectorRegistry(true);
CoordinatorMetrics.register(coordinatorCollectorRegistry);
grpcMetrics = new CoordinatorGrpcMetrics();
grpcMetrics = new CoordinatorGrpcMetrics(coordinatorConf);
grpcMetrics.register(new CollectorRegistry(true));
boolean verbose = coordinatorConf.getBoolean(CoordinatorConf.RSS_JVM_METRICS_VERBOSE_ENABLE);
CollectorRegistry jvmCollectorRegistry = new CollectorRegistry(true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.uniffle.coordinator.metric;

import org.apache.uniffle.common.config.RssConf;
import org.apache.uniffle.common.metrics.GRPCMetrics;
import org.apache.uniffle.common.util.Constants;

Expand All @@ -31,8 +32,8 @@ public class CoordinatorGrpcMetrics extends GRPCMetrics {
"grpc_get_shuffle_assignments_total";
private static final String GRPC_HEARTBEAT_TOTAL = "grpc_heartbeat_total";

public CoordinatorGrpcMetrics() {
super(Constants.COORDINATOR_TAG);
public CoordinatorGrpcMetrics(RssConf rssConf) {
super(rssConf, Constants.COORDINATOR_TAG);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public void testGrpcConnectionSize() throws Exception {
baseConf.set(RssBaseConf.RPC_SERVER_PORT, 20001);
baseConf.set(RssBaseConf.RPC_EXECUTOR_SIZE, 2);

GRPCMetrics grpcMetrics = new CoordinatorGrpcMetrics();
GRPCMetrics grpcMetrics = new CoordinatorGrpcMetrics(baseConf);
grpcMetrics.register(new CollectorRegistry(true));
GrpcServer grpcServer =
GrpcServer.Builder.newBuilder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -296,9 +296,9 @@ private void registerMetrics() {
CollectorRegistry shuffleServerCollectorRegistry = new CollectorRegistry(true);
String tags = coverToString();
ShuffleServerMetrics.register(shuffleServerCollectorRegistry, tags);
grpcMetrics = new ShuffleServerGrpcMetrics(tags);
grpcMetrics = new ShuffleServerGrpcMetrics(this.shuffleServerConf, tags);
grpcMetrics.register(new CollectorRegistry(true));
nettyMetrics = new ShuffleServerNettyMetrics(tags);
nettyMetrics = new ShuffleServerNettyMetrics(shuffleServerConf, tags);
nettyMetrics.register(new CollectorRegistry(true));
CollectorRegistry jvmCollectorRegistry = new CollectorRegistry(true);
boolean verbose =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -514,6 +514,32 @@ public class ShuffleServerConf extends RssBaseConf {
+ "network_bandwidth = 10Gbps, buffer size should be ~ 1.25MB."
+ "Default is 0, OS will dynamically adjust the buf size.");

/**
 * Capacity of the waiting queue for the summary-metric thread pool (default 1000).
 *
 * <p>NOTE(review): no code visible in this change reads this option; the RPCMetrics constructor
 * builds its queue without a capacity argument -- confirm whether it should be wired in.
 */
public static final ConfigOption<Integer> SUMMARY_METRIC_WAIT_QUEUE_SIZE =
ConfigOptions.key("rss.server.summary.metric.wait.queue.size")
.intType()
.defaultValue(1000)
.withDescription(
"size of waiting queue for thread pool that used for calc summary metric.");

/**
 * Core thread count of the summary-metric thread pool (default 2). RPCMetrics reads the same
 * key through its own string constant rather than through this option.
 */
public static final ConfigOption<Integer> SUMMARY_METRIC_THREAD_POOL_CORE_SIZE =
ConfigOptions.key("rss.server.summary.metric.thread.pool.core.size")
.intType()
.defaultValue(2)
.withDescription("core thread number of thread pool that used for calc summary metric.");

/**
 * Maximum thread count of the summary-metric thread pool (default 20). RPCMetrics reads the same
 * key through its own string constant rather than through this option.
 */
public static final ConfigOption<Integer> SUMMARY_METRIC_THREAD_POOL_MAX_SIZE =
ConfigOptions.key("rss.server.summary.metric.thread.pool.max.size")
.intType()
.defaultValue(20)
.withDescription("max thread number of thread pool that used for calc summary metric.");

/**
 * Keep-alive time, in seconds, of the summary-metric thread pool (default 60). RPCMetrics reads
 * the same key through its own string constant rather than through this option.
 */
public static final ConfigOption<Integer> SUMMARY_METRIC_THREAD_POOL_KEEP_ALIVE_TIME =
ConfigOptions.key("rss.server.summary.metric.thread.pool.keep.alive.time")
.intType()
.defaultValue(60)
.withDescription(
"keep alive time of thread pool that used for calc summary metric, in SECONDS.");

public ShuffleServerConf() {}

public ShuffleServerConf(String fileName) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,8 @@ public class ShuffleServerGrpcMetrics extends GRPCMetrics {
private static final String GRPC_GET_MEMORY_SHUFFLE_DATA_PROCESS_LATENCY =
"grpc_get_memory_shuffle_data_process_latency";

public ShuffleServerGrpcMetrics(String tags) {
super(tags);
public ShuffleServerGrpcMetrics(ShuffleServerConf shuffleServerConf, String tags) {
super(shuffleServerConf, tags);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@ public class ShuffleServerNettyMetrics extends NettyMetrics {
private static final String NETTY_GET_MEMORY_SHUFFLE_DATA_REQUEST =
"netty_get_memory_shuffle_data_request";

public ShuffleServerNettyMetrics(String tags) {
super(tags);
public ShuffleServerNettyMetrics(ShuffleServerConf shuffleServerConf, String tags) {
super(shuffleServerConf, tags);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,10 @@

public class ShuffleServerGrpcMetricsTest {
@Test
public void testLatencyMetrics() {
public void testLatencyMetrics() throws InterruptedException {
ShuffleServerConf shuffleServerConf = new ShuffleServerConf();
ShuffleServerGrpcMetrics metrics =
new ShuffleServerGrpcMetrics(Constants.SHUFFLE_SERVER_VERSION);
new ShuffleServerGrpcMetrics(shuffleServerConf, Constants.SHUFFLE_SERVER_VERSION);
metrics.register(new CollectorRegistry(true));
metrics.recordTransportTime(ShuffleServerGrpcMetrics.SEND_SHUFFLE_DATA_METHOD, 1000);
metrics.recordTransportTime(ShuffleServerGrpcMetrics.GET_SHUFFLE_DATA_METHOD, 500);
Expand All @@ -44,6 +45,7 @@ public void testLatencyMetrics() {
assertEquals(3, sendTimeSummaryTime.size());
assertEquals(3, processTimeSummaryTime.size());

Thread.sleep(1000L);
assertEquals(
1D, sendTimeSummaryTime.get(ShuffleServerGrpcMetrics.SEND_SHUFFLE_DATA_METHOD).get().sum);
assertEquals(
Expand Down

0 comments on commit 5a89609

Please sign in to comment.