Skip to content

Commit

Permalink
Prometheus Query Exemplars
Browse files Browse the repository at this point in the history
Signed-off-by: Vamsi Manohar <[email protected]>
  • Loading branch information
vamsi-amazon committed Jun 27, 2023
1 parent 6c3744e commit e72bd26
Show file tree
Hide file tree
Showing 30 changed files with 1,389 additions and 196 deletions.
79 changes: 73 additions & 6 deletions docs/user/ppl/admin/prometheus_connector.rst
Original file line number Diff line number Diff line change
Expand Up @@ -191,12 +191,11 @@ PromQL Support for prometheus Connector

`query_range` Table Function
----------------------------
Prometheus connector offers `query_range` table function. This table function can be used to query metrics in a specific time range using promQL.
The function takes inputs similar to parameters mentioned for query range api mentioned here: https://prometheus.io/docs/prometheus/latest/querying/api/
Arguments should be either passed by name or positionArguments should be either passed by name or position.
`source=my_prometheus.query_range('prometheus_http_requests_total', 1686694425, 1686700130, 14)`
or
`source=my_prometheus.query_range(query='prometheus_http_requests_total', starttime=1686694425, endtime=1686700130, step=14)`
* Prometheus connector offers `query_range` table function. This table function can be used to query metrics in a specific time range using promQL.
* The function takes inputs similar to parameters mentioned for query range api mentioned here: https://prometheus.io/docs/prometheus/latest/querying/api/
* Arguments should be either passed by name or positionArguments should be either passed by name or position.
- `source=my_prometheus.query_range('prometheus_http_requests_total', 1686694425, 1686700130, 14)`
- `source=my_prometheus.query_range(query='prometheus_http_requests_total', starttime=1686694425, endtime=1686700130, step=14)`
Example::

> source=my_prometheus.query_range('prometheus_http_requests_total', 1686694425, 1686700130, 14)
Expand All @@ -210,3 +209,71 @@ Example::
| 9 | "2022-11-03 07:18:54" | "/-/promql" | 400 | 192.15.2.1 | prometheus |
| 11 | "2022-11-03 07:18:64" |"/-/metrics" | 500 | 192.15.2.1 | prometheus |
+------------+------------------------+--------------------------------+---------------+-------------+-------------+


Prometheus Connector Table Functions
==========================================

`query_exemplars` Table Function
----------------------------
* This table function can be used to fetch exemplars of a query in a specific time range.
* The function takes inputs similar to parameters mentioned for query exemplars api mentioned here: https://prometheus.io/docs/prometheus/latest/querying/api/
* Arguments should be either passed by name or positionArguments should be either passed by name or position.
- `source=my_prometheus.query_exemplars('prometheus_http_requests_total', 1686694425, 1686700130)`
- `source=my_prometheus.query_exemplars(query='prometheus_http_requests_total', starttime=1686694425, endtime=1686700130)`
Example::

> source=my_prometheus.query_exemplars('prometheus_http_requests_total', 1686694425, 1686700130)
"schema": [
{
"name": "seriesLabels",
"type": "struct"
},
{
"name": "exemplars",
"type": "array"
}
],
"datarows": [
[
{
"instance": "localhost:8090",
"__name__": "test_exemplar_metric_total",
"service": "bar",
"job": "prometheus"
},
[
{
"labels": {
"traceID": "EpTxMJ40fUus7aGY"
},
"timestamp": "2020-09-14 15:22:25.479",
"value": 6.0
}
]
],
[
{
"instance": "localhost:8090",
"__name__": "test_exemplar_metric_total",
"service": "foo",
"job": "prometheus"
},
[
{
"labels": {
"traceID": "Olp9XHlq763ccsfa"
},
"timestamp": "2020-09-14 15:22:35.479",
"value": 19.0
},
{
"labels": {
"traceID": "hCtjygkIHwAN9vs4"
},
"timestamp": "2020-09-14 15:22:45.489",
"value": 20.0
}
]
]
]
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import java.io.IOException;
import java.util.List;
import java.util.Map;
import org.json.JSONArray;
import org.json.JSONObject;
import org.opensearch.sql.prometheus.request.system.model.MetricMetadata;

Expand All @@ -18,4 +19,6 @@ public interface PrometheusClient {
List<String> getLabels(String metricName) throws IOException;

Map<String, List<MetricMetadata>> getAllMetrics() throws IOException;

JSONArray queryExemplars(String query, Long start, Long end) throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,20 @@ public Map<String, List<MetricMetadata>> getAllMetrics() throws IOException {
return new ObjectMapper().readValue(jsonObject.getJSONObject("data").toString(), typeRef);
}

@Override
public JSONArray queryExemplars(String query, Long start, Long end) throws IOException {
String queryUrl = String.format("%s/api/v1/query_exemplars?query=%s&start=%s&end=%s",
uri.toString().replaceAll("/$", ""), URLEncoder.encode(query, StandardCharsets.UTF_8),
start, end);
logger.debug("queryUrl: " + queryUrl);
Request request = new Request.Builder()
.url(queryUrl)
.build();
Response response = this.okHttpClient.newCall(request).execute();
JSONObject jsonObject = readResponse(response);
return jsonObject.getJSONArray("data");
}

private List<String> toListOfLabels(JSONArray array) {
List<String> result = new ArrayList<>();
for (int i = 0; i < array.length(); i++) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
/*
*
* * Copyright OpenSearch Contributors
* * SPDX-License-Identifier: Apache-2.0
*
*/

package org.opensearch.sql.prometheus.functions.implementation;

import static org.opensearch.sql.prometheus.functions.resolver.QueryRangeTableFunctionResolver.ENDTIME;
import static org.opensearch.sql.prometheus.functions.resolver.QueryRangeTableFunctionResolver.QUERY;
import static org.opensearch.sql.prometheus.functions.resolver.QueryRangeTableFunctionResolver.STARTTIME;

import java.util.List;
import java.util.stream.Collectors;
import org.opensearch.sql.data.model.ExprValue;
import org.opensearch.sql.data.type.ExprCoreType;
import org.opensearch.sql.data.type.ExprType;
import org.opensearch.sql.exception.ExpressionEvaluationException;
import org.opensearch.sql.expression.Expression;
import org.opensearch.sql.expression.FunctionExpression;
import org.opensearch.sql.expression.NamedArgumentExpression;
import org.opensearch.sql.expression.env.Environment;
import org.opensearch.sql.expression.function.FunctionName;
import org.opensearch.sql.expression.function.TableFunctionImplementation;
import org.opensearch.sql.prometheus.client.PrometheusClient;
import org.opensearch.sql.prometheus.request.PrometheusQueryExemplarsRequest;
import org.opensearch.sql.prometheus.storage.QueryExemplarsTable;
import org.opensearch.sql.storage.Table;

public class QueryExemplarFunctionImplementation extends FunctionExpression implements
TableFunctionImplementation {

private final FunctionName functionName;
private final List<Expression> arguments;
private final PrometheusClient prometheusClient;

/**
* Required argument constructor.
*
* @param functionName name of the function
* @param arguments a list of arguments provided
*/
public QueryExemplarFunctionImplementation(FunctionName functionName, List<Expression> arguments,
PrometheusClient prometheusClient) {
super(functionName, arguments);
this.functionName = functionName;
this.arguments = arguments;
this.prometheusClient = prometheusClient;
}

@Override
public ExprValue valueOf(Environment<Expression, ExprValue> valueEnv) {
throw new UnsupportedOperationException(String.format(
"Prometheus defined function [%s] is only "
+ "supported in SOURCE clause with prometheus connector catalog",
functionName));
}

@Override
public ExprType type() {
return ExprCoreType.STRUCT;
}

@Override
public String toString() {
List<String> args = arguments.stream()
.map(arg -> String.format("%s=%s", ((NamedArgumentExpression) arg)
.getArgName(), ((NamedArgumentExpression) arg).getValue().toString()))
.collect(Collectors.toList());
return String.format("%s(%s)", functionName, String.join(", ", args));
}

@Override
public Table applyArguments() {
return new QueryExemplarsTable(prometheusClient, buildExemplarsQueryRequest(arguments));
}

private PrometheusQueryExemplarsRequest buildExemplarsQueryRequest(List<Expression> arguments) {

PrometheusQueryExemplarsRequest request = new PrometheusQueryExemplarsRequest();
arguments.forEach(arg -> {
String argName = ((NamedArgumentExpression) arg).getArgName();
Expression argValue = ((NamedArgumentExpression) arg).getValue();
ExprValue literalValue = argValue.valueOf();
switch (argName) {
case QUERY:
request
.setQuery((String) literalValue.value());
break;
case STARTTIME:
request.setStartTime(((Number) literalValue.value()).longValue());
break;
case ENDTIME:
request.setEndTime(((Number) literalValue.value()).longValue());
break;
default:
throw new ExpressionEvaluationException(
String.format("Invalid Function Argument:%s", argName));
}
});
return request;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package org.opensearch.sql.prometheus.functions.resolver;

import static org.opensearch.sql.data.type.ExprCoreType.LONG;
import static org.opensearch.sql.data.type.ExprCoreType.STRING;
import static org.opensearch.sql.prometheus.utils.TableFunctionUtils.getNamedArgumentsOfTableFunction;
import static org.opensearch.sql.prometheus.utils.TableFunctionUtils.validatePrometheusTableFunctionArguments;

import java.util.List;
import lombok.RequiredArgsConstructor;
import org.apache.commons.lang3.tuple.Pair;
import org.opensearch.sql.expression.Expression;
import org.opensearch.sql.expression.function.FunctionBuilder;
import org.opensearch.sql.expression.function.FunctionName;
import org.opensearch.sql.expression.function.FunctionResolver;
import org.opensearch.sql.expression.function.FunctionSignature;
import org.opensearch.sql.prometheus.client.PrometheusClient;
import org.opensearch.sql.prometheus.functions.implementation.QueryExemplarFunctionImplementation;

@RequiredArgsConstructor
public class QueryExemplarsTableFunctionResolver implements FunctionResolver {

private final PrometheusClient prometheusClient;
public static final String QUERY_EXEMPLARS = "query_exemplars";

public static final String QUERY = "query";
public static final String STARTTIME = "starttime";
public static final String ENDTIME = "endtime";

@Override
public Pair<FunctionSignature, FunctionBuilder> resolve(FunctionSignature unresolvedSignature) {
final FunctionName functionName = FunctionName.of(QUERY_EXEMPLARS);
FunctionSignature functionSignature =
new FunctionSignature(FunctionName.of(QUERY_EXEMPLARS), List.of(STRING, LONG, LONG));
FunctionBuilder functionBuilder = (functionProperties, arguments) -> {
final List<String> argumentNames = List.of(QUERY, STARTTIME, ENDTIME);
validatePrometheusTableFunctionArguments(arguments, argumentNames);
List<Expression> namedArguments = getNamedArgumentsOfTableFunction(arguments, argumentNames);
return new QueryExemplarFunctionImplementation(functionName,
namedArguments, prometheusClient);
};
return Pair.of(functionSignature, functionBuilder);
}

@Override
public FunctionName getFunctionName() {
return FunctionName.of(QUERY_EXEMPLARS);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,18 +9,13 @@

import static org.opensearch.sql.data.type.ExprCoreType.LONG;
import static org.opensearch.sql.data.type.ExprCoreType.STRING;
import static org.opensearch.sql.prometheus.utils.TableFunctionUtils.getNamedArgumentsOfTableFunction;
import static org.opensearch.sql.prometheus.utils.TableFunctionUtils.validatePrometheusTableFunctionArguments;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import lombok.RequiredArgsConstructor;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.opensearch.sql.exception.SemanticCheckException;
import org.opensearch.sql.expression.Expression;
import org.opensearch.sql.expression.NamedArgumentExpression;
import org.opensearch.sql.expression.function.FunctionBuilder;
import org.opensearch.sql.expression.function.FunctionName;
import org.opensearch.sql.expression.function.FunctionResolver;
Expand All @@ -32,7 +27,6 @@
public class QueryRangeTableFunctionResolver implements FunctionResolver {

private final PrometheusClient prometheusClient;

public static final String QUERY_RANGE = "query_range";
public static final String QUERY = "query";
public static final String STARTTIME = "starttime";
Expand All @@ -45,51 +39,14 @@ public Pair<FunctionSignature, FunctionBuilder> resolve(FunctionSignature unreso
FunctionSignature functionSignature =
new FunctionSignature(functionName, List.of(STRING, LONG, LONG, LONG));
final List<String> argumentNames = List.of(QUERY, STARTTIME, ENDTIME, STEP);

FunctionBuilder functionBuilder = (functionProperties, arguments) -> {
Boolean argumentsPassedByName = arguments.stream()
.noneMatch(arg -> StringUtils.isEmpty(((NamedArgumentExpression) arg).getArgName()));
Boolean argumentsPassedByPosition = arguments.stream()
.allMatch(arg -> StringUtils.isEmpty(((NamedArgumentExpression) arg).getArgName()));
if (!(argumentsPassedByName || argumentsPassedByPosition)) {
throw new SemanticCheckException("Arguments should be either passed by name or position");
}

if (arguments.size() != argumentNames.size()) {
throw new SemanticCheckException(
generateErrorMessageForMissingArguments(argumentsPassedByPosition, arguments,
argumentNames));
}

if (argumentsPassedByPosition) {
List<Expression> namedArguments = new ArrayList<>();
for (int i = 0; i < arguments.size(); i++) {
namedArguments.add(new NamedArgumentExpression(argumentNames.get(i),
((NamedArgumentExpression) arguments.get(i)).getValue()));
}
return new QueryRangeFunctionImplementation(functionName, namedArguments, prometheusClient);
}
return new QueryRangeFunctionImplementation(functionName, arguments, prometheusClient);
validatePrometheusTableFunctionArguments(arguments, argumentNames);
List<Expression> namedArguments = getNamedArgumentsOfTableFunction(arguments, argumentNames);
return new QueryRangeFunctionImplementation(functionName, namedArguments, prometheusClient);
};
return Pair.of(functionSignature, functionBuilder);
}

private String generateErrorMessageForMissingArguments(Boolean argumentsPassedByPosition,
List<Expression> arguments,
List<String> argumentNames) {
if (argumentsPassedByPosition) {
return String.format("Missing arguments:[%s]",
String.join(",", argumentNames.subList(arguments.size(), argumentNames.size())));
} else {
Set<String> requiredArguments = new HashSet<>(argumentNames);
Set<String> providedArguments =
arguments.stream().map(expression -> ((NamedArgumentExpression) expression).getArgName())
.collect(Collectors.toSet());
requiredArguments.removeAll(providedArguments);
return String.format("Missing arguments:[%s]", String.join(",", requiredArguments));
}
}

@Override
public FunctionName getFunctionName() {
return FunctionName.of(QUERY_RANGE);
Expand Down
Loading

0 comments on commit e72bd26

Please sign in to comment.