Skip to content

Commit

Permalink
[Streaming Indexing] Ensure support of the new transport by security …
Browse files Browse the repository at this point in the history
…plugin (#13174)

Signed-off-by: Andriy Redko <[email protected]>
  • Loading branch information
reta authored Apr 15, 2024
1 parent a944967 commit 6afbcd4
Show file tree
Hide file tree
Showing 14 changed files with 938 additions and 51 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- [Tiered Caching] Add dimension-based stats to ICache implementations. ([#12531](https://github.com/opensearch-project/OpenSearch/pull/12531))
- Add changes for overriding remote store and replication settings during snapshot restore. ([#11868](https://github.com/opensearch-project/OpenSearch/pull/11868))
- Add an individual setting of rate limiter for segment replication ([#12959](https://github.com/opensearch-project/OpenSearch/pull/12959))
- [Streaming Indexing] Ensure support of the new transport by security plugin ([#13174](https://github.com/opensearch-project/OpenSearch/pull/13174))

### Dependencies
- Bump `org.apache.commons:commons-configuration2` from 2.10.0 to 2.10.1 ([#12896](https://github.com/opensearch-project/OpenSearch/pull/12896))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
import org.opensearch.core.common.unit.ByteSizeValue;
import org.opensearch.tasks.Task;
import org.opensearch.transport.NettyAllocator;
import org.opensearch.transport.netty4.ssl.TrustAllManager;

import java.io.Closeable;
import java.net.SocketAddress;
Expand Down Expand Up @@ -90,6 +89,7 @@
import io.netty.handler.ssl.ClientAuth;
import io.netty.handler.ssl.SslContextBuilder;
import io.netty.handler.ssl.SslHandler;
import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
import io.netty.util.AttributeKey;

import static io.netty.handler.codec.http.HttpHeaderNames.HOST;
Expand Down Expand Up @@ -270,7 +270,7 @@ protected void initChannel(SocketChannel ch) throws Exception {
final SslHandler sslHandler = new SslHandler(
SslContextBuilder.forClient()
.clientAuth(ClientAuth.NONE)
.trustManager(TrustAllManager.INSTANCE)
.trustManager(InsecureTrustManagerFactory.INSTANCE)
.build()
.newEngine(ch.alloc())
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.NettyAllocator;
import org.opensearch.transport.SharedGroupFactory;
import org.opensearch.transport.netty4.ssl.TrustAllManager;
import org.junit.After;
import org.junit.Before;

Expand Down Expand Up @@ -84,6 +83,7 @@
import io.netty.handler.codec.http.HttpUtil;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.handler.ssl.SslContextBuilder;
import io.netty.handler.ssl.util.InsecureTrustManagerFactory;

import static org.opensearch.core.rest.RestStatus.BAD_REQUEST;
import static org.opensearch.core.rest.RestStatus.OK;
Expand Down Expand Up @@ -131,7 +131,7 @@ public Optional<SSLEngine> buildSecureHttpServerEngine(Settings settings, HttpSe
keyManagerFactory.init(keyStore, "password".toCharArray());

SSLEngine engine = SslContextBuilder.forServer(keyManagerFactory)
.trustManager(TrustAllManager.INSTANCE)
.trustManager(InsecureTrustManagerFactory.INSTANCE)
.build()
.newEngine(NettyAllocator.getAllocator());
return Optional.of(engine);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@

import io.netty.handler.ssl.ClientAuth;
import io.netty.handler.ssl.SslContextBuilder;
import io.netty.handler.ssl.util.InsecureTrustManagerFactory;

import static java.util.Collections.emptyMap;
import static java.util.Collections.emptySet;
Expand Down Expand Up @@ -87,7 +88,7 @@ public Optional<SSLEngine> buildSecureServerTransportEngine(Settings settings, T

SSLEngine engine = SslContextBuilder.forServer(keyManagerFactory)
.clientAuth(ClientAuth.NONE)
.trustManager(TrustAllManager.INSTANCE)
.trustManager(InsecureTrustManagerFactory.INSTANCE)
.build()
.newEngine(NettyAllocator.getAllocator());
return Optional.of(engine);
Expand All @@ -103,7 +104,7 @@ public Optional<SSLEngine> buildSecureClientTransportEngine(Settings settings, S
return Optional.of(
SslContextBuilder.forClient()
.clientAuth(ClientAuth.NONE)
.trustManager(TrustAllManager.INSTANCE)
.trustManager(InsecureTrustManagerFactory.INSTANCE)
.build()
.newEngine(NettyAllocator.getAllocator())
);
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

package org.opensearch.http.reactor.netty4;

import org.opensearch.common.Nullable;
import org.opensearch.common.network.NetworkService;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Setting;
Expand All @@ -23,21 +24,33 @@
import org.opensearch.http.HttpChannel;
import org.opensearch.http.HttpReadTimeoutException;
import org.opensearch.http.HttpServerChannel;
import org.opensearch.http.reactor.netty4.ssl.SslUtils;
import org.opensearch.plugins.SecureHttpTransportSettingsProvider;
import org.opensearch.telemetry.tracing.Tracer;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.reactor.SharedGroupFactory;
import org.opensearch.transport.reactor.netty4.Netty4Utils;

import javax.net.ssl.SSLEngine;
import javax.net.ssl.SSLException;
import javax.net.ssl.SSLSessionContext;

import java.net.InetSocketAddress;
import java.net.SocketOption;
import java.time.Duration;
import java.util.Arrays;
import java.util.List;

import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.ChannelOption;
import io.netty.channel.socket.nio.NioChannelOption;
import io.netty.handler.codec.http.DefaultLastHttpContent;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpContent;
import io.netty.handler.ssl.ApplicationProtocolNegotiator;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.timeout.ReadTimeoutException;
import io.netty.util.ReferenceCountUtil;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
Expand Down Expand Up @@ -116,6 +129,7 @@ public class ReactorNetty4HttpServerTransport extends AbstractHttpServerTranspor
private final ByteSizeValue maxInitialLineLength;
private final ByteSizeValue maxHeaderSize;
private final ByteSizeValue maxChunkSize;
private final SecureHttpTransportSettingsProvider secureHttpTransportSettingsProvider;
private volatile SharedGroupFactory.SharedGroup sharedGroup;
private volatile DisposableServer disposableServer;
private volatile Scheduler scheduler;
Expand All @@ -142,6 +156,45 @@ public ReactorNetty4HttpServerTransport(
ClusterSettings clusterSettings,
SharedGroupFactory sharedGroupFactory,
Tracer tracer
) {
this(
settings,
networkService,
bigArrays,
threadPool,
xContentRegistry,
dispatcher,
clusterSettings,
sharedGroupFactory,
null,
tracer
);
}

/**
* Creates new HTTP transport implementations based on Reactor Netty (see please {@link HttpServer}).
* @param settings settings
* @param networkService network service
* @param bigArrays big array allocator
* @param threadPool thread pool instance
* @param xContentRegistry XContent registry instance
* @param dispatcher dispatcher instance
* @param clusterSettings cluster settings
* @param sharedGroupFactory shared group factory
* @param secureHttpTransportSettingsProvider secure HTTP transport settings provider
* @param tracer tracer instance
*/
public ReactorNetty4HttpServerTransport(
Settings settings,
NetworkService networkService,
BigArrays bigArrays,
ThreadPool threadPool,
NamedXContentRegistry xContentRegistry,
Dispatcher dispatcher,
ClusterSettings clusterSettings,
SharedGroupFactory sharedGroupFactory,
@Nullable SecureHttpTransportSettingsProvider secureHttpTransportSettingsProvider,
Tracer tracer
) {
super(settings, networkService, bigArrays, threadPool, xContentRegistry, dispatcher, clusterSettings, tracer);
Netty4Utils.setAvailableProcessors(OpenSearchExecutors.NODE_PROCESSORS_SETTING.get(settings));
Expand All @@ -152,6 +205,7 @@ public ReactorNetty4HttpServerTransport(
this.maxChunkSize = SETTING_HTTP_MAX_CHUNK_SIZE.get(settings);
this.maxHeaderSize = SETTING_HTTP_MAX_HEADER_SIZE.get(settings);
this.maxInitialLineLength = SETTING_HTTP_MAX_INITIAL_LINE_LENGTH.get(settings);
this.secureHttpTransportSettingsProvider = secureHttpTransportSettingsProvider;
}

/**
Expand All @@ -160,7 +214,7 @@ public ReactorNetty4HttpServerTransport(
*/
@Override
protected HttpServerChannel bind(InetSocketAddress socketAddress) throws Exception {
final HttpServer server = configureChannelOptions(
final HttpServer server = configure(
HttpServer.create()
.httpFormDecoder(builder -> builder.scheduler(scheduler))
.idleTimeout(Duration.ofMillis(connectTimeoutMillis))
Expand All @@ -173,16 +227,15 @@ protected HttpServerChannel bind(InetSocketAddress socketAddress) throws Excepti
.maxHeaderSize(maxHeaderSize.bytesAsInt())
.maxInitialLineLength(maxInitialLineLength.bytesAsInt())
)
.protocol(HttpProtocol.HTTP11, HttpProtocol.H2C)
.handle((req, res) -> incomingRequest(req, res))
);

disposableServer = server.bindNow();
return new ReactorNetty4HttpServerChannel(disposableServer.channel());
}

private HttpServer configureChannelOptions(final HttpServer server1) {
HttpServer configured = server1.childOption(ChannelOption.TCP_NODELAY, SETTING_HTTP_TCP_NO_DELAY.get(settings))
private HttpServer configure(final HttpServer server) throws Exception {
HttpServer configured = server.childOption(ChannelOption.TCP_NODELAY, SETTING_HTTP_TCP_NO_DELAY.get(settings))
.childOption(ChannelOption.SO_KEEPALIVE, SETTING_HTTP_TCP_KEEP_ALIVE.get(settings));

if (SETTING_HTTP_TCP_KEEP_ALIVE.get(settings)) {
Expand Down Expand Up @@ -229,6 +282,65 @@ private HttpServer configureChannelOptions(final HttpServer server1) {
configured = configured.option(ChannelOption.SO_REUSEADDR, reuseAddress);
configured = configured.childOption(ChannelOption.SO_REUSEADDR, reuseAddress);

// Configure SSL context if available
if (secureHttpTransportSettingsProvider != null) {
final SSLEngine engine = secureHttpTransportSettingsProvider.buildSecureHttpServerEngine(settings, this)
.orElseGet(SslUtils::createDefaultServerSSLEngine);

try {
final List<String> cipherSuites = Arrays.asList(engine.getEnabledCipherSuites());
final List<String> applicationProtocols = Arrays.asList(engine.getSSLParameters().getApplicationProtocols());

configured = configured.secure(spec -> spec.sslContext(new SslContext() {
@Override
public SSLSessionContext sessionContext() {
throw new UnsupportedOperationException(); /* server only, should never be called */
}

@Override
public SSLEngine newEngine(ByteBufAllocator alloc, String peerHost, int peerPort) {
throw new UnsupportedOperationException(); /* server only, should never be called */
}

@Override
public SSLEngine newEngine(ByteBufAllocator alloc) {
try {
return secureHttpTransportSettingsProvider.buildSecureHttpServerEngine(
settings,
ReactorNetty4HttpServerTransport.this
).orElseGet(SslUtils::createDefaultServerSSLEngine);
} catch (final SSLException ex) {
throw new UnsupportedOperationException("Unable to create SSLEngine", ex);
}
}

@Override
public boolean isClient() {
return false; /* server only */
}

@Override
public List<String> cipherSuites() {
return cipherSuites;
}

@Override
public ApplicationProtocolNegotiator applicationProtocolNegotiator() {
return new ApplicationProtocolNegotiator() {
@Override
public List<String> protocols() {
return applicationProtocols;
}
};
}
}).build()).protocol(HttpProtocol.HTTP11, HttpProtocol.H2);
} finally {
ReferenceCountUtil.release(engine);
}
} else {
configured = configured.protocol(HttpProtocol.HTTP11, HttpProtocol.H2C);
}

return configured;
}

Expand Down Expand Up @@ -302,6 +414,11 @@ protected void doStart() {
}
}

/**
* Exception handler
* @param channel HTTP channel
* @param cause exception occurred
*/
@Override
public void onException(HttpChannel channel, Exception cause) {
if (cause instanceof ReadTimeoutException) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
* Modifications Copyright OpenSearch Contributors. See
* GitHub history for details.
*/
package org.opensearch.http.reactor.netty4.ssl;

import org.opensearch.OpenSearchSecurityException;

import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;

import java.security.NoSuchAlgorithmException;

/**
* Helper class for creating default SSL engines
*/
public class SslUtils {
private static final String[] DEFAULT_SSL_PROTOCOLS = { "TLSv1.3", "TLSv1.2", "TLSv1.1" };

private SslUtils() {}

/**
* Creates default server {@link SSLEngine} instance
* @return default server {@link SSLEngine} instance
*/
public static SSLEngine createDefaultServerSSLEngine() {
try {
final SSLEngine engine = SSLContext.getDefault().createSSLEngine();
engine.setEnabledProtocols(DEFAULT_SSL_PROTOCOLS);
engine.setUseClientMode(false);
return engine;
} catch (final NoSuchAlgorithmException ex) {
throw new OpenSearchSecurityException("Unable to initialize default server SSL engine", ex);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

/**
* SSL supporting utility classes
*/
package org.opensearch.http.reactor.netty4.ssl;
Loading

0 comments on commit 6afbcd4

Please sign in to comment.