Skip to content

Commit

Permalink
Change buildQueryFromNodes to return list of queries, added ArcadeDBC…
Browse files Browse the repository at this point in the history
…lientService
  • Loading branch information
mattyb149 committed Sep 19, 2023
1 parent b459e5e commit f25f585
Show file tree
Hide file tree
Showing 15 changed files with 455 additions and 51 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,36 +23,45 @@
import java.util.Map;

public interface GraphClientService extends ControllerService {
String NODES_CREATED= "graph.nodes.created";
String NODES_CREATED = "graph.nodes.created";
String RELATIONS_CREATED = "graph.relations.created";
String LABELS_ADDED = "graph.labels.added";
String NODES_DELETED = "graph.nodes.deleted";
String RELATIONS_DELETED = "graph.relations.deleted";
String PROPERTIES_SET = "graph.properties.set";
String ROWS_RETURNED = "graph.rows.returned";

// Supported query languages (service-dependent)
String SQL = "sql";
String SQL_SCRIPT = "sqlscript";
String GRAPHQL = "graphql";
String CYPHER = "cypher";
String GREMLIN = "gremlin";
String MONGO = "mongo";

/**
* Executes the specified query using the client service, and handles and returned results by calling the specified callback
*
* @param query The query to execute
* @param query The query to execute
* @param parameters A Map of parameter values to use in the query and/or execution
* @param handler The callback handler invoked with any returned results
* @param handler The callback handler invoked with any returned results
* @return Any results returned after handling the query response
*/
Map<String, String> executeQuery(String query, Map<String, Object> parameters, GraphQueryResultCallback handler);

/**
* Returns the URL used to submit the query
*
* @return the URL (as a string) used to submit the query
*/
String getTransitUrl();

/**
* Builds a list of client-specific queries based on a list of property map nodes. Usually followed by a call to executeQuery
*
* @param nodeList A List of Maps corresponding to property map nodes
* @param nodeList A List of Maps corresponding to property map nodes
* @param parameters A Map of parameter values to use in the query and/or execution
* @return A List of queries each corresponding to an operation on the node list
*/
List<String> buildQueryFromNodes(List<Map<String, Object>> nodeList, Map<String, Object> parameters);
List<GraphQuery> buildQueryFromNodes(List<Map<String, Object>> nodeList, Map<String, Object> parameters);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.graph;

import java.util.Objects;

public class GraphQuery {

private final String query;
private final String language;

public GraphQuery(final String query, final String language) {
this.language = language;
this.query = query;
}

public String getQuery() {
return query;
}

public String getLanguage() {
return language;
}

@Override
public boolean equals(Object o) {
if (this == o)
return true;
if (o == null || getClass() != o.getClass())
return false;
GraphQuery that = (GraphQuery) o;
return Objects.equals(query, that.query) && Objects.equals(language, that.language);
}

@Override
public int hashCode() {
return Objects.hash(query, language);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.graph.GraphClientService;
import org.apache.nifi.graph.GraphQuery;
import org.apache.nifi.graph.GraphQueryResultCallback;

import java.util.ArrayList;
Expand Down Expand Up @@ -57,19 +58,19 @@ public String getTransitUrl() {
}

@Override
public List<String> buildQueryFromNodes(List<Map<String, Object>> eventList, Map<String, Object> parameters) {
public List<GraphQuery> buildQueryFromNodes(List<Map<String, Object>> eventList, Map<String, Object> parameters) {
// Build queries from event list
List<String> queryList = new ArrayList<>(eventList.size());
StringBuilder queryBuilder = new StringBuilder();
List<GraphQuery> queryList = new ArrayList<>(eventList.size());
for (Map<String,Object> eventNode : eventList) {
StringBuilder queryBuilder = new StringBuilder();
queryBuilder.append("MERGE (p:NiFiProvenanceEvent {");
List<String> propertyDefinitions = new ArrayList<>(eventNode.entrySet().size());
for (Map.Entry<String,Object> properties : eventNode.entrySet()) {
propertyDefinitions.add(properties.getKey() + ": \"" + properties.getValue() + "\"");
}
queryBuilder.append(String.join(",", propertyDefinitions));
queryBuilder.append("})");
queryList.add(queryBuilder.toString());
queryList.add(new GraphQuery(queryBuilder.toString(), GraphClientService.CYPHER));
}
return queryList;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.graph.GraphClientService;
import org.apache.nifi.graph.GraphQuery;
import org.apache.nifi.graph.GraphQueryResultCallback;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.tinkerpop.gremlin.structure.Graph;
Expand All @@ -30,7 +31,7 @@
import javax.script.ScriptEngineManager;
import javax.script.ScriptException;
import java.util.AbstractMap.SimpleEntry;
import java.util.Collections;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
Expand Down Expand Up @@ -126,10 +127,27 @@ public String getTransitUrl() {
}

@Override
public List<String> buildQueryFromNodes(List<Map<String, Object>> eventList, Map<String, Object> parameters) {
public List<GraphQuery> buildQueryFromNodes(List<Map<String, Object>> eventList, Map<String, Object> parameters) {
// Build query from event list
StringBuilder queryBuilder = new StringBuilder();
// Build queries from event list
List<GraphQuery> queryList = new ArrayList<>(eventList.size());
for (Map<String, Object> eventNode : eventList) {
StringBuilder queryBuilder = new StringBuilder();
queryBuilder.append("g.V()has(\"NiFiProvenanceEvent\", \"");
queryBuilder.append("eventId\", \"");
queryBuilder.append(eventNode.get("eventId"));
queryBuilder.append("\").fold().coalesce(unfold(), addV(\"NiFiProvenanceEvent\")");

return Collections.emptyList(); // TODO
for (Map.Entry<String, Object> properties : eventNode.entrySet()) {
queryBuilder.append(".property(\"");
queryBuilder.append(properties.getKey());
queryBuilder.append("\", \"");
queryBuilder.append(properties.getValue());
queryBuilder.append("\")");
}
queryBuilder.append(")");
queryList.add(new GraphQuery(queryBuilder.toString(), GraphClientService.GREMLIN));
}
return queryList;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@

import javax.script.ScriptEngine;
import javax.script.ScriptEngineManager;
import java.util.Collections;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -101,10 +101,27 @@ public Graph getGraph() {
}

@Override
public List<String> buildQueryFromNodes(List<Map<String, Object>> eventList, Map<String, Object> parameters) {
public List<GraphQuery> buildQueryFromNodes(List<Map<String, Object>> eventList, Map<String, Object> parameters) {
// Build query from event list
StringBuilder queryBuilder = new StringBuilder();
// Build queries from event list
List<GraphQuery> queryList = new ArrayList<>(eventList.size());
for (Map<String, Object> eventNode : eventList) {
StringBuilder queryBuilder = new StringBuilder();
queryBuilder.append("g.V()has(\"NiFiProvenanceEvent\", \"");
queryBuilder.append("eventId\", \"");
queryBuilder.append(eventNode.get("eventId"));
queryBuilder.append("\").fold().coalesce(unfold(), addV(\"NiFiProvenanceEvent\")");

return Collections.emptyList(); // TODO
for (Map.Entry<String, Object> properties : eventNode.entrySet()) {
queryBuilder.append(".property(\"");
queryBuilder.append(properties.getKey());
queryBuilder.append("\", \"");
queryBuilder.append(properties.getValue());
queryBuilder.append("\")");
}
queryBuilder.append(")");
queryList.add(new GraphQuery(queryBuilder.toString(), GraphClientService.GREMLIN));
}
return queryList;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -316,9 +316,9 @@ public String getTransitUrl() {
}

@Override
public List<String> buildQueryFromNodes(List<Map<String, Object>> eventList, Map<String, Object> parameters) {
public List<GraphQuery> buildQueryFromNodes(List<Map<String, Object>> eventList, Map<String, Object> parameters) {
// Build queries from event list
List<String> queryList = new ArrayList<>(eventList.size());
List<GraphQuery> queryList = new ArrayList<>(eventList.size());
for (Map<String,Object> eventNode : eventList) {
StringBuilder queryBuilder = new StringBuilder();
queryBuilder.append("MERGE (p:NiFiProvenanceEvent {");
Expand All @@ -328,7 +328,7 @@ public List<String> buildQueryFromNodes(List<Map<String, Object>> eventList, Map
}
queryBuilder.append(String.join(",", propertyDefinitions));
queryBuilder.append("})");
queryList.add(queryBuilder.toString());
queryList.add(new GraphQuery(queryBuilder.toString(), GraphClientService.GREMLIN));
}
return queryList;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -299,9 +299,9 @@ public String getTransitUrl() {
}

@Override
public List<String> buildQueryFromNodes(List<Map<String, Object>> eventList, Map<String, Object> parameters) {
public List<GraphQuery> buildQueryFromNodes(List<Map<String, Object>> eventList, Map<String, Object> parameters) {
// Build queries from event list
List<String> queryList = new ArrayList<>(eventList.size());
List<GraphQuery> queryList = new ArrayList<>(eventList.size());
for (Map<String,Object> eventNode : eventList) {
StringBuilder queryBuilder = new StringBuilder();
queryBuilder.append("MERGE (p:NiFiProvenanceEvent {");
Expand All @@ -311,7 +311,7 @@ public List<String> buildQueryFromNodes(List<Map<String, Object>> eventList, Map
}
queryBuilder.append(String.join(",", propertyDefinitions));
queryBuilder.append("})");
queryList.add(queryBuilder.toString());
queryList.add(new GraphQuery(queryBuilder.toString(), GraphClientService.CYPHER));
}
return queryList;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -198,14 +198,16 @@ public void testBuildQueryFromNodes() {
node3.put("state", "FL");
nodeList.add(node3);

final List<String> expectedQuery = Arrays.asList("MERGE (p:NiFiProvenanceEvent {name: \"Matt\"})",
"MERGE (p:NiFiProvenanceEvent {color: \"blue\",name: \"Joe\",age: \"40\"})",
"MERGE (p:NiFiProvenanceEvent {name: \"Mary\",state: \"FL\",age: \"40\"})");
final List<String> queryList = clientService.buildQueryFromNodes(nodeList, new HashMap<>());
final List<GraphQuery> expectedQuery = Arrays.asList(
new GraphQuery("MERGE (p:NiFiProvenanceEvent {name: \"Matt\"})", GraphClientService.CYPHER),
new GraphQuery("MERGE (p:NiFiProvenanceEvent {color: \"blue\",name: \"Joe\",age: \"40\"})", GraphClientService.CYPHER),
new GraphQuery("MERGE (p:NiFiProvenanceEvent {name: \"Mary\",state: \"FL\",age: \"40\"})", GraphClientService.CYPHER)
);
final List<GraphQuery> queryList = clientService.buildQueryFromNodes(nodeList, new HashMap<>());
assertEquals(expectedQuery, queryList);
final List<Map<String, Object>> result = new ArrayList<>();
for (String query : queryList) {
Map<String, String> attributes = clientService.executeQuery(query, new HashMap<>(), (record, hasMore) -> result.add(record));
for (GraphQuery query : queryList) {
Map<String, String> attributes = clientService.executeQuery(query.getQuery(), new HashMap<>(), (record, hasMore) -> result.add(record));
assertEquals("0", attributes.get(GraphClientService.LABELS_ADDED));
assertEquals("1", attributes.get(GraphClientService.NODES_CREATED));
assertEquals("0", attributes.get(GraphClientService.NODES_DELETED));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ public class ITNeo4JCypherExecutorNoSSL {
protected String password = "testing1234";

private GraphClientService clientService;
private GraphQueryResultCallback EMPTY_CALLBACK = (record, hasMore) -> {};
private static final GraphQueryResultCallback EMPTY_CALLBACK = (record, hasMore) -> {};

@BeforeEach
public void setUp() throws Exception {
Expand Down Expand Up @@ -180,14 +180,16 @@ public void testBuildQueryFromNodes() {
node3.put("state", "FL");
nodeList.add(node3);

final List<String> expectedQuery = Arrays.asList("MERGE (p:NiFiProvenanceEvent {name: \"Matt\"})",
"MERGE (p:NiFiProvenanceEvent {color: \"blue\",name: \"Joe\",age: \"40\"})",
"MERGE (p:NiFiProvenanceEvent {name: \"Mary\",state: \"FL\",age: \"40\"})");
final List<String> queryList = clientService.buildQueryFromNodes(nodeList, new HashMap<>());
final List<GraphQuery> expectedQuery = Arrays.asList(
new GraphQuery("MERGE (p:NiFiProvenanceEvent {name: \"Matt\"})", GraphClientService.CYPHER),
new GraphQuery("MERGE (p:NiFiProvenanceEvent {color: \"blue\",name: \"Joe\",age: \"40\"})", GraphClientService.CYPHER),
new GraphQuery("MERGE (p:NiFiProvenanceEvent {name: \"Mary\",state: \"FL\",age: \"40\"})", GraphClientService.CYPHER)
);
final List<GraphQuery> queryList = clientService.buildQueryFromNodes(nodeList, new HashMap<>());
assertEquals(expectedQuery, queryList);
final List<Map<String, Object>> result = new ArrayList<>();
for (String query : queryList) {
Map<String, String> attributes = clientService.executeQuery(query, new HashMap<>(), (record, hasMore) -> result.add(record));
for (GraphQuery query : queryList) {
Map<String, String> attributes = clientService.executeQuery(query.getQuery(), new HashMap<>(), (record, hasMore) -> result.add(record));
assertEquals("0", attributes.get(GraphClientService.LABELS_ADDED));
assertEquals("1", attributes.get(GraphClientService.NODES_CREATED));
assertEquals("0", attributes.get(GraphClientService.NODES_DELETED));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,12 @@ public void testBuildQueryFromNodes() {
node3.put("state", "FL");
nodeList.add(node3);

final List<String> expectedQueryList = Arrays.asList("MERGE (p:NiFiProvenanceEvent {name: \"Matt\"})",
"MERGE (p:NiFiProvenanceEvent {color: \"blue\",name: \"Joe\",age: \"40\"})",
"MERGE (p:NiFiProvenanceEvent {name: \"Mary\",state: \"FL\",age: \"40\"})");
final List<String> queryList = clientService.buildQueryFromNodes(nodeList, new HashMap<>());
final List<GraphQuery> expectedQueryList = Arrays.asList(
new GraphQuery("MERGE (p:NiFiProvenanceEvent {name: \"Matt\"})", GraphClientService.CYPHER),
new GraphQuery("MERGE (p:NiFiProvenanceEvent {color: \"blue\",name: \"Joe\",age: \"40\"})", GraphClientService.CYPHER),
new GraphQuery("MERGE (p:NiFiProvenanceEvent {name: \"Mary\",state: \"FL\",age: \"40\"})", GraphClientService.CYPHER)
);
final List<GraphQuery> queryList = clientService.buildQueryFromNodes(nodeList, new HashMap<>());
assertEquals(expectedQueryList, queryList);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,11 @@
<artifactId>gremlin-driver</artifactId>
<version>${gremlin.version}</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-web-client-provider-api</artifactId>
<scope>compile</scope>
</dependency>
</dependencies>

<build>
Expand Down
Loading

0 comments on commit f25f585

Please sign in to comment.