[FLINK-25568] Support Elasticsearch Source Connector #62

Open · wants to merge 9 commits into base: main
24 changes: 24 additions & 0 deletions flink-connector-elasticsearch-base/pom.xml
@@ -67,6 +67,15 @@ under the License.
<optional>true</optional>
</dependency>

<!-- Elasticsearch table descriptor testing -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
<optional>true</optional>
</dependency>

<!-- Elasticsearch -->

<dependency>
@@ -173,6 +182,21 @@ under the License.
<optional>true</optional>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>

<!-- ArchUnit test dependencies -->

<dependency>
@@ -0,0 +1,227 @@
package org.apache.flink.streaming.connectors.elasticsearch;

import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.io.LocatableInputSplitAssigner;
import org.apache.flink.api.common.io.RichInputFormat;
import org.apache.flink.api.common.io.statistics.BaseStatistics;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.io.InputSplitAssigner;

import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchScrollRequest;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.Scroll;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

import static org.apache.flink.util.Preconditions.checkNotNull;

/**
* {@link RichInputFormat} that reads documents from Elasticsearch using the scroll API; each input
* split is pinned to a single shard via the search request preference.
*/
@Internal
public class ElasticSearchInputFormatBase<T, C extends AutoCloseable>
extends RichInputFormat<T, ElasticsearchInputSplit> implements ResultTypeQueryable<T> {

private static final Logger LOG = LoggerFactory.getLogger(ElasticSearchInputFormatBase.class);
private static final long serialVersionUID = 1L;
private final DeserializationSchema<T> deserializationSchema;
private final String index;
private final String type;

private final String[] fieldNames;
private final QueryBuilder predicate;
private final long limit;

private final long scrollTimeout;
private final int scrollSize;

private Scroll scroll;
private String currentScrollWindowId;
private String[] currentScrollWindowHits;

private int nextRecordIndex = 0;
private long currentReadCount = 0L;

/** Call bridge for the version-specific Elasticsearch client. */
private final ElasticsearchApiCallBridge<C> callBridge;

private final Map<String, String> userConfig;
/** Elasticsearch client created using the call bridge. */
private transient C client;

public ElasticSearchInputFormatBase(
ElasticsearchApiCallBridge<C> callBridge,
Map<String, String> userConfig,
DeserializationSchema<T> deserializationSchema,
String[] fieldNames,
String index,
String type,
long scrollTimeout,
int scrollSize,
QueryBuilder predicate,
long limit) {

this.callBridge = checkNotNull(callBridge);
checkNotNull(userConfig);
// copy config so we can remove entries without side-effects
this.userConfig = new HashMap<>(userConfig);

this.deserializationSchema = checkNotNull(deserializationSchema);
this.index = index;
this.type = type;

this.fieldNames = fieldNames;
this.predicate = predicate;
this.limit = limit;

this.scrollTimeout = scrollTimeout;
this.scrollSize = scrollSize;
}

@Override
public TypeInformation<T> getProducedType() {
return deserializationSchema.getProducedType();
}

@Override
public void configure(Configuration configuration) {

try {
client = callBridge.createClient();
callBridge.verifyClientConnection(client);
} catch (IOException ex) {
LOG.error("Exception while creating connection to Elasticsearch.", ex);
throw new RuntimeException("Cannot create connection to Elasticsearch.", ex);
}
}

@Override
public BaseStatistics getStatistics(BaseStatistics baseStatistics) throws IOException {
return null;
}

@Override
public ElasticsearchInputSplit[] createInputSplits(int minNumSplits) throws IOException {
return callBridge.createInputSplitsInternal(client, index, type, minNumSplits);
}

@Override
public InputSplitAssigner getInputSplitAssigner(ElasticsearchInputSplit[] inputSplits) {
return new LocatableInputSplitAssigner(inputSplits);
}

@Override
public void open(ElasticsearchInputSplit split) throws IOException {
SearchRequest searchRequest = new SearchRequest(index);
if (type == null) {
searchRequest.types(Strings.EMPTY_ARRAY);
} else {
searchRequest.types(type);
}
this.scroll = new Scroll(TimeValue.timeValueMinutes(scrollTimeout));
searchRequest.scroll(scroll);
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
int size;
if (limit > 0) {
size = (int) Math.min(limit, scrollSize);
} else {
size = scrollSize;
}
// Elasticsearch's default page size is 10.
searchSourceBuilder.size(size);
searchSourceBuilder.fetchSource(fieldNames, null);
if (predicate != null) {
searchSourceBuilder.query(predicate);
} else {
searchSourceBuilder.query(QueryBuilders.matchAllQuery());
}
searchRequest.source(searchSourceBuilder);
searchRequest.preference("_shards:" + split.getShard());
Tuple2<String, String[]> searchResponse = null;
try {

searchResponse = callBridge.search(client, searchRequest);
} catch (IOException e) {
LOG.error("Search has error: ", e.getMessage());
}
if (searchResponse != null) {
currentScrollWindowId = searchResponse.f0;
currentScrollWindowHits = searchResponse.f1;
nextRecordIndex = 0;
}
}

@Override
public boolean reachedEnd() throws IOException {
if (limit > 0 && currentReadCount >= limit) {
return true;
}

// The SearchResponse may carry InternalSearchHits.empty(), i.e. an empty
// InternalSearchHit[] array, so guard against zero hits here.
if (currentScrollWindowHits.length != 0
&& nextRecordIndex > currentScrollWindowHits.length - 1) {
fetchNextScrollWindow();
}

return currentScrollWindowHits.length == 0;
}

@Override
public T nextRecord(T t) throws IOException {
if (reachedEnd()) {
LOG.warn("Already reached the end of the split.");
}

String hit = currentScrollWindowHits[nextRecordIndex];
nextRecordIndex++;
currentReadCount++;
LOG.debug("Yielding new record for hit: {}", hit);

return parseSearchHit(hit);
}

@Override
public void close() throws IOException {
callBridge.close(client);
}

private void fetchNextScrollWindow() {
Tuple2<String, String[]> searchResponse = null;
SearchScrollRequest scrollRequest = new SearchScrollRequest(currentScrollWindowId);
scrollRequest.scroll(scroll);

try {
searchResponse = callBridge.scroll(client, scrollRequest);
} catch (IOException e) {
LOG.error("Scroll failed: " + e.getMessage());
}

if (searchResponse != null) {
currentScrollWindowId = searchResponse.f0;
currentScrollWindowHits = searchResponse.f1;
nextRecordIndex = 0;
}
}

private T parseSearchHit(String hit) {
T row = null;
try {
row = deserializationSchema.deserialize(hit.getBytes(StandardCharsets.UTF_8));
} catch (IOException e) {
LOG.error("Deserialize search hit failed: " + e.getMessage());
}

return row;
}
}
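
Note: the class above is consumed through the standard Flink InputFormat lifecycle. The sketch below only illustrates that call order using the methods defined in this file; the readAll helper is hypothetical, and constructing a concrete format instance (which needs a version-specific call bridge) is elided.

import org.apache.flink.configuration.Configuration;

import java.io.IOException;

class InputFormatLifecycleSketch {

    /** Hypothetical driver that reads every record a format produces. */
    static <T> void readAll(ElasticSearchInputFormatBase<T, ?> format, int minNumSplits)
            throws IOException {
        format.configure(new Configuration()); // creates and verifies the Elasticsearch client
        for (ElasticsearchInputSplit split : format.createInputSplits(minNumSplits)) {
            format.open(split); // issues the initial scroll search, pinned to the split's shard
            while (!format.reachedEnd()) {
                T record = format.nextRecord(null); // next hit, deserialized by the schema
                // ... hand the record to downstream processing
            }
        }
        format.close(); // releases the client via the call bridge
    }
}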
@@ -24,6 +24,7 @@
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchScrollRequest;

import javax.annotation.Nullable;

@@ -63,6 +64,13 @@ public interface ElasticsearchApiCallBridge<C extends AutoCloseable> extends Ser
*/
BulkProcessor.Builder createBulkProcessorBuilder(C client, BulkProcessor.Listener listener);

/**
* Creates the input splits for the given index, typically one split per shard.
*/
ElasticsearchInputSplit[] createInputSplitsInternal(
C client, String index, String type, int minNumSplits);

/**
* Fetches the next batch of hits for an open scroll context.
*
* @return a tuple of the (possibly renewed) scroll id and the matching hits as JSON strings
*/
Tuple2<String, String[]> scroll(C client, SearchScrollRequest searchScrollRequest)
throws IOException;

/**
* Executes a search using the Search API.
*
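Note: the new scroll(...) contract returns a Tuple2 of the scroll id and the hits as JSON strings, matching what ElasticSearchInputFormatBase expects. The sketch below shows one plausible 7.x implementation against the high-level REST client; it is illustrative only and is not the bridge implementation shipped in this PR.

import org.apache.flink.api.java.tuple.Tuple2;

import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchScrollRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.search.SearchHit;

import java.io.IOException;

class ScrollBridgeSketch {

    /** Illustrative only: fetches the next scroll page and maps it to the bridge contract. */
    Tuple2<String, String[]> scroll(RestHighLevelClient client, SearchScrollRequest request)
            throws IOException {
        SearchResponse response = client.scroll(request, RequestOptions.DEFAULT);
        SearchHit[] hits = response.getHits().getHits();
        String[] sources = new String[hits.length];
        for (int i = 0; i < hits.length; i++) {
            // Raw _source JSON; the input format later hands it to the DeserializationSchema.
            sources[i] = hits[i].getSourceAsString();
        }
        return Tuple2.of(response.getScrollId(), sources);
    }
}
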
@@ -0,0 +1,37 @@
package org.apache.flink.streaming.connectors.elasticsearch;

import org.apache.flink.core.io.LocatableInputSplit;

/** An input split that points at a single shard of an Elasticsearch index. */
public class ElasticsearchInputSplit extends LocatableInputSplit {

private static final long serialVersionUID = 1L;

/** The name of the index to retrieve data from. */
private final String index;

/** The document type; it is null for the Flink Elasticsearch 7+ connector. */
private final String type;

/** The shard of the index that this split reads from; shards are fixed when the index is created. */
private final int shard;

public ElasticsearchInputSplit(
int splitNumber, String[] hostnames, String index, String type, int shard) {
super(splitNumber, hostnames);
this.index = index;
this.type = type;
this.shard = shard;
}

public String getIndex() {
return index;
}

public String getType() {
return type;
}

public int getShard() {
return shard;
}
}
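
Note: each split pins a reader to a single shard (see the "_shards:" preference set in ElasticSearchInputFormatBase.open). As a hedged illustration of how a bridge's createInputSplitsInternal could use this class, assuming the shard count and data-node host names were already looked up from the cluster (this is not the PR's actual bridge code):

class ShardSplitSketch {

    /** Illustrative only: builds one locatable split per shard of the target index. */
    static ElasticsearchInputSplit[] splitsForIndex(
            String index, String type, int shardCount, String[] hostnames) {
        ElasticsearchInputSplit[] splits = new ElasticsearchInputSplit[shardCount];
        for (int shard = 0; shard < shardCount; shard++) {
            // The shard number doubles as the split number here; open() later targets it
            // through the "_shards:<n>" search preference.
            splits[shard] = new ElasticsearchInputSplit(shard, hostnames, index, type, shard);
        }
        return splits;
    }
}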
@@ -38,6 +38,8 @@
import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchConnectorOptions.BULK_FLUSH_INTERVAL_OPTION;
import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchConnectorOptions.FAILURE_HANDLER_OPTION;
import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchConnectorOptions.PASSWORD_OPTION;
import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchConnectorOptions.SCROLL_MAX_SIZE_OPTION;
import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchConnectorOptions.SCROLL_TIMEOUT_OPTION;
import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchConnectorOptions.USERNAME_OPTION;

/** Accessor methods to Elasticsearch options. */
@@ -150,6 +152,26 @@ public Optional<String> getPathPrefix() {
return config.getOptional(ElasticsearchConnectorOptions.CONNECTION_PATH_PREFIX);
}

public Optional<Integer> getScrollMaxSize() {
return config.getOptional(SCROLL_MAX_SIZE_OPTION);
}

public Optional<Long> getScrollTimeout() {
return config.getOptional(SCROLL_TIMEOUT_OPTION).map(Duration::toMillis);
}

public long getCacheMaxSize() {
return config.get(ElasticsearchConnectorOptions.LOOKUP_CACHE_MAX_ROWS);
}

public Duration getCacheExpiredMs() {
return config.get(ElasticsearchConnectorOptions.LOOKUP_CACHE_TTL);
}

public int getMaxRetryTimes() {
return config.get(ElasticsearchConnectorOptions.LOOKUP_MAX_RETRIES);
}

@Override
public boolean equals(Object o) {
if (this == o) {
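Note: the accessors above read two new options, SCROLL_MAX_SIZE_OPTION and SCROLL_TIMEOUT_OPTION, whose declarations are not part of this diff. A plausible shape is sketched below, inferred only from the accessor return types (Integer and Duration); the key names and descriptions are assumptions, not the PR's actual definitions in ElasticsearchConnectorOptions.

import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;

import java.time.Duration;

class ScrollOptionsSketch {

    // Assumed key name; only the Integer value type follows from getScrollMaxSize().
    static final ConfigOption<Integer> SCROLL_MAX_SIZE_OPTION =
            ConfigOptions.key("scan.scroll.max-size")
                    .intType()
                    .noDefaultValue()
                    .withDescription("Maximum number of hits returned per scroll request.");

    // Assumed key name; only the Duration value type follows from getScrollTimeout().
    static final ConfigOption<Duration> SCROLL_TIMEOUT_OPTION =
            ConfigOptions.key("scan.scroll.timeout")
                    .durationType()
                    .noDefaultValue()
                    .withDescription("How long a scroll context is kept alive between requests.");
}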