Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -86,14 +86,6 @@ public Field(Optional<NodeLocation> nodeLocation, Optional<QualifiedName> relati
this.aliased = aliased;
}

public static Field newUnqualified(Optional<String> name, Type type)
{
requireNonNull(name, "name is null");
requireNonNull(type, "type is null");

return new Field(Optional.empty(), Optional.empty(), name, type, false, Optional.empty(), Optional.empty(), false);
}

public Optional<NodeLocation> getNodeLocation()
{
return nodeLocation;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1520,7 +1520,7 @@ private void verifyRequiredColumns(TableFunctionInvocation node, Map<String, Lis
// the scope is recorded, because table arguments are already analyzed
Scope inputScope = analysis.getScope(tableArgumentsByName.get(name).getRelation());
columns.stream()
.filter(column -> column < 0 || column >= inputScope.getRelationType().getAllFieldCount()) // hidden columns can be required as well as visible columns
.filter(column -> column < 0 || column >= inputScope.getRelationType().getVisibleFieldCount())
.findFirst()
.ifPresent(column -> {
throw new SemanticException(TABLE_FUNCTION_IMPLEMENTATION_ERROR, "Invalid index: %s of required column from table argument %s", column, name);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@
import static java.lang.String.format;
import static java.util.Objects.requireNonNull;

class QueryPlanner
public class QueryPlanner
{
private final Analysis analysis;
private final VariableAllocator variableAllocator;
Expand Down Expand Up @@ -1355,6 +1355,11 @@ private static List<Expression> toSymbolReferences(List<VariableReferenceExpress
.collect(toImmutableList());
}

private static SymbolReference toSymbolReference(VariableReferenceExpression variable)
{
return new SymbolReference(variable.getSourceLocation().map(location -> new NodeLocation(location.getLine(), location.getColumn())), variable.getName());
}

public static class PlanAndMappings
{
private final PlanBuilder subPlan;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -346,6 +346,7 @@ protected RelationPlan visitTableFunctionInvocation(TableFunctionInvocation node
outputVariablesBuilder.build(),
sources.stream().map(RelationPlan::getRoot).collect(toImmutableList()),
inputRelationsProperties,
functionAnalysis.getCopartitioningLists(),
new TableFunctionHandle(functionAnalysis.getConnectorId(), functionAnalysis.getConnectorTableFunctionHandle(), functionAnalysis.getTransactionHandle()));

return new RelationPlan(root, scope, outputVariables);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -490,6 +490,7 @@ public PlanNode visitTableFunction(TableFunctionNode node, RewriteContext<Void>
node.getOutputVariables(),
node.getSources(),
node.getTableArgumentProperties(),
node.getCopartitioningLists(),
node.getHandle());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,11 @@ public C get()
return userContext;
}

public SimplePlanRewriter<C> getNodeRewriter()
{
return nodeRewriter;
}

/**
* Invoke the rewrite logic recursively on children of the given node and swap it
* out with an identical copy with the rewritten children
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,17 @@
import com.facebook.presto.spi.relation.VariableReferenceExpression;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.errorprone.annotations.Immutable;

import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Optional;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static java.util.Objects.requireNonNull;

@Immutable
Expand All @@ -37,22 +41,24 @@ public class TableFunctionNode
{
private final String name;
private final Map<String, Argument> arguments;
private final List<VariableReferenceExpression> outputVariables;
private final List<VariableReferenceExpression> properOutputs;
private final List<PlanNode> sources;
private final List<TableArgumentProperties> tableArgumentProperties;
private final List<List<String>> copartitioningLists;
private final TableFunctionHandle handle;

@JsonCreator
public TableFunctionNode(
@JsonProperty("id") PlanNodeId id,
@JsonProperty("name") String name,
@JsonProperty("arguments") Map<String, Argument> arguments,
@JsonProperty("outputVariables") List<VariableReferenceExpression> outputVariables,
@JsonProperty("properOutputs") List<VariableReferenceExpression> properOutputs,
@JsonProperty("sources") List<PlanNode> sources,
@JsonProperty("tableArgumentProperties") List<TableArgumentProperties> tableArgumentProperties,
@JsonProperty("copartitioningLists") List<List<String>> copartitioningLists,
@JsonProperty("handle") TableFunctionHandle handle)
{
this(Optional.empty(), id, Optional.empty(), name, arguments, outputVariables, sources, tableArgumentProperties, handle);
this(Optional.empty(), id, Optional.empty(), name, arguments, properOutputs, sources, tableArgumentProperties, copartitioningLists, handle);
}

public TableFunctionNode(
Expand All @@ -61,17 +67,21 @@ public TableFunctionNode(
Optional<PlanNode> statsEquivalentPlanNode,
String name,
Map<String, Argument> arguments,
List<VariableReferenceExpression> outputVariables,
List<VariableReferenceExpression> properOutputs,
List<PlanNode> sources,
List<TableArgumentProperties> tableArgumentProperties,
List<List<String>> copartitioningLists,
TableFunctionHandle handle)
{
super(sourceLocation, id, statsEquivalentPlanNode);
this.name = requireNonNull(name, "name is null");
this.arguments = requireNonNull(arguments, "arguments is null");
this.outputVariables = requireNonNull(outputVariables, "outputVariables is null");
this.sources = requireNonNull(sources, "sources is null");
this.tableArgumentProperties = requireNonNull(tableArgumentProperties, "tableArgumentProperties is null");
this.arguments = ImmutableMap.copyOf(arguments);
this.properOutputs = ImmutableList.copyOf(properOutputs);
this.sources = ImmutableList.copyOf(sources);
this.tableArgumentProperties = ImmutableList.copyOf(tableArgumentProperties);
this.copartitioningLists = requireNonNull(copartitioningLists, "copartitioningLists is null").stream()
.map(ImmutableList::copyOf)
.collect(toImmutableList());
this.handle = requireNonNull(handle, "handle is null");
}

Expand All @@ -87,10 +97,25 @@ public Map<String, Argument> getArguments()
return arguments;
}

@JsonProperty
@Override
public List<VariableReferenceExpression> getOutputVariables()
{
return outputVariables;
ImmutableList.Builder<VariableReferenceExpression> variables = ImmutableList.builder();
variables.addAll(properOutputs);

tableArgumentProperties.stream()
.map(TableArgumentProperties::getPassThroughSpecification)
.map(PassThroughSpecification::getColumns)
.flatMap(Collection::stream)
.map(PassThroughColumn::getVariable)
.forEach(variables::add);

return variables.build();
}

public List<VariableReferenceExpression> getProperOutputs()
{
return properOutputs;
}

@JsonProperty
Expand All @@ -99,6 +124,12 @@ public List<TableArgumentProperties> getTableArgumentProperties()
return tableArgumentProperties;
}

@JsonProperty
public List<List<String>> getCopartitioningLists()
{
return copartitioningLists;
}

@JsonProperty
public TableFunctionHandle getHandle()
{
Expand All @@ -122,35 +153,47 @@ public <R, C> R accept(InternalPlanVisitor<R, C> visitor, C context)
public PlanNode replaceChildren(List<PlanNode> newSources)
{
checkArgument(sources.size() == newSources.size(), "wrong number of new children");
return new TableFunctionNode(getId(), name, arguments, outputVariables, newSources, tableArgumentProperties, handle);
return new TableFunctionNode(getId(), name, arguments, properOutputs, newSources, tableArgumentProperties, copartitioningLists, handle);
}

@Override
public PlanNode assignStatsEquivalentPlanNode(Optional<PlanNode> statsEquivalentPlanNode)
{
return new TableFunctionNode(getSourceLocation(), getId(), statsEquivalentPlanNode, name, arguments, outputVariables, sources, tableArgumentProperties, handle);
return new TableFunctionNode(getSourceLocation(), getId(), statsEquivalentPlanNode, name, arguments, properOutputs, sources, tableArgumentProperties, copartitioningLists, handle);
}

public static class TableArgumentProperties
{
private final String argumentName;
private final boolean rowSemantics;
private final boolean pruneWhenEmpty;
private final boolean passThroughColumns;
private final PassThroughSpecification passThroughSpecification;
private final List<VariableReferenceExpression> requiredColumns;
private final Optional<DataOrganizationSpecification> specification;

@JsonCreator
public TableArgumentProperties(
@JsonProperty("argumentName") String argumentName,
@JsonProperty("rowSemantics") boolean rowSemantics,
@JsonProperty("pruneWhenEmpty") boolean pruneWhenEmpty,
@JsonProperty("passThroughColumns") boolean passThroughColumns,
@JsonProperty("passThroughSpecification") PassThroughSpecification passThroughSpecification,
@JsonProperty("requiredColumns") List<VariableReferenceExpression> requiredColumns,
@JsonProperty("specification") Optional<DataOrganizationSpecification> specification)
{
this.argumentName = requireNonNull(argumentName, "argumentName is null");
this.rowSemantics = rowSemantics;
this.pruneWhenEmpty = pruneWhenEmpty;
this.passThroughColumns = passThroughColumns;
this.passThroughSpecification = requireNonNull(passThroughSpecification, "passThroughSpecification is null");
this.requiredColumns = ImmutableList.copyOf(requiredColumns);
this.specification = requireNonNull(specification, "specification is null");
}

@JsonProperty
public String getArgumentName()
{
return argumentName;
}

@JsonProperty
public boolean isRowSemantics()
{
Expand All @@ -164,15 +207,83 @@ public boolean isPruneWhenEmpty()
}

@JsonProperty
public boolean isPassThroughColumns()
public PassThroughSpecification getPassThroughSpecification()
{
return passThroughSpecification;
}

@JsonProperty
public List<VariableReferenceExpression> getRequiredColumns()
{
return passThroughColumns;
return requiredColumns;
}

@JsonProperty
public Optional<DataOrganizationSpecification> specification()
public Optional<DataOrganizationSpecification> getSpecification()
{
return specification;
}
}

/**
* Specifies how columns from source tables are passed through to the output of a table function.
* This class manages both explicitly declared pass-through columns and partitioning columns
* that must be preserved in the output.
*/
public static class PassThroughSpecification
{
private final boolean declaredAsPassThrough;
private final List<PassThroughColumn> columns;

@JsonCreator
public PassThroughSpecification(
@JsonProperty("declaredAsPassThrough") boolean declaredAsPassThrough,
@JsonProperty("columns") List<PassThroughColumn> columns)
{
this.declaredAsPassThrough = declaredAsPassThrough;
this.columns = ImmutableList.copyOf(requireNonNull(columns, "columns is null"));
checkArgument(
declaredAsPassThrough || this.columns.stream().allMatch(PassThroughColumn::isPartitioningColumn),
"non-partitioning pass-through column for non-pass-through source of a table function");
}

@JsonProperty
public boolean isDeclaredAsPassThrough()
{
return declaredAsPassThrough;
}

@JsonProperty
public List<PassThroughColumn> getColumns()
{
return columns;
}
}

public static class PassThroughColumn
{
private final VariableReferenceExpression variable;
private final boolean isPartitioningColumn;

@JsonCreator
public PassThroughColumn(
@JsonProperty("variable") VariableReferenceExpression variable,
@JsonProperty("partitioningColumn") boolean isPartitioningColumn)
{
this.variable = requireNonNull(variable, "variable is null");
this.isPartitioningColumn = isPartitioningColumn;
}

@JsonProperty
public VariableReferenceExpression getVariable()
{
return variable;
}

@JsonProperty
public boolean isPartitioningColumn()
{
return isPartitioningColumn;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -772,7 +772,8 @@ public void installPlugin(Plugin plugin)
@Override
public void createCatalog(String catalogName, String connectorName, Map<String, String> properties)
{
throw new UnsupportedOperationException();
nodeManager.addCurrentNodeConnector(new ConnectorId(catalogName));
connectorManager.createConnection(catalogName, connectorName, properties);
}

@Override
Expand Down
Loading
Loading