diff --git a/flink-connector-elasticsearch-base/pom.xml b/flink-connector-elasticsearch-base/pom.xml
index 8a930bfb..bb22c50f 100644
--- a/flink-connector-elasticsearch-base/pom.xml
+++ b/flink-connector-elasticsearch-base/pom.xml
@@ -67,6 +67,15 @@ under the License.
 			<optional>true</optional>
 
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-table-common</artifactId>
+			<version>${flink.version}</version>
+			<scope>provided</scope>
+			<optional>true</optional>
+		</dependency>
+
@@ -173,6 +182,21 @@ under the License.
 			<optional>true</optional>
 
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-table-planner_${scala.binary.version}</artifactId>
+			<version>${flink.version}</version>
+			<scope>test</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-table-planner_${scala.binary.version}</artifactId>
+			<version>${flink.version}</version>
+			<type>test-jar</type>
+			<scope>test</scope>
+		</dependency>
+
diff --git a/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticSearchInputFormatBase.java b/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticSearchInputFormatBase.java
new file mode 100644
index 00000000..7b93cd50
--- /dev/null
+++ b/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticSearchInputFormatBase.java
@@ -0,0 +1,227 @@
+package org.apache.flink.streaming.connectors.elasticsearch;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.io.LocatableInputSplitAssigner;
+import org.apache.flink.api.common.io.RichInputFormat;
+import org.apache.flink.api.common.io.statistics.BaseStatistics;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.io.InputSplitAssigner;
+
+import org.elasticsearch.action.search.SearchRequest;
+import org.elasticsearch.action.search.SearchScrollRequest;
+import org.elasticsearch.common.Strings;
+import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.index.query.QueryBuilder;
+import org.elasticsearch.index.query.QueryBuilders;
+import org.elasticsearch.search.Scroll;
+import org.elasticsearch.search.builder.SearchSourceBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+@Internal
+public class ElasticSearchInputFormatBase<T, C extends AutoCloseable>
+        extends RichInputFormat<T, ElasticsearchInputSplit> implements ResultTypeQueryable<T> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(ElasticSearchInputFormatBase.class);
+    private static final long serialVersionUID = 1L;
+    private final DeserializationSchema<T> deserializationSchema;
+    private final String index;
+    private final String type;
+
+    private final String[] fieldNames;
+    private final QueryBuilder predicate;
+    private final long limit;
+
+    private final long scrollTimeout;
+    private final int scrollSize;
+
+    private Scroll scroll;
+    private String currentScrollWindowId;
+    private String[] currentScrollWindowHits;
+
+    private int nextRecordIndex = 0;
+    private long currentReadCount = 0L;
+
+    /** Call bridge for the version-specific Elasticsearch APIs. */
+    private final ElasticsearchApiCallBridge<C> callBridge;
+
+    private final Map<String, String> userConfig;
+
+    /** Elasticsearch client created using the call bridge. */
+    private transient C client;
+
+    public ElasticSearchInputFormatBase(
+            ElasticsearchApiCallBridge<C> callBridge,
+            Map<String, String> userConfig,
+            DeserializationSchema<T> deserializationSchema,
+            String[] fieldNames,
+            String index,
+            String type,
+            long scrollTimeout,
+            int scrollSize,
+            QueryBuilder predicate,
+            long limit) {
+
+        this.callBridge = checkNotNull(callBridge);
+        checkNotNull(userConfig);
+        // copy config so we can remove entries without side-effects
+        this.userConfig = new HashMap<>(userConfig);
+
+        this.deserializationSchema = checkNotNull(deserializationSchema);
+        this.index = index;
+        this.type = type;
+
+        this.fieldNames = fieldNames;
+        this.predicate = predicate;
+        this.limit = limit;
+
+        this.scrollTimeout = scrollTimeout;
+        this.scrollSize = scrollSize;
+    }
+
+    @Override
+    public TypeInformation<T> getProducedType() {
+        return deserializationSchema.getProducedType();
+    }
+
+    @Override
+    public void configure(Configuration configuration) {
+        try {
+            client = callBridge.createClient();
+            callBridge.verifyClientConnection(client);
+        } catch (IOException ex) {
+            LOG.error("Exception while creating connection to Elasticsearch.", ex);
+            throw new RuntimeException("Cannot create connection to Elasticsearch.", ex);
+        }
+    }
+
+    @Override
+    public BaseStatistics getStatistics(BaseStatistics baseStatistics) throws IOException {
+        return null;
+    }
+
+    @Override
+    public ElasticsearchInputSplit[] createInputSplits(int minNumSplits) throws IOException {
+        return callBridge.createInputSplitsInternal(client, index, type, minNumSplits);
+    }
+
+    @Override
+    public InputSplitAssigner getInputSplitAssigner(ElasticsearchInputSplit[] inputSplits) {
+        return new LocatableInputSplitAssigner(inputSplits);
+    }
+
+    @Override
+    public void open(ElasticsearchInputSplit split) throws IOException {
+        SearchRequest searchRequest = new SearchRequest(index);
+        if (type == null) {
+            searchRequest.types(Strings.EMPTY_ARRAY);
+        } else {
+            searchRequest.types(type);
+        }
+        this.scroll = new Scroll(TimeValue.timeValueMinutes(scrollTimeout));
+        searchRequest.scroll(scroll);
+        SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
+        int size;
+        if (limit > 0) {
+            size = (int) Math.min(limit, scrollSize);
+        } else {
+            size = scrollSize;
+        }
+        // The Elasticsearch default page size is 10.
+        searchSourceBuilder.size(size);
+        searchSourceBuilder.fetchSource(fieldNames, null);
+        if (predicate != null) {
+            searchSourceBuilder.query(predicate);
+        } else {
+            searchSourceBuilder.query(QueryBuilders.matchAllQuery());
+        }
+        searchRequest.source(searchSourceBuilder);
+        searchRequest.preference("_shards:" + split.getShard());
+        Tuple2<String, String[]> searchResponse = null;
+        try {
+            searchResponse = callBridge.search(client, searchRequest);
+        } catch (IOException e) {
+            LOG.error("Search request failed.", e);
+        }
+        if (searchResponse != null) {
+            currentScrollWindowId = searchResponse.f0;
+            currentScrollWindowHits = searchResponse.f1;
+            nextRecordIndex = 0;
+        }
+    }
+
+    @Override
+    public boolean reachedEnd() throws IOException {
+        if (limit > 0 && currentReadCount >= limit) {
+            return true;
+        }
+
+        // SearchResponse can be InternalSearchHits.empty(), and the InternalSearchHit[] EMPTY = new
+        // InternalSearchHit[0]
+        if (currentScrollWindowHits.length != 0
+                && nextRecordIndex > currentScrollWindowHits.length - 1) {
+            fetchNextScrollWindow();
+        }
+
+        return currentScrollWindowHits.length == 0;
+    }
+
+    @Override
+    public T nextRecord(T t) throws IOException {
+        if (reachedEnd()) {
+            LOG.warn("Already reached the end of the split.");
+        }
+
+        String hit = currentScrollWindowHits[nextRecordIndex];
+        nextRecordIndex++;
+        currentReadCount++;
+        LOG.debug("Yielding new record for hit: " + hit);
+
+        return parseSearchHit(hit);
+    }
+
+    @Override
+    public void close() throws IOException {
+        callBridge.close(client);
+    }
+
+    private void fetchNextScrollWindow() {
+        Tuple2<String, String[]> searchResponse = null;
+        SearchScrollRequest scrollRequest = new SearchScrollRequest(currentScrollWindowId);
+        scrollRequest.scroll(scroll);
+
+        try {
+            searchResponse = callBridge.scroll(client, scrollRequest);
+        } catch (IOException e) {
+            LOG.error("Scroll request failed.", e);
+        }
+
+        if (searchResponse != null) {
+            currentScrollWindowId = searchResponse.f0;
+            currentScrollWindowHits = searchResponse.f1;
+            nextRecordIndex = 0;
+        }
+    }
+
+    private T parseSearchHit(String hit) {
+        T row = null;
+        try {
+            row = deserializationSchema.deserialize(hit.getBytes());
+        } catch (IOException e) {
+            LOG.error("Deserialize search hit failed.", e);
+        }
+
+        return row;
+    }
+}
diff --git a/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchApiCallBridge.java b/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchApiCallBridge.java
index aab5cf5a..8e3f6886 100644
--- a/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchApiCallBridge.java
+++ b/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchApiCallBridge.java
@@ -24,6 +24,7 @@
 import org.elasticsearch.action.bulk.BulkItemResponse;
 import org.elasticsearch.action.bulk.BulkProcessor;
 import org.elasticsearch.action.search.SearchRequest;
+import org.elasticsearch.action.search.SearchScrollRequest;
 
 import javax.annotation.Nullable;
 
@@ -63,6 +64,13 @@ public interface ElasticsearchApiCallBridge<C extends AutoCloseable> extends Ser
      */
     BulkProcessor.Builder createBulkProcessorBuilder(C client, BulkProcessor.Listener listener);
 
+    ElasticsearchInputSplit[] createInputSplitsInternal(
+            C client, String index, String type, int minNumSplits);
+
+    Tuple2<String, String[]> scroll(C client, SearchScrollRequest searchScrollRequest)
+            throws IOException;
+
+    /**
+     * Executes a search using the Search API.
      *
diff --git a/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchInputSplit.java b/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchInputSplit.java
new file mode 100644
index 00000000..86e787e5
--- /dev/null
+++ b/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchInputSplit.java
@@ -0,0 +1,37 @@
+package org.apache.flink.streaming.connectors.elasticsearch;
+
+import org.apache.flink.core.io.LocatableInputSplit;
+
+/** An input split that binds one shard of an Elasticsearch index to a reader. */
+public class ElasticsearchInputSplit extends LocatableInputSplit {
+
+    private static final long serialVersionUID = 1L;
+
+    /** The name of the index to retrieve data from. */
+    private final String index;
+
+    /** The document type; it is null in the Elasticsearch 7+ connector, where types were removed. */
+    private final String type;
+
+    /** The shard to read; an index is split into different shards when it is created. */
+    private final int shard;
+
+    public ElasticsearchInputSplit(
+            int splitNumber, String[] hostnames, String index, String type, int shard) {
+        super(splitNumber, hostnames);
+        this.index = index;
+        this.type = type;
+        this.shard = shard;
+    }
+
+    public String getIndex() {
+        return index;
+    }
+
+    public String getType() {
+        return type;
+    }
+
+    public int getShard() {
+        return shard;
+    }
+}
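Reviewer note: the split above carries the shard number so that each parallel reader can pin its scroll to a single shard via the `searchRequest.preference("_shards:" + split.getShard())` call in `ElasticSearchInputFormatBase#open`. As a minimal sketch of how a call bridge could enumerate one split per shard — `createSplits` is a hypothetical helper, not part of this patch (the ES7 bridge below still returns an empty array), and the shard count here is a placeholder that would really come from the index metadata:

    // Hypothetical sketch: one split per shard of the index.
    ElasticsearchInputSplit[] createSplits(String index, String type) {
        int numberOfShards = 3; // assumption: looked up from the index settings
        ElasticsearchInputSplit[] splits = new ElasticsearchInputSplit[numberOfShards];
        for (int shard = 0; shard < numberOfShards; shard++) {
            // Empty hostnames mean no host affinity, so the
            // LocatableInputSplitAssigner can hand out splits freely.
            splits[shard] =
                    new ElasticsearchInputSplit(shard, new String[0], index, type, shard);
        }
        return splits;
    }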
diff --git a/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/ElasticsearchConfiguration.java b/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/ElasticsearchConfiguration.java
index 04c76333..00244b6e 100644
--- a/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/ElasticsearchConfiguration.java
+++ b/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/ElasticsearchConfiguration.java
@@ -38,6 +38,8 @@
 import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchConnectorOptions.BULK_FLUSH_INTERVAL_OPTION;
 import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchConnectorOptions.FAILURE_HANDLER_OPTION;
 import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchConnectorOptions.PASSWORD_OPTION;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchConnectorOptions.SCROLL_MAX_SIZE_OPTION;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchConnectorOptions.SCROLL_TIMEOUT_OPTION;
 import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchConnectorOptions.USERNAME_OPTION;
 
 /** Accessor methods to Elasticsearch options. */
@@ -150,6 +152,26 @@ public Optional<String> getPathPrefix() {
         return config.getOptional(ElasticsearchConnectorOptions.CONNECTION_PATH_PREFIX);
     }
 
+    public Optional<Integer> getScrollMaxSize() {
+        return config.getOptional(SCROLL_MAX_SIZE_OPTION);
+    }
+
+    public Optional<Long> getScrollTimeout() {
+        return config.getOptional(SCROLL_TIMEOUT_OPTION).map(Duration::toMillis);
+    }
+
+    public long getCacheMaxSize() {
+        return config.get(ElasticsearchConnectorOptions.LOOKUP_CACHE_MAX_ROWS);
+    }
+
+    public Duration getCacheExpiredMs() {
+        return config.get(ElasticsearchConnectorOptions.LOOKUP_CACHE_TTL);
+    }
+
+    public int getMaxRetryTimes() {
+        return config.get(ElasticsearchConnectorOptions.LOOKUP_MAX_RETRIES);
+    }
+
     @Override
     public boolean equals(Object o) {
         if (this == o) {
diff --git a/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/ElasticsearchConnectorOptions.java b/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/ElasticsearchConnectorOptions.java
index 4838b035..2e0d30db 100644
--- a/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/ElasticsearchConnectorOptions.java
+++ b/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/ElasticsearchConnectorOptions.java
@@ -77,6 +77,21 @@
                     .withDescription(
                             "Delimiter for composite keys e.g., \"$\" would result in IDs \"KEY1$KEY2$KEY3\".");
 
+    // elasticsearch source config options
+    public static final ConfigOption<Integer> SCROLL_MAX_SIZE_OPTION =
+            ConfigOptions.key("scan.scroll.max-size")
+                    .intType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Maximum number of hits to be returned with each Elasticsearch scroll request");
+
+    public static final ConfigOption<Duration> SCROLL_TIMEOUT_OPTION =
+            ConfigOptions.key("scan.scroll.timeout")
+                    .durationType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Amount of time Elasticsearch will keep the search context alive for scroll requests");
+
     public static final ConfigOption<String> FAILURE_HANDLER_OPTION =
             ConfigOptions.key("failure-handler")
                     .stringType()
@@ -152,6 +167,25 @@
                             "The format must produce a valid JSON document. "
                                     + "Please refer to the documentation on formats for more details.");
 
+    // lookup config options
+    public static final ConfigOption<Long> LOOKUP_CACHE_MAX_ROWS =
+            ConfigOptions.key("lookup.cache.max-rows")
+                    .longType()
+                    .defaultValue(-1L)
+                    .withDescription(
+                            "The max number of rows in the lookup cache; beyond this value, the oldest rows are "
+                                    + "evicted. \"cache.max-rows\" and \"cache.ttl\" must both be specified if either of them is "
+                                    + "specified. The cache is not enabled by default.");
+    public static final ConfigOption<Duration> LOOKUP_CACHE_TTL =
+            ConfigOptions.key("lookup.cache.ttl")
+                    .durationType()
+                    .defaultValue(Duration.ofSeconds(10))
+                    .withDescription("The cache time to live.");
+    public static final ConfigOption<Integer> LOOKUP_MAX_RETRIES =
+            ConfigOptions.key("lookup.max-retries")
+                    .intType()
+                    .defaultValue(3)
+                    .withDescription("The max retry times if a lookup query fails.");
     // --------------------------------------------------------------------------------------------
     // Enums
     // --------------------------------------------------------------------------------------------
diff --git a/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/ElasticsearchLookupOptions.java b/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/ElasticsearchLookupOptions.java
new file mode 100644
index 00000000..ee0704bb
--- /dev/null
+++ b/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/ElasticsearchLookupOptions.java
@@ -0,0 +1,74 @@
+package org.apache.flink.streaming.connectors.elasticsearch.table;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+/** Options for the Elasticsearch lookup function. */
+public class ElasticsearchLookupOptions implements Serializable {
+
+    private final long cacheMaxSize;
+    private final long cacheExpireMs;
+    private final int maxRetryTimes;
+
+    public ElasticsearchLookupOptions(long cacheMaxSize, long cacheExpireMs, int maxRetryTimes) {
+        this.cacheMaxSize = cacheMaxSize;
+        this.cacheExpireMs = cacheExpireMs;
+        this.maxRetryTimes = maxRetryTimes;
+    }
+
+    public long getCacheMaxSize() {
+        return cacheMaxSize;
+    }
+
+    public long getCacheExpireMs() {
+        return cacheExpireMs;
+    }
+
+    public int getMaxRetryTimes() {
+        return maxRetryTimes;
+    }
+
+    public static Builder builder() {
+        return new Builder();
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (o instanceof ElasticsearchLookupOptions) {
+            ElasticsearchLookupOptions options = (ElasticsearchLookupOptions) o;
+            return Objects.equals(cacheMaxSize, options.cacheMaxSize)
+                    && Objects.equals(cacheExpireMs, options.cacheExpireMs)
+                    && Objects.equals(maxRetryTimes, options.maxRetryTimes);
+        } else {
+            return false;
+        }
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(cacheMaxSize, cacheExpireMs, maxRetryTimes);
+    }
+
+    /** Builder of {@link ElasticsearchLookupOptions}. */
+    public static class Builder {
+        private long cacheMaxSize = -1L;
+        private long cacheExpireMs = -1L;
+        private int maxRetryTimes = 3;
+
+        /** optional, lookup cache max size; beyond this value, the oldest entries are evicted. */
+        public Builder setCacheMaxSize(long cacheMaxSize) {
+            this.cacheMaxSize = cacheMaxSize;
+            return this;
+        }
+
+        /** optional, lookup cache expiry in milliseconds; after this time, entries expire. */
+        public Builder setCacheExpireMs(long cacheExpireMs) {
+            this.cacheExpireMs = cacheExpireMs;
+            return this;
+        }
+
+        /** optional, max retry times for the Elasticsearch lookup.
*/ + public Builder setMaxRetryTimes(int maxRetryTimes) { + this.maxRetryTimes = maxRetryTimes; + return this; + } + + public ElasticsearchLookupOptions build() { + return new ElasticsearchLookupOptions(cacheMaxSize, cacheExpireMs, maxRetryTimes); + } + } +} diff --git a/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/ElasticsearchRowDataLookupFunction.java b/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/ElasticsearchRowDataLookupFunction.java index 1cde2714..e4a77836 100644 --- a/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/ElasticsearchRowDataLookupFunction.java +++ b/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/ElasticsearchRowDataLookupFunction.java @@ -23,8 +23,10 @@ import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchApiCallBridge; import org.apache.flink.table.connector.source.LookupTableSource; +import org.apache.flink.table.data.GenericRowData; import org.apache.flink.table.data.RowData; import org.apache.flink.table.data.util.DataFormatConverters; +import org.apache.flink.table.data.util.DataFormatConverters.DataFormatConverter; import org.apache.flink.table.functions.FunctionContext; import org.apache.flink.table.functions.LookupFunction; import org.apache.flink.table.types.DataType; @@ -33,6 +35,9 @@ import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.common.Strings; +import org.elasticsearch.common.cache.Cache; +import org.elasticsearch.common.cache.CacheBuilder; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.query.BoolQueryBuilder; import org.elasticsearch.index.query.TermQueryBuilder; import org.elasticsearch.search.builder.SearchSourceBuilder; @@ -44,6 +49,7 @@ import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.List; import java.util.Map; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -56,7 +62,6 @@ public class ElasticsearchRowDataLookupFunction extends private static final Logger LOG = LoggerFactory.getLogger(ElasticsearchRowDataLookupFunction.class); - private static final long serialVersionUID = 1L; private final DeserializationSchema deserializationSchema; @@ -67,16 +72,19 @@ public class ElasticsearchRowDataLookupFunction extends private final String[] lookupKeys; private final int maxRetryTimes; // converters to convert data from internal to external in order to generate keys for the cache. 
-    private final DataFormatConverters.DataFormatConverter[] converters;
+    private final DataFormatConverter[] converters;
 
     private SearchRequest searchRequest;
     private SearchSourceBuilder searchSourceBuilder;
-
+    private final long cacheMaxSize;
+    private final long cacheExpireMs;
     private final ElasticsearchApiCallBridge<C> callBridge;
 
     private transient C client;
+    private transient Cache<RowData, List<RowData>> cache;
 
     public ElasticsearchRowDataLookupFunction(
             DeserializationSchema<RowData> deserializationSchema,
+            ElasticsearchLookupOptions lookupOptions,
             int maxRetryTimes,
             String index,
             String type,
@@ -86,6 +94,7 @@ public ElasticsearchRowDataLookupFunction(
             ElasticsearchApiCallBridge<C> callBridge) {
 
         checkNotNull(deserializationSchema, "No DeserializationSchema supplied.");
+        checkNotNull(lookupOptions, "No ElasticsearchLookupOptions supplied.");
         checkNotNull(maxRetryTimes, "No maxRetryTimes supplied.");
         checkNotNull(producedNames, "No fieldNames supplied.");
         checkNotNull(producedTypes, "No fieldTypes supplied.");
@@ -93,12 +102,15 @@
         checkNotNull(callBridge, "No ElasticsearchApiCallBridge supplied.");
 
         this.deserializationSchema = deserializationSchema;
-        this.maxRetryTimes = maxRetryTimes;
+        this.cacheExpireMs = lookupOptions.getCacheExpireMs();
+        this.cacheMaxSize = lookupOptions.getCacheMaxSize();
+        this.maxRetryTimes = lookupOptions.getMaxRetryTimes();
+
         this.index = index;
         this.type = type;
         this.producedNames = producedNames;
         this.lookupKeys = lookupKeys;
-        this.converters = new DataFormatConverters.DataFormatConverter[lookupKeys.length];
+        this.converters = new DataFormatConverter[lookupKeys.length];
         Map<String, Integer> nameToIndex =
                 IntStream.range(0, producedNames.length)
                         .boxed()
@@ -109,7 +121,6 @@
                     position != null, "Lookup keys %s not selected", Arrays.toString(lookupKeys));
             converters[i] = DataFormatConverters.getConverterForDataType(producedTypes[position]);
         }
-
         this.callBridge = callBridge;
     }
 
@@ -117,6 +128,14 @@
     public void open(FunctionContext context) throws Exception {
         this.client = callBridge.createClient();
+        // Build the lookup cache only when both max-rows and TTL are configured.
+        this.cache =
+                cacheMaxSize == -1 || cacheExpireMs == -1
+                        ? null
+                        : CacheBuilder.<RowData, List<RowData>>builder()
+                                .setExpireAfterWrite(TimeValue.timeValueMillis(cacheExpireMs))
+                                .setMaximumWeight(cacheMaxSize)
+                                .build();
         // Set searchRequest in open() so that it is not rebuilt for every
         // record passed to eval().
this.searchRequest = new SearchRequest(index); @@ -130,6 +149,7 @@ public void open(FunctionContext context) throws Exception { deserializationSchema.open(null); } + @Override public Collection lookup(RowData keyRow) { BoolQueryBuilder lookupCondition = new BoolQueryBuilder(); @@ -137,6 +157,17 @@ public Collection lookup(RowData keyRow) { lookupCondition.must( new TermQueryBuilder(lookupKeys[i], converters[i].toExternal(keyRow, i))); } + + if (cache != null) { + List cachedRows = cache.get(keyRow); + if (cachedRows != null) { + for (RowData cachedRow : cachedRows) { + collect(cachedRow); + } + return new ArrayList<>(); + } + } + searchSourceBuilder.query(lookupCondition); searchRequest.source(searchSourceBuilder); @@ -147,7 +178,7 @@ public Collection lookup(RowData keyRow) { if (searchResponse.f1.length > 0) { String[] result = searchResponse.f1; for (String s : result) { - RowData row = parseSearchResult(s); + RowData row = parseSearchHit(s); rows.add(row); } rows.trimToSize(); @@ -170,10 +201,10 @@ public Collection lookup(RowData keyRow) { return Collections.emptyList(); } - private RowData parseSearchResult(String result) { + private RowData parseSearchHit(String hit) { RowData row = null; try { - row = deserializationSchema.deserialize(result.getBytes()); + row = deserializationSchema.deserialize(hit.getBytes()); } catch (IOException e) { LOG.error("Deserialize search hit failed: " + e.getMessage()); } diff --git a/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBaseTest.java b/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBaseTest.java index 1b66937c..03b59566 100644 --- a/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBaseTest.java +++ b/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBaseTest.java @@ -37,6 +37,7 @@ import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.search.SearchScrollRequest; import org.elasticsearch.client.Client; import org.elasticsearch.client.Requests; import org.junit.Test; @@ -615,6 +616,18 @@ public Tuple2 search(Client client, SearchRequest searchReques @Override public void close(Client client) throws IOException {} + @Override + public ElasticsearchInputSplit[] createInputSplitsInternal( + Client client, String index, String type, int minNumSplits) { + return new ElasticsearchInputSplit[0]; + } + + @Override + public Tuple2 scroll( + Client client, SearchScrollRequest searchScrollRequest) throws IOException { + return null; + } + @Nullable @Override public Throwable extractFailureCauseFromBulkItemResponse( diff --git a/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkTestBase.java b/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkTestBase.java index 71a1e40b..99a1e961 100644 --- a/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkTestBase.java +++ b/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkTestBase.java @@ -17,20 +17,37 @@ package 
org.apache.flink.streaming.connectors.elasticsearch; +import org.apache.flink.api.common.serialization.DeserializationSchema; import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.formats.common.TimestampFormat; +import org.apache.flink.formats.json.JsonRowDataDeserializationSchema; import org.apache.flink.runtime.client.JobExecutionException; +import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.connectors.elasticsearch.testutils.SourceSinkDataTestKit; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.planner.runtime.utils.TestingAppendRowDataSink; +import org.apache.flink.table.runtime.typeutils.InternalTypeInfo; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.logical.RowType; import org.apache.flink.test.util.AbstractTestBase; import org.elasticsearch.client.RestHighLevelClient; +import org.elasticsearch.index.query.QueryBuilder; +import java.util.Arrays; import java.util.Collections; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.function.Function; +import static org.apache.flink.table.api.DataTypes.FIELD; +import static org.apache.flink.table.api.DataTypes.ROW; +import static org.apache.flink.table.api.DataTypes.STRING; import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.junit.Assert.assertEquals; /** * Environment preparation and suite of tests for version-specific {@link ElasticsearchSinkBase} @@ -39,9 +56,11 @@ * @param Elasticsearch client type * @param The address type to use */ -public abstract class ElasticsearchSinkTestBase +public abstract class ElasticsearchSinkTestBase extends AbstractTestBase { + protected static final String CLUSTER_NAME = "test-cluster"; + protected abstract RestHighLevelClient getClient(); /** Tests that the Elasticsearch sink works properly with json. 
*/ @@ -77,15 +96,88 @@ private void runElasticSearchSinkTest( client.close(); } + protected void runElasticSearchInputFormatTest() throws Exception { + + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + Map userConfig = new HashMap<>(); + userConfig.put("cluster.name", CLUSTER_NAME); + DataType dataType = ROW(FIELD("data", STRING())); + RowType schema = (RowType) dataType.getLogicalType(); + + // pass on missing field + DeserializationSchema deserializationSchema = + new JsonRowDataDeserializationSchema( + schema, + InternalTypeInfo.of(schema), + false, + false, + TimestampFormat.ISO_8601); + + ElasticSearchInputFormatBase inputFormat = + createElasticsearchInputFormat( + userConfig, + (DeserializationSchema) deserializationSchema, + null, + "elasticsearch-sink-test-json-index", + "flink-es-test-type", + 1000, + 10, + null, + 0); + + DataStream dataStream = env.createInput(inputFormat); + TestingAppendRowDataSink sink = new TestingAppendRowDataSink(InternalTypeInfo.of(schema)); + dataStream.addSink(sink); + env.execute("Elasticsearch Source Test"); + List expected = + Arrays.asList( + "+I(message #0)", + "+I(message #1)", + "+I(message #10)", + "+I(message #11)", + "+I(message #12)", + "+I(message #13)", + "+I(message #14)", + "+I(message #15)", + "+I(message #16)", + "+I(message #17)", + "+I(message #18)", + "+I(message #19)", + "+I(message #2)", + "+I(message #3)", + "+I(message #4)", + "+I(message #5)", + "+I(message #6)", + "+I(message #7)", + "+I(message #8)", + "+I(message #9)"); + List results = sink.getJavaAppendResults(); + results.sort(String::compareTo); + assertEquals(expected, results); + } + + protected abstract ElasticSearchInputFormatBase createElasticsearchInputFormat( + Map userConfig, + DeserializationSchema deserializationSchema, + String[] fieldNames, + String index, + String type, + long scrollTimeout, + int scrollMaxSize, + QueryBuilder predicate, + int limit) + throws Exception; + /** * Tests that the Elasticsearch sink fails eagerly if the provided list of addresses is {@code * null}. */ public void runNullAddressesTest() { assertThatThrownBy( - () -> - createElasticsearchSink( - 1, null, SourceSinkDataTestKit.getJsonSinkFunction("test"))) + () -> + createElasticsearchSink( + 1, null, SourceSinkDataTestKit.getJsonSinkFunction("test"))) .isInstanceOfAny(IllegalArgumentException.class, NullPointerException.class); } @@ -94,11 +186,11 @@ public void runNullAddressesTest() { */ public void runEmptyAddressesTest() { assertThatThrownBy( - () -> - createElasticsearchSink( - 1, - Collections.emptyList(), - SourceSinkDataTestKit.getJsonSinkFunction("test"))) + () -> + createElasticsearchSink( + 1, + Collections.emptyList(), + SourceSinkDataTestKit.getJsonSinkFunction("test"))) .isInstanceOf(IllegalArgumentException.class); } @@ -134,18 +226,18 @@ protected abstract ElasticsearchSinkBase, C> createElast * because the Elasticsearch Java API to do so is incompatible across different versions. */ protected abstract ElasticsearchSinkBase, C> - createElasticsearchSinkForEmbeddedNode( - int bulkFlushMaxActions, - ElasticsearchSinkFunction> elasticsearchSinkFunction) - throws Exception; + createElasticsearchSinkForEmbeddedNode( + int bulkFlushMaxActions, + ElasticsearchSinkFunction> elasticsearchSinkFunction) + throws Exception; /** * Creates a version-specific Elasticsearch sink to connect to a specific Elasticsearch node. 
*/ protected abstract ElasticsearchSinkBase, C> - createElasticsearchSinkForNode( - int bulkFlushMaxActions, - ElasticsearchSinkFunction> elasticsearchSinkFunction, - String ipAddress) - throws Exception; + createElasticsearchSinkForNode( + int bulkFlushMaxActions, + ElasticsearchSinkFunction> elasticsearchSinkFunction, + String ipAddress) + throws Exception; } diff --git a/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch6/ElasticsearchSinkITCase.java b/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch6/ElasticsearchSinkITCase.java index 68bb99e2..ac86ff26 100644 --- a/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch6/ElasticsearchSinkITCase.java +++ b/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch6/ElasticsearchSinkITCase.java @@ -82,9 +82,9 @@ public void testInvalidElasticsearchCluster() throws Exception { @Override protected ElasticsearchSinkBase, RestHighLevelClient> createElasticsearchSink( - int bulkFlushMaxActions, - List httpHosts, - ElasticsearchSinkFunction> elasticsearchSinkFunction) { + int bulkFlushMaxActions, + List httpHosts, + ElasticsearchSinkFunction> elasticsearchSinkFunction) { ElasticsearchSink.Builder> builder = new ElasticsearchSink.Builder<>(httpHosts, elasticsearchSinkFunction); diff --git a/flink-connector-elasticsearch7/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSource.java b/flink-connector-elasticsearch7/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSource.java index a538122b..9e5b999e 100644 --- a/flink-connector-elasticsearch7/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSource.java +++ b/flink-connector-elasticsearch7/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSource.java @@ -20,63 +20,96 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.streaming.connectors.elasticsearch7.ElasticSearch7InputFormat; import org.apache.flink.streaming.connectors.elasticsearch7.Elasticsearch7ApiCallBridge; import org.apache.flink.streaming.connectors.elasticsearch7.RestClientFactory; -import org.apache.flink.table.connector.Projection; +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.catalog.Column; +import org.apache.flink.table.catalog.ResolvedSchema; +import org.apache.flink.table.catalog.UniqueConstraint; +import org.apache.flink.table.catalog.WatermarkSpec; +import org.apache.flink.table.connector.ChangelogMode; import org.apache.flink.table.connector.format.DecodingFormat; import org.apache.flink.table.connector.source.DynamicTableSource; +import org.apache.flink.table.connector.source.InputFormatProvider; import org.apache.flink.table.connector.source.LookupTableSource; +import org.apache.flink.table.connector.source.ScanTableSource; +import org.apache.flink.table.connector.source.TableFunctionProvider; +import org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown; +import org.apache.flink.table.connector.source.abilities.SupportsLimitPushDown; import org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown; -import 
org.apache.flink.table.connector.source.lookup.LookupFunctionProvider; -import org.apache.flink.table.connector.source.lookup.PartialCachingLookupProvider; -import org.apache.flink.table.connector.source.lookup.cache.LookupCache; import org.apache.flink.table.data.RowData; +import org.apache.flink.table.expressions.CallExpression; +import org.apache.flink.table.expressions.Expression; +import org.apache.flink.table.expressions.FieldReferenceExpression; +import org.apache.flink.table.expressions.ResolvedExpression; +import org.apache.flink.table.expressions.ValueLiteralExpression; +import org.apache.flink.table.functions.BuiltInFunctionDefinitions; +import org.apache.flink.table.functions.FunctionDefinition; import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.FieldsDataType; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.table.types.utils.DataTypeUtils; import org.apache.flink.util.Preconditions; + import org.apache.flink.util.StringUtils; -import org.elasticsearch.client.RestHighLevelClient; +import org.elasticsearch.index.query.BoolQueryBuilder; +import org.elasticsearch.index.query.ExistsQueryBuilder; +import org.elasticsearch.index.query.QueryBuilder; +import org.elasticsearch.index.query.QueryBuilders; +import org.elasticsearch.index.query.QueryStringQueryBuilder; +import org.elasticsearch.index.query.RangeQueryBuilder; +import org.elasticsearch.index.query.TermQueryBuilder; +import org.elasticsearch.index.query.TermsQueryBuilder; -import javax.annotation.Nullable; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Optional; +import java.util.Set; + +import static org.apache.flink.table.utils.TableSchemaUtils.containsPhysicalColumnsOnly; +import static org.apache.flink.util.Preconditions.checkArgument; -/** - * A {@link DynamicTableSource} that describes how to create a {@link Elasticsearch7DynamicSource} - * from a logical description. 
- */
 @Internal
-public class Elasticsearch7DynamicSource implements LookupTableSource, SupportsProjectionPushDown {
+public class Elasticsearch7DynamicSource
+        implements ScanTableSource,
+                LookupTableSource,
+                SupportsProjectionPushDown,
+                SupportsFilterPushDown,
+                SupportsLimitPushDown {
 
     private final DecodingFormat<DeserializationSchema<RowData>> format;
     private final Elasticsearch7Configuration config;
-    private final int lookupMaxRetryTimes;
-    private final LookupCache lookupCache;
-    private DataType physicalRowDataType;
+    private final ElasticsearchLookupOptions lookupOptions;
+    private TableSchema physicalSchema;
+    private Set<String> filterableFields;
+    private Long limit;
+    private List<ResolvedExpression> filterPredicates;
 
     public Elasticsearch7DynamicSource(
             DecodingFormat<DeserializationSchema<RowData>> format,
             Elasticsearch7Configuration config,
-            DataType physicalRowDataType,
-            int lookupMaxRetryTimes,
-            @Nullable LookupCache lookupCache) {
+            TableSchema physicalSchema,
+            ElasticsearchLookupOptions lookupOptions) {
         this.format = format;
         this.config = config;
-        this.physicalRowDataType = physicalRowDataType;
-        this.lookupMaxRetryTimes = lookupMaxRetryTimes;
-        this.lookupCache = lookupCache;
+        this.physicalSchema = physicalSchema;
+        this.lookupOptions = lookupOptions;
+        List<String> fieldNameList = Arrays.asList(physicalSchema.getFieldNames());
+        this.filterableFields = new HashSet<>(fieldNameList);
     }
 
     @Override
-    public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext context) {
+    public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext lookupContext) {
         RestClientFactory restClientFactory = null;
         if (config.getUsername().isPresent()
                 && config.getPassword().isPresent()
                 && !StringUtils.isNullOrWhitespaceOnly(config.getUsername().get())
                 && !StringUtils.isNullOrWhitespaceOnly(config.getPassword().get())) {
             restClientFactory =
                     new Elasticsearch7DynamicSink.AuthRestClientFactory(
                             config.getPathPrefix().orElse(null),
                             config.getUsername().get(),
                             config.getPassword().get());
         } else {
             restClientFactory =
                     new Elasticsearch7DynamicSink.DefaultRestClientFactory(
@@ -87,40 +120,70 @@ public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext context) {
                 new Elasticsearch7ApiCallBridge(config.getHosts(), restClientFactory);
 
         // Elasticsearch only supports non-nested lookup keys
-        String[] keyNames = new String[context.getKeys().length];
-        for (int i = 0; i < keyNames.length; i++) {
-            int[] innerKeyArr = context.getKeys()[i];
+        String[] lookupKeys = new String[lookupContext.getKeys().length];
+        String[] columnNames = physicalSchema.getFieldNames();
+        for (int i = 0; i < lookupKeys.length; i++) {
+            int[] innerKeyArr = lookupContext.getKeys()[i];
             Preconditions.checkArgument(
                     innerKeyArr.length == 1, "Elasticsearch only support non-nested look up keys");
-            keyNames[i] = DataType.getFieldNames(physicalRowDataType).get(innerKeyArr[0]);
+            lookupKeys[i] = columnNames[innerKeyArr[0]];
         }
+        DataType[] columnDataTypes = physicalSchema.getFieldDataTypes();
 
-        ElasticsearchRowDataLookupFunction<RestHighLevelClient> lookupFunction =
-                new ElasticsearchRowDataLookupFunction<>(
-                        this.format.createRuntimeDecoder(context, physicalRowDataType),
-                        lookupMaxRetryTimes,
-                        config.getIndex(),
-                        config.getDocumentType(),
-                        DataType.getFieldNames(physicalRowDataType).toArray(new String[0]),
-                        DataType.getFieldDataTypes(physicalRowDataType).toArray(new DataType[0]),
-                        keyNames,
-                        elasticsearch7ApiCallBridge);
-        if (lookupCache != null) {
-            return PartialCachingLookupProvider.of(lookupFunction, lookupCache);
-        } else {
-            return LookupFunctionProvider.of(lookupFunction);
-        }
+        return TableFunctionProvider.of(
+                new ElasticsearchRowDataLookupFunction<>(
+                        this.format.createRuntimeDecoder(
+                                lookupContext, physicalSchema.toRowDataType()),
+                        lookupOptions,
+                        config.getMaxRetryTimes(),
+                        config.getIndex(),
+                        config.getDocumentType(),
+                        columnNames,
+                        columnDataTypes,
+                        lookupKeys,
+                        elasticsearch7ApiCallBridge));
+    }
+
+    @Override
+    public ChangelogMode getChangelogMode() {
+        return ChangelogMode.insertOnly();
+    }
+
+    @Override
+    public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) {
+        ElasticSearch7InputFormat.Builder elasticsearchInputformatBuilder =
+                new ElasticSearch7InputFormat.Builder();
+        elasticsearchInputformatBuilder.setHttpHosts(config.getHosts());
+
+        RestClientFactory restClientFactory = null;
+        if (config.getPathPrefix().isPresent()) {
+            restClientFactory =
+                    new Elasticsearch7DynamicSink.DefaultRestClientFactory(
+                            config.getPathPrefix().get());
+        } else {
+            restClientFactory = restClientBuilder -> {};
+        }
+
+        elasticsearchInputformatBuilder.setRestClientFactory(restClientFactory);
+        elasticsearchInputformatBuilder.setDeserializationSchema(
+                this.format.createRuntimeDecoder(
+                        runtimeProviderContext, physicalSchema.toRowDataType()));
+        elasticsearchInputformatBuilder.setFieldNames(physicalSchema.getFieldNames());
+        elasticsearchInputformatBuilder.setIndex(config.getIndex());
+        // Filter and limit push-down are optional, so guard against the non-pushed case.
+        if (filterPredicates != null && !filterPredicates.isEmpty()) {
+            elasticsearchInputformatBuilder.setPredicate(assembleQuery(filterPredicates));
+        }
+        if (limit != null) {
+            elasticsearchInputformatBuilder.setLimit(limit.intValue());
+        }
+        config.getScrollMaxSize().ifPresent(elasticsearchInputformatBuilder::setScrollMaxSize);
+        config.getScrollTimeout().ifPresent(elasticsearchInputformatBuilder::setScrollTimeout);
+
+        return InputFormatProvider.of(elasticsearchInputformatBuilder.build());
     }
 
     @Override
     public DynamicTableSource copy() {
-        return new Elasticsearch7DynamicSource(
-                format, config, physicalRowDataType, lookupMaxRetryTimes, lookupCache);
+        Elasticsearch7DynamicSource source =
+                new Elasticsearch7DynamicSource(format, config, physicalSchema, lookupOptions);
+        source.filterPredicates = filterPredicates;
+        source.limit = limit;
+        return source;
     }
 
     @Override
     public String asSummaryString() {
         return "Elasticsearch7";
     }
 
     @Override
@@ -129,7 +192,100 @@ public boolean supportsNestedProjection() {
         return false;
     }
 
     @Override
-    public void applyProjection(int[][] projectedFields, DataType type) {
-        this.physicalRowDataType = Projection.of(projectedFields).project(type);
+    public void applyProjection(int[][] projectedFields) {
+        this.physicalSchema = projectSchema(physicalSchema, projectedFields);
+    }
+
+    @Override
+    public Result applyFilters(List<ResolvedExpression> filters) {
+        List<ResolvedExpression> acceptedFilters = new ArrayList<>();
+        List<ResolvedExpression> remainingFilters = new ArrayList<>();
+        for (ResolvedExpression expr : filters) {
+            if (FilterUtils.shouldPushDown(expr, filterableFields)) {
+                acceptedFilters.add(expr);
+            } else {
+                remainingFilters.add(expr);
+            }
+        }
+        this.filterPredicates = acceptedFilters;
+        return Result.of(acceptedFilters, remainingFilters);
+    }
+
+    @Override
+    public void applyLimit(long limit) {
+        this.limit = limit;
+    }
+
+    public static TableSchema projectSchema(TableSchema tableSchema, int[][] projectedFields) {
+        checkArgument(
+                containsPhysicalColumnsOnly(tableSchema),
+                "Projection is only supported for physical columns.");
+        TableSchema.Builder builder = TableSchema.builder();
+
+        FieldsDataType fields =
+                (FieldsDataType)
+                        DataTypeUtils.projectRow(tableSchema.toRowDataType(), projectedFields);
+        RowType topFields = (RowType) fields.getLogicalType();
+        for (int i = 0; i < topFields.getFieldCount(); i++) {
+            builder.field(topFields.getFieldNames().get(i), fields.getChildren().get(i));
+        }
+        return builder.build();
+    }
+
+    public static QueryBuilder assembleQuery(List<ResolvedExpression> filterPredicates) {
+
+        BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
+        for (ResolvedExpression resolvedExpression : filterPredicates) {
+
+            if (!(resolvedExpression instanceof CallExpression)) {
+                continue;
+            }
+            CallExpression callExpression = (CallExpression) resolvedExpression;
+            FunctionDefinition functionDefinition = callExpression.getFunctionDefinition();
+            // FilterUtils#shouldPushDown only accepts simple <field> <op> <literal> calls,
+            // so the first child is the field reference and the second (if any) the literal;
+            // composite AND/OR/NOT expressions are rejected there and never reach this point.
+            List<ResolvedExpression> children = callExpression.getResolvedChildren();
+            FieldReferenceExpression field = (FieldReferenceExpression) children.get(0);
+            ValueLiteralExpression value =
+                    children.size() > 1 ? (ValueLiteralExpression) children.get(1) : null;
+
+            if (functionDefinition.equals(BuiltInFunctionDefinitions.LESS_THAN)) {
+                RangeQueryBuilder rangeQueryBuilder =
+                        QueryBuilders.rangeQuery(field.getName()).lt(value.asSummaryString());
+                boolQueryBuilder = boolQueryBuilder.must(rangeQueryBuilder);
+            }
+
+            if (functionDefinition.equals(BuiltInFunctionDefinitions.GREATER_THAN)) {
+                RangeQueryBuilder rangeQueryBuilder =
+                        QueryBuilders.rangeQuery(field.getName()).gt(value.asSummaryString());
+                boolQueryBuilder = boolQueryBuilder.must(rangeQueryBuilder);
+            }
+
+            if (functionDefinition.equals(BuiltInFunctionDefinitions.EQUALS)) {
+                TermQueryBuilder termQueryBuilder =
+                        QueryBuilders.termQuery(field.getName(), value.asSummaryString());
+                boolQueryBuilder = boolQueryBuilder.must(termQueryBuilder);
+            }
+
+            if (functionDefinition.equals(BuiltInFunctionDefinitions.IS_NULL)) {
+                ExistsQueryBuilder existsQueryBuilder = QueryBuilders.existsQuery(field.getName());
+                boolQueryBuilder = boolQueryBuilder.mustNot(existsQueryBuilder);
+            }
+
+            if (functionDefinition.equals(BuiltInFunctionDefinitions.IS_NOT_NULL)) {
+                ExistsQueryBuilder existsQueryBuilder = QueryBuilders.existsQuery(field.getName());
+                boolQueryBuilder = boolQueryBuilder.must(existsQueryBuilder);
+            }
+        }
+        return boolQueryBuilder;
+    }
 }
diff --git a/flink-connector-elasticsearch7/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicTableFactory.java b/flink-connector-elasticsearch7/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicTableFactory.java
index b516777d..9c977615 100644
--- a/flink-connector-elasticsearch7/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicTableFactory.java
+++ b/flink-connector-elasticsearch7/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicTableFactory.java
@@ -31,11 +31,13 @@
+import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.connector.format.EncodingFormat;
 import org.apache.flink.table.connector.sink.DynamicTableSink;
 import org.apache.flink.table.connector.source.DynamicTableSource;
 import org.apache.flink.table.connector.source.lookup.LookupOptions;
 import org.apache.flink.table.connector.source.lookup.cache.DefaultLookupCache;
 import org.apache.flink.table.connector.source.lookup.cache.LookupCache;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.factories.DeserializationFormatFactory;
 import org.apache.flink.table.factories.DynamicTableFactory;
 import org.apache.flink.table.factories.DynamicTableSinkFactory;
 import org.apache.flink.table.factories.DynamicTableSourceFactory;
+import org.apache.flink.table.utils.TableSchemaUtils;
@@ -66,7 +68,12 @@
 import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchConnectorOptions.HOSTS_OPTION;
 import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchConnectorOptions.INDEX_OPTION;
 import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchConnectorOptions.KEY_DELIMITER_OPTION;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchConnectorOptions.LOOKUP_CACHE_MAX_ROWS;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchConnectorOptions.LOOKUP_CACHE_TTL;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchConnectorOptions.LOOKUP_MAX_RETRIES;
 import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchConnectorOptions.PASSWORD_OPTION;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchConnectorOptions.SCROLL_MAX_SIZE_OPTION;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchConnectorOptions.SCROLL_TIMEOUT_OPTION;
 import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchConnectorOptions.USERNAME_OPTION;
 import static org.apache.flink.table.connector.source.lookup.LookupOptions.CACHE_TYPE;
 import static org.apache.flink.table.connector.source.lookup.LookupOptions.MAX_RETRIES;
@@ -86,6 +93,8 @@ public class Elasticsearch7DynamicTableFactory
             Stream.of(HOSTS_OPTION, INDEX_OPTION).collect(Collectors.toSet());
     private static final Set<ConfigOption<?>> optionalOptions =
             Stream.of(
+                            SCROLL_MAX_SIZE_OPTION,
+                            SCROLL_TIMEOUT_OPTION,
                             KEY_DELIMITER_OPTION,
                             FAILURE_HANDLER_OPTION,
                             FLUSH_ON_CHECKPOINT_OPTION,
@@ -97,6 +106,9 @@
                             BULK_FLUSH_BACKOFF_DELAY_OPTION,
                             CONNECTION_PATH_PREFIX,
                             FORMAT_OPTION,
+                            LOOKUP_CACHE_MAX_ROWS,
+                            LOOKUP_CACHE_TTL,
+                            LOOKUP_MAX_RETRIES,
                             PASSWORD_OPTION,
                             USERNAME_OPTION,
                             CACHE_TYPE,
@@ -109,7 +121,7 @@
     @Override
     public DynamicTableSource createDynamicTableSource(Context context) {
-        DataType physicalRowDataType = context.getPhysicalRowDataType();
+        TableSchema schema = context.getCatalogTable().getSchema();
         final FactoryUtil.TableFactoryHelper helper =
                 FactoryUtil.createTableFactoryHelper(this, context);
         final ReadableConfig options = helper.getOptions();
@@ -129,9 +141,8 @@
         return new Elasticsearch7DynamicSource(
                 format,
                 config,
-                physicalRowDataType,
-                options.get(MAX_RETRIES),
-                getLookupCache(options));
+                TableSchemaUtils.getPhysicalSchema(schema),
+                new ElasticsearchLookupOptions.Builder()
+                        .setCacheMaxSize(options.get(LOOKUP_CACHE_MAX_ROWS))
+                        .setCacheExpireMs(options.get(LOOKUP_CACHE_TTL).toMillis())
+                        .setMaxRetryTimes(options.get(MAX_RETRIES))
+                        .build());
     }
 
     @Override
@@ -160,17 +171,6 @@ public DynamicTableSink
createDynamicTableSink(Context context) { getLocalTimeZoneId(context.getConfiguration())); } - @Nullable - private LookupCache getLookupCache(ReadableConfig tableOptions) { - LookupCache cache = null; - if (tableOptions - .get(LookupOptions.CACHE_TYPE) - .equals(LookupOptions.LookupCacheType.PARTIAL)) { - cache = DefaultLookupCache.fromConfig(tableOptions); - } - return cache; - } - ZoneId getLocalTimeZoneId(ReadableConfig readableConfig) { final String zone = readableConfig.get(TableConfigOptions.LOCAL_TIME_ZONE); final ZoneId zoneId = diff --git a/flink-connector-elasticsearch7/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/FilterUtils.java b/flink-connector-elasticsearch7/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/FilterUtils.java new file mode 100644 index 00000000..c3221b9b --- /dev/null +++ b/flink-connector-elasticsearch7/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/FilterUtils.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.elasticsearch.table; + +import org.apache.flink.table.api.TableException; +import org.apache.flink.table.expressions.CallExpression; +import org.apache.flink.table.expressions.Expression; +import org.apache.flink.table.expressions.FieldReferenceExpression; +import org.apache.flink.table.expressions.ResolvedExpression; +import org.apache.flink.table.expressions.ValueLiteralExpression; +import org.apache.flink.table.functions.BuiltInFunctionDefinitions; +import org.apache.flink.table.functions.FunctionDefinition; +import org.apache.flink.util.Preconditions; + +import java.util.List; +import java.util.Optional; +import java.util.Set; +import java.util.function.Function; + +import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.LOWER; +import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.UPPER; + +/** Utils for source to filter partition or row. 
*/ +public class FilterUtils { + + public static boolean shouldPushDown(ResolvedExpression expr, Set filterableFields) { + if (expr instanceof CallExpression && expr.getChildren().size() == 2) { + return shouldPushDownUnaryExpression( + expr.getResolvedChildren().get(0), filterableFields) + && shouldPushDownUnaryExpression( + expr.getResolvedChildren().get(1), filterableFields); + } + return false; + } + + private static boolean shouldPushDownUnaryExpression( + ResolvedExpression expr, Set filterableFields) { + // validate that type is comparable + if (!isComparable(expr.getOutputDataType().getConversionClass())) { + return false; + } + if (expr instanceof FieldReferenceExpression) { + if (filterableFields.contains(((FieldReferenceExpression) expr).getName())) { + return true; + } + } + + if (expr instanceof ValueLiteralExpression) { + return true; + } + + if (expr instanceof CallExpression && expr.getChildren().size() == 1) { + if (((CallExpression) expr).getFunctionDefinition().equals(UPPER) + || ((CallExpression) expr).getFunctionDefinition().equals(LOWER)) { + return shouldPushDownUnaryExpression( + expr.getResolvedChildren().get(0), filterableFields); + } + } + // other resolved expressions return false + return false; + } + + private static boolean isComparable(Class clazz) { + return Comparable.class.isAssignableFrom(clazz); + } +} diff --git a/flink-connector-elasticsearch7/src/main/java/org/apache/flink/streaming/connectors/elasticsearch7/ElasticSearch7InputFormat.java b/flink-connector-elasticsearch7/src/main/java/org/apache/flink/streaming/connectors/elasticsearch7/ElasticSearch7InputFormat.java new file mode 100644 index 00000000..56ee7d25 --- /dev/null +++ b/flink-connector-elasticsearch7/src/main/java/org/apache/flink/streaming/connectors/elasticsearch7/ElasticSearch7InputFormat.java @@ -0,0 +1,162 @@ +package org.apache.flink.streaming.connectors.elasticsearch7; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.streaming.connectors.elasticsearch.ElasticSearchInputFormatBase; +import org.apache.flink.util.Preconditions; + +import org.apache.http.HttpHost; +import org.elasticsearch.client.RestHighLevelClient; +import org.elasticsearch.index.query.QueryBuilder; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +@PublicEvolving +public class ElasticSearch7InputFormat + extends ElasticSearchInputFormatBase { + + private static final long serialVersionUID = 1L; + + public ElasticSearch7InputFormat( + Map userConfig, + List httpHosts, + RestClientFactory restClientFactory, + DeserializationSchema serializationSchema, + String[] fieldNames, + String index, + long scrollTimeout, + int scrollSize, + QueryBuilder predicate, + int limit) { + + super( + new Elasticsearch7ApiCallBridge(httpHosts, restClientFactory), + userConfig, + serializationSchema, + fieldNames, + index, + null, + scrollTimeout, + scrollSize, + predicate, + limit); + } + + /** + * A builder for creating an {@link ElasticSearch7InputFormat}. + * + * @param Type of the elements. 
+     */
+    @PublicEvolving
+    public static class Builder<T> {
+        private Map<String, String> userConfig = new HashMap<>();
+        private List<HttpHost> httpHosts;
+        private RestClientFactory restClientFactory = restClientBuilder -> {};
+        private DeserializationSchema<T> deserializationSchema;
+        private String index;
+
+        private long scrollTimeout;
+        private int scrollMaxSize;
+
+        private String[] fieldNames;
+        private QueryBuilder predicate;
+        private int limit;
+
+        public Builder() {}
+
+        /**
+         * Sets the hosts which the {@link RestHighLevelClient} connects to.
+         *
+         * @param httpHosts The list of {@link HttpHost} to which the {@link RestHighLevelClient}
+         *     connects.
+         */
+        public Builder<T> setHttpHosts(List<HttpHost> httpHosts) {
+            this.httpHosts = httpHosts;
+            return this;
+        }
+
+        /**
+         * Sets a REST client factory for custom client configuration.
+         *
+         * @param restClientFactory the factory that configures the rest client.
+         */
+        public Builder<T> setRestClientFactory(RestClientFactory restClientFactory) {
+            this.restClientFactory = Preconditions.checkNotNull(restClientFactory);
+            return this;
+        }
+
+        public Builder<T> setDeserializationSchema(DeserializationSchema<T> deserializationSchema) {
+            this.deserializationSchema = deserializationSchema;
+            return this;
+        }
+
+        public Builder<T> setIndex(String index) {
+            this.index = index;
+            return this;
+        }
+
+        /**
+         * Sets the maximum number of hits returned per Elasticsearch scroll request.
+         *
+         * @param scrollMaxSize the maximum number of hits per Elasticsearch scroll request.
+         */
+        public Builder<T> setScrollMaxSize(int scrollMaxSize) {
+            Preconditions.checkArgument(
+                    scrollMaxSize > 0,
+                    "The maximum number of hits per Elasticsearch scroll request must be larger than 0.");
+
+            this.scrollMaxSize = scrollMaxSize;
+            return this;
+        }
+
+        /**
+         * Sets how long the search context stays alive for each scroll request, in milliseconds.
+         *
+         * @param scrollTimeout how long the search context stays alive for scroll requests, in
+         *     milliseconds.
+         */
+        public Builder<T> setScrollTimeout(long scrollTimeout) {
+            Preconditions.checkArgument(
+                    scrollTimeout >= 0,
+                    "The search context keep-alive for scroll requests must be larger than or equal to 0.");
+
+            this.scrollTimeout = scrollTimeout;
+            return this;
+        }
+
+        public Builder<T> setFieldNames(String[] fieldNames) {
+            this.fieldNames = fieldNames;
+            return this;
+        }
+
+        public Builder<T> setPredicate(QueryBuilder predicate) {
+            this.predicate = predicate;
+            return this;
+        }
+
+        public Builder<T> setLimit(int limit) {
+            this.limit = limit;
+            return this;
+        }
+
+        /**
+         * Creates the {@link ElasticSearch7InputFormat}.
+         *
+         * @return the created ElasticSearch7InputFormat.
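+         *     <p>Note: {@code build()} does not itself verify that the required settings were
+         *     provided; a missing deserialization schema, for example, only fails the
+         *     {@code checkNotNull} in the {@code ElasticSearchInputFormatBase} constructor.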
+         */
+        public ElasticSearch7InputFormat<T> build() {
+            return new ElasticSearch7InputFormat<>(
+                    userConfig,
+                    httpHosts,
+                    restClientFactory,
+                    deserializationSchema,
+                    fieldNames,
+                    index,
+                    scrollTimeout,
+                    scrollMaxSize,
+                    predicate,
+                    limit);
+        }
+    }
+}
diff --git a/flink-connector-elasticsearch7/src/main/java/org/apache/flink/streaming/connectors/elasticsearch7/Elasticsearch7ApiCallBridge.java b/flink-connector-elasticsearch7/src/main/java/org/apache/flink/streaming/connectors/elasticsearch7/Elasticsearch7ApiCallBridge.java
index e0b173c2..f9b31131 100644
--- a/flink-connector-elasticsearch7/src/main/java/org/apache/flink/streaming/connectors/elasticsearch7/Elasticsearch7ApiCallBridge.java
+++ b/flink-connector-elasticsearch7/src/main/java/org/apache/flink/streaming/connectors/elasticsearch7/Elasticsearch7ApiCallBridge.java
@@ -20,6 +20,7 @@
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchApiCallBridge;
+import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchInputSplit;
 import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase;
 import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
 import org.apache.flink.util.Preconditions;
@@ -30,6 +31,7 @@
 import org.elasticsearch.action.bulk.BulkProcessor;
 import org.elasticsearch.action.search.SearchRequest;
 import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.action.search.SearchScrollRequest;
 import org.elasticsearch.client.RequestOptions;
 import org.elasticsearch.client.RestClient;
 import org.elasticsearch.client.RestClientBuilder;
@@ -88,6 +90,22 @@ public BulkProcessor.Builder createBulkProcessorBuilder(
                 listener);
     }

+    @Override
+    public ElasticsearchInputSplit[] createInputSplitsInternal(
+            RestHighLevelClient client,
+            String index,
+            String type,
+            int minNumSplits) {
+        // TODO: placeholder; no splits are generated yet, so this input format currently
+        // produces no records. A full implementation should create one split per shard.
+        return new ElasticsearchInputSplit[0];
+    }
+
+    @Override
+    public Tuple2<String, String[]> scroll(
+            RestHighLevelClient client,
+            SearchScrollRequest searchScrollRequest) throws IOException {
+        // TODO: placeholder; scrolling is not wired up for the 7.x bridge yet.
+        return null;
+    }
+
     @Override
     public Tuple2<String, String[]> search(RestHighLevelClient client, SearchRequest searchRequest)
             throws IOException {
diff --git a/flink-connector-elasticsearch7/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSinkITCase.java b/flink-connector-elasticsearch7/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSinkITCase.java
index 117782a6..4b273016 100644
--- a/flink-connector-elasticsearch7/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSinkITCase.java
+++ b/flink-connector-elasticsearch7/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSinkITCase.java
@@ -38,19 +38,13 @@
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.types.RowKind;
-import org.apache.flink.util.TestLogger;

-import org.apache.http.HttpHost;
 import org.elasticsearch.action.get.GetRequest;
 import org.elasticsearch.action.search.SearchRequest;
 import org.elasticsearch.client.RequestOptions;
-import org.elasticsearch.client.RestClient;
 import org.elasticsearch.client.RestHighLevelClient;
 import org.elasticsearch.search.SearchHits;
-import org.junit.ClassRule;
 import org.junit.Test;
-import org.testcontainers.elasticsearch.ElasticsearchContainer;
-import
org.testcontainers.utility.DockerImageName; import java.time.Duration; import java.time.LocalDate; @@ -63,22 +57,13 @@ import java.util.Map; import java.util.Optional; +import static org.apache.flink.streaming.connectors.elasticsearch.table.Elasticsearch7DynamicTableTestBase.elasticsearchContainer; import static org.apache.flink.streaming.connectors.elasticsearch.table.TestContext.context; import static org.apache.flink.table.api.Expressions.row; import static org.assertj.core.api.Assertions.assertThat; /** IT tests for {@link Elasticsearch7DynamicSink}. */ -public class Elasticsearch7DynamicSinkITCase extends TestLogger { - - @ClassRule - public static ElasticsearchContainer elasticsearchContainer = - new ElasticsearchContainer(DockerImageName.parse(DockerImageVersions.ELASTICSEARCH_7)); - - @SuppressWarnings("deprecation") - protected final RestHighLevelClient getClient() { - return new RestHighLevelClient( - RestClient.builder(HttpHost.create(elasticsearchContainer.getHttpHostAddress()))); - } +public class Elasticsearch7DynamicSinkITCase extends Elasticsearch7DynamicTableTestBase { @Test public void testWritingDocuments() throws Exception { @@ -107,11 +92,11 @@ public void testWritingDocuments() throws Exception { LocalDateTime.parse("2012-12-12T12:12:12"))); String index = "writing-documents"; - Elasticsearch7DynamicTableFactory factory = new Elasticsearch7DynamicTableFactory(); + Elasticsearch7DynamicTableFactory sinkFactory = new Elasticsearch7DynamicTableFactory(); SinkFunctionProvider sinkRuntimeProvider = (SinkFunctionProvider) - factory.createDynamicTableSink( + sinkFactory.createDynamicTableSink( context() .withSchema(schema) .withOption( diff --git a/flink-connector-elasticsearch7/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSourceITCase.java b/flink-connector-elasticsearch7/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSourceITCase.java new file mode 100644 index 00000000..e6d915d2 --- /dev/null +++ b/flink-connector-elasticsearch7/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSourceITCase.java @@ -0,0 +1,89 @@ +package org.apache.flink.streaming.connectors.elasticsearch.table; + +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; +import org.apache.flink.types.Row; + +import org.apache.flink.shaded.guava30.com.google.common.collect.Lists; + +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class Elasticsearch7DynamicSourceITCase extends Elasticsearch7DynamicTableTestBase { + + private final String scanKeywordIndex = "scan-keyword-index"; + private final String scanKeywordType = "scan-keyword-type"; + + @Before + public void before() throws IOException, ExecutionException, InterruptedException { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); + assertTrue(createIndex(getClient(), scanKeywordIndex, scanKeywordType, "keyword")); + insertData(tEnv, scanKeywordIndex, scanKeywordType); + } + + @Test + public void testElasticsearchSource() { 
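+        // Scans the keyword index populated by insertData() in before() and checks the
+        // produced rows, including the computed column h = a + 2.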
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); + tEnv.executeSql( + "CREATE TABLE esTableSource (" + + "a BIGINT NOT NULL,\n" + + "b STRING NOT NULL,\n" + + "c FLOAT,\n" + + "d TINYINT NOT NULL,\n" + + "e TIME,\n" + + "f DATE,\n" + + "g TIMESTAMP NOT NULL,\n" + + "h as a + 2,\n" + + "PRIMARY KEY (c, d) NOT ENFORCED\n" + + ")\n" + + "WITH (\n" + + String.format("'%s'='%s',\n", "connector", "elasticsearch-7") + + String.format( + "'%s'='%s',\n", + ElasticsearchConnectorOptions.INDEX_OPTION.key(), scanKeywordIndex) + + String.format( + "'%s'='%s',\n", + ElasticsearchConnectorOptions.HOSTS_OPTION.key(), + "http://127.0.0.1:9200") + + String.format( + "'%s'='%s',\n", + ElasticsearchConnectorOptions.SCROLL_MAX_SIZE_OPTION.key(), 10) + + String.format( + "'%s'='%s'\n", + ElasticsearchConnectorOptions.SCROLL_TIMEOUT_OPTION.key(), 1000) + + ")"); + + Iterator collected = + tEnv.executeSql("SELECT a, b, c, d, e, f, g, h FROM esTableSource").collect(); + List result = + Lists.newArrayList(collected).stream() + .map(Row::toString) + .sorted() + .collect(Collectors.toList()); + + List expected = + Stream.of( + "1,A B,12.1,2,00:00:12,2003-10-20,2012-12-12T12:12:12,3", + "1,A,12.11,2,00:00:12,2003-10-20,2012-12-12T12:12:12,3", + "1,A,12.12,2,00:00:12,2003-10-20,2012-12-12T12:12:12,3", + "2,B,12.13,3,00:00:12,2003-10-21,2012-12-12T12:12:13,4", + "3,C,12.14,4,00:00:12,2003-10-22,2012-12-12T12:12:14,5", + "4,D,12.15,5,00:00:12,2003-10-23,2012-12-12T12:12:15,6", + "5,E,12.16,6,00:00:12,2003-10-24,2012-12-12T12:12:16,7") + .sorted() + .collect(Collectors.toList()); + assertEquals(expected, result); + } +} diff --git a/flink-connector-elasticsearch7/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicTableFactoryTest.java b/flink-connector-elasticsearch7/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicTableFactoryTest.java index f83286f1..47fbc42f 100644 --- a/flink-connector-elasticsearch7/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicTableFactoryTest.java +++ b/flink-connector-elasticsearch7/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicTableFactoryTest.java @@ -41,7 +41,7 @@ public class Elasticsearch7DynamicTableFactoryTest extends TestLogger { @Test public void validateEmptyConfiguration() { - Elasticsearch7DynamicTableFactory factory = new Elasticsearch7DynamicTableFactory(); + Elasticsearch7DynamicTableFactory sinkFactory = new Elasticsearch7DynamicTableFactory(); thrown.expect(ValidationException.class); thrown.expectMessage( @@ -51,16 +51,16 @@ public void validateEmptyConfiguration() { + "\n" + "hosts\n" + "index"); - factory.createDynamicTableSink(context().build()); + sinkFactory.createDynamicTableSink(context().build()); } @Test public void validateWrongIndex() { - Elasticsearch7DynamicTableFactory factory = new Elasticsearch7DynamicTableFactory(); + Elasticsearch7DynamicTableFactory sinkFactory = new Elasticsearch7DynamicTableFactory(); thrown.expect(ValidationException.class); thrown.expectMessage("'index' must not be empty"); - factory.createDynamicTableSink( + sinkFactory.createDynamicTableSink( context() .withOption("index", "") .withOption("hosts", "http://localhost:12345") @@ -69,23 +69,108 @@ public void validateWrongIndex() { @Test public void validateWrongHosts() { - Elasticsearch7DynamicTableFactory 
factory = new Elasticsearch7DynamicTableFactory(); + Elasticsearch7DynamicTableFactory sinkFactory = new Elasticsearch7DynamicTableFactory(); thrown.expect(ValidationException.class); thrown.expectMessage( "Could not parse host 'wrong-host' in option 'hosts'. It should follow the format 'http://host_name:port'."); - factory.createDynamicTableSink( + sinkFactory.createDynamicTableSink( context().withOption("index", "MyIndex").withOption("hosts", "wrong-host").build()); } + @Test + public void validateWrongScrollMaxSize() { + Elasticsearch7DynamicTableFactory tableFactory = new Elasticsearch7DynamicTableFactory(); + + thrown.expect(ValidationException.class); + thrown.expectMessage("'scan.scroll.max-size' must be at least 1. Got: 0"); + tableFactory.createDynamicTableSource( + context() + .withSchema(ResolvedSchema.of(Column.physical("a", DataTypes.TIME()))) + .withOption(ElasticsearchConnectorOptions.INDEX_OPTION.key(), "MyIndex") + .withOption( + ElasticsearchConnectorOptions.HOSTS_OPTION.key(), + "http://localhost:1234") + .withOption(ElasticsearchConnectorOptions.SCROLL_MAX_SIZE_OPTION.key(), "0") + .build()); + } + + @Test + public void validateWrongScrollTimeout() { + Elasticsearch7DynamicTableFactory tableFactory = new Elasticsearch7DynamicTableFactory(); + + thrown.expect(ValidationException.class); + thrown.expectMessage("'scan.scroll.timeout' must be at least 1. Got: 0"); + tableFactory.createDynamicTableSource( + context() + .withSchema(ResolvedSchema.of(Column.physical("a", DataTypes.TIME()))) + .withOption(ElasticsearchConnectorOptions.INDEX_OPTION.key(), "MyIndex") + .withOption( + ElasticsearchConnectorOptions.HOSTS_OPTION.key(), + "http://localhost:1234") + .withOption(ElasticsearchConnectorOptions.SCROLL_TIMEOUT_OPTION.key(), "0") + .build()); + } + + @Test + public void validateWrongCacheMaxSize() { + Elasticsearch7DynamicTableFactory tableFactory = new Elasticsearch7DynamicTableFactory(); + + thrown.expect(ValidationException.class); + thrown.expectMessage("'lookup.cache.max-rows' must be at least 1. Got: 0"); + tableFactory.createDynamicTableSource( + context() + .withSchema(ResolvedSchema.of(Column.physical("a", DataTypes.TIME()))) + .withOption(ElasticsearchConnectorOptions.INDEX_OPTION.key(), "MyIndex") + .withOption( + ElasticsearchConnectorOptions.HOSTS_OPTION.key(), + "http://localhost:1234") + .withOption(ElasticsearchConnectorOptions.LOOKUP_CACHE_MAX_ROWS.key(), "0") + .build()); + } + + @Test + public void validateWrongCacheTTL() { + Elasticsearch7DynamicTableFactory tableFactory = new Elasticsearch7DynamicTableFactory(); + + thrown.expect(ValidationException.class); + thrown.expectMessage("'lookup.cache.ttl' must be at least 1. Got: 0"); + tableFactory.createDynamicTableSource( + context() + .withSchema(ResolvedSchema.of(Column.physical("a", DataTypes.TIME()))) + .withOption(ElasticsearchConnectorOptions.INDEX_OPTION.key(), "MyIndex") + .withOption( + ElasticsearchConnectorOptions.HOSTS_OPTION.key(), + "http://localhost:1234") + .withOption(ElasticsearchConnectorOptions.LOOKUP_CACHE_TTL.key(), "0") + .build()); + } + + @Test + public void validateWrongMaxRetries() { + Elasticsearch7DynamicTableFactory tableFactory = new Elasticsearch7DynamicTableFactory(); + + thrown.expect(ValidationException.class); + thrown.expectMessage("'lookup.max-retries' must be at least 1. 
Got: 0"); + tableFactory.createDynamicTableSource( + context() + .withSchema(ResolvedSchema.of(Column.physical("a", DataTypes.TIME()))) + .withOption(ElasticsearchConnectorOptions.INDEX_OPTION.key(), "MyIndex") + .withOption( + ElasticsearchConnectorOptions.HOSTS_OPTION.key(), + "http://localhost:1234") + .withOption(ElasticsearchConnectorOptions.LOOKUP_MAX_RETRIES.key(), "0") + .build()); + } + @Test public void validateWrongFlushSize() { - Elasticsearch7DynamicTableFactory factory = new Elasticsearch7DynamicTableFactory(); + Elasticsearch7DynamicTableFactory sinkFactory = new Elasticsearch7DynamicTableFactory(); thrown.expect(ValidationException.class); thrown.expectMessage( "'sink.bulk-flush.max-size' must be in MB granularity. Got: 1024 bytes"); - factory.createDynamicTableSink( + sinkFactory.createDynamicTableSink( context() .withOption(ElasticsearchConnectorOptions.INDEX_OPTION.key(), "MyIndex") .withOption( @@ -99,11 +184,11 @@ public void validateWrongFlushSize() { @Test public void validateWrongRetries() { - Elasticsearch7DynamicTableFactory factory = new Elasticsearch7DynamicTableFactory(); + Elasticsearch7DynamicTableFactory sinkFactory = new Elasticsearch7DynamicTableFactory(); thrown.expect(ValidationException.class); thrown.expectMessage("'sink.bulk-flush.backoff.max-retries' must be at least 1. Got: 0"); - factory.createDynamicTableSink( + sinkFactory.createDynamicTableSink( context() .withOption(ElasticsearchConnectorOptions.INDEX_OPTION.key(), "MyIndex") .withOption( @@ -118,11 +203,11 @@ public void validateWrongRetries() { @Test public void validateWrongMaxActions() { - Elasticsearch7DynamicTableFactory factory = new Elasticsearch7DynamicTableFactory(); + Elasticsearch7DynamicTableFactory sinkFactory = new Elasticsearch7DynamicTableFactory(); thrown.expect(ValidationException.class); thrown.expectMessage("'sink.bulk-flush.max-actions' must be at least 1. 
Got: -2"); - factory.createDynamicTableSink( + sinkFactory.createDynamicTableSink( context() .withOption(ElasticsearchConnectorOptions.INDEX_OPTION.key(), "MyIndex") .withOption( @@ -136,11 +221,11 @@ public void validateWrongMaxActions() { @Test public void validateWrongBackoffDelay() { - Elasticsearch7DynamicTableFactory factory = new Elasticsearch7DynamicTableFactory(); + Elasticsearch7DynamicTableFactory sinkFactory = new Elasticsearch7DynamicTableFactory(); thrown.expect(ValidationException.class); thrown.expectMessage("Invalid value for option 'sink.bulk-flush.backoff.delay'."); - factory.createDynamicTableSink( + sinkFactory.createDynamicTableSink( context() .withOption(ElasticsearchConnectorOptions.INDEX_OPTION.key(), "MyIndex") .withOption( @@ -154,7 +239,7 @@ public void validateWrongBackoffDelay() { @Test public void validatePrimaryKeyOnIllegalColumn() { - Elasticsearch7DynamicTableFactory factory = new Elasticsearch7DynamicTableFactory(); + Elasticsearch7DynamicTableFactory sinkFactory = new Elasticsearch7DynamicTableFactory(); thrown.expect(ValidationException.class); thrown.expectMessage( @@ -162,7 +247,7 @@ public void validatePrimaryKeyOnIllegalColumn() { + "[ARRAY, MAP, MULTISET, ROW, RAW, VARBINARY].\n" + " Elasticsearch sink does not support primary keys on columns of types: " + "[ARRAY, MAP, MULTISET, STRUCTURED_TYPE, ROW, RAW, BINARY, VARBINARY]."); - factory.createDynamicTableSink( + sinkFactory.createDynamicTableSink( context() .withSchema( new ResolvedSchema( @@ -216,12 +301,12 @@ public void validatePrimaryKeyOnIllegalColumn() { @Test public void validateWrongCredential() { - Elasticsearch7DynamicTableFactory factory = new Elasticsearch7DynamicTableFactory(); + Elasticsearch7DynamicTableFactory sinkFactory = new Elasticsearch7DynamicTableFactory(); thrown.expect(ValidationException.class); thrown.expectMessage( "'username' and 'password' must be set at the same time. 
Got: username 'username' and password ''");
-        factory.createDynamicTableSink(
+        sinkFactory.createDynamicTableSink(
                 context()
                         .withOption(ElasticsearchConnectorOptions.INDEX_OPTION.key(), "MyIndex")
                         .withOption(
diff --git a/flink-connector-elasticsearch7/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicTableTestBase.java b/flink-connector-elasticsearch7/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicTableTestBase.java
new file mode 100644
index 00000000..e6f7948c
--- /dev/null
+++ b/flink-connector-elasticsearch7/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicTableTestBase.java
@@ -0,0 +1,171 @@
+package org.apache.flink.streaming.connectors.elasticsearch.table;
+
+import org.apache.flink.connector.elasticsearch.test.DockerImageVersions;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.test.util.AbstractTestBase;
+
+import org.apache.http.HttpHost;
+import org.elasticsearch.client.RequestOptions;
+import org.elasticsearch.client.RestClient;
+import org.elasticsearch.client.RestHighLevelClient;
+import org.elasticsearch.client.indices.CreateIndexRequest;
+import org.elasticsearch.client.indices.CreateIndexResponse;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.common.xcontent.XContentFactory;
+import org.junit.ClassRule;
+import org.testcontainers.elasticsearch.ElasticsearchContainer;
+import org.testcontainers.utility.DockerImageName;
+
+import java.io.IOException;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.util.Random;
+import java.util.concurrent.ExecutionException;
+
+import static org.apache.flink.table.api.Expressions.row;
+
+public class Elasticsearch7DynamicTableTestBase extends AbstractTestBase {
+
+    @ClassRule
+    public static ElasticsearchContainer elasticsearchContainer =
+            new ElasticsearchContainer(DockerImageName.parse(DockerImageVersions.ELASTICSEARCH_7));
+
+    public final RestHighLevelClient getClient() {
+        return new RestHighLevelClient(
+                RestClient.builder(HttpHost.create(elasticsearchContainer.getHttpHostAddress())));
+    }
+
+    public boolean createIndex(
+            RestHighLevelClient client, String index, String type, String stringType)
+            throws IOException {
+        // create index
+        CreateIndexRequest request = new CreateIndexRequest(index);
+        request.settings(
+                Settings.builder()
+                        .put("index.number_of_shards", 3)
+                        .put("index.number_of_replicas", 0));
+
+        // set the string field 'b' to either keyword or text
+        // (the same mapping could also be passed as a JSON string via
+        // request.mapping(type, json, XContentType.JSON))
+        XContentBuilder builder = XContentFactory.jsonBuilder();
+        builder.startObject();
+        {
+            builder.startObject("properties");
+            {
+                builder.startObject("b");
+                {
+                    builder.field("type", stringType);
+                }
+                builder.endObject();
+            }
+            builder.endObject();
+        }
+        builder.endObject();
+        request.mapping(builder);
+        CreateIndexResponse createIndexResponse =
+                client.indices().create(request, RequestOptions.DEFAULT);
+        return createIndexResponse.isAcknowledged();
+    }
+
+    public void insertData(StreamTableEnvironment tEnv, String index, String type)
+            throws ExecutionException, InterruptedException {
+        // non-negative random suffix (Math.abs(Integer.MIN_VALUE) would stay negative)
+        String sinkTable = "esTable" + new Random().nextInt(Integer.MAX_VALUE);
+
+        // Register a sink table backed by the target index, then insert the fixture rows.
+        tEnv.executeSql(
+                "CREATE TABLE 
" + + sinkTable + + "(" + + "a BIGINT NOT NULL,\n" + + "b STRING NOT NULL,\n" + + "c FLOAT,\n" + + "d TINYINT NOT NULL,\n" + + "e TIME,\n" + + "f DATE,\n" + + "g TIMESTAMP NOT NULL,\n" + + "h as a + 2,\n" + + "PRIMARY KEY (c, d) NOT ENFORCED\n" + + ")\n" + + "WITH (\n" + + String.format("'%s'='%s',\n", "connector", "elasticsearch-7") + + String.format( + "'%s'='%s',\n", + ElasticsearchConnectorOptions.INDEX_OPTION.key(), index) + + String.format( + "'%s'='%s',\n", + ElasticsearchConnectorOptions.HOSTS_OPTION.key(), + "http://127.0.0.1:9200") + + String.format( + "'%s'='%s'\n", + ElasticsearchConnectorOptions.FLUSH_ON_CHECKPOINT_OPTION.key(), + "false") + + ")"); + + tEnv.fromValues( + row( + 1L, + "A B", + 12.10f, + (byte) 2, + LocalTime.ofNanoOfDay(12345L * 1_000_000L), + LocalDate.ofEpochDay(12345), + LocalDateTime.parse("2012-12-12T12:12:12")), + row( + 1L, + "A", + 12.11f, + (byte) 2, + LocalTime.ofNanoOfDay(12345L * 1_000_000L), + LocalDate.ofEpochDay(12345), + LocalDateTime.parse("2012-12-12T12:12:12")), + row( + 1L, + "A", + 12.12f, + (byte) 2, + LocalTime.ofNanoOfDay(12345L * 1_000_000L), + LocalDate.ofEpochDay(12345), + LocalDateTime.parse("2012-12-12T12:12:12")), + row( + 2L, + "B", + 12.13f, + (byte) 3, + LocalTime.ofNanoOfDay(12346L * 1_000_000L), + LocalDate.ofEpochDay(12346), + LocalDateTime.parse("2012-12-12T12:12:13")), + row( + 3L, + "C", + 12.14f, + (byte) 4, + LocalTime.ofNanoOfDay(12347L * 1_000_000L), + LocalDate.ofEpochDay(12347), + LocalDateTime.parse("2012-12-12T12:12:14")), + row( + 4L, + "D", + 12.15f, + (byte) 5, + LocalTime.ofNanoOfDay(12348L * 1_000_000L), + LocalDate.ofEpochDay(12348), + LocalDateTime.parse("2012-12-12T12:12:15")), + row( + 5L, + "E", + 12.16f, + (byte) 6, + LocalTime.ofNanoOfDay(12349L * 1_000_000L), + LocalDate.ofEpochDay(12349), + LocalDateTime.parse("2012-12-12T12:12:16"))) + .executeInsert(sinkTable) + .getJobClient() + .get() + .getJobExecutionResult() + .get(); + } +} diff --git a/flink-connector-elasticsearch7/src/test/java/org/apache/flink/streaming/connectors/elasticsearch7/ElasticsearchSinkITCase.java b/flink-connector-elasticsearch7/src/test/java/org/apache/flink/streaming/connectors/elasticsearch7/ElasticsearchSinkITCase.java index 55ff8ed5..b486f03e 100644 --- a/flink-connector-elasticsearch7/src/test/java/org/apache/flink/streaming/connectors/elasticsearch7/ElasticsearchSinkITCase.java +++ b/flink-connector-elasticsearch7/src/test/java/org/apache/flink/streaming/connectors/elasticsearch7/ElasticsearchSinkITCase.java @@ -18,9 +18,11 @@ package org.apache.flink.streaming.connectors.elasticsearch7; +import org.apache.flink.api.common.serialization.DeserializationSchema; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.connector.elasticsearch.ElasticsearchUtil; import org.apache.flink.connector.elasticsearch.test.DockerImageVersions; +import org.apache.flink.streaming.connectors.elasticsearch.ElasticSearchInputFormatBase; import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase; import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction; import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkTestBase; @@ -28,18 +30,22 @@ import org.apache.http.HttpHost; import org.elasticsearch.client.RestClient; import org.elasticsearch.client.RestHighLevelClient; +import org.elasticsearch.index.query.QueryBuilder; import org.junit.ClassRule; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import 
org.testcontainers.elasticsearch.ElasticsearchContainer;

 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;

 /** IT cases for the {@link ElasticsearchSink}. */
-public class ElasticsearchSinkITCase
-        extends ElasticsearchSinkTestBase {
+public class ElasticsearchSinkITCase
+        extends ElasticsearchSinkTestBase {

     private static final Logger LOG = LoggerFactory.getLogger(ElasticsearchSinkITCase.class);

@@ -64,6 +70,12 @@ public void testElasticsearchSinkWithSmile() throws Exception {
         runElasticsearchSinkSmileTest();
     }

+    @Test
+    public void testElasticsearchInputFormat() throws Exception {
+        runElasticsearchSinkTest();
+        runElasticSearchInputFormatTest();
+    }
+
     @Test
     public void testNullAddresses() {
         runNullAddressesTest();
@@ -93,6 +105,35 @@ public void testInvalidElasticsearchCluster() throws Exception {
         return builder.build();
     }

+
+    @Override
+    protected ElasticSearchInputFormatBase createElasticsearchInputFormat(
+            Map userConfig,
+            DeserializationSchema deserializationSchema,
+            String[] fieldNames,
+            String index,
+            String type,
+            long scrollTimeout,
+            int scrollMaxSize,
+            QueryBuilder predicate,
+            int limit)
+            throws Exception {
+        ElasticSearch7InputFormat inputFormat =
+                new ElasticSearch7InputFormat.Builder()
+                        .setDeserializationSchema(deserializationSchema)
+                        .setFieldNames(fieldNames)
+                        .setIndex(index)
+                        .setScrollTimeout(scrollTimeout)
+                        .setScrollMaxSize(scrollMaxSize)
+                        .setPredicate(predicate)
+                        .setLimit(limit)
+                        .build();
+        return inputFormat;
+    }
+
     @Override
     protected ElasticsearchSinkBase<Tuple2<Integer, String>, RestHighLevelClient>
             createElasticsearchSinkForEmbeddedNode(