2828import static org .asynchttpclient .util .HttpConstants .ResponseStatusCodes .TEMPORARY_REDIRECT_307 ;
2929import static org .asynchttpclient .util .MiscUtils .isNonEmpty ;
3030import com .spotify .futures .ConcurrencyReducer ;
31+ import io .netty .channel .EventLoopGroup ;
3132import io .netty .handler .codec .http .DefaultHttpHeaders ;
3233import io .netty .handler .codec .http .HttpRequest ;
3334import io .netty .handler .codec .http .HttpResponse ;
35+ import io .netty .resolver .NameResolver ;
3436import io .netty .util .concurrent .DefaultThreadFactory ;
3537import java .io .ByteArrayOutputStream ;
3638import java .io .IOException ;
39+ import java .net .InetAddress ;
3740import java .net .InetSocketAddress ;
3841import java .net .URI ;
3942import java .security .GeneralSecurityException ;
5356import javax .ws .rs .client .Client ;
5457import javax .ws .rs .core .HttpHeaders ;
5558import javax .ws .rs .core .Response .Status ;
59+ import lombok .Data ;
5660import lombok .Getter ;
5761import lombok .SneakyThrows ;
5862import lombok .extern .slf4j .Slf4j ;
5963import org .apache .commons .lang3 .Validate ;
6064import org .apache .pulsar .PulsarVersion ;
6165import org .apache .pulsar .client .admin .internal .PulsarAdminImpl ;
6266import org .apache .pulsar .client .api .PulsarClientException ;
67+ import org .apache .pulsar .client .impl .PulsarClientSharedResourcesImpl ;
6368import org .apache .pulsar .client .impl .PulsarServiceNameResolver ;
6469import org .apache .pulsar .client .impl .ServiceNameResolver ;
6570import org .apache .pulsar .client .impl .conf .ClientConfigurationData ;
71+ import org .apache .pulsar .client .util .ExecutorProvider ;
6672import org .apache .pulsar .client .util .PulsarHttpAsyncSslEngineFactory ;
6773import org .apache .pulsar .common .util .FutureUtil ;
6874import org .apache .pulsar .common .util .PulsarSslConfiguration ;
6975import org .apache .pulsar .common .util .PulsarSslFactory ;
76+ import org .apache .pulsar .common .util .netty .DnsResolverUtil ;
77+ import org .apache .pulsar .common .util .netty .EventLoopUtil ;
7078import org .asynchttpclient .AsyncCompletionHandlerBase ;
7179import org .asynchttpclient .AsyncHandler ;
7280import org .asynchttpclient .AsyncHttpClient ;
@@ -103,6 +111,10 @@ public class AsyncHttpConnector implements Connector, AsyncHttpRequestExecutor {
103111 new DefaultThreadFactory ("delayer" ));
104112 private ScheduledExecutorService sslRefresher ;
105113 private final boolean acceptGzipCompression ;
114+ @ Getter
115+ private final NameResolver <InetAddress > nameResolver ;
116+ private final EventLoopGroup eventLoopGroup ;
117+ private final boolean createdEventLoopGroup ;
106118 private final Map <String , ConcurrencyReducer <Response >> concurrencyReducers = new ConcurrentHashMap <>();
107119 private PulsarSslFactory sslFactory ;
108120
@@ -112,33 +124,66 @@ public AsyncHttpConnector(Client client, ClientConfigurationData conf, int autoC
112124 (int ) client .getConfiguration ().getProperty (ClientProperties .READ_TIMEOUT ),
113125 PulsarAdminImpl .DEFAULT_REQUEST_TIMEOUT_SECONDS * 1000 ,
114126 autoCertRefreshTimeSeconds ,
115- conf , acceptGzipCompression );
127+ conf , acceptGzipCompression , null );
116128 }
117129
118130 @ SneakyThrows
119131 public AsyncHttpConnector (int connectTimeoutMs , int readTimeoutMs ,
120132 int requestTimeoutMs ,
121133 int autoCertRefreshTimeSeconds , ClientConfigurationData conf ,
122- boolean acceptGzipCompression ) {
134+ boolean acceptGzipCompression ,
135+ PulsarClientSharedResourcesImpl sharedResources ) {
123136 Validate .notEmpty (conf .getServiceUrl (), "Service URL is not provided" );
124137 serviceNameResolver = new PulsarServiceNameResolver ();
125138 String serviceUrl = conf .getServiceUrl ();
126139 serviceNameResolver .updateServiceUrl (serviceUrl );
127140 this .acceptGzipCompression = acceptGzipCompression ;
141+ SharedResourceHolder sharedResourceHolder =
142+ buildResourcesIfConfigured (sharedResources );
143+ this .nameResolver = sharedResourceHolder .getNameResolver ();
144+ this .eventLoopGroup = sharedResourceHolder .getEventLoopGroup ();
145+ this .createdEventLoopGroup = sharedResourceHolder .isCreateEventLoop ();
128146 AsyncHttpClientConfig asyncHttpClientConfig =
129147 createAsyncHttpClientConfig (conf , connectTimeoutMs , readTimeoutMs , requestTimeoutMs ,
130- autoCertRefreshTimeSeconds );
148+ autoCertRefreshTimeSeconds , sharedResources );
131149 httpClient = createAsyncHttpClient (asyncHttpClientConfig );
132150 this .requestTimeout = requestTimeoutMs > 0 ? Duration .ofMillis (requestTimeoutMs ) : null ;
133151 this .maxRetries = httpClient .getConfig ().getMaxRequestRetry ();
134152 }
135153
154+ private SharedResourceHolder buildResourcesIfConfigured (
155+ PulsarClientSharedResourcesImpl sharedResources ) {
156+ EventLoopGroup eventLoopGroup = null ;
157+ NameResolver <InetAddress > nameResolver = null ;
158+ boolean createdEventLoopGroup = false ;
159+ if (sharedResources != null && sharedResources .getDnsResolverGroup () != null ) {
160+ if (sharedResources .getIoEventLoopGroup () != null ) {
161+ eventLoopGroup = sharedResources .getIoEventLoopGroup ();
162+ } else {
163+ // build an EventLoopGroup with default value
164+ eventLoopGroup = EventLoopUtil .newEventLoopGroup (
165+ Runtime .getRuntime ().availableProcessors (), false ,
166+ new ExecutorProvider .ExtendedThreadFactory ("pulsar-admin-client-io" ,
167+ Thread .currentThread ().isDaemon ()));
168+ createdEventLoopGroup = true ;
169+ }
170+ nameResolver = DnsResolverUtil .adaptToNameResolver (
171+ sharedResources .getDnsResolverGroup ().createAddressResolver (eventLoopGroup ));
172+ } else {
173+ return SharedResourceHolder .EMPTY ;
174+ }
175+ return new SharedResourceHolder (nameResolver , eventLoopGroup , createdEventLoopGroup );
176+ }
177+
136178 private AsyncHttpClientConfig createAsyncHttpClientConfig (ClientConfigurationData conf , int connectTimeoutMs ,
137179 int readTimeoutMs ,
138- int requestTimeoutMs , int autoCertRefreshTimeSeconds )
180+ int requestTimeoutMs ,
181+ int autoCertRefreshTimeSeconds ,
182+ PulsarClientSharedResourcesImpl sharedResources )
139183 throws GeneralSecurityException , IOException {
140184 DefaultAsyncHttpClientConfig .Builder confBuilder = new DefaultAsyncHttpClientConfig .Builder ();
141- configureAsyncHttpClientConfig (conf , connectTimeoutMs , readTimeoutMs , requestTimeoutMs , confBuilder );
185+ configureAsyncHttpClientConfig (conf , connectTimeoutMs ,
186+ readTimeoutMs , requestTimeoutMs , confBuilder , sharedResources );
142187 if (conf .getServiceUrl ().startsWith ("https://" )) {
143188 configureAsyncHttpClientSslEngineFactory (conf , autoCertRefreshTimeSeconds , confBuilder );
144189 }
@@ -148,7 +193,8 @@ private AsyncHttpClientConfig createAsyncHttpClientConfig(ClientConfigurationDat
148193
149194 private void configureAsyncHttpClientConfig (ClientConfigurationData conf , int connectTimeoutMs , int readTimeoutMs ,
150195 int requestTimeoutMs ,
151- DefaultAsyncHttpClientConfig .Builder confBuilder ) {
196+ DefaultAsyncHttpClientConfig .Builder confBuilder ,
197+ PulsarClientSharedResourcesImpl sharedResources ) {
152198 if (conf .getConnectionsPerBroker () > 0 ) {
153199 confBuilder .setMaxConnectionsPerHost (conf .getConnectionsPerBroker ());
154200 // Use the request timeout value for acquireFreeChannelTimeout so that we don't need to add
@@ -159,6 +205,14 @@ private void configureAsyncHttpClientConfig(ClientConfigurationData conf, int co
159205 if (conf .getConnectionMaxIdleSeconds () > 0 ) {
160206 confBuilder .setPooledConnectionIdleTimeout (conf .getConnectionMaxIdleSeconds () * 1000 );
161207 }
208+ if (sharedResources != null ) {
209+ if (this .eventLoopGroup != null ) {
210+ confBuilder .setEventLoopGroup (this .eventLoopGroup );
211+ }
212+ if (sharedResources .getTimer () != null ) {
213+ confBuilder .setNettyTimer (sharedResources .getTimer ());
214+ }
215+ }
162216 confBuilder .setCookieStore (null );
163217 confBuilder .setUseProxyProperties (true );
164218 confBuilder .setFollowRedirect (false );
@@ -177,7 +231,7 @@ public boolean keepAlive(InetSocketAddress remoteAddress, Request ahcRequest,
177231 HttpRequest request , HttpResponse response ) {
178232 // Close connection upon a server error or per HTTP spec
179233 return (response .status ().code () / 100 != 5 )
180- && super .keepAlive (remoteAddress , ahcRequest , request , response );
234+ && super .keepAlive (remoteAddress , ahcRequest , request , response );
181235 }
182236 });
183237 confBuilder .setDisableHttpsEndpointIdentificationAlgorithm (!conf .isTlsHostnameVerificationEnable ());
@@ -331,9 +385,9 @@ private <T> void retryOperation(
331385 throwable );
332386 }
333387 resultFuture .completeExceptionally (
334- new RetryException ("Could not complete the operation. Number of retries "
335- + "has been exhausted. Failed reason: " + throwable .getMessage (),
336- throwable ));
388+ new RetryException ("Could not complete the operation. Number of retries "
389+ + "has been exhausted. Failed reason: " + throwable .getMessage (),
390+ throwable ));
337391 }
338392 }
339393 } else {
@@ -376,7 +430,7 @@ public CompletableFuture<Response> executeRequest(Request request) {
376430 }
377431
378432 public CompletableFuture <Response > executeRequest (Request request ,
379- Supplier <AsyncHandler <Response >> handlerSupplier ) {
433+ Supplier <AsyncHandler <Response >> handlerSupplier ) {
380434 return executeRequest (request , handlerSupplier , 0 );
381435 }
382436
@@ -426,14 +480,17 @@ private CompletableFuture<Response> executeRedirect(Request request, Response re
426480 if (switchToGet ) {
427481 builder .setMethod (GET );
428482 }
483+ if (this .nameResolver != null ) {
484+ builder .setNameResolver (this .nameResolver );
485+ }
429486 builder .setUri (newUri );
430487 if (keepBody ) {
431488 builder .setCharset (request .getCharset ());
432489 if (isNonEmpty (request .getFormParams ())) {
433490 builder .setFormParams (request .getFormParams ());
434491 } else if (request .getStringData () != null ) {
435492 builder .setBody (request .getStringData ());
436- } else if (request .getByteData () != null ){
493+ } else if (request .getByteData () != null ) {
437494 builder .setBody (request .getByteData ());
438495 } else if (request .getByteBufferData () != null ) {
439496 builder .setBody (request .getByteBufferData ());
@@ -485,6 +542,9 @@ private Request prepareRequest(InetSocketAddress host, ClientRequest request) th
485542 BoundRequestBuilder builder =
486543 httpClient .prepare (currentRequest .getMethod (), currentRequest .getUri ().toString ());
487544
545+ if (this .nameResolver != null ) {
546+ builder .setNameResolver (this .nameResolver );
547+ }
488548 if (currentRequest .hasEntity ()) {
489549 ByteArrayOutputStream outStream = new ByteArrayOutputStream ();
490550 currentRequest .setStreamProvider (contentLength -> outStream );
@@ -518,6 +578,9 @@ public void close() {
518578 if (sslRefresher != null ) {
519579 sslRefresher .shutdownNow ();
520580 }
581+ if (createdEventLoopGroup && eventLoopGroup != null && !eventLoopGroup .isShutdown ()) {
582+ eventLoopGroup .shutdownGracefully ();
583+ }
521584 } catch (IOException e ) {
522585 log .warn ("Failed to close http client" , e );
523586 }
@@ -556,4 +619,21 @@ protected void refreshSslContext() {
556619 }
557620 }
558621
622+ @ Data
623+ private static class SharedResourceHolder {
624+ static final SharedResourceHolder EMPTY = new SharedResourceHolder (null , null , false );
625+
626+ final NameResolver <InetAddress > nameResolver ;
627+ final EventLoopGroup eventLoopGroup ;
628+ final boolean createEventLoop ;
629+
630+ SharedResourceHolder (NameResolver <InetAddress > nameResolver ,
631+ EventLoopGroup eventLoopGroup ,
632+ boolean createEventLoop ) {
633+ this .nameResolver = nameResolver ;
634+ this .eventLoopGroup = eventLoopGroup ;
635+ this .createEventLoop = createEventLoop ;
636+ }
637+ }
638+
559639}
0 commit comments