Skip to content

Commit

Permalink
Merging master and including force commit PR changes
Browse files Browse the repository at this point in the history
  • Loading branch information
9aman committed Jan 24, 2025
2 parents 791ac21 + 82c3d2a commit 8db5bae
Show file tree
Hide file tree
Showing 225 changed files with 5,814 additions and 2,109 deletions.
14 changes: 7 additions & 7 deletions .github/workflows/pinot_tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ jobs:
cache: 'maven'
- name: Linter Test
env:
GRADLE_ENTERPRISE_ACCESS_KEY: ${{ secrets.GE_ACCESS_TOKEN }}
DEVELOCITY_ACCESS_KEY: ${{ secrets.DEVELOCITY_ACCESS_KEY }}
MAVEN_OPTS: >
-Xmx2G -DskipShade -DfailIfNoTests=false -Dmaven.wagon.httpconnectionManager.ttlSeconds=25
-Dmaven.wagon.http.retryHandler.count=30 -Dhttp.keepAlive=false -Dmaven.wagon.http.pool=false
Expand Down Expand Up @@ -112,7 +112,7 @@ jobs:
env:
RUN_INTEGRATION_TESTS: false
RUN_TEST_SET: ${{ matrix.testset }}
GRADLE_ENTERPRISE_ACCESS_KEY: ${{ secrets.GE_ACCESS_TOKEN }}
DEVELOCITY_ACCESS_KEY: ${{ secrets.DEVELOCITY_ACCESS_KEY }}
MAVEN_OPTS: >
-Xmx2G -DskipShade -DfailIfNoTests=false -Dmaven.wagon.httpconnectionManager.ttlSeconds=25
-Dmaven.wagon.http.retryHandler.count=30 -Dhttp.keepAlive=false -Dmaven.wagon.http.pool=false
Expand All @@ -134,7 +134,7 @@ jobs:
RUN_INTEGRATION_TESTS: false
RUN_TEST_SET: ${{ matrix.testset }}
PINOT_OFFHEAP_SKIP_BYTEBUFFER: ${{ matrix.skip_bytebuffer }}
GRADLE_ENTERPRISE_ACCESS_KEY: ${{ secrets.GE_ACCESS_TOKEN }}
DEVELOCITY_ACCESS_KEY: ${{ secrets.DEVELOCITY_ACCESS_KEY }}
MAVEN_OPTS: >
-Xmx2G -DskipShade -DfailIfNoTests=false -Dmaven.wagon.httpconnectionManager.ttlSeconds=25
-Dmaven.wagon.http.retryHandler.count=30 -Dhttp.keepAlive=false -Dmaven.wagon.http.pool=false
Expand Down Expand Up @@ -202,7 +202,7 @@ jobs:
- name: Build Project
env:
RUN_INTEGRATION_TESTS: true
GRADLE_ENTERPRISE_ACCESS_KEY: ${{ secrets.GE_ACCESS_TOKEN }}
DEVELOCITY_ACCESS_KEY: ${{ secrets.DEVELOCITY_ACCESS_KEY }}
MAVEN_OPTS: >
-Xmx2G -DskipShade -DfailIfNoTests=false -Dmaven.wagon.httpconnectionManager.ttlSeconds=25
-Dmaven.wagon.http.retryHandler.count=30 -Dhttp.keepAlive=false -Dmaven.wagon.http.pool=false
Expand All @@ -219,7 +219,7 @@ jobs:
RUN_INTEGRATION_TESTS: true
RUN_TEST_SET: ${{ matrix.testset }}
PINOT_OFFHEAP_SKIP_BYTEBUFFER: ${{ matrix.skip_bytebuffer }}
GRADLE_ENTERPRISE_ACCESS_KEY: ${{ secrets.GE_ACCESS_TOKEN }}
DEVELOCITY_ACCESS_KEY: ${{ secrets.DEVELOCITY_ACCESS_KEY }}
MAVEN_OPTS: >
-Xmx2G -DskipShade -DfailIfNoTests=false -Dmaven.wagon.httpconnectionManager.ttlSeconds=25
-Dmaven.wagon.http.retryHandler.count=30 -Dhttp.keepAlive=false -Dmaven.wagon.http.pool=false
Expand All @@ -245,7 +245,7 @@ jobs:
env:
RUN_INTEGRATION_TESTS: true
RUN_TEST_SET: ${{ matrix.testset }}
GRADLE_ENTERPRISE_ACCESS_KEY: ${{ secrets.GE_ACCESS_TOKEN }}
DEVELOCITY_ACCESS_KEY: ${{ secrets.DEVELOCITY_ACCESS_KEY }}
MAVEN_OPTS: >
-Xmx2G -DskipShade -DfailIfNoTests=false -Dmaven.wagon.httpconnectionManager.ttlSeconds=25
-Dmaven.wagon.http.retryHandler.count=30 -Dhttp.keepAlive=false -Dmaven.wagon.http.pool=false
Expand Down Expand Up @@ -407,7 +407,7 @@ jobs:
- uses: actions/cache@v4
env:
SEGMENT_DOWNLOAD_TIMEOUT_MINS: 10
GRADLE_ENTERPRISE_ACCESS_KEY: ${{ secrets.GE_ACCESS_TOKEN }}
DEVELOCITY_ACCESS_KEY: ${{ secrets.DEVELOCITY_ACCESS_KEY }}
with:
path: ~/.m2/repository
key: ${{ runner.os }}-maven-${{ hashFiles('**/pom.xml') }}
Expand Down
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -56,3 +56,4 @@ kubernetes/helm/**/Chart.lock

#Develocity
.mvn/.gradle-enterprise/
.mvn/.develocity/
19 changes: 9 additions & 10 deletions .mvn/gradle-enterprise.xml → .mvn/develocity.xml
Original file line number Diff line number Diff line change
Expand Up @@ -19,20 +19,19 @@
under the License.
-->
<gradleEnterprise xmlns="https://www.gradle.com/gradle-enterprise-maven" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="https://www.gradle.com/gradle-enterprise-maven https://www.gradle.com/schema/gradle-enterprise-maven.xsd">
<develocity xmlns="https://www.gradle.com/develocity-maven" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="https://www.gradle.com/gradle-enterprise-maven https://www.gradle.com/schema/develocity-maven.xsd">
<projectId>pinot</projectId>
<server>
<url>https://ge.apache.org</url>
<url>https://develocity.apache.org</url>
<allowUntrusted>false</allowUntrusted>
</server>
<buildScan>
<capture>
<goalInputFiles>true</goalInputFiles>
<buildLogging>true</buildLogging>
<testLogging>true</testLogging>
</capture>
<backgroundBuildScanUpload>#{isFalse(env['GITHUB_ACTIONS'])}</backgroundBuildScanUpload>
<publish>ALWAYS</publish>
<publishIfAuthenticated>true</publishIfAuthenticated>
<publishing>
<onlyIf>
<![CDATA[authenticated]]>
</onlyIf>
</publishing>
<obfuscation>
<ipAddresses>#{{'0.0.0.0'}}</ipAddresses>
</obfuscation>
Expand All @@ -45,4 +44,4 @@
<enabled>false</enabled>
</remote>
</buildCache>
</gradleEnterprise>
</develocity>
4 changes: 2 additions & 2 deletions .mvn/extensions.xml
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@
<extensions xmlns="http://maven.apache.org/EXTENSIONS/1.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/EXTENSIONS/1.0.0 http://maven.apache.org/xsd/core-extensions-1.0.0.xsd">
<extension>
<groupId>com.gradle</groupId>
<artifactId>gradle-enterprise-maven-extension</artifactId>
<version>1.20.1</version>
<artifactId>develocity-maven-extension</artifactId>
<version>1.23.1</version>
</extension>
<extension>
<groupId>com.gradle</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ rules:
tableType: "$6"
partition: "$7"
# Gauges that accept the controller taskType
- pattern: "\"org\\.apache\\.pinot\\.common\\.metrics\"<type=\"ControllerMetrics\", name=\"pinot\\.controller\\.(numMinionTasksInProgress|numMinionSubtasksRunning|numMinionSubtasksWaiting|numMinionSubtasksError|numMinionSubtasksUnknown|percentMinionSubtasksInQueue|percentMinionSubtasksInError)\\.(\\w+)\"><>(\\w+)"
- pattern: "\"org\\.apache\\.pinot\\.common\\.metrics\"<type=\"ControllerMetrics\", name=\"pinot\\.controller\\.(numMinionTasksInProgress|numMinionSubtasksRunning|numMinionSubtasksWaiting|numMinionSubtasksError|numMinionSubtasksUnknown|numMinionSubtasksDropped|numMinionSubtasksTimedOut|numMinionSubtasksAborted|percentMinionSubtasksInQueue|percentMinionSubtasksInError)\\.(\\w+)\"><>(\\w+)"
name: "pinot_controller_$1_$3"
cache: true
labels:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -512,22 +512,21 @@ private void updateInstanceConfigAndBrokerResourceIfNeeded() {
boolean shouldUpdateBrokerResource = false;
List<String> instanceTags = instanceConfig.getTags();
if (instanceTags.isEmpty()) {
// This is a new broker (first time joining the cluster)
if (ZKMetadataProvider.getClusterTenantIsolationEnabled(_propertyStore)) {
// This is a new broker (first time joining the cluster). We allow configuring initial broker tags regardless of
// tenant isolation mode since it defaults to true and is relatively obscure.
String instanceTagsConfig = _brokerConf.getProperty(Broker.CONFIG_OF_BROKER_INSTANCE_TAGS);
if (StringUtils.isNotEmpty(instanceTagsConfig)) {
for (String instanceTag : StringUtils.split(instanceTagsConfig, ',')) {
Preconditions.checkArgument(TagNameUtils.isBrokerTag(instanceTag), "Illegal broker instance tag: %s",
instanceTag);
instanceConfig.addTag(instanceTag);
}
shouldUpdateBrokerResource = true;
} else if (ZKMetadataProvider.getClusterTenantIsolationEnabled(_propertyStore)) {
instanceConfig.addTag(TagNameUtils.getBrokerTagForTenant(null));
shouldUpdateBrokerResource = true;
} else {
String instanceTagsConfig = _brokerConf.getProperty(Broker.CONFIG_OF_BROKER_INSTANCE_TAGS);
if (StringUtils.isNotEmpty(instanceTagsConfig)) {
for (String instanceTag : StringUtils.split(instanceTagsConfig, ',')) {
Preconditions.checkArgument(TagNameUtils.isBrokerTag(instanceTag), "Illegal broker instance tag: %s",
instanceTag);
instanceConfig.addTag(instanceTag);
}
shouldUpdateBrokerResource = true;
} else {
instanceConfig.addTag(Helix.UNTAGGED_BROKER_INSTANCE);
}
instanceConfig.addTag(Helix.UNTAGGED_BROKER_INSTANCE);
}
instanceTags = instanceConfig.getTags();
updated = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -837,6 +837,10 @@ protected BrokerResponse handleRequest(long requestId, String query, SqlNodeAndO
1);
}

// server returns STRING as default dataType for all columns in (some) scenarios where no rows are returned
// this is an attempt to return more faithful information based on other sources
ParserUtils.fillEmptyResponseSchema(brokerResponse, _tableCache, schema, database, query);

// Set total query processing time
long totalTimeMs = System.currentTimeMillis() - requestContext.getRequestArrivalTimeMillis();
brokerResponse.setTimeUsedMs(totalTimeMs);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand Down Expand Up @@ -64,6 +65,7 @@
import org.apache.pinot.query.planner.physical.DispatchablePlanFragment;
import org.apache.pinot.query.planner.physical.DispatchableSubPlan;
import org.apache.pinot.query.planner.plannode.PlanNode;
import org.apache.pinot.query.routing.QueryServerInstance;
import org.apache.pinot.query.routing.WorkerManager;
import org.apache.pinot.query.runtime.MultiStageStatsTreeBuilder;
import org.apache.pinot.query.runtime.plan.MultiStageQueryStats;
Expand Down Expand Up @@ -199,8 +201,13 @@ protected BrokerResponse handleRequest(long requestId, String query, SqlNodeAndO
}

DispatchableSubPlan dispatchableSubPlan = queryPlanResult.getQueryPlan();
Set<String> tableNames = queryPlanResult.getTableNames();

Set<QueryServerInstance> servers = new HashSet<>();
for (DispatchablePlanFragment planFragment: dispatchableSubPlan.getQueryStageList()) {
servers.addAll(planFragment.getServerInstances());
}

Set<String> tableNames = queryPlanResult.getTableNames();
_brokerMetrics.addMeteredGlobalValue(BrokerMeter.MULTI_STAGE_QUERIES_GLOBAL, 1);
for (String tableName : tableNames) {
_brokerMetrics.addMeteredTableValue(tableName, BrokerMeter.MULTI_STAGE_QUERIES, 1);
Expand Down Expand Up @@ -231,10 +238,11 @@ protected BrokerResponse handleRequest(long requestId, String query, SqlNodeAndO
}

Timer queryTimer = new Timer(queryTimeoutMs);
int estimatedNumQueryThreads = dispatchableSubPlan.getEstimatedNumQueryThreads();
try {
// It's fine to block in this thread because we use a separate thread pool from the main Jersey server to process
// these requests.
if (!_queryThrottler.tryAcquire(queryTimeoutMs, TimeUnit.MILLISECONDS)) {
if (!_queryThrottler.tryAcquire(estimatedNumQueryThreads, queryTimeoutMs, TimeUnit.MILLISECONDS)) {
LOGGER.warn("Timed out waiting to execute request {}: {}", requestId, query);
requestContext.setErrorCode(QueryException.EXECUTION_TIMEOUT_ERROR_CODE);
return new BrokerResponseNative(QueryException.EXECUTION_TIMEOUT_ERROR);
Expand Down Expand Up @@ -277,8 +285,12 @@ protected BrokerResponse handleRequest(long requestId, String query, SqlNodeAndO
BrokerResponseNativeV2 brokerResponse = new BrokerResponseNativeV2();
brokerResponse.setResultTable(queryResults.getResultTable());
brokerResponse.setTablesQueried(tableNames);
// TODO: Add servers queried/responded stats
brokerResponse.setBrokerReduceTimeMs(queryResults.getBrokerReduceTimeMs());
// MSE cannot finish if a single queried server did not respond, so we can use the same count for
// both the queried and responded stats. Minus one prevents the broker to be included in the count
// (it will always be included because of the root of the query plan)
brokerResponse.setNumServersQueried(servers.size() - 1);
brokerResponse.setNumServersResponded(servers.size() - 1);

// Attach unavailable segments
int numUnavailableSegments = 0;
Expand Down Expand Up @@ -311,7 +323,7 @@ protected BrokerResponse handleRequest(long requestId, String query, SqlNodeAndO

return brokerResponse;
} finally {
_queryThrottler.release();
_queryThrottler.release(estimatedNumQueryThreads);
}
}

Expand Down
Loading

0 comments on commit 8db5bae

Please sign in to comment.