From 5c7ac6c46e22e8e00fef954e62d2d34736181fbf Mon Sep 17 00:00:00 2001 From: "Xiaotian (Jackie) Jiang" Date: Fri, 24 Jan 2025 16:39:35 -0800 Subject: [PATCH] Enhance data schema generation for empty response --- .../BaseSingleStageBrokerRequestHandler.java | 4 +- .../apache/pinot/common/utils/DataSchema.java | 4 - .../tests/EmptyResponseIntegrationTest.java | 91 +++++------ .../tests/OfflineClusterIntegrationTest.java | 6 - .../pinot/query/parser/utils/ParserUtils.java | 144 +++++++++--------- 5 files changed, 106 insertions(+), 143 deletions(-) diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java index b8c04140dc7..7f41132a5a3 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java @@ -841,7 +841,9 @@ protected BrokerResponse handleRequest(long requestId, String query, SqlNodeAndO // server returns STRING as default dataType for all columns in (some) scenarios where no rows are returned // this is an attempt to return more faithful information based on other sources - ParserUtils.fillEmptyResponseSchema(brokerResponse, _tableCache, schema, database, query); + if (brokerResponse.getNumRowsResultSet() == 0) { + ParserUtils.fillEmptyResponseSchema(brokerResponse, _tableCache, schema, database, query); + } // Set total query processing time long totalTimeMs = System.currentTimeMillis() - requestContext.getRequestArrivalTimeMillis(); diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/DataSchema.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/DataSchema.java index 2fb2aef48a2..7751d4dc568 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/utils/DataSchema.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/DataSchema.java @@ -419,10 +419,6 @@ public boolean isWholeNumberArray() { return INTEGRAL_ARRAY_TYPES.contains(this); } - public boolean isUnknown() { - return UNKNOWN.equals(this); - } - public boolean isCompatible(ColumnDataType anotherColumnDataType) { // All numbers are compatible with each other return this == anotherColumnDataType || (this.isNumber() && anotherColumnDataType.isNumber()) || ( diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/EmptyResponseIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/EmptyResponseIntegrationTest.java index d49d2555abf..5c0c3454a92 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/EmptyResponseIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/EmptyResponseIntegrationTest.java @@ -41,7 +41,6 @@ import org.apache.pinot.spi.utils.builder.TableNameBuilder; import org.apache.pinot.util.TestUtils; import org.testng.annotations.BeforeClass; -import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; import static org.testng.Assert.assertEquals; @@ -55,6 +54,15 @@ public class EmptyResponseIntegrationTest extends BaseClusterIntegrationTestSet { private static final int NUM_BROKERS = 1; private static final int NUM_SERVERS = 1; + private static final String[] SELECT_STAR_TYPES = new String[]{ + "INT", "INT", "LONG", "INT", "FLOAT", "DOUBLE", "INT", "STRING", "INT", 
"INT", "INT", "INT", "STRING", "INT", + "STRING", "INT", "INT", "INT", "INT", "INT", "DOUBLE", "FLOAT", "INT", "STRING", "INT", "STRING", "INT", "INT", + "INT", "STRING", "STRING", "INT", "STRING", "INT", "INT", "INT", "INT", "INT_ARRAY", "INT", "INT_ARRAY", + "STRING_ARRAY", "INT", "INT", "FLOAT_ARRAY", "INT", "STRING_ARRAY", "LONG_ARRAY", "INT_ARRAY", "INT_ARRAY", + "INT", "INT", "STRING", "INT", "INT", "INT", "INT", "INT", "INT", "STRING", "INT", "INT", "INT", "STRING", + "STRING", "INT", "STRING", "INT", "INT", "STRING_ARRAY", "INT", "STRING", "INT", "INT", "INT", "STRING", "INT", + "INT", "INT", "INT" + }; private final List _serviceStatusCallbacks = new ArrayList<>(getNumBrokers() + getNumServers()); @@ -134,11 +142,6 @@ public void setUp() waitForAllDocsLoaded(600_000L); } - @BeforeMethod - public void resetMultiStage() { - setUseMultiStageQueryEngine(false); - } - protected void startBrokers() throws Exception { startBrokers(getNumBrokers()); @@ -184,21 +187,17 @@ public void testInstancesStarted() { } @Test - public void testCompiledByV2StarField() throws Exception { + public void testStarField() + throws Exception { String sqlQuery = "SELECT * FROM myTable WHERE AirlineID = 0 LIMIT 1"; JsonNode response = postQuery(sqlQuery); assertNoRowsReturned(response); - assertDataTypes(response, "INT", "INT", "LONG", "INT", "FLOAT", "DOUBLE", "INT", "STRING", "INT", "INT", "INT", - "INT", "STRING", "INT", "STRING", "INT", "INT", "INT", "INT", "INT", "DOUBLE", "FLOAT", "INT", "STRING", "INT", - "STRING", "INT", "INT", "INT", "STRING", "STRING", "INT", "STRING", "INT", "INT", "INT", "INT", "INT_ARRAY", - "INT", "INT_ARRAY", "STRING_ARRAY", "INT", "INT", "FLOAT_ARRAY", "INT", "STRING_ARRAY", "LONG_ARRAY", - "INT_ARRAY", "INT_ARRAY", "INT", "INT", "STRING", "INT", "INT", "INT", "INT", "INT", "INT", "STRING", "INT", - "INT", "INT", "STRING", "STRING", "INT", "STRING", "INT", "INT", "STRING_ARRAY", "INT", "STRING", "INT", "INT", - "INT", "STRING", "INT", "INT", "INT", "INT"); + assertDataTypes(response, SELECT_STAR_TYPES); } @Test - public void testCompiledByV2SelectionFields() throws Exception { + public void testSelectionFields() + throws Exception { String sqlQuery = "SELECT AirlineID, ArrTime, ArrTimeBlk FROM myTable WHERE AirlineID = 0 LIMIT 1"; JsonNode response = postQuery(sqlQuery); assertNoRowsReturned(response); @@ -206,7 +205,8 @@ public void testCompiledByV2SelectionFields() throws Exception { } @Test - public void testCompiledByV2TrasformedFields() throws Exception { + public void testTransformedFields() + throws Exception { String sqlQuery = "SELECT AirlineID, ArrTime, ArrTime+1 FROM myTable WHERE AirlineID = 0 LIMIT 1"; JsonNode response = postQuery(sqlQuery); assertNoRowsReturned(response); @@ -214,7 +214,8 @@ public void testCompiledByV2TrasformedFields() throws Exception { } @Test - public void testCompiledByV2AggregatedFields() throws Exception { + public void testAggregatedFields() + throws Exception { String sqlQuery = "SELECT AirlineID, avg(ArrTime) FROM myTable WHERE AirlineID = 0 GROUP BY AirlineID LIMIT 1"; JsonNode response = postQuery(sqlQuery); assertNoRowsReturned(response); @@ -222,29 +223,27 @@ public void testCompiledByV2AggregatedFields() throws Exception { } @Test - public void testSchemaFallbackStarField() throws Exception { + public void testSchemaFallbackStarField() + throws Exception { String sqlQuery = "SELECT * FROM myTable WHERE AirlineID = 0 AND add(AirTime, AirTime, ArrTime) > 0 LIMIT 1"; JsonNode response = postQuery(sqlQuery); 
assertNoRowsReturned(response); - assertDataTypes(response, "INT", "INT", "LONG", "INT", "FLOAT", "DOUBLE", "INT", "STRING", "INT", "INT", "INT", - "INT", "STRING", "INT", "STRING", "INT", "INT", "INT", "INT", "INT", "DOUBLE", "FLOAT", "INT", "STRING", "INT", - "STRING", "INT", "INT", "INT", "STRING", "STRING", "INT", "STRING", "INT", "INT", "INT", "INT", "INT", "INT", - "INT", "STRING", "INT", "INT", "FLOAT", "INT", "STRING", "LONG", "INT", "INT", "INT", "INT", "STRING", "INT", - "INT", "INT", "INT", "INT", "INT", "STRING", "INT", "INT", "INT", "STRING", "STRING", "INT", "STRING", "INT", - "INT", "STRING", "INT", "STRING", "INT", "INT", "INT", "STRING", "INT", "INT", "INT", "INT"); + assertDataTypes(response, SELECT_STAR_TYPES); } @Test - public void testSchemaFallbackSelectionFields() throws Exception { + public void testSchemaFallbackSelectionFields() + throws Exception { String sqlQuery = "SELECT AirlineID, ArrTime, ArrTimeBlk FROM myTable" - + " WHERE AirlineID = 0 AND add(ArrTime, ArrTime, ArrTime) > 0 LIMIT 1"; + + " WHERE AirlineID = 0 AND add(ArrTime, ArrTime, ArrTime) > 0 LIMIT 1"; JsonNode response = postQuery(sqlQuery); assertNoRowsReturned(response); assertDataTypes(response, "LONG", "INT", "STRING"); } @Test - public void testSchemaFallbackTransformedFields() throws Exception { + public void testSchemaFallbackTransformedFields() + throws Exception { String sqlQuery = "SELECT AirlineID, ArrTime, ArrTime+1 FROM myTable" + " WHERE AirlineID = 0 AND add(ArrTime, ArrTime, ArrTime) > 0 LIMIT 1"; JsonNode response = postQuery(sqlQuery); @@ -253,7 +252,8 @@ public void testSchemaFallbackTransformedFields() throws Exception { } @Test - public void testSchemaFallbackAggregatedFields() throws Exception { + public void testSchemaFallbackAggregatedFields() + throws Exception { String sqlQuery = "SELECT AirlineID, avg(ArrTime) FROM myTable" + " WHERE AirlineID = 0 AND add(ArrTime, ArrTime, ArrTime) > 0 GROUP BY AirlineID LIMIT 1"; JsonNode response = postQuery(sqlQuery); @@ -262,24 +262,13 @@ public void testSchemaFallbackAggregatedFields() throws Exception { } @Test - public void testDataSchemaForBrokerPrunedEmptyResults() throws Exception { + public void testDataSchemaForBrokerPrunedEmptyResults() + throws Exception { TableConfig tableConfig = getOfflineTableConfig(); tableConfig.setRoutingConfig( new RoutingConfig(null, Collections.singletonList(RoutingConfig.TIME_SEGMENT_PRUNER_TYPE), null, null)); updateTableConfig(tableConfig); - TestUtils.waitForCondition(aVoid -> { - try { - TableConfig cfg = getOfflineTableConfig(); - if (cfg.getRoutingConfig() == null || cfg.getRoutingConfig().getSegmentPrunerTypes().isEmpty()) { - return false; - } - return true; - } catch (Exception e) { - throw new RuntimeException(e); - } - }, 600_000L, "Failed to update table config"); - String query = "Select DestAirportID, Carrier from myTable WHERE DaysSinceEpoch < -1231231 and FlightNum > 121231231231"; @@ -293,31 +282,19 @@ public void testDataSchemaForBrokerPrunedEmptyResults() throws Exception { assertNoRowsReturned(queryResponse); assertDataTypes(queryResponse, "INT", "STRING"); - // Reset and remove the Time Segment Pruner - tableConfig = getOfflineTableConfig(); - tableConfig.setRoutingConfig(new RoutingConfig(null, Collections.emptyList(), null, null)); + // Reset the routing config + tableConfig.setRoutingConfig(null); updateTableConfig(tableConfig); - TestUtils.waitForCondition(aVoid -> { - try { - TableConfig cfg = getOfflineTableConfig(); - if (cfg.getRoutingConfig() == null || 
cfg.getRoutingConfig().getSegmentPrunerTypes().size() > 0) { - return false; - } - return true; - } catch (Exception e) { - throw new RuntimeException(e); - } - }, 600_000L, "Failed to update table config"); } - private void assertNoRowsReturned(JsonNode response) { assertNotNull(response.get("resultTable")); assertNotNull(response.get("resultTable").get("rows")); assertEquals(response.get("resultTable").get("rows").size(), 0); } - private void assertDataTypes(JsonNode response, String... types) throws Exception { + private void assertDataTypes(JsonNode response, String... types) + throws Exception { assertNotNull(response.get("resultTable")); assertNotNull(response.get("resultTable").get("dataSchema")); assertNotNull(response.get("resultTable").get("dataSchema").get("columnDataTypes")); diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java index 3322c6bcd24..f73852db5bd 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java @@ -89,7 +89,6 @@ import org.testng.Assert; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; -import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; import static org.apache.pinot.common.function.scalar.StringFunctions.*; @@ -267,11 +266,6 @@ private void reloadAllSegments(String testQuery, boolean forceDownload, long num }, 600_000L, "Failed to reload table with force download"); } - @BeforeMethod - public void resetMultiStage() { - setUseMultiStageQueryEngine(false); - } - protected void startBrokers() throws Exception { startBrokers(getNumBrokers()); diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/parser/utils/ParserUtils.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/parser/utils/ParserUtils.java index 27f2842ed73..ec58dd296c7 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/query/parser/utils/ParserUtils.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/parser/utils/ParserUtils.java @@ -18,6 +18,7 @@ */ package org.apache.pinot.query.parser.utils; +import com.google.common.base.Preconditions; import java.util.List; import org.apache.calcite.rel.RelRoot; import org.apache.calcite.rel.type.RelDataTypeField; @@ -25,6 +26,7 @@ import org.apache.pinot.common.response.BrokerResponse; import org.apache.pinot.common.response.broker.ResultTable; import org.apache.pinot.common.utils.DataSchema; +import org.apache.pinot.common.utils.DataSchema.ColumnDataType; import org.apache.pinot.query.QueryEnvironment; import org.apache.pinot.query.planner.logical.RelToPlanNodeConverter; import org.apache.pinot.spi.data.FieldSpec; @@ -53,95 +55,87 @@ public static boolean canCompileWithMultiStageEngine(String query, String databa } /** - * Tries to fill an empty or not properly filled schema when no rows have been returned. + * Tries to fill an empty or not properly filled {@link DataSchema} when no row has been returned. + * + * Response data schema can be inaccurate or incomplete in several forms: + * 1. No result table at all (when all segments have been pruned on broker). + * 2. Data schema has all columns set to default type (STRING) (when all segments pruned on server). 
    *
    * Priority is:
-   * - Types in schema provided by V2 validation for the given query.
-   * - Types in schema provided by V1 for the given table (only appliable to selection fields).
-   * - Types in response provided by V1 server (no action).
+   * - Types from multi-stage engine validation for the given query.
+   * - Types from schema for the given table (only applicable to selection fields).
+   * - Types from single-stage engine response (no action).
+   *
+   * Multi-stage engine schema will be available only if query compiles.
    */
-  public static void fillEmptyResponseSchema(
-      BrokerResponse response, TableCache tableCache, Schema schema, String database, String query
-  ) {
-    if (response == null || response.getNumRowsResultSet() > 0) {
-      return;
-    }
+  public static void fillEmptyResponseSchema(BrokerResponse response, TableCache tableCache, Schema schema,
+      String database, String query) {
+    Preconditions.checkState(response.getNumRowsResultSet() == 0, "Cannot fill schema for non-empty response");

-    QueryEnvironment queryEnvironment = new QueryEnvironment(database, tableCache, null);
-    RelRoot node = queryEnvironment.getRelRootIfCanCompile(query);
-    DataSchema.ColumnDataType resolved;
+    DataSchema dataSchema = response.getResultTable() != null ? response.getResultTable().getDataSchema() : null;

-    // V1 schema info for the response can be inaccurate or incomplete in several forms:
-    // 1) No schema provided at all (when no segments have been even pruned).
-    // 2) Schema provided but all columns set to default type (STRING) (when no segments have been matched).
-    // V2 schema will be available only if query compiles.
+    List<RelDataTypeField> dataTypeFields = null;
+    try {
+      QueryEnvironment queryEnvironment = new QueryEnvironment(database, tableCache, null);
+      RelRoot node = queryEnvironment.getRelRootIfCanCompile(query);
+      if (node != null && node.validatedRowType != null) {
+        dataTypeFields = node.validatedRowType.getFieldList();
+      }
+    } catch (Exception ignored) {
+      // Ignored
+    }

-    boolean hasV1Schema = response.getResultTable() != null;
-    boolean hasV2Schema = node != null && node.validatedRowType != null;
+    if (dataSchema == null && dataTypeFields == null) {
+      // No schema available, nothing we can do
+      return;
+    }

-    if (hasV1Schema && hasV2Schema) {
-      // match v1 column types with v2 column types using column names
-      // if no match, rely on v1 schema based on column name
-      // if no match either, just leave it as it is
-      DataSchema responseSchema = response.getResultTable().getDataSchema();
-      List<RelDataTypeField> fields = node.validatedRowType.getFieldList();
-      for (int i = 0; i < responseSchema.size(); i++) {
-        resolved = RelToPlanNodeConverter.convertToColumnDataType(fields.get(i).getType());
-        if (resolved == null || resolved.isUnknown()) {
-          FieldSpec spec = schema.getFieldSpecFor(responseSchema.getColumnName(i));
-          try {
-            resolved = DataSchema.ColumnDataType.fromDataType(spec.getDataType(), false);
-          } catch (Exception e) {
-            try {
-              resolved = DataSchema.ColumnDataType.fromDataType(spec.getDataType(), true);
-            } catch (Exception e2) {
-              resolved = DataSchema.ColumnDataType.UNKNOWN;
-            }
-          }
-        }
-        if (resolved == null || resolved.isUnknown()) {
-          resolved = responseSchema.getColumnDataType(i);
+    if (dataSchema == null || (dataTypeFields != null && dataSchema.size() != dataTypeFields.size())) {
+      // If data schema is not available or has different number of columns than the validated row type, we use the
+      // validated row type to populate the schema.
+      int numColumns = dataTypeFields.size();
+      String[] columnNames = new String[numColumns];
+      ColumnDataType[] columnDataTypes = new ColumnDataType[numColumns];
+      for (int i = 0; i < numColumns; i++) {
+        RelDataTypeField dataTypeField = dataTypeFields.get(i);
+        columnNames[i] = dataTypeField.getName();
+        ColumnDataType columnDataType;
+        try {
+          columnDataType = RelToPlanNodeConverter.convertToColumnDataType(dataTypeField.getType());
+        } catch (Exception ignored) {
+          columnDataType = ColumnDataType.UNKNOWN;
         }
-        responseSchema.getColumnDataTypes()[i] = resolved;
+        columnDataTypes[i] = columnDataType;
       }
-    } else if (hasV1Schema) {
-      // match v1 column types with v1 schema columns using column names
-      // if no match, just leave it as it is
-      DataSchema responseSchema = response.getResultTable().getDataSchema();
-      for (int i = 0; i < responseSchema.size(); i++) {
-        FieldSpec spec = schema.getFieldSpecFor(responseSchema.getColumnName(i));
+      response.setResultTable(new ResultTable(new DataSchema(columnNames, columnDataTypes), List.of()));
+      return;
+    }
+
+    // When data schema is available, try to fix the data types within it.
+    ColumnDataType[] columnDataTypes = dataSchema.getColumnDataTypes();
+    int numColumns = columnDataTypes.length;
+    if (dataTypeFields != null) {
+      // Fill data type with the validated row type when it is available.
+      for (int i = 0; i < numColumns; i++) {
         try {
-          // try single value first
-          resolved = DataSchema.ColumnDataType.fromDataType(spec.getDataType(), true);
-        } catch (Exception e) {
-          try {
-            // fallback to multi value
-            resolved = DataSchema.ColumnDataType.fromDataType(spec.getDataType(), false);
-          } catch (Exception e2) {
-            resolved = DataSchema.ColumnDataType.UNKNOWN;
-          }
-        }
-        if (resolved == null || resolved.isUnknown()) {
-          resolved = responseSchema.getColumnDataType(i);
+          columnDataTypes[i] = RelToPlanNodeConverter.convertToColumnDataType(dataTypeFields.get(i).getType());
+        } catch (Exception ignored) {
+          // Ignore exception and keep the type from response
         }
-        responseSchema.getColumnDataTypes()[i] = resolved;
       }
-    } else if (hasV2Schema) {
-      // trust v2 column types blindly
-      // if a type cannot be resolved, leave it as UNKNOWN
-      List<RelDataTypeField> fields = node.validatedRowType.getFieldList();
-      String[] columnNames = new String[fields.size()];
-      DataSchema.ColumnDataType[] columnDataTypes = new DataSchema.ColumnDataType[fields.size()];
-      for (int i = 0; i < fields.size(); i++) {
-        columnNames[i] = fields.get(i).getName();
-        resolved = RelToPlanNodeConverter.convertToColumnDataType(fields.get(i).getType());
-        if (resolved == null) {
-          resolved = DataSchema.ColumnDataType.UNKNOWN;
+    } else {
+      // Fill data type with the schema when validated row type is not available.
+      String[] columnNames = dataSchema.getColumnNames();
+      for (int i = 0; i < numColumns; i++) {
+        FieldSpec fieldSpec = schema.getFieldSpecFor(columnNames[i]);
+        if (fieldSpec != null) {
+          try {
+            columnDataTypes[i] = ColumnDataType.fromDataType(fieldSpec.getDataType(), fieldSpec.isSingleValueField());
+          } catch (Exception ignored) {
+            // Ignore exception and keep the type from response
+          }
         }
-        columnDataTypes[i] = resolved;
       }
-      response.setResultTable(new ResultTable(new DataSchema(columnNames, columnDataTypes), List.of()));
     }
-    // else { /* nothing else we can do */ }
   }
 }
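
Reviewer note (not part of the patch): fillEmptyResponseSchema no longer returns early for non-empty
responses; it now enforces the empty-response contract with Preconditions.checkState and will throw
IllegalStateException otherwise. Any future call site therefore needs the same guard that
BaseSingleStageBrokerRequestHandler adds above. A minimal sketch of the expected call pattern,
assuming the same variables that are in scope in that handler:

    // Only attempt to repair the data schema when the result set is empty; the method now enforces
    // this precondition instead of checking it internally.
    if (brokerResponse.getNumRowsResultSet() == 0) {
      ParserUtils.fillEmptyResponseSchema(brokerResponse, _tableCache, schema, database, query);
    }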
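Reviewer note (not part of the patch): the end-to-end effect can be spot-checked with a test in the
spirit of EmptyResponseIntegrationTest, reusing only helpers that already appear in this patch
(postQuery and the resultTable JSON layout); the expected LONG/INT/STRING types for this query match
testSelectionFields. A hedged sketch of such a method, assuming it lives inside that test class so
the existing imports and cluster setup apply:

    @Test
    public void testEmptyResponseKeepsDeclaredTypes()
        throws Exception {
      // The predicate matches no rows, yet the data schema should not degrade to all-STRING columns.
      JsonNode response = postQuery("SELECT AirlineID, ArrTime, ArrTimeBlk FROM myTable WHERE AirlineID = 0 LIMIT 1");
      assertEquals(response.get("resultTable").get("rows").size(), 0);
      JsonNode columnDataTypes = response.get("resultTable").get("dataSchema").get("columnDataTypes");
      assertEquals(columnDataTypes.get(0).asText(), "LONG");
      assertEquals(columnDataTypes.get(1).asText(), "INT");
      assertEquals(columnDataTypes.get(2).asText(), "STRING");
    }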