Skip to content

Commit

Permalink
Merge pull request #29 from albertocsm/pr_elasticsearch_connector
Browse files Browse the repository at this point in the history
Added fields support to improve query performance
  • Loading branch information
FlavioF authored Nov 30, 2016
2 parents a1bba88 + cede992 commit 5d9f6da
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -52,11 +52,12 @@ public class ElasticsearchQueryBuilder
final Client client;
final TupleDomain<ColumnHandle> tupleDomain;
final List<ElasticsearchColumnHandle> columns;
final boolean isToAddFields;

private final String index;
private final String type;

public ElasticsearchQueryBuilder(List<ElasticsearchColumnHandle> columnHandles, ElasticsearchSplit split, ElasticsearchClient elasticsearchClient)
public ElasticsearchQueryBuilder(List<ElasticsearchColumnHandle> columnHandles, ElasticsearchSplit split, ElasticsearchClient elasticsearchClient, boolean isToAddFields)
{
ElasticsearchTableSource tableSource = split.getUri();
String clusterName = tableSource.getClusterName();
Expand All @@ -70,6 +71,8 @@ public ElasticsearchQueryBuilder(List<ElasticsearchColumnHandle> columnHandles,

this.tupleDomain = split.getTupleDomain();
this.columns = columnHandles;

this.isToAddFields = isToAddFields;
}

public SearchRequestBuilder buildScrollSearchRequest()
Expand All @@ -82,6 +85,14 @@ public SearchRequestBuilder buildScrollSearchRequest()
.setQuery(getSearchQuery())
.setSize(SCROLL_SIZE); // per shard

// elasticsearch doesn't support adding fields when there is a nested type
if (isToAddFields) {
searchRequestBuilder.addFields(columns
.stream()
.map((c) -> c.getColumnJsonPath())
.toArray(size -> new String[size]));
}

return searchRequestBuilder;
}

Expand All @@ -98,12 +109,14 @@ private FilteredQueryBuilder getSearchQuery()

for (ElasticsearchColumnHandle column : columns) {
Type type = column.getColumnType();
// if (isAcceptedType(type)) {
Domain domain = tupleDomain.getDomains().get().get(column);
if (domain != null) {
boolFilterBuilder.must(addFilter(column.getColumnJsonPath(), domain, type));
}
// }
tupleDomain
.getDomains()
.ifPresent((e) -> {
Domain domain = e.get(column);
if (domain != null) {
boolFilterBuilder.must(addFilter(column.getColumnJsonPath(), domain, type));
}
});
}

return QueryBuilders.filteredQuery(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import io.airlift.slice.Slices;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHitField;

import java.util.ArrayList;
import java.util.Collections;
Expand All @@ -44,6 +45,7 @@ public class ElasticsearchRecordCursor
private final Iterator<SearchHit> lines;
private long totalBytes;
private List<Object> fields;
private final boolean isFieldQuery;

public ElasticsearchRecordCursor(List<ElasticsearchColumnHandle> columnHandles, ElasticsearchSplit split, ElasticsearchClient elasticsearchClient)
{
Expand All @@ -55,7 +57,13 @@ public ElasticsearchRecordCursor(List<ElasticsearchColumnHandle> columnHandles,
this.jsonPathToIndex.put(columnHandles.get(i).getColumnJsonPath(), i);
}

this.lines = getRows(new ElasticsearchQueryBuilder(columnHandles, split, elasticsearchClient)).iterator();
// Elasticsearch does not allow adding fields to a query when nested types are present
this.isFieldQuery = columnHandles
.stream()
.filter((c) -> c.getColumnJsonType().equals("nested"))
.count() == 0;

this.lines = getRows(new ElasticsearchQueryBuilder(columnHandles, split, elasticsearchClient, isFieldQuery)).iterator();
}

@Override
Expand Down Expand Up @@ -96,12 +104,11 @@ public boolean advanceNextPosition()
setFieldIfExists("_id", hit.getId());
setFieldIfExists("_index", hit.getIndex());

Map<String, Object> map = hit.getSource();
for (Map.Entry<String, Object> entry : map.entrySet()) {
String jsonPath = entry.getKey().toString();
Object entryValue = entry.getValue();

setFieldIfExists(jsonPath, entryValue);
if (isFieldQuery) {
extractFromHitField(hit);
}
else {
extractFromSource(hit);
}

totalBytes += fields.size();
Expand Down Expand Up @@ -201,4 +208,26 @@ private Object getFieldValue(int field)
checkState(fields != null, "Cursor has not been advanced yet");
return fields.get(field);
}

/**
 * Copies the values of a field-based search hit into the current row.
 *
 * Walks every entry of {@code hit.getFields()} and hands the entry key (used
 * as the JSON path) together with the hit field's value to
 * {@code setFieldIfExists}.
 *
 * @param hit the search hit whose requested fields are read
 */
private void extractFromHitField(SearchHit hit)
{
    hit.getFields().forEach((path, hitField) -> {
        // Keep the value typed as Object so the same setFieldIfExists overload is selected.
        Object value = hitField.getValue();
        setFieldIfExists(path, value);
    });
}

/**
 * Copies the values of a hit's source document into the current row.
 *
 * Each top-level entry of {@code hit.getSource()} is passed to
 * {@code setFieldIfExists}, using the entry key as the JSON path.
 *
 * @param hit the search hit whose source map is read
 */
private void extractFromSource(SearchHit hit)
{
    Map<String, Object> source = hit.getSource();
    if (source == null) {
        // getSource() is documented as nullable (e.g. when _source is not
        // returned for the hit); there is nothing to copy in that case.
        return;
    }

    for (Map.Entry<String, Object> entry : source.entrySet()) {
        setFieldIfExists(entry.getKey(), entry.getValue());
    }
}
}

0 comments on commit 5d9f6da

Please sign in to comment.