From 36b7b01e5653c4bd487e2e6642c94960deb2572c Mon Sep 17 00:00:00 2001 From: Vamsi Manohar Date: Thu, 13 Jul 2023 16:49:37 -0700 Subject: [PATCH] Change query range response structure Signed-off-by: Vamsi Manohar --- .../org/opensearch/sql/ppl/ExplainIT.java | 10 +++ .../ppl/explain_query_range.json | 9 +++ ...faultQueryRangeFunctionResponseHandle.java | 75 +++++++++---------- .../response/PrometheusResponse.java | 25 +------ .../storage/PrometheusMetricScan.java | 7 +- .../storage/PrometheusMetricTable.java | 4 - .../storage/PrometheusMetricScanTest.java | 33 -------- .../storage/PrometheusMetricTableTest.java | 26 ------- 8 files changed, 59 insertions(+), 130 deletions(-) create mode 100644 integ-test/src/test/resources/expectedOutput/ppl/explain_query_range.json diff --git a/integ-test/src/test/java/org/opensearch/sql/ppl/ExplainIT.java b/integ-test/src/test/java/org/opensearch/sql/ppl/ExplainIT.java index 1a785e9074..d41b2cde8a 100644 --- a/integ-test/src/test/java/org/opensearch/sql/ppl/ExplainIT.java +++ b/integ-test/src/test/java/org/opensearch/sql/ppl/ExplainIT.java @@ -81,6 +81,16 @@ public void testSortPushDownExplain() throws Exception { ); } + @Test + public void explainQueryRange() throws Exception { + String expected = loadFromFile("expectedOutput/ppl/explain_query_range.json"); + assertJsonEquals( + expected, + explainQueryToString("source = my_prometheus" + + ".query_range('prometheus_http_requests_total',1689281439,1689291439,14)") + ); + } + String loadFromFile(String filename) throws Exception { URI uri = Resources.getResource(filename).toURI(); return new String(Files.readAllBytes(Paths.get(uri))); diff --git a/integ-test/src/test/resources/expectedOutput/ppl/explain_query_range.json b/integ-test/src/test/resources/expectedOutput/ppl/explain_query_range.json new file mode 100644 index 0000000000..bbc00e0c43 --- /dev/null +++ b/integ-test/src/test/resources/expectedOutput/ppl/explain_query_range.json @@ -0,0 +1,9 @@ +{ + "root": { + "name": "QueryRangeFunctionTableScanOperator", + "description": { + "request": "query_range(prometheus_http_requests_total, 1689281439, 1689291439, 14)" + }, + "children": [] + } +} \ No newline at end of file diff --git a/prometheus/src/main/java/org/opensearch/sql/prometheus/functions/response/DefaultQueryRangeFunctionResponseHandle.java b/prometheus/src/main/java/org/opensearch/sql/prometheus/functions/response/DefaultQueryRangeFunctionResponseHandle.java index 7f261360f7..128fe72359 100644 --- a/prometheus/src/main/java/org/opensearch/sql/prometheus/functions/response/DefaultQueryRangeFunctionResponseHandle.java +++ b/prometheus/src/main/java/org/opensearch/sql/prometheus/functions/response/DefaultQueryRangeFunctionResponseHandle.java @@ -5,6 +5,8 @@ package org.opensearch.sql.prometheus.functions.response; +import static org.opensearch.sql.prometheus.data.constants.PrometheusFieldConstants.LABELS; +import static org.opensearch.sql.prometheus.data.constants.PrometheusFieldConstants.TIMESTAMP; import static org.opensearch.sql.prometheus.data.constants.PrometheusFieldConstants.VALUE; import java.time.Instant; @@ -12,9 +14,9 @@ import java.util.Iterator; import java.util.LinkedHashMap; import java.util.List; -import org.jetbrains.annotations.NotNull; import org.json.JSONArray; import org.json.JSONObject; +import org.opensearch.sql.data.model.ExprCollectionValue; import org.opensearch.sql.data.model.ExprDoubleValue; import org.opensearch.sql.data.model.ExprStringValue; import org.opensearch.sql.data.model.ExprTimestampValue; @@ -22,7 +24,6 @@ import org.opensearch.sql.data.model.ExprValue; import org.opensearch.sql.data.type.ExprCoreType; import org.opensearch.sql.executor.ExecutionEngine; -import org.opensearch.sql.prometheus.data.constants.PrometheusFieldConstants; /** * Default implementation of QueryRangeFunctionResponseHandle. @@ -40,63 +41,61 @@ public class DefaultQueryRangeFunctionResponseHandle implements QueryRangeFuncti */ public DefaultQueryRangeFunctionResponseHandle(JSONObject responseObject) { this.responseObject = responseObject; - constructIteratorAndSchema(); + constructSchema(); + constructIterator(); } - private void constructIteratorAndSchema() { + private void constructIterator() { List result = new ArrayList<>(); - List columnList = new ArrayList<>(); if ("matrix".equals(responseObject.getString("resultType"))) { JSONArray itemArray = responseObject.getJSONArray("result"); for (int i = 0; i < itemArray.length(); i++) { + LinkedHashMap linkedHashMap = new LinkedHashMap<>(); JSONObject item = itemArray.getJSONObject(i); - JSONObject metric = item.getJSONObject("metric"); - JSONArray values = item.getJSONArray("values"); - if (i == 0) { - columnList = getColumnList(metric); - } - for (int j = 0; j < values.length(); j++) { - LinkedHashMap linkedHashMap = - extractRow(metric, values.getJSONArray(j), columnList); - result.add(new ExprTupleValue(linkedHashMap)); - } + linkedHashMap.put(LABELS, extractLabels(item.getJSONObject("metric"))); + extractTimestampAndValues(item.getJSONArray("values"), linkedHashMap); + result.add(new ExprTupleValue(linkedHashMap)); } } else { throw new RuntimeException(String.format("Unexpected Result Type: %s during Prometheus " + "Response Parsing. 'matrix' resultType is expected", responseObject.getString("resultType"))); } - this.schema = new ExecutionEngine.Schema(columnList); this.responseIterator = result.iterator(); } - @NotNull - private static LinkedHashMap extractRow(JSONObject metric, - JSONArray values, List columnList) { - LinkedHashMap linkedHashMap = new LinkedHashMap<>(); - for (ExecutionEngine.Schema.Column column : columnList) { - if (PrometheusFieldConstants.TIMESTAMP.equals(column.getName())) { - linkedHashMap.put(PrometheusFieldConstants.TIMESTAMP, - new ExprTimestampValue(Instant.ofEpochMilli((long) (values.getDouble(0) * 1000)))); - } else if (column.getName().equals(VALUE)) { - linkedHashMap.put(VALUE, new ExprDoubleValue(values.getDouble(1))); - } else { - linkedHashMap.put(column.getName(), - new ExprStringValue(metric.getString(column.getName()))); - } + private static void extractTimestampAndValues(JSONArray values, + LinkedHashMap linkedHashMap) { + List timestampList = new ArrayList<>(); + List valueList = new ArrayList<>(); + for (int j = 0; j < values.length(); j++) { + JSONArray value = values.getJSONArray(j); + timestampList.add(new ExprTimestampValue( + Instant.ofEpochMilli((long) (values.getDouble(0) * 1000)))); + valueList.add(new ExprDoubleValue(value.getDouble(1))); } - return linkedHashMap; + linkedHashMap.put(TIMESTAMP, + new ExprCollectionValue(timestampList)); + linkedHashMap.put(VALUE, new ExprCollectionValue(valueList)); } + private void constructSchema() { + this.schema = new ExecutionEngine.Schema(getColumnList()); + } - private List getColumnList(JSONObject metric) { + private ExprValue extractLabels(JSONObject metric) { + LinkedHashMap labelsMap = new LinkedHashMap<>(); + metric.keySet().forEach(key + -> labelsMap.put(key, new ExprStringValue(metric.getString(key)))); + return new ExprTupleValue(labelsMap); + } + + + private List getColumnList() { List columnList = new ArrayList<>(); - columnList.add(new ExecutionEngine.Schema.Column(PrometheusFieldConstants.TIMESTAMP, - PrometheusFieldConstants.TIMESTAMP, ExprCoreType.TIMESTAMP)); - columnList.add(new ExecutionEngine.Schema.Column(VALUE, VALUE, ExprCoreType.DOUBLE)); - for (String key : metric.keySet()) { - columnList.add(new ExecutionEngine.Schema.Column(key, key, ExprCoreType.STRING)); - } + columnList.add(new ExecutionEngine.Schema.Column(TIMESTAMP, TIMESTAMP, ExprCoreType.ARRAY)); + columnList.add(new ExecutionEngine.Schema.Column(VALUE, VALUE, ExprCoreType.ARRAY)); + columnList.add(new ExecutionEngine.Schema.Column(LABELS, LABELS, ExprCoreType.STRUCT)); return columnList; } diff --git a/prometheus/src/main/java/org/opensearch/sql/prometheus/response/PrometheusResponse.java b/prometheus/src/main/java/org/opensearch/sql/prometheus/response/PrometheusResponse.java index 331605b1d5..bd9e36ccdc 100644 --- a/prometheus/src/main/java/org/opensearch/sql/prometheus/response/PrometheusResponse.java +++ b/prometheus/src/main/java/org/opensearch/sql/prometheus/response/PrometheusResponse.java @@ -35,8 +35,6 @@ public class PrometheusResponse implements Iterable { private final PrometheusResponseFieldNames prometheusResponseFieldNames; - private final Boolean isQueryRangeFunctionScan; - /** * Constructor. * @@ -46,11 +44,9 @@ public class PrometheusResponse implements Iterable { * and timestamp fieldName. */ public PrometheusResponse(JSONObject responseObject, - PrometheusResponseFieldNames prometheusResponseFieldNames, - Boolean isQueryRangeFunctionScan) { + PrometheusResponseFieldNames prometheusResponseFieldNames) { this.responseObject = responseObject; this.prometheusResponseFieldNames = prometheusResponseFieldNames; - this.isQueryRangeFunctionScan = isQueryRangeFunctionScan; } @NonNull @@ -70,24 +66,7 @@ public Iterator iterator() { new ExprTimestampValue(Instant.ofEpochMilli((long) (val.getDouble(0) * 1000)))); linkedHashMap.put(prometheusResponseFieldNames.getValueFieldName(), getValue(val, 1, prometheusResponseFieldNames.getValueType())); - // Concept: - // {\"instance\":\"localhost:9090\",\"__name__\":\"up\",\"job\":\"prometheus\"}" - // This is the label string in the prometheus response. - // Q: how do we map this to columns in a table. - // For queries like source = prometheus.metric_name | .... - // we can get the labels list in prior as we know which metric we are working on. - // In case of commands like source = prometheus.query_range('promQL'); - // Any arbitrary command can be written and we don't know the labels - // in the prometheus response in prior. - // So for PPL like commands...output structure is @value, @timestamp - // and each label is treated as a separate column where as in case of query_range - // function irrespective of promQL, the output structure is - // @value, @timestamp, @labels [jsonfied string of all the labels for a data point] - if (isQueryRangeFunctionScan) { - linkedHashMap.put(LABELS, new ExprStringValue(metric.toString())); - } else { - insertLabels(linkedHashMap, metric); - } + insertLabels(linkedHashMap, metric); result.add(new ExprTupleValue(linkedHashMap)); } } diff --git a/prometheus/src/main/java/org/opensearch/sql/prometheus/storage/PrometheusMetricScan.java b/prometheus/src/main/java/org/opensearch/sql/prometheus/storage/PrometheusMetricScan.java index 8611ae04f1..7f75cb3c07 100644 --- a/prometheus/src/main/java/org/opensearch/sql/prometheus/storage/PrometheusMetricScan.java +++ b/prometheus/src/main/java/org/opensearch/sql/prometheus/storage/PrometheusMetricScan.java @@ -40,10 +40,6 @@ public class PrometheusMetricScan extends TableScanOperator { private Iterator iterator; - @Setter - @Getter - private Boolean isQueryRangeFunctionScan = Boolean.FALSE; - @Setter private PrometheusResponseFieldNames prometheusResponseFieldNames; @@ -69,8 +65,7 @@ public void open() { JSONObject responseObject = prometheusClient.queryRange( request.getPromQl(), request.getStartTime(), request.getEndTime(), request.getStep()); - return new PrometheusResponse(responseObject, prometheusResponseFieldNames, - isQueryRangeFunctionScan).iterator(); + return new PrometheusResponse(responseObject, prometheusResponseFieldNames).iterator(); } catch (IOException e) { LOG.error(e.getMessage()); throw new RuntimeException("Error fetching data from prometheus server. " + e.getMessage()); diff --git a/prometheus/src/main/java/org/opensearch/sql/prometheus/storage/PrometheusMetricTable.java b/prometheus/src/main/java/org/opensearch/sql/prometheus/storage/PrometheusMetricTable.java index a03d69bc41..b3b63327d0 100644 --- a/prometheus/src/main/java/org/opensearch/sql/prometheus/storage/PrometheusMetricTable.java +++ b/prometheus/src/main/java/org/opensearch/sql/prometheus/storage/PrometheusMetricTable.java @@ -97,10 +97,6 @@ public Map getFieldTypes() { public PhysicalPlan implement(LogicalPlan plan) { PrometheusMetricScan metricScan = new PrometheusMetricScan(prometheusClient); - if (prometheusQueryRequest != null) { - metricScan.setRequest(prometheusQueryRequest); - metricScan.setIsQueryRangeFunctionScan(Boolean.TRUE); - } return plan.accept(new PrometheusDefaultImplementor(), metricScan); } diff --git a/prometheus/src/test/java/org/opensearch/sql/prometheus/storage/PrometheusMetricScanTest.java b/prometheus/src/test/java/org/opensearch/sql/prometheus/storage/PrometheusMetricScanTest.java index cb70e9e064..68e03c758c 100644 --- a/prometheus/src/test/java/org/opensearch/sql/prometheus/storage/PrometheusMetricScanTest.java +++ b/prometheus/src/test/java/org/opensearch/sql/prometheus/storage/PrometheusMetricScanTest.java @@ -209,39 +209,6 @@ void testQueryResponseIteratorWithGivenPrometheusResponseWithBackQuotedFieldName Assertions.assertFalse(prometheusMetricScan.hasNext()); } - @Test - @SneakyThrows - void testQueryResponseIteratorForQueryRangeFunction() { - PrometheusMetricScan prometheusMetricScan = new PrometheusMetricScan(prometheusClient); - prometheusMetricScan.setIsQueryRangeFunctionScan(Boolean.TRUE); - prometheusMetricScan.getRequest().setPromQl(QUERY); - prometheusMetricScan.getRequest().setStartTime(STARTTIME); - prometheusMetricScan.getRequest().setEndTime(ENDTIME); - prometheusMetricScan.getRequest().setStep(STEP); - - when(prometheusClient.queryRange(any(), any(), any(), any())) - .thenReturn(new JSONObject(getJson("query_range_result.json"))); - prometheusMetricScan.open(); - Assertions.assertTrue(prometheusMetricScan.hasNext()); - ExprTupleValue firstRow = new ExprTupleValue(new LinkedHashMap<>() {{ - put(TIMESTAMP, new ExprTimestampValue(Instant.ofEpochMilli(1435781430781L))); - put(VALUE, new ExprLongValue(1)); - put(LABELS, new ExprStringValue( - "{\"instance\":\"localhost:9090\",\"__name__\":\"up\",\"job\":\"prometheus\"}")); - } - }); - assertEquals(firstRow, prometheusMetricScan.next()); - Assertions.assertTrue(prometheusMetricScan.hasNext()); - ExprTupleValue secondRow = new ExprTupleValue(new LinkedHashMap<>() {{ - put(TIMESTAMP, new ExprTimestampValue(Instant.ofEpochMilli(1435781430781L))); - put(VALUE, new ExprLongValue(0)); - put(LABELS, new ExprStringValue( - "{\"instance\":\"localhost:9091\",\"__name__\":\"up\",\"job\":\"node\"}")); - } - }); - assertEquals(secondRow, prometheusMetricScan.next()); - Assertions.assertFalse(prometheusMetricScan.hasNext()); - } @Test @SneakyThrows diff --git a/prometheus/src/test/java/org/opensearch/sql/prometheus/storage/PrometheusMetricTableTest.java b/prometheus/src/test/java/org/opensearch/sql/prometheus/storage/PrometheusMetricTableTest.java index de95b2bd64..d43c38fc68 100644 --- a/prometheus/src/test/java/org/opensearch/sql/prometheus/storage/PrometheusMetricTableTest.java +++ b/prometheus/src/test/java/org/opensearch/sql/prometheus/storage/PrometheusMetricTableTest.java @@ -111,32 +111,6 @@ void testGetFieldTypesFromPrometheusQueryRequest() { assertNull(prometheusMetricTable.getMetricName()); } - @Test - void testImplementWithQueryRangeFunction() { - PrometheusQueryRequest prometheusQueryRequest = new PrometheusQueryRequest(); - prometheusQueryRequest.setPromQl("test"); - prometheusQueryRequest.setStep("15m"); - PrometheusMetricTable prometheusMetricTable = - new PrometheusMetricTable(client, prometheusQueryRequest); - List finalProjectList = new ArrayList<>(); - finalProjectList.add(DSL.named(VALUE, DSL.ref(VALUE, STRING))); - finalProjectList.add(DSL.named(TIMESTAMP, DSL.ref(TIMESTAMP, ExprCoreType.TIMESTAMP))); - PhysicalPlan plan = prometheusMetricTable.implement( - project(relation("query_range", prometheusMetricTable), - finalProjectList, null)); - - - assertTrue(plan instanceof ProjectOperator); - List projectList = ((ProjectOperator) plan).getProjectList(); - List outputFields - = projectList.stream().map(NamedExpression::getName).collect(Collectors.toList()); - assertEquals(List.of(VALUE, TIMESTAMP), outputFields); - assertTrue(((ProjectOperator) plan).getInput() instanceof PrometheusMetricScan); - PrometheusMetricScan prometheusMetricScan = - (PrometheusMetricScan) ((ProjectOperator) plan).getInput(); - assertEquals(prometheusQueryRequest, prometheusMetricScan.getRequest()); - } - @Test void testImplementWithBasicMetricQuery() { PrometheusMetricTable prometheusMetricTable =