
Commit 3a8c578

Return 503 for all interrupted queries. Refactor the query killing code. (apache#10683)

* Change the query cancellation error code to 503

Refine the return error code of query killing

* Trigger Test

* Trigger Test
jasperjiaguo authored May 10, 2023
1 parent 738e607 commit 3a8c578
Showing 13 changed files with 57 additions and 27 deletions.
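With this change, a query that is interrupted by the resource accountant is reported in the broker response's exceptions array with errorCode 503 (QueryException.QUERY_CANCELLATION_ERROR_CODE) instead of 205. Below is a minimal client-side sketch, assuming the usual broker response shape of an exceptions array with errorCode/message entries; the helper class is hypothetical and not part of this commit.

import com.fasterxml.jackson.databind.JsonNode;

// Hypothetical helper (not in this commit): detects a killed query in a broker response.
public final class QueryCancellationCheck {
  // Mirrors QueryException.QUERY_CANCELLATION_ERROR_CODE after this change.
  private static final int QUERY_CANCELLATION_ERROR_CODE = 503;

  private QueryCancellationCheck() {
  }

  // Returns true if any entry in the "exceptions" array carries the cancellation error code.
  public static boolean wasQueryKilled(JsonNode brokerResponse) {
    JsonNode exceptions = brokerResponse.get("exceptions");
    if (exceptions == null || !exceptions.isArray()) {
      return false;
    }
    for (JsonNode exception : exceptions) {
      JsonNode errorCode = exception.get("errorCode");
      if (errorCode != null && errorCode.asInt() == QUERY_CANCELLATION_ERROR_CODE) {
        return true;
      }
    }
    return false;
  }
}

The integration tests touched below assert the same thing textually by checking the exceptions string for "errorCode":503.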
----------------------------------------------------------------------
@@ -321,7 +321,7 @@ public void start()
_brokerConf.getProperty(CommonConstants.Broker.CONFIG_OF_ENABLE_THREAD_ALLOCATED_BYTES_MEASUREMENT,
CommonConstants.Broker.DEFAULT_THREAD_ALLOCATED_BYTES_MEASUREMENT));
Tracing.ThreadAccountantOps
.initializeThreadAccountant(_brokerConf.subset(CommonConstants.PINOT_QUERY_SCHEDULER_PREFIX));
.initializeThreadAccountant(_brokerConf.subset(CommonConstants.PINOT_QUERY_SCHEDULER_PREFIX), _instanceId);

String controllerUrl = _brokerConf.getProperty(Broker.CONTROLLER_URL);
if (controllerUrl != null) {
----------------------------------------------------------------------
@@ -54,7 +54,7 @@ public static void setMaxLinesOfStackTrace(int maxLinesOfStackTracePerFrame) {
public static final int ACCESS_DENIED_ERROR_CODE = 180;
public static final int TABLE_DOES_NOT_EXIST_ERROR_CODE = 190;
public static final int QUERY_EXECUTION_ERROR_CODE = 200;
public static final int QUERY_CANCELLATION_ERROR_CODE = 205;
public static final int QUERY_CANCELLATION_ERROR_CODE = 503;
// TODO: Handle these errors in broker
public static final int SERVER_SHUTTING_DOWN_ERROR_CODE = 210;
public static final int SERVER_OUT_OF_CAPACITY_ERROR_CODE = 211;
----------------------------------------------------------------------
@@ -38,7 +38,7 @@
public class HeapUsagePublishingAccountantFactory implements ThreadAccountantFactory {

@Override
public ThreadResourceUsageAccountant init(PinotConfiguration config) {
public ThreadResourceUsageAccountant init(PinotConfiguration config, String instanceId) {
int period = config.getProperty(CommonConstants.Accounting.CONFIG_OF_HEAP_USAGE_PUBLISHING_PERIOD_MS,
CommonConstants.Accounting.DEFAULT_HEAP_USAGE_PUBLISH_PERIOD);
return new HeapUsagePublishingResourceUsageAccountant(period);
----------------------------------------------------------------------
@@ -59,8 +59,8 @@
public class PerQueryCPUMemAccountantFactory implements ThreadAccountantFactory {

@Override
public ThreadResourceUsageAccountant init(PinotConfiguration config) {
return new PerQueryCPUMemResourceUsageAccountant(config);
public ThreadResourceUsageAccountant init(PinotConfiguration config, String instanceId) {
return new PerQueryCPUMemResourceUsageAccountant(config, instanceId);
}

public static class PerQueryCPUMemResourceUsageAccountant extends Tracing.DefaultThreadResourceUsageAccountant {
@@ -125,10 +125,14 @@ public static class PerQueryCPUMemResourceUsageAccountant extends Tracing.DefaultThreadResourceUsageAccountant {
// the periodic task that aggregates and preempts queries
private final WatcherTask _watcherTask;

public PerQueryCPUMemResourceUsageAccountant(PinotConfiguration config) {
// instance id of the current instance, for logging purposes
private final String _instanceId;

public PerQueryCPUMemResourceUsageAccountant(PinotConfiguration config, String instanceId) {

LOGGER.info("Initializing PerQueryCPUMemResourceUsageAccountant");
_config = config;
_instanceId = instanceId;

boolean threadCpuTimeMeasurementEnabled = ThreadResourceUsageProvider.isThreadCpuTimeMeasurementEnabled();
boolean threadMemoryMeasurementEnabled = ThreadResourceUsageProvider.isThreadMemoryMeasurementEnabled();
@@ -540,6 +544,11 @@ class WatcherTask implements Runnable {
_config.getProperty(CommonConstants.Accounting.CONFIG_OF_CPU_TIME_BASED_KILLING_THRESHOLD_MS,
CommonConstants.Accounting.DEFAULT_CPU_TIME_BASED_KILLING_THRESHOLD_MS) * 1000_000L;

// whether to emit the query-killed meter when the accountant interrupts queries (disabled by default)
private final boolean _isQueryKilledMetricEnabled =
_config.getProperty(CommonConstants.Accounting.CONFIG_OF_QUERY_KILLED_METRIC_ENABLED,
CommonConstants.Accounting.DEFAULT_QUERY_KILLED_METRIC_ENABLED);

private final InstanceType _instanceType =
InstanceType.valueOf(_config.getProperty(CommonConstants.Accounting.CONFIG_OF_INSTANCE_TYPE,
CommonConstants.Accounting.DEFAULT_CONFIG_OF_INSTANCE_TYPE.toString()));
@@ -730,7 +739,9 @@ void killAllQueries() {
killedCount += 1;
}
}
_metrics.addMeteredGlobalValue(_queryKilledMeter, killedCount);
if (_isQueryKilledMetricEnabled) {
_metrics.addMeteredGlobalValue(_queryKilledMeter, killedCount);
}
try {
Thread.sleep(_normalSleepTime);
} catch (InterruptedException ignored) {
@@ -778,8 +789,8 @@ private void killMostExpensiveQuery() {
if (shouldKill) {
maxUsageTuple._exceptionAtomicReference
.set(new RuntimeException(String.format(
" Query %s got killed because using %d bytes of memory on %s, exceeding the quota",
maxUsageTuple._queryId, maxUsageTuple.getAllocatedBytes(), _instanceType)));
" Query %s got killed because using %d bytes of memory on %s: %s, exceeding the quota",
maxUsageTuple._queryId, maxUsageTuple.getAllocatedBytes(), _instanceType, _instanceId)));
interruptRunnerThread(maxUsageTuple.getAnchorThread());
LOGGER.error("Query {} got picked because using {} bytes of memory, actual kill committed true}",
maxUsageTuple._queryId, maxUsageTuple._allocatedBytes);
@@ -797,8 +808,8 @@ private void killMostExpensiveQuery() {
if (_oomKillQueryEnabled) {
maxUsageTuple._exceptionAtomicReference
.set(new RuntimeException(String.format(
" Query %s got killed because memory pressure, using %d ns of CPU time on %s",
maxUsageTuple._queryId, maxUsageTuple.getAllocatedBytes(), _instanceType)));
" Query %s got killed because memory pressure, using %d ns of CPU time on %s: %s",
maxUsageTuple._queryId, maxUsageTuple.getAllocatedBytes(), _instanceType, _instanceId)));
interruptRunnerThread(maxUsageTuple.getAnchorThread());
LOGGER.error("Query {} got picked because using {} ns of cpu time, actual kill committed true",
maxUsageTuple._allocatedBytes, maxUsageTuple._queryId);
@@ -819,8 +830,9 @@ private void killCPUTimeExceedQueries() {
LOGGER.error("Query {} got picked because using {} ns of cpu time, greater than threshold {}",
value._queryId, value.getCpuNS(), _cpuTimeBasedKillingThresholdNS);
value._exceptionAtomicReference.set(new RuntimeException(
String.format("Query %s got killed on %s because using %d CPU time exceeding limit of %d ns CPU time",
value._queryId, _instanceType, value.getCpuNS(), _cpuTimeBasedKillingThresholdNS)));
String.format("Query %s got killed on %s: %s because using %d "
+ "CPU time exceeding limit of %d ns CPU time",
value._queryId, _instanceType, _instanceId, value.getCpuNS(), _cpuTimeBasedKillingThresholdNS)));
interruptRunnerThread(value.getAnchorThread());
}
}
@@ -829,7 +841,9 @@ private void killCPUTimeExceedQueries() {

private void interruptRunnerThread(Thread thread) {
thread.interrupt();
_metrics.addMeteredGlobalValue(_queryKilledMeter, 1);
if (_isQueryKilledMetricEnabled) {
_metrics.addMeteredGlobalValue(_queryKilledMeter, 1);
}
_numQueriesKilledConsecutively += 1;
}
}
----------------------------------------------------------------------
@@ -30,14 +30,14 @@
*/
public class PerQueryCPUMemAccountantFactoryForTest implements ThreadAccountantFactory {
@Override
public ThreadResourceUsageAccountant init(PinotConfiguration config) {
return new PerQueryCPUMemResourceUsageAccountantBrokerKillingTest(config);
public ThreadResourceUsageAccountant init(PinotConfiguration config, String instanceId) {
return new PerQueryCPUMemResourceUsageAccountantBrokerKillingTest(config, instanceId);
}

public static class PerQueryCPUMemResourceUsageAccountantBrokerKillingTest
extends PerQueryCPUMemAccountantFactory.PerQueryCPUMemResourceUsageAccountant {
public PerQueryCPUMemResourceUsageAccountantBrokerKillingTest(PinotConfiguration config) {
super(config);
public PerQueryCPUMemResourceUsageAccountantBrokerKillingTest(PinotConfiguration config, String instanceId) {
super(config, instanceId);
}

public void postAggregation(Map<String, AggregatedStats> aggregatedUsagePerActiveQuery) {
----------------------------------------------------------------------
@@ -26,6 +26,7 @@
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.core.common.datatable.DataTableBuilderFactory;
import org.apache.pinot.core.query.request.context.QueryContext;
import org.apache.pinot.spi.exception.QueryCancelledException;


public class ExceptionResultsBlock extends BaseResultsBlock {
@@ -38,6 +39,10 @@ public ExceptionResultsBlock(Throwable t) {
this(QueryException.QUERY_EXECUTION_ERROR, t);
}

public ExceptionResultsBlock(QueryCancelledException t) {
this(QueryException.QUERY_CANCELLATION_ERROR, t);
}

@Override
public int getNumRows() {
return 0;
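ExceptionResultsBlock gains a constructor that maps QueryCancelledException to QueryException.QUERY_CANCELLATION_ERROR, so a kill surfaces as 503 rather than the generic execution error 200. A rough sketch of the intended call pattern follows; the wrapper class, its method, and the package paths in the imports are assumptions for illustration, not code from this commit.

import java.util.concurrent.Callable;

import org.apache.pinot.common.exception.QueryException;
import org.apache.pinot.core.operator.blocks.results.BaseResultsBlock;
import org.apache.pinot.core.operator.blocks.results.ExceptionResultsBlock;
import org.apache.pinot.spi.exception.QueryCancelledException;

// Hypothetical helper illustrating the constructor added above.
public final class CancellationAwareExecution {
  private CancellationAwareExecution() {
  }

  public static BaseResultsBlock callAndWrap(Callable<BaseResultsBlock> execution) {
    try {
      return execution.call();
    } catch (QueryCancelledException e) {
      // Killed by the accountant: surfaces as QUERY_CANCELLATION_ERROR (503).
      return new ExceptionResultsBlock(e);
    } catch (Exception e) {
      // Any other failure keeps the generic execution error (200).
      return new ExceptionResultsBlock(QueryException.QUERY_EXECUTION_ERROR, e);
    }
  }
}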
----------------------------------------------------------------------
@@ -28,7 +28,6 @@
import org.apache.pinot.core.query.scheduler.SchedulerGroupAccountant;
import org.apache.pinot.core.util.trace.TracedThreadFactory;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.trace.Tracing;
import org.apache.pinot.spi.utils.CommonConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -94,8 +93,6 @@ public ResourceManager(PinotConfiguration config) {
CommonConstants.ExecutorService.PINOT_QUERY_WORKER_NAME_FORMAT);
_queryWorkers =
MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(_numQueryWorkerThreads, queryWorkersFactory));

Tracing.ThreadAccountantOps.initializeThreadAccountant(config);
}

public void stop() {
----------------------------------------------------------------------
@@ -36,6 +36,7 @@
import org.apache.commons.lang.StringUtils;
import org.apache.log4j.Level;
import org.apache.log4j.LogManager;
import org.apache.pinot.common.exception.QueryException;
import org.apache.pinot.core.accounting.PerQueryCPUMemAccountantFactory;
import org.apache.pinot.core.accounting.PerQueryCPUMemAccountantFactoryForTest;
import org.apache.pinot.spi.accounting.ThreadResourceUsageProvider;
@@ -254,6 +255,8 @@ public void testDigestOOMMultipleQueries()
LOGGER.info("testDigestOOMMultipleQueries: {}", queryResponse1);
Assert.assertTrue(queryResponse1.get().get("exceptions").toString().contains(
"Interrupted in broker reduce phase"));
Assert.assertTrue(queryResponse1.get().get("exceptions").toString().contains("\"errorCode\":"
+ QueryException.QUERY_CANCELLATION_ERROR_CODE));
Assert.assertTrue(queryResponse1.get().get("exceptions").toString().contains("got killed because"));
Assert.assertFalse(StringUtils.isEmpty(queryResponse2.get().get("exceptions").toString()));
Assert.assertFalse(StringUtils.isEmpty(queryResponse3.get().get("exceptions").toString()));
----------------------------------------------------------------------
@@ -36,6 +36,7 @@
import org.apache.commons.lang.StringUtils;
import org.apache.log4j.Level;
import org.apache.log4j.LogManager;
import org.apache.pinot.common.exception.QueryException;
import org.apache.pinot.core.accounting.PerQueryCPUMemAccountantFactory;
import org.apache.pinot.spi.accounting.ThreadResourceUsageProvider;
import org.apache.pinot.spi.config.instance.InstanceType;
@@ -58,8 +59,8 @@
/**
* Integration test for heap-size-based server query killing; this works only with -Xmx4G
*/
public class OfflineClusterMemBasedServerQueryKilingTest extends BaseClusterIntegrationTestSet {
private static final Logger LOGGER = LoggerFactory.getLogger(OfflineClusterMemBasedServerQueryKilingTest.class);
public class OfflineClusterMemBasedServerQueryKillingTest extends BaseClusterIntegrationTestSet {
private static final Logger LOGGER = LoggerFactory.getLogger(OfflineClusterMemBasedServerQueryKillingTest.class);
public static final String STRING_DIM_SV1 = "stringDimSV1";
public static final String STRING_DIM_SV2 = "stringDimSV2";
public static final String INT_DIM_SV1 = "intDimSV1";
@@ -101,7 +102,7 @@ protected int getNumServers() {
public void setUp()
throws Exception {
// Setup logging and resource accounting
LogManager.getLogger(OfflineClusterMemBasedServerQueryKilingTest.class).setLevel(Level.INFO);
LogManager.getLogger(OfflineClusterMemBasedServerQueryKillingTest.class).setLevel(Level.INFO);
LogManager.getLogger(PerQueryCPUMemAccountantFactory.PerQueryCPUMemResourceUsageAccountant.class)
.setLevel(Level.INFO);
LogManager.getLogger(ThreadResourceUsageProvider.class).setLevel(Level.INFO);
@@ -216,6 +217,8 @@ public void testDigestOOM()
throws Exception {
JsonNode queryResponse = postQuery(OOM_QUERY);
LOGGER.info("testDigestOOM: {}", queryResponse);
Assert.assertTrue(queryResponse.get("exceptions").toString().contains("\"errorCode\":"
+ QueryException.QUERY_CANCELLATION_ERROR_CODE));
Assert.assertTrue(queryResponse.get("exceptions").toString().contains("QueryCancelledException"));
Assert.assertTrue(queryResponse.get("exceptions").toString().contains("got killed because"));
}
@@ -267,6 +270,7 @@ public void testDigestOOMMultipleQueries()
);
countDownLatch.await();
LOGGER.info("testDigestOOMMultipleQueries: {}", queryResponse1);
Assert.assertTrue(queryResponse1.get().get("exceptions").toString().contains("\"errorCode\":503"));
Assert.assertTrue(queryResponse1.get().get("exceptions").toString().contains("QueryCancelledException"));
Assert.assertTrue(queryResponse1.get().get("exceptions").toString().contains("got killed because"));
Assert.assertFalse(StringUtils.isEmpty(queryResponse2.get().get("exceptions").toString()));
----------------------------------------------------------------------
@@ -87,6 +87,7 @@
import org.apache.pinot.spi.plugin.PluginManager;
import org.apache.pinot.spi.services.ServiceRole;
import org.apache.pinot.spi.services.ServiceStartable;
import org.apache.pinot.spi.trace.Tracing;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.CommonConstants.Helix;
import org.apache.pinot.spi.utils.CommonConstants.Helix.Instance;
@@ -544,6 +545,9 @@ public void start()
ServerMetrics serverMetrics = _serverInstance.getServerMetrics();
InstanceDataManager instanceDataManager = _serverInstance.getInstanceDataManager();
instanceDataManager.setSupplierOfIsServerReadyToServeQueries(() -> _isServerReadyToServeQueries);
// initialize the thread accountant for query killing
Tracing.ThreadAccountantOps
.initializeThreadAccountant(_serverConf.subset(CommonConstants.PINOT_QUERY_SCHEDULER_PREFIX), _instanceId);
initSegmentFetcher(_serverConf);
StateModelFactory<?> stateModelFactory =
new SegmentOnlineOfflineStateModelFactory(_instanceId, instanceDataManager);
----------------------------------------------------------------------
@@ -22,5 +22,5 @@


public interface ThreadAccountantFactory {
ThreadResourceUsageAccountant init(PinotConfiguration config);
ThreadResourceUsageAccountant init(PinotConfiguration config, String instanceId);
}
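Every ThreadAccountantFactory implementation, including plugins outside this repository, must now accept the instance id in init. A minimal sketch of an updated custom factory follows, assuming the spi package paths shown in the imports and that Tracing.DefaultThreadResourceUsageAccountant (which PerQueryCPUMemResourceUsageAccountant above also extends) can be subclassed with no further overrides; the classes below are illustrative only, not part of Pinot.

import org.apache.pinot.spi.accounting.ThreadAccountantFactory;
import org.apache.pinot.spi.accounting.ThreadResourceUsageAccountant;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.trace.Tracing;

// Hypothetical plugin factory showing the updated two-argument init(...).
public class InstanceTaggingAccountantFactory implements ThreadAccountantFactory {

  @Override
  public ThreadResourceUsageAccountant init(PinotConfiguration config, String instanceId) {
    return new InstanceTaggingAccountant(instanceId);
  }

  // Extends the default accountant and only remembers which broker/server instance it runs on,
  // mirroring the _instanceId field this commit adds to PerQueryCPUMemResourceUsageAccountant.
  public static class InstanceTaggingAccountant extends Tracing.DefaultThreadResourceUsageAccountant {
    private final String _instanceId;

    public InstanceTaggingAccountant(String instanceId) {
      _instanceId = instanceId;
    }

    public String getInstanceId() {
      return _instanceId;
    }
  }
}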
----------------------------------------------------------------------
@@ -257,7 +257,7 @@ public static void clear() {
Tracing.getThreadAccountant().clear();
}

public static void initializeThreadAccountant(PinotConfiguration config) {
public static void initializeThreadAccountant(PinotConfiguration config, String instanceId) {
String factoryName = config.getProperty(CommonConstants.Accounting.CONFIG_OF_FACTORY_NAME);
if (factoryName == null) {
LOGGER.warn("No thread accountant factory provided, using default implementation");
@@ -266,7 +266,7 @@ public static void initializeThreadAccountant(PinotConfiguration config) {
try {
ThreadAccountantFactory threadAccountantFactory =
(ThreadAccountantFactory) Class.forName(factoryName).getDeclaredConstructor().newInstance();
boolean registered = Tracing.register(threadAccountantFactory.init(config));
boolean registered = Tracing.register(threadAccountantFactory.init(config, instanceId));
LOGGER.info("Using accountant provided by {}", factoryName);
if (!registered) {
LOGGER.warn("ThreadAccountant {} register unsuccessful, as it is already registered.", factoryName);
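initializeThreadAccountant still loads the factory reflectively from the accounting factory-name setting; the only difference is that callers now supply their instance id. A hedged sketch of wiring this up in an embedded or test setting follows, assuming PinotConfiguration can be built from a plain map; the class and the instance id string are examples, not code from this commit.

import java.util.HashMap;
import java.util.Map;

import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.trace.Tracing;
import org.apache.pinot.spi.utils.CommonConstants;

// Hypothetical wiring code (e.g. in a test harness), not part of this commit.
public final class AccountantBootstrapExample {
  private AccountantBootstrapExample() {
  }

  public static void bootstrap() {
    Map<String, Object> properties = new HashMap<>();
    // Select the CPU/memory accountant shipped with Pinot; any ThreadAccountantFactory
    // with the new two-argument init(...) works here.
    properties.put(CommonConstants.Accounting.CONFIG_OF_FACTORY_NAME,
        "org.apache.pinot.core.accounting.PerQueryCPUMemAccountantFactory");
    // The instance id ("Server_localhost_8098" is just an example string) now flows into
    // the accountant so kill messages can name the instance that interrupted the query.
    Tracing.ThreadAccountantOps.initializeThreadAccountant(
        new PinotConfiguration(properties), "Server_localhost_8098");
  }
}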
----------------------------------------------------------------------
@@ -820,6 +820,9 @@ public static class Accounting {

public static final String CONFIG_OF_GC_WAIT_TIME_MS = "accounting.gc.wait.time.ms";
public static final int DEFAULT_CONFIG_OF_GC_WAIT_TIME_MS = 0;

public static final String CONFIG_OF_QUERY_KILLED_METRIC_ENABLED = "accounting.query.killed.metric.enabled";
public static final boolean DEFAULT_QUERY_KILLED_METRIC_ENABLED = false;
}

public static class ExecutorService {