From 5e66fcd3fdee53586b3449e2ec7f09c112caaee2 Mon Sep 17 00:00:00 2001 From: Mingliang Liu Date: Fri, 24 May 2024 17:36:57 -0700 Subject: [PATCH] [FLINK-35472] Improve tests for Elasticsearch 8 connector --- .../sink/Elasticsearch8AsyncSinkITCase.java | 57 +---- .../Elasticsearch8AsyncSinkSecureITCase.java | 149 ------------- .../sink/Elasticsearch8AsyncWriterITCase.java | 83 ++++--- .../sink/ElasticsearchSinkBaseITCase.java | 204 ++++++++++++++---- 4 files changed, 204 insertions(+), 289 deletions(-) delete mode 100644 flink-connector-elasticsearch8/src/test/java/org/apache/flink/connector/elasticsearch/sink/Elasticsearch8AsyncSinkSecureITCase.java diff --git a/flink-connector-elasticsearch8/src/test/java/org/apache/flink/connector/elasticsearch/sink/Elasticsearch8AsyncSinkITCase.java b/flink-connector-elasticsearch8/src/test/java/org/apache/flink/connector/elasticsearch/sink/Elasticsearch8AsyncSinkITCase.java index 22defb14..719c66f8 100644 --- a/flink-connector-elasticsearch8/src/test/java/org/apache/flink/connector/elasticsearch/sink/Elasticsearch8AsyncSinkITCase.java +++ b/flink-connector-elasticsearch8/src/test/java/org/apache/flink/connector/elasticsearch/sink/Elasticsearch8AsyncSinkITCase.java @@ -27,59 +27,29 @@ import org.apache.flink.streaming.api.environment.LocalStreamEnvironment; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import co.elastic.clients.elasticsearch.core.bulk.IndexOperation; -import org.apache.http.HttpHost; -import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; -import org.testcontainers.junit.jupiter.Testcontainers; - -import java.io.IOException; +import org.junit.jupiter.api.TestTemplate; import static org.assertj.core.api.AssertionsForClassTypes.assertThat; /** Integration tests for {@link Elasticsearch8AsyncSink}. */ -@Testcontainers public class Elasticsearch8AsyncSinkITCase extends ElasticsearchSinkBaseITCase { private static boolean failed; @BeforeEach void setUp() { - this.client = getRestClient(); failed = false; } - @AfterEach - void shutdown() throws IOException { - if (client != null) { - client.close(); - } - } - - @Test + @TestTemplate public void testWriteToElasticsearch() throws Exception { String index = "test-write-to-elasticsearch"; try (StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1)) { - env.setRestartStrategy(RestartStrategies.noRestart()); - final Elasticsearch8AsyncSink sink = - Elasticsearch8AsyncSinkBuilder.builder() - .setMaxBatchSize(5) - .setHosts( - new HttpHost( - ES_CONTAINER.getHost(), - ES_CONTAINER.getFirstMappedPort())) - .setElementConverter( - (element, ctx) -> - new IndexOperation.Builder<>() - .index(index) - .id(element.getId()) - .document(element) - .build()) - .build(); + Elasticsearch8AsyncSink sink = getSinkForDummyData(index); env.fromElements("first", "second", "third", "fourth", "fifth") .map( @@ -90,32 +60,17 @@ public void testWriteToElasticsearch() throws Exception { env.execute(); } - assertIdsAreWritten(client, index, new String[] {"first_v1_index", "second_v1_index"}); + assertIdsAreWritten(index, new String[] {"first_v1_index", "second_v1_index"}); } - @Test + @TestTemplate public void testRecovery() throws Exception { String index = "test-recovery"; try (final StreamExecutionEnvironment env = new LocalStreamEnvironment()) { - env.enableCheckpointing(100L); - final Elasticsearch8AsyncSink sink = - Elasticsearch8AsyncSinkBuilder.builder() - .setMaxBatchSize(5) - .setHosts( - new HttpHost( - ES_CONTAINER.getHost(), - ES_CONTAINER.getFirstMappedPort())) - .setElementConverter( - (element, ctx) -> - new IndexOperation.Builder<>() - .index(index) - .id(element.getId()) - .document(element) - .build()) - .build(); + final Elasticsearch8AsyncSink sink = getSinkForDummyData(index); env.fromElements("first", "second", "third", "fourth", "fifth") .map( diff --git a/flink-connector-elasticsearch8/src/test/java/org/apache/flink/connector/elasticsearch/sink/Elasticsearch8AsyncSinkSecureITCase.java b/flink-connector-elasticsearch8/src/test/java/org/apache/flink/connector/elasticsearch/sink/Elasticsearch8AsyncSinkSecureITCase.java deleted file mode 100644 index ba546166..00000000 --- a/flink-connector-elasticsearch8/src/test/java/org/apache/flink/connector/elasticsearch/sink/Elasticsearch8AsyncSinkSecureITCase.java +++ /dev/null @@ -1,149 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -package org.apache.flink.connector.elasticsearch.sink; - -import org.apache.flink.api.common.functions.MapFunction; -import org.apache.flink.api.common.restartstrategy.RestartStrategies; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; - -import co.elastic.clients.elasticsearch.core.bulk.IndexOperation; -import org.apache.http.HttpHost; -import org.apache.http.auth.AuthScope; -import org.apache.http.auth.UsernamePasswordCredentials; -import org.apache.http.client.CredentialsProvider; -import org.apache.http.impl.client.BasicCredentialsProvider; -import org.elasticsearch.client.RestClient; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.testcontainers.containers.output.Slf4jLogConsumer; -import org.testcontainers.containers.wait.strategy.LogMessageWaitStrategy; -import org.testcontainers.elasticsearch.ElasticsearchContainer; -import org.testcontainers.junit.jupiter.Container; -import org.testcontainers.junit.jupiter.Testcontainers; - -import java.io.IOException; - -import static org.apache.flink.connector.elasticsearch.sink.ElasticsearchSinkBaseITCase.DummyData; -import static org.apache.flink.connector.elasticsearch.sink.ElasticsearchSinkBaseITCase.ELASTICSEARCH_IMAGE; -import static org.apache.flink.connector.elasticsearch.sink.ElasticsearchSinkBaseITCase.assertIdsAreWritten; - -/** Integration tests for {@link Elasticsearch8AsyncSink} against a secure Elasticsearch cluster. */ -@Testcontainers -class Elasticsearch8AsyncSinkSecureITCase { - private static final Logger LOG = - LoggerFactory.getLogger(Elasticsearch8AsyncSinkSecureITCase.class); - private static final String ES_CLUSTER_USERNAME = "elastic"; - private static final String ES_CLUSTER_PASSWORD = "s3cret"; - - @Container - private static final ElasticsearchContainer ES_CONTAINER = createSecureElasticsearchContainer(); - - private RestClient client; - - @BeforeEach - void setUp() { - this.client = getRestClient(); - } - - @AfterEach - void shutdown() throws IOException { - if (client != null) { - client.close(); - } - } - - @Test - void testWriteToSecureElasticsearch8() throws Exception { - final String index = "test-write-to-secure-elasticsearch8"; - - try (StreamExecutionEnvironment env = - StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1)) { - - env.setRestartStrategy(RestartStrategies.noRestart()); - - final Elasticsearch8AsyncSink sink = - Elasticsearch8AsyncSinkBuilder.builder() - .setMaxBatchSize(5) - .setHosts( - new HttpHost( - ES_CONTAINER.getHost(), - ES_CONTAINER.getFirstMappedPort(), - "https")) - .setElementConverter( - (element, ctx) -> - new IndexOperation.Builder<>() - .index(index) - .id(element.getId()) - .document(element) - .build()) - .setUsername(ES_CLUSTER_USERNAME) - .setPassword(ES_CLUSTER_PASSWORD) - .setSslContextSupplier(() -> ES_CONTAINER.createSslContextFromCa()) - .build(); - - env.fromElements("first", "second", "third", "fourth", "fifth") - .map( - (MapFunction) - value -> new DummyData(value + "_v1_index", value)) - .sinkTo(sink); - - env.execute(); - } - - assertIdsAreWritten(client, index, new String[] {"first_v1_index", "second_v1_index"}); - } - - static ElasticsearchContainer createSecureElasticsearchContainer() { - ElasticsearchContainer container = - new ElasticsearchContainer(ELASTICSEARCH_IMAGE) - .withPassword(ES_CLUSTER_PASSWORD) /* set password */ - .withEnv("ES_JAVA_OPTS", "-Xms2g -Xmx2g") - .withLogConsumer(new Slf4jLogConsumer(LOG)); - - // Set log message based wait strategy as the default wait strategy is not aware of TLS - container - .withEnv("logger.org.elasticsearch", "INFO") - .setWaitStrategy( - new LogMessageWaitStrategy().withRegEx(".*\"message\":\"started.*")); - - return container; - } - - private RestClient getRestClient() { - final CredentialsProvider credentialsProvider = new BasicCredentialsProvider(); - credentialsProvider.setCredentials( - AuthScope.ANY, - new UsernamePasswordCredentials(ES_CLUSTER_USERNAME, ES_CLUSTER_PASSWORD)); - return RestClient.builder( - new HttpHost( - ES_CONTAINER.getHost(), ES_CONTAINER.getFirstMappedPort(), "https")) - .setHttpClientConfigCallback( - httpClientBuilder -> - httpClientBuilder - .setDefaultCredentialsProvider(credentialsProvider) - .setSSLContext(ES_CONTAINER.createSslContextFromCa())) - .build(); - } -} diff --git a/flink-connector-elasticsearch8/src/test/java/org/apache/flink/connector/elasticsearch/sink/Elasticsearch8AsyncWriterITCase.java b/flink-connector-elasticsearch8/src/test/java/org/apache/flink/connector/elasticsearch/sink/Elasticsearch8AsyncWriterITCase.java index ba41ad88..db6a7808 100644 --- a/flink-connector-elasticsearch8/src/test/java/org/apache/flink/connector/elasticsearch/sink/Elasticsearch8AsyncWriterITCase.java +++ b/flink-connector-elasticsearch8/src/test/java/org/apache/flink/connector/elasticsearch/sink/Elasticsearch8AsyncWriterITCase.java @@ -24,14 +24,11 @@ import org.apache.flink.connector.base.sink.writer.TestSinkInitContext; import org.apache.flink.metrics.Gauge; -import co.elastic.clients.elasticsearch.core.bulk.IndexOperation; import co.elastic.clients.elasticsearch.core.bulk.UpdateOperation; import org.apache.http.HttpHost; -import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestTemplate; import org.junit.jupiter.api.Timeout; -import org.testcontainers.junit.jupiter.Testcontainers; import java.io.IOException; import java.util.Collections; @@ -45,7 +42,6 @@ import static org.assertj.core.api.Assertions.assertThat; /** Integration tests for {@link Elasticsearch8AsyncWriter}. */ -@Testcontainers public class Elasticsearch8AsyncWriterITCase extends ElasticsearchSinkBaseITCase { private TestSinkInitContext context; @@ -56,17 +52,9 @@ public class Elasticsearch8AsyncWriterITCase extends ElasticsearchSinkBaseITCase @BeforeEach void setUp() { this.context = new TestSinkInitContext(); - this.client = getRestClient(); } - @AfterEach - void shutdown() throws IOException { - if (client != null) { - client.close(); - } - } - - @Test + @TestTemplate @Timeout(5) public void testBulkOnFlush() throws IOException, InterruptedException { String index = "test-bulk-on-flush"; @@ -78,16 +66,16 @@ public void testBulkOnFlush() throws IOException, InterruptedException { writer.write(new DummyData("test-2", "test-2"), null); writer.flush(false); - assertIdsAreWritten(client, index, new String[] {"test-1", "test-2"}); + assertIdsAreWritten(index, new String[] {"test-1", "test-2"}); writer.write(new DummyData("3", "test-3"), null); writer.flush(true); - assertIdsAreWritten(client, index, new String[] {"test-3"}); + assertIdsAreWritten(index, new String[] {"test-3"}); } } - @Test + @TestTemplate @Timeout(5) public void testBulkOnBufferTimeFlush() throws Exception { String index = "test-bulk-on-time-in-buffer"; @@ -98,21 +86,21 @@ public void testBulkOnBufferTimeFlush() throws Exception { writer.write(new DummyData("test-1", "test-1"), null); writer.flush(true); - assertIdsAreWritten(client, index, new String[] {"test-1"}); + assertIdsAreWritten(index, new String[] {"test-1"}); writer.write(new DummyData("test-2", "test-2"), null); writer.write(new DummyData("test-3", "test-3"), null); - assertIdsAreNotWritten(client, index, new String[] {"test-2", "test-3"}); + assertIdsAreNotWritten(index, new String[] {"test-2", "test-3"}); context.getTestProcessingTimeService().advance(6000L); await(); } - assertIdsAreWritten(client, index, new String[] {"test-2", "test-3"}); + assertIdsAreWritten(index, new String[] {"test-2", "test-3"}); } - @Test + @TestTemplate @Timeout(5) public void testBytesSentMetric() throws Exception { String index = "test-bytes-sent-metrics"; @@ -130,10 +118,10 @@ public void testBytesSentMetric() throws Exception { } assertThat(context.getNumBytesOutCounter().getCount()).isGreaterThan(0); - assertIdsAreWritten(client, index, new String[] {"test-1", "test-2", "test-3"}); + assertIdsAreWritten(index, new String[] {"test-1", "test-2", "test-3"}); } - @Test + @TestTemplate @Timeout(5) public void testRecordsSentMetric() throws Exception { String index = "test-records-sent-metric"; @@ -151,10 +139,10 @@ public void testRecordsSentMetric() throws Exception { } assertThat(context.getNumRecordsOutCounter().getCount()).isEqualTo(3); - assertIdsAreWritten(client, index, new String[] {"test-1", "test-2", "test-3"}); + assertIdsAreWritten(index, new String[] {"test-1", "test-2", "test-3"}); } - @Test + @TestTemplate @Timeout(5) public void testSendTimeMetric() throws Exception { String index = "test-send-time-metric"; @@ -174,10 +162,10 @@ public void testSendTimeMetric() throws Exception { assertThat(currentSendTime.get().getValue()).isGreaterThan(0L); } - assertIdsAreWritten(client, index, new String[] {"test-1", "test-2", "test-3"}); + assertIdsAreWritten(index, new String[] {"test-1", "test-2", "test-3"}); } - @Test + @TestTemplate @Timeout(5) public void testHandlePartiallyFailedBulk() throws Exception { String index = "test-partially-failed-bulk"; @@ -198,7 +186,7 @@ public void testHandlePartiallyFailedBulk() throws Exception { .build()); try (final Elasticsearch8AsyncWriter writer = - createWriter(index, maxBatchSize, elementConverter)) { + createWriter(maxBatchSize, elementConverter)) { writer.write(new DummyData("test-1", "test-1-updated"), null); writer.write(new DummyData("test-2", "test-2-updated"), null); } @@ -206,34 +194,35 @@ public void testHandlePartiallyFailedBulk() throws Exception { await(); assertThat(context.metricGroup().getNumRecordsOutErrorsCounter().getCount()).isEqualTo(1); - assertIdsAreWritten(client, index, new String[] {"test-2"}); - assertIdsAreNotWritten(client, index, new String[] {"test-1"}); - } - - private Elasticsearch8AsyncSinkBuilder.OperationConverter - getDefaultTestElementConverter(String index) { - return new Elasticsearch8AsyncSinkBuilder.OperationConverter<>( - (element, ctx) -> - new IndexOperation.Builder() - .id(element.getId()) - .document(element) - .index(index) - .build()); + assertIdsAreWritten(index, new String[] {"test-2"}); + assertIdsAreNotWritten(index, new String[] {"test-1"}); } private Elasticsearch8AsyncWriter createWriter(String index, int maxBatchSize) throws IOException { - return createWriter(index, maxBatchSize, getDefaultTestElementConverter(index)); + return createWriter( + maxBatchSize, + new Elasticsearch8AsyncSinkBuilder.OperationConverter<>( + getElementConverterForDummyData(index))); + } + + private NetworkConfig createNetworkConfig() { + final List esHost = Collections.singletonList(getHost()); + return secure + ? new NetworkConfig( + esHost, + ES_CLUSTER_USERNAME, + ES_CLUSTER_PASSWORD, + null, + () -> ES_CONTAINER_SECURE.createSslContextFromCa(), + null) + : new NetworkConfig(esHost, null, null, null, null, null); } private Elasticsearch8AsyncWriter createWriter( - String index, int maxBatchSize, Elasticsearch8AsyncSinkBuilder.OperationConverter elementConverter) throws IOException { - List esHost = - Collections.singletonList( - new HttpHost(ES_CONTAINER.getHost(), ES_CONTAINER.getFirstMappedPort())); Elasticsearch8AsyncSink sink = new Elasticsearch8AsyncSink( @@ -244,7 +233,7 @@ private Elasticsearch8AsyncWriter createWriter( 5 * 1024 * 1024, 5000, 1024 * 1024, - new NetworkConfig(esHost, null, null, null, null, null)) { + createNetworkConfig()) { @Override public StatefulSinkWriter createWriter(InitContext context) { return new Elasticsearch8AsyncWriter( diff --git a/flink-connector-elasticsearch8/src/test/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchSinkBaseITCase.java b/flink-connector-elasticsearch8/src/test/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchSinkBaseITCase.java index 7977cfe9..692579c2 100644 --- a/flink-connector-elasticsearch8/src/test/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchSinkBaseITCase.java +++ b/flink-connector-elasticsearch8/src/test/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchSinkBaseITCase.java @@ -21,86 +21,144 @@ package org.apache.flink.connector.elasticsearch.sink; +import org.apache.flink.connector.base.sink.writer.ElementConverter; +import org.apache.flink.testutils.junit.extensions.parameterized.Parameter; +import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension; +import org.apache.flink.testutils.junit.extensions.parameterized.Parameters; + +import co.elastic.clients.elasticsearch.core.bulk.BulkOperationVariant; +import co.elastic.clients.elasticsearch.core.bulk.IndexOperation; import org.apache.http.HttpHost; +import org.apache.http.auth.AuthScope; +import org.apache.http.auth.UsernamePasswordCredentials; +import org.apache.http.client.CredentialsProvider; +import org.apache.http.impl.client.BasicCredentialsProvider; import org.apache.http.util.EntityUtils; import org.elasticsearch.client.Request; import org.elasticsearch.client.Response; import org.elasticsearch.client.RestClient; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.TestTemplate; +import org.junit.jupiter.api.extension.ExtendWith; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testcontainers.containers.output.Slf4jLogConsumer; +import org.testcontainers.containers.wait.strategy.LogMessageWaitStrategy; import org.testcontainers.containers.wait.strategy.Wait; import org.testcontainers.elasticsearch.ElasticsearchContainer; -import org.testcontainers.junit.jupiter.Container; -import org.testcontainers.junit.jupiter.Testcontainers; import org.testcontainers.utility.DockerImageName; import java.io.IOException; import java.time.Duration; +import java.util.Arrays; +import java.util.List; import static org.assertj.core.api.Assertions.assertThat; -/** Base Integration tests class. */ -@Testcontainers -public class ElasticsearchSinkBaseITCase { +/** + * {@link ElasticsearchSinkBaseITCase} is the base class for integration tests. + * + *

It is extended with the {@link ParameterizedTestExtension} for parameterized testing against + * secure and non-secure Elasticsearch clusters. Tests must be annotated by {@link TestTemplate} in + * order to be parameterized. + * + *

The cluster is running via test containers. In order to reuse the singleton containers by all + * inheriting test classes, we manage their lifecycle. The two containers are started only once when + * this class is loaded. At the end of the test suite the Ryuk container that is started by + * Testcontainers core will take care of stopping the singleton container. + */ +@ExtendWith(ParameterizedTestExtension.class) +public abstract class ElasticsearchSinkBaseITCase { protected static final Logger LOG = LoggerFactory.getLogger(ElasticsearchSinkBaseITCase.class); - public RestClient client; - public static final String ELASTICSEARCH_VERSION = "8.12.1"; - public static final DockerImageName ELASTICSEARCH_IMAGE = DockerImageName.parse("docker.elastic.co/elasticsearch/elasticsearch") .withTag(ELASTICSEARCH_VERSION); + protected static final String ES_CLUSTER_USERNAME = "elastic"; + protected static final String ES_CLUSTER_PASSWORD = "s3cret"; + + protected static final ElasticsearchContainer ES_CONTAINER = createElasticsearchContainer(); + protected static final ElasticsearchContainer ES_CONTAINER_SECURE = + createSecureElasticsearchContainer(); + + // Use singleton test containers which are only started once for several test classes. + // There is no special support for this use case provided by the Testcontainers + // extension @Testcontainers. + static { + ES_CONTAINER.start(); + ES_CONTAINER_SECURE.start(); + } - @Container - public static final ElasticsearchContainer ES_CONTAINER = createElasticsearchContainer(); + @Parameter public boolean secure; - public static ElasticsearchContainer createElasticsearchContainer() { - try (ElasticsearchContainer container = - new ElasticsearchContainer(ELASTICSEARCH_IMAGE) - .withEnv("xpack.security.enabled", "false"); ) { - container - .withEnv("ES_JAVA_OPTS", "-Xms2g -Xmx2g") - .withEnv("logger.org.elasticsearch", "ERROR") - .withLogConsumer(new Slf4jLogConsumer(LOG)); + protected RestClient client; - container.setWaitStrategy( - Wait.defaultWaitStrategy().withStartupTimeout(Duration.ofMinutes(5))); + @Parameters(name = "ES secured = {0}") + public static List secureEnabled() { + return Arrays.asList(false, true); + } - return container; - } + @BeforeEach + public void setUpBase() { + LOG.info("Setting up elasticsearch client, host: {}, secure: {}", getHost(), secure); + client = secure ? createSecureElasticsearchClient() : createElasticsearchClient(); } - public RestClient getRestClient() { - return RestClient.builder( - new HttpHost(ES_CONTAINER.getHost(), ES_CONTAINER.getFirstMappedPort())) - .setHttpClientConfigCallback(httpClientBuilder -> httpClientBuilder) - .build(); + @AfterEach + public void shutdownBase() throws IOException { + client.close(); } - public static void assertIdsAreWritten(RestClient client, String index, String[] ids) - throws IOException { - client.performRequest(new Request("GET", "_refresh")); - Response response = client.performRequest(new Request("GET", index + "/_search/")); - String responseEntity = EntityUtils.toString(response.getEntity()); + /** Get the element converter for testing data type {@link DummyData}. */ + protected ElementConverter getElementConverterForDummyData( + String index) { + return (element, ctx) -> + new IndexOperation.Builder() + .id(element.getId()) + .document(element) + .index(index) + .build(); + } - assertThat(response.getStatusLine().getStatusCode()).isEqualTo(200); + /** Create an Elasticsearch8AsyncSink for DummyData type against the ES test container. */ + protected Elasticsearch8AsyncSink getSinkForDummyData(String index) { + final Elasticsearch8AsyncSinkBuilder builder = + Elasticsearch8AsyncSinkBuilder.builder() + .setHosts(getHost()) + .setMaxBatchSize(5) + .setElementConverter(getElementConverterForDummyData(index)); + + if (secure) { + builder.setUsername(ES_CLUSTER_USERNAME) + .setPassword(ES_CLUSTER_PASSWORD) + .setSslContextSupplier(() -> ES_CONTAINER_SECURE.createSslContextFromCa()); + } + return builder.build(); + } + + /** Get Elasticsearch host depending on the parameter secure. */ + protected HttpHost getHost() { + return secure + ? new HttpHost( + ES_CONTAINER_SECURE.getHost(), + ES_CONTAINER_SECURE.getFirstMappedPort(), + "https") + : new HttpHost(ES_CONTAINER.getHost(), ES_CONTAINER.getFirstMappedPort()); + } + + protected void assertIdsAreWritten(String index, String[] ids) throws IOException { + final String responseEntity = queryElasticsearchIndex(index); for (String id : ids) { - System.out.println(id); + LOG.info("Checking document id {}", id); assertThat(responseEntity).contains(id); } } - public static void assertIdsAreNotWritten(RestClient client, String index, String[] ids) - throws IOException { - client.performRequest(new Request("GET", "_refresh")); - Response response = client.performRequest(new Request("GET", index + "/_search/")); - String responseEntity = EntityUtils.toString(response.getEntity()); - - assertThat(response.getStatusLine().getStatusCode()).isEqualTo(200); - + protected void assertIdsAreNotWritten(String index, String[] ids) throws IOException { + final String responseEntity = queryElasticsearchIndex(index); for (String id : ids) { assertThat(responseEntity).doesNotContain(id); } @@ -125,4 +183,66 @@ public String getName() { return name; } } + + private static ElasticsearchContainer createElasticsearchContainer() { + final ElasticsearchContainer container = + new ElasticsearchContainer(ELASTICSEARCH_IMAGE) + .withEnv("xpack.security.enabled", "false") + .withEnv("ES_JAVA_OPTS", "-Xms2g -Xmx2g") + .withEnv("logger.org.elasticsearch", "ERROR") + .withLogConsumer(new Slf4jLogConsumer(LOG)); + + container.setWaitStrategy( + Wait.defaultWaitStrategy().withStartupTimeout(Duration.ofMinutes(5))); + + return container; + } + + private static ElasticsearchContainer createSecureElasticsearchContainer() { + ElasticsearchContainer container = + new ElasticsearchContainer(ELASTICSEARCH_IMAGE) + .withPassword(ES_CLUSTER_PASSWORD) /* set password */ + .withEnv("ES_JAVA_OPTS", "-Xms2g -Xmx2g") + .withLogConsumer(new Slf4jLogConsumer(LOG)); + + // Set log message based wait strategy as the default wait strategy is not aware of TLS + container + .withEnv("logger.org.elasticsearch", "INFO") + .setWaitStrategy( + new LogMessageWaitStrategy().withRegEx(".*\"message\":\"started.*")); + + return container; + } + + private RestClient createElasticsearchClient() { + return RestClient.builder(getHost()) + .setHttpClientConfigCallback(httpClientBuilder -> httpClientBuilder) + .build(); + } + + private RestClient createSecureElasticsearchClient() { + final CredentialsProvider credentialsProvider = new BasicCredentialsProvider(); + credentialsProvider.setCredentials( + AuthScope.ANY, + new UsernamePasswordCredentials(ES_CLUSTER_USERNAME, ES_CLUSTER_PASSWORD)); + return RestClient.builder(getHost()) + .setHttpClientConfigCallback( + httpClientBuilder -> + httpClientBuilder + .setDefaultCredentialsProvider(credentialsProvider) + .setSSLContext( + ES_CONTAINER_SECURE.createSslContextFromCa())) + .build(); + } + + private String queryElasticsearchIndex(String index) throws IOException { + client.performRequest(new Request("GET", "_refresh")); + Response response = client.performRequest(new Request("GET", index + "/_search/")); + String responseEntity = EntityUtils.toString(response.getEntity()); + LOG.debug("Got response: {}", responseEntity); + + assertThat(response.getStatusLine().getStatusCode()).isEqualTo(200); + + return responseEntity; + } }