diff --git a/pom.xml b/pom.xml index 3d4ba4f76918..f9494e2cf29f 100644 --- a/pom.xml +++ b/pom.xml @@ -98,6 +98,7 @@ presto-docs presto-verifier presto-testing-server-launcher + presto-elasticsearch @@ -133,6 +134,13 @@ zip + + com.facebook.presto + presto-elasticsearch + ${project.version} + zip + + com.facebook.presto presto-example-http diff --git a/presto-elasticsearch/pom.xml b/presto-elasticsearch/pom.xml index 6375bb7e76aa..fc1d20a6ff18 100644 --- a/presto-elasticsearch/pom.xml +++ b/presto-elasticsearch/pom.xml @@ -1,123 +1,188 @@ - - 4.0.0 - - com.facebook.presto - presto-root - 0.108-SNAPSHOT - - com.facebook.presto.elasticsearch - presto-elasticsearch - 1.0-SNAPSHOT - presto-elasticsearch - http://maven.apache.org - - - junit - junit - 3.8.1 - test - - - - org.elasticsearch - elasticsearch - 1.6.0 - - - + + 4.0.0 + com.facebook.presto - presto-spi - provided - - - - io.airlift - bootstrap - provided - - - - io.airlift - json - provided - - - - io.airlift - log - provided - - - - com.fasterxml.jackson.core - jackson-annotations - provided - - - - com.fasterxml.jackson.core - jackson-core - provided - - - - com.fasterxml.jackson.core - jackson-databind - provided - - - - io.airlift - units - provided - - - - io.airlift - configuration - provided - - - - io.airlift - slice - provided - - - - com.google.guava - guava - provided - - - - javax.inject - javax.inject - provided - - - - com.google.inject - guice - provided - - - - javax.validation - validation-api - provided - - - - org.json - json - 20080701 - - - - - - + presto-root + 0.131-SNAPSHOT + + presto-elasticsearch + Presto - Elasticsearch Connector + presto-plugin + + + ${project.parent.basedir} + + + + + + org.elasticsearch + elasticsearch + 1.6.0 + + + org.yaml + snakeyaml + + + org.ow2.asm + asm-all + + + org.ow2.asm + asm-commons + + + org.ow2.asm + asm + + + + + + org.json + json + 20080701 + + + + com.facebook.presto + presto-spi + provided + + + + io.airlift + bootstrap + provided + + + + io.airlift + json + provided + + + + io.airlift + log + provided + + + + com.fasterxml.jackson.core + jackson-annotations + provided + + + + com.fasterxml.jackson.core + jackson-core + provided + + + + com.fasterxml.jackson.core + jackson-databind + provided + + + + io.airlift + units + provided + + + + io.airlift + configuration + provided + + + + io.airlift + slice + provided + + + + com.google.guava + guava + provided + + + + javax.inject + javax.inject + provided + + + + com.google.inject + guice + provided + + + + javax.validation + validation-api + provided + + + + + com.facebook.presto + presto-main + test + + + + org.testng + testng + test + + + + io.airlift + testing + test + + + + io.airlift + http-server + test + + + + io.airlift + node + test + + + + javax.servlet + javax.servlet-api + test + + + + + + + + + com.mycila + license-maven-plugin + + + **/*.java + + + + + + + diff --git a/presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/ElasticsearchClient.java b/presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/ElasticsearchClient.java index d39a27be964e..dacdb7fafbdb 100644 --- a/presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/ElasticsearchClient.java +++ b/presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/ElasticsearchClient.java @@ -2,37 +2,40 @@ package com.facebook.presto.elasticsearch; import com.facebook.presto.spi.type.Type; -import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Function; import com.google.common.base.Supplier; import com.google.common.base.Suppliers; import com.google.common.base.Throwables; -import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import com.google.common.io.Resources; import io.airlift.json.JsonCodec; import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsRequest; -import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse; import org.elasticsearch.client.Client; import org.elasticsearch.client.transport.TransportClient; import org.elasticsearch.cluster.metadata.MappingMetaData; import org.elasticsearch.common.collect.ImmutableOpenMap; -import org.elasticsearch.common.hppc.cursors.ObjectObjectCursor; +import org.elasticsearch.common.hppc.cursors.ObjectCursor; import org.elasticsearch.common.settings.ImmutableSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.InetSocketTransportAddress; +import org.json.JSONArray; import org.json.JSONException; import org.json.JSONObject; import javax.inject.Inject; import java.io.IOException; -import java.net.MalformedURLException; import java.net.URI; import java.net.URL; -import java.util.*; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; import java.util.concurrent.ExecutionException; +import java.util.stream.Collectors; import static com.facebook.presto.spi.type.BigintType.BIGINT; import static com.facebook.presto.spi.type.DoubleType.DOUBLE; @@ -41,10 +44,8 @@ import static com.google.common.collect.Iterables.transform; import static com.google.common.collect.Maps.transformValues; import static com.google.common.collect.Maps.uniqueIndex; -import static io.airlift.json.JsonCodec.listJsonCodec; import static java.nio.charset.StandardCharsets.UTF_8; - - +import static java.util.Locale.ENGLISH; public class ElasticsearchClient { @@ -68,58 +69,63 @@ public ElasticsearchClient(ElasticsearchConfig config, JsonCodec getSchemaNames() + { + return schemas.get().keySet(); + } - if(items.length != 2) { - /* - System.out.println("The items are :"); - for (String it : items) - { - System.out.println(it); - } - */ - return null; - //assert (items.length == 2); + public Set getTableNames(String schema) + { + checkNotNull(schema, "schema is null"); + Map tables = schemas.get().get(schema); + if (tables == null) { + return ImmutableSet.of(); } + return tables.keySet(); + } - String type = items[1]; - String path = items[0]; - Type prestoType = VARCHAR; // default. It will be corrected below + public ElasticsearchTable getTable(String schema, String tableName) + { + try { + updateSchemas(); + } + catch (IOException e) { + e.printStackTrace(); + } - // take only properties from dimensions and measurements for now - if(!(path.startsWith("measurements"))) return null; + checkNotNull(schema, "schema is null"); + checkNotNull(tableName, "tableName is null"); + Map tables = schemas.get().get(schema.toLowerCase(ENGLISH)); + if (tables == null) { + return null; + } + return tables.get(tableName.toLowerCase(ENGLISH)); + } + Map> updateSchemas() + throws IOException + { + schemas = Suppliers.memoize(schemasSupplier(catalogCodec, config.getMetadata())); - if(path.endsWith(".type")) - { - path = path.substring(0, path.lastIndexOf('.')); + Map> schemasMap = schemas.get(); + for (Map.Entry> schemaEntry : schemasMap.entrySet()) { - // replace '.properties.' with '.' - path = path.replaceAll("\\.properties\\.", "."); - } + Map tablesMap = schemaEntry.getValue(); + for (Map.Entry tableEntry : tablesMap.entrySet()) { - if(type.equals("double") || type.equals("float") || type.equals("integer") || type.equals("string")) - { - if(type.equals("double")) prestoType = DOUBLE; - else if(type.equals("float")) prestoType = DOUBLE; - else if(type.equals("integer")) prestoType = BIGINT; - else if(type.equals("long")) prestoType = BIGINT; - else if(type.equals("string")) prestoType = VARCHAR; + updateTableColumns(tableEntry.getValue()); + } } - else return null; + schemas = Suppliers.memoize(Suppliers.ofInstance(schemasMap)); - ElasticsearchColumn column = new ElasticsearchColumn(path.replaceAll("\\.","_"), prestoType, path, type); - return column; + return schemasMap; } - private void getColumns(ElasticsearchTableSource src, Set columns) throws ExecutionException, InterruptedException, IOException, JSONException { - - /* - Get the current set of columns for one of the sources of a table - */ - String hostaddress = src.getHostaddress(); + ImmutableOpenMap> getMappings(ElasticsearchTableSource src) + throws ExecutionException, InterruptedException + { int port = src.getPort(); + String hostaddress = src.getHostaddress(); String clusterName = src.getClusterName(); String index = src.getIndex(); String type = src.getType(); @@ -131,174 +137,186 @@ private void getColumns(ElasticsearchTableSource src, Set c System.out.println("index :" + index); System.out.println("type :" + type); - - Settings settings = ImmutableSettings.settingsBuilder() .put("cluster.name", clusterName) .build(); - Client client = new TransportClient(settings) - .addTransportAddress(new InetSocketTransportAddress( - hostaddress, port)); - - GetMappingsResponse res = client.admin().indices().getMappings(new GetMappingsRequest().indices(index).types(type)).get(); + try (Client client = new TransportClient(settings).addTransportAddress(new InetSocketTransportAddress(hostaddress, port))) { + GetMappingsRequest mappingsRequest = new GetMappingsRequest().types(type); - ImmutableOpenMap mapping = res.mappings().get(index); - for (ObjectObjectCursor c : mapping) { - //System.out.println(c.key+" = "+c.value.source()); - String data = c.value.source().toString(); - JSONObject json = new JSONObject(data); - json = json.getJSONObject(type).getJSONObject("properties"); - //System.out.println(json.toString(2)); - - List leaves = (new MyJSONTest()).getListJson(json); - for (String fieldPath_Type : leaves) - { - ElasticsearchColumn clm = makeColumn(fieldPath_Type); - if (!(clm == null)) { - columns.add(clm); - } + // an index is optional - if no index is configured for the table, it will retrieve all indices for the doc type + if (index != null && !index.isEmpty()) { + mappingsRequest.indices(index); } - //System.out.println("--------------------"); - } - client.close(); + return client + .admin() + .indices() + .getMappings(mappingsRequest) + .get() + .getMappings(); + } } - private List getColumnsMetadata(List columns) + Set getColumns(ElasticsearchTableSource src) + throws ExecutionException, InterruptedException, IOException, JSONException { - List columnsMetadata = new ArrayList<>(); - for (ElasticsearchColumn clm : columns) - { - columnsMetadata.add(new ElasticsearchColumnMetadata(clm)); - } - return columnsMetadata; - } + Set result = new HashSet(); + String type = src.getType(); + ImmutableOpenMap> allMappings = getMappings(src); + // what makes sense is to get the reunion of all the columns from all the mappings for the specified document type + for (ObjectCursor currentIndex : allMappings.keys()) { - private void updateTableColumns_ColumnsMetadata(ElasticsearchTable table) - { - Set columns = new HashSet(); - for(ElasticsearchTableSource src : table.getSources()) - { - try { - getColumns(src,columns); - } catch (ExecutionException e) { - e.printStackTrace(); - } catch (InterruptedException e) { - e.printStackTrace(); - } catch (IOException e) { - e.printStackTrace(); - } catch (JSONException e) { - System.out.println("JSONException caught !!!"); - e.printStackTrace(); - System.out.println("JSONException caught !!!"); + MappingMetaData mappingMetaData = allMappings.get(currentIndex.value).get(type); + JSONObject json = new JSONObject(mappingMetaData.source().toString()) + .getJSONObject(type) + .getJSONObject("properties"); + + List allColumnMetadata = getColumnsMetadata(null, json); + for (String columnMetadata : allColumnMetadata) { + ElasticsearchColumn clm = createColumn(columnMetadata); + if (!(clm == null)) { + result.add(clm); + } } } - List columnsList = new ArrayList(columns); - table.setColumns(columnsList); - table.setColumnsMetadata(getColumnsMetadata(columnsList)); + return result; } - private void updateSchemas() throws IOException + List getColumnsMetadata(String parent, JSONObject json) + throws JSONException { - // load from the metadata json file - schemas = Suppliers.memoize(schemasSupplier(catalogCodec, config.getMetadata())); + List leaves = new ArrayList(); - Map> schemasMap = schemas.get(); - for (Map.Entry> entry : schemasMap.entrySet()) { + Iterator it = json.keys(); + while (it.hasNext()) { + String key = (String) it.next(); + Object child = json.get(key); + String childKey = parent == null || parent.isEmpty() ? key : parent.concat(".").concat(key); - Map tablesMap = entry.getValue(); - for (Map.Entry tableEntry : tablesMap.entrySet()) { - updateTableColumns_ColumnsMetadata(tableEntry.getValue()); + if (child instanceof JSONObject) { + leaves.addAll(getColumnsMetadata(childKey, (JSONObject) child)); + } + else if (child instanceof JSONArray) { + // ignoring arrays for now + continue; + } + else { + leaves.add(childKey.concat(":").concat(child.toString())); } } - //schemas = Suppliers.ofInstance(schemasMap); - schemas = Suppliers.memoize(Suppliers.ofInstance(schemasMap)); - } - - public Set getSchemaNames() { - //System.out.println("mark : getSchemaNames()"); - return schemas.get().keySet(); + return leaves; } - public Set getTableNames(String schema) + ElasticsearchColumn createColumn(String fieldPath_Type) + throws JSONException, IOException { - checkNotNull(schema, "schema is null"); - Map tables = schemas.get().get(schema); - if (tables == null) { - return ImmutableSet.of(); - } - return tables.keySet(); - } + String[] items = fieldPath_Type.split(":"); + String type = items[1]; + String path = items[0]; + Type prestoType; - public ElasticsearchTable getTable(String schema, String tableName) - { - try { - this.updateSchemas(); - } catch (IOException e) { - e.printStackTrace(); + if (items.length != 2) { + System.out.println("Invalid column path format. Ignoring..."); + return null; + } + if (!path.endsWith(".type")) { + System.out.println("Invalid column has no type info. Ignoring..."); + return null; } - checkNotNull(schema, "schema is null"); - checkNotNull(tableName, "tableName is null"); - Map tables = schemas.get().get(schema); - if (tables == null) { + if (path.contains(".properties.")) { + System.out.println("Invalid complex column type. Ignoring..."); return null; } - return tables.get(tableName); + + switch (type) { + + case "double": + case "float": + prestoType = DOUBLE; + break; + case "integer": + case "long": + prestoType = BIGINT; + break; + case "string": + prestoType = VARCHAR; + break; + default: + System.out.println("Unsupported column type. Ignoring..."); + return null; + } + + path = path.substring(0, path.lastIndexOf('.')); + //path = path.replaceAll("\\.properties\\.", "."); + return new ElasticsearchColumn(path.replaceAll("\\.", "_"), prestoType, path, type); } - private static Supplier>> schemasSupplier(final JsonCodec>> catalogCodec, final URI metadataUri) + void updateTableColumns(ElasticsearchTable table) { + Set columns = new HashSet(); - return () -> { + // the table can have multiple sources + // the column set should be the reunion of all + for (ElasticsearchTableSource src : table.getSources()) { try { - //System.out.println("mark : executing method schemasSupplier() :"); - return lookupSchemas(metadataUri, catalogCodec); + columns.addAll(getColumns(src)); } - catch (IOException e) { - throw Throwables.propagate(e); + catch (ExecutionException | InterruptedException | IOException | JSONException e) { + e.printStackTrace(); } - }; - + } + table.setColumns(columns + .stream() + .collect(Collectors.toList())); + table.setColumnsMetadata(columns + .stream() + .map(ElasticsearchColumnMetadata::new) + .collect(Collectors.toList())); } - private static Map> lookupSchemas(URI metadataUri, JsonCodec>> catalogCodec) + static Map> lookupSchemas(URI metadataUri, JsonCodec>> catalogCodec) throws IOException { - // This function is called in the constructor of ElasticsearchClient - //System.out.println("mark : in method lookupSchemas()"); - - URL result = metadataUri.toURL(); - System.out.println("result : " + result); + URL url = metadataUri.toURL(); + System.out.println("url: " + url); - String json = Resources.toString(result, UTF_8); - System.out.println("json : " + json); + String tableMappings = Resources.toString(url, UTF_8); + System.out.println("tableMappings: " + tableMappings); - Map> catalog = catalogCodec.fromJson(json); + Map> catalog = catalogCodec.fromJson(tableMappings); - return ImmutableMap.copyOf(transformValues(catalog, resolveAndIndexTables(metadataUri))); + return ImmutableMap.copyOf( + transformValues( + catalog, + resolveAndIndexTablesFunction())); } - private static Function, Map> resolveAndIndexTables(final URI metadataUri) + static Supplier>> schemasSupplier(final JsonCodec>> catalogCodec, final URI metadataUri) { - return tables -> { - Iterable resolvedTables = transform(tables, tableUriResolver(metadataUri)); - return ImmutableMap.copyOf(uniqueIndex(resolvedTables, ElasticsearchTable::getName)); + return () -> { + try { + return lookupSchemas(metadataUri, catalogCodec); + } + catch (IOException e) { + throw Throwables.propagate(e); + } }; } - private static Function tableUriResolver(final URI baseUri) + static Function, Map> resolveAndIndexTablesFunction() { - return table -> { - //List sources = ImmutableList.copyOf(transform(table.getSources(), baseUri::resolve)); - List sources = table.getSources(); - return new ElasticsearchTable(table.getName(), table.getColumns(), sources); - }; + return tables -> ImmutableMap.copyOf( + uniqueIndex( + transform( + tables, + table -> new ElasticsearchTable(table.getName(), table.getSources())), + ElasticsearchTable::getName)); } } diff --git a/presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/ElasticsearchConnectorFactory.java b/presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/ElasticsearchConnectorFactory.java index edf037eaa0ce..a29d493a3bf6 100644 --- a/presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/ElasticsearchConnectorFactory.java +++ b/presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/ElasticsearchConnectorFactory.java @@ -3,6 +3,7 @@ import com.facebook.presto.spi.Connector; import com.facebook.presto.spi.ConnectorFactory; +import com.facebook.presto.spi.classloader.ThreadContextClassLoader; import com.facebook.presto.spi.type.TypeManager; import com.google.common.base.Throwables; import com.google.common.collect.ImmutableMap; @@ -13,17 +14,20 @@ import java.util.Map; import static com.google.common.base.Preconditions.checkNotNull; +import static java.util.Objects.requireNonNull; public class ElasticsearchConnectorFactory implements ConnectorFactory { private final TypeManager typeManager; private final Map optionalConfig; + private final ClassLoader classLoader; - public ElasticsearchConnectorFactory(TypeManager typeManager, Map optionalConfig) + public ElasticsearchConnectorFactory(TypeManager typeManager, Map optionalConfig, ClassLoader classLoader) { - this.typeManager = checkNotNull(typeManager, "typeManager is null"); - this.optionalConfig = ImmutableMap.copyOf(checkNotNull(optionalConfig, "optionalConfig is null")); + this.typeManager = requireNonNull(typeManager, "typeManager is null"); + this.optionalConfig = ImmutableMap.copyOf(requireNonNull(optionalConfig, "optionalConfig is null")); + this.classLoader = requireNonNull(classLoader, "classLoader is null"); } @Override @@ -35,11 +39,10 @@ public String getName() @Override public Connector create(final String connectorId, Map requiredConfig) { - checkNotNull(requiredConfig, "requiredConfig is null"); - checkNotNull(optionalConfig, "optionalConfig is null"); + requireNonNull(requiredConfig, "requiredConfig is null"); + requireNonNull(optionalConfig, "optionalConfig is null"); - try { - // A plugin is not required to use Guice; it is just very convenient + try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(classLoader)) { Bootstrap app = new Bootstrap( new JsonModule(), new ElasticsearchModule(connectorId, typeManager)); @@ -51,8 +54,6 @@ public Connector create(final String connectorId, Map requiredCo .setOptionalConfigurationProperties(optionalConfig) .initialize(); - - return injector.getInstance(ElasticsearchConnector.class); } catch (Exception e) { diff --git a/presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/ElasticsearchMetadata.java b/presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/ElasticsearchMetadata.java index 82f71c65108b..c694b75a80a1 100644 --- a/presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/ElasticsearchMetadata.java +++ b/presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/ElasticsearchMetadata.java @@ -1,12 +1,12 @@ package com.facebook.presto.elasticsearch; -import com.facebook.presto.spi.ColumnMetadata; import com.facebook.presto.spi.ColumnHandle; +import com.facebook.presto.spi.ColumnMetadata; +import com.facebook.presto.spi.ConnectorMetadata; import com.facebook.presto.spi.ConnectorSession; import com.facebook.presto.spi.ConnectorTableHandle; import com.facebook.presto.spi.ConnectorTableMetadata; -import com.facebook.presto.spi.ReadOnlyConnectorMetadata; import com.facebook.presto.spi.SchemaTableName; import com.facebook.presto.spi.SchemaTablePrefix; import com.facebook.presto.spi.TableNotFoundException; @@ -25,7 +25,7 @@ import static com.google.common.base.Preconditions.checkNotNull; public class ElasticsearchMetadata - extends ReadOnlyConnectorMetadata + implements ConnectorMetadata { private final String connectorId; @@ -65,7 +65,12 @@ public ElasticsearchTableHandle getTableHandle(ConnectorSession session, SchemaT } @Override - public ConnectorTableMetadata getTableMetadata(ConnectorTableHandle table) + public ConnectorTableMetadata getTableMetadata(ConnectorSession session, ConnectorTableHandle table) + { + return getTableMetadata(table); + } + + private ConnectorTableMetadata getTableMetadata(ConnectorTableHandle table) { ElasticsearchTableHandle elasticsearchTableHandle = checkType(table, ElasticsearchTableHandle.class, "table"); checkArgument(elasticsearchTableHandle.getConnectorId().equals(connectorId), "tableHandle is not for this connector"); @@ -95,13 +100,12 @@ public List listTables(ConnectorSession session, String schemaN } @Override - public ColumnHandle getSampleWeightColumnHandle(ConnectorTableHandle tableHandle) + public Map getColumnHandles(ConnectorSession session, ConnectorTableHandle tableHandle) { - return null; + return getColumnHandles(tableHandle); } - @Override - public Map getColumnHandles(ConnectorTableHandle tableHandle) + private Map getColumnHandles(ConnectorTableHandle tableHandle) { ElasticsearchTableHandle elasticsearchTableHandle = checkType(tableHandle, ElasticsearchTableHandle.class, "tableHandle"); checkArgument(elasticsearchTableHandle.getConnectorId().equals(connectorId), "tableHandle is not for this connector"); @@ -114,14 +118,33 @@ public Map getColumnHandles(ConnectorTableHandle tableHand ImmutableMap.Builder columnHandles = ImmutableMap.builder(); int index = 0; for (ColumnMetadata column : table.getColumnsMetadata()) { - ElasticsearchColumnMetadata esColumn = (ElasticsearchColumnMetadata)column; - columnHandles.put(esColumn.getName(), new ElasticsearchColumnHandle(connectorId, column.getName(), column.getType(), - esColumn.getJsonPath(), esColumn.getJsonType(), index)); + ElasticsearchColumnMetadata esColumn = (ElasticsearchColumnMetadata) column; + columnHandles.put( + esColumn.getName(), + new ElasticsearchColumnHandle( + connectorId, + column.getName(), + column.getType(), + esColumn.getJsonPath(), + esColumn.getJsonType(), + index)); index++; } return columnHandles.build(); } + @Override + public ColumnMetadata getColumnMetadata(ConnectorSession session, ConnectorTableHandle tableHandle, ColumnHandle columnHandle) + { + return getColumnMetadata(tableHandle, columnHandle); + } + + private ColumnMetadata getColumnMetadata(ConnectorTableHandle tableHandle, ColumnHandle columnHandle) + { + checkType(tableHandle, ElasticsearchTableHandle.class, "tableHandle"); + return checkType(columnHandle, ElasticsearchColumnHandle.class, "columnHandle").getColumnMetadata(); + } + @Override public Map> listTableColumns(ConnectorSession session, SchemaTablePrefix prefix) { @@ -158,11 +181,4 @@ private List listTables(ConnectorSession session, SchemaTablePr } return ImmutableList.of(new SchemaTableName(prefix.getSchemaName(), prefix.getTableName())); } - - @Override - public ColumnMetadata getColumnMetadata(ConnectorTableHandle tableHandle, ColumnHandle columnHandle) - { - checkType(tableHandle, ElasticsearchTableHandle.class, "tableHandle"); - return checkType(columnHandle, ElasticsearchColumnHandle.class, "columnHandle").getColumnMetadata(); - } } diff --git a/presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/ElasticsearchPartition.java b/presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/ElasticsearchPartition.java index 7cf9249af767..c5bc3ebb4712 100644 --- a/presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/ElasticsearchPartition.java +++ b/presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/ElasticsearchPartition.java @@ -3,7 +3,7 @@ import com.facebook.presto.spi.ColumnHandle; import com.facebook.presto.spi.ConnectorPartition; -import com.facebook.presto.spi.TupleDomain; +import com.facebook.presto.spi.predicate.TupleDomain; import static com.google.common.base.MoreObjects.toStringHelper; import static com.google.common.base.Preconditions.checkNotNull; diff --git a/presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/ElasticsearchPlugin.java b/presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/ElasticsearchPlugin.java index d5b01cf49bfc..8a2afc124848 100644 --- a/presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/ElasticsearchPlugin.java +++ b/presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/ElasticsearchPlugin.java @@ -6,13 +6,16 @@ import com.facebook.presto.spi.type.TypeManager; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; +import org.elasticsearch.client.Client; +import org.elasticsearch.client.transport.TransportClient; import javax.inject.Inject; import java.util.List; import java.util.Map; -import static com.google.common.base.Preconditions.checkNotNull; +import static com.google.common.base.MoreObjects.firstNonNull; +import static java.util.Objects.requireNonNull; public class ElasticsearchPlugin implements Plugin @@ -23,7 +26,7 @@ public class ElasticsearchPlugin @Override public synchronized void setOptionalConfig(Map optionalConfig) { - this.optionalConfig = ImmutableMap.copyOf(checkNotNull(optionalConfig, "optionalConfig is null")); + this.optionalConfig = ImmutableMap.copyOf(requireNonNull(optionalConfig, "optionalConfig is null")); } @Inject @@ -41,8 +44,13 @@ public synchronized Map getOptionalConfig() public synchronized List getServices(Class type) { if (type == ConnectorFactory.class) { - return ImmutableList.of(type.cast(new ElasticsearchConnectorFactory(typeManager, getOptionalConfig()))); + return ImmutableList.of(type.cast(new ElasticsearchConnectorFactory(typeManager, getOptionalConfig(), getClassLoader()))); } return ImmutableList.of(); } + + private static ClassLoader getClassLoader() + { + return firstNonNull(Thread.currentThread().getContextClassLoader(), ElasticsearchPlugin.class.getClassLoader()); + } } diff --git a/presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/ElasticsearchRecordCursor.java b/presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/ElasticsearchRecordCursor.java index 61b9c83eaaf9..3398fdebf9f7 100644 --- a/presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/ElasticsearchRecordCursor.java +++ b/presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/ElasticsearchRecordCursor.java @@ -3,28 +3,28 @@ import com.facebook.presto.spi.RecordCursor; import com.facebook.presto.spi.type.Type; -import com.google.common.base.Splitter; import com.google.common.base.Strings; -import com.google.common.base.Throwables; -import com.google.common.io.ByteSource; -import com.google.common.io.CountingInputStream; import io.airlift.slice.Slice; import io.airlift.slice.Slices; +import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsRequest; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.SearchType; import org.elasticsearch.client.Client; import org.elasticsearch.client.transport.TransportClient; +import org.elasticsearch.common.settings.ImmutableSettings; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.InetSocketTransportAddress; import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.SearchHitField; -import org.elasticsearch.common.settings.ImmutableSettings; -import org.elasticsearch.common.settings.Settings; - -import java.io.IOException; -import java.util.*; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; import static com.facebook.presto.spi.type.BigintType.BIGINT; import static com.facebook.presto.spi.type.BooleanType.BOOLEAN; @@ -32,114 +32,29 @@ import static com.facebook.presto.spi.type.VarcharType.VARCHAR; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkState; -import static java.nio.charset.StandardCharsets.UTF_8; public class ElasticsearchRecordCursor implements RecordCursor { - //private static final Splitter LINE_SPLITTER = Splitter.on(",").trimResults(); - private final List columnHandles; - //private final int[] fieldToColumnIndex; - Map jsonpathToIndex = new HashMap(); - + private final Map jsonPathToIndex; private final Iterator lines; private long totalBytes; - private List fields; public ElasticsearchRecordCursor(List columnHandles, ElasticsearchTableSource tableSource) { this.columnHandles = columnHandles; + this.jsonPathToIndex = new HashMap(); + this.totalBytes = 0; + ArrayList fieldsNeeded = new ArrayList(); - //fieldToColumnIndex = new int[columnHandles.size()]; - for (int i = 0; i < columnHandles.size(); i++) { - ElasticsearchColumnHandle columnHandle = columnHandles.get(i); - //fieldToColumnIndex[i] = columnHandle.getOrdinalPosition(); - //jsonpathToIndex.put(columnHandle.getColumnJsonPath(), columnHandle.getOrdinalPosition()); - - jsonpathToIndex.put(columnHandle.getColumnJsonPath(), i); - } - - /* - try (CountingInputStream input = new CountingInputStream(byteSource.openStream())) { - lines = byteSource.asCharSource(UTF_8).readLines().iterator(); - totalBytes = input.getCount(); - } - catch (IOException e) { - throw Throwables.propagate(e); - } - */ - - Settings settings = ImmutableSettings.settingsBuilder() - .put("cluster.name", tableSource.getClusterName()) - /*.put("client.transport.sniff", true)*/.build(); - - Client client = new TransportClient(settings) - .addTransportAddress(new InetSocketTransportAddress( - tableSource.getHostaddress(), tableSource.getPort())); - - - //String[] fields = new String[] {"user", "dim.age" , "measurements.FACEBOOK_PAGE_CONSUMPTIONS_UNIQUE"}; - ArrayList fieldsNeeded = new ArrayList(); for (int i = 0; i < columnHandles.size(); i++) { - ElasticsearchColumnHandle columnHandle = columnHandles.get(i); - fieldsNeeded.add(columnHandle.getColumnJsonPath()); + this.jsonPathToIndex.put(columnHandles.get(i).getColumnJsonPath(), i); + fieldsNeeded.add(columnHandles.get(i).getColumnJsonPath()); } - /*SearchResponse response = client.prepareSearch(tableSource.getIndex()) - .setTypes(tableSource.getType()) - //.setQuery(QueryBuilders.termQuery("dimensions.SN_TYPE", "facebook")) - .addFields(fieldsNeeded.toArray(new String[fieldsNeeded.size()])) - .setFrom(0).setSize(1000000).setExplain(true) - .execute() - .actionGet(); - lines = Arrays.asList(response.getHits().getHits()).iterator();*/ - - - - lines = getRows_faster(client, tableSource, fieldsNeeded).iterator(); - - totalBytes = 0; - - client.close(); - } - - private List getRows(Client client, ElasticsearchTableSource tableSource, ArrayList fieldsNeeded) - { - SearchResponse response = client.prepareSearch(tableSource.getIndex()) - .setTypes(tableSource.getType()) - //.setQuery(QueryBuilders.termQuery("dimensions.SN_TYPE", "facebook")) - .addFields(fieldsNeeded.toArray(new String[fieldsNeeded.size()])) - .setFrom(0).setSize(1000000).setExplain(true) - .execute() - .actionGet(); - return Arrays.asList(response.getHits().getHits()); - } - - private List getRows_faster(Client client, ElasticsearchTableSource tableSource, ArrayList fieldsNeeded) - { - List rows = new ArrayList<>(); - SearchResponse scrollResp = client.prepareSearch(tableSource.getIndex()) - .setTypes(tableSource.getType()) - .addFields(fieldsNeeded.toArray(new String[fieldsNeeded.size()])) - .setSearchType(SearchType.SCAN) - .setScroll(new TimeValue(60000)) - .setSize(20000).execute().actionGet(); //20000 hits per shard will be returned for each scroll - //Scroll until no hits are returned - while (true) { - - for (SearchHit hit : scrollResp.getHits().getHits()) { - //Handle the hit... - rows.add(hit); - } - scrollResp = client.prepareSearchScroll(scrollResp.getScrollId()).setScroll(new TimeValue(600000)).execute().actionGet(); - //Break condition: No hits are returned - if (scrollResp.getHits().getHits().length == 0) { - break; - } - } - return rows; + this.lines = getRows(tableSource, fieldsNeeded).iterator(); } @Override @@ -175,40 +90,25 @@ public boolean advanceNextPosition() } SearchHit hit = lines.next(); - //fields = LINE_SPLITTER.splitToList(line); - fields = new ArrayList(Collections.nCopies(columnHandles.size(), "-1")); + fields = new ArrayList(Collections.nCopies(columnHandles.size(), "-1")); - Map map = hit.getFields(); + Map map = hit.getFields(); for (Map.Entry entry : map.entrySet()) { String jsonPath = entry.getKey().toString(); - SearchHitField fieldvar = entry.getValue(); + SearchHitField entryValue = entry.getValue(); - // we get the value , wrapped in a list (of size 1 ofcourse) -> [value] (The java api returns in this way) - ArrayList lis = new ArrayList(fieldvar.getValues()); - // get the value + // we get the value, wrapped in a list (of size 1 of course) -> [value] (The java api returns in this way) + ArrayList lis = new ArrayList(entryValue.getValues()); String value = String.valueOf(lis.get(0)); - fields.set(jsonpathToIndex.get(jsonPath), value); - - - //System.out.println("key, " + path + " value " + lis.get(0) ); + fields.set(jsonPathToIndex.get(jsonPath), value); } totalBytes += fields.size(); - return true; } - private String getFieldValue(int field) - { - checkState(fields != null, "Cursor has not been advanced yet"); - - //int columnIndex = fieldToColumnIndex[field]; - //return fields.get(columnIndex); - return fields.get(field); - } - @Override public boolean getBoolean(int field) { @@ -237,6 +137,12 @@ public Slice getSlice(int field) return Slices.utf8Slice(getFieldValue(field)); } + @Override + public Object getObject(int field) + { + return null; + } + @Override public boolean isNull(int field) { @@ -254,4 +160,71 @@ private void checkFieldType(int field, Type expected) public void close() { } + + String[] getIndices(Client client, String type) + { + return Arrays.asList(client + .admin() + .cluster() + .prepareState() + .execute() + .actionGet() + .getState() + .getMetaData() + .concreteAllIndices()) + .stream() + .filter(e -> e.startsWith(type.concat("_"))) + .toArray(size -> new String[size]); + } + + List getRows(ElasticsearchTableSource tableSource, ArrayList fieldsNeeded) + { + List result = new ArrayList<>(); + int port = tableSource.getPort(); + String hostaddress = tableSource.getHostaddress(); + String type = tableSource.getType(); + + System.out.println("type :" + type); + System.out.println("hostaddress :" + hostaddress); + System.out.println("port :" + port); + + Settings settings = ImmutableSettings.settingsBuilder() + .put("cluster.name", tableSource.getClusterName()) + .build(); + try (Client client = new TransportClient(settings).addTransportAddress(new InetSocketTransportAddress(hostaddress, port))) { + + SearchResponse scrollResp = client + .prepareSearch(getIndices(client, type)) + .setTypes(tableSource.getType()) + .addFields(fieldsNeeded.toArray(new String[fieldsNeeded.size()])) + .setSearchType(SearchType.SCAN) + .setScroll(new TimeValue(60000)) + .setSize(20000).execute() + .actionGet(); //20000 hits per shard will be returned for each scroll + + //Scroll until no hits are returned + while (true) { + + for (SearchHit hit : scrollResp.getHits().getHits()) { + result.add(hit); + } + + scrollResp = client + .prepareSearchScroll(scrollResp.getScrollId()) + .setScroll(new TimeValue(600000)).execute().actionGet(); + + if (scrollResp.getHits().getHits().length == 0) { + break; + } + } + } + + return result; + } + + String getFieldValue(int field) + { + checkState(fields != null, "Cursor has not been advanced yet"); + return fields.get(field); + } } diff --git a/presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/ElasticsearchRecordSetProvider.java b/presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/ElasticsearchRecordSetProvider.java index 3da14ca6bf43..40cbfed0e1f6 100644 --- a/presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/ElasticsearchRecordSetProvider.java +++ b/presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/ElasticsearchRecordSetProvider.java @@ -3,6 +3,7 @@ import com.facebook.presto.spi.ColumnHandle; import com.facebook.presto.spi.ConnectorRecordSetProvider; +import com.facebook.presto.spi.ConnectorSession; import com.facebook.presto.spi.ConnectorSplit; import com.facebook.presto.spi.RecordSet; import com.google.common.collect.ImmutableList; @@ -27,7 +28,11 @@ public ElasticsearchRecordSetProvider(ElasticsearchConnectorId connectorId) } @Override - public RecordSet getRecordSet(ConnectorSplit split, List columns) + public RecordSet getRecordSet(ConnectorSession session, ConnectorSplit split, List columns) { + return getRecordSet(split, columns); + } + + private RecordSet getRecordSet(ConnectorSplit split, List columns) { checkNotNull(split, "partitionChunk is null"); ElasticsearchSplit elasticsearchSplit = checkType(split, ElasticsearchSplit.class, "split"); diff --git a/presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/ElasticsearchSplitManager.java b/presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/ElasticsearchSplitManager.java index 227dcd7a1e58..a5b58d121adb 100644 --- a/presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/ElasticsearchSplitManager.java +++ b/presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/ElasticsearchSplitManager.java @@ -4,12 +4,13 @@ import com.facebook.presto.spi.ColumnHandle; import com.facebook.presto.spi.ConnectorPartition; import com.facebook.presto.spi.ConnectorPartitionResult; +import com.facebook.presto.spi.ConnectorSession; import com.facebook.presto.spi.ConnectorSplit; import com.facebook.presto.spi.ConnectorSplitManager; import com.facebook.presto.spi.ConnectorSplitSource; import com.facebook.presto.spi.ConnectorTableHandle; import com.facebook.presto.spi.FixedSplitSource; -import com.facebook.presto.spi.TupleDomain; +import com.facebook.presto.spi.predicate.TupleDomain; import com.google.common.collect.ImmutableList; import javax.inject.Inject; @@ -38,7 +39,11 @@ public ElasticsearchSplitManager(ElasticsearchConnectorId connectorId, Elasticse } @Override - public ConnectorPartitionResult getPartitions(ConnectorTableHandle tableHandle, TupleDomain tupleDomain) + public ConnectorPartitionResult getPartitions(ConnectorSession session, ConnectorTableHandle table, TupleDomain tupleDomain) { + return getPartitions(table, tupleDomain); + } + + private ConnectorPartitionResult getPartitions(ConnectorTableHandle tableHandle, TupleDomain tupleDomain) { ElasticsearchTableHandle elasticsearchTableHandle = checkType(tableHandle, ElasticsearchTableHandle.class, "tableHandle"); @@ -48,7 +53,12 @@ public ConnectorPartitionResult getPartitions(ConnectorTableHandle tableHandle, return new ConnectorPartitionResult(partitions, tupleDomain); } + @Override + public ConnectorSplitSource getPartitionSplits(ConnectorSession session, ConnectorTableHandle table, List partitions) { + return getPartitionSplits(table, partitions); + } + public ConnectorSplitSource getPartitionSplits(ConnectorTableHandle tableHandle, List partitions) { checkNotNull(partitions, "partitions is null"); diff --git a/presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/ElasticsearchTable.java b/presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/ElasticsearchTable.java index 719fe35c471d..16adf9f8f75e 100644 --- a/presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/ElasticsearchTable.java +++ b/presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/ElasticsearchTable.java @@ -5,13 +5,13 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.collect.ImmutableList; -import java.net.URI; import java.util.ArrayList; import java.util.List; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Strings.isNullOrEmpty; +import static java.util.Locale.ENGLISH; public class ElasticsearchTable { @@ -24,19 +24,11 @@ public class ElasticsearchTable @JsonCreator public ElasticsearchTable( @JsonProperty("name") String name, - @JsonProperty("columns") List columns, @JsonProperty("sources") List sources) { checkArgument(!isNullOrEmpty(name), "name is null or is empty"); - this.name = checkNotNull(name, "name is null"); - this.columns = ImmutableList.copyOf(checkNotNull(columns, "columns is null")); + this.name = checkNotNull(name.toLowerCase(ENGLISH), "name is null"); this.sources = ImmutableList.copyOf(checkNotNull(sources, "sources is null")); - - ImmutableList.Builder columnsMetadata = ImmutableList.builder(); - for (ElasticsearchColumn column : this.columns) { - columnsMetadata.add(new ElasticsearchColumnMetadata(column.getName(), column.getType(), column.getJsonPath(), column.getJsonType(), false)); - } - this.columnsMetadata = columnsMetadata.build(); } @JsonProperty @@ -51,7 +43,8 @@ public List getColumns() return columns; } - public void setColumns(List columns) { + public void setColumns(List columns) + { this.columns = columns; } @@ -72,7 +65,8 @@ public List getColumnsMetadata() return new ArrayList<>(columnsMetadata); } - public void setColumnsMetadata(List columnsMetadata) { + public void setColumnsMetadata(List columnsMetadata) + { this.columnsMetadata = columnsMetadata; } } diff --git a/presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/ElasticsearchTableHandle.java b/presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/ElasticsearchTableHandle.java index 2b57ed3f659b..f7535d9e32fd 100644 --- a/presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/ElasticsearchTableHandle.java +++ b/presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/ElasticsearchTableHandle.java @@ -10,6 +10,7 @@ import java.util.Objects; import static com.google.common.base.Preconditions.checkNotNull; +import static java.util.Locale.ENGLISH; public final class ElasticsearchTableHandle implements ConnectorTableHandle @@ -24,9 +25,9 @@ public ElasticsearchTableHandle( @JsonProperty("schemaName") String schemaName, @JsonProperty("tableName") String tableName) { - this.connectorId = checkNotNull(connectorId, "connectorId is null"); - this.schemaName = checkNotNull(schemaName, "schemaName is null"); - this.tableName = checkNotNull(tableName, "tableName is null"); + this.connectorId = checkNotNull(connectorId.toLowerCase(ENGLISH), "connectorId is null"); + this.schemaName = checkNotNull(schemaName.toLowerCase(ENGLISH), "schemaName is null"); + this.tableName = checkNotNull(tableName.toLowerCase(ENGLISH), "tableName is null"); } @JsonProperty diff --git a/presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/ElasticsearchTableSource.java b/presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/ElasticsearchTableSource.java index 18fb8d3ac06d..d6b31a7cd801 100644 --- a/presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/ElasticsearchTableSource.java +++ b/presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/ElasticsearchTableSource.java @@ -5,10 +5,8 @@ import static com.google.common.base.Preconditions.checkNotNull; -/** - * Created by sprinklr on 03/07/15. - */ -public class ElasticsearchTableSource { +public class ElasticsearchTableSource +{ private String hostaddress; private int port; @@ -27,32 +25,37 @@ public ElasticsearchTableSource( this.hostaddress = checkNotNull(hostaddress, "hostaddress is null"); this.port = checkNotNull(port, "port is null"); this.clusterName = checkNotNull(clusterName, "clusterName is null"); - this.index = checkNotNull(index, "index is null"); + this.index = index; this.type = checkNotNull(type, "type is null"); } @JsonProperty - public String getHostaddress() { + public String getHostaddress() + { return hostaddress; } @JsonProperty - public int getPort() { + public int getPort() + { return port; } @JsonProperty - public String getClusterName() { + public String getClusterName() + { return clusterName; } @JsonProperty - public String getIndex() { + public String getIndex() + { return index; } @JsonProperty - public String getType() { + public String getType() + { return type; } } diff --git a/presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/MyJSONTest.java b/presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/MyJSONTest.java deleted file mode 100644 index 58345538b8df..000000000000 --- a/presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/MyJSONTest.java +++ /dev/null @@ -1,76 +0,0 @@ -package com.facebook.presto.elasticsearch; - -// http://stackoverflow.com/questions/26183948/output-list-of-all-paths-to-leaf-nodes-in-a-json-document-in-java - -import org.json.JSONArray; -import org.json.JSONException; -import org.json.JSONObject; - -import java.util.ArrayList; -import java.util.Iterator; -import java.util.List; - -public class MyJSONTest { - - /* - leaf is of the form -> path:obj - */ - private ArrayList leaves; - - public MyJSONTest() - { - leaves = new ArrayList(); - } - - public List getListJson(JSONObject json) throws JSONException { - listJSONObject("", json); - return leaves; - } - - private void listObject(String parent, Object data) throws JSONException { - if (data instanceof JSONObject) { - listJSONObject(parent, (JSONObject)data); - } else if (data instanceof JSONArray) { - listJSONArray(parent, (JSONArray) data); - } else { - listPrimitive(parent, data); - } - } - - private void listJSONObject(String parent, JSONObject json) throws JSONException { - Iterator it = json.keys(); - while (it.hasNext()) { - String key = (String)it.next(); - Object child = json.get(key); - String childKey = parent.isEmpty() ? key : parent + "." + key; - listObject(childKey, child); - } - } - - private void listJSONArray(String parent, JSONArray json) throws JSONException { - for (int i = 0; i < json.length(); i++) { - Object data = json.get(i); - listObject(parent, data); - } - } - - private void listPrimitive(String parent, Object obj) { - //System.out.println(parent + ":" + obj); - leaves.add(parent + ":" + obj.toString()); - } - - public static void main(String[] args) throws JSONException { - String data = "{\"store\":{\"book\":[{\"category\":\"reference\",\"author\":\"NigelRees\",\"title\":\"SayingsoftheCentury\",\"price\":8.95},{\"category\":\"fiction\",\"author\":\"HermanMelville\",\"title\":\"MobyDick\",\"isbn\":\"0-553-21311-3\",\"price\":8.99},],\"bicycle\":{\"color\":\"red\",\"price\":19.95}},\"expensive\":10}"; - JSONObject json = new JSONObject(data); - System.out.println(json.get("store")); - //System.out.println(json.toString(2)); - List leaves = (new MyJSONTest()).getListJson(json); - - for(String s : leaves) - { - System.out.println(s); - System.out.println("....."); - } - } - -} \ No newline at end of file diff --git a/presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/TestClient.java b/presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/TestClient.java deleted file mode 100644 index e41ea47a8349..000000000000 --- a/presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/TestClient.java +++ /dev/null @@ -1,51 +0,0 @@ - -package com.facebook.presto.elasticsearch; - -import com.google.common.base.Function; -import com.google.common.base.Supplier; -import com.google.common.base.Suppliers; -import com.google.common.base.Throwables; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableMap; -import com.google.common.collect.ImmutableSet; -import com.google.common.io.Resources; -import io.airlift.json.JsonCodec; - -import javax.inject.Inject; - -import java.io.IOException; -import java.net.MalformedURLException; -import java.net.URI; -import java.net.URISyntaxException; -import java.net.URL; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.ExecutionException; - - -import static com.google.common.collect.Iterables.transform; -import static com.google.common.collect.Maps.transformValues; -import static com.google.common.collect.Maps.uniqueIndex; -import static io.airlift.json.JsonCodec.listJsonCodec; -import static java.nio.charset.StandardCharsets.UTF_8; -import static io.airlift.json.JsonCodec.*; - -//import org.json.JSONException; -//import org.json.JSONObject; - -public class TestClient { - - - - public static void main( String[] args ) throws InterruptedException, IOException, ExecutionException { - - //getSpecificFields(); - //getJsonKeys_IndexType(); - - - System.out.println("Hello World!"); - } - - -} diff --git a/presto-elasticsearch/src/test/java/com/facebook/presto/elasticsearch/AppTest.java b/presto-elasticsearch/src/test/java/com/facebook/presto/elasticsearch/AppTest.java deleted file mode 100644 index 880e5969e160..000000000000 --- a/presto-elasticsearch/src/test/java/com/facebook/presto/elasticsearch/AppTest.java +++ /dev/null @@ -1,39 +0,0 @@ -package com.facebook.presto.elasticsearch; - -import junit.framework.Test; -import junit.framework.TestCase; -import junit.framework.TestSuite; - -/** - * Unit test for simple App. - */ -public class AppTest - extends TestCase -{ - /** - * Create the test case - * - * @param testName name of the test case - */ - public AppTest( String testName ) - { - super( testName ); - } - - /** - * @return the suite of tests being tested - */ - public static Test suite() - { - return new TestSuite( AppTest.class ); - } - - /** - * Rigourous Test :-) - */ - public void testApp() - { - assertTrue( true ); - - } -} diff --git a/presto-elasticsearch/src/test/java/com/facebook/presto/elasticsearch/MetadataUtil.java b/presto-elasticsearch/src/test/java/com/facebook/presto/elasticsearch/MetadataUtil.java new file mode 100644 index 000000000000..ab86da503ec5 --- /dev/null +++ b/presto-elasticsearch/src/test/java/com/facebook/presto/elasticsearch/MetadataUtil.java @@ -0,0 +1,79 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.elasticsearch; + +import com.facebook.presto.spi.type.StandardTypes; +import com.facebook.presto.spi.type.Type; +import com.fasterxml.jackson.databind.DeserializationContext; +import com.fasterxml.jackson.databind.JsonDeserializer; +import com.fasterxml.jackson.databind.deser.std.FromStringDeserializer; +import com.google.common.collect.ImmutableMap; +import io.airlift.json.JsonCodec; +import io.airlift.json.JsonCodecFactory; +import io.airlift.json.ObjectMapperProvider; + +import java.util.List; +import java.util.Map; + +import static com.facebook.presto.spi.type.BigintType.BIGINT; +import static com.facebook.presto.spi.type.BooleanType.BOOLEAN; +import static com.facebook.presto.spi.type.DoubleType.DOUBLE; +import static com.facebook.presto.spi.type.VarcharType.VARCHAR; +import static io.airlift.json.JsonCodec.listJsonCodec; +import static java.util.Locale.ENGLISH; + +public final class MetadataUtil +{ + private MetadataUtil() + { + } + + public static final JsonCodec>> CATALOG_CODEC; + public static final JsonCodec TABLE_CODEC; + public static final JsonCodec COLUMN_CODEC; + + static { + ObjectMapperProvider objectMapperProvider = new ObjectMapperProvider(); + objectMapperProvider.setJsonDeserializers(ImmutableMap., JsonDeserializer>of(Type.class, new ElasticsearchTypeDeserializer())); + JsonCodecFactory codecFactory = new JsonCodecFactory(objectMapperProvider); + CATALOG_CODEC = codecFactory.mapJsonCodec(String.class, listJsonCodec(ElasticsearchTable.class)); + TABLE_CODEC = codecFactory.jsonCodec(ElasticsearchTable.class); + COLUMN_CODEC = codecFactory.jsonCodec(ElasticsearchColumnHandle.class); + } + + public static final class ElasticsearchTypeDeserializer + extends FromStringDeserializer + { + private final Map types = ImmutableMap.of( + StandardTypes.BOOLEAN, BOOLEAN, + StandardTypes.BIGINT, BIGINT, + StandardTypes.DOUBLE, DOUBLE, + StandardTypes.VARCHAR, VARCHAR); + + public ElasticsearchTypeDeserializer() + { + super(Type.class); + } + + @Override + protected Type _deserialize(String value, DeserializationContext context) + { + Type type = types.get(value.toLowerCase(ENGLISH)); + if (type == null) { + throw new IllegalArgumentException(String.valueOf("Unknown type " + value)); + } + return type; + } + } +} diff --git a/presto-elasticsearch/src/test/java/com/facebook/presto/elasticsearch/TestElasticsearchClient.java b/presto-elasticsearch/src/test/java/com/facebook/presto/elasticsearch/TestElasticsearchClient.java new file mode 100644 index 000000000000..4f8102fb25a2 --- /dev/null +++ b/presto-elasticsearch/src/test/java/com/facebook/presto/elasticsearch/TestElasticsearchClient.java @@ -0,0 +1,49 @@ +package com.facebook.presto.elasticsearch; + +import com.google.common.io.Resources; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import java.net.URI; +import java.net.URL; +import java.util.Map; + +import static com.facebook.presto.elasticsearch.MetadataUtil.CATALOG_CODEC; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotNull; + +public class TestElasticsearchClient +{ + private static final String ES_SCHEMA = "be"; + private static final String ES_TBL_1 = "fancyPantsTable"; + private ElasticsearchClient client; + + @BeforeClass + public void setUp() + throws Exception + { + URL metadataUrl = Resources.getResource(TestElasticsearchClient.class, "/example-metadata.json"); + assertNotNull(metadataUrl, "metadataUrl is null"); + URI metadata = metadataUrl.toURI(); + + client = new ElasticsearchClient(new ElasticsearchConfig().setMetadata(metadata), CATALOG_CODEC); + } + + @Test + public void testSchema() + throws Exception + { + Map> schemas = client.updateSchemas(); + assertNotNull(schemas); + } + + @Test + public void testTable() + throws Exception + { + ElasticsearchTable table = client.getTable(ES_SCHEMA, ES_TBL_1); + assertNotNull(table); + assertNotNull(table.getColumns()); + assertEquals(table.getColumns().size(), 24); + } +} diff --git a/presto-elasticsearch/src/test/java/com/facebook/presto/elasticsearch/TestElasticsearchMetadata.java b/presto-elasticsearch/src/test/java/com/facebook/presto/elasticsearch/TestElasticsearchMetadata.java new file mode 100644 index 000000000000..fb1f447c5064 --- /dev/null +++ b/presto-elasticsearch/src/test/java/com/facebook/presto/elasticsearch/TestElasticsearchMetadata.java @@ -0,0 +1,63 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.elasticsearch; + +import com.facebook.presto.spi.SchemaTableName; +import com.google.common.collect.ImmutableSet; +import com.google.common.io.Resources; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import java.net.URI; +import java.net.URL; + +import static com.facebook.presto.testing.TestingConnectorSession.SESSION; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotNull; + +@Test(singleThreaded = true) +public class TestElasticsearchMetadata +{ + private static final String CONNECTOR_ID = "TEST"; + private static final String ES_SCHEMA = "be"; + private static final String ES_TBL_1 = "fancyPantsTable"; + private static final ElasticsearchTableHandle ES_TABLE_HANDLE = new ElasticsearchTableHandle(CONNECTOR_ID, ES_SCHEMA, ES_TBL_1); + private ElasticsearchMetadata metadata; + private URI metadataUri; + + @BeforeMethod + public void setUp() + throws Exception + { + URL metadataUrl = Resources.getResource(TestElasticsearchClient.class, "/example-metadata.json"); + assertNotNull(metadataUrl, "metadataUrl is null"); + metadataUri = metadataUrl.toURI(); + ElasticsearchClient client = new ElasticsearchClient(new ElasticsearchConfig().setMetadata(metadataUri), MetadataUtil.CATALOG_CODEC); + metadata = new ElasticsearchMetadata(new ElasticsearchConnectorId(CONNECTOR_ID), client); + } + + @Test + public void testListSchemaNames() + { + assertEquals(metadata.listSchemaNames(SESSION), ImmutableSet.of("be")); + } + + @Test + public void testGetTableHandle() + { + assertEquals( + metadata.getTableHandle(SESSION, new SchemaTableName(ES_SCHEMA, ES_TBL_1)), + ES_TABLE_HANDLE); + } +} diff --git a/presto-elasticsearch/src/test/java/com/facebook/presto/elasticsearch/TestElasticsearchRecordCursor.java b/presto-elasticsearch/src/test/java/com/facebook/presto/elasticsearch/TestElasticsearchRecordCursor.java new file mode 100644 index 000000000000..af35dc9736ef --- /dev/null +++ b/presto-elasticsearch/src/test/java/com/facebook/presto/elasticsearch/TestElasticsearchRecordCursor.java @@ -0,0 +1,66 @@ +package com.facebook.presto.elasticsearch; + +import com.facebook.presto.spi.ColumnMetadata; +import com.google.common.io.Resources; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import java.net.URI; +import java.net.URL; +import java.util.ArrayList; +import java.util.List; + +import static com.facebook.presto.elasticsearch.MetadataUtil.CATALOG_CODEC; +import static org.testng.Assert.assertNotNull; + +public class TestElasticsearchRecordCursor +{ + private static final String ES_SCHEMA = "be"; + private static final String ES_TBL_1 = "fancyPantsTable"; + private static final String CONNECTOR_ID = "elasticsearch"; + + private ElasticsearchClient client; + + @BeforeClass + public void setUp() + throws Exception + { + URL metadataUrl = Resources.getResource(TestElasticsearchClient.class, "/example-metadata.json"); + assertNotNull(metadataUrl, "metadataUrl is null"); + URI metadata = metadataUrl.toURI(); + + client = new ElasticsearchClient(new ElasticsearchConfig().setMetadata(metadata), CATALOG_CODEC); + } + + @Test + public void testCursor() + { + ElasticsearchTable table = client.getTable(ES_SCHEMA, ES_TBL_1); + assertNotNull(table); + + ElasticsearchRecordCursor elasticsearchRecordCursor = + new ElasticsearchRecordCursor( + getElasticsearchColumnHandles(table), + table.getSources().get(0)); + + assertNotNull(elasticsearchRecordCursor); + } + + private List getElasticsearchColumnHandles(ElasticsearchTable table) + { + List columnHandles = new ArrayList(); + int index = 0; + for (ColumnMetadata column : table.getColumnsMetadata()) { + ElasticsearchColumnMetadata esColumn = (ElasticsearchColumnMetadata) column; + + columnHandles.add(new ElasticsearchColumnHandle( + CONNECTOR_ID, + column.getName(), + column.getType(), + esColumn.getJsonPath(), + esColumn.getJsonType(), + index++)); + } + return columnHandles; + } +} diff --git a/presto-elasticsearch/src/test/resources/example-metadata.json b/presto-elasticsearch/src/test/resources/example-metadata.json new file mode 100644 index 000000000000..c5960ffe9e25 --- /dev/null +++ b/presto-elasticsearch/src/test/resources/example-metadata.json @@ -0,0 +1,15 @@ +{ + "be": [ + { + "name": "fancyPantsTable", + "sources": [ + { + "hostaddress": "172.17.42.1", + "port": 9300, + "clusterName": "es-routes", + "type": "route" + } + ] + } + ] +} \ No newline at end of file diff --git a/presto-example-http/src/main/java/com/facebook/presto/example/ExampleConnectorFactory.java b/presto-example-http/src/main/java/com/facebook/presto/example/ExampleConnectorFactory.java index 0d3fcd037b51..34b0dd9d4b59 100644 --- a/presto-example-http/src/main/java/com/facebook/presto/example/ExampleConnectorFactory.java +++ b/presto-example-http/src/main/java/com/facebook/presto/example/ExampleConnectorFactory.java @@ -56,7 +56,7 @@ public Connector create(final String connectorId, Map requiredCo new JsonModule(), new ExampleModule(connectorId, typeManager)); - Injector injector = app + Injector injector = app .strictConfig() .doNotInitializeLogging() .setRequiredConfigurationProperties(requiredConfig) diff --git a/presto-server/src/main/provisio/presto.xml b/presto-server/src/main/provisio/presto.xml index 5d37ab286ca7..30a13cc905db 100644 --- a/presto-server/src/main/provisio/presto.xml +++ b/presto-server/src/main/provisio/presto.xml @@ -38,6 +38,12 @@ + + + + + +