From 5d6fa72bd0c117c57e3debf817d8acb923d3b687 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=EC=9D=B4=EC=9C=A0=EB=B9=84?= Date: Tue, 24 Aug 2021 16:55:09 +0900 Subject: [PATCH] HADOOP-17861. improve YARN Registry DNS Server qps --- .../client/api/DNSOperationsFactory.java | 2 +- .../client/api/RegistryConstants.java | 7 + .../registry/server/dns/RegistryDNS.java | 168 +++++++++++------- 3 files changed, 115 insertions(+), 62 deletions(-) diff --git a/hadoop-common-project/hadoop-registry/src/main/java/org/apache/hadoop/registry/client/api/DNSOperationsFactory.java b/hadoop-common-project/hadoop-registry/src/main/java/org/apache/hadoop/registry/client/api/DNSOperationsFactory.java index 8a26b4b450def..3d6456abcbdb0 100644 --- a/hadoop-common-project/hadoop-registry/src/main/java/org/apache/hadoop/registry/client/api/DNSOperationsFactory.java +++ b/hadoop-common-project/hadoop-registry/src/main/java/org/apache/hadoop/registry/client/api/DNSOperationsFactory.java @@ -63,7 +63,7 @@ public static DNSOperations createInstance(String name, DNSOperations operations = null; switch (impl) { case DNSJAVA: - operations = new RegistryDNS(name); + operations = new RegistryDNS(name, conf); break; default: diff --git a/hadoop-common-project/hadoop-registry/src/main/java/org/apache/hadoop/registry/client/api/RegistryConstants.java b/hadoop-common-project/hadoop-registry/src/main/java/org/apache/hadoop/registry/client/api/RegistryConstants.java index f9c0fd77d1739..2af1d0acb579f 100644 --- a/hadoop-common-project/hadoop-registry/src/main/java/org/apache/hadoop/registry/client/api/RegistryConstants.java +++ b/hadoop-common-project/hadoop-registry/src/main/java/org/apache/hadoop/registry/client/api/RegistryConstants.java @@ -385,4 +385,11 @@ public interface RegistryConstants { * {@value}. */ String SUBPATH_COMPONENTS = "/components/"; + + + /** + * num of threads for serving dns query. + */ + String KEY_NUM_THREADS = DNS_PREFIX + "num-threads"; + } diff --git a/hadoop-common-project/hadoop-registry/src/main/java/org/apache/hadoop/registry/server/dns/RegistryDNS.java b/hadoop-common-project/hadoop-registry/src/main/java/org/apache/hadoop/registry/server/dns/RegistryDNS.java index 8dbe79682e2d5..b59938e0411ca 100644 --- a/hadoop-common-project/hadoop-registry/src/main/java/org/apache/hadoop/registry/server/dns/RegistryDNS.java +++ b/hadoop-common-project/hadoop-registry/src/main/java/org/apache/hadoop/registry/server/dns/RegistryDNS.java @@ -126,6 +126,7 @@ public class RegistryDNS extends AbstractService implements DNSOperations, static final int FLAG_DNSSECOK = 1; static final int FLAG_SIGONLY = 2; + private static final int DEFAULT_NUM_THREADS = 4; private static final Logger LOG = LoggerFactory.getLogger(RegistryDNS.class); @@ -167,7 +168,12 @@ public class RegistryDNS extends AbstractService implements DNSOperations, */ public RegistryDNS(String name) { super(name); - executor = HadoopExecutors.newCachedThreadPool( + int nThreads = Runtime.getRuntime().availableProcessors() / 4; + if (nThreads < DEFAULT_NUM_THREADS) { + nThreads = DEFAULT_NUM_THREADS; + } + executor = HadoopExecutors.newFixedThreadPool( + nThreads, new ThreadFactory() { private AtomicInteger counter = new AtomicInteger(1); @@ -180,6 +186,27 @@ public Thread newThread(Runnable r) { }); } + public RegistryDNS(String name, Configuration conf) { + super(name); + int nThreads = conf.getInt(KEY_NUM_THREADS, Runtime.getRuntime().availableProcessors() / 4); + if (nThreads < DEFAULT_NUM_THREADS) { + nThreads = DEFAULT_NUM_THREADS; + } + LOG.info("using {} threads for serving dns query", nThreads); + executor = HadoopExecutors.newFixedThreadPool( + nThreads, + new ThreadFactory() { + private AtomicInteger counter = new AtomicInteger(1); + + @Override + public Thread newThread(Runnable r) { + return new Thread(r, + "RegistryDNS " + + counter.getAndIncrement()); + } + }); + } + public void initializeChannels(Configuration conf) throws Exception { if (channelsInitialized) { return; @@ -805,6 +832,65 @@ public void nioTCPClient(SocketChannel ch) throws IOException { } + /** + * Process a UDP request. + * + * @param channel the datagram channel for the request. + * @param remoteAddress the socket address of client. + * @param input the input ByteBuffer. + * @throws IOException if the udp processing generates an issue. + */ + public void nioUDPClient(DatagramChannel channel, SocketAddress remoteAddress, ByteBuffer input) throws IOException { + ByteBuffer output = ByteBuffer.allocate(4096); + byte[] in = null; + byte[] response = null; + Message query = null; + try { + try { + int position = input.position(); + in = new byte[position]; + input.flip(); + input.get(in); + query = new Message(in); + LOG.info("{}: received UDP query {}", remoteAddress, + query.getQuestion()); + response = generateReply(query, null); + if (response.length > output.capacity()) { + LOG.warn("{}: Response of UDP query {} exceeds limit {}", + remoteAddress, query.getQuestion(), output.limit()); + query.getHeader().setFlag(Flags.TC); + response = query.toWire(); + } + if (response == null) { + return; + } + } catch (IOException e) { + response = formErrorMessage(in); + if (response == null) { + LOG.debug("Error during create an error message." + + " Failed to parse a header", e); + return; + } + } + output.clear(); + output.put(response); + output.flip(); + + LOG.debug("{}: sending response", remoteAddress); + channel.send(output, remoteAddress); + } catch (Exception e) { + if (e instanceof IOException && remoteAddress != null) { + throw NetUtils.wrapException(channel.socket().getInetAddress().getHostName(), + channel.socket().getPort(), + ((InetSocketAddress) remoteAddress).getHostName(), + ((InetSocketAddress) remoteAddress).getPort(), + (IOException) e); + } else { + throw e; + } + } + } + /** * Calculate the inbound message length, which is related in the message as an * unsigned short value. @@ -832,9 +918,8 @@ private int getMessgeLength(ByteBuffer buf) throws EOFException { */ public void serveNIOTCP(ServerSocketChannel serverSocketChannel, InetAddress addr, int port) throws Exception { - try { - - while (true) { + while (true) { + try { final SocketChannel socketChannel = serverSocketChannel.accept(); if (socketChannel != null) { executor.submit(new Callable() { @@ -848,10 +933,9 @@ public Boolean call() throws Exception { } else { Thread.sleep(500); } + } catch (Exception e) { + LOG.error("Error during accept", e); } - } catch (IOException e) { - throw NetUtils.wrapException(addr.getHostName(), port, - addr.getHostName(), port, e); } } @@ -868,7 +952,7 @@ private ServerSocketChannel openTCPChannel(InetAddress addr, int port) ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); try { serverSocketChannel.socket().bind(new InetSocketAddress(addr, port)); - serverSocketChannel.configureBlocking(false); + serverSocketChannel.configureBlocking(true); } catch (IOException e) { throw NetUtils.wrapException(null, 0, InetAddress.getLocalHost().getHostName(), @@ -917,7 +1001,7 @@ public void addNIOUDP(final InetAddress addr, final int port) @Override public Boolean call() throws Exception { try { - serveNIOUDP(udpChannel, addr, port); + serveNIOUDP(udpChannel); } catch (Exception e) { LOG.error("Error initializing DNS UDP listener", e); throw e; @@ -931,60 +1015,22 @@ public Boolean call() throws Exception { * Process an inbound UDP request. * * @param channel the UDP datagram channel. - * @param addr local host address. - * @param port local port. - * @throws IOException if the UDP processing fails. */ - private synchronized void serveNIOUDP(DatagramChannel channel, - InetAddress addr, int port) throws Exception { - SocketAddress remoteAddress = null; - try { - - ByteBuffer input = ByteBuffer.allocate(4096); - ByteBuffer output = ByteBuffer.allocate(4096); - byte[] in = null; - - while (true) { + private synchronized void serveNIOUDP(DatagramChannel channel) { + while (true) { + try { + ByteBuffer input = ByteBuffer.allocate(4096); input.clear(); - try { - remoteAddress = channel.receive(input); - } catch (IOException e) { - LOG.debug("Error during message receipt", e); - continue; - } - Message query; - byte[] response = null; - try { - int position = input.position(); - in = new byte[position]; - input.flip(); - input.get(in); - query = new Message(in); - LOG.info("{}: received UDP query {}", remoteAddress, - query.getQuestion()); - response = generateReply(query, null); - if (response == null) { - continue; + SocketAddress remoteAddress = channel.receive(input); + executor.submit(new Callable() { + @Override + public Boolean call() throws Exception { + nioUDPClient(channel, remoteAddress, input); + return true; } - } catch (IOException e) { - response = formErrorMessage(in); - } - output.clear(); - output.put(response); - output.flip(); - - LOG.debug("{}: sending response", remoteAddress); - channel.send(output, remoteAddress); - } - } catch (Exception e) { - if (e instanceof IOException && remoteAddress != null) { - throw NetUtils.wrapException(addr.getHostName(), - port, - ((InetSocketAddress) remoteAddress).getHostName(), - ((InetSocketAddress) remoteAddress).getPort(), - (IOException) e); - } else { - throw e; + }); + } catch (Exception e) { + LOG.debug("Error during message receive", e); } } } @@ -1392,7 +1438,7 @@ byte addAnswer(Message response, Name name, int type, int dclass, } } else if (sr.isSuccessful()) { List rrsets = sr.answers(); - LOG.info("found answers {}", rrsets); + LOG.debug("found answers {}", rrsets); for (RRset rrset : rrsets) { addRRset(name, response, rrset, Section.ANSWER, flags); }