Skip to content

Commit

Permalink
Merge branch 'localmain' into dev-metafields-set-routing-shard
Browse files Browse the repository at this point in the history
Signed-off-by: acarbonetto <[email protected]>
  • Loading branch information
acarbonetto committed Jul 27, 2023
2 parents 1287139 + 430d7a9 commit 54d5e7b
Show file tree
Hide file tree
Showing 37 changed files with 1,506 additions and 214 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ public Pair<FunctionSignature, FunctionBuilder> resolve(
FunctionSignature unresolvedSignature) {
FunctionName functionName = FunctionName.of("query_range");
FunctionSignature functionSignature =
new FunctionSignature(functionName, List.of(STRING, LONG, LONG, LONG));
new FunctionSignature(functionName, List.of(STRING, LONG, LONG, STRING));
return Pair.of(
functionSignature,
(functionProperties, args) ->
Expand Down
79 changes: 73 additions & 6 deletions docs/user/ppl/admin/prometheus_connector.rst
Original file line number Diff line number Diff line change
Expand Up @@ -191,12 +191,11 @@ PromQL Support for prometheus Connector

`query_range` Table Function
----------------------------
Prometheus connector offers `query_range` table function. This table function can be used to query metrics in a specific time range using promQL.
The function takes inputs similar to parameters mentioned for query range api mentioned here: https://prometheus.io/docs/prometheus/latest/querying/api/
Arguments should be either passed by name or positionArguments should be either passed by name or position.
`source=my_prometheus.query_range('prometheus_http_requests_total', 1686694425, 1686700130, 14)`
or
`source=my_prometheus.query_range(query='prometheus_http_requests_total', starttime=1686694425, endtime=1686700130, step=14)`
* Prometheus connector offers `query_range` table function. This table function can be used to query metrics in a specific time range using promQL.
* The function takes inputs similar to parameters mentioned for query range api mentioned here: https://prometheus.io/docs/prometheus/latest/querying/api/
* Arguments should be either passed by name or positionArguments should be either passed by name or position.
- `source=my_prometheus.query_range('prometheus_http_requests_total', 1686694425, 1686700130, 14)`
- `source=my_prometheus.query_range(query='prometheus_http_requests_total', starttime=1686694425, endtime=1686700130, step=14)`
Example::

> source=my_prometheus.query_range('prometheus_http_requests_total', 1686694425, 1686700130, 14)
Expand All @@ -210,3 +209,71 @@ Example::
| 9 | "2022-11-03 07:18:54" | "/-/promql" | 400 | 192.15.2.1 | prometheus |
| 11 | "2022-11-03 07:18:64" |"/-/metrics" | 500 | 192.15.2.1 | prometheus |
+------------+------------------------+--------------------------------+---------------+-------------+-------------+


Prometheus Connector Table Functions
==========================================

`query_exemplars` Table Function
----------------------------
* This table function can be used to fetch exemplars of a query in a specific time range.
* The function takes inputs similar to parameters mentioned for query exemplars api mentioned here: https://prometheus.io/docs/prometheus/latest/querying/api/
* Arguments should be either passed by name or positionArguments should be either passed by name or position.
- `source=my_prometheus.query_exemplars('prometheus_http_requests_total', 1686694425, 1686700130)`
- `source=my_prometheus.query_exemplars(query='prometheus_http_requests_total', starttime=1686694425, endtime=1686700130)`
Example::

> source=my_prometheus.query_exemplars('prometheus_http_requests_total', 1686694425, 1686700130)
"schema": [
{
"name": "seriesLabels",
"type": "struct"
},
{
"name": "exemplars",
"type": "array"
}
],
"datarows": [
[
{
"instance": "localhost:8090",
"__name__": "test_exemplar_metric_total",
"service": "bar",
"job": "prometheus"
},
[
{
"labels": {
"traceID": "EpTxMJ40fUus7aGY"
},
"timestamp": "2020-09-14 15:22:25.479",
"value": 6.0
}
]
],
[
{
"instance": "localhost:8090",
"__name__": "test_exemplar_metric_total",
"service": "foo",
"job": "prometheus"
},
[
{
"labels": {
"traceID": "Olp9XHlq763ccsfa"
},
"timestamp": "2020-09-14 15:22:35.479",
"value": 19.0
},
{
"labels": {
"traceID": "hCtjygkIHwAN9vs4"
},
"timestamp": "2020-09-14 15:22:45.489",
"value": 20.0
}
]
]
]
2 changes: 1 addition & 1 deletion integ-test/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ apply plugin: 'java'
apply plugin: 'io.freefair.lombok'
apply plugin: 'com.wiredforcode.spawn'

String baseVersion = "2.9.0"
String baseVersion = "2.10.0"
String bwcVersion = baseVersion + ".0";
String baseName = "sqlBwcCluster"
String bwcFilePath = "src/test/resources/bwc/"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import java.nio.file.Paths;
import java.text.SimpleDateFormat;
import java.util.Date;
import lombok.Data;
import lombok.SneakyThrows;
import org.apache.commons.lang3.StringUtils;
import org.json.JSONArray;
Expand Down Expand Up @@ -231,11 +230,11 @@ public void testQueryRange() {
long currentTimestamp = new Date().getTime();
JSONObject response =
executeQuery("source=my_prometheus.query_range('prometheus_http_requests_total',"
+ ((currentTimestamp/1000)-3600) + "," + currentTimestamp/1000 + ", " + 14 + ")" );
+ ((currentTimestamp/1000)-3600) + "," + currentTimestamp/1000 + ", " + "'14'" + ")" );
verifySchema(response,
schema(LABELS, "struct"),
schema(VALUE, "array"),
schema(TIMESTAMP, "array"),
schema(LABELS, "struct"));
schema(TIMESTAMP, "array"));
Assertions.assertTrue(response.getInt("size") > 0);
}

Expand All @@ -249,8 +248,20 @@ public void explainQueryRange() throws Exception {
);
}

@Test
public void testExplainForQueryExemplars() throws Exception {
String expected = loadFromFile("expectedOutput/ppl/explain_query_exemplars.json");
assertJsonEquals(
expected,
explainQueryToString("source = my_prometheus."
+ "query_exemplars('app_ads_ad_requests_total',1689228292,1689232299)")
);
}

String loadFromFile(String filename) throws Exception {
URI uri = Resources.getResource(filename).toURI();
return new String(Files.readAllBytes(Paths.get(uri)));
}


}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
{
"root": {
"name": "QueryExemplarsFunctionTableScanOperator",
"description": {
"request": "query_exemplars(app_ads_ad_requests_total, 1689228292, 1689232299)"
},
"children": []
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import java.io.IOException;
import java.util.List;
import java.util.Map;
import org.json.JSONArray;
import org.json.JSONObject;
import org.opensearch.sql.prometheus.request.system.model.MetricMetadata;

Expand All @@ -18,4 +19,6 @@ public interface PrometheusClient {
List<String> getLabels(String metricName) throws IOException;

Map<String, List<MetricMetadata>> getAllMetrics() throws IOException;

JSONArray queryExemplars(String query, Long start, Long end) throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,20 @@ public Map<String, List<MetricMetadata>> getAllMetrics() throws IOException {
return new ObjectMapper().readValue(jsonObject.getJSONObject("data").toString(), typeRef);
}

@Override
public JSONArray queryExemplars(String query, Long start, Long end) throws IOException {
String queryUrl = String.format("%s/api/v1/query_exemplars?query=%s&start=%s&end=%s",
uri.toString().replaceAll("/$", ""), URLEncoder.encode(query, StandardCharsets.UTF_8),
start, end);
logger.debug("queryUrl: " + queryUrl);
Request request = new Request.Builder()
.url(queryUrl)
.build();
Response response = this.okHttpClient.newCall(request).execute();
JSONObject jsonObject = readResponse(response);
return jsonObject.getJSONArray("data");
}

private List<String> toListOfLabels(JSONArray array) {
List<String> result = new ArrayList<>();
for (int i = 0; i < array.length(); i++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,15 @@ public class PrometheusFieldConstants {
public static final String TIMESTAMP = "@timestamp";
public static final String VALUE = "@value";
public static final String LABELS = "@labels";
public static final String MATRIX_KEY = "matrix";
public static final String RESULT_TYPE_KEY = "resultType";
public static final String METRIC_KEY = "metric";
public static final String RESULT_KEY = "result";
public static final String VALUES_KEY = "values";
public static final String SERIES_LABELS_KEY = "seriesLabels";
public static final String EXEMPLARS_KEY = "exemplars";
public static final String TRACE_ID_KEY = "traceID";
public static final String LABELS_KEY = "labels";
public static final String TIMESTAMP_KEY = "timestamp";
public static final String VALUE_KEY = "value";
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
/*
*
* * Copyright OpenSearch Contributors
* * SPDX-License-Identifier: Apache-2.0
*
*/

package org.opensearch.sql.prometheus.functions.implementation;

import static org.opensearch.sql.prometheus.functions.resolver.QueryRangeTableFunctionResolver.ENDTIME;
import static org.opensearch.sql.prometheus.functions.resolver.QueryRangeTableFunctionResolver.QUERY;
import static org.opensearch.sql.prometheus.functions.resolver.QueryRangeTableFunctionResolver.STARTTIME;

import java.util.List;
import java.util.stream.Collectors;
import org.opensearch.sql.data.model.ExprValue;
import org.opensearch.sql.data.type.ExprCoreType;
import org.opensearch.sql.data.type.ExprType;
import org.opensearch.sql.exception.ExpressionEvaluationException;
import org.opensearch.sql.expression.Expression;
import org.opensearch.sql.expression.FunctionExpression;
import org.opensearch.sql.expression.NamedArgumentExpression;
import org.opensearch.sql.expression.env.Environment;
import org.opensearch.sql.expression.function.FunctionName;
import org.opensearch.sql.expression.function.TableFunctionImplementation;
import org.opensearch.sql.prometheus.client.PrometheusClient;
import org.opensearch.sql.prometheus.request.PrometheusQueryExemplarsRequest;
import org.opensearch.sql.prometheus.storage.QueryExemplarsTable;
import org.opensearch.sql.storage.Table;

public class QueryExemplarFunctionImplementation extends FunctionExpression implements
TableFunctionImplementation {

private final FunctionName functionName;
private final List<Expression> arguments;
private final PrometheusClient prometheusClient;

/**
* Required argument constructor.
*
* @param functionName name of the function
* @param arguments a list of arguments provided
*/
public QueryExemplarFunctionImplementation(FunctionName functionName, List<Expression> arguments,
PrometheusClient prometheusClient) {
super(functionName, arguments);
this.functionName = functionName;
this.arguments = arguments;
this.prometheusClient = prometheusClient;
}

@Override
public ExprValue valueOf(Environment<Expression, ExprValue> valueEnv) {
throw new UnsupportedOperationException(String.format(
"Prometheus defined function [%s] is only "
+ "supported in SOURCE clause with prometheus connector catalog",
functionName));
}

@Override
public ExprType type() {
return ExprCoreType.STRUCT;
}

@Override
public String toString() {
List<String> args = arguments.stream()
.map(arg -> String.format("%s=%s", ((NamedArgumentExpression) arg)
.getArgName(), ((NamedArgumentExpression) arg).getValue().toString()))
.collect(Collectors.toList());
return String.format("%s(%s)", functionName, String.join(", ", args));
}

@Override
public Table applyArguments() {
return new QueryExemplarsTable(prometheusClient, buildExemplarsQueryRequest(arguments));
}

private PrometheusQueryExemplarsRequest buildExemplarsQueryRequest(List<Expression> arguments) {

PrometheusQueryExemplarsRequest request = new PrometheusQueryExemplarsRequest();
arguments.forEach(arg -> {
String argName = ((NamedArgumentExpression) arg).getArgName();
Expression argValue = ((NamedArgumentExpression) arg).getValue();
ExprValue literalValue = argValue.valueOf();
switch (argName) {
case QUERY:
request
.setQuery((String) literalValue.value());
break;
case STARTTIME:
request.setStartTime(((Number) literalValue.value()).longValue());
break;
case ENDTIME:
request.setEndTime(((Number) literalValue.value()).longValue());
break;
default:
throw new ExpressionEvaluationException(
String.format("Invalid Function Argument:%s", argName));
}
});
return request;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.prometheus.functions.resolver;

import static org.opensearch.sql.data.type.ExprCoreType.LONG;
import static org.opensearch.sql.data.type.ExprCoreType.STRING;
import static org.opensearch.sql.prometheus.utils.TableFunctionUtils.getNamedArgumentsOfTableFunction;
import static org.opensearch.sql.prometheus.utils.TableFunctionUtils.validatePrometheusTableFunctionArguments;

import java.util.List;
import lombok.RequiredArgsConstructor;
import org.apache.commons.lang3.tuple.Pair;
import org.opensearch.sql.expression.Expression;
import org.opensearch.sql.expression.function.FunctionBuilder;
import org.opensearch.sql.expression.function.FunctionName;
import org.opensearch.sql.expression.function.FunctionResolver;
import org.opensearch.sql.expression.function.FunctionSignature;
import org.opensearch.sql.prometheus.client.PrometheusClient;
import org.opensearch.sql.prometheus.functions.implementation.QueryExemplarFunctionImplementation;

/**
* This class is for query_exemplars table function resolver {@link FunctionResolver}.
* It takes care of validating function arguments and also creating
* required {@link org.opensearch.sql.expression.function.TableFunctionImplementation} Class.
*/
@RequiredArgsConstructor
public class QueryExemplarsTableFunctionResolver implements FunctionResolver {

private final PrometheusClient prometheusClient;
public static final String QUERY_EXEMPLARS = "query_exemplars";

public static final String QUERY = "query";
public static final String STARTTIME = "starttime";
public static final String ENDTIME = "endtime";

@Override
public Pair<FunctionSignature, FunctionBuilder> resolve(FunctionSignature unresolvedSignature) {
final FunctionName functionName = FunctionName.of(QUERY_EXEMPLARS);
FunctionSignature functionSignature =
new FunctionSignature(FunctionName.of(QUERY_EXEMPLARS), List.of(STRING, LONG, LONG));
FunctionBuilder functionBuilder = (functionProperties, arguments) -> {
final List<String> argumentNames = List.of(QUERY, STARTTIME, ENDTIME);
validatePrometheusTableFunctionArguments(arguments, argumentNames);
List<Expression> namedArguments = getNamedArgumentsOfTableFunction(arguments, argumentNames);
return new QueryExemplarFunctionImplementation(functionName,
namedArguments, prometheusClient);
};
return Pair.of(functionSignature, functionBuilder);
}

@Override
public FunctionName getFunctionName() {
return FunctionName.of(QUERY_EXEMPLARS);
}
}
Loading

0 comments on commit 54d5e7b

Please sign in to comment.