[NO ISSUE] Readability improvements
Change-Id: I8b27805be1668fe6591c442fcd44020a418c2931
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2333
Reviewed-by: Murtadha Hubail <[email protected]>
Integration-Tests: Murtadha Hubail <[email protected]>
Tested-by: Murtadha Hubail <[email protected]>
westmann committed Jan 29, 2018
1 parent 06f16ca commit f125348
Showing 3 changed files with 86 additions and 85 deletions.
--- File 1 of 3 ---
@@ -20,7 +20,6 @@

import java.io.IOException;
import java.io.PrintWriter;
-import java.rmi.RemoteException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -95,6 +94,7 @@
import org.apache.hyracks.algebricks.core.rewriter.base.AlgebricksOptimizationContext;
import org.apache.hyracks.algebricks.core.rewriter.base.IOptimizationContextFactory;
import org.apache.hyracks.algebricks.core.rewriter.base.PhysicalOptimizationConfig;
+import org.apache.hyracks.algebricks.data.IPrinterFactoryProvider;
import org.apache.hyracks.api.client.IClusterInfoCollector;
import org.apache.hyracks.api.client.IHyracksClientConnection;
import org.apache.hyracks.api.client.NodeControllerInfo;
@@ -106,6 +106,7 @@
import org.apache.hyracks.control.common.config.OptionTypes;

import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.ObjectWriter;
import com.google.common.collect.ImmutableSet;

/**
@@ -201,63 +202,48 @@ public Pair<IReturningStatement, Integer> reWriteQuery(List<FunctionDecl> declar
}

public JobSpecification compileQuery(IClusterInfoCollector clusterInfoCollector, MetadataProvider metadataProvider,
-Query rwQ, int varCounter, String outputDatasetName, SessionOutput output, ICompiledDmlStatement statement)
-throws AlgebricksException, RemoteException, ACIDException {
+Query query, int varCounter, String outputDatasetName, SessionOutput output,
+ICompiledDmlStatement statement) throws AlgebricksException, ACIDException {

+// establish facts
+final boolean isQuery = query != null;
+final boolean isLoad = statement != null && statement.getKind() == Statement.Kind.LOAD;
+
SessionConfig conf = output.config();
if (!conf.is(SessionConfig.FORMAT_ONLY_PHYSICAL_OPS) && conf.is(SessionConfig.OOB_REWRITTEN_EXPR_TREE)) {
output.out().println();

printPlanPrefix(output, "Rewritten expression tree");
-if (rwQ != null) {
-rwQ.accept(astPrintVisitorFactory.createLangVisitor(output.out()), 0);
+if (isQuery) {
+query.accept(astPrintVisitorFactory.createLangVisitor(output.out()), 0);
}
printPlanPostfix(output);
}

-TxnId txnId = TxnIdFactory.create();
+final TxnId txnId = TxnIdFactory.create();
metadataProvider.setTxnId(txnId);
ILangExpressionToPlanTranslator t =
translatorFactory.createExpressionToPlanTranslator(metadataProvider, varCounter);

-ILogicalPlan plan;
-// statement = null when it's a query
-if (statement == null || statement.getKind() != Statement.Kind.LOAD) {
-plan = t.translate(rwQ, outputDatasetName, statement);
-} else {
-plan = t.translateLoad(statement);
-}
+ILogicalPlan plan = isLoad ? t.translateLoad(statement) : t.translate(query, outputDatasetName, statement);

if (!conf.is(SessionConfig.FORMAT_ONLY_PHYSICAL_OPS) && conf.is(SessionConfig.OOB_LOGICAL_PLAN)) {
output.out().println();

printPlanPrefix(output, "Logical plan");
-if (rwQ != null || (statement != null && statement.getKind() == Statement.Kind.LOAD)) {
+if (isQuery || isLoad) {
PlanPrettyPrinter.printPlan(plan, getPrettyPrintVisitor(output.config().getLpfmt(), output.out()), 0);
}
printPlanPostfix(output);
}
CompilerProperties compilerProperties = metadataProvider.getApplicationContext().getCompilerProperties();
-int frameSize = compilerProperties.getFrameSize();
-Map<String, String> querySpecificConfig = metadataProvider.getConfig();
-validateConfig(querySpecificConfig); // Validates the user-overridden query parameters.
-int sortFrameLimit = getFrameLimit(CompilerProperties.COMPILER_SORTMEMORY_KEY,
-querySpecificConfig.get(CompilerProperties.COMPILER_SORTMEMORY_KEY),
-compilerProperties.getSortMemorySize(), frameSize, MIN_FRAME_LIMIT_FOR_SORT);
-int groupFrameLimit = getFrameLimit(CompilerProperties.COMPILER_GROUPMEMORY_KEY,
-querySpecificConfig.get(CompilerProperties.COMPILER_GROUPMEMORY_KEY),
-compilerProperties.getGroupMemorySize(), frameSize, MIN_FRAME_LIMIT_FOR_GROUP_BY);
-int joinFrameLimit = getFrameLimit(CompilerProperties.COMPILER_JOINMEMORY_KEY,
-querySpecificConfig.get(CompilerProperties.COMPILER_JOINMEMORY_KEY),
-compilerProperties.getJoinMemorySize(), frameSize, MIN_FRAME_LIMIT_FOR_JOIN);
-OptimizationConfUtil.getPhysicalOptimizationConfig().setFrameSize(frameSize);
-OptimizationConfUtil.getPhysicalOptimizationConfig().setMaxFramesExternalSort(sortFrameLimit);
-OptimizationConfUtil.getPhysicalOptimizationConfig().setMaxFramesExternalGroupBy(groupFrameLimit);
-OptimizationConfUtil.getPhysicalOptimizationConfig().setMaxFramesForJoin(joinFrameLimit);
+Map<String, String> querySpecificConfig = validateConfig(metadataProvider.getConfig());
+final PhysicalOptimizationConfig physOptConf =
+getPhysicalOptimizationConfig(compilerProperties, querySpecificConfig);

HeuristicCompilerFactoryBuilder builder =
new HeuristicCompilerFactoryBuilder(OptimizationContextFactory.INSTANCE);
-builder.setPhysicalOptimizationConfig(OptimizationConfUtil.getPhysicalOptimizationConfig());
+builder.setPhysicalOptimizationConfig(physOptConf);
builder.setLogicalRewrites(ruleSetFactory.getLogicalRewrites(metadataProvider.getApplicationContext()));
builder.setPhysicalRewrites(ruleSetFactory.getPhysicalRewrites(metadataProvider.getApplicationContext()));
IDataFormat format = metadataProvider.getDataFormat();
@@ -285,15 +271,15 @@ public JobSpecification compileQuery(IClusterInfoCollector clusterInfoCollector,
PlanPrettyPrinter.printPhysicalOps(plan, buffer, 0);
} else {
printPlanPrefix(output, "Optimized logical plan");
-if (rwQ != null || (statement != null && statement.getKind() == Statement.Kind.LOAD)) {
+if (isQuery || isLoad) {
PlanPrettyPrinter.printPlan(plan,
getPrettyPrintVisitor(output.config().getLpfmt(), output.out()), 0);
}
printPlanPostfix(output);
}
}
}
-if (rwQ != null && rwQ.isExplain()) {
+if (isQuery && query.isExplain()) {
try {
LogicalOperatorPrettyPrintVisitor pvisitor = new LogicalOperatorPrettyPrintVisitor();
PlanPrettyPrinter.printPlan(plan, pvisitor, 0);
@@ -318,25 +304,7 @@ public JobSpecification compileQuery(IClusterInfoCollector clusterInfoCollector,
builder.setHashFunctionFamilyProvider(format.getBinaryHashFunctionFamilyProvider());
builder.setMissingWriterFactory(format.getMissingWriterFactory());
builder.setPredicateEvaluatorFactoryProvider(format.getPredicateEvaluatorFactoryProvider());
-
-final SessionConfig.OutputFormat outputFormat = conf.fmt();
-switch (outputFormat) {
-case LOSSLESS_JSON:
-builder.setPrinterProvider(format.getLosslessJSONPrinterFactoryProvider());
-break;
-case CSV:
-builder.setPrinterProvider(format.getCSVPrinterFactoryProvider());
-break;
-case ADM:
-builder.setPrinterProvider(format.getADMPrinterFactoryProvider());
-break;
-case CLEAN_JSON:
-builder.setPrinterProvider(format.getCleanJSONPrinterFactoryProvider());
-break;
-default:
-throw new AlgebricksException("Unexpected OutputFormat: " + outputFormat);
-}
-
+builder.setPrinterProvider(getPrinterFactoryProvider(format, conf.fmt()));
builder.setSerializerDeserializerProvider(format.getSerdeProvider());
builder.setTypeTraitProvider(format.getTypeTraitProvider());
builder.setNormalizedKeyComputerFactoryProvider(format.getNormalizedKeyComputerFactoryProvider());
@@ -345,32 +313,73 @@ public JobSpecification compileQuery(IClusterInfoCollector clusterInfoCollector,
new JobEventListenerFactory(txnId, metadataProvider.isWriteTransaction());
JobSpecification spec = compiler.createJob(metadataProvider.getApplicationContext(), jobEventListenerFactory);

-// When the top-level statement is a query, the statement parameter is null.
-if (statement == null) {
+if (isQuery) {
// Sets a required capacity, only for read-only queries.
// DDLs and DMLs are considered not that frequent.
// limit the computation locations to the locations that will be used in the query
-final AlgebricksAbsolutePartitionConstraint jobLocations = getJobLocations(spec,
-metadataProvider.getApplicationContext().getNodeJobTracker(), computationLocations);
-final IClusterCapacity jobRequiredCapacity = ResourceUtils.getRequiredCapacity(plan, jobLocations,
-sortFrameLimit, groupFrameLimit, joinFrameLimit, frameSize);
+final INodeJobTracker nodeJobTracker = metadataProvider.getApplicationContext().getNodeJobTracker();
+final AlgebricksAbsolutePartitionConstraint jobLocations =
+getJobLocations(spec, nodeJobTracker, computationLocations);
+final IClusterCapacity jobRequiredCapacity =
+ResourceUtils.getRequiredCapacity(plan, jobLocations, physOptConf);
spec.setRequiredClusterCapacity(jobRequiredCapacity);
}

+printJobSpec(query, spec, conf, output);
+return spec;
+}
+
+protected PhysicalOptimizationConfig getPhysicalOptimizationConfig(CompilerProperties compilerProperties,
+Map<String, String> querySpecificConfig) throws AlgebricksException {
+int frameSize = compilerProperties.getFrameSize();
+int sortFrameLimit = getFrameLimit(CompilerProperties.COMPILER_SORTMEMORY_KEY,
+querySpecificConfig.get(CompilerProperties.COMPILER_SORTMEMORY_KEY),
+compilerProperties.getSortMemorySize(), frameSize, MIN_FRAME_LIMIT_FOR_SORT);
+int groupFrameLimit = getFrameLimit(CompilerProperties.COMPILER_GROUPMEMORY_KEY,
+querySpecificConfig.get(CompilerProperties.COMPILER_GROUPMEMORY_KEY),
+compilerProperties.getGroupMemorySize(), frameSize, MIN_FRAME_LIMIT_FOR_GROUP_BY);
+int joinFrameLimit = getFrameLimit(CompilerProperties.COMPILER_JOINMEMORY_KEY,
+querySpecificConfig.get(CompilerProperties.COMPILER_JOINMEMORY_KEY),
+compilerProperties.getJoinMemorySize(), frameSize, MIN_FRAME_LIMIT_FOR_JOIN);
+final PhysicalOptimizationConfig physOptConf = OptimizationConfUtil.getPhysicalOptimizationConfig();
+physOptConf.setFrameSize(frameSize);
+physOptConf.setMaxFramesExternalSort(sortFrameLimit);
+physOptConf.setMaxFramesExternalGroupBy(groupFrameLimit);
+physOptConf.setMaxFramesForJoin(joinFrameLimit);
+return physOptConf;
+}
+
+protected IPrinterFactoryProvider getPrinterFactoryProvider(IDataFormat format,
+SessionConfig.OutputFormat outputFormat) throws AlgebricksException {
+switch (outputFormat) {
+case LOSSLESS_JSON:
+return format.getLosslessJSONPrinterFactoryProvider();
+case CSV:
+return format.getCSVPrinterFactoryProvider();
+case ADM:
+return format.getADMPrinterFactoryProvider();
+case CLEAN_JSON:
+return format.getCleanJSONPrinterFactoryProvider();
+default:
+throw new AlgebricksException("Unexpected OutputFormat: " + outputFormat);
+}
+}
+
+protected void printJobSpec(Query rwQ, JobSpecification spec, SessionConfig conf, SessionOutput output)
+throws AlgebricksException {
if (conf.is(SessionConfig.OOB_HYRACKS_JOB)) {
printPlanPrefix(output, "Hyracks job");
if (rwQ != null) {
try {
-output.out().println(
-new ObjectMapper().writerWithDefaultPrettyPrinter().writeValueAsString(spec.toJSON()));
+final ObjectWriter objectWriter = new ObjectMapper().writerWithDefaultPrettyPrinter();
+output.out().println(objectWriter.writeValueAsString(spec.toJSON()));
} catch (IOException e) {
throw new AlgebricksException(e);
}
output.out().println(spec.getUserConstraints());
}
printPlanPostfix(output);
}
-return spec;
}

private AbstractLogicalOperatorPrettyPrintVisitor getPrettyPrintVisitor(SessionConfig.PlanFormat planFormat,
@@ -390,7 +399,6 @@ public void executeJobArray(IHyracksClientConnection hcc, JobSpecification[] spe
double duration = (endTime - startTime) / 1000.00;
out.println("<pre>Duration: " + duration + " sec</pre>");
}
-
}

public void executeJobArray(IHyracksClientConnection hcc, Job[] jobs, PrintWriter out) throws Exception {
@@ -499,12 +507,13 @@ private static int getParallelism(String parameter, int parallelismInConfigurati
}

// Validates if the query contains unsupported query parameters.
-private static void validateConfig(Map<String, String> config) throws AlgebricksException {
+private static Map<String, String> validateConfig(Map<String, String> config) throws AlgebricksException {
for (String parameterName : config.keySet()) {
if (!CONFIGURABLE_PARAMETER_NAMES.contains(parameterName)) {
throw AsterixException.create(ErrorCode.COMPILATION_UNSUPPORTED_QUERY_PARAMETER, parameterName);
}
}
+return config;
}

public static AlgebricksAbsolutePartitionConstraint getJobLocations(JobSpecification spec,
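Reviewer note: the hunks above split compileQuery's inline setup into three protected helpers (getPhysicalOptimizationConfig, getPrinterFactoryProvider, printJobSpec). Making them protected also opens an extension seam: a subclass can adjust a single step without copying the whole method. A minimal sketch of that use follows; the capture omits file and class names, so the base-class name APIFramework, its construction, and the subclass itself are assumptions, not part of this commit.

// Hypothetical subclass; the base-class name and its constructor are
// assumptions, since this capture omits file names.
public class TracingAPIFramework extends APIFramework {
    @Override
    protected PhysicalOptimizationConfig getPhysicalOptimizationConfig(
            CompilerProperties compilerProperties, Map<String, String> querySpecificConfig)
            throws AlgebricksException {
        // Reuse the base logic, then report the resolved sort budget
        // (the getter appears in the ResourceUtils hunks below).
        PhysicalOptimizationConfig physOptConf =
                super.getPhysicalOptimizationConfig(compilerProperties, querySpecificConfig);
        System.err.println("max sort frames: " + physOptConf.getMaxFramesExternalSort());
        return physOptConf;
    }
}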
--- File 2 of 3 ---
@@ -24,6 +24,7 @@
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
import org.apache.hyracks.algebricks.core.algebra.base.ILogicalPlan;
+import org.apache.hyracks.algebricks.core.rewriter.base.PhysicalOptimizationConfig;
import org.apache.hyracks.api.job.resource.ClusterCapacity;
import org.apache.hyracks.api.job.resource.IClusterCapacity;

@@ -40,21 +41,20 @@ private ResourceUtils() {
* a given query plan.
* @param computationLocations,
* the partitions for computation.
-* @param sortFrameLimit,
-* the frame limit for one sorter partition.
-* @param groupFrameLimit,
-* the frame limit for one group-by partition.
-* @param joinFrameLimit
-* the frame limit for one joiner partition.
-* @param frameSize
-* the frame size used in query execution.
+* @param physicalOptimizationConfig,
+* a PhysicalOptimizationConfig.
* @return the required cluster capacity for executing the query.
* @throws AlgebricksException
* if the query plan is malformed.
*/
public static IClusterCapacity getRequiredCapacity(ILogicalPlan plan,
-AlgebricksAbsolutePartitionConstraint computationLocations, int sortFrameLimit, int groupFrameLimit,
-int joinFrameLimit, int frameSize) throws AlgebricksException {
+AlgebricksAbsolutePartitionConstraint computationLocations,
+PhysicalOptimizationConfig physicalOptimizationConfig) throws AlgebricksException {
+final int frameSize = physicalOptimizationConfig.getFrameSize();
+final int sortFrameLimit = physicalOptimizationConfig.getMaxFramesExternalSort();
+final int groupFrameLimit = physicalOptimizationConfig.getMaxFramesForGroupBy();
+final int joinFrameLimit = physicalOptimizationConfig.getMaxFramesForJoin();
+
// Creates a cluster capacity visitor.
IClusterCapacity clusterCapacity = new ClusterCapacity();
RequiredCapacityVisitor visitor = new RequiredCapacityVisitor(computationLocations.getLocations().length,
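The signature change in this file is a parameter-object refactor: the frame size and the three per-operator frame limits now travel inside a single PhysicalOptimizationConfig instead of four int parameters, so a future memory knob will not ripple through this signature again. The call site, taken from the compileQuery hunk above, now reads:

final IClusterCapacity jobRequiredCapacity =
        ResourceUtils.getRequiredCapacity(plan, jobLocations, physOptConf);
spec.setRequiredClusterCapacity(jobRequiredCapacity);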
--- File 3 of 3 ---
@@ -355,8 +355,7 @@ public IDataSourceIndex<String, DataSourceId> findDataSourceIndex(String indexId
throws AlgebricksException {
DataSource source = findDataSource(dataSourceId);
Dataset dataset = ((DatasetDataSource) source).getDataset();
-String indexName = indexId;
-Index secondaryIndex = getIndex(dataset.getDataverseName(), dataset.getDatasetName(), indexName);
+Index secondaryIndex = getIndex(dataset.getDataverseName(), dataset.getDatasetName(), indexId);
return (secondaryIndex != null)
? new DataSourceIndex(secondaryIndex, dataset.getDataverseName(), dataset.getDatasetName(), this)
: null;
@@ -381,26 +380,19 @@ public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getScannerRuntim
List<LogicalVariable> projectVariables, boolean projectPushed, List<LogicalVariable> minFilterVars,
List<LogicalVariable> maxFilterVars, IOperatorSchema opSchema, IVariableTypeEnvironment typeEnv,
JobGenContext context, JobSpecification jobSpec, Object implConfig) throws AlgebricksException {
-try {
-return ((DataSource) dataSource).buildDatasourceScanRuntime(this, dataSource, scanVariables,
-projectVariables, projectPushed, minFilterVars, maxFilterVars, opSchema, typeEnv, context, jobSpec,
-implConfig);
-} catch (AsterixException e) {
-throw new AlgebricksException(e);
-}
+return ((DataSource) dataSource).buildDatasourceScanRuntime(this, dataSource, scanVariables, projectVariables,
+projectPushed, minFilterVars, maxFilterVars, opSchema, typeEnv, context, jobSpec, implConfig);
}

protected Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildLoadableDatasetScan(
JobSpecification jobSpec, IAdapterFactory adapterFactory, RecordDescriptor rDesc)
throws AlgebricksException {
ExternalScanOperatorDescriptor dataScanner = new ExternalScanOperatorDescriptor(jobSpec, rDesc, adapterFactory);
-AlgebricksPartitionConstraint constraint;
try {
-constraint = adapterFactory.getPartitionConstraint();
+return new Pair<>(dataScanner, adapterFactory.getPartitionConstraint());
} catch (Exception e) {
throw new AlgebricksException(e);
}
-return new Pair<>(dataScanner, constraint);
}

public Dataverse findDataverse(String dataverseName) throws AlgebricksException {
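Both hunks in this file trim exception plumbing. The first drops a catch block that only rewrapped AsterixException as AlgebricksException; the type hierarchy is not visible in this capture, but the removal implies the callee's exception is already compatible with the declared AlgebricksException. The second is a small general pattern, reconstructed below from the diff: returning from inside the try removes the local variable whose only job was to carry a value out of the block.

// Before: a local variable bridges the try block and the return.
AlgebricksPartitionConstraint constraint;
try {
    constraint = adapterFactory.getPartitionConstraint();
} catch (Exception e) {
    throw new AlgebricksException(e);
}
return new Pair<>(dataScanner, constraint);

// After: return directly from the try; same behavior, one name fewer.
try {
    return new Pair<>(dataScanner, adapterFactory.getPartitionConstraint());
} catch (Exception e) {
    throw new AlgebricksException(e);
}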
