Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Map BadQueryRequestException to QueryException.QUERY_VALIDATION_ERROR #14917

Open
wants to merge 11 commits into
base: master
Choose a base branch
from
Open
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import org.apache.pinot.common.metrics.BrokerMeter;
import org.apache.pinot.common.metrics.BrokerQueryPhase;
import org.apache.pinot.common.response.BrokerResponse;
import org.apache.pinot.common.response.ProcessingException;
import org.apache.pinot.common.response.broker.BrokerResponseNative;
import org.apache.pinot.common.response.broker.BrokerResponseNativeV2;
import org.apache.pinot.common.response.broker.ResultTable;
Expand All @@ -73,6 +74,7 @@
import org.apache.pinot.spi.accounting.ThreadExecutionContext;
import org.apache.pinot.spi.auth.TableAuthorizationResult;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.exception.BadQueryRequestException;
import org.apache.pinot.spi.exception.DatabaseConflictException;
import org.apache.pinot.spi.trace.RequestContext;
import org.apache.pinot.spi.trace.Tracing;
Expand Down Expand Up @@ -269,11 +271,18 @@ protected BrokerResponse handleRequest(long requestId, String query, SqlNodeAndO
requestContext.setErrorCode(QueryException.EXECUTION_TIMEOUT_ERROR_CODE);
return new BrokerResponseNative(QueryException.EXECUTION_TIMEOUT_ERROR);
} catch (Throwable t) {
ProcessingException queryException = QueryException.QUERY_EXECUTION_ERROR;
if (t instanceof BadQueryRequestException) {
// provide more specific error code if available
queryException = QueryException.QUERY_VALIDATION_ERROR;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should update broker metrics - BrokerMeter.QUERY_VALIDATION_EXCEPTIONS ?

We should also update this in SingleStageBrokerRequestHandler?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For SingleStageBrokerRequestHandler, this is not needed because BadQueryRequestException thrown by ServerQueryExecutorV1Impl is stored in BrokerResponseNative object. It's directly parsed in PinotClientRequest::getPinotQueryResponse.

For this MultiStageBrokerRequestHandler, this was needed because query was handled by _queryDispatcher.submitAndReduce.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Diagrams for these two failure scenarios. Many middle steps are omitted and only relevant steps are shown 👇🏼

PinotClientRequest::processSqlQueryPost
  - BaseSingleStageBrokerRequestHandler::handleRequest
    - SingleConnectionBrokerRequestHandler::processBrokerRequest // broker metric is updated here
      - ServerQueryExecutorV1Impl::executeInternal // BadQueryRequestException is caught. InstanceResponseBlock is updated here
PinotClientRequest::getPinotQueryResponse // 700 error code is parsed

MultiStage query

PinotClientRequest::processSqlQueryPost
  - MultiStageBrokerRequestHandler::handleRequest
    - QueryDispatcher::submitAndReduce // exception is thrown if ANY block errors. I re-throw BadQueryRequestException here
  - MultiStageBrokerRequestHandler::handleRequest // catch BadQueryRequestException. Exceptions in requestContext are added to brokerResponse via augmentStatistics
PinotClientRequest::getPinotQueryResponse // 700 error code is parsed

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Metric incremented now. Resolved

_brokerMetrics.addMeteredGlobalValue(BrokerMeter.QUERY_VALIDATION_EXCEPTIONS, 1);
}

String consolidatedMessage = ExceptionUtils.consolidateExceptionMessages(t);
LOGGER.error("Caught exception executing request {}: {}, {}", requestId, query, consolidatedMessage);
requestContext.setErrorCode(QueryException.QUERY_EXECUTION_ERROR_CODE);
requestContext.setErrorCode(queryException.getErrorCode());
return new BrokerResponseNative(
QueryException.getException(QueryException.QUERY_EXECUTION_ERROR, consolidatedMessage));
QueryException.getException(queryException, consolidatedMessage));
} finally {
Tracing.getThreadAccountant().clear();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.apache.pinot.segment.spi.IndexSegment;
import org.apache.pinot.spi.accounting.ThreadExecutionContext;
import org.apache.pinot.spi.accounting.ThreadResourceUsageProvider;
import org.apache.pinot.spi.exception.BadQueryRequestException;
import org.apache.pinot.spi.exception.EarlyTerminationException;
import org.apache.pinot.spi.trace.Tracing;
import org.slf4j.Logger;
Expand Down Expand Up @@ -191,10 +192,17 @@ protected static RuntimeException wrapOperatorException(Operator operator, Runti
// Not all operators have associated segment, so do this at best effort.
IndexSegment segment = operator.getIndexSegment();
if (segment == null) {
if (e instanceof IllegalArgumentException) {
return new BadQueryRequestException(e);
}
return e;
}
throw new RuntimeException(
"Caught exception while doing operator: " + operator.getClass() + " on segment: " + segment.getSegmentName(),
e);

String errorMessage = "Caught exception while doing operator: " + operator.getClass()
+ " on segment: " + segment.getSegmentName();
if (e instanceof IllegalArgumentException) {
throw new BadQueryRequestException(errorMessage, e);
}
throw new RuntimeException(errorMessage, e);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.pinot.core.operator.combine.merger.ResultsBlockMerger;
import org.apache.pinot.core.query.request.context.QueryContext;
import org.apache.pinot.core.query.scheduler.resources.ResourceManager;
import org.apache.pinot.spi.exception.BadQueryRequestException;
import org.apache.pinot.spi.exception.EarlyTerminationException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -111,7 +112,11 @@ protected void processSegments() {
@Override
protected void onProcessSegmentsException(Throwable t) {
_processingException.compareAndSet(null, t);
_blockingQueue.offer(new ExceptionResultsBlock(t));
if (t instanceof BadQueryRequestException) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In ServerQueryExecutorV1Impl, we are setting QUERY_VALIDATION_ERROR for BadQueryRequestException. Do we require the same changes here?

Either way I see this change only being made in BaseSingleBlockCombineOperator. What about GroupByCombineOperator and BaseStreamingCombineOperator?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In ServerQueryExecutorV1Impl, we are setting QUERY_VALIDATION_ERROR for BadQueryRequestException. Do we require the same changes here?

It does. Without this change, generic RuntimeException is thrown to QueryDispatcher.java which will bypass that check.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Either way I see this change only being made in BaseSingleBlockCombineOperator. What about GroupByCombineOperator and BaseStreamingCombineOperator?

Yeah just noticed these implement the same interface. Will update them as well

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All patched.

_blockingQueue.offer(new ExceptionResultsBlock(QueryException.QUERY_VALIDATION_ERROR, t));
} else {
_blockingQueue.offer(new ExceptionResultsBlock(t));
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.pinot.common.exception.QueryException;
import org.apache.pinot.core.common.Operator;
import org.apache.pinot.core.data.table.IndexedTable;
import org.apache.pinot.core.data.table.IntermediateRecord;
Expand All @@ -41,6 +42,7 @@
import org.apache.pinot.core.query.request.context.QueryContext;
import org.apache.pinot.core.query.scheduler.resources.ResourceManager;
import org.apache.pinot.core.util.GroupByUtils;
import org.apache.pinot.spi.exception.BadQueryRequestException;
import org.apache.pinot.spi.trace.Tracing;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -203,6 +205,9 @@ public BaseResultsBlock mergeResults()

Throwable processingException = _processingException.get();
if (processingException != null) {
if (processingException instanceof BadQueryRequestException) {
return new ExceptionResultsBlock(QueryException.QUERY_VALIDATION_ERROR, processingException);
}
return new ExceptionResultsBlock(processingException);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.pinot.core.operator.combine.merger.ResultsBlockMerger;
import org.apache.pinot.core.query.request.context.QueryContext;
import org.apache.pinot.core.query.scheduler.resources.ResourceManager;
import org.apache.pinot.spi.exception.BadQueryRequestException;
import org.apache.pinot.spi.exception.EarlyTerminationException;
import org.apache.pinot.spi.utils.CommonConstants;
import org.slf4j.Logger;
Expand Down Expand Up @@ -171,7 +172,11 @@ protected void onProcessSegmentsException(Throwable t) {
_processingException.compareAndSet(null, t);
// Clear the blocking queue and add the exception results block to terminate the main thread
_blockingQueue.clear();
_blockingQueue.offer(new ExceptionResultsBlock(t));
if (t instanceof BadQueryRequestException) {
_blockingQueue.offer(new ExceptionResultsBlock(QueryException.QUERY_VALIDATION_ERROR, t));
} else {
_blockingQueue.offer(new ExceptionResultsBlock(t));
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
import org.apache.pinot.core.query.aggregation.groupby.ObjectGroupByResultHolder;
import org.apache.pinot.segment.spi.AggregationFunctionType;
import org.apache.pinot.spi.exception.BadQueryRequestException;


public class MaxAggregationFunction extends NullableSingleInputAggregationFunction<Double, Double> {
Expand Down Expand Up @@ -143,7 +144,7 @@ public void aggregate(int length, AggregationResultHolder aggregationResultHolde
break;
}
default:
throw new IllegalStateException("Cannot compute max for non-numeric type: " + blockValSet.getValueType());
throw new BadQueryRequestException("Cannot compute max for non-numeric type: " + blockValSet.getValueType());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
import org.apache.pinot.core.query.aggregation.groupby.ObjectGroupByResultHolder;
import org.apache.pinot.segment.spi.AggregationFunctionType;
import org.apache.pinot.spi.exception.BadQueryRequestException;


public class MinAggregationFunction extends NullableSingleInputAggregationFunction<Double, Double> {
Expand Down Expand Up @@ -143,7 +144,7 @@ public void aggregate(int length, AggregationResultHolder aggregationResultHolde
break;
}
default:
throw new IllegalStateException("Cannot compute min for non-numeric type: " + blockValSet.getValueType());
throw new BadQueryRequestException("Cannot compute min for non-numeric type: " + blockValSet.getValueType());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
import org.apache.pinot.core.query.aggregation.groupby.ObjectGroupByResultHolder;
import org.apache.pinot.segment.spi.AggregationFunctionType;
import org.apache.pinot.spi.exception.BadQueryRequestException;


public class SumAggregationFunction extends NullableSingleInputAggregationFunction<Double, Double> {
Expand Down Expand Up @@ -139,7 +140,7 @@ public void aggregate(int length, AggregationResultHolder aggregationResultHolde
break;
}
default:
throw new IllegalStateException("Cannot compute sum for non-numeric type: " + blockValSet.getValueType());
throw new BadQueryRequestException("Cannot compute sum for non-numeric type: " + blockValSet.getValueType());
}
updateAggregationResultHolder(aggregationResultHolder, sum);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -324,7 +324,7 @@ private InstanceResponseBlock executeInternal(ServerQueryRequest queryRequest, E
// Do not log verbose error for BadQueryRequestException and QueryCancelledException.
if (e instanceof BadQueryRequestException) {
LOGGER.info("Caught BadQueryRequestException while processing requestId: {}, {}", requestId, e.getMessage());
instanceResponse.addException(QueryException.getException(QueryException.QUERY_EXECUTION_ERROR, e));
instanceResponse.addException(QueryException.getException(QueryException.QUERY_VALIDATION_ERROR, e));
} else if (e instanceof QueryCancelledException) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Cancelled while processing requestId: {}", requestId, e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -515,10 +515,15 @@ public void testQueryExceptions()

testQueryException("SELECT POTATO(ArrTime) FROM mytable",
useMultiStageQueryEngine()
? QueryException.QUERY_PLANNING_ERROR_CODE : QueryException.QUERY_EXECUTION_ERROR_CODE);
? QueryException.QUERY_PLANNING_ERROR_CODE : QueryException.QUERY_VALIDATION_ERROR_CODE);

// ArrTime expects a numeric type
testQueryException("SELECT COUNT(*) FROM mytable where ArrTime = 'potato'",
QueryException.QUERY_EXECUTION_ERROR_CODE);
QueryException.QUERY_VALIDATION_ERROR_CODE);

// Cannot use numeric aggregate function for string column
testQueryException("SELECT MAX(OriginState) FROM mytable where ArrTime > 5",
QueryException.QUERY_VALIDATION_ERROR_CODE);
}

private void testQueryException(String query, int errorCode)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1034,7 +1034,7 @@ public void testBase64Func(boolean useMultiStageQueryEngine)
testQueryError(sqlQuery, QueryException.QUERY_PLANNING_ERROR_CODE);
} else {
response = postQuery(sqlQuery);
assertTrue(response.get("exceptions").get(0).get("message").toString().startsWith("\"QueryExecutionError"));
assertTrue(response.get("exceptions").get(0).get("message").toString().startsWith("\"QueryValidationError"));
}

// invalid argument
Expand All @@ -1043,7 +1043,7 @@ public void testBase64Func(boolean useMultiStageQueryEngine)
testQueryError(sqlQuery, QueryException.QUERY_PLANNING_ERROR_CODE);
} else {
response = postQuery(sqlQuery);
assertTrue(response.get("exceptions").get(0).get("message").toString().startsWith("\"QueryExecutionError"));
assertTrue(response.get("exceptions").get(0).get("message").toString().startsWith("\"QueryValidationError"));
}

// invalid argument
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import org.apache.calcite.runtime.PairList;
import org.apache.pinot.common.config.TlsConfig;
import org.apache.pinot.common.datablock.DataBlock;
import org.apache.pinot.common.exception.QueryException;
import org.apache.pinot.common.proto.Plan;
import org.apache.pinot.common.proto.Worker;
import org.apache.pinot.common.response.PinotBrokerTimeSeriesResponse;
Expand Down Expand Up @@ -76,6 +77,7 @@
import org.apache.pinot.query.service.dispatch.timeseries.TimeSeriesDispatchClient;
import org.apache.pinot.query.service.dispatch.timeseries.TimeSeriesDispatchObserver;
import org.apache.pinot.spi.accounting.ThreadExecutionContext;
import org.apache.pinot.spi.exception.BadQueryRequestException;
import org.apache.pinot.spi.trace.RequestContext;
import org.apache.pinot.spi.trace.Tracing;
import org.apache.pinot.spi.utils.CommonConstants;
Expand Down Expand Up @@ -459,7 +461,12 @@ public static QueryResult runReducer(long requestId,
}
// TODO: Improve the error handling, e.g. return partial response
if (block.isErrorBlock()) {
throw new RuntimeException("Received error query execution result block: " + block.getExceptions());
Map<Integer, String> queryExceptions = block.getExceptions();
if (queryExceptions.containsKey(QueryException.QUERY_VALIDATION_ERROR_CODE)) {
throw new BadQueryRequestException("Received error query execution result block: " + queryExceptions);
}

throw new RuntimeException("Received error query execution result block: " + queryExceptions);
}
assert block.isSuccessfulEndOfStreamBlock();
MultiStageQueryStats queryStats = block.getQueryStats();
Expand Down
Loading